Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,19 @@ public interface IConsensus {
*/
void removeRemotePeer(ConsensusGroupId groupId, Peer peer) throws ConsensusException;

/**
* Reset the peer list of the corresponding consensus group. Currently only used in the automatic
* cleanup of region migration as a rollback for {@link #addRemotePeer(ConsensusGroupId, Peer)},
* so it will only be less but not more.
*
* @param groupId the consensus group
* @param peers the new peer list
* @return reset result
* @throws ConsensusException when resetPeerList doesn't success with other reasons
* @throws ConsensusGroupNotExistException when the specified consensus group doesn't exist
*/
TSStatus resetPeerList(ConsensusGroupId groupId, List<Peer> peers) throws ConsensusException;

// management API

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,33 @@ public List<ConsensusGroupId> getAllConsensusGroupIds() {
return new ArrayList<>(stateMachineMap.keySet());
}

public TSStatus resetPeerList(ConsensusGroupId groupId, List<Peer> peers)
throws ConsensusException {
IoTConsensusServerImpl impl =
Optional.ofNullable(stateMachineMap.get(groupId))
.orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
if (impl.isReadOnly()) {
return StatusUtils.getStatus(TSStatusCode.SYSTEM_READ_ONLY);
} else if (!impl.isActive()) {
return RpcUtils.getStatus(
TSStatusCode.WRITE_PROCESS_REJECT,
"peer is inactive and not ready to receive reset configuration request.");
} else {
for (Peer peer : impl.getConfiguration()) {
if (!peers.contains(peer)) {
try {
removeRemotePeer(groupId, peer);
} catch (ConsensusException e) {
logger.error("Failed to remove peer {} from group {}", peer, groupId, e);
return RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
}
}
}
impl.resetConfiguration(peers);
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
}

public IoTConsensusServerImpl getImpl(ConsensusGroupId groupId) {
return stateMachineMap.get(groupId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -605,25 +605,13 @@ public void persistConfiguration() {
public void persistConfigurationUpdate() throws ConsensusGroupModifyPeerException {
try {
serializeConfigurationAndFsyncToDisk(CONFIGURATION_TMP_FILE_NAME);
Path tmpConfigurationPath =
Paths.get(new File(storageDir, CONFIGURATION_TMP_FILE_NAME).getAbsolutePath());
Path configurationPath =
Paths.get(new File(storageDir, CONFIGURATION_FILE_NAME).getAbsolutePath());
Files.deleteIfExists(configurationPath);
Files.move(tmpConfigurationPath, configurationPath);
tmpConfigurationUpdate(configuration);
} catch (IOException e) {
throw new ConsensusGroupModifyPeerException(
"Unexpected error occurs when update configuration", e);
}
}

private void serializeConfigurationTo(DataOutputStream outputStream) throws IOException {
outputStream.writeInt(configuration.size());
for (Peer peer : configuration) {
peer.serialize(outputStream);
}
}

public void recoverConfiguration() {
ByteBuffer buffer;
try {
Expand All @@ -635,22 +623,68 @@ public void recoverConfiguration() {
// interrupted
// unexpectedly, we need substitute configuration with tmpConfiguration file
if (Files.exists(tmpConfigurationPath)) {
if (Files.exists(configurationPath)) {
Files.delete(configurationPath);
}
Files.deleteIfExists(configurationPath);
Files.move(tmpConfigurationPath, configurationPath);
}
buffer = ByteBuffer.wrap(Files.readAllBytes(configurationPath));
int size = buffer.getInt();
for (int i = 0; i < size; i++) {
configuration.add(Peer.deserialize(buffer));
if (Files.exists(configurationPath)) {
// recover from old configuration file
buffer = ByteBuffer.wrap(Files.readAllBytes(configurationPath));
int size = buffer.getInt();
for (int i = 0; i < size; i++) {
configuration.add(Peer.deserialize(buffer));
}
Files.delete(configurationPath);
persistConfiguration();
} else {
// recover from split configuration file
Path dirPath = Paths.get(storageDir);
List<Peer> tmpPeerList = getConfiguration(dirPath, CONFIGURATION_TMP_FILE_NAME);
tmpConfigurationUpdate(tmpPeerList);
List<Peer> peerList = getConfiguration(dirPath, CONFIGURATION_FILE_NAME);
configuration.addAll(peerList);
}
logger.info("Recover IoTConsensus server Impl, configuration: {}", configuration);
} catch (IOException e) {
logger.error("Unexpected error occurs when recovering configuration", e);
}
}

private void tmpConfigurationUpdate(List<Peer> tmpPeerList) throws IOException {
for (Peer peer : tmpPeerList) {
Path tmpConfigurationPath =
Paths.get(
new File(storageDir, peer.getNodeId() + "_" + CONFIGURATION_TMP_FILE_NAME)
.getAbsolutePath());
Path configurationPath =
Paths.get(
new File(storageDir, peer.getNodeId() + "_" + CONFIGURATION_FILE_NAME)
.getAbsolutePath());
Files.deleteIfExists(configurationPath);
Files.move(tmpConfigurationPath, configurationPath);
}
}

private List<Peer> getConfiguration(Path dirPath, String configurationFileName)
throws IOException {
ByteBuffer buffer;
List<Peer> tmpConfiguration = new ArrayList<>();
Path[] files =
Files.walk(dirPath)
.filter(Files::isRegularFile)
.filter(filePath -> filePath.getFileName().toString().contains(configurationFileName))
.toArray(Path[]::new);
for (Path file : files) {
buffer = ByteBuffer.wrap(Files.readAllBytes(file));
tmpConfiguration.add(Peer.deserialize(buffer));
}
return tmpConfiguration;
}

public void resetConfiguration(List<Peer> newConfiguration) {
configuration.clear();
configuration.addAll(newConfiguration);
}

public IndexedConsensusRequest buildIndexedConsensusRequestForLocalRequest(
IConsensusRequest request) {
if (request instanceof ComparableConsensusRequest) {
Expand Down Expand Up @@ -841,15 +875,20 @@ public String getConsensusGroupId() {

private void serializeConfigurationAndFsyncToDisk(String configurationFileName)
throws IOException {
FileOutputStream fileOutputStream =
new FileOutputStream(new File(storageDir, configurationFileName));
DataOutputStream outputStream = new DataOutputStream(fileOutputStream);
try {
serializeConfigurationTo(outputStream);
} finally {
fileOutputStream.flush();
fileOutputStream.getFD().sync();
outputStream.close();
for (Peer peer : configuration) {
String peerConfigurationFileName = peer.getNodeId() + "_" + configurationFileName;
FileOutputStream fileOutputStream =
new FileOutputStream(new File(storageDir, peerConfigurationFileName));
try (DataOutputStream outputStream = new DataOutputStream(fileOutputStream)) {
peer.serialize(outputStream);
} finally {
try {
fileOutputStream.flush();
fileOutputStream.getFD().sync();
} catch (IOException e) {
logger.error("Failed to fsync the configuration file {}", peerConfigurationFileName, e);
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.iotdb.consensus.ratis.utils.Retriable;
import org.apache.iotdb.consensus.ratis.utils.RetryPolicy;
import org.apache.iotdb.consensus.ratis.utils.Utils;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;

import org.apache.commons.pool2.KeyedObjectPool;
Expand Down Expand Up @@ -537,6 +538,42 @@ public void removeRemotePeer(ConsensusGroupId groupId, Peer peer) throws Consens
sendReconfiguration(RaftGroup.valueOf(raftGroupId, newConfig));
}

@Override
public TSStatus resetPeerList(ConsensusGroupId groupId, List<Peer> peers)
throws ConsensusException {
RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
RaftGroup group = getGroupInfo(raftGroupId);

// pre-conditions: group exists and myself in this group
if (group == null || !group.getPeers().contains(myself)) {
throw new ConsensusGroupNotExistException(groupId);
}

TSStatus writeResult = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
for (Peer peer : peers) {
RaftPeer peerToRemove = Utils.fromPeerAndPriorityToRaftPeer(peer, DEFAULT_PRIORITY);
// pre-condition: peer is a member of groupId
if (!group.getPeers().contains(peerToRemove)) {
throw new PeerAlreadyInConsensusGroupException(groupId, peer);
}
// update group peer information
List<RaftPeer> newConfig =
group.getPeers().stream()
.filter(raftPeer -> !raftPeer.equals(peerToRemove))
.collect(Collectors.toList());
RaftClientReply reply = sendReconfiguration(RaftGroup.valueOf(raftGroupId, newConfig));
if (!reply.isSuccess()) {
throw new RatisRequestFailedException(reply.getException());
}
try {
writeResult = Utils.deserializeFrom(reply.getMessage().getContent().asReadOnlyByteBuffer());
} catch (Exception e) {
throw new RatisRequestFailedException(e);
}
}
return writeResult;
}

/** NOTICE: transferLeader *does not guarantee* the leader be transferred to newLeader. */
@Override
public void transferLeader(ConsensusGroupId groupId, Peer newLeader) throws ConsensusException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,12 @@ public List<ConsensusGroupId> getAllConsensusGroupIds() {
return new ArrayList<>(stateMachineMap.keySet());
}

@Override
public TSStatus resetPeerList(ConsensusGroupId groupId, List<Peer> peers)
throws ConsensusException {
throw new ConsensusException("SimpleConsensus does not support reset peer list");
}

private String buildPeerDir(ConsensusGroupId groupId) {
return storageDir + File.separator + groupId.getType().getValue() + "_" + groupId.getId();
}
Expand Down