format files (#8756)
marcosmarxm committed Dec 13, 2021
1 parent 346b411 commit e45dc12
Showing 30 changed files with 1,341 additions and 859 deletions.
@@ -30,8 +30,8 @@ public static void main(final String[] args) throws Exception {

@Override
public AirbyteMessageConsumer getConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector) {
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector) {
final DatabricksDestinationConfig databricksConfig = DatabricksDestinationConfig.get(config);
return CopyConsumerFactory.create(
outputRecordCollector,
@@ -66,15 +66,15 @@ public abstract class S3StreamCopier implements StreamCopier {
private final StagingFilenameGenerator filenameGenerator;

public S3StreamCopier(final String stagingFolder,
final DestinationSyncMode destSyncMode,
final String schema,
final String streamName,
final String s3FileName,
final AmazonS3 client,
final JdbcDatabase db,
final S3DestinationConfig s3Config,
final ExtendedNameTransformer nameTransformer,
final SqlOperations sqlOperations) {
final DestinationSyncMode destSyncMode,
final String schema,
final String streamName,
final String s3FileName,
final AmazonS3 client,
final JdbcDatabase db,
final S3DestinationConfig s3Config,
final ExtendedNameTransformer nameTransformer,
final SqlOperations sqlOperations) {
this.destSyncMode = destSyncMode;
this.schemaName = schema;
this.streamName = streamName;
@@ -224,10 +224,10 @@ private void closeAndWaitForUpload() throws IOException {
}

public abstract void copyS3CsvFileIntoTable(JdbcDatabase database,
String s3FileLocation,
String schema,
String tableName,
S3DestinationConfig s3Config)
String s3FileLocation,
String schema,
String tableName,
S3DestinationConfig s3Config)
throws SQLException;

}
@@ -22,12 +22,12 @@ public abstract class S3StreamCopierFactory implements StreamCopierFactory<S3Des
*/
@Override
public StreamCopier create(final String configuredSchema,
final S3DestinationConfig s3Config,
final String stagingFolder,
final ConfiguredAirbyteStream configuredStream,
final ExtendedNameTransformer nameTransformer,
final JdbcDatabase db,
final SqlOperations sqlOperations) {
final S3DestinationConfig s3Config,
final String stagingFolder,
final ConfiguredAirbyteStream configuredStream,
final ExtendedNameTransformer nameTransformer,
final JdbcDatabase db,
final SqlOperations sqlOperations) {
try {
final AirbyteStream stream = configuredStream.getStream();
final DestinationSyncMode syncMode = configuredStream.getDestinationSyncMode();
@@ -44,14 +44,14 @@ public StreamCopier create(final String configuredSchema,
* For specific copier suppliers to implement.
*/
public abstract StreamCopier create(String stagingFolder,
DestinationSyncMode syncMode,
String schema,
String streamName,
AmazonS3 s3Client,
JdbcDatabase db,
S3DestinationConfig s3Config,
ExtendedNameTransformer nameTransformer,
SqlOperations sqlOperations)
DestinationSyncMode syncMode,
String schema,
String streamName,
AmazonS3 s3Client,
JdbcDatabase db,
S3DestinationConfig s3Config,
ExtendedNameTransformer nameTransformer,
SqlOperations sqlOperations)
throws Exception;

}
@@ -20,19 +20,22 @@
import java.util.function.Consumer;

/**
* A more efficient Redshift Destination than the sql-based {@link RedshiftDestination}. Instead of inserting data as batched SQL INSERTs, we follow
* Redshift best practices and, 1) Stream the data to S3, creating multiple compressed files per stream. 2) Create a manifest file to load the data
* files in parallel. See: https://docs.aws.amazon.com/redshift/latest/dg/c_best-practices-use-copy.html for more info.
* A more efficient Redshift Destination than the sql-based {@link RedshiftDestination}. Instead of
* inserting data as batched SQL INSERTs, we follow Redshift best practices and, 1) Stream the data
* to S3, creating multiple compressed files per stream. 2) Create a manifest file to load the data
* files in parallel. See:
* https://docs.aws.amazon.com/redshift/latest/dg/c_best-practices-use-copy.html for more info.
* <p>
* Creating multiple files per stream currently has the naive approach of one file per batch on a stream up to the max limit of (26 * 26 * 26) 17576
* files. Each batch is randomly prefixed by 3 Alpha characters and on a collision the batch is appended to the existing file.
* Creating multiple files per stream currently has the naive approach of one file per batch on a
* stream up to the max limit of (26 * 26 * 26) 17576 files. Each batch is randomly prefixed by 3
* Alpha characters and on a collision the batch is appended to the existing file.
*/
public class RedshiftCopyS3Destination extends CopyDestination {

@Override
public AirbyteMessageConsumer getConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector)
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector)
throws Exception {
return CopyConsumerFactory.create(
outputRecordCollector,
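Editor's note: the Javadoc above describes the copy strategy in prose -- stage compressed files on S3, then load them in parallel through a single manifest file. As a rough, hypothetical sketch of that idea (not code from this commit; the class, helper names, bucket layout, and the credentials-free COPY statement are assumptions), a manifest could be assembled and used like this:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.List;

// Hypothetical sketch, not the RedshiftStreamCopier implementation.
public final class RedshiftManifestSketch {

  // Build a Redshift COPY manifest that lists every staged S3 object.
  static String buildManifest(final List<String> stagedS3Urls) {
    final ObjectMapper mapper = new ObjectMapper();
    final ObjectNode manifest = mapper.createObjectNode();
    final ArrayNode entries = manifest.putArray("entries");
    for (final String url : stagedS3Urls) {
      // e.g. "s3://my-bucket/staging/abc_users.csv.gz" (made-up key)
      entries.addObject().put("url", url).put("mandatory", true);
    }
    return manifest.toString();
  }

  // COPY that reads the manifest so Redshift loads the listed files in parallel.
  // The credentials/IAM clause is omitted here; a real statement needs one.
  static String copyFromManifestSql(final String schema, final String table, final String manifestS3Url) {
    return String.format("COPY %s.%s FROM '%s' CSV GZIP MANIFEST", schema, table, manifestS3Url);
  }

}

Because the load goes through the manifest, the RedshiftStreamCopier below deliberately refuses to copy individual files outside of one.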
@@ -31,14 +31,14 @@ public class RedshiftStreamCopier extends S3StreamCopier {
private String manifestFilePath = null;

public RedshiftStreamCopier(final String stagingFolder,
final DestinationSyncMode destSyncMode,
final String schema,
final String streamName,
final AmazonS3 client,
final JdbcDatabase db,
final S3DestinationConfig s3Config,
final ExtendedNameTransformer nameTransformer,
final SqlOperations sqlOperations) {
final DestinationSyncMode destSyncMode,
final String schema,
final String streamName,
final AmazonS3 client,
final JdbcDatabase db,
final S3DestinationConfig s3Config,
final ExtendedNameTransformer nameTransformer,
final SqlOperations sqlOperations) {
super(stagingFolder, destSyncMode, schema, streamName, Strings.addRandomSuffix("", "", FILE_PREFIX_LENGTH) + "_" + streamName,
client, db, s3Config, nameTransformer, sqlOperations);
objectMapper = new ObjectMapper();
@@ -56,11 +56,11 @@ public void copyStagingFileToTemporaryTable() {

@Override
public void copyS3CsvFileIntoTable(
final JdbcDatabase database,
final String s3FileLocation,
final String schema,
final String tableName,
final S3DestinationConfig s3Config) {
final JdbcDatabase database,
final String s3FileLocation,
final String schema,
final String tableName,
final S3DestinationConfig s3Config) {
throw new RuntimeException("Redshift Stream Copier should not copy individual files without use of a manifest");
}

@@ -17,14 +17,14 @@ public class RedshiftStreamCopierFactory extends S3StreamCopierFactory {

@Override
public StreamCopier create(final String stagingFolder,
final DestinationSyncMode syncMode,
final String schema,
final String streamName,
final AmazonS3 s3Client,
final JdbcDatabase db,
final S3DestinationConfig s3Config,
final ExtendedNameTransformer nameTransformer,
final SqlOperations sqlOperations)
final DestinationSyncMode syncMode,
final String schema,
final String streamName,
final AmazonS3 s3Client,
final JdbcDatabase db,
final S3DestinationConfig s3Config,
final ExtendedNameTransformer nameTransformer,
final SqlOperations sqlOperations)
throws Exception {
return new RedshiftStreamCopier(stagingFolder, syncMode, schema, streamName, s3Client, db, s3Config, nameTransformer, sqlOperations);
}
@@ -46,14 +46,14 @@ public AirbyteConnectionStatus check(final JsonNode config) {

@Override
public AirbyteMessageConsumer getConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog configuredCatalog,
final Consumer<AirbyteMessage> outputRecordCollector) {
final ConfiguredAirbyteCatalog configuredCatalog,
final Consumer<AirbyteMessage> outputRecordCollector) {
final S3WriterFactory formatterFactory = new ProductionWriterFactory();
return new S3Consumer(S3DestinationConfig.getS3DestinationConfig(config), configuredCatalog, formatterFactory, outputRecordCollector);
}

/**
* Note that this method completely ignores s3Config.getBucketPath(), in favor of the bucketPath parameter.
* Note that this method completely ignores s3Config.getBucketPath(), in favor of the bucketPath
* parameter.
*/
public static void attemptS3WriteAndDelete(final S3DestinationConfig s3Config, final String bucketPath) {
attemptS3WriteAndDelete(s3Config, bucketPath, s3Config.getS3Client());
@@ -72,4 +73,5 @@ private static void attemptWriteAndDeleteS3Object(final S3DestinationConfig s3Co
s3.putObject(s3Bucket, outputTableName, "check-content");
s3.deleteObject(s3Bucket, outputTableName);
}

}
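Editor's note: the comment above stresses that the check writes under the caller-supplied bucketPath rather than s3Config.getBucketPath(). A minimal sketch of that write-then-delete pattern, assuming hypothetical class, method, and key names (only the putObject/deleteObject calls mirror what this hunk shows):

import com.amazonaws.services.s3.AmazonS3;

// Hypothetical sketch of the connectivity-check pattern, not this file's code.
public final class S3WriteCheckSketch {

  static void checkWriteAndDelete(final AmazonS3 s3, final String bucket, final String bucketPath) {
    // Throwaway key under the caller-supplied path, so the check never touches real data.
    final String testKey = bucketPath + "/_airbyte_connection_test_" + System.currentTimeMillis();
    s3.putObject(bucket, testKey, "check-content"); // verifies write access
    s3.deleteObject(bucket, testKey);               // verifies delete access and cleans up
  }

}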
@@ -14,8 +14,9 @@
import com.fasterxml.jackson.databind.JsonNode;

/**
* An S3 configuration. Typical usage sets at most one of {@code bucketPath} (necessary for more delicate data syncing to S3) and {@code partSize}
* (used by certain bulk-load database operations).
* An S3 configuration. Typical usage sets at most one of {@code bucketPath} (necessary for more
* delicate data syncing to S3) and {@code partSize} (used by certain bulk-load database
* operations).
*/
public class S3DestinationConfig {

@@ -34,28 +35,29 @@ public class S3DestinationConfig {
private final S3FormatConfig formatConfig;

/**
* The part size should not matter in any use case that depends on this constructor. So the default 10 MB is used.
* The part size should not matter in any use case that depends on this constructor. So the default
* 10 MB is used.
*/
public S3DestinationConfig(
final String endpoint,
final String bucketName,
final String bucketPath,
final String bucketRegion,
final String accessKeyId,
final String secretAccessKey,
final S3FormatConfig formatConfig) {
final String endpoint,
final String bucketName,
final String bucketPath,
final String bucketRegion,
final String accessKeyId,
final String secretAccessKey,
final S3FormatConfig formatConfig) {
this(endpoint, bucketName, bucketPath, bucketRegion, accessKeyId, secretAccessKey, DEFAULT_PART_SIZE_MB, formatConfig);
}

public S3DestinationConfig(
final String endpoint,
final String bucketName,
final String bucketPath,
final String bucketRegion,
final String accessKeyId,
final String secretAccessKey,
final Integer partSize,
final S3FormatConfig formatConfig) {
final String endpoint,
final String bucketName,
final String bucketPath,
final String bucketRegion,
final String accessKeyId,
final String secretAccessKey,
final Integer partSize,
final S3FormatConfig formatConfig) {
this.endpoint = endpoint;
this.bucketName = bucketName;
this.bucketPath = bucketPath;
@@ -75,7 +77,8 @@ public static S3DestinationConfig getS3DestinationConfig(final JsonNode config)
if (config.get("s3_bucket_path") != null) {
bucketPath = config.get("s3_bucket_path").asText();
}
// In the "normal" S3 destination, this is never null. However, the Redshift and Snowflake copy destinations don't set a Format config.
// In the "normal" S3 destination, this is never null. However, the Redshift and Snowflake copy
// destinations don't set a Format config.
S3FormatConfig format = null;
if (config.get("format") != null) {
format = S3FormatConfigs.getS3FormatConfig(config);
@@ -88,8 +91,7 @@ public static S3DestinationConfig getS3DestinationConfig(final JsonNode config)
config.get("access_key_id").asText(),
config.get("secret_access_key").asText(),
partSize,
format
);
format);
}

public String getEndpoint() {
@@ -145,4 +147,5 @@ public AmazonS3 getS3Client() {
.withCredentials(new AWSStaticCredentialsProvider(awsCreds))
.build();
}

}
@@ -1,3 +1,7 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.s3;

import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -28,8 +32,7 @@ public void setup() {
"fake-region",
"fake-accessKeyId",
"fake-secretAccessKey",
null
);
null);
}

@Test
@@ -49,4 +52,5 @@ public void createsThenDeletesTestFile() {

verifyNoMoreInteractions(s3);
}

}
@@ -749,11 +749,7 @@
},
{
"name": "date_field",
"type": [
"null",
{ "type": "int", "logicalType": "date" },
"string"
],
"type": ["null", { "type": "int", "logicalType": "date" }, "string"],
"default": null
},
{
@@ -21,8 +21,8 @@ public class SnowflakeCopyS3Destination extends CopyDestination {

@Override
public AirbyteMessageConsumer getConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector) {
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector) {
return CopyConsumerFactory.create(
outputRecordCollector,
getDatabase(config),
Expand Down
@@ -22,25 +22,25 @@ public class SnowflakeS3StreamCopier extends S3StreamCopier {
private static final int FILE_PREFIX_LENGTH = 5;

public SnowflakeS3StreamCopier(final String stagingFolder,
final DestinationSyncMode destSyncMode,
final String schema,
final String streamName,
final AmazonS3 client,
final JdbcDatabase db,
final S3DestinationConfig s3Config,
final ExtendedNameTransformer nameTransformer,
final SqlOperations sqlOperations) {
final DestinationSyncMode destSyncMode,
final String schema,
final String streamName,
final AmazonS3 client,
final JdbcDatabase db,
final S3DestinationConfig s3Config,
final ExtendedNameTransformer nameTransformer,
final SqlOperations sqlOperations) {
super(stagingFolder, destSyncMode, schema, streamName, Strings.addRandomSuffix("", "", FILE_PREFIX_LENGTH) + "_" + streamName,
client, db, s3Config, nameTransformer, sqlOperations);
}

@Override
public void copyS3CsvFileIntoTable(
final JdbcDatabase database,
final String s3FileLocation,
final String schema,
final String tableName,
final S3DestinationConfig s3Config)
final JdbcDatabase database,
final String s3FileLocation,
final String schema,
final String tableName,
final S3DestinationConfig s3Config)
throws SQLException {
final var copyQuery = String.format(
"COPY INTO %s.%s FROM '%s' "
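Editor's note: the query string above is truncated by the diff view. Purely as an illustration of the general shape of a Snowflake COPY INTO that loads CSV data from S3 (this is not the hidden remainder of the string, and the clauses and parameter names are assumptions), a full statement might be assembled like this:

// Hypothetical sketch only -- not the rest of the query in this file.
static String exampleSnowflakeCopyQuery(final String schema, final String tableName,
                                        final String s3FileLocation,
                                        final String accessKeyId, final String secretAccessKey) {
  return String.format(
      "COPY INTO %s.%s FROM '%s' "
          + "CREDENTIALS=(aws_key_id='%s' aws_secret_key='%s') "
          + "file_format = (type = csv field_delimiter = ',' FIELD_OPTIONALLY_ENCLOSED_BY = '\"');",
      schema, tableName, s3FileLocation, accessKeyId, secretAccessKey);
}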
