From 358714b49fd254d7c1cc0c5885bb296776351004 Mon Sep 17 00:00:00 2001 From: BUAAserein <18376359@buaa.edu.cn> Date: Thu, 29 Feb 2024 11:49:16 +0800 Subject: [PATCH 1/5] split_config --- .../consensus/iot/IoTConsensusServerImpl.java | 74 +++++++++++++------ 1 file changed, 51 insertions(+), 23 deletions(-) diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java index 800754ddba6ac..e535b69087d53 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java @@ -605,25 +605,24 @@ public void persistConfiguration() { public void persistConfigurationUpdate() throws ConsensusGroupModifyPeerException { try { serializeConfigurationAndFsyncToDisk(CONFIGURATION_TMP_FILE_NAME); - Path tmpConfigurationPath = - Paths.get(new File(storageDir, CONFIGURATION_TMP_FILE_NAME).getAbsolutePath()); - Path configurationPath = - Paths.get(new File(storageDir, CONFIGURATION_FILE_NAME).getAbsolutePath()); - Files.deleteIfExists(configurationPath); - Files.move(tmpConfigurationPath, configurationPath); + for (Peer peer : configuration) { + Path tmpConfigurationPath = + Paths.get( + new File(storageDir, peer.getNodeId() + "_" + CONFIGURATION_TMP_FILE_NAME) + .getAbsolutePath()); + Path configurationPath = + Paths.get( + new File(storageDir, peer.getNodeId() + "_" + CONFIGURATION_FILE_NAME) + .getAbsolutePath()); + Files.deleteIfExists(configurationPath); + Files.move(tmpConfigurationPath, configurationPath); + } } catch (IOException e) { throw new ConsensusGroupModifyPeerException( "Unexpected error occurs when update configuration", e); } } - private void serializeConfigurationTo(DataOutputStream outputStream) throws IOException { - outputStream.writeInt(configuration.size()); - for (Peer peer : configuration) { - peer.serialize(outputStream); - } - } - public void recoverConfiguration() { ByteBuffer buffer; try { @@ -647,8 +646,36 @@ public void recoverConfiguration() { } logger.info("Recover IoTConsensus server Impl, configuration: {}", configuration); } catch (IOException e) { - logger.error("Unexpected error occurs when recovering configuration", e); + Path dirPath = Paths.get(storageDir); + try { + List tmpPeerList = getConfiguration(dirPath, CONFIGURATION_TMP_FILE_NAME); + List peerList = getConfiguration(dirPath, CONFIGURATION_FILE_NAME); + for (Peer peer : peerList) { + if (!tmpPeerList.contains(peer)) { + configuration.add(peer); + } + } + configuration.addAll(tmpPeerList); + } catch (IOException ioe) { + logger.error("Unexpected error occurs when recovering configuration", ioe); + } + } + } + + private List getConfiguration(Path dirPath, String configurationFileName) + throws IOException { + ByteBuffer buffer; + List tmpConfiguration = new ArrayList<>(); + File[] files = + Files.walk(dirPath) + .filter(Files::isRegularFile) + .filter(filePath -> filePath.getFileName().toString().contains(configurationFileName)) + .toArray(File[]::new); + for (File file : files) { + buffer = ByteBuffer.wrap(Files.readAllBytes(file.toPath())); + tmpConfiguration.add(Peer.deserialize(buffer)); } + return tmpConfiguration; } public IndexedConsensusRequest buildIndexedConsensusRequestForLocalRequest( @@ -841,15 +868,16 @@ public String getConsensusGroupId() { private void serializeConfigurationAndFsyncToDisk(String configurationFileName) throws IOException { - FileOutputStream fileOutputStream = - new FileOutputStream(new File(storageDir, configurationFileName)); - DataOutputStream outputStream = new DataOutputStream(fileOutputStream); - try { - serializeConfigurationTo(outputStream); - } finally { - fileOutputStream.flush(); - fileOutputStream.getFD().sync(); - outputStream.close(); + for (Peer peer : configuration) { + String peerConfigurationFileName = peer.getNodeId() + "_" + configurationFileName; + FileOutputStream fileOutputStream = + new FileOutputStream(new File(storageDir, peerConfigurationFileName)); + try (DataOutputStream outputStream = new DataOutputStream(fileOutputStream)) { + peer.serialize(outputStream); + } finally { + fileOutputStream.flush(); + fileOutputStream.getFD().sync(); + } } } From 8710ebcc1968397c6b17cca3ebf7fc3fe33b7576 Mon Sep 17 00:00:00 2001 From: BUAAserein <18376359@buaa.edu.cn> Date: Thu, 29 Feb 2024 14:59:56 +0800 Subject: [PATCH 2/5] use path --- .../iotdb/consensus/iot/IoTConsensusServerImpl.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java index e535b69087d53..8d52bffcf34d5 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java @@ -666,13 +666,13 @@ private List getConfiguration(Path dirPath, String configurationFileName) throws IOException { ByteBuffer buffer; List tmpConfiguration = new ArrayList<>(); - File[] files = + Path[] files = Files.walk(dirPath) .filter(Files::isRegularFile) .filter(filePath -> filePath.getFileName().toString().contains(configurationFileName)) - .toArray(File[]::new); - for (File file : files) { - buffer = ByteBuffer.wrap(Files.readAllBytes(file.toPath())); + .toArray(Path[]::new); + for (Path file : files) { + buffer = ByteBuffer.wrap(Files.readAllBytes(file)); tmpConfiguration.add(Peer.deserialize(buffer)); } return tmpConfiguration; From f1ba7cff3ca9791a6de30c08e47c8808970265b2 Mon Sep 17 00:00:00 2001 From: BUAAserein <18376359@buaa.edu.cn> Date: Thu, 29 Feb 2024 15:40:33 +0800 Subject: [PATCH 3/5] refactor recover --- .../consensus/iot/IoTConsensusServerImpl.java | 50 ++++++++++--------- 1 file changed, 27 insertions(+), 23 deletions(-) diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java index 8d52bffcf34d5..1a96244b35533 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java @@ -605,18 +605,7 @@ public void persistConfiguration() { public void persistConfigurationUpdate() throws ConsensusGroupModifyPeerException { try { serializeConfigurationAndFsyncToDisk(CONFIGURATION_TMP_FILE_NAME); - for (Peer peer : configuration) { - Path tmpConfigurationPath = - Paths.get( - new File(storageDir, peer.getNodeId() + "_" + CONFIGURATION_TMP_FILE_NAME) - .getAbsolutePath()); - Path configurationPath = - Paths.get( - new File(storageDir, peer.getNodeId() + "_" + CONFIGURATION_FILE_NAME) - .getAbsolutePath()); - Files.deleteIfExists(configurationPath); - Files.move(tmpConfigurationPath, configurationPath); - } + tmpConfigurationUpdate(configuration); } catch (IOException e) { throw new ConsensusGroupModifyPeerException( "Unexpected error occurs when update configuration", e); @@ -634,9 +623,7 @@ public void recoverConfiguration() { // interrupted // unexpectedly, we need substitute configuration with tmpConfiguration file if (Files.exists(tmpConfigurationPath)) { - if (Files.exists(configurationPath)) { - Files.delete(configurationPath); - } + Files.deleteIfExists(configurationPath); Files.move(tmpConfigurationPath, configurationPath); } buffer = ByteBuffer.wrap(Files.readAllBytes(configurationPath)); @@ -644,24 +631,37 @@ public void recoverConfiguration() { for (int i = 0; i < size; i++) { configuration.add(Peer.deserialize(buffer)); } + Files.delete(configurationPath); + persistConfiguration(); logger.info("Recover IoTConsensus server Impl, configuration: {}", configuration); } catch (IOException e) { Path dirPath = Paths.get(storageDir); try { List tmpPeerList = getConfiguration(dirPath, CONFIGURATION_TMP_FILE_NAME); + tmpConfigurationUpdate(tmpPeerList); List peerList = getConfiguration(dirPath, CONFIGURATION_FILE_NAME); - for (Peer peer : peerList) { - if (!tmpPeerList.contains(peer)) { - configuration.add(peer); - } - } - configuration.addAll(tmpPeerList); + configuration.addAll(peerList); } catch (IOException ioe) { logger.error("Unexpected error occurs when recovering configuration", ioe); } } } + private void tmpConfigurationUpdate(List tmpPeerList) throws IOException { + for (Peer peer : tmpPeerList) { + Path tmpConfigurationPath = + Paths.get( + new File(storageDir, peer.getNodeId() + "_" + CONFIGURATION_TMP_FILE_NAME) + .getAbsolutePath()); + Path configurationPath = + Paths.get( + new File(storageDir, peer.getNodeId() + "_" + CONFIGURATION_FILE_NAME) + .getAbsolutePath()); + Files.deleteIfExists(configurationPath); + Files.move(tmpConfigurationPath, configurationPath); + } + } + private List getConfiguration(Path dirPath, String configurationFileName) throws IOException { ByteBuffer buffer; @@ -875,8 +875,12 @@ private void serializeConfigurationAndFsyncToDisk(String configurationFileName) try (DataOutputStream outputStream = new DataOutputStream(fileOutputStream)) { peer.serialize(outputStream); } finally { - fileOutputStream.flush(); - fileOutputStream.getFD().sync(); + try { + fileOutputStream.flush(); + fileOutputStream.getFD().sync(); + } catch (IOException e) { + logger.error("Failed to fsync the configuration file {}", peerConfigurationFileName, e); + } } } } From 6ce441466ab662b079e87e32abd5aae5a5b4f880 Mon Sep 17 00:00:00 2001 From: BUAAserein <18376359@buaa.edu.cn> Date: Mon, 4 Mar 2024 21:12:18 +0800 Subject: [PATCH 4/5] refactor recover --- .../consensus/iot/IoTConsensusServerImpl.java | 28 ++++++++++--------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java index 1a96244b35533..d5361f882284c 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java @@ -626,24 +626,26 @@ public void recoverConfiguration() { Files.deleteIfExists(configurationPath); Files.move(tmpConfigurationPath, configurationPath); } - buffer = ByteBuffer.wrap(Files.readAllBytes(configurationPath)); - int size = buffer.getInt(); - for (int i = 0; i < size; i++) { - configuration.add(Peer.deserialize(buffer)); - } - Files.delete(configurationPath); - persistConfiguration(); - logger.info("Recover IoTConsensus server Impl, configuration: {}", configuration); - } catch (IOException e) { - Path dirPath = Paths.get(storageDir); - try { + if (Files.exists(configurationPath)) { + // recover from old configuration file + buffer = ByteBuffer.wrap(Files.readAllBytes(configurationPath)); + int size = buffer.getInt(); + for (int i = 0; i < size; i++) { + configuration.add(Peer.deserialize(buffer)); + } + Files.delete(configurationPath); + persistConfiguration(); + } else { + // recover from split configuration file + Path dirPath = Paths.get(storageDir); List tmpPeerList = getConfiguration(dirPath, CONFIGURATION_TMP_FILE_NAME); tmpConfigurationUpdate(tmpPeerList); List peerList = getConfiguration(dirPath, CONFIGURATION_FILE_NAME); configuration.addAll(peerList); - } catch (IOException ioe) { - logger.error("Unexpected error occurs when recovering configuration", ioe); } + logger.info("Recover IoTConsensus server Impl, configuration: {}", configuration); + } catch (IOException e) { + logger.error("Unexpected error occurs when recovering configuration", e); } } From 42e98a2773b95e03d88f096064238f59f31eb189 Mon Sep 17 00:00:00 2001 From: BUAAserein <18376359@buaa.edu.cn> Date: Mon, 4 Mar 2024 22:32:49 +0800 Subject: [PATCH 5/5] add resetPeerList --- .../apache/iotdb/consensus/IConsensus.java | 13 +++++++ .../iotdb/consensus/iot/IoTConsensus.java | 27 ++++++++++++++ .../consensus/iot/IoTConsensusServerImpl.java | 5 +++ .../iotdb/consensus/ratis/RatisConsensus.java | 37 +++++++++++++++++++ .../consensus/simple/SimpleConsensus.java | 6 +++ 5 files changed, 88 insertions(+) diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java index 4ee908a7af854..dc3942ca07149 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java @@ -143,6 +143,19 @@ public interface IConsensus { */ void removeRemotePeer(ConsensusGroupId groupId, Peer peer) throws ConsensusException; + /** + * Reset the peer list of the corresponding consensus group. Currently only used in the automatic + * cleanup of region migration as a rollback for {@link #addRemotePeer(ConsensusGroupId, Peer)}, + * so it will only be less but not more. + * + * @param groupId the consensus group + * @param peers the new peer list + * @return reset result + * @throws ConsensusException when resetPeerList doesn't success with other reasons + * @throws ConsensusGroupNotExistException when the specified consensus group doesn't exist + */ + TSStatus resetPeerList(ConsensusGroupId groupId, List peers) throws ConsensusException; + // management API /** diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java index babe939393fea..b0b4b1f9bcacf 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java @@ -392,6 +392,33 @@ public List getAllConsensusGroupIds() { return new ArrayList<>(stateMachineMap.keySet()); } + public TSStatus resetPeerList(ConsensusGroupId groupId, List peers) + throws ConsensusException { + IoTConsensusServerImpl impl = + Optional.ofNullable(stateMachineMap.get(groupId)) + .orElseThrow(() -> new ConsensusGroupNotExistException(groupId)); + if (impl.isReadOnly()) { + return StatusUtils.getStatus(TSStatusCode.SYSTEM_READ_ONLY); + } else if (!impl.isActive()) { + return RpcUtils.getStatus( + TSStatusCode.WRITE_PROCESS_REJECT, + "peer is inactive and not ready to receive reset configuration request."); + } else { + for (Peer peer : impl.getConfiguration()) { + if (!peers.contains(peer)) { + try { + removeRemotePeer(groupId, peer); + } catch (ConsensusException e) { + logger.error("Failed to remove peer {} from group {}", peer, groupId, e); + return RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage()); + } + } + } + impl.resetConfiguration(peers); + return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); + } + } + public IoTConsensusServerImpl getImpl(ConsensusGroupId groupId) { return stateMachineMap.get(groupId); } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java index d5361f882284c..6fd4bebae16f6 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java @@ -680,6 +680,11 @@ private List getConfiguration(Path dirPath, String configurationFileName) return tmpConfiguration; } + public void resetConfiguration(List newConfiguration) { + configuration.clear(); + configuration.addAll(newConfiguration); + } + public IndexedConsensusRequest buildIndexedConsensusRequestForLocalRequest( IConsensusRequest request) { if (request instanceof ComparableConsensusRequest) { diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java index c627692b72d6c..86b77e16c8ec8 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java @@ -51,6 +51,7 @@ import org.apache.iotdb.consensus.ratis.utils.Retriable; import org.apache.iotdb.consensus.ratis.utils.RetryPolicy; import org.apache.iotdb.consensus.ratis.utils.Utils; +import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.commons.pool2.KeyedObjectPool; @@ -537,6 +538,42 @@ public void removeRemotePeer(ConsensusGroupId groupId, Peer peer) throws Consens sendReconfiguration(RaftGroup.valueOf(raftGroupId, newConfig)); } + @Override + public TSStatus resetPeerList(ConsensusGroupId groupId, List peers) + throws ConsensusException { + RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId); + RaftGroup group = getGroupInfo(raftGroupId); + + // pre-conditions: group exists and myself in this group + if (group == null || !group.getPeers().contains(myself)) { + throw new ConsensusGroupNotExistException(groupId); + } + + TSStatus writeResult = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); + for (Peer peer : peers) { + RaftPeer peerToRemove = Utils.fromPeerAndPriorityToRaftPeer(peer, DEFAULT_PRIORITY); + // pre-condition: peer is a member of groupId + if (!group.getPeers().contains(peerToRemove)) { + throw new PeerAlreadyInConsensusGroupException(groupId, peer); + } + // update group peer information + List newConfig = + group.getPeers().stream() + .filter(raftPeer -> !raftPeer.equals(peerToRemove)) + .collect(Collectors.toList()); + RaftClientReply reply = sendReconfiguration(RaftGroup.valueOf(raftGroupId, newConfig)); + if (!reply.isSuccess()) { + throw new RatisRequestFailedException(reply.getException()); + } + try { + writeResult = Utils.deserializeFrom(reply.getMessage().getContent().asReadOnlyByteBuffer()); + } catch (Exception e) { + throw new RatisRequestFailedException(e); + } + } + return writeResult; + } + /** NOTICE: transferLeader *does not guarantee* the leader be transferred to newLeader. */ @Override public void transferLeader(ConsensusGroupId groupId, Peer newLeader) throws ConsensusException { diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java index 59f6cccd6f84a..d9c0aca85de50 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java @@ -241,6 +241,12 @@ public List getAllConsensusGroupIds() { return new ArrayList<>(stateMachineMap.keySet()); } + @Override + public TSStatus resetPeerList(ConsensusGroupId groupId, List peers) + throws ConsensusException { + throw new ConsensusException("SimpleConsensus does not support reset peer list"); + } + private String buildPeerDir(ConsensusGroupId groupId) { return storageDir + File.separator + groupId.getType().getValue() + "_" + groupId.getId(); }