diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index d7c7535e1119d..2db83e4fa83d6 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -166,6 +166,7 @@ MavenLocal debugging steps: | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.20.0 | 2024-02-09 | [\#34562](https://github.com/airbytehq/airbyte/pull/34562) | Add new test cases to BaseTypingDedupingTest to exercise special characters. | | 0.19.0 | 2024-02-01 | [\#34745](https://github.com/airbytehq/airbyte/pull/34745) | Reorganize CDK module structure. | | 0.18.0 | 2024-02-08 | [\#33606](https://github.com/airbytehq/airbyte/pull/33606) | Add updated Initial and Incremental Stream State definitions for DB Sources. | | 0.17.1 | 2024-02-08 | [\#35027](https://github.com/airbytehq/airbyte/pull/35027) | Make state handling thread safe in async destination framework. | diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index 8fa71c162ea3f..a4c13e003d78c 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.19.0 +version=0.20.0 diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java index 92a8ea0f8dae2..5c7cd00e8ae28 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java @@ -4,6 +4,7 @@ package io.airbyte.integrations.base.destination.typing_deduping; +import static java.util.stream.Collectors.toList; import static org.junit.jupiter.api.Assertions.assertAll; import com.fasterxml.jackson.databind.JsonNode; @@ -380,6 +381,36 @@ public void incrementalDedup() throws Exception { verifySyncResult(expectedRawRecords2, expectedFinalRecords2, disableFinalTableComparison()); } + /** + * Run the first sync from {@link #incrementalDedup()}, but repeat the messages many times. Some + * destinations behave differently with small vs large record count, so this test case tries to + * exercise that behavior. + */ + @Test + public void largeDedupSync() throws Exception { + final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( + new ConfiguredAirbyteStream() + .withSyncMode(SyncMode.INCREMENTAL) + .withCursorField(List.of("updated_at")) + .withDestinationSyncMode(DestinationSyncMode.APPEND_DEDUP) + .withPrimaryKey(List.of(List.of("id1"), List.of("id2"))) + .withStream(new AirbyteStream() + .withNamespace(streamNamespace) + .withName(streamName) + .withJsonSchema(SCHEMA)))); + + // Run a sync with 25K copies of the input messages + final List messages1 = repeatList(25_000, readMessages("dat/sync1_messages.jsonl")); + + runSync(catalog, messages1); + + // The raw table will contain 25K copies of each record + final List expectedRawRecords1 = repeatList(25_000, readRecords("dat/sync1_expectedrecords_raw.jsonl")); + // But the final table should be fully deduped + final List expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_dedup_final.jsonl"); + verifySyncResult(expectedRawRecords1, expectedFinalRecords1, disableFinalTableComparison()); + } + /** * Identical to {@link #incrementalDedup()}, except that the stream has no namespace. */ @@ -707,6 +738,14 @@ public void testDataTypes() throws Exception { // this test probably needs some configuration per destination to specify what values are supported? } + private List repeatList(final int n, final List list) { + return Collections + .nCopies(n, list) + .stream() + .flatMap(List::stream) + .collect(toList()); + } + protected void verifySyncResult(final List expectedRawRecords, final List expectedFinalRecords, final boolean disableFinalTableComparison) diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/resources/dat/sync1_messages.jsonl b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/resources/dat/sync1_messages.jsonl index 4c5dec1a24ea2..653e49e39e207 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/resources/dat/sync1_messages.jsonl +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/resources/dat/sync1_messages.jsonl @@ -10,3 +10,5 @@ {"type": "RECORD", "record": {"emitted_at": 1000, "data": {"id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}}} // Emit a record with an invalid age. {"type": "RECORD", "record": {"emitted_at": 1000, "data": {"id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}}} +// Emit a record with interesting characters in one of the values. +{"type": "RECORD", "record": {"emitted_at": 1000, "data": {"id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00Z", "name": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"}}} diff --git a/airbyte-integrations/connectors/destination-redshift/build.gradle b/airbyte-integrations/connectors/destination-redshift/build.gradle index 859b1843c1ca6..de1540f7811a9 100644 --- a/airbyte-integrations/connectors/destination-redshift/build.gradle +++ b/airbyte-integrations/connectors/destination-redshift/build.gradle @@ -4,7 +4,7 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.17.0' + cdkVersionRequired = '0.20.0' features = ['db-destinations', 's3-destinations', 'typing-deduping'] useLocalCdk = false } @@ -39,9 +39,3 @@ dependencies { testImplementation "org.mockito:mockito-inline:4.1.0" } - -configurations.all { - resolutionStrategy { - force libs.jooq - } -} diff --git a/airbyte-integrations/connectors/destination-redshift/metadata.yaml b/airbyte-integrations/connectors/destination-redshift/metadata.yaml index 1cfa9269aaa2e..e32903ada77db 100644 --- a/airbyte-integrations/connectors/destination-redshift/metadata.yaml +++ b/airbyte-integrations/connectors/destination-redshift/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: database connectorType: destination definitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc - dockerImageTag: 2.1.6 + dockerImageTag: 2.1.7 dockerRepository: airbyte/destination-redshift documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift githubIssueLabel: destination-redshift diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/operations/RedshiftS3StagingSqlOperations.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/operations/RedshiftS3StagingSqlOperations.java index 44dee9e63591e..c2b4da5c97ef9 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/operations/RedshiftS3StagingSqlOperations.java +++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/operations/RedshiftS3StagingSqlOperations.java @@ -19,13 +19,15 @@ import io.airbyte.commons.lang.Exceptions; import io.airbyte.integrations.destination.redshift.manifest.Entry; import io.airbyte.integrations.destination.redshift.manifest.Manifest; +import java.time.Instant; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; import java.util.Base64; import java.util.Base64.Encoder; import java.util.List; import java.util.Optional; import java.util.UUID; import java.util.stream.Collectors; -import org.joda.time.DateTime; public class RedshiftS3StagingSqlOperations extends RedshiftSqlOperations implements StagingOperations { @@ -44,7 +46,7 @@ public RedshiftS3StagingSqlOperations(final NamingConventionTransformer nameTran this.s3StorageOperations = new S3StorageOperations(nameTransformer, s3Client, s3Config); this.s3Config = s3Config; this.objectMapper = new ObjectMapper(); - if (encryptionConfig instanceof AesCbcEnvelopeEncryption e) { + if (encryptionConfig instanceof final AesCbcEnvelopeEncryption e) { this.s3StorageOperations.addBlobDecorator(new AesCbcEnvelopeEncryptionBlobDecorator(e.key())); this.keyEncryptingKey = e.key(); } else { @@ -57,21 +59,22 @@ public String getStagingPath(final UUID connectionId, final String namespace, final String streamName, final String outputTableName, - final DateTime writeDatetime) { + final Instant writeDatetime) { final String bucketPath = s3Config.getBucketPath(); final String prefix = bucketPath.isEmpty() ? "" : bucketPath + (bucketPath.endsWith("/") ? "" : "/"); + final ZonedDateTime zdt = writeDatetime.atZone(ZoneOffset.UTC); return nameTransformer.applyDefaultCase(String.format("%s%s/%s_%02d_%02d_%02d_%s/", prefix, nameTransformer.applyDefaultCase(nameTransformer.convertStreamName(outputTableName)), - writeDatetime.year().get(), - writeDatetime.monthOfYear().get(), - writeDatetime.dayOfMonth().get(), - writeDatetime.hourOfDay().get(), + zdt.getYear(), + zdt.getMonthValue(), + zdt.getDayOfMonth(), + zdt.getHour(), connectionId)); } @Override - public String getStageName(String namespace, String streamName) { + public String getStageName(final String namespace, final String streamName) { return "garbage-unused"; } diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/operations/RedshiftSqlOperations.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/operations/RedshiftSqlOperations.java index a25a4b53846f5..4c8927098ea83 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/operations/RedshiftSqlOperations.java +++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/operations/RedshiftSqlOperations.java @@ -31,7 +31,8 @@ import org.jooq.InsertValuesStep4; import org.jooq.Record; import org.jooq.SQLDialect; -import org.jooq.conf.ParamType; +import org.jooq.conf.Settings; +import org.jooq.conf.StatementType; import org.jooq.impl.DefaultDataType; import org.jooq.impl.SQLDataType; import org.slf4j.Logger; @@ -112,7 +113,19 @@ protected void insertRecordsInternalV2(final JdbcDatabase database, // > default for (final List batch : Iterables.partition(records, 10_000)) { LOGGER.info("Prepared batch size: {}, {}, {}", batch.size(), schemaName, tableName); - final DSLContext create = using(connection, SQLDialect.POSTGRES); + final DSLContext create = using( + connection, + SQLDialect.POSTGRES, + // Force inlined params. + // jooq normally tries to intelligently use bind params when possible. + // This would cause queries with many params to use inline params, + // but small queries would use bind params. + // In turn, that would force us to intelligently escape string values, + // since we need to escape inlined strings + // but need to not escape bound strings. + // Instead, we force jooq to always inline params, + // and always call escapeStringLiteral() on the string values. + new Settings().withStatementType(StatementType.STATIC_STATEMENT)); // JOOQ adds some overhead here. Building the InsertValuesStep object takes about 139ms for 5K // records. // That's a nontrivial execution speed loss when the actual statement execution takes 500ms. @@ -138,13 +151,7 @@ protected void insertRecordsInternalV2(final JdbcDatabase database, val(Instant.ofEpochMilli(record.getRecord().getEmittedAt()).atOffset(ZoneOffset.UTC)), val((OffsetDateTime) null)); } - // Intentionally don't use insert.execute(). - // jooq will try to use a parameterized query if there are not too many query params. - // This means that for small queries, we would need to not-escape backslashes, - // but for large queries we _would_ need to escape them. - // Instead, force jooq to always inline params. - // This allows us to always escape backslashes. - connection.createStatement().execute(insert.getSQL(ParamType.INLINED)); + insert.execute(); LOGGER.info("Executed batch size: {}, {}, {}", batch.size(), schemaName, tableName); } }); diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/resources/dat/sync1_expectedrecords_dedup_final.jsonl b/airbyte-integrations/connectors/destination-redshift/src/test-integration/resources/dat/sync1_expectedrecords_dedup_final.jsonl index c805113dc6c20..61024be7867d0 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/resources/dat/sync1_expectedrecords_dedup_final.jsonl +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/resources/dat/sync1_expectedrecords_dedup_final.jsonl @@ -2,3 +2,4 @@ {"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00.000000Z", "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}} {"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00.000000Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}} {"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00.000000Z", "name": "Charlie"} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00.000000Z", "name": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"} diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/resources/dat/sync1_expectedrecords_nondedup_final.jsonl b/airbyte-integrations/connectors/destination-redshift/src/test-integration/resources/dat/sync1_expectedrecords_nondedup_final.jsonl index 8aa8521830614..6f53b9f3c12dd 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/resources/dat/sync1_expectedrecords_nondedup_final.jsonl +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/resources/dat/sync1_expectedrecords_nondedup_final.jsonl @@ -3,3 +3,4 @@ {"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00.000000Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}} // Invalid columns are nulled out (i.e. SQL null, not JSON null) {"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00.000000Z", "name": "Charlie"} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00.000000Z", "name": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"} diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/resources/dat/sync1_expectedrecords_raw.jsonl b/airbyte-integrations/connectors/destination-redshift/src/test-integration/resources/dat/sync1_expectedrecords_raw.jsonl index 80fac124d28dc..4012c086a9e61 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/resources/dat/sync1_expectedrecords_raw.jsonl +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/resources/dat/sync1_expectedrecords_raw.jsonl @@ -3,3 +3,4 @@ {"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}} // Invalid data is still allowed in the raw table. {"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00Z", "name": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"}} diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_append_final.jsonl b/airbyte-integrations/connectors/destination-redshift/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_append_final.jsonl index 6e9258bab2552..0989dfc17ed07 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_append_final.jsonl +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_append_final.jsonl @@ -2,6 +2,7 @@ {"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00.000000Z", "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}} {"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00.000000Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}} {"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00.000000Z", "name": "Charlie"} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00.000000Z", "name": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"} {"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00.000000Z", "name": "Alice", "address": {"city": "Seattle", "state": "WA"}} {"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00.000000Z", "name": "Bob", "address": {"city": "New York", "state": "NY"}} diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/resources/dat/sync2_expectedrecords_incremental_dedup_final.jsonl b/airbyte-integrations/connectors/destination-redshift/src/test-integration/resources/dat/sync2_expectedrecords_incremental_dedup_final.jsonl index 13c59b2f99121..1187ca159d722 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/resources/dat/sync2_expectedrecords_incremental_dedup_final.jsonl +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/resources/dat/sync2_expectedrecords_incremental_dedup_final.jsonl @@ -1,3 +1,4 @@ {"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00.000000Z", "name": "Alice", "address": {"city": "Seattle", "state": "WA"}} // Delete Bob, keep Charlie {"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00.000000Z", "name": "Charlie"} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00.000000Z", "name": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"} diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/resources/dat/sync2_expectedrecords_raw.jsonl b/airbyte-integrations/connectors/destination-redshift/src/test-integration/resources/dat/sync2_expectedrecords_raw.jsonl index 32a7e57b1c147..2f634c6ad4e95 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/resources/dat/sync2_expectedrecords_raw.jsonl +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/resources/dat/sync2_expectedrecords_raw.jsonl @@ -3,6 +3,7 @@ {"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}} {"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}} {"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00Z", "name": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"}} // And append the records from the second sync {"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}} {"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Bob", "address": {"city": "New York", "state": "NY"}}} diff --git a/airbyte-integrations/connectors/destination-redshift/src/test/resources/typing_deduping_with_cdc.sql b/airbyte-integrations/connectors/destination-redshift/src/test/resources/typing_deduping_with_cdc.sql index bc107dd680e9f..371b189e4856a 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test/resources/typing_deduping_with_cdc.sql +++ b/airbyte-integrations/connectors/destination-redshift/src/test/resources/typing_deduping_with_cdc.sql @@ -156,13 +156,12 @@ with "numbered_rows" as ( select *, - row_number() over (partition by - "id1", - "id2" - order by - "updated_at" desc NULLS LAST, - "_airbyte_extracted_at" desc - ) as "row_number" + row_number() over ( + partition by "id1", "id2" + order by + "updated_at" desc NULLS LAST, + "_airbyte_extracted_at" desc + ) as "row_number" from "intermediate_data" ) select @@ -193,13 +192,12 @@ where "_airbyte_raw_id" in ( from ( select "_airbyte_raw_id", - row_number() over (partition by - "id1", - "id2" - order by - "updated_at" desc NULLS LAST, - "_airbyte_extracted_at" desc - ) as "row_number" + row_number() over ( + partition by "id1", "id2" + order by + "updated_at" desc NULLS LAST, + "_airbyte_extracted_at" desc + ) as "row_number" from "test_schema"."users_finalunittest" ) as "airbyte_ids" where "row_number" <> 1 @@ -207,7 +205,8 @@ where "_airbyte_raw_id" in ( delete from "test_schema"."users_finalunittest" where "_ab_cdc_deleted_at" is not null; update "test_schema"."users_raw" -set "_airbyte_loaded_at" = GETDATE() +set +"_airbyte_loaded_at" = GETDATE() where ( "_airbyte_loaded_at" is null and "_airbyte_extracted_at" > '2023-02-15T18:35:24Z' diff --git a/docs/integrations/destinations/redshift.md b/docs/integrations/destinations/redshift.md index 32df89a47f707..ba51ae50ba1f0 100644 --- a/docs/integrations/destinations/redshift.md +++ b/docs/integrations/destinations/redshift.md @@ -237,6 +237,7 @@ Each stream will be output into its own raw table in Redshift. Each table will c | Version | Date | Pull Request | Subject | |:--------|:-----------|:------------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 2.1.7 | 2024-02-09 | [\#34562](https://github.com/airbytehq/airbyte/pull/34562) | Switch back to jooq-based sql execution for standard insert | | 2.1.6 | 2024-02-08 | [\#34502](https://github.com/airbytehq/airbyte/pull/34502) | Update to CDK version 0.17.0 | | 2.1.5 | 2024-01-30 | [\#34680](https://github.com/airbytehq/airbyte/pull/34680) | Update to CDK version 0.16.3 | | 2.1.4 | 2024-01-29 | [\#34634](https://github.com/airbytehq/airbyte/pull/34634) | Use lowercase raw schema and table in T+D [CDK changes](https://github.com/airbytehq/airbyte/pull/34533) |