From d9cff2de2d9529468af405f13e7189f96fdced60 Mon Sep 17 00:00:00 2001 From: Eric Long Date: Tue, 8 Aug 2023 13:56:34 -0400 Subject: [PATCH] 0005945: compare and repair dev checkin --- .../symmetric/ext/IBatchStagingExtension.java | 14 ++++++++++++++ .../symmetric/io/stage/BatchStagingManager.java | 13 ++++++++++++- 2 files changed, 26 insertions(+), 1 deletion(-) create mode 100644 symmetric-core/src/main/java/org/jumpmind/symmetric/ext/IBatchStagingExtension.java diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/ext/IBatchStagingExtension.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/ext/IBatchStagingExtension.java new file mode 100644 index 0000000000..c1453989eb --- /dev/null +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/ext/IBatchStagingExtension.java @@ -0,0 +1,14 @@ +package org.jumpmind.symmetric.ext; + +import org.jumpmind.extension.IExtensionPoint; +import org.jumpmind.symmetric.io.stage.IStagedResource; +import org.jumpmind.symmetric.io.stage.StagingPurgeContext; + +public interface IBatchStagingExtension extends IExtensionPoint { + public void beforeClean(StagingPurgeContext context); + + public boolean isValidPath(String category); + + public boolean shouldCleanPath(IStagedResource resource, long ttlInMs, StagingPurgeContext context, String[] path, boolean resourceIsOld, + boolean resourceClearsMinTimeHurdle); +} diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/io/stage/BatchStagingManager.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/io/stage/BatchStagingManager.java index a23044cee3..bd5724a5c9 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/io/stage/BatchStagingManager.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/io/stage/BatchStagingManager.java @@ -32,6 +32,7 @@ import org.jumpmind.symmetric.ISymmetricEngine; import org.jumpmind.symmetric.common.ParameterConstants; +import org.jumpmind.symmetric.ext.IBatchStagingExtension; import org.jumpmind.symmetric.model.BatchId; import org.jumpmind.symmetric.service.ClusterConstants; import org.slf4j.Logger; @@ -90,6 +91,11 @@ public long clean(long ttlInMs) { context.putContextValue("outgoingBatches", outgoingBatches); context.putContextValue("incomingBatches", incomingBatches); context.putContextValue("biggestIncomingByNode", biggestIncomingByNode); + IBatchStagingExtension ext = engine.getExtensionService().getExtensionPoint(IBatchStagingExtension.class); + if (ext != null) { + context.putContextValue("extension", ext); + ext.beforeClean(context); + } return super.clean(ttlInMs, context); } finally { if (isLockAcquired) { @@ -119,7 +125,12 @@ protected boolean shouldCleanPath(IStagedResource resource, long ttlInMs, Stagin } else if (path[0].equals(STAGING_CATEGORY_BULK_LOAD)) { return false; } else { - log.warn("Unrecognized path: " + resource.getPath()); + IBatchStagingExtension ext = (IBatchStagingExtension) context.getContextValue("extension"); + if (ext != null && ext.isValidPath(path[0])) { + return ext.shouldCleanPath(resource, ttlInMs, context, path, resourceIsOld, resourceClearsMinTimeHurdle); + } else { + log.warn("Unrecognized path: " + resource.getPath()); + } } return false; }