From 4161cab5817b2784b8a07ca4ff19773b9958d9c5 Mon Sep 17 00:00:00 2001 From: Arthur Jenoudet Date: Thu, 24 Jun 2021 15:05:29 -0400 Subject: [PATCH] Update Apache Ratis API to 2.0.0 ### What changes are proposed in this pull request? Updating the Apache Ratis API from v1.0.0 to v2.0.0 ### Why are the changes needed? This upgrade will allow Alluxio to use the new TransferLeadership functionality in Ratis 2.0.0. This will allow Alluxio masters to gracefully give up their leadership manually. ### Does this PR introduce any user facing changes? No Fixes #13680 pr-link: Alluxio/alluxio#13689 change-id: cid-77eb53a5d48a69d08866a8d484741f6d0243b5e6 --- .../master/journal/MasterJournalContext.java | 2 +- .../journal/raft/JournalStateMachine.java | 7 ++- .../journal/raft/LocalFirstRaftClient.java | 20 +++++--- .../journal/raft/RaftJournalSystem.java | 46 ++++++++++++------- .../journal/raft/SnapshotDownloader.java | 2 +- .../master/journal/raft/RaftJournalTest.java | 33 ++++++++----- .../journal/raft/RaftJournalWriterTest.java | 16 +++++-- .../raft/SnapshotReplicationManagerTest.java | 15 +++--- .../journal/tool/RaftJournalDumper.java | 21 ++++----- pom.xml | 2 +- .../alluxio/client/cli/JournalToolTest.java | 7 +-- .../raft/EmbeddedJournalIntegrationTest.java | 10 ++-- 12 files changed, 112 insertions(+), 69 deletions(-) diff --git a/core/server/common/src/main/java/alluxio/master/journal/MasterJournalContext.java b/core/server/common/src/main/java/alluxio/master/journal/MasterJournalContext.java index 36904f2734cb..a25a391133b6 100644 --- a/core/server/common/src/main/java/alluxio/master/journal/MasterJournalContext.java +++ b/core/server/common/src/main/java/alluxio/master/journal/MasterJournalContext.java @@ -23,7 +23,7 @@ import com.google.common.base.Preconditions; import io.grpc.Status; -import org.apache.ratis.protocol.NotLeaderException; +import org.apache.ratis.protocol.exceptions.NotLeaderException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/core/server/common/src/main/java/alluxio/master/journal/raft/JournalStateMachine.java b/core/server/common/src/main/java/alluxio/master/journal/raft/JournalStateMachine.java index 1f636ac82bd9..d292f790f1de 100644 --- a/core/server/common/src/main/java/alluxio/master/journal/raft/JournalStateMachine.java +++ b/core/server/common/src/main/java/alluxio/master/journal/raft/JournalStateMachine.java @@ -40,7 +40,6 @@ import org.apache.ratis.protocol.RaftGroupMemberId; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServer; -import org.apache.ratis.server.impl.RaftServerProxy; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.raftlog.RaftLog; import org.apache.ratis.server.storage.RaftStorage; @@ -295,14 +294,14 @@ public void notifyNotLeader(Collection pendingEntries) { } @Override - public void notifyIndexUpdate(long term, long index) { - super.notifyIndexUpdate(term, index); + public void notifyTermIndexUpdated(long term, long index) { + super.notifyTermIndexUpdated(term, index); CompletableFuture.runAsync(mJournalSystem::updateGroup); } private long getNextIndex() { try { - return ((RaftServerProxy) mServer).getImpl(mRaftGroupId).getState().getLog().getNextIndex(); + return mServer.getDivision(mRaftGroupId).getRaftLog().getNextIndex(); } catch (IOException e) { throw new IllegalStateException("Cannot obtain raft log index", e); } diff --git a/core/server/common/src/main/java/alluxio/master/journal/raft/LocalFirstRaftClient.java b/core/server/common/src/main/java/alluxio/master/journal/raft/LocalFirstRaftClient.java index a25a5809ead4..f26b706428c2 100644 --- a/core/server/common/src/main/java/alluxio/master/journal/raft/LocalFirstRaftClient.java +++ b/core/server/common/src/main/java/alluxio/master/journal/raft/LocalFirstRaftClient.java @@ -16,12 +16,12 @@ import alluxio.util.LogUtils; import org.apache.ratis.client.RaftClient; -import org.apache.ratis.protocol.AlreadyClosedException; import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.Message; -import org.apache.ratis.protocol.NotLeaderException; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.protocol.exceptions.AlreadyClosedException; +import org.apache.ratis.protocol.exceptions.NotLeaderException; import org.apache.ratis.server.RaftServer; import org.apache.ratis.util.TimeDuration; import org.slf4j.Logger; @@ -84,9 +84,17 @@ public CompletableFuture sendAsync(Message message, private CompletableFuture sendLocalRequest(Message message, TimeDuration timeout) throws IOException { LOG.trace("Sending local message {}", message); - return mServer.submitClientRequestAsync( - new RaftClientRequest(mLocalClientId, null, RaftJournalSystem.RAFT_GROUP_ID, - RaftJournalSystem.nextCallId(), message, RaftClientRequest.writeRequestType(), null)) + // ClientId, ServerId, and GroupId must not be null + RaftClientRequest request = RaftClientRequest.newBuilder() + .setClientId(mLocalClientId) + .setServerId(mServer.getId()) + .setGroupId(RaftJournalSystem.RAFT_GROUP_ID) + .setCallId(RaftJournalSystem.nextCallId()) + .setMessage(message) + .setType(RaftClientRequest.writeRequestType()) + .setSlidingWindowEntry(null) + .build(); + return mServer.submitClientRequestAsync(request) .thenApply(reply -> handleLocalException(message, reply, timeout)); } @@ -130,7 +138,7 @@ private void handleRemoteException(Throwable t) { private CompletableFuture sendRemoteRequest(Message message) { ensureClient(); LOG.trace("Sending remote message {}", message); - return mClient.sendAsync(message).exceptionally(t -> { + return mClient.async().send(message).exceptionally(t -> { handleRemoteException(t); throw new CompletionException(t.getCause()); }); diff --git a/core/server/common/src/main/java/alluxio/master/journal/raft/RaftJournalSystem.java b/core/server/common/src/main/java/alluxio/master/journal/raft/RaftJournalSystem.java index 0659bb1f0c2c..694b30398992 100644 --- a/core/server/common/src/main/java/alluxio/master/journal/raft/RaftJournalSystem.java +++ b/core/server/common/src/main/java/alluxio/master/journal/raft/RaftJournalSystem.java @@ -49,7 +49,6 @@ import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.GroupInfoReply; import org.apache.ratis.protocol.GroupInfoRequest; -import org.apache.ratis.protocol.LeaderNotReadyException; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftClientRequest; @@ -58,6 +57,7 @@ import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.protocol.SetConfigurationRequest; +import org.apache.ratis.protocol.exceptions.LeaderNotReadyException; import org.apache.ratis.retry.ExponentialBackoffRetry; import org.apache.ratis.retry.RetryPolicy; import org.apache.ratis.rpc.SupportedRpcType; @@ -691,7 +691,11 @@ public synchronized void startInternal() throws InterruptedException, IOExceptio mPeerId = RaftJournalUtils.getPeerId(localAddress); List addresses = mConf.getClusterAddresses(); Set peers = addresses.stream() - .map(addr -> new RaftPeer(RaftJournalUtils.getPeerId(addr), addr)) + .map(addr -> RaftPeer.newBuilder() + .setId(RaftJournalUtils.getPeerId(addr)) + .setAddress(addr) + .build() + ) .collect(Collectors.toSet()); mRaftGroup = RaftGroup.valueOf(RAFT_GROUP_ID, peers); initServer(); @@ -721,7 +725,7 @@ private void joinQuorum() { .setRpcPort(localAddress.getPort())) .build(); RaftClient client = createClient(); - client.sendReadOnlyAsync(Message.valueOf( + client.async().sendReadOnly(Message.valueOf( UnsafeByteOperations.unsafeWrap( JournalQueryRequest .newBuilder() @@ -809,16 +813,23 @@ public synchronized List getQuorumServerInfoList() throws IOEx public synchronized CompletableFuture sendMessageAsync( RaftPeerId server, Message message) { RaftClient client = createClient(); - return client.getClientRpc().sendRequestAsync( - new RaftClientRequest(mRawClientId, server, RAFT_GROUP_ID, nextCallId(), message, - RaftClientRequest.staleReadRequestType(0), null) - ).whenComplete((reply, t) -> { - try { - client.close(); - } catch (IOException e) { - throw new CompletionException(e); - } - }); + RaftClientRequest request = RaftClientRequest.newBuilder() + .setClientId(mRawClientId) + .setServerId(server) + .setGroupId(RAFT_GROUP_ID) + .setCallId(nextCallId()) + .setMessage(message) + .setType(RaftClientRequest.staleReadRequestType(0)) + .setSlidingWindowEntry(null) + .build(); + return client.getClientRpc().sendRequestAsync(request) + .whenComplete((reply, t) -> { + try { + client.close(); + } catch (IOException e) { + throw new CompletionException(e); + } + }); } private GroupInfoReply getGroupInfo() throws IOException { @@ -850,9 +861,9 @@ public synchronized void removeQuorumServer(NetAddress serverNetAddress) throws RaftPeerId peerId = RaftJournalUtils.getPeerId(serverAddress); try (RaftClient client = createClient()) { Collection peers = mServer.getGroups().iterator().next().getPeers(); - RaftClientReply reply = client.setConfiguration(peers.stream() + RaftClientReply reply = client.admin().setConfiguration(peers.stream() .filter(peer -> !peer.getId().equals(peerId)) - .toArray(RaftPeer[]::new)); + .collect(Collectors.toList())); if (reply.getException() != null) { throw reply.getException(); } @@ -873,7 +884,10 @@ public synchronized void addQuorumServer(NetAddress serverNetAddress) throws IOE if (peers.stream().anyMatch((peer) -> peer.getId().equals(peerId))) { return; } - RaftPeer newPeer = new RaftPeer(peerId, serverAddress); + RaftPeer newPeer = RaftPeer.newBuilder() + .setId(peerId) + .setAddress(serverAddress) + .build(); List newPeers = new ArrayList<>(peers); newPeers.add(newPeer); RaftClientReply reply = mServer.setConfiguration( diff --git a/core/server/common/src/main/java/alluxio/master/journal/raft/SnapshotDownloader.java b/core/server/common/src/main/java/alluxio/master/journal/raft/SnapshotDownloader.java index 7b644f739211..a632aaef38c9 100644 --- a/core/server/common/src/main/java/alluxio/master/journal/raft/SnapshotDownloader.java +++ b/core/server/common/src/main/java/alluxio/master/journal/raft/SnapshotDownloader.java @@ -123,7 +123,7 @@ private void cleanup() { } private void onNextInternal(R response) throws IOException { - TermIndex termIndex = TermIndex.newTermIndex( + TermIndex termIndex = TermIndex.valueOf( mDataGetter.apply(response).getSnapshotTerm(), mDataGetter.apply(response).getSnapshotIndex()); if (mTermIndex == null) { diff --git a/core/server/common/src/test/java/alluxio/master/journal/raft/RaftJournalTest.java b/core/server/common/src/test/java/alluxio/master/journal/raft/RaftJournalTest.java index da59e3d68459..04ff93537e06 100644 --- a/core/server/common/src/test/java/alluxio/master/journal/raft/RaftJournalTest.java +++ b/core/server/common/src/test/java/alluxio/master/journal/raft/RaftJournalTest.java @@ -25,8 +25,7 @@ import alluxio.util.network.NetworkAddressUtils; import com.google.common.annotations.VisibleForTesting; -import org.apache.ratis.server.impl.RaftServerImpl; -import org.apache.ratis.server.impl.RaftServerProxy; +import org.apache.ratis.server.RaftServer; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -530,21 +529,33 @@ private List startJournalCluster(List jour @VisibleForTesting void changeToCandidate(RaftJournalSystem journalSystem) throws Exception { - RaftServerImpl serverImpl = ((RaftServerProxy) journalSystem.getRaftServer()).getImpl( - RaftJournalSystem.RAFT_GROUP_ID); - Method method = serverImpl.getClass().getDeclaredMethod("changeToCandidate"); + RaftServer.Division serverImpl = journalSystem.getRaftServer() + .getDivision(RaftJournalSystem.RAFT_GROUP_ID); + Class raftServerImpl = (Class.forName("org.apache.ratis.server.impl.RaftServerImpl")); + Method method = raftServerImpl.getDeclaredMethod("changeToCandidate", boolean.class); method.setAccessible(true); - method.invoke(serverImpl); + method.invoke(serverImpl, false); } @VisibleForTesting void changeToFollower(RaftJournalSystem journalSystem) throws Exception { - RaftServerImpl serverImpl = ((RaftServerProxy) journalSystem.getRaftServer()).getImpl( - RaftJournalSystem.RAFT_GROUP_ID); - Method method = serverImpl.getClass().getDeclaredMethod("changeToFollower", + RaftServer.Division serverImplObj = journalSystem.getRaftServer() + .getDivision(RaftJournalSystem.RAFT_GROUP_ID); + Class raftServerImplClass = Class.forName("org.apache.ratis.server.impl.RaftServerImpl"); + + Method getStateMethod = raftServerImplClass.getDeclaredMethod("getState"); + getStateMethod.setAccessible(true); + Object serverStateObj = getStateMethod.invoke(serverImplObj); + Class serverStateClass = Class.forName("org.apache.ratis.server.impl.ServerState"); + Method getCurrentTermMethod = serverStateClass.getDeclaredMethod("getCurrentTerm"); + getCurrentTermMethod.setAccessible(true); + long currentTermObj = (long) getCurrentTermMethod.invoke(serverStateObj); + + Method changeToFollowerMethod = raftServerImplClass.getDeclaredMethod("changeToFollower", long.class, boolean.class, Object.class); - method.setAccessible(true); - method.invoke(serverImpl, serverImpl.getState().getCurrentTerm(), true, "test"); + + changeToFollowerMethod.setAccessible(true); + changeToFollowerMethod.invoke(serverImplObj, currentTermObj, true, "test"); } /** diff --git a/core/server/common/src/test/java/alluxio/master/journal/raft/RaftJournalWriterTest.java b/core/server/common/src/test/java/alluxio/master/journal/raft/RaftJournalWriterTest.java index 8ae234b1b660..379b5bdf9468 100644 --- a/core/server/common/src/test/java/alluxio/master/journal/raft/RaftJournalWriterTest.java +++ b/core/server/common/src/test/java/alluxio/master/journal/raft/RaftJournalWriterTest.java @@ -52,10 +52,18 @@ public void after() throws Exception { private void setupRaftJournalWriter() throws IOException { mClient = mock(LocalFirstRaftClient.class); - RaftClientReply reply = new RaftClientReply(ClientId.randomId(), - RaftGroupMemberId.valueOf(RaftJournalUtils.getPeerId(new InetSocketAddress(1)), - RaftGroupId.valueOf(UUID.fromString("02511d47-d67c-49a3-9011-abb3109a44c1"))), - 1L, true, Message.valueOf("mp"), null, 1L, null); + RaftClientReply reply = RaftClientReply.newBuilder() + .setClientId(ClientId.randomId()) + .setServerId( + RaftGroupMemberId.valueOf(RaftJournalUtils.getPeerId(new InetSocketAddress(1)), + RaftGroupId.valueOf(UUID.fromString("02511d47-d67c-49a3-9011-abb3109a44c1"))) + ).setCallId(1L) + .setSuccess(true) + .setMessage(Message.valueOf("mp")) + .setException(null) + .setLogIndex(1L) + .setCommitInfos(null) + .build(); CompletableFuture future = new CompletableFuture() { @Override diff --git a/core/server/common/src/test/java/alluxio/master/journal/raft/SnapshotReplicationManagerTest.java b/core/server/common/src/test/java/alluxio/master/journal/raft/SnapshotReplicationManagerTest.java index 1461557cb32e..1a93c548b471 100644 --- a/core/server/common/src/test/java/alluxio/master/journal/raft/SnapshotReplicationManagerTest.java +++ b/core/server/common/src/test/java/alluxio/master/journal/raft/SnapshotReplicationManagerTest.java @@ -32,11 +32,11 @@ import org.apache.commons.io.FileUtils; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientReply; -import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.protocol.RaftPeerId; -import org.apache.ratis.server.impl.RaftServerConstants; +import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.RaftStorage; +import org.apache.ratis.server.storage.RaftStorageImpl; import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo; import org.junit.After; @@ -85,8 +85,9 @@ public void before() throws Exception { JournalQueryRequest queryRequest = JournalQueryRequest.parseFrom( message.getContent().asReadOnlyByteBuffer()); Message response = mFollowerSnapshotManager.handleRequest(queryRequest); - return CompletableFuture.completedFuture( - new RaftClientReply(Mockito.mock(RaftClientRequest.class), response, null)); + RaftClientReply reply = Mockito.mock(RaftClientReply.class); + Mockito.when(reply.getMessage()).thenReturn(response); + return CompletableFuture.completedFuture(reply); }); mLeaderStore = getSimpleStateMachineStorage(); mLeaderSnapshotManager = new SnapshotReplicationManager(mLeader, mLeaderStore); @@ -118,8 +119,8 @@ public void before() throws Exception { } private SimpleStateMachineStorage getSimpleStateMachineStorage() throws IOException { - RaftStorage rs = new RaftStorage(mFolder.newFolder(CommonUtils.randomAlphaNumString(6)), - RaftServerConstants.StartupOption.REGULAR); + RaftStorage rs = new RaftStorageImpl(mFolder.newFolder(CommonUtils.randomAlphaNumString(6)), + RaftServerConfigKeys.Log.CorruptionPolicy.getDefault()); SimpleStateMachineStorage snapshotStore = new SimpleStateMachineStorage(); snapshotStore.init(rs); return snapshotStore; @@ -134,7 +135,7 @@ private void createSnapshotFile(SimpleStateMachineStorage storage) throws IOExce private void validateSnapshotFile(SimpleStateMachineStorage storage) throws IOException { SingleFileSnapshotInfo snapshot = storage.getLatestSnapshot(); Assert.assertNotNull(snapshot); - Assert.assertEquals(TermIndex.newTermIndex(0, 1), snapshot.getTermIndex()); + Assert.assertEquals(TermIndex.valueOf(0, 1), snapshot.getTermIndex()); byte[] received = FileUtils.readFileToByteArray(snapshot.getFiles().get(0).getPath().toFile()); Assert.assertTrue(BufferUtils.equalIncreasingByteArray(SNAPSHOT_SIZE, received)); } diff --git a/core/server/master/src/main/java/alluxio/master/journal/tool/RaftJournalDumper.java b/core/server/master/src/main/java/alluxio/master/journal/tool/RaftJournalDumper.java index dc42384e7932..46506fb5ea1f 100644 --- a/core/server/master/src/main/java/alluxio/master/journal/tool/RaftJournalDumper.java +++ b/core/server/master/src/main/java/alluxio/master/journal/tool/RaftJournalDumper.java @@ -20,10 +20,10 @@ import com.google.common.base.Preconditions; import org.apache.ratis.server.RaftServerConfigKeys; -import org.apache.ratis.server.impl.RaftServerConstants; import org.apache.ratis.server.raftlog.segmented.LogSegment; +import org.apache.ratis.server.raftlog.segmented.LogSegmentPath; import org.apache.ratis.server.storage.RaftStorage; -import org.apache.ratis.server.storage.RaftStorageDirectory; +import org.apache.ratis.server.storage.RaftStorageImpl; import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo; import org.slf4j.Logger; @@ -83,14 +83,13 @@ private void readRatisLogFromDir() { try ( PrintStream out = new PrintStream(new BufferedOutputStream(new FileOutputStream(mJournalEntryFile))); - RaftStorage storage = new RaftStorage(getJournalDir(), - RaftServerConstants.StartupOption.REGULAR)) { - List paths = - storage.getStorageDir().getLogSegmentFiles(); - for (RaftStorageDirectory.LogPathAndIndex path : paths) { + RaftStorage storage = new RaftStorageImpl(getJournalDir(), + RaftServerConfigKeys.Log.CorruptionPolicy.getDefault())) { + List paths = LogSegmentPath.getLogSegmentPaths(storage); + for (LogSegmentPath path : paths) { final int entryCount = LogSegment.readSegmentFile(path.getPath().toFile(), - path.getStartIndex(), path.getEndIndex(), path.isOpen(), - RaftServerConfigKeys.Log.CorruptionPolicy.EXCEPTION, null, (proto) -> { + path.getStartEnd(), RaftServerConfigKeys.Log.CorruptionPolicy.EXCEPTION, + null, (proto) -> { if (proto.hasStateMachineLogEntry()) { try { Journal.JournalEntry entry = Journal.JournalEntry.parseFrom( @@ -114,8 +113,8 @@ private File getJournalDir() { } private void readRatisSnapshotFromDir() throws IOException { - try (RaftStorage storage = new RaftStorage(getJournalDir(), - RaftServerConstants.StartupOption.REGULAR)) { + try (RaftStorage storage = new RaftStorageImpl(getJournalDir(), + RaftServerConfigKeys.Log.CorruptionPolicy.getDefault())) { SimpleStateMachineStorage stateMachineStorage = new SimpleStateMachineStorage(); stateMachineStorage.init(storage); SingleFileSnapshotInfo currentSnapshot = stateMachineStorage.getLatestSnapshot(); diff --git a/pom.xml b/pom.xml index e3a50bd79c7f..274219403256 100644 --- a/pom.xml +++ b/pom.xml @@ -150,7 +150,7 @@ 1.11.0 3.12.4 UTF-8 - 1.0.0 + 2.0.0 1.7.30 2.11.1 1.0.0 diff --git a/tests/src/test/java/alluxio/client/cli/JournalToolTest.java b/tests/src/test/java/alluxio/client/cli/JournalToolTest.java index 2b3fbc9eed69..a3be49a3a114 100644 --- a/tests/src/test/java/alluxio/client/cli/JournalToolTest.java +++ b/tests/src/test/java/alluxio/client/cli/JournalToolTest.java @@ -39,8 +39,9 @@ import alluxio.testutils.LocalAlluxioClusterResource; import alluxio.util.io.PathUtils; -import org.apache.ratis.server.impl.RaftServerConstants; +import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.storage.RaftStorage; +import org.apache.ratis.server.storage.RaftStorageImpl; import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo; import org.hamcrest.Matchers; @@ -217,10 +218,10 @@ private void checkpointEmbeddedJournal() throws Throwable { } private long getCurrentRatisSnapshotIndex(String journalFolder) throws Throwable { - try (RaftStorage storage = new RaftStorage( + try (RaftStorage storage = new RaftStorageImpl( new File(RaftJournalUtils.getRaftJournalDir(new File(journalFolder)), RaftJournalSystem.RAFT_GROUP_UUID.toString()), - RaftServerConstants.StartupOption.REGULAR)) { + RaftServerConfigKeys.Log.CorruptionPolicy.getDefault())) { SimpleStateMachineStorage stateMachineStorage = new SimpleStateMachineStorage(); stateMachineStorage.init(storage); SingleFileSnapshotInfo snapshot = stateMachineStorage.getLatestSnapshot(); diff --git a/tests/src/test/java/alluxio/server/ft/journal/raft/EmbeddedJournalIntegrationTest.java b/tests/src/test/java/alluxio/server/ft/journal/raft/EmbeddedJournalIntegrationTest.java index 057277d7c27d..0b1189cb3325 100644 --- a/tests/src/test/java/alluxio/server/ft/journal/raft/EmbeddedJournalIntegrationTest.java +++ b/tests/src/test/java/alluxio/server/ft/journal/raft/EmbeddedJournalIntegrationTest.java @@ -42,8 +42,8 @@ import org.apache.commons.io.FileUtils; import org.apache.ratis.protocol.Message; -import org.apache.ratis.server.impl.RaftServerConstants; -import org.apache.ratis.server.storage.RaftStorage; +import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.storage.RaftStorageImpl; import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo; import org.junit.After; @@ -139,7 +139,8 @@ public void copySnapshotToMaster() throws Exception { mCluster.stopMasters(); SimpleStateMachineStorage storage = new SimpleStateMachineStorage(); - storage.init(new RaftStorage(raftDir, RaftServerConstants.StartupOption.REGULAR)); + storage.init(new RaftStorageImpl(raftDir, + RaftServerConfigKeys.Log.CorruptionPolicy.getDefault())); SingleFileSnapshotInfo snapshot = storage.findLatestSnapshot(); assertNotNull(snapshot); mCluster.notifySuccess(); @@ -178,7 +179,8 @@ public void copySnapshotToFollower() throws Exception { waitForSnapshot(raftDir); mCluster.stopMaster(catchUpMasterIndex); SimpleStateMachineStorage storage = new SimpleStateMachineStorage(); - storage.init(new RaftStorage(raftDir, RaftServerConstants.StartupOption.REGULAR)); + storage.init(new RaftStorageImpl(raftDir, + RaftServerConfigKeys.Log.CorruptionPolicy.getDefault())); SingleFileSnapshotInfo snapshot = storage.findLatestSnapshot(); assertNotNull(snapshot); mCluster.notifySuccess();