RATIS-1473. Implement takeSnapshot in Server #567
@szetszwo I implemented the code on the server side and wrote a UT. Could you help review the code? Thx!
@codings-dan , thanks for working on this. It is hard to figure out all the details.
We should trigger the state machine updater to take the snapshot rather than use another thread. Otherwise, the threads will race with each other. Some comments are inlined. Other suggestions can be found in https://issues.apache.org/jira/secure/attachment/13037601/567_review.patch
  CompletableFuture<RaftClientReply> takeSnapshotAsync(
      SnapshotRequest request) throws IOException;
}
Please do not change the protocol yet. Let's discuss how to change it. We may want to add a snapshotManagementAsync(..) instead of individual snapshot methods.
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class Snapshot {
Let's call it SnapshotRequestHandler.
  public void enableSnapshot() {
    if(!takeSnapshot) {
      takeSnapshot = true;
      if(shouldTakeSnapshot()) {
        takeSnapshot();
      }
    }
  }
This method may race with the updater thread. The idea is to trigger the updater thread to take the snapshot rather than take it from another thread.
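The idea in this comment can be illustrated with a minimal, self-contained sketch. It is not the Ratis implementation: the class `UpdaterSnapshotSketch` and its methods `requestSnapshot()` and `updaterStep()` are hypothetical names made up for illustration. A request thread only records that a snapshot is wanted; the single updater thread notices the request in its own loop and does the work, so snapshot state is never touched from two threads. A second request made while one is pending simply shares the pending future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch, not the Ratis implementation. */
class UpdaterSnapshotSketch {
  // At most one pending request; later callers share the same future.
  private final AtomicReference<CompletableFuture<Long>> pending = new AtomicReference<>();
  // Read and written only by the updater thread.
  private long appliedIndex = 0;

  /** Called from any request thread: records the request and returns immediately. */
  public CompletableFuture<Long> requestSnapshot() {
    final CompletableFuture<Long> fresh = new CompletableFuture<>();
    // Keep the pending future if there is one; otherwise install the fresh one.
    return pending.updateAndGet(p -> p != null ? p : fresh);
  }

  /** One iteration of the updater loop; only the updater thread calls this. */
  public void updaterStep() {
    appliedIndex++; // stands in for applying one log entry
    final CompletableFuture<Long> request = pending.getAndSet(null);
    if (request != null) {
      // Stands in for taking the snapshot at appliedIndex on the updater thread.
      request.complete(appliedIndex);
    }
  }

  public static void main(String[] args) {
    final UpdaterSnapshotSketch updater = new UpdaterSnapshotSketch();
    final CompletableFuture<Long> first = updater.requestSnapshot();
    final CompletableFuture<Long> second = updater.requestSnapshot();
    System.out.println("same future: " + (first == second));
    updater.updaterStep();
    System.out.println("snapshot index: " + first.join());
  }
}
```

In this sketch the request path does no snapshot work at all, which is what removes the race with the updater thread.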
@@ -133,6 +134,37 @@ public void tearDown() {
    }
  }

  @Test
  public void testTakeSnapshot() throws Exception {
Let's create a new test class since there are already many tests here.
      if (e != null) {
        replyFuture.completeExceptionally(e);
      } else {
        replyFuture.complete(r.isSuccess()? server.newSuccessReply(request)
            : server.newExceptionReply(request, r.getException()));
      }
    });
    return replyFuture;
Just return previous.getReplyFuture().
      return CompletableFuture.completedFuture(newExceptionReply(request,new RaftException(msg)));
    }
    setFinishSnapshot(false);
    state.getStateMachineUpdater().enableSnapshot();
This is not async.
@szetszwo Thanks for reviewing the code. I changed the code according to the comments you left; PTAL again. In addition, you essentially helped refactor the code logic, and I benefited a lot. Thanks again.
@codings-dan , thanks for the update. Some quick comments below and inlined.
- Please remove all the unused code changes and all the unrelated changes.
- Since we have not implemented RPCs in this JIRA, we should not add the RPC test subclasses.
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.statemachine.RaftTakeSnapshotTest;

public class TestRaftTakeSnapshotWithHadoopRpc extends RaftTakeSnapshotTest {
Please remove rpc test. We will add it with the rpc change later.
  public MiniRaftCluster.Factory<?> getFactory() {
    return null;
  }
Please revert this unrelated change.
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.statemachine.RaftTakeSnapshotTest;

public class TestRaftTakeSnapshotWithGrpc extends RaftTakeSnapshotTest {
Please remove rpc test. We will add it with the rpc change later.
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.statemachine.RaftTakeSnapshotTest;

public class TestRaftTakeSnapshotWithNetty extends RaftTakeSnapshotTest {
Please remove rpc test. We will add it with the rpc change later.
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.statemachine.RaftTakeSnapshotTest;

public class TestRaftTakeSnapshotWithSimulatedRpc extends RaftTakeSnapshotTest{
Please remove rpc test. We will add it with the rpc change later.
  public RaftClientReply takeSnapshot(SnapshotRequest request) throws IOException {
    return waitForReply(request, takeSnapshotAsync(request));
  }
Remove takeSnapshot(..) since it is not used.
  StateMachineUpdater getStateMachineUpdater() {
    return stateMachineUpdater;
  }
Remove getStateMachineUpdater() since it is not used.
@@ -253,7 +260,6 @@ private void checkAndTakeSnapshot(MemoizedSupplier<List<CompletableFuture<Messag
    if (futures.isInitialized()) {
      JavaUtils.allOf(futures.get()).get();
    }

Please revert this unrelated change.
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public abstract class RaftTakeSnapshotTest extends BaseTest {
Rename it to SnapshotManagementTest. We will add other new tests later.
  private MiniRaftCluster cluster;

  public MiniRaftCluster.Factory<?> getFactory() {
    return null;
Return MiniRaftClusterWithSimulatedRpc.FACTORY for now so that we can remove all the subclasses.
The CI test seems to be unstable, and the reported error has nothing to do with this code change. PTAL, thx!
@codings-dan , thanks for the update. Sorry that I did not check the test in detail previously. Some comments are inlined on the test; see SnapshotManagementTest.java and TestSnapshotManagementWithSimulatedRpc.java in https://issues.apache.org/jira/secure/attachment/13037699/567_review2.patch
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public abstract class SnapshotManagementTest extends BaseTest {
Since the class is abstract, we actually need to add a subclass TestSnapshotManagementWithSimulatedRpc. Otherwise, the test won't be executed. Sorry that I asked you to remove the SimulatedRpc subclass before.
  }

  static final Logger LOG = LoggerFactory.getLogger(SnapshotManagementTest.class);
  private MiniRaftCluster cluster;
Let's declare the class as
public abstract class SnapshotManagementTest<CLUSTER extends MiniRaftCluster>
extends BaseTest
implements MiniRaftCluster.Factory.Get<CLUSTER> {
Then, it will take care of the cluster start and shutdown.
    }
    final SnapshotRequest r = new SnapshotRequest(client.getId(), leaderId, cluster.getGroupId(),
        CallId.getAndIncrement(), 3000);
    RaftServerTestUtil.takeSnapshotAsync(leader, r);
We should get the future and wait for it.
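A minimal sketch of what "get the future and wait for it" could look like in a test. Everything here is a stand-in: the nested `Reply` class, `takeSnapshotAsync(long)`, and `awaitSnapshotIndex(..)` are hypothetical names for illustration, not the Ratis API. The point is only that the test keeps the returned `CompletableFuture`, waits on it with a timeout, checks for success, and reads the log index from the reply.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch, not the Ratis test code. */
class AwaitSnapshotReplySketch {
  /** Hypothetical stand-in for the reply type. */
  static final class Reply {
    final boolean success;
    final long logIndex;

    Reply(boolean success, long logIndex) {
      this.success = success;
      this.logIndex = logIndex;
    }
  }

  /** Hypothetical stand-in for an async snapshot call; completes on another thread. */
  static CompletableFuture<Reply> takeSnapshotAsync(long index) {
    return CompletableFuture.supplyAsync(() -> new Reply(true, index));
  }

  /** Waits for the reply with a timeout and returns the snapshot log index. */
  static long awaitSnapshotIndex(CompletableFuture<Reply> future, long timeoutMs) {
    final Reply reply;
    try {
      reply = future.get(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (Exception e) {
      // Covers interruption, timeout, and exceptional completion.
      throw new IllegalStateException("snapshot request did not complete", e);
    }
    if (!reply.success) {
      throw new IllegalStateException("snapshot request failed");
    }
    return reply.logIndex;
  }

  public static void main(String[] args) {
    final CompletableFuture<Reply> future = takeSnapshotAsync(42);
    System.out.println("snapshot index = " + awaitSnapshotIndex(future, 3000));
  }
}
```

Dropping the future, as in the snippet under review, leaves the test with no way to tell when the snapshot has finished or at which index it was taken.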
    long nextIndex = cluster.getLeader().getRaftLog().getNextIndex();
    LOG.info("nextIndex = {}", nextIndex);
We should get the log index from the reply.
    final List<File> snapshotFiles = LongStream.range(0, nextIndex)
        .mapToObj(j ->
            SimpleStateMachine4Testing
                .get(cluster.getLeader())
                .getStateMachineStorage()
                .getSnapshotFile(cluster.getLeader().getInfo().getCurrentTerm(), j))
        .collect(Collectors.toList());
    JavaUtils.attemptRepeatedly(() -> {
      Assert.assertTrue(snapshotFiles.stream().anyMatch(File::exists));
      return null;
    }, 100, ONE_SECOND, "snapshotFile.exist", LOG);
Once we have the snapshot index, we can check only the expected file.
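The difference between the two checks can be sketched as follows. This is an illustration under stated assumptions, not the test from the patch: the file naming in `snapshotFile(..)` and the helpers `touch(..)` and `snapshotExists(..)` are hypothetical. Instead of building one candidate file per index in `[0, nextIndex)` and accepting any match, the test builds the single file for the known term and snapshot index and asserts on exactly that file.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;

/** Hypothetical sketch, not the Ratis test code. */
class ExpectedSnapshotFileSketch {
  /** Hypothetical naming scheme, for illustration only. */
  static File snapshotFile(File dir, long term, long index) {
    return new File(dir, "snapshot." + term + "_" + index);
  }

  /** Creates an empty file; stands in for the state machine writing a snapshot. */
  static void touch(File file) {
    try {
      Files.createFile(file.toPath());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Checks exactly one file: the one for the given term and snapshot index. */
  static boolean snapshotExists(File dir, long term, long index) {
    return snapshotFile(dir, term, index).exists();
  }

  public static void main(String[] args) throws IOException {
    final File dir = Files.createTempDirectory("snapshot-sketch").toFile();
    final long term = 1;
    final long snapshotIndex = 7; // in a real test this would come from the reply
    touch(snapshotFile(dir, term, snapshotIndex));
    System.out.println("expected file exists: " + snapshotExists(dir, term, snapshotIndex));
    System.out.println("other index exists: " + snapshotExists(dir, term, snapshotIndex - 1));
  }
}
```

Checking one expected file makes the assertion fail when a snapshot is taken at the wrong index, which an any-match check over the whole index range would not catch.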
Good change; the unit test is more accurate.
+1 the change looks good.
@szetszwo After this patch is merged, should we do this next:
The tasks look good. We should also add some unit tests for the shell commands.
What changes were proposed in this pull request?
Subtask of "Support snapshot command": implement takeSnapshot in Server.
What is the link to the Apache JIRA?
https://issues.apache.org/jira/browse/RATIS-1473
How was this patch tested?
UT