Skip to content

Commit

Permalink
Update Apache Ratis API to 2.0.0
Browse files Browse the repository at this point in the history
### What changes are proposed in this pull request?
Updating the Apache Ratis API from v1.0.0 to v2.0.0

### Why are the changes needed?
This upgrade will allow Alluxio to use the new TransferLeadership
functionality in Ratis 2.0.0. This will allow Alluxio masters to
gracefully give up their leadership manually.

### Does this PR introduce any user facing changes?
No

Fixes #13680

pr-link: #13689
change-id: cid-77eb53a5d48a69d08866a8d484741f6d0243b5e6
  • Loading branch information
jenoudet committed Jun 24, 2021
1 parent a9f8980 commit 4161cab
Show file tree
Hide file tree
Showing 12 changed files with 112 additions and 69 deletions.
Expand Up @@ -23,7 +23,7 @@

import com.google.common.base.Preconditions;
import io.grpc.Status;
import org.apache.ratis.protocol.NotLeaderException;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Expand Up @@ -40,7 +40,6 @@
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.storage.RaftStorage;
Expand Down Expand Up @@ -295,14 +294,14 @@ public void notifyNotLeader(Collection<TransactionContext> pendingEntries) {
}

@Override
public void notifyIndexUpdate(long term, long index) {
super.notifyIndexUpdate(term, index);
public void notifyTermIndexUpdated(long term, long index) {
super.notifyTermIndexUpdated(term, index);
CompletableFuture.runAsync(mJournalSystem::updateGroup);
}

private long getNextIndex() {
try {
return ((RaftServerProxy) mServer).getImpl(mRaftGroupId).getState().getLog().getNextIndex();
return mServer.getDivision(mRaftGroupId).getRaftLog().getNextIndex();
} catch (IOException e) {
throw new IllegalStateException("Cannot obtain raft log index", e);
}
Expand Down
Expand Up @@ -16,12 +16,12 @@
import alluxio.util.LogUtils;

import org.apache.ratis.client.RaftClient;
import org.apache.ratis.protocol.AlreadyClosedException;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.NotLeaderException;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
Expand Down Expand Up @@ -84,9 +84,17 @@ public CompletableFuture<RaftClientReply> sendAsync(Message message,
private CompletableFuture<RaftClientReply> sendLocalRequest(Message message,
TimeDuration timeout) throws IOException {
LOG.trace("Sending local message {}", message);
return mServer.submitClientRequestAsync(
new RaftClientRequest(mLocalClientId, null, RaftJournalSystem.RAFT_GROUP_ID,
RaftJournalSystem.nextCallId(), message, RaftClientRequest.writeRequestType(), null))
// ClientId, ServerId, and GroupId must not be null
RaftClientRequest request = RaftClientRequest.newBuilder()
.setClientId(mLocalClientId)
.setServerId(mServer.getId())
.setGroupId(RaftJournalSystem.RAFT_GROUP_ID)
.setCallId(RaftJournalSystem.nextCallId())
.setMessage(message)
.setType(RaftClientRequest.writeRequestType())
.setSlidingWindowEntry(null)
.build();
return mServer.submitClientRequestAsync(request)
.thenApply(reply -> handleLocalException(message, reply, timeout));
}

Expand Down Expand Up @@ -130,7 +138,7 @@ private void handleRemoteException(Throwable t) {
private CompletableFuture<RaftClientReply> sendRemoteRequest(Message message) {
ensureClient();
LOG.trace("Sending remote message {}", message);
return mClient.sendAsync(message).exceptionally(t -> {
return mClient.async().send(message).exceptionally(t -> {
handleRemoteException(t);
throw new CompletionException(t.getCause());
});
Expand Down
Expand Up @@ -49,7 +49,6 @@
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.GroupInfoReply;
import org.apache.ratis.protocol.GroupInfoRequest;
import org.apache.ratis.protocol.LeaderNotReadyException;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
Expand All @@ -58,6 +57,7 @@
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.SetConfigurationRequest;
import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
import org.apache.ratis.retry.ExponentialBackoffRetry;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.rpc.SupportedRpcType;
Expand Down Expand Up @@ -691,7 +691,11 @@ public synchronized void startInternal() throws InterruptedException, IOExceptio
mPeerId = RaftJournalUtils.getPeerId(localAddress);
List<InetSocketAddress> addresses = mConf.getClusterAddresses();
Set<RaftPeer> peers = addresses.stream()
.map(addr -> new RaftPeer(RaftJournalUtils.getPeerId(addr), addr))
.map(addr -> RaftPeer.newBuilder()
.setId(RaftJournalUtils.getPeerId(addr))
.setAddress(addr)
.build()
)
.collect(Collectors.toSet());
mRaftGroup = RaftGroup.valueOf(RAFT_GROUP_ID, peers);
initServer();
Expand Down Expand Up @@ -721,7 +725,7 @@ private void joinQuorum() {
.setRpcPort(localAddress.getPort()))
.build();
RaftClient client = createClient();
client.sendReadOnlyAsync(Message.valueOf(
client.async().sendReadOnly(Message.valueOf(
UnsafeByteOperations.unsafeWrap(
JournalQueryRequest
.newBuilder()
Expand Down Expand Up @@ -809,16 +813,23 @@ public synchronized List<QuorumServerInfo> getQuorumServerInfoList() throws IOEx
public synchronized CompletableFuture<RaftClientReply> sendMessageAsync(
RaftPeerId server, Message message) {
RaftClient client = createClient();
return client.getClientRpc().sendRequestAsync(
new RaftClientRequest(mRawClientId, server, RAFT_GROUP_ID, nextCallId(), message,
RaftClientRequest.staleReadRequestType(0), null)
).whenComplete((reply, t) -> {
try {
client.close();
} catch (IOException e) {
throw new CompletionException(e);
}
});
RaftClientRequest request = RaftClientRequest.newBuilder()
.setClientId(mRawClientId)
.setServerId(server)
.setGroupId(RAFT_GROUP_ID)
.setCallId(nextCallId())
.setMessage(message)
.setType(RaftClientRequest.staleReadRequestType(0))
.setSlidingWindowEntry(null)
.build();
return client.getClientRpc().sendRequestAsync(request)
.whenComplete((reply, t) -> {
try {
client.close();
} catch (IOException e) {
throw new CompletionException(e);
}
});
}

private GroupInfoReply getGroupInfo() throws IOException {
Expand Down Expand Up @@ -850,9 +861,9 @@ public synchronized void removeQuorumServer(NetAddress serverNetAddress) throws
RaftPeerId peerId = RaftJournalUtils.getPeerId(serverAddress);
try (RaftClient client = createClient()) {
Collection<RaftPeer> peers = mServer.getGroups().iterator().next().getPeers();
RaftClientReply reply = client.setConfiguration(peers.stream()
RaftClientReply reply = client.admin().setConfiguration(peers.stream()
.filter(peer -> !peer.getId().equals(peerId))
.toArray(RaftPeer[]::new));
.collect(Collectors.toList()));
if (reply.getException() != null) {
throw reply.getException();
}
Expand All @@ -873,7 +884,10 @@ public synchronized void addQuorumServer(NetAddress serverNetAddress) throws IOE
if (peers.stream().anyMatch((peer) -> peer.getId().equals(peerId))) {
return;
}
RaftPeer newPeer = new RaftPeer(peerId, serverAddress);
RaftPeer newPeer = RaftPeer.newBuilder()
.setId(peerId)
.setAddress(serverAddress)
.build();
List<RaftPeer> newPeers = new ArrayList<>(peers);
newPeers.add(newPeer);
RaftClientReply reply = mServer.setConfiguration(
Expand Down
Expand Up @@ -123,7 +123,7 @@ private void cleanup() {
}

private void onNextInternal(R response) throws IOException {
TermIndex termIndex = TermIndex.newTermIndex(
TermIndex termIndex = TermIndex.valueOf(
mDataGetter.apply(response).getSnapshotTerm(),
mDataGetter.apply(response).getSnapshotIndex());
if (mTermIndex == null) {
Expand Down
Expand Up @@ -25,8 +25,7 @@
import alluxio.util.network.NetworkAddressUtils;

import com.google.common.annotations.VisibleForTesting;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.server.RaftServer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -530,21 +529,33 @@ private List<RaftJournalSystem> startJournalCluster(List<RaftJournalSystem> jour

@VisibleForTesting
void changeToCandidate(RaftJournalSystem journalSystem) throws Exception {
RaftServerImpl serverImpl = ((RaftServerProxy) journalSystem.getRaftServer()).getImpl(
RaftJournalSystem.RAFT_GROUP_ID);
Method method = serverImpl.getClass().getDeclaredMethod("changeToCandidate");
RaftServer.Division serverImpl = journalSystem.getRaftServer()
.getDivision(RaftJournalSystem.RAFT_GROUP_ID);
Class<?> raftServerImpl = (Class.forName("org.apache.ratis.server.impl.RaftServerImpl"));
Method method = raftServerImpl.getDeclaredMethod("changeToCandidate", boolean.class);
method.setAccessible(true);
method.invoke(serverImpl);
method.invoke(serverImpl, false);
}

@VisibleForTesting
void changeToFollower(RaftJournalSystem journalSystem) throws Exception {
RaftServerImpl serverImpl = ((RaftServerProxy) journalSystem.getRaftServer()).getImpl(
RaftJournalSystem.RAFT_GROUP_ID);
Method method = serverImpl.getClass().getDeclaredMethod("changeToFollower",
RaftServer.Division serverImplObj = journalSystem.getRaftServer()
.getDivision(RaftJournalSystem.RAFT_GROUP_ID);
Class<?> raftServerImplClass = Class.forName("org.apache.ratis.server.impl.RaftServerImpl");

Method getStateMethod = raftServerImplClass.getDeclaredMethod("getState");
getStateMethod.setAccessible(true);
Object serverStateObj = getStateMethod.invoke(serverImplObj);
Class<?> serverStateClass = Class.forName("org.apache.ratis.server.impl.ServerState");
Method getCurrentTermMethod = serverStateClass.getDeclaredMethod("getCurrentTerm");
getCurrentTermMethod.setAccessible(true);
long currentTermObj = (long) getCurrentTermMethod.invoke(serverStateObj);

Method changeToFollowerMethod = raftServerImplClass.getDeclaredMethod("changeToFollower",
long.class, boolean.class, Object.class);
method.setAccessible(true);
method.invoke(serverImpl, serverImpl.getState().getCurrentTerm(), true, "test");

changeToFollowerMethod.setAccessible(true);
changeToFollowerMethod.invoke(serverImplObj, currentTermObj, true, "test");
}

/**
Expand Down
Expand Up @@ -52,10 +52,18 @@ public void after() throws Exception {

private void setupRaftJournalWriter() throws IOException {
mClient = mock(LocalFirstRaftClient.class);
RaftClientReply reply = new RaftClientReply(ClientId.randomId(),
RaftGroupMemberId.valueOf(RaftJournalUtils.getPeerId(new InetSocketAddress(1)),
RaftGroupId.valueOf(UUID.fromString("02511d47-d67c-49a3-9011-abb3109a44c1"))),
1L, true, Message.valueOf("mp"), null, 1L, null);
RaftClientReply reply = RaftClientReply.newBuilder()
.setClientId(ClientId.randomId())
.setServerId(
RaftGroupMemberId.valueOf(RaftJournalUtils.getPeerId(new InetSocketAddress(1)),
RaftGroupId.valueOf(UUID.fromString("02511d47-d67c-49a3-9011-abb3109a44c1")))
).setCallId(1L)
.setSuccess(true)
.setMessage(Message.valueOf("mp"))
.setException(null)
.setLogIndex(1L)
.setCommitInfos(null)
.build();

CompletableFuture<RaftClientReply> future = new CompletableFuture<RaftClientReply>() {
@Override
Expand Down
Expand Up @@ -32,11 +32,11 @@
import org.apache.commons.io.FileUtils;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.impl.RaftServerConstants;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.server.storage.RaftStorageImpl;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
import org.junit.After;
Expand Down Expand Up @@ -85,8 +85,9 @@ public void before() throws Exception {
JournalQueryRequest queryRequest = JournalQueryRequest.parseFrom(
message.getContent().asReadOnlyByteBuffer());
Message response = mFollowerSnapshotManager.handleRequest(queryRequest);
return CompletableFuture.completedFuture(
new RaftClientReply(Mockito.mock(RaftClientRequest.class), response, null));
RaftClientReply reply = Mockito.mock(RaftClientReply.class);
Mockito.when(reply.getMessage()).thenReturn(response);
return CompletableFuture.completedFuture(reply);
});
mLeaderStore = getSimpleStateMachineStorage();
mLeaderSnapshotManager = new SnapshotReplicationManager(mLeader, mLeaderStore);
Expand Down Expand Up @@ -118,8 +119,8 @@ public void before() throws Exception {
}

private SimpleStateMachineStorage getSimpleStateMachineStorage() throws IOException {
RaftStorage rs = new RaftStorage(mFolder.newFolder(CommonUtils.randomAlphaNumString(6)),
RaftServerConstants.StartupOption.REGULAR);
RaftStorage rs = new RaftStorageImpl(mFolder.newFolder(CommonUtils.randomAlphaNumString(6)),
RaftServerConfigKeys.Log.CorruptionPolicy.getDefault());
SimpleStateMachineStorage snapshotStore = new SimpleStateMachineStorage();
snapshotStore.init(rs);
return snapshotStore;
Expand All @@ -134,7 +135,7 @@ private void createSnapshotFile(SimpleStateMachineStorage storage) throws IOExce
private void validateSnapshotFile(SimpleStateMachineStorage storage) throws IOException {
SingleFileSnapshotInfo snapshot = storage.getLatestSnapshot();
Assert.assertNotNull(snapshot);
Assert.assertEquals(TermIndex.newTermIndex(0, 1), snapshot.getTermIndex());
Assert.assertEquals(TermIndex.valueOf(0, 1), snapshot.getTermIndex());
byte[] received = FileUtils.readFileToByteArray(snapshot.getFiles().get(0).getPath().toFile());
Assert.assertTrue(BufferUtils.equalIncreasingByteArray(SNAPSHOT_SIZE, received));
}
Expand Down
Expand Up @@ -20,10 +20,10 @@

import com.google.common.base.Preconditions;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerConstants;
import org.apache.ratis.server.raftlog.segmented.LogSegment;
import org.apache.ratis.server.raftlog.segmented.LogSegmentPath;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.server.storage.RaftStorageDirectory;
import org.apache.ratis.server.storage.RaftStorageImpl;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
import org.slf4j.Logger;
Expand Down Expand Up @@ -83,14 +83,13 @@ private void readRatisLogFromDir() {
try (
PrintStream out =
new PrintStream(new BufferedOutputStream(new FileOutputStream(mJournalEntryFile)));
RaftStorage storage = new RaftStorage(getJournalDir(),
RaftServerConstants.StartupOption.REGULAR)) {
List<RaftStorageDirectory.LogPathAndIndex> paths =
storage.getStorageDir().getLogSegmentFiles();
for (RaftStorageDirectory.LogPathAndIndex path : paths) {
RaftStorage storage = new RaftStorageImpl(getJournalDir(),
RaftServerConfigKeys.Log.CorruptionPolicy.getDefault())) {
List<LogSegmentPath> paths = LogSegmentPath.getLogSegmentPaths(storage);
for (LogSegmentPath path : paths) {
final int entryCount = LogSegment.readSegmentFile(path.getPath().toFile(),
path.getStartIndex(), path.getEndIndex(), path.isOpen(),
RaftServerConfigKeys.Log.CorruptionPolicy.EXCEPTION, null, (proto) -> {
path.getStartEnd(), RaftServerConfigKeys.Log.CorruptionPolicy.EXCEPTION,
null, (proto) -> {
if (proto.hasStateMachineLogEntry()) {
try {
Journal.JournalEntry entry = Journal.JournalEntry.parseFrom(
Expand All @@ -114,8 +113,8 @@ private File getJournalDir() {
}

private void readRatisSnapshotFromDir() throws IOException {
try (RaftStorage storage = new RaftStorage(getJournalDir(),
RaftServerConstants.StartupOption.REGULAR)) {
try (RaftStorage storage = new RaftStorageImpl(getJournalDir(),
RaftServerConfigKeys.Log.CorruptionPolicy.getDefault())) {
SimpleStateMachineStorage stateMachineStorage = new SimpleStateMachineStorage();
stateMachineStorage.init(storage);
SingleFileSnapshotInfo currentSnapshot = stateMachineStorage.getLatestSnapshot();
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Expand Up @@ -150,7 +150,7 @@
<parquet.version>1.11.0</parquet.version>
<protobuf.version>3.12.4</protobuf.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<ratis.version>1.0.0</ratis.version>
<ratis.version>2.0.0</ratis.version>
<slf4j.version>1.7.30</slf4j.version>
<jackson.version>2.11.1</jackson.version>
<ozone.version>1.0.0</ozone.version>
Expand Down
7 changes: 4 additions & 3 deletions tests/src/test/java/alluxio/client/cli/JournalToolTest.java
Expand Up @@ -39,8 +39,9 @@
import alluxio.testutils.LocalAlluxioClusterResource;
import alluxio.util.io.PathUtils;

import org.apache.ratis.server.impl.RaftServerConstants;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.server.storage.RaftStorageImpl;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
import org.hamcrest.Matchers;
Expand Down Expand Up @@ -217,10 +218,10 @@ private void checkpointEmbeddedJournal() throws Throwable {
}

private long getCurrentRatisSnapshotIndex(String journalFolder) throws Throwable {
try (RaftStorage storage = new RaftStorage(
try (RaftStorage storage = new RaftStorageImpl(
new File(RaftJournalUtils.getRaftJournalDir(new File(journalFolder)),
RaftJournalSystem.RAFT_GROUP_UUID.toString()),
RaftServerConstants.StartupOption.REGULAR)) {
RaftServerConfigKeys.Log.CorruptionPolicy.getDefault())) {
SimpleStateMachineStorage stateMachineStorage = new SimpleStateMachineStorage();
stateMachineStorage.init(storage);
SingleFileSnapshotInfo snapshot = stateMachineStorage.getLatestSnapshot();
Expand Down

0 comments on commit 4161cab

Please sign in to comment.