From 848aadebf01f565a2c3aa2ea20d8a48d3e4d7c67 Mon Sep 17 00:00:00 2001 From: Chris Henson Date: Wed, 4 Oct 2017 17:22:24 -0400 Subject: [PATCH] 0003247: Include check for next data_id to ensure within range of sym_data_gap --- .../symmetric/service/IDataService.java | 4 +- .../symmetric/service/impl/DataService.java | 49 ++++++++++++++----- .../service/impl/DataServiceSqlMap.java | 4 -- .../symmetric/service/impl/RouterService.java | 12 +++-- 4 files changed, 47 insertions(+), 22 deletions(-) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java index dad9b06199..31ed376c9b 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java @@ -145,8 +145,6 @@ public void insertScriptEvent(ISqlTransaction transaction, String channelId, public List listData(long batchId, String nodeId, long startDataId, String channelId, int maxRowsToRetrieve); - public void updateDataGap(DataGap gap, DataGap.Status status); - public void insertDataGap(DataGap gap); public void insertDataGap(ISqlTransaction transaction, DataGap gap); @@ -159,6 +157,8 @@ public void insertScriptEvent(ISqlTransaction transaction, String channelId, public void deleteCapturedConfigChannelData(); + public boolean fixLastDataGap(); + public long findMaxDataId(); public Data findData(long dataId); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java index 34edc7c0fe..3a76d9f830 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java @@ -1986,15 +1986,6 @@ public void insertDataGap(ISqlTransaction transaction, DataGap gap) { gap.getLastUpdateTime(), gap.getCreateTime() }, new int[] { Types.VARCHAR, Types.VARCHAR, Types.NUMERIC, Types.NUMERIC, Types.TIMESTAMP, Types.TIMESTAMP }); } - - public void updateDataGap(DataGap gap, DataGap.Status status) { - sqlTemplate.update( - getSql("updateDataGapSql"), - new Object[] { status.name(), AppUtils.getHostName(), gap.getLastUpdateTime(), gap.getStartId(), - gap.getEndId() }, new int[] { Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, - symmetricDialect.getSqlTypeForIds(), symmetricDialect.getSqlTypeForIds() }); - } - @Override public void deleteDataGap(DataGap gap) { @@ -2174,6 +2165,42 @@ public Map getLastDataCaptureByChannel() { return mapper.getCaptureMap(); } + @Override + public boolean fixLastDataGap() { + boolean fixed = false; + long maxDataId = findMaxDataId(); + List gaps = findDataGaps(); + if (gaps.size() > 0) { + DataGap lastGap = gaps.get(gaps.size()-1); + if (lastGap.getEndId() < maxDataId) { + fixed = true; + log.warn("The last data id of {} was bigger than the last gap's end_id of {}. Increasing the gap size", maxDataId, lastGap.getEndId()); + final long maxDataToSelect = parameterService + .getLong(ParameterConstants.ROUTING_LARGEST_GAP_SIZE); + ISqlTransaction transaction = null; + try { + transaction = sqlTemplate.startSqlTransaction(); + deleteDataGap(transaction, lastGap); + insertDataGap(transaction, new DataGap(lastGap.getStartId(), maxDataId+maxDataToSelect)); + transaction.commit(); + } catch (Error ex) { + if (transaction != null) { + transaction.rollback(); + } + throw ex; + } catch (RuntimeException ex) { + if (transaction != null) { + transaction.rollback(); + } + throw ex; + } finally { + close(transaction); + } + } + } + return fixed; + } + class TableRow { Table table; Row row; @@ -2254,9 +2281,7 @@ public String getReferenceColumnName() { public String getFkName() { return fkName; } - - - + } public class DataMapper implements ISqlRowMapper { diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java index 6645fdfb7d..23e49f534d 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java @@ -106,10 +106,6 @@ public DataServiceSqlMap(IDatabasePlatform platform, Map replace "" + "insert into $(data_gap) (status, last_update_hostname, start_id, end_id, last_update_time, create_time) values(?, ?, ?, ?, ?, ?) "); - putSql("updateDataGapSql", - "" - + "update $(data_gap) set status=?, last_update_hostname=?, last_update_time=? where start_id=? and end_id=? "); - putSql("deleteDataGapSql", "delete from $(data_gap) where start_id=? and end_id=? "); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java index 8822f3c69f..8a609c34fc 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java @@ -46,6 +46,7 @@ import org.jumpmind.symmetric.SymmetricException; import org.jumpmind.symmetric.SyntaxParsingException; import org.jumpmind.symmetric.common.Constants; +import org.jumpmind.symmetric.common.ContextConstants; import org.jumpmind.symmetric.common.ParameterConstants; import org.jumpmind.symmetric.io.data.DataEventType; import org.jumpmind.symmetric.model.AbstractBatch.Status; @@ -123,7 +124,7 @@ public class RouterService extends AbstractService implements IRouterService { protected boolean syncTriggersBeforeInitialLoadAttempted = false; - protected boolean firstTimeCheckForAbandonedBatches = true; + protected boolean firstTimeCheck = true; public RouterService(ISymmetricEngine engine) { super(engine.getParameterService(), engine.getSymmetricDialect()); @@ -185,9 +186,12 @@ synchronized public long routeData(boolean force) { if (identity != null) { if (force || engine.getClusterService().lock(ClusterConstants.ROUTE)) { try { - if (firstTimeCheckForAbandonedBatches) { + if (firstTimeCheck) { engine.getOutgoingBatchService().updateAbandonedRoutingBatches(); - firstTimeCheckForAbandonedBatches = false; + if (engine.getDataService().fixLastDataGap()) { + engine.getContextService().save(ContextConstants.ROUTING_FULL_GAP_ANALYSIS, Boolean.TRUE.toString()); + } + firstTimeCheck = false; } @@ -486,7 +490,7 @@ protected int routeDataForEachChannel() { processInfo.setStatus(ProcessInfo.ProcessStatus.OK); } catch (RuntimeException ex) { processInfo.setStatus(ProcessInfo.ProcessStatus.ERROR); - firstTimeCheckForAbandonedBatches = true; + firstTimeCheck = true; throw ex; } return dataCount;