Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
dc23ce2
IGNITE-17465 (fix) NodeImpl#describe don't print snapshotExecutor whe…
alievmirza Sep 1, 2022
9bc3f1b
IGNITE-17465 (fix) Adds new method stateChanged(peerId, newState) for…
alievmirza Sep 1, 2022
40874e6
IGNITE-17465 LogEntry's data should not be null
alievmirza Sep 1, 2022
305aae3
IGNITE-17465 fail-fast on connection fail
alievmirza Feb 15, 2023
9836435
IGNITE-17465 Node interface add api, get node state.
alievmirza Feb 15, 2023
2fffdd1
IGNITE-17465 Make checksumPeers more general
alievmirza Feb 15, 2023
9cb8fbc
IGNITE-17465 add putIfAbsent method for InvokeContext
alievmirza Feb 15, 2023
a4f71e2
IGNITE-17465 add recycling logic for com.alipay.sofa.jraft.storage.sn…
alievmirza Feb 15, 2023
a30a471
IGNITE-17465 Move the unmap function of the SegmentFile to Utils
alievmirza Feb 15, 2023
913516c
IGNITE-17465 for compatible. make the checksum logic is same with ori…
alievmirza Feb 15, 2023
ca1ced0
IGNITE-17465 Fix memory leak.
alievmirza Feb 15, 2023
c76a1d0
IGNITE-17465 Memory optimize at log manager
alievmirza Feb 15, 2023
2a3f78d
IGNITE-17465 feature: add learner to follower realize
alievmirza Feb 15, 2023
09b61d3
IGNITE-17465 Fix appendEntriesCounter to count the number of addition…
alievmirza Feb 15, 2023
b2446ca
IGNITE-17465 The replicator metrics data add probeCounter, ut need su…
alievmirza Feb 15, 2023
e6c09a4
IGNITE-17465 fix replicate-inflights-count metric didn't remove when …
alievmirza Feb 15, 2023
96ccdd0
IGNITE-17465 Fix threadId npe problem at com/alipay/sofa/jraft/core/R…
alievmirza Feb 15, 2023
37a98dc
IGNITE-17465 (fix) replicator test failure
alievmirza Feb 15, 2023
3fb4e2f
IGNITE-17465 transfer a method sendProbeRequest.
alievmirza Feb 15, 2023
f5ed544
IGNITE-17465 optimize resetPendingStatusError's call back.
alievmirza Feb 15, 2023
16567ae
IGNITE-17465 rocksdb support config max wal log size, default value i…
alievmirza Feb 15, 2023
2804d26
IGNITE-17465 refactor CliServiceImpl 's duplicated code
alievmirza Feb 15, 2023
6a72162
IGNITE-17465 io error logging
alievmirza Feb 15, 2023
c1ad78d
IGNITE-17465 (fix) CountDownLatch is best placed into finally stateme…
alievmirza Feb 15, 2023
6227666
IGNITE-17465 Rre check connection before send install snapshot request
alievmirza Feb 15, 2023
d9431a1
IGNITE-17465 (fix) Call NodeImpl#join asynchronously after shutting d…
alievmirza Feb 15, 2023
f2d3854
Revert "IGNITE-17465 (fix) Call NodeImpl#join asynchronously after sh…
alievmirza Feb 15, 2023
bf45ea0
IGNITE-17465 review fixes
alievmirza Feb 20, 2023
0f07c35
IGNITE-17465 fix commit index in ItRaftCommandLeftInLogUntilRestartTest
alievmirza Feb 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.apache.ignite.raft.jraft.core.TestCluster.ELECTION_TIMEOUT_MILLIS;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -271,6 +272,23 @@ public void testLearnerServices(TestInfo testInfo) throws Exception {
sleep(1000);
assertEquals(Collections.singletonList(learner3.getPeerId()), cliService.getLearners(groupId, conf));
assertTrue(cliService.getAliveLearners(groupId, conf).isEmpty());

TestPeer learner4 = new TestPeer(testInfo, TestUtils.INIT_PORT + LEARNER_PORT_STEP + 4);
assertTrue(cluster.startLearner(learner4));

cliService.addLearners(groupId, conf, Collections.singletonList(learner4.getPeerId()));
sleep(1000);
assertEquals(1, cliService.getAliveLearners(groupId, conf).size());
assertTrue(cliService.learner2Follower(groupId, conf, learner4.getPeerId()).isOk());

sleep(1000);
List<PeerId> currentLearners = cliService.getAliveLearners(groupId, conf);
assertFalse(currentLearners.contains(learner4.getPeerId()));

List<PeerId> currentFollowers = cliService.getPeers(groupId, conf);
assertTrue(currentFollowers.contains(learner4.getPeerId()));

cluster.stop(learner4.getPeerId());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,11 @@ public void onCreated(PeerId peer) {
LOG.info("Replicator has been created {} {}", peer, val);
}

@Override
public void stateChanged(final PeerId peer, final ReplicatorState newState) {
LOG.info("Replicator {} state is changed into {}.", peer, newState);
}

/** {@inheritDoc} */
@Override
public void onError(PeerId peer, Status status) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,17 @@ public interface CliService extends Lifecycle<CliOptions> {
*/
Status removeLearners(final String groupId, final Configuration conf, final List<PeerId> learners);

/**
* Converts the specified learner to follower of |conf|.
* return OK status when success.
*
* @param groupId the raft group id
* @param conf current configuration
* @param learner learner peer
* @return operation status
*/
Status learner2Follower(final String groupId, final Configuration conf, final PeerId learner);

/**
* Update learners set in the replicating group which consists of |conf|. return OK status when success.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.ignite.raft.jraft.conf.Configuration;
import org.apache.ignite.raft.jraft.core.NodeMetrics;
import org.apache.ignite.raft.jraft.core.Replicator;
import org.apache.ignite.raft.jraft.core.State;
import org.apache.ignite.raft.jraft.entity.NodeId;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.entity.Task;
Expand Down Expand Up @@ -77,13 +78,6 @@ public interface Node extends Lifecycle<NodeOptions>, Describer {
*/
boolean isLeader(final boolean blocking);

/**
* Shutdown local replica node.
*
* @param done callback
*/
void shutdown(final Closure done);

/**
* Block the thread until the node is successfully stopped.
*
Expand Down Expand Up @@ -310,6 +304,13 @@ public interface Node extends Lifecycle<NodeOptions>, Describer {
*/
int getNodeTargetPriority();

/**
* Get the node's state.
*
* @return node's state.
*/
State getNodeState();

/**
* Get the node's current term.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,11 @@ public JoinableClosure(Closure closure) {

@Override
public void run(final Status status) {
this.closure.run(status);
latch.countDown();
try {
this.closure.run(status);
} finally {
latch.countDown();
}
}

public void join() throws InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -93,22 +94,48 @@ public synchronized void shutdown() {
this.cliClientService = null;
}

private void recordConfigurationChange(final String groupId, final Collection<String> oldPeersList,
final Collection<String> newPeersList) {
final Configuration oldConf = new Configuration();
for (final String peerIdStr : oldPeersList) {
final PeerId oldPeer = new PeerId();
oldPeer.parse(peerIdStr);
oldConf.addPeer(oldPeer);
}
final Configuration newConf = new Configuration();
for (final String peerIdStr : newPeersList) {
final PeerId newPeer = new PeerId();
newPeer.parse(peerIdStr);
newConf.addPeer(newPeer);
}
LOG.info("Configuration of replication group {} changed from {} to {}.", groupId, oldConf, newConf);
}

private Status checkLeaderAndConnect(final String groupId, final Configuration conf, final PeerId leaderId) {
final Status st = getLeader(groupId, conf, leaderId);
if (!st.isOk()) {
return st;
}

if (!this.cliClientService.connect(leaderId)) {
return new Status(-1, "Fail to init channel to leader %s", leaderId);
}

return Status.OK();
}

@Override
public Status addPeer(final String groupId, final Configuration conf, final PeerId peer) {
Requires.requireTrue(!StringUtils.isBlank(groupId), "Blank group id");
Requires.requireNonNull(conf, "Null configuration");
Requires.requireNonNull(peer, "Null peer");

final PeerId leaderId = new PeerId();
final Status st = getLeader(groupId, conf, leaderId);
final Status st = checkLeaderAndConnect(groupId, conf, leaderId);
if (!st.isOk()) {
return st;
}

if (!this.cliClientService.connect(leaderId)) {
return new Status(-1, "Fail to init channel to leader %s", leaderId);
}

AddPeerRequest req = cliOptions.getRaftMessagesFactory()
.addPeerRequest()
.groupId(groupId)
Expand All @@ -120,20 +147,7 @@ public Status addPeer(final String groupId, final Configuration conf, final Peer
final Message result = this.cliClientService.addPeer(leaderId, req, null).get();
if (result instanceof AddPeerResponse) {
final AddPeerResponse resp = (AddPeerResponse) result;
final Configuration oldConf = new Configuration();
for (final String peerIdStr : resp.oldPeersList()) {
final PeerId oldPeer = new PeerId();
oldPeer.parse(peerIdStr);
oldConf.addPeer(oldPeer);
}
final Configuration newConf = new Configuration();
for (final String peerIdStr : resp.newPeersList()) {
final PeerId newPeer = new PeerId();
newPeer.parse(peerIdStr);
newConf.addPeer(newPeer);
}

LOG.info("Configuration of replication group {} changed from {} to {}.", groupId, oldConf, newConf);
recordConfigurationChange(groupId, resp.oldPeersList(), resp.newPeersList());
return Status.OK();
}
else {
Expand All @@ -159,15 +173,11 @@ public Status removePeer(final String groupId, final Configuration conf, final P
Requires.requireTrue(!peer.isEmpty(), "Removing peer is blank");

final PeerId leaderId = new PeerId();
final Status st = getLeader(groupId, conf, leaderId);
final Status st = checkLeaderAndConnect(groupId, conf, leaderId);
if (!st.isOk()) {
return st;
}

if (!this.cliClientService.connect(leaderId)) {
return new Status(-1, "Fail to init channel to leader %s", leaderId);
}

RemovePeerRequest req = cliOptions.getRaftMessagesFactory()
.removePeerRequest()
.groupId(groupId)
Expand All @@ -179,20 +189,7 @@ public Status removePeer(final String groupId, final Configuration conf, final P
final Message result = this.cliClientService.removePeer(leaderId, req, null).get();
if (result instanceof RemovePeerResponse) {
final RemovePeerResponse resp = (RemovePeerResponse) result;
final Configuration oldConf = new Configuration();
for (final String peerIdStr : resp.oldPeersList()) {
final PeerId oldPeer = new PeerId();
oldPeer.parse(peerIdStr);
oldConf.addPeer(oldPeer);
}
final Configuration newConf = new Configuration();
for (final String peerIdStr : resp.newPeersList()) {
final PeerId newPeer = new PeerId();
newPeer.parse(peerIdStr);
newConf.addPeer(newPeer);
}

LOG.info("Configuration of replication group {} changed from {} to {}", groupId, oldConf, newConf);
recordConfigurationChange(groupId, resp.oldPeersList(), resp.newPeersList());
return Status.OK();
}
else {
Expand All @@ -213,15 +210,11 @@ public Status changePeers(final String groupId, final Configuration conf, final
Requires.requireNonNull(newPeers, "Null new peers");

final PeerId leaderId = new PeerId();
final Status st = getLeader(groupId, conf, leaderId);
final Status st = checkLeaderAndConnect(groupId, conf, leaderId);
if (!st.isOk()) {
return st;
}

if (!this.cliClientService.connect(leaderId)) {
return new Status(-1, "Fail to init channel to leader %s", leaderId);
}

ChangePeersRequest req = cliOptions.getRaftMessagesFactory()
.changePeersRequest()
.groupId(groupId)
Expand All @@ -233,20 +226,7 @@ public Status changePeers(final String groupId, final Configuration conf, final
final Message result = this.cliClientService.changePeers(leaderId, req, null).get();
if (result instanceof ChangePeersResponse) {
final ChangePeersResponse resp = (ChangePeersResponse) result;
final Configuration oldConf = new Configuration();
for (final String peerIdStr : resp.oldPeersList()) {
final PeerId oldPeer = new PeerId();
oldPeer.parse(peerIdStr);
oldConf.addPeer(oldPeer);
}
final Configuration newConf = new Configuration();
for (final String peerIdStr : resp.newPeersList()) {
final PeerId newPeer = new PeerId();
newPeer.parse(peerIdStr);
newConf.addPeer(newPeer);
}

LOG.info("Configuration of replication group {} changed from {} to {}", groupId, oldConf, newConf);
recordConfigurationChange(groupId, resp.oldPeersList(), resp.newPeersList());
return Status.OK();
}
else {
Expand Down Expand Up @@ -386,6 +366,15 @@ public Status removeLearners(final String groupId, final Configuration conf, fin
}
}

@Override
public Status learner2Follower(final String groupId, final Configuration conf, final PeerId learner) {
Status status = removeLearners(groupId, conf, Arrays.asList(learner));
if (status.isOk()) {
status = addPeer(groupId, conf, new PeerId(learner.getConsistentId()));
}
return status;
}

@Override
public Status resetLearners(final String groupId, final Configuration conf, final List<PeerId> learners) {
checkLearnersOpParams(groupId, conf, learners);
Expand Down Expand Up @@ -424,15 +413,11 @@ public Status transferLeader(final String groupId, final Configuration conf, fin
Requires.requireNonNull(peer, "Null peer");

final PeerId leaderId = new PeerId();
final Status st = getLeader(groupId, conf, leaderId);
final Status st = checkLeaderAndConnect(groupId, conf, leaderId);
if (!st.isOk()) {
return st;
}

if (!this.cliClientService.connect(leaderId)) {
return new Status(-1, "Fail to init channel to leader %s", leaderId);
}

TransferLeaderRequest rb = cliOptions.getRaftMessagesFactory()
.transferLeaderRequest()
.groupId(groupId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ private long runApplyTask(final ApplyTask task, long maxCommittedIndex, final bo
if (task.committedIndex > maxCommittedIndex) {
maxCommittedIndex = task.committedIndex;
}
task.reset();
}
else {
if (maxCommittedIndex >= 0) {
Expand Down Expand Up @@ -438,6 +439,7 @@ private long runApplyTask(final ApplyTask task, long maxCommittedIndex, final bo
}
finally {
this.nodeMetrics.recordLatency(task.type.metricName(), Utils.monotonicMs() - startMs);
task.reset();
}
}
try {
Expand Down
Loading