Skip to content

Commit

Permalink
Added CountDownLatches to simulate Dual Leadership Issue seen in Lead…
Browse files Browse the repository at this point in the history
…erLatch Recipe
  • Loading branch information
viswanathan.rajagopa committed Oct 17, 2021
1 parent 31256d6 commit 6a78a3a
Showing 1 changed file with 92 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/**
Expand All @@ -69,6 +70,13 @@ public class LeaderLatch implements Closeable
private final ListenerContainer<LeaderLatchListener> listeners = new ListenerContainer<LeaderLatchListener>();
private final CloseMode closeMode;

public final CountDownLatch nonLeaderThreadWaitLatch = new CountDownLatch(1);

public final CountDownLatch leaderThread1WaitLatch = new CountDownLatch(1);
public final CountDownLatch leaderThread2WaitLatch = new CountDownLatch(1);
public final CountDownLatch leaderThread2WaitLatch1 = new CountDownLatch(1);
final AtomicInteger reconnectedCount = new AtomicInteger(0);

private final ConnectionStateListener listener = new ConnectionStateListener()
{
@Override
Expand Down Expand Up @@ -436,9 +444,51 @@ public boolean hasLeadership()
@VisibleForTesting
void reset() throws Exception
{
// Resetting leadership
setLeadership(false);

/**
* [Dual Leadership Issue] : Code injected for testing
*/
if(Thread.currentThread().getName().contains("leader-thread-2")
&& reconnectedCount.get() == 2){
// Releasing leader-thread-1 await() for "checking its leadership"
leaderThread1WaitLatch.countDown();
// Pausing leader-thread-2 before "deleting its latch node in zookeeper"
leaderThread2WaitLatch.await();
}

// Deleting its latch node in zookeeper
log.info("Deleting its latch node [" + ourPath.get() + "] in zookeeper");
setNode(null);

/**
* [Dual Leadership Issue] : Code injected for testing
*/
if(Thread.currentThread().getName().contains("leader-thread-2")
&& reconnectedCount.get() == 2){
// Releasing non-leader-thread await() for "creating its sequential latch node in zookeeper"
nonLeaderThreadWaitLatch.countDown();
}

/**
* [Dual Leadership Issue] : Code injected for testing
*/
if(Thread.currentThread().getName().contains("leader-thread-2")
&& reconnectedCount.get() == 2){
// Pausing leader-thread-2 before "creating its sequential latch node in zookeeper"
leaderThread2WaitLatch1.await();
}

/**
* [Dual Leadership Issue] : Code injected for testing
*/
String currentThreadName = Thread.currentThread().getName();
if(currentThreadName.contains("non-leader-thread")){
// Pausing non-leader-thread before "creating its sequential latch node in zookeeper"
nonLeaderThreadWaitLatch.await();
}

BackgroundCallback callback = new BackgroundCallback()
{
@Override
Expand All @@ -449,18 +499,27 @@ public void processResult(CuratorFramework client, CuratorEvent event) throws Ex
debugResetWaitLatch.await();
debugResetWaitLatch = null;
}

if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
{
setNode(event.getName());
if ( state.get() == State.CLOSED )
{
setNode(null);
}
else
{
else{
log.info("Node created successfully : " + event.getName());

/**
* [Dual Leadership Issue] : Code injected for testing
*/
if(Thread.currentThread().getName().contains("leader-thread-1") || (Thread.currentThread().getName().contains("Thread-1-EventThread") && reconnectedCount.get() >= 1)){
// Pausing leader-thread-1 before "checking its leadership"
leaderThread1WaitLatch.await();
}

getChildren();
}

}
else
{
Expand All @@ -484,6 +543,20 @@ private void checkLeadership(List<String> children) throws Exception
else if ( ourIndex == 0 )
{
setLeadership(true);
/**
* [Dual Leadership Issue] : Code injected for testing
*/
if(Thread.currentThread().getName().contains("leader-thread-1") || (Thread.currentThread().getName().contains("Thread-1-EventThread") && reconnectedCount.get() >= 1)){
// Releasing leader-thread-2 await() for "deleting its latch node in zookeeper"
leaderThread2WaitLatch.countDown();
}
/**
* [Dual Leadership Issue] : Code injected for testing
*/
if(Thread.currentThread().getName().contains("Thread-2-EventThread")){
// Releasing all remaining waiting threads
leaderThread2WaitLatch1.countDown();
}
}
else
{
Expand All @@ -497,6 +570,15 @@ public void process(WatchedEvent event)
{
try
{
log.info("Node Deleted Event : Previous node deleted");
/**
* [Dual Leadership Issue] : Code injected for testing
*/
if(Thread.currentThread().getName().contains("Thread-2-EventThread")){
// Pausing non-leader-thread (event-thread) before "creating its sequential latch node in zookeeper"
nonLeaderThreadWaitLatch.await();
Thread.sleep(1000);
}
getChildren();
}
catch ( Exception ex )
Expand Down Expand Up @@ -540,7 +622,12 @@ public void processResult(CuratorFramework client, CuratorEvent event) throws Ex
client.getChildren().inBackground(callback).forPath(latchPath);
}

private void handleStateChange(ConnectionState newState)
public void fireReconnectEvent(){
reconnectedCount.getAndIncrement();
handleStateChange(ConnectionState.RECONNECTED);
}

private void handleStateChange(ConnectionState newState)
{
switch ( newState )
{
Expand All @@ -554,6 +641,7 @@ private void handleStateChange(ConnectionState newState)
{
try
{
log.info("Received Reconnected Event, Reconnected count [" + reconnectedCount.get() + "] : Resetting leadership");
reset();
}
catch ( Exception e )
Expand Down

0 comments on commit 6a78a3a

Please sign in to comment.