Skip to content
This repository has been archived by the owner on Sep 16, 2021. It is now read-only.

Refactor LockingRoutePolicy out of periodic delete #53

Merged
merged 2 commits into from
May 13, 2016
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,25 @@

package org.esbtools.eventhandler.lightblue;

import org.esbtools.eventhandler.lightblue.locking.LockNotAvailableException;
import java.sql.Date;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;

import org.apache.camel.builder.RouteBuilder;
import org.esbtools.eventhandler.lightblue.locking.LockStrategy;
import org.esbtools.eventhandler.lightblue.locking.LockedResource;
import org.esbtools.eventhandler.lightblue.locking.LostLockException;
import org.esbtools.eventhandler.lightblue.locking.LockingRoutePolicy;
import org.esbtools.lightbluenotificationhook.NotificationEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.redhat.lightblue.client.LightblueClient;
import com.redhat.lightblue.client.Query;
import com.redhat.lightblue.client.request.data.DataDeleteRequest;
import com.redhat.lightblue.client.response.LightblueDataResponse;
import org.apache.camel.Exchange;
import org.apache.camel.Route;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.support.RoutePolicySupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.io.IOException;
import java.sql.Date;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;

public class PeriodicDeleteOldEntitiesRoute extends RouteBuilder {

private final LightblueClient client;
private final LockStrategy lockStrategy;
private final Duration deleteOlderThan;
Expand Down Expand Up @@ -92,7 +86,7 @@ public PeriodicDeleteOldEntitiesRoute(String entityName, String entityVersion,
public void configure() throws Exception {
from("timer:" + deleterLockResourceId + "?period=" + deleteInterval.toMillis())
.routeId(deleterLockResourceId)
.routePolicy(new DeleterLockRoutePolicy())
.routePolicy(new LockingRoutePolicy(deleterLockResourceId, lockStrategy))
.process(exchange -> {
Instant tooOld = clock.instant().minus(deleteOlderThan);

Expand All @@ -108,50 +102,5 @@ public void configure() throws Exception {
});
}

private class DeleterLockRoutePolicy extends RoutePolicySupport {
private @Nullable LockedResource<String> lock;

@Override
public void onStop(Route route) {
releaseLock();
}

@Override
public void onSuspend(Route route) {
releaseLock();
}

@Override
public synchronized void onExchangeBegin(Route route, Exchange exchange) {
if (lock != null) {
try {
lock.ensureAcquiredOrThrow("Lost lock");
return;
} catch (LostLockException e) {
log.warn("Lost deleter lock, trying to reacquire...", e);
lock = null;
}
}

try {
lock = lockStrategy.tryAcquire(deleterLockResourceId);
} catch (LockNotAvailableException e) {
log.debug("Deleter lock not available, assuming " +
"another thread is cleaning up old " + entityName + " entities", e);
exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE);
}
}

private synchronized void releaseLock() {
if (lock == null) return;

try {
lock.close();
} catch (IOException e) {
log.warn("IOException trying to release deleter lock", e);
}

lock = null;
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package org.esbtools.eventhandler.lightblue.locking;

import java.io.IOException;

import javax.annotation.Nullable;

import org.apache.camel.Exchange;
import org.apache.camel.Route;
import org.apache.camel.support.RoutePolicySupport;

public class LockingRoutePolicy extends RoutePolicySupport {

private final String identifier;

private final LockStrategy lockStrategy;

private @Nullable LockedResource<String> lock;

public LockingRoutePolicy(String identifier, LockStrategy lockStrategy) {
this.identifier = identifier;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpicky, but it would be more consistent with lock strategy interface if identifier was named resourceId, perhaps I even had this wrong originally.

this.lockStrategy = lockStrategy;
}

@Override
public void onStop(Route route) {
releaseLock();
}

@Override
public void onSuspend(Route route) {
releaseLock();
}

@Override
public synchronized void onExchangeBegin(Route route, Exchange exchange) {
if (lock != null) {
try {
lock.ensureAcquiredOrThrow("Lost lock");
return;
} catch (LostLockException e) {
log.warn("Lost lock w id: " + identifier + ", trying to reacquire...", e);
lock = null;
}
}

try {
lock = lockStrategy.tryAcquire(identifier);
} catch (LockNotAvailableException e) {
log.debug("Lock not available, assuming " +
"another thread is holding lock w/ id: " + identifier, e);
exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE);
}
}

private synchronized void releaseLock() {
if (lock == null) return;

try {
lock.close();
} catch (IOException e) {
log.warn("IOException trying to release lock w/ identifier " + identifier, e);
}

lock = null;
}
}