Merge remote-tracking branch 'origin/3.11' into 3.12
Conflicts:
	symmetric-assemble/common.gradle
	symmetric-util/src/main/java/org/jumpmind/util/LogSummary.java
erilong committed May 12, 2020
2 parents f20f448 + 27f3dab commit 9b0763c
Showing 19 changed files with 205 additions and 155 deletions.
@@ -43,7 +43,7 @@ public OracleTriggerTemplate(ISymmetricDialect symmetricDialect) {
dateTimeWithLocalTimeZoneColumnTemplate = "decode($(tableAlias).\"$(columnName)\", null, '', concat(concat('\"',to_char(cast($(tableAlias).\"$(columnName)\" as timestamp), 'YYYY-MM-DD HH24:MI:SS.FF9')),'\"'))" ;
timeColumnTemplate = "decode($(tableAlias).\"$(columnName)\", null, '', concat(concat('\"',to_char($(tableAlias).\"$(columnName)\", 'YYYY-MM-DD HH24:MI:SS','NLS_CALENDAR=''GREGORIAN''')),'\"'))" ;
dateColumnTemplate = "decode($(tableAlias).\"$(columnName)\", null, '', concat(concat('\"',to_char($(tableAlias).\"$(columnName)\", 'YYYY-MM-DD HH24:MI:SS','NLS_CALENDAR=''GREGORIAN''')),'\"'))" ;
clobColumnTemplate = "decode(dbms_lob.getlength(to_clob($(tableAlias).\"$(columnName)\")), null, to_clob(''), '\"'||replace(replace($(tableAlias).\"$(columnName)\",'\\','\\\\'),'\"','\\\"')||'\"')" ;
clobColumnTemplate = "case when $(tableAlias).\"$(columnName)\" is null then null else '\"'||replace(replace($(tableAlias).\"$(columnName)\",'\\','\\\\'),'\"','\\\"')||'\"' end" ;
blobColumnTemplate = "decode(dbms_lob.getlength($(tableAlias).\"$(columnName)\"), null, to_clob(''), '\"'||$(prefixName)_blob2clob($(tableAlias).\"$(columnName)\")||'\"')" ;
longColumnTemplate = "$(oracleToClob)'\"\\b\"'";
booleanColumnTemplate = "decode($(tableAlias).\"$(columnName)\", null, '', '\"'||cast($(tableAlias).\"$(columnName)\" as number("+symmetricDialect.getTemplateNumberPrecisionSpec()+"))||'\"')" ;
@@ -270,7 +270,7 @@ protected String getCreateTimeExpression(ISymmetricDialect symmetricDialect) {
}

protected String toClobExpression(Table table) {
if (table.hasNTypeColumns() || symmetricDialect.getParameterService().is(ParameterConstants.DBDIALECT_ORACLE_USE_NTYPES_FOR_SYNC)) {
if (symmetricDialect.getParameterService().is(ParameterConstants.DBDIALECT_ORACLE_USE_NTYPES_FOR_SYNC)) {
return "to_nclob('')||";
} else {
return "to_clob('')||";
@@ -38,7 +38,7 @@
public class PostgresBulkDataLoaderFactory extends DefaultDataLoaderFactory {

public PostgresBulkDataLoaderFactory(ISymmetricEngine engine) {
super(engine.getParameterService());
super(engine);
}

public String getTypeName() {
@@ -42,9 +42,13 @@ public JobDefaults getDefaults() {

@Override
public void doJob(boolean force) throws Exception {
if (engine.getParameterService().is(ParameterConstants.FILE_SYNC_ENABLE)) {
engine.getFileSyncExtractorService().queueWork(force);
if (engine.getParameterService().is(ParameterConstants.INITIAL_LOAD_USE_EXTRACT_JOB)) {

if (engine.getParameterService().is(ParameterConstants.FILE_SYNC_ENABLE)) {
engine.getFileSyncExtractorService().queueWork(force);
}

engine.getDataExtractorService().queueWork(force);
}
engine.getDataExtractorService().queueWork(force);
}
}
@@ -544,9 +544,9 @@ protected String replaceTemplateVariables(DataEventType dml, Trigger trigger,
ddl = FormatUtils.replace("sourceNodeExpression",
symmetricDialect.getSourceNodeExpression(), ddl);

ddl = FormatUtils.replace("oracleLobType", trigger.isUseCaptureLobs() ? "clob" : "long",
ddl = FormatUtils.replace("oracleLobType", trigger.isUseCaptureLobs() ? getClobType(table) : "long",
ddl);
ddl = FormatUtils.replace("oracleLobTypeClobAlways", "clob", ddl);
ddl = FormatUtils.replace("oracleLobTypeClobAlways", getClobType(table), ddl);

String syncTriggersExpression = symmetricDialect.getSyncTriggersExpression();
ddl = FormatUtils.replace("syncOnIncomingBatchCondition",
@@ -680,13 +680,17 @@ private String getChannelExpression() {
}

protected String toClobExpression(Table table) {
if (table.hasNTypeColumns()) {
if (symmetricDialect.getParameterService().is(ParameterConstants.DBDIALECT_ORACLE_USE_NTYPES_FOR_SYNC)) {
return "to_nclob('')||";
} else {
return "to_clob('')||";
}
}


protected String getClobType(Table table) {
return symmetricDialect.getParameterService().is(ParameterConstants.DBDIALECT_ORACLE_USE_NTYPES_FOR_SYNC) ? "nclob" : "clob";
}

protected String getChannelExpression(Trigger trigger, TriggerHistory history, Table originalTable) {
if (trigger.getChannelId().equals(Constants.CHANNEL_DYNAMIC)) {
if (StringUtils.isNotBlank(trigger.getChannelExpression())) {
@@ -910,6 +914,7 @@ else if (column.getJdbcTypeName() != null
break;
}
case Types.CLOB:
case Types.NCLOB:
if (isOld && symmetricDialect.needsToSelectLobData()) {
templateToUse = emptyColumnTemplate;
} else {
@@ -47,7 +47,6 @@
import org.jumpmind.symmetric.io.data.writer.KafkaWriter;
import org.jumpmind.symmetric.io.data.writer.ResolvedData;
import org.jumpmind.symmetric.io.data.writer.TransformWriter;
import org.jumpmind.symmetric.service.IParameterService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -60,8 +59,9 @@ public class DefaultDataLoaderFactory extends AbstractDataLoaderFactory implemen
public DefaultDataLoaderFactory() {
}

public DefaultDataLoaderFactory(IParameterService parameterService) {
this.parameterService = parameterService;
public DefaultDataLoaderFactory(ISymmetricEngine engine) {
this.engine = engine;
this.parameterService = engine.getParameterService();
}

public String getTypeName() {
@@ -214,7 +214,7 @@ protected DatabaseWriterSettings buildDatabaseWriterSettings(List<IDatabaseWrite
@Override
public void setSymmetricEngine(ISymmetricEngine engine) {
this.engine = engine;
this.parameterService = engine.getParameterService();
}


}
@@ -72,7 +72,7 @@
public class KafkaWriterFilter implements IDatabaseWriterFilter {
protected final String KAFKA_TEXT_CACHE = "KAFKA_TEXT_CACHE" + this.hashCode();

protected Map<String, List<String>> kafkaDataMap = new HashMap<String, List<String>>();
protected Map<String, List<ProducerRecord<String, Object>>> kafkaDataMap = new HashMap<String, List<ProducerRecord<String, Object>>>();
protected String kafkaDataKey;

private final Logger log = LoggerFactory.getLogger(getClass());
@@ -131,6 +131,8 @@ public class KafkaWriterFilter implements IDatabaseWriterFilter {
Map<String, String> tableNameCache = new HashMap<String, String>();
Map<String, Map<String, String>> tableColumnCache = new HashMap<String, Map<String, String>>();

public static KafkaProducer<String, Object> kafkaProducer;

public KafkaWriterFilter(IParameterService parameterService) {
schema = parser.parse(AVRO_CDC_SCHEMA);
this.url = parameterService.getString(ParameterConstants.LOAD_ONLY_PROPERTY_PREFIX + "db.url");
@@ -147,24 +149,28 @@ public KafkaWriterFilter(IParameterService parameterService) {
this.confluentUrl = parameterService.getString(ParameterConstants.KAFKA_CONFLUENT_REGISTRY_URL);
this.schemaPackage = parameterService.getString(ParameterConstants.KAFKA_AVRO_JAVA_PACKAGE);

configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.url);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
configs.put(ProducerConfig.CLIENT_ID_CONFIG, this.producer);

if (confluentUrl != null) {
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());

configs.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, confluentUrl);
}

TypedProperties props = parameterService.getAllParameters();
for (Object key : props.keySet()) {
if (key.toString().startsWith("kafkaclient.")) {
configs.put(key.toString().substring(12), props.get(key));
}
}
if (kafkaProducer == null) {
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.url);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
configs.put(ProducerConfig.CLIENT_ID_CONFIG, this.producer);

if (confluentUrl != null) {
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());

configs.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, confluentUrl);
}

TypedProperties props = parameterService.getAllParameters();
for (Object key : props.keySet()) {
if (key.toString().startsWith("kafkaclient.")) {
configs.put(key.toString().substring(12), props.get(key));
}
}
kafkaProducer = new KafkaProducer<String, Object>(configs);
this.log.debug("Kafka client config: {}", configs);
}
}

public boolean beforeWrite(DataContext context, Table table, CsvData data) {
@@ -178,6 +184,19 @@ public boolean beforeWrite(DataContext context, Table table, CsvData data) {
}

StringBuffer kafkaText = new StringBuffer();
String kafkaKey = null;

if (messageBy.equals(KAFKA_MESSAGE_BY_ROW)) {
StringBuffer sb = new StringBuffer();
sb.append(table.getName()).append(":");
for (int i = 0; i < table.getPrimaryKeyColumnNames().length; i++) {
sb.append(":").append(rowData[i]);
}
kafkaKey = String.valueOf(sb.toString().hashCode());
} else if (messageBy.equals(KAFKA_MESSAGE_BY_BATCH)) {
String s = context.getBatch().getSourceNodeId() + "-" + context.getBatch().getBatchId();
kafkaKey = String.valueOf(s.hashCode());
}

if (topicBy.equals(KAFKA_TOPIC_BY_CHANNEL)) {
kafkaDataKey = context.getBatch().getChannelId();
@@ -188,9 +207,9 @@
log.debug("Processing table {} for Kafka on topic {}", table, kafkaDataKey);

if (kafkaDataMap.get(kafkaDataKey) == null) {
kafkaDataMap.put(kafkaDataKey, new ArrayList<String>());
kafkaDataMap.put(kafkaDataKey, new ArrayList<ProducerRecord<String, Object>>());
}
List<String> kafkaDataList = kafkaDataMap.get(kafkaDataKey);
List<ProducerRecord<String, Object>> kafkaDataList = kafkaDataMap.get(kafkaDataKey);

if (outputFormat.equals(KAFKA_FORMAT_JSON)) {
kafkaText.append("{\"").append(table.getName()).append("\": {").append("\"eventType\": \"" + data.getDataEventType() + "\",")
@@ -259,7 +278,7 @@ else if (Long.class.equals(propertyTypeClass)) {
}
}
}
sendKafkaMessageByObject(pojo, kafkaDataKey);
sendKafkaMessage(new ProducerRecord<String, Object>(kafkaDataKey, kafkaKey, pojo));
} else {
throw new RuntimeException("Unable to find a POJO to load for AVRO based message onto Kafka for table : " + tableName);
}
@@ -300,7 +319,7 @@ else if (Long.class.equals(propertyTypeClass)) {
}
}
}
kafkaDataList.add(kafkaText.toString());
kafkaDataList.add(new ProducerRecord<String, Object>(kafkaDataKey, kafkaKey, kafkaText.toString()));
}
return false;
}
@@ -430,21 +449,22 @@ public void batchComplete(DataContext context) {
try {
if (confluentUrl == null && kafkaDataMap.size() > 0) {
StringBuffer kafkaText = new StringBuffer();
String kafkaKey = null;


for (Map.Entry<String, List<String>> entry : kafkaDataMap.entrySet()) {
for (String row : entry.getValue()) {
for (Map.Entry<String, List<ProducerRecord<String, Object>>> entry : kafkaDataMap.entrySet()) {
for (ProducerRecord<String, Object> record : entry.getValue()) {
if (messageBy.equals(KAFKA_MESSAGE_BY_ROW)) {
sendKafkaMessage(producer, row, entry.getKey());
sendKafkaMessage(record);
} else {
kafkaText.append(row);
kafkaKey = record.key();
kafkaText.append(record.value());
}
}
if (messageBy.equals(KAFKA_MESSAGE_BY_BATCH)) {
sendKafkaMessage(producer, kafkaText.toString(), entry.getKey());
sendKafkaMessage(new ProducerRecord<String, Object>(entry.getKey(), kafkaKey, kafkaText.toString()));
}
}
kafkaDataMap = new HashMap<String, List<String>>();
kafkaDataMap = new HashMap<String, List<ProducerRecord<String, Object>>>();
}
} catch (Exception e) {
log.warn("Unable to write batch to Kafka " + batchFileName, e);
@@ -464,16 +484,9 @@ public void batchCommitted(DataContext context) {
public void batchRolledback(DataContext context) {
}

public void sendKafkaMessage(KafkaProducer<String, String> producer, String kafkaText, String topic) {
log.debug("Sending message (topic={}) {}", topic, kafkaText);
producer.send(new ProducerRecord<String, String>(topic, kafkaText));
}

public void sendKafkaMessageByObject(Object bean, String topic) {
log.debug("Sending object (topic={}) {}", topic, bean);
KafkaProducer<String, Object> producer = new KafkaProducer<String, Object>(configs);
producer.send(new ProducerRecord<String, Object>(topic, bean));
producer.close();
public void sendKafkaMessage(ProducerRecord<String, Object> record) {
log.debug("Sending message (topic={}) (key={}) {}", record.topic(), record.key(), record.value());
kafkaProducer.send(record);
}

public static byte[] datumToByteArray(Schema schema, GenericRecord datum) throws IOException {
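
The KafkaWriterFilter changes above replace the producer-per-message approach with a single static KafkaProducer that is built once, the first time the filter is constructed, and then reused for every ProducerRecord, which now carries the topic, message key, and payload together. A minimal sketch of that pattern follows; it is an illustration only, and the class name SharedProducerExample, its method names, and its constructor arguments are placeholders rather than code from this commit (the commit's producer is typed <String, Object> so the same instance can also carry Avro payloads when a Confluent registry is configured).

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SharedProducerExample {

    // One producer shared by all instances, mirroring the static kafkaProducer field in the diff.
    private static KafkaProducer<String, String> producer;

    public SharedProducerExample(String bootstrapServers, String clientId) {
        // Lazy, build-once initialization: later instances reuse the existing producer.
        if (producer == null) {
            Map<String, Object> configs = new HashMap<String, Object>();
            configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            configs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
            producer = new KafkaProducer<String, String>(configs);
        }
    }

    public void send(String topic, String key, String value) {
        // Topic, key, and value travel together in the record, as in the new sendKafkaMessage(ProducerRecord).
        producer.send(new ProducerRecord<String, String>(topic, key, value));
    }
}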
@@ -75,7 +75,7 @@ public MonitorEvent check(Monitor monitor) {
protected String serializeDetails(List<LogSummary> logs) {
String result = null;
try {
GsonBuilder builder = new GsonBuilder();
GsonBuilder builder = new GsonBuilder().excludeFieldsWithoutExposeAnnotation();
builder.addSerializationExclusionStrategy(new SuperClassExclusion());
builder.addDeserializationExclusionStrategy(new SuperClassExclusion());
result = builder.create().toJson(logs);
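
The final hunk adds excludeFieldsWithoutExposeAnnotation() to the GsonBuilder used when serializing log summaries, so only fields carrying Gson's @Expose annotation are written into the monitor event details alongside the existing exclusion strategies. A small, self-contained illustration of that builder option follows; the GsonExposeExample and LogSummaryExample classes and their fields are invented for the example and are not part of the commit.

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.annotations.Expose;

public class GsonExposeExample {

    static class LogSummaryExample {
        @Expose
        String message = "Connection refused";        // serialized: carries @Expose

        Exception exception = new RuntimeException(); // skipped: no @Expose annotation
    }

    public static void main(String[] args) {
        Gson gson = new GsonBuilder()
                .excludeFieldsWithoutExposeAnnotation() // same builder option as in the hunk above
                .create();
        // Prints {"message":"Connection refused"}; the un-annotated field is left out entirely.
        System.out.println(gson.toJson(new LogSummaryExample()));
    }
}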
