diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java index 77525a1aa0..4cfff8acd6 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java @@ -189,7 +189,9 @@ public BatchAckResult ack(final BatchAck batch) { engine.getStatisticManager().incrementDataLoadedOutgoing(outgoingBatch.getChannelId(), outgoingBatch.getLoadRowCount()); engine.getStatisticManager().incrementDataBytesLoadedOutgoing(outgoingBatch.getChannelId(), outgoingBatch.getByteCount()); } - purgeLoadBatchesFromStaging(outgoingBatch); + if (parameterService.is(ParameterConstants.STREAM_TO_FILE_ENABLED)) { + purgeBatchesFromStaging(outgoingBatch); + } Channel channel = engine.getConfigurationService().getChannel(outgoingBatch.getChannelId()); if (channel != null && channel.isFileSyncFlag()){ /* Acknowledge the file_sync in case the file needs deleted. */ @@ -206,7 +208,7 @@ public BatchAckResult ack(final BatchAck batch) { return result; } - protected void purgeLoadBatchesFromStaging(OutgoingBatch outgoingBatch) { + protected void purgeBatchesFromStaging(OutgoingBatch outgoingBatch) { long threshold = parameterService.getLong(ParameterConstants.INITIAL_LOAD_PURGE_STAGE_IMMEDIATE_THRESHOLD_ROWS); if (threshold >= 0 && outgoingBatch.isLoadFlag() && !outgoingBatch.isCommonFlag()) { if (outgoingBatch.getDataRowCount() > threshold) { @@ -217,6 +219,15 @@ protected void purgeLoadBatchesFromStaging(OutgoingBatch outgoingBatch) { } } } + + long streamToFileThreshold = parameterService.getLong(ParameterConstants.STREAM_TO_FILE_THRESHOLD); + if (streamToFileThreshold > 0 && !outgoingBatch.isCommonFlag() && outgoingBatch.getByteCount() <= streamToFileThreshold) { + IStagedResource resource = engine.getStagingManager().find(Constants.STAGING_CATEGORY_OUTGOING, + outgoingBatch.getStagedLocation(), outgoingBatch.getBatchId()); + if (resource != null && resource.isMemoryResource() && !resource.isInUse()) { + resource.delete(); + } + } } public List ack(List batches) { diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index 56576c0381..c61891a188 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -1598,13 +1598,6 @@ protected void transferFromStaging(ExtractMode mode, BatchType batchType, Outgoi } finally { stagedResource.close(); stagedResource.dereference(); - if (!batch.isCommonFlag() && stagedResource.isMemoryResource() && !stagedResource.isInUse()) { - synchronized(DataExtractorService.this) { - if (stagedResource.isMemoryResource() && !stagedResource.isInUse()) { - stagedResource.delete(); - } - } - } } } @@ -1652,7 +1645,16 @@ public void updateExtractRequestLoadTime(Date loadTime, OutgoingBatch outgoingBa outgoingBatch.getNodeId(), outgoingBatch.getLoadId()); dataService.updateTableReloadStatusDataLoaded(transaction, outgoingBatch.getLoadId(), outgoingBatch.getBatchId(), 1); - + + // TODO: check if TableReloadRequest.isFullLoadRequest() + // TOOD: move to nodeservice + int done = transaction.queryForInt("select case when completed = 1 or cancelled = 1 then 1 else 0 end " + + "from sym_table_reload_status where load_id = ?", outgoingBatch.getLoadId()); + if (done == 1) { + log.info("Initial load for node {} ended", outgoingBatch.getNodeId()); + transaction.prepareAndExecute("update sym_node_security set initial_load_end_time = current_timestamp where node_id = ?", + outgoingBatch.getNodeId()); + } transaction.commit(); } catch (Error ex) {