Destination redshift: Switch back to jooq execution; add e2e special characters test (#34562)
edgao committed Feb 9, 2024
1 parent 80bd719 commit 2b2408a
Showing 16 changed files with 93 additions and 41 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
@@ -166,6 +166,7 @@ MavenLocal debugging steps:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.20.0 | 2024-02-09 | [\#34562](https://github.com/airbytehq/airbyte/pull/34562) | Add new test cases to BaseTypingDedupingTest to exercise special characters. |
| 0.19.0 | 2024-02-01 | [\#34745](https://github.com/airbytehq/airbyte/pull/34745) | Reorganize CDK module structure. |
| 0.18.0 | 2024-02-08 | [\#33606](https://github.com/airbytehq/airbyte/pull/33606) | Add updated Initial and Incremental Stream State definitions for DB Sources. |
| 0.17.1 | 2024-02-08 | [\#35027](https://github.com/airbytehq/airbyte/pull/35027) | Make state handling thread safe in async destination framework. |
@@ -1 +1 @@
version=0.19.0
version=0.20.0
@@ -4,6 +4,7 @@

package io.airbyte.integrations.base.destination.typing_deduping;

import static java.util.stream.Collectors.toList;
import static org.junit.jupiter.api.Assertions.assertAll;

import com.fasterxml.jackson.databind.JsonNode;
@@ -380,6 +381,36 @@ public void incrementalDedup() throws Exception {
verifySyncResult(expectedRawRecords2, expectedFinalRecords2, disableFinalTableComparison());
}

/**
* Run the first sync from {@link #incrementalDedup()}, but repeat the messages many times. Some
* destinations behave differently with small vs. large record counts, so this test case tries to
* exercise that behavior.
*/
@Test
public void largeDedupSync() throws Exception {
final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of(
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(List.of("updated_at"))
.withDestinationSyncMode(DestinationSyncMode.APPEND_DEDUP)
.withPrimaryKey(List.of(List.of("id1"), List.of("id2")))
.withStream(new AirbyteStream()
.withNamespace(streamNamespace)
.withName(streamName)
.withJsonSchema(SCHEMA))));

// Run a sync with 25K copies of the input messages
final List<AirbyteMessage> messages1 = repeatList(25_000, readMessages("dat/sync1_messages.jsonl"));

runSync(catalog, messages1);

// The raw table will contain 25K copies of each record
final List<JsonNode> expectedRawRecords1 = repeatList(25_000, readRecords("dat/sync1_expectedrecords_raw.jsonl"));
// But the final table should be fully deduped
final List<JsonNode> expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_dedup_final.jsonl");
verifySyncResult(expectedRawRecords1, expectedFinalRecords1, disableFinalTableComparison());
}

/**
* Identical to {@link #incrementalDedup()}, except that the stream has no namespace.
*/
@@ -707,6 +738,14 @@ public void testDataTypes() throws Exception {
// this test probably needs some configuration per destination to specify what values are supported?
}

private <T> List<T> repeatList(final int n, final List<T> list) {
return Collections
.nCopies(n, list)
.stream()
.flatMap(List::stream)
.collect(toList());
}

protected void verifySyncResult(final List<JsonNode> expectedRawRecords,
final List<JsonNode> expectedFinalRecords,
final boolean disableFinalTableComparison)
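For reference, a quick standalone sketch (made-up values, not part of the diff) of what the repeatList helper above does:

```java
import static java.util.stream.Collectors.toList;

import java.util.Collections;
import java.util.List;

public class RepeatListDemo {

  public static void main(final String[] args) {
    // Collections.nCopies(n, list) builds n references to the same list;
    // flatMap then concatenates them in order.
    final List<String> repeated = Collections.nCopies(3, List.of("a", "b")).stream()
        .flatMap(List::stream)
        .collect(toList());
    System.out.println(repeated); // [a, b, a, b, a, b]
  }

}
```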
@@ -10,3 +10,5 @@
{"type": "RECORD", "record": {"emitted_at": 1000, "data": {"id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}}}
// Emit a record with an invalid age.
{"type": "RECORD", "record": {"emitted_at": 1000, "data": {"id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}}}
// Emit a record with interesting characters in one of the values.
{"type": "RECORD", "record": {"emitted_at": 1000, "data": {"id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00Z", "name": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"}}}
@@ -4,7 +4,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.17.0'
cdkVersionRequired = '0.20.0'
features = ['db-destinations', 's3-destinations', 'typing-deduping']
useLocalCdk = false
}
@@ -39,9 +39,3 @@ dependencies {
testImplementation "org.mockito:mockito-inline:4.1.0"

}

configurations.all {
resolutionStrategy {
force libs.jooq
}
}
@@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
dockerImageTag: 2.1.6
dockerImageTag: 2.1.7
dockerRepository: airbyte/destination-redshift
documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift
githubIssueLabel: destination-redshift
@@ -19,13 +19,15 @@
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.integrations.destination.redshift.manifest.Entry;
import io.airbyte.integrations.destination.redshift.manifest.Manifest;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Base64;
import java.util.Base64.Encoder;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import org.joda.time.DateTime;

public class RedshiftS3StagingSqlOperations extends RedshiftSqlOperations implements StagingOperations {

@@ -44,7 +46,7 @@ public RedshiftS3StagingSqlOperations(final NamingConventionTransformer nameTran
this.s3StorageOperations = new S3StorageOperations(nameTransformer, s3Client, s3Config);
this.s3Config = s3Config;
this.objectMapper = new ObjectMapper();
if (encryptionConfig instanceof AesCbcEnvelopeEncryption e) {
if (encryptionConfig instanceof final AesCbcEnvelopeEncryption e) {
this.s3StorageOperations.addBlobDecorator(new AesCbcEnvelopeEncryptionBlobDecorator(e.key()));
this.keyEncryptingKey = e.key();
} else {
@@ -57,21 +59,22 @@ public String getStagingPath(final UUID connectionId,
final String namespace,
final String streamName,
final String outputTableName,
final DateTime writeDatetime) {
final Instant writeDatetime) {
final String bucketPath = s3Config.getBucketPath();
final String prefix = bucketPath.isEmpty() ? "" : bucketPath + (bucketPath.endsWith("/") ? "" : "/");
final ZonedDateTime zdt = writeDatetime.atZone(ZoneOffset.UTC);
return nameTransformer.applyDefaultCase(String.format("%s%s/%s_%02d_%02d_%02d_%s/",
prefix,
nameTransformer.applyDefaultCase(nameTransformer.convertStreamName(outputTableName)),
writeDatetime.year().get(),
writeDatetime.monthOfYear().get(),
writeDatetime.dayOfMonth().get(),
writeDatetime.hourOfDay().get(),
zdt.getYear(),
zdt.getMonthValue(),
zdt.getDayOfMonth(),
zdt.getHour(),
connectionId));
}

@Override
public String getStageName(String namespace, String streamName) {
public String getStageName(final String namespace, final String streamName) {
return "garbage-unused";
}

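As a worked example of the java.time-based path logic in getStagingPath above (bucket prefix, table name, and connection id are made up):

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.UUID;

public class StagingPathDemo {

  public static void main(final String[] args) {
    final Instant writeDatetime = Instant.parse("2024-02-09T17:05:00Z");
    final ZonedDateTime zdt = writeDatetime.atZone(ZoneOffset.UTC);
    final UUID connectionId = UUID.fromString("11111111-2222-3333-4444-555555555555");
    // Mirrors the String.format call above: prefix, table name,
    // then year, zero-padded month/day/hour, and the connection id.
    final String path = String.format("%s%s/%s_%02d_%02d_%02d_%s/",
        "my-bucket-path/",
        "users_table",
        zdt.getYear(),
        zdt.getMonthValue(),
        zdt.getDayOfMonth(),
        zdt.getHour(),
        connectionId);
    System.out.println(path);
    // my-bucket-path/users_table/2024_02_09_17_11111111-2222-3333-4444-555555555555/
  }

}
```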
@@ -31,7 +31,8 @@
import org.jooq.InsertValuesStep4;
import org.jooq.Record;
import org.jooq.SQLDialect;
import org.jooq.conf.ParamType;
import org.jooq.conf.Settings;
import org.jooq.conf.StatementType;
import org.jooq.impl.DefaultDataType;
import org.jooq.impl.SQLDataType;
import org.slf4j.Logger;
@@ -112,7 +113,19 @@ protected void insertRecordsInternalV2(final JdbcDatabase database,
// > default
for (final List<PartialAirbyteMessage> batch : Iterables.partition(records, 10_000)) {
LOGGER.info("Prepared batch size: {}, {}, {}", batch.size(), schemaName, tableName);
final DSLContext create = using(connection, SQLDialect.POSTGRES);
final DSLContext create = using(
connection,
SQLDialect.POSTGRES,
// Force inlined params.
// jooq normally tries to intelligently use bind params when possible.
// This would cause queries with many params to use inline params,
// but small queries would use bind params.
// In turn, that would force us to intelligently escape string values,
// since we need to escape inlined strings
// but need to not escape bound strings.
// Instead, we force jooq to always inline params,
// and always call escapeStringLiteral() on the string values.
new Settings().withStatementType(StatementType.STATIC_STATEMENT));
// JOOQ adds some overhead here. Building the InsertValuesStep object takes about 139ms for 5K
// records.
// That's a nontrivial execution speed loss when the actual statement execution takes 500ms.
@@ -138,13 +151,7 @@
val(Instant.ofEpochMilli(record.getRecord().getEmittedAt()).atOffset(ZoneOffset.UTC)),
val((OffsetDateTime) null));
}
// Intentionally don't use insert.execute().
// jooq will try to use a parameterized query if there are not too many query params.
// This means that for small queries, we would need to not-escape backslashes,
// but for large queries we _would_ need to escape them.
// Instead, force jooq to always inline params.
// This allows us to always escape backslashes.
connection.createStatement().execute(insert.getSQL(ParamType.INLINED));
insert.execute();
LOGGER.info("Executed batch size: {}, {}, {}", batch.size(), schemaName, tableName);
}
});
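To see the setting in isolation: a minimal sketch (plain jooq with a hypothetical table and column, no Airbyte code) of how STATIC_STATEMENT switches rendering from bind placeholders to inlined, escaped literals:

```java
import static org.jooq.impl.DSL.field;
import static org.jooq.impl.DSL.name;
import static org.jooq.impl.DSL.table;

import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.jooq.conf.Settings;
import org.jooq.conf.StatementType;
import org.jooq.impl.DSL;

public class JooqInlineDemo {

  public static void main(final String[] args) {
    // Default settings: jooq renders a bind placeholder ("?")
    // and passes the value separately.
    final DSLContext bound = DSL.using(SQLDialect.POSTGRES);
    System.out.println(bound
        .insertInto(table(name("users")), field(name("name")))
        .values("back\\slash")
        .getSQL());

    // STATIC_STATEMENT: the value is rendered as an escaped SQL literal,
    // so escaping behaves the same for small and large batches.
    final DSLContext inlined = DSL.using(SQLDialect.POSTGRES,
        new Settings().withStatementType(StatementType.STATIC_STATEMENT));
    System.out.println(inlined
        .insertInto(table(name("users")), field(name("name")))
        .values("back\\slash")
        .getSQL());
  }

}
```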
@@ -2,3 +2,4 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00.000000Z", "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00.000000Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00.000000Z", "name": "Charlie"}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00.000000Z", "name": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"}
@@ -3,3 +3,4 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00.000000Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}
// Invalid columns are nulled out (i.e. SQL null, not JSON null)
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00.000000Z", "name": "Charlie"}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00.000000Z", "name": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"}
@@ -3,3 +3,4 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}}
// Invalid data is still allowed in the raw table.
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00Z", "name": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"}}
@@ -2,6 +2,7 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00.000000Z", "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00.000000Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00.000000Z", "name": "Charlie"}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00.000000Z", "name": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"}

{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00.000000Z", "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00.000000Z", "name": "Bob", "address": {"city": "New York", "state": "NY"}}
@@ -1,3 +1,4 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00.000000Z", "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}
// Delete Bob, keep Charlie
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00.000000Z", "name": "Charlie"}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"errors":[]}, "id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00.000000Z", "name": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"}
@@ -3,6 +3,7 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00Z", "name": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"}}
// And append the records from the second sync
{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}}
{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Bob", "address": {"city": "New York", "state": "NY"}}}
@@ -156,13 +156,12 @@ with
"numbered_rows" as (
select
*,
row_number() over (partition by
"id1",
"id2"
order by
"updated_at" desc NULLS LAST,
"_airbyte_extracted_at" desc
) as "row_number"
row_number() over (
partition by "id1", "id2"
order by
"updated_at" desc NULLS LAST,
"_airbyte_extracted_at" desc
) as "row_number"
from "intermediate_data"
)
select
@@ -193,21 +192,21 @@ where "_airbyte_raw_id" in (
from (
select
"_airbyte_raw_id",
row_number() over (partition by
"id1",
"id2"
order by
"updated_at" desc NULLS LAST,
"_airbyte_extracted_at" desc
) as "row_number"
row_number() over (
partition by "id1", "id2"
order by
"updated_at" desc NULLS LAST,
"_airbyte_extracted_at" desc
) as "row_number"
from "test_schema"."users_finalunittest"
) as "airbyte_ids"
where "row_number" <> 1
);
delete from "test_schema"."users_finalunittest"
where "_ab_cdc_deleted_at" is not null;
update "test_schema"."users_raw"
set "_airbyte_loaded_at" = GETDATE()
set
"_airbyte_loaded_at" = GETDATE()
where (
"_airbyte_loaded_at" is null
and "_airbyte_extracted_at" > '2023-02-15T18:35:24Z'
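The query text above is jooq's rendering of a row_number() window function; a rough sketch (field names taken from the query, the rest assumed) of how such a clause is built with the jooq DSL:

```java
import static org.jooq.impl.DSL.field;
import static org.jooq.impl.DSL.name;
import static org.jooq.impl.DSL.rowNumber;

import org.jooq.Field;

public class RowNumberDemo {

  public static void main(final String[] args) {
    // Partition by the primary key, order by cursor then extraction time:
    // the shape used to keep exactly one row per primary key.
    final Field<Integer> rowNum = rowNumber().over()
        .partitionBy(field(name("id1")), field(name("id2")))
        .orderBy(
            field(name("updated_at")).desc().nullsLast(),
            field(name("_airbyte_extracted_at")).desc());
    System.out.println(rowNum);
    // e.g. row_number() over (partition by "id1", "id2" order by ...)
  }

}
```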
1 change: 1 addition & 0 deletions docs/integrations/destinations/redshift.md
@@ -237,6 +237,7 @@ Each stream will be output into its own raw table in Redshift. Each table will c

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:------------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 2.1.7 | 2024-02-09 | [\#34562](https://github.com/airbytehq/airbyte/pull/34562) | Switch back to jooq-based sql execution for standard insert |
| 2.1.6 | 2024-02-08 | [\#34502](https://github.com/airbytehq/airbyte/pull/34502) | Update to CDK version 0.17.0 |
| 2.1.5 | 2024-01-30 | [\#34680](https://github.com/airbytehq/airbyte/pull/34680) | Update to CDK version 0.16.3 |
| 2.1.4 | 2024-01-29 | [\#34634](https://github.com/airbytehq/airbyte/pull/34634) | Use lowercase raw schema and table in T+D [CDK changes](https://github.com/airbytehq/airbyte/pull/34533) |