Skip to content

support mysql binlog sync to pulsar by canal#2998

Merged
sijie merged 17 commits intoapache:masterfrom
AmateurEvents:feature/support-mysql-binlog-canal
Nov 25, 2018
Merged

support mysql binlog sync to pulsar by canal#2998
sijie merged 17 commits intoapache:masterfrom
AmateurEvents:feature/support-mysql-binlog-canal

Conversation

@tuteng
Copy link
Member

@tuteng tuteng commented Nov 16, 2018

Motivation

support alibaba canal
https://github.com/alibaba/canal/wiki

Modifications

Integrated canal client

Result

support binlog sync to pulsar
use python pulsar-client to consume

import pulsar

client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe('my-topic',
                            subscription_name='my-sub')

while True:
    msg = consumer.receive()
    print("Received message: '%s'" % msg.data())
    consumer.acknowledge(msg)

client.close()
output:
Received message: '[{"data":null,"database":"testdb","es":1542446501000,"id":44,"isDdl":true,"mysqlType":null,"old":null,"sql":"CREATE TABLE `users320` (   `id` int(11) NOT NULL AUTO_INCREMENT,   `name` varchar(50) DEFAULT NULL,   `extra` varchar(50) DEFAULT NULL,   PRIMARY KEY (`id`),   KEY `ix_users_name` (`name`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8","sqlType":null,"table":"users320","ts":1542446501114,"type":"CREATE"}]'


protected void start() {
Assert.notNull(connector, "connector is null");
thread = new Thread(new Runnable() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe assign a name to thread so it's easier to debug with jstack

@@ -0,0 +1,152 @@
package org.apache.pulsar.io.canal;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to have Apache headers. You can use mvn license:format to automatically add

@sijie sijie requested a review from jiazhai November 16, 2018 21:42
@sijie sijie added type/feature The PR added a new feature or issue requested a new feature area/connector labels Nov 16, 2018
@sijie sijie added this to the 2.3.0 milestone Nov 16, 2018
Copy link
Member

@sijie sijie left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

overall looks good. it is a great contributions. have some review comments and also need some clarifications from you.


@Override
public void uncaughtException(Thread t, Throwable e) {
log.error("parse events has an error", e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: it is good to add thread name in the logging message for troubleshooting purpose.


private CanalSourceConfig canalSourceConfig;

protected Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

protected final

new InetSocketAddress(canalSourceConfig.getSingleHostname(), canalSourceConfig.getSinglePort()),
canalSourceConfig.getDestination(), canalSourceConfig.getUsername(), canalSourceConfig.getPassword());
}
log.info("start canal connect");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you improve the logging here?

  • in cluster mode, can you print out the zookeeper user and destination?

  • in single mode, can you print out the single host name, port and destination?

}

protected void start() {
Assert.notNull(connector, "connector is null");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Use java builtin objects.requirenonnull or guava checkNotNull?

connector.disconnect();
}

MDC.remove("destination");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you define destination as a constant string?

static private class CanalRecord implements Record<byte[]> {

private final byte[] record;
private final Long id;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this id incremental? if you might consider using it as record sequence.

}
}

connector.ack(batchId);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you are acking here, it only supports at-most-once semantic. you might consider implementing ack() in CanalRecord

<module>elastic-search</module>
<module>kafka-connect-adaptor</module>
<module>debezium</module>
<module>canal</module>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: fix the indent here

add thread name
add log info
set to constant string destination
Class CanalRecord add getRecordSequence
add Batch consume
@tuteng
Copy link
Member Author

tuteng commented Nov 17, 2018

Please take the time to review my code again. It has been changed and tested locally.

Copy link
Member

@sijie sijie left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great contribution! @tuteng

@sijie
Copy link
Member

sijie commented Nov 19, 2018

run integration tests

@sijie
Copy link
Member

sijie commented Nov 22, 2018

run integration tests

