Skip to content

Commit

Permalink
Reset trimmed table on reopen to avoid throwing TrimmedException (#3588)
Browse files Browse the repository at this point in the history
* Reset trimmed table on reopen to avoid throwing TrimmedException

On upgrade scenarios, it is possible that some tables are new to a version
and when they are accessed in some corner cases, a TrimmedException is thrown.
This fix on reopening a table, mimicks the compactor by appending a HOLE to
the empty stream, thus avoiding the TrimmedException.

* Do resetTrimmedTable within a transaction
  • Loading branch information
SravanthiAshokKumar committed Apr 20, 2023
1 parent 16175ee commit 38847dd
Show file tree
Hide file tree
Showing 4 changed files with 258 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ private Set<UUID> discoverTableTags(UUID stream) {
return tags;
}

private Token forceNoOpEntry() {
public Token forceNoOpEntry() {
Set<UUID> streamsToAdvance = new HashSet<>();
if (serializer instanceof DynamicProtobufSerializer) {
// One of the use-cases of a no-op entry written by the checkpointer is to always - even for empty streams -
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import com.google.protobuf.ProtocolStringList;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.corfudb.protocols.wireprotocol.StreamAddressRange;
import org.corfudb.runtime.CheckpointWriter;
import org.corfudb.runtime.CorfuOptions;
import org.corfudb.runtime.CorfuRuntime;
import org.corfudb.runtime.CorfuStoreMetadata.TableDescriptors;
Expand All @@ -17,6 +19,7 @@
import org.corfudb.runtime.CorfuStoreMetadata.ProtobufFileName;
import org.corfudb.runtime.CorfuStoreMetadata.ProtobufFileDescriptor;
import org.corfudb.runtime.collections.CorfuRecord;
import org.corfudb.runtime.collections.ICorfuTable;
import org.corfudb.runtime.collections.PersistedStreamingMap;
import org.corfudb.runtime.collections.PersistentCorfuTable;
import org.corfudb.runtime.collections.StreamingMap;
Expand All @@ -29,9 +32,11 @@
import org.corfudb.runtime.object.transactions.TransactionType;
import org.corfudb.runtime.object.transactions.TransactionalContext;
import org.corfudb.runtime.view.ObjectsView.StreamTagInfo;
import org.corfudb.runtime.view.stream.StreamAddressSpace;
import org.corfudb.util.GitRepositoryState;
import org.corfudb.util.serializer.ISerializer;
import org.corfudb.util.serializer.ProtobufSerializer;
import org.corfudb.util.serializer.Serializers;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -269,6 +274,19 @@ void registerTable(@Nonnull String namespace,
CorfuRecord<TableDescriptors, TableMetadata> newRecord =
new CorfuRecord<>(tableDescriptors, tableMetadata);
boolean protoFileChanged = tryUpdateTableSchemas(allDescriptors);

String fullyQualifiedTableName = getFullyQualifiedTableName(namespace, tableName);
UUID streamId = CorfuRuntime.getStreamID(fullyQualifiedTableName);

if (oldRecord == null) {
StreamAddressSpace streamAddressSpace = this.runtime.getSequencerView()
.getStreamAddressSpace(new StreamAddressRange(streamId, Address.MAX, Address.NON_ADDRESS));
if (streamAddressSpace.size() == 0
&& streamAddressSpace.getTrimMark() != Address.NON_ADDRESS) {
log.info("Found trimmed table that is re-opened. Reset table {}", fullyQualifiedTableName);
resetTrimmedTable(fullyQualifiedTableName);
}
}
if (oldRecord == null || protoFileChanged || tableRecordChanged(oldRecord, newRecord)) {
this.registryTable.insert(tableNameKey, newRecord);
}
Expand All @@ -287,6 +305,40 @@ void registerTable(@Nonnull String namespace,
}
}

/**
* This method in invoked from registerTable when there's a trimmed table which is reopened.
* To reset the table, we write a HOLE to the stream and a checkpoint. We write this checkpoint
* with the registerTable transaction's snapshot. Now, when 2 processes attempt to reopen a trimmed
* table, one of them fails with CONFLICT on registerTable - but both of them might end up writing the
* checkpoints. Since the checkpoints are both written only at or before the successful registerTable
* transaction snapshot, the table's state remains as intended.
*
* @param fullyQualifiedTableName get the fullyQualified name of a table
*/
private void resetTrimmedTable(String fullyQualifiedTableName) {
if (!TransactionalContext.isInTransaction()) {
throw new IllegalStateException(
"resetTrimmedTable cannot be invoked outside a transaction.");
}

UUID streamId = CorfuRuntime.getStreamID(fullyQualifiedTableName);
PersistentCorfuTable<TableName, CorfuRecord<TableDescriptors, TableMetadata>> corfuTable =
this.runtime.getObjectsView().build()
.setTypeToken(new TypeToken<PersistentCorfuTable<TableName, CorfuRecord<TableDescriptors, TableMetadata>>>() {

})
.setStreamName(fullyQualifiedTableName)
.setSerializer(Serializers.JSON)
.open();

CheckpointWriter<ICorfuTable<?,?>> cpw =
new CheckpointWriter<>(runtime, streamId, "resetTrimmedTable", corfuTable);
cpw.forceNoOpEntry();
cpw.startCheckpoint(TransactionalContext.getCurrentContext().getSnapshotTimestamp());
cpw.finishCheckpoint();
log.info("Finished resetting trimmed table {}", fullyQualifiedTableName);
}

/**
* A helper method for comparing CorfuRecord. If the new record is different from old record, the entry
* of registry table should be updated accordingly.
Expand Down Expand Up @@ -461,6 +513,9 @@ public static String getTypeUrl(Descriptor descriptor) {
* @return Fully qualified table name.
*/
public static String getFullyQualifiedTableName(String namespace, String tableName) {
if (namespace == null || namespace.isEmpty()) {
return tableName;
}
return namespace + "$" + tableName;
}

Expand Down Expand Up @@ -601,7 +656,6 @@ Table<K, V, M> openTable(@Nonnull final String namespace,
return table;
}


/**
* Get an already opened table. Fetches the table from the cache given only the namespace and table name.
* Throws a NoSuchElementException if table is not previously opened and not present in cache.
Expand Down
7 changes: 6 additions & 1 deletion test/src/test/java/org/corfudb/integration/AbstractIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,7 @@ public static class CorfuServerRunner {
private boolean disableHost = false;
private String networkInterface = null;
private NetworkInterfaceVersion networkInterfaceVersion = null;
private boolean disableLogUnitServerCache = false;


/**
Expand All @@ -534,8 +535,12 @@ public static class CorfuServerRunner {
public String getOptionsString() {
StringBuilder command = new StringBuilder();

if (disableLogUnitServerCache) {
command.append("-c ").append(0);
}

if (!disableHost) {
command.append("-a ").append(host);
command.append(" -a ").append(host);
}

if (logPath != null) {
Expand Down
196 changes: 196 additions & 0 deletions test/src/test/java/org/corfudb/integration/CorfuStoreIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.google.common.collect.Iterables;
import com.google.common.reflect.TypeToken;
import com.google.protobuf.DynamicMessage;
import lombok.extern.slf4j.Slf4j;

import org.corfudb.infrastructure.CompactorLeaderServices;
import org.corfudb.infrastructure.LivenessValidator;
Expand Down Expand Up @@ -30,6 +31,7 @@
import org.corfudb.runtime.collections.TableOptions;
import org.corfudb.runtime.collections.TxnContext;
import org.corfudb.runtime.object.ICorfuVersionPolicy;
import org.corfudb.runtime.object.transactions.TransactionType;
import org.corfudb.runtime.proto.RpcCommon;
import org.corfudb.runtime.view.ObjectOpenOption;
import org.corfudb.runtime.view.ObjectsView;
Expand Down Expand Up @@ -57,6 +59,9 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
Expand All @@ -71,6 +76,7 @@
/**
* Simple test that inserts data into CorfuStore via a separate server process
*/
@Slf4j
public class CorfuStoreIT extends AbstractIT {

private static String corfuSingleNodeHost;
Expand All @@ -89,6 +95,16 @@ private Process runSinglePersistentServer(String host, int port) throws IOExcept
.runServer();
}

private Process runSinglePersistentServer(String host, int port, boolean disableLogUnitServerCache) throws IOException {
return new AbstractIT.CorfuServerRunner()
.setHost(host)
.setPort(port)
.setLogPath(getCorfuServerLogPath(host, port))
.setSingle(true)
.setDisableLogUnitServerCache(disableLogUnitServerCache)
.runServer();
}

/**
* Load properties for a single node corfu server before each test
*/
Expand Down Expand Up @@ -817,4 +833,184 @@ DEFAULT_ENDPOINT, cpStoreSpy, new LivenessValidator(
cpRuntime.shutdown();
shutdownCorfuServer(p);
}

@Test
public void resetTrimmedTableTest() throws Exception {
final Process corfuServer = runSinglePersistentServer(corfuSingleNodeHost, corfuStringNodePort, true);
runtime = createRuntime(singleNodeEndpoint);
CorfuStore corfuStore = new CorfuStore(runtime);

final String namespace = "test-namespace";
final String tableNameA = "test-table-a";
final String tableNameB = "test-table-b";

// Create & Register the table.
// This is required to initialize the table for the current corfu client.
Table<Uuid, Uuid, Uuid> tableA = corfuStore.openTable(
namespace,
tableNameA,
Uuid.class,
Uuid.class,
null,
TableOptions.builder().build());
Table<Uuid, Uuid, Uuid> tableB = corfuStore.openTable(
namespace,
tableNameB,
Uuid.class,
Uuid.class,
null,
TableOptions.builder().build());

Uuid key1 = Uuid.newBuilder().setLsb(1L).setMsb(1L).build();
Uuid key2 = Uuid.newBuilder().setLsb(2L).setMsb(2L).build();

TxnContext tx = corfuStore.txn(namespace);
tx.putRecord(tableA, key1, key1, null);
tx.commit();

tx = corfuStore.txn(namespace);
tx.putRecord(tableB, key2, key2, null);
tx.commit();

// Unregister table B and invoke checkpointer
unregisterTable(namespace, tableNameB);
MultiCheckpointWriter<PersistentCorfuTable<?, ?>> mcw = new MultiCheckpointWriter<>();
PersistentCorfuTable<Uuid, CorfuRecord<Uuid, Uuid>> corfuTableA =
createCorfuTable(runtime, tableA.getFullyQualifiedTableName());
mcw.addMap(corfuTableA); //Checkpoint only table A
mcw.addMap(runtime.getTableRegistry().getRegistryTable());
mcw.addMap(runtime.getTableRegistry().getProtobufDescriptorTable());
Token trimPoint = mcw.appendCheckpoints(runtime, "checkpointer");
runtime.getAddressSpaceView().prefixTrim(trimPoint);

//Re-open table B
corfuStore.openTable(
namespace,
tableNameB,
Uuid.class,
Uuid.class,
null,
TableOptions.builder().build());

tx = corfuStore.txn(namespace);
assertThat(tx.getRecord(tableNameA, key1).getPayload()).isEqualTo(key1);
assertThat(tx.getRecord(tableNameB, key2).getPayload()).isNull();
tx.commit();
runtime.shutdown();
shutdownCorfuServer(corfuServer);
}

@Test
public void resetTrimmedTableInParallelTest() throws Exception {
final Process corfuServer = runSinglePersistentServer(corfuSingleNodeHost, corfuStringNodePort, true);
runtime = createRuntime(singleNodeEndpoint);
CorfuStore corfuStore = new CorfuStore(runtime);

final String namespace = "test-namespace";
final String tableNameA = "test-table-a";
final String tableNameB = "test-table-b";

// Create & Register the table.
// This is required to initialize the table for the current corfu client.
Table<Uuid, Uuid, Uuid> tableA = corfuStore.openTable(
namespace,
tableNameA,
Uuid.class,
Uuid.class,
null,
TableOptions.builder().build());
Table<Uuid, Uuid, Uuid> tableB = corfuStore.openTable(
namespace,
tableNameB,
Uuid.class,
Uuid.class,
null,
TableOptions.builder().build());

Uuid key1 = Uuid.newBuilder().setLsb(1L).setMsb(1L).build();
Uuid key2 = Uuid.newBuilder().setLsb(2L).setMsb(2L).build();

TxnContext tx = corfuStore.txn(namespace);
tx.putRecord(tableA, key1, key1, null);
tx.commit();

tx = corfuStore.txn(namespace);
tx.putRecord(tableB, key2, key2, null);
tx.commit();

// Unregister table B and invoke checkpointer
unregisterTable(namespace, tableNameB);
MultiCheckpointWriter<PersistentCorfuTable<?, ?>> mcw = new MultiCheckpointWriter<>();
PersistentCorfuTable<Uuid, CorfuRecord<Uuid, Uuid>> corfuTableA =
createCorfuTable(runtime, tableA.getFullyQualifiedTableName());
mcw.addMap(corfuTableA); //Checkpoint only table A
mcw.addMap(runtime.getTableRegistry().getRegistryTable());
mcw.addMap(runtime.getTableRegistry().getProtobufDescriptorTable());
Token trimPoint = mcw.appendCheckpoints(runtime, "checkpointer");
runtime.getAddressSpaceView().prefixTrim(trimPoint);

CorfuRuntime runtime2 = createRuntime(singleNodeEndpoint);
CorfuStore corfuStore2 = new CorfuStore(runtime2);

ExecutorService scheduler = Executors.newFixedThreadPool(2);
scheduler.execute(() -> {
try {
corfuStore.openTable(
namespace,
tableNameB,
Uuid.class,
Uuid.class,
null,
TableOptions.builder().build());
TxnContext tx1 = corfuStore.txn(namespace);
tx1.putRecord(tableB, key2, key2, null);
tx1.commit();
} catch (Exception e) {
log.error("Exception thrown while opening table");
}
});
Uuid key3 = Uuid.newBuilder().setLsb(3L).setMsb(3L).build();
scheduler.execute(() -> {
try {
corfuStore2.openTable(
namespace,
tableNameB,
Uuid.class,
Uuid.class,
null,
TableOptions.builder().build());
TxnContext tx2 = corfuStore.txn(namespace);
tx2.putRecord(tableB, key3, key3, null);
tx2.commit();
} catch (Exception e) {
log.error("Exception thrown while opening table");
}
});

scheduler.awaitTermination(10, TimeUnit.SECONDS);

tx = corfuStore.txn(namespace);
assertThat(tx.getRecord(tableNameA, key1).getPayload()).isEqualTo(key1);
assertThat(tx.getRecord(tableNameB, key2).getPayload()).isEqualTo(key2);
assertThat(tx.getRecord(tableNameB, key3).getPayload()).isEqualTo(key3);
tx.commit();
runtime.shutdown();
runtime2.shutdown();
shutdownCorfuServer(corfuServer);
}

private void unregisterTable(String namespace, String tableName) {
this.runtime.getObjectsView().TXBuild()
.type(TransactionType.WRITE_AFTER_WRITE)
.build()
.begin();

CorfuStoreMetadata.TableName tableNameKey = CorfuStoreMetadata.TableName.newBuilder()
.setNamespace(namespace)
.setTableName(tableName)
.build();
runtime.getTableRegistry().getRegistryTable().delete(tableNameKey);

this.runtime.getObjectsView().TXEnd();
}
}

0 comments on commit 38847dd

Please sign in to comment.