Skip to content

Commit

Permalink
SYMMETRICDS-424 - flush a keep alive package every configurable (2 mi…
Browse files Browse the repository at this point in the history
…nutes) time period while data is being extracted.
  • Loading branch information
chenson42 committed Apr 28, 2011
1 parent 30acc25 commit eb7cba0
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ private ParameterConstants() {
public final static String DATA_RELOAD_IS_BATCH_INSERT_TRANSACTIONAL = "datareload.batch.insert.transactional";

public final static String DATA_EXTRACTOR_ENABLED = "dataextractor.enable";
public final static String DATA_EXTRACTOR_FLUSH_FOR_KEEP_ALIVE = "dataextractor.keepalive.period.ms";
public final static String DATA_EXTRACTOR_OLD_DATA_ENABLED = "dataextractor.old.data.enable";

public final static String DBDIALECT_ORACLE_USE_TRANSACTION_VIEW = "oracle.use.transaction.view";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,12 +428,14 @@ public List<OutgoingBatch> extract(Node node, IOutgoingTransport targetTransport
if (activeBatches.size() > 0) {

Writer extractWriter = null;
BufferedWriter networkWriter = null;
BufferedWriter networkWriter = null;

boolean streamToFileEnabled = parameterService.is(ParameterConstants.STREAM_TO_FILE_ENABLED);

ThresholdFileWriter fileWriter = new ThresholdFileWriter(parameterService
.getLong(ParameterConstants.STREAM_TO_FILE_THRESHOLD), "extract");

if (parameterService.is(ParameterConstants.STREAM_TO_FILE_ENABLED)) {
if (streamToFileEnabled) {
extractWriter = new BufferedWriter(fileWriter);
networkWriter = targetTransport.open();
} else {
Expand Down Expand Up @@ -463,7 +465,7 @@ public List<OutgoingBatch> extract(Node node, IOutgoingTransport targetTransport
outgoingBatch.setStatus(OutgoingBatch.Status.QY);
outgoingBatch.setExtractCount(outgoingBatch.getExtractCount() + 1);
outgoingBatchService.updateOutgoingBatch(outgoingBatch);
databaseExtract(node, outgoingBatch, handler);
databaseExtract(node, outgoingBatch, handler, networkWriter);
}
outgoingBatch.setStatus(OutgoingBatch.Status.SE);
outgoingBatch.setSentCount(outgoingBatch.getSentCount() + 1);
Expand Down Expand Up @@ -610,12 +612,12 @@ protected void networkTransfer(BufferedReader reader, BufferedWriter writer) thr
* Allow a handler callback to do the work so we can route the extracted
* data to other types of handlers for processing.
*/
protected void databaseExtract(Node node, OutgoingBatch batch, final IExtractListener handler)
protected void databaseExtract(Node node, OutgoingBatch batch, final IExtractListener handler, BufferedWriter keepaliveWriter)
throws IOException {
batch.resetStats();
long ts = System.currentTimeMillis();
handler.startBatch(batch);
selectEventDataToExtract(handler, batch);
selectEventDataToExtract(handler, batch, keepaliveWriter);
handler.endBatch(batch);
batch.setExtractMillis(System.currentTimeMillis() - ts);
}
Expand Down Expand Up @@ -650,7 +652,7 @@ public boolean extractBatchRange(final IExtractListener handler, String startBat
handler.init();
for (final OutgoingBatch batch : batches.getBatches()) {
handler.startBatch(batch);
selectEventDataToExtract(handler, batch);
selectEventDataToExtract(handler, batch, null);
handler.endBatch(batch);
}
} finally {
Expand All @@ -662,10 +664,18 @@ public boolean extractBatchRange(final IExtractListener handler, String startBat
return false;
}

private void selectEventDataToExtract(final IExtractListener handler, final OutgoingBatch batch) {
dataService.handleDataSelect(batch.getBatchId(), -1, batch.getChannelId(), false, new IModelRetrievalHandler<Data, String>() {
private void selectEventDataToExtract(final IExtractListener handler, final OutgoingBatch batch, final BufferedWriter keepAliveWriter) {
final long flushForKeepAliveInMs = parameterService.getLong(ParameterConstants.DATA_EXTRACTOR_FLUSH_FOR_KEEP_ALIVE, 120000);
dataService.handleDataSelect(batch.getBatchId(), -1, batch.getChannelId(), false, new IModelRetrievalHandler<Data, String>() {
long lastKeepAliveFlush = System.currentTimeMillis();
public boolean retrieved(Data data, String routerId, int count) throws IOException {
handler.dataExtracted(data, routerId);
if (System.currentTimeMillis()-lastKeepAliveFlush > flushForKeepAliveInMs && keepAliveWriter != null) {
CsvUtils.write(keepAliveWriter, CsvConstants.NODEID, CsvUtils.DELIMITER, batch.getNodeId());
CsvUtils.writeLineFeed(keepAliveWriter);
keepAliveWriter.flush();
lastKeepAliveFlush = System.currentTimeMillis();
}
return true;
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ db.pool.max.active=10

routing.data.reader.type=gap

dataextractor.keepalive.period.ms=500

http.concurrent.workers.max=2
start.pull.job=false
start.push.job=false
Expand Down

0 comments on commit eb7cba0

Please sign in to comment.