@@ -706,6 +706,9 @@ public void testWithOnlyMinSessionTimeout() throws Exception {
706706 assertEquals (maxSessionTimeOut , quorumPeer .getMaxSessionTimeout (), "maximumSessionTimeOut is wrong" );
707707 }
708708
709+ /**
710+ * Verify that failed txn in isolated leader got truncated after rejoining quorum.
711+ */
709712 @ Test
710713 public void testFailedTxnAsPartOfQuorumLoss () throws Exception {
711714 final int LEADER_TIMEOUT_MS = 10_000 ;
@@ -729,6 +732,8 @@ public void testFailedTxnAsPartOfQuorumLoss() throws Exception {
729732 // increase the tick time to delay the leader going to looking
730733 int previousTick = servers .mt [leader ].main .quorumPeer .tickTime ;
731734 servers .mt [leader ].main .quorumPeer .tickTime = LEADER_TIMEOUT_MS ;
735+ // isolate it from other quorum members by prevent it from rejoining
736+ servers .mt [leader ].getQuorumPeer ().setSuspended (true );
732737 // let the previous tick on the leader exhaust itself so the new tick time takes effect
733738 Thread .sleep (previousTick );
734739 LOG .warn ("LEADER {}" , leader );
@@ -739,34 +744,18 @@ public void testFailedTxnAsPartOfQuorumLoss() throws Exception {
739744 }
740745 }
741746
742- // 3. start up the followers to form a new quorum
743- for (int i = 0 ; i < SERVER_COUNT ; i ++) {
744- if (i != leader ) {
745- servers .mt [i ].start ();
746- }
747- }
748-
749- // 4. wait one of the follower to be the new leader
750- for (int i = 0 ; i < SERVER_COUNT ; i ++) {
751- if (i != leader ) {
752- // Recreate a client session since the previous session was not persisted.
753- servers .restartClient (i , this );
754- waitForOne (servers .zk [i ], States .CONNECTED );
755- }
756- }
757-
758- // 5. send a create request to old leader and make sure it's synced to disk,
747+ // 3. send a create request to old leader and make sure it's synced to disk,
759748 // which means it acked from itself
760749 try {
761750 servers .zk [leader ].create ("/zk" + leader , "zk" .getBytes (), Ids .OPEN_ACL_UNSAFE , CreateMode .PERSISTENT );
762751 fail ("create /zk" + leader + " should have failed" );
763- } catch (KeeperException e ) {
752+ } catch (KeeperException ignored ) {
764753 }
765754
766- // just make sure that we actually did get it in process at the
767- // leader
755+ // just make sure that we actually did get it in process at the leader
756+ //
768757 // there can be extra sessionClose proposals
769- assertTrue (outstanding .size () > 0 );
758+ assertFalse (outstanding .isEmpty () );
770759 Proposal p = findProposalOfType (outstanding , OpCode .create );
771760 LOG .info ("Old leader id: {}. All proposals: {}" , leader , outstanding );
772761 assertNotNull (p , "Old leader doesn't have 'create' proposal" );
@@ -782,36 +771,73 @@ public void testFailedTxnAsPartOfQuorumLoss() throws Exception {
782771 sleepTime += 100 ;
783772 }
784773
785- // 6. wait for the leader to quit due to not enough followers and come back up as a part of the new quorum
786- LOG .info ("Waiting for leader {} to timeout followers" , leader );
774+ // 4. start up the followers to form a new quorum
775+ for (int i = 0 ; i < SERVER_COUNT ; i ++) {
776+ if (i != leader ) {
777+ servers .mt [i ].start ();
778+ }
779+ }
780+
781+ // 5. wait one of the follower to be the new leader
782+ for (int i = 0 ; i < SERVER_COUNT ; i ++) {
783+ if (i != leader ) {
784+ // Recreate a new client session to avoid ConnectionLoss as connecting server is restarted.
785+ servers .restartClient (i , this );
786+ waitForOne (servers .zk [i ], States .CONNECTED );
787+ }
788+ }
789+
790+ // 6. make sure new quorum does not replicate the failed txn
791+ for (int i = 0 ; i < SERVER_COUNT ; i ++) {
792+ if (i == leader ) {
793+ continue ;
794+ }
795+ assertNull (servers .zk [i ].exists ("/zk" + leader , false ),
796+ "server " + i + " should not have /zk" + leader );
797+ }
798+
799+ // resume election to rejoin the cluster
800+ servers .mt [leader ].getQuorumPeer ().setSuspended (false );
801+
802+ // 7. wait for the leader to quit due to not enough followers and come back up as a part of the new quorum
803+ LOG .info ("Waiting for leader {} to timeout and rejoin as follower" , leader );
787804 sleepTime = 0 ;
788- Follower f = servers .mt [leader ].main .quorumPeer .follower ;
789- while (f == null || !f .isRunning ()) {
790- if (sleepTime > LEADER_TIMEOUT_MS * 2 ) {
791- fail ("Took too long for old leader to time out "
805+ while (servers .mt [leader ].getQuorumPeer ().getPeerState () != QuorumPeer .ServerState .FOLLOWING ) {
806+ if (sleepTime > LEADER_TIMEOUT_MS * 10 * 2 ) {
807+ fail ("Took too long for old leader to time out and rejoin "
792808 + servers .mt [leader ].main .quorumPeer .getPeerState ());
793809 }
794810 Thread .sleep (100 );
795811 sleepTime += 100 ;
796- f = servers .mt [leader ].main .quorumPeer .follower ;
797812 }
798813
799814 int newLeader = servers .findLeader ();
800815 // make sure a different leader was elected
801816 assertNotEquals (leader , newLeader );
802817
803- // 7. restart the previous leader to force it to replay the edits and possibly come up in a bad state
804- servers .mt [leader ].shutdown ();
805- servers .mt [leader ].start ();
806- // old client session can expire, restart it
818+ // Now, all preconditions meet. Let's verify that the failed txn got truncated in whole cluster.
819+
820+ boolean restarted = false ;
807821 servers .restartClient (leader , this );
808- waitForAll (servers , States .CONNECTED );
822+ waitForOne (servers .zk [leader ], States .CONNECTED );
823+ while (true ) {
824+ // 7. make sure everything is consistent, that is the failed txn got truncated in old leader.
825+ for (int i = 0 ; i < SERVER_COUNT ; i ++) {
826+ assertNull (servers .zk [i ].exists ("/zk" + leader , false ),
827+ "server " + i + " should not have /zk" + leader );
828+ }
809829
810- // 8. check the node exist in previous leader but not others
811- // make sure everything is consistent
812- for (int i = 0 ; i < SERVER_COUNT ; i ++) {
813- assertNull (servers .zk [i ].exists ("/zk" + leader , false ),
814- "server " + i + " should not have /zk" + leader );
830+ if (restarted ) {
831+ break ;
832+ }
833+
834+ // 8. make sure above holds after restart
835+ servers .mt [leader ].shutdown ();
836+ servers .mt [leader ].start ();
837+ // old client session can expire, restart it
838+ servers .restartClient (leader , this );
839+ waitForAll (servers , States .CONNECTED );
840+ restarted = true ;
815841 }
816842 }
817843
0 commit comments