From 0e145b3c6c3fa66897aa85c58fec80e148a2a143 Mon Sep 17 00:00:00 2001 From: Eric Long Date: Wed, 21 Aug 2019 13:29:32 -0400 Subject: [PATCH] 0004074: Multiple nodes initial load with wildcards sending table twice and missing table --- .../symmetric/service/IDataService.java | 4 ++-- .../symmetric/service/impl/DataService.java | 17 ++++++++--------- .../symmetric/service/impl/RouterService.java | 2 +- 3 files changed, 11 insertions(+), 12 deletions(-) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java index 391e6d3337..60b06b7d33 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java @@ -106,8 +106,8 @@ public String reloadTableImmediate(String nodeId, String catalogName, String sch public void insertReloadEvents(Node targetNode, boolean reverse, List reloadRequests, ProcessInfo processInfo); - public Map insertReloadEvents(Node targetNode, boolean reverse, List reloadRequests, ProcessInfo processInfo, - List activeHistories, List triggerRouters, Map extractRequests); + public Map insertReloadEvents(Node targetNode, boolean reverse, List reloadRequests, ProcessInfo processInfo, + List activeHistories, List triggerRouters, Map extractRequests); public boolean insertReloadEvent(TableReloadRequest request, boolean deleteAtClient); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java index fdbbd6a484..f53fae4bf6 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java @@ -782,8 +782,8 @@ public void insertReloadEvents(Node targetNode, boolean reverse, ProcessInfo pro @Override - public Map insertReloadEvents(Node targetNode, boolean reverse, List reloadRequests, ProcessInfo processInfo, - List activeHistories, List triggerRouters, Map extractRequests) { + public Map insertReloadEvents(Node targetNode, boolean reverse, List reloadRequests, ProcessInfo processInfo, + List activeHistories, List triggerRouters, Map extractRequests) { if (engine.getClusterService().lock(ClusterConstants.SYNC_TRIGGERS)) { try { INodeService nodeService = engine.getNodeService(); @@ -1036,7 +1036,7 @@ public Map insertReloadEvents(Node targetNode, boolean r return extractRequests; } - private long getBatchCountFor(Map extractRequests) { + private long getBatchCountFor(Map extractRequests) { long batchCount = 0; for (ExtractRequest request : extractRequests.values()) { batchCount += ((request.getEndBatchId() - request.getStartBatchId()) + 1); @@ -1397,14 +1397,14 @@ private int insertSQLBatchesForReload(Node targetNode, long loadId, String creat return sqlEventsSent; } - private Map insertLoadBatchesForReload(Node targetNode, long loadId, String createBy, + private Map insertLoadBatchesForReload(Node targetNode, long loadId, String createBy, List triggerHistories, Map> triggerRoutersByHistoryId, boolean transactional, ISqlTransaction transaction, Map reloadRequests, ProcessInfo processInfo, - String selectSqlOverride, Map extractRequests, boolean isFullLoad) throws InterruptedException { + String selectSqlOverride, Map extractRequests, boolean isFullLoad) throws InterruptedException { Map channels = engine.getConfigurationService().getChannels(false); - Map requests = new HashMap(); + Map requests = new HashMap(); if (extractRequests != null) { requests.putAll(extractRequests); } @@ -1450,8 +1450,7 @@ private Map insertLoadBatchesForReload(Node targetNode, long rowCount = -1; long parentRequestId = 0; - String extractRequestKey = triggerRouter.getTriggerId() + "::" + triggerRouter.getRouterId(); - ExtractRequest parentRequest = requests.get(extractRequestKey); + ExtractRequest parentRequest = requests.get(triggerHistory.getTriggerHistoryId()); if (parentRequest != null) { Router router = engine.getTriggerRouterService().getRouterById(triggerRouter.getRouterId(), false); @@ -1496,7 +1495,7 @@ private Map insertLoadBatchesForReload(Node targetNode, ExtractRequest request = engine.getDataExtractorService().requestExtractRequest(transaction, targetNode.getNodeId(), channel.getQueue(), triggerRouter, startBatchId, endBatchId, loadId, table.getName(), rowCount, parentRequestId); if (parentRequestId == 0) { - requests.put(extractRequestKey, request); + requests.put(triggerHistory.getTriggerHistoryId(), request); } } else { log.warn("The table defined by trigger_hist row %d no longer exists. A load will not be queue'd up for the table", triggerHistory.getTriggerHistoryId()); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java index 13af19f7f9..bec880b6ad 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java @@ -360,7 +360,7 @@ public void processTableRequestLoads(Node source, ProcessInfo processInfo, Map< boolean streamToFile = parameterService.is(ParameterConstants.STREAM_TO_FILE_ENABLED, false); Map> requestsSplitByLoad = new HashMap>(); - Map extractRequests = null; + Map extractRequests = null; for (TableReloadRequest load : loadsToProcess) { Node targetNode = engine.getNodeService().findNode(load.getTargetNodeId(), true);