From 9212d184d9c7f59c92e45b648ed75a991a63cfea Mon Sep 17 00:00:00 2001 From: elong Date: Wed, 30 May 2018 12:43:01 -0400 Subject: [PATCH 1/9] 0003583: Windows service installer missing config files --- symmetric-server/src/main/deploy/bin/sym_service | 2 +- symmetric-server/src/main/deploy/bin/sym_service.bat | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/symmetric-server/src/main/deploy/bin/sym_service b/symmetric-server/src/main/deploy/bin/sym_service index 7405e38ef0..8935b69e4e 100644 --- a/symmetric-server/src/main/deploy/bin/sym_service +++ b/symmetric-server/src/main/deploy/bin/sym_service @@ -21,4 +21,4 @@ # . "`dirname "$0"`/setenv" -exec "$SYM_JAVA" -cp "$CLASSPATH" -Djava.io.tmpdir=tmp org.jumpmind.symmetric.wrapper.Wrapper "$1" "$SYM_HOME/conf/sym_service.conf" +exec "$SYM_JAVA" -cp "$CLASSPATH" -Djava.io.tmpdir="$SYM_HOME/tmp" org.jumpmind.symmetric.wrapper.Wrapper "$1" "$SYM_HOME/conf/sym_service.conf" diff --git a/symmetric-server/src/main/deploy/bin/sym_service.bat b/symmetric-server/src/main/deploy/bin/sym_service.bat index e3accb48cb..bd09202760 100644 --- a/symmetric-server/src/main/deploy/bin/sym_service.bat +++ b/symmetric-server/src/main/deploy/bin/sym_service.bat @@ -22,5 +22,5 @@ @echo off setlocal call "%~dp0\setenv.bat" -"%SYM_JAVA%" -cp "%CLASSPATH%" -Djava.io.tmpdir=tmp org.jumpmind.symmetric.wrapper.Wrapper %* +"%SYM_JAVA%" -cp "%CLASSPATH%" -Djava.io.tmpdir="%SYM_HOME%\tmp" org.jumpmind.symmetric.wrapper.Wrapper %1 "%SYM_HOME%\conf\sym_service.conf" endlocal From a0a5f6cece48379ffd5bedfc38d1de67ca499358 Mon Sep 17 00:00:00 2001 From: elong Date: Wed, 30 May 2018 13:49:28 -0400 Subject: [PATCH 2/9] 0003584: Update BSH variables documentation --- .../configuration/transforms/types.ad | 29 +++++++++++++++---- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/symmetric-assemble/src/asciidoc/configuration/transforms/types.ad b/symmetric-assemble/src/asciidoc/configuration/transforms/types.ad index 143afeba75..431e3fd5cb 100644 --- a/symmetric-assemble/src/asciidoc/configuration/transforms/types.ad +++ b/symmetric-assemble/src/asciidoc/configuration/transforms/types.ad @@ -239,13 +239,32 @@ Some variables are provided to the script: .Variables |=== -|_COLUMN_NAME_|The variable name is the source column name in uppercase of the row being changed (replace COLUMN_NAME with your column) +|__|The variable name is the source column name in uppercase of the row being changed (replace with your column) |currentValue|The value of the current source column |oldValue|The old value of the source column for an updated row -|sqlTemplate|a org.jumpmind.db.sql.ISqlTemplate object for querying or updating the database -|channelId|a reference to the channel on which the transformation is happening -|sourceNode|a org.jumpmind.symmetric.model.Node object that represents the node from where the data came -|targetNode|a org.jumpmind.symmetric.model.Node object that represents the node where the data is being loaded. +|sqlTemplate| org.jumpmind.db.sql.ISqlTemplate object for querying or updating the database +|channelId| name of the channel on which the transformation is happening +|sourceNode| org.jumpmind.symmetric.model.Node object that represents the node from where the data came +|sourceNodeId|same as sourceNode.getNodeId() +|sourceNodeGroupId|same as sourceNode.getNodeGroupId() +|sourceNodeExternalId|same as sourceNode.getNodeExternalId() +|targetNode| org.jumpmind.symmetric.model.Node object that represents the node where the data is being loaded. +|targetNodeId|same as targetNode.getNodeId() +|targetNodeGroupId|same as targetNode.getNodeGroupId() +|targetNodeExternalId|same as targetNode.getNodeExternalId() +|transformColumn| org.jumpmind.symmetric.io.data.transform.TransformColumn that is the transform configuration +|includeOn| org.jumpmind.symmetric.io.data.transform.TransformColumn.IncludeOnType, same as transformColumn.getIncludeOn(), tells whether column transform is configured for all, insert, update, or delete +|sourceSchemaName | source schema name that the transform matched +|sourceCatalogName | source catalog name that the transform matched +|sourceTableName | source table name that the transform matched +|transformedData | org.jumpmind.symmetric.io.data.transform.TransformedData, the model object representing the outputted transformed data +|sourceDmlType| org.jumpmind.symmetric.io.data.DataEventType that is the source row change type, either insert, update, or delete +|sourceDmlTypeString| same as sourceDmlType.toString(), returning insert, update, or delete +|log | org.slf4j.Logger, write to the log file +|context | org.jumpmind.symmetric.io.data.DataContext containing internal variables and also acts like a Map for sharing variables between transforms for the current sync session +|bshContext | java.util.Map, static map of variables to share between transforms +|engine | org.jumpmind.symmetric.ISymmetricEngine, access to engine functions and services + |=== From f4f812910fee406c67e1037c330058149e6b272c Mon Sep 17 00:00:00 2001 From: elong Date: Wed, 30 May 2018 14:10:26 -0400 Subject: [PATCH 3/9] 0003585: Default BSH column transform to return old value for deletes --- .../symmetric/io/data/transform/BshColumnTransform.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/io/data/transform/BshColumnTransform.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/io/data/transform/BshColumnTransform.java index 2f48872289..e2730fe808 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/io/data/transform/BshColumnTransform.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/io/data/transform/BshColumnTransform.java @@ -40,6 +40,7 @@ import org.jumpmind.symmetric.common.Constants; import org.jumpmind.symmetric.common.ParameterConstants; import org.jumpmind.symmetric.io.data.DataContext; +import org.jumpmind.symmetric.io.data.DataEventType; import org.jumpmind.symmetric.model.Data; import org.jumpmind.symmetric.service.IParameterService; import org.jumpmind.util.Context; @@ -151,7 +152,11 @@ public NewAndOldValue transform(IDatabasePlatform platform, } if (result instanceof String) { - return new NewAndOldValue((String) result, null); + if (data.getSourceDmlType().equals(DataEventType.DELETE)) { + return new NewAndOldValue(null, (String) result); + } else { + return new NewAndOldValue((String) result, null); + } } else if (result instanceof NewAndOldValue) { return (NewAndOldValue) result; } else if (result != null) { From 6ef72b61298e712a6452e30312cdba846e7476cc Mon Sep 17 00:00:00 2001 From: elong Date: Thu, 31 May 2018 14:08:07 -0400 Subject: [PATCH 4/9] 0003586: Noorder sequence for performance on Oracle RAC --- .../db/oracle/OracleSymmetricDialect.java | 36 +++++++++++++++++++ .../symmetric/common/ParameterConstants.java | 1 + .../symmetric/route/DataGapRouteReader.java | 4 ++- .../service/impl/AcknowledgeService.java | 10 ++++-- .../impl/AcknowledgeServiceSqlMap.java | 8 +++-- .../symmetric/service/impl/DataService.java | 16 +++++++-- .../service/impl/RouterServiceSqlMap.java | 35 +++++++----------- .../resources/symmetric-default.properties | 10 ++++++ 8 files changed, 89 insertions(+), 31 deletions(-) diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/db/oracle/OracleSymmetricDialect.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/db/oracle/OracleSymmetricDialect.java index 6c9f409e84..3c80be6c60 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/db/oracle/OracleSymmetricDialect.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/db/oracle/OracleSymmetricDialect.java @@ -22,11 +22,15 @@ import static org.apache.commons.lang.StringUtils.isBlank; +import java.io.IOException; import java.text.ParseException; import java.util.Date; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.time.DateUtils; +import org.jumpmind.db.model.Database; +import org.jumpmind.db.model.IndexColumn; +import org.jumpmind.db.model.NonUniqueIndex; import org.jumpmind.db.model.Table; import org.jumpmind.db.platform.IDatabasePlatform; import org.jumpmind.db.platform.PermissionType; @@ -34,6 +38,7 @@ import org.jumpmind.db.sql.SqlException; import org.jumpmind.db.util.BinaryEncoding; import org.jumpmind.symmetric.common.ParameterConstants; +import org.jumpmind.symmetric.common.TableConstants; import org.jumpmind.symmetric.db.AbstractSymmetricDialect; import org.jumpmind.symmetric.db.ISymmetricDialect; import org.jumpmind.symmetric.db.SequenceIdentifier; @@ -207,6 +212,21 @@ public void createRequiredDatabaseObjects() { + " END $(functionName); "; install(sql, wkt2geom); } + + boolean isNoOrder = parameterService.is(ParameterConstants.DBDIALECT_ORACLE_SEQUENCE_NOORDER, false); + String seqName = getSequenceName(SequenceIdentifier.DATA).toUpperCase(); + String orderFlag = platform.getSqlTemplate().queryForString( + "select order_flag from user_sequences where sequence_name = ?", seqName); + String sql = null; + if (orderFlag != null && orderFlag.equals("N") && !isNoOrder) { + sql = "alter sequence " + seqName + " order"; + } else if (orderFlag != null && orderFlag.equals("Y") && isNoOrder) { + sql = "alter sequence " + seqName + " noorder"; + } + if (sql != null) { + log.info("DDL applied: " + sql); + platform.getSqlTemplate().update(sql); + } } @Override @@ -370,4 +390,20 @@ public PermissionType[] getSymTablePermissions() { PermissionType[] permissions = { PermissionType.CREATE_TABLE, PermissionType.DROP_TABLE, PermissionType.CREATE_TRIGGER, PermissionType.DROP_TRIGGER, PermissionType.EXECUTE}; return permissions; } + + @Override + protected Database readDatabaseFromXml(String resourceName) throws IOException { + Database database = super.readDatabaseFromXml(resourceName); + if (parameterService.is(ParameterConstants.DBDIALECT_ORACLE_SEQUENCE_NOORDER, false)) { + Table table = database.findTable(TableConstants.SYM_DATA); + if (table != null) { + NonUniqueIndex index = new NonUniqueIndex("idx_crt_tm_dt_d"); + index.addColumn(new IndexColumn(table.findColumn("create_time"))); + index.addColumn(new IndexColumn(table.findColumn("data_id"))); + table.addIndex(index); + } + } + return database; + } + } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java index a0557d713b..1a03333e08 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java @@ -224,6 +224,7 @@ private ParameterConstants() { public final static String DBDIALECT_ORACLE_USE_TRANSACTION_VIEW = "oracle.use.transaction.view"; public final static String DBDIALECT_ORACLE_TEMPLATE_NUMBER_SPEC = "oracle.template.precision"; public final static String DBDIALECT_ORACLE_USE_HINTS = "oracle.use.hints"; + public final static String DBDIALECT_ORACLE_SEQUENCE_NOORDER = "oracle.sequence.noorder"; public final static String DBDIALECT_TIBERO_USE_TRANSACTION_VIEW = "tibero.use.transaction.view"; public final static String DBDIALECT_TIBERO_TEMPLATE_NUMBER_SPEC = "tibero.template.precision"; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapRouteReader.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapRouteReader.java index 2b1d2e4a4d..90c664fa1a 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapRouteReader.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapRouteReader.java @@ -301,7 +301,9 @@ protected ISqlReadCursor prepareCursor() { } } - if (parameterService.is(ParameterConstants.ROUTING_DATA_READER_ORDER_BY_DATA_ID_ENABLED, true)) { + if (parameterService.is(ParameterConstants.DBDIALECT_ORACLE_SEQUENCE_NOORDER, false)) { + sql = String.format("%s %s", sql, engine.getRouterService().getSql("orderByCreateTime")); + } else if (parameterService.is(ParameterConstants.ROUTING_DATA_READER_ORDER_BY_DATA_ID_ENABLED, true)) { sql = String.format("%s %s", sql, engine.getRouterService().getSql("orderByDataId")); } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java index 26ad226626..40f257b310 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java @@ -111,8 +111,14 @@ public BatchAckResult ack(final BatchAck batch) { boolean isNewError = false; if (!batch.isOk() && batch.getErrorLine() != 0) { - List ids = sqlTemplateDirty.query(getSql("selectDataIdSql"), - new NumberMapper(), outgoingBatch.getBatchId()); + String sql = getSql("selectDataIdSql"); + if (parameterService.is(ParameterConstants.DBDIALECT_ORACLE_SEQUENCE_NOORDER, false)) { + sql += getSql("orderByCreateTime"); + } else if (parameterService.is(ParameterConstants.ROUTING_DATA_READER_ORDER_BY_DATA_ID_ENABLED, true)) { + sql += getSql("orderByDataId"); + } + + List ids = sqlTemplateDirty.query(sql, new NumberMapper(), outgoingBatch.getBatchId()); if (ids.size() >= batch.getErrorLine()) { long failedDataId = ids.get((int) batch.getErrorLine() - 1).longValue(); if (outgoingBatch.getFailedDataId() == 0 || outgoingBatch.getFailedDataId() != failedDataId) { diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeServiceSqlMap.java index 228fe4db4a..a2f8c80f44 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeServiceSqlMap.java @@ -29,8 +29,12 @@ public class AcknowledgeServiceSqlMap extends AbstractSqlMap { public AcknowledgeServiceSqlMap(IDatabasePlatform platform, Map replacementTokens) { super(platform, replacementTokens); - putSql("selectDataIdSql", - "select data_id from $(data_event) b where batch_id = ? order by data_id "); + putSql("selectDataIdSql", "select data_id from $(data_event) b where batch_id = ?"); + + putSql("orderByDataId", " order by data_id asc"); + + putSql("orderByCreateTime", " order by create_time asc, data_id asc "); + } } \ No newline at end of file diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java index f9a78a50cd..4170e40f0b 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java @@ -2232,7 +2232,7 @@ public void heartbeat(boolean force) { } public List listDataIds(long batchId, String nodeId) { - return sqlTemplateDirty.query(getSql("selectEventDataIdsSql", " order by d.data_id asc"), + return sqlTemplateDirty.query(getSql("selectEventDataIdsSql", getDataOrderBy()), new NumberMapper(), batchId, nodeId); } @@ -2266,17 +2266,27 @@ public ISqlReadCursor selectDataFor(Long batchId, String channelId) { protected String getDataSelectByBatchSql(long batchId, long startDataId, String channelId) { String startAtDataIdSql = startDataId >= 0l ? " and d.data_id >= ? " : ""; return symmetricDialect.massageDataExtractionSql( - getSql("selectEventDataByBatchIdSql", startAtDataIdSql, " order by d.data_id asc"), + getSql("selectEventDataByBatchIdSql", startAtDataIdSql, getDataOrderBy()), engine.getConfigurationService().getNodeChannel(channelId, false).getChannel()); } protected String getDataSelectSql(long batchId, long startDataId, String channelId) { String startAtDataIdSql = startDataId >= 0l ? " and d.data_id >= ? " : ""; return symmetricDialect.massageDataExtractionSql( - getSql("selectEventDataToExtractSql", startAtDataIdSql, " order by d.data_id asc"), + getSql("selectEventDataToExtractSql", startAtDataIdSql, getDataOrderBy()), engine.getConfigurationService().getNodeChannel(channelId, false).getChannel()); } + protected String getDataOrderBy() { + String orderBy = ""; + if (parameterService.is(ParameterConstants.DBDIALECT_ORACLE_SEQUENCE_NOORDER, false)) { + orderBy = " order by d.create_time asc, d.data_id asc"; + } else if (parameterService.is(ParameterConstants.ROUTING_DATA_READER_ORDER_BY_DATA_ID_ENABLED, true)) { + orderBy = " order by d.data_id asc"; + } + return orderBy; + } + public long findMaxDataId() { return sqlTemplateDirty.queryForLong(getSql("selectMaxDataIdSql")); } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterServiceSqlMap.java index ca5e818cee..964246c7cb 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterServiceSqlMap.java @@ -34,36 +34,25 @@ public RouterServiceSqlMap(IDatabasePlatform platform, Map repla putSql("selectChannelsUsingStartDataId", "select distinct channel_id from $(data) where data_id >= ?"); putSql("selectDataUsingGapsSql", - "" - + "select $(selectDataUsingGapsSqlHint) d.data_id, d.table_name, d.event_type, d.row_data as row_data, d.pk_data as pk_data, d.old_data as old_data, " - + " d.create_time, d.trigger_hist_id, d.channel_id, d.transaction_id, d.source_node_id, d.external_data, d.node_list " - + " from $(data) d where d.channel_id=? $(dataRange) "); + "select $(selectDataUsingGapsSqlHint) d.data_id, d.table_name, d.event_type, d.row_data as row_data, d.pk_data as pk_data, d.old_data as old_data, " + + " d.create_time, d.trigger_hist_id, d.channel_id, d.transaction_id, d.source_node_id, d.external_data, d.node_list " + + " from $(data) d where d.channel_id=? $(dataRange) "); putSql("selectDataUsingStartDataId", - "" - + "select d.data_id, d.table_name, d.event_type, d.row_data as row_data, d.pk_data as pk_data, d.old_data as old_data, " - + " d.create_time, d.trigger_hist_id, d.channel_id, d.transaction_id, d.source_node_id, d.external_data, d.node_list " - + " from $(data) d where d.channel_id=? and data_id >= ? "); + "select d.data_id, d.table_name, d.event_type, d.row_data as row_data, d.pk_data as pk_data, d.old_data as old_data, " + + " d.create_time, d.trigger_hist_id, d.channel_id, d.transaction_id, d.source_node_id, d.external_data, d.node_list " + + " from $(data) d where d.channel_id=? and data_id >= ? "); - putSql("orderByDataId", - " order by d.data_id asc "); - - putSql("selectDistinctDataIdFromDataEventSql", - "" - + "select distinct(data_id) from $(data_event) where data_id > ? order by data_id asc "); + putSql("orderByDataId", " order by d.data_id asc "); + + putSql("orderByCreateTime", " order by d.create_time asc, d.data_id asc "); putSql("selectDistinctDataIdFromDataEventUsingGapsSql", - "" - + "select distinct(data_id) from $(data_event) where data_id >=? and data_id <= ? order by data_id asc "); - - putSql("selectDataIdFromDataSql", - "select data_id from $(data) where $(dataRange) order by data_id asc"); + "select distinct(data_id) from $(data_event) where data_id >=? and data_id <= ? order by data_id asc "); - putSql("selectUnroutedCountForChannelSql", "" - + "select count(*) from $(data) where channel_id=? and data_id >=? "); + putSql("selectUnroutedCountForChannelSql", "select count(*) from $(data) where channel_id=? and data_id >=? "); - putSql("selectLastDataIdRoutedUsingDataGapSql", "" - + "select max(start_id) from $(data_gap) "); + putSql("selectLastDataIdRoutedUsingDataGapSql", "select max(start_id) from $(data_gap) "); } diff --git a/symmetric-core/src/main/resources/symmetric-default.properties b/symmetric-core/src/main/resources/symmetric-default.properties index 4269ae1318..7f8b7b1a57 100644 --- a/symmetric-core/src/main/resources/symmetric-default.properties +++ b/symmetric-core/src/main/resources/symmetric-default.properties @@ -1505,6 +1505,16 @@ oracle.use.transaction.view=false # DatabaseOverridable: true oracle.transaction.view.clock.sync.threshold.ms=60000 +# On Oracle RAC, an ordered sequence for sym_data must be coordinated across RAC nodes, which has +# wait overhead. By setting this to true, a no-order sequence is used instead, which +# performs better for high throughput. Because the sequence is no longer ordered, +# sym_data is queried using an order by of create_time and data_id. You will need to restart +# after changing this parameter to get DDL applied to the sequence and sym_data. +# Tags: other +# Type: boolean +# DatabaseOverridable: true +oracle.sequence.noorder=false + # Use to map the version string a zseries jdbc driver returns to the 'zseries' dialect # Tags: other db2.zseries.version=DSN08015 From b017650fc95b8b8204dab6586226230604d4add2 Mon Sep 17 00:00:00 2001 From: mmichalek Date: Thu, 31 May 2018 15:34:43 -0400 Subject: [PATCH 5/9] 0003588: Configuring a custom job with a cron schedule may result in exceptions in the log and the custom job failing to start --- .../main/java/org/jumpmind/symmetric/job/AbstractJob.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/AbstractJob.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/AbstractJob.java index eecdecbf97..a753e1d605 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/AbstractJob.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/AbstractJob.java @@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.math.NumberUtils; import org.jumpmind.symmetric.ISymmetricEngine; import org.jumpmind.symmetric.SymmetricException; import org.jumpmind.symmetric.common.Constants; @@ -398,12 +399,12 @@ public long getAverageExecutionTimeInMs() { } public boolean isCronSchedule() { - String cronSchedule = parameterService.getString(jobDefinition.getCronParameter()); - return !StringUtils.isEmpty(cronSchedule); + return !isPeriodicSchedule(); } public boolean isPeriodicSchedule() { - return !isCronSchedule(); + String schedule = getSchedule(); + return NumberUtils.isDigits(schedule); } public String getSchedule() { From c9da38611db8b4839a000650ac454ec54215aab8 Mon Sep 17 00:00:00 2001 From: elong Date: Thu, 31 May 2018 15:48:19 -0400 Subject: [PATCH 6/9] 0003589: Initial load extract job should check file.sync.enabled parameter --- .../org/jumpmind/symmetric/job/InitialLoadExtractorJob.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/InitialLoadExtractorJob.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/InitialLoadExtractorJob.java index 8997ef34d3..3271fd0821 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/InitialLoadExtractorJob.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/InitialLoadExtractorJob.java @@ -23,6 +23,7 @@ import static org.jumpmind.symmetric.job.JobDefaults.EVERY_10_SECONDS; import org.jumpmind.symmetric.ISymmetricEngine; +import org.jumpmind.symmetric.common.ParameterConstants; import org.jumpmind.symmetric.service.ClusterConstants; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; @@ -41,7 +42,9 @@ public JobDefaults getDefaults() { @Override public void doJob(boolean force) throws Exception { - engine.getFileSyncExtractorService().queueWork(force); + if (engine.getParameterService().is(ParameterConstants.FILE_SYNC_ENABLE)) { + engine.getFileSyncExtractorService().queueWork(force); + } engine.getDataExtractorService().queueWork(force); } } From ae443c0f589ab8bb6dcf83a015ac6c9fcc168d24 Mon Sep 17 00:00:00 2001 From: Chris Henson Date: Fri, 1 Jun 2018 14:55:24 -0400 Subject: [PATCH 7/9] 0003590: If routing is backed up and a restart happens gaps with data can be expired causing missing data --- .../org/jumpmind/symmetric/route/DataGapFastDetector.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapFastDetector.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapFastDetector.java index e8962b8eca..85ed905681 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapFastDetector.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapFastDetector.java @@ -124,6 +124,7 @@ public void beforeRouting() { queryDataIdMap(); processInfo.setStatus(ProcessStatus.OK); log.info("Querying data in gaps from database took {} ms", System.currentTimeMillis() - ts); + isAllDataRead = false; afterRouting(); reset(); log.info("Full gap analysis is done after {} ms", System.currentTimeMillis() - ts); @@ -543,6 +544,9 @@ public void addDataIds(List dataIds) { this.dataIds.addAll(dataIds); } + /** + * This method is called for each channel that is routed. Once it is set for a routing pass it should remain set until the routing pass is done. + */ public void setIsAllDataRead(boolean isAllDataRead) { this.isAllDataRead &= isAllDataRead; } From 926c1a6b19f561c16ee823282aa405caf8ef6274 Mon Sep 17 00:00:00 2001 From: mmichalek Date: Fri, 1 Jun 2018 15:34:55 -0400 Subject: [PATCH 8/9] 0003559: Log expiration of data gaps at info level --- .../symmetric/route/DataGapFastDetector.java | 56 +++++++++++++++---- 1 file changed, 46 insertions(+), 10 deletions(-) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapFastDetector.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapFastDetector.java index e8962b8eca..6025fa06d8 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapFastDetector.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapFastDetector.java @@ -192,7 +192,9 @@ public void afterRouting() { } else if (lastBusyExpireRunTime != 0) { setLastBusyExpireRunTime(0); } - + + List skippedDataGaps = new ArrayList<>(); + try { long ts = System.currentTimeMillis(); long lastDataId = -1; @@ -237,14 +239,7 @@ public void afterRouting() { expireChecked++; } if (isAllDataRead || isGapEmpty) { - if (dataGap.getStartId() == dataGap.getEndId()) { - log.info("Found a gap in data_id at {}. Skipping it because " + - (supportsTransactionViews ? "there are no pending transactions" : "the gap expired"), dataGap.getStartId()); - } else { - log.info("Found a gap in data_id from {} to {}. Skipping it because " + - (supportsTransactionViews ? "there are no pending transactions" : "the gap expired"), - dataGap.getStartId(), dataGap.getEndId()); - } + skippedDataGaps.add(dataGap); gapsDeleted.add(dataGap); gapsAll.remove(dataGap); } @@ -299,6 +294,8 @@ public void afterRouting() { } catch (RuntimeException ex) { processInfo.setStatus(ProcessStatus.ERROR); throw ex; + } finally { + logSkippedDataGaps(skippedDataGaps); } } @@ -529,7 +526,46 @@ protected void fixOverlappingGaps(List gapsToCheck, ProcessInfo process throw ex; } } - + + protected void logSkippedDataGaps(List skippedDataGaps) { + if (skippedDataGaps.isEmpty()) { + return; + } + + if (log.isDebugEnabled()) { + for (DataGap dataGap : skippedDataGaps) { + if (dataGap.getStartId() == dataGap.getEndId()) { + log.debug("Expired data gap at data_id {} create_time {}. Skipping it because " + + (supportsTransactionViews ? "there are no pending transactions" : "the gap expired"), dataGap.getStartId(), dataGap.getCreateTime()); + } else { + log.debug("Expired data gap between data_id {} and {} create_time {}. Skipping it because " + + (supportsTransactionViews ? "there are no pending transactions" : "the gap expired"), + dataGap.getStartId(), dataGap.getEndId(), dataGap.getCreateTime()); + } + } + return; + } + + Date minDate = skippedDataGaps.get(0).getCreateTime(); + Date maxDate = skippedDataGaps.get(0).getCreateTime(); + long minDataId = skippedDataGaps.get(0).getStartId(); + long maxDataId = skippedDataGaps.get(0).getEndId(); + + for (DataGap dataGap : skippedDataGaps) { + if (dataGap.getCreateTime().before(minDate)) { + minDate = dataGap.getCreateTime(); + } + if (dataGap.getCreateTime().after(maxDate)) { + maxDate = dataGap.getCreateTime(); + } + minDataId = Math.min(minDataId, dataGap.getStartId()); + maxDataId = Math.min(maxDataId, dataGap.getEndId()); + } + + log.info("Expired {} data gap(s) between data_id {} and {} and between create_time {} and {}", + skippedDataGaps.size(), minDataId, maxDataId, minDate, maxDate); + + } public Long mapRow(Row row) { return row.getLong("data_id"); From 5bf209e0da6d80213f29114a2ee3c8eb1d983ac0 Mon Sep 17 00:00:00 2001 From: elong Date: Tue, 5 Jun 2018 14:14:53 -0400 Subject: [PATCH 9/9] 0003586: Noorder sequence for performance on Oracle RAC --- .../symmetric/route/DataGapRouteReader.java | 37 +++++++++++++++---- 1 file changed, 29 insertions(+), 8 deletions(-) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapRouteReader.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapRouteReader.java index 90c664fa1a..ac1b1e4ea4 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapRouteReader.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapRouteReader.java @@ -88,7 +88,11 @@ public class DataGapRouteReader implements IDataToRouteReader { protected long peekAheadSizeInBytes = 0; protected boolean finishTransactionMode = false; + + protected boolean isEachGapQueried; + protected boolean isOracleNoOrder; + protected String lastTransactionId = null; protected static Map lastSelectUsedGreaterThanQueryByEngineName = new HashMap(); @@ -102,6 +106,7 @@ public DataGapRouteReader(ChannelRouterContext context, ISymmetricEngine engine) this.percentOfHeapToUse = (double)parameterService.getInt(ParameterConstants.ROUTING_PEEK_AHEAD_MEMORY_THRESHOLD)/(double)100; this.takeTimeout = engine.getParameterService().getInt( ParameterConstants.ROUTING_WAIT_FOR_DATA_TIMEOUT_SECONDS, 330); + this.isOracleNoOrder = parameterService.is(ParameterConstants.DBDIALECT_ORACLE_SEQUENCE_NOORDER, false); if (parameterService.is(ParameterConstants.SYNCHRONIZE_ALL_JOBS)) { /* there will not be a separate thread to read a blocked queue so make sure the queue is big enough that it can be filled */ this.dataQueue = new LinkedBlockingQueue(); @@ -231,15 +236,28 @@ protected boolean process(Data data) { if (!finishTransactionMode || (lastTransactionId != null && finishTransactionMode && lastTransactionId .equals(data.getTransactionId()))) { - while (!okToProcess && currentGap != null && dataId >= currentGap.getStartId()) { - if (dataId <= currentGap.getEndId()) { + if (isOracleNoOrder) { + if (isEachGapQueried) { okToProcess = true; } else { - // past current gap. move to next gap - if (dataGaps.size() > 0) { - currentGap = dataGaps.remove(0); + for (DataGap gap : dataGaps) { + if (dataId >= gap.getStartId() && dataId <= gap.getEndId()) { + okToProcess = true; + break; + } + } + } + } else { + while (!okToProcess && currentGap != null && dataId >= currentGap.getStartId()) { + if (dataId <= currentGap.getEndId()) { + okToProcess = true; } else { - currentGap = null; + // past current gap. move to next gap + if (dataGaps.size() > 0) { + currentGap = dataGaps.remove(0); + } else { + currentGap = null; + } } } } @@ -276,6 +294,7 @@ protected ISqlReadCursor prepareCursor() { useGreaterThanDataId = true; } + isEachGapQueried = !useGreaterThanDataId && this.dataGaps.size() <= numberOfGapsToQualify; String channelId = context.getChannel().getChannelId(); String sql = null; @@ -301,7 +320,7 @@ protected ISqlReadCursor prepareCursor() { } } - if (parameterService.is(ParameterConstants.DBDIALECT_ORACLE_SEQUENCE_NOORDER, false)) { + if (isOracleNoOrder) { sql = String.format("%s %s", sql, engine.getRouterService().getSql("orderByCreateTime")); } else if (parameterService.is(ParameterConstants.ROUTING_DATA_READER_ORDER_BY_DATA_ID_ENABLED, true)) { sql = String.format("%s %s", sql, engine.getRouterService().getSql("orderByDataId")); @@ -340,7 +359,9 @@ protected ISqlReadCursor prepareCursor() { } } - this.currentGap = dataGaps.remove(0); + if (!isOracleNoOrder) { + this.currentGap = dataGaps.remove(0); + } ISqlRowMapper dataMapper = new ISqlRowMapper() { public Data mapRow(Row row) {