Skip to content

Commit

Permalink
Destination Redshift: Json transformer for graceful handling of large…
Browse files Browse the repository at this point in the history
… records (#36203)

This includes CDK changes to wire the `Transformer` interface into `AsyncStreamConsumer`
  • Loading branch information
gisripa committed Mar 18, 2024
1 parent a852963 commit 1a410aa
Show file tree
Hide file tree
Showing 26 changed files with 721 additions and 115 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Expand Up @@ -145,6 +145,7 @@ Maven and Gradle will automatically reference the correct (pinned) version of th

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.25.0 | 2024-03-18 | [\#36203](https://github.com/airbytehq/airbyte/pull/36203) | Wiring of Transformer to StagingConsumerFactory and JdbcBufferedConsumerFactory; import changes for Kotlin conversion; State message logs to debug |
| 0.24.1 | 2024-03-13 | [\#36022](https://github.com/airbytehq/airbyte/pull/36022) | Move log4j2-test.xml to test fixtures, away from runtime classpath. |
| 0.24.0 | 2024-03-13 | [\#35944](https://github.com/airbytehq/airbyte/pull/35944) | Add `_airbyte_meta` in raw table and test fixture updates |
| 0.23.20 | 2024-03-12 | [\#36011](https://github.com/airbytehq/airbyte/pull/36011) | Debezium configuration for conversion of null value on a column with default value. |
Expand Down
@@ -1 +1 @@
version=0.24.1
version=0.25.0
Expand Up @@ -21,6 +21,8 @@
import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer;
import io.airbyte.cdk.integrations.base.TypingAndDedupingFlag;
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer;
import io.airbyte.cdk.integrations.destination.async.deser.IdentityDataTransformer;
import io.airbyte.cdk.integrations.destination.async.deser.StreamAwareDataTransformer;
import io.airbyte.cdk.integrations.destination.async.partial_messages.PartialAirbyteMessage;
import io.airbyte.cdk.integrations.destination.async.partial_messages.PartialAirbyteRecordMessage;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
Expand Down Expand Up @@ -283,6 +285,11 @@ protected String getDatabaseName(final JsonNode config) {
return config.get(JdbcUtils.DATABASE_KEY).asText();
}

/**
 * Supplies the {@link StreamAwareDataTransformer} that is handed to
 * {@code JdbcBufferedConsumerFactory.createAsync} when the V2 message consumer is built.
 *
 * <p>This default implementation ignores both arguments and returns a new
 * {@link IdentityDataTransformer}. Destination subclasses can override it to supply their own
 * transformer.
 *
 * @param parsedCatalog the parsed catalog for the sync; unused by this default implementation
 * @param defaultNamespace the default namespace for the sync; unused by this default implementation
 * @return a new {@link IdentityDataTransformer}
 */
protected StreamAwareDataTransformer getDataTransformer(final ParsedCatalog parsedCatalog,
final String defaultNamespace) {
return new IdentityDataTransformer();
}

@Override
public AirbyteMessageConsumer getConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
Expand All @@ -296,36 +303,29 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
final Consumer<AirbyteMessage> outputRecordCollector)
throws Exception {
final JdbcDatabase database = getDatabase(getDataSource(config));
final String defaultNamespace;
final TyperDeduper typerDeduper;
if (TypingAndDedupingFlag.isDestinationV2()) {
defaultNamespace = config.get(getConfigSchemaKey()).asText();
addDefaultNamespaceToStreams(catalog, defaultNamespace);
typerDeduper = getV2TyperDeduper(config, catalog, database);
} else {
defaultNamespace = null;
typerDeduper = new NoopTyperDeduper();
// Short circuit for non-v2 destinations.
if (!TypingAndDedupingFlag.isDestinationV2()) {
return JdbcBufferedConsumerFactory.createAsync(
outputRecordCollector,
database,
sqlOperations,
namingResolver,
config,
catalog,
null,
new NoopTyperDeduper());
}
return JdbcBufferedConsumerFactory.createAsync(
outputRecordCollector,
database,
sqlOperations,
namingResolver,
config,
catalog,
defaultNamespace,
typerDeduper);

final String defaultNamespace = config.get(getConfigSchemaKey()).asText();
addDefaultNamespaceToStreams(catalog, defaultNamespace);
return getV2MessageConsumer(config, catalog, outputRecordCollector, database, defaultNamespace);
}

/**
* Creates the appropriate TyperDeduper class for the jdbc destination and the user's configuration
*
* @param config the configuration for the connection
* @param catalog the catalog for the connection
* @param database a database instance
* @return the appropriate TyperDeduper instance for this connection.
*/
private TyperDeduper getV2TyperDeduper(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final JdbcDatabase database) {
private SerializedAirbyteMessageConsumer getV2MessageConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector,
final JdbcDatabase database,
final String defaultNamespace) {
final JdbcSqlGenerator sqlGenerator = getSqlGenerator();
Optional<String> rawNamespaceOverride = TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE);
final ParsedCatalog parsedCatalog = rawNamespaceOverride
Expand All @@ -346,7 +346,17 @@ private TyperDeduper getV2TyperDeduper(final JsonNode config, final ConfiguredAi
typerDeduper =
new DefaultTyperDeduper<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator, migrations);
}
return typerDeduper;

return JdbcBufferedConsumerFactory.createAsync(
outputRecordCollector,
database,
sqlOperations,
namingResolver,
config,
catalog,
defaultNamespace,
typerDeduper,
getDataTransformer(parsedCatalog, defaultNamespace));
}

}
Expand Up @@ -17,7 +17,11 @@
import io.airbyte.cdk.integrations.destination.StreamSyncSummary;
import io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer;
import io.airbyte.cdk.integrations.destination.async.buffers.BufferManager;
import io.airbyte.cdk.integrations.destination.async.deser.DeserializationUtil;
import io.airbyte.cdk.integrations.destination.async.deser.IdentityDataTransformer;
import io.airbyte.cdk.integrations.destination.async.deser.StreamAwareDataTransformer;
import io.airbyte.cdk.integrations.destination.async.partial_messages.PartialAirbyteMessage;
import io.airbyte.cdk.integrations.destination.async.state.FlushFailure;
import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer;
import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnCloseFunction;
import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnStartFunction;
Expand Down Expand Up @@ -70,7 +74,8 @@ public static SerializedAirbyteMessageConsumer createAsync(final Consumer<Airbyt
final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final String defaultNamespace,
final TyperDeduper typerDeduper) {
final TyperDeduper typerDeduper,
final StreamAwareDataTransformer dataTransformer) {
final List<WriteConfig> writeConfigs = createWriteConfigs(namingResolver, config, catalog, sqlOperations.isSchemaRequired());
return new AsyncStreamConsumer(
outputRecordCollector,
Expand All @@ -79,8 +84,23 @@ public static SerializedAirbyteMessageConsumer createAsync(final Consumer<Airbyt
new JdbcInsertFlushFunction(recordWriterFunction(database, sqlOperations, writeConfigs, catalog)),
catalog,
new BufferManager((long) (Runtime.getRuntime().maxMemory() * 0.2)),
new FlushFailure(),
Optional.ofNullable(defaultNamespace),
Executors.newFixedThreadPool(2));
Executors.newFixedThreadPool(2),
dataTransformer,
new DeserializationUtil());
}

/**
 * Convenience overload of {@code createAsync} for callers that do not supply a data transformer.
 *
 * <p>Delegates to the overload that takes a {@link StreamAwareDataTransformer}, forwarding every
 * argument unchanged and passing a new {@link IdentityDataTransformer} as the transformer.
 *
 * @param outputRecordCollector forwarded unchanged to the delegate overload
 * @param database forwarded unchanged to the delegate overload
 * @param sqlOperations forwarded unchanged to the delegate overload
 * @param namingResolver forwarded unchanged to the delegate overload
 * @param config forwarded unchanged to the delegate overload
 * @param catalog forwarded unchanged to the delegate overload
 * @param defaultNamespace forwarded unchanged to the delegate overload
 * @param typerDeduper forwarded unchanged to the delegate overload
 * @return the consumer returned by the delegate overload
 */
public static SerializedAirbyteMessageConsumer createAsync(final Consumer<AirbyteMessage> outputRecordCollector,
final JdbcDatabase database,
final SqlOperations sqlOperations,
final NamingConventionTransformer namingResolver,
final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final String defaultNamespace,
final TyperDeduper typerDeduper) {
// Callers that have no transformer get the identity transformer.
return createAsync(outputRecordCollector, database, sqlOperations, namingResolver, config, catalog, defaultNamespace, typerDeduper,
new IdentityDataTransformer());
}

private static List<WriteConfig> createWriteConfigs(final NamingConventionTransformer namingResolver,
Expand Down
Expand Up @@ -15,6 +15,8 @@
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer;
import io.airbyte.cdk.integrations.destination.async.AsyncStreamConsumer;
import io.airbyte.cdk.integrations.destination.async.buffers.BufferManager;
import io.airbyte.cdk.integrations.destination.async.deser.IdentityDataTransformer;
import io.airbyte.cdk.integrations.destination.async.deser.StreamAwareDataTransformer;
import io.airbyte.cdk.integrations.destination.jdbc.WriteConfig;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog;
Expand Down Expand Up @@ -66,6 +68,8 @@ public class StagingConsumerFactory extends SerialStagingConsumerFactory {
private final Optional<Long> bufferMemoryLimit;
private final long optimalBatchSizeBytes;

private final StreamAwareDataTransformer dataTransformer;

private StagingConsumerFactory(
final Consumer<AirbyteMessage> outputRecordCollector,
final JdbcDatabase database,
Expand All @@ -80,7 +84,8 @@ private StagingConsumerFactory(
final String defaultNamespace,
final boolean useDestinationsV2Columns,
final Optional<Long> bufferMemoryLimit,
final long optimalBatchSizeBytes) {
final long optimalBatchSizeBytes,
final StreamAwareDataTransformer dataTransformer) {
this.outputRecordCollector = outputRecordCollector;
this.database = database;
this.stagingOperations = stagingOperations;
Expand All @@ -95,6 +100,7 @@ private StagingConsumerFactory(
this.useDestinationsV2Columns = useDestinationsV2Columns;
this.bufferMemoryLimit = bufferMemoryLimit;
this.optimalBatchSizeBytes = optimalBatchSizeBytes;
this.dataTransformer = dataTransformer;
}

public static class Builder {
Expand All @@ -119,6 +125,8 @@ public static class Builder {
private Optional<Long> bufferMemoryLimit = Optional.empty();
private long optimalBatchSizeBytes = 50 * 1024 * 1024;

private StreamAwareDataTransformer dataTransformer;

private Builder() {}

public Builder setBufferMemoryLimit(final Optional<Long> bufferMemoryLimit) {
Expand All @@ -131,6 +139,11 @@ public Builder setOptimalBatchSizeBytes(final long optimalBatchSizeBytes) {
return this;
}

/**
 * Sets the data transformer that the built {@code StagingConsumerFactory} will carry.
 *
 * <p>Optional: when this is never called, or is called with {@code null}, {@code build()}
 * substitutes a new {@link IdentityDataTransformer}.
 *
 * @param dataTransformer the transformer to use
 * @return this builder, to allow call chaining
 */
public Builder setDataTransformer(final StreamAwareDataTransformer dataTransformer) {
this.dataTransformer = dataTransformer;
return this;
}

public StagingConsumerFactory build() {
return new StagingConsumerFactory(
outputRecordCollector,
Expand All @@ -146,7 +159,8 @@ public StagingConsumerFactory build() {
defaultNamespace,
useDestinationsV2Columns,
bufferMemoryLimit,
optimalBatchSizeBytes);
optimalBatchSizeBytes,
dataTransformer != null ? dataTransformer : new IdentityDataTransformer());
}

}
Expand Down Expand Up @@ -211,7 +225,8 @@ public SerializedAirbyteMessageConsumer createAsync() {
flusher,
catalog,
new BufferManager(getMemoryLimit(bufferMemoryLimit)),
Optional.ofNullable(defaultNamespace));
Optional.ofNullable(defaultNamespace),
dataTransformer);
}

private static long getMemoryLimit(final Optional<Long> bufferMemoryLimit) {
Expand Down
Expand Up @@ -4,7 +4,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.24.0'
cdkVersionRequired = '0.25.0'
features = ['db-destinations', 's3-destinations', 'typing-deduping']
useLocalCdk = false
}
Expand Down
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
dockerImageTag: 2.2.0
dockerImageTag: 2.3.0
dockerRepository: airbyte/destination-redshift
documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift
githubIssueLabel: destination-redshift
Expand Down
Expand Up @@ -15,18 +15,21 @@
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.integrations.base.Destination;
import io.airbyte.cdk.integrations.base.ssh.SshWrappedDestination;
import io.airbyte.cdk.integrations.destination.async.deser.StreamAwareDataTransformer;
import io.airbyte.cdk.integrations.destination.jdbc.AbstractJdbcDestination;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler;
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog;
import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator;
import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration;
import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations;
import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftDestinationHandler;
import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftRawTableAirbyteMetaMigration;
import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftSqlGenerator;
import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftState;
import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftSuperLimitationTransformer;
import io.airbyte.integrations.destination.redshift.util.RedshiftUtil;
import java.time.Duration;
import java.util.HashMap;
Expand Down Expand Up @@ -135,4 +138,9 @@ protected List<Migration<RedshiftState>> getMigrations(JdbcDatabase database,
return List.of(new RedshiftRawTableAirbyteMetaMigration(database, databaseName));
}

/**
 * Supplies the Redshift-specific data transformer in place of the inherited default.
 *
 * <p>NOTE(review): judging by its name, {@link RedshiftSuperLimitationTransformer} presumably
 * deals with records that exceed Redshift SUPER limits; confirm against that class.
 *
 * @param parsedCatalog passed through to the transformer's constructor
 * @param defaultNamespace passed through to the transformer's constructor
 * @return a new {@link RedshiftSuperLimitationTransformer}
 */
@Override
protected StreamAwareDataTransformer getDataTransformer(ParsedCatalog parsedCatalog, String defaultNamespace) {
return new RedshiftSuperLimitationTransformer(parsedCatalog, defaultNamespace);
}

}
Expand Up @@ -25,6 +25,7 @@
import io.airbyte.cdk.integrations.base.TypingAndDedupingFlag;
import io.airbyte.cdk.integrations.base.ssh.SshWrappedDestination;
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer;
import io.airbyte.cdk.integrations.destination.async.deser.StreamAwareDataTransformer;
import io.airbyte.cdk.integrations.destination.jdbc.AbstractJdbcDestination;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
Expand Down Expand Up @@ -56,6 +57,7 @@
import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftRawTableAirbyteMetaMigration;
import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftSqlGenerator;
import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftState;
import io.airbyte.integrations.destination.redshift.typing_deduping.RedshiftSuperLimitationTransformer;
import io.airbyte.integrations.destination.redshift.util.RedshiftUtil;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status;
Expand Down Expand Up @@ -197,6 +199,13 @@ protected List<Migration<RedshiftState>> getMigrations(JdbcDatabase database,
return List.of(new RedshiftRawTableAirbyteMetaMigration(database, databaseName));
}

/**
 * Supplies the Redshift-specific data transformer in place of the inherited default.
 *
 * <p>The result is passed to {@code StagingConsumerFactory.Builder#setDataTransformer} when this
 * destination builds its serialized message consumer.
 *
 * @param parsedCatalog passed through to the transformer's constructor
 * @param defaultNamespace passed through to the transformer's constructor
 * @return a new {@link RedshiftSuperLimitationTransformer}
 */
@Override
protected StreamAwareDataTransformer getDataTransformer(ParsedCatalog parsedCatalog, String defaultNamespace) {
// Redundant override, kept so this class stays consistent with InsertDestination.
// TODO: Unify these 2 classes with composition.
return new RedshiftSuperLimitationTransformer(parsedCatalog, defaultNamespace);
}

@Override
@Deprecated
public AirbyteMessageConsumer getConsumer(final JsonNode config,
Expand Down Expand Up @@ -257,6 +266,7 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
typerDeduper =
new DefaultTyperDeduper<>(sqlGenerator, redshiftDestinationHandler, parsedCatalog, migrator, v2TableMigrator, redshiftMigrations);
}

return StagingConsumerFactory.builder(
outputRecordCollector,
database,
Expand All @@ -269,7 +279,10 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
typerDeduper,
parsedCatalog,
defaultNamespace,
true).build().createAsync();
true)
.setDataTransformer(getDataTransformer(parsedCatalog, defaultNamespace))
.build()
.createAsync();
}

/**
Expand Down
Expand Up @@ -20,9 +20,9 @@
import com.google.common.collect.Iterables;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.integrations.base.JavaBaseConstants;
import io.airbyte.cdk.integrations.destination.async.partial_messages.PartialAirbyteMessage;
import io.airbyte.cdk.integrations.destination.jdbc.JdbcSqlOperations;
import io.airbyte.cdk.integrations.destination.jdbc.SqlOperationsUtils;
import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage;
import io.airbyte.commons.json.Jsons;
import java.sql.SQLException;
import java.time.Instant;
Expand Down

0 comments on commit 1a410aa

Please sign in to comment.