Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Adding configuration to optionally ignore lock acquire failures and c…
Browse files Browse the repository at this point in the history
…ontinue execution.
  • Loading branch information
kishorebanala committed Nov 1, 2019
1 parent 77d4365 commit 4b1f310
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 17 deletions.
Expand Up @@ -70,6 +70,9 @@ public interface Configuration {
String LOCKING_SERVER_PROPERTY_NAME = "locking.server";
String LOCKING_SERVER_DEFAULT_VALUE = "noop_lock";

String IGNORE_LOCKING_EXCEPTIONS_PROPERTY_NAME = "locking.exceptions.ignore";
boolean IGNORE_LOCKING_EXCEPTIONS_DEFAULT_VALUE = false;

//TODO add constants for input/output external payload related properties.

default DB getDB() {
Expand All @@ -88,6 +91,10 @@ default String getLockingServerString() {
return getProperty(LOCKING_SERVER_PROPERTY_NAME, LOCKING_SERVER_DEFAULT_VALUE).toUpperCase();
}

default boolean ignoreLockingExceptions() {
return getBooleanProperty(IGNORE_LOCKING_EXCEPTIONS_PROPERTY_NAME, IGNORE_LOCKING_EXCEPTIONS_DEFAULT_VALUE);
}

/**
* @return time frequency in seconds, at which the workflow sweeper should run to evaluate running workflows.
*/
Expand Down
Expand Up @@ -2,6 +2,7 @@

import com.google.inject.Inject;
import com.google.inject.ProvisionException;
import com.netflix.conductor.core.config.Configuration;
import com.netflix.conductor.core.utils.Lock;
import com.netflix.conductor.locking.redis.config.RedisLockConfiguration;
import com.netflix.conductor.metrics.Monitors;
Expand All @@ -20,13 +21,15 @@ public class RedisLock implements Lock {

private static final Logger LOGGER = LoggerFactory.getLogger(RedisLock.class);

private Config config;
private RedisLockConfiguration configuration;
private Config redisConfig;
private RedissonClient redisson;
private final int connectionTimeout = 10000;
private static String LOCK_NAMESPACE = "";

@Inject
public RedisLock(RedisLockConfiguration configuration) {
this.configuration = configuration;
LOCK_NAMESPACE = configuration.getProperty("decider.locking.namespace", "");
RedisLockConfiguration.REDIS_SERVER_TYPE redisServerType = configuration.getRedisServerType();
try {
Expand All @@ -39,24 +42,24 @@ public RedisLock(RedisLockConfiguration configuration) {
}
String redisServerAddress = configuration.getRedisServerAddress();

config = new Config();
redisConfig = new Config();

switch (redisServerType) {
case SINGLE:
config.useSingleServer().setAddress(redisServerAddress).setTimeout(connectionTimeout);
redisConfig.useSingleServer().setAddress(redisServerAddress).setTimeout(connectionTimeout);
break;
case CLUSTER:
config.useClusterServers()
redisConfig.useClusterServers()
.setScanInterval(2000) // cluster state scan interval in milliseconds
.addNodeAddress(redisServerAddress.split(","))
.setTimeout(connectionTimeout);
break;
case SENTINEL:
config.useSentinelServers().addSentinelAddress(redisServerAddress).setTimeout(connectionTimeout);
redisConfig.useSentinelServers().addSentinelAddress(redisServerAddress).setTimeout(connectionTimeout);
break;
}

redisson = Redisson.create(config);
redisson = Redisson.create(redisConfig);
}

@Override
Expand All @@ -71,10 +74,8 @@ public boolean acquireLock(String lockId, long timeToTry, TimeUnit unit) {
try {
return lock.tryLock(timeToTry, unit);
} catch (Exception e) {
LOGGER.error("Failed in acquireLock: ", e);
Monitors.recordAcquireLockFailure(e.getClass().getName());
return handleAcquireLockFailure(lockId, e);
}
return false;
}

/**
Expand All @@ -91,10 +92,8 @@ public boolean acquireLock(String lockId, long timeToTry, long leaseTime, TimeUn
try {
return lock.tryLock(timeToTry, leaseTime, unit);
} catch (Exception e) {
LOGGER.error("Failed in acquireLock: ", e);
Monitors.recordAcquireLockFailure(e.getClass().getName());
return handleAcquireLockFailure(lockId, e);
}
return false;
}

@Override
Expand All @@ -118,4 +117,13 @@ private String parseLockId(String lockId) {
}
return LOCK_NAMESPACE + "." + lockId;
}

private boolean handleAcquireLockFailure(String lockId, Exception e) {
LOGGER.error("Failed to acquireLock for lockId: {}", lockId, e);
Monitors.recordAcquireLockFailure(e.getClass().getName());
// A Valid failure to acquire lock when another thread has acquired it returns false.
// However, when an exception is thrown while acquiring lock, due to connection or others issues,
// we can optionally continue without a "lock" to not block executions until Locking service is available.
return configuration.ignoreLockingExceptions();
}
}
Expand Up @@ -173,16 +173,16 @@ public void testLockingDuplicateThreads() throws InterruptedException {
public void testDuplicateLockAcquireFailure() throws InterruptedException {
redisson.getKeys().flushall();
String lockId = "abcd-1234";

boolean isLocked = redisLock.acquireLock(lockId, 500L, 60000L, TimeUnit.MILLISECONDS);
Worker worker1 = new Worker(redisLock, lockId);
Worker worker1 = new Worker(redisLock, lockId, 100L, 60000L);

worker1.start();
worker1.join();

boolean isLocked = redisLock.acquireLock(lockId, 500L, 1000L, TimeUnit.MILLISECONDS);

// Ensure only one of them had got the lock.
assertTrue(isLocked);
assertFalse(worker1.isLocked);
assertFalse(isLocked);
assertTrue(worker1.isLocked);
}

@Test
Expand Down

0 comments on commit 4b1f310

Please sign in to comment.