Skip to content

Commit

Permalink
Allow clients to disable secondary indexes (#3537)
Browse files Browse the repository at this point in the history
In some cases, a client might choose to load a table without a secondary
index support in order to reduce memory usage. This can be done by
providing an appropriate flag in TableOptions.
  • Loading branch information
vjeko committed Mar 8, 2023
1 parent 4974ba9 commit 555eda3
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 27 deletions.
21 changes: 13 additions & 8 deletions runtime/src/main/java/org/corfudb/runtime/collections/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
Expand Down Expand Up @@ -241,23 +243,26 @@ public void resetTableData(CorfuRuntime runtime) {

private void initializeCorfuTable(CorfuRuntime runtime) {
SMRObject.Builder<? extends ICorfuTable<K, CorfuRecord<V, M>>> builder;
final Deque<Object> arguments = new ArrayDeque<>();

if (!tableParameters.isSecondaryIndexesDisabled()) {
arguments.add(new ProtobufIndexer(
tableParameters.getValueSchema(),
tableParameters.getSchemaOptions()));
}

if (streamingMapSupplier == null) {
// PersistentCorfuTable
builder = runtime.getObjectsView().build()
.setTypeToken(new TypeToken<PersistentCorfuTable<K, CorfuRecord<V, M>>>() {})
.setArguments(new ProtobufIndexer(
tableParameters.getValueSchema(),
tableParameters.getSchemaOptions()));
.setArguments(arguments.toArray());
} else {
arguments.add(streamingMapSupplier);
arguments.add(ICorfuVersionPolicy.MONOTONIC);
// Disk-backed CorfuTable
builder = runtime.getObjectsView().build()
.setTypeToken(new TypeToken<CorfuTable<K, CorfuRecord<V, M>>>() {})
.setArguments(new ProtobufIndexer(
tableParameters.getValueSchema(),
tableParameters.getSchemaOptions()),
streamingMapSupplier,
ICorfuVersionPolicy.MONOTONIC);
.setArguments(arguments.toArray());
}

this.corfuTable = builder.setStreamName(this.fullyQualifiedTableName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.protobuf.Message;
import lombok.Builder;
import lombok.Getter;
import org.corfudb.runtime.CorfuOptions;

import javax.annotation.Nonnull;
Expand All @@ -12,23 +13,24 @@
/**
* Created by zlokhandwala on 2019-08-09.
*/
@Builder
@Builder(toBuilder = true)
public class TableOptions {

/**
* If this path is set, {@link CorfuStore} will utilize disk-backed {@link CorfuTable}.
*/
private final Path persistentDataPath;

@Getter
@Builder.Default
private final boolean secondaryIndexesDisabled = false;

/**
* Capture options like stream tags, backup restore, log replication at Table level
*/
@Getter
private final CorfuOptions.SchemaOptions schemaOptions;

public CorfuOptions.SchemaOptions getSchemaOptions() {
return schemaOptions;
}

public Optional<Path> getPersistentDataPath() {
return Optional.ofNullable(persistentDataPath);
}
Expand All @@ -43,19 +45,17 @@ public Optional<Path> getPersistentDataPath() {
public static <V extends Message> TableOptions fromProtoSchema(@Nonnull Class<V> vClass,
TableOptions tableOptions)
throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
TableOptions.TableOptionsBuilder tableOptionsBuilder =
new TableOptions.TableOptionsBuilder();
if (vClass != null) { // some test cases pass vClass as null to verify behavior
V defaultValueMessage = (V) vClass.getMethod("getDefaultInstance").invoke(null);
tableOptionsBuilder.schemaOptions(defaultValueMessage
.getDescriptorForType()
.getOptions()
.getExtension(CorfuOptions.tableSchema));
TableOptions.TableOptionsBuilder tableOptionsBuilder = TableOptions.builder();
if (tableOptions != null) {
tableOptionsBuilder = tableOptions.toBuilder();
}
if (tableOptions != null && tableOptions.getPersistentDataPath().isPresent()) {
tableOptionsBuilder.persistentDataPath((Path) tableOptions.getPersistentDataPath().get());
}
return tableOptionsBuilder.build();

V defaultValueMessage = (V) vClass.getMethod("getDefaultInstance").invoke(null);
return tableOptionsBuilder.schemaOptions(defaultValueMessage
.getDescriptorForType()
.getOptions()
.getExtension(CorfuOptions.tableSchema))
.build();
}

public static <V extends Message> TableOptions fromProtoSchema(@Nonnull Class<V> vClass)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,7 @@ public class TableParameters<K extends Message, V extends Message, M extends Mes
// like stream_tags, secondary indexes, backup_restore, log replication etc
@Getter
private final CorfuOptions.SchemaOptions schemaOptions;
}

@Getter
private final boolean secondaryIndexesDisabled;
}
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,7 @@ Table<K, V, M> openTable(@Nonnull final String namespace,
.valueSchema(defaultValueMessage)
.metadataSchema(defaultMetadataMessage)
.schemaOptions(tableSchemaOptions)
.secondaryIndexesDisabled(tableOptions.isSecondaryIndexesDisabled())
.build(),
this.runtime,
this.protobufSerializer,
Expand Down Expand Up @@ -781,6 +782,7 @@ Table<Message, Message, Message> wrapInternalTable(@Nonnull String tableName,
.valueSchema(defaultValueMessage)
.metadataSchema(defaultMetadataMessage)
.schemaOptions(tableSchemaOptions)
.secondaryIndexesDisabled(tableOptions.isSecondaryIndexesDisabled())
.build(),
this.runtime,
this.protobufSerializer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.corfudb.runtime.ExampleSchemas.ActivitySchedule;
import org.corfudb.runtime.ExampleSchemas.ManagedMetadata;
import org.corfudb.runtime.ExampleSchemas.Adult;
import org.corfudb.runtime.exceptions.TransactionAbortedException;
import org.corfudb.runtime.proto.RpcCommon.UuidMsg;
import org.corfudb.runtime.view.AbstractViewTest;
import org.junit.Test;
Expand All @@ -20,6 +21,7 @@
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.junit.jupiter.api.Assertions.assertThrows;

/**
Expand All @@ -31,10 +33,56 @@
@SuppressWarnings("checkstyle:magicnumber")
public class CorfuStoreSecondaryIndexTest extends AbstractViewTest {

private static final String ANOTHER_KEY_INDEX = "anotherKey";

private CorfuRuntime getTestRuntime() {
return getDefaultRuntime();
}

@Test
public void testDisableSecondaryIndexes() throws Exception {

// Get a Corfu Runtime instance.
CorfuRuntime corfuRuntime = getTestRuntime();

// Creating Corfu Store using a connected corfu client.
CorfuStoreShim shimStore = new CorfuStoreShim(corfuRuntime);

// Define a namespace for the table.
final String someNamespace = "some-namespace";
// Define table name.
final String tableName = "ManagedMetadata";

{ // Positive test.
Table<UuidMsg, ExampleValue, ManagedMetadata> table = shimStore.openTable(
someNamespace,
tableName,
UuidMsg.class,
ExampleValue.class,
ManagedMetadata.class,
TableOptions.fromProtoSchema(ExampleValue.class)
.toBuilder().secondaryIndexesDisabled(true).build());

try (ManagedTxnContext txn = shimStore.tx(someNamespace)) {
assertThatExceptionOfType(TransactionAbortedException.class)
.isThrownBy(() -> txn.getByIndex(table, ANOTHER_KEY_INDEX, 0L))
.withCauseInstanceOf(IllegalArgumentException.class);
}
}
{ // Negative test.
Table<UuidMsg, ExampleValue, ManagedMetadata> table = shimStore.openTable(
someNamespace,
tableName + tableName,
UuidMsg.class,
ExampleValue.class,
ManagedMetadata.class,
TableOptions.fromProtoSchema(ExampleValue.class));

try (ManagedTxnContext txn = shimStore.tx(someNamespace)) {
txn.getByIndex(table,ANOTHER_KEY_INDEX, 0L);
}
}
}
/**
* Simple example to see how secondary indexes work. Please see example_schemas.proto.
*
Expand Down Expand Up @@ -90,7 +138,7 @@ public void testSecondaryIndexes() throws Exception {

try (ManagedTxnContext readWriteTxn = shimStore.tx(someNamespace)) {
List<CorfuStoreEntry<UuidMsg, ExampleValue, ManagedMetadata>> entries = readWriteTxn
.getByIndex(table, "anotherKey", eventTime);
.getByIndex(table, ANOTHER_KEY_INDEX, eventTime);
assertThat(entries.size()).isEqualTo(1);
assertThat(entries.get(0).getPayload().getPayload()).isEqualTo("abc");
readWriteTxn.commit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.corfudb.protocols.wireprotocol.Token;
import org.corfudb.runtime.CorfuRuntime;
import org.corfudb.runtime.CorfuStoreMetadata;
import org.corfudb.runtime.ExampleSchemas.ExampleValue;
import org.corfudb.runtime.exceptions.unrecoverable.UnrecoverableCorfuError;
import org.corfudb.runtime.object.ICorfuVersionPolicy;
import org.corfudb.runtime.view.AbstractViewTest;
Expand Down Expand Up @@ -49,6 +50,7 @@
import java.util.stream.LongStream;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/**
Expand All @@ -66,6 +68,7 @@ public class DiskBackedCorfuClientTest extends AbstractViewTest implements AutoC
private static final int STRING_MIN = 5;
private static final int STRING_MAX = 10;

private static final String ANOTHER_KEY_INDEX = "anotherKey";
private static final String nonExistingKey = "nonExistingKey";
private static final String defaultNewMapEntry = "newEntry";

Expand Down Expand Up @@ -537,6 +540,45 @@ Arbitrary<EventInfo> eventInfo() {
.build());
}

@Property(tries = NUM_OF_TRIES)
void disableSecondaryIndexes(
@ForAll @StringLength(min = STRING_MIN, max = STRING_MAX) @AlphaChars String namespace,
@ForAll @StringLength(min = STRING_MIN, max = STRING_MAX) @AlphaChars String tableName)
throws Exception {
resetTests();

// Creating Corfu Store using a connected corfu client.
CorfuStore corfuStore = new CorfuStore(getDefaultRuntime());

{ // Positive test.
final Table<Uuid, ExampleValue, SampleSchema.ManagedResources> table =
corfuStore.openTable(namespace, tableName,
Uuid.class, ExampleValue.class,
SampleSchema.ManagedResources.class,
// TableOptions includes option to choose - Memory/Disk based corfu table.
TableOptions.fromProtoSchema(ExampleValue.class).toBuilder()
.persistentDataPath(Paths.get(diskBackedDirectory, tableName))
.secondaryIndexesDisabled(true).build());

assertThatExceptionOfType(IllegalArgumentException.class)
.isThrownBy(() -> table.getByIndex(ANOTHER_KEY_INDEX, 0L))
.withMessage("Secondary Index anotherKey is not defined.");
}
{ // Negative test.
final Table<Uuid, ExampleValue, SampleSchema.ManagedResources> table1 =
corfuStore.openTable(namespace, tableName,
Uuid.class, ExampleValue.class,
SampleSchema.ManagedResources.class,
// TableOptions includes option to choose - Memory/Disk based corfu table.
TableOptions.fromProtoSchema(ExampleValue.class).toBuilder()
.persistentDataPath(Paths.get(diskBackedDirectory, tableName))
.build());

// Negative test. No throw.
table1.getByIndex(ANOTHER_KEY_INDEX, 0L);
}
}

/**
* Check {@link PersistedStreamingMap} integration with {@link CorfuStore}.
*/
Expand Down

0 comments on commit 555eda3

Please sign in to comment.