Skip to content

Commit

Permalink
0002560 Updated initial load extract in background process to include a
Browse files Browse the repository at this point in the history
parameter that allows you to transfer batches as soon as they are
written to stage.
  • Loading branch information
Hicks, Josh committed Apr 12, 2016
1 parent 3fbf383 commit e8a4b2c
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 3 deletions.
Expand Up @@ -128,6 +128,7 @@ private ParameterConstants() {
public final static String INITIAL_LOAD_EXTRACT_JOB_START = "start.initial.load.extract.job";
public final static String INITIAL_LOAD_SCHEMA_DUMP_COMMAND = "initial.load.schema.dump.command";
public final static String INITIAL_LOAD_SCHEMA_LOAD_COMMAND = "initial.load.schema.load.command";
public final static String INITIAL_LOAD_EXTRACT_AND_SEND_WHEN_STAGED = "initial.load.extract.and.send.when.staged";

public final static String CREATE_TABLE_WITHOUT_DEFAULTS = "create.table.without.defaults";
public final static String CREATE_TABLE_WITHOUT_FOREIGN_KEYS = "create.table.without.foreign.keys";
Expand Down
Expand Up @@ -1146,8 +1146,15 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status

if (!areBatchesOk) {
for (OutgoingBatch outgoingBatch : batches) {
outgoingBatch.setStatus(Status.NE);
outgoingBatchService.updateOutgoingBatch(transaction, outgoingBatch);
if (parameterService.is(ParameterConstants.INITIAL_LOAD_EXTRACT_AND_SEND_WHEN_STAGED, false)) {
if (outgoingBatch.getStatus() == Status.RQ) {
outgoingBatch.setStatus(Status.NE);
outgoingBatchService.updateOutgoingBatch(transaction, outgoingBatch);
}
} else {
outgoingBatch.setStatus(Status.NE);
outgoingBatchService.updateOutgoingBatch(transaction, outgoingBatch);
}
}
} else {
log.info("Batches already had an OK status for request {}, batches {} to {}. Not updating the status to NE", new Object[] { request.getRequestId(), request.getStartBatchId(),
Expand Down Expand Up @@ -1272,12 +1279,39 @@ public void write(CsvData data) {
Statistics stats = this.currentDataWriter.getStatistics().get(batch);
this.outgoingBatch.setByteCount(stats.get(DataWriterStatisticConstants.BYTECOUNT));
this.outgoingBatch.setExtractMillis(System.currentTimeMillis() - batch.getStartTime().getTime());
this.currentDataWriter.close();
this.currentDataWriter.close();
checkSend();
startNewBatch();
}

}

public void checkSend() {
if (parameterService.is(ParameterConstants.INITIAL_LOAD_EXTRACT_AND_SEND_WHEN_STAGED, false)) {
this.outgoingBatch.setStatus(Status.NE);
ISqlTransaction transaction = null;
try {
transaction = sqlTemplate.startSqlTransaction();
outgoingBatchService.updateOutgoingBatch(transaction, this.outgoingBatch);
transaction.commit();
} catch (Error ex) {
if (transaction != null) {
transaction.rollback();
}
throw ex;
} catch (RuntimeException ex) {
if (transaction != null) {
transaction.rollback();
}
throw ex;
} finally {
if (transaction != null) {
transaction.close();
}
}
}
}

public void end(Table table) {
if (this.currentDataWriter != null) {
this.currentDataWriter.end(table);
Expand Down
Expand Up @@ -509,6 +509,14 @@ initial.load.reverse.first=true
# Type: boolean
initial.load.use.extract.job.enabled=false

# Indicate that the initial load batches will be sent as soon
# as they are staged. Used in combination with initial.load.use.extract.job.enabled=true
#
# DatabaseOverridable: true
# Tags: load
# Type: boolean
initial.load.extract.and.send.when.staged=false

# The number of threads available for concurrent extracts of initial load batches.
#
# DatabaseOverridable: true
Expand Down

0 comments on commit e8a4b2c

Please sign in to comment.