From 4f5ff94988d7d4c2a4e535202e43a5a4d0d2589c Mon Sep 17 00:00:00 2001 From: chenson42 Date: Mon, 22 Jun 2015 17:42:17 +0000 Subject: [PATCH] 0002318: Performance improvements in gap detection during routing. Improves routing performance with lots of gaps. --- .../symmetric/route/DataGapDetector.java | 184 +++++++++--------- .../symmetric/service/IDataService.java | 90 +++++---- .../symmetric/service/impl/DataService.java | 62 ++++-- .../symmetric/route/DataGapDetectorTest.java | 60 ------ 4 files changed, 194 insertions(+), 202 deletions(-) delete mode 100644 symmetric-core/src/test/java/org/jumpmind/symmetric/route/DataGapDetectorTest.java diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapDetector.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapDetector.java index e15abd5140..b5099be708 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapDetector.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/DataGapDetector.java @@ -20,18 +20,18 @@ */ package org.jumpmind.symmetric.route; -import java.util.ArrayList; import java.util.Date; import java.util.List; import org.jumpmind.db.sql.ISqlTemplate; +import org.jumpmind.db.sql.ISqlTransaction; import org.jumpmind.db.sql.mapper.NumberMapper; import org.jumpmind.symmetric.common.ParameterConstants; import org.jumpmind.symmetric.db.ISymmetricDialect; import org.jumpmind.symmetric.model.DataGap; import org.jumpmind.symmetric.model.ProcessInfo; -import org.jumpmind.symmetric.model.ProcessInfoKey; import org.jumpmind.symmetric.model.ProcessInfo.Status; +import org.jumpmind.symmetric.model.ProcessInfoKey; import org.jumpmind.symmetric.model.ProcessInfoKey.ProcessType; import org.jumpmind.symmetric.service.IDataService; import org.jumpmind.symmetric.service.INodeService; @@ -79,17 +79,23 @@ public DataGapDetector(IDataService dataService, IParameterService parameterServ * dual route data. */ public void beforeRouting() { + long printStats = System.currentTimeMillis(); ProcessInfo processInfo = this.statisticManager.newProcessInfo(new ProcessInfoKey( nodeService.findIdentityNodeId(), null, ProcessType.GAP_DETECT)); try { long ts = System.currentTimeMillis(); - final List gaps = removeAbandonedGaps(dataService.findDataGaps()); + processInfo.setStatus(Status.QUERYING); + final List gaps = dataService.findDataGaps(); long lastDataId = -1; final int dataIdIncrementBy = parameterService .getInt(ParameterConstants.DATA_ID_INCREMENT_BY); final long maxDataToSelect = parameterService .getInt(ParameterConstants.ROUTING_LARGEST_GAP_SIZE); long databaseTime = symmetricDialect.getDatabaseTime(); + int idsFilled = 0; + int newGapsInserted = 0; + int rangeChecked = 0; + int gapsDeleted = 0; for (final DataGap dataGap : gaps) { final boolean lastGap = dataGap.equals(gaps.get(gaps.size() - 1)); String sql = routerService.getSql("selectDistinctDataIdFromDataEventUsingGapsSql"); @@ -99,76 +105,99 @@ public void beforeRouting() { processInfo.setStatus(Status.QUERYING); List ids = sqlTemplate.query(sql, new NumberMapper(), params); processInfo.setStatus(Status.PROCESSING); - for (Number number : ids) { - long dataId = number.longValue(); - processInfo.incrementCurrentDataCount(); - if (lastDataId == -1 && dataGap.getStartId() + dataIdIncrementBy <= dataId) { - // there was a new gap at the start - dataService.insertDataGap(new DataGap(dataGap.getStartId(), dataId - 1)); - } else if (lastDataId != -1 && lastDataId + dataIdIncrementBy != dataId - && lastDataId != dataId) { - // found a gap somewhere in the existing gap - dataService.insertDataGap(new DataGap(lastDataId + 1, dataId - 1)); + + idsFilled += ids.size(); + rangeChecked += dataGap.getEndId() - dataGap.getStartId(); + + ISqlTransaction transaction = null; + try { + transaction = sqlTemplate.startSqlTransaction(); + for (Number number : ids) { + long dataId = number.longValue(); + processInfo.incrementCurrentDataCount(); + if (lastDataId == -1 && dataGap.getStartId() + dataIdIncrementBy <= dataId) { + // there was a new gap at the start + dataService.insertDataGap(transaction, new DataGap(dataGap.getStartId(), dataId - 1)); + newGapsInserted++; + } else if (lastDataId != -1 && lastDataId + dataIdIncrementBy != dataId && lastDataId != dataId) { + // found a gap somewhere in the existing gap + dataService.insertDataGap(transaction, new DataGap(lastDataId + 1, dataId - 1)); + newGapsInserted++; + } + lastDataId = dataId; } - lastDataId = dataId; - } - // if we found data in the gap - if (lastDataId != -1) { - if (!lastGap && lastDataId + dataIdIncrementBy <= dataGap.getEndId()) { - dataService.insertDataGap(new DataGap(lastDataId + dataIdIncrementBy, - dataGap.getEndId())); - } + // if we found data in the gap + if (lastDataId != -1) { + if (!lastGap && lastDataId + dataIdIncrementBy <= dataGap.getEndId()) { + dataService.insertDataGap(transaction, new DataGap(lastDataId + dataIdIncrementBy, dataGap.getEndId())); + newGapsInserted++; + } - dataService.deleteDataGap(dataGap); - - // if we did not find data in the gap and it was not the - // last gap - } else if (!lastGap) { - if (dataService.countDataInRange(dataGap.getStartId() - 1, - dataGap.getEndId() + 1) == 0) { - if (symmetricDialect.supportsTransactionViews()) { - long transactionViewClockSyncThresholdInMs = parameterService - .getLong( - ParameterConstants.DBDIALECT_ORACLE_TRANSACTION_VIEW_CLOCK_SYNC_THRESHOLD_MS, - 60000); - Date createTime = dataService - .findCreateTimeOfData(dataGap.getEndId() + 1); - if (createTime != null - && !symmetricDialect - .areDatabaseTransactionsPendingSince(createTime - .getTime() - + transactionViewClockSyncThresholdInMs)) { - if (dataService.countDataInRange(dataGap.getStartId() - 1, - dataGap.getEndId() + 1) == 0) { - if (dataGap.getStartId() == dataGap.getEndId()) { - log.info( - "Found a gap in data_id at {}. Skipping it because there are no pending transactions in the database", - dataGap.getStartId()); - } else { - log.info( - "Found a gap in data_id from {} to {}. Skipping it because there are no pending transactions in the database", - dataGap.getStartId(), dataGap.getEndId()); + dataService.deleteDataGap(transaction, dataGap); + gapsDeleted++; + + // if we did not find data in the gap and it was not the + // last gap + } else if (!lastGap) { + if (dataService.countDataInRange(dataGap.getStartId() - 1, dataGap.getEndId() + 1) == 0) { + if (symmetricDialect.supportsTransactionViews()) { + long transactionViewClockSyncThresholdInMs = parameterService.getLong( + ParameterConstants.DBDIALECT_ORACLE_TRANSACTION_VIEW_CLOCK_SYNC_THRESHOLD_MS, 60000); + Date createTime = dataService.findCreateTimeOfData(dataGap.getEndId() + 1); + if (createTime != null + && !symmetricDialect.areDatabaseTransactionsPendingSince(createTime.getTime() + + transactionViewClockSyncThresholdInMs)) { + if (dataService.countDataInRange(dataGap.getStartId() - 1, dataGap.getEndId() + 1) == 0) { + if (dataGap.getStartId() == dataGap.getEndId()) { + log.info( + "Found a gap in data_id at {}. Skipping it because there are no pending transactions in the database", + dataGap.getStartId()); + } else { + log.info( + "Found a gap in data_id from {} to {}. Skipping it because there are no pending transactions in the database", + dataGap.getStartId(), dataGap.getEndId()); + } + + dataService.deleteDataGap(transaction, dataGap); + gapsDeleted++; } - - dataService.deleteDataGap(dataGap); } + } else if (isDataGapExpired(dataGap.getEndId() + 1, databaseTime)) { + if (dataGap.getStartId() == dataGap.getEndId()) { + log.info("Found a gap in data_id at {}. Skipping it because the gap expired", dataGap.getStartId()); + } else { + log.info("Found a gap in data_id from {} to {}. Skipping it because the gap expired", + dataGap.getStartId(), dataGap.getEndId()); + } + dataService.deleteDataGap(transaction, dataGap); + gapsDeleted++; } - } else if (isDataGapExpired(dataGap.getEndId() + 1, databaseTime)) { - if (dataGap.getStartId() == dataGap.getEndId()) { - log.info( - "Found a gap in data_id at {}. Skipping it because the gap expired", - dataGap.getStartId()); - } else { - log.info( - "Found a gap in data_id from {} to {}. Skipping it because the gap expired", - dataGap.getStartId(), dataGap.getEndId()); - } - dataService.deleteDataGap(dataGap); - } - } else { - dataService.checkForAndUpdateMissingChannelIds(dataGap.getStartId() - 1, - dataGap.getEndId() + 1); + } + } + + if (System.currentTimeMillis() - printStats > 30000) { + log.info( + "The data gap detection process has been running for {}ms, detected {} rows that have been previously routed over a total gap range of {}, " + + "inserted {} new gaps, and deleted {} gaps", new Object[] { System.currentTimeMillis() - ts, + idsFilled, rangeChecked, newGapsInserted, gapsDeleted }); + printStats = System.currentTimeMillis(); + } + + transaction.commit(); + } catch (Error ex) { + if (transaction != null) { + transaction.rollback(); + } + throw ex; + } catch (RuntimeException ex) { + if (transaction != null) { + transaction.rollback(); + } + throw ex; + } finally { + if (transaction != null) { + transaction.close(); } } } @@ -190,27 +219,6 @@ public void beforeRouting() { } - /** - * If the system was shutdown in the middle of processing a large gap we - * could end up with a gap containing other gaps. - * - * @param gaps - */ - protected List removeAbandonedGaps(List gaps) { - List finalList = new ArrayList(gaps); - for (final DataGap dataGap1 : gaps) { - for (final DataGap dataGap2 : gaps) { - if (!dataGap1.equals(dataGap2) && dataGap1.contains(dataGap2)) { - finalList.remove(dataGap2); - if (dataService != null) { - dataService.deleteDataGap(dataGap2); - } - } - } - } - return finalList; - } - protected boolean isDataGapExpired(long dataId, long databaseTime) { long gapTimoutInMs = parameterService .getLong(ParameterConstants.ROUTING_STALE_DATA_ID_GAP_TIME); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java index 9a38be3434..d86ba846ad 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java @@ -37,14 +37,14 @@ import org.jumpmind.symmetric.model.TableReloadRequestKey; import org.jumpmind.symmetric.model.TriggerHistory; import org.jumpmind.symmetric.model.TriggerRouter; - -/** - * This service provides an API to access and update {@link Data}. + +/** + * This service provides an API to access and update {@link Data}. */ -public interface IDataService { - - public void saveTableReloadRequest(TableReloadRequest request); - +public interface IDataService { + + public void saveTableReloadRequest(TableReloadRequest request); + public TableReloadRequest getTableReloadRequest(TableReloadRequestKey key); public String reloadNode(String nodeId, boolean reverseLoad, String createBy); @@ -52,29 +52,29 @@ public interface IDataService { public String reloadTable(String nodeId, String catalogName, String schemaName, String tableName); public String reloadTable(String nodeId, String catalogName, String schemaName, String tableName, String overrideInitialLoadSelect); - - /** - * Sends a SQL command to the remote node for execution by creating a SQL event that is synced like other data - * + + /** + * Sends a SQL command to the remote node for execution by creating a SQL event that is synced like other data + * * @param nodeId the remote node where the SQL statement will be executed * @param catalogName used to find the sym_trigger entry for table that will be associated with this event * @param schemaName used to find the sym_trigger entry for table that will be associated with this event * @param tableName used to find the sym_trigger entry for table that will be associated with this event - * @param sql the SQL statement to run on the remote node database - * @return message string indicating success or error + * @param sql the SQL statement to run on the remote node database + * @return message string indicating success or error */ public String sendSQL(String nodeId, String catalogName, String schemaName, String tableName, String sql); public void insertReloadEvents(Node targetNode, boolean reverse); - public boolean insertReloadEvent(TableReloadRequest request, boolean deleteAtClient); - - public long insertReloadEvent(ISqlTransaction transaction, Node targetNode, + public boolean insertReloadEvent(TableReloadRequest request, boolean deleteAtClient); + + public long insertReloadEvent(ISqlTransaction transaction, Node targetNode, TriggerRouter triggerRouter, TriggerHistory triggerHistory, String overrideInitialLoadSelect, boolean isLoad, long loadId, String createBy, Status status); - public void sendScript(String nodeId, String script, boolean isLoad); - - public boolean sendSchema(String nodeId, String catalogName, String schemaName, + public void sendScript(String nodeId, String script, boolean isLoad); + + public boolean sendSchema(String nodeId, String catalogName, String schemaName, String tableName, boolean isLoad); /** @@ -82,15 +82,15 @@ public boolean sendSchema(String nodeId, String catalogName, String schemaName, */ public void heartbeat(boolean force); - public void insertHeartbeatEvent(Node node, boolean isReload); - - public long insertData(Data data); - - public void insertDataEvents(ISqlTransaction transaction, List events); + public void insertHeartbeatEvent(Node node, boolean isReload); + + public long insertData(Data data); + + public void insertDataEvents(ISqlTransaction transaction, List events); + + public void insertDataAndDataEventAndOutgoingBatch(Data data, String channelId, List nodes, String routerId, boolean isLoad, long loadId, String createBy); - public void insertDataAndDataEventAndOutgoingBatch(Data data, String channelId, List nodes, String routerId, boolean isLoad, long loadId, String createBy); - - public long insertDataAndDataEventAndOutgoingBatch(ISqlTransaction transaction, Data data, + public long insertDataAndDataEventAndOutgoingBatch(ISqlTransaction transaction, Data data, String nodeId, String routerId, boolean isLoad, long loadId, String createBy, Status status); public long insertDataAndDataEventAndOutgoingBatch(Data data, String nodeId, String routerId, boolean isLoad, long loadId, String createBy); @@ -107,8 +107,8 @@ public void insertScriptEvent(ISqlTransaction transaction, String channelId, /** * Count the number of data ids in a range */ - public int countDataInRange(long firstDataId, long secondDataId); - + public int countDataInRange(long firstDataId, long secondDataId); + public void checkForAndUpdateMissingChannelIds(long firstDataId, long lastDataId); public List findDataGapsByStatus(DataGap.Status status); @@ -123,28 +123,32 @@ public void insertScriptEvent(ISqlTransaction transaction, String channelId, public Data createData(String catalogName, String schemaName, String tableName); - public Data createData(String catalogName, String schemaName, String tableName, String whereClause); - + public Data createData(String catalogName, String schemaName, String tableName, String whereClause); + public Data createData(ISqlTransaction transaction, String catalogName, String schemaName, String tableName, String whereClause); - public Data mapData(Row row); - - public List listDataIds(long batchId, String nodeId); - + public Data mapData(Row row); + + public List listDataIds(long batchId, String nodeId); + public List listData(long batchId, String nodeId, long startDataId, String channelId, int maxRowsToRetrieve); + public void updateDataGap(DataGap gap, DataGap.Status status); + public void insertDataGap(DataGap gap); + + public void insertDataGap(ISqlTransaction transaction, DataGap gap); + + public void deleteDataGap(ISqlTransaction transaction, DataGap gap); - public void updateDataGap(DataGap gap, DataGap.Status status); - public void deleteDataGap(DataGap gap); - public void deleteCapturedConfigChannelData(); - - public long findMaxDataId(); - - public ISqlReadCursor selectDataFor(Batch batch); + public void deleteCapturedConfigChannelData(); + + public long findMaxDataId(); + + public ISqlReadCursor selectDataFor(Batch batch); public ISqlReadCursor selectDataFor(Long batchId, String channelId); - + } \ No newline at end of file diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java index 3a8562fd63..f27adee5e6 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java @@ -1388,12 +1388,14 @@ public List findDataGaps() { .getInt(ParameterConstants.ROUTING_LARGEST_GAP_SIZE); List gaps = findDataGapsByStatus(DataGap.Status.GP); boolean lastGapExists = false; + long maxDataEventId = 0; for (DataGap dataGap : gaps) { lastGapExists |= dataGap.gapSize() >= maxDataToSelect - 1; + maxDataEventId = maxDataEventId < dataGap.getEndId() ? dataGap.getEndId() : maxDataEventId; } if (!lastGapExists) { - long maxDataEventId = findMaxDataEventDataId(); + maxDataEventId = maxDataEventId == 0 ? findMaxDataEventDataId() : maxDataEventId; long maxDataId = findMaxDataId(); if (maxDataEventId > 0) { maxDataEventId++; @@ -1408,19 +1410,34 @@ public List findDataGaps() { public long findMaxDataEventDataId() { return sqlTemplate.queryForLong(getSql("selectMaxDataEventDataIdSql")); } - + public void insertDataGap(DataGap gap) { + ISqlTransaction transaction = null; try { - sqlTemplate.update(getSql("insertDataGapSql"), new Object[] { DataGap.Status.GP.name(), - AppUtils.getHostName(), gap.getStartId(), gap.getEndId() }, new int[] { - Types.VARCHAR, Types.VARCHAR, Types.NUMERIC, Types.NUMERIC }); - } catch (UniqueKeyException ex) { - log.warn("A gap already existed for {} to {}. Updating instead.", gap.getStartId(), - gap.getEndId()); - updateDataGap(gap, DataGap.Status.GP); + transaction = sqlTemplate.startSqlTransaction(); + insertDataGap(transaction, gap); + transaction.commit(); + } catch (Error ex) { + if (transaction != null) { + transaction.rollback(); + } + throw ex; + } catch (RuntimeException ex) { + if (transaction != null) { + transaction.rollback(); + } + throw ex; + } finally { + close(transaction); } } + public void insertDataGap(ISqlTransaction transaction, DataGap gap) { + transaction.prepareAndExecute(getSql("insertDataGapSql"), + new Object[] { DataGap.Status.GP.name(), AppUtils.getHostName(), gap.getStartId(), gap.getEndId() }, new int[] { + Types.VARCHAR, Types.VARCHAR, Types.NUMERIC, Types.NUMERIC }); + } + public void updateDataGap(DataGap gap, DataGap.Status status) { sqlTemplate.update( getSql("updateDataGapSql"), @@ -1429,13 +1446,36 @@ public void updateDataGap(DataGap gap, DataGap.Status status) { symmetricDialect.getSqlTypeForIds(), symmetricDialect.getSqlTypeForIds() }); } + + @Override public void deleteDataGap(DataGap gap) { - sqlTemplate.update( + ISqlTransaction transaction = null; + try { + transaction = sqlTemplate.startSqlTransaction(); + deleteDataGap(transaction, gap); + transaction.commit(); + } catch (Error ex) { + if (transaction != null) { + transaction.rollback(); + } + throw ex; + } catch (RuntimeException ex) { + if (transaction != null) { + transaction.rollback(); + } + throw ex; + } finally { + close(transaction); + } + } + + @Override + public void deleteDataGap(ISqlTransaction transaction, DataGap gap) { + transaction.prepareAndExecute( getSql("deleteDataGapSql"), new Object[] { gap.getStartId(), gap.getEndId() }, new int[] { symmetricDialect.getSqlTypeForIds(), symmetricDialect.getSqlTypeForIds() }); - } public Date findCreateTimeOfEvent(long dataId) { diff --git a/symmetric-core/src/test/java/org/jumpmind/symmetric/route/DataGapDetectorTest.java b/symmetric-core/src/test/java/org/jumpmind/symmetric/route/DataGapDetectorTest.java deleted file mode 100644 index ba150b4cce..0000000000 --- a/symmetric-core/src/test/java/org/jumpmind/symmetric/route/DataGapDetectorTest.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to JumpMind Inc under one or more contributor - * license agreements. See the NOTICE file distributed - * with this work for additional information regarding - * copyright ownership. JumpMind Inc licenses this file - * to you under the GNU General Public License, version 3.0 (GPLv3) - * (the "License"); you may not use this file except in compliance - * with the License. - * - * You should have received a copy of the GNU General Public License, - * version 3.0 (GPLv3) along with this library; if not, see - * . - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.jumpmind.symmetric.route; - -import java.util.ArrayList; -import java.util.List; - -import static org.junit.Assert.*; - -import org.jumpmind.symmetric.model.DataGap; -import org.junit.Test; - -public class DataGapDetectorTest { - - @Test - public void testRemoveAbandonedGaps() { - DataGapDetector detector = new DataGapDetector(); - List gaps = new ArrayList(); - - gaps.add(new DataGap(1,1000)); - gaps.add(new DataGap(2000,3000)); - gaps.add(new DataGap(3001,3001)); - gaps.add(new DataGap(3002,3002)); - gaps.add(new DataGap(3003,3003)); - gaps.add(new DataGap(3004,3004)); - int expectedSize = gaps.size(); - - List evaluatedList = detector.removeAbandonedGaps(gaps); - assertEquals(expectedSize, evaluatedList.size()); - - gaps.add(new DataGap(2000,2001)); - gaps.add(new DataGap(2010,2022)); - gaps.add(new DataGap(2899,3000)); - - assertTrue(gaps.size() > expectedSize); - - evaluatedList = detector.removeAbandonedGaps(gaps); - assertEquals(expectedSize, evaluatedList.size()); - - - } -}