Skip to content

Commit

Permalink
Update remote command tests to ensure commands are applied to all nodes.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Apr 26, 2015
1 parent 7fb4820 commit 057352e
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 60 deletions.
Expand Up @@ -235,7 +235,7 @@ public CompletableFuture<SyncResponse> sync(SyncRequest request) {
// Update the recycle index using the highest member's recycle index. // Update the recycle index using the highest member's recycle index.
request.members().stream() request.members().stream()
.mapToLong(RaftMember::recycleIndex).max() .mapToLong(RaftMember::recycleIndex).max()
.ifPresent(recycleIndex -> context.setRecycleIndex(Math.max(context.getRecycleIndex(), recycleIndex))); .ifPresent(recycleIndex -> context.setRecycleIndex(Math.min(Math.max(context.getRecycleIndex(), recycleIndex), context.getCommitIndex())));


// Reply with the updated vector clock. // Reply with the updated vector clock.
return CompletableFuture.completedFuture(logResponse(SyncResponse.builder() return CompletableFuture.completedFuture(logResponse(SyncResponse.builder()
Expand Down
22 changes: 22 additions & 0 deletions raft/src/main/java/net/kuujo/copycat/protocol/raft/RaftMember.java
Expand Up @@ -129,6 +129,17 @@ public long commitIndex() {
return commitIndex; return commitIndex;
} }


/**
* Sets the member index.
*
* @param index The member's commit index.
* @return The member info.
*/
RaftMember commitIndex(long index) {
this.commitIndex = index;
return this;
}

/** /**
* Returns the member's recycle index. * Returns the member's recycle index.
* *
Expand All @@ -138,6 +149,17 @@ public long recycleIndex() {
return recycleIndex; return recycleIndex;
} }


/**
* Sets the member's recycle index.
*
* @param index The member's recycle index.
* @return The member info.
*/
RaftMember recycleIndex(long index) {
this.recycleIndex = index;
return this;
}

/** /**
* Updates the member info. * Updates the member info.
* *
Expand Down
Expand Up @@ -197,6 +197,7 @@ long getTerm() {
*/ */
RaftProtocol setVersion(long version) { RaftProtocol setVersion(long version) {
this.version = Math.max(this.version, version); this.version = Math.max(this.version, version);
getRaftMember(cluster.member().id()).version(this.version);
return this; return this;
} }


Expand All @@ -209,15 +210,6 @@ long getVersion() {
return version; return version;
} }


/**
* Returns whether the context is recovering.
*
* @return Indicates whether the context is currently recovering.
*/
boolean isRecovering() {
return recovering;
}

/** /**
* Sets the state last voted for candidate. * Sets the state last voted for candidate.
* *
Expand Down Expand Up @@ -272,6 +264,7 @@ RaftProtocol setCommitIndex(long commitIndex) {
} }
} }
this.commitIndex = commitIndex; this.commitIndex = commitIndex;
getRaftMember(cluster.member().id()).commitIndex(commitIndex);
return this; return this;
} }


Expand All @@ -296,6 +289,7 @@ RaftProtocol setRecycleIndex(long recycleIndex) {
if (recycleIndex < this.recycleIndex) if (recycleIndex < this.recycleIndex)
throw new IllegalArgumentException("cannot decrease recycle index"); throw new IllegalArgumentException("cannot decrease recycle index");
this.recycleIndex = recycleIndex; this.recycleIndex = recycleIndex;
getRaftMember(cluster.member().id()).recycleIndex(recycleIndex);
return this; return this;
} }


Expand Down
Expand Up @@ -1176,58 +1176,62 @@ public void testPartialActivePartialPassiveDefaultDefaultCommandOnPassive() thro
/** /**
* Tests a command on a remote node. * Tests a command on a remote node.
*/ */
public void testCommandOnRemote(Persistence persistence, Consistency consistency) throws Throwable { public void testCommandOnRemote(int activeNodes, int passiveNodes, Persistence persistence, Consistency consistency) throws Throwable {
RaftTestMemberRegistry registry = new RaftTestMemberRegistry(); RaftTestMemberRegistry registry = new RaftTestMemberRegistry();


RaftTestCluster cluster1 = buildCluster(1, Member.Type.ACTIVE, 3, registry);
RaftTestCluster cluster2 = buildCluster(2, Member.Type.ACTIVE, 3, registry);
RaftTestCluster cluster3 = buildCluster(3, Member.Type.ACTIVE, 3, registry);

RaftProtocol protocol1 = buildProtocol(1, cluster1);
RaftProtocol protocol2 = buildProtocol(2, cluster2);
RaftProtocol protocol3 = buildProtocol(3, cluster3);

CommitHandler commitHandler = (key, entry, result) -> { CommitHandler commitHandler = (key, entry, result) -> {
threadAssertEquals(key.readLong(), Long.valueOf(1234)); threadAssertEquals(key.readLong(), Long.valueOf(1234));
threadAssertEquals(entry.readLong(), Long.valueOf(4321)); threadAssertEquals(entry.readLong(), Long.valueOf(4321));
resume();
return result.writeLong(5678); return result.writeLong(5678);
}; };


protocol1.commitHandler(commitHandler); Function<RaftProtocol, EventListener<Event>> createListener = protocol -> {
protocol2.commitHandler(commitHandler); return new EventListener<Event>() {
protocol3.commitHandler(commitHandler); @Override
public void accept(Event event) {
if (event instanceof LeaderChangeEvent && ((LeaderChangeEvent) event).newLeader() != null) {
protocol.removeListener(this);
resume();
}
}
};
};


expectResumes(4); expectResumes(activeNodes * 2);


AtomicInteger electionCount = new AtomicInteger(); for (int i = 1; i <= activeNodes; i++) {
EventListener<Event> listener = event -> { RaftTestCluster cluster = buildCluster(i, Member.Type.ACTIVE, activeNodes, registry);
if (event instanceof LeaderChangeEvent && ((LeaderChangeEvent) event).newLeader() != null && electionCount.incrementAndGet() == 3) { RaftProtocol protocol = buildProtocol(i, cluster);
resume(); protocol.commitHandler(commitHandler).addListener(createListener.apply(protocol));
} protocol.open().thenRun(this::resume);
}; }


protocol1.addListener(listener); await();
protocol2.addListener(listener);
protocol3.addListener(listener);


protocol1.open().thenRun(this::resume); expectResumes(passiveNodes * 2);
protocol2.open().thenRun(this::resume);
protocol3.open().thenRun(this::resume); for (int i = activeNodes + 1; i <= activeNodes + passiveNodes; i++) {
RaftTestCluster cluster = buildCluster(i, Member.Type.PASSIVE, activeNodes + 1, registry);
RaftProtocol protocol = buildProtocol(i, cluster);
protocol.commitHandler(commitHandler).addListener(createListener.apply(protocol));
protocol.open().thenRun(this::resume);
}


await(); await();


RaftTestCluster cluster4 = buildCluster(4, Member.Type.REMOTE, 4, registry); RaftTestCluster cluster = buildCluster(activeNodes + passiveNodes + 1, Member.Type.REMOTE, 4, registry);
RaftProtocol protocol4 = buildProtocol(4, cluster4); RaftProtocol protocol = buildProtocol(activeNodes + passiveNodes + 1, cluster);


expectResume(); expectResume();


protocol4.open().thenRun(this::resume); protocol.open().thenRun(this::resume);


await(); await();


expectResume(); expectResumes(activeNodes + passiveNodes + 1);


protocol4.submit(HeapBuffer.allocate(8).writeLong(1234).flip(), HeapBuffer.allocate(8).writeLong(4321).flip()).thenAccept(result -> { protocol.submit(HeapBuffer.allocate(8).writeLong(1234).flip(), HeapBuffer.allocate(8).writeLong(4321).flip(), persistence, consistency).thenAccept(result -> {
threadAssertEquals(result.readLong(), Long.valueOf(5678)); threadAssertEquals(result.readLong(), Long.valueOf(5678));
resume(); resume();
}); });
Expand All @@ -1236,83 +1240,83 @@ public void testCommandOnRemote(Persistence persistence, Consistency consistency
} }


public void testPersistentConsistentCommandOnRemote() throws Throwable { public void testPersistentConsistentCommandOnRemote() throws Throwable {
testCommandOnRemote(Persistence.PERSISTENT, Consistency.STRICT); testCommandOnRemote(3, 3, Persistence.PERSISTENT, Consistency.STRICT);
} }


public void testDurableConsistentCommandOnRemote() throws Throwable { public void testDurableConsistentCommandOnRemote() throws Throwable {
testCommandOnRemote(Persistence.DURABLE, Consistency.STRICT); testCommandOnRemote(3, 3, Persistence.DURABLE, Consistency.STRICT);
} }


public void testEphemeralConsistentCommandOnRemote() throws Throwable { public void testEphemeralConsistentCommandOnRemote() throws Throwable {
testCommandOnRemote(Persistence.EPHEMERAL, Consistency.STRICT); testCommandOnRemote(3, 3, Persistence.EPHEMERAL, Consistency.STRICT);
} }


public void testTransientConsistentCommandOnRemote() throws Throwable { public void testTransientConsistentCommandOnRemote() throws Throwable {
testCommandOnRemote(Persistence.NONE, Consistency.STRICT); testCommandOnRemote(3, 3, Persistence.NONE, Consistency.STRICT);
} }


public void testDefaultConsistentCommandOnRemote() throws Throwable { public void testDefaultConsistentCommandOnRemote() throws Throwable {
testCommandOnRemote(Persistence.DEFAULT, Consistency.STRICT); testCommandOnRemote(3, 3, Persistence.DEFAULT, Consistency.STRICT);
} }


public void testPersistentLeaseCommandOnRemote() throws Throwable { public void testPersistentLeaseCommandOnRemote() throws Throwable {
testCommandOnRemote(Persistence.PERSISTENT, Consistency.LEASE); testCommandOnRemote(3, 3, Persistence.PERSISTENT, Consistency.LEASE);
} }


public void testDurableLeaseCommandOnRemote() throws Throwable { public void testDurableLeaseCommandOnRemote() throws Throwable {
testCommandOnRemote(Persistence.DURABLE, Consistency.LEASE); testCommandOnRemote(3, 3, Persistence.DURABLE, Consistency.LEASE);
} }


public void testEphemeralLeaseCommandOnRemote() throws Throwable { public void testEphemeralLeaseCommandOnRemote() throws Throwable {
testCommandOnRemote(Persistence.EPHEMERAL, Consistency.LEASE); testCommandOnRemote(3, 3, Persistence.EPHEMERAL, Consistency.LEASE);
} }


public void testTransientLeaseCommandOnRemote() throws Throwable { public void testTransientLeaseCommandOnRemote() throws Throwable {
testCommandOnRemote(Persistence.NONE, Consistency.LEASE); testCommandOnRemote(3, 3, Persistence.NONE, Consistency.LEASE);
} }


public void testDefaultLeaseCommandOnRemote() throws Throwable { public void testDefaultLeaseCommandOnRemote() throws Throwable {
testCommandOnRemote(Persistence.DEFAULT, Consistency.LEASE); testCommandOnRemote(3, 3, Persistence.DEFAULT, Consistency.LEASE);
} }


public void testPersistentEventualCommandOnRemote() throws Throwable { public void testPersistentEventualCommandOnRemote() throws Throwable {
testCommandOnRemote(Persistence.PERSISTENT, Consistency.EVENTUAL); testCommandOnRemote(3, 3, Persistence.PERSISTENT, Consistency.EVENTUAL);
} }


public void testDurableEventualCommandOnRemote() throws Throwable { public void testDurableEventualCommandOnRemote() throws Throwable {
testCommandOnRemote(Persistence.DURABLE, Consistency.EVENTUAL); testCommandOnRemote(3, 3, Persistence.DURABLE, Consistency.EVENTUAL);
} }


public void testEphemeralEventualCommandOnRemote() throws Throwable { public void testEphemeralEventualCommandOnRemote() throws Throwable {
testCommandOnRemote(Persistence.EPHEMERAL, Consistency.EVENTUAL); testCommandOnRemote(3, 3, Persistence.EPHEMERAL, Consistency.EVENTUAL);
} }


public void testTransientEventualCommandOnRemote() throws Throwable { public void testTransientEventualCommandOnRemote() throws Throwable {
testCommandOnRemote(Persistence.NONE, Consistency.EVENTUAL); testCommandOnRemote(3, 3, Persistence.NONE, Consistency.EVENTUAL);
} }


public void testDefaultEventualCommandOnRemote() throws Throwable { public void testDefaultEventualCommandOnRemote() throws Throwable {
testCommandOnRemote(Persistence.DEFAULT, Consistency.EVENTUAL); testCommandOnRemote(3, 3, Persistence.DEFAULT, Consistency.EVENTUAL);
} }


public void testPersistentDefaultCommandOnRemote() throws Throwable { public void testPersistentDefaultCommandOnRemote() throws Throwable {
testCommandOnRemote(Persistence.PERSISTENT, Consistency.DEFAULT); testCommandOnRemote(3, 3, Persistence.PERSISTENT, Consistency.DEFAULT);
} }


public void testDurableDefaultCommandOnRemote() throws Throwable { public void testDurableDefaultCommandOnRemote() throws Throwable {
testCommandOnRemote(Persistence.DURABLE, Consistency.DEFAULT); testCommandOnRemote(3, 3, Persistence.DURABLE, Consistency.DEFAULT);
} }


public void testEphemeralDefaultCommandOnRemote() throws Throwable { public void testEphemeralDefaultCommandOnRemote() throws Throwable {
testCommandOnRemote(Persistence.EPHEMERAL, Consistency.DEFAULT); testCommandOnRemote(3, 3, Persistence.EPHEMERAL, Consistency.DEFAULT);
} }


public void testTransientDefaultCommandOnRemote() throws Throwable { public void testTransientDefaultCommandOnRemote() throws Throwable {
testCommandOnRemote(Persistence.NONE, Consistency.DEFAULT); testCommandOnRemote(3, 3, Persistence.NONE, Consistency.DEFAULT);
} }


public void testDefaultDefaultCommandOnRemote() throws Throwable { public void testDefaultDefaultCommandOnRemote() throws Throwable {
testCommandOnRemote(Persistence.DEFAULT, Consistency.DEFAULT); testCommandOnRemote(3, 3, Persistence.DEFAULT, Consistency.DEFAULT);
} }


/** /**
Expand Down

0 comments on commit 057352e

Please sign in to comment.