Skip to content

Commit

Permalink
feat: allow for unknown values in change streams (#17655)
Browse files Browse the repository at this point in the history
Allows for unknown values to be given in the mod type and value capture
type. This way, customers won't have to force update the connector when
the backend adds a new value here.

Co-authored-by: Pablo <pabloem@users.noreply.github.com>
  • Loading branch information
thiagotnunes and pabloem committed Jun 1, 2022
1 parent 55b378d commit 23aeca4
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,8 @@ private DataChangeRecord toDataChangeRecord(
.map(this::columnTypeFrom)
.collect(Collectors.toList()),
row.getStructList(MODS_COLUMN).stream().map(this::modFrom).collect(Collectors.toList()),
ModType.valueOf(row.getString(MOD_TYPE_COLUMN)),
ValueCaptureType.valueOf(row.getString(VALUE_CAPTURE_TYPE_COLUMN)),
modTypeFrom(row.getString(MOD_TYPE_COLUMN)),
valueCaptureTypeFrom(row.getString(VALUE_CAPTURE_TYPE_COLUMN)),
row.getLong(NUMBER_OF_RECORDS_IN_TRANSACTION_COLUMN),
row.getLong(NUMBER_OF_PARTITIONS_IN_TRANSACTION_COLUMN),
changeStreamRecordMetadataFrom(partition, commitTimestamp, resultSetMetadata));
Expand Down Expand Up @@ -299,6 +299,24 @@ private Mod modFrom(Struct struct) {
return new Mod(keys, oldValues, newValues);
}

/**
 * Resolves a mod type name into the matching {@link ModType} constant.
 *
 * @param name the mod type name; it must match a {@link ModType} constant name exactly to be
 *     recognised
 * @return the constant named {@code name}, or {@link ModType#UNKNOWN} when no constant has that
 *     name
 * @throws NullPointerException if {@code name} is null (propagated from {@code valueOf})
 */
private ModType modTypeFrom(String name) {
  ModType modType = ModType.UNKNOWN;
  try {
    modType = ModType.valueOf(name);
  } catch (IllegalArgumentException ignored) {
    // Unrecognised names keep the UNKNOWN default.
    // This is not logged to prevent flooding users with messages
  }
  return modType;
}

/**
 * Resolves a value capture type name into the matching {@link ValueCaptureType} constant.
 *
 * @param name the value capture type name; it must match a {@link ValueCaptureType} constant
 *     name exactly to be recognised
 * @return the constant named {@code name}, or {@link ValueCaptureType#UNKNOWN} when no constant
 *     has that name
 * @throws NullPointerException if {@code name} is null (propagated from {@code valueOf})
 */
private ValueCaptureType valueCaptureTypeFrom(String name) {
  ValueCaptureType valueCaptureType = ValueCaptureType.UNKNOWN;
  try {
    valueCaptureType = ValueCaptureType.valueOf(name);
  } catch (IllegalArgumentException ignored) {
    // Unrecognised names keep the UNKNOWN default.
    // This is not logged to prevent flooding users with messages
  }
  return valueCaptureType;
}

private ChildPartition childPartitionFrom(String partitionToken, Struct struct) {
final HashSet<String> parentTokens =
Sets.newHashSet(struct.getStringList(PARENT_PARTITION_TOKENS_COLUMN));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,6 @@
public enum ModType {
INSERT,
UPDATE,
DELETE
DELETE,
// Fallback for mod type names that do not match any of the constants above, so that a
// value this enum does not list yet can still be represented.
UNKNOWN
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,5 @@
@DefaultCoder(AvroCoder.class)
public enum ValueCaptureType {
OLD_AND_NEW_VALUES,
// Fallback for value capture type names that do not match any of the constants above, so
// that a value this enum does not list yet can still be represented.
UNKNOWN
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestStructMapper.recordsToStructWithJson;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestStructMapper.recordsToStructWithStrings;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestStructMapper.recordsWithUnknownModTypeAndValueCaptureType;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -172,6 +173,33 @@ public void testMappingDeleteStructRowToDataChangeRecord() {
mapper.toChangeStreamRecords(partition, jsonFieldsStruct, resultSetMetadata));
}

/**
 * Checks that a row whose mod type and value capture type names have no matching enum constant
 * is mapped to a record carrying {@link ModType#UNKNOWN} and {@link ValueCaptureType#UNKNOWN}.
 */
@Test
public void testMappingStructRowWithUnknownModTypeAndValueCaptureTypeToDataChangeRecord() {
  // The record the mapper is expected to produce.
  final DataChangeRecord expected =
      new DataChangeRecord(
          "partitionToken",
          Timestamp.ofTimeSecondsAndNanos(10L, 20),
          "transactionId",
          false,
          "1",
          "tableName",
          Arrays.asList(
              new ColumnType("column1", new TypeCode("type1"), true, 1L),
              new ColumnType("column2", new TypeCode("type2"), false, 2L)),
          Collections.singletonList(
              new Mod("{\"column1\": \"value1\"}", null, "{\"column2\": \"newValue2\"}")),
          ModType.UNKNOWN,
          ValueCaptureType.UNKNOWN,
          10L,
          2L,
          null);
  // The helper writes type names into the row that do not correspond to any enum constant.
  final Struct structWithUnknownTypes =
      recordsWithUnknownModTypeAndValueCaptureType(expected);

  assertEquals(
      Collections.singletonList(expected),
      mapper.toChangeStreamRecords(partition, structWithUnknownTypes, resultSetMetadata));
}

@Test
public void testMappingStructRowToHeartbeatRecord() {
final HeartbeatRecord heartbeatRecord =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,30 +109,49 @@ public class TestStructMapper {
StructField.of("child_partitions_record", Type.array(CHILD_PARTITIONS_RECORD_TYPE)));

/**
 * Builds a struct from the given records using the JSON field types. The unknown mod type and
 * unknown value capture type flags are both passed as {@code false}.
 *
 * @param records the change stream records to convert
 * @return the struct produced by {@code recordsToStruct}
 */
public static Struct recordsToStructWithJson(ChangeStreamRecord... records) {
return recordsToStruct(true, records);
return recordsToStruct(false, false, true, records);
}

/**
 * Builds a struct from the given records using the JSON field types, with both the unknown mod
 * type and the unknown value capture type flags switched on.
 *
 * @param records the change stream records to convert
 * @return the struct produced by {@code recordsToStruct}
 */
public static Struct recordsWithUnknownModTypeAndValueCaptureType(
    ChangeStreamRecord... records) {
  // Named flags make the positional boolean arguments readable at the call site.
  final boolean useUnknownModType = true;
  final boolean useUnknownValueCaptureType = true;
  final boolean useJsonFields = true;
  return recordsToStruct(useUnknownModType, useUnknownValueCaptureType, useJsonFields, records);
}

// Builds a struct from the given records using the string (non-JSON) field types. The unknown
// mod type and unknown value capture type flags are both passed as false.
// TODO: Remove when backend is fully migrated to JSON
public static Struct recordsToStructWithStrings(ChangeStreamRecord... records) {
return recordsToStruct(false, records);
return recordsToStruct(false, false, false, records);
}

/**
 * Builds a struct with a single field: an array of stream-record structs, one per given record.
 *
 * @param useUnknownModType forwarded unchanged to {@code streamRecordStructFrom}
 * @param useUnknownValueCaptureType forwarded unchanged to {@code streamRecordStructFrom}
 * @param useJsonFields selects {@code STREAM_RECORD_JSON_TYPE} when true and {@code
 *     STREAM_RECORD_STRING_TYPE} otherwise as the array element type; also forwarded to {@code
 *     streamRecordStructFrom}
 * @param records the change stream records to convert
 * @return the built struct
 */
private static Struct recordsToStruct(boolean useJsonFields, ChangeStreamRecord... records) {
private static Struct recordsToStruct(
boolean useUnknownModType,
boolean useUnknownValueCaptureType,
boolean useJsonFields,
ChangeStreamRecord... records) {
// The element type of the array must agree with the field representation being used.
final Type streamRecordType =
useJsonFields ? STREAM_RECORD_JSON_TYPE : STREAM_RECORD_STRING_TYPE;
return Struct.newBuilder()
.add(
Value.structArray(
streamRecordType,
Arrays.stream(records)
.map(record -> TestStructMapper.streamRecordStructFrom(record, useJsonFields))
.map(
record ->
TestStructMapper.streamRecordStructFrom(
record,
useUnknownModType,
useUnknownValueCaptureType,
useJsonFields))
.collect(Collectors.toList())))
.build();
}

private static Struct streamRecordStructFrom(ChangeStreamRecord record, boolean useJsonFields) {
private static Struct streamRecordStructFrom(
ChangeStreamRecord record,
boolean useUnknownModType,
boolean useUnknownValueCaptureType,
boolean useJsonFields) {
if (record instanceof DataChangeRecord) {
return streamRecordStructFrom((DataChangeRecord) record, useJsonFields);
return streamRecordStructFrom(
(DataChangeRecord) record, useUnknownModType, useUnknownValueCaptureType, useJsonFields);
} else if (record instanceof HeartbeatRecord) {
return streamRecordStructFrom((HeartbeatRecord) record, useJsonFields);
} else if (record instanceof ChildPartitionsRecord) {
Expand Down Expand Up @@ -194,37 +213,48 @@ private static Struct recordStructFrom(HeartbeatRecord record) {
return Struct.newBuilder().set("timestamp").to(record.getTimestamp()).build();
}

/**
 * Wraps a data change record in a stream-record struct. The {@code data_change_record} field
 * holds a single-element array with the record's struct, while {@code heartbeat_record} and
 * {@code child_partitions_record} are set to empty arrays.
 *
 * @param record the data change record to convert
 * @param useUnknownModType forwarded unchanged to {@code recordStructFrom}
 * @param useUnknownValueCaptureType forwarded unchanged to {@code recordStructFrom}
 * @param useJsonFields selects {@code DATA_CHANGE_RECORD_JSON_TYPE} when true and {@code
 *     DATA_CHANGE_RECORD_STRING_TYPE} otherwise; also forwarded to {@code recordStructFrom}
 * @return the built stream-record struct
 */
private static Struct streamRecordStructFrom(DataChangeRecord record, boolean useJsonFields) {
private static Struct streamRecordStructFrom(
DataChangeRecord record,
boolean useUnknownModType,
boolean useUnknownValueCaptureType,
boolean useJsonFields) {
final Type dataChangeRecordType =
useJsonFields ? DATA_CHANGE_RECORD_JSON_TYPE : DATA_CHANGE_RECORD_STRING_TYPE;
return Struct.newBuilder()
.set("data_change_record")
.to(
Value.structArray(
dataChangeRecordType,
Collections.singletonList(recordStructFrom(record, useJsonFields))))
Collections.singletonList(
recordStructFrom(
record, useUnknownModType, useUnknownValueCaptureType, useJsonFields))))
// The other two record kinds are present in the struct but left empty.
.set("heartbeat_record")
.to(Value.structArray(HEARTBEAT_RECORD_TYPE, Collections.emptyList()))
.set("child_partitions_record")
.to(Value.structArray(CHILD_PARTITIONS_RECORD_TYPE, Collections.emptyList()))
.build();
}

private static Struct recordStructFrom(DataChangeRecord record, boolean useJsonFields) {
final Type columnTypeType = useJsonFields ? COLUMN_TYPE_JSON_TYPE : COLUMN_TYPE_STRING_TYPE;
final Type modType = useJsonFields ? MOD_JSON_TYPE : MOD_STRING_TYPE;
private static Struct recordStructFrom(
DataChangeRecord record,
boolean useUnknownModType,
boolean useUnknownValueCaptureType,
boolean useJsonFields) {
final Value columnTypes =
Value.structArray(
columnTypeType,
useJsonFields ? COLUMN_TYPE_JSON_TYPE : COLUMN_TYPE_STRING_TYPE,
record.getRowType().stream()
.map(rowType -> TestStructMapper.columnTypeStructFrom(rowType, useJsonFields))
.collect(Collectors.toList()));
final Value mods =
Value.structArray(
modType,
useJsonFields ? MOD_JSON_TYPE : MOD_STRING_TYPE,
record.getMods().stream()
.map(mod -> TestStructMapper.modStructFrom(mod, useJsonFields))
.collect(Collectors.toList()));
final String modType = useUnknownModType ? "NEW_MOD_TYPE" : record.getModType().name();
final String valueCaptureType =
useUnknownValueCaptureType ? "NEW_VALUE_CAPTURE_TYPE" : record.getValueCaptureType().name();
return Struct.newBuilder()
.set("commit_timestamp")
.to(record.getCommitTimestamp())
Expand All @@ -241,9 +271,9 @@ private static Struct recordStructFrom(DataChangeRecord record, boolean useJsonF
.set("mods")
.to(mods)
.set("mod_type")
.to(record.getModType().toString())
.to(modType)
.set("value_capture_type")
.to(record.getValueCaptureType().toString())
.to(valueCaptureType)
.set("number_of_records_in_transaction")
.to(record.getNumberOfRecordsInTransaction())
.set("number_of_partitions_in_transaction")
Expand Down

0 comments on commit 23aeca4

Please sign in to comment.