Skip to content

Commit

Permalink
0004074: Multiple nodes initial load with wildcards sending a table twice
Browse files Browse the repository at this point in the history
and missing a table
  • Loading branch information
erilong committed Aug 21, 2019
1 parent 4c38402 commit 0e145b3
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 12 deletions.
Expand Up @@ -106,8 +106,8 @@ public String reloadTableImmediate(String nodeId, String catalogName, String sch

/**
 * Inserts reload (initial load) events for the given target node based on the supplied
 * table reload requests.
 * NOTE(review): declaration only — actual behavior is defined by the implementing service;
 * confirm semantics (e.g. meaning of {@code reverse}) against the implementation.
 *
 * @param targetNode     the node that will receive the reload events
 * @param reverse        presumably true for a reverse (client-to-server) load — TODO confirm
 * @param reloadRequests the table reload requests to convert into reload events
 * @param processInfo    tracking object for reporting progress of this process
 */
public void insertReloadEvents(Node targetNode, boolean reverse, List<TableReloadRequest> reloadRequests, ProcessInfo processInfo);

public Map<String, ExtractRequest> insertReloadEvents(Node targetNode, boolean reverse, List<TableReloadRequest> reloadRequests, ProcessInfo processInfo,
List<TriggerHistory> activeHistories, List<TriggerRouter> triggerRouters, Map<String, ExtractRequest> extractRequests);
/**
 * Inserts reload events for the given target node and returns the accumulated extract
 * requests keyed by trigger history ID (an {@code Integer}); this keying replaced an
 * earlier triggerId::routerId string key so that requests are tracked per trigger history.
 * NOTE(review): declaration only — confirm key semantics against the implementation,
 * which puts entries keyed by {@code triggerHistory.getTriggerHistoryId()}.
 *
 * @param targetNode      the node that will receive the reload events
 * @param reverse         presumably true for a reverse (client-to-server) load — TODO confirm
 * @param reloadRequests  the table reload requests to convert into reload events
 * @param processInfo     tracking object for reporting progress of this process
 * @param activeHistories active trigger histories to consider for the load
 * @param triggerRouters  trigger/router pairings to consider for the load
 * @param extractRequests previously accumulated extract requests (may be null) to carry
 *                        across invocations so shared/parent requests are reused
 * @return the map of extract requests, including any newly created ones
 */
public Map<Integer, ExtractRequest> insertReloadEvents(Node targetNode, boolean reverse, List<TableReloadRequest> reloadRequests, ProcessInfo processInfo,
List<TriggerHistory> activeHistories, List<TriggerRouter> triggerRouters, Map<Integer, ExtractRequest> extractRequests);

/**
 * Inserts a single reload event for the given table reload request.
 * NOTE(review): declaration only — confirm return-value meaning (presumably whether the
 * event was successfully inserted) and the effect of {@code deleteAtClient} (presumably
 * whether a delete-first event is sent to the client) against the implementation.
 *
 * @param request        the table reload request to satisfy
 * @param deleteAtClient presumably true to purge the target table before loading — TODO confirm
 * @return presumably true when the reload event was inserted — TODO confirm
 */
public boolean insertReloadEvent(TableReloadRequest request, boolean deleteAtClient);

Expand Down
Expand Up @@ -782,8 +782,8 @@ public void insertReloadEvents(Node targetNode, boolean reverse, ProcessInfo pro


@Override
public Map<String, ExtractRequest> insertReloadEvents(Node targetNode, boolean reverse, List<TableReloadRequest> reloadRequests, ProcessInfo processInfo,
List<TriggerHistory> activeHistories, List<TriggerRouter> triggerRouters, Map<String, ExtractRequest> extractRequests) {
public Map<Integer, ExtractRequest> insertReloadEvents(Node targetNode, boolean reverse, List<TableReloadRequest> reloadRequests, ProcessInfo processInfo,
List<TriggerHistory> activeHistories, List<TriggerRouter> triggerRouters, Map<Integer, ExtractRequest> extractRequests) {
if (engine.getClusterService().lock(ClusterConstants.SYNC_TRIGGERS)) {
try {
INodeService nodeService = engine.getNodeService();
Expand Down Expand Up @@ -1036,7 +1036,7 @@ public Map<String, ExtractRequest> insertReloadEvents(Node targetNode, boolean r
return extractRequests;
}

private long getBatchCountFor(Map<String, ExtractRequest> extractRequests) {
private long getBatchCountFor(Map<Integer, ExtractRequest> extractRequests) {
long batchCount = 0;
for (ExtractRequest request : extractRequests.values()) {
batchCount += ((request.getEndBatchId() - request.getStartBatchId()) + 1);
Expand Down Expand Up @@ -1397,14 +1397,14 @@ private int insertSQLBatchesForReload(Node targetNode, long loadId, String creat
return sqlEventsSent;
}

private Map<String, ExtractRequest> insertLoadBatchesForReload(Node targetNode, long loadId, String createBy,
private Map<Integer, ExtractRequest> insertLoadBatchesForReload(Node targetNode, long loadId, String createBy,
List<TriggerHistory> triggerHistories,
Map<Integer, List<TriggerRouter>> triggerRoutersByHistoryId, boolean transactional,
ISqlTransaction transaction, Map<String, TableReloadRequest> reloadRequests, ProcessInfo processInfo,
String selectSqlOverride, Map<String, ExtractRequest> extractRequests, boolean isFullLoad) throws InterruptedException {
String selectSqlOverride, Map<Integer, ExtractRequest> extractRequests, boolean isFullLoad) throws InterruptedException {

Map<String, Channel> channels = engine.getConfigurationService().getChannels(false);
Map<String, ExtractRequest> requests = new HashMap<String, ExtractRequest>();
Map<Integer, ExtractRequest> requests = new HashMap<Integer, ExtractRequest>();
if (extractRequests != null) {
requests.putAll(extractRequests);
}
Expand Down Expand Up @@ -1450,8 +1450,7 @@ private Map<String, ExtractRequest> insertLoadBatchesForReload(Node targetNode,

long rowCount = -1;
long parentRequestId = 0;
String extractRequestKey = triggerRouter.getTriggerId() + "::" + triggerRouter.getRouterId();
ExtractRequest parentRequest = requests.get(extractRequestKey);
ExtractRequest parentRequest = requests.get(triggerHistory.getTriggerHistoryId());

if (parentRequest != null) {
Router router = engine.getTriggerRouterService().getRouterById(triggerRouter.getRouterId(), false);
Expand Down Expand Up @@ -1496,7 +1495,7 @@ private Map<String, ExtractRequest> insertLoadBatchesForReload(Node targetNode,
ExtractRequest request = engine.getDataExtractorService().requestExtractRequest(transaction, targetNode.getNodeId(), channel.getQueue(),
triggerRouter, startBatchId, endBatchId, loadId, table.getName(), rowCount, parentRequestId);
if (parentRequestId == 0) {
requests.put(extractRequestKey, request);
requests.put(triggerHistory.getTriggerHistoryId(), request);
}
} else {
log.warn("The table defined by trigger_hist row %d no longer exists. A load will not be queue'd up for the table", triggerHistory.getTriggerHistoryId());
Expand Down
Expand Up @@ -360,7 +360,7 @@ public void processTableRequestLoads(Node source, ProcessInfo processInfo, Map<
boolean streamToFile = parameterService.is(ParameterConstants.STREAM_TO_FILE_ENABLED, false);

Map<String, List<TableReloadRequest>> requestsSplitByLoad = new HashMap<String, List<TableReloadRequest>>();
Map<String, ExtractRequest> extractRequests = null;
Map<Integer, ExtractRequest> extractRequests = null;

for (TableReloadRequest load : loadsToProcess) {
Node targetNode = engine.getNodeService().findNode(load.getTargetNodeId(), true);
Expand Down

0 comments on commit 0e145b3

Please sign in to comment.