Skip to content

Commit

Permalink
0005518: Updating KafkaDataWriter to work with Filters
Browse files Browse the repository at this point in the history
  • Loading branch information
jakobvanmeter committed Oct 10, 2022
1 parent 33d60d1 commit 90dd557
Show file tree
Hide file tree
Showing 5 changed files with 782 additions and 495 deletions.
16 changes: 0 additions & 16 deletions symmetric-core/build.gradle
Expand Up @@ -19,22 +19,6 @@ apply from: symAssembleDir + '/common.gradle'
exclude group: 'commons-lang', module: 'commons-lang'
exclude group: 'joda-time', module: 'joda-time'
}
compileOnly ("org.apache.kafka:kafka-clients:1.1.0") {
exclude group: 'log4j'
exclude group: 'org.slf4j'
}
compileOnly ("io.confluent:kafka-avro-serializer:3.2.1") {
exclude group: 'com.fasterxml.jackson.core'
exclude group: 'com.fasterxml.jackson.dataformat'
exclude group: 'com.fasterxml.jackson.databind'
exclude group: 'log4j'
exclude group: 'org.slf4j'
exclude group: 'io.netty'
}
compileOnly ("org.apache.avro:avro:1.8.2") {
exclude group: 'log4j'
exclude group: 'org.slf4j'
}

testCompile project(path: ':symmetric-util', configuration: 'testArtifacts')
testCompile project(path: ':symmetric-jdbc', configuration: 'testArtifacts')
Expand Down
Expand Up @@ -34,6 +34,7 @@
import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.db.util.DatabaseConstants;
import org.jumpmind.extension.IBuiltInExtensionPoint;
import org.jumpmind.properties.TypedProperties;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ParameterConstants;
Expand Down Expand Up @@ -92,13 +93,38 @@ public IDataWriter getDataWriter(final String sourceNodeId, final ISymmetricDial
symmetricDialect.getTablePrefix(), new DefaultTransformWriterConflictResolver(transformWriter),
buildDatabaseWriterSettings(filters, errorHandlers, conflictSettings, resolvedData));
} else if (targetPlatform instanceof KafkaPlatform) {
if (filters == null) {
filters = new ArrayList<IDatabaseWriterFilter>();
}
filters.add(new KafkaWriterFilter(this.parameterService));
// if (filters == null) {
// filters = new ArrayList<IDatabaseWriterFilter>();
// }
String url;
String producer;
String externalNodeID;
String outputFormat;
String topicBy;
String messageBy;
String confluentUrl;
String schemaPackage;
String loadOnlyPrefix;
TypedProperties props;
String runtimeConfigTablePrefix;
String channelReload;
// filters.add(new KafkaWriterFilter(this.parameterService));
producer = this.parameterService.getString(ParameterConstants.KAFKA_PRODUCER, "SymmetricDS");
outputFormat = parameterService.getString(ParameterConstants.KAFKA_FORMAT, KafkaWriter.KAFKA_FORMAT_JSON);
topicBy = parameterService.getString(ParameterConstants.KAFKA_TOPIC_BY, KafkaWriter.KAFKA_TOPIC_BY_CHANNEL);
messageBy = parameterService.getString(ParameterConstants.KAFKA_MESSAGE_BY, KafkaWriter.KAFKA_MESSAGE_BY_BATCH);
confluentUrl = parameterService.getString(ParameterConstants.KAFKA_CONFLUENT_REGISTRY_URL);
schemaPackage = parameterService.getString(ParameterConstants.KAFKA_AVRO_JAVA_PACKAGE);
externalNodeID = parameterService.getExternalId();
loadOnlyPrefix = ParameterConstants.LOAD_ONLY_PROPERTY_PREFIX;
props = parameterService.getAllParameters();
url = parameterService.getString(ParameterConstants.LOAD_ONLY_PROPERTY_PREFIX + "db.url");
runtimeConfigTablePrefix = parameterService.getString(ParameterConstants.RUNTIME_CONFIG_TABLE_PREFIX);
channelReload = Constants.CHANNEL_RELOAD;
return new KafkaWriter(symmetricDialect.getPlatform(), symmetricDialect.getTargetPlatform(),
symmetricDialect.getTablePrefix(), new DefaultTransformWriterConflictResolver(transformWriter),
buildDatabaseWriterSettings(filters, errorHandlers, conflictSettings, resolvedData));
buildDatabaseWriterSettings(filters, errorHandlers, conflictSettings, resolvedData), producer, outputFormat, topicBy,
messageBy, confluentUrl, schemaPackage, externalNodeID, url, loadOnlyPrefix, props, runtimeConfigTablePrefix, channelReload);
}
} catch (Exception e) {
log.warn("Failed to create writer for platform " + targetPlatform.getClass().getSimpleName(), e);
Expand Down

0 comments on commit 90dd557

Please sign in to comment.