Skip to content

Commit

Permalink
do not purge memory staged resource until it is OKed
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Nov 2, 2020
1 parent 2e22ef9 commit bfda61f
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 10 deletions.
Expand Up @@ -189,7 +189,9 @@ public BatchAckResult ack(final BatchAck batch) {
engine.getStatisticManager().incrementDataLoadedOutgoing(outgoingBatch.getChannelId(), outgoingBatch.getLoadRowCount());
engine.getStatisticManager().incrementDataBytesLoadedOutgoing(outgoingBatch.getChannelId(), outgoingBatch.getByteCount());
}
purgeLoadBatchesFromStaging(outgoingBatch);
if (parameterService.is(ParameterConstants.STREAM_TO_FILE_ENABLED)) {
purgeBatchesFromStaging(outgoingBatch);
}
Channel channel = engine.getConfigurationService().getChannel(outgoingBatch.getChannelId());
if (channel != null && channel.isFileSyncFlag()){
/* Acknowledge the file_sync in case the file needs deleted. */
Expand All @@ -206,7 +208,7 @@ public BatchAckResult ack(final BatchAck batch) {
return result;
}

protected void purgeLoadBatchesFromStaging(OutgoingBatch outgoingBatch) {
protected void purgeBatchesFromStaging(OutgoingBatch outgoingBatch) {
long threshold = parameterService.getLong(ParameterConstants.INITIAL_LOAD_PURGE_STAGE_IMMEDIATE_THRESHOLD_ROWS);
if (threshold >= 0 && outgoingBatch.isLoadFlag() && !outgoingBatch.isCommonFlag()) {
if (outgoingBatch.getDataRowCount() > threshold) {
Expand All @@ -217,6 +219,15 @@ protected void purgeLoadBatchesFromStaging(OutgoingBatch outgoingBatch) {
}
}
}

long streamToFileThreshold = parameterService.getLong(ParameterConstants.STREAM_TO_FILE_THRESHOLD);
if (streamToFileThreshold > 0 && !outgoingBatch.isCommonFlag() && outgoingBatch.getByteCount() <= streamToFileThreshold) {
IStagedResource resource = engine.getStagingManager().find(Constants.STAGING_CATEGORY_OUTGOING,
outgoingBatch.getStagedLocation(), outgoingBatch.getBatchId());
if (resource != null && resource.isMemoryResource() && !resource.isInUse()) {
resource.delete();
}
}
}

public List<BatchAckResult> ack(List<BatchAck> batches) {
Expand Down
Expand Up @@ -1598,13 +1598,6 @@ protected void transferFromStaging(ExtractMode mode, BatchType batchType, Outgoi
} finally {
stagedResource.close();
stagedResource.dereference();
if (!batch.isCommonFlag() && stagedResource.isMemoryResource() && !stagedResource.isInUse()) {
synchronized(DataExtractorService.this) {
if (stagedResource.isMemoryResource() && !stagedResource.isInUse()) {
stagedResource.delete();
}
}
}
}
}

Expand Down Expand Up @@ -1652,7 +1645,16 @@ public void updateExtractRequestLoadTime(Date loadTime, OutgoingBatch outgoingBa
outgoingBatch.getNodeId(), outgoingBatch.getLoadId());

dataService.updateTableReloadStatusDataLoaded(transaction, outgoingBatch.getLoadId(), outgoingBatch.getBatchId(), 1);


// TODO: check if TableReloadRequest.isFullLoadRequest()
// TOOD: move to nodeservice
int done = transaction.queryForInt("select case when completed = 1 or cancelled = 1 then 1 else 0 end " +
"from sym_table_reload_status where load_id = ?", outgoingBatch.getLoadId());
if (done == 1) {
log.info("Initial load for node {} ended", outgoingBatch.getNodeId());
transaction.prepareAndExecute("update sym_node_security set initial_load_end_time = current_timestamp where node_id = ?",
outgoingBatch.getNodeId());
}

transaction.commit();
} catch (Error ex) {
Expand Down

0 comments on commit bfda61f

Please sign in to comment.