Skip to content

Commit

Permalink
Merge pull request #66 from smecsia/master
Browse files Browse the repository at this point in the history
Hazelcast locking retrials every 5 seconds
  • Loading branch information
smecsia committed Jul 13, 2015
2 parents 78a1119 + 8d92a63 commit 6b51df8
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,20 @@
*/
public class BuildersFactoryWithHazelcastImpl extends BuildersFactoryImpl {

protected long lockWaitHeartBeatSec = 5;
protected final HazelcastInstance hazelcastInstance;

public BuildersFactoryWithHazelcastImpl(HazelcastInstance hazelcastInstance) {
this.hazelcastInstance = hazelcastInstance;
}

public void setLockWaitHeartBeatSec(long lockWaitHeartBeatSec) {
this.lockWaitHeartBeatSec = lockWaitHeartBeatSec;
}

@Override
public AggregationRepositoryBuilder newRepositoryBuilder(CamelContext camelContext) throws Exception {
return new HazelcastAggregationRepositoryBuilder(hazelcastInstance, camelContext, getWaitForLockSec());
return new HazelcastAggregationRepositoryBuilder(hazelcastInstance, camelContext, getWaitForLockSec(), lockWaitHeartBeatSec);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ public class HazelcastAggregationRepositoryBuilder extends MemoryAggregationRepo

private final HazelcastInstance hazelcastInstance;
private final long waitForLockSec;
private final long lockWaitHeartBeatSec;

public HazelcastAggregationRepositoryBuilder(HazelcastInstance hazelcastInstance, CamelContext camelContext,
long waitForLockSec) {
long waitForLockSec, long lockWaitHeartBeatSec) {
super(camelContext, waitForLockSec);
this.hazelcastInstance = hazelcastInstance;
this.waitForLockSec = waitForLockSec;
this.lockWaitHeartBeatSec = lockWaitHeartBeatSec;
}

/**
Expand All @@ -35,6 +37,7 @@ public AggregationRepository initWritable(Plugin plugin) throws Exception {
aggregationRepository.setRepository(plugin.getId());
aggregationRepository.setHazelcastInstance(hazelcastInstance);
aggregationRepository.setWaitForLockSec(waitForLockSec);
aggregationRepository.setLockWaitHeartbeatSec(lockWaitHeartBeatSec);
aggregationRepository.doStart();
return aggregationRepository;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
import java.util.concurrent.TimeUnit;

import static java.lang.String.format;
import static java.lang.System.currentTimeMillis;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import static ru.yandex.qatools.camelot.util.DateUtil.isTimePassedSince;
import static ru.yandex.qatools.camelot.util.ExceptionUtil.formatStackTrace;

public class HazelcastAggregationRepository extends ServiceSupport implements AggregationRepository,
Expand All @@ -34,6 +37,7 @@ public class HazelcastAggregationRepository extends ServiceSupport implements Ag
private String repository;
private IMap<String, DefaultExchangeHolder> map;

private long lockWaitHeartbeatSec = 5; // 5 seconds between lock trials
private long waitForLockSec = MINUTES.toSeconds(5);

@Override
Expand Down Expand Up @@ -69,7 +73,7 @@ public Exchange add(CamelContext camelContext, String key, Exchange exchange) {
public Exchange get(CamelContext camelContext, String key) {
try {
debug("Getting from context. map.get('{}')...", key);
if (map.tryLock(key, waitForLockSec, TimeUnit.SECONDS)) {
if (tryLock(key)) {
return toExchange(camelContext, map.get(key));
}
} catch (QuorumException e) {
Expand Down Expand Up @@ -110,7 +114,7 @@ public Exchange getWithoutLock(CamelContext camelContext, String key) {
public void lock(String key) {
try {
debug("Locking key map.tryLock('{}')...", key);
if (!map.tryLock(key, waitForLockSec, TimeUnit.SECONDS)) {
if (!tryLock(key)) {
throw new RuntimeException("Failed to lock within timeout of " + waitForLockSec + "s");
}
} catch (Exception e) {
Expand All @@ -128,6 +132,16 @@ public void unlock(String key) {
}
}

private boolean tryLock(String key) throws InterruptedException {
long startedTime = currentTimeMillis();
boolean timeout = false;
while (!map.tryLock(key, lockWaitHeartbeatSec, TimeUnit.SECONDS) && !timeout) {
debug("Lock is still not available, waiting for key {}...", key);
timeout = isTimePassedSince(SECONDS.toMillis(waitForLockSec), startedTime);
}
return !timeout;
}

@Override
public void confirm(CamelContext camelContext, String key) {
forceUnlockKey(key);
Expand All @@ -154,6 +168,10 @@ public void setHazelcastInstance(HazelcastInstance hazelcastInstance) {
this.hazelcastInstance = hazelcastInstance;
}

public void setLockWaitHeartbeatSec(long lockWaitHeartbeat) {
this.lockWaitHeartbeatSec = lockWaitHeartbeat;
}

public void setWaitForLockSec(long waitForLockSec) {
this.waitForLockSec = waitForLockSec;
}
Expand Down
1 change: 1 addition & 0 deletions camelot-core/src/main/resources/camelot-hz-context.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
<bean id="FACTORY-HZ" class="ru.yandex.qatools.camelot.core.builders.BuildersFactoryWithHazelcastImpl">
<constructor-arg name="hazelcastInstance" ref="hazelcastInstance"/>
<property name="waitForLockSec" value="${camelot.hazelcast.waitForLockSec}"/>
<property name="lockWaitHeartBeatSec" value="${camelot.hazelcast.lockWaitHeartBeatSec}"/>
</bean>

</beans>
1 change: 1 addition & 0 deletions camelot-core/src/main/resources/camelot.properties
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ plugins.remote.repositories=
camelot.enableListeners=true
camelot.pluginLoader=LOADER-MAVEN
camelot.hazelcast.waitForLockSec=300
camelot.hazelcast.lockWaitHeartBeatSec=5
camelot.mem.waitForLockSec=300
camelot.aggregation.optimisticLocking=true
camelot.factory=FACTORY-MEM
Expand Down

0 comments on commit 6b51df8

Please sign in to comment.