0002658: Initial load defaults to initial load extract in background
jumpmind-josh committed Jun 30, 2016
1 parent 9a262a1 commit 004de5b
Showing 11 changed files with 61 additions and 24 deletions.
@@ -39,7 +39,8 @@ public enum ExtractStatus {
private TriggerRouter triggerRouter;
private Date lastUpdateTime;
private Date createTime;

+private String queue;

public long getRequestId() {
return requestId;
}
@@ -104,4 +105,12 @@ public void setCreateTime(Date createTime) {
this.createTime = createTime;
}

+public String getQueue() {
+    return queue;
+}
+
+public void setQueue(String queue) {
+    this.queue = queue;
+}

}
@@ -49,7 +49,7 @@ public class TriggerRouter implements Serializable {

private String initialLoadDeleteStmt;

-private int initialLoadBatchCount = 1;
+private int initialLoadBatchCount = 0;

private Trigger trigger;

@@ -77,8 +77,14 @@ public ConfigurationService(IParameterService parameterService, ISymmetricDialec
this.defaultChannels = new ArrayList<Channel>();
this.defaultChannels
.add(new Channel(Constants.CHANNEL_CONFIG, 0, 2000, 100, true, 0, true));
-this.defaultChannels.add(new Channel(Constants.CHANNEL_RELOAD, 1, 1, 1, true, 0, false,
-        true, false));
+if (parameterService.is(ParameterConstants.INITIAL_LOAD_USE_EXTRACT_JOB)) {
+    this.defaultChannels.add(new Channel(Constants.CHANNEL_RELOAD, 1, 10000, 100, true, 0, false,
+            true, false));
+} else {
+    this.defaultChannels.add(new Channel(Constants.CHANNEL_RELOAD, 1, 1, 1, true, 0, false,
+            true, false));
+}

this.defaultChannels.add(new Channel(Constants.CHANNEL_HEARTBEAT, 2, 100, 100, true, 0,
false));
this.defaultChannels.add(new Channel(Constants.CHANNEL_DEFAULT, 99999, 1000, 100, true, 0,
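The reload channel's defaults in ConfigurationService now depend on whether the background extract job is enabled: pre-extracted initial loads can move through normally sized batches (max_batch_size 10000, max_batch_to_send 100), while the legacy streaming path keeps the 1/1 limits that push one large batch at a time. A standalone sketch of that decision, assuming the third and fourth Channel constructor arguments are the max batch size and max batches to send (class and variable names here are illustrative, not the SymmetricDS API):

    public class ReloadChannelDefaults {

        public static void main(String[] args) {
            boolean useExtractJob = true; // initial.load.use.extract.job.enabled

            // Staged extraction allows normal batching on the reload channel;
            // inline streaming keeps the one-batch-at-a-time limits.
            int maxBatchSize = useExtractJob ? 10000 : 1;
            int maxBatchToSend = useExtractJob ? 100 : 1;

            System.out.println("reload channel: max_batch_size=" + maxBatchSize
                    + ", max_batch_to_send=" + maxBatchToSend);
        }
    }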
@@ -523,13 +523,14 @@ protected List<OutgoingBatch> extract(final ProcessInfo processInfo, final Node
Set<String> channelsProcessed = new HashSet<String>();
long batchesSelectedAtMs = System.currentTimeMillis();
OutgoingBatch currentBatch = null;
+ExecutorService executor = null;
try {
final long maxBytesToSync = parameterService.getLong(ParameterConstants.TRANSPORT_MAX_BYTES_TO_SYNC);
final boolean streamToFileEnabled = parameterService.is(ParameterConstants.STREAM_TO_FILE_ENABLED);
long keepAliveMillis = parameterService.getLong(ParameterConstants.DATA_LOADER_SEND_ACK_KEEPALIVE);
Node sourceNode = nodeService.findIdentity();
final FutureExtractStatus status = new FutureExtractStatus();
-ExecutorService executor = Executors.newFixedThreadPool(1, new DataExtractorThreadFactory());
+executor = Executors.newFixedThreadPool(1, new DataExtractorThreadFactory());
List<Future<FutureOutgoingBatch>> futures = new ArrayList<Future<FutureOutgoingBatch>>();

for (int i = 0; i < activeBatches.size(); i++) {
@@ -567,6 +568,12 @@ protected List<OutgoingBatch> extract(final ProcessInfo processInfo, final Node
currentBatch.setStatus(Status.NE);
currentBatch.setExtractJobFlag(false);
}

+processInfo.setStatus(ProcessInfo.Status.TRANSFERRING);
+currentBatch = sendOutgoingBatch(processInfo, targetNode, currentBatch, false,
+        dataWriter, writer, mode);
+processedBatches.add(currentBatch);

} else {
processInfo.setStatus(ProcessInfo.Status.EXTRACTING);
final OutgoingBatch extractBatch = currentBatch;
@@ -677,6 +684,10 @@ public FutureOutgoingBatch call() throws Exception {
log.error("Could not log the outgoing batch status because the batch was null",
e);
}
+} finally {
+    if (executor != null) {
+        executor.shutdown();
+    }
}

// Next, we update the node channel controls to the
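Two changes land in the extract path above: batches bound for a node that is not using the extract job are now sent immediately after extraction, and the ExecutorService is declared before the try block so a new finally clause can always shut it down, releasing the extractor thread even when extraction fails. A minimal runnable sketch of that cleanup pattern (the task body is illustrative):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class ExecutorCleanup {

        public static void main(String[] args) {
            ExecutorService executor = null; // declared outside try so finally can reach it
            try {
                executor = Executors.newFixedThreadPool(1);
                executor.submit(() -> System.out.println("extracting batch..."));
            } finally {
                if (executor != null) {
                    executor.shutdown(); // lets queued work finish, then frees the thread
                }
            }
        }
    }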
@@ -1101,17 +1112,17 @@ public RemoteNodeStatuses queueWork(boolean force) {
return statuses;
}

-protected void queue(String nodeId, String channelId, RemoteNodeStatuses statuses) {
+protected void queue(String nodeId, String queue, RemoteNodeStatuses statuses) {
final NodeCommunication.CommunicationType TYPE = NodeCommunication.CommunicationType.EXTRACT;
int availableThreads = nodeCommunicationService.getAvailableThreads(TYPE);
-NodeCommunication lock = nodeCommunicationService.find(nodeId, channelId, TYPE);
+NodeCommunication lock = nodeCommunicationService.find(nodeId, queue, TYPE);
if (availableThreads > 0) {
nodeCommunicationService.execute(lock, statuses, this);
}
}

public Map<String, String> getExtractRequestNodes() {
-return sqlTemplate.queryForMap(getSql("selectNodeIdsForExtractSql"), "node_id", "channel_id",
+return sqlTemplate.queryForMap(getSql("selectNodeIdsForExtractSql"), "node_id", "queue",
ExtractStatus.NE.name());
}

@@ -1126,11 +1137,11 @@ protected void resetExtractRequest(OutgoingBatch batch) {
batch.getBatchId(), batch.getBatchId(), batch.getNodeId(), batch.getChannelId());
}

-public void requestExtractRequest(ISqlTransaction transaction, String nodeId, String channelId,
+public void requestExtractRequest(ISqlTransaction transaction, String nodeId, String queue,
TriggerRouter triggerRouter, long startBatchId, long endBatchId) {
long requestId = sequenceService.nextVal(transaction, Constants.SEQUENCE_EXTRACT_REQ);
transaction.prepareAndExecute(getSql("insertExtractRequestSql"),
-new Object[] { requestId, nodeId, channelId, ExtractStatus.NE.name(), startBatchId,
+new Object[] { requestId, nodeId, queue, ExtractStatus.NE.name(), startBatchId,
endBatchId, triggerRouter.getTrigger().getTriggerId(),
triggerRouter.getRouter().getRouterId() }, new int[] { Types.BIGINT, Types.VARCHAR,
Types.VARCHAR, Types.VARCHAR, Types.BIGINT, Types.BIGINT, Types.VARCHAR, Types.VARCHAR });
@@ -1280,6 +1291,7 @@ public ExtractRequest mapRow(Row row) {
request.setLastUpdateTime(row.getDateTime("last_update_time"));
request.setTriggerRouter(triggerRouterService.findTriggerRouterById(
row.getString("trigger_id"), row.getString("router_id")));
+request.setQueue(row.getString("queue"));
return request;
}
}
@@ -31,15 +31,15 @@ public DataExtractorServiceSqlMap(IDatabasePlatform platform,
super(platform, replacementTokens);

// @formatter:off
putSql("selectNodeIdsForExtractSql", "select node_id, channel_id from $(extract_request) where status=? group by node_id, channel_id");
putSql("selectNodeIdsForExtractSql", "select node_id, queue from $(extract_request) where status=? group by node_id, queue");

putSql("selectExtractRequestForNodeSql", "select * from $(extract_request) where node_id=? and channel_id=? and status=? order by request_id");
putSql("selectExtractRequestForNodeSql", "select * from $(extract_request) where node_id=? and queue=? and status=? order by request_id");

putSql("insertExtractRequestSql", "insert into $(extract_request) (request_id, node_id, channel_id, status, start_batch_id, end_batch_id, trigger_id, router_id, last_update_time, create_time) values(?, ?, ?, ?, ?, ?, ?, ?, current_timestamp, current_timestamp)");
putSql("insertExtractRequestSql", "insert into $(extract_request) (request_id, node_id, queue, status, start_batch_id, end_batch_id, trigger_id, router_id, last_update_time, create_time) values(?, ?, ?, ?, ?, ?, ?, ?, current_timestamp, current_timestamp)");

putSql("updateExtractRequestStatus", "update $(extract_request) set status=? where request_id=?");

putSql("resetExtractRequestStatus", "update $(extract_request) set status=? where start_batch_id <= ? and end_batch_id >= ? and node_id=? and channel_id=?");
putSql("resetExtractRequestStatus", "update $(extract_request) set status=? where start_batch_id <= ? and end_batch_id >= ? and node_id=? and queue=?");
}

}
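Each extract_request statement swaps the channel_id column for the new queue column, matching the renamed fields and parameters above: extract requests are now grouped, queried, and reset per channel queue. As a hedged illustration of what the renamed insert ends up binding (the sym_ table prefix, the H2 in-memory database, and all sample values are assumptions, not from the commit; the H2 driver must be on the classpath):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.Statement;

    public class ExtractRequestInsert {

        public static void main(String[] args) throws Exception {
            try (Connection con = DriverManager.getConnection("jdbc:h2:mem:sample")) {
                try (Statement ddl = con.createStatement()) {
                    // Simplified stand-in for the extract_request table.
                    ddl.execute("create table sym_extract_request (request_id bigint primary key, "
                            + "node_id varchar(50), queue varchar(128), status char(2), "
                            + "start_batch_id bigint, end_batch_id bigint, "
                            + "trigger_id varchar(128), router_id varchar(50))");
                }
                try (PreparedStatement ps = con.prepareStatement(
                        "insert into sym_extract_request (request_id, node_id, queue, status, "
                        + "start_batch_id, end_batch_id, trigger_id, router_id) "
                        + "values (?, ?, ?, ?, ?, ?, ?, ?)")) {
                    ps.setLong(1, 1);
                    ps.setString(2, "store-001");
                    ps.setString(3, "default");   // queue, formerly channel_id
                    ps.setString(4, "NE");        // not yet extracted
                    ps.setLong(5, 100);
                    ps.setLong(6, 110);
                    ps.setString(7, "item_trigger");
                    ps.setString(8, "corp_to_store");
                    ps.executeUpdate();
                }
            }
        }
    }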
@@ -660,7 +660,7 @@ private void insertLoadBatchesForReload(Node targetNode, long loadId, String cre

}
engine.getDataExtractorService().requestExtractRequest(transaction,
-targetNode.getNodeId(), channel.getChannelId(), triggerRouter, startBatchId, endBatchId);
+targetNode.getNodeId(), channel.getQueue(), triggerRouter, startBatchId, endBatchId);
} else {
insertReloadEvent(transaction, targetNode, triggerRouter, triggerHistory,
null, true, loadId, createBy, Status.NE);
@@ -107,14 +107,15 @@ private final void initialize() {
}
}

-public NodeCommunication find(String nodeId, String channelThread, CommunicationType communicationType) {
+public NodeCommunication find(String nodeId, String queue, CommunicationType communicationType) {
NodeCommunication lock = sqlTemplate.queryForObject(
getSql("selectNodeCommunicationByNodeAndChannelSql"), new NodeCommunicationMapper(),
-nodeId, channelThread, communicationType.name());
+nodeId, queue, communicationType.name());
if (lock == null) {
lock = new NodeCommunication();
lock.setNodeId(nodeId);
lock.setCommunicationType(communicationType);
+lock.setQueue(queue);
save(lock);
}
return lock;
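find() is likewise keyed by queue instead of channel thread, and a lock row that does not exist yet is created, stamped with the queue, and saved. The find-or-create shape of that lookup as a standalone in-memory sketch (class name and key format are illustrative):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class NodeLockRegistry {

        // One lock per (node, queue, communication type), created lazily
        // on first lookup -- the same shape as find() above.
        private final Map<String, Object> locks = new ConcurrentHashMap<>();

        public Object find(String nodeId, String queue, String type) {
            return locks.computeIfAbsent(nodeId + "|" + queue + "|" + type,
                    key -> new Object());
        }

        public static void main(String[] args) {
            NodeLockRegistry registry = new NodeLockRegistry();
            Object first = registry.find("store-001", "default", "EXTRACT");
            Object second = registry.find("store-001", "default", "EXTRACT");
            System.out.println("same lock: " + (first == second)); // true
        }
    }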
10 changes: 8 additions & 2 deletions symmetric-core/src/main/resources/symmetric-default.properties
@@ -520,15 +520,15 @@ initial.load.reverse.first=true
# DatabaseOverridable: true
# Tags: load
# Type: boolean
-initial.load.use.extract.job.enabled=false
+initial.load.use.extract.job.enabled=true

# Indicate that the initial load batches will be sent as soon
# as they are staged. Used in combination with initial.load.use.extract.job.enabled=true
#
# DatabaseOverridable: true
# Tags: load
# Type: boolean
-initial.load.extract.and.send.when.staged=false
+initial.load.extract.and.send.when.staged=true

# The number of threads available for concurrent extracts of initial load batches.
#
@@ -943,6 +943,12 @@ routing.data.reader.threshold.gaps.to.use.greater.than.query=100
# Tags: routing
routing.log.stats.on.batch.error=false

+# Enable to collect unrouted data statistics into the stat tables for graphs.
+#
+# DatabaseOverridable: true
+# Tags: routing
+routing.collect.stats.unrouted=false

# Use a faster method of gap detection that uses the output of the work from router service
# instead of querying for it.
#
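Flipping both defaults to true is what gives the commit its title: a fresh installation now extracts initial load batches in the background and sends each batch as soon as it is staged. A node that wants the old inline behavior would override the properties in its engine file; a sketch of how such flags resolve against the new defaults (the Properties object stands in for an engine .properties file):

    import java.util.Properties;

    public class InitialLoadFlags {

        public static void main(String[] args) {
            Properties engine = new Properties();
            // Engine-level override restoring the pre-commit behavior:
            engine.setProperty("initial.load.use.extract.job.enabled", "false");

            // Both defaults now ship as "true" in symmetric-default.properties.
            boolean useExtractJob = Boolean.parseBoolean(
                    engine.getProperty("initial.load.use.extract.job.enabled", "true"));
            boolean sendWhenStaged = Boolean.parseBoolean(
                    engine.getProperty("initial.load.extract.and.send.when.staged", "true"));

            System.out.println("use extract job: " + useExtractJob
                    + ", send when staged: " + sendWhenStaged);
        }
    }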
2 changes: 1 addition & 1 deletion symmetric-core/src/main/resources/symmetric-schema.xml
@@ -137,7 +137,7 @@
<table name="extract_request" description="This table is used internally to request the extract of initial loads asynchronously when the initial load extract job is enabled.">
<column name="request_id" type="BIGINT" required="true" primaryKey="true" description="Unique identifier for a request." />
<column name="node_id" type="VARCHAR" size="50" required="true" description="The node_id of the batch being loaded." />
<column name="channel_id" type="VARCHAR" size="128" description="The channel_id of the batch being loaded." />
<column name="queue" type="VARCHAR" size="128" description="The channel queue name of the batch being loaded." />
<column name="status" type="CHAR" size="2" description="NE, OK" />
<column name="start_batch_id" type="BIGINT" required="true" description="A load can be split across multiple batches. This is the first of N batches the load will be split across." />
<column name="end_batch_id" type="BIGINT" required="true" description="This is the last of N batches the load will be split across." />
@@ -1358,7 +1358,10 @@ public List<String> execute(Connection connection) throws SQLException {
try {
rs = meta.getCatalogs();
while (rs.next()) {
-catalogs.add(rs.getString(1));
+String catalog = rs.getString(1);
+if (catalog != null) {
+    catalogs.add(catalog);
+}
}
return catalogs;
} finally {
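The change above is a small hardening: DatabaseMetaData.getCatalogs() can return a row whose catalog name is null with some JDBC drivers, and the loop now skips those rows rather than adding null to the list. The same defensive loop as a runnable sketch (H2 in-memory is used only to have a driver to talk to; it must be on the classpath):

    import java.sql.Connection;
    import java.sql.DatabaseMetaData;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.util.ArrayList;
    import java.util.List;

    public class ListCatalogs {

        public static void main(String[] args) throws Exception {
            try (Connection con = DriverManager.getConnection("jdbc:h2:mem:sample")) {
                DatabaseMetaData meta = con.getMetaData();
                List<String> catalogs = new ArrayList<>();
                try (ResultSet rs = meta.getCatalogs()) {
                    while (rs.next()) {
                        String catalog = rs.getString(1);
                        if (catalog != null) { // some drivers report a null catalog row
                            catalogs.add(catalog);
                        }
                    }
                }
                System.out.println(catalogs);
            }
        }
    }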
@@ -27,15 +27,15 @@ http.enable=true

# Port number for synchronization over HTTP.
#
-http.port=31415
+http.port=32415

# Enable synchronization over HTTPS (HTTP over SSL).
#
https.enable=false

# Port number for synchronization over HTTPS (HTTP over SSL).
#
-https.port=31417
+https.port=32417

# Use a trust manager that allows self-signed server SSL certificates.
#
@@ -51,4 +51,4 @@ jmx.http.enable=true

# Port number for Java Management Extensions (JMX) web console.
#
-jmx.http.port=31416
+jmx.http.port=32416
