🎉 Introduce Redshift Destination COPY Strategy (#2606)
Plumb everything together so a user can, via configuration, pick either the less efficient but easier-to-set-up INSERT strategy, or the more efficient COPY strategy, which takes slightly more setup.

The effectiveness of COPY becomes obvious as the number of rows grows, giving a rough 3-4x improvement. Logs show we are now effectively read-constrained.

Modify the JSON spec to take additional S3 configuration. The RedshiftDestination is split into the InsertDestination and the CopyDestination; the RedshiftDestination picks the CopyDestination if this S3 configuration exists, falling back to the InsertDestination otherwise.

The original RedshiftDestination is now the RedshiftInsertDestination class; all of its previous functionality is unchanged.
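
A minimal, self-contained sketch of the strategy selection described above (not the connector's actual source); the S3 field names it checks are assumptions for illustration:

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class RedshiftStrategySelectorSketch {

      enum Strategy { INSERT, COPY }

      // COPY is only usable when the optional S3 staging configuration is fully present;
      // otherwise fall back to the original row-by-row INSERT behavior.
      static Strategy pickStrategy(JsonNode config) {
        boolean hasS3Config = config.hasNonNull("s3_bucket_name")
            && config.hasNonNull("s3_bucket_region")
            && config.hasNonNull("access_key_id")
            && config.hasNonNull("secret_access_key");
        return hasS3Config ? Strategy.COPY : Strategy.INSERT;
      }

      public static void main(String[] args) throws Exception {
        JsonNode partialConfig = new ObjectMapper().readTree("{\"s3_bucket_name\":\"my-staging-bucket\"}");
        // Prints INSERT: the remaining S3 fields are missing, so the copy path is not taken.
        System.out.println(pickStrategy(partialConfig));
      }
    }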
davinchia committed Mar 29, 2021
1 parent 4a30abb commit 947e24b
Showing 16 changed files with 575 additions and 262 deletions.
3 changes: 3 additions & 0 deletions airbyte-commons/src/main/resources/log4j2.xml
@@ -67,6 +67,9 @@
<Logger name="io.netty" level="INFO" />
<Logger name="io.grpc" level="INFO" />
<Logger name="io.temporal" level="INFO" />
<Logger name="org.apache" level="WARN" />
<Logger name="httpclient" level="WARN" />
<Logger name="com.amazonaws" level="WARN" />

</Loggers>

@@ -2,6 +2,6 @@
"destinationDefinitionId": "f7a7d195-377f-cf5b-70a5-be6b819019dc",
"name": "Redshift",
"dockerRepository": "airbyte/destination-redshift",
"dockerImageTag": "0.2.3",
"dockerImageTag": "0.2.4",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/redshift"
}
@@ -26,7 +26,7 @@
- destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
name: Redshift
dockerRepository: airbyte/destination-redshift
dockerImageTag: 0.2.3
dockerImageTag: 0.2.4
documentationUrl: https://docs.airbyte.io/integrations/destinations/redshift
- destinationDefinitionId: af7c921e-5892-4ff2-b6c1-4a5ab258fb7e
name: MeiliSearch
@@ -68,18 +68,10 @@ public ConnectorSpecification spec() throws IOException {

@Override
public AirbyteConnectionStatus check(JsonNode config) {
try (final JdbcDatabase database = getDatabase(config)) {
// attempt to get metadata from the database as a cheap way of seeing if we can connect.
database.bufferedResultSetQuery(conn -> conn.getMetaData().getCatalogs(), JdbcUtils::rowToJson);

// verify we have write permissions on the target schema by creating a table with a random name,
// then dropping that table
try (final JdbcDatabase database = getDatabase(config)) {
String outputSchema = namingResolver.getIdentifier(config.get("schema").asText());
String outputTableName = "_airbyte_connection_test_" + UUID.randomUUID().toString().replaceAll("-", "");
sqlOperations.createSchemaIfNotExists(database, outputSchema);
sqlOperations.createTableIfNotExists(database, outputSchema, outputTableName);
sqlOperations.dropTableIfExists(database, outputSchema, outputTableName);

attemptSQLCreateAndDropTableOperations(outputSchema, database, namingResolver, sqlOperations);
return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
} catch (Exception e) {
LOGGER.debug("Exception while checking connection: ", e);
@@ -89,6 +81,22 @@ public AirbyteConnectionStatus check(JsonNode config) {
}
}

public static void attemptSQLCreateAndDropTableOperations(String outputSchema,
JdbcDatabase database,
NamingConventionTransformer namingResolver,
SqlOperations sqlOps)
throws Exception {
// attempt to get metadata from the database as a cheap way of seeing if we can connect.
database.bufferedResultSetQuery(conn -> conn.getMetaData().getCatalogs(), JdbcUtils::rowToJson);

// verify we have write permissions on the target schema by creating a table with a random name,
// then dropping that table
String outputTableName = "_airbyte_connection_test_" + UUID.randomUUID().toString().replaceAll("-", "");
sqlOps.createSchemaIfNotExists(database, outputSchema);
sqlOps.createTableIfNotExists(database, outputSchema, outputTableName);
sqlOps.dropTableIfExists(database, outputSchema, outputTableName);
}

protected JdbcDatabase getDatabase(JsonNode config) {
final JsonNode jdbcConfig = toJdbcConfig(config);

@@ -8,6 +8,6 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.2.3
LABEL io.airbyte.version=0.2.4

LABEL io.airbyte.name=airbyte/destination-redshift
@@ -28,12 +28,11 @@ dependencies {

testImplementation project(':airbyte-test-utils')

testImplementation 'io.findify:s3mock_2.13:0.2.6'
testImplementation 'org.apache.commons:commons-text:1.9'
testImplementation 'org.apache.commons:commons-lang3:3.11'
testImplementation 'org.apache.commons:commons-dbcp2:2.7.0'

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-redshift')
integrationTestJavaImplementation files(project(':airbyte-integrations:bases:base-normalization').airbyteDocker.outputs)
}
@@ -27,7 +27,6 @@
import alex.mojaki.s3upload.MultiPartOutputStream;
import alex.mojaki.s3upload.StreamTransferManager;
import com.amazonaws.services.s3.AmazonS3;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.destination.NamingConventionTransformer;
@@ -46,6 +45,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class is meant to represent all the required operations to replicate an
* {@link io.airbyte.protocol.models.AirbyteStream} into Redshift using the Copy strategy. The data
* is streamed into a staging S3 bucket in multiple parts. This file is then loaded into a Redshift
* temporary table via a Copy statement, before being moved into the final destination table. The
* staging files and temporary tables are best-effort cleaned up. A single S3 file is currently
* sufficiently performant.
*/
public class RedshiftCopier {

private static final Logger LOGGER = LoggerFactory.getLogger(RedshiftCopier.class);
@@ -60,7 +67,7 @@ public class RedshiftCopier {
private static final int PART_SIZE_MB = 10;

private final String s3BucketName;
private final String runFolder;
private final String stagingFolder;
private final SyncMode syncMode;
private final String schemaName;
private final String streamName;
@@ -75,25 +82,9 @@ public class RedshiftCopier {
private final CSVPrinter csvPrinter;
private final String tmpTableName;

public RedshiftCopier(
String runFolder,
SyncMode syncMode,
String schema,
String streamName,
AmazonS3 client,
JdbcDatabase redshiftDb,
String s3KeyId,
String s3key,
String s3Region)
throws IOException {
this(RedshiftCopyDestination.DEFAULT_AIRBYTE_STAGING_S3_BUCKET, runFolder, syncMode, schema, streamName, client, redshiftDb, s3KeyId, s3key,
s3Region);
}

@VisibleForTesting
public RedshiftCopier(
String s3BucketName,
String runFolder,
String stagingFolder,
SyncMode syncMode,
String schema,
String streamName,
@@ -104,7 +95,7 @@ public RedshiftCopier(
String s3Region)
throws IOException {
this.s3BucketName = s3BucketName;
this.runFolder = runFolder;
this.stagingFolder = stagingFolder;
this.syncMode = syncMode;
this.schemaName = schema;
this.streamName = streamName;
@@ -123,7 +114,7 @@ public RedshiftCopier(
// configured part size.
// Memory consumption is queue capacity * part size = 10 * 10 = 100 MB at current configurations.
this.multipartUploadManager =
new StreamTransferManager(s3BucketName, getPath(runFolder, streamName), client)
new StreamTransferManager(s3BucketName, getPath(stagingFolder, streamName), client)
.numUploadThreads(DEFAULT_UPLOAD_THREADS)
.queueCapacity(DEFAULT_QUEUE_CAPACITY)
.partSize(PART_SIZE_MB);
@@ -158,7 +149,7 @@ public void uploadToS3(AirbyteRecordMessage message) throws IOException {
}

public void removeS3FileAndDropTmpTable() throws Exception {
var s3StagingFile = getPath(runFolder, streamName);
var s3StagingFile = getPath(stagingFolder, streamName);
LOGGER.info("Begin cleaning s3 staging file {}.", s3StagingFile);
if (s3Client.doesObjectExist(s3BucketName, s3StagingFile)) {
s3Client.deleteObject(s3BucketName, s3StagingFile);
@@ -200,7 +191,8 @@ private void createTmpTableAndCopyS3FileInto() throws SQLException {
LOGGER.info("Preparing tmp table in destination for stream {}. tmp table name: {}.", streamName, tmpTableName);
REDSHIFT_SQL_OPS.createTableIfNotExists(redshiftDb, schemaName, tmpTableName);
LOGGER.info("Starting copy to tmp table {} in destination for stream {} .", tmpTableName, streamName);
REDSHIFT_SQL_OPS.copyS3CsvFileIntoTable(redshiftDb, getFullS3Path(s3BucketName, runFolder, streamName), schemaName, tmpTableName, s3KeyId, s3Key,
REDSHIFT_SQL_OPS.copyS3CsvFileIntoTable(redshiftDb, getFullS3Path(s3BucketName, stagingFolder, streamName), schemaName, tmpTableName, s3KeyId,
s3Key,
s3Region);
LOGGER.info("Copy to tmp table {} in destination for stream {} complete.", tmpTableName, streamName);
}
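For readers skimming the RedshiftCopier diff above, a hedged sketch of the COPY step it performs: stage a CSV in S3, then load it into a temporary table with a Redshift COPY statement. The exact SQL clauses here are assumptions for illustration, not the connector's verbatim output of copyS3CsvFileIntoTable.

    import java.sql.Connection;
    import java.sql.Statement;

    public class RedshiftCopySketch {

      // Issues a Redshift COPY that loads a staged S3 CSV into a temporary table in one
      // parallel bulk operation, which is what makes this path so much faster than INSERTs.
      static void copyS3CsvIntoTmpTable(Connection conn,
                                        String fullS3Path,    // e.g. s3://<bucket>/<staging-folder>/<stream>
                                        String schemaName,
                                        String tmpTableName,
                                        String s3KeyId,
                                        String s3Key,
                                        String s3Region) throws Exception {
        String copySql = String.format(
            "COPY %s.%s FROM '%s' "
                + "CREDENTIALS 'aws_access_key_id=%s;aws_secret_access_key=%s' "
                + "CSV REGION '%s'",
            schemaName, tmpTableName, fullS3Path, s3KeyId, s3Key, s3Region);
        try (Statement stmt = conn.createStatement()) {
          stmt.execute(copySql);
        }
      }
    }

After the COPY, the connector moves rows from the temporary table into the final destination table and best-effort deletes the staging file and drops the temporary table, as the class Javadoc above describes.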
