diff --git a/symmetric-assemble/src/asciidoc/configuration/ldap.ad b/symmetric-assemble/src/asciidoc/configuration/ldap.ad index b95002e59d..9cb77b1579 100644 --- a/symmetric-assemble/src/asciidoc/configuration/ldap.ad +++ b/symmetric-assemble/src/asciidoc/configuration/ldap.ad @@ -6,6 +6,11 @@ Configuring a <> authentication on SymmetricDS using LDAP is as simple as console.auth.ldap.baseDN:: The LDAP base DN to search for a user. [ Default: ] + +A list of base DNs can be specified by separating each entry by a pipe ("|") symbol. + +For example: + +ou=Users,o=IT,c=US,dc=corp,dc=local|ou=Users,o=Tech,c=US,dc=corp,dc=local + console.auth.ldap.host:: The LDAP server host name. [ Default: ] diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/db/JdbcSymmetricDialectFactory.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/db/JdbcSymmetricDialectFactory.java index 3d51b99637..e61103a67b 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/db/JdbcSymmetricDialectFactory.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/db/JdbcSymmetricDialectFactory.java @@ -63,6 +63,7 @@ import org.jumpmind.symmetric.db.informix.InformixSymmetricDialect; import org.jumpmind.symmetric.db.interbase.InterbaseSymmetricDialect; import org.jumpmind.symmetric.db.mariadb.MariaDBSymmetricDialect; +import org.jumpmind.symmetric.db.mssql.MsSql2008SymmetricDialect; import org.jumpmind.symmetric.db.mssql.MsSqlSymmetricDialect; import org.jumpmind.symmetric.db.mssql2000.MsSql2000SymmetricDialect; import org.jumpmind.symmetric.db.mysql.MySqlSymmetricDialect; @@ -108,7 +109,7 @@ public ISymmetricDialect create() { } else if (platform instanceof OracleDatabasePlatform) { dialect = new OracleSymmetricDialect(parameterService, platform); } else if (platform instanceof MsSql2008DatabasePlatform) { - dialect = new MsSqlSymmetricDialect(parameterService, platform); + dialect = new MsSql2008SymmetricDialect(parameterService, platform); } else if (platform instanceof MsSql2005DatabasePlatform) { dialect = new MsSqlSymmetricDialect(parameterService, platform); } else if (platform instanceof MsSql2000DatabasePlatform) { diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/db/db2/Db2As400TriggerTemplate.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/db/db2/Db2As400TriggerTemplate.java index 5247e91483..fee7d7c0dd 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/db/db2/Db2As400TriggerTemplate.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/db/db2/Db2As400TriggerTemplate.java @@ -9,8 +9,8 @@ public Db2As400TriggerTemplate(ISymmetricDialect symmetricDialect) { super(symmetricDialect); stringColumnTemplate = "case when $(tableAlias).\"$(columnName)\" is null then '' else '\"' || replace(replace($(tableAlias).\"$(columnName)\",'\\','\\\\'),'\"','\\\"') || '\"' end" ; + datetimeColumnTemplate = "case when $(tableAlias).\"$(columnName)\" is null then '' else '\"' || rtrim(char(year(timestamp_iso($(tableAlias).\"$(columnName)\"))))||'-'||substr(digits(month(timestamp_iso($(tableAlias).\"$(columnName)\"))),9)||'-'||substr(digits(day(timestamp_iso($(tableAlias).\"$(columnName)\"))),9)||' '||substr(digits(hour(timestamp_iso($(tableAlias).\"$(columnName)\"))),9)||':'||substr(digits(minute(timestamp_iso($(tableAlias).\"$(columnName)\"))),9)||':'||substr(digits(second(timestamp_iso($(tableAlias).\"$(columnName)\"))),9)||'.'||RIGHT(REPEAT('0',6)||rtrim(char(microsecond(timestamp_iso($(tableAlias).\"$(columnName)\")))),6) || '\"' end"; String castClobTo = symmetricDialect.getParameterService().getString(ParameterConstants.AS400_CAST_CLOB_TO, "DCLOB"); - clobColumnTemplate = "case when $(tableAlias).\"$(columnName)\" is null then '' else '\"' || replace(replace(cast($(tableAlias).\"$(columnName)\" as "+castClobTo+"),'\\','\\\\'),'\"','\\\"') || '\"' end" ; sqlTemplates.put("insertTriggerTemplate" , diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/db/mssql/MsSql2008SymmetricDialect.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/db/mssql/MsSql2008SymmetricDialect.java new file mode 100644 index 0000000000..a1e2fd1f32 --- /dev/null +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/db/mssql/MsSql2008SymmetricDialect.java @@ -0,0 +1,22 @@ +package org.jumpmind.symmetric.db.mssql; + +import org.jumpmind.db.platform.IDatabasePlatform; +import org.jumpmind.symmetric.model.Trigger; +import org.jumpmind.symmetric.service.IParameterService; + +public class MsSql2008SymmetricDialect extends MsSqlSymmetricDialect { + public MsSql2008SymmetricDialect() { + super(); + } + + public MsSql2008SymmetricDialect(IParameterService parameterService, IDatabasePlatform platform) { + super(parameterService, platform); + this.triggerTemplate = new MsSql2008TriggerTemplate(this); + } + + @Override + protected String getDbSpecificDataHasChangedCondition(Trigger trigger) { + /* gets filled/replaced by trigger template as it will compare by each column */ + return "$(anyColumnChanged)"; + } +} diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/db/mssql/MsSql2008TriggerTemplate.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/db/mssql/MsSql2008TriggerTemplate.java new file mode 100644 index 0000000000..d2bdb5615d --- /dev/null +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/db/mssql/MsSql2008TriggerTemplate.java @@ -0,0 +1,50 @@ +package org.jumpmind.symmetric.db.mssql; + +import org.jumpmind.db.model.Column; +import org.jumpmind.db.model.Table; +import org.jumpmind.symmetric.db.ISymmetricDialect; +import org.jumpmind.symmetric.io.data.DataEventType; +import org.jumpmind.symmetric.model.Channel; +import org.jumpmind.symmetric.model.Trigger; +import org.jumpmind.symmetric.model.TriggerHistory; +import org.jumpmind.util.FormatUtils; + +public class MsSql2008TriggerTemplate extends MsSqlTriggerTemplate { + public MsSql2008TriggerTemplate(ISymmetricDialect symmetricDialect) { + super(symmetricDialect); + } + + @Override + protected String replaceTemplateVariables(DataEventType dml, Trigger trigger, + TriggerHistory history, Channel channel, String tablePrefix, Table originalTable, Table table, + String defaultCatalog, String defaultSchema, String ddl) { + ddl = super.replaceTemplateVariables(dml, trigger, history, channel, tablePrefix, originalTable, table, defaultCatalog, defaultSchema, ddl); + ddl = FormatUtils.replace("anyColumnChanged", + buildColumnsAreNotEqualString(table, newTriggerValue, oldTriggerValue), ddl); + return ddl; + } + + private String buildColumnsAreNotEqualString(Table table, String table1Name, String table2Name){ + StringBuilder builder = new StringBuilder(); + + for(Column column : table.getColumns()){ + if (builder.length() > 0) { + builder.append(" or "); + } + + if(isNotComparable(column)) { + // Can't compare the value. + // Let's use the UPDATE() function to see if it showed up in the SET list of the update statement + builder.append(String.format("UPDATE(\"%1$s\")", column.getName())); + } else { + builder.append(String.format("((%1$s.\"%2$s\" IS NOT NULL AND %3$s.\"%2$s\" IS NOT NULL AND %1$s.\"%2$s\"<>%3$s.\"%2$s\") or " + + "(%1$s.\"%2$s\" IS NULL AND %3$s.\"%2$s\" IS NOT NULL) or " + + "(%1$s.\"%2$s\" IS NOT NULL AND %3$s.\"%2$s\" IS NULL))", table1Name, column.getName(), table2Name)); + } + } + if (builder.length() == 0) { + builder.append("1=1"); + } + return builder.toString(); + } +} diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/db/mssql/MsSqlTriggerTemplate.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/db/mssql/MsSqlTriggerTemplate.java index 411b41d799..394052ff38 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/db/mssql/MsSqlTriggerTemplate.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/db/mssql/MsSqlTriggerTemplate.java @@ -284,7 +284,7 @@ protected String replaceTemplateVariables(DataEventType dml, Trigger trigger, return ddl; } - private boolean isNotComparable(Column column) { + protected boolean isNotComparable(Column column) { String columnType = column.getJdbcTypeName(); return StringUtils.equalsIgnoreCase(columnType, "IMAGE") || StringUtils.equalsIgnoreCase(columnType, "TEXT") diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/db/oracle/OracleSymmetricDialect.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/db/oracle/OracleSymmetricDialect.java index a17596f4dc..fb48111550 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/db/oracle/OracleSymmetricDialect.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/db/oracle/OracleSymmetricDialect.java @@ -77,6 +77,10 @@ protected void buildSqlReplacementTokens() { if (parameterService.is(ParameterConstants.DBDIALECT_ORACLE_USE_HINTS, true)) { sqlReplacementTokens.put("selectDataUsingGapsSqlHint", "/*+ index(d " + parameterService.getTablePrefix() + "_IDX_D_CHANNEL_ID) */"); } + if (parameterService.is(ParameterConstants.DBDIALECT_ORACLE_USE_SELECT_START_DATA_ID_HINT, false)) { + sqlReplacementTokens.put("selectDataUsingStartDataIdHint", "/*+ full (d) */"); + } + } @Override diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java index 8d93c9bd7a..226d0337e0 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java @@ -228,6 +228,7 @@ private ParameterConstants() { public final static String DBDIALECT_ORACLE_USE_TRANSACTION_VIEW = "oracle.use.transaction.view"; public final static String DBDIALECT_ORACLE_TEMPLATE_NUMBER_SPEC = "oracle.template.precision"; public final static String DBDIALECT_ORACLE_USE_HINTS = "oracle.use.hints"; + public final static String DBDIALECT_ORACLE_USE_SELECT_START_DATA_ID_HINT = "oracle.use.select.data.using.start.data.id.hint"; public final static String DBDIALECT_ORACLE_SEQUENCE_NOORDER = "oracle.sequence.noorder"; public final static String DBDIALECT_ORACLE_SEQUENCE_NOORDER_NEXTVALUE_DB_URLS = "oracle.sequence.noorder.nextvalue.db.urls"; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ChannelRouterContext.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ChannelRouterContext.java index cdd2678021..6289a55c20 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ChannelRouterContext.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/ChannelRouterContext.java @@ -73,7 +73,6 @@ public class ChannelRouterContext extends SimpleRouterContext { private long maxPeekAheadQueueSize; private long dataRereadCount; private List dataGaps = new ArrayList(); - private Set transactions = new HashSet(); private long lastDataId = -1; private List dataIds = new ArrayList(); @@ -153,7 +152,7 @@ synchronized public void logStats(Logger log, long totalTimeInMs) { if (log.isDebugEnabled()) { log.debug(channel.getChannelId() + ", startDataId=" + startDataId + ", endDataId=" + endDataId + ", dataReadCount=" + dataReadCount + ", peekAheadFillCount=" + peekAheadFillCount + - ", transactions=" + transactions.toString() + ", dataGaps=" + dataGaps.toString()); + ", dataGaps=" + dataGaps.toString()); } } @@ -264,16 +263,6 @@ public List getDataGaps() { public void setDataGaps(List dataGaps) { this.dataGaps = dataGaps; } - - public Set getTransactions() { - return transactions; - } - - public void addTransaction(String transactionId) { - if (isNotBlank(transactionId)) { - this.transactions.add(transactionId); - } - } public void setOnlyDefaultRoutersAssigned(boolean onlyDefaultRoutersAssigned) { this.onlyDefaultRoutersAssigned = onlyDefaultRoutersAssigned; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapRouteReader.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapRouteReader.java index 37d01ab62e..7036dcc802 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapRouteReader.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapRouteReader.java @@ -135,8 +135,6 @@ public void run() { } protected void execute() { - - long maxPeekAheadSizeInBytes = (long)(Runtime.getRuntime().maxMemory() * percentOfHeapToUse); ISymmetricDialect symmetricDialect = engine.getSymmetricDialect(); ISqlReadCursor cursor = null; processInfo = engine.getStatisticManager().newProcessInfo( @@ -144,73 +142,16 @@ protected void execute() { ProcessType.ROUTER_READER)); processInfo.setCurrentChannelId(context.getChannel().getChannelId()); try { - int lastPeekAheadIndex = 0; - int dataCount = 0; - long maxDataToRoute = context.getChannel().getMaxDataToRoute(); - List peekAheadQueue = new ArrayList(peekAheadCount); boolean transactional = !context.getChannel().getBatchAlgorithm() .equals(NonTransactionalBatchAlgorithm.NAME) || !symmetricDialect.supportsTransactionId(); - - processInfo.setStatus(ProcessStatus.QUERYING); - cursor = prepareCursor(); - processInfo.setStatus(ProcessStatus.EXTRACTING); - boolean moreData = true; - while (dataCount < maxDataToRoute || (lastTransactionId != null && transactional)) { - if (moreData && (lastTransactionId != null || peekAheadQueue.size() == 0)) { - moreData = fillPeekAheadQueue(peekAheadQueue, peekAheadCount, cursor); - } - - int dataWithSameTransactionIdCount = 0; - - while (peekAheadQueue.size() > 0 && lastTransactionId == null && - dataCount < maxDataToRoute) { - Data data = peekAheadQueue.remove(0); - copyToQueue(data); - dataCount++; - processInfo.incrementCurrentDataCount(); - processInfo.setCurrentTableName(data.getTableName()); - lastTransactionId = data.getTransactionId(); - context.addTransaction(lastTransactionId); - dataWithSameTransactionIdCount++; - } - - if (lastTransactionId != null && peekAheadQueue.size() > 0) { - Iterator datas = peekAheadQueue.iterator(); - int index = 0; - while (datas.hasNext() && (dataCount < maxDataToRoute || transactional)) { - Data data = datas.next(); - if (lastTransactionId.equals(data.getTransactionId())) { - dataWithSameTransactionIdCount++; - datas.remove(); - copyToQueue(data); - dataCount++; - processInfo.incrementCurrentDataCount(); - processInfo.setCurrentTableName(data.getTableName()); - lastPeekAheadIndex = index; - } else { - context.addTransaction(data.getTransactionId()); - index++; - } - - } - - if (dataWithSameTransactionIdCount == 0 || peekAheadQueue.size()-lastPeekAheadIndex > peekAheadCount) { - lastTransactionId = null; - lastPeekAheadIndex = 0; - } - - } - - if (!moreData && peekAheadQueue.size() == 0) { - // we've reached the end of the result set - break; - } else if (peekAheadSizeInBytes >= maxPeekAheadSizeInBytes) { - log.info("The peek ahead queue has reached its max size of {} bytes. Finishing reading the current transaction", peekAheadSizeInBytes); - finishTransactionMode = true; - peekAheadQueue.clear(); - } - } + + if (transactional) { + executeTransactional(cursor); + } else { + executeNonTransactional(cursor); + } + processInfo.setStatus(ProcessStatus.OK); } catch (Throwable ex) { processInfo.setStatus(ProcessStatus.ERROR); @@ -233,6 +174,95 @@ && isNotBlank(ex.getMessage()) } } + + protected void executeTransactional(ISqlReadCursor cursor) throws Exception { + long maxPeekAheadSizeInBytes = (long)(Runtime.getRuntime().maxMemory() * percentOfHeapToUse); + int lastPeekAheadIndex = 0; + int dataCount = 0; + long maxDataToRoute = context.getChannel().getMaxDataToRoute(); + List peekAheadQueue = new ArrayList(peekAheadCount); + + processInfo.setStatus(ProcessStatus.QUERYING); + cursor = prepareCursor(); + processInfo.setStatus(ProcessStatus.EXTRACTING); + boolean moreData = true; + while (dataCount < maxDataToRoute || (lastTransactionId != null)) { + if (moreData && (lastTransactionId != null || peekAheadQueue.size() == 0)) { + moreData = fillPeekAheadQueue(peekAheadQueue, peekAheadCount, cursor); + } + + int dataWithSameTransactionIdCount = 0; + + while (peekAheadQueue.size() > 0 && lastTransactionId == null && + dataCount < maxDataToRoute) { + Data data = peekAheadQueue.remove(0); + copyToQueue(data); + dataCount++; + processInfo.incrementCurrentDataCount(); + processInfo.setCurrentTableName(data.getTableName()); + lastTransactionId = data.getTransactionId(); + dataWithSameTransactionIdCount++; + } + + if (lastTransactionId != null && peekAheadQueue.size() > 0) { + Iterator datas = peekAheadQueue.iterator(); + int index = 0; + while (datas.hasNext()) { + Data data = datas.next(); + if (lastTransactionId.equals(data.getTransactionId())) { + dataWithSameTransactionIdCount++; + datas.remove(); + copyToQueue(data); + dataCount++; + processInfo.incrementCurrentDataCount(); + processInfo.setCurrentTableName(data.getTableName()); + lastPeekAheadIndex = index; + } else { + index++; + } + + } + + if (dataWithSameTransactionIdCount == 0 || peekAheadQueue.size()-lastPeekAheadIndex > peekAheadCount) { + lastTransactionId = null; + lastPeekAheadIndex = 0; + } + + } + + if (!moreData && peekAheadQueue.size() == 0) { + // we've reached the end of the result set + break; + } else if (peekAheadSizeInBytes >= maxPeekAheadSizeInBytes) { + log.info("The peek ahead queue has reached its max size of {} bytes. Finishing reading the current transaction", peekAheadSizeInBytes); + finishTransactionMode = true; + peekAheadQueue.clear(); + } + } + } + + + protected void executeNonTransactional(ISqlReadCursor cursor) throws Exception { + long maxDataToRoute = context.getChannel().getMaxDataToRoute(); + List peekAheadQueue = new ArrayList(peekAheadCount); + int dataCount = 0; + while (dataCount < maxDataToRoute) { + fillPeekAheadQueue(peekAheadQueue, peekAheadCount, cursor); + + if (peekAheadQueue.size() > 0) { + while (peekAheadQueue.size() > 0 && dataCount < maxDataToRoute) { + Data data = peekAheadQueue.remove(0); + copyToQueue(data); + dataCount++; + processInfo.incrementCurrentDataCount(); + processInfo.setCurrentTableName(data.getTableName()); + } + + } else { + break; + } + } + } protected boolean process(Data data) { long dataId = data.getDataId(); @@ -244,12 +274,7 @@ protected boolean process(Data data) { if (isEachGapQueried) { okToProcess = true; } else { - for (DataGap gap : dataGaps) { - if (dataId >= gap.getStartId() && dataId <= gap.getEndId()) { - okToProcess = true; - break; - } - } + okToProcess = isInDataGap(dataId); } } else { while (!okToProcess && currentGap != null && dataId >= currentGap.getStartId()) { @@ -268,6 +293,25 @@ protected boolean process(Data data) { } return okToProcess; } + + protected boolean isInDataGap(long dataId) { + // binary search algorithm + int start = 0; + int end = dataGaps.size() - 1; + while (start <= end) { + int mid = (start + end) / 2; + DataGap midGap = dataGaps.get(mid); + if (dataId >= midGap.getStartId() && dataId <= midGap.getEndId()) { + return true; + } + if (dataId< midGap.getStartId()) { + end = mid - 1; + } else { + start = mid + 1; + } + } + return false; + } public Data take() throws InterruptedException { Data data = null; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java index f23d310294..54fadaf561 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java @@ -576,18 +576,19 @@ protected List loadDataFromTransport(final ProcessInfo transferIn OutputStreamWriter outWriter = null; if (out != null) { - outWriter = new OutputStreamWriter(out, IoConstants.ENCODING); - long keepAliveMillis = parameterService.getLong(ParameterConstants.DATA_LOADER_SEND_ACK_KEEPALIVE); - while (!executor.awaitTermination(keepAliveMillis, TimeUnit.MILLISECONDS)) { - outWriter.write("1=1&"); - outWriter.flush(); + try { + outWriter = new OutputStreamWriter(out, IoConstants.ENCODING); + long keepAliveMillis = parameterService.getLong(ParameterConstants.DATA_LOADER_SEND_ACK_KEEPALIVE); + while (!executor.awaitTermination(keepAliveMillis, TimeUnit.MILLISECONDS)) { + outWriter.write("1=1&"); + outWriter.flush(); + } + } catch (Exception ex) { + log.warn("Failed to send keep alives to " + sourceNode + " " + ex.toString()); + awaitTermination(executor); } } else { - long hours = 1; - while (!executor.awaitTermination(1, TimeUnit.HOURS)) { - log.info(String.format("Executor has been awaiting loader termination for %d hour(s).", hours)); - hours++; - } + awaitTermination(executor); } loadListener.isDone(); @@ -633,6 +634,14 @@ protected IDataWriter chooseDataWriter(Batch batch) { return listener.getBatchesProcessed(); } + private void awaitTermination(ExecutorService executor) throws InterruptedException { + long hours = 1; + while (!executor.awaitTermination(1, TimeUnit.HOURS)) { + log.info(String.format("Executor has been awaiting loader termination for %d hour(s).", hours)); + hours++; + } + } + protected void logOrRethrow(Throwable ex) throws IOException { if (ex instanceof RegistrationRequiredException) { throw (RegistrationRequiredException) ex; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java index 9698a15fbe..0c38bcdb16 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java @@ -798,7 +798,7 @@ protected void completeBatchesAndCommit(ChannelRouterContext context) { if (engine.getParameterService().is(ParameterConstants.ROUTING_LOG_STATS_ON_BATCH_ERROR)) { engine.getStatisticManager().addRouterStats(context.getStartDataId(), context.getEndDataId(), context.getDataReadCount(), context.getPeekAheadFillCount(), - context.getDataGaps(), context.getTransactions(), batches); + context.getDataGaps(), null, batches); } for (OutgoingBatch batch : batches) { diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterServiceSqlMap.java index 4028d9e21d..cfffa461a8 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterServiceSqlMap.java @@ -39,7 +39,7 @@ public RouterServiceSqlMap(IDatabasePlatform platform, Map repla + " from $(data) d where d.channel_id=? $(dataRange) "); putSql("selectDataUsingStartDataId", - "select d.data_id, d.table_name, d.event_type, d.row_data as row_data, d.pk_data as pk_data, d.old_data as old_data, " + "select $(selectDataUsingStartDataIdHint) d.data_id, d.table_name, d.event_type, d.row_data as row_data, d.pk_data as pk_data, d.old_data as old_data, " + " d.create_time, d.trigger_hist_id, d.channel_id, d.transaction_id, d.source_node_id, d.external_data, d.node_list " + " from $(data) d where d.channel_id=? and data_id >= ? "); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/RouterStats.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/RouterStats.java index f72b3175c2..1848c8be1b 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/RouterStats.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/RouterStats.java @@ -37,8 +37,6 @@ public class RouterStats { private List dataGaps; - private Set transactions; - public RouterStats() { } @@ -49,14 +47,12 @@ public RouterStats(long startDataId, long endDataId, long dataReadCount, long pe this.dataReadCount = dataReadCount; this.peekAheadFillCount = peekAheadFillCount; this.dataGaps = dataGaps; - this.transactions = transactions; } @Override public String toString() { return "{ startDataId: " + startDataId + ", endDataId: " + endDataId + ", dataReadCount: " + dataReadCount + - ", peekAheadFillCount: " + peekAheadFillCount + ", dataGaps: " + dataGaps.toString() + - ", transactions: " + transactions.toString() + " }"; + ", peekAheadFillCount: " + peekAheadFillCount + ", dataGaps: " + dataGaps.toString() + " }"; } public long getStartDataId() { @@ -98,12 +94,4 @@ public List getDataGaps() { public void setDataGaps(List dataGaps) { this.dataGaps = dataGaps; } - - public Set getTransactions() { - return transactions; - } - - public void setTransactions(Set transactions) { - this.transactions = transactions; - } } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/StatisticManager.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/StatisticManager.java index 1f8e59fdcc..9349890735 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/StatisticManager.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/StatisticManager.java @@ -579,7 +579,14 @@ public TreeMap> getNodeStatsForPeriod(Date start public Map getWorkingChannelStats() { if (channelStats != null) { - return new HashMap(channelStats); + HashMap stats = new HashMap(); + for (ChannelStats stat : channelStats.values()) { + ChannelStats newStat = new ChannelStats(stat.getNodeId(), stat.getHostName(), stat.getStartTime(), + stat.getEndTime(), stat.getChannelId()); + newStat.add(stat); + stats.put(newStat.getChannelId(), newStat); + } + return stats; } else { return new HashMap(); } diff --git a/symmetric-jdbc/src/main/java/org/jumpmind/db/platform/mysql/MySqlJdbcSqlTemplate.java b/symmetric-jdbc/src/main/java/org/jumpmind/db/platform/mysql/MySqlJdbcSqlTemplate.java index 18f6d9d8ae..2dc928c0a1 100644 --- a/symmetric-jdbc/src/main/java/org/jumpmind/db/platform/mysql/MySqlJdbcSqlTemplate.java +++ b/symmetric-jdbc/src/main/java/org/jumpmind/db/platform/mysql/MySqlJdbcSqlTemplate.java @@ -33,7 +33,7 @@ public MySqlJdbcSqlTemplate(DataSource dataSource, SqlTemplateSettings settings, SymmetricLobHandler lobHandler, DatabaseInfo databaseInfo) { super(dataSource, settings, lobHandler, databaseInfo); primaryKeyViolationCodes = new int[] {1062}; - foreignKeyViolationCodes = new int[] {1452}; + foreignKeyViolationCodes = new int[] {1452, 1216}; } @Override