Commit

0003858: Initial load share extract requests for multiple nodes
erilong committed Jan 21, 2019
1 parent 764fbfd commit 5dc95cd
Showing 12 changed files with 269 additions and 91 deletions.
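
In short: when the same table is initial-loaded to multiple target nodes, the extract work is now shared. The extract request created for the first node becomes the parent; requests for the other nodes are recorded as children that point back at it through a new parent_request_id column, and the batches staged for the parent are reused for the children instead of being extracted once per node.
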
@@ -160,7 +160,6 @@ private ParameterConstants() {
public final static String INITIAL_LOAD_EXTRACT_JOB_START = "start.initial.load.extract.job";
public final static String INITIAL_LOAD_SCHEMA_DUMP_COMMAND = "initial.load.schema.dump.command";
public final static String INITIAL_LOAD_SCHEMA_LOAD_COMMAND = "initial.load.schema.load.command";
public final static String INITIAL_LOAD_EXTRACT_AND_SEND_WHEN_STAGED = "initial.load.extract.and.send.when.staged";
public final static String INITIAL_LOAD_TRANSPORT_MAX_BYTES_TO_SYNC = "initial.load.transport.max.bytes.to.sync";
public final static String INITIAL_LOAD_USE_ESTIMATED_COUNTS = "initial.load.use.estimated.counts";
public final static String INITIAL_LOAD_PURGE_STAGE_IMMEDIATE_THRESHOLD_ROWS = "initial.load.purge.stage.immediate.threshold.rows";
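
The removed initial.load.extract.and.send.when.staged constant goes with the extract-and-send-when-staged code path that is deleted from DataExtractorService further down in this diff.
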
@@ -51,6 +51,7 @@ public enum ExtractStatus {
private long lastLoadedBatchId;
private long transferredMillis;
private long loadedMillis;
private long parentRequestId;

public long getRequestId() {
return requestId;
@@ -212,5 +213,12 @@ public void setLoadedMillis(long loadedMillis) {
this.loadedMillis = loadedMillis;
}

public long getParentRequestId() {
return parentRequestId;
}

public void setParentRequestId(long parentRequestId) {
this.parentRequestId = parentRequestId;
}

}
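
The new field carries the linkage for shared extracts. Judging by the SQL later in this diff, parent_request_id is 0 for a top-level request and holds the parent's request_id for a child. Hypothetical helpers (not part of this commit) that spell out the convention:

    // Hypothetical helpers illustrating the parent/child convention implied by
    // the SQL in DataExtractorServiceSqlMap: 0 marks a top-level (parent) request.
    public static boolean isParent(ExtractRequest request) {
        return request.getParentRequestId() == 0;
    }

    public static boolean isChildOf(ExtractRequest child, ExtractRequest parent) {
        return child.getParentRequestId() == parent.getRequestId();
    }
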
@@ -65,7 +65,8 @@ public boolean extractBatchRange(Writer writer, String nodeId, Date startBatchTi

public RemoteNodeStatuses queueWork(boolean force);

public void requestExtractRequest(ISqlTransaction transaction, String nodeId, String channelId, TriggerRouter triggerRouter, long startBatchId, long endBatchId, long loadId, String tableName, long rows);
public ExtractRequest requestExtractRequest(ISqlTransaction transaction, String nodeId, String channelId, TriggerRouter triggerRouter, long startBatchId, long endBatchId,
long loadId, String tableName, long rows, long parentId);

public void resetExtractRequest(OutgoingBatch batch);

@@ -81,4 +82,6 @@ public boolean extractBatchRange(Writer writer, String nodeId, Date startBatchTi

public int cancelExtractRequests(long loadId);

public void releaseMissedExtractRequests();

}
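
The extra parentId argument plus the new ExtractRequest return value let a caller create one parent request and chain the rest off it. A minimal sketch of that usage, assuming the transaction, trigger router, batch range, and node ids are already in scope (variable names here are illustrative):

    // First target node gets the parent request (parentId = 0).
    ExtractRequest parent = dataExtractorService.requestExtractRequest(transaction,
            firstNodeId, channelId, triggerRouter, startBatchId, endBatchId,
            loadId, tableName, rows, 0);

    // Remaining nodes get child requests that point at the parent and will
    // reuse its staged extract instead of extracting the same table again.
    for (String nodeId : otherNodeIds) {
        dataExtractorService.requestExtractRequest(transaction, nodeId, channelId,
                triggerRouter, startBatchId, endBatchId, loadId, tableName, rows,
                parent.getRequestId());
    }
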
@@ -35,6 +35,7 @@
import org.jumpmind.symmetric.model.Data;
import org.jumpmind.symmetric.model.DataEvent;
import org.jumpmind.symmetric.model.DataGap;
import org.jumpmind.symmetric.model.ExtractRequest;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.ProcessInfo;
import org.jumpmind.symmetric.model.TableReloadRequest;
@@ -95,7 +96,8 @@ public interface IDataService {

public void insertReloadEvents(Node targetNode, boolean reverse, List<TableReloadRequest> reloadRequests, ProcessInfo processInfo);

public void insertReloadEvents(Node targetNode, boolean reverse, List<TableReloadRequest> reloadRequests, ProcessInfo processInfo, List<TriggerHistory> activeHistories, List<TriggerRouter> triggerRouters);
public Map<String, ExtractRequest> insertReloadEvents(Node targetNode, boolean reverse, List<TableReloadRequest> reloadRequests, ProcessInfo processInfo,
List<TriggerHistory> activeHistories, List<TriggerRouter> triggerRouters, Map<String, ExtractRequest> extractRequests);

public boolean insertReloadEvent(TableReloadRequest request, boolean deleteAtClient);

@@ -1900,12 +1900,30 @@ public NodeQueuePair mapRow(Row row) {
}
}

public List<ExtractRequest> getExtractRequestsForNode(NodeCommunication nodeCommunication) {
protected List<ExtractRequest> getExtractRequestsForNode(NodeCommunication nodeCommunication) {
return sqlTemplate.query(getSql("selectExtractRequestForNodeSql"),
new ExtractRequestMapper(), nodeCommunication.getNodeId(), nodeCommunication.getQueue()
, ExtractRequest.ExtractStatus.NE.name());
new ExtractRequestMapper(), nodeCommunication.getNodeId(), nodeCommunication.getQueue(),
ExtractRequest.ExtractStatus.NE.name());
}


protected Map<Long, List<ExtractRequest>> getExtractChildRequestsForNode(NodeCommunication nodeCommunication, List<ExtractRequest> parentRequests) {
Map<Long, List<ExtractRequest>> requests = new HashMap<Long, List<ExtractRequest>>();

List<ExtractRequest> childRequests = sqlTemplate.query(getSql("selectExtractChildRequestForNodeSql"),
new ExtractRequestMapper(), nodeCommunication.getNodeId(), nodeCommunication.getQueue(),
ExtractRequest.ExtractStatus.NE.name());

for (ExtractRequest childRequest : childRequests) {
List<ExtractRequest> childList = requests.get(childRequest.getParentRequestId());
if (childList == null) {
childList = new ArrayList<ExtractRequest>();
requests.put(childRequest.getParentRequestId(), childList);
}
childList.add(childRequest);
}
return requests;
}
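
The get-or-create loop above is a plain grouping by parent request id; an equivalent, more compact form with identical behavior (Java 8+):

    for (ExtractRequest childRequest : childRequests) {
        requests.computeIfAbsent(childRequest.getParentRequestId(),
                k -> new ArrayList<ExtractRequest>()).add(childRequest);
    }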

@Override
public void resetExtractRequest(OutgoingBatch batch) {
ISqlTransaction transaction = null;
@@ -1932,20 +1950,33 @@ public void resetExtractRequest(OutgoingBatch batch) {
}
}

public void requestExtractRequest(ISqlTransaction transaction, String nodeId, String queue,
TriggerRouter triggerRouter, long startBatchId, long endBatchId, long loadId, String table, long rows) {
public ExtractRequest requestExtractRequest(ISqlTransaction transaction, String nodeId, String queue,
TriggerRouter triggerRouter, long startBatchId, long endBatchId, long loadId, String table, long rows, long parentRequestId) {
long requestId = sequenceService.nextVal(transaction, Constants.SEQUENCE_EXTRACT_REQ);
transaction.prepareAndExecute(getSql("insertExtractRequestSql"),
new Object[] { requestId, nodeId, queue, ExtractStatus.NE.name(), startBatchId,
endBatchId, triggerRouter.getTrigger().getTriggerId(),
triggerRouter.getRouter().getRouterId(), loadId, table, rows }, new int[] { Types.BIGINT, Types.VARCHAR,
Types.VARCHAR, Types.VARCHAR, Types.BIGINT, Types.BIGINT, Types.VARCHAR, Types.VARCHAR, Types.BIGINT, Types.VARCHAR, Types.BIGINT });
triggerRouter.getRouter().getRouterId(), loadId, table, rows, parentRequestId }, new int[] { Types.BIGINT, Types.VARCHAR,
Types.VARCHAR, Types.VARCHAR, Types.BIGINT, Types.BIGINT, Types.VARCHAR, Types.VARCHAR, Types.BIGINT, Types.VARCHAR,
Types.BIGINT, Types.BIGINT });
ExtractRequest request = new ExtractRequest();
request.setRequestId(requestId);
request.setNodeId(nodeId);
request.setQueue(queue);
request.setStatus(ExtractStatus.NE);
request.setStartBatchId(startBatchId);
request.setEndBatchId(endBatchId);
request.setRouterId(triggerRouter.getRouterId());
request.setLoadId(loadId);
request.setTableName(table);
request.setRows(rows);
request.setParentRequestId(parentRequestId);
return request;
}

protected void updateExtractRequestStatus(ISqlTransaction transaction, long extractId,
ExtractStatus status) {
transaction.prepareAndExecute(getSql("updateExtractRequestStatus"), status.name(),
extractId);
transaction.prepareAndExecute(getSql("updateExtractRequestStatus"), status.name(), extractId);
}

protected boolean canProcessExtractRequest(ExtractRequest request, CommunicationType communicationType) {
@@ -1969,7 +2000,13 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status
}

List<ExtractRequest> requests = getExtractRequestsForNode(nodeCommunication);
Map<Long, List<ExtractRequest>> allChildRequests = null;
long ts = System.currentTimeMillis();

if (requests.size() > 0) {
allChildRequests = getExtractChildRequestsForNode(nodeCommunication, requests);
}

/*
* Process extract requests until it has taken longer than 30 seconds, and then
* allow the process to return so progress status can be seen.
@@ -1993,6 +2030,8 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status
.getNodeId(), nodeCommunication.getQueue(), nodeCommunication.getNodeId(),
getProcessType()));
processInfo.setTotalBatchCount(batches.size());
List<ExtractRequest> childRequests = allChildRequests.get(request.getRequestId());

try {
boolean areBatchesOk = true;

@@ -2023,7 +2062,7 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status
}

MultiBatchStagingWriter multiBatchStagingWriter =
buildMultiBatchStagingWriter(request, identity, targetNode, batches, processInfo, channel);
buildMultiBatchStagingWriter(request, childRequests, identity, targetNode, batches, processInfo, channel);

extractOutgoingBatch(processInfo, targetNode, multiBatchStagingWriter,
firstBatch, false, false, ExtractMode.FOR_SYM_CLIENT, new ClusterLockRefreshListener(clusterService));
@@ -2064,25 +2103,11 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status
ISqlTransaction transaction = null;
try {
transaction = sqlTemplate.startSqlTransaction();
updateExtractRequestStatus(transaction, request.getRequestId(),
ExtractStatus.OK);

if (!areBatchesOk) {
for (OutgoingBatch outgoingBatch : batches) {
if (!parameterService.is(ParameterConstants.INITIAL_LOAD_EXTRACT_AND_SEND_WHEN_STAGED, false)) {
outgoingBatch.setStatus(Status.NE);
outgoingBatchService.updateOutgoingBatch(transaction, outgoingBatch);
} else if (outgoingBatch.getStatus() == Status.RQ) {
log.info("Batch {} was empty after extract in background and will be ignored.",
new Object[] { outgoingBatch.getNodeBatchId() });
outgoingBatch.setStatus(Status.IG);
outgoingBatchService.updateOutgoingBatch(transaction, outgoingBatch);

}
updateExtractRequestStatus(transaction, request.getRequestId(), ExtractStatus.OK);
if (childRequests != null) {
for (ExtractRequest childRequest : childRequests) {
updateExtractRequestStatus(transaction, childRequest.getRequestId(), ExtractStatus.OK);
}
} else {
log.info("Batches already had an OK status for request {}, batches {} to {}. Not updating the status to NE",
new Object[] { request.getRequestId(), request.getStartBatchId(), request.getEndBatchId() });
}
transaction.commit();
log.info("Done extracting {} batches for request {}", (request.getEndBatchId() - request.getStartBatchId()) + 1, request.getRequestId());
Expand All @@ -2099,6 +2124,8 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status
} finally {
close(transaction);
}

releaseMissedExtractRequests();
processInfo.setStatus(ProcessInfo.ProcessStatus.OK);

} catch (CancellationException ex) {
@@ -2131,6 +2158,14 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status
}
}
}

public void releaseMissedExtractRequests() {
int missingCount = sqlTemplateDirty.queryForInt(getSql("countExtractChildRequestMissed"), Status.NE.name(), Status.OK.name());
if (missingCount > 0) {
log.info("Releasing {} child extract requests that missed processing by parent node", missingCount);
sqlTemplate.update(getSql("releaseExtractChildRequestMissed"), Status.NE.name(), Status.OK.name());
}
}
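
This is the safety net for children that the parent's extract pass never picked up: any child request still in NE status whose parent has already gone to OK has its parent_request_id reset to 0 (see releaseExtractChildRequestMissed below), turning it into an ordinary standalone request that the next background extract will process. It is called at the end of every successful extract run, right after the request statuses are updated.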

protected void checkSendDeferredConstraints(ExtractRequest request, Node targetNode, OutgoingBatch batch) {
if (parameterService.is(ParameterConstants.INITIAL_LOAD_DEFER_CREATE_CONSTRAINTS, false)) {
@@ -2171,9 +2206,9 @@ protected boolean isApplicable(NodeCommunication nodeCommunication) {
return nodeCommunication.getCommunicationType() != CommunicationType.FILE_XTRCT;
}

protected MultiBatchStagingWriter buildMultiBatchStagingWriter(ExtractRequest request, Node sourceNode, Node targetNode, List<OutgoingBatch> batches,
ProcessInfo processInfo, Channel channel) {
MultiBatchStagingWriter multiBatchStagingWriter = new MultiBatchStagingWriter(this, request, sourceNode.getNodeId(), stagingManager,
protected MultiBatchStagingWriter buildMultiBatchStagingWriter(ExtractRequest request, List<ExtractRequest> childRequests, Node sourceNode,
Node targetNode, List<OutgoingBatch> batches, ProcessInfo processInfo, Channel channel) {
MultiBatchStagingWriter multiBatchStagingWriter = new MultiBatchStagingWriter(this, request, childRequests, sourceNode.getNodeId(), stagingManager,
batches, channel.getMaxBatchSize(), processInfo);
return multiBatchStagingWriter;
}
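
Handing the child requests to MultiBatchStagingWriter is what shares the work on the staging side: the writer (modified elsewhere in this commit, not shown in this excerpt) can stage the one extracted batch stream for the child nodes' batch ranges as well, so the table is read from the database only once per parent request.
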
@@ -2222,6 +2257,7 @@ public ExtractRequest mapRow(Row row) {
request.setLastLoadedBatchId(row.getLong("last_loaded_batch_id"));
request.setTransferredMillis(row.getLong("transferred_millis"));
request.setLoadedMillis(row.getLong("loaded_millis"));
request.setParentRequestId(row.getLong("parent_request_id"));
return request;
}
}
@@ -31,12 +31,23 @@ public DataExtractorServiceSqlMap(IDatabasePlatform platform,
super(platform, replacementTokens);

// @formatter:off
putSql("selectNodeIdsForExtractSql", "select node_id, queue from $(extract_request) where status=? group by node_id, queue");
putSql("selectNodeIdsForExtractSql", "select node_id, queue from $(extract_request) where status=? and parent_request_id=0 group by node_id, queue");

putSql("selectExtractRequestForNodeSql", "select * from $(extract_request) where node_id=? and queue=? and status=? order by request_id");
putSql("selectExtractRequestForNodeSql", "select * from $(extract_request) where node_id=? and queue=? and status=? and parent_request_id=0 order by request_id");

putSql("insertExtractRequestSql", "insert into $(extract_request) (request_id, node_id, queue, status, start_batch_id, end_batch_id, trigger_id, router_id, load_id, table_name, total_rows, last_update_time, create_time) "
+ " values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, current_timestamp, current_timestamp)");
putSql("selectExtractChildRequestForNodeSql", "select c.* from $(extract_request) c " +
"inner join $(extract_request) p on p.request_id = c.parent_request_id where p.node_id=? and p.queue=? and p.status=? and p.parent_request_id=0");

putSql("countExtractChildRequestMissed",
"select count(*) from $(extract_request) where status = ? and parent_request_id > 0 "
+ "and parent_request_id in (select request_id from sym_extract_request where parent_request_id = 0 and status = ?)");

putSql("releaseExtractChildRequestMissed",
"update sym_extract_request set parent_request_id = 0 where status = ? and parent_request_id > 0 "
+ "and parent_request_id in (select request_id from sym_extract_request where parent_request_id = 0 and status = ?)");

putSql("insertExtractRequestSql", "insert into $(extract_request) (request_id, node_id, queue, status, start_batch_id, end_batch_id, trigger_id, router_id, load_id, table_name, total_rows, parent_request_id, last_update_time, create_time) "
+ " values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, current_timestamp, current_timestamp)");

putSql("updateExtractRequestStatus", "update $(extract_request) set status=?, last_update_time=current_timestamp where request_id=?");

@@ -47,7 +58,8 @@ public DataExtractorServiceSqlMap(IDatabasePlatform platform,
putSql("updateExtractRequestTransferred", "update $(extract_request) set last_transferred_batch_id=?, transferred_rows = transferred_rows + ?, transferred_millis = ?"
+ " where start_batch_id <= ? and end_batch_id >= ? and node_id=? and load_id=? and (last_transferred_batch_id is null or last_transferred_batch_id < ?)");

putSql("resetExtractRequestStatus", "update $(extract_request) set status=?, last_update_time= current_timestamp where start_batch_id <= ? and end_batch_id >= ? and node_id=?");
putSql("resetExtractRequestStatus", "update $(extract_request) set status=?, parent_request_id=0, last_update_time= current_timestamp"
+ " where start_batch_id <= ? and end_batch_id >= ? and node_id=?");

putSql("cancelExtractRequests", "update $(extract_request) set status=?, last_update_time=current_timestamp where load_id = ?");

