Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -303,24 +303,24 @@ public void addRemotePeer(ConsensusGroupId groupId, Peer peer) throws ConsensusE
logger.info("[IoTConsensus] inactivate new peer: {}", peer);
impl.inactivePeer(peer, false);

// step 2: take snapshot
// step 2: notify all the other Peers to build the sync connection to newPeer
logger.info("[IoTConsensus] notify current peers to build sync log...");
impl.notifyPeersToBuildSyncLogChannel(peer);

// step 3: take snapshot
logger.info("[IoTConsensus] start to take snapshot...");
impl.checkAndLockSafeDeletedSearchIndex();

impl.takeSnapshot();

// step 3: transit snapshot
// step 4: transit snapshot
logger.info("[IoTConsensus] start to transmit snapshot...");
impl.transmitSnapshot(peer);

// step 4: let the new peer load snapshot
// step 5: let the new peer load snapshot
logger.info("[IoTConsensus] trigger new peer to load snapshot...");
impl.triggerSnapshotLoad(peer);
KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_TRANSITION);

// step 5: notify all the other Peers to build the sync connection to newPeer
logger.info("[IoTConsensus] notify current peers to build sync log...");
impl.notifyPeersToBuildSyncLogChannel(peer);

// step 6: active new Peer
logger.info("[IoTConsensus] activate new peer...");
impl.activePeer(peer);
Expand All @@ -340,7 +340,6 @@ public void addRemotePeer(ConsensusGroupId groupId, Peer peer) throws ConsensusE
impl.notifyPeersToRemoveSyncLogChannel(peer);
throw new ConsensusException(e);
} finally {
impl.checkAndUnlockSafeDeletedSearchIndex();
logger.info("[IoTConsensus] clean up local snapshot...");
impl.cleanupLocalSnapshot();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,6 @@ public class IoTConsensusServerImpl {
private final ScheduledExecutorService backgroundTaskService;
private final IoTConsensusRateLimiter ioTConsensusRateLimiter =
IoTConsensusRateLimiter.getInstance();
private volatile long lastPinnedSearchIndexForMigration = -1;
private volatile long lastPinnedSafeDeletedIndexForMigration = -1;

public IoTConsensusServerImpl(
String storageDir,
Expand Down Expand Up @@ -516,7 +514,7 @@ public void notifyPeersToBuildSyncLogChannel(Peer targetPeer)
if (peer.equals(thisNode)) {
// use searchIndex for thisNode as the initialSyncIndex because targetPeer will load the
// snapshot produced by thisNode
buildSyncLogChannel(targetPeer, lastPinnedSearchIndexForMigration);
buildSyncLogChannel(targetPeer);
} else {
// use RPC to tell other peers to build sync log channel to target peer
try (SyncIoTConsensusServiceClient client =
Expand Down Expand Up @@ -822,9 +820,7 @@ public long getMinSyncIndex() {
}

public long getMinFlushedSyncIndex() {
return lastPinnedSafeDeletedIndexForMigration == -1
? logDispatcher.getMinFlushedSyncIndex().orElseGet(searchIndex::get)
: lastPinnedSafeDeletedIndexForMigration;
return logDispatcher.getMinFlushedSyncIndex().orElseGet(searchIndex::get);
}

public String getStorageDir() {
Expand Down Expand Up @@ -947,25 +943,6 @@ public void cleanupLocalSnapshot() {
}
}

/**
* We should set safelyDeletedSearchIndex to searchIndex before addPeer to avoid potential data
* loss.
*/
public void checkAndLockSafeDeletedSearchIndex() {
lastPinnedSearchIndexForMigration = searchIndex.get();
lastPinnedSafeDeletedIndexForMigration = getMinFlushedSyncIndex();
consensusReqReader.setSafelyDeletedSearchIndex(getMinFlushedSyncIndex());
}

/**
* We should unlock safelyDeletedSearchIndex after addPeer to avoid potential data accumulation.
*/
public void checkAndUnlockSafeDeletedSearchIndex() {
lastPinnedSearchIndexForMigration = -1;
lastPinnedSafeDeletedIndexForMigration = -1;
checkAndUpdateSafeDeletedSearchIndex();
}

/**
* If there is only one replica, set it to Long.MAX_VALUE. If there are multiple replicas, get
* the latest SafelyDeletedSearchIndex again. This enables wal to be deleted in a timely manner.
Expand Down