Skip to content

Commit

Permalink
Special LR Stream Listener across namespaces (#3267)
Browse files Browse the repository at this point in the history
Currently, stream subscription can be done on a single namespace.  This
PR introduces a special LR-specific streaming subscription to receive
ordered updates across namespaces.  This behavior is required by clients
of the Logical Group-based replication.

Co-authored-by: Pankti Majmudar <pmajmudar@vmware.com>
  • Loading branch information
shama358 and pankti-m committed Mar 20, 2023
1 parent e7a44e4 commit 4a45744
Show file tree
Hide file tree
Showing 17 changed files with 1,864 additions and 415 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
* **logreplication.opaque.count\_per\_message**: Number of opaque entries per message (rate, mean, max).
* **logreplication.opaque.count\_total**: Number of overall opaque entries (rate, mean, max).
* **logreplication.opaque.count\_valid**: Number of valid opaque entries (rate, mean, max).
* **logreplication.subscribe.trim.count**: Number of times a Trimmed Exception was thrown from the MVO layer when subscribing to LogReplication listener.
* **logreplication.subscribe.conflict.count**: Number of times a Transaction Aborted Exception was thrown due to conflicting updates when subscribing to LogReplication listener.
* **logreplication.subscribe.duration**: Time taken to subscribe the LogReplication listener.
* **logreplication.client.fullsync.duration**: Time taken by the client subscribing to LogReplication listener to perform a full sync on its tables.

### Current metrics collected for Corfu Runtime:

Expand Down
182 changes: 182 additions & 0 deletions runtime/src/main/java/org/corfudb/runtime/LogReplicationListener.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package org.corfudb.runtime;

import com.google.common.base.Preconditions;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.corfudb.runtime.LogReplication.ReplicationStatusVal;
import org.corfudb.runtime.collections.CorfuStore;
import org.corfudb.runtime.collections.CorfuStreamEntries;
import org.corfudb.runtime.collections.CorfuStreamEntry;
import org.corfudb.runtime.collections.StreamListener;
import org.corfudb.runtime.collections.TableSchema;
import org.corfudb.runtime.collections.TxnContext;
import org.corfudb.runtime.view.Address;
import javax.annotation.Nonnull;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import static org.corfudb.runtime.LogReplicationUtils.REPLICATION_STATUS_TABLE;

/**
* This is the interface that a client must subscribe to if it needs to observe and bifurcate the data updates received
* on Log Entry and Snapshot Sync. The client's usecase is that it maintains a 'merged table' which contains data
* received through replication and local updates. Log Replicator does not write to this merged table. This
* listener will observe the writes and apply them to the merged table based on the client implementation.
*
*
* This interface sees ordered updates from :
* 1. client-streams from client-Namespace, and,
* 2. LrStatusTable from corfuSystem-Namespace.
*
* The client implementing this interface will only observe the data updates from client streams
*/
@Slf4j
public abstract class LogReplicationListener implements StreamListener {

// Indicates if a full sync on client tables was performed during subscription. A full sync will not be
// performed if a snapshot sync is ongoing.
@Getter
private final AtomicBoolean clientFullSyncPending = new AtomicBoolean(false);

// This variable tracks if a snapshot sync is ongoing
@Getter
private final AtomicBoolean snapshotSyncInProgress = new AtomicBoolean(false);

// Timestamp at which the client performed a full sync. Any updates below this timestamp must be ignored.
// At the time of subscription, a full sync cannot be performed if LR Snapshot Sync is in progress. Full Sync is
// performed when this ongoing snapshot sync completes. The listener, however, can get updates before this full
// sync. So we need to maintain this timestamp and ignore any updates below it.
@Getter
private final AtomicLong clientFullSyncTimestamp = new AtomicLong(Address.NON_ADDRESS);

private final CorfuStore corfuStore;
private final String namespace;

/**
* Special LogReplication listener which a client creates to receive ordered updates for replicated data.
* @param corfuStore Corfu Store used on the client
* @param namespace Namespace of the client's tables
*/
public LogReplicationListener(CorfuStore corfuStore, @Nonnull String namespace) {
this.corfuStore = corfuStore;
this.namespace = namespace;
}

/**
* This is an internal method of this abstract listener and not exposed to clients.
*
* @param results is a map of stream UUID -> list of entries of this stream.
*/
public final void onNext(CorfuStreamEntries results) {

// If this update came before the client's full sync timestamp, ignore it.
if (results.getTimestamp().getSequence() <= clientFullSyncTimestamp.get()) {
return;
}

Set<String> tableNames =
results.getEntries().keySet().stream().map(schema -> schema.getTableName()).collect(Collectors.toSet());

if (tableNames.contains(REPLICATION_STATUS_TABLE)) {
Preconditions.checkState(results.getEntries().keySet().size() == 1,
"Replication Status Table Update received with other tables");
processReplicationStatusUpdate(results);
return;
}

// Data Updates
if (clientFullSyncPending.get()) {
// If the listener started when snapshot sync was ongoing, ignore all data updates until it ends. When
// it ends, the client will perform a full sync and build a consistent state containing these updates.
return;
}

if (snapshotSyncInProgress.get()) {
processUpdatesInSnapshotSync(results);
} else {
processUpdatesInLogEntrySync(results);
}
}

private void processReplicationStatusUpdate(CorfuStreamEntries results) {
Map<TableSchema, List<CorfuStreamEntry>> entries = results.getEntries();

List<CorfuStreamEntry> replicationStatusTableEntries =
entries.entrySet().stream().filter(e -> e.getKey().getTableName().equals(REPLICATION_STATUS_TABLE))
.map(Map.Entry::getValue)
.findFirst()
.get();

for (CorfuStreamEntry entry : replicationStatusTableEntries) {

// Ignore any update where the operation type != UPDATE
if (entry.getOperation() == CorfuStreamEntry.OperationType.UPDATE) {
ReplicationStatusVal status = (ReplicationStatusVal)entry.getPayload();

if (status.getDataConsistent()) {
// getDataConsistent() == true means that snapshot sync has ended.
if (snapshotSyncInProgress.get()) {
if (clientFullSyncPending.get()) {
// Snapshot sync which was ongoing when the listener was subscribed has ended. Attempt to
// perform a full sync now.
LogReplicationUtils.attemptClientFullSync(corfuStore, this, namespace);
return;
}
// Process snapshot sync completion in steady state, i.e., client full sync is already complete
snapshotSyncInProgress.set(false);
onSnapshotSyncComplete();
}
} else {
// getDataConsistent() == false. Snapshot sync has started.
snapshotSyncInProgress.set(true);
onSnapshotSyncStart();
}
}
}
}

// -------- Methods to be implemented on the client/application ---------------

/**
* Invoked when a snapshot sync start has been detected.
*/
protected abstract void onSnapshotSyncStart();

/**
* Invoked when an ongoing snapshot sync completes
*/
protected abstract void onSnapshotSyncComplete();

/**
* Invoked when data updates are received during a snapshot sync. These updates will be the writes
* received as part of the snapshot sync
* @param results Entries received in a single transaction as part of a snapshot sync
*/
protected abstract void processUpdatesInSnapshotSync(CorfuStreamEntries results);

/**
* Invoked when data updates are received as part of a LogEntry Sync.
* @param results Entries received in a single transaction as part of a log entry sync
*/
protected abstract void processUpdatesInLogEntrySync(CorfuStreamEntries results);

/**
* Invoked by the Corfu runtime when this listener is being subscribed. This method should
* perform a full-sync on all application tables which the client is interested in merging together.
* @param txnContext transaction context in which the operation must be performed
*/
protected abstract void performFullSync(TxnContext txnContext);

/**
* Callback to indicate that an error or exception has occurred while streaming or that the stream is
* shutting down. Some exceptions can be handled by restarting the stream (TrimmedException) while
* some errors (SystemUnavailableError) are unrecoverable.
* To be implemented on the client/application
* @param throwable
*/
public abstract void onError(Throwable throwable);
}
155 changes: 150 additions & 5 deletions runtime/src/main/java/org/corfudb/runtime/LogReplicationUtils.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,160 @@
package org.corfudb.runtime;

import com.google.common.base.Preconditions;
import com.google.protobuf.Message;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import lombok.extern.slf4j.Slf4j;
import org.corfudb.common.metrics.micrometer.MicroMeterUtils;
import org.corfudb.runtime.collections.CorfuStore;
import org.corfudb.runtime.collections.CorfuStoreEntry;
import org.corfudb.runtime.collections.Table;
import org.corfudb.runtime.collections.TableOptions;
import org.corfudb.runtime.collections.TxnContext;
import org.corfudb.runtime.exceptions.AbortCause;
import org.corfudb.runtime.exceptions.StreamingException;
import org.corfudb.runtime.LogReplication.ReplicationStatusKey;
import org.corfudb.runtime.LogReplication.ReplicationStatusVal;
import org.corfudb.runtime.exceptions.TransactionAbortedException;
import org.corfudb.runtime.exceptions.TrimmedException;
import org.corfudb.util.retry.IRetry;
import org.corfudb.util.retry.IntervalRetry;
import org.corfudb.util.retry.RetryNeededException;
import javax.annotation.Nonnull;
import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.Optional;

import static org.corfudb.runtime.view.TableRegistry.CORFU_SYSTEM_NAMESPACE;

/**
* LogReplication code resides in the infrastructure package. Adding a dependency from this package(runtime) to
* infrastructure introduces a circular dependency. This class defines LR-specific constants and utility methods
* required in runtime. Note that these methods are unique and not duplicated from infrastructure.
* infrastructure introduces a circular dependency. This class defines LR-specific constants and utility methods required in
* runtime. Note that these methods are unique and not duplicated from infrastructure.
*/
public class LogReplicationUtils {

@Slf4j
public final class LogReplicationUtils {
public static final String LR_STATUS_STREAM_TAG = "lr_status";

public static final String REPLICATION_STATUS_TABLE = "LogReplicationStatus";
}

private LogReplicationUtils() { }

public static void subscribe(@Nonnull LogReplicationListener clientListener, @Nonnull String namespace,
@Nonnull String streamTag, @Nonnull List<String> tablesOfInterest, int bufferSize,
CorfuStore corfuStore) {

long subscriptionTimestamp = getSubscriptionTimestamp(corfuStore, namespace, clientListener);
corfuStore.getRuntime().getTableRegistry().getStreamingManager().subscribeLogReplicationListener(
clientListener, namespace, streamTag, tablesOfInterest, subscriptionTimestamp, bufferSize);
log.info("Client subscription at timestamp {} successful.", subscriptionTimestamp);
}

private static long getSubscriptionTimestamp(CorfuStore corfuStore, String namespace,
LogReplicationListener clientListener) {
Optional<Counter> mvoTrimCounter = MicroMeterUtils.counter("logreplication.subscribe.trim.count");
Optional<Counter> conflictCounter = MicroMeterUtils.counter("logreplication.subscribe.conflict.count");
Optional<Timer.Sample> subscribeTimer = MicroMeterUtils.startTimer();

Table<ReplicationStatusKey, ReplicationStatusVal, Message> replicationStatusTable =
openReplicationStatusTable(corfuStore);

try {
return IRetry.build(IntervalRetry.class, () -> {
try (TxnContext txnContext = corfuStore.txn(namespace)) {
// The transaction is started in the client's namespace and the Replication Status table resides in the
// system namespace. Corfu Store does not validate the cross-namespace access as long as there are no
// writes on the table in the different namespace. This hack is required here as we want client full
// sync to happen in the same transaction which checks the status of a snapshot sync so that there is no
// window between the check and full sync.
List<CorfuStoreEntry<ReplicationStatusKey, ReplicationStatusVal, Message>> entries =
txnContext.executeQuery(replicationStatusTable, p -> true);

// In LR V1, it is a valid assumption that the size of replication status table will be 1 as there is
// only 1 remote cluster. This implementation will change in LR V2
Preconditions.checkState(entries.size() == 1);
CorfuStoreEntry<ReplicationStatusKey, ReplicationStatusVal, Message> entry = entries.get(0);

if (entry.getPayload().getDataConsistent()) {
// No snapshot sync is in progress
log.info("No Snapshot Sync is in progress. Request the client to perform a full sync on its " +
"tables.");
Optional<Timer.Sample> clientFullSyncTimer = MicroMeterUtils.startTimer();
clientListener.performFullSync(txnContext);
MicroMeterUtils.time(clientFullSyncTimer, "logreplication.client.fullsync.duration");
} else {
// Snapshot sync is in progress. Subscribe without performing a full sync on the tables.
log.info("Snapshot Sync is in progress. Subscribing without performing a full sync on client" +
" tables.");
updateListenerFlagsForSnapshotSync(clientListener, true);
}
txnContext.commit();

// Subscribe from the snapshot timestamp of this transaction, i.e., log tail when the transaction started.
// Subscribing from the commit address will result in missed updates which took place between the start
// and end of the transaction because reads in a transaction observe updates only till the snapshot when it
// started.
long subscriptionTimestamp = txnContext.getTxnSequence();
return subscriptionTimestamp;
} catch (TransactionAbortedException tae) {
if (tae.getCause() instanceof TrimmedException) {
// If the snapshot version where this transaction started has been evicted from the JVM's MVO
// cache, a trimmed exception is thrown and requires a retry at a later timestamp.
incrementCount(mvoTrimCounter);
log.warn("Snapshot no longer available in the cache. Retrying.", tae);
} else if (tae.getAbortCause() == AbortCause.CONFLICT) {
// Concurrent updates to the client's tables
incrementCount(conflictCounter);
log.warn("Concurrent updates to client tables. Retrying.", tae);
} else {
log.error("Unexpected type of Transaction Aborted Exception", tae);
}
throw new RetryNeededException();
} catch (Exception e) {
log.error("Unexpected exception type hit", e);
throw new RetryNeededException();
}
}).run();
} catch (InterruptedException e) {
throw new StreamingException(e);
} finally {
MicroMeterUtils.time(subscribeTimer, "logreplication.subscribe.duration");
}
}


private static Table<ReplicationStatusKey, ReplicationStatusVal, Message> openReplicationStatusTable(CorfuStore corfuStore) {
try {
return corfuStore.openTable(CORFU_SYSTEM_NAMESPACE, REPLICATION_STATUS_TABLE, ReplicationStatusKey.class,
ReplicationStatusVal.class, null, TableOptions.fromProtoSchema(ReplicationStatusVal.class));
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
log.error("Failed to open the Replication Status table", e);
throw new StreamingException(e);
}
}

private static void incrementCount(Optional<Counter> counter) {
counter.ifPresent(Counter::increment);
}

/**
* If full sync on client tables was not performed during subscription, attempt to perform it now and complete
* the subscription.
* @param corfuStore
* @param clientListener
* @param namespace
*/
public static void attemptClientFullSync(CorfuStore corfuStore, LogReplicationListener clientListener,
String namespace) {
long subscriptionTimestamp = getSubscriptionTimestamp(corfuStore, namespace, clientListener);
log.info("Client full sync completed at timestamp {}", subscriptionTimestamp);
clientListener.getClientFullSyncTimestamp().set(subscriptionTimestamp);
updateListenerFlagsForSnapshotSync(clientListener, false);
}

private static void updateListenerFlagsForSnapshotSync(LogReplicationListener clientListener,
boolean snapshotSyncInProgress) {
clientListener.getClientFullSyncPending().set(snapshotSyncInProgress);
clientListener.getSnapshotSyncInProgress().set(snapshotSyncInProgress);
}
}

0 comments on commit 4a45744

Please sign in to comment.