From 7e4797d90dc7ddfdef2bc1ba85d6a91550397da2 Mon Sep 17 00:00:00 2001 From: Cynthia Yin Date: Thu, 20 Jul 2023 13:59:52 -0700 Subject: [PATCH] Destinations V2: clean up AirbyteType code (#28430) * general cleanup - move stuff around, add more comments * guarantee `getAirbyteProtocolType` won't handle array values for `type` * rename OneOf to Union * simplify union ordering logic * update testChooseUnion * fix docs typos * Automated Commit - Format and Process Resources Changes * address comments * Automated Commit - Format and Process Resources Changes --------- Co-authored-by: cynthiaxyin --- .../BaseTypingDedupingTest.java | 119 ++++++----- .../typing_deduping/RecordDiffer.java | 107 +++++----- .../typing_deduping/AirbyteProtocolType.java | 78 +++++++ .../typing_deduping/AirbyteType.java | 155 ++++---------- .../typing_deduping/AirbyteTypeUtils.java | 156 -------------- .../destination/typing_deduping/Array.java | 9 + .../typing_deduping/CatalogParser.java | 29 ++- .../destination/typing_deduping/Struct.java | 14 ++ .../destination/typing_deduping/Union.java | 65 ++++++ .../typing_deduping/UnsupportedOneOf.java | 16 ++ .../typing_deduping/AirbyteTypeTest.java | 196 +++++++++--------- .../typing_deduping/BigQuerySqlGenerator.java | 50 ++--- .../BigQuerySqlGeneratorIntegrationTest.java | 24 +-- .../BigQuerySqlGeneratorTest.java | 22 +- .../supported-data-types.md | 28 +-- 15 files changed, 504 insertions(+), 564 deletions(-) create mode 100644 airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteProtocolType.java delete mode 100644 airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeUtils.java create mode 100644 airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/Array.java create mode 100644 
airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/Struct.java create mode 100644 airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/Union.java create mode 100644 airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/UnsupportedOneOf.java diff --git a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java index 77fa9193d78dc..57e5cad52c153 100644 --- a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java +++ b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java @@ -11,7 +11,6 @@ import io.airbyte.commons.lang.Exceptions; import io.airbyte.commons.resources.MoreResources; import io.airbyte.configoss.WorkerDestinationConfig; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.AirbyteProtocolType; import io.airbyte.protocol.models.v0.AirbyteMessage; import io.airbyte.protocol.models.v0.AirbyteStream; import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair; @@ -64,7 +63,7 @@ public abstract class BaseTypingDedupingTest { static { try { SCHEMA = Jsons.deserialize(MoreResources.readResource("schema.json")); - } catch (IOException e) { + } catch (final IOException e) { throw new RuntimeException(e); } } @@ -161,7 +160,7 @@ public void setup() throws Exception { @AfterEach public void teardown() throws Exception { - for (AirbyteStreamNameNamespacePair streamId : streamsToTearDown) { + for (final 
AirbyteStreamNameNamespacePair streamId : streamsToTearDown) { teardownStreamAndNamespace(streamId.getNamespace(), streamId.getName()); } } @@ -173,7 +172,7 @@ public void teardown() throws Exception { */ @Test public void fullRefreshOverwrite() throws Exception { - ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( + final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( new ConfiguredAirbyteStream() .withSyncMode(SyncMode.FULL_REFRESH) .withDestinationSyncMode(DestinationSyncMode.OVERWRITE) @@ -183,21 +182,21 @@ public void fullRefreshOverwrite() throws Exception { .withJsonSchema(SCHEMA)))); // First sync - List messages1 = readMessages("sync1_messages.jsonl"); + final List messages1 = readMessages("sync1_messages.jsonl"); runSync(catalog, messages1); - List expectedRawRecords1 = readRecords("sync1_expectedrecords_nondedup_raw.jsonl"); - List expectedFinalRecords1 = readRecords("sync1_expectedrecords_nondedup_final.jsonl"); + final List expectedRawRecords1 = readRecords("sync1_expectedrecords_nondedup_raw.jsonl"); + final List expectedFinalRecords1 = readRecords("sync1_expectedrecords_nondedup_final.jsonl"); verifySyncResult(expectedRawRecords1, expectedFinalRecords1); // Second sync - List messages2 = readMessages("sync2_messages.jsonl"); + final List messages2 = readMessages("sync2_messages.jsonl"); runSync(catalog, messages2); - List expectedRawRecords2 = readRecords("sync2_expectedrecords_fullrefresh_overwrite_raw.jsonl"); - List expectedFinalRecords2 = readRecords("sync2_expectedrecords_fullrefresh_overwrite_final.jsonl"); + final List expectedRawRecords2 = readRecords("sync2_expectedrecords_fullrefresh_overwrite_raw.jsonl"); + final List expectedFinalRecords2 = readRecords("sync2_expectedrecords_fullrefresh_overwrite_final.jsonl"); verifySyncResult(expectedRawRecords2, expectedFinalRecords2); } @@ -208,7 +207,7 @@ public void fullRefreshOverwrite() throws Exception { */ @Test public 
void fullRefreshAppend() throws Exception { - ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( + final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( new ConfiguredAirbyteStream() .withSyncMode(SyncMode.FULL_REFRESH) .withDestinationSyncMode(DestinationSyncMode.APPEND) @@ -218,21 +217,21 @@ public void fullRefreshAppend() throws Exception { .withJsonSchema(SCHEMA)))); // First sync - List messages1 = readMessages("sync1_messages.jsonl"); + final List messages1 = readMessages("sync1_messages.jsonl"); runSync(catalog, messages1); - List expectedRawRecords1 = readRecords("sync1_expectedrecords_nondedup_raw.jsonl"); - List expectedFinalRecords1 = readRecords("sync1_expectedrecords_nondedup_final.jsonl"); + final List expectedRawRecords1 = readRecords("sync1_expectedrecords_nondedup_raw.jsonl"); + final List expectedFinalRecords1 = readRecords("sync1_expectedrecords_nondedup_final.jsonl"); verifySyncResult(expectedRawRecords1, expectedFinalRecords1); // Second sync - List messages2 = readMessages("sync2_messages.jsonl"); + final List messages2 = readMessages("sync2_messages.jsonl"); runSync(catalog, messages2); - List expectedRawRecords2 = readRecords("sync2_expectedrecords_fullrefresh_append_raw.jsonl"); - List expectedFinalRecords2 = readRecords("sync2_expectedrecords_fullrefresh_append_final.jsonl"); + final List expectedRawRecords2 = readRecords("sync2_expectedrecords_fullrefresh_append_raw.jsonl"); + final List expectedFinalRecords2 = readRecords("sync2_expectedrecords_fullrefresh_append_final.jsonl"); verifySyncResult(expectedRawRecords2, expectedFinalRecords2); } @@ -245,7 +244,7 @@ public void fullRefreshAppend() throws Exception { */ @Test public void incrementalAppend() throws Exception { - ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( + final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( new 
ConfiguredAirbyteStream() // These two lines are literally the only difference between this test and fullRefreshAppend .withSyncMode(SyncMode.INCREMENTAL) @@ -257,21 +256,21 @@ public void incrementalAppend() throws Exception { .withJsonSchema(SCHEMA)))); // First sync - List messages1 = readMessages("sync1_messages.jsonl"); + final List messages1 = readMessages("sync1_messages.jsonl"); runSync(catalog, messages1); - List expectedRawRecords1 = readRecords("sync1_expectedrecords_nondedup_raw.jsonl"); - List expectedFinalRecords1 = readRecords("sync1_expectedrecords_nondedup_final.jsonl"); + final List expectedRawRecords1 = readRecords("sync1_expectedrecords_nondedup_raw.jsonl"); + final List expectedFinalRecords1 = readRecords("sync1_expectedrecords_nondedup_final.jsonl"); verifySyncResult(expectedRawRecords1, expectedFinalRecords1); // Second sync - List messages2 = readMessages("sync2_messages.jsonl"); + final List messages2 = readMessages("sync2_messages.jsonl"); runSync(catalog, messages2); - List expectedRawRecords2 = readRecords("sync2_expectedrecords_fullrefresh_append_raw.jsonl"); - List expectedFinalRecords2 = readRecords("sync2_expectedrecords_fullrefresh_append_final.jsonl"); + final List expectedRawRecords2 = readRecords("sync2_expectedrecords_fullrefresh_append_raw.jsonl"); + final List expectedFinalRecords2 = readRecords("sync2_expectedrecords_fullrefresh_append_final.jsonl"); verifySyncResult(expectedRawRecords2, expectedFinalRecords2); } @@ -282,7 +281,7 @@ public void incrementalAppend() throws Exception { */ @Test public void incrementalDedup() throws Exception { - ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( + final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( new ConfiguredAirbyteStream() .withSyncMode(SyncMode.INCREMENTAL) .withCursorField(List.of("updated_at")) @@ -294,21 +293,21 @@ public void incrementalDedup() throws Exception { .withJsonSchema(SCHEMA)))); // 
First sync - List messages1 = readMessages("sync1_messages.jsonl"); + final List messages1 = readMessages("sync1_messages.jsonl"); runSync(catalog, messages1); - List expectedRawRecords1 = readRecords("sync1_expectedrecords_dedup_raw.jsonl"); - List expectedFinalRecords1 = readRecords("sync1_expectedrecords_dedup_final.jsonl"); + final List expectedRawRecords1 = readRecords("sync1_expectedrecords_dedup_raw.jsonl"); + final List expectedFinalRecords1 = readRecords("sync1_expectedrecords_dedup_final.jsonl"); verifySyncResult(expectedRawRecords1, expectedFinalRecords1); // Second sync - List messages2 = readMessages("sync2_messages.jsonl"); + final List messages2 = readMessages("sync2_messages.jsonl"); runSync(catalog, messages2); - List expectedRawRecords2 = readRecords("sync2_expectedrecords_incremental_dedup_raw.jsonl"); - List expectedFinalRecords2 = readRecords("sync2_expectedrecords_incremental_dedup_final.jsonl"); + final List expectedRawRecords2 = readRecords("sync2_expectedrecords_incremental_dedup_raw.jsonl"); + final List expectedFinalRecords2 = readRecords("sync2_expectedrecords_incremental_dedup_final.jsonl"); verifySyncResult(expectedRawRecords2, expectedFinalRecords2); } @@ -317,7 +316,7 @@ public void incrementalDedup() throws Exception { */ @Test public void incrementalDedupDefaultNamespace() throws Exception { - ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( + final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( new ConfiguredAirbyteStream() .withSyncMode(SyncMode.INCREMENTAL) .withCursorField(List.of("updated_at")) @@ -329,21 +328,21 @@ public void incrementalDedupDefaultNamespace() throws Exception { .withJsonSchema(SCHEMA)))); // First sync - List messages1 = readMessages("sync1_messages.jsonl", null, streamName); + final List messages1 = readMessages("sync1_messages.jsonl", null, streamName); runSync(catalog, messages1); - List expectedRawRecords1 = 
readRecords("sync1_expectedrecords_dedup_raw.jsonl"); - List expectedFinalRecords1 = readRecords("sync1_expectedrecords_dedup_final.jsonl"); + final List expectedRawRecords1 = readRecords("sync1_expectedrecords_dedup_raw.jsonl"); + final List expectedFinalRecords1 = readRecords("sync1_expectedrecords_dedup_final.jsonl"); verifySyncResult(expectedRawRecords1, expectedFinalRecords1, null, streamName); // Second sync - List messages2 = readMessages("sync2_messages.jsonl", null, streamName); + final List messages2 = readMessages("sync2_messages.jsonl", null, streamName); runSync(catalog, messages2); - List expectedRawRecords2 = readRecords("sync2_expectedrecords_incremental_dedup_raw.jsonl"); - List expectedFinalRecords2 = readRecords("sync2_expectedrecords_incremental_dedup_final.jsonl"); + final List expectedRawRecords2 = readRecords("sync2_expectedrecords_incremental_dedup_raw.jsonl"); + final List expectedFinalRecords2 = readRecords("sync2_expectedrecords_incremental_dedup_final.jsonl"); verifySyncResult(expectedRawRecords2, expectedFinalRecords2, null, streamName); } @@ -375,7 +374,7 @@ public void testIncrementalSyncDropOneColumn() throws Exception { public void testSyncUsesAirbyteStreamNamespaceIfNotNull() throws Exception { // TODO duplicate this test for each sync mode. 
Run 1st+2nd syncs using a stream with null // namespace: - ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( + final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( new ConfiguredAirbyteStream() .withSyncMode(SyncMode.FULL_REFRESH) .withCursorField(List.of("updated_at")) @@ -396,9 +395,9 @@ public void testSyncUsesAirbyteStreamNamespaceIfNotNull() throws Exception { */ @Test public void incrementalDedupIdenticalName() throws Exception { - String namespace1 = streamNamespace + "_1"; - String namespace2 = streamNamespace + "_2"; - ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( + final String namespace1 = streamNamespace + "_1"; + final String namespace2 = streamNamespace + "_2"; + final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( new ConfiguredAirbyteStream() .withSyncMode(SyncMode.INCREMENTAL) .withCursorField(List.of("updated_at")) @@ -420,26 +419,26 @@ public void incrementalDedupIdenticalName() throws Exception { // First sync // Read the same set of messages for both streams - List messages1 = Stream.concat( + final List messages1 = Stream.concat( readMessages("sync1_messages.jsonl", namespace1, streamName).stream(), readMessages("sync1_messages.jsonl", namespace2, streamName).stream()).toList(); runSync(catalog, messages1); - List expectedRawRecords1 = readRecords("sync1_expectedrecords_dedup_raw.jsonl"); - List expectedFinalRecords1 = readRecords("sync1_expectedrecords_dedup_final.jsonl"); + final List expectedRawRecords1 = readRecords("sync1_expectedrecords_dedup_raw.jsonl"); + final List expectedFinalRecords1 = readRecords("sync1_expectedrecords_dedup_final.jsonl"); verifySyncResult(expectedRawRecords1, expectedFinalRecords1, namespace1, streamName); verifySyncResult(expectedRawRecords1, expectedFinalRecords1, namespace2, streamName); // Second sync - List messages2 = Stream.concat( + final List 
messages2 = Stream.concat( readMessages("sync2_messages.jsonl", namespace1, streamName).stream(), readMessages("sync2_messages.jsonl", namespace2, streamName).stream()).toList(); runSync(catalog, messages2); - List expectedRawRecords2 = readRecords("sync2_expectedrecords_incremental_dedup_raw.jsonl"); - List expectedFinalRecords2 = readRecords("sync2_expectedrecords_incremental_dedup_final.jsonl"); + final List expectedRawRecords2 = readRecords("sync2_expectedrecords_incremental_dedup_raw.jsonl"); + final List expectedFinalRecords2 = readRecords("sync2_expectedrecords_incremental_dedup_final.jsonl"); verifySyncResult(expectedRawRecords2, expectedFinalRecords2, namespace1, streamName); verifySyncResult(expectedRawRecords2, expectedFinalRecords2, namespace2, streamName); } @@ -476,21 +475,21 @@ public void testDataTypes() throws Exception { // this test probably needs some configuration per destination to specify what values are supported? } - private void verifySyncResult(List expectedRawRecords, List expectedFinalRecords) throws Exception { + private void verifySyncResult(final List expectedRawRecords, final List expectedFinalRecords) throws Exception { verifySyncResult(expectedRawRecords, expectedFinalRecords, streamNamespace, streamName); } - private void verifySyncResult(List expectedRawRecords, - List expectedFinalRecords, - String streamNamespace, - String streamName) + private void verifySyncResult(final List expectedRawRecords, + final List expectedFinalRecords, + final String streamNamespace, + final String streamName) throws Exception { - List actualRawRecords = dumpRawTableRecords(streamNamespace, streamName); - List actualFinalRecords = dumpFinalTableRecords(streamNamespace, streamName); + final List actualRawRecords = dumpRawTableRecords(streamNamespace, streamName); + final List actualFinalRecords = dumpFinalTableRecords(streamNamespace, streamName); DIFFER.verifySyncResult(expectedRawRecords, actualRawRecords, expectedFinalRecords, 
actualFinalRecords); } - private static List readRecords(String filename) throws IOException { + private static List readRecords(final String filename) throws IOException { return MoreResources.readResource(filename).lines() .map(String::trim) .filter(line -> !line.isEmpty()) @@ -499,11 +498,11 @@ private static List readRecords(String filename) throws IOException { .toList(); } - private List readMessages(String filename) throws IOException { + private List readMessages(final String filename) throws IOException { return readMessages(filename, streamNamespace, streamName); } - private static List readMessages(String filename, String streamNamespace, String streamName) throws IOException { + private static List readMessages(final String filename, final String streamNamespace, final String streamName) throws IOException { return readRecords(filename).stream() .map(record -> Jsons.convertValue(record, AirbyteMessage.class)) .peek(message -> { @@ -527,7 +526,7 @@ public void setupProcessFactory() throws IOException { Files.createDirectories(testDir); final Path workspaceRoot = Files.createTempDirectory(testDir, "test"); jobRoot = Files.createDirectories(Path.of(workspaceRoot.toString(), "job")); - Path localRoot = Files.createTempDirectory(testDir, "output"); + final Path localRoot = Files.createTempDirectory(testDir, "output"); processFactory = new DockerProcessFactory( workspaceRoot, workspaceRoot.toString(), @@ -536,7 +535,7 @@ public void setupProcessFactory() throws IOException { Collections.emptyMap()); } - private void runSync(ConfiguredAirbyteCatalog catalog, List messages) throws Exception { + private void runSync(final ConfiguredAirbyteCatalog catalog, final List messages) throws Exception { catalog.getStreams().forEach(s -> streamsToTearDown.add(AirbyteStreamNameNamespacePair.fromAirbyteStream(s.getStream()))); final WorkerDestinationConfig destinationConfig = new WorkerDestinationConfig() diff --git 
a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/RecordDiffer.java b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/RecordDiffer.java index 846fb4a88bff7..e6b949f1f0eb0 100644 --- a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/RecordDiffer.java +++ b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/RecordDiffer.java @@ -12,7 +12,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.Streams; import io.airbyte.commons.json.Jsons; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.AirbyteProtocolType; import java.time.Instant; import java.time.LocalDate; import java.time.LocalDateTime; @@ -54,17 +53,17 @@ public RecordDiffer(final Pair... identifyingColumns) { * JSON null is represented as a NullNode. For example, in the JSON blob {"name": null}, the `name` * field is a JSON null, and the `address` field is a SQL null. 
*/ - public void verifySyncResult(List expectedRawRecords, - List actualRawRecords, - List expectedFinalRecords, - List actualFinalRecords) { + public void verifySyncResult(final List expectedRawRecords, + final List actualRawRecords, + final List expectedFinalRecords, + final List actualFinalRecords) { assertAll( () -> diffRawTableRecords(expectedRawRecords, actualRawRecords), () -> diffFinalTableRecords(expectedFinalRecords, actualFinalRecords)); } - public void diffRawTableRecords(List expectedRecords, List actualRecords) { - String diff = diffRecords( + public void diffRawTableRecords(final List expectedRecords, final List actualRecords) { + final String diff = diffRecords( expectedRecords.stream().map(RecordDiffer::copyWithLiftedData).collect(toList()), actualRecords.stream().map(RecordDiffer::copyWithLiftedData).collect(toList()), recordIdentityComparator, @@ -76,8 +75,8 @@ public void diffRawTableRecords(List expectedRecords, List a } } - public void diffFinalTableRecords(List expectedRecords, List actualRecords) { - String diff = diffRecords( + public void diffFinalTableRecords(final List expectedRecords, final List actualRecords) { + final String diff = diffRecords( expectedRecords, actualRecords, recordIdentityComparator, @@ -92,8 +91,8 @@ public void diffFinalTableRecords(List expectedRecords, List /** * @return A copy of the record, but with all fields in _airbyte_data lifted to the top level. */ - private static JsonNode copyWithLiftedData(JsonNode record) { - ObjectNode copy = record.deepCopy(); + private static JsonNode copyWithLiftedData(final JsonNode record) { + final ObjectNode copy = record.deepCopy(); copy.remove("_airbyte_data"); Streams.stream(record.get("_airbyte_data").fields()).forEach(field -> { if (!copy.has(field.getKey())) { @@ -111,10 +110,10 @@ private static JsonNode copyWithLiftedData(JsonNode record) { * Build a Comparator to detect equality between two records. 
It first compares all the identifying * columns in order, and breaks ties using extracted_at. */ - private Comparator buildIdentityComparator(Pair[] identifyingColumns) { + private Comparator buildIdentityComparator(final Pair[] identifyingColumns) { // Start with a noop comparator for convenience Comparator comp = Comparator.comparing(record -> 0); - for (Pair column : identifyingColumns) { + for (final Pair column : identifyingColumns) { comp = comp.thenComparing(record -> extract(record, column.getKey(), column.getValue())); } comp = comp.thenComparing(record -> asTimestampWithTimezone(record.get("_airbyte_extracted_at"))); @@ -124,14 +123,14 @@ private Comparator buildIdentityComparator(Pair[] /** * See {@link #buildIdentityComparator(Pair[])} for an explanation of dataExtractor. */ - private Function buildIdentityExtractor(Pair[] identifyingColumns) { + private Function buildIdentityExtractor(final Pair[] identifyingColumns) { return record -> Arrays.stream(identifyingColumns) .map(column -> getPrintableFieldIfPresent(record, column.getKey())) .collect(Collectors.joining(", ")) + getPrintableFieldIfPresent(record, "_airbyte_extracted_at"); } - private static String getPrintableFieldIfPresent(JsonNode record, String field) { + private static String getPrintableFieldIfPresent(final JsonNode record, final String field) { if (record.has(field)) { return field + "=" + record.get(field); } else { @@ -156,13 +155,13 @@ private static String getPrintableFieldIfPresent(JsonNode record, String field) * @param recordIdExtractor Dump the record's PK+cursor+extracted_at into a human-readable string * @return The diff, or empty string if there were no differences */ - private static String diffRecords(List originalExpectedRecords, - List originalActualRecords, - Comparator identityComparator, - Comparator sortComparator, - Function recordIdExtractor) { - List expectedRecords = originalExpectedRecords.stream().sorted(sortComparator).toList(); - List actualRecords = 
originalActualRecords.stream().sorted(sortComparator).toList(); + private static String diffRecords(final List originalExpectedRecords, + final List originalActualRecords, + final Comparator identityComparator, + final Comparator sortComparator, + final Function recordIdExtractor) { + final List expectedRecords = originalExpectedRecords.stream().sorted(sortComparator).toList(); + final List actualRecords = originalActualRecords.stream().sorted(sortComparator).toList(); // Iterate through both lists in parallel and compare each record. // Build up an error message listing any incorrect, missing, or unexpected records. @@ -170,9 +169,9 @@ private static String diffRecords(List originalExpectedRecords, int expectedRecordIndex = 0; int actualRecordIndex = 0; while (expectedRecordIndex < expectedRecords.size() && actualRecordIndex < actualRecords.size()) { - JsonNode expectedRecord = expectedRecords.get(expectedRecordIndex); - JsonNode actualRecord = actualRecords.get(actualRecordIndex); - int compare = identityComparator.compare(expectedRecord, actualRecord); + final JsonNode expectedRecord = expectedRecords.get(expectedRecordIndex); + final JsonNode actualRecord = actualRecords.get(actualRecordIndex); + final int compare = identityComparator.compare(expectedRecord, actualRecord); if (compare == 0) { // These records should be the same. Find the specific fields that are different and move on // to the next records in both lists. 
@@ -204,23 +203,25 @@ private static String diffRecords(List originalExpectedRecords, return message; } - private static String diffSingleRecord(Function recordIdExtractor, JsonNode expectedRecord, JsonNode actualRecord) { + private static String diffSingleRecord(final Function recordIdExtractor, + final JsonNode expectedRecord, + final JsonNode actualRecord) { boolean foundMismatch = false; String mismatchedRecordMessage = "Row had incorrect data: " + recordIdExtractor.apply(expectedRecord) + "\n"; // Iterate through each column in the expected record and compare it to the actual record's value. - for (String column : Streams.stream(expectedRecord.fieldNames()).sorted().toList()) { + for (final String column : Streams.stream(expectedRecord.fieldNames()).sorted().toList()) { // For all other columns, we can just compare their values directly. - JsonNode expectedValue = expectedRecord.get(column); - JsonNode actualValue = actualRecord.get(column); + final JsonNode expectedValue = expectedRecord.get(column); + final JsonNode actualValue = actualRecord.get(column); if (!areJsonNodesEquivalent(expectedValue, actualValue)) { mismatchedRecordMessage += generateFieldError("column " + column, expectedValue, actualValue); foundMismatch = true; } } // Then check the entire actual record for any columns that we weren't expecting. 
- LinkedHashMap extraColumns = checkForExtraOrNonNullFields(expectedRecord, actualRecord); + final LinkedHashMap extraColumns = checkForExtraOrNonNullFields(expectedRecord, actualRecord); if (extraColumns.size() > 0) { - for (Map.Entry extraColumn : extraColumns.entrySet()) { + for (final Map.Entry extraColumn : extraColumns.entrySet()) { mismatchedRecordMessage += generateFieldError("column " + extraColumn.getKey(), null, extraColumn.getValue()); foundMismatch = true; } @@ -232,7 +233,7 @@ private static String diffSingleRecord(Function recordIdExtrac } } - private static boolean areJsonNodesEquivalent(JsonNode expectedValue, JsonNode actualValue) { + private static boolean areJsonNodesEquivalent(final JsonNode expectedValue, final JsonNode actualValue) { if (expectedValue == null || actualValue == null) { // If one of the values is null, then we expect both of them to be null. return expectedValue == null && actualValue == null; @@ -256,9 +257,9 @@ private static boolean areJsonNodesEquivalent(JsonNode expectedValue, JsonNode a * This has the side benefit of detecting completely unexpected columns, which would be a very weird * bug but is probably still useful to catch. */ - private static LinkedHashMap checkForExtraOrNonNullFields(JsonNode expectedRecord, JsonNode actualRecord) { - LinkedHashMap extraFields = new LinkedHashMap<>(); - for (String column : Streams.stream(actualRecord.fieldNames()).sorted().toList()) { + private static LinkedHashMap checkForExtraOrNonNullFields(final JsonNode expectedRecord, final JsonNode actualRecord) { + final LinkedHashMap extraFields = new LinkedHashMap<>(); + for (final String column : Streams.stream(actualRecord.fieldNames()).sorted().toList()) { // loaded_at and raw_id are generated dynamically, so we just ignore them. 
if (!"_airbyte_loaded_at".equals(column) && !"_airbyte_raw_id".equals(column) && !expectedRecord.has(column)) { extraFields.put(column, actualRecord.get(column)); @@ -272,15 +273,15 @@ private static LinkedHashMap checkForExtraOrNonNullFields(Json * spaces are intentional, to make the message easier to read when it's embedded in a larger * stacktrace. */ - private static String generateFieldError(String fieldname, JsonNode expectedValue, JsonNode actualValue) { - String expectedString = expectedValue == null ? "SQL NULL (i.e. no value)" : expectedValue.toString(); - String actualString = actualValue == null ? "SQL NULL (i.e. no value)" : actualValue.toString(); + private static String generateFieldError(final String fieldname, final JsonNode expectedValue, final JsonNode actualValue) { + final String expectedString = expectedValue == null ? "SQL NULL (i.e. no value)" : expectedValue.toString(); + final String actualString = actualValue == null ? "SQL NULL (i.e. no value)" : actualValue.toString(); return " For " + fieldname + ", expected " + expectedString + " but got " + actualString + "\n"; } // These asFoo methods are used for sorting records, so their defaults are intended to make broken // records stand out. 
- private static String asString(JsonNode node) { + private static String asString(final JsonNode node) { if (node == null || node.isNull()) { return ""; } else if (node.isTextual()) { @@ -290,7 +291,7 @@ private static String asString(JsonNode node) { } } - private static double asDouble(JsonNode node) { + private static double asDouble(final JsonNode node) { if (node == null || !node.isNumber()) { return Double.MIN_VALUE; } else { @@ -298,7 +299,7 @@ private static double asDouble(JsonNode node) { } } - private static long asInt(JsonNode node) { + private static long asInt(final JsonNode node) { if (node == null || !node.isIntegralNumber()) { return Long.MIN_VALUE; } else { @@ -306,7 +307,7 @@ private static long asInt(JsonNode node) { } } - private static boolean asBoolean(JsonNode node) { + private static boolean asBoolean(final JsonNode node) { if (node == null || !node.isBoolean()) { return false; } else { @@ -314,31 +315,31 @@ private static boolean asBoolean(JsonNode node) { } } - private static Instant asTimestampWithTimezone(JsonNode node) { + private static Instant asTimestampWithTimezone(final JsonNode node) { if (node == null || !node.isTextual()) { return Instant.ofEpochMilli(Long.MIN_VALUE); } else { try { return Instant.parse(node.asText()); - } catch (Exception e) { + } catch (final Exception e) { return Instant.ofEpochMilli(Long.MIN_VALUE); } } } - private static LocalDateTime asTimestampWithoutTimezone(JsonNode node) { + private static LocalDateTime asTimestampWithoutTimezone(final JsonNode node) { if (node == null || !node.isTextual()) { return LocalDateTime.ofInstant(Instant.ofEpochMilli(Long.MIN_VALUE), ZoneOffset.UTC); } else { try { return LocalDateTime.parse(node.asText()); - } catch (Exception e) { + } catch (final Exception e) { return LocalDateTime.ofInstant(Instant.ofEpochMilli(Long.MIN_VALUE), ZoneOffset.UTC); } } } - private static OffsetTime asTimeWithTimezone(JsonNode node) { + private static OffsetTime asTimeWithTimezone(final 
JsonNode node) { if (node == null || !node.isTextual()) { return OffsetTime.of(0, 0, 0, 0, ZoneOffset.UTC); } else { @@ -346,33 +347,33 @@ private static OffsetTime asTimeWithTimezone(JsonNode node) { } } - private static LocalTime asTimeWithoutTimezone(JsonNode node) { + private static LocalTime asTimeWithoutTimezone(final JsonNode node) { if (node == null || !node.isTextual()) { return LocalTime.of(0, 0, 0); } else { try { return LocalTime.parse(node.asText()); - } catch (Exception e) { + } catch (final Exception e) { return LocalTime.of(0, 0, 0); } } } - private static LocalDate asDate(JsonNode node) { + private static LocalDate asDate(final JsonNode node) { if (node == null || !node.isTextual()) { return LocalDate.ofInstant(Instant.ofEpochMilli(Long.MIN_VALUE), ZoneOffset.UTC); } else { try { return LocalDate.parse(node.asText()); - } catch (Exception e) { + } catch (final Exception e) { return LocalDate.ofInstant(Instant.ofEpochMilli(Long.MIN_VALUE), ZoneOffset.UTC); } } } // Generics? Never heard of 'em. 
(I'm sorry) - private static Comparable extract(JsonNode node, String field, AirbyteType type) { - if (type instanceof AirbyteProtocolType t) { + private static Comparable extract(final JsonNode node, final String field, final AirbyteType type) { + if (type instanceof final AirbyteProtocolType t) { return switch (t) { case STRING -> asString(node.get(field)); case NUMBER -> asDouble(node.get(field)); diff --git a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteProtocolType.java b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteProtocolType.java new file mode 100644 index 0000000000000..ab800697b0e12 --- /dev/null +++ b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteProtocolType.java @@ -0,0 +1,78 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.base.destination.typing_deduping; + +import com.fasterxml.jackson.databind.JsonNode; + +/** + * Protocol types are ordered by precedence in the case of a Union that contains multiple types. + * Priority is given to wider scope types over narrower ones. (Note that because of dedup logic in + * {@link AirbyteType#fromJsonSchema(JsonNode)}, at most one string or date/time type can exist in a + * Union.) 
+ */ +public enum AirbyteProtocolType implements AirbyteType { + + STRING, + DATE, + TIME_WITHOUT_TIMEZONE, + TIME_WITH_TIMEZONE, + TIMESTAMP_WITHOUT_TIMEZONE, + TIMESTAMP_WITH_TIMEZONE, + NUMBER, + INTEGER, + BOOLEAN, + UNKNOWN; + + private static AirbyteProtocolType matches(final String type) { + try { + return AirbyteProtocolType.valueOf(type.toUpperCase()); + } catch (final IllegalArgumentException e) { + LOGGER.error(String.format("Could not find matching AirbyteProtocolType for \"%s\": %s", type, e)); + return UNKNOWN; + } + } + + // Extracts the appropriate protocol type from the representative JSON + protected static AirbyteProtocolType fromJson(final JsonNode node) { + // JSON could be a string (ex: "number") + if (node.isTextual()) { + return matches(node.asText()); + } + + // or, JSON could be a node with fields + final JsonNode propertyType = node.get("type"); + final JsonNode airbyteType = node.get("airbyte_type"); + final JsonNode format = node.get("format"); + + if (AirbyteType.nodeMatches(propertyType, "boolean")) { + return BOOLEAN; + } else if (AirbyteType.nodeMatches(propertyType, "integer")) { + return INTEGER; + } else if (AirbyteType.nodeMatches(propertyType, "number")) { + return AirbyteType.nodeMatches(airbyteType, "integer") ? 
INTEGER : NUMBER; + } else if (AirbyteType.nodeMatches(propertyType, "string")) { + if (AirbyteType.nodeMatches(format, "date")) { + return DATE; + } else if (AirbyteType.nodeMatches(format, "time")) { + if (AirbyteType.nodeMatches(airbyteType, "time_without_timezone")) { + return TIME_WITHOUT_TIMEZONE; + } else if (AirbyteType.nodeMatches(airbyteType, "time_with_timezone")) { + return TIME_WITH_TIMEZONE; + } + } else if (AirbyteType.nodeMatches(format, "date-time")) { + if (AirbyteType.nodeMatches(airbyteType, "timestamp_without_timezone")) { + return TIMESTAMP_WITHOUT_TIMEZONE; + } else if (airbyteType == null || AirbyteType.nodeMatches(airbyteType, "timestamp_with_timezone")) { + return TIMESTAMP_WITH_TIMEZONE; + } + } else { + return STRING; + } + } + + return UNKNOWN; + } + +} diff --git a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteType.java b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteType.java index 4351bcc6cc407..de59c763ed9c3 100644 --- a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteType.java +++ b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteType.java @@ -6,67 +6,36 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.AirbyteProtocolType; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Array; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.OneOf; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Struct; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.UnsupportedOneOf; +import 
com.fasterxml.jackson.databind.node.TextNode; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public sealed interface AirbyteType permits Array,OneOf,Struct,UnsupportedOneOf,AirbyteProtocolType { +public sealed interface AirbyteType permits AirbyteProtocolType,Struct,Array,UnsupportedOneOf,Union { - Logger LOGGER = LoggerFactory.getLogger(AirbyteTypeUtils.class); + Logger LOGGER = LoggerFactory.getLogger(AirbyteType.class); /** * The most common call pattern is probably to use this method on the stream schema, verify that * it's an {@link Struct} schema, and then call {@link Struct#properties()} to get the columns. *
<p>
* If the top-level schema is not an object, then we can't really do anything with it, and should - * probably fail the sync. (but see also {@link OneOf#asColumns()}). + * probably fail the sync. (but see also {@link Union#asColumns()}). */ static AirbyteType fromJsonSchema(final JsonNode schema) { try { final JsonNode topLevelType = schema.get("type"); if (topLevelType != null) { if (topLevelType.isTextual()) { - if (AirbyteTypeUtils.nodeIsType(topLevelType, "object")) { + if (nodeMatches(topLevelType, "object")) { return getStruct(schema); - } else if (AirbyteTypeUtils.nodeIsType(topLevelType, "array")) { + } else if (nodeMatches(topLevelType, "array")) { return getArray(schema); } } else if (topLevelType.isArray()) { - final List typeOptions = new ArrayList<>(); - topLevelType.elements().forEachRemaining(element -> { - // ignore "null" type and remove duplicates - String type = element.asText(""); - if (!"null".equals(type) && !typeOptions.contains(type)) { - typeOptions.add(element.asText()); - } - }); - - // we encounter an array of types that actually represents a single type rather than a OneOf - if (typeOptions.size() == 1) { - if (typeOptions.get(0).equals("object")) { - return getStruct(schema); - } else if (typeOptions.get(0).equals("array")) { - return getArray(schema); - } else { - return AirbyteTypeUtils.getAirbyteProtocolType(schema); - } - } - - final List options = typeOptions.stream().map(typeOption -> { - // Recurse into a schema that forces a specific one of each option - JsonNode schemaClone = schema.deepCopy(); - // schema is guaranteed to be an object here, because we know it has a `type` key - ((ObjectNode) schemaClone).put("type", typeOption); - return fromJsonSchema(schemaClone); - }).toList(); - return new OneOf(options); + return fromArrayJsonSchema(schema, topLevelType); } } else if (schema.hasNonNull("oneOf")) { final List options = new ArrayList<>(); @@ -78,13 +47,20 @@ static AirbyteType fromJsonSchema(final JsonNode schema) { // This 
is for backwards-compatibility with legacy normalization. return getStruct(schema); } - return AirbyteTypeUtils.getAirbyteProtocolType(schema); + return AirbyteProtocolType.fromJson(schema); } catch (final Exception e) { LOGGER.error("Exception parsing JSON schema {}: {}; returning UNKNOWN.", schema, e); return AirbyteProtocolType.UNKNOWN; } } + static boolean nodeMatches(final JsonNode node, final String value) { + if (node == null || !node.isTextual()) { + return false; + } + return node.equals(TextNode.valueOf(value)); + } + private static Struct getStruct(final JsonNode schema) { final LinkedHashMap propertiesMap = new LinkedHashMap<>(); final JsonNode properties = schema.get("properties"); @@ -107,87 +83,38 @@ private static Array getArray(final JsonNode schema) { } } - enum AirbyteProtocolType implements AirbyteType { - - STRING, - NUMBER, - INTEGER, - BOOLEAN, - TIMESTAMP_WITH_TIMEZONE, - TIMESTAMP_WITHOUT_TIMEZONE, - TIME_WITH_TIMEZONE, - TIME_WITHOUT_TIMEZONE, - DATE, - UNKNOWN; + private static AirbyteType fromArrayJsonSchema(final JsonNode schema, final JsonNode array) { + final List typeOptions = new ArrayList<>(); + array.elements().forEachRemaining(element -> { + // ignore "null" type and remove duplicates + final String type = element.asText(""); + if (!"null".equals(type) && !typeOptions.contains(type)) { + typeOptions.add(element.asText()); + } + }); - public static AirbyteProtocolType matches(final String type) { - try { - return AirbyteProtocolType.valueOf(type.toUpperCase()); - } catch (final IllegalArgumentException e) { - LOGGER.error(String.format("Could not find matching AirbyteProtocolType for \"%s\": %s", type, e)); - return UNKNOWN; + // we encounter an array of types that actually represents a single type rather than a Union + if (typeOptions.size() == 1) { + if (typeOptions.get(0).equals("object")) { + return getStruct(schema); + } else if (typeOptions.get(0).equals("array")) { + return getArray(schema); + } else { + return 
AirbyteProtocolType.fromJson(getTrimmedJsonSchema(schema, typeOptions.get(0))); } } + // Recurse into a schema that forces a specific one of each option + final List options = typeOptions.stream().map(typeOption -> fromJsonSchema(getTrimmedJsonSchema(schema, typeOption))).toList(); + return new Union(options); } - /** - * @param properties Use LinkedHashMap to preserve insertion order. - */ - record Struct(LinkedHashMap properties) implements AirbyteType { - - } - - record Array(AirbyteType items) implements AirbyteType { - - } - - /** - * Represents a {oneOf: [...]} schema. - *
<p>
- * This is purely a legacy type that we should eventually delete. See also {@link OneOf}. - */ - record UnsupportedOneOf(List options) implements AirbyteType { - - } - - /** - * Represents a {type: [a, b, ...]} schema. This is theoretically equivalent to {oneOf: [{type: a}, - * {type: b}, ...]} but legacy normalization only handles the {type: [...]} schemas. - *
<p>
- * Eventually we should: - * <ol>
- * <li>Announce a breaking change to handle both oneOf styles the same</li>
- * <li>Test against some number of API sources to verify that they won't break badly</li>
- * <li>Update {@link AirbyteType#fromJsonSchema(JsonNode)} to parse both styles into - * SupportedOneOf</li>
- * <li>Delete UnsupportedOneOf</li>
- * </ol>
- */ - record OneOf(List options) implements AirbyteType { - - /** - * This is a hack to handle weird schemas like {type: [object, string]}. If a stream's top-level - * schema looks like this, we still want to be able to extract the object properties (i.e. treat it - * as though the string option didn't exist). - * - * @throws IllegalArgumentException if we cannot extract columns from this schema - */ - public LinkedHashMap asColumns() { - final long numObjectOptions = options.stream().filter(o -> o instanceof Struct).count(); - if (numObjectOptions > 1) { - LOGGER.error("Can't extract columns from a schema with multiple object options"); - return new LinkedHashMap<>(); - } - - return (options.stream().filter(o -> o instanceof Struct).findFirst()) - .map(o -> ((Struct) o).properties()) - .orElseGet(() -> { - LOGGER.error("Can't extract columns from a schema with no object options"); - return new LinkedHashMap<>(); - }); - } - + // Duplicates the JSON schema but keeps only one type + private static JsonNode getTrimmedJsonSchema(final JsonNode schema, final String type) { + final JsonNode schemaClone = schema.deepCopy(); + // schema is guaranteed to be an object here, because we know it has a `type` key + ((ObjectNode) schemaClone).put("type", type); + return schemaClone; } } diff --git a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeUtils.java b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeUtils.java deleted file mode 100644 index b9673527ed0d2..0000000000000 --- a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeUtils.java +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
- */ - -package io.airbyte.integrations.base.destination.typing_deduping; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.TextNode; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.AirbyteProtocolType; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Array; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.OneOf; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Struct; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AirbyteTypeUtils { - - private static final Logger LOGGER = LoggerFactory.getLogger(AirbyteTypeUtils.class); - - // Map from a protocol type to what other protocol types should take precedence over it if present - // in a OneOf - private static final Map> EXCLUDED_PROTOCOL_TYPES_MAP = ImmutableMap.of( - AirbyteProtocolType.BOOLEAN, ImmutableList.of(AirbyteProtocolType.STRING, AirbyteProtocolType.NUMBER, AirbyteProtocolType.INTEGER), - AirbyteProtocolType.INTEGER, ImmutableList.of(AirbyteProtocolType.STRING, AirbyteProtocolType.NUMBER), - AirbyteProtocolType.NUMBER, ImmutableList.of(AirbyteProtocolType.STRING)); - - // Protocol types in order of precedence - private static final List ORDERED_PROTOCOL_TYPES = ImmutableList.of( - AirbyteProtocolType.BOOLEAN, - AirbyteProtocolType.INTEGER, - AirbyteProtocolType.NUMBER, - AirbyteProtocolType.TIMESTAMP_WITHOUT_TIMEZONE, - AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE, - AirbyteProtocolType.DATE, - AirbyteProtocolType.TIME_WITH_TIMEZONE, - AirbyteProtocolType.TIME_WITHOUT_TIMEZONE, - AirbyteProtocolType.STRING); - - protected static boolean nodeIsType(final JsonNode node, final String type) { - if (node == 
null || !node.isTextual()) { - return false; - } - return node.equals(TextNode.valueOf(type)); - } - - private static boolean nodeIsOrContainsType(final JsonNode node, final String type) { - if (node == null) { - return false; - } else if (node.isTextual()) { - return nodeIsType(node, type); - } else if (node.isArray()) { - for (final JsonNode element : node) { - if (nodeIsType(element, type)) { - return true; - } - } - } - return false; - } - - protected static AirbyteType getAirbyteProtocolType(final JsonNode node) { - if (node.isTextual()) { - return AirbyteProtocolType.matches(node.asText()); - } - - final JsonNode propertyType = node.get("type"); - final JsonNode airbyteType = node.get("airbyte_type"); - final JsonNode format = node.get("format"); - - if (nodeIsOrContainsType(propertyType, "boolean")) { - return AirbyteProtocolType.BOOLEAN; - } else if (nodeIsOrContainsType(propertyType, "integer")) { - return AirbyteProtocolType.INTEGER; - } else if (nodeIsOrContainsType(propertyType, "number")) { - if (nodeIsType(airbyteType, "integer")) { - return AirbyteProtocolType.INTEGER; - } else { - return AirbyteProtocolType.NUMBER; - } - } else if (nodeIsOrContainsType(propertyType, "string")) { - if (nodeIsOrContainsType(format, "date")) { - return AirbyteProtocolType.DATE; - } else if (nodeIsType(format, "time")) { - if (nodeIsType(airbyteType, "time_without_timezone")) { - return AirbyteProtocolType.TIME_WITHOUT_TIMEZONE; - } else if (nodeIsType(airbyteType, "time_with_timezone")) { - return AirbyteProtocolType.TIME_WITH_TIMEZONE; - } - } else if (nodeIsOrContainsType(format, "date-time")) { - if (nodeIsType(airbyteType, "timestamp_without_timezone")) { - return AirbyteProtocolType.TIMESTAMP_WITHOUT_TIMEZONE; - } else if (airbyteType == null || nodeIsType(airbyteType, "timestamp_with_timezone")) { - return AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE; - } - } else { - return AirbyteProtocolType.STRING; - } - } - - return AirbyteProtocolType.UNKNOWN; - } - - // 
Pick which type in a OneOf has precedence - public static AirbyteType chooseOneOfType(final OneOf o) { - final List options = o.options(); - - // record what types are present - Array foundArrayType = null; - Struct foundStructType = null; - final Map typePresenceMap = new HashMap<>(); - Arrays.stream(AirbyteProtocolType.values()).map(type -> typePresenceMap.put(type, false)); - - // looping through the options only once for efficiency - for (final AirbyteType option : options) { - if (option instanceof final Array a) { - foundArrayType = a; - } else if (option instanceof final Struct s) { - foundStructType = s; - } else if (option instanceof final AirbyteProtocolType p) { - typePresenceMap.put(p, true); - } - } - - if (foundArrayType != null) { - return foundArrayType; - } else if (foundStructType != null) { - return foundStructType; - } else { - for (final AirbyteProtocolType protocolType : ORDERED_PROTOCOL_TYPES) { - if (typePresenceMap.getOrDefault(protocolType, false)) { - boolean foundExcludedTypes = false; - final List excludedTypes = EXCLUDED_PROTOCOL_TYPES_MAP.getOrDefault(protocolType, Collections.emptyList()); - for (final AirbyteProtocolType excludedType : excludedTypes) { - if (typePresenceMap.getOrDefault(excludedType, false)) { - foundExcludedTypes = true; - break; - } - } - if (!foundExcludedTypes) { - return protocolType; - } - } - } - } - - return AirbyteProtocolType.UNKNOWN; - } - -} diff --git a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/Array.java b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/Array.java new file mode 100644 index 0000000000000..dccd687e033e3 --- /dev/null +++ b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/Array.java @@ -0,0 +1,9 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.integrations.base.destination.typing_deduping; + +public record Array(AirbyteType items) implements AirbyteType { + +} diff --git a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.java b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.java index 8401d925eef40..8b885eaedaf0f 100644 --- a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.java +++ b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.java @@ -4,7 +4,6 @@ package io.airbyte.integrations.base.destination.typing_deduping; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Struct; import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; import java.util.ArrayList; @@ -24,23 +23,23 @@ public CatalogParser(final SqlGenerator sqlGenerator) { this(sqlGenerator, DEFAULT_RAW_TABLE_NAMESPACE); } - public CatalogParser(final SqlGenerator sqlGenerator, String rawNamespaceOverride) { + public CatalogParser(final SqlGenerator sqlGenerator, final String rawNamespaceOverride) { this.sqlGenerator = sqlGenerator; this.rawNamespaceOverride = rawNamespaceOverride; } - public ParsedCatalog parseCatalog(ConfiguredAirbyteCatalog catalog) { + public ParsedCatalog parseCatalog(final ConfiguredAirbyteCatalog catalog) { // this code is bad and I feel bad // it's mostly a port of the old normalization logic to prevent tablename collisions. // tbh I have no idea if it works correctly. 
final List streamConfigs = new ArrayList<>(); - for (ConfiguredAirbyteStream stream : catalog.getStreams()) { + for (final ConfiguredAirbyteStream stream : catalog.getStreams()) { final StreamConfig originalStreamConfig = toStreamConfig(stream); // Use empty string quote because we don't really care if (streamConfigs.stream().anyMatch(s -> s.id().finalTableId("").equals(originalStreamConfig.id().finalTableId(""))) || streamConfigs.stream().anyMatch(s -> s.id().rawTableId("").equals(originalStreamConfig.id().rawTableId("")))) { - String originalNamespace = stream.getStream().getNamespace(); - String originalName = stream.getStream().getName(); + final String originalNamespace = stream.getStream().getNamespace(); + final String originalName = stream.getStream().getName(); // ... this logic is ported from legacy normalization, and maybe should change? // We're taking a hash of the quoted namespace and the unquoted stream name final String hash = DigestUtils.sha1Hex(originalStreamConfig.id().finalNamespace() + "&airbyte&" + originalName).substring(0, 3); @@ -59,13 +58,13 @@ public ParsedCatalog parseCatalog(ConfiguredAirbyteCatalog catalog) { return new ParsedCatalog(streamConfigs); } - private StreamConfig toStreamConfig(ConfiguredAirbyteStream stream) { - AirbyteType schema = AirbyteType.fromJsonSchema(stream.getStream().getJsonSchema()); - LinkedHashMap airbyteColumns; - if (schema instanceof Struct o) { + private StreamConfig toStreamConfig(final ConfiguredAirbyteStream stream) { + final AirbyteType schema = AirbyteType.fromJsonSchema(stream.getStream().getJsonSchema()); + final LinkedHashMap airbyteColumns; + if (schema instanceof final Struct o) { airbyteColumns = o.properties(); - } else if (schema instanceof AirbyteType.OneOf o) { - airbyteColumns = o.asColumns(); + } else if (schema instanceof final Union u) { + airbyteColumns = u.asColumns(); } else { throw new IllegalArgumentException("Top-level schema must be an object"); } @@ -89,8 +88,8 @@ private 
StreamConfig toStreamConfig(ConfiguredAirbyteStream stream) { // as with the tablename collisions thing above - we're trying to preserve legacy normalization's // naming conventions here. final LinkedHashMap columns = new LinkedHashMap<>(); - for (Entry entry : airbyteColumns.entrySet()) { - ColumnId originalColumnId = sqlGenerator.buildColumnId(entry.getKey()); + for (final Entry entry : airbyteColumns.entrySet()) { + final ColumnId originalColumnId = sqlGenerator.buildColumnId(entry.getKey()); ColumnId columnId; if (columns.keySet().stream().noneMatch(c -> c.canonicalName().equals(originalColumnId.canonicalName()))) { // None of the existing columns have the same name. We can add this new column as-is. @@ -101,7 +100,7 @@ private StreamConfig toStreamConfig(ConfiguredAirbyteStream stream) { int i = 1; while (true) { columnId = sqlGenerator.buildColumnId(entry.getKey() + "_" + i); - String canonicalName = columnId.canonicalName(); + final String canonicalName = columnId.canonicalName(); if (columns.keySet().stream().noneMatch(c -> c.canonicalName().equals(canonicalName))) { break; } else { diff --git a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/Struct.java b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/Struct.java new file mode 100644 index 0000000000000..80eb61be79c5f --- /dev/null +++ b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/Struct.java @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.base.destination.typing_deduping; + +import java.util.LinkedHashMap; + +/** + * @param properties Use LinkedHashMap to preserve insertion order. 
+ */ +public record Struct(LinkedHashMap properties) implements AirbyteType { + +} diff --git a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/Union.java b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/Union.java new file mode 100644 index 0000000000000..e8b62dc36eed9 --- /dev/null +++ b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/Union.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.base.destination.typing_deduping; + +import com.fasterxml.jackson.databind.JsonNode; +import java.util.Comparator; +import java.util.LinkedHashMap; +import java.util.List; + +/** + * Represents a {type: [a, b, ...]} schema. This is theoretically equivalent to {oneOf: [{type: a}, + * {type: b}, ...]} but legacy normalization only handles the {type: [...]} schemas. + *

+ * Eventually we should: + *

    + *
  1. Announce a breaking change to handle both oneOf styles the same
  2. + *
  3. Test against some number of API sources to verify that they won't break badly
  4. + *
  5. Update {@link AirbyteType#fromJsonSchema(JsonNode)} to parse both styles into + * Union
  6. + *
  7. Delete UnsupportedOneOf
  8. + *
+ */ +public record Union(List options) implements AirbyteType { + + /** + * This is a hack to handle weird schemas like {type: [object, string]}. If a stream's top-level + * schema looks like this, we still want to be able to extract the object properties (i.e. treat it + * as though the string option didn't exist). + * + * @return the single object option's properties; otherwise logs an error and returns an empty map + */ + public LinkedHashMap asColumns() { + final long numObjectOptions = options.stream().filter(o -> o instanceof Struct).count(); + if (numObjectOptions > 1) { + LOGGER.error("Can't extract columns from a schema with multiple object options"); + return new LinkedHashMap<>(); + } + + return (options.stream().filter(o -> o instanceof Struct).findFirst()) + .map(o -> ((Struct) o).properties()) + .orElseGet(() -> { + LOGGER.error("Can't extract columns from a schema with no object options"); + return new LinkedHashMap<>(); + }); + } + + // Picks which type in a Union takes precedence + public AirbyteType chooseType() { + final Comparator comparator = Comparator.comparing(t -> { + if (t instanceof Array) { + return -2; + } else if (t instanceof Struct) { + return -1; + } else if (t instanceof final AirbyteProtocolType p) { + return List.of(AirbyteProtocolType.values()).indexOf(p); + } + return Integer.MAX_VALUE; + }); + + return options.stream().min(comparator).orElse(AirbyteProtocolType.UNKNOWN); + } + +} diff --git a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/UnsupportedOneOf.java b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/UnsupportedOneOf.java new file mode 100644 index 0000000000000..3d3c84636a3c4 --- /dev/null +++ b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/UnsupportedOneOf.java @@ -0,0 +1,16 @@ +/* + * Copyright (c) 2023 Airbyte, Inc.,
all rights reserved. + */ + +package io.airbyte.integrations.base.destination.typing_deduping; + +import java.util.List; + +/** + * Represents a {oneOf: [...]} schema. + *

+ * This is purely a legacy type that we should eventually delete. See also {@link Union}. + */ +public record UnsupportedOneOf(List options) implements AirbyteType { + +} diff --git a/airbyte-integrations/bases/base-typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeTest.java b/airbyte-integrations/bases/base-typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeTest.java index 5d4120e5cd215..da80eeee31c56 100644 --- a/airbyte-integrations/bases/base-typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeTest.java +++ b/airbyte-integrations/bases/base-typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/AirbyteTypeTest.java @@ -4,19 +4,19 @@ package io.airbyte.integrations.base.destination.typing_deduping; +import static io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType.*; +import static io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.fromJsonSchema; +import static org.junit.jupiter.api.Assertions.assertAll; import static org.junit.jupiter.api.Assertions.assertEquals; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableList; import io.airbyte.commons.json.Jsons; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.AirbyteProtocolType; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Array; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.OneOf; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Struct; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.UnsupportedOneOf; import java.util.ArrayList; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import org.junit.jupiter.api.Test; public class AirbyteTypeTest { @@ -62,7 +62,7 @@ 
public void testStruct() { }, "key9": { "type": "string", - "format": ["date-time", "foo"], + "format": "date-time", "airbyte_type": "timestamp_with_timezone" }, "key10": { @@ -113,7 +113,7 @@ public void testStruct() { }, "key9": { "type": ["string"], - "format": ["date-time", "foo"], + "format": "date-time", "airbyte_type": "timestamp_with_timezone" }, "key10": { @@ -164,7 +164,7 @@ public void testStruct() { }, "key9": { "type": ["null", "string"], - "format": ["date-time", "foo"], + "format": "date-time", "airbyte_type": "timestamp_with_timezone" }, "key10": { @@ -179,21 +179,21 @@ public void testStruct() { """); final LinkedHashMap propertiesMap = new LinkedHashMap<>(); - propertiesMap.put("key1", AirbyteProtocolType.BOOLEAN); - propertiesMap.put("key2", AirbyteProtocolType.INTEGER); - propertiesMap.put("key3", AirbyteProtocolType.INTEGER); - propertiesMap.put("key4", AirbyteProtocolType.NUMBER); - propertiesMap.put("key5", AirbyteProtocolType.DATE); - propertiesMap.put("key6", AirbyteProtocolType.TIME_WITHOUT_TIMEZONE); - propertiesMap.put("key7", AirbyteProtocolType.TIME_WITH_TIMEZONE); - propertiesMap.put("key8", AirbyteProtocolType.TIMESTAMP_WITHOUT_TIMEZONE); - propertiesMap.put("key9", AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE); - propertiesMap.put("key10", AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE); - propertiesMap.put("key11", AirbyteProtocolType.STRING); + propertiesMap.put("key1", BOOLEAN); + propertiesMap.put("key2", INTEGER); + propertiesMap.put("key3", INTEGER); + propertiesMap.put("key4", NUMBER); + propertiesMap.put("key5", DATE); + propertiesMap.put("key6", TIME_WITHOUT_TIMEZONE); + propertiesMap.put("key7", TIME_WITH_TIMEZONE); + propertiesMap.put("key8", TIMESTAMP_WITHOUT_TIMEZONE); + propertiesMap.put("key9", TIMESTAMP_WITH_TIMEZONE); + propertiesMap.put("key10", TIMESTAMP_WITH_TIMEZONE); + propertiesMap.put("key11", STRING); final AirbyteType struct = new Struct(propertiesMap); for (final String schema : structSchema) { - 
assertEquals(struct, AirbyteType.fromJsonSchema(Jsons.deserialize(schema))); + assertEquals(struct, fromJsonSchema(Jsons.deserialize(schema))); } } @@ -218,7 +218,7 @@ public void testEmptyStruct() { final AirbyteType struct = new Struct(new LinkedHashMap<>()); for (final String schema : structSchema) { - assertEquals(struct, AirbyteType.fromJsonSchema(Jsons.deserialize(schema))); + assertEquals(struct, fromJsonSchema(Jsons.deserialize(schema))); } } @@ -235,10 +235,10 @@ public void testImplicitStruct() { """; final LinkedHashMap propertiesMap = new LinkedHashMap<>(); - propertiesMap.put("key1", AirbyteProtocolType.BOOLEAN); + propertiesMap.put("key1", BOOLEAN); final AirbyteType struct = new Struct(propertiesMap); - assertEquals(struct, AirbyteType.fromJsonSchema(Jsons.deserialize(structSchema))); + assertEquals(struct, fromJsonSchema(Jsons.deserialize(structSchema))); } @Test @@ -275,9 +275,9 @@ public void testArray() { } """); - final AirbyteType array = new Array(AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE); + final AirbyteType array = new Array(TIMESTAMP_WITH_TIMEZONE); for (final String schema : arraySchema) { - assertEquals(array, AirbyteType.fromJsonSchema(Jsons.deserialize(schema))); + assertEquals(array, fromJsonSchema(Jsons.deserialize(schema))); } } @@ -301,9 +301,9 @@ public void testEmptyArray() { } """); - final AirbyteType array = new Array(AirbyteProtocolType.UNKNOWN); + final AirbyteType array = new Array(UNKNOWN); for (final String schema : arraySchema) { - assertEquals(array, AirbyteType.fromJsonSchema(Jsons.deserialize(schema))); + assertEquals(array, fromJsonSchema(Jsons.deserialize(schema))); } } @@ -316,71 +316,71 @@ public void testUnsupportedOneOf() { """; final List options = new ArrayList<>(); - options.add(AirbyteProtocolType.NUMBER); - options.add(AirbyteProtocolType.STRING); + options.add(NUMBER); + options.add(STRING); final UnsupportedOneOf unsupportedOneOf = new UnsupportedOneOf(options); - assertEquals(unsupportedOneOf, 
AirbyteType.fromJsonSchema(Jsons.deserialize(unsupportedOneOfSchema))); + assertEquals(unsupportedOneOf, fromJsonSchema(Jsons.deserialize(unsupportedOneOfSchema))); } @Test - public void testOneOf() { + public void testUnion() { - final String oneOfSchema = """ + final String unionSchema = """ { "type": ["string", "number"] } """; final List options = new ArrayList<>(); - options.add(AirbyteProtocolType.STRING); - options.add(AirbyteProtocolType.NUMBER); + options.add(STRING); + options.add(NUMBER); - final OneOf oneOf = new OneOf(options); - assertEquals(oneOf, AirbyteType.fromJsonSchema(Jsons.deserialize(oneOfSchema))); + final Union union = new Union(options); + assertEquals(union, fromJsonSchema(Jsons.deserialize(unionSchema))); } @Test - public void testOneOfComplex() { - JsonNode schema = Jsons.deserialize(""" - { - "type": ["string", "object", "array", "null", "string", "object", "array", "null"], - "properties": { - "foo": {"type": "string"} - }, - "items": {"type": "string"} - } - """); - - AirbyteType parsed = AirbyteType.fromJsonSchema(schema); - - AirbyteType expected = new OneOf(List.of( - AirbyteProtocolType.STRING, + public void testUnionComplex() { + final JsonNode schema = Jsons.deserialize(""" + { + "type": ["string", "object", "array", "null", "string", "object", "array", "null"], + "properties": { + "foo": {"type": "string"} + }, + "items": {"type": "string"} + } + """); + + final AirbyteType parsed = fromJsonSchema(schema); + + final AirbyteType expected = new Union(List.of( + STRING, new Struct(new LinkedHashMap<>() { { - put("foo", AirbyteProtocolType.STRING); + put("foo", STRING); } }), - new Array(AirbyteProtocolType.STRING))); + new Array(STRING))); assertEquals(expected, parsed); } @Test - public void testOneOfUnderspecifiedNonPrimitives() { - JsonNode schema = Jsons.deserialize(""" - { - "type": ["string", "object", "array", "null", "string", "object", "array", "null"] - } - """); + public void testUnionUnderspecifiedNonPrimitives() { + 
final JsonNode schema = Jsons.deserialize(""" + { + "type": ["string", "object", "array", "null", "string", "object", "array", "null"] + } + """); - AirbyteType parsed = AirbyteType.fromJsonSchema(schema); + final AirbyteType parsed = fromJsonSchema(schema); - AirbyteType expected = new OneOf(List.of( - AirbyteProtocolType.STRING, + final AirbyteType expected = new Union(List.of( + STRING, new Struct(new LinkedHashMap<>()), - new Array(AirbyteProtocolType.UNKNOWN))); + new Array(UNKNOWN))); assertEquals(expected, parsed); } @@ -391,7 +391,7 @@ public void testInvalidTextualType() { "type": "foo" } """; - assertEquals(AirbyteProtocolType.UNKNOWN, AirbyteType.fromJsonSchema(Jsons.deserialize(invalidTypeSchema))); + assertEquals(UNKNOWN, fromJsonSchema(Jsons.deserialize(invalidTypeSchema))); } @Test @@ -401,7 +401,7 @@ public void testInvalidBooleanType() { "type": true } """; - assertEquals(AirbyteProtocolType.UNKNOWN, AirbyteType.fromJsonSchema(Jsons.deserialize(invalidTypeSchema))); + assertEquals(UNKNOWN, fromJsonSchema(Jsons.deserialize(invalidTypeSchema))); } @Test @@ -417,64 +417,56 @@ public void testInvalid() { invalidSchema.add("{}"); for (final String schema : invalidSchema) { - assertEquals(AirbyteProtocolType.UNKNOWN, AirbyteType.fromJsonSchema(Jsons.deserialize(schema))); + assertEquals(UNKNOWN, fromJsonSchema(Jsons.deserialize(schema))); } } @Test - public void testChooseOneOf() { - // test ordering + public void testChooseUnion() { + final Map unionToType = new HashMap<>(); - OneOf o = new OneOf(ImmutableList.of(AirbyteProtocolType.STRING, AirbyteProtocolType.DATE)); - assertEquals(AirbyteProtocolType.DATE, AirbyteTypeUtils.chooseOneOfType(o)); - - final Array a = new Array(AirbyteProtocolType.TIME_WITH_TIMEZONE); - o = new OneOf(ImmutableList.of(AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE, a)); - assertEquals(a, AirbyteTypeUtils.chooseOneOfType(o)); + final Array a = new Array(BOOLEAN); final LinkedHashMap properties = new LinkedHashMap<>(); - 
properties.put("key1", AirbyteProtocolType.UNKNOWN); - properties.put("key2", AirbyteProtocolType.TIME_WITHOUT_TIMEZONE); + properties.put("key1", UNKNOWN); + properties.put("key2", INTEGER); final Struct s = new Struct(properties); - o = new OneOf(ImmutableList.of(AirbyteProtocolType.TIMESTAMP_WITHOUT_TIMEZONE, s)); - assertEquals(s, AirbyteTypeUtils.chooseOneOfType(o)); - - // test exclusion - - o = new OneOf(ImmutableList.of(AirbyteProtocolType.BOOLEAN, AirbyteProtocolType.INTEGER)); - assertEquals(AirbyteProtocolType.INTEGER, AirbyteTypeUtils.chooseOneOfType(o)); - o = new OneOf(ImmutableList.of(AirbyteProtocolType.INTEGER, AirbyteProtocolType.NUMBER, AirbyteProtocolType.DATE)); - assertEquals(AirbyteProtocolType.NUMBER, AirbyteTypeUtils.chooseOneOfType(o)); + unionToType.put(new Union(ImmutableList.of(s, a)), a); + unionToType.put(new Union(ImmutableList.of(NUMBER, a)), a); + unionToType.put(new Union(ImmutableList.of(INTEGER, s)), s); + unionToType.put(new Union(ImmutableList.of(NUMBER, DATE, BOOLEAN)), DATE); + unionToType.put(new Union(ImmutableList.of(INTEGER, BOOLEAN, NUMBER)), NUMBER); + unionToType.put(new Union(ImmutableList.of(BOOLEAN, INTEGER)), INTEGER); - o = new OneOf(ImmutableList.of(AirbyteProtocolType.BOOLEAN, AirbyteProtocolType.NUMBER, AirbyteProtocolType.STRING)); - assertEquals(AirbyteProtocolType.STRING, AirbyteTypeUtils.chooseOneOfType(o)); + assertAll( + unionToType.entrySet().stream().map(e -> () -> assertEquals(e.getValue(), e.getKey().chooseType()))); } @Test public void testAsColumns() { - OneOf o = new OneOf(List.of( - AirbyteProtocolType.STRING, + final Union u = new Union(List.of( + STRING, new Struct(new LinkedHashMap<>() { { - put("foo", AirbyteProtocolType.STRING); + put("foo", STRING); } }), - new Array(AirbyteProtocolType.STRING), + new Array(STRING), // This is bad behavior, but it matches current behavior so we'll test it. - // Ideally, we would recognize that the sub-oneOfs are also objects. 
- new OneOf(List.of(new Struct(new LinkedHashMap<>()))), + // Ideally, we would recognize that the sub-unions are also objects. + new Union(List.of(new Struct(new LinkedHashMap<>()))), new UnsupportedOneOf(List.of(new Struct(new LinkedHashMap<>()))))); - LinkedHashMap columns = o.asColumns(); + final LinkedHashMap columns = u.asColumns(); assertEquals( new LinkedHashMap<>() { { - put("foo", AirbyteProtocolType.STRING); + put("foo", STRING); } }, @@ -483,28 +475,28 @@ public void testAsColumns() { @Test public void testAsColumnsMultipleObjects() { - OneOf o = new OneOf(List.of( + final Union u = new Union(List.of( new Struct(new LinkedHashMap<>()), new Struct(new LinkedHashMap<>()))); // This prooobably should throw an exception, but for the sake of smooth rollout it just logs a // warning for now. - assertEquals(new LinkedHashMap<>(), o.asColumns()); + assertEquals(new LinkedHashMap<>(), u.asColumns()); } @Test public void testAsColumnsNoObjects() { - OneOf o = new OneOf(List.of( - AirbyteProtocolType.STRING, - new Array(AirbyteProtocolType.STRING), + final Union u = new Union(List.of( + STRING, + new Array(STRING), new UnsupportedOneOf(new ArrayList<>()), // Similar to testAsColumns(), this is bad behavior. - new OneOf(List.of(new Struct(new LinkedHashMap<>()))), + new Union(List.of(new Struct(new LinkedHashMap<>()))), new UnsupportedOneOf(List.of(new Struct(new LinkedHashMap<>()))))); // This prooobably should throw an exception, but for the sake of smooth rollout it just logs a // warning for now. 
- assertEquals(new LinkedHashMap<>(), o.asColumns()); + assertEquals(new LinkedHashMap<>(), u.asColumns()); } } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java index 748e383a2e5ca..c5f5a596fcc76 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGenerator.java @@ -14,22 +14,20 @@ import com.google.cloud.bigquery.TableDefinition; import com.google.cloud.bigquery.TimePartitioning; import com.google.common.annotations.VisibleForTesting; +import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.AirbyteProtocolType; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Array; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.OneOf; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Struct; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.UnsupportedOneOf; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteTypeUtils; import io.airbyte.integrations.base.destination.typing_deduping.AlterTableReport; -import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; +import io.airbyte.integrations.base.destination.typing_deduping.Array; import io.airbyte.integrations.base.destination.typing_deduping.ColumnId; import 
io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator; +import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; import io.airbyte.integrations.base.destination.typing_deduping.StreamId; +import io.airbyte.integrations.base.destination.typing_deduping.Struct; import io.airbyte.integrations.base.destination.typing_deduping.TableNotMigratedException; +import io.airbyte.integrations.base.destination.typing_deduping.Union; +import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf; import io.airbyte.integrations.destination.bigquery.BigQuerySQLNameTransformer; import io.airbyte.protocol.models.v0.DestinationSyncMode; - import java.util.ArrayList; import java.util.Collection; import java.util.LinkedHashMap; @@ -38,7 +36,6 @@ import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; - import org.apache.commons.lang3.StringUtils; import org.apache.commons.text.StringSubstitutor; import org.slf4j.Logger; @@ -99,8 +96,8 @@ public StandardSQLTypeName toDialectType(final AirbyteType type) { return StandardSQLTypeName.JSON; } else if (type instanceof UnsupportedOneOf) { return StandardSQLTypeName.JSON; - } else if (type instanceof final OneOf o) { - final AirbyteType typeWithPrecedence = AirbyteTypeUtils.chooseOneOfType(o); + } else if (type instanceof final Union u) { + final AirbyteType typeWithPrecedence = u.chooseType(); final StandardSQLTypeName dialectType; if ((typeWithPrecedence instanceof Struct) || (typeWithPrecedence instanceof Array)) { dialectType = StandardSQLTypeName.JSON; @@ -115,9 +112,9 @@ public StandardSQLTypeName toDialectType(final AirbyteType type) { } private String extractAndCast(final ColumnId column, final AirbyteType airbyteType) { - if (airbyteType instanceof OneOf o) { - // This is guaranteed to not be a OneOf, so we won't recurse infinitely - final AirbyteType chosenType = AirbyteTypeUtils.chooseOneOfType(o); + if (airbyteType instanceof final Union u) { + // 
This is guaranteed to not be a Union, so we won't recurse infinitely + final AirbyteType chosenType = u.chooseType(); return extractAndCast(column, chosenType); } else if (airbyteType instanceof Struct) { // We need to validate that the struct is actually a struct. @@ -160,7 +157,6 @@ ELSE JSON_QUERY(`_airbyte_data`, '$.${column_name}') // TODO maybe make this a BiMap and elevate this method and its inverse (toDestinationSQLType?) to the SQLGenerator? public StandardSQLTypeName toDialectType(final AirbyteProtocolType airbyteProtocolType) { return switch (airbyteProtocolType) { - // TODO doublecheck these case STRING -> StandardSQLTypeName.STRING; case NUMBER -> StandardSQLTypeName.NUMERIC; case INTEGER -> StandardSQLTypeName.INT64; @@ -201,7 +197,7 @@ PARTITION BY (DATE_TRUNC(_airbyte_extracted_at, DAY)) } private List clusteringColumns(final StreamConfig stream) { - List clusterColumns = new ArrayList<>(); + final List clusterColumns = new ArrayList<>(); if (stream.destinationSyncMode() == DestinationSyncMode.APPEND_DEDUP) { // We're doing deduping, therefore we have a primary key. // Cluster on all the PK columns @@ -230,7 +226,7 @@ public boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, } boolean tableClusteringMatches = false; boolean tablePartitioningMatches = false; - if (existingTable instanceof StandardTableDefinition standardExistingTable) { + if (existingTable instanceof final StandardTableDefinition standardExistingTable) { tableClusteringMatches = clusteringMatches(stream, standardExistingTable); tablePartitioningMatches = partitioningMatches(standardExistingTable); } @@ -246,14 +242,14 @@ public boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, } @VisibleForTesting - public boolean clusteringMatches(StreamConfig stream, StandardTableDefinition existingTable) { + public boolean clusteringMatches(final StreamConfig stream, final StandardTableDefinition existingTable) { return existingTable.getClustering() == null ? 
false : containsAllIgnoreCase( existingTable.getClustering().getFields().stream().collect(Collectors.toSet()), clusteringColumns(stream)); } @VisibleForTesting - public boolean partitioningMatches(StandardTableDefinition existingTable) { + public boolean partitioningMatches(final StandardTableDefinition existingTable) { return existingTable.getTimePartitioning() == null ? false : existingTable.getTimePartitioning() .getField() .equalsIgnoreCase("_airbyte_extracted_at") && @@ -308,21 +304,21 @@ public AlterTableReport buildAlterTableReport(final StreamConfig stream, final T * @return whether all the {@link SqlGenerator#FINAL_TABLE_AIRBYTE_COLUMNS} are present */ @VisibleForTesting - public static boolean schemaContainAllFinalTableV2AirbyteColumns(Collection columnNames) { + public static boolean schemaContainAllFinalTableV2AirbyteColumns(final Collection columnNames) { return FINAL_TABLE_AIRBYTE_COLUMNS.stream() .allMatch(column -> containsIgnoreCase(columnNames, column)); } @Override public List softReset(final StreamConfig stream) { - String createTempTable = createTable(stream, SOFT_RESET_SUFFIX); - String clearLoadedAt = clearLoadedAt(stream.id()); - String rebuildInTempTable = updateTable(SOFT_RESET_SUFFIX, stream); - String overwriteFinalTable = overwriteFinalTableStatement(stream, SOFT_RESET_SUFFIX); + final String createTempTable = createTable(stream, SOFT_RESET_SUFFIX); + final String clearLoadedAt = clearLoadedAt(stream.id()); + final String rebuildInTempTable = updateTable(SOFT_RESET_SUFFIX, stream); + final String overwriteFinalTable = overwriteFinalTableStatement(stream, SOFT_RESET_SUFFIX); return List.of(createTempTable, clearLoadedAt, rebuildInTempTable, overwriteFinalTable); } - private String clearLoadedAt(StreamId streamId) { + private String clearLoadedAt(final StreamId streamId) { return new StringSubstitutor(Map.of("raw_table_id", streamId.rawTableId(QUOTE))) .replace(""" UPDATE ${raw_table_id} SET _airbyte_loaded_at = NULL WHERE 1=1; @@ -510,7 
+506,7 @@ String cdcDeletes(final StreamConfig stream, } final String pkList = stream.primaryKey().stream().map(columnId -> columnId.name(QUOTE)).collect(joining(",")); - String pkCasts = stream.primaryKey().stream().map(pk -> extractAndCast(pk, streamColumns.get(pk))).collect(joining(",\n")); + final String pkCasts = stream.primaryKey().stream().map(pk -> extractAndCast(pk, streamColumns.get(pk))).collect(joining(",\n")); // we want to grab IDs for deletion from the raw table (not the final table itself) to hand out-of-order record insertions after the delete has been registered return new StringSubstitutor(Map.of( @@ -573,7 +569,7 @@ public Optional overwriteFinalTable(final String finalSuffix, final Stre } } - private String overwriteFinalTableStatement(StreamConfig stream, String finalSuffix) { + private String overwriteFinalTableStatement(final StreamConfig stream, final String finalSuffix) { return new StringSubstitutor(Map.of( "final_table_id", stream.id().finalTableId(QUOTE), "tmp_final_table", stream.id().finalTableId(finalSuffix, QUOTE), diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java index a1841a295bbbf..8ea522da62337 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorIntegrationTest.java @@ -22,14 +22,14 @@ import com.google.cloud.bigquery.Table; import com.google.cloud.bigquery.TableResult; import 
io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.AirbyteProtocolType; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Array; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Struct; -import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; -import io.airbyte.integrations.base.destination.typing_deduping.RecordDiffer; +import io.airbyte.integrations.base.destination.typing_deduping.Array; import io.airbyte.integrations.base.destination.typing_deduping.ColumnId; +import io.airbyte.integrations.base.destination.typing_deduping.RecordDiffer; +import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; import io.airbyte.integrations.base.destination.typing_deduping.StreamId; +import io.airbyte.integrations.base.destination.typing_deduping.Struct; import io.airbyte.integrations.destination.bigquery.BigQueryDestination; import io.airbyte.protocol.models.v0.DestinationSyncMode; import io.airbyte.protocol.models.v0.SyncMode; @@ -114,8 +114,8 @@ public class BigQuerySqlGeneratorIntegrationTest { @BeforeAll public static void setup() throws Exception { - String rawConfig = Files.readString(Path.of("secrets/credentials-gcs-staging.json")); - JsonNode config = Jsons.deserialize(rawConfig); + final String rawConfig = Files.readString(Path.of("secrets/credentials-gcs-staging.json")); + final JsonNode config = Jsons.deserialize(rawConfig); bq = BigQueryDestination.getBigQuery(config); destinationHandler = new BigQueryDestinationHandler(bq); @@ -145,7 +145,7 @@ public void teardownDataset() { @Test public void testCreateTableIncremental() throws InterruptedException { - StreamConfig stream = incrementalDedupStreamConfig(); + final StreamConfig stream = 
incrementalDedupStreamConfig(); destinationHandler.execute(GENERATOR.createTable(stream, "")); @@ -811,7 +811,7 @@ private void createFinalTable() throws InterruptedException { createFinalTable(""); } - private void createFinalTable(String suffix) throws InterruptedException { + private void createFinalTable(final String suffix) throws InterruptedException { bq.query(QueryJobConfiguration.newBuilder( new StringSubstitutor(Map.of( "dataset", testDataset, @@ -877,7 +877,7 @@ PARTITION BY (DATE_TRUNC(_airbyte_extracted_at, DAY)) * TableResult contains records in a somewhat nonintuitive format (and it avoids loading them all into memory). * That's annoying for us since we're working with small test data, so just pull everything into a list. */ - public static List toJsonRecords(TableResult result) { + public static List toJsonRecords(final TableResult result) { return result.streamAll().map(row -> toJson(result.getSchema(), row)).toList(); } @@ -886,12 +886,12 @@ public static List toJsonRecords(TableResult result) { * This method does that conversion, using the schema to determine which type is most appropriate. Then we just dump * everything into a jsonnode for interop with RecordDiffer. 
*/ - private static JsonNode toJson(Schema schema, FieldValueList row) { + private static JsonNode toJson(final Schema schema, final FieldValueList row) { final ObjectNode json = (ObjectNode) Jsons.emptyObject(); for (int i = 0; i < schema.getFields().size(); i++) { final Field field = schema.getFields().get(i); final FieldValue value = row.get(i); - JsonNode typedValue; + final JsonNode typedValue; if (!value.isNull()) { typedValue = switch (field.getType().getStandardType()) { case BOOL -> Jsons.jsonNode(value.getBooleanValue()); diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorTest.java index cd290b12ef457..2313f37737705 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQuerySqlGeneratorTest.java @@ -11,13 +11,13 @@ import com.google.cloud.bigquery.StandardTableDefinition; import com.google.cloud.bigquery.TimePartitioning; import com.google.common.collect.ImmutableList; +import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType; import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.AirbyteProtocolType; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Array; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.OneOf; -import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.Struct; -import 
io.airbyte.integrations.base.destination.typing_deduping.AirbyteType.UnsupportedOneOf; +import io.airbyte.integrations.base.destination.typing_deduping.Array; import io.airbyte.integrations.base.destination.typing_deduping.ColumnId; +import io.airbyte.integrations.base.destination.typing_deduping.Struct; +import io.airbyte.integrations.base.destination.typing_deduping.Union; +import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; @@ -43,12 +43,12 @@ public void testToDialectType() { assertEquals(StandardSQLTypeName.JSON, generator.toDialectType(a)); assertEquals(StandardSQLTypeName.JSON, generator.toDialectType(new UnsupportedOneOf(new ArrayList<>()))); - OneOf o = new OneOf(ImmutableList.of(s)); - assertEquals(StandardSQLTypeName.JSON, generator.toDialectType(o)); - o = new OneOf(ImmutableList.of(a)); - assertEquals(StandardSQLTypeName.JSON, generator.toDialectType(o)); - o = new OneOf(ImmutableList.of(AirbyteProtocolType.BOOLEAN, AirbyteProtocolType.NUMBER)); - assertEquals(StandardSQLTypeName.NUMERIC, generator.toDialectType(o)); + Union u = new Union(ImmutableList.of(s)); + assertEquals(StandardSQLTypeName.JSON, generator.toDialectType(u)); + u = new Union(ImmutableList.of(a)); + assertEquals(StandardSQLTypeName.JSON, generator.toDialectType(u)); + u = new Union(ImmutableList.of(AirbyteProtocolType.BOOLEAN, AirbyteProtocolType.NUMBER)); + assertEquals(StandardSQLTypeName.NUMERIC, generator.toDialectType(u)); } @Test diff --git a/docs/understanding-airbyte/supported-data-types.md b/docs/understanding-airbyte/supported-data-types.md index cb11bf5de539b..f588436e6aa61 100644 --- a/docs/understanding-airbyte/supported-data-types.md +++ b/docs/understanding-airbyte/supported-data-types.md @@ -10,20 +10,20 @@ This type system does not constrain values. However, destinations may not fully This table summarizes the available types. 
See the [Specific Types](#specific-types) section for explanation of optional parameters. -| Airbyte type | JSON Schema | Examples | -| -------------------------------------------------------------- | ----------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------- | -| String | `{"type": "string""}` | `"foo bar"` | -| Boolean | `{"type": "boolean"}` | `true` or `false` | -| Date | `{"type": "string", "format": "date"}` | `"2021-01-23"`, `"2021-01-23 BC"` | -| Timestamp with timezone | `{"type": "string", "format": "date-time", "airbyte_type": "timestamp_with_timezone"}` | `"2022-11-22T01:23:45.123456+05:00"`, `"2022-11-22T01:23:45Z BC"` | -| Timestamp without timezone | `{"type": "string", "format": "date-time", "airbyte_type": "timestamp_without_timezone"}` | `"2022-11-22T01:23:45"`, `"2022-11-22T01:23:45.123456 BC"` | -| Time without timezone | `{"type": "string", "airbyte_type": "time_with_timezone"}` | `"01:23:45.123456"`, `"01:23:45"` | -| Time with timezone | `{"type": "string", "airbyte_type": "time_without_timezone"}` | `"01:23:45.123456+05:00"`, `"01:23:45Z"` | -| Integer | `{"type": "integer"}` or `{"type": "number", "airbyte_type": "integer"}` | `42` | -| Number | `{"type": "number"}` | `1234.56` | -| Array | `{"type": "array"}`; optionally `items` | `[1, 2, 3]` | -| Object | `{"type": "object"}`; optionally `properties` | `{"foo": "bar"}` | -| Union | `{"oneOf": [...]}` | |ß +| Airbyte type | JSON Schema | Examples | +|----------------------------|-----------------------------------------------------------------------------------------------------|-------------------------------------------------------------------| +| String | `{"type": "string"}` | `"foo bar"` | +| Boolean | `{"type": "boolean"}` | `true` or `false` | +| Date | `{"type": "string", "format": "date"}` | `"2021-01-23"`, `"2021-01-23 BC"` | +| Timestamp without timezone | `{"type": 
"string", "format": "date-time", "airbyte_type": "timestamp_without_timezone"}` | `"2022-11-22T01:23:45"`, `"2022-11-22T01:23:45.123456 BC"` | +| Timestamp with timezone | `{"type": "string", "format": "date-time"}`; optionally `"airbyte_type": "timestamp_with_timezone"` | `"2022-11-22T01:23:45.123456+05:00"`, `"2022-11-22T01:23:45Z BC"` | +| Time without timezone | `{"type": "string", "format": "time", "airbyte_type": "time_without_timezone"}` | `"01:23:45.123456"`, `"01:23:45"` | +| Time with timezone | `{"type": "string", "format": "time", "airbyte_type": "time_with_timezone"}` | `"01:23:45.123456+05:00"`, `"01:23:45Z"` | +| Integer | `{"type": "integer"}` or `{"type": "number", "airbyte_type": "integer"}` | `42` | +| Number | `{"type": "number"}` | `1234.56` | +| Array | `{"type": "array"}`; optionally `items` | `[1, 2, 3]` | +| Object | `{"type": "object"}`; optionally `properties` | `{"foo": "bar"}` | +| Union | `{"oneOf": [...]}` | | ### Record structure As a reminder, sources expose a `discover` command, which returns a list of [`AirbyteStreams`](https://github.com/airbytehq/airbyte/blob/111131a193359027d0081de1290eb4bb846662ef/airbyte-protocol/models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml#L122), and a `read` method, which emits a series of [`AirbyteRecordMessages`](https://github.com/airbytehq/airbyte/blob/111131a193359027d0081de1290eb4bb846662ef/airbyte-protocol/models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml#L46-L66). The type system determines what a valid `json_schema` is for an `AirbyteStream`, which in turn dictates what messages `read` is allowed to emit.