diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java index 0ee32a55f8..658f020c42 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java @@ -78,7 +78,7 @@ public interface IDataService { public boolean insertReloadEvent(TableReloadRequest request, boolean deleteAtClient); public long insertReloadEvent(ISqlTransaction transaction, Node targetNode, - TriggerRouter triggerRouter, TriggerHistory triggerHistory, String overrideInitialLoadSelect, boolean isLoad, long loadId, String createBy, Status status); + TriggerRouter triggerRouter, TriggerHistory triggerHistory, String overrideInitialLoadSelect, boolean isLoad, long loadId, String createBy, Status status, long estimatedBatchRowCount); public void sendScript(String nodeId, String script, boolean isLoad); @@ -99,7 +99,7 @@ public boolean sendSchema(String nodeId, String catalogName, String schemaName, public void insertDataAndDataEventAndOutgoingBatch(Data data, String channelId, List nodes, String routerId, boolean isLoad, long loadId, String createBy); public long insertDataAndDataEventAndOutgoingBatch(ISqlTransaction transaction, Data data, - String nodeId, String routerId, boolean isLoad, long loadId, String createBy, Status status); + String nodeId, String routerId, boolean isLoad, long loadId, String createBy, Status status, long estimatedBatchRowCount); public long insertDataAndDataEventAndOutgoingBatch(Data data, String nodeId, String routerId, boolean isLoad, long loadId, String createBy); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java index ce03453a6d..3738c9e0a4 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java @@ -154,7 +154,7 @@ public boolean insertReloadEvent(TableReloadRequest request, boolean deleteAtCli insertReloadEvent(transaction, targetNode, triggerRouter, triggerHistory, request.getReloadSelect(), false, -1, null, - Status.NE); + Status.NE, null, -1); if (!targetNode.requires13Compatiblity() && deleteAtClient) { insertSqlEvent( @@ -291,21 +291,22 @@ public TableReloadRequest mapRow(Row rs) { public long insertReloadEvent(ISqlTransaction transaction, Node targetNode, TriggerRouter triggerRouter, TriggerHistory triggerHistory, String overrideInitialLoadSelect, boolean isLoad, long loadId, String createBy, - Status status) { + Status status, long estimatedBatchRowCount) { String channelId = getReloadChannelIdForTrigger(triggerRouter.getTrigger(), engine .getConfigurationService().getChannels(false)); return insertReloadEvent(transaction, targetNode, triggerRouter, triggerHistory, - overrideInitialLoadSelect, isLoad, loadId, createBy, status, channelId); + overrideInitialLoadSelect, isLoad, loadId, createBy, status, channelId, estimatedBatchRowCount); } /** + * @param estimatedBatchRowCount TODO * @return If isLoad then return the inserted batch id otherwise return the * data id */ public long insertReloadEvent(ISqlTransaction transaction, Node targetNode, TriggerRouter triggerRouter, TriggerHistory triggerHistory, String overrideInitialLoadSelect, boolean isLoad, long loadId, String createBy, - Status status, String channelId) { + Status status, String channelId, long estimatedBatchRowCount) { if (triggerHistory == null) { triggerHistory = lookupTriggerHistory(triggerRouter.getTrigger()); } @@ -320,7 +321,7 @@ public long insertReloadEvent(ISqlTransaction transaction, Node targetNode, if (isLoad) { return insertDataAndDataEventAndOutgoingBatch(transaction, data, targetNode.getNodeId(), triggerRouter.getRouter().getRouterId(), isLoad, - loadId, createBy, status); + loadId, createBy, status, null, estimatedBatchRowCount); } else { return insertData(transaction, data); } @@ -443,15 +444,15 @@ public void insertReloadEvents(Node targetNode, boolean reverse, List triggerRouterList : triggerRoutersByHistoryId.values()) { - if (triggerRouterList.size() > 0) { - TriggerRouter tr = triggerRouterList.get(0); - symNodeSecurityReloadChannel = tr.getTrigger().getReloadChannelId(); - } - totalTableCount += triggerRouterList.size(); - } + for (List triggerRouterList : triggerRoutersByHistoryId.values()) { + if (triggerRouterList.size() > 0) { + TriggerRouter tr = triggerRouterList.get(0); + symNodeSecurityReloadChannel = tr.getTrigger().getReloadChannelId(); + } + totalTableCount += triggerRouterList.size(); + } + } catch (Exception e) { } - catch (Exception e) { } processInfo.setDataCount(totalTableCount); @@ -906,15 +907,25 @@ private void insertLoadBatchesForReload(Node targetNode, long loadId, String cre processInfo.setCurrentTableName(table.getName()); - int numberOfBatches = getNumberOfReloadBatches(table, triggerRouter, - channel, targetNode, selectSql); + long rowCount = getDataCountForReload(table, targetNode, selectSql); + long transformMultiplier = getTransformMultiplier(table, triggerRouter); + + // calculate the number of batches needed for table. + long numberOfBatches = 1; + long lastBatchSize = channel.getMaxBatchSize(); + + if (rowCount > 0) { + numberOfBatches = (rowCount * transformMultiplier / channel.getMaxBatchSize()) + 1; + lastBatchSize = rowCount % numberOfBatches; + } long startBatchId = -1; long endBatchId = -1; for (int i = 0; i < numberOfBatches; i++) { + long batchSize = i == numberOfBatches-1 ? lastBatchSize : channel.getMaxBatchSize(); // needs to grab the start and end batch id endBatchId = insertReloadEvent(transaction, targetNode, triggerRouter, - triggerHistory, selectSql, true, loadId, createBy, Status.RQ); + triggerHistory, selectSql, true, loadId, createBy, Status.RQ, null, batchSize); if (startBatchId == -1) { startBatchId = endBatchId; } @@ -924,7 +935,7 @@ private void insertLoadBatchesForReload(Node targetNode, long loadId, String cre targetNode.getNodeId(), channel.getQueue(), triggerRouter, startBatchId, endBatchId); } else { insertReloadEvent(transaction, targetNode, triggerRouter, triggerHistory, - selectSql, true, loadId, createBy, Status.NE); + selectSql, true, loadId, createBy, Status.NE, null, -1); } if (!transactional) { @@ -936,20 +947,6 @@ private void insertLoadBatchesForReload(Node targetNode, long loadId, String cre } } - protected int getNumberOfReloadBatches(Table table, TriggerRouter triggerRouter, Channel channel, Node targetNode, String selectSql) { - int rowCount = getDataCountForReload(table, targetNode, selectSql); - int transformMultiplier = getTransformMultiplier(table, triggerRouter); - - // calculate the number of batches needed for table. - int numberOfBatches = 1; - - if (rowCount > 0) { - numberOfBatches = (rowCount * transformMultiplier / channel.getMaxBatchSize()) + 1; - } - - return numberOfBatches; - } - protected int getDataCountForReload(Table table, Node targetNode, String selectSql) { DatabaseInfo dbInfo = platform.getDatabaseInfo(); String quote = dbInfo.getDelimiterToken(); @@ -1010,7 +1007,7 @@ private void insertFileSyncBatchForReload(Node targetNode, long loadId, String c insertReloadEvent(transaction, targetNode, fileSyncSnapshotTriggerRouter, fileSyncSnapshotHistory, "reload_channel_id='" + channel.getChannelId() + "'", true, loadId, - createBy, Status.NE, channel.getChannelId()); + createBy, Status.NE, channel.getChannelId(), -1); if (!transactional) { transaction.commit(); } @@ -1091,7 +1088,7 @@ protected void createPurgeEvent(ISqlTransaction transaction, String sql, Node ta CsvUtils.escapeCsvData(sql), null, triggerHistory, channelId, null, null); if (isLoad) { insertDataAndDataEventAndOutgoingBatch(transaction, data, targetNode.getNodeId(), - triggerRouter.getRouter().getRouterId(), isLoad, loadId, createBy, Status.NE); + triggerRouter.getRouter().getRouterId(), isLoad, loadId, createBy, Status.NE, null, -1); } else { data.setNodeList(targetNode.getNodeId()); insertData(transaction, data); @@ -1140,7 +1137,7 @@ protected void insertSqlEvent(ISqlTransaction transaction, TriggerHistory histor data.setNodeList(targetNode.getNodeId()); if (isLoad) { insertDataAndDataEventAndOutgoingBatch(transaction, data, targetNode.getNodeId(), - Constants.UNKNOWN_ROUTER_ID, isLoad, loadId, createBy, Status.NE); + Constants.UNKNOWN_ROUTER_ID, isLoad, loadId, createBy, Status.NE, null, -1); } else { insertData(transaction, data); } @@ -1160,7 +1157,7 @@ public void insertScriptEvent(ISqlTransaction transaction, String channelId, data.setNodeList(targetNode.getNodeId()); if (isLoad) { insertDataAndDataEventAndOutgoingBatch(transaction, data, targetNode.getNodeId(), - Constants.UNKNOWN_ROUTER_ID, isLoad, loadId, createBy, Status.NE); + Constants.UNKNOWN_ROUTER_ID, isLoad, loadId, createBy, Status.NE, null, -1); } else { insertData(transaction, data); } @@ -1208,7 +1205,7 @@ public void insertCreateEvent(ISqlTransaction transaction, Node targetNode, try { if (isLoad) { insertDataAndDataEventAndOutgoingBatch(transaction, data, targetNode.getNodeId(), - routerId, isLoad, loadId, createBy, Status.NE); + routerId, isLoad, loadId, createBy, Status.NE, null, -1); } else { insertData(transaction, data); } @@ -1311,7 +1308,7 @@ public void insertDataAndDataEventAndOutgoingBatch(Data data, String channelId, long dataId = insertData(transaction, data); for (Node node : nodes) { insertDataEventAndOutgoingBatch(transaction, dataId, channelId, node.getNodeId(), - data.getDataEventType(), routerId, isLoad, loadId, createBy, Status.NE, data.getTableName()); + data.getDataEventType(), routerId, isLoad, loadId, createBy, Status.NE, data.getTableName(), -1); } transaction.commit(); } catch (Error ex) { @@ -1339,7 +1336,7 @@ public long insertDataAndDataEventAndOutgoingBatch(Data data, String nodeId, Str try { transaction = sqlTemplate.startSqlTransaction(); batchId = insertDataAndDataEventAndOutgoingBatch(transaction, data, nodeId, routerId, - isLoad, loadId, createBy, Status.NE); + isLoad, loadId, createBy, Status.NE, null, -1); transaction.commit(); return batchId; } catch (Error ex) { @@ -1358,48 +1355,49 @@ public long insertDataAndDataEventAndOutgoingBatch(Data data, String nodeId, Str } /** + * @param estimatedBatchRowCount TODO * @return The inserted batch id */ - public long insertDataAndDataEventAndOutgoingBatch(ISqlTransaction transaction, Data data, - String nodeId, String routerId, boolean isLoad, long loadId, String createBy, - Status status, String overrideChannelId) { + public long insertDataAndDataEventAndOutgoingBatch(ISqlTransaction transaction, Data data, String nodeId, String routerId, boolean isLoad, + long loadId, String createBy, Status status, String overrideChannelId, long estimatedBatchRowCount) { long dataId = insertData(transaction, data); String channelId = null; if (isLoad) { - if (overrideChannelId != null) { - channelId = overrideChannelId; - } - else { - TriggerHistory history = data.getTriggerHistory(); - if (history != null && channelId == null) { - Trigger trigger = engine.getTriggerRouterService().getTriggerById( - history.getTriggerId()); - channelId = getReloadChannelIdForTrigger(trigger, engine.getConfigurationService() - .getChannels(false)); - } - } + if (overrideChannelId != null) { + channelId = overrideChannelId; + } else { + TriggerHistory history = data.getTriggerHistory(); + if (history != null && channelId == null) { + Trigger trigger = engine.getTriggerRouterService().getTriggerById(history.getTriggerId()); + channelId = getReloadChannelIdForTrigger(trigger, engine.getConfigurationService().getChannels(false)); + } + } } else { channelId = data.getChannelId(); } - return insertDataEventAndOutgoingBatch(transaction, dataId, channelId, nodeId, - data.getDataEventType(), routerId, isLoad, loadId, createBy, status, data.getTableName()); + return insertDataEventAndOutgoingBatch(transaction, dataId, channelId, nodeId, data.getDataEventType(), routerId, isLoad, loadId, + createBy, status, data.getTableName(), estimatedBatchRowCount); } public long insertDataAndDataEventAndOutgoingBatch(ISqlTransaction transaction, Data data, String nodeId, String routerId, boolean isLoad, long loadId, String createBy, - Status status) { - return insertDataAndDataEventAndOutgoingBatch(transaction, data, nodeId, routerId, isLoad, loadId, createBy, status, null); + Status status, long estimatedBatchRowCount) { + return insertDataAndDataEventAndOutgoingBatch(transaction, data, nodeId, routerId, isLoad, loadId, createBy, status, null, estimatedBatchRowCount); } protected long insertDataEventAndOutgoingBatch(ISqlTransaction transaction, long dataId, String channelId, String nodeId, DataEventType eventType, String routerId, - boolean isLoad, long loadId, String createBy, Status status, String tableName) { + boolean isLoad, long loadId, String createBy, Status status, String tableName, long estimatedBatchRowCount) { OutgoingBatch outgoingBatch = new OutgoingBatch(nodeId, channelId, status); outgoingBatch.setLoadId(loadId); outgoingBatch.setCreateBy(createBy); outgoingBatch.setLoadFlag(isLoad); outgoingBatch.incrementRowCount(eventType); - outgoingBatch.incrementDataRowCount(); + if (estimatedBatchRowCount > 0) { + outgoingBatch.setDataRowCount(estimatedBatchRowCount); + } else { + outgoingBatch.incrementDataRowCount(); + } if (tableName != null) { outgoingBatch.incrementTableCount(tableName.toLowerCase()); } @@ -1432,7 +1430,7 @@ private void insertNodeSecurityUpdate(ISqlTransaction transaction, String nodeId " t.node_id = '" + nodeIdRecord + "'"); if (data != null) { insertDataAndDataEventAndOutgoingBatch(transaction, data, targetNodeId, - Constants.UNKNOWN_ROUTER_ID, isLoad, loadId, createBy, Status.NE, channelId); + Constants.UNKNOWN_ROUTER_ID, isLoad, loadId, createBy, Status.NE, channelId, -1); } } @@ -1559,7 +1557,7 @@ public String reloadTable(String nodeId, String catalogName, String schemaName, for (TriggerRouter triggerRouter : triggerRouters) { eventCount++; insertReloadEvent(transaction, targetNode, triggerRouter, triggerHistory, - overrideInitialLoadSelect, false, -1, "reloadTable", Status.NE); + overrideInitialLoadSelect, false, -1, "reloadTable", Status.NE, null, -1); } } } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java index 9ddd8caca6..84f526fd1e 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java @@ -268,7 +268,7 @@ public void insertOutgoingBatch(ISqlTransaction transaction, OutgoingBatch outgo outgoingBatch.isLoadFlag() ? 1 : 0, outgoingBatch.isCommonFlag() ? 1 : 0, outgoingBatch.getReloadRowCount(), outgoingBatch.getOtherRowCount(), outgoingBatch.getDataUpdateRowCount(), outgoingBatch.getDataInsertRowCount(), outgoingBatch.getDataDeleteRowCount(), outgoingBatch.getLastUpdatedHostName(), outgoingBatch.getCreateBy(), - outgoingBatch.getSummary()); + outgoingBatch.getSummary(), outgoingBatch.getDataRowCount()); outgoingBatch.setBatchId(batchId); } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java index bb620ca572..f25d85d8a2 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java @@ -51,8 +51,8 @@ public OutgoingBatchServiceSqlMap(IDatabasePlatform platform, putSql("insertOutgoingBatchSql", "insert into $(outgoing_batch) " + " (batch_id, node_id, channel_id, status, load_id, extract_job_flag, load_flag, common_flag, reload_row_count, other_row_count, " - + " data_update_row_count, data_insert_row_count, data_delete_row_count, last_update_hostname, last_update_time, create_time, create_by, summary) " - + " values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, current_timestamp, current_timestamp, ?, ?) "); + + " data_update_row_count, data_insert_row_count, data_delete_row_count, last_update_hostname, last_update_time, create_time, create_by, summary, data_row_count) " + + " values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, current_timestamp, current_timestamp, ?, ?, ?) "); putSql("updateOutgoingBatchSql", "update $(outgoing_batch) set status=?, load_id=?, extract_job_flag=?, load_flag=?, error_flag=?, "