Skip to content

Commit

Permalink
Fixes for initialization of sync status (#3570)
Browse files Browse the repository at this point in the history
* Status sync initialization fixes

* Codacy fixes

* Coalesce sync status fixes

* Remove use of markSyncStatus in discovery service

* Init status once on active and for each cluster

* Codacy fixes

* Init sync type on exit of initialized

* Init sync status on bootstrap

* Addressed comments

* Flakey test fixes

* Addressed comments 2

* Test cleanup

* Additional tests

* Fix test flakiness

* Added tests for log entry <-> snapshot transitions

* Undo delta stream test fix

---------

Co-authored-by: Shreay Patel <pshreay@vmware.com>
  • Loading branch information
shreayp and pshreay committed Apr 15, 2023
1 parent f46ddea commit 835f53d
Show file tree
Hide file tree
Showing 9 changed files with 488 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -194,12 +194,6 @@ public class CorfuReplicationDiscoveryService implements Runnable, CorfuReplicat
*/
private boolean serverStarted = false;

/**
* Indicates the replication status has been set as NOT_STARTED.
* It should be reset if its role changes to Standby.
*/
private boolean statusFlag = false;

/**
* This is the listener to the replication event table shared by the nodes in the cluster.
* When a non-leader node is called to do the enforcedSnapshotSync, it will write the event to
Expand Down Expand Up @@ -343,8 +337,12 @@ private void bootstrapLogReplicationService() {

logReplicationServerHandler = new LogReplicationServer(serverContext, logReplicationConfig,
logReplicationMetadataManager, localCorfuEndpoint, topologyDescriptor.getTopologyConfigId(), localNodeId);
logReplicationServerHandler.setActive(localClusterDescriptor.getRole().equals(ClusterRole.ACTIVE));
logReplicationServerHandler.setStandby(localClusterDescriptor.getRole().equals(ClusterRole.STANDBY));
if (localClusterDescriptor.getRole().equals(ClusterRole.ACTIVE)) {
logReplicationServerHandler.setActive(true);
addDefaultReplicationStatus();
} else if (localClusterDescriptor.getRole().equals(ClusterRole.STANDBY)) {
logReplicationServerHandler.setStandby(true);
}

interClusterReplicationService = new CorfuInterClusterReplicationServerNode(
serverContext,
Expand Down Expand Up @@ -497,7 +495,6 @@ private void onLeadershipAcquire() {
}
replicationManager.setTopology(topologyDescriptor);
replicationManager.start();
updateReplicationStatus();
lockAcquireSample = recordLockAcquire(localClusterDescriptor.getRole());
processCountOnLockAcquire(localClusterDescriptor.getRole());
break;
Expand All @@ -506,7 +503,6 @@ private void onLeadershipAcquire() {
log.info("Start as Sink (receiver)");
interClusterReplicationService.getLogReplicationServer().getSinkManager().reset();
interClusterReplicationService.getLogReplicationServer().setLeadership(true);
statusFlag = false;
lockAcquireSample = recordLockAcquire(localClusterDescriptor.getRole());
processCountOnLockAcquire(localClusterDescriptor.getRole());
break;
Expand Down Expand Up @@ -620,6 +616,10 @@ public void onClusterRoleChange(TopologyDescriptor newTopology) {
// Consider the case of async configuration changes, non-lead nodes could overwrite
// the replication status if it has already completed by the lead node
resetReplicationStatusTableWithRetry();
// In the event of Standby -> Active we should add the default replication values
if (localClusterDescriptor.getRole() == ClusterRole.ACTIVE) {
addDefaultReplicationStatus();
}
}

log.debug("Persist new topologyConfigId {}, cluster id={}, role={}", topologyDescriptor.getTopologyConfigId(),
Expand Down Expand Up @@ -903,10 +903,10 @@ private void recordLockRelease() {
lockAcquireSample.ifPresent(LongTaskTimer.Sample::stop);
}

private void updateReplicationStatus() {
if (!statusFlag) {
replicationManager.updateStatusAsNotStarted();
statusFlag = true;
private void addDefaultReplicationStatus() {
// Add default entry to Replication Status Table
for (ClusterDescriptor clusterDescriptor : topologyDescriptor.getStandbyClusters().values()) {
logReplicationMetadataManager.initializeReplicationStatusTable(clusterDescriptor.clusterId);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.corfudb.infrastructure.LogReplicationRuntimeParameters;
import org.corfudb.infrastructure.logreplication.proto.LogReplicationMetadata.SyncStatus;
import org.corfudb.infrastructure.logreplication.replication.receive.LogReplicationMetadataManager;
import org.corfudb.infrastructure.logreplication.runtime.CorfuLogReplicationRuntime;
import org.corfudb.infrastructure.logreplication.utils.LogReplicationConfigManager;
Expand Down Expand Up @@ -231,6 +230,8 @@ public void processStandbyChange(TopologyDescriptor newConfig) {
topology.addStandbyCluster(clusterInfo);
startLogReplicationRuntime(clusterInfo);
}
// Initialize default replication status values for the new standby
metadataManager.initializeReplicationStatusTable(clusterId);
}

// The connection id or other transportation plugin's info could've changed for
Expand All @@ -256,16 +257,4 @@ public void enforceSnapshotSync(DiscoveryServiceEvent event) {
standbyRuntime.getSourceManager().startForcedSnapshotSync(event.getEventId());
}
}

/**
* Update Replication Status as NOT_STARTED.
* Should be called only once in an active lifecycle.
*/
public void updateStatusAsNotStarted() {
runtimeToRemoteCluster.values().forEach(corfuLogReplicationRuntime ->
corfuLogReplicationRuntime
.getSourceManager()
.getAckReader()
.markSyncStatus(SyncStatus.NOT_STARTED));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ public class LogReplicationAckReader {
// Last ack'd timestamp from Receiver
private long lastAckedTimestamp = Address.NON_ADDRESS;

// Sync Type for which last Ack was received. Default to LOG_ENTRY as this is the initial FSM state
private SyncType lastSyncType = SyncType.LOG_ENTRY;
// Sync Type for which last Ack was received, it is initialized when setSyncType is called
private SyncType lastSyncType = null;

private LogEntryReader logEntryReader;

Expand Down Expand Up @@ -399,7 +399,7 @@ public void markSnapshotSyncInfoOngoing() {
IRetry.build(IntervalRetry.class, () -> {
try {
lock.lock();
metadataManager.updateSyncStatus(remoteClusterId, SyncType.SNAPSHOT, SyncStatus.ONGOING);
metadataManager.updateSyncStatus(remoteClusterId, lastSyncType, SyncStatus.ONGOING);
} catch (TransactionAbortedException tae) {
log.error("Error while attempting to markSnapshotSyncInfoOngoing for cluster {}.", remoteClusterId, tae);
throw new RetryNeededException();
Expand Down Expand Up @@ -470,10 +470,14 @@ public void run() {
IRetry.build(IntervalRetry.class, () -> {
try {
lock.lock();
if (lastSyncType == null) {
log.info("lastSyncType is null before polling task run");
return null;
}
long entriesToSend = calculateRemainingEntriesToSend(lastAckedTimestamp);
metadataManager.setReplicationStatusTable(remoteClusterId, entriesToSend, lastSyncType);
metadataManager.updateRemainingEntriesToSend(remoteClusterId, entriesToSend, lastSyncType);
} catch (TransactionAbortedException tae) {
log.error("Error while attempting to set replication status for " +
log.error("Error while attempting to set remaining entries for " +
"remote cluster {} with lastSyncType {}.",
remoteClusterId, lastSyncType, tae);
throw new RetryNeededException();
Expand All @@ -484,7 +488,7 @@ public void run() {
return null;
}).run();
} catch (InterruptedException e) {
log.error("Unrecoverable exception when attempting to setReplicationStatusTable", e);
log.error("Unrecoverable exception when attempting to updateRemainingEntriesToSend", e);
throw new UnrecoverableCorfuInterruptedError(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.corfudb.common.metrics.micrometer.MeterRegistryProvider;
import org.corfudb.infrastructure.logreplication.proto.LogReplicationMetadata.ReplicationStatusVal.SyncType;
import org.corfudb.infrastructure.logreplication.proto.LogReplicationMetadata.SyncStatus;
import org.corfudb.infrastructure.logreplication.replication.send.SnapshotSender;

Expand Down Expand Up @@ -152,6 +153,7 @@ public void onEntry(LogReplicationState from) {
try {
// If the transition is to itself, the snapshot sync is continuing, no need to reset the sender
if (from != this) {
fsm.getAckReader().setSyncType(SyncType.SNAPSHOT);
snapshotSender.reset();
fsm.getAckReader().markSnapshotSyncInfoOngoing(forcedSnapshotSync, transitionEventId);
snapshotSyncTransferTimerSample = MeterRegistryProvider.getInstance().map(Timer::start);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void onEntry(LogReplicationState from) {

@Override
public void onExit(LogReplicationState to) {
if (to != this || to.getType() != LogReplicationStateType.ERROR) {
if (!to.equals(this) && !to.getType().equals(LogReplicationStateType.ERROR)) {
fsm.getAckReader().startSyncStatusUpdatePeriodicTask();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import lombok.extern.slf4j.Slf4j;
import org.corfudb.common.metrics.micrometer.MeterRegistryProvider;
import org.corfudb.infrastructure.logreplication.DataSender;
import org.corfudb.infrastructure.logreplication.proto.LogReplicationMetadata.ReplicationStatusVal.SyncType;
import org.corfudb.infrastructure.logreplication.replication.send.LogReplicationEventMetadata;
import org.corfudb.infrastructure.logreplication.runtime.CorfuLogReplicationRuntime;
import org.corfudb.infrastructure.logreplication.utils.LogReplicationConfigManager;
Expand Down Expand Up @@ -156,6 +157,7 @@ public LogReplicationState processEvent(LogReplicationEvent event) throws Illega
public void onEntry(LogReplicationState from) {
log.info("OnEntry :: wait snapshot apply state");
if (from.getType().equals(LogReplicationStateType.INITIALIZED)) {
fsm.getAckReader().setSyncType(SyncType.SNAPSHOT);
stopSnapshotApply.set(false);
fsm.getAckReader().markSnapshotSyncInfoOngoing();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,43 @@ public LogReplicationMetadataManager(CorfuRuntime rt, long topologyConfigId, Str
setupTopologyConfigId(topologyConfigId);
}

public void initializeReplicationStatusTable(String remoteClusterId) {
try {
IRetry.build(IntervalRetry.class, () -> {
try (TxnContext txn = corfuStore.txn(NAMESPACE)) {
ReplicationStatusKey replicationStatusKey = ReplicationStatusKey.newBuilder()
.setClusterId(remoteClusterId)
.build();

// Only set the default value if the key is not present
if (!txn.isExists(replicationStatusTable, replicationStatusKey)) {
ReplicationStatusVal defaultSourceStatus = ReplicationStatusVal.newBuilder()
.setStatus(SyncStatus.NOT_STARTED)
.setRemainingEntriesToSend(-1L)
.setSnapshotSyncInfo(SnapshotSyncInfo.newBuilder()
.setStatus(SyncStatus.NOT_STARTED)
.build())
.build();

log.debug("Adding default entry on source to Replication Status Table");
txn.putRecord(replicationStatusTable, replicationStatusKey, defaultSourceStatus, null);
}
txn.commit();
} catch (TransactionAbortedException tae) {
log.error("Error while adding default entry to Replication Status Table", tae);
throw new RetryNeededException();
}
if (log.isTraceEnabled()) {
log.trace("Adding default value to Replication Status Table succeeds.");
}
return null;
}).run();
} catch (InterruptedException e) {
log.error("Unrecoverable exception when attempting to add default sync status.", e);
throw new UnrecoverableCorfuInterruptedError(e);
}
}

public TxnContext getTxnContext() {
return corfuStore.txn(NAMESPACE);
}
Expand Down Expand Up @@ -546,92 +583,41 @@ public void updateSyncStatus(String clusterId, SyncType lastSyncType, SyncStatus
}

/**
* Set replication status table.
* If the current sync type is log entry sync, keep Snapshot Sync Info.
* Updates the number of remaining entries.
*
* Note: TransactionAbortedException has been handled by upper level.
*
* @param clusterId standby cluster id
* @param remainingEntries num of remaining entries to send
* @param type sync type
*/
public void setReplicationStatusTable(String clusterId, long remainingEntries, SyncType type) {
ReplicationStatusKey key = ReplicationStatusKey.newBuilder().setClusterId(clusterId).build();
SnapshotSyncInfo snapshotStatus = null;
ReplicationStatusVal current;
ReplicationStatusVal previous = null;

public void updateRemainingEntriesToSend(String clusterId, long remainingEntries, SyncType type) {
try (TxnContext txn = corfuStore.txn(NAMESPACE)) {
CorfuStoreEntry<ReplicationStatusKey, ReplicationStatusVal, Message> record = txn.getRecord(replicationStatusTable, key);
if (record.getPayload() != null) {
previous = record.getPayload();
snapshotStatus = previous.getSnapshotSyncInfo();
}
txn.commit();
}
ReplicationStatusKey key = ReplicationStatusKey.newBuilder().setClusterId(clusterId).build();
CorfuStoreEntry<ReplicationStatusKey, ReplicationStatusVal, Message> entry =
txn.getRecord(replicationStatusTable, key);

if (type == SyncType.LOG_ENTRY) {
if (previous != null &&
(previous.getStatus().equals(SyncStatus.NOT_STARTED)
|| snapshotStatus.getStatus().equals(SyncStatus.STOPPED))) {
log.info("syncStatusPoller :: skip replication status update, log entry replication is {}", previous.getStatus());
// Skip update of sync status, it will be updated once replication is resumed or started
return;
}

if (snapshotStatus == null){
log.warn("syncStatusPoller [logEntry]:: previous snapshot status is not present for cluster: {}", clusterId);
snapshotStatus = SnapshotSyncInfo.newBuilder().build();
}
ReplicationStatusVal previous = entry.getPayload();
SnapshotSyncInfo previousSnapshotSyncInfo = previous.getSnapshotSyncInfo();

current = ReplicationStatusVal.newBuilder()
.setRemainingEntriesToSend(remainingEntries)
.setSyncType(type)
.setStatus(SyncStatus.ONGOING)
.setSnapshotSyncInfo(snapshotStatus)
.build();

try (TxnContext txn = corfuStore.txn(NAMESPACE)) {
txn.putRecord(replicationStatusTable, key, current, null);
if ((previous.getStatus().equals(SyncStatus.NOT_STARTED) && previousSnapshotSyncInfo.getStatus().equals(SyncStatus.NOT_STARTED))
|| (previous.getStatus().equals(SyncStatus.STOPPED) || previousSnapshotSyncInfo.getStatus().equals(SyncStatus.STOPPED))) {
// Skip update of sync status, it will be updated once replication is resumed or started
log.info("syncStatusPoller :: skip remaining entries update, replication status is {}",
previous.getStatus());
txn.commit();
}

log.debug("syncStatusPoller :: Log Entry status set to ONGOING, clusterId: {}, remainingEntries: {}, " +
"snapshotSyncInfo: {}", clusterId, remainingEntries, snapshotStatus);
} else if (type == SyncType.SNAPSHOT) {

SnapshotSyncInfo currentSnapshotSyncInfo;
if (snapshotStatus == null){
log.warn("syncStatusPoller [snapshot] :: previous status is not present for cluster: {}", clusterId);
currentSnapshotSyncInfo = SnapshotSyncInfo.newBuilder().build();
} else {

if (snapshotStatus.getStatus().equals(SyncStatus.NOT_STARTED)
|| snapshotStatus.getStatus().equals(SyncStatus.STOPPED)) {
// Skip update of sync status, it will be updated once replication is resumed or started
log.info("syncStatusPoller :: skip replication status update, snapshot sync is {}", snapshotStatus);
return;
}

currentSnapshotSyncInfo = snapshotStatus.toBuilder()
.setStatus(SyncStatus.ONGOING)
.build();
return;
}

current = ReplicationStatusVal.newBuilder()
ReplicationStatusVal current = previous.toBuilder()
.setRemainingEntriesToSend(remainingEntries)
.setSyncType(type)
.setStatus(SyncStatus.ONGOING)
.setSnapshotSyncInfo(currentSnapshotSyncInfo)
.build();

try (TxnContext txn = corfuStore.txn(NAMESPACE)) {
txn.putRecord(replicationStatusTable, key, current, null);
txn.commit();
}
txn.putRecord(replicationStatusTable, key, current, null);
txn.commit();

log.debug("syncStatusPoller :: sync status for {} set to ONGOING, clusterId: {}, remainingEntries: {}",
type, clusterId, remainingEntries);
log.debug("syncStatusPoller :: remaining entries updated for {}, clusterId: {}, remainingEntries: {}" +
"snapshotSyncInfo: {}", type, clusterId, remainingEntries, previousSnapshotSyncInfo);
}
}

Expand Down

0 comments on commit 835f53d

Please sign in to comment.