Skip to content

Commit

Permalink
Revert LR StreamListener and Path Change of LR Protobufs (#3566)
Browse files Browse the repository at this point in the history
* Revert "Special LR Stream Listener across namespaces (#3267)"

This reverts commit 4a45744.

* Revert "Move LR protobufs shared with runtime from infrastructure to runtime. (#3539)"

This reverts commit 4974ba9.
  • Loading branch information
pankti-m committed Mar 21, 2023
1 parent 7a23715 commit 84640ba
Show file tree
Hide file tree
Showing 41 changed files with 717 additions and 2,169 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@
* **logreplication.opaque.count\_per\_message**: Number of opaque entries per message (rate, mean, max).
* **logreplication.opaque.count\_total**: Number of overall opaque entries (rate, mean, max).
* **logreplication.opaque.count\_valid**: Number of valid opaque entries (rate, mean, max).
* **logreplication.subscribe.trim.count**: Number of times a Trimmed Exception was thrown from the MVO layer when subscribing to LogReplication listener.
* **logreplication.subscribe.conflict.count**: Number of times a Transaction Aborted Exception was thrown due to conflicting updates when subscribing to LogReplication listener.
* **logreplication.subscribe.duration**: Time taken to subscribe the LogReplication listener.
* **logreplication.client.fullsync.duration**: Time taken by the client subscribing to LogReplication listener to perform a full sync on its tables.

### Current metrics collected for Corfu Runtime:

Expand Down
62 changes: 62 additions & 0 deletions infrastructure/proto/log_replication_metadata.proto
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,68 @@ message LogReplicationMetadataVal {
string val = 1;
}

/*
* Replication Status Key
*/
message ReplicationStatusKey {
string clusterId = 1;
}

/*
* Replication Status Value
* Active Site sets the completionPercent, Standby sets the dataConsistent boolean
*/
message ReplicationStatusVal {
option (org.corfudb.runtime.table_schema).stream_tag = "lr_status";

uint64 remainingEntriesToSend = 1;
bool dataConsistent = 2;
enum SyncType {
SNAPSHOT = 0;
LOG_ENTRY = 1;
}
SyncType syncType = 3;
SyncStatus status = 4;
SnapshotSyncInfo snapshotSyncInfo = 5;
}

/*
* Snapshot Sync Info
*
* If replication is in SNAPSHOT sync, this provides details of the
* ongoing snapshot sync. If replication is in LOG ENTRY sync (delta).
* this provides details of the previous SNAPSHOT sync leading up to
* the log entry sync.
*/
message SnapshotSyncInfo {
enum SnapshotSyncType {
DEFAULT = 0;
FORCED = 1;
}
SnapshotSyncType type = 1;
SyncStatus status = 2;
string snapshotRequestId = 3;
google.protobuf.Timestamp completedTime = 4;
uint64 baseSnapshot = 5;
}

/*
* SyncStatus
*
* COMPLETED: Used in SnapshotSyncInfo only for the latest snapshot sync
* ERROR: Log Replication is unrecoverable, need to restart
* NOT_STARTED: Log Replication might be in the very beginning state
* STOPPED: Log Replication is in initialized state
*/
enum SyncStatus {
ONGOING = 0;
COMPLETED = 1;
ERROR = 2;
NOT_STARTED = 3;
STOPPED = 4;
UNAVAILABLE = 5;
}

message ReplicationEventKey {
string key = 1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
import org.corfudb.infrastructure.logreplication.proto.LogReplicationClusterInfo;
import org.corfudb.infrastructure.logreplication.proto.LogReplicationClusterInfo.ClusterRole;
import org.corfudb.infrastructure.logreplication.proto.LogReplicationClusterInfo.TopologyConfigurationMsg;
import org.corfudb.infrastructure.logreplication.proto.LogReplicationMetadata;
import org.corfudb.infrastructure.logreplication.proto.LogReplicationMetadata.ReplicationEvent;
import org.corfudb.infrastructure.logreplication.proto.LogReplicationMetadata.ReplicationEventKey;
import org.corfudb.infrastructure.logreplication.replication.receive.LogReplicationMetadataManager;
import org.corfudb.infrastructure.logreplication.utils.LogReplicationConfigManager;
import org.corfudb.runtime.CorfuRuntime;
import org.corfudb.runtime.LogReplication;
import org.corfudb.runtime.exceptions.RetryExhaustedException;
import org.corfudb.runtime.exceptions.TransactionAbortedException;
import org.corfudb.runtime.exceptions.unrecoverable.UnrecoverableCorfuInterruptedError;
Expand Down Expand Up @@ -793,13 +793,13 @@ public void updateTopology(LogReplicationClusterInfo.TopologyConfigurationMsg to
* snapshot sync is in the apply phase)
*/
@Override
public Map<String, LogReplication.ReplicationStatusVal> queryReplicationStatus() {
public Map<String, LogReplicationMetadata.ReplicationStatusVal> queryReplicationStatus() {
if (localClusterDescriptor == null || logReplicationMetadataManager == null) {
log.warn("Cluster configuration has not been pushed to current LR node.");
return null;
} else if (localClusterDescriptor.getRole() == ClusterRole.ACTIVE) {
Map<String, LogReplication.ReplicationStatusVal> mapReplicationStatus = logReplicationMetadataManager.getReplicationRemainingEntries();
Map<String, LogReplication.ReplicationStatusVal> mapToSend = new HashMap<>(mapReplicationStatus);
Map<String, LogReplicationMetadata.ReplicationStatusVal> mapReplicationStatus = logReplicationMetadataManager.getReplicationRemainingEntries();
Map<String, LogReplicationMetadata.ReplicationStatusVal> mapToSend = new HashMap<>(mapReplicationStatus);
// If map contains local cluster, remove (as it might have been added by the SinkManager) but this node
// has an active role.
if (mapToSend.containsKey(localClusterDescriptor.getClusterId())) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.corfudb.infrastructure.logreplication.infrastructure;

import org.corfudb.infrastructure.logreplication.proto.LogReplicationClusterInfo;
import org.corfudb.runtime.LogReplication;
import org.corfudb.infrastructure.logreplication.proto.LogReplicationMetadata;

import java.util.Map;
import java.util.UUID;
Expand All @@ -18,7 +18,7 @@ public interface CorfuReplicationDiscoveryServiceAdapter {
*
* @return
*/
Map<String, LogReplication.ReplicationStatusVal> queryReplicationStatus();
Map<String, LogReplicationMetadata.ReplicationStatusVal> queryReplicationStatus();

/**
* Enforce snapshotFullSync
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.corfudb.infrastructure.LogReplicationRuntimeParameters;
import org.corfudb.infrastructure.logreplication.proto.LogReplicationMetadata.SyncStatus;
import org.corfudb.infrastructure.logreplication.replication.receive.LogReplicationMetadataManager;
import org.corfudb.infrastructure.logreplication.runtime.CorfuLogReplicationRuntime;
import org.corfudb.infrastructure.logreplication.utils.LogReplicationConfigManager;
import org.corfudb.runtime.CorfuRuntime;
import org.corfudb.runtime.exceptions.TransactionAbortedException;
import org.corfudb.runtime.exceptions.unrecoverable.UnrecoverableCorfuInterruptedError;
import org.corfudb.runtime.LogReplication.SyncStatus;
import org.corfudb.util.retry.IRetry;
import org.corfudb.util.retry.IntervalRetry;
import org.corfudb.util.retry.RetryNeededException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import org.corfudb.infrastructure.logreplication.infrastructure.CorfuReplicationDiscoveryServiceAdapter;
import org.corfudb.infrastructure.logreplication.infrastructure.LogReplicationDiscoveryServiceException;
import org.corfudb.infrastructure.logreplication.proto.LogReplicationClusterInfo.TopologyConfigurationMsg;
import org.corfudb.runtime.LogReplication;
import org.corfudb.infrastructure.logreplication.proto.LogReplicationMetadata;

import java.util.Map;
import java.util.UUID;
Expand Down Expand Up @@ -50,7 +50,7 @@ public interface CorfuReplicationClusterManagerAdapter {
*
* @return
*/
Map<String, LogReplication.ReplicationStatusVal> queryReplicationStatus();
Map<String, LogReplicationMetadata.ReplicationStatusVal> queryReplicationStatus();

/**
* This API enforce a full snapshot sync on the standby cluster with the clusterId at best effort.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import org.corfudb.infrastructure.logreplication.infrastructure.CorfuReplicationDiscoveryServiceAdapter;
import org.corfudb.infrastructure.logreplication.infrastructure.LogReplicationDiscoveryServiceException;
import org.corfudb.infrastructure.logreplication.proto.LogReplicationClusterInfo.TopologyConfigurationMsg;
import org.corfudb.runtime.LogReplication;
import org.corfudb.infrastructure.logreplication.proto.LogReplicationMetadata;

import java.util.Map;
import java.util.UUID;
Expand Down Expand Up @@ -45,7 +45,7 @@ public synchronized void updateTopologyConfig(TopologyConfigurationMsg newTopolo
}
}

public Map<String, LogReplication.ReplicationStatusVal> queryReplicationStatus() {
public Map<String, LogReplicationMetadata.ReplicationStatusVal> queryReplicationStatus() {
return corfuReplicationDiscoveryService.queryReplicationStatus();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.corfudb.infrastructure.logreplication.LogReplicationConfig;
import org.corfudb.infrastructure.logreplication.proto.LogReplicationMetadata.SyncStatus;
import org.corfudb.infrastructure.logreplication.proto.LogReplicationMetadata.ReplicationStatusVal.SyncType;
import org.corfudb.infrastructure.logreplication.replication.receive.LogReplicationMetadataManager;
import org.corfudb.infrastructure.logreplication.replication.send.LogEntrySender;
import org.corfudb.infrastructure.logreplication.replication.send.logreader.LogEntryReader;
Expand All @@ -12,8 +14,6 @@
import org.corfudb.runtime.CorfuRuntime;
import org.corfudb.runtime.exceptions.TransactionAbortedException;
import org.corfudb.runtime.exceptions.unrecoverable.UnrecoverableCorfuInterruptedError;
import org.corfudb.runtime.LogReplication.ReplicationStatusVal.SyncType;
import org.corfudb.runtime.LogReplication.SyncStatus;
import org.corfudb.runtime.view.Address;
import org.corfudb.runtime.view.ObjectsView;
import org.corfudb.runtime.view.stream.StreamAddressSpace;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.corfudb.infrastructure.logreplication.replication.fsm;

import lombok.extern.slf4j.Slf4j;
import org.corfudb.runtime.LogReplication.SyncStatus;
import org.corfudb.infrastructure.logreplication.proto.LogReplicationMetadata.SyncStatus;

/**
* This class represents the stopped state of the Log Replication State Machine.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package org.corfudb.infrastructure.logreplication.replication.fsm;

import lombok.extern.slf4j.Slf4j;
import org.corfudb.infrastructure.logreplication.proto.LogReplicationMetadata.SyncStatus;
import org.corfudb.infrastructure.logreplication.proto.LogReplicationMetadata.ReplicationStatusVal.SyncType;
import org.corfudb.infrastructure.logreplication.replication.send.LogEntrySender;
import org.corfudb.runtime.LogReplication.ReplicationStatusVal.SyncType;
import org.corfudb.runtime.LogReplication.SyncStatus;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.corfudb.common.metrics.micrometer.MeterRegistryProvider;
import org.corfudb.infrastructure.logreplication.proto.LogReplicationMetadata.SyncStatus;
import org.corfudb.infrastructure.logreplication.replication.send.SnapshotSender;
import org.corfudb.runtime.LogReplication.SyncStatus;

import java.util.Optional;
import java.util.UUID;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.corfudb.infrastructure.logreplication.replication.fsm;

import lombok.extern.slf4j.Slf4j;
import org.corfudb.runtime.LogReplication;
import org.corfudb.infrastructure.logreplication.proto.LogReplicationMetadata;

/**
* This class represents the Init state of the Log Replication State Machine.
Expand Down Expand Up @@ -69,7 +69,7 @@ public LogReplicationState processEvent(LogReplicationEvent event) throws Illega
@Override
public void onEntry(LogReplicationState from) {
if (from != this) {
fsm.getAckReader().markSyncStatus(LogReplication.SyncStatus.STOPPED);
fsm.getAckReader().markSyncStatus(LogReplicationMetadata.SyncStatus.STOPPED);
// Disable periodic sync status periodic task while in initialized state (no actual replication occurring)
fsm.getAckReader().stopSyncStatusUpdatePeriodicTask();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@
import org.corfudb.infrastructure.logreplication.proto.LogReplicationMetadata.LogReplicationMetadataVal;
import org.corfudb.infrastructure.logreplication.proto.LogReplicationMetadata.ReplicationEvent;
import org.corfudb.infrastructure.logreplication.proto.LogReplicationMetadata.ReplicationEventKey;
import org.corfudb.infrastructure.logreplication.proto.LogReplicationMetadata.ReplicationStatusKey;
import org.corfudb.infrastructure.logreplication.proto.LogReplicationMetadata.ReplicationStatusVal;
import org.corfudb.infrastructure.logreplication.proto.LogReplicationMetadata.ReplicationStatusVal.SyncType;
import org.corfudb.infrastructure.logreplication.proto.LogReplicationMetadata.SnapshotSyncInfo;
import org.corfudb.infrastructure.logreplication.proto.LogReplicationMetadata.SyncStatus;
import org.corfudb.infrastructure.logreplication.utils.LogReplicationConfigManager;
import org.corfudb.runtime.CorfuRuntime;
import org.corfudb.runtime.CorfuStoreMetadata;
import org.corfudb.runtime.LogReplication;
import org.corfudb.runtime.LogReplication.ReplicationStatusKey;
import org.corfudb.runtime.LogReplication.ReplicationStatusVal;
import org.corfudb.runtime.LogReplication.ReplicationStatusVal.SyncType;
import org.corfudb.runtime.LogReplication.SnapshotSyncInfo;
import org.corfudb.runtime.LogReplication.SyncStatus;
import org.corfudb.runtime.collections.CorfuStore;
import org.corfudb.runtime.collections.CorfuStoreEntry;
import org.corfudb.runtime.collections.StreamListener;
Expand All @@ -44,7 +44,6 @@
import java.util.UUID;

import static org.corfudb.protocols.service.CorfuProtocolMessage.getResponseMsg;
import static org.corfudb.runtime.LogReplicationUtils.REPLICATION_STATUS_TABLE;
import static org.corfudb.runtime.view.TableRegistry.CORFU_SYSTEM_NAMESPACE;

/**
Expand All @@ -56,6 +55,8 @@ public class LogReplicationMetadataManager {

public static final String NAMESPACE = CORFU_SYSTEM_NAMESPACE;
public static final String METADATA_TABLE_PREFIX_NAME = "CORFU-REPLICATION-WRITER-";
public static final String REPLICATION_STATUS_TABLE = "LogReplicationStatus";
public static final String LR_STATUS_STREAM_TAG = "lr_status";
private static final String REPLICATION_EVENT_TABLE_NAME = "LogReplicationEventTable";
private static final String LR_STREAM_TAG = "log_replication";

Expand Down Expand Up @@ -400,7 +401,7 @@ public void setSnapshotAppliedComplete(LogReplication.LogReplicationEntryMsg ent
// the event of crashes
ReplicationStatusVal statusValue = ReplicationStatusVal.newBuilder()
.setDataConsistent(true)
.setStatus(SyncStatus.NOT_AVAILABLE)
.setStatus(SyncStatus.UNAVAILABLE)
.build();
txn.putRecord(replicationStatusTable, ReplicationStatusKey.newBuilder().setClusterId(localClusterId).build(),
statusValue, null);
Expand Down Expand Up @@ -667,7 +668,7 @@ public void setDataConsistentOnStandby(boolean isConsistent) {
ReplicationStatusKey key = ReplicationStatusKey.newBuilder().setClusterId(localClusterId).build();
ReplicationStatusVal val = ReplicationStatusVal.newBuilder()
.setDataConsistent(isConsistent)
.setStatus(SyncStatus.NOT_AVAILABLE)
.setStatus(SyncStatus.UNAVAILABLE)
.build();
try (TxnContext txn = getTxnContext()) {
txn.putRecord(replicationStatusTable, key, val, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
import lombok.extern.slf4j.Slf4j;
import org.corfudb.common.metrics.micrometer.MeterRegistryProvider;
import org.corfudb.infrastructure.logreplication.DataSender;
import org.corfudb.infrastructure.logreplication.proto.LogReplicationMetadata.ReplicationStatusVal;
import org.corfudb.infrastructure.logreplication.replication.LogReplicationAckReader;
import org.corfudb.runtime.LogReplication.LogReplicationEntryMsg;
import org.corfudb.runtime.LogReplication.ReplicationStatusVal;

import java.util.Map;
import java.util.Optional;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
import lombok.extern.slf4j.Slf4j;
import org.corfudb.common.metrics.micrometer.MeterRegistryProvider;
import org.corfudb.infrastructure.logreplication.DataSender;
import org.corfudb.infrastructure.logreplication.proto.LogReplicationMetadata.ReplicationStatusVal;
import org.corfudb.infrastructure.logreplication.replication.LogReplicationAckReader;
import org.corfudb.runtime.LogReplication.LogReplicationEntryMsg;
import org.corfudb.runtime.LogReplication.LogReplicationEntryType;
import org.corfudb.runtime.LogReplication.ReplicationStatusVal;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand Down
62 changes: 0 additions & 62 deletions runtime/proto/service/log_replication.proto
Original file line number Diff line number Diff line change
Expand Up @@ -46,68 +46,6 @@ message LogReplicationLeadershipResponseMsg {
string nodeId = 3;
}

/*
* Replication Status Key
*/
message ReplicationStatusKey {
string clusterId = 1;
}

/*
* Replication Status Value
* Active Site sets the completionPercent, Standby sets the dataConsistent boolean
*/
message ReplicationStatusVal {
option (org.corfudb.runtime.table_schema).stream_tag = "lr_status";

uint64 remainingEntriesToSend = 1;
bool dataConsistent = 2;
enum SyncType {
SNAPSHOT = 0;
LOG_ENTRY = 1;
}
SyncType syncType = 3;
SyncStatus status = 4;
SnapshotSyncInfo snapshotSyncInfo = 5;
}

/*
* Snapshot Sync Info
*
* If replication is in SNAPSHOT sync, this provides details of the
* ongoing snapshot sync. If replication is in LOG ENTRY sync (delta).
* this provides details of the previous SNAPSHOT sync leading up to
* the log entry sync.
*/
message SnapshotSyncInfo {
enum SnapshotSyncType {
DEFAULT = 0;
FORCED = 1;
}
SnapshotSyncType type = 1;
SyncStatus status = 2;
string snapshotRequestId = 3;
google.protobuf.Timestamp completedTime = 4;
uint64 baseSnapshot = 5;
}

/*
* SyncStatus
*
* COMPLETED: Used in SnapshotSyncInfo only for the latest snapshot sync
* ERROR: Log Replication is unrecoverable, need to restart
* NOT_STARTED: Log Replication might be in the very beginning state
* STOPPED: Log Replication is in initialized state
*/
enum SyncStatus {
ONGOING = 0;
COMPLETED = 1;
ERROR = 2;
NOT_STARTED = 3;
STOPPED = 4;
NOT_AVAILABLE = 5;
}

enum LogReplicationEntryType {
LOG_ENTRY_MESSAGE = 0;
SNAPSHOT_MESSAGE = 1;
Expand Down

0 comments on commit 84640ba

Please sign in to comment.