Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,262 changes: 19 additions & 1,243 deletions src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java

Large diffs are not rendered by default.

1,246 changes: 1,246 additions & 0 deletions src/main/java/com/github/shyiko/mysql/binlog/NewBinaryLogClient.java

Large diffs are not rendered by default.

1,230 changes: 1,230 additions & 0 deletions src/main/java/com/github/shyiko/mysql/binlog/OldBinaryLogClient.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
package com.github.shyiko.mysql.binlog.jmx;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.BinaryLogClient.EventListener;
import com.github.shyiko.mysql.binlog.BinaryLogClient.LifecycleListener;
import com.github.shyiko.mysql.binlog.NewBinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventHeader;

Expand All @@ -26,7 +29,7 @@
* @author <a href="mailto:stanley.shyiko@gmail.com">Stanley Shyiko</a>
*/
public class BinaryLogClientStatistics implements BinaryLogClientStatisticsMXBean,
BinaryLogClient.EventListener, BinaryLogClient.LifecycleListener {
EventListener, LifecycleListener {

private AtomicReference<EventHeader> lastEventHeader = new AtomicReference<EventHeader>();
private AtomicLong timestampOfLastEvent = new AtomicLong();
Expand All @@ -38,7 +41,7 @@ public class BinaryLogClientStatistics implements BinaryLogClientStatisticsMXBea
public BinaryLogClientStatistics() {
}

public BinaryLogClientStatistics(BinaryLogClient binaryLogClient) {
public BinaryLogClientStatistics(NewBinaryLogClient binaryLogClient) {
binaryLogClient.registerEventListener(this);
binaryLogClient.registerLifecycleListener(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void execute(ResultSet rs) throws SQLException {
EventDeserializer eventDeserializer = new EventDeserializer();
try {
client.disconnect();
final BinaryLogClient clientWithKeepAlive = new BinaryLogClient(slave.hostname(), slave.port(),
final com.github.shyiko.mysql.binlog.NewBinaryLogClient clientWithKeepAlive = new com.github.shyiko.mysql.binlog.NewBinaryLogClient(slave.hostname(), slave.port(),
slave.username(), slave.password());

clientWithKeepAlive.setGtidSet(initialGTIDSet[0]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public class BinaryLogClientIntegrationTest {
private final TimeZone timeZoneBeforeTheTest = TimeZone.getDefault();

protected MySQLConnection master, slave;
protected BinaryLogClient client;
protected NewBinaryLogClient client;
protected CountDownEventListener eventListener;

@BeforeClass
Expand All @@ -114,7 +114,7 @@ public void setUp() throws Exception {
slave = new MySQLConnection(bundle.getString(prefix + "slave.hostname"),
Integer.parseInt(bundle.getString(prefix + "slave.port")),
bundle.getString(prefix + "slave.superUsername"), bundle.getString(prefix + "slave.superPassword"));
client = new BinaryLogClient(slave.hostname, slave.port, slave.username, slave.password);
client = new NewBinaryLogClient(slave.hostname, slave.port, slave.username, slave.password);
EventDeserializer eventDeserializer = new EventDeserializer();
eventDeserializer.setCompatibilityMode(CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY,
CompatibilityMode.DATE_AND_TIME_AS_LONG);
Expand Down Expand Up @@ -384,7 +384,7 @@ public void execute(Statement statement) throws SQLException {

@Test
public void testDeserializationOfDateAndTimeAsLong() throws Exception {
final BinaryLogClient client = new BinaryLogClient(slave.hostname, slave.port,
final NewBinaryLogClient client = new NewBinaryLogClient(slave.hostname, slave.port,
slave.username, slave.password);
EventDeserializer eventDeserializer = new EventDeserializer();
eventDeserializer.setCompatibilityMode(CompatibilityMode.DATE_AND_TIME_AS_LONG);
Expand All @@ -400,7 +400,7 @@ public void testDeserializationOfDateAndTimeAsLong() throws Exception {

@Test
public void testDeserializationOfDateAndTimeAsLongMicrosecondsPrecision() throws Exception {
final BinaryLogClient client = new BinaryLogClient(slave.hostname, slave.port,
final NewBinaryLogClient client = new NewBinaryLogClient(slave.hostname, slave.port,
slave.username, slave.password);
EventDeserializer eventDeserializer = new EventDeserializer();
eventDeserializer.setCompatibilityMode(CompatibilityMode.DATE_AND_TIME_AS_LONG_MICRO);
Expand Down Expand Up @@ -439,8 +439,8 @@ private Serializable[] writeAndCaptureRow(final String columnDefinition, final S
return writeAndCaptureRow(client, columnDefinition, values);
}

private Serializable[] writeAndCaptureRow(BinaryLogClient client, final String columnDefinition,
final String... values) throws Exception {
private Serializable[] writeAndCaptureRow(NewBinaryLogClient client, final String columnDefinition,
final String... values) throws Exception {
CapturingEventListener capturingEventListener = new CapturingEventListener();
client.registerEventListener(capturingEventListener);
CountDownEventListener eventListener = new CountDownEventListener();
Expand Down Expand Up @@ -503,7 +503,7 @@ public void execute(Statement statement) throws SQLException {
}
});
eventListener.waitFor(WriteRowsEventData.class, 3, DEFAULT_TIMEOUT);
final BinaryLogClient anotherClient = new BinaryLogClient(slave.hostname, slave.port,
final NewBinaryLogClient anotherClient = new NewBinaryLogClient(slave.hostname, slave.port,
slave.username, slave.password);
anotherClient.registerLifecycleListener(new TraceLifecycleListener());
CountDownEventListener anotherClientEventListener = new CountDownEventListener();
Expand Down Expand Up @@ -648,7 +648,7 @@ public void testAutomaticFailover() throws Exception {
bindInSeparateThread(tcpReverseProxy);
try {
client.disconnect();
final BinaryLogClient clientOverProxy = new BinaryLogClient(slave.hostname, tcpReverseProxy.getPort(),
final NewBinaryLogClient clientOverProxy = new NewBinaryLogClient(slave.hostname, tcpReverseProxy.getPort(),
slave.username, slave.password);
clientOverProxy.setKeepAliveInterval(TimeUnit.MILLISECONDS.toMillis(100));
clientOverProxy.setKeepAliveConnectTimeout(TimeUnit.SECONDS.toMillis(2));
Expand Down Expand Up @@ -728,7 +728,7 @@ private void testCommunicationFailureInTheMiddleOfEventDataDeserialization(final
protected void testCommunicationFailure(EventDeserializer eventDeserializer) throws Exception {
try {
client.disconnect();
final BinaryLogClient clientWithKeepAlive = new BinaryLogClient(slave.hostname, slave.port,
final NewBinaryLogClient clientWithKeepAlive = new NewBinaryLogClient(slave.hostname, slave.port,
slave.username, slave.password);
clientWithKeepAlive.setKeepAliveInterval(TimeUnit.MILLISECONDS.toMillis(100));
clientWithKeepAlive.setKeepAliveConnectTimeout(TimeUnit.SECONDS.toMillis(2));
Expand Down Expand Up @@ -766,7 +766,7 @@ public void execute(Statement statement) throws SQLException {
public void testCustomEventDataDeserializers() throws Exception {
try {
client.disconnect();
final BinaryLogClient binaryLogClient = new BinaryLogClient(slave.hostname, slave.port,
final NewBinaryLogClient binaryLogClient = new NewBinaryLogClient(slave.hostname, slave.port,
slave.username, slave.password);
binaryLogClient.registerEventListener(new TraceEventListener());
binaryLogClient.registerEventListener(eventListener);
Expand Down Expand Up @@ -812,8 +812,8 @@ public void testExceptionIsThrownWhenTryingToConnectAlreadyConnectedClient() thr

@Test
public void testExceptionIsThrownWhenProvidedWithWrongCredentials() throws Exception {
BinaryLogClient binaryLogClient =
new BinaryLogClient(slave.hostname, slave.port, slave.username, slave.password + "^_^");
NewBinaryLogClient binaryLogClient =
new NewBinaryLogClient(slave.hostname, slave.port, slave.username, slave.password + "^_^");
try {
binaryLogClient.connect();
fail("Wrong password should have resulted in AuthenticationException being thrown");
Expand All @@ -828,7 +828,7 @@ public void testExceptionIsThrownWhenInsufficientPermissionsToDetectPosition() t
String prefix = "jdbc.mysql.replication.";
String slaveUsername = bundle.getString(prefix + "slave.slaveUsername");
String slavePassword = bundle.getString(prefix + "slave.slavePassword");
new BinaryLogClient(slave.hostname, slave.port, slaveUsername, slavePassword).connect();
new NewBinaryLogClient(slave.hostname, slave.port, slaveUsername, slavePassword).connect();
}

private void bindInSeparateThread(final TCPReverseProxy tcpReverseProxy) throws InterruptedException {
Expand All @@ -848,7 +848,7 @@ public void run() {

@Test(expectedExceptions = AuthenticationException.class)
public void testAuthenticationFailsWhenNonExistingSchemaProvided() throws Exception {
new BinaryLogClient(slave.hostname, slave.port, "mbcj_test_non_existing", slave.username, slave.password).
new NewBinaryLogClient(slave.hostname, slave.port, "mbcj_test_non_existing", slave.username, slave.password).
connect(DEFAULT_TIMEOUT);
}

Expand All @@ -864,8 +864,8 @@ public void execute(Statement statement) throws SQLException {
}
});
eventListener.waitFor(QueryEventData.class, 4, DEFAULT_TIMEOUT);
BinaryLogClient isolatedClient =
new BinaryLogClient(slave.hostname, slave.port, "mbcj_test_isolated", slave.username, slave.password);
NewBinaryLogClient isolatedClient =
new NewBinaryLogClient(slave.hostname, slave.port, "mbcj_test_isolated", slave.username, slave.password);
try {
CountDownEventListener isolatedEventListener = new CountDownEventListener();
isolatedClient.registerEventListener(isolatedEventListener);
Expand All @@ -890,8 +890,8 @@ public void testReconnectRaceCondition() throws Exception {
// a more reliable way would be to use buffered 2-level concurrent filter input stream
try {
client.disconnect();
final BinaryLogClient binaryLogClient =
new BinaryLogClient(slave.hostname, slave.port, slave.username, slave.password);
final NewBinaryLogClient binaryLogClient =
new NewBinaryLogClient(slave.hostname, slave.port, slave.username, slave.password);
final Lock inputStreamLock = new ReentrantLock();
final AtomicBoolean breakOutputStream = new AtomicBoolean();
binaryLogClient.setSocketFactory(new SocketFactory() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class BinaryLogClientTest {

@Test
public void testEventListenersManagement() {
BinaryLogClient binaryLogClient = new BinaryLogClient("localhost", 3306, "root", "mysql");
NewBinaryLogClient binaryLogClient = new NewBinaryLogClient("localhost", 3306, "root", "mysql");
assertTrue(binaryLogClient.getEventListeners().isEmpty());
TraceEventListener traceEventListener = new TraceEventListener();
binaryLogClient.registerEventListener(traceEventListener);
Expand All @@ -55,7 +55,7 @@ public void testEventListenersManagement() {

@Test
public void testLifecycleListenersManagement() {
BinaryLogClient binaryLogClient = new BinaryLogClient("localhost", 3306, "root", "mysql");
NewBinaryLogClient binaryLogClient = new NewBinaryLogClient("localhost", 3306, "root", "mysql");
assertTrue(binaryLogClient.getLifecycleListeners().isEmpty());
TraceLifecycleListener traceLifecycleListener = new TraceLifecycleListener();
binaryLogClient.registerLifecycleListener(traceLifecycleListener);
Expand All @@ -71,12 +71,12 @@ public void testLifecycleListenersManagement() {

@Test(expectedExceptions = TimeoutException.class)
public void testNoConnectionTimeout() throws Exception {
new BinaryLogClient("_localhost_", 3306, "root", "mysql").connect(0);
new NewBinaryLogClient("_localhost_", 3306, "root", "mysql").connect(0);
}

@Test(timeOut = 15000)
public void testConnectionTimeout() throws Exception {
final BinaryLogClient binaryLogClient = new BinaryLogClient("localhost", 33059, "root", "mysql");
final NewBinaryLogClient binaryLogClient = new NewBinaryLogClient("localhost", 33059, "root", "mysql");
final CountDownLatch socketBound = new CountDownLatch(1);
final CountDownLatch binaryLogClientDisconnected = new CountDownLatch(1);
new Thread(new Runnable() {
Expand Down Expand Up @@ -110,12 +110,12 @@ public void run() {

@Test(expectedExceptions = IllegalArgumentException.class)
public void testNullEventDeserializerIsNotAllowed() throws Exception {
new BinaryLogClient("localhost", 3306, "root", "mysql").setEventDeserializer(null);
new NewBinaryLogClient("localhost", 3306, "root", "mysql").setEventDeserializer(null);
}

@Test(timeOut = 15000)
public void testDisconnectWhileBlockedByFBRead() throws Exception {
final BinaryLogClient binaryLogClient = new BinaryLogClient("localhost", 33060, "root", "mysql");
final NewBinaryLogClient binaryLogClient = new NewBinaryLogClient("localhost", 33060, "root", "mysql");
final CountDownLatch readAttempted = new CountDownLatch(1);
binaryLogClient.setSocketFactory(new SocketFactory() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,4 @@ public void onDisconnect(BinaryLogClient client) {
logger.log(Level.INFO, "Disconnected");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,7 @@
*/
package com.github.shyiko.mysql.binlog.event.deserialization.json;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.BinaryLogClientIntegrationTest;
import com.github.shyiko.mysql.binlog.CapturingEventListener;
import com.github.shyiko.mysql.binlog.CountDownEventListener;
import com.github.shyiko.mysql.binlog.TraceEventListener;
import com.github.shyiko.mysql.binlog.TraceLifecycleListener;
import com.github.shyiko.mysql.binlog.*;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventData;
import com.github.shyiko.mysql.binlog.event.EventType;
Expand Down Expand Up @@ -64,7 +59,7 @@ public class JsonBinaryValueIntegrationTest {
private final TimeZone timeZoneBeforeTheTest = TimeZone.getDefault();

private BinaryLogClientIntegrationTest.MySQLConnection master;
private BinaryLogClient client;
private NewBinaryLogClient client;
private CountDownEventListener eventListener;

@BeforeClass
Expand All @@ -75,7 +70,7 @@ public void setUp() throws Exception {
master = new BinaryLogClientIntegrationTest.MySQLConnection(bundle.getString(prefix + "master.hostname"),
Integer.parseInt(bundle.getString(prefix + "master.port")),
bundle.getString(prefix + "master.username"), bundle.getString(prefix + "master.password"));
client = new BinaryLogClient(master.hostname(), master.port(), master.username(), master.password());
client = new NewBinaryLogClient(master.hostname(), master.port(), master.username(), master.password());
client.setServerId(client.getServerId() - 1); // avoid clashes between BinaryLogClient instances
client.setKeepAlive(false);
client.registerEventListener(new TraceEventListener());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package com.github.shyiko.mysql.binlog.jmx;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.NewBinaryLogClient;
import org.testng.annotations.Test;

import javax.management.MBeanServer;
Expand All @@ -32,7 +32,7 @@ public class BinaryLogClientMXBeanTest {
@Test
public void testRegistration() throws Exception {
MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
BinaryLogClient binaryLogClient = new BinaryLogClient("localhost", 3306, "root", "mysql");
NewBinaryLogClient binaryLogClient = new NewBinaryLogClient("localhost", 3306, "root", "mysql");
ObjectName objectName = new ObjectName("mysql.binlog:type=BinaryLogClient");
mBeanServer.registerMBean(binaryLogClient, objectName);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package com.github.shyiko.mysql.binlog.jmx;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.NewBinaryLogClient;
import org.junit.Test;

import javax.management.MBeanServer;
Expand All @@ -32,7 +32,7 @@ public class BinaryLogClientStatisticsMXBeanTest {
@Test
public void testRegistration() throws Exception {
MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
BinaryLogClient binaryLogClient = new BinaryLogClient("localhost", 3306, "root", "mysql");
NewBinaryLogClient binaryLogClient = new NewBinaryLogClient("localhost", 3306, "root", "mysql");
BinaryLogClientStatistics binaryLogClientStats = new BinaryLogClientStatistics(binaryLogClient);
ObjectName objectName = new ObjectName("mysql.binlog:type=BinaryLogClientStatistics");
mBeanServer.registerMBean(binaryLogClientStats, objectName);
Expand Down