Skip to content

Commit

Permalink
expose addTypeToMap for rolling upgrades (#3601)
Browse files Browse the repository at this point in the history
This allows new code to open old schemas without
actually registering the old types.
  • Loading branch information
hisundar committed May 3, 2023
1 parent ff43bf4 commit 92fb6a2
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -542,11 +542,15 @@ public static UUID getStreamIdForStreamTag(String namespace, String streamTag) {

/**
* Adds the schema to the class map to enable serialization of this table data.
* This method is exposed as public purely for those tables whose value schema
* has changed under the hood from one type to another without changing the table name.
* So those tables can be accessed without hitting a serialization exception once the
* old type is added to the Serializer's known type map.
*
* @param msg Default message of this protobuf message.
* @param <T> Type of message.
*/
private <T extends Message> void addTypeToClassMap(T msg) {
public <T extends Message> void addTypeToClassMap(T msg) {
String typeUrl = getTypeUrl(msg.getDescriptorForType());
// Register the schemas to schema table.
((ProtobufSerializer)runtime.getSerializers().getSerializer(ProtobufSerializer.PROTOBUF_SERIALIZER_CODE))
Expand Down
128 changes: 122 additions & 6 deletions test/src/test/java/org/corfudb/integration/CorfuStoreIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@
import org.corfudb.protocols.wireprotocol.IMetadata;
import org.corfudb.protocols.wireprotocol.Token;
import org.corfudb.runtime.CompactorMetadataTables;
import org.corfudb.runtime.CorfuOptions;
import org.corfudb.runtime.CorfuRuntime;
import org.corfudb.runtime.CorfuStoreMetadata;
import org.corfudb.runtime.CorfuStoreMetadata.Timestamp;
import org.corfudb.runtime.ExampleSchemas;
import org.corfudb.runtime.MultiCheckpointWriter;

import org.corfudb.runtime.collections.CorfuDynamicKey;
Expand All @@ -29,7 +31,10 @@
import org.corfudb.runtime.collections.StreamingMap;
import org.corfudb.runtime.collections.Table;
import org.corfudb.runtime.collections.TableOptions;
import org.corfudb.runtime.collections.TableParameters;
import org.corfudb.runtime.collections.TxnContext;
import org.corfudb.runtime.exceptions.AbortCause;
import org.corfudb.runtime.exceptions.TransactionAbortedException;
import org.corfudb.runtime.object.ICorfuVersionPolicy;
import org.corfudb.runtime.object.transactions.TransactionType;
import org.corfudb.runtime.proto.RpcCommon;
Expand All @@ -52,6 +57,7 @@
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
Expand All @@ -68,7 +74,9 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.corfudb.runtime.view.ObjectsView.LOG_REPLICATOR_STREAM_INFO;
import static org.corfudb.runtime.view.TableRegistry.CORFU_SYSTEM_NAMESPACE;
import static org.corfudb.runtime.view.TableRegistry.getFullyQualifiedTableName;
import static org.mockito.Matchers.isNull;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
Expand Down Expand Up @@ -120,7 +128,7 @@ public void loadProperties() {
/**
* This test is divided into 3 phases.
* Phase 1: Writes data to CorfuStore in a Table using the transaction builder.
* Phase 2: Using DynamicMessages, we try to read and edit the message. The lsb in the metadata is putd.
* Phase 2: Using DynamicMessages, we try to read and edit the message.
* Phase 3: Using the corfuStore the message is read back to ensure, the schema wasn't altered and the
* serialization isn't broken. The 'lsb' value of the metadata is asserted.
*/
Expand Down Expand Up @@ -175,7 +183,7 @@ public void readDataWithDynamicMessages() throws Exception {
runtime.getSerializers().registerSerializer(dynamicProtobufSerializer);

PersistentCorfuTable<CorfuDynamicKey, CorfuDynamicRecord> corfuTable =
createCorfuTable(runtime, TableRegistry.getFullyQualifiedTableName(namespace, tableName), dynamicProtobufSerializer);
createCorfuTable(runtime, getFullyQualifiedTableName(namespace, tableName), dynamicProtobufSerializer);

for (Iterator<Map.Entry<CorfuDynamicKey, CorfuDynamicRecord>> it = corfuTable.entryStream().iterator(); it.hasNext(); ) {
Map.Entry<CorfuDynamicKey, CorfuDynamicRecord> entry = it.next();
Expand Down Expand Up @@ -204,15 +212,15 @@ public void readDataWithDynamicMessages() throws Exception {
PersistentCorfuTable<CorfuDynamicKey, CorfuDynamicRecord> tableRegistry = runtime.getObjectsView().build()
.setTypeToken(new TypeToken<PersistentCorfuTable<CorfuDynamicKey, CorfuDynamicRecord>>() {
})
.setStreamName(TableRegistry.getFullyQualifiedTableName(CORFU_SYSTEM_NAMESPACE,
.setStreamName(getFullyQualifiedTableName(CORFU_SYSTEM_NAMESPACE,
TableRegistry.REGISTRY_TABLE_NAME))
.setSerializer(dynamicProtobufSerializer)
.addOpenOption(ObjectOpenOption.NO_CACHE)
.open();
PersistentCorfuTable<CorfuDynamicKey, CorfuDynamicRecord> descriptorTable = runtime.getObjectsView().build()
.setTypeToken(new TypeToken<PersistentCorfuTable<CorfuDynamicKey, CorfuDynamicRecord>>() {
})
.setStreamName(TableRegistry.getFullyQualifiedTableName(CORFU_SYSTEM_NAMESPACE,
.setStreamName(getFullyQualifiedTableName(CORFU_SYSTEM_NAMESPACE,
TableRegistry.PROTOBUF_DESCRIPTOR_TABLE_NAME))
.setSerializer(dynamicProtobufSerializer)
.addOpenOption(ObjectOpenOption.NO_CACHE)
Expand Down Expand Up @@ -287,7 +295,7 @@ public Token checkpointAndTrimCorfuStore(CorfuRuntime runtimeC, boolean skipTrim
runtimeC.getSerializers().registerSerializer(dynamicProtobufSerializer);

for (CorfuStoreMetadata.TableName tableName : tableRegistry.listTables(null)) {
String fullTableName = TableRegistry.getFullyQualifiedTableName(
String fullTableName = getFullyQualifiedTableName(
tableName.getNamespace(), tableName.getTableName()
);
if (tableName.getNamespace().equals(CORFU_SYSTEM_NAMESPACE) &&
Expand Down Expand Up @@ -480,6 +488,114 @@ public void alterTableUsingDropTest() throws Exception {
assertThat(shutdownCorfuServer(corfuServer)).isTrue();
}

/**
* This test verifies that syncing a table with entries different from the schema it was
* opened with does not throw an error if the ProtobufSerializer is made aware of the
* different schema.
* @throws Exception
*/
@Test
public void syncTableWithOldSchemaTest() throws Exception {
Process corfuServer = runSinglePersistentServer(corfuSingleNodeHost, corfuStringNodePort);

// PHASE 1 - Start a Corfu runtime & a CorfuStore instance
runtime = createRuntime(singleNodeEndpoint);

// Creating Corfu Store using a connected corfu client.
CorfuStore corfuStore = new CorfuStore(runtime);

// Define a namespace for the table.
final String someNamespace = "some-namespace";
// Define table name.
final String tableName = "EventInfo";

// Create & Register the table.
// This is required to initialize the table for the current corfu client.
Table<Uuid, SampleSchema.EventInfo, ManagedResources> table = corfuStore.openTable(
someNamespace,
tableName,
Uuid.class,
SampleSchema.EventInfo.class,
ManagedResources.class,
// TableOptions includes option to choose - Memory/Disk based corfu table.
TableOptions.builder().build());

Uuid key = Uuid.newBuilder().setLsb(0L).setMsb(0L).build();

/*
* HACK - PLEASE DO NOT TRY THIS AT HOME -
* Create a fake Table bypassing the TableRegistry but with the same table name to insert
* records of the wrong type (ContactBookId) into a table which was opened with a different schema
* (EventInfo)
*/
ExampleSchemas.ContactBookId defaultValueMessage = (ExampleSchemas.ContactBookId) ExampleSchemas
.ContactBookId.class.getMethod("getDefaultInstance").invoke(null);
ManagedResources defaultMetadataMessage = (ManagedResources) ManagedResources
.class.getMethod("getDefaultInstance").invoke(null);

Table<Uuid, ExampleSchemas.ContactBookId, ManagedResources> badTable = new Table<Uuid, ExampleSchemas.ContactBookId, ManagedResources>(
TableParameters.<Uuid, ExampleSchemas.ContactBookId, ManagedResources>builder()
.namespace(someNamespace)
.fullyQualifiedTableName(getFullyQualifiedTableName(someNamespace, tableName))
.kClass(Uuid.class)
.vClass(ExampleSchemas.ContactBookId.class)
.mClass(ManagedResources.class)
.valueSchema(defaultValueMessage)
.metadataSchema(defaultMetadataMessage)
.schemaOptions(CorfuOptions.SchemaOptions.getDefaultInstance())
.secondaryIndexesDisabled(true)
.build(),
runtime,
runtime.getSerializers().getSerializer(ProtobufSerializer.PROTOBUF_SERIALIZER_CODE),
null,
new HashSet<>(Collections.singletonList(LOG_REPLICATOR_STREAM_INFO.getStreamId())));

// Now this badTable is completely hidden from both the TableRegistry and the Serializer!
TxnContext tx = corfuStore.txn(someNamespace);
long timestamp = System.currentTimeMillis();
ExampleSchemas.ContactBookId badSchema = ExampleSchemas.ContactBookId.newBuilder()
.setName(tableName).build();
tx.putRecord(badTable, key, badSchema,
ManagedResources.newBuilder()
.setCreateTimestamp(timestamp).build());
tx.commit();

runtime.shutdown();

runtime = createRuntime(singleNodeEndpoint);
corfuStore = new CorfuStore(runtime);

// Re-open the table with the same schema and test if it hits a serializer exception
// as it would when a new code is trying to open something from an older instance
Table<Uuid, SampleSchema.EventInfo, ManagedResources> tableV2 = corfuStore.openTable(
someNamespace,
tableName,
Uuid.class,
SampleSchema.EventInfo.class,
ManagedResources.class,
// TableOptions includes option to choose - Memory/Disk based corfu table.
TableOptions.builder().build());

boolean didAssertFire = false;
tx = corfuStore.txn(someNamespace);
try {
tx.clear(tableV2);
tx.commit();
} catch (TransactionAbortedException e) {
assertThat(e.getAbortCause()).isEqualTo(AbortCause.UNDEFINED);
didAssertFire = true;
}
assertThat(didAssertFire).isTrue();
// Once the unknown type is added to the map it should be ok to clear the table
runtime.getTableRegistry().addTypeToClassMap(ExampleSchemas.ContactBookId.getDefaultInstance());

tx = corfuStore.txn(someNamespace);
tx.clear(tableV2);
tx.commit();

assertThat(shutdownCorfuServer(corfuServer)).isTrue();
}

/**
* Test that tx.commit()---in the context of read-only transactions---returns the sequence
* for the max tail of all streams accessed in the transaction. For the following scenarios:
Expand Down Expand Up @@ -723,7 +839,7 @@ public void testLogDataPersistedMetadata() throws Exception {
Map<UUID, Long> backpointerMap = (Map<UUID, Long>) ld.getMetadataMap().get(IMetadata.LogUnitMetadataType.BACKPOINTER_MAP);
Set<UUID> expectedTableUpdates = new HashSet<>();
// Table itself (tableA)
expectedTableUpdates.add(CorfuRuntime.getStreamID(TableRegistry.getFullyQualifiedTableName(namespace, tableName)));
expectedTableUpdates.add(CorfuRuntime.getStreamID(getFullyQualifiedTableName(namespace, tableName)));
// Stream tags for tableA
expectedTableUpdates.add(TableRegistry.getStreamIdForStreamTag(namespace, "sample_streamer_1"));
expectedTableUpdates.add(TableRegistry.getStreamIdForStreamTag(namespace, "sample_streamer_2"));
Expand Down

0 comments on commit 92fb6a2

Please sign in to comment.