Skip to content

Commit

Permalink
Modify CDC to be inline with namespace changes. (#2986)
Browse files Browse the repository at this point in the history
Modify CDC to be inline with namespace changes. Add a test case to double check this works.
  • Loading branch information
davinchia committed Apr 20, 2021
1 parent 8ff3e14 commit 7e55f3a
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ public static AirbyteMessage toAirbyteMessage(ChangeEvent<String, String> event,
final JsonNode before = debeziumRecord.get("before");
final JsonNode after = debeziumRecord.get("after");
final JsonNode source = debeziumRecord.get("source");
final String op = debeziumRecord.get("op").asText();

final JsonNode data = formatDebeziumData(before, after, source);

final String streamName = source.get("schema").asText() + "." + source.get("table").asText();
final String schemaName = source.get("schema").asText();
final String streamName = source.get("table").asText();

final AirbyteRecordMessage airbyteRecordMessage = new AirbyteRecordMessage()
.withStream(streamName)
.withNamespace(schemaName)
.withEmittedAt(emittedAt.toEpochMilli())
.withData(data);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.SyncMode;
Expand Down Expand Up @@ -173,7 +172,7 @@ protected static String getTableWhitelist(ConfiguredAirbyteCatalog catalog) {
return catalog.getStreams().stream()
.filter(s -> s.getSyncMode() == SyncMode.INCREMENTAL)
.map(ConfiguredAirbyteStream::getStream)
.map(AirbyteStream::getName)
.map(stream -> stream.getNamespace() + "." + stream.getName())
// debezium needs commas escaped to split properly
.map(x -> StringUtils.escape(x, new char[] {','}, "\\,"))
.collect(Collectors.joining(","));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,10 @@ class CdcPostgresSourceTest {
private static final Logger LOGGER = LoggerFactory.getLogger(CdcPostgresSourceTest.class);

private static final String SLOT_NAME_BASE = "debezium_slot";
private static final String MAKES_STREAM_NAME = "public.makes";
private static final String MODELS_STREAM_NAME = "public.models";
private static final String MAKES_SCHEMA = "public";
private static final String MAKES_STREAM_NAME = "makes";
private static final String MODELS_SCHEMA = "staging";
private static final String MODELS_STREAM_NAME = "models";
private static final Set<String> STREAM_NAMES = Sets.newHashSet(MAKES_STREAM_NAME, MODELS_STREAM_NAME);
private static final String COL_ID = "id";
private static final String COL_MAKE = "make";
Expand All @@ -100,12 +102,14 @@ class CdcPostgresSourceTest {
private static final AirbyteCatalog CATALOG = new AirbyteCatalog().withStreams(List.of(
CatalogHelpers.createAirbyteStream(
MAKES_STREAM_NAME,
MAKES_SCHEMA,
Field.of(COL_ID, JsonSchemaPrimitive.NUMBER),
Field.of(COL_MAKE, JsonSchemaPrimitive.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of(COL_ID))),
CatalogHelpers.createAirbyteStream(
MODELS_STREAM_NAME,
MODELS_SCHEMA,
Field.of(COL_ID, JsonSchemaPrimitive.NUMBER),
Field.of(COL_MAKE_ID, JsonSchemaPrimitive.NUMBER),
Field.of(COL_MODEL, JsonSchemaPrimitive.STRING))
Expand Down Expand Up @@ -165,9 +169,11 @@ void setup() throws Exception {
database.query(ctx -> {
ctx.execute("SELECT pg_create_logical_replication_slot('" + fullReplicationSlot + "', 'pgoutput');");
ctx.execute("CREATE PUBLICATION " + PUBLICATION + " FOR ALL TABLES;");
ctx.execute(String.format("CREATE TABLE %s(%s INTEGER, %s VARCHAR(200), PRIMARY KEY (%s));", MAKES_STREAM_NAME, COL_ID, COL_MAKE, COL_ID));
ctx.execute(String.format("CREATE TABLE %s(%s INTEGER, %s INTEGER, %s VARCHAR(200), PRIMARY KEY (%s));",
MODELS_STREAM_NAME, COL_ID, COL_MAKE_ID, COL_MODEL, COL_ID));
ctx.execute("CREATE SCHEMA " + MODELS_SCHEMA + ";");
ctx.execute(String.format("CREATE TABLE %s.%s(%s INTEGER, %s VARCHAR(200), PRIMARY KEY (%s));", MAKES_SCHEMA, MAKES_STREAM_NAME, COL_ID,
COL_MAKE, COL_ID));
ctx.execute(String.format("CREATE TABLE %s.%s(%s INTEGER, %s INTEGER, %s VARCHAR(200), PRIMARY KEY (%s));",
MODELS_SCHEMA, MODELS_STREAM_NAME, COL_ID, COL_MAKE_ID, COL_MODEL, COL_ID));

for (JsonNode recordJson : MAKE_RECORDS) {
writeMakeRecord(ctx, recordJson);
Expand Down Expand Up @@ -211,7 +217,7 @@ private Database getDatabaseFromConfig(JsonNode config) {
}

@Test
@DisplayName("On the first First sync, produces returns records that exist in the database.")
@DisplayName("On the first sync, produce returns records that exist in the database.")
void testExistingData() throws Exception {
final AutoCloseableIterator<AirbyteMessage> read = source.read(getConfig(PSQL_DB, dbName), CONFIGURED_CATALOG, null);
final List<AirbyteMessage> actualRecords = AutoCloseableIterators.toListAndClose(read);
Expand All @@ -233,7 +239,7 @@ void testDelete() throws Exception {
assertExpectedStateMessages(stateMessages1);

database.query(ctx -> {
ctx.execute(String.format("DELETE FROM %s WHERE %s = %s", MODELS_STREAM_NAME, COL_ID, 11));
ctx.execute(String.format("DELETE FROM %s.%s WHERE %s = %s", MODELS_SCHEMA, MODELS_STREAM_NAME, COL_ID, 11));
return null;
});

Expand Down Expand Up @@ -262,7 +268,7 @@ void testUpdate() throws Exception {
assertExpectedStateMessages(stateMessages1);

database.query(ctx -> {
ctx.execute(String.format("UPDATE %s SET %s = '%s' WHERE %s = %s", MODELS_STREAM_NAME, COL_MODEL, updatedModel, COL_ID, 11));
ctx.execute(String.format("UPDATE %s.%s SET %s = '%s' WHERE %s = %s", MODELS_SCHEMA, MODELS_STREAM_NAME, COL_MODEL, updatedModel, COL_ID, 11));
return null;
});

Expand Down Expand Up @@ -397,12 +403,12 @@ void testCdcAndFullRefreshInSameSync() throws Exception {
@DisplayName("When no records exist, no records are returned.")
void testNoData() throws Exception {
database.query(ctx -> {
ctx.execute(String.format("DELETE FROM %s", MAKES_STREAM_NAME));
ctx.execute(String.format("DELETE FROM %s.%s", MAKES_SCHEMA, MAKES_STREAM_NAME));
return null;
});

database.query(ctx -> {
ctx.execute(String.format("DELETE FROM %s", MODELS_STREAM_NAME));
ctx.execute(String.format("DELETE FROM %s.%s", MODELS_SCHEMA, MODELS_STREAM_NAME));
return null;
});

Expand Down Expand Up @@ -475,13 +481,14 @@ void testReadWithoutReplicationSlot() throws SQLException {
}

private void writeMakeRecord(DSLContext ctx, JsonNode recordJson) {
ctx.execute(String.format("INSERT INTO %s (%s, %s) VALUES (%s, '%s');", MAKES_STREAM_NAME, COL_ID, COL_MAKE,
ctx.execute(String.format("INSERT INTO %s.%s (%s, %s) VALUES (%s, '%s');", MAKES_SCHEMA, MAKES_STREAM_NAME, COL_ID, COL_MAKE,
recordJson.get(COL_ID).asInt(), recordJson.get(COL_MAKE).asText()));
}

private void writeModelRecord(DSLContext ctx, JsonNode recordJson) {
ctx.execute(String.format("INSERT INTO %s (%s, %s, %s) VALUES (%s, %s, '%s');", MODELS_STREAM_NAME, COL_ID, COL_MAKE_ID, COL_MODEL,
recordJson.get(COL_ID).asInt(), recordJson.get(COL_MAKE_ID).asInt(), recordJson.get(COL_MODEL).asText()));
ctx.execute(
String.format("INSERT INTO %s.%s (%s, %s, %s) VALUES (%s, %s, '%s');", MODELS_SCHEMA, MODELS_STREAM_NAME, COL_ID, COL_MAKE_ID, COL_MODEL,
recordJson.get(COL_ID).asInt(), recordJson.get(COL_MAKE_ID).asInt(), recordJson.get(COL_MODEL).asText()));
}

private Set<AirbyteRecordMessage> extractRecordMessages(List<AirbyteMessage> messages) {
Expand Down Expand Up @@ -519,6 +526,11 @@ private static void assertExpectedRecords(Set<JsonNode> expectedRecords, Set<Air
.map(recordMessage -> {
assertTrue(STREAM_NAMES.contains(recordMessage.getStream()));
assertNotNull(recordMessage.getEmittedAt());
if (recordMessage.getStream().equals(MAKES_STREAM_NAME)) {
assertEquals(MAKES_SCHEMA, recordMessage.getNamespace());
} else {
assertEquals(MODELS_SCHEMA, recordMessage.getNamespace());
}

final JsonNode data = recordMessage.getData();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class DebeziumEventUtilsTest {

@Test
public void testConvertChangeEvent() throws IOException {
final String stream = "public.names";
final String stream = "names";
final Instant emittedAt = Instant.now();
ChangeEvent<String, String> insertChangeEvent = mockChangeEvent("insert_change_event.json");
ChangeEvent<String, String> updateChangeEvent = mockChangeEvent("update_change_event.json");
Expand Down Expand Up @@ -73,6 +73,7 @@ private static AirbyteMessage createAirbyteMessage(String stream, Instant emitte

final AirbyteRecordMessage recordMessage = new AirbyteRecordMessage()
.withStream(stream)
.withNamespace("public")
.withData(Jsons.deserialize(data))
.withEmittedAt(emittedAt.toEpochMilli());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public void testWhitelistCreation() {
CatalogHelpers.createConfiguredAirbyteStream("id_,something", "public").withSyncMode(SyncMode.INCREMENTAL),
CatalogHelpers.createConfiguredAirbyteStream("n\"aMéS", "public").withSyncMode(SyncMode.INCREMENTAL)));

final String expectedWhitelist = "id_and_name,id_\\,something,n\"aMéS";
final String expectedWhitelist = "public.id_and_name,public.id_\\,something,public.n\"aMéS";
final String actualWhitelist = DebeziumRecordPublisher.getTableWhitelist(catalog);

assertEquals(expectedWhitelist, actualWhitelist);
Expand All @@ -53,7 +53,7 @@ public void testWhitelistFiltersFullRefresh() {
CatalogHelpers.createConfiguredAirbyteStream("id_and_name", "public").withSyncMode(SyncMode.INCREMENTAL),
CatalogHelpers.createConfiguredAirbyteStream("id_and_name2", "public").withSyncMode(SyncMode.FULL_REFRESH)));

final String expectedWhitelist = "id_and_name";
final String expectedWhitelist = "public.id_and_name";
final String actualWhitelist = DebeziumRecordPublisher.getTableWhitelist(catalog);

assertEquals(expectedWhitelist, actualWhitelist);
Expand Down

0 comments on commit 7e55f3a

Please sign in to comment.