From 420af570076016f31dbe3aad523bc8030530c7c5 Mon Sep 17 00:00:00 2001 From: "Hicks, Josh" Date: Thu, 8 Jun 2017 13:52:20 -0400 Subject: [PATCH 01/12] 0003147: Create table reload requests by channel --- .../symmetric/service/impl/DataService.java | 14 +++++++++++++- .../symmetric/service/impl/DataServiceSqlMap.java | 2 +- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java index 5975b8dc68..9c0f2159cf 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java @@ -241,7 +241,7 @@ public void insertTableReloadRequest(TableReloadRequest request) { request.getLastUpdateTime(), request.getSourceNodeId(), request.getTargetNodeId(), request.getTriggerId(), request.getRouterId(), request.isCreateTable() ? 1 : 0, - request.isDeleteFirst() ? 1 : 0 }); + request.isDeleteFirst() ? 1 : 0, request.getChannelId() }); } public TableReloadRequest getTableReloadRequest(final TableReloadRequestKey key) { @@ -405,6 +405,17 @@ public void insertReloadEvents(Node targetNode, boolean reverse, List channelTriggerHistories = new ArrayList(); + + for (TriggerHistory history : triggerHistories) { + if (channelId.equals(engine.getTriggerRouterService().getTriggerById(history.getTriggerId()).getChannelId())) { + channelTriggerHistories.add(history); + } + } + triggerHistories = channelTriggerHistories; + } } else { for (TableReloadRequest reloadRequest : reloadRequests) { @@ -412,6 +423,7 @@ public void insertReloadEvents(Node targetNode, boolean reverse, List> triggerRoutersByHistoryId = triggerRouterService diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java index 9d06922ae8..bd01379a52 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java @@ -31,7 +31,7 @@ public DataServiceSqlMap(IDatabasePlatform platform, Map replace putSql("selectTableReloadRequest", "select reload_select, before_custom_sql, reload_time, create_time, last_update_by, last_update_time from $(table_reload_request) where source_node_id=? and target_node_id=? and trigger_id=? and router_id=?"); - putSql("insertTableReloadRequest", "insert into $(table_reload_request) (reload_select, before_custom_sql, create_time, last_update_by, last_update_time, source_node_id, target_node_id, trigger_id, router_id, create_table, delete_first) values (?,?,?,?,?,?,?,?,?,?,?)"); + putSql("insertTableReloadRequest", "insert into $(table_reload_request) (reload_select, before_custom_sql, create_time, last_update_by, last_update_time, source_node_id, target_node_id, trigger_id, router_id, create_table, delete_first, channel_id) values (?,?,?,?,?,?,?,?,?,?,?,?)"); putSql("updateTableReloadRequest", "update $(table_reload_request) set reload_select=?, before_custom_sql=?, reload_time=?, create_time=?, last_update_by=?, last_update_time=? where source_node_id=? and target_node_id=? and trigger_id=? and router_id=?"); From a42e53bd16173e3af0679044a5c3bf5db2d1e703 Mon Sep 17 00:00:00 2001 From: "Hicks, Josh" Date: Fri, 9 Jun 2017 08:57:48 -0400 Subject: [PATCH 02/12] 0003148: Stream rows does not honor sync key names if provided --- .../jumpmind/symmetric/service/impl/DataExtractorService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index 943ead1f06..3c823de352 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -1723,7 +1723,7 @@ public CsvData next() { if (initialLoadSelect == null && triggerRouter.getTrigger().isStreamRow()) { if (sourceTable == null) { sourceTable = columnsAccordingToTriggerHistory.lookup(triggerRouter - .getRouter().getRouterId(), triggerHistory, false, true); + .getRouter().getRouterId(), triggerHistory, false, false); } Column[] columns = sourceTable.getPrimaryKeyColumns(); DmlStatement dmlStmt = platform.createDmlStatement(DmlType.WHERE, sourceTable, null); From d83c49631eb5a1311f8343411de333e4153c2893 Mon Sep 17 00:00:00 2001 From: "Hicks, Josh" Date: Fri, 9 Jun 2017 15:26:00 -0400 Subject: [PATCH 03/12] 0003148: Stream rows does not honor sync key names if provided --- .../symmetric/service/impl/DataExtractorService.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index 3c823de352..fe561cb8cd 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -1721,10 +1721,10 @@ public CsvData next() { String initialLoadSelect = data.getRowData(); if (initialLoadSelect == null && triggerRouter.getTrigger().isStreamRow()) { - if (sourceTable == null) { + //if (sourceTable == null) { sourceTable = columnsAccordingToTriggerHistory.lookup(triggerRouter - .getRouter().getRouterId(), triggerHistory, false, false); - } + .getRouter().getRouterId(), triggerHistory, false, true); + // } Column[] columns = sourceTable.getPrimaryKeyColumns(); DmlStatement dmlStmt = platform.createDmlStatement(DmlType.WHERE, sourceTable, null); String[] pkData = data.getParsedData(CsvData.PK_DATA); @@ -1734,6 +1734,10 @@ public CsvData next() { row.put(columns[i].getName(), pkData[i]); } initialLoadSelect = dmlStmt.buildDynamicSql(batch.getBinaryEncoding(), row, false, true, columns); + if (initialLoadSelect.endsWith(platform.getDatabaseInfo().getSqlCommandDelimiter())) { + initialLoadSelect = initialLoadSelect.substring(0, + initialLoadSelect.length() - platform.getDatabaseInfo().getSqlCommandDelimiter().length()); + } } SelectFromTableEvent event = new SelectFromTableEvent(targetNode, From 561f299755f28ec96d2024f2741f5e177bc60038 Mon Sep 17 00:00:00 2001 From: "Hicks, Josh" Date: Mon, 12 Jun 2017 08:33:47 -0400 Subject: [PATCH 04/12] 0003147: Create table reload requests by channel --- .../symmetric/service/impl/DataService.java | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java index 9c0f2159cf..c0a0ceb24b 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java @@ -405,16 +405,18 @@ public void insertReloadEvents(Node targetNode, boolean reverse, List channelTriggerHistories = new ArrayList(); - - for (TriggerHistory history : triggerHistories) { - if (channelId.equals(engine.getTriggerRouterService().getTriggerById(history.getTriggerId()).getChannelId())) { - channelTriggerHistories.add(history); + if (reloadRequests != null && reloadRequests.size() == 1) { + String channelId = reloadRequests.get(0).getChannelId(); + if (channelId != null) { + List channelTriggerHistories = new ArrayList(); + + for (TriggerHistory history : triggerHistories) { + if (channelId.equals(engine.getTriggerRouterService().getTriggerById(history.getTriggerId()).getChannelId())) { + channelTriggerHistories.add(history); + } } + triggerHistories = channelTriggerHistories; } - triggerHistories = channelTriggerHistories; } } else { From 4ddce0be1703a6995734fb7b8c115cc34dcd2b72 Mon Sep 17 00:00:00 2001 From: maxwellpettit Date: Mon, 12 Jun 2017 11:13:06 -0400 Subject: [PATCH 05/12] 0003151: Add the ability in the Symmetric Pro installer to configure the com.sun.management.jmxremote.rmi.port property. --- symmetric-server/src/main/deploy/conf/sym_service.conf | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/symmetric-server/src/main/deploy/conf/sym_service.conf b/symmetric-server/src/main/deploy/conf/sym_service.conf index 575665f1e5..f15ef84228 100644 --- a/symmetric-server/src/main/deploy/conf/sym_service.conf +++ b/symmetric-server/src/main/deploy/conf/sym_service.conf @@ -29,8 +29,9 @@ wrapper.java.additional.12=-Djava.net.preferIPv4Stack=true wrapper.java.additional.13=-Dcom.sun.management.jmxremote wrapper.java.additional.14=-Dcom.sun.management.jmxremote.authenticate=false wrapper.java.additional.15=-Dcom.sun.management.jmxremote.port=31418 -wrapper.java.additional.16=-Dcom.sun.management.jmxremote.ssl=false -wrapper.java.additional.17=-Djava.rmi.server.hostname=localhost +wrapper.java.additional.16=-Dcom.sun.management.jmxremote.rmi.port=31418 +wrapper.java.additional.17=-Dcom.sun.management.jmxremote.ssl=false +wrapper.java.additional.18=-Djava.rmi.server.hostname=localhost # Initial Java Heap Size (in MB) wrapper.java.initmemory=256 From 53345c643c5d0404c235a441ea7b7fda7ee7fb91 Mon Sep 17 00:00:00 2001 From: klementinastojanovska Date: Mon, 12 Jun 2017 14:23:53 -0400 Subject: [PATCH 06/12] 0003154: On node screen. Reopen button set created_at_node_id to be the selected node rather than registration server/managing node --- .../jumpmind/symmetric/service/impl/RegistrationService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RegistrationService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RegistrationService.java index fd73926358..670b8a852f 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RegistrationService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RegistrationService.java @@ -576,7 +576,7 @@ protected synchronized void reOpenRegistration(String nodeId, String remoteHost, // node table, but not in node security. // lets go ahead and try to insert into node security. sqlTemplate.update(getSql("openRegistrationNodeSecuritySql"), new Object[] { - nodeId, password, nodeService.findNode(nodeId).getNodeId() }); + nodeId, password, nodeService.findIdentityNodeId() }); log.info("Registration was opened for {}", nodeId); } else if (updateCount == 0) { log.warn("Registration was already enabled for {}. No need to reenable it", nodeId); From ed3a1c25a61cea5bde1d26450a415bf9a4b19dd7 Mon Sep 17 00:00:00 2001 From: Chris Henson Date: Mon, 12 Jun 2017 17:14:52 -0400 Subject: [PATCH 07/12] 0003156: Validate the number of table columns match the number of data elements before attempting to load data --- .../data/writer/AbstractDatabaseWriter.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/AbstractDatabaseWriter.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/AbstractDatabaseWriter.java index a290f54fc2..95636449f8 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/AbstractDatabaseWriter.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/AbstractDatabaseWriter.java @@ -29,6 +29,7 @@ import org.jumpmind.db.model.Column; import org.jumpmind.db.model.Table; import org.jumpmind.db.sql.SqlException; +import org.jumpmind.exception.ParseException; import org.jumpmind.symmetric.io.IoConstants; import org.jumpmind.symmetric.io.data.Batch; import org.jumpmind.symmetric.io.data.CsvData; @@ -147,9 +148,27 @@ public void write(CsvData data) { if (targetTable != null || !data.requiresTable() || (targetTable == null && data.getDataEventType() == DataEventType.SQL)) { try { + statistics.get(batch).increment(DataWriterStatisticConstants.STATEMENTCOUNT); statistics.get(batch).increment(DataWriterStatisticConstants.LINENUMBER); if (filterBefore(data)) { + + switch (data.getDataEventType()) { + case UPDATE: + case INSERT: + if (targetTable.getColumnCount() != data.getParsedData(CsvData.ROW_DATA).length) { + throw new ParseException(String.format("The (%s) table's column count (%d) does not match the data's column count (%d)", targetTable.getName(), targetTable.getColumnCount(), data.getParsedData(CsvData.ROW_DATA).length)); + } + break; + case DELETE: + if (targetTable.getPrimaryKeyColumnCount() != data.getParsedData(CsvData.PK_DATA).length) { + throw new ParseException(String.format("The (%s) table's pk column count (%d) does not match the data's pk column count (%d)", targetTable.getName(), targetTable.getPrimaryKeyColumnCount(), data.getParsedData(CsvData.PK_DATA).length)); + } + break; + default: + break; + } + LoadStatus loadStatus = LoadStatus.SUCCESS; switch (data.getDataEventType()) { case UPDATE: From b704508c22b8cbd6c64e89d33299785ea1c62539 Mon Sep 17 00:00:00 2001 From: maxwellpettit Date: Tue, 13 Jun 2017 13:09:56 -0400 Subject: [PATCH 08/12] 0003155: Refresh the group link cache when node group links are modified. --- .../route/ConfigurationChangedDataRouter.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ConfigurationChangedDataRouter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ConfigurationChangedDataRouter.java index f8e81acd9f..57a111cfd0 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ConfigurationChangedDataRouter.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ConfigurationChangedDataRouter.java @@ -95,6 +95,9 @@ public class ConfigurationChangedDataRouter extends AbstractDataRouter implement final String CTX_KEY_FLUSHED_TRIGGER_ROUTERS = "FlushedTriggerRouters." + ConfigurationChangedDataRouter.class.getSimpleName() + hashCode(); + + final String CTX_KEY_FLUSH_NODE_GROUP_LINK_NEEDED = "FlushNodeGroupLink." + + ConfigurationChangedDataRouter.class.getSimpleName() + hashCode(); public final static String KEY = "symconfig"; @@ -239,6 +242,10 @@ public Set routeToNodes(SimpleRouterContext routingContext, DataMetaData if (tableMatches(dataMetaData, TableConstants.SYM_NOTIFICATION)) { routingContext.put(CTX_KEY_FLUSH_NOTIFICATIONS_NEEDED, Boolean.TRUE); } + + if (tableMatches(dataMetaData, TableConstants.SYM_NODE_GROUP_LINK)) { + routingContext.put(CTX_KEY_FLUSH_NODE_GROUP_LINK_NEEDED, Boolean.TRUE); + } } } @@ -633,6 +640,12 @@ public void contextCommitted(SimpleRouterContext routingContext) { log.info("About to refresh the cache of node security because new configuration came through the data router"); engine.getNodeService().flushNodeAuthorizedCache(); } + + if (routingContext.get(CTX_KEY_FLUSH_NODE_GROUP_LINK_NEEDED) != null) { + log.info("About to refresh the cache of node group link because new configuration came through the data router"); + engine.getConfigurationService().clearCache(); + engine.getNodeService().flushNodeGroupCache(); + } } } From b955d3516b93235f7049cba84973d70b8f8fd8b1 Mon Sep 17 00:00:00 2001 From: Chris Henson Date: Tue, 13 Jun 2017 13:21:15 -0400 Subject: [PATCH 09/12] 0003156: Validate the number of table columns match the number of data elements before attempting to load data --- .../symmetric/io/data/writer/AbstractDatabaseWriter.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/AbstractDatabaseWriter.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/AbstractDatabaseWriter.java index 95636449f8..a3ff346b47 100644 --- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/AbstractDatabaseWriter.java +++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/AbstractDatabaseWriter.java @@ -156,13 +156,13 @@ public void write(CsvData data) { switch (data.getDataEventType()) { case UPDATE: case INSERT: - if (targetTable.getColumnCount() != data.getParsedData(CsvData.ROW_DATA).length) { - throw new ParseException(String.format("The (%s) table's column count (%d) does not match the data's column count (%d)", targetTable.getName(), targetTable.getColumnCount(), data.getParsedData(CsvData.ROW_DATA).length)); + if (sourceTable.getColumnCount() != data.getParsedData(CsvData.ROW_DATA).length) { + throw new ParseException(String.format("The (%s) table's column count (%d) does not match the data's column count (%d)", sourceTable.getName(), sourceTable.getColumnCount(), data.getParsedData(CsvData.ROW_DATA).length)); } break; case DELETE: - if (targetTable.getPrimaryKeyColumnCount() != data.getParsedData(CsvData.PK_DATA).length) { - throw new ParseException(String.format("The (%s) table's pk column count (%d) does not match the data's pk column count (%d)", targetTable.getName(), targetTable.getPrimaryKeyColumnCount(), data.getParsedData(CsvData.PK_DATA).length)); + if (sourceTable.getPrimaryKeyColumnCount() != data.getParsedData(CsvData.PK_DATA).length) { + throw new ParseException(String.format("The (%s) table's pk column count (%d) does not match the data's pk column count (%d)", sourceTable.getName(), sourceTable.getPrimaryKeyColumnCount(), data.getParsedData(CsvData.PK_DATA).length)); } break; default: From c7d814e8045816ccd21c1f6439346d115a983c2b Mon Sep 17 00:00:00 2001 From: "Hicks, Josh" Date: Tue, 13 Jun 2017 14:49:57 -0400 Subject: [PATCH 10/12] 0003157: Allow bulk loaders to fall back to default loader when an error occurs --- .../symmetric/io/PostgresBulkDatabaseWriter.java | 15 +++++++++++++++ .../symmetric/service/impl/DataLoaderService.java | 5 ++++- .../service/impl/IncomingBatchService.java | 1 + 3 files changed, 20 insertions(+), 1 deletion(-) diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/io/PostgresBulkDatabaseWriter.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/io/PostgresBulkDatabaseWriter.java index 7f7efe8270..9320e8314c 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/io/PostgresBulkDatabaseWriter.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/io/PostgresBulkDatabaseWriter.java @@ -41,6 +41,7 @@ import org.jumpmind.symmetric.io.data.writer.DataWriterStatisticConstants; import org.jumpmind.symmetric.io.data.writer.DatabaseWriterSettings; import org.jumpmind.symmetric.io.data.writer.DefaultDatabaseWriter; +import org.jumpmind.symmetric.model.IncomingBatch; import org.postgresql.copy.CopyIn; import org.postgresql.copy.CopyManager; import org.postgresql.core.BaseConnection; @@ -60,6 +61,8 @@ public class PostgresBulkDatabaseWriter extends DefaultDatabaseWriter { protected boolean needsBinaryConversion; + protected boolean useDefaultDataWriter; + public PostgresBulkDatabaseWriter(IDatabasePlatform platform, DatabaseWriterSettings settings, NativeJdbcExtractor jdbcExtractor, int maxRowsBeforeFlush) { super(platform, settings); @@ -68,6 +71,11 @@ public PostgresBulkDatabaseWriter(IDatabasePlatform platform, DatabaseWriterSett } public void write(CsvData data) { + if (useDefaultDataWriter) { + super.write(data); + return; + } + statistics.get(batch).increment(DataWriterStatisticConstants.STATEMENTCOUNT); statistics.get(batch).increment(DataWriterStatisticConstants.LINENUMBER); statistics.get(batch).startTimer(DataWriterStatisticConstants.DATABASEMILLIS); @@ -182,6 +190,13 @@ protected void endCopy() { } } + @Override + public void start(Batch batch) { + super.start(batch); + IncomingBatch currentBatch = (IncomingBatch) context.get("currentBatch"); + useDefaultDataWriter = currentBatch == null ? false : currentBatch.isErrorFlag(); + } + @Override public boolean start(Table table) { return super.start(table); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java index a183de0089..72119e3382 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java @@ -964,7 +964,6 @@ public Table nextTable() { return table; } }; - DataProcessor processor = new DataProcessor(reader, null, listener, "data load from stage") { @Override protected IDataWriter chooseDataWriter(Batch batch) { @@ -1043,6 +1042,8 @@ public void beforeBatchEnd(DataContext context) { public boolean beforeBatchStarted(DataContext context) { this.currentBatch = null; Batch batch = context.getBatch(); + context.remove("currentBatch"); + if (parameterService.is(ParameterConstants.DATA_LOADER_ENABLED) || (batch.getChannelId() != null && batch.getChannelId().equals( Constants.CHANNEL_CONFIG))) { @@ -1059,6 +1060,8 @@ public boolean beforeBatchStarted(DataContext context) { this.batchesProcessed.add(incomingBatch); if (incomingBatchService.acquireIncomingBatch(incomingBatch)) { this.currentBatch = incomingBatch; + context.put("currentBatch", this.currentBatch); + return true; } } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/IncomingBatchService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/IncomingBatchService.java index 7e9a3c6a09..44470d2774 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/IncomingBatchService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/IncomingBatchService.java @@ -214,6 +214,7 @@ public boolean acquireIncomingBatch(IncomingBatch batch) { || !parameterService .is(ParameterConstants.INCOMING_BATCH_SKIP_DUPLICATE_BATCHES_ENABLED)) { okayToProcess = true; + batch.setErrorFlag(existingBatch.isErrorFlag()); existingBatch.setStatus(Status.LD); log.info("Retrying batch {}", batch.getNodeBatchId()); } else if (existingBatch.getStatus() == Status.IG) { From 570f237d7995bb241d02ca6805b2a59df3bc5653 Mon Sep 17 00:00:00 2001 From: klementinastojanovska Date: Tue, 13 Jun 2017 14:51:45 -0400 Subject: [PATCH 11/12] 0003153: Create loads by channel --- .../src/main/java/org/jumpmind/symmetric/common/Constants.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/Constants.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/Constants.java index e7ec9a7f01..c59785f1b0 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/Constants.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/Constants.java @@ -105,6 +105,8 @@ private Constants() { public static final String CHANNEL_FILESYNC_RELOAD = "filesync_reload"; public static final String CHANNEL_DYNAMIC = "dynamic"; + + public static final String CHANNEL_MONITOR = "monitor"; public static final String PUSH_JOB_TIMER = "job.push"; From 29dc47e110fa65ae557650f75ff7bd2199cd20a4 Mon Sep 17 00:00:00 2001 From: maxwellpettit Date: Wed, 14 Jun 2017 10:04:14 -0400 Subject: [PATCH 12/12] 0003150: ProcessInfo duration is for the entire sync (versus batch) and the rows/sec is off because it is calculated using the total time --- .../org/jumpmind/symmetric/model/ProcessInfo.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/ProcessInfo.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/ProcessInfo.java index 442e66536b..3a17c82379 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/ProcessInfo.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/ProcessInfo.java @@ -100,6 +100,8 @@ public String toString() { private Map statusHistory; private Date endTime; + + private long totalDataCount = 0; public ProcessInfo() { this(new ProcessInfoKey("", "", null)); @@ -167,6 +169,7 @@ public void setBatchCount(long batchCount) { public void incrementCurrentDataCount() { this.currentDataCount++; + this.totalDataCount++; } public void incrementBatchCount() { @@ -352,6 +355,14 @@ public static ThreadData getThreadData(long threadId) { } } + public long getTotalDataCount() { + return totalDataCount; + } + + public void setTotalDataCount(long totalDataCount) { + this.totalDataCount = totalDataCount; + } + static public class ThreadData { public ThreadData(String threadName, String stackTrace) {