Skip to content

Commit

Permalink
Destination redshift: Update to latest cdk (#34613)
Browse files Browse the repository at this point in the history
Co-authored-by: Gireesh Sreepathi <gisripa@gmail.com>
  • Loading branch information
edgao and gisripa committed May 6, 2024
1 parent 0da1499 commit d31f0dd
Show file tree
Hide file tree
Showing 33 changed files with 57 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.29.12'
cdkVersionRequired = '0.33.0'
features = ['db-destinations', 's3-destinations', 'typing-deduping']
useLocalCdk = false
}
Expand All @@ -29,7 +29,7 @@ dependencies {
implementation 'com.amazonaws:aws-java-sdk-s3:1.11.978'
// TODO: Verify no aws sdk code is pulled by this dependency causing classpath conflicts
// https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-jdbc10-driver-differences.html
implementation 'com.amazon.redshift:redshift-jdbc42:2.1.0.23'
implementation 'com.amazon.redshift:redshift-jdbc42:2.1.0.26'
implementation 'org.apache.commons:commons-csv:1.4'
implementation 'com.github.alexmojaki:s3-stream-upload:2.2.2'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
dockerImageTag: 2.4.3
dockerImageTag: 2.5.0
dockerRepository: airbyte/destination-redshift
documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift
githubIssueLabel: destination-redshift
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public static JsonNode getJdbcConfig(final JsonNode redshiftConfig) {
}

@Override
protected JdbcSqlGenerator getSqlGenerator() {
protected JdbcSqlGenerator getSqlGenerator(final JsonNode config) {
return new RedshiftSqlGenerator(super.getNamingResolver());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public JsonNode toJdbcConfig(final JsonNode config) {
}

@Override
protected JdbcSqlGenerator getSqlGenerator() {
protected JdbcSqlGenerator getSqlGenerator(final JsonNode config) {
return new RedshiftSqlGenerator(getNamingResolver());
}

Expand Down Expand Up @@ -271,7 +271,7 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
typerDeduper,
parsedCatalog,
defaultNamespace,
true)
JavaBaseConstants.DestinationColumns.V2_WITH_META)
.setDataTransformer(getDataTransformer(parsedCatalog, defaultNamespace))
.build()
.createAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public String uploadRecordsToStage(final JdbcDatabase database,

private String putManifest(final String manifestContents, final String stagingPath) {
final String manifestFilePath = stagingPath + String.format("%s.manifest", UUID.randomUUID());
s3StorageOperations.uploadManifest(s3Config.getBucketName(), manifestFilePath, manifestContents);
s3StorageOperations.uploadManifest(manifestFilePath, manifestContents);
return manifestFilePath;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class RedshiftRawTableAirbyteMetaMigration(
"Executing RawTableAirbyteMetaMigration for ${stream.id.originalNamespace}.${stream.id.originalName} for real"
)
destinationHandler.execute(
getRawTableMetaColumnAddDdl(stream.id.rawNamespace!!, stream.id.rawName!!)
getRawTableMetaColumnAddDdl(stream.id.rawNamespace, stream.id.rawName)
)

// Update the state. We didn't modify the table in a relevant way, so don't invalidate the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,17 +98,17 @@ protected SQLDialect getDialect() {
*/

@Override
protected Field<?> castedField(final Field<?> field, final AirbyteType type, final String alias, final boolean useExpensiveSaferCasting) {
protected Field<?> castedField(final Field<?> field, final AirbyteType type, final boolean useExpensiveSaferCasting) {
if (type instanceof final AirbyteProtocolType airbyteProtocolType) {
switch (airbyteProtocolType) {
case STRING -> {
return field(CASE_STATEMENT_SQL_TEMPLATE,
jsonTypeOf(field).ne("string").and(field.isNotNull()),
jsonSerialize(field),
castedField(field, airbyteProtocolType, useExpensiveSaferCasting)).as(quotedName(alias));
castedField(field, airbyteProtocolType, useExpensiveSaferCasting));
}
default -> {
return castedField(field, airbyteProtocolType, useExpensiveSaferCasting).as(quotedName(alias));
return castedField(field, airbyteProtocolType, useExpensiveSaferCasting);
}
}

Expand All @@ -117,12 +117,12 @@ protected Field<?> castedField(final Field<?> field, final AirbyteType type, fin
return switch (type.getTypeName()) {
case Struct.TYPE, UnsupportedOneOf.TYPE -> field(CASE_STATEMENT_NO_ELSE_SQL_TEMPLATE,
jsonTypeOf(field).eq("object"),
cast(field, getStructType())).as(quotedName(alias));
cast(field, getStructType()));
case Array.TYPE -> field(CASE_STATEMENT_NO_ELSE_SQL_TEMPLATE,
jsonTypeOf(field).eq("array"),
cast(field, getArrayType())).as(quotedName(alias));
cast(field, getArrayType()));
// No nested Unions supported so this will definitely not result in infinite recursion.
case Union.TYPE -> castedField(field, ((Union) type).chooseType(), alias, useExpensiveSaferCasting);
case Union.TYPE -> castedField(field, ((Union) type).chooseType(), useExpensiveSaferCasting);
default -> throw new IllegalArgumentException("Unsupported AirbyteType: " + type);
};
}
Expand All @@ -135,8 +135,7 @@ protected List<Field<?>> extractRawDataFields(final LinkedHashMap<ColumnId, Airb
.map(column -> castedField(
field(quotedName(COLUMN_NAME_DATA, column.getKey().getOriginalName())),
column.getValue(),
column.getKey().getName(),
useExpensiveSaferCasting))
useExpensiveSaferCasting).as(column.getKey().getName()))
.collect(Collectors.toList());
}

Expand Down Expand Up @@ -176,7 +175,7 @@ Field<?> toCastingErrorCaseStmt(final ColumnId column, final AirbyteType type) {
// TODO: Timestamp format issues can result in null values when cast, add regex check if destination
// supports regex functions.
return field(CASE_STATEMENT_SQL_TEMPLATE,
field.isNotNull().and(castedField(field, type, column.getName(), true).isNull()),
field.isNotNull().and(castedField(field, type, true).as(column.getName()).isNull()),
function("ARRAY", getSuperType(),
function("JSON_PARSE", getSuperType(), val(
"{\"field\": \"" + column.getName() + "\", "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.jooq.DSLContext;
import org.jooq.conf.Settings;
import org.jooq.impl.DSL;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

public abstract class AbstractRedshiftTypingDedupingTest extends JdbcTypingDedupingTest {
Expand Down Expand Up @@ -63,6 +64,9 @@ protected DSLContext getDslContext() {
}

@Test
@Disabled("Redshift connector 2.4.3 and below are rendered useless with "
+ "Redshift cluster version https://docs.aws.amazon.com/redshift/latest/mgmt/cluster-versions.html#cluster-version-181 "
+ "due to metadata calls hanging. We cannot run this test anymore")
public void testRawTableMetaMigration_append() throws Exception {
final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of(
new ConfiguredAirbyteStream()
Expand All @@ -86,6 +90,9 @@ public void testRawTableMetaMigration_append() throws Exception {
}

@Test
@Disabled("Redshift connector 2.4.3 and below are rendered useless with "
+ "Redshift cluster version https://docs.aws.amazon.com/redshift/latest/mgmt/cluster-versions.html#cluster-version-181 "
+ "due to metadata calls hanging. We cannot run this test anymore")
public void testRawTableMetaMigration_incrementalDedupe() throws Exception {
final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of(
new ConfiguredAirbyteStream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@

public class RedshiftSqlGeneratorIntegrationTest extends JdbcSqlGeneratorIntegrationTest<RedshiftState> {

@Override
protected boolean getSupportsSafeCast() {
return true;
}

/**
* Redshift's JDBC driver doesn't map certain data types onto {@link java.sql.JDBCType} usefully.
* This class adds special handling for those types.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[]}, "id1": 1, "id2": 200, "old_cursor": 1, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[]}, "id1": 1, "id2": 201, "old_cursor": 2, "name": "Bob", "address": {"city": "Boston", "state": "MA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[{"field":"age","change":"NULLED","reason":"DESTINATION_TYPECAST_ERROR"},{"field":"registration_date","change":"NULLED","reason":"DESTINATION_TYPECAST_ERROR"}]}, "id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie"}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[]}, "id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie", "age": 42, "registration_date": "2023-12-23"}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "old_cursor": 0, "_ab_cdc_deleted_at": null, "name" :"Alice", "address": {"city": "San Francisco", "state": "CA"}}, "_airbyte_meta": {"changes": []}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "old_cursor": 1, "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}, "_airbyte_meta": {"changes": []}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "old_cursor": 2, "name": "Bob", "address": {"city": "Boston", "state": "MA"}}, "_airbyte_meta": {"changes": []}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}, "_airbyte_meta": {"changes": []}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie", "age": 42, "registration_date": "2023-12-23"}, "_airbyte_meta": {"changes": []}}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Keep the Alice record with more recent updated_at
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00.000000Z", "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00.000000Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[{"field":"age","change":"NULLED","reason":"DESTINATION_TYPECAST_ERROR"},{"field":"registration_date","change":"NULLED","reason":"DESTINATION_TYPECAST_ERROR"},{"field":"address","change":"NULLED","reason":"SOURCE_RETRIEVAL_ERROR"}]}, "id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00.000000Z", "name": "Charlie"}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[{"field":"address","change":"NULLED","reason":"SOURCE_RETRIEVAL_ERROR"}]}, "id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00.000000Z", "name": "Charlie", "age": 42, "registration_date": "2023-12-23"}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[]}, "id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00.000000Z", "name": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"}
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00.000000Z", "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00.000000Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}
// Invalid columns are nulled out (i.e. SQL null, not JSON null)
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[{"field":"age","change":"NULLED","reason":"DESTINATION_TYPECAST_ERROR"},{"field":"registration_date","change":"NULLED","reason":"DESTINATION_TYPECAST_ERROR"},{"field":"address","change":"NULLED","reason":"SOURCE_RETRIEVAL_ERROR"}]}, "id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00.000000Z", "name": "Charlie"}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[]}, "id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00.000000Z", "name": "Charlie", "age": 42, "registration_date": "2023-12-23"}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[]}, "id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00.000000Z", "name": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"}
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}, "_airbyte_meta": {"changes": []}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}, "_airbyte_meta": {"changes": []}}
// Invalid data is still allowed in the raw table.
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}, "_airbyte_meta": {"changes": [{"field": "address", "change": "NULLED", "reason": "SOURCE_RETRIEVAL_ERROR"}]}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie", "age": 42, "registration_date": "2023-12-23"}, "_airbyte_meta": {"changes": [{"field": "address", "change": "NULLED", "reason": "SOURCE_RETRIEVAL_ERROR"}]}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00Z", "name": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"}, "_airbyte_meta": {"changes": []}}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@
// Emit a record with no _ab_cdc_deleted_at field. CDC sources typically emit an explicit null, but we should handle both cases.
{"type": "RECORD", "record": {"emitted_at": 1000, "data": {"id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}}}
// Emit a record with an invalid age & address nulled at source.
{"type": "RECORD", "record": {"emitted_at": 1000, "data": {"id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}, "meta": {"changes": [{"field": "address", "change": "NULLED", "reason": "SOURCE_RETRIEVAL_ERROR"}]}}}
{"type": "RECORD", "record": {"emitted_at": 1000, "data": {"id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00Z", "name": "Charlie", "age": 42, "registration_date": "2023-12-23"}, "meta": {"changes": [{"field": "address", "change": "NULLED", "reason": "SOURCE_RETRIEVAL_ERROR"}]}}}
// Emit a record with interesting characters in one of the values.
{"type": "RECORD", "record": {"emitted_at": 1000, "data": {"id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00Z", "name": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"}}}
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_meta":{"changes":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00.000000Z", "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}
// Charlie wasn't re-emitted with updated_at, so it still has a null cursor
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[{"field":"age","change":"NULLED","reason":"DESTINATION_TYPECAST_ERROR"},{"field":"registration_date","change":"NULLED","reason":"DESTINATION_TYPECAST_ERROR"}]}, "id1": 2, "id2": 200, "name": "Charlie"}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[]}, "id1": 2, "id2": 200, "name": "Charlie", "age": 42, "registration_date": "2023-12-23"}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "old_cursor": 0, "_ab_cdc_deleted_at": null, "name" :"Alice", "address": {"city": "San Francisco", "state": "CA"}}, "_airbyte_meta": {"changes": []}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "old_cursor": 1, "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}, "_airbyte_meta": {"changes": []}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "old_cursor": 2, "name": "Bob", "address": {"city": "Boston", "state": "MA"}}, "_airbyte_meta": {"changes": []}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie", "age": "this is not an integer", "registration_date": "this is not a date"}, "_airbyte_meta": {"changes": []}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_data": {"id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie", "age": 42, "registration_date": "2023-12-23"}, "_airbyte_meta": {"changes": []}}
{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_data": {"id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}, "_airbyte_meta": {"changes": []}}
{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00Z", "_ab_cdc_deleted_at": null, "name": "Bob", "address": {"city": "New York", "state": "NY"}}, "_airbyte_meta": {"changes": []}}
{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_data": {"id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00Z", "_ab_cdc_deleted_at": "1970-01-01T00:00:00Z"}, "_airbyte_meta": {"changes": []}}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-01T00:00:00.000000Z", "name": "Alice", "address": {"city": "San Francisco", "state": "CA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00.000000Z", "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00.000000Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[{"field":"age","change":"NULLED","reason":"DESTINATION_TYPECAST_ERROR"},{"field":"registration_date","change":"NULLED","reason":"DESTINATION_TYPECAST_ERROR"},{"field":"address","change":"NULLED","reason":"SOURCE_RETRIEVAL_ERROR"}]}, "id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00.000000Z", "name": "Charlie"}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[{"field":"address","change":"NULLED","reason":"SOURCE_RETRIEVAL_ERROR"}]}, "id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00.000000Z", "name": "Charlie", "age": 42, "registration_date": "2023-12-23"}
{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000Z", "_airbyte_meta": {"changes":[]}, "id1": 3, "id2": 200, "updated_at": "2000-01-01T00:04:00.000000Z", "name": "a\bb\fc\nd\re\tf`~!@#$%^&*()_+-=[]\\{}|'\",./<>?"}

{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000Z", "_airbyte_meta":{"changes":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00.000000Z", "name": "Alice", "address": {"city": "Seattle", "state": "WA"}}
Expand Down
Loading

0 comments on commit d31f0dd

Please sign in to comment.