diff --git a/lightblue/src/main/java/org/esbtools/eventhandler/lightblue/PeriodicDeleteOldEntitiesRoute.java b/lightblue/src/main/java/org/esbtools/eventhandler/lightblue/PeriodicDeleteOldEntitiesRoute.java index d23bb25..aa1450d 100644 --- a/lightblue/src/main/java/org/esbtools/eventhandler/lightblue/PeriodicDeleteOldEntitiesRoute.java +++ b/lightblue/src/main/java/org/esbtools/eventhandler/lightblue/PeriodicDeleteOldEntitiesRoute.java @@ -18,31 +18,25 @@ package org.esbtools.eventhandler.lightblue; -import org.esbtools.eventhandler.lightblue.locking.LockNotAvailableException; +import java.sql.Date; +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; + +import org.apache.camel.builder.RouteBuilder; import org.esbtools.eventhandler.lightblue.locking.LockStrategy; -import org.esbtools.eventhandler.lightblue.locking.LockedResource; -import org.esbtools.eventhandler.lightblue.locking.LostLockException; +import org.esbtools.eventhandler.lightblue.locking.LockingRoutePolicy; import org.esbtools.lightbluenotificationhook.NotificationEntity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.redhat.lightblue.client.LightblueClient; import com.redhat.lightblue.client.Query; import com.redhat.lightblue.client.request.data.DataDeleteRequest; import com.redhat.lightblue.client.response.LightblueDataResponse; -import org.apache.camel.Exchange; -import org.apache.camel.Route; -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.support.RoutePolicySupport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nullable; -import java.io.IOException; -import java.sql.Date; -import java.time.Clock; -import java.time.Duration; -import java.time.Instant; public class PeriodicDeleteOldEntitiesRoute extends RouteBuilder { + private final LightblueClient client; private final LockStrategy lockStrategy; private final Duration deleteOlderThan; @@ -92,7 +86,7 @@ public PeriodicDeleteOldEntitiesRoute(String entityName, String entityVersion, public void configure() throws Exception { from("timer:" + deleterLockResourceId + "?period=" + deleteInterval.toMillis()) .routeId(deleterLockResourceId) - .routePolicy(new DeleterLockRoutePolicy()) + .routePolicy(new LockingRoutePolicy(deleterLockResourceId, lockStrategy)) .process(exchange -> { Instant tooOld = clock.instant().minus(deleteOlderThan); @@ -108,50 +102,5 @@ public void configure() throws Exception { }); } - private class DeleterLockRoutePolicy extends RoutePolicySupport { - private @Nullable LockedResource lock; - - @Override - public void onStop(Route route) { - releaseLock(); - } - - @Override - public void onSuspend(Route route) { - releaseLock(); - } - - @Override - public synchronized void onExchangeBegin(Route route, Exchange exchange) { - if (lock != null) { - try { - lock.ensureAcquiredOrThrow("Lost lock"); - return; - } catch (LostLockException e) { - log.warn("Lost deleter lock, trying to reacquire...", e); - lock = null; - } - } - - try { - lock = lockStrategy.tryAcquire(deleterLockResourceId); - } catch (LockNotAvailableException e) { - log.debug("Deleter lock not available, assuming " + - "another thread is cleaning up old " + entityName + " entities", e); - exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE); - } - } - - private synchronized void releaseLock() { - if (lock == null) return; - - try { - lock.close(); - } catch (IOException e) { - log.warn("IOException trying to release deleter lock", e); - } - - lock = null; - } - } + } diff --git a/lightblue/src/main/java/org/esbtools/eventhandler/lightblue/locking/LockingRoutePolicy.java b/lightblue/src/main/java/org/esbtools/eventhandler/lightblue/locking/LockingRoutePolicy.java new file mode 100644 index 0000000..d53d02f --- /dev/null +++ b/lightblue/src/main/java/org/esbtools/eventhandler/lightblue/locking/LockingRoutePolicy.java @@ -0,0 +1,66 @@ +package org.esbtools.eventhandler.lightblue.locking; + +import java.io.IOException; + +import javax.annotation.Nullable; + +import org.apache.camel.Exchange; +import org.apache.camel.Route; +import org.apache.camel.support.RoutePolicySupport; + +public class LockingRoutePolicy extends RoutePolicySupport { + + private final String resourceId; + + private final LockStrategy lockStrategy; + + private @Nullable LockedResource lock; + + public LockingRoutePolicy(String resourceId, LockStrategy lockStrategy) { + this.resourceId = resourceId; + this.lockStrategy = lockStrategy; + } + + @Override + public void onStop(Route route) { + releaseLock(); + } + + @Override + public void onSuspend(Route route) { + releaseLock(); + } + + @Override + public synchronized void onExchangeBegin(Route route, Exchange exchange) { + if (lock != null) { + try { + lock.ensureAcquiredOrThrow("Lost lock"); + return; + } catch (LostLockException e) { + log.warn("Lost lock w id: " + resourceId + ", trying to reacquire...", e); + lock = null; + } + } + + try { + lock = lockStrategy.tryAcquire(resourceId); + } catch (LockNotAvailableException e) { + log.debug("Lock not available, assuming " + + "another thread is holding lock w/ id: " + resourceId, e); + exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE); + } + } + + private synchronized void releaseLock() { + if (lock == null) return; + + try { + lock.close(); + } catch (IOException e) { + log.warn("IOException trying to release lock w/ identifier " + resourceId, e); + } + + lock = null; + } +} \ No newline at end of file