Skip to content

Commit

Permalink
0005945: compare and repair dev checkin
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Aug 8, 2023
1 parent ffde525 commit d9cff2d
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 1 deletion.
@@ -0,0 +1,14 @@
package org.jumpmind.symmetric.ext;

import org.jumpmind.extension.IExtensionPoint;
import org.jumpmind.symmetric.io.stage.IStagedResource;
import org.jumpmind.symmetric.io.stage.StagingPurgeContext;

public interface IBatchStagingExtension extends IExtensionPoint {
public void beforeClean(StagingPurgeContext context);

public boolean isValidPath(String category);

public boolean shouldCleanPath(IStagedResource resource, long ttlInMs, StagingPurgeContext context, String[] path, boolean resourceIsOld,
boolean resourceClearsMinTimeHurdle);
}
Expand Up @@ -32,6 +32,7 @@

import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.ext.IBatchStagingExtension;
import org.jumpmind.symmetric.model.BatchId;
import org.jumpmind.symmetric.service.ClusterConstants;
import org.slf4j.Logger;
Expand Down Expand Up @@ -90,6 +91,11 @@ public long clean(long ttlInMs) {
context.putContextValue("outgoingBatches", outgoingBatches);
context.putContextValue("incomingBatches", incomingBatches);
context.putContextValue("biggestIncomingByNode", biggestIncomingByNode);
IBatchStagingExtension ext = engine.getExtensionService().getExtensionPoint(IBatchStagingExtension.class);
if (ext != null) {
context.putContextValue("extension", ext);
ext.beforeClean(context);
}
return super.clean(ttlInMs, context);
} finally {
if (isLockAcquired) {
Expand Down Expand Up @@ -119,7 +125,12 @@ protected boolean shouldCleanPath(IStagedResource resource, long ttlInMs, Stagin
} else if (path[0].equals(STAGING_CATEGORY_BULK_LOAD)) {
return false;
} else {
log.warn("Unrecognized path: " + resource.getPath());
IBatchStagingExtension ext = (IBatchStagingExtension) context.getContextValue("extension");
if (ext != null && ext.isValidPath(path[0])) {
return ext.shouldCleanPath(resource, ttlInMs, context, path, resourceIsOld, resourceClearsMinTimeHurdle);
} else {
log.warn("Unrecognized path: " + resource.getPath());
}
}
return false;
}
Expand Down

0 comments on commit d9cff2d

Please sign in to comment.