2018-11-22\T\01:48:22.273 [INFO] Pulsar :: Distribution :: Offloader ................ SUCCESS [  3.790 s]
2018-11-22\T\01:48:22.274 [INFO] Apache Pulsar :: Docker Images :: Pulsar Latest Version FAILURE [  9.247 s]
2018-11-22\T\01:48:22.274 [INFO] Apache Pulsar :: Docker Images :: Grafana .......... SKIPPED
2018-11-22\T\01:48:22.274 [INFO] Apache Pulsar :: Docker Images :: Pulsar Latest Version (Include All Components) SKIPPED
2018-11-22\T\01:48:22.274 [INFO] Apache Pulsar :: Docker Images :: Pulsar Standalone  SKIPPED
2018-11-22\T\01:48:22.274 [INFO] Apache Pulsar :: Tests ............................. SKIPPED
2018-11-22\T\01:48:22.274 [INFO] Apache Pulsar :: Tests :: Docker Images ............ SKIPPED
2018-11-22\T\01:48:22.274 [INFO] Apache Pulsar :: Tests :: Docker Images :: Latest Version Testing SKIPPED
2018-11-22\T\01:48:22.274 [INFO] Apache Pulsar :: Tests :: Integration .............. SKIPPED
2018-11-22\T\01:48:22.274 [INFO] Apache Pulsar :: Tests :: Pulsar Kafka Compat Client Tests SKIPPED
2018-11-22\T\01:48:22.274 [INFO] Pulsar Storm adapter Tests ......................... SKIPPED
2018-11-22\T\01:48:22.274 [INFO] Spark Streaming Pulsar Receivers Tests ............. SKIPPED
2018-11-22\T\01:48:22.274 [INFO] ------------------------------------------------------------------------
2018-11-22\T\01:48:22.274 [INFO] BUILD FAILURE
2018-11-22\T\01:48:22.275 [INFO] ------------------------------------------------------------------------
2018-11-22\T\01:48:22.275 [INFO] Total time: 11:02 min
2018-11-22\T\01:48:22.275 [INFO] Finished at: 2018-11-22T01:48:22Z
2018-11-22\T\01:48:24.095 [INFO] Final Memory: 293M/3757M
2018-11-22\T\01:48:24.095 [INFO] ------------------------------------------------------------------------
2018-11-22\T\01:48:24.102 [ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.6.0:exec (build-pulsar-clients) on project pulsar-docker-image: Command execution failed.: Process exited with an error: 1 (Exit value: 1) -> [Help 1]
2018-11-22\T\01:48:24.103 [ERROR] 
2018-11-22\T\01:48:24.103 [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
2018-11-22\T\01:48:24.103 [ERROR] Re-run Maven using the -X switch to enable full debug logging.
2018-11-22\T\01:48:24.103 [ERROR] 

@jiazhai
Copy link
Member

jiazhai commented Nov 22, 2018

run integration tests

org.apache.pulsar.tests.integration.offload.TestS3Offload.tiered-storage-test-suite

1 similar comment
@jiazhai
Copy link
Member

jiazhai commented Nov 22, 2018

run integration tests

org.apache.pulsar.tests.integration.offload.TestS3Offload.tiered-storage-test-suite

Copy link
Member

@jiazhai jiazhai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm. Thanks for the contribute.

@jiazhai
Copy link
Member

jiazhai commented Nov 22, 2018

run integration tests
org.apache.pulsar.tests.integration.offload.TestS3Offload.tiered-storage-test-suite

@jiazhai
Copy link
Member

jiazhai commented Nov 23, 2018

run integration tests

2 similar comments
@jiazhai
Copy link
Member

jiazhai commented Nov 25, 2018

run integration tests

@jiazhai
Copy link
Member

jiazhai commented Nov 25, 2018

run integration tests

@sijie sijie merged commit c2c9f33 into apache:master Nov 25, 2018
@sijie
Copy link
Member

sijie commented Nov 25, 2018

@tuteng thank you so much for your contribution.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area/connector type/feature The PR added a new feature or issue requested a new feature

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants