Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-19120 Raft client should get leader metadata along while getting leader itself #2255

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.raft.service;

import org.apache.ignite.internal.raft.Peer;

/**
* The class represents a leader metadata.
*/
public class LeaderMetadata {
/** Leader peer. */
private final Peer leader;

/** Corresponding term to the leader. */
private final long term;

/** Index on the moment to formed the metadata. */
private final long index;

/**
* The constructor.
*
* @param leader Leader.
* @param term Leader term.
* @param index Raft index.
*/
public LeaderMetadata(Peer leader, long term, long index) {
this.leader = leader;
this.term = term;
this.index = index;
}

/**
* Get a leader peer.
*
* @return Leader peer.
*/
public Peer getLeader() {
return leader;
}

/**
* Gets a term corresponding to the leader.
*
* @return Leader term.
*/
public long getTerm() {
return term;
}

/**
* Gets an index on the moment to formed the metadata.
*
* @return Raft index.
*/
public long getIndex() {
return index;
}

@Override
public String toString() {
return "LeaderMetadata{"
+ "leader=" + leader
+ ", term=" + term
+ ", index=" + index
+ '}';
}
}
Expand Up @@ -41,4 +41,14 @@ public LeaderWithTerm(@Nullable Peer leader, long term) {
public long term() {
return term;
}

@Override
public String toString() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use S.toString(LeaderWithTerm.class, this); ?

return "LeaderWithTerm{"
+ "leader="
+ leader
+ ", term="
+ term
+ '}';
}
}
Expand Up @@ -234,11 +234,11 @@ public interface RaftGroupService {
void shutdown();

/**
* Reads index from the group leader.
* Reads a metadata from leader.
*
* @return Future containing the index.
* @return Future contains a leader metadata.
*/
CompletableFuture<Long> readIndex();
CompletableFuture<LeaderMetadata> readLeaderMetadata();

/**
* Returns a cluster service.
Expand Down
@@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.raft.server;

import static org.apache.ignite.internal.raft.server.RaftGroupOptions.defaults;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.function.Supplier;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.RaftNodeId;
import org.apache.ignite.internal.raft.service.LeaderMetadata;
import org.apache.ignite.internal.raft.service.LeaderWithTerm;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.TestReplicationGroupId;
import org.apache.ignite.raft.server.counter.CounterListener;
import org.junit.jupiter.api.Test;

/**
* Reade metadata tests.
*/
public class ItReadLeaderMetadataTest extends JraftAbstractTest {
/**
* Listener factory.
*/
private final Supplier<CounterListener> listenerFactory = CounterListener::new;

/** Raft group id. */
private static final TestReplicationGroupId GROUP_ID = new TestReplicationGroupId("GRP_1");

@Test
public void test() throws Exception {
log.info("Test here.");

for (int i = 0; i < 3; i++) {
startServer(i, raftServer -> {
String localNodeName = raftServer.clusterService().topologyService().localMember().name();

Peer serverPeer = initialMembersConf.peer(localNodeName);

raftServer.startRaftNode(
new RaftNodeId(GROUP_ID, serverPeer), initialMembersConf, listenerFactory.get(), defaults()
);
}, opts -> {});
}

var raftClient1 = startClient(GROUP_ID);
var raftClient2 = startClient(GROUP_ID);

raftClient1.refreshMembers(true).get();
raftClient1.refreshLeader().get();
raftClient2.refreshMembers(true).get();
raftClient2.refreshLeader().get();

assertEquals(raftClient1.leader(), raftClient2.leader());

checkLeaderConsistency(raftClient1, raftClient2);

Peer curLeader = raftClient1.leader();

Peer newLeader = raftClient1.peers().stream().filter(p -> !p.equals(curLeader)).findAny().get();

assertNotNull(newLeader);

log.info("Transfer leadership [cur={}, new={}]", curLeader, newLeader);

raftClient1.transferLeadership(newLeader).get();

assertNotEquals(raftClient1.leader(), raftClient2.leader());

log.info("Leader transferred [client1={}, client2={}]", raftClient1.leader(), raftClient2.leader());

checkLeaderConsistency(raftClient1, raftClient2);

assertEquals(raftClient1.leader(), raftClient2.leader());
}

/**
* Checks that leader refreshed consistency on both clients.
*
* @param raftClient1 Raft client 1.
* @param raftClient2 Raft client 2
* @throws Exception If failed.
*/
private static void checkLeaderConsistency(RaftGroupService raftClient1, RaftGroupService raftClient2) throws Exception {
LeaderWithTerm leaderWithTerm = raftClient1.refreshAndGetLeaderWithTerm().get();
LeaderMetadata leaderMetadata = raftClient2.readLeaderMetadata().get();

long indexOnClient2 = leaderMetadata.getIndex();

assertTrue(leaderWithTerm.term() <= leaderMetadata.getTerm());

if (leaderWithTerm.term() == leaderMetadata.getTerm()) {
assertEquals(leaderWithTerm.leader(), leaderMetadata.getLeader());
}

leaderMetadata = raftClient1.readLeaderMetadata().get();
leaderWithTerm = raftClient2.refreshAndGetLeaderWithTerm().get();

long indexOnClient1 = leaderMetadata.getIndex();

assertTrue(leaderWithTerm.term() <= leaderMetadata.getTerm());

if (leaderWithTerm.term() == leaderMetadata.getTerm()) {
assertEquals(leaderWithTerm.leader(), leaderMetadata.getLeader());
}

assertEquals(indexOnClient2, indexOnClient1);
}
}
Expand Up @@ -56,6 +56,7 @@
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.raft.service.LeaderMetadata;
import org.apache.ignite.internal.raft.service.LeaderWithTerm;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
Expand All @@ -73,7 +74,7 @@
import org.apache.ignite.raft.jraft.rpc.ActionResponse;
import org.apache.ignite.raft.jraft.rpc.RpcRequests;
import org.apache.ignite.raft.jraft.rpc.RpcRequests.ErrorResponse;
import org.apache.ignite.raft.jraft.rpc.RpcRequests.ReadIndexResponse;
import org.apache.ignite.raft.jraft.rpc.RpcRequests.ReadLeaderMetadataResponse;
import org.apache.ignite.raft.jraft.rpc.RpcRequests.SMErrorResponse;
import org.apache.ignite.raft.jraft.rpc.impl.RaftException;
import org.apache.ignite.raft.jraft.rpc.impl.SMCompactedThrowable;
Expand Down Expand Up @@ -464,16 +465,18 @@ public void shutdown() {
}

@Override
public CompletableFuture<Long> readIndex() {
Function<Peer, ? extends NetworkMessage> requestFactory = p -> factory.readIndexRequest()
public CompletableFuture<LeaderMetadata> readLeaderMetadata() {
Function<Peer, ? extends NetworkMessage> requestFactory = p -> factory.readLeaderMetadataRequest()
.groupId(groupId)
.peerId(p.consistentId())
.build();

Peer leader = leader();

Peer node = leader == null ? randomNode() : leader;
return this.<ReadIndexResponse>sendWithRetry(node, requestFactory)
.thenApply(ReadIndexResponse::index);

return this.<ReadLeaderMetadataResponse>sendWithRetry(node, requestFactory)
.thenApply(resp -> new LeaderMetadata(parsePeer(resp.leaderId()), resp.currentTerm(), resp.index()));
}

@Override
Expand Down
Expand Up @@ -17,6 +17,7 @@
package org.apache.ignite.raft.jraft;

import java.util.List;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.raft.jraft.closure.ReadIndexClosure;
import org.apache.ignite.raft.jraft.conf.Configuration;
import org.apache.ignite.raft.jraft.core.NodeMetrics;
Expand All @@ -41,6 +42,14 @@ public interface Node extends Lifecycle<NodeOptions>, Describer {
*/
PeerId getLeaderId();

/**
* Returns leader with their term concurrently.
* If the leader is not known, the method returns {@code null}.
*
* @return Leader peer with corresponding term.
*/
IgniteBiTuple<PeerId, Long> getLeaderWithTer();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getLeaderWithTerm


/**
* Get current node id.
*/
Expand Down
Expand Up @@ -171,6 +171,12 @@ public static final class RpcRequestsMessageGroup {

/** */
public static final short SM_ERROR_RESPONSE = 3014;

/** */
public static final short READ_LEADER_METADATA_REQUEST = 3015;

/** */
public static final short READ_LEADER_METADATA_RESPONSE = 3016;
}

/**
Expand Down