Skip to content

Commit

Permalink
New LR SchemaOption for logical_group, client_name (#3501)
Browse files Browse the repository at this point in the history
This is to support the logical group replication usecase
where multiple tables need to be tagged to be replicated
over to one or more remote destinations.
  • Loading branch information
hisundar committed Jan 28, 2023
1 parent 9fa45fa commit 71cb842
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 1 deletion.
12 changes: 11 additions & 1 deletion runtime/proto/corfu_options.proto
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,16 @@ message SchemaOptions {
repeated string stream_tag = 5;
// Nested Secondary Key (repeated field)
repeated SecondaryIndex secondary_key = 6;
// The combination of client and logical_group name to indicate how data is to be replicated.
optional ReplicationLogicalGroup replication_group = 7;
}

message ReplicationLogicalGroup {
// Logical group name that helps club a number of tables into one group for Log Replication.
required string logical_group = 1;
// Client Name refers to the name of this replicated table's owner
// It can be used on the sink side of replication to apply or consume the incoming replicated updates.
required string client_name = 2;
}

message SecondaryIndex {
Expand All @@ -38,4 +48,4 @@ extend google.protobuf.FieldOptions {
extend google.protobuf.MessageOptions {
// 1039 is in the extendable range in the descriptor.proto
optional SchemaOptions table_schema = 1039;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1418,6 +1418,8 @@ record = corfuRuntime.getTableRegistry().getRegistryTable().get(tableNameKey);
assertThat(options.getStreamTagCount()).isEqualTo(2);
assertThat(options.getStreamTag(0)).isEqualTo("sample_streamer_2");
assertThat(options.getStreamTag(1)).isEqualTo("sample_streamer_3");
assertThat(options.getReplicationGroup().getLogicalGroup()).isEqualTo("group1");
assertThat(options.getReplicationGroup().getClientName()).isEqualTo("logical_group_consumer");

// Open the same table again with different schema options, verify that registry table
// gets updated
Expand Down
1 change: 1 addition & 0 deletions test/src/test/resources/proto/sample_schema.proto
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ message SampleTableBMsg {
option (org.corfudb.runtime.table_schema).stream_tag = "sample_streamer_3";
option (org.corfudb.runtime.table_schema).requires_backup_support = false;
option (org.corfudb.runtime.table_schema).is_federated = true;
option (org.corfudb.runtime.table_schema).replication_group = { logical_group: "group1" client_name: "logical_group_consumer"};

optional string payload = 1;
}
Expand Down

0 comments on commit 71cb842

Please sign in to comment.