From 4985860df3f0893314f1b1584689033680fda5fd Mon Sep 17 00:00:00 2001 From: elong Date: Thu, 23 Aug 2018 17:11:51 -0400 Subject: [PATCH 1/4] fix typo --- symmetric-core/src/main/resources/symmetric-default.properties | 1 - 1 file changed, 1 deletion(-) diff --git a/symmetric-core/src/main/resources/symmetric-default.properties b/symmetric-core/src/main/resources/symmetric-default.properties index 87917564e3..a84d282c82 100644 --- a/symmetric-core/src/main/resources/symmetric-default.properties +++ b/symmetric-core/src/main/resources/symmetric-default.properties @@ -1510,7 +1510,6 @@ oracle.use.transaction.view=false # an oracle rac environment. It is only applicable when oracle.use.transaction.view is set # to true. # Tags: other -# Type: boolean # DatabaseOverridable: true oracle.transaction.view.clock.sync.threshold.ms=60000 From 36cbbffd882a02a56bcad77eef1d71ba6dec9115 Mon Sep 17 00:00:00 2001 From: "Hicks, Josh" Date: Mon, 27 Aug 2018 10:27:24 -0400 Subject: [PATCH 2/4] Logging cleanup --- .../java/org/jumpmind/symmetric/load/KafkaWriterFilter.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/KafkaWriterFilter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/KafkaWriterFilter.java index d6956fd8f3..2a9488a5f8 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/KafkaWriterFilter.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/KafkaWriterFilter.java @@ -97,7 +97,6 @@ public class KafkaWriterFilter implements IDatabaseWriterFilter { Map> tableColumnCache = new HashMap>(); public KafkaWriterFilter(IParameterService parameterService) { - log.info(AVRO_CDC_SCHEMA); schema = parser.parse(AVRO_CDC_SCHEMA); this.url = parameterService.getString(ParameterConstants.LOAD_ONLY_PROPERTY_PREFIX + "db.url"); if (url == null) { @@ -371,7 +370,6 @@ public void earlyCommit(DataContext context) { public void batchComplete(DataContext context) { if (!context.getBatch().getChannelId().equals("heartbeat") && !context.getBatch().getChannelId().equals("config")) { String batchFileName = "batch-" + context.getBatch().getSourceNodeId() + "-" + context.getBatch().getBatchId(); - log.info("Processing batch " + batchFileName + " for Kafka"); try { if (confluentUrl == null && kafkaDataMap.size() > 0) { StringBuffer kafkaText = new StringBuffer(); From fac0fa369c5499a05987a2964f4d6c6509d43a10 Mon Sep 17 00:00:00 2001 From: elong Date: Tue, 28 Aug 2018 12:06:47 -0400 Subject: [PATCH 3/4] 0003692: Purge stranded data and batches after channel is deleted --- .../symmetric/service/impl/PurgeService.java | 29 ++++++++++++++++++- .../service/impl/PurgeServiceSqlMap.java | 8 +++++ 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java index cd6cad90f9..049b43a9a4 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java @@ -33,6 +33,7 @@ import org.jumpmind.db.platform.DatabaseNamesConstants; import org.jumpmind.db.sql.ISqlRowMapper; import org.jumpmind.db.sql.Row; +import org.jumpmind.db.sql.mapper.StringMapper; import org.jumpmind.symmetric.common.ParameterConstants; import org.jumpmind.symmetric.db.ISymmetricDialect; import org.jumpmind.symmetric.ext.IPurgeListener; @@ -130,7 +131,8 @@ public long purgeOutgoing(Calendar retentionCutoff, boolean force) { rowsPurged += purgeDataRows(retentionCutoff); rowsPurged += purgeOutgoingBatch(retentionCutoff); rowsPurged += purgeStranded(retentionCutoff); - rowsPurged += purgeExtractRequests(); + rowsPurged += purgeExtractRequests(); + rowsPurged += purgeStrandedChannels(); } } finally { if (!force) { @@ -208,9 +210,34 @@ private long purgeStrandedBatches() { OutgoingBatch.Status.OK.name(), updateStrandedBatchesCount); statisticManager.incrementPurgedBatchOutgoingRows(updateStrandedBatchesCount); } + + updateStrandedBatchesCount = sqlTemplate.update(getSql("updateStrandedBatchesByChannel"), + OutgoingBatch.Status.OK.name(), OutgoingBatch.Status.OK.name()); + if (updateStrandedBatchesCount > 0) { + log.info("Set the status to {} for {} batches that no longer are associated with valid channels", + OutgoingBatch.Status.OK.name(), updateStrandedBatchesCount); + statisticManager.incrementPurgedBatchOutgoingRows(updateStrandedBatchesCount); + } + return updateStrandedBatchesCount; } + private long purgeStrandedChannels() { + int rowsPurged = 0; + log.info("Looking for old channels in data"); + List channels = sqlTemplateDirty.query(getSql("selectOldChannelsForData"), new StringMapper()); + if (channels.size() > 0) { + log.info("Found {} old channels", channels.size()); + for (String channelId : channels) { + log.info("Purging data for channel {}", channelId); + rowsPurged += sqlTemplate.update(getSql("deleteDataByChannel"), channelId); + } + statisticManager.incrementPurgedDataRows(rowsPurged); + log.info("Done purging {} rows", rowsPurged); + } + return rowsPurged; + } + private long purgeDataRows(final Calendar time) { log.info("Getting range for data"); long[] minMax = queryForMinMax(getSql("selectDataRangeSql"), new Object[0]); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeServiceSqlMap.java index 2073488e45..3d44b68bc9 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeServiceSqlMap.java @@ -61,6 +61,10 @@ public PurgeServiceSqlMap(IDatabasePlatform platform, Map replac "update $(outgoing_batch) set status=? where node_id not " + " in (select node_id from $(node) where sync_enabled=?) and status != ? " ); + putSql("updateStrandedBatchesByChannel", +"update $(outgoing_batch) set status=? where channel_id not " + +"in (select channel_id from $(channel)) and status != ?"); + putSql("deleteStrandedData" , "delete from $(data) where " + " data_id between ? and ? and " + @@ -139,7 +143,11 @@ public PurgeServiceSqlMap(IDatabasePlatform platform, Map replac "where batch_id in (select batch_id from $(outgoing_batch) where status != ?)"); putSql("deleteDataByRangeSql", "delete from $(data) where data_id between ? and ? and create_time < ?"); + + putSql("selectOldChannelsForData", "select distinct channel_id from $(data) where channel_id not in (select channel_id from $(channel))"); + putSql("deleteDataByChannel", "delete from $(data) where channel_id = ?"); + } } \ No newline at end of file From 70b2359adaa750c85ae6de0dde59735381857ad8 Mon Sep 17 00:00:00 2001 From: "Hicks, Josh" Date: Tue, 28 Aug 2018 13:48:15 -0400 Subject: [PATCH 4/4] 0003694: XMLType support for Oracle --- .../symmetric/io/data/reader/ExtractDataReader.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/ExtractDataReader.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/ExtractDataReader.java index 48185ff6fc..ab95ceb8c1 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/ExtractDataReader.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/reader/ExtractDataReader.java @@ -224,6 +224,18 @@ protected String buildSelect(Table table, Column lobColumn, Column[] pkColumns) StringBuilder sql = new StringBuilder("select "); DatabaseInfo dbInfo = platform.getDatabaseInfo(); String quote = platform.getDdlBuilder().isDelimitedIdentifierModeOn() ? dbInfo.getDelimiterToken() : ""; + + if ("XMLTYPE".equalsIgnoreCase(lobColumn.getJdbcTypeName()) && 2009 == lobColumn.getJdbcTypeCode()) { + sql.append("extract("); + sql.append(quote); + sql.append(lobColumn.getName()); + sql.append(quote); + sql.append(", '/').getClobVal()"); + } else { + sql.append(quote); + sql.append(lobColumn.getName()); + sql.append(quote); + } sql.append(quote); sql.append(lobColumn.getName()); sql.append(quote);