
Commit: Introduce a max limit on writes to the ProtobufDescriptor table in 1 TX during Snapshot Sync. (#3525)

Snapshot Writer coalesces all updates to a table into a single transaction
with a max size of 25 MB (the max supported write size). Applications
running with a small memory footprint cannot deserialize a transaction of
this size, so this change introduces an additional limit on the max number
of entries applied in a transaction.
pankti-m committed Mar 10, 2023
1 parent c6580cb commit 00e5afa
Showing 12 changed files with 136 additions and 42 deletions.
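For context, a minimal self-contained sketch of the flush rule this commit implements. It is not the actual SnapshotWriter code: byte[] entries and a commitAsOneTransaction() stub stand in for SMREntry batches and TxnContext commits, and the entry cap is applied to every stream for brevity, whereas the real code applies it only to the ProtobufDescriptor stream.

import java.util.ArrayList;
import java.util.List;

public class SnapshotBatchSketch {
    static final int MAX_WRITE_SIZE = 25 << 20;   // 25 MB, the max supported write size
    static final int MAX_ENTRIES_APPLIED = 50;    // the new per-transaction entry cap

    static void apply(List<byte[]> entries) {
        List<byte[]> buffer = new ArrayList<>();
        long bufferSize = 0;
        for (byte[] entry : entries) {
            // Flush when the next entry would exceed the size budget, or when
            // the buffer has already reached the entry cap.
            if (bufferSize + entry.length > MAX_WRITE_SIZE
                    || buffer.size() == MAX_ENTRIES_APPLIED) {
                commitAsOneTransaction(buffer);
                buffer = new ArrayList<>();
                bufferSize = 0;
            }
            buffer.add(entry);
            bufferSize += entry.length;
        }
        if (!buffer.isEmpty()) {
            commitAsOneTransaction(buffer);       // flush the tail
        }
    }

    static void commitAsOneTransaction(List<byte[]> batch) {
        System.out.println("committed " + batch.size() + " entries in one TX");
    }

    public static void main(String[] args) {
        List<byte[]> entries = new ArrayList<>();
        for (int i = 0; i < 120; i++) {
            entries.add(new byte[64]);
        }
        apply(entries);   // prints batches of 50, 50, 20
    }
}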
@@ -67,6 +67,7 @@ public class CorfuServer {
+ "[--metrics]"
+ "[--health-port=<health_port>]"
+ "[--snapshot-batch=<batch-size>] [--lock-lease=<lease-duration>]"
+ "[--max-snapshot-entries-applied=<max-snapshot-entries-applied>]"
+ "[-P <prefix>] [-R <retention>] <port>"
+ "[--compaction-trigger-freq-ms=<compaction_trigger_freq_ms>]"
+ "[--compactor-script=<compactor_script_path>]"
@@ -206,6 +207,9 @@ public class CorfuServer {
+ " The max size of replication data message in bytes.\n "
+ " --lock-lease=<lease-duration> "
+ " Lock lease duration in seconds\n "
+ " --max-snapshot-entries-applied=<max-snapshot-entries-applied> "
+ " Max number of entries applied in a snapshot transaction. 50 by default."
+ " For special tables only\n. "
+ " -h, --help "
+ " Show this screen\n"
+ " --version "
@@ -3,6 +3,7 @@
import static org.corfudb.infrastructure.logreplication.LogReplicationConfig.DEFAULT_MAX_NUM_MSG_PER_BATCH;
import static org.corfudb.infrastructure.logreplication.LogReplicationConfig.MAX_DATA_MSG_SIZE_SUPPORTED;
import static org.corfudb.infrastructure.logreplication.LogReplicationConfig.MAX_CACHE_NUM_ENTRIES;
import static org.corfudb.infrastructure.logreplication.LogReplicationConfig.DEFAULT_MAX_SNAPSHOT_ENTRIES_APPLIED;
import static org.corfudb.common.util.URLUtils.getVersionFormattedHostAddress;

import com.google.common.collect.Sets;
@@ -302,6 +303,11 @@ public int getMaxWriteSize() {
return val == null ? Integer.MAX_VALUE : Integer.parseInt(val);
}

public int getMaxSnapshotEntriesApplied() {
String val = getServerConfig(String.class, "--max-snapshot-entries-applied");
return val == null ? DEFAULT_MAX_SNAPSHOT_ENTRIES_APPLIED : Integer.parseInt(val);
}

/**
* Cleanup the DataStore files with names that are prefixes of the specified
fileName so that the number of these files doesn't exceed the user-defined
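The getter above mirrors the existing getMaxWriteSize() pattern: flag values arrive as strings, and an absent flag falls back to the compile-time default. A standalone illustration of that fallback, where the Map is a stand-in for the real parsed server config:

import java.util.HashMap;
import java.util.Map;

public class FlagDefaultSketch {
    static final int DEFAULT_MAX_SNAPSHOT_ENTRIES_APPLIED = 50;

    // Same null-check-then-parse pattern as getMaxSnapshotEntriesApplied().
    static int maxSnapshotEntriesApplied(Map<String, String> config) {
        String val = config.get("--max-snapshot-entries-applied");
        return val == null ? DEFAULT_MAX_SNAPSHOT_ENTRIES_APPLIED : Integer.parseInt(val);
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        System.out.println(maxSnapshotEntriesApplied(config));   // 50 (default)
        config.put("--max-snapshot-entries-applied", "100");
        System.out.println(maxSnapshotEntriesApplied(config));   // 100
    }
}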
@@ -34,6 +34,9 @@ public class LogReplicationConfig {
// Log Replication default max number of messages generated at the active cluster for each batch
public static final int DEFAULT_MAX_NUM_MSG_PER_BATCH = 10;

// Default value for the max number of entries applied in a single transaction on Sink during snapshot sync
public static final int DEFAULT_MAX_SNAPSHOT_ENTRIES_APPLIED = 50;

// Log Replication default max data message size is 64MB
public static final int MAX_DATA_MSG_SIZE_SUPPORTED = (64 << 20);

@@ -89,16 +92,22 @@ public class LogReplicationConfig {
*/
private int maxDataSizePerMsg;

/**
* Max number of entries to be applied during a snapshot sync. For special tables only.
*/
private int maxSnapshotEntriesApplied = DEFAULT_MAX_SNAPSHOT_ENTRIES_APPLIED;

/**
* Constructor exposed to {@link CorfuReplicationDiscoveryService}
*/
public LogReplicationConfig(LogReplicationConfigManager configManager,
int maxNumMsgPerBatch, int maxMsgSize, int cacheSize) {
int maxNumMsgPerBatch, int maxMsgSize, int cacheSize, int maxSnapshotEntriesApplied) {
this.configManager = configManager;
this.maxNumMsgPerBatch = maxNumMsgPerBatch;
this.maxMsgSize = maxMsgSize;
this.maxCacheSize = cacheSize;
this.maxDataSizePerMsg = maxMsgSize * DATA_FRACTION_PER_MSG / 100;
this.maxSnapshotEntriesApplied = maxSnapshotEntriesApplied;
syncWithRegistry();
}

@@ -54,6 +54,7 @@ public class CorfuInterClusterReplicationServer implements Runnable {
+ "[--max-replication-data-message-size=<msg-size>] "
+ "[--max-write-size=<max-write-size>] "
+ "[--lock-lease=<lease-duration>]"
+ "[--max-snapshot-entries-applied=<max-snapshot-entries-applied>]"
+ "[-c <ratio>] [-d <level>] [-p <seconds>] "
+ "[--lrCacheSize=<cache-num-entries>]"
+ "[--plugin=<plugin-config-file-path>]"
@@ -186,6 +187,9 @@ public class CorfuInterClusterReplicationServer implements Runnable {
+ " "
+ " --lock-lease=<lease-duration> "
+ " Lock lease duration in seconds\n "
+ " --max-snapshot-entries-applied=<max-snapshot-entries-applied> "
+ " Max number of entries applied in a snapshot transaction. 50 by default."
+ " For special tables only\n. "
+ " -h, --help "
+ " Show this screen\n"
+ " --version "
@@ -437,8 +437,8 @@ private LogReplicationConfig getLogReplicationConfiguration(CorfuRuntime runtime
replicationConfigManager,
serverContext.getLogReplicationMaxNumMsgPerBatch(),
serverContext.getLogReplicationMaxDataMessageSize(),
serverContext.getLogReplicationCacheMaxSize()
);
serverContext.getLogReplicationCacheMaxSize(),
serverContext.getMaxSnapshotEntriesApplied());
} catch (Throwable t) {
log.error("Exception when fetching the Replication Config", t);
throw t;
@@ -43,6 +43,7 @@

import static org.corfudb.infrastructure.logreplication.LogReplicationConfig.MERGE_ONLY_STREAMS;
import static org.corfudb.infrastructure.logreplication.LogReplicationConfig.REGISTRY_TABLE_ID;
import static org.corfudb.infrastructure.logreplication.LogReplicationConfig.PROTOBUF_TABLE_ID;

/**
* This class represents the entity responsible for writing streams' snapshots into the sink cluster DB.
@@ -333,9 +334,17 @@ private void applyShadowStream(UUID streamId, long snapshot) {
int numBatches = 1;

for (SMREntry smrEntry : smrEntries) {
// Apply all SMR entries in a single transaction as long as it does not exceed the max write size (25 MB).
// It was observed that special streams (ProtobufDescriptor table) can get a lot of updates, especially
// due to schema updates during an upgrade. If the table was not checkpointed and trimmed on the Source,
// no de-duplication on these updates will occur. As a result, the transaction size can be large.
// Although it is within the maxWriteSize limit, deserializing these entries to read the table can cause an
// OOM on applications running with a small memory footprint. So for such tables, introduce an
// additional limit on the max number of entries (50 by default) applied in a single transaction. This
// algorithm is in line with the limits imposed in Compaction and Restore workflows.
if (bufferSize + smrEntry.getSerializedSize() >
logReplicationMetadataManager.getRuntime().getParameters()
.getMaxWriteSize()) {
logReplicationMetadataManager.getRuntime().getParameters().getMaxWriteSize() ||
maxEntriesLimitReached(streamId, buffer)) {
try (TxnContext txnContext = logReplicationMetadataManager.getTxnContext()) {
updateLog(txnContext, buffer, streamId);
CorfuStoreMetadata.Timestamp ts = txnContext.commit();
@@ -363,6 +372,10 @@ private void applyShadowStream(UUID streamId, long snapshot) {
smrEntries.size(), numBatches);
}

private boolean maxEntriesLimitReached(UUID streamId, List<SMREntry> buffer) {
return (streamId.equals(PROTOBUF_TABLE_ID) && buffer.size() == config.getMaxSnapshotEntriesApplied());
}

/**
* Read from shadowStream and append/apply to the actual stream
*/
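Note that maxEntriesLimitReached above fires only for the ProtobufDescriptor stream; every other stream keeps batching purely by write size. Equality also suffices in place of >=, since the caller flushes and clears the buffer the moment the cap is reached, so the buffer never grows past it. A restatement of the guard with its implicit inputs made explicit (class and parameter names here are hypothetical):

import java.util.List;
import java.util.UUID;

class MaxEntriesGuardSketch {
    // '==' is safe because the buffer is flushed and cleared as soon as it
    // reaches the cap, before the next entry is added.
    static boolean maxEntriesLimitReached(UUID streamId, List<?> buffer,
                                          UUID protobufTableId, int cap) {
        return streamId.equals(protobufTableId) && buffer.size() == cap;
    }
}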
@@ -10,13 +10,19 @@
import org.corfudb.protocols.wireprotocol.ILogData;
import org.corfudb.runtime.CorfuRuntime;
import org.corfudb.runtime.CorfuStoreMetadata;
import org.corfudb.runtime.CorfuStoreMetadata.ProtobufFileName;
import org.corfudb.runtime.CorfuStoreMetadata.ProtobufFileDescriptor;
import org.corfudb.runtime.CorfuStoreMetadata.TableDescriptors;
import org.corfudb.runtime.CorfuStoreMetadata.TableMetadata;
import org.corfudb.runtime.CorfuStoreMetadata.TableName;
import org.corfudb.runtime.collections.CorfuStreamEntries;
import org.corfudb.runtime.collections.CorfuStreamEntry;
import org.corfudb.runtime.collections.StreamListener;
import org.corfudb.runtime.collections.Table;
import org.corfudb.runtime.collections.TableSchema;
import org.corfudb.runtime.exceptions.StreamingException;
import org.corfudb.runtime.exceptions.TrimmedException;
import org.corfudb.runtime.view.ObjectsView;
import org.corfudb.runtime.view.TableRegistry;

import java.util.ArrayList;
@@ -76,20 +82,45 @@ public StreamingTask(CorfuRuntime runtime, ExecutorService workerPool, String na
this.listenerId = String.format("listener_%s_%s_%s", listener, namespace, streamTag);
this.listener = listener;
TableRegistry registry = runtime.getTableRegistry();
final UUID streamId = TableRegistry.getStreamIdForStreamTag(namespace, streamTag);
final UUID streamId;
// Federated tables do not have a stream tag, only an option isFederated set to true. Internally,
// their stream tag is constructed when the table is opened, and differently from that of other
// tables. So for subscribers on this tag, do not construct the streamId using the generic algorithm.
if (streamTag.equals(ObjectsView.getLogReplicatorStreamId().toString())) {
streamId = ObjectsView.getLogReplicatorStreamId();
} else {
streamId = TableRegistry.getStreamIdForStreamTag(namespace, streamTag);
}
this.stream = new DeltaStream(runtime.getAddressSpaceView(), streamId, address, bufferSize);
this.tableSchemas = tablesOfInterest
.stream()
.collect(Collectors.toMap(
tName -> CorfuRuntime.getStreamID(TableRegistry.getFullyQualifiedTableName(namespace, tName)),
tName -> {
// The table should be opened with full schema before subscription.
Table<K, V, M> t = registry.getTable(namespace, tName);
if (!t.getStreamTags().contains(streamId)) {
throw new IllegalArgumentException(String.format("Interested table: %s does not " +
// Subscription on special tables (ProtobufDescriptor and Registry tables) is not possible
// with the regular workflow as they are not opened using CorfuStore.openTable().
// However, these tables do have the stream tag for Log Replication and it is useful to
// subscribe to them for testing purposes. The below is special handling to allow for such a
// subscription.
if (streamId.equals(ObjectsView.getLogReplicatorStreamId()) && namespace.equals(
TableRegistry.CORFU_SYSTEM_NAMESPACE) &&
tName.equals(TableRegistry.PROTOBUF_DESCRIPTOR_TABLE_NAME)) {
return new TableSchema(tName, ProtobufFileName.class, ProtobufFileDescriptor.class,
TableMetadata.class);
} else if (streamId.equals(ObjectsView.getLogReplicatorStreamId()) && namespace.equals(
TableRegistry.CORFU_SYSTEM_NAMESPACE) &&
tName.equals(TableRegistry.REGISTRY_TABLE_NAME)) {
return new TableSchema(tName, TableName.class, TableDescriptors.class,
TableMetadata.class);
} else {
// The table should be opened with full schema before subscription.
Table<K, V, M> t = registry.getTable(namespace, tName);
if (!t.getStreamTags().contains(streamId)) {
throw new IllegalArgumentException(String.format("Interested table: %s does not " +
"have specified stream tag: %s", t.getFullyQualifiedTableName(), streamTag));
}
return new TableSchema<>(tName, t.getKeyClass(), t.getValueClass(), t.getMetadataClass());
}
return new TableSchema<>(tName, t.getKeyClass(), t.getValueClass(), t.getMetadataClass());
}));
this.status = new AtomicReference<>(StreamStatus.RUNNABLE);
}
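Given the special handling above, a test could subscribe to the ProtobufDescriptor table through the log-replicator stream tag. A sketch, assuming the CorfuStore.subscribeListener overload that takes a listener, namespace, stream tag, and list of tables of interest:

import java.util.Collections;

import org.corfudb.runtime.CorfuRuntime;
import org.corfudb.runtime.collections.CorfuStore;
import org.corfudb.runtime.collections.CorfuStreamEntries;
import org.corfudb.runtime.collections.StreamListener;
import org.corfudb.runtime.view.ObjectsView;
import org.corfudb.runtime.view.TableRegistry;

class ProtobufTableSubscriptionSketch {
    static void subscribe(CorfuRuntime runtime) {
        StreamListener listener = new StreamListener() {
            @Override
            public void onNext(CorfuStreamEntries results) {
                // Inspect descriptor/schema updates delivered during snapshot sync.
            }

            @Override
            public void onError(Throwable throwable) {
                // React to TrimmedException / StreamingException, e.g. resubscribe.
            }
        };
        new CorfuStore(runtime).subscribeListener(listener,
                TableRegistry.CORFU_SYSTEM_NAMESPACE,
                ObjectsView.getLogReplicatorStreamId().toString(),
                Collections.singletonList(TableRegistry.PROTOBUF_DESCRIPTOR_TABLE_NAME));
    }
}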
@@ -2,9 +2,10 @@

import static java.lang.Thread.sleep;
import static org.assertj.core.api.Assertions.assertThat;
import static org.corfudb.infrastructure.logreplication.LogReplicationConfig.DEFAULT_MAX_SNAPSHOT_ENTRIES_APPLIED;
import static org.corfudb.infrastructure.logreplication.LogReplicationConfig.DEFAULT_MAX_NUM_MSG_PER_BATCH;
import static org.corfudb.infrastructure.logreplication.LogReplicationConfig.MAX_CACHE_NUM_ENTRIES;
import static org.corfudb.infrastructure.logreplication.LogReplicationConfig.MAX_DATA_MSG_SIZE_SUPPORTED;
import static org.corfudb.infrastructure.logreplication.LogReplicationConfig.MAX_CACHE_NUM_ENTRIES;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
@@ -72,6 +73,7 @@ public class LogReplicationFSMTest extends AbstractViewTest implements Observer
private static final int CORFU_PORT = 9000;
private static final int TEST_TOPOLOGY_CONFIG_ID = 1;
private static final String TEST_LOCAL_CLUSTER_ID = "local_cluster";
private static final int MAX_SNAPSHOT_ENTRIES_APPLIED = 50;

// This semaphore is used to block until the triggering event causes the transition to a new state
private final Semaphore transitionAvailable = new Semaphore(1, true);
@@ -473,7 +475,7 @@ private void initLogReplicationFSM(ReaderImplementation readerImpl) {
CorfuRuntime newRT = getNewRuntime(getDefaultNode()).connect();
LogReplicationConfigManager tableManagerPlugin = new LogReplicationConfigManager(newRT);
LogReplicationConfig config = new LogReplicationConfig(tableManagerPlugin, DEFAULT_MAX_NUM_MSG_PER_BATCH,
MAX_DATA_MSG_SIZE_SUPPORTED, MAX_CACHE_NUM_ENTRIES);
MAX_DATA_MSG_SIZE_SUPPORTED, MAX_CACHE_NUM_ENTRIES, DEFAULT_MAX_SNAPSHOT_ENTRIES_APPLIED);

switch(readerImpl) {
case EMPTY:
9 changes: 8 additions & 1 deletion test/src/test/java/org/corfudb/integration/AbstractIT.java
@@ -385,13 +385,15 @@ public Process runReplicationServer(int port, String pluginConfigFilePath, int l
}

public Process runReplicationServerCustomMaxWriteSize(int port,
String pluginConfigFilePath, int maxWriteSize) throws IOException {
String pluginConfigFilePath, int maxWriteSize,
int maxEntriesApplied) throws IOException {
return new CorfuReplicationServerRunner()
.setHost(DEFAULT_HOST)
.setPort(port)
.setPluginConfigFilePath(pluginConfigFilePath)
.setMsg_size(MSG_SIZE)
.setMaxWriteSize(maxWriteSize)
.setMaxSnapshotEntriesApplied(maxEntriesApplied)
.runServer();
}

@@ -637,6 +639,7 @@ public static class CorfuReplicationServerRunner {
private int msg_size = 0;
private Integer lockLeaseDuration;
private int maxWriteSize = 0;
private int maxSnapshotEntriesApplied;

/**
* Create a command line string according to the properties set for a Corfu Server
@@ -690,6 +693,10 @@ public String getOptionsString() {
command.append(" --max-write-size=").append(maxWriteSize);
}

if (maxSnapshotEntriesApplied != 0) {
command.append(" --max-snapshot-entries-applied=").append(maxSnapshotEntriesApplied);
}

command.append(" -d ").append(logLevel).append(" ")
.append(port);

Expand Down
