diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/util/AutoCloseableIterators.java b/airbyte-commons/src/main/java/io/airbyte/commons/util/AutoCloseableIterators.java index 0e79f12b0e59f..9423f54c5eb9b 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/util/AutoCloseableIterators.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/util/AutoCloseableIterators.java @@ -208,4 +208,5 @@ public static CompositeIterator concatWithEagerClose(final List CompositeIterator concatWithEagerClose(final List> iterators) { return concatWithEagerClose(iterators, null); } + } diff --git a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java new file mode 100644 index 0000000000000..bde7cd34a4e63 --- /dev/null +++ b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java @@ -0,0 +1,609 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.base.destination.typing_deduping; + +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static org.junit.jupiter.api.Assertions.assertAll; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.Streams; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.string.Strings; +import io.airbyte.protocol.models.v0.DestinationSyncMode; +import io.airbyte.protocol.models.v0.SyncMode; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Optional; +import java.util.stream.Stream; +import org.apache.commons.lang3.tuple.Pair; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class exercises {@link SqlGenerator} implementations. All destinations should extend this + * class for their respective implementation. Subclasses are encouraged to add additional tests with + * destination-specific behavior (for example, verifying that datasets are created in the correct + * BigQuery region). + *

+ * Subclasses should implement a {@link org.junit.jupiter.api.BeforeAll} method to load any secrets + * and connect to the destination. This test expects to be able to run + * {@link #getDestinationHandler()} in a {@link org.junit.jupiter.api.BeforeEach} method. + */ +@Execution(ExecutionMode.CONCURRENT) +public abstract class BaseSqlGeneratorIntegrationTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(BaseSqlGeneratorIntegrationTest.class); + /** + * This, along with {@link #FINAL_TABLE_COLUMN_NAMES_CDC}, is the list of columns that should be in + * the final table. They're useful for generating SQL queries to insert records into the final + * table. + */ + protected static final List FINAL_TABLE_COLUMN_NAMES = List.of( + "_airbyte_raw_id", + "_airbyte_extracted_at", + "_airbyte_meta", + "id1", + "id2", + "updated_at", + "struct", + "array", + "string", + "number", + "integer", + "boolean", + "timestamp_with_timezone", + "timestamp_without_timezone", + "time_with_timezone", + "time_without_timezone", + "date", + "unknown"); + protected static final List FINAL_TABLE_COLUMN_NAMES_CDC; + + static { + FINAL_TABLE_COLUMN_NAMES_CDC = Streams.concat( + FINAL_TABLE_COLUMN_NAMES.stream(), + Stream.of("_ab_cdc_deleted_at")).toList(); + } + + private static final RecordDiffer DIFFER = new RecordDiffer( + Pair.of("id1", AirbyteProtocolType.INTEGER), + Pair.of("id2", AirbyteProtocolType.INTEGER), + Pair.of("updated_at", AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE)); + + /** + * Subclasses may use these four StreamConfigs in their tests. + */ + protected StreamConfig incrementalDedupStream; + /** + * We intentionally don't have full refresh overwrite/append streams. Those actually behave + * identically in the sqlgenerator. Overwrite mode is actually handled in + * {@link DefaultTyperDeduper}. + */ + protected StreamConfig incrementalAppendStream; + protected StreamConfig cdcIncrementalDedupStream; + /** + * This isn't particularly realistic, but it's technically possible. + */ + protected StreamConfig cdcIncrementalAppendStream; + + protected SqlGenerator generator; + protected DestinationHandler destinationHandler; + protected String namespace; + + private StreamId streamId; + + protected abstract SqlGenerator getSqlGenerator(); + + protected abstract DestinationHandler getDestinationHandler(); + + /** + * Do any setup work to create a namespace for this test run. For example, this might create a + * BigQuery dataset, or a Snowflake schema. + */ + protected abstract void createNamespace(String namespace); + + /** + * Create a raw table using the StreamId's rawTableId. + */ + protected abstract void createRawTable(StreamId streamId) throws Exception; + + /** + * Create a final table usingi the StreamId's finalTableId. Subclasses are recommended to hardcode + * the columns from {@link #FINAL_TABLE_COLUMN_NAMES} or {@link #FINAL_TABLE_COLUMN_NAMES_CDC}. The + * only difference between those two column lists is the inclusion of the _ab_cdc_deleted_at column, + * which is controlled by the includeCdcDeletedAt parameter. 
+ */ + protected abstract void createFinalTable(boolean includeCdcDeletedAt, StreamId streamId, String suffix) throws Exception; + + protected abstract void insertRawTableRecords(StreamId streamId, List records) throws Exception; + + protected abstract void insertFinalTableRecords(boolean includeCdcDeletedAt, StreamId streamId, String suffix, List records) + throws Exception; + + /** + * The two dump methods are defined identically as in {@link BaseTypingDedupingTest}, but with + * slightly different method signature. This test expects subclasses to respect the raw/finalTableId + * on the StreamId object, rather than hardcoding e.g. the airbyte_internal dataset. + */ + protected abstract List dumpRawTableRecords(StreamId streamId) throws Exception; + + protected abstract List dumpFinalTableRecords(StreamId streamId, String suffix) throws Exception; + + /** + * Clean up all resources in the namespace. For example, this might delete the BigQuery dataset + * created in {@link #createNamespace(String)}. + */ + protected abstract void teardownNamespace(String namespace); + + /** + * This test implementation is extremely destination-specific, but all destinations must implement + * it. This test should verify that creating a table using {@link #incrementalDedupStream} works as + * expected, including column types, indexing, partitioning, etc. + *

+ * Note that subclasses must also annotate their implementation with @Test. + */ + @Test + public abstract void testCreateTableIncremental() throws Exception; + + @BeforeEach + public void setup() { + generator = getSqlGenerator(); + destinationHandler = getDestinationHandler(); + ColumnId id1 = generator.buildColumnId("id1"); + ColumnId id2 = generator.buildColumnId("id2"); + List primaryKey = List.of(id1, id2); + ColumnId cursor = generator.buildColumnId("updated_at"); + + LinkedHashMap columns = new LinkedHashMap<>(); + columns.put(id1, AirbyteProtocolType.INTEGER); + columns.put(id2, AirbyteProtocolType.INTEGER); + columns.put(cursor, AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE); + columns.put(generator.buildColumnId("struct"), new Struct(new LinkedHashMap<>())); + columns.put(generator.buildColumnId("array"), new Array(AirbyteProtocolType.UNKNOWN)); + columns.put(generator.buildColumnId("string"), AirbyteProtocolType.STRING); + columns.put(generator.buildColumnId("number"), AirbyteProtocolType.NUMBER); + columns.put(generator.buildColumnId("integer"), AirbyteProtocolType.INTEGER); + columns.put(generator.buildColumnId("boolean"), AirbyteProtocolType.BOOLEAN); + columns.put(generator.buildColumnId("timestamp_with_timezone"), AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE); + columns.put(generator.buildColumnId("timestamp_without_timezone"), AirbyteProtocolType.TIMESTAMP_WITHOUT_TIMEZONE); + columns.put(generator.buildColumnId("time_with_timezone"), AirbyteProtocolType.TIME_WITH_TIMEZONE); + columns.put(generator.buildColumnId("time_without_timezone"), AirbyteProtocolType.TIME_WITHOUT_TIMEZONE); + columns.put(generator.buildColumnId("date"), AirbyteProtocolType.DATE); + columns.put(generator.buildColumnId("unknown"), AirbyteProtocolType.UNKNOWN); + + LinkedHashMap cdcColumns = new LinkedHashMap<>(columns); + cdcColumns.put(generator.buildColumnId("_ab_cdc_deleted_at"), AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE); + + namespace = Strings.addRandomSuffix("sql_generator_test", "_", 5); + // This is not a typical stream ID would look like, but SqlGenerator isn't allowed to make any + // assumptions about StreamId structure. + // In practice, the final table would be testDataset.users, and the raw table would be + // airbyte_internal.testDataset_raw__stream_users. + streamId = new StreamId(namespace, "users_final", namespace, "users_raw", namespace, "users_final"); + + incrementalDedupStream = new StreamConfig( + streamId, + SyncMode.INCREMENTAL, + DestinationSyncMode.APPEND_DEDUP, + primaryKey, + Optional.of(cursor), + columns); + incrementalAppendStream = new StreamConfig( + streamId, + SyncMode.INCREMENTAL, + DestinationSyncMode.APPEND, + primaryKey, + Optional.of(cursor), + columns); + + cdcIncrementalDedupStream = new StreamConfig( + streamId, + SyncMode.INCREMENTAL, + DestinationSyncMode.APPEND_DEDUP, + primaryKey, + Optional.of(cursor), + cdcColumns); + cdcIncrementalAppendStream = new StreamConfig( + streamId, + SyncMode.INCREMENTAL, + DestinationSyncMode.APPEND, + primaryKey, + Optional.of(cursor), + cdcColumns); + + LOGGER.info("Running with namespace {}", namespace); + createNamespace(namespace); + } + + @AfterEach + public void teardown() { + teardownNamespace(namespace); + } + + /** + * Test that T+D throws an error for an incremental-dedup sync where at least one record has a null + * primary key, and that we don't write any final records. 
+ */ + @Test + public void incrementalDedupInvalidPrimaryKey() throws Exception { + createRawTable(streamId); + createFinalTable(false, streamId, ""); + insertRawTableRecords( + streamId, + List.of( + Jsons.deserialize( + """ + { + "_airbyte_raw_id": "10d6e27d-ae7a-41b5-baf8-c4c277ef9c11", + "_airbyte_extracted_at": "2023-01-01T00:00:00Z", + "_airbyte_data": {} + } + """), + Jsons.deserialize( + """ + { + "_airbyte_raw_id": "5ce60e70-98aa-4fe3-8159-67207352c4f0", + "_airbyte_extracted_at": "2023-01-01T00:00:00Z", + "_airbyte_data": {"id1": 1, "id2": 100} + } + """))); + + String sql = generator.updateTable(incrementalDedupStream, ""); + assertThrows( + Exception.class, + () -> destinationHandler.execute(sql)); + DIFFER.diffFinalTableRecords( + emptyList(), + dumpFinalTableRecords(streamId, "")); + } + + /** + * Run a full T+D update for an incremental-dedup stream, writing to a final table with "_foo" + * suffix, with values for all data types. Verifies all behaviors for all types: + *

    + *
+ * <ul>
+ * <li>A valid, nonnull value</li>
+ * <li>No value (i.e. the column is missing from the record)</li>
+ * <li>A JSON null value</li>
+ * <li>An invalid value</li>
+ * </ul>
+ * <p>
+ * In practice, incremental streams never write to a suffixed table, but SqlGenerator isn't allowed + * to make that assumption (and we might as well exercise that code path). + */ + @Test + public void allTypes() throws Exception { + createRawTable(streamId); + createFinalTable(false, streamId, "_foo"); + insertRawTableRecords( + streamId, + BaseTypingDedupingTest.readRecords("sqlgenerator/alltypes_inputrecords.jsonl")); + + String sql = generator.updateTable(incrementalDedupStream, "_foo"); + destinationHandler.execute(sql); + + verifyRecords( + "sqlgenerator/alltypes_expectedrecords_raw.jsonl", + dumpRawTableRecords(streamId), + "sqlgenerator/alltypes_expectedrecords_final.jsonl", + dumpFinalTableRecords(streamId, "_foo")); + } + + @Test + public void incrementalDedup() throws Exception { + createRawTable(streamId); + createFinalTable(false, streamId, ""); + insertRawTableRecords( + streamId, + BaseTypingDedupingTest.readRecords("sqlgenerator/incrementaldedup_inputrecords.jsonl")); + + String sql = generator.updateTable(incrementalDedupStream, ""); + destinationHandler.execute(sql); + + verifyRecords( + "sqlgenerator/incrementaldedup_expectedrecords_raw.jsonl", + dumpRawTableRecords(streamId), + "sqlgenerator/incrementaldedup_expectedrecords_final.jsonl", + dumpFinalTableRecords(streamId, "")); + } + + @Test + public void incrementalAppend() throws Exception { + createRawTable(streamId); + createFinalTable(false, streamId, ""); + insertRawTableRecords( + streamId, + BaseTypingDedupingTest.readRecords("sqlgenerator/incrementaldedup_inputrecords.jsonl")); + + String sql = generator.updateTable(incrementalAppendStream, ""); + destinationHandler.execute(sql); + + verifyRecordCounts( + 3, + dumpRawTableRecords(streamId), + 3, + dumpFinalTableRecords(streamId, "")); + } + + @Test + public void overwriteFinalTable() throws Exception { + createFinalTable(false, streamId, "_tmp"); + List records = singletonList(Jsons.deserialize( + """ + { + "_airbyte_raw_id": "4fa4efe2-3097-4464-bd22-11211cc3e15b", + "_airbyte_extracted_at": "2023-01-01T00:00:00Z", + "_airbyte_meta": {} + } + """)); + insertFinalTableRecords( + false, + streamId, + "_tmp", + records); + + final String sql = generator.overwriteFinalTable(streamId, "_tmp"); + destinationHandler.execute(sql); + + DIFFER.diffFinalTableRecords( + records, + dumpFinalTableRecords(streamId, "")); + } + + @Test + public void cdcImmediateDeletion() throws Exception { + createRawTable(streamId); + createFinalTable(true, streamId, ""); + insertRawTableRecords( + streamId, + singletonList(Jsons.deserialize( + """ + { + "_airbyte_raw_id": "4fa4efe2-3097-4464-bd22-11211cc3e15b", + "_airbyte_extracted_at": "2023-01-01T00:00:00Z", + "_airbyte_data": { + "id1": 1, + "id2": 100, + "updated_at": "2023-01-01T00:00:00Z", + "_ab_cdc_deleted_at": "2023-01-01T00:01:00Z" + } + } + """))); + + final String sql = generator.updateTable(cdcIncrementalDedupStream, ""); + destinationHandler.execute(sql); + + verifyRecordCounts( + 1, + dumpRawTableRecords(streamId), + 0, + dumpFinalTableRecords(streamId, "")); + } + + /** + * Verify that running T+D twice is idempotent. Previously there was a bug where non-dedup syncs + * with an _ab_cdc_deleted_at column would duplicate "deleted" records on each run. 
+ */ + @Test + public void cdcIdempotent() throws Exception { + createRawTable(streamId); + createFinalTable(true, streamId, ""); + insertRawTableRecords( + streamId, + singletonList(Jsons.deserialize( + """ + { + "_airbyte_raw_id": "4fa4efe2-3097-4464-bd22-11211cc3e15b", + "_airbyte_extracted_at": "2023-01-01T00:00:00Z", + "_airbyte_data": { + "id1": 1, + "id2": 100, + "updated_at": "2023-01-01T00:00:00Z", + "_ab_cdc_deleted_at": "2023-01-01T00:01:00Z" + } + } + """))); + + final String sql = generator.updateTable(cdcIncrementalAppendStream, ""); + // Execute T+D twice + destinationHandler.execute(sql); + destinationHandler.execute(sql); + + verifyRecordCounts( + 1, + dumpRawTableRecords(streamId), + 1, + dumpFinalTableRecords(streamId, "")); + } + + @Test + public void cdcComplexUpdate() throws Exception { + createRawTable(streamId); + createFinalTable(true, streamId, ""); + insertRawTableRecords( + streamId, + BaseTypingDedupingTest.readRecords("sqlgenerator/cdcupdate_inputrecords_raw.jsonl")); + insertFinalTableRecords( + true, + streamId, + "", + BaseTypingDedupingTest.readRecords("sqlgenerator/cdcupdate_inputrecords_final.jsonl")); + + final String sql = generator.updateTable(cdcIncrementalDedupStream, ""); + destinationHandler.execute(sql); + + verifyRecordCounts( + // We keep the newest raw record per PK + 6, + dumpRawTableRecords(streamId), + 5, + dumpFinalTableRecords(streamId, "")); + } + + /** + * source operations: + *

    + *
+ * <ol>
+ * <li>insert id=1 (lsn 10000)</li>
+ * <li>delete id=1 (lsn 10001)</li>
+ * </ol>
+ * <p>
+ * But the destination writes lsn 10001 before 10000. We should still end up with no records in the + * final table. + *

+ * All records have the same emitted_at timestamp. This means that we live or die purely based on + * our ability to use _ab_cdc_lsn. + */ + @Test + public void testCdcOrdering_updateAfterDelete() throws Exception { + createRawTable(streamId); + createFinalTable(true, streamId, ""); + insertRawTableRecords( + streamId, + BaseTypingDedupingTest.readRecords("sqlgenerator/cdcordering_updateafterdelete_inputrecords.jsonl")); + + final String sql = generator.updateTable(cdcIncrementalDedupStream, ""); + destinationHandler.execute(sql); + + verifyRecordCounts( + 1, + dumpRawTableRecords(streamId), + 0, + dumpFinalTableRecords(streamId, "")); + } + + /** + * source operations: + *

    + *
+ * <ol>
+ * <li>arbitrary history...</li>
+ * <li>delete id=1 (lsn 10001)</li>
+ * <li>reinsert id=1 (lsn 10002)</li>
+ * </ol>
+ * <p>
+ * But the destination receives LSNs 10002 before 10001. In this case, we should keep the reinserted + * record in the final table. + *

+ * All records have the same emitted_at timestamp. This means that we live or die purely based on + * our ability to use _ab_cdc_lsn. + */ + @Test + public void testCdcOrdering_insertAfterDelete() throws Exception { + createRawTable(streamId); + createFinalTable(true, streamId, ""); + insertRawTableRecords( + streamId, + BaseTypingDedupingTest.readRecords("sqlgenerator/cdcordering_insertafterdelete_inputrecords_raw.jsonl")); + insertFinalTableRecords( + true, + streamId, + "", + BaseTypingDedupingTest.readRecords("sqlgenerator/cdcordering_insertafterdelete_inputrecords_final.jsonl")); + + final String sql = generator.updateTable(cdcIncrementalDedupStream, ""); + destinationHandler.execute(sql); + + verifyRecordCounts( + 1, + dumpRawTableRecords(streamId), + 1, + dumpFinalTableRecords(streamId, "")); + } + + /** + * Create a table which includes the _ab_cdc_deleted_at column, then soft reset it using the non-cdc + * stream config. Verify that the deleted_at column gets dropped. + */ + @Test + public void softReset() throws Exception { + createRawTable(streamId); + createFinalTable(true, streamId, ""); + insertRawTableRecords( + streamId, + singletonList(Jsons.deserialize( + """ + { + "_airbyte_raw_id": "arst", + "_airbyte_extracted_at": "2023-01-01T00:00:00Z", + "_airbyte_loaded_at": "2023-01-01T00:00:00Z", + "_airbyte_data": { + "id1": 1, + "id2": 100, + "_ab_cdc_deleted_at": "2023-01-01T00:01:00Z" + } + } + """))); + insertFinalTableRecords( + true, + streamId, + "", + singletonList(Jsons.deserialize( + """ + { + "_airbyte_raw_id": "arst", + "_airbyte_extracted_at": "2023-01-01T00:00:00Z", + "_airbyte_meta": {}, + "id1": 1, + "id2": 100, + "_ab_cdc_deleted_at": "2023-01-01T00:01:00Z" + } + """))); + + final String sql = generator.softReset(incrementalAppendStream); + destinationHandler.execute(sql); + + List actualRawRecords = dumpRawTableRecords(streamId); + List actualFinalRecords = dumpFinalTableRecords(streamId, ""); + assertAll( + () -> assertEquals(1, actualRawRecords.size()), + () -> assertEquals(1, actualFinalRecords.size()), + () -> assertTrue( + actualFinalRecords.stream().noneMatch(record -> record.has("_ab_cdc_deleted_at")), + "_ab_cdc_deleted_at column was expected to be dropped. 
Actual final table had: " + actualFinalRecords)); + } + + private void verifyRecords(String expectedRawRecordsFile, + List actualRawRecords, + String expectedFinalRecordsFile, + List actualFinalRecords) { + assertAll( + () -> DIFFER.diffRawTableRecords( + BaseTypingDedupingTest.readRecords(expectedRawRecordsFile), + actualRawRecords), + () -> assertEquals( + 0, + actualRawRecords.stream() + .filter(record -> !record.hasNonNull("_airbyte_loaded_at")) + .count()), + () -> DIFFER.diffFinalTableRecords( + BaseTypingDedupingTest.readRecords(expectedFinalRecordsFile), + actualFinalRecords)); + } + + private void verifyRecordCounts(int expectedRawRecords, + List actualRawRecords, + int expectedFinalRecords, + List actualFinalRecords) { + assertAll( + () -> assertEquals( + expectedRawRecords, + actualRawRecords.size()), + () -> assertEquals( + 0, + actualRawRecords.stream() + .filter(record -> !record.hasNonNull("_airbyte_loaded_at")) + .count()), + () -> assertEquals( + expectedFinalRecords, + actualFinalRecords.size())); + } + +} diff --git a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java index af377bcb963ce..2f44eec145012 100644 --- a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java +++ b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java @@ -63,7 +63,7 @@ public abstract class BaseTypingDedupingTest { private static final JsonNode SCHEMA; static { try { - SCHEMA = Jsons.deserialize(MoreResources.readResource("schema.json")); + SCHEMA = Jsons.deserialize(MoreResources.readResource("dat/schema.json")); } catch (final IOException e) { throw new RuntimeException(e); } @@ -192,21 +192,21 @@ public void fullRefreshOverwrite() throws Exception { .withJsonSchema(SCHEMA)))); // First sync - final List messages1 = readMessages("sync1_messages.jsonl"); + final List messages1 = readMessages("dat/sync1_messages.jsonl"); runSync(catalog, messages1); - final List expectedRawRecords1 = readRecords("sync1_expectedrecords_nondedup_raw.jsonl"); - final List expectedFinalRecords1 = readRecords("sync1_expectedrecords_nondedup_final.jsonl"); + final List expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_raw.jsonl"); + final List expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_final.jsonl"); verifySyncResult(expectedRawRecords1, expectedFinalRecords1); // Second sync - final List messages2 = readMessages("sync2_messages.jsonl"); + final List messages2 = readMessages("dat/sync2_messages.jsonl"); runSync(catalog, messages2); - final List expectedRawRecords2 = readRecords("sync2_expectedrecords_fullrefresh_overwrite_raw.jsonl"); - final List expectedFinalRecords2 = readRecords("sync2_expectedrecords_fullrefresh_overwrite_final.jsonl"); + final List expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_overwrite_raw.jsonl"); + final List expectedFinalRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_overwrite_final.jsonl"); verifySyncResult(expectedRawRecords2, expectedFinalRecords2); } @@ -227,21 +227,21 @@ public void fullRefreshAppend() throws Exception { .withJsonSchema(SCHEMA)))); // 
First sync - final List messages1 = readMessages("sync1_messages.jsonl"); + final List messages1 = readMessages("dat/sync1_messages.jsonl"); runSync(catalog, messages1); - final List expectedRawRecords1 = readRecords("sync1_expectedrecords_nondedup_raw.jsonl"); - final List expectedFinalRecords1 = readRecords("sync1_expectedrecords_nondedup_final.jsonl"); + final List expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_raw.jsonl"); + final List expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_final.jsonl"); verifySyncResult(expectedRawRecords1, expectedFinalRecords1); // Second sync - final List messages2 = readMessages("sync2_messages.jsonl"); + final List messages2 = readMessages("dat/sync2_messages.jsonl"); runSync(catalog, messages2); - final List expectedRawRecords2 = readRecords("sync2_expectedrecords_fullrefresh_append_raw.jsonl"); - final List expectedFinalRecords2 = readRecords("sync2_expectedrecords_fullrefresh_append_final.jsonl"); + final List expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_raw.jsonl"); + final List expectedFinalRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_final.jsonl"); verifySyncResult(expectedRawRecords2, expectedFinalRecords2); } @@ -266,21 +266,21 @@ public void incrementalAppend() throws Exception { .withJsonSchema(SCHEMA)))); // First sync - final List messages1 = readMessages("sync1_messages.jsonl"); + final List messages1 = readMessages("dat/sync1_messages.jsonl"); runSync(catalog, messages1); - final List expectedRawRecords1 = readRecords("sync1_expectedrecords_nondedup_raw.jsonl"); - final List expectedFinalRecords1 = readRecords("sync1_expectedrecords_nondedup_final.jsonl"); + final List expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_raw.jsonl"); + final List expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_final.jsonl"); verifySyncResult(expectedRawRecords1, expectedFinalRecords1); // Second sync - final List messages2 = readMessages("sync2_messages.jsonl"); + final List messages2 = readMessages("dat/sync2_messages.jsonl"); runSync(catalog, messages2); - final List expectedRawRecords2 = readRecords("sync2_expectedrecords_fullrefresh_append_raw.jsonl"); - final List expectedFinalRecords2 = readRecords("sync2_expectedrecords_fullrefresh_append_final.jsonl"); + final List expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_raw.jsonl"); + final List expectedFinalRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_final.jsonl"); verifySyncResult(expectedRawRecords2, expectedFinalRecords2); } @@ -303,21 +303,21 @@ public void incrementalDedup() throws Exception { .withJsonSchema(SCHEMA)))); // First sync - final List messages1 = readMessages("sync1_messages.jsonl"); + final List messages1 = readMessages("dat/sync1_messages.jsonl"); runSync(catalog, messages1); - final List expectedRawRecords1 = readRecords("sync1_expectedrecords_dedup_raw.jsonl"); - final List expectedFinalRecords1 = readRecords("sync1_expectedrecords_dedup_final.jsonl"); + final List expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_dedup_raw.jsonl"); + final List expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_dedup_final.jsonl"); verifySyncResult(expectedRawRecords1, expectedFinalRecords1); // Second sync - final List messages2 = readMessages("sync2_messages.jsonl"); + final List messages2 = readMessages("dat/sync2_messages.jsonl"); runSync(catalog, messages2); - final List 
expectedRawRecords2 = readRecords("sync2_expectedrecords_incremental_dedup_raw.jsonl"); - final List expectedFinalRecords2 = readRecords("sync2_expectedrecords_incremental_dedup_final.jsonl"); + final List expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_incremental_dedup_raw.jsonl"); + final List expectedFinalRecords2 = readRecords("dat/sync2_expectedrecords_incremental_dedup_final.jsonl"); verifySyncResult(expectedRawRecords2, expectedFinalRecords2); } @@ -338,21 +338,21 @@ public void incrementalDedupDefaultNamespace() throws Exception { .withJsonSchema(SCHEMA)))); // First sync - final List messages1 = readMessages("sync1_messages.jsonl", null, streamName); + final List messages1 = readMessages("dat/sync1_messages.jsonl", null, streamName); runSync(catalog, messages1); - final List expectedRawRecords1 = readRecords("sync1_expectedrecords_dedup_raw.jsonl"); - final List expectedFinalRecords1 = readRecords("sync1_expectedrecords_dedup_final.jsonl"); + final List expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_dedup_raw.jsonl"); + final List expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_dedup_final.jsonl"); verifySyncResult(expectedRawRecords1, expectedFinalRecords1, null, streamName); // Second sync - final List messages2 = readMessages("sync2_messages.jsonl", null, streamName); + final List messages2 = readMessages("dat/sync2_messages.jsonl", null, streamName); runSync(catalog, messages2); - final List expectedRawRecords2 = readRecords("sync2_expectedrecords_incremental_dedup_raw.jsonl"); - final List expectedFinalRecords2 = readRecords("sync2_expectedrecords_incremental_dedup_final.jsonl"); + final List expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_incremental_dedup_raw.jsonl"); + final List expectedFinalRecords2 = readRecords("dat/sync2_expectedrecords_incremental_dedup_final.jsonl"); verifySyncResult(expectedRawRecords2, expectedFinalRecords2, null, streamName); } @@ -390,16 +390,16 @@ public void testIncrementalSyncDropOneColumn() throws Exception { .withStream(stream))); // First sync - List messages1 = readMessages("sync1_messages.jsonl"); + List messages1 = readMessages("dat/sync1_messages.jsonl"); runSync(catalog, messages1); - List expectedRawRecords1 = readRecords("sync1_expectedrecords_nondedup_raw.jsonl"); - List expectedFinalRecords1 = readRecords("sync1_expectedrecords_nondedup_final.jsonl"); + List expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_raw.jsonl"); + List expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_final.jsonl"); verifySyncResult(expectedRawRecords1, expectedFinalRecords1); // Second sync - List messages2 = readMessages("sync2_messages.jsonl"); + List messages2 = readMessages("dat/sync2_messages.jsonl"); JsonNode trimmedSchema = SCHEMA.deepCopy(); ((ObjectNode) trimmedSchema.get("properties")).remove("name"); stream.setJsonSchema(trimmedSchema); @@ -407,8 +407,8 @@ public void testIncrementalSyncDropOneColumn() throws Exception { runSync(catalog, messages2); // The raw data is unaffected by the schema, but the final table should not have a `name` column. 
- List expectedRawRecords2 = readRecords("sync2_expectedrecords_fullrefresh_append_raw.jsonl"); - List expectedFinalRecords2 = readRecords("sync2_expectedrecords_fullrefresh_append_final.jsonl").stream() + List expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_raw.jsonl"); + List expectedFinalRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_final.jsonl").stream() .peek(record -> ((ObjectNode) record).remove("name")) .toList(); verifySyncResult(expectedRawRecords2, expectedFinalRecords2); @@ -465,25 +465,25 @@ public void incrementalDedupIdenticalName() throws Exception { // First sync // Read the same set of messages for both streams final List messages1 = Stream.concat( - readMessages("sync1_messages.jsonl", namespace1, streamName).stream(), - readMessages("sync1_messages.jsonl", namespace2, streamName).stream()).toList(); + readMessages("dat/sync1_messages.jsonl", namespace1, streamName).stream(), + readMessages("dat/sync1_messages.jsonl", namespace2, streamName).stream()).toList(); runSync(catalog, messages1); - final List expectedRawRecords1 = readRecords("sync1_expectedrecords_dedup_raw.jsonl"); - final List expectedFinalRecords1 = readRecords("sync1_expectedrecords_dedup_final.jsonl"); + final List expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_dedup_raw.jsonl"); + final List expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_dedup_final.jsonl"); verifySyncResult(expectedRawRecords1, expectedFinalRecords1, namespace1, streamName); verifySyncResult(expectedRawRecords1, expectedFinalRecords1, namespace2, streamName); // Second sync final List messages2 = Stream.concat( - readMessages("sync2_messages.jsonl", namespace1, streamName).stream(), - readMessages("sync2_messages.jsonl", namespace2, streamName).stream()).toList(); + readMessages("dat/sync2_messages.jsonl", namespace1, streamName).stream(), + readMessages("dat/sync2_messages.jsonl", namespace2, streamName).stream()).toList(); runSync(catalog, messages2); - final List expectedRawRecords2 = readRecords("sync2_expectedrecords_incremental_dedup_raw.jsonl"); - final List expectedFinalRecords2 = readRecords("sync2_expectedrecords_incremental_dedup_final.jsonl"); + final List expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_incremental_dedup_raw.jsonl"); + final List expectedFinalRecords2 = readRecords("dat/sync2_expectedrecords_incremental_dedup_final.jsonl"); verifySyncResult(expectedRawRecords2, expectedFinalRecords2, namespace1, streamName); verifySyncResult(expectedRawRecords2, expectedFinalRecords2, namespace2, streamName); } @@ -526,23 +526,23 @@ public void incrementalDedupChangeCursor() throws Exception { final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of(configuredStream)); // First sync - final List messages1 = readMessages("sync1_cursorchange_messages.jsonl"); + final List messages1 = readMessages("dat/sync1_cursorchange_messages.jsonl"); runSync(catalog, messages1); - final List expectedRawRecords1 = readRecords("sync1_cursorchange_expectedrecords_dedup_raw.jsonl"); - final List expectedFinalRecords1 = readRecords("sync1_cursorchange_expectedrecords_dedup_final.jsonl"); + final List expectedRawRecords1 = readRecords("dat/sync1_cursorchange_expectedrecords_dedup_raw.jsonl"); + final List expectedFinalRecords1 = readRecords("dat/sync1_cursorchange_expectedrecords_dedup_final.jsonl"); verifySyncResult(expectedRawRecords1, expectedFinalRecords1); // Second sync - final List messages2 = 
readMessages("sync2_messages.jsonl"); + final List messages2 = readMessages("dat/sync2_messages.jsonl"); configuredStream.getStream().setJsonSchema(SCHEMA); configuredStream.setCursorField(List.of("updated_at")); runSync(catalog, messages2); - final List expectedRawRecords2 = readRecords("sync2_cursorchange_expectedrecords_incremental_dedup_raw.jsonl"); - final List expectedFinalRecords2 = readRecords("sync2_cursorchange_expectedrecords_incremental_dedup_final.jsonl"); + final List expectedRawRecords2 = readRecords("dat/sync2_cursorchange_expectedrecords_incremental_dedup_raw.jsonl"); + final List expectedFinalRecords2 = readRecords("dat/sync2_cursorchange_expectedrecords_incremental_dedup_final.jsonl"); verifySyncResult(expectedRawRecords2, expectedFinalRecords2); } @@ -583,7 +583,7 @@ private void verifySyncResult(final List expectedRawRecords, DIFFER.verifySyncResult(expectedRawRecords, actualRawRecords, expectedFinalRecords, actualFinalRecords); } - private static List readRecords(final String filename) throws IOException { + public static List readRecords(final String filename) throws IOException { return MoreResources.readResource(filename).lines() .map(String::trim) .filter(line -> !line.isEmpty()) diff --git a/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/schema.json b/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/dat/schema.json similarity index 100% rename from airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/schema.json rename to airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/dat/schema.json diff --git a/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sync1_cursorchange_messages.jsonl b/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/dat/sync1_cursorchange_messages.jsonl similarity index 100% rename from airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sync1_cursorchange_messages.jsonl rename to airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/dat/sync1_cursorchange_messages.jsonl diff --git a/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sync1_messages.jsonl b/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/dat/sync1_messages.jsonl similarity index 100% rename from airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sync1_messages.jsonl rename to airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/dat/sync1_messages.jsonl diff --git a/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sync2_messages.jsonl b/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/dat/sync2_messages.jsonl similarity index 100% rename from airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sync2_messages.jsonl rename to airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/dat/sync2_messages.jsonl diff --git a/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sqlgenerator/alltypes_inputrecords.jsonl b/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sqlgenerator/alltypes_inputrecords.jsonl new file mode 100644 index 0000000000000..ba08a826ca1c1 --- /dev/null +++ b/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sqlgenerator/alltypes_inputrecords.jsonl @@ -0,0 +1,5 @@ +{"_airbyte_raw_id": "14ba7c7f-e398-4e69-ac22-28d578400dbc", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", 
"_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": ["foo"], "struct": {"foo": "bar"}, "string": "foo", "number": 42.1, "integer": 42, "boolean": true, "timestamp_with_timezone": "2023-01-23T12:34:56Z", "timestamp_without_timezone": "2023-01-23T12:34:56", "time_with_timezone": "12:34:56Z", "time_without_timezone": "12:34:56", "date": "2023-01-23", "unknown": {}}}' +{"_airbyte_raw_id": "53ce75a5-5bcc-47a3-b45c-96c2015cfe35", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 2, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": null, "struct": null, "string": null, "number": null, "integer": null, "boolean": null, "timestamp_with_timezone": null, "timestamp_without_timezone": null, "time_with_timezone": null, "time_without_timezone": null, "date": null, "unknown": null}} +{"_airbyte_raw_id": "7e1fac0c-017e-4ad6-bc78-334a34d64fbe", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 3, "id2": 100, "updated_at": "2023-01-01T01:00:00Z"}} +// Note that array and struct have invalid values ({} and [] respectively). +{"_airbyte_raw_id": "84242b60-3a34-4531-ad75-a26702960a9a", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 4, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": {}, "struct": [], "string": {}, "number": {}, "integer": {}, "boolean": {}, "timestamp_with_timezone": {}, "timestamp_without_timezone": {}, "time_with_timezone": {}, "time_without_timezone": {}, "date": {}, "unknown": null}} diff --git a/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sqlgenerator/cdcordering_insertafterdelete_inputrecords_final.jsonl b/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sqlgenerator/cdcordering_insertafterdelete_inputrecords_final.jsonl new file mode 100644 index 0000000000000..047f9e9a85f7a --- /dev/null +++ b/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sqlgenerator/cdcordering_insertafterdelete_inputrecords_final.jsonl @@ -0,0 +1 @@ +{"_airbyte_raw_id": "7e7330a1-42fb-41ec-a955-52f18bd61964", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_meta": {}, "id1": 1, "id2": 100, "updated_at": "2023-01-01T02:00:00Z", "string": "alice"} diff --git a/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sqlgenerator/cdcordering_insertafterdelete_inputrecords_raw.jsonl b/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sqlgenerator/cdcordering_insertafterdelete_inputrecords_raw.jsonl new file mode 100644 index 0000000000000..30a996600d400 --- /dev/null +++ b/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sqlgenerator/cdcordering_insertafterdelete_inputrecords_raw.jsonl @@ -0,0 +1,4 @@ +// First batch +{"_airbyte_raw_id": "7e7330a1-42fb-41ec-a955-52f18bd61964", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_loaded_at": "2023-01-01T00:00:01Z", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2023-01-01T02:00:00Z", "string": "alice"}} +// Second batch - this is an outdated deletion record, which should be ignored +{"_airbyte_raw_id": "87ff57d7-41a7-4962-a9dc-d684276283da", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2023-01-01T00:00:00Z", "_ab_cdc_deleted_at": "2023-01-01T00:01:00Z"}} diff --git a/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sqlgenerator/cdcordering_updateafterdelete_inputrecords.jsonl 
b/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sqlgenerator/cdcordering_updateafterdelete_inputrecords.jsonl new file mode 100644 index 0000000000000..0a0c67270d03f --- /dev/null +++ b/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sqlgenerator/cdcordering_updateafterdelete_inputrecords.jsonl @@ -0,0 +1,5 @@ +// Write raw deletion record from the first batch, which resulted in an empty final table. +// Note the non-null loaded_at - this is to simulate that we previously ran T+D on this record. +{"_airbyte_raw_id": "7e7330a1-42fb-41ec-a955-52f18bd61964", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_loaded_at": "2023-01-01T00:00:01Z", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "_ab_cdc_deleted_at": "2023-01-01T00:01:00Z"}} +// insert raw record from the second record batch - this is an outdated record that should be ignored. +{"_airbyte_raw_id": "87ff57d7-41a7-4962-a9dc-d684276283da", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2023-01-01T00:00:00Z", "string": "alice"}} diff --git a/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sqlgenerator/cdcupdate_inputrecords_final.jsonl b/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sqlgenerator/cdcupdate_inputrecords_final.jsonl new file mode 100644 index 0000000000000..4280a0abcfee5 --- /dev/null +++ b/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sqlgenerator/cdcupdate_inputrecords_final.jsonl @@ -0,0 +1,2 @@ +{"_airbyte_raw_id": "d5790c04-52df-42f3-8f77-a543268822a7", "_airbyte_extracted_at": "2022-12-31T00:00:00Z", "_airbyte_meta": {}, "id1": 1, "id2": 100, "updated_at": "2022-12-31T00:00:00Z", "string": "spooky ghost"} +{"_airbyte_raw_id": "e3b03d92-0f7c-49e5-b203-573dbb7bd1cb", "_airbyte_extracted_at": "2022-12-31T00:00:00Z", "_airbyte_meta": {}, "id1": 5, "id2": 100, "updated_at": "2022-12-31T01:00:00Z", "string": "will be deleted'"} diff --git a/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sqlgenerator/cdcupdate_inputrecords_raw.jsonl b/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sqlgenerator/cdcupdate_inputrecords_raw.jsonl new file mode 100644 index 0000000000000..7a15d7f39096c --- /dev/null +++ b/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sqlgenerator/cdcupdate_inputrecords_raw.jsonl @@ -0,0 +1,15 @@ +// Records from the first sync (note the non-null loaded_at value) +{"_airbyte_raw_id": "d5790c04-52df-42f3-8f77-a543268822a7", "_airbyte_extracted_at": "2022-12-31T00:00:00Z", "_airbyte_loaded_at": "2022-12-31T00:00:01Z", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2022-12-01T00:00:00Z", "string": "spooky ghost", "_ab_cdc_deleted_at": null}} +{"_airbyte_raw_id": "3593a002-3ab2-4e67-8b4a-e62f0f9a26f9", "_airbyte_extracted_at": "2022-12-31T00:00:00Z", "_airbyte_loaded_at": "2022-12-31T00:00:01Z", "_airbyte_data": {"id1": 0, "id2": 100, "updated_at": "2022-12-01T01:00:00Z", "string": "zombie", "_ab_cdc_deleted_at": "2022-12-31T00:O0:00Z"}} +{"_airbyte_raw_id": "e3b03d92-0f7c-49e5-b203-573dbb7bd1cb", "_airbyte_extracted_at": "2022-12-31T00:00:00Z", "_airbyte_loaded_at": "2022-12-31T00:00:01Z", "_airbyte_data": {"id1": 5, "id2": 100, "updated_at": "2022-12-01T02:00:00Z", "string": "will not be deleted", "_ab_cdc_deleted_at": null}} + +// Records from the second sync +{"_airbyte_raw_id": 
"5f959152-0db0-44b9-b7e4-0d5c44dc2664", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 2, "id2": 100, "updated_at": "2023-01-010T01:00:00Z", "_ab_cdc_deleted_at": null, "string": "alice"}} +{"_airbyte_raw_id": "a182ff97-8868-42b9-b3cf-c0753fba55e1", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 2, "id2": 100, "updated_at": "2023-01-010T02:00:00Z", "_ab_cdc_deleted_at": null, "string": "alice2"}} +{"_airbyte_raw_id": "65a6c31f-9ded-4e3d-9339-38ee85b0ae81", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 3, "id2": 100, "updated_at": "2023-01-010T03:00:00Z", "_ab_cdc_deleted_at": null, "string": "bob"}} +{"_airbyte_raw_id": "f7fffb67-cd05-4cf7-bcd9-00f2fe796168", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2023-01-010T04:00:00Z", "_ab_cdc_deleted_at": "2022-12-31T23:59:59Z"}} +{"_airbyte_raw_id": "4d8674a5-eb6e-41ca-a310-69c64c88d101", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 0, "id2": 100, "updated_at": "2023-01-010T05:00:00Z", "_ab_cdc_deleted_at": null, "string": "zombie_returned"}} +// CDC generally outputs an explicit null for deleted_at, but verify that we can also handle the case where deleted_at is unset. +{"_airbyte_raw_id": "f0b59e49-8c74-4101-9f14-cb4d1193fd5a", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 4, "id2": 100, "updated_at": "2023-01-010T06:00:00Z", "string": "charlie"}} +// Verify that we can handle weird values in deleted_at +{"_airbyte_raw_id": "d4e1d989-c115-403c-9e68-5d320e6376bb", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 5, "id2": 100, "updated_at": "2023-01-010T07:00:00Z", "_ab_cdc_deleted_at": {}, "string": "david1"}} diff --git a/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sqlgenerator/incrementaldedup_inputrecords.jsonl b/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sqlgenerator/incrementaldedup_inputrecords.jsonl new file mode 100644 index 0000000000000..1d850d9dc74bb --- /dev/null +++ b/airbyte-integrations/bases/base-typing-deduping-test/src/main/resources/sqlgenerator/incrementaldedup_inputrecords.jsonl @@ -0,0 +1,3 @@ +{"_airbyte_raw_id": "d7b81af0-01da-4846-a650-cc398986bc99", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "string": "Alice", "struct": {"city": "San Francisco", "state": "CA"}, "integer": 42}} +{"_airbyte_raw_id": "80c99b54-54b4-43bd-b51b-1f67dafa2c52", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2023-01-01T02:00:00Z", "string": "Alice", "struct": {"city": "San Diego", "state": "CA"}, "integer": 84}} +{"_airbyte_raw_id": "ad690bfb-c2c2-4172-bd73-a16c86ccbb67", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 2, "id2": 100, "updated_at": "2023-01-01T03:00:00Z", "string": "Bob", "integer": "oops"}} diff --git a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/StreamId.java b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/StreamId.java index a4d5d668aa1d6..9851ee7b7e59f 100644 --- a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/StreamId.java +++ 
b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/StreamId.java @@ -36,7 +36,7 @@ public String finalTableId(String quote) { return quote + finalNamespace + quote + "." + quote + finalName + quote; } - public String finalTableId(String suffix, String quote) { + public String finalTableId(String quote, String suffix) { return quote + finalNamespace + quote + "." + quote + finalName + suffix + quote; } diff --git a/airbyte-integrations/bases/base-typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/MockSqlGenerator.java b/airbyte-integrations/bases/base-typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/MockSqlGenerator.java index 957dd4aa35439..1c2321a315afb 100644 --- a/airbyte-integrations/bases/base-typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/MockSqlGenerator.java +++ b/airbyte-integrations/bases/base-typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/MockSqlGenerator.java @@ -21,7 +21,7 @@ public ColumnId buildColumnId(String name) { @Override public String createTable(StreamConfig stream, String suffix) { - return "CREATE TABLE " + stream.id().finalTableId(suffix, ""); + return "CREATE TABLE " + stream.id().finalTableId("", suffix); } @Override @@ -36,12 +36,12 @@ public String softReset(StreamConfig stream) { @Override public String updateTable(StreamConfig stream, String finalSuffix) { - return "UPDATE TABLE " + stream.id().finalTableId(finalSuffix, ""); + return "UPDATE TABLE " + stream.id().finalTableId("", finalSuffix); } @Override public String overwriteFinalTable(StreamId stream, String finalSuffix) { - return "OVERWRITE TABLE " + stream.finalTableId("") + " FROM " + stream.finalTableId(finalSuffix, ""); + return "OVERWRITE TABLE " + stream.finalTableId("") + " FROM " + stream.finalTableId("", finalSuffix); } } diff --git a/airbyte-integrations/connectors/destination-bigquery/Dockerfile b/airbyte-integrations/connectors/destination-bigquery/Dockerfile index 5d326232ee7ed..d4cf4f664692c 100644 --- a/airbyte-integrations/connectors/destination-bigquery/Dockerfile +++ b/airbyte-integrations/connectors/destination-bigquery/Dockerfile @@ -47,7 +47,7 @@ ENV AIRBYTE_NORMALIZATION_INTEGRATION bigquery COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=1.7.2 +LABEL io.airbyte.version=1.7.3 LABEL io.airbyte.name=airbyte/destination-bigquery ENV AIRBYTE_ENTRYPOINT "/airbyte/run_with_normalization.sh" diff --git a/airbyte-integrations/connectors/destination-bigquery/metadata.yaml b/airbyte-integrations/connectors/destination-bigquery/metadata.yaml index a9ce1344d9bfd..18b31fbe1768c 100644 --- a/airbyte-integrations/connectors/destination-bigquery/metadata.yaml +++ b/airbyte-integrations/connectors/destination-bigquery/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133 - dockerImageTag: 1.7.2 + dockerImageTag: 1.7.3 dockerRepository: airbyte/destination-bigquery githubIssueLabel: destination-bigquery icon: bigquery.svg diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java index c1d5c8611798c..342a8cd1bf604 100644 
--- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java @@ -189,7 +189,7 @@ public String createTable(final StreamConfig stream, final String suffix) { return new StringSubstitutor(Map.of( "final_namespace", stream.id().finalNamespace(QUOTE), "dataset_location", datasetLocation, - "final_table_id", stream.id().finalTableId(suffix, QUOTE), + "final_table_id", stream.id().finalTableId(QUOTE, suffix), "column_declarations", columnDeclarations, "cluster_config", clusterConfig)).replace( """ @@ -451,7 +451,7 @@ AND JSON_VALUE(`_airbyte_data`, '$._ab_cdc_deleted_at') IS NOT NULL return new StringSubstitutor(Map.of( "raw_table_id", stream.id().rawTableId(QUOTE), - "final_table_id", stream.id().finalTableId(finalSuffix, QUOTE), + "final_table_id", stream.id().finalTableId(QUOTE, finalSuffix), "column_casts", columnCasts, "column_errors", columnErrors, "cdcConditionalOrIncludeStatement", cdcConditionalOrIncludeStatement, @@ -493,7 +493,7 @@ String dedupFinalTable(final StreamId id, final String pkList = primaryKey.stream().map(columnId -> columnId.name(QUOTE)).collect(joining(",")); return new StringSubstitutor(Map.of( - "final_table_id", id.finalTableId(finalSuffix, QUOTE), + "final_table_id", id.finalTableId(QUOTE, finalSuffix), "pk_list", pkList, "cursor_name", cursor.name(QUOTE)) ).replace( @@ -529,7 +529,7 @@ String cdcDeletes(final StreamConfig stream, // we want to grab IDs for deletion from the raw table (not the final table itself) to hand out-of-order record insertions after the delete has been registered return new StringSubstitutor(Map.of( - "final_table_id", stream.id().finalTableId(finalSuffix, QUOTE), + "final_table_id", stream.id().finalTableId(QUOTE, finalSuffix), "raw_table_id", stream.id().rawTableId(QUOTE), "pk_list", pkList, "pk_extracts", pkCasts, @@ -554,7 +554,7 @@ String cdcDeletes(final StreamConfig stream, String dedupRawTable(final StreamId id, final String finalSuffix) { return new StringSubstitutor(Map.of( "raw_table_id", id.rawTableId(QUOTE), - "final_table_id", id.finalTableId(finalSuffix, QUOTE))).replace( + "final_table_id", id.finalTableId(QUOTE, finalSuffix))).replace( // Note that this leaves _all_ deletion records in the raw table. We _could_ clear them out, but it // would be painful, // and it only matters in a few edge cases. 
@@ -583,7 +583,7 @@ String commitRawTable(final StreamId id) { public String overwriteFinalTable(final StreamId streamId, final String finalSuffix) { return new StringSubstitutor(Map.of( "final_table_id", streamId.finalTableId(QUOTE), - "tmp_final_table", streamId.finalTableId(finalSuffix, QUOTE), + "tmp_final_table", streamId.finalTableId(QUOTE, finalSuffix), "real_final_table", streamId.finalName(QUOTE))).replace( """ DROP TABLE IF EXISTS ${final_table_id}; diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java index 0cdeb8dcd13c8..196eafc29fc6f 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java @@ -5,21 +5,18 @@ package io.airbyte.integrations.destination.bigquery.typing_deduping; import static com.google.cloud.bigquery.LegacySQLTypeName.legacySQLTypeName; +import static java.util.stream.Collectors.joining; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.cloud.bigquery.BigQuery; -import com.google.cloud.bigquery.BigQueryException; import com.google.cloud.bigquery.Dataset; import com.google.cloud.bigquery.DatasetId; import com.google.cloud.bigquery.DatasetInfo; import com.google.cloud.bigquery.Field; -import com.google.cloud.bigquery.Field.Mode; import com.google.cloud.bigquery.FieldValue; import com.google.cloud.bigquery.FieldValueList; import com.google.cloud.bigquery.QueryJobConfiguration; @@ -27,901 +24,64 @@ import com.google.cloud.bigquery.StandardSQLTypeName; import com.google.cloud.bigquery.Table; import com.google.cloud.bigquery.TableDefinition; -import com.google.cloud.bigquery.TableId; import com.google.cloud.bigquery.TableResult; import io.airbyte.commons.json.Jsons; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType; -import io.airbyte.integrations.base.destination.typing_deduping.Array; -import io.airbyte.integrations.base.destination.typing_deduping.ColumnId; -import io.airbyte.integrations.base.destination.typing_deduping.RecordDiffer; -import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; +import io.airbyte.integrations.base.JavaBaseConstants; +import io.airbyte.integrations.base.destination.typing_deduping.BaseSqlGeneratorIntegrationTest; import io.airbyte.integrations.base.destination.typing_deduping.StreamId; -import io.airbyte.integrations.base.destination.typing_deduping.Struct; import io.airbyte.integrations.destination.bigquery.BigQueryDestination; -import io.airbyte.protocol.models.v0.DestinationSyncMode; -import 
io.airbyte.protocol.models.v0.SyncMode; import java.nio.file.Files; import java.nio.file.Path; import java.time.Duration; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Optional; -import java.util.UUID; -import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.text.StringSubstitutor; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.parallel.Execution; import org.junit.jupiter.api.parallel.ExecutionMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -// TODO write test case for multi-column PK @Execution(ExecutionMode.CONCURRENT) -public class BigQuerySqlGeneratorIntegrationTest { +public class BigQuerySqlGeneratorIntegrationTest extends BaseSqlGeneratorIntegrationTest { private static final Logger LOGGER = LoggerFactory.getLogger(BigQuerySqlGeneratorIntegrationTest.class); - private static final BigQuerySqlGenerator GENERATOR = new BigQuerySqlGenerator("US"); - public static final ColumnId ID_COLUMN = GENERATOR.buildColumnId("id"); - public static final List PRIMARY_KEY = List.of(ID_COLUMN); - public static final ColumnId CURSOR = GENERATOR.buildColumnId("updated_at"); - public static final ColumnId CDC_CURSOR = GENERATOR.buildColumnId("_ab_cdc_lsn"); - public static final RecordDiffer DIFFER = new RecordDiffer( - Pair.of("id", AirbyteProtocolType.INTEGER), - Pair.of("updated_at", AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE), - Pair.of("_ab_cdc_lsn", AirbyteProtocolType.INTEGER) - ); - public static final String QUOTE = "`"; - private static final LinkedHashMap COLUMNS; - private static final LinkedHashMap CDC_COLUMNS; private static BigQuery bq; - private static BigQueryDestinationHandler destinationHandler; - - private String testDataset; - private StreamId streamId; - - static { - COLUMNS = new LinkedHashMap<>(); - COLUMNS.put(ID_COLUMN, AirbyteProtocolType.INTEGER); - COLUMNS.put(CURSOR, AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE); - COLUMNS.put(GENERATOR.buildColumnId("struct"), new Struct(new LinkedHashMap<>())); - COLUMNS.put(GENERATOR.buildColumnId("array"), new Array(AirbyteProtocolType.UNKNOWN)); - COLUMNS.put(GENERATOR.buildColumnId("string"), AirbyteProtocolType.STRING); - COLUMNS.put(GENERATOR.buildColumnId("number"), AirbyteProtocolType.NUMBER); - COLUMNS.put(GENERATOR.buildColumnId("integer"), AirbyteProtocolType.INTEGER); - COLUMNS.put(GENERATOR.buildColumnId("boolean"), AirbyteProtocolType.BOOLEAN); - COLUMNS.put(GENERATOR.buildColumnId("timestamp_with_timezone"), AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE); - COLUMNS.put(GENERATOR.buildColumnId("timestamp_without_timezone"), AirbyteProtocolType.TIMESTAMP_WITHOUT_TIMEZONE); - COLUMNS.put(GENERATOR.buildColumnId("time_with_timezone"), AirbyteProtocolType.TIME_WITH_TIMEZONE); - COLUMNS.put(GENERATOR.buildColumnId("time_without_timezone"), AirbyteProtocolType.TIME_WITHOUT_TIMEZONE); - COLUMNS.put(GENERATOR.buildColumnId("date"), AirbyteProtocolType.DATE); - COLUMNS.put(GENERATOR.buildColumnId("unknown"), AirbyteProtocolType.UNKNOWN); - - CDC_COLUMNS = new LinkedHashMap<>(); - CDC_COLUMNS.put(ID_COLUMN, AirbyteProtocolType.INTEGER); - CDC_COLUMNS.put(CDC_CURSOR, AirbyteProtocolType.INTEGER); - CDC_COLUMNS.put(GENERATOR.buildColumnId("_ab_cdc_deleted_at"), AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE); - CDC_COLUMNS.put(GENERATOR.buildColumnId("struct"), new Struct(new LinkedHashMap<>())); - 
CDC_COLUMNS.put(GENERATOR.buildColumnId("array"), new Array(AirbyteProtocolType.UNKNOWN)); - CDC_COLUMNS.put(GENERATOR.buildColumnId("string"), AirbyteProtocolType.STRING); - CDC_COLUMNS.put(GENERATOR.buildColumnId("number"), AirbyteProtocolType.NUMBER); - CDC_COLUMNS.put(GENERATOR.buildColumnId("integer"), AirbyteProtocolType.INTEGER); - CDC_COLUMNS.put(GENERATOR.buildColumnId("boolean"), AirbyteProtocolType.BOOLEAN); - CDC_COLUMNS.put(GENERATOR.buildColumnId("timestamp_with_timezone"), AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE); - CDC_COLUMNS.put(GENERATOR.buildColumnId("timestamp_without_timezone"), AirbyteProtocolType.TIMESTAMP_WITHOUT_TIMEZONE); - CDC_COLUMNS.put(GENERATOR.buildColumnId("time_with_timezone"), AirbyteProtocolType.TIME_WITH_TIMEZONE); - CDC_COLUMNS.put(GENERATOR.buildColumnId("time_without_timezone"), AirbyteProtocolType.TIME_WITHOUT_TIMEZONE); - CDC_COLUMNS.put(GENERATOR.buildColumnId("date"), AirbyteProtocolType.DATE); - CDC_COLUMNS.put(GENERATOR.buildColumnId("unknown"), AirbyteProtocolType.UNKNOWN); - } @BeforeAll - public static void setup() throws Exception { + public static void setupBigquery() throws Exception { final String rawConfig = Files.readString(Path.of("secrets/credentials-gcs-staging.json")); final JsonNode config = Jsons.deserialize(rawConfig); - bq = BigQueryDestination.getBigQuery(config); - destinationHandler = new BigQueryDestinationHandler(bq, "US"); - } - - @BeforeEach - public void setupDataset() { - testDataset = "bq_sql_generator_test_" + UUID.randomUUID().toString().replace("-", "_"); - // This is not a typical stream ID would look like, but we're just using this to isolate our tests - // to a specific dataset. - // In practice, the final table would be testDataset.users, and the raw table would be - // airbyte.testDataset_users. - streamId = new StreamId(testDataset, "users_final", testDataset, "users_raw", testDataset, "users_final"); - LOGGER.info("Running in dataset {}", testDataset); - - bq.create(DatasetInfo.newBuilder(testDataset) - // This unfortunately doesn't delete the actual dataset after 3 days, but at least we can clear out - // the tables if the AfterEach is skipped. - .setDefaultTableLifetime(Duration.ofDays(3).toMillis()) - .build()); - } - - @AfterEach - public void teardownDataset() { - bq.delete(testDataset, BigQuery.DatasetDeleteOption.deleteContents()); - } - - @Test - public void testCreateTableIncremental() throws InterruptedException { - final StreamConfig stream = incrementalDedupStreamConfig(); - - destinationHandler.execute(GENERATOR.createTable(stream, "")); - - final Table table = bq.getTable(testDataset, "users_final"); - // The table should exist - assertNotNull(table); - final Schema schema = table.getDefinition().getSchema(); - // And we should know exactly what columns it contains - assertEquals( - // Would be nice to assert directly against StandardSQLTypeName, but bigquery returns schemas of - // LegacySQLTypeName. So we have to translate. 
- Schema.of( - Field.newBuilder("_airbyte_raw_id", legacySQLTypeName(StandardSQLTypeName.STRING)).setMode(Mode.REQUIRED).build(), - Field.newBuilder("_airbyte_extracted_at", legacySQLTypeName(StandardSQLTypeName.TIMESTAMP)).setMode(Mode.REQUIRED).build(), - Field.newBuilder("_airbyte_meta", legacySQLTypeName(StandardSQLTypeName.JSON)).setMode(Mode.REQUIRED).build(), - Field.of("id", legacySQLTypeName(StandardSQLTypeName.INT64)), - Field.of("updated_at", legacySQLTypeName(StandardSQLTypeName.TIMESTAMP)), - Field.of("struct", legacySQLTypeName(StandardSQLTypeName.JSON)), - Field.of("array", legacySQLTypeName(StandardSQLTypeName.JSON)), - Field.of("string", legacySQLTypeName(StandardSQLTypeName.STRING)), - Field.of("number", legacySQLTypeName(StandardSQLTypeName.NUMERIC)), - Field.of("integer", legacySQLTypeName(StandardSQLTypeName.INT64)), - Field.of("boolean", legacySQLTypeName(StandardSQLTypeName.BOOL)), - Field.of("timestamp_with_timezone", legacySQLTypeName(StandardSQLTypeName.TIMESTAMP)), - Field.of("timestamp_without_timezone", legacySQLTypeName(StandardSQLTypeName.DATETIME)), - Field.of("time_with_timezone", legacySQLTypeName(StandardSQLTypeName.STRING)), - Field.of("time_without_timezone", legacySQLTypeName(StandardSQLTypeName.TIME)), - Field.of("date", legacySQLTypeName(StandardSQLTypeName.DATE)), - Field.of("unknown", legacySQLTypeName(StandardSQLTypeName.JSON))), - schema); - // TODO this should assert partitioning/clustering configs - } - - @Test - public void testCreateTableInOtherRegion() throws InterruptedException { - final StreamConfig stream = incrementalDedupStreamConfig(); - BigQueryDestinationHandler destinationHandler = new BigQueryDestinationHandler(bq, "asia-east1"); - // We're creating the dataset in the wrong location in the @BeforeEach block. Explicitly delete it. - bq.getDataset(testDataset).delete(); - - destinationHandler.execute(new BigQuerySqlGenerator("asia-east1").createTable(stream, "")); - - // Empirically, it sometimes takes Bigquery nearly 30 seconds to propagate the dataset's existence. - // Give ourselves 2 minutes just in case. - for (int i = 0; i < 120; i++) { - final Dataset dataset = bq.getDataset(DatasetId.of(bq.getOptions().getProjectId(), testDataset)); - if (dataset == null) { - LOGGER.info("Sleeping and trying again... 
({})", i); - Thread.sleep(1000); - } else { - assertEquals("asia-east1", dataset.getLocation()); - return; - } - } - fail("Dataset does not exist"); - } - - @Test - public void testVerifyPrimaryKeysIncremental() throws InterruptedException { - createRawTable(); - bq.query(QueryJobConfiguration.newBuilder( - new StringSubstitutor(Map.of( - "dataset", testDataset)).replace( - """ - INSERT INTO ${dataset}.users_raw (`_airbyte_data`, `_airbyte_raw_id`, `_airbyte_extracted_at`) VALUES - (JSON'{}', '10d6e27d-ae7a-41b5-baf8-c4c277ef9c11', '2023-01-01T00:00:00Z'), - (JSON'{"id": 1}', '5ce60e70-98aa-4fe3-8159-67207352c4f0', '2023-01-01T00:00:00Z'); - """)) - .build()); - - // This variable is declared outside of the transaction, so we need to do it manually here - final String sql = "DECLARE missing_pk_count INT64;" + GENERATOR.validatePrimaryKeys(streamId, List.of(new ColumnId("id", "id", "id")), COLUMNS); - final BigQueryException e = assertThrows( - BigQueryException.class, - () -> destinationHandler.execute(sql)); - - assertTrue(e.getError().getMessage().startsWith("Raw table has 1 rows missing a primary key at"), - "Message was actually: " + e.getError().getMessage()); - } - - @Test - public void testInsertNewRecordsIncremental() throws InterruptedException { - createRawTable(); - createFinalTable(); - bq.query(QueryJobConfiguration.newBuilder( - new StringSubstitutor(Map.of( - "dataset", testDataset)).replace( - """ - INSERT INTO ${dataset}.users_raw (`_airbyte_data`, `_airbyte_raw_id`, `_airbyte_extracted_at`) VALUES - (JSON'{"id": 1, "updated_at": "2023-01-01T01:00:00Z", "string": "Alice", "struct": {"city": "San Francisco", "state": "CA"}}', '972fa08a-aa06-4b91-a6af-a371aee4cb1c', '2023-01-01T00:00:00Z'), - (JSON'{"id": 1, "updated_at": "2023-01-01T02:00:00Z", "string": "Alice", "struct": {"city": "San Diego", "state": "CA"}}', '233ad43d-de50-4a47-bbe6-7a417ce60d9d', '2023-01-01T00:00:00Z'), - (JSON'{"id": 2, "updated_at": "2023-01-01T03:00:00Z", "string": "Bob", "integer": "oops"}', 'd4aeb036-2d95-4880-acd2-dc69b42b03c6', '2023-01-01T00:00:00Z'); - """)) - .build()); - - final String sql = GENERATOR.insertNewRecords(incrementalDedupStreamConfig(), "", COLUMNS); - destinationHandler.execute(sql); - - final TableResult result = bq.query(QueryJobConfiguration.newBuilder("SELECT * FROM " + streamId.finalTableId(QUOTE)).build()); - DIFFER.diffFinalTableRecords( - List.of( - Jsons.deserialize( - """ - { - "id": 1, - "updated_at": "2023-01-01T01:00:00Z", - "string": "Alice", - "struct": {"city": "San Francisco", "state": "CA"}, - "_airbyte_extracted_at": "2023-01-01T00:00:00Z", - "_airbyte_meta": {"errors":[]} - } - """ - ), - Jsons.deserialize( - """ - { - "id": 1, - "updated_at": "2023-01-01T02:00:00Z", - "string": "Alice", - "struct": {"city": "San Diego", "state": "CA"}, - "_airbyte_extracted_at": "2023-01-01T00:00:00Z", - "_airbyte_meta": {"errors":[]} - } - """ - ), - Jsons.deserialize( - """ - { - "id": 2, - "updated_at": "2023-01-01T03:00:00Z", - "string": "Bob", - "_airbyte_extracted_at": "2023-01-01T00:00:00Z", - "_airbyte_meta": {"errors":["Problem with `integer`"]} - } - """ - )), - toJsonRecords(result)); - } - - @Test - public void testDedupFinalTable() throws InterruptedException { - createRawTable(); - createFinalTable(); - bq.query(QueryJobConfiguration.newBuilder( - new StringSubstitutor(Map.of( - "dataset", testDataset)).replace( - """ - INSERT INTO ${dataset}.users_raw (`_airbyte_data`, `_airbyte_raw_id`, `_airbyte_extracted_at`) VALUES - (JSON'{"id": 1, "updated_at": 
"2023-01-01T01:00:00Z", "string": "Alice", "struct": {"city": "San Francisco", "state": "CA"}, "integer": 42}', 'd7b81af0-01da-4846-a650-cc398986bc99', '2023-01-01T00:00:00Z'), - (JSON'{"id": 1, "updated_at": "2023-01-01T02:00:00Z", "string": "Alice", "struct": {"city": "San Diego", "state": "CA"}, "integer": 84}', '80c99b54-54b4-43bd-b51b-1f67dafa2c52', '2023-01-01T00:00:00Z'), - (JSON'{"id": 2, "updated_at": "2023-01-01T03:00:00Z", "string": "Bob", "integer": "oops"}', 'ad690bfb-c2c2-4172-bd73-a16c86ccbb67', '2023-01-01T00:00:00Z'), - (JSON'{"id": 3, "string": "Charlie", "integer": 123}', '22af9e56-7ebb-4f5f-ae6b-6ba53360e41e', '2023-01-01T00:00:00Z'), - (JSON'{"id": 3, "updated_at": "2023-01-01T04:00:00Z", "string": "Charlie", "integer": 456}', '0f2375ac-94c1-4be4-99d8-06db40a8ce3e', '2023-01-01T00:00:00Z'); - - INSERT INTO ${dataset}.users_final (_airbyte_raw_id, _airbyte_extracted_at, _airbyte_meta, `id`, `updated_at`, `string`, `struct`, `integer`) values - ('d7b81af0-01da-4846-a650-cc398986bc99', '2023-01-01T00:00:00Z', JSON'{"errors":[]}', 1, '2023-01-01T01:00:00Z', 'Alice', JSON'{"city": "San Francisco", "state": "CA"}', 42), - ('80c99b54-54b4-43bd-b51b-1f67dafa2c52', '2023-01-01T00:00:00Z', JSON'{"errors":[]}', 1, '2023-01-01T02:00:00Z', 'Alice', JSON'{"city": "San Diego", "state": "CA"}', 84), - ('ad690bfb-c2c2-4172-bd73-a16c86ccbb67', '2023-01-01T00:00:00Z', JSON'{"errors": ["blah blah integer"]}', 2, '2023-01-01T03:00:00Z', 'Bob', NULL, NULL), - -- cursor=NULL should be discarded in favor of cursor= - ('22af9e56-7ebb-4f5f-ae6b-6ba53360e41e', '2023-01-01T00:00:00Z', JSON'{"errors": []}', 3, NULL, 'Charlie', NULL, 123), - ('0f2375ac-94c1-4be4-99d8-06db40a8ce3e', '2023-01-01T00:00:00Z', JSON'{"errors": []}', 3, '2023-01-01T04:00:00Z', 'Charlie', NULL, 456); - """)) - .build()); - - final String sql = GENERATOR.dedupFinalTable(streamId, "", PRIMARY_KEY, CURSOR); - destinationHandler.execute(sql); - - final TableResult result = bq.query(QueryJobConfiguration.newBuilder("SELECT * FROM " + streamId.finalTableId(QUOTE)).build()); - DIFFER.diffFinalTableRecords( - List.of( - Jsons.deserialize( - """ - { - "id": 1, - "updated_at": "2023-01-01T02:00:00Z", - "string": "Alice", - "struct": {"city": "San Diego", "state": "CA"}, - "integer": 84, - "_airbyte_extracted_at": "2023-01-01T00:00:00Z", - "_airbyte_meta": {"errors":[]} - } - """), - Jsons.deserialize( - """ - { - "id": 2, - "updated_at": "2023-01-01T03:00:00Z", - "string": "Bob", - "_airbyte_extracted_at": "2023-01-01T00:00:00Z", - "_airbyte_meta": {"errors":["blah blah integer"]} - } - """), - Jsons.deserialize( - """ - { - "id": 3, - "updated_at": "2023-01-01T04:00:00Z", - "string": "Charlie", - "integer": 456, - "_airbyte_extracted_at": "2023-01-01T00:00:00Z", - "_airbyte_meta": {"errors":[]} - } - """)), - toJsonRecords(result)); - } - - @Test - public void testDedupRawTable() throws InterruptedException { - createRawTable(); - createFinalTable(); - bq.query(QueryJobConfiguration.newBuilder( - new StringSubstitutor(Map.of( - "dataset", testDataset)).replace( - """ - INSERT INTO ${dataset}.users_raw (`_airbyte_data`, `_airbyte_raw_id`, `_airbyte_extracted_at`) VALUES - (JSON'{"id": 1, "updated_at": "2023-01-01T01:00:00Z", "string": "Alice", "struct": {"city": "San Francisco", "state": "CA"}, "integer": 42}', 'd7b81af0-01da-4846-a650-cc398986bc99', '2023-01-01T00:00:00Z'), - (JSON'{"id": 1, "updated_at": "2023-01-01T02:00:00Z", "string": "Alice", "struct": {"city": "San Diego", "state": "CA"}, "integer": 84}', 
'80c99b54-54b4-43bd-b51b-1f67dafa2c52', '2023-01-01T00:00:00Z'), - (JSON'{"id": 2, "updated_at": "2023-01-01T03:00:00Z", "string": "Bob", "integer": "oops"}', 'ad690bfb-c2c2-4172-bd73-a16c86ccbb67', '2023-01-01T00:00:00Z'); - - INSERT INTO ${dataset}.users_final (_airbyte_raw_id, _airbyte_extracted_at, _airbyte_meta, `id`, `updated_at`, `string`, `struct`, `integer`) values - ('80c99b54-54b4-43bd-b51b-1f67dafa2c52', '2023-01-01T00:00:00Z', JSON'{"errors":[]}', 1, '2023-01-01T02:00:00Z', 'Alice', JSON'{"city": "San Diego", "state": "CA"}', 84), - ('ad690bfb-c2c2-4172-bd73-a16c86ccbb67', '2023-01-01T00:00:00Z', JSON'{"errors": ["blah blah integer"]}', 2, '2023-01-01T03:00:00Z', 'Bob', NULL, NULL); - """)) - .build()); - - final String sql = GENERATOR.dedupRawTable(streamId, ""); - destinationHandler.execute(sql); - - final TableResult result = bq.query(QueryJobConfiguration.newBuilder("SELECT * FROM " + streamId.rawTableId(QUOTE)).build()); - DIFFER.diffRawTableRecords( - List.of( - Jsons.deserialize( - """ - { - "_airbyte_raw_id": "80c99b54-54b4-43bd-b51b-1f67dafa2c52", - "_airbyte_extracted_at": "2023-01-01T00:00:00Z", - "_airbyte_data": {"id": 1, "updated_at": "2023-01-01T02:00:00Z", "string": "Alice", "struct": {"city": "San Diego", "state": "CA"}, "integer": 84} - } - """ - ), - Jsons.deserialize( - """ - { - "_airbyte_raw_id": "ad690bfb-c2c2-4172-bd73-a16c86ccbb67", - "_airbyte_extracted_at": "2023-01-01T00:00:00Z", - "_airbyte_data": {"id": 2, "updated_at": "2023-01-01T03:00:00Z", "string": "Bob", "integer": "oops"} - } - """ - )), - toJsonRecords(result)); - } - - @Test - public void testCommitRawTable() throws InterruptedException { - createRawTable(); - bq.query(QueryJobConfiguration.newBuilder( - new StringSubstitutor(Map.of( - "dataset", testDataset)).replace( - """ - INSERT INTO ${dataset}.users_raw (`_airbyte_data`, `_airbyte_raw_id`, `_airbyte_extracted_at`) VALUES - (JSON'{"id": 1, "updated_at": "2023-01-01T02:00:00Z", "string": "Alice", "struct": {"city": "San Diego", "state": "CA"}, "integer": 84}', '80c99b54-54b4-43bd-b51b-1f67dafa2c52', '2023-01-01T00:00:00Z'), - (JSON'{"id": 2, "updated_at": "2023-01-01T03:00:00Z", "string": "Bob", "integer": "oops"}', 'ad690bfb-c2c2-4172-bd73-a16c86ccbb67', '2023-01-01T00:00:00Z'); - """)) - .build()); - - final String sql = GENERATOR.commitRawTable(streamId); - destinationHandler.execute(sql); - - final long rawUntypedRows = bq.query(QueryJobConfiguration.newBuilder( - "SELECT * FROM " + streamId.rawTableId(QUOTE) + " WHERE _airbyte_loaded_at IS NULL").build()).getTotalRows(); - assertEquals(0, rawUntypedRows); - } - - @Test - public void testFullUpdateAllTypes() throws InterruptedException { - createRawTable(); - createFinalTable("_foo"); - bq.query(QueryJobConfiguration.newBuilder( - new StringSubstitutor(Map.of( - "dataset", testDataset)).replace( - """ - INSERT INTO ${dataset}.users_raw (`_airbyte_raw_id`, `_airbyte_extracted_at`, `_airbyte_data`) VALUES - (generate_uuid(), '2023-01-01T00:00:00Z', JSON'{"id": 1, "updated_at": "2023-01-01T01:00:00Z", "array": ["foo"], "struct": {"foo": "bar"}, "string": "foo", "number": 42.1, "integer": 42, "boolean": true, "timestamp_with_timezone": "2023-01-23T12:34:56Z", "timestamp_without_timezone": "2023-01-23T12:34:56", "time_with_timezone": "12:34:56Z", "time_without_timezone": "12:34:56", "date": "2023-01-23", "unknown": {}}'), - (generate_uuid(), '2023-01-01T00:00:00Z', JSON'{"id": 2, "updated_at": "2023-01-01T01:00:00Z", "array": null, "struct": null, "string": null, "number": null, 
"integer": null, "boolean": null, "timestamp_with_timezone": null, "timestamp_without_timezone": null, "time_with_timezone": null, "time_without_timezone": null, "date": null, "unknown": null}'), - (generate_uuid(), '2023-01-01T00:00:00Z', JSON'{"id": 3, "updated_at": "2023-01-01T01:00:00Z"}'), - (generate_uuid(), '2023-01-01T00:00:00Z', JSON'{"id": 4, "updated_at": "2023-01-01T01:00:00Z", "array": {}, "struct": [], "string": {}, "number": {}, "integer": {}, "boolean": {}, "timestamp_with_timezone": {}, "timestamp_without_timezone": {}, "time_with_timezone": {}, "time_without_timezone": {}, "date": {}, "unknown": null}'); - """)) - .build()); - - final String sql = GENERATOR.updateTable(incrementalDedupStreamConfig(), "_foo"); - destinationHandler.execute(sql); - - final TableResult finalTable = bq.query(QueryJobConfiguration.newBuilder("SELECT * FROM " + streamId.finalTableId("_foo", QUOTE)).build()); - DIFFER.diffFinalTableRecords( - List.of( - Jsons.deserialize( - """ - { - "id": 1, - "updated_at": "2023-01-01T01:00:00Z", - "array": ["foo"], - "struct": {"foo": "bar"}, - "string": "foo", - "number": 42.1, - "integer": 42, - "boolean": true, - "timestamp_with_timezone": "2023-01-23T12:34:56Z", - "timestamp_without_timezone": "2023-01-23T12:34:56", - "time_with_timezone": "12:34:56Z", - "time_without_timezone": "12:34:56", - "date": "2023-01-23", - "unknown": {}, - "_airbyte_extracted_at": "2023-01-01T00:00:00Z", - "_airbyte_meta": {"errors": []} - } - """), - Jsons.deserialize( - """ - { - "id": 2, - "updated_at": "2023-01-01T01:00:00Z", - "unknown": null, - "_airbyte_extracted_at": "2023-01-01T00:00:00Z", - "_airbyte_meta": {"errors": []} - } - """), - Jsons.deserialize( - """ - { - "id": 3, - "updated_at": "2023-01-01T01:00:00Z", - "_airbyte_extracted_at": "2023-01-01T00:00:00Z", - "_airbyte_meta": {"errors": []} - } - """), - Jsons.deserialize( - """ - { - "id": 4, - "updated_at": "2023-01-01T01:00:00Z", - "unknown": null, - "_airbyte_extracted_at": "2023-01-01T00:00:00Z", - "_airbyte_meta": { - "errors": [ - "Problem with `struct`", - "Problem with `array`", - "Problem with `string`", - "Problem with `number`", - "Problem with `integer`", - "Problem with `boolean`", - "Problem with `timestamp_with_timezone`", - "Problem with `timestamp_without_timezone`", - "Problem with `time_with_timezone`", - "Problem with `time_without_timezone`", - "Problem with `date`" - ] - } - } - """)), - toJsonRecords(finalTable)); - - final long rawRows = bq.query(QueryJobConfiguration.newBuilder("SELECT * FROM " + streamId.rawTableId(QUOTE)).build()).getTotalRows(); - assertEquals(4, rawRows); - final long rawUntypedRows = bq.query(QueryJobConfiguration.newBuilder( - "SELECT * FROM " + streamId.rawTableId(QUOTE) + " WHERE _airbyte_loaded_at IS NULL").build()).getTotalRows(); - assertEquals(0, rawUntypedRows); - } - - @Test - public void testFullUpdateIncrementalDedup() throws InterruptedException { - createRawTable(); - createFinalTable(); - bq.query(QueryJobConfiguration.newBuilder( - new StringSubstitutor(Map.of( - "dataset", testDataset)).replace( - """ - INSERT INTO ${dataset}.users_raw (`_airbyte_data`, `_airbyte_raw_id`, `_airbyte_extracted_at`) VALUES - (JSON'{"id": 1, "updated_at": "2023-01-01T01:00:00Z", "string": "Alice", "struct": {"city": "San Francisco", "state": "CA"}, "integer": 42}', 'd7b81af0-01da-4846-a650-cc398986bc99', '2023-01-01T00:00:00Z'), - (JSON'{"id": 1, "updated_at": "2023-01-01T02:00:00Z", "string": "Alice", "struct": {"city": "San Diego", "state": "CA"}, "integer": 84}', 
'80c99b54-54b4-43bd-b51b-1f67dafa2c52', '2023-01-01T00:00:00Z'), - (JSON'{"id": 2, "updated_at": "2023-01-01T03:00:00Z", "string": "Bob", "integer": "oops"}', 'ad690bfb-c2c2-4172-bd73-a16c86ccbb67', '2023-01-01T00:00:00Z'); - """)) - .build()); - - final String sql = GENERATOR.updateTable(incrementalDedupStreamConfig(), ""); - destinationHandler.execute(sql); - - // TODO - final long finalRows = bq.query(QueryJobConfiguration.newBuilder("SELECT * FROM " + streamId.finalTableId(QUOTE)).build()).getTotalRows(); - assertEquals(2, finalRows); - final long rawRows = bq.query(QueryJobConfiguration.newBuilder("SELECT * FROM " + streamId.rawTableId(QUOTE)).build()).getTotalRows(); - assertEquals(2, rawRows); - final long rawUntypedRows = bq.query(QueryJobConfiguration.newBuilder( - "SELECT * FROM " + streamId.rawTableId(QUOTE) + " WHERE _airbyte_loaded_at IS NULL").build()).getTotalRows(); - assertEquals(0, rawUntypedRows); - } - - @Test - public void testFullUpdateIncrementalAppend() throws InterruptedException { - createRawTable(); - createFinalTable("_foo"); - bq.query(QueryJobConfiguration.newBuilder( - new StringSubstitutor(Map.of( - "dataset", testDataset)).replace( - """ - INSERT INTO ${dataset}.users_raw (`_airbyte_data`, `_airbyte_raw_id`, `_airbyte_extracted_at`) VALUES - (JSON'{"id": 1, "updated_at": "2023-01-01T01:00:00Z", "string": "Alice", "struct": {"city": "San Francisco", "state": "CA"}, "integer": 42}', 'd7b81af0-01da-4846-a650-cc398986bc99', '2023-01-01T00:00:00Z'), - (JSON'{"id": 1, "updated_at": "2023-01-01T02:00:00Z", "string": "Alice", "struct": {"city": "San Diego", "state": "CA"}, "integer": 84}', '80c99b54-54b4-43bd-b51b-1f67dafa2c52', '2023-01-01T00:00:00Z'), - (JSON'{"id": 2, "updated_at": "2023-01-01T03:00:00Z", "string": "Bob", "integer": "oops"}', 'ad690bfb-c2c2-4172-bd73-a16c86ccbb67', '2023-01-01T00:00:00Z'); - """)) - .build()); - - final String sql = GENERATOR.updateTable(incrementalAppendStreamConfig(), "_foo"); - destinationHandler.execute(sql); - - // TODO - final long finalRows = bq.query(QueryJobConfiguration.newBuilder("SELECT * FROM " + streamId.finalTableId("_foo", QUOTE)).build()).getTotalRows(); - assertEquals(3, finalRows); - final long rawRows = bq.query(QueryJobConfiguration.newBuilder("SELECT * FROM " + streamId.rawTableId(QUOTE)).build()).getTotalRows(); - assertEquals(3, rawRows); - final long rawUntypedRows = bq.query(QueryJobConfiguration.newBuilder( - "SELECT * FROM " + streamId.rawTableId(QUOTE) + " WHERE _airbyte_loaded_at IS NULL").build()).getTotalRows(); - assertEquals(0, rawUntypedRows); - } - - // This is also effectively the full refresh overwrite test case. - // In the overwrite case, we rely on the destination connector to tell us to write to a final table - // with a _tmp suffix, and then call overwriteFinalTable at the end of the sync. 
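// Hedged sketch of the overwrite flow described in the comment above. The method names
// (createTable, updateTable, overwriteFinalTable, execute) are taken from this diff; the
// orchestration below is illustrative only, not the connector's actual code path.
static void fullRefreshOverwriteSketch(final BigQuerySqlGenerator generator,
                                       final BigQueryDestinationHandler destinationHandler,
                                       final StreamConfig stream) throws InterruptedException {
  final String tmpSuffix = "_tmp";
  // 1. The connector writes the incoming sync into a suffixed final table (e.g. users_final_tmp).
  destinationHandler.execute(generator.createTable(stream, tmpSuffix));
  // 2. Typing + deduping runs against that suffixed table.
  destinationHandler.execute(generator.updateTable(stream, tmpSuffix));
  // 3. Only at the end of the sync does the tmp table replace the real final table.
  destinationHandler.execute(generator.overwriteFinalTable(stream.id(), tmpSuffix));
}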
- @Test - public void testFullUpdateFullRefreshAppend() throws InterruptedException { - createRawTable(); - createFinalTable("_foo"); - bq.query(QueryJobConfiguration.newBuilder( - new StringSubstitutor(Map.of( - "dataset", testDataset)).replace( - """ - INSERT INTO ${dataset}.users_raw (`_airbyte_data`, `_airbyte_raw_id`, `_airbyte_extracted_at`) VALUES - (JSON'{"id": 1, "updated_at": "2023-01-01T01:00:00Z", "string": "Alice", "struct": {"city": "San Francisco", "state": "CA"}, "integer": 42}', 'd7b81af0-01da-4846-a650-cc398986bc99', '2023-01-01T00:00:00Z'), - (JSON'{"id": 1, "updated_at": "2023-01-01T02:00:00Z", "string": "Alice", "struct": {"city": "San Diego", "state": "CA"}, "integer": 84}', '80c99b54-54b4-43bd-b51b-1f67dafa2c52', '2023-01-01T00:00:00Z'), - (JSON'{"id": 2, "updated_at": "2023-01-01T03:00:00Z", "string": "Bob", "integer": "oops"}', 'ad690bfb-c2c2-4172-bd73-a16c86ccbb67', '2023-01-01T00:00:00Z'); - - INSERT INTO ${dataset}.users_final_foo (_airbyte_raw_id, _airbyte_extracted_at, _airbyte_meta, `id`, `updated_at`, `string`, `struct`, `integer`) values - ('64f4390f-3da1-4b65-b64a-a6c67497f18d', '2022-12-31T00:00:00Z', JSON'{"errors": []}', 1, '2022-12-31T00:00:00Z', 'Alice', NULL, NULL); - """)) - .build()); - - final String sql = GENERATOR.updateTable(fullRefreshAppendStreamConfig(), "_foo"); - destinationHandler.execute(sql); - - // TODO - final long finalRows = bq.query(QueryJobConfiguration.newBuilder("SELECT * FROM " + streamId.finalTableId("_foo", QUOTE)).build()).getTotalRows(); - assertEquals(4, finalRows); - final long rawRows = bq.query(QueryJobConfiguration.newBuilder("SELECT * FROM " + streamId.rawTableId(QUOTE)).build()).getTotalRows(); - assertEquals(3, rawRows); - final long rawUntypedRows = bq.query(QueryJobConfiguration.newBuilder( - "SELECT * FROM " + streamId.rawTableId(QUOTE) + " WHERE _airbyte_loaded_at IS NULL").build()).getTotalRows(); - assertEquals(0, rawUntypedRows); } - @Test - public void testRenameFinalTable() throws InterruptedException { - createFinalTable("_tmp"); - - final String sql = GENERATOR.overwriteFinalTable(fullRefreshOverwriteStreamConfig().id(), "_tmp"); - destinationHandler.execute(sql); - - final Table table = bq.getTable(testDataset, "users_final"); - // TODO this should assert table schema + partitioning/clustering configs - assertNotNull(table); + @Override + protected BigQuerySqlGenerator getSqlGenerator() { + return new BigQuerySqlGenerator("US"); } - @Test - public void testCdcBasics() throws InterruptedException { - createRawTable(); - createFinalTableCdc(); - bq.query(QueryJobConfiguration.newBuilder( - new StringSubstitutor(Map.of( - "dataset", testDataset)).replace( - """ - INSERT INTO ${dataset}.users_raw (`_airbyte_data`, `_airbyte_raw_id`, `_airbyte_extracted_at`, `_airbyte_loaded_at`) VALUES - (JSON'{"id": 1, "_ab_cdc_lsn": 10001, "_ab_cdc_deleted_at": "2023-01-01T00:01:00Z"}', generate_uuid(), '2023-01-01T00:00:00Z', NULL); - """)) - .build()); - - final String sql = GENERATOR.updateTable(cdcStreamConfig(), ""); - destinationHandler.execute(sql); - - // TODO better asserts - final long finalRows = bq.query(QueryJobConfiguration.newBuilder("SELECT * FROM " + streamId.finalTableId("", QUOTE)).build()).getTotalRows(); - assertEquals(0, finalRows); - final long rawRows = bq.query(QueryJobConfiguration.newBuilder("SELECT * FROM " + streamId.rawTableId(QUOTE)).build()).getTotalRows(); - assertEquals(1, rawRows); - final long rawUntypedRows = bq.query(QueryJobConfiguration.newBuilder( - "SELECT * FROM " + 
streamId.rawTableId(QUOTE) + " WHERE _airbyte_loaded_at IS NULL").build()).getTotalRows(); - assertEquals(0, rawUntypedRows); + @Override + protected BigQueryDestinationHandler getDestinationHandler() { + return new BigQueryDestinationHandler(bq, "US"); } - /** - * Verify that running T+D twice is idempotent. Previously there was a bug where non-dedup syncs with - * an _ab_cdc_deleted_at column would duplicate "deleted" records on each run. - */ - @Test - public void testCdcNonDedupIdempotent() throws InterruptedException { - createRawTable(); - createFinalTableCdc(); - bq.query(QueryJobConfiguration.newBuilder( - new StringSubstitutor(Map.of( - "dataset", testDataset)).replace( - """ - - INSERT INTO ${dataset}.users_raw (`_airbyte_data`, `_airbyte_raw_id`, `_airbyte_extracted_at`) VALUES - (JSON'{"id": 1, "_ab_cdc_lsn": 10001, "_ab_cdc_deleted_at": null, "string": "alice"}', generate_uuid(), '2023-01-01T00:00:00Z'), - (JSON'{"id": 2, "_ab_cdc_lsn": 10002, "_ab_cdc_deleted_at": "2022-12-31T23:59:59Z"}', generate_uuid(), '2023-01-01T00:00:00Z'); - """)) - .build()); - - final String sql = GENERATOR.updateTable(cdcIncrementalAppendStreamConfig(), ""); - // Execute T+D twice - destinationHandler.execute(sql); - destinationHandler.execute(sql); - - // There were exactly two raw records, so there should be exactly two final records - final long finalRows = bq.query(QueryJobConfiguration.newBuilder("SELECT * FROM " + streamId.finalTableId("", QUOTE)).build()).getTotalRows(); - assertEquals(2, finalRows); - // And the raw table should be untouched - final long rawRows = bq.query(QueryJobConfiguration.newBuilder("SELECT * FROM " + streamId.rawTableId(QUOTE)).build()).getTotalRows(); - assertEquals(2, rawRows); // we only keep the newest raw record for reach PK - final long rawUntypedRows = bq.query(QueryJobConfiguration.newBuilder( - "SELECT * FROM " + streamId.rawTableId(QUOTE) + " WHERE _airbyte_loaded_at IS NULL").build()).getTotalRows(); - assertEquals(0, rawUntypedRows); - } - - @Test - public void testCdcUpdate() throws InterruptedException { - createRawTable(); - createFinalTableCdc(); - bq.query(QueryJobConfiguration.newBuilder( - new StringSubstitutor(Map.of( - "dataset", testDataset)).replace( - """ - -- records from a previous sync - INSERT INTO ${dataset}.users_raw (`_airbyte_data`, `_airbyte_raw_id`, `_airbyte_extracted_at`, `_airbyte_loaded_at`) VALUES - (JSON'{"id": 1, "_ab_cdc_lsn": 900, "string": "spooky ghost", "_ab_cdc_deleted_at": null}', '64f4390f-3da1-4b65-b64a-a6c67497f18d', '2022-12-31T00:00:00Z', '2022-12-31T00:00:01Z'), - (JSON'{"id": 0, "_ab_cdc_lsn": 901, "string": "zombie", "_ab_cdc_deleted_at": "2022-12-31T00:O0:00Z"}', generate_uuid(), '2022-12-31T00:00:00Z', '2022-12-31T00:00:01Z'), - (JSON'{"id": 5, "_ab_cdc_lsn": 902, "string": "will not be deleted", "_ab_cdc_deleted_at": null}', 'b6139181-a42c-45c3-89f2-c4b4bb3a8c9d', '2022-12-31T00:00:00Z', '2022-12-31T00:00:01Z'); - INSERT INTO ${dataset}.users_final (_airbyte_raw_id, _airbyte_extracted_at, _airbyte_meta, `id`, `_ab_cdc_lsn`, `string`, `struct`, `integer`) values - ('64f4390f-3da1-4b65-b64a-a6c67497f18d', '2022-12-31T00:00:00Z', JSON'{}', 1, 900, 'spooky ghost', NULL, NULL), - ('b6139181-a42c-45c3-89f2-c4b4bb3a8c9d', '2022-12-31T00:00:00Z', JSON'{}', 5, 901, 'will be deleted', NULL, NULL); - - -- new records from the current sync - INSERT INTO ${dataset}.users_raw (`_airbyte_data`, `_airbyte_raw_id`, `_airbyte_extracted_at`) VALUES - (JSON'{"id": 2, "_ab_cdc_lsn": 10001, "_ab_cdc_deleted_at": null, "string": 
"alice"}', generate_uuid(), '2023-01-01T00:00:00Z'), - (JSON'{"id": 2, "_ab_cdc_lsn": 10002, "_ab_cdc_deleted_at": null, "string": "alice2"}', generate_uuid(), '2023-01-01T00:00:00Z'), - (JSON'{"id": 3, "_ab_cdc_lsn": 10003, "_ab_cdc_deleted_at": null, "string": "bob"}', generate_uuid(), '2023-01-01T00:00:00Z'), - (JSON'{"id": 1, "_ab_cdc_lsn": 10004, "_ab_cdc_deleted_at": "2022-12-31T23:59:59Z"}', generate_uuid(), '2023-01-01T00:00:00Z'), - (JSON'{"id": 0, "_ab_cdc_lsn": 10005, "_ab_cdc_deleted_at": null, "string": "zombie_returned"}', generate_uuid(), '2023-01-01T00:00:00Z'), - -- CDC generally outputs an explicit null for deleted_at, but verify that we can also handle the case where deleted_at is unset. - (JSON'{"id": 4, "_ab_cdc_lsn": 10006, "string": "charlie"}', generate_uuid(), '2023-01-01T00:00:00Z'), - -- Verify that we can handle weird values in deleted_at - (JSON'{"id": 5, "_ab_cdc_lsn": 10007, "_ab_cdc_deleted_at": {}, "string": "david"}', generate_uuid(), '2023-01-01T00:00:00Z'); - """)) - .build()); - - final String sql = GENERATOR.updateTable(cdcStreamConfig(), ""); - destinationHandler.execute(sql); - - final long finalRows = bq.query(QueryJobConfiguration.newBuilder("SELECT * FROM " + streamId.finalTableId("", QUOTE)).build()).getTotalRows(); - assertEquals(5, finalRows); - final long rawRows = bq.query(QueryJobConfiguration.newBuilder("SELECT * FROM " + streamId.rawTableId(QUOTE)).build()).getTotalRows(); - assertEquals(6, rawRows); // we only keep the newest raw record for reach PK - final long rawUntypedRows = bq.query(QueryJobConfiguration.newBuilder( - "SELECT * FROM " + streamId.rawTableId(QUOTE) + " WHERE _airbyte_loaded_at IS NULL").build()).getTotalRows(); - assertEquals(0, rawUntypedRows); - } - - /** - * source operations: - *
- *   1. insert id=1 (lsn 10000)
- *   2. delete id=1 (lsn 10001)
- *
- * But the destination writes lsn 10001 before 10000. We should still end up with no records in the
- * final table.
- *
- * All records have the same emitted_at timestamp. This means that we live or die purely based on - * our ability to use _ab_cdc_lsn. - */ - @Test - public void testCdcOrdering_updateAfterDelete() throws InterruptedException { - createRawTable(); - createFinalTableCdc(); - bq.query(QueryJobConfiguration.newBuilder( - new StringSubstitutor(Map.of( - "dataset", testDataset)).replace( - """ - -- Write raw deletion record from the first batch, which resulted in an empty final table. - -- Note the non-null loaded_at - this is to simulate that we previously ran T+D on this record. - INSERT INTO ${dataset}.users_raw (`_airbyte_data`, `_airbyte_raw_id`, `_airbyte_extracted_at`, `_airbyte_loaded_at`) VALUES - (JSON'{"id": 1, "_ab_cdc_lsn": 10001, "_ab_cdc_deleted_at": "2023-01-01T00:01:00Z"}', generate_uuid(), '2023-01-01T00:00:00Z', '2023-01-01T00:00:01Z'); - - -- insert raw record from the second record batch - this is an outdated record that should be ignored. - INSERT INTO ${dataset}.users_raw (`_airbyte_data`, `_airbyte_raw_id`, `_airbyte_extracted_at`) VALUES - (JSON'{"id": 1, "_ab_cdc_lsn": 10000, "string": "alice"}', generate_uuid(), '2023-01-01T00:00:00Z'); - """)) - .build()); - - final String sql = GENERATOR.updateTable(cdcStreamConfig(), ""); - destinationHandler.execute(sql); - - final long finalRows = bq.query(QueryJobConfiguration.newBuilder("SELECT * FROM " + streamId.finalTableId("", QUOTE)).build()).getTotalRows(); - assertEquals(0, finalRows); - final long rawRows = bq.query(QueryJobConfiguration.newBuilder("SELECT * FROM " + streamId.rawTableId(QUOTE)).build()).getTotalRows(); - assertEquals(1, rawRows); - final long rawUntypedRows = bq.query(QueryJobConfiguration.newBuilder( - "SELECT * FROM " + streamId.rawTableId(QUOTE) + " WHERE _airbyte_loaded_at IS NULL").build()).getTotalRows(); - assertEquals(0, rawUntypedRows); - } - - /** - * source operations: - *
- *   1. arbitrary history...
- *   2. delete id=1 (lsn 10001)
- *   3. reinsert id=1 (lsn 10002)
- *
- * But the destination receives LSNs 10002 before 10001. In this case, we should keep the reinserted
- * record in the final table.
- *
- * All records have the same emitted_at timestamp. This means that we live or die purely based on - * our ability to use _ab_cdc_lsn. - */ - @Test - public void testCdcOrdering_insertAfterDelete() throws InterruptedException { - createRawTable(); - createFinalTableCdc(); - bq.query(QueryJobConfiguration.newBuilder( - new StringSubstitutor(Map.of( - "dataset", testDataset)).replace( - """ - -- records from the first batch - INSERT INTO ${dataset}.users_raw (`_airbyte_data`, `_airbyte_raw_id`, `_airbyte_extracted_at`, `_airbyte_loaded_at`) VALUES - (JSON'{"id": 1, "_ab_cdc_lsn": 10002, "string": "alice_reinsert"}', '64f4390f-3da1-4b65-b64a-a6c67497f18d', '2023-01-01T00:00:00Z', '2023-01-01T00:00:01Z'); - INSERT INTO ${dataset}.users_final (_airbyte_raw_id, _airbyte_extracted_at, _airbyte_meta, `id`, `_ab_cdc_lsn`, `string`) values - ('64f4390f-3da1-4b65-b64a-a6c67497f18d', '2023-01-01T00:00:00Z', JSON'{}', 1, 10002, 'alice_reinsert'); - - -- second record batch - INSERT INTO ${dataset}.users_raw (`_airbyte_data`, `_airbyte_raw_id`, `_airbyte_extracted_at`) VALUES - (JSON'{"id": 1, "_ab_cdc_lsn": 10001, "_ab_cdc_deleted_at": "2023-01-01T00:01:00Z"}', generate_uuid(), '2023-01-01T00:00:00Z'); - """)) - .build()); - // Run the second round of typing and deduping. This should do nothing to the final table, because - // the delete is outdated. - final String sql = GENERATOR.updateTable(cdcStreamConfig(), ""); - destinationHandler.execute(sql); - - final long finalRows = bq.query(QueryJobConfiguration.newBuilder("SELECT * FROM " + streamId.finalTableId("", QUOTE)).build()).getTotalRows(); - assertEquals(1, finalRows); - final long rawRows = bq.query(QueryJobConfiguration.newBuilder("SELECT * FROM " + streamId.rawTableId(QUOTE)).build()).getTotalRows(); - assertEquals(1, rawRows); - final long rawUntypedRows = bq.query(QueryJobConfiguration.newBuilder( - "SELECT * FROM " + streamId.rawTableId(QUOTE) + " WHERE _airbyte_loaded_at IS NULL").build()).getTotalRows(); - assertEquals(0, rawUntypedRows); - } - - @Test - public void softReset() throws InterruptedException { - createRawTable(); - createFinalTableCdc(); - bq.query(QueryJobConfiguration.newBuilder( - new StringSubstitutor(Map.of( - "dataset", testDataset)).replace( - """ - ALTER TABLE ${dataset}.users_final ADD COLUMN `weird_new_column` INT64; - - INSERT INTO ${dataset}.users_raw (`_airbyte_data`, `_airbyte_raw_id`, `_airbyte_extracted_at`) VALUES - (JSON'{"id": 1, "updated_at": "2023-01-01T02:00:00Z", "string": "Alice", "struct": {"city": "San Diego", "state": "CA"}, "integer": 84}', '80c99b54-54b4-43bd-b51b-1f67dafa2c52', '2023-01-01T00:00:00Z'), - (JSON'{"id": 2, "updated_at": "2023-01-01T03:00:00Z", "string": "Bob", "integer": "oops"}', 'ad690bfb-c2c2-4172-bd73-a16c86ccbb67', '2023-01-01T00:00:00Z'); - """)) + @Override + protected void createNamespace(String namespace) { + bq.create(DatasetInfo.newBuilder(namespace) + // This unfortunately doesn't delete the actual dataset after 3 days, but at least we'll clear out old tables automatically + .setDefaultTableLifetime(Duration.ofDays(3).toMillis()) .build()); - - final String sql = GENERATOR.softReset(incrementalDedupStreamConfig()); - destinationHandler.execute(sql); - - TableDefinition finalTableDefinition = bq.getTable(TableId.of(testDataset, "users_final")).getDefinition(); - assertTrue( - finalTableDefinition.getSchema().getFields().stream().noneMatch(f -> f.getName().equals("weird_new_column")), - "weird_new_column was expected to no longer exist after soft reset"); - final long 
finalRows = bq.query(QueryJobConfiguration.newBuilder("SELECT * FROM " + streamId.finalTableId("", QUOTE)).build()).getTotalRows(); - assertEquals(2, finalRows); - final long rawRows = bq.query(QueryJobConfiguration.newBuilder("SELECT * FROM " + streamId.rawTableId(QUOTE)).build()).getTotalRows(); - assertEquals(2, rawRows); - final long rawUntypedRows = bq.query(QueryJobConfiguration.newBuilder( - "SELECT * FROM " + streamId.rawTableId(QUOTE) + " WHERE _airbyte_loaded_at IS NULL").build()).getTotalRows(); - assertEquals(0, rawUntypedRows); - } - - private StreamConfig incrementalDedupStreamConfig() { - return new StreamConfig( - streamId, - SyncMode.INCREMENTAL, - DestinationSyncMode.APPEND_DEDUP, - PRIMARY_KEY, - Optional.of(CURSOR), - COLUMNS); - } - - private StreamConfig cdcStreamConfig() { - return new StreamConfig( - streamId, - SyncMode.INCREMENTAL, - DestinationSyncMode.APPEND_DEDUP, - PRIMARY_KEY, - // Much like the rest of this class - this is purely for test purposes. Real CDC cursors may not be - // exactly the same as this. - Optional.of(CDC_CURSOR), - CDC_COLUMNS); - } - - private StreamConfig cdcIncrementalAppendStreamConfig() { - return new StreamConfig( - streamId, - SyncMode.INCREMENTAL, - // This is the only difference between this and cdcStreamConfig. - DestinationSyncMode.APPEND, - PRIMARY_KEY, - Optional.of(CDC_CURSOR), - CDC_COLUMNS); - } - - private StreamConfig incrementalAppendStreamConfig() { - return new StreamConfig( - streamId, - SyncMode.INCREMENTAL, - DestinationSyncMode.APPEND, - null, - Optional.of(CURSOR), - COLUMNS); - } - - private StreamConfig fullRefreshAppendStreamConfig() { - return new StreamConfig( - streamId, - SyncMode.FULL_REFRESH, - DestinationSyncMode.APPEND, - null, - Optional.empty(), - COLUMNS); - } - - private StreamConfig fullRefreshOverwriteStreamConfig() { - return new StreamConfig( - streamId, - SyncMode.FULL_REFRESH, - DestinationSyncMode.OVERWRITE, - null, - Optional.empty(), - COLUMNS); } - // These are known-good methods for doing stuff with bigquery. - // Some of them are identical to what the sql generator does, and that's intentional. - private void createRawTable() throws InterruptedException { + @Override + protected void createRawTable(StreamId streamId) throws InterruptedException { bq.query(QueryJobConfiguration.newBuilder( new StringSubstitutor(Map.of( - "dataset", testDataset)).replace( + "raw_table_id", streamId.rawTableId(BigQuerySqlGenerator.QUOTE))).replace( """ - CREATE TABLE ${dataset}.users_raw ( + CREATE TABLE ${raw_table_id} ( _airbyte_raw_id STRING NOT NULL, _airbyte_data JSON NOT NULL, _airbyte_extracted_at TIMESTAMP NOT NULL, @@ -933,22 +93,22 @@ private void createRawTable() throws InterruptedException { .build()); } - private void createFinalTable() throws InterruptedException { - createFinalTable(""); - } - - private void createFinalTable(final String suffix) throws InterruptedException { + @Override + protected void createFinalTable(boolean includeCdcDeletedAt, StreamId streamId, String suffix) throws InterruptedException { + String cdcDeletedAt = includeCdcDeletedAt ? 
"`_ab_cdc_deleted_at` TIMESTAMP," : ""; bq.query(QueryJobConfiguration.newBuilder( new StringSubstitutor(Map.of( - "dataset", testDataset, - "suffix", suffix)).replace( + "final_table_id", streamId.finalTableId(BigQuerySqlGenerator.QUOTE, suffix), + "cdc_deleted_at", cdcDeletedAt)).replace( """ - CREATE TABLE ${dataset}.users_final${suffix} ( + CREATE TABLE ${final_table_id} ( _airbyte_raw_id STRING NOT NULL, _airbyte_extracted_at TIMESTAMP NOT NULL, _airbyte_meta JSON NOT NULL, - `id` INT64, + `id1` INT64, + `id2` INT64, `updated_at` TIMESTAMP, + ${cdc_deleted_at} `struct` JSON, `array` JSON, `string` STRING, @@ -963,42 +123,235 @@ private void createFinalTable(final String suffix) throws InterruptedException { `unknown` JSON ) PARTITION BY (DATE_TRUNC(_airbyte_extracted_at, DAY)) - CLUSTER BY id, _airbyte_extracted_at; + CLUSTER BY id1, id2, _airbyte_extracted_at; """)) .build()); } - private void createFinalTableCdc() throws InterruptedException { + @Override + protected void insertFinalTableRecords(boolean includeCdcDeletedAt, StreamId streamId, String suffix, List records) throws InterruptedException { + List columnNames = includeCdcDeletedAt ? FINAL_TABLE_COLUMN_NAMES_CDC : FINAL_TABLE_COLUMN_NAMES; + String cdcDeletedAtDecl = includeCdcDeletedAt ? ",`_ab_cdc_deleted_at` TIMESTAMP" : ""; + String cdcDeletedAtName = includeCdcDeletedAt ? ",`_ab_cdc_deleted_at`" : ""; + String recordsText = records.stream() + // For each record, convert it to a string like "(rawId, extractedAt, loadedAt, data)" + .map(record -> columnNames.stream() + .map(record::get) + .map(r -> { + if (r == null) { + return "NULL"; + } + String stringContents; + if (r.isTextual()) { + stringContents = r.asText(); + } else { + stringContents = r.toString(); + } + return '"' + stringContents + // Serialized json might contain backslashes and double quotes. Escape them. + .replace("\\", "\\\\") + .replace("\"", "\\\"") + '"'; + }) + .collect(joining(","))) + .map(row -> "(" + row + ")") + .collect(joining(",")); + bq.query(QueryJobConfiguration.newBuilder( new StringSubstitutor(Map.of( - "dataset", testDataset)).replace( + "final_table_id", streamId.finalTableId(BigQuerySqlGenerator.QUOTE, suffix), + "cdc_deleted_at_name", cdcDeletedAtName, + "cdc_deleted_at_decl", cdcDeletedAtDecl, + "records", recordsText)).replace( + // Similar to insertRawTableRecords, some of these columns are declared as string and wrapped in parse_json(). + // There's also a bunch of casting, because bigquery doesn't coerce strings to e.g. 
int """ - CREATE TABLE ${dataset}.users_final ( - _airbyte_raw_id STRING NOT NULL, - _airbyte_extracted_at TIMESTAMP NOT NULL, - _airbyte_meta JSON NOT NULL, - `id` INT64, - `_ab_cdc_deleted_at` TIMESTAMP, - `_ab_cdc_lsn` INT64, - `struct` JSON, - `array` JSON, - `string` STRING, - `number` NUMERIC, - `integer` INT64, - `boolean` BOOL, - `timestamp_with_timezone` TIMESTAMP, - `timestamp_without_timezone` DATETIME, - `time_with_timezone` STRING, - `time_without_timezone` TIME, - `date` DATE, - `unknown` JSON + insert into ${final_table_id} ( + _airbyte_raw_id, + _airbyte_extracted_at, + _airbyte_meta, + `id1`, + `id2`, + `updated_at`, + `struct`, + `array`, + `string`, + `number`, + `integer`, + `boolean`, + `timestamp_with_timezone`, + `timestamp_without_timezone`, + `time_with_timezone`, + `time_without_timezone`, + `date`, + `unknown` + ${cdc_deleted_at_name} ) - PARTITION BY (DATE_TRUNC(_airbyte_extracted_at, DAY)) - CLUSTER BY id, _airbyte_extracted_at; + select + _airbyte_raw_id, + _airbyte_extracted_at, + parse_json(_airbyte_meta), + cast(`id1` as int64), + cast(`id2` as int64), + `updated_at`, + parse_json(`struct`), + parse_json(`array`), + `string`, + cast(`number` as numeric), + cast(`integer` as int64), + cast(`boolean` as boolean), + `timestamp_with_timezone`, + `timestamp_without_timezone`, + `time_with_timezone`, + `time_without_timezone`, + `date`, + parse_json(`unknown`) + ${cdc_deleted_at_name} + from unnest([ + STRUCT< + _airbyte_raw_id STRING, + _airbyte_extracted_at TIMESTAMP, + _airbyte_meta STRING, + `id1` STRING, + `id2` STRING, + `updated_at` TIMESTAMP, + `struct` STRING, + `array` STRING, + `string` STRING, + `number` STRING, + `integer` STRING, + `boolean` STRING, + `timestamp_with_timezone` TIMESTAMP, + `timestamp_without_timezone` DATETIME, + `time_with_timezone` STRING, + `time_without_timezone` TIME, + `date` DATE, + `unknown` STRING + ${cdc_deleted_at_decl} + > + ${records} + ]) """)) .build()); } + @Override + protected void insertRawTableRecords(StreamId streamId, List records) throws InterruptedException { + String recordsText = records.stream() + // For each record, convert it to a string like "(rawId, extractedAt, loadedAt, data)" + .map(record -> JavaBaseConstants.V2_COLUMN_NAMES.stream() + .map(record::get) + .map(r -> { + if (r == null) { + return "NULL"; + } + String stringContents; + if (r.isTextual()) { + stringContents = r.asText(); + } else { + stringContents = r.toString(); + } + return '"' + stringContents + // Serialized json might contain backslashes and double quotes. Escape them. + .replace("\\", "\\\\") + .replace("\"", "\\\"") + '"'; + }) + .collect(joining(","))) + .map(row -> "(" + row + ")") + .collect(joining(",")); + + bq.query(QueryJobConfiguration.newBuilder( + new StringSubstitutor(Map.of( + "raw_table_id", streamId.rawTableId(BigQuerySqlGenerator.QUOTE), + "records", recordsText)).replace( + // Note the parse_json call, and that _airbyte_data is declared as a string. + // This is needed because you can't insert a string literal into a JSON column + // so we build a struct literal with a string field, and then parse the field when inserting to the table. 
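// Worked example of the escaping above (illustrative values, not produced by this test): a raw record
//   {"_airbyte_raw_id": "uuid1", "_airbyte_extracted_at": "2023-01-01T00:00:00Z",
//    "_airbyte_loaded_at": null, "_airbyte_data": {"id1": 1}}
// is rendered as the row tuple
//   ("uuid1","2023-01-01T00:00:00Z",NULL,"{\"id1\":1}")
// i.e. nulls become a literal NULL, every other value is double-quoted with backslashes and quotes
// escaped, and _airbyte_data stays a STRING until parse_json() converts it in the SELECT below.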
+ """ + INSERT INTO ${raw_table_id} (_airbyte_raw_id, _airbyte_extracted_at, _airbyte_loaded_at, _airbyte_data) + SELECT _airbyte_raw_id, _airbyte_extracted_at, _airbyte_loaded_at, parse_json(_airbyte_data) FROM UNNEST([ + STRUCT<`_airbyte_raw_id` STRING, `_airbyte_extracted_at` TIMESTAMP, `_airbyte_loaded_at` TIMESTAMP, _airbyte_data STRING> + ${records} + ]) + """)) + .build()); + } + + @Override + protected List dumpRawTableRecords(StreamId streamId) throws Exception { + TableResult result = bq.query(QueryJobConfiguration.of("SELECT * FROM " + streamId.rawTableId(BigQuerySqlGenerator.QUOTE))); + return BigQuerySqlGeneratorIntegrationTest.toJsonRecords(result); + } + + @Override + protected List dumpFinalTableRecords(StreamId streamId, String suffix) throws Exception { + TableResult result = bq.query(QueryJobConfiguration.of("SELECT * FROM " + streamId.finalTableId(BigQuerySqlGenerator.QUOTE, suffix))); + return BigQuerySqlGeneratorIntegrationTest.toJsonRecords(result); + } + + @Override + protected void teardownNamespace(String namespace) { + bq.delete(namespace, BigQuery.DatasetDeleteOption.deleteContents()); + } + + @Override + @Test + public void testCreateTableIncremental() throws Exception { + destinationHandler.execute(generator.createTable(incrementalDedupStream, "")); + + final Table table = bq.getTable(namespace, "users_final"); + // The table should exist + assertNotNull(table); + final Schema schema = table.getDefinition().getSchema(); + // And we should know exactly what columns it contains + assertEquals( + // Would be nice to assert directly against StandardSQLTypeName, but bigquery returns schemas of + // LegacySQLTypeName. So we have to translate. + Schema.of( + Field.newBuilder("_airbyte_raw_id", legacySQLTypeName(StandardSQLTypeName.STRING)).setMode(Field.Mode.REQUIRED).build(), + Field.newBuilder("_airbyte_extracted_at", legacySQLTypeName(StandardSQLTypeName.TIMESTAMP)).setMode(Field.Mode.REQUIRED).build(), + Field.newBuilder("_airbyte_meta", legacySQLTypeName(StandardSQLTypeName.JSON)).setMode(Field.Mode.REQUIRED).build(), + Field.of("id1", legacySQLTypeName(StandardSQLTypeName.INT64)), + Field.of("id2", legacySQLTypeName(StandardSQLTypeName.INT64)), + Field.of("updated_at", legacySQLTypeName(StandardSQLTypeName.TIMESTAMP)), + Field.of("struct", legacySQLTypeName(StandardSQLTypeName.JSON)), + Field.of("array", legacySQLTypeName(StandardSQLTypeName.JSON)), + Field.of("string", legacySQLTypeName(StandardSQLTypeName.STRING)), + Field.of("number", legacySQLTypeName(StandardSQLTypeName.NUMERIC)), + Field.of("integer", legacySQLTypeName(StandardSQLTypeName.INT64)), + Field.of("boolean", legacySQLTypeName(StandardSQLTypeName.BOOL)), + Field.of("timestamp_with_timezone", legacySQLTypeName(StandardSQLTypeName.TIMESTAMP)), + Field.of("timestamp_without_timezone", legacySQLTypeName(StandardSQLTypeName.DATETIME)), + Field.of("time_with_timezone", legacySQLTypeName(StandardSQLTypeName.STRING)), + Field.of("time_without_timezone", legacySQLTypeName(StandardSQLTypeName.TIME)), + Field.of("date", legacySQLTypeName(StandardSQLTypeName.DATE)), + Field.of("unknown", legacySQLTypeName(StandardSQLTypeName.JSON))), + schema); + // TODO this should assert partitioning/clustering configs + } + + @Test + public void testCreateTableInOtherRegion() throws InterruptedException { + BigQueryDestinationHandler destinationHandler = new BigQueryDestinationHandler(bq, "asia-east1"); + // We're creating the dataset in the wrong location in the @BeforeEach block. Explicitly delete it. 
+ bq.getDataset(namespace).delete(); + + destinationHandler.execute(new BigQuerySqlGenerator("asia-east1").createTable(incrementalDedupStream, "")); + + // Empirically, it sometimes takes Bigquery nearly 30 seconds to propagate the dataset's existence. + // Give ourselves 2 minutes just in case. + for (int i = 0; i < 120; i++) { + final Dataset dataset = bq.getDataset(DatasetId.of(bq.getOptions().getProjectId(), namespace)); + if (dataset == null) { + LOGGER.info("Sleeping and trying again... ({})", i); + Thread.sleep(1000); + } else { + assertEquals("asia-east1", dataset.getLocation()); + return; + } + } + fail("Dataset does not exist"); + } + /** * TableResult contains records in a somewhat nonintuitive format (and it avoids loading them all into memory). * That's annoying for us since we're working with small test data, so just pull everything into a list. diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sync1_cursorchange_expectedrecords_dedup_final.jsonl b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync1_cursorchange_expectedrecords_dedup_final.jsonl similarity index 100% rename from airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sync1_cursorchange_expectedrecords_dedup_final.jsonl rename to airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync1_cursorchange_expectedrecords_dedup_final.jsonl diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sync1_cursorchange_expectedrecords_dedup_raw.jsonl b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync1_cursorchange_expectedrecords_dedup_raw.jsonl similarity index 100% rename from airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sync1_cursorchange_expectedrecords_dedup_raw.jsonl rename to airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync1_cursorchange_expectedrecords_dedup_raw.jsonl diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sync1_expectedrecords_dedup_final.jsonl b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync1_expectedrecords_dedup_final.jsonl similarity index 100% rename from airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sync1_expectedrecords_dedup_final.jsonl rename to airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync1_expectedrecords_dedup_final.jsonl diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sync1_expectedrecords_dedup_raw.jsonl b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync1_expectedrecords_dedup_raw.jsonl similarity index 100% rename from airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sync1_expectedrecords_dedup_raw.jsonl rename to airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync1_expectedrecords_dedup_raw.jsonl diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sync1_expectedrecords_nondedup_final.jsonl b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync1_expectedrecords_nondedup_final.jsonl similarity index 100% rename from 
airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sync1_expectedrecords_nondedup_final.jsonl rename to airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync1_expectedrecords_nondedup_final.jsonl diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sync1_expectedrecords_nondedup_raw.jsonl b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync1_expectedrecords_nondedup_raw.jsonl similarity index 100% rename from airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sync1_expectedrecords_nondedup_raw.jsonl rename to airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync1_expectedrecords_nondedup_raw.jsonl diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sync2_cursorchange_expectedrecords_incremental_dedup_final.jsonl b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync2_cursorchange_expectedrecords_incremental_dedup_final.jsonl similarity index 100% rename from airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sync2_cursorchange_expectedrecords_incremental_dedup_final.jsonl rename to airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync2_cursorchange_expectedrecords_incremental_dedup_final.jsonl diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sync2_cursorchange_expectedrecords_incremental_dedup_raw.jsonl b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync2_cursorchange_expectedrecords_incremental_dedup_raw.jsonl similarity index 100% rename from airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sync2_cursorchange_expectedrecords_incremental_dedup_raw.jsonl rename to airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync2_cursorchange_expectedrecords_incremental_dedup_raw.jsonl diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sync2_expectedrecords_fullrefresh_append_final.jsonl b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_append_final.jsonl similarity index 100% rename from airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sync2_expectedrecords_fullrefresh_append_final.jsonl rename to airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_append_final.jsonl diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sync2_expectedrecords_fullrefresh_append_raw.jsonl b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_append_raw.jsonl similarity index 100% rename from airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sync2_expectedrecords_fullrefresh_append_raw.jsonl rename to airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_append_raw.jsonl diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sync2_expectedrecords_fullrefresh_overwrite_final.jsonl 
b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_overwrite_final.jsonl similarity index 100% rename from airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sync2_expectedrecords_fullrefresh_overwrite_final.jsonl rename to airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_overwrite_final.jsonl diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sync2_expectedrecords_fullrefresh_overwrite_raw.jsonl b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_overwrite_raw.jsonl similarity index 100% rename from airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sync2_expectedrecords_fullrefresh_overwrite_raw.jsonl rename to airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_overwrite_raw.jsonl diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sync2_expectedrecords_incremental_dedup_final.jsonl b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync2_expectedrecords_incremental_dedup_final.jsonl similarity index 100% rename from airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sync2_expectedrecords_incremental_dedup_final.jsonl rename to airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync2_expectedrecords_incremental_dedup_final.jsonl diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sync2_expectedrecords_incremental_dedup_raw.jsonl b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync2_expectedrecords_incremental_dedup_raw.jsonl similarity index 100% rename from airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sync2_expectedrecords_incremental_dedup_raw.jsonl rename to airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/dat/sync2_expectedrecords_incremental_dedup_raw.jsonl diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sqlgenerator/alltypes_expectedrecords_final.jsonl b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sqlgenerator/alltypes_expectedrecords_final.jsonl new file mode 100644 index 0000000000000..4a3715106698c --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sqlgenerator/alltypes_expectedrecords_final.jsonl @@ -0,0 +1,4 @@ +{"id1": 1, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": ["foo"], "struct": {"foo": "bar"}, "string": "foo", "number": 42.1, "integer": 42, "boolean": true, "timestamp_with_timezone": "2023-01-23T12:34:56Z", "timestamp_without_timezone": "2023-01-23T12:34:56", "time_with_timezone": "12:34:56Z", "time_without_timezone": "12:34:56", "date": "2023-01-23", "unknown": {}, "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_meta": {"errors": []}} +{"id1": 2, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "unknown": null, "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_meta": {"errors": []}} +{"id1": 3, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", 
"_airbyte_meta": {"errors": []}} +{"id1": 4, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "unknown": null, "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_meta": {"errors": ["Problem with `struct`", "Problem with `array`", "Problem with `string`", "Problem with `number`", "Problem with `integer`", "Problem with `boolean`", "Problem with `timestamp_with_timezone`", "Problem with `timestamp_without_timezone`", "Problem with `time_with_timezone`", "Problem with `time_without_timezone`", "Problem with `date`"]}} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sqlgenerator/alltypes_expectedrecords_raw.jsonl b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sqlgenerator/alltypes_expectedrecords_raw.jsonl new file mode 100644 index 0000000000000..b81891d6bcced --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sqlgenerator/alltypes_expectedrecords_raw.jsonl @@ -0,0 +1,4 @@ +{"_airbyte_raw_id": "14ba7c7f-e398-4e69-ac22-28d578400dbc", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": ["foo"], "struct": {"foo": "bar"}, "string": "foo", "number": 42.1, "integer": 42, "boolean": true, "timestamp_with_timezone": "2023-01-23T12:34:56Z", "timestamp_without_timezone": "2023-01-23T12:34:56", "time_with_timezone": "12:34:56Z", "time_without_timezone": "12:34:56", "date": "2023-01-23", "unknown": {}}}' +{"_airbyte_raw_id": "53ce75a5-5bcc-47a3-b45c-96c2015cfe35", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 2, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": null, "struct": null, "string": null, "number": null, "integer": null, "boolean": null, "timestamp_with_timezone": null, "timestamp_without_timezone": null, "time_with_timezone": null, "time_without_timezone": null, "date": null, "unknown": null}} +{"_airbyte_raw_id": "7e1fac0c-017e-4ad6-bc78-334a34d64fbe", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 3, "id2": 100, "updated_at": "2023-01-01T01:00:00Z"}} +{"_airbyte_raw_id": "84242b60-3a34-4531-ad75-a26702960a9a", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 4, "id2": 100, "updated_at": "2023-01-01T01:00:00Z", "array": {}, "struct": [], "string": {}, "number": {}, "integer": {}, "boolean": {}, "timestamp_with_timezone": {}, "timestamp_without_timezone": {}, "time_with_timezone": {}, "time_without_timezone": {}, "date": {}, "unknown": null}} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sqlgenerator/incrementaldedup_expectedrecords_final.jsonl b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sqlgenerator/incrementaldedup_expectedrecords_final.jsonl new file mode 100644 index 0000000000000..ecd140e04aad4 --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sqlgenerator/incrementaldedup_expectedrecords_final.jsonl @@ -0,0 +1,2 @@ +{"_airbyte_raw_id": "80c99b54-54b4-43bd-b51b-1f67dafa2c52", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_meta": {"errors": []}, "id1": 1, "id2": 100, "updated_at": "2023-01-01T02:00:00Z", "string": "Alice", "struct": {"city": "San Diego", "state": "CA"}, "integer": 84} +{"_airbyte_raw_id": "ad690bfb-c2c2-4172-bd73-a16c86ccbb67", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_meta": 
{"errors": ["Problem with `integer`"]}, "id1": 2, "id2": 100, "updated_at": "2023-01-01T03:00:00Z", "string": "Bob"} diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sqlgenerator/incrementaldedup_expectedrecords_raw.jsonl b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sqlgenerator/incrementaldedup_expectedrecords_raw.jsonl new file mode 100644 index 0000000000000..e2c19ff210a9e --- /dev/null +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/resources/sqlgenerator/incrementaldedup_expectedrecords_raw.jsonl @@ -0,0 +1,2 @@ +{"_airbyte_raw_id": "80c99b54-54b4-43bd-b51b-1f67dafa2c52", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 1, "id2": 100, "updated_at": "2023-01-01T02:00:00Z", "string": "Alice", "struct": {"city": "San Diego", "state": "CA"}, "integer": 84}} +{"_airbyte_raw_id": "ad690bfb-c2c2-4172-bd73-a16c86ccbb67", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 2, "id2": 100, "updated_at": "2023-01-01T03:00:00Z", "string": "Bob", "integer": "oops"}} diff --git a/docs/integrations/destinations/bigquery.md b/docs/integrations/destinations/bigquery.md index a4980c7580c59..0000aa83b4e26 100644 --- a/docs/integrations/destinations/bigquery.md +++ b/docs/integrations/destinations/bigquery.md @@ -135,6 +135,7 @@ Now that you have set up the BigQuery destination connector, check out the follo | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------| +| 1.7.3 | 2023-08-03 | [\#28890](https://github.com/airbytehq/airbyte/pull/28890) | Internal code updates; improved testing | | 1.7.2 | 2023-08-02 | [\#28976](https://github.com/airbytehq/airbyte/pull/28976) | Fix composite PK handling in v1 mode | | 1.7.1 | 2023-08-02 | [\#28959](https://github.com/airbytehq/airbyte/pull/28959) | Destinations v2: Fix CDC syncs in non-dedup mode | | 1.7.0 | 2023-08-01 | [\#28894](https://github.com/airbytehq/airbyte/pull/28894) | Destinations v2: Open up early access program opt-in |