Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,12 @@
import org.apache.ratis.util.Timestamp;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -79,12 +80,12 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.function.Predicate;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static org.apache.ratis.server.RaftServer.Division.LOG;
import static org.apache.ratis.server.RaftServerConfigKeys.Write.FOLLOWER_GAP_RATIO_MAX_KEY;
Expand Down Expand Up @@ -193,23 +194,16 @@ public String toString() {
* Since each mutation induces a copy of the list, only bulk operations
* (addAll and removeAll) are supported.
*/
static class SenderList {
static class SenderList implements Iterable<LogAppender> {
private final List<LogAppender> senders;

SenderList() {
this.senders = new CopyOnWriteArrayList<>();
}

Stream<LogAppender> stream() {
return senders.stream();
}

List<LogAppender> getSenders() {
return senders;
}

void forEach(Consumer<LogAppender> action) {
senders.forEach(action);
@Override
public Iterator<LogAppender> iterator() {
return senders.iterator();
}

void addAll(Collection<LogAppender> newSenders) {
Expand Down Expand Up @@ -555,7 +549,7 @@ private void updateConfiguration(long logIndex, RaftConfigurationImpl newConf) {
}

void updateFollowerCommitInfos(CommitInfoCache cache, List<CommitInfoProto> protos) {
for (LogAppender sender : senders.getSenders()) {
for (LogAppender sender : senders) {
FollowerInfo info = sender.getFollower();
protos.add(cache.update(info.getPeer(), info.getCommitIndex()));
}
Expand All @@ -574,7 +568,7 @@ public AppendEntriesRequestProto newAppendEntriesRequestProto(FollowerInfo follo
/**
* Update sender list for setConfiguration request
*/
void addAndStartSenders(Collection<RaftPeer> newPeers) {
private void addAndStartSenders(Collection<RaftPeer> newPeers) {
if (!newPeers.isEmpty()) {
addSenders(newPeers, RaftLog.LEAST_VALID_LOG_INDEX, false).forEach(LogAppender::start);
}
Expand All @@ -584,7 +578,7 @@ private RaftPeer getPeer(RaftPeerId id) {
return server.getRaftConf().getPeer(id, RaftPeerRole.FOLLOWER, RaftPeerRole.LISTENER);
}

Collection<LogAppender> addSenders(Collection<RaftPeer> newPeers, long nextIndex, boolean attendVote) {
private Collection<LogAppender> addSenders(Collection<RaftPeer> newPeers, long nextIndex, boolean attendVote) {
final Timestamp t = Timestamp.currentTime().addTimeMs(-server.getMaxTimeoutMs());
final List<LogAppender> newAppenders = newPeers.stream().map(peer -> {
final FollowerInfo f = new FollowerInfoImpl(server.getMemberId(), peer, this::getPeer, t, nextIndex, attendVote);
Expand All @@ -597,8 +591,11 @@ Collection<LogAppender> addSenders(Collection<RaftPeer> newPeers, long nextIndex
return newAppenders;
}

void stopAndRemoveSenders(Predicate<LogAppender> predicate) {
final List<LogAppender> toStop = senders.stream().filter(predicate).collect(Collectors.toList());
private void stopAndRemoveSenders(Predicate<LogAppender> predicate) {
stopAndRemoveSenders(getLogAppenders().filter(predicate).collect(Collectors.toList()));
}

private void stopAndRemoveSenders(Collection<LogAppender> toStop) {
toStop.forEach(LogAppender::stop);
senders.removeAll(toStop);
}
Expand All @@ -620,8 +617,7 @@ public void restart(LogAppender sender) {

final FollowerInfo info = sender.getFollower();
LOG.info("{}: Restarting {} for {}", this, JavaUtils.getClassSimpleName(sender.getClass()), info.getName());
sender.stop();
senders.removeAll(Collections.singleton(sender));
stopAndRemoveSenders(Collections.singleton(sender));

Optional.ofNullable(getPeer(info.getId()))
.ifPresent(peer -> addAndStartSenders(Collections.singleton(peer)));
Expand Down Expand Up @@ -775,14 +771,6 @@ private BootStrapProgress checkProgress(FollowerInfo follower, long committed) {
}
}

private Collection<BootStrapProgress> checkAllProgress(long committed) {
Preconditions.assertTrue(inStagingState());
return senders.stream()
.filter(sender -> !isAttendingVote(sender.getFollower()))
.map(sender -> checkProgress(sender.getFollower(), committed))
.collect(Collectors.toCollection(ArrayList::new));
}

@Override
public void onFollowerSuccessAppendEntries(FollowerInfo follower) {
if (isAttendingVote(follower)) {
Expand All @@ -802,15 +790,19 @@ private void checkStaging() {
// it is possible that the bootstrapping is done. Then, fallback to UPDATE_COMMIT
updateCommitEvent.execute();
} else {
final long committedIndex = server.getState().getLog()
.getLastCommittedIndex();
Collection<BootStrapProgress> reports = checkAllProgress(committedIndex);
final long commitIndex = server.getState().getLog().getLastCommittedIndex();
// check progress for the new followers
final EnumSet<BootStrapProgress> reports = getLogAppenders()
.map(LogAppender::getFollower)
.filter(follower -> !isAttendingVote(follower))
.map(follower -> checkProgress(follower, commitIndex))
.collect(Collectors.toCollection(() -> EnumSet.noneOf(BootStrapProgress.class)));
if (reports.contains(BootStrapProgress.NOPROGRESS)) {
stagingState.fail(BootStrapProgress.NOPROGRESS);
} else if (!reports.contains(BootStrapProgress.PROGRESSING)) {
// all caught up!
applyOldNewConf();
senders.stream()
getLogAppenders()
.map(LogAppender::getFollower)
.filter(f -> server.getRaftConf().containsInConf(f.getId()))
.map(FollowerInfoImpl.class::cast)
Expand Down Expand Up @@ -1053,7 +1045,7 @@ private void checkPeersForYieldingLeader() {

FollowerInfo highestPriorityInfo = null;
int highestPriority = Integer.MIN_VALUE;
for (LogAppender logAppender : senders.getSenders()) {
for (LogAppender logAppender : senders) {
final RaftPeer follower = conf.getPeer(logAppender.getFollowerId());
if (follower == null) {
continue;
Expand Down Expand Up @@ -1088,7 +1080,7 @@ public boolean checkLeadership() {
return true;
}

final List<RaftPeerId> activePeers = senders.stream()
final List<RaftPeerId> activePeers = getLogAppenders()
.filter(sender -> sender.getFollower()
.getLastRpcResponseTime()
.elapsedTimeMs() <= server.getMaxTimeoutMs())
Expand All @@ -1106,7 +1098,7 @@ public boolean checkLeadership() {
+ ". Election timeout: " + server.getMaxTimeoutMs() + "ms"
+ ". In charge for: " + server.getRole().getRoleElapsedTimeMs() + "ms"
+ ". Conf: " + conf);
senders.stream().map(LogAppender::getFollower).forEach(f -> LOG.warn("Follower {}", f));
getLogAppenders().map(LogAppender::getFollower).forEach(f -> LOG.warn("Follower {}", f));

// step down as follower
stepDown(currentTerm, StepDownReason.LOST_MAJORITY_HEARTBEATS);
Expand Down Expand Up @@ -1173,7 +1165,7 @@ TransactionContext getTransactionContext(long index) {
}

long[] getFollowerNextIndices() {
return senders.stream().mapToLong(s -> s.getFollower().getNextIndex()).toArray();
return getLogAppenders().mapToLong(s -> s.getFollower().getNextIndex()).toArray();
}

static Map<RaftPeerId, RaftPeer> newMap(Collection<RaftPeer> peers, String str) {
Expand Down Expand Up @@ -1237,15 +1229,14 @@ public String toString() {
/**
* @return the RaftPeer (address and id) information of the followers.
*/
List<RaftPeer> getFollowers() {
return Collections.unmodifiableList(senders.stream()
Stream<RaftPeer> getFollowers() {
return getLogAppenders()
.map(sender -> sender.getFollower().getPeer())
.filter(peer -> server.getRaftConf().containsInConf(peer.getId()))
.collect(Collectors.toList()));
.filter(peer -> server.getRaftConf().containsInConf(peer.getId()));
}

Stream<LogAppender> getLogAppenders() {
return senders.stream();
return StreamSupport.stream(senders.spliterator(), false);
}

private static boolean isAttendingVote(FollowerInfo follower) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1835,8 +1835,8 @@ public String getRole() {

@Override
public List<String> getFollowers() {
return role.getLeaderState().map(LeaderStateImpl::getFollowers).orElse(Collections.emptyList())
.stream().map(RaftPeer::toString).collect(Collectors.toList());
return role.getLeaderState().map(LeaderStateImpl::getFollowers).orElseGet(Stream::empty)
.map(RaftPeer::toString).collect(Collectors.toList());
}

@Override
Expand Down