Skip to content

Commit

Permalink
Merge branch '3.9' of https://github.com/JumpMind/symmetric-ds into 3.9
Browse files Browse the repository at this point in the history
  • Loading branch information
klementinastojanovska committed Jan 25, 2018
2 parents 2839068 + 1dd3c01 commit f6c51c8
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 11 deletions.
10 changes: 7 additions & 3 deletions symmetric-assemble/src/asciidoc/examples/kafka.ad
Expand Up @@ -156,7 +156,9 @@ public class KafkaWriterFilter implements IDatabaseWriterFilter {

KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);

producer.send(new ProducerRecord<String, String>("symmetricds", kafkaText));
String topic = "test";

producer.send(new ProducerRecord<String, String>(topic, kafkaText));
log.debug("Data to be sent to Kafka-" + kafkaText);

producer.close();
Expand Down Expand Up @@ -303,7 +305,9 @@ public class KafkaWriterFilter implements IDatabaseWriterFilter {

KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);

producer.send(new ProducerRecord<String, String>("test", kafkaText));
String topic = "test";

producer.send(new ProducerRecord<String, String>(topic, kafkaText));
log.debug("Data to be sent to Kafka-" + kafkaText);

producer.close();
Expand All @@ -317,7 +321,7 @@ endif::pro[]


* The default kafka server and port are set to localhost:9092 with a client id of "symmetricds-producer". You will need to adjust these variables in the sendKafkaMessage function to match your Kafka setup if they are different.

* The default topic used is "test". You will need to create a topic named "test" on your kafka or adjust the topic in the extension (Line 129 String topic = "test";)
[source, Java]
-----
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
Expand Down
Expand Up @@ -123,8 +123,9 @@ public Node registerPullOnlyNode(String externalId, String nodeGroupId,
node.setNodeGroupId(nodeGroupId);
node.setDatabaseType(databaseType);
node.setDatabaseVersion(databaseVersion);

node = processRegistration(node, null, null, true, Constants.DEPLOYMENT_TYPE_REST);
node.setDeploymentType(Constants.DEPLOYMENT_TYPE_REST);

node = processRegistration(node, null, null, true);

if (node.isSyncEnabled()) {
//set the node as registered as we have no
Expand All @@ -144,7 +145,7 @@ protected void extractConfiguration(OutputStream out, Node registeredNode) {
}

protected Node processRegistration(Node nodePriorToRegistration, String remoteHost,
String remoteAddress, boolean isRequestedRegistration, String deploymentType)
String remoteAddress, boolean isRequestedRegistration)
throws IOException {

Node processedNode = new Node();
Expand Down Expand Up @@ -227,14 +228,11 @@ protected Node processRegistration(Node nodePriorToRegistration, String remoteHo
}

foundNode.setSyncEnabled(true);
if (Constants.DEPLOYMENT_TYPE_REST.equalsIgnoreCase(deploymentType)) {
foundNode.setSymmetricVersion(null);
foundNode.setDeploymentType(deploymentType);
}
foundNode.setSyncUrl(nodePriorToRegistration.getSyncUrl());
foundNode.setDatabaseType(nodePriorToRegistration.getDatabaseType());
foundNode.setDatabaseVersion(nodePriorToRegistration.getDatabaseVersion());
foundNode.setSymmetricVersion(nodePriorToRegistration.getSymmetricVersion());
foundNode.setDeploymentType(nodePriorToRegistration.getDeploymentType());
nodeService.save(foundNode);

/**
Expand Down Expand Up @@ -277,7 +275,7 @@ public boolean registerNode(Node nodePriorToRegistration, String remoteHost,
throws IOException {

Node processedNode = processRegistration(nodePriorToRegistration, remoteHost,
remoteAddress, isRequestedRegistration, null);
remoteAddress, isRequestedRegistration);

if (processedNode.isSyncEnabled()) {
/*
Expand Down

0 comments on commit f6c51c8

Please sign in to comment.