Skip to content

Commit

Permalink
🎉 Destination s3 / gcs: add option for uncompressed csv and jsonl for…
Browse files Browse the repository at this point in the history
…mat (#12167)

* Add gzip compression option

* Add file extension method to s3 format config

* Pass gzip compression to serialized buffer

* Add unit test

* Format code

* Update integration test

* Bump version and update doc

* Fix unit test

* Add extra gzip tests for csv and jsonl

* Make compression an oneOf param

* Migrate csv config to new compression spec

* Migrate jsonl config to new compression spec

* Update docs

* Fix unit test

* Fix integration tests

* Format code

* Bump version

* auto-bump connector version

* Bump gcs version in seed

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
tuliren and octavia-squidington-iii committed Apr 22, 2022
1 parent e57ac9e commit 9a0442c
Show file tree
Hide file tree
Showing 37 changed files with 658 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@
- name: Google Cloud Storage (GCS)
destinationDefinitionId: ca8f6566-e555-4b40-943a-545bf123117a
dockerRepository: airbyte/destination-gcs
dockerImageTag: 0.2.3
dockerImageTag: 0.2.4
documentationUrl: https://docs.airbyte.io/integrations/destinations/gcs
icon: googlecloudstorage.svg
resourceRequirements:
Expand Down Expand Up @@ -208,7 +208,7 @@
- name: S3
destinationDefinitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
dockerRepository: airbyte/destination-s3
dockerImageTag: 0.3.2
dockerImageTag: 0.3.3
documentationUrl: https://docs.airbyte.io/integrations/destinations/s3
icon: s3.svg
resourceRequirements:
Expand Down
100 changes: 98 additions & 2 deletions airbyte-config/init/src/main/resources/seed/destination_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1288,7 +1288,7 @@
- "overwrite"
- "append"
supportsNamespaces: true
- dockerImage: "airbyte/destination-gcs:0.2.3"
- dockerImage: "airbyte/destination-gcs:0.2.4"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/gcs"
connectionSpecification:
Expand Down Expand Up @@ -1560,6 +1560,31 @@
default: 5
examples:
- 5
compression:
title: "Compression"
type: "object"
description: "Whether the output files should be compressed. If compression\
\ is selected, the output filename will have an extra extension\
\ (GZIP: \".csv.gz\")."
oneOf:
- title: "No Compression"
required:
- "compression_type"
properties:
compression_type:
type: "string"
enum:
- "No Compression"
default: "No Compression"
- title: "GZIP"
required:
- "compression_type"
properties:
compression_type:
type: "string"
enum:
- "GZIP"
default: "GZIP"
- title: "JSON Lines: newline-delimited JSON"
required:
- "format_type"
Expand All @@ -1579,6 +1604,29 @@
default: 5
examples:
- 5
compression:
title: "Compression"
type: "object"
description: "Whether the output files should be compressed. If compression\
\ is selected, the output filename will have an extra extension\
\ (GZIP: \".jsonl.gz\")."
oneOf:
- title: "No Compression"
required:
- "compression_type"
properties:
compression_type:
type: "string"
enum:
- "No Compression"
default: "No Compression"
- title: "GZIP"
required:
- "compression_type"
properties:
compression_type:
type: "string"
enum:
- "GZIP"
default: "GZIP"
- title: "Parquet: Columnar Storage"
required:
- "format_type"
Expand Down Expand Up @@ -3596,7 +3644,7 @@
supported_destination_sync_modes:
- "append"
- "overwrite"
- dockerImage: "airbyte/destination-s3:0.3.2"
- dockerImage: "airbyte/destination-s3:0.3.3"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/s3"
connectionSpecification:
Expand Down Expand Up @@ -3838,6 +3886,31 @@
default: 5
examples:
- 5
compression:
title: "Compression"
type: "object"
description: "Whether the output files should be compressed. If compression\
\ is selected, the output filename will have an extra extension\
\ (GZIP: \".csv.gz\")."
oneOf:
- title: "No Compression"
required:
- "compression_type"
properties:
compression_type:
type: "string"
enum:
- "No Compression"
default: "No Compression"
- title: "GZIP"
required:
- "compression_type"
properties:
compression_type:
type: "string"
enum:
- "GZIP"
default: "GZIP"
- title: "JSON Lines: newline-delimited JSON"
required:
- "format_type"
Expand All @@ -3857,6 +3930,29 @@
default: 5
examples:
- 5
compression:
title: "Compression"
type: "object"
description: "Whether the output files should be compressed. If compression\
\ is selected, the output filename will have an extra extension\
\ (GZIP: \".jsonl.gz\")."
oneOf:
- title: "No Compression"
required:
- "compression_type"
properties:
compression_type:
type: "string"
enum:
- "No Compression"
default: "No Compression"
- title: "GZIP"
required:
- "compression_type"
properties:
compression_type:
type: "string"
enum:
- "GZIP"
default: "GZIP"
- title: "Parquet: Columnar Storage"
required:
- "format_type"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import io.airbyte.integrations.destination.gcs.util.GcsUtils;
import io.airbyte.integrations.destination.record_buffer.FileBuffer;
import io.airbyte.integrations.destination.record_buffer.SerializableBuffer;
import io.airbyte.integrations.destination.s3.avro.AvroSerializedBuffer;
import io.airbyte.integrations.destination.s3.avro.S3AvroFormatConfig;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
Expand Down Expand Up @@ -284,7 +283,7 @@ public AirbyteMessageConsumer getGcsRecordConsumer(final JsonNode config,
avroFormatConfig,
recordFormatterCreator,
getAvroSchemaCreator(),
() -> new FileBuffer(AvroSerializedBuffer.DEFAULT_SUFFIX));
() -> new FileBuffer(S3AvroFormatConfig.DEFAULT_SUFFIX));

LOGGER.info("Creating BigQuery staging message consumer with staging ID {} at {}", stagingId, syncDatetime);
return new BigQueryStagingConsumerFactory().create(
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/destination-gcs/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION destination-gcs

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.2.3
LABEL io.airbyte.version=0.2.4
LABEL io.airbyte.name=airbyte/destination-gcs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,35 @@
"type": "integer",
"default": 5,
"examples": [5]
},
"compression": {
"title": "Compression",
"type": "object",
"description": "Whether the output files should be compressed. If compression is selected, the output filename will have an extra extension (GZIP: \".csv.gz\").",
"oneOf": [
{
"title": "No Compression",
"required": ["compression_type"],
"properties": {
"compression_type": {
"type": "string",
"enum": ["No Compression"],
"default": "No Compression"
}
}
},
{
"title": "GZIP",
"required": ["compression_type"],
"properties": {
"compression_type": {
"type": "string",
"enum": ["GZIP"],
"default": "GZIP"
}
}
}
]
}
}
},
Expand All @@ -276,6 +305,35 @@
"type": "integer",
"default": 5,
"examples": [5]
},
"compression": {
"title": "Compression",
"type": "object",
"description": "Whether the output files should be compressed. If compression is selected, the output filename will have an extra extension (GZIP: \".jsonl.gz\").",
"oneOf": [
{
"title": "No Compression",
"required": ["compression_type"],
"properties": {
"compression_type": {
"type": "string",
"enum": ["No Compression"],
"default": "No Compression"
}
}
},
{
"title": "GZIP",
"required": ["compression_type"],
"properties": {
"compression_type": {
"type": "string",
"enum": ["GZIP"],
"default": "GZIP"
}
}
}
]
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.StreamSupport;
import java.util.zip.GZIPInputStream;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVRecord;
import org.apache.commons.csv.QuoteMode;
Expand All @@ -38,7 +37,8 @@ public GcsCsvDestinationAcceptanceTest() {
protected JsonNode getFormatConfig() {
return Jsons.jsonNode(Map.of(
"format_type", outputFormat,
"flattening", Flattening.ROOT_LEVEL.getValue()));
"flattening", Flattening.ROOT_LEVEL.getValue(),
"compression", Jsons.jsonNode(Map.of("compression_type", "No Compression"))));
}

/**
Expand Down Expand Up @@ -84,9 +84,9 @@ private static JsonNode getJsonNode(final Map<String, String> input, final Map<S
return json;
}

private static void addNoTypeValue(ObjectNode json, String key, String value) {
private static void addNoTypeValue(final ObjectNode json, final String key, final String value) {
if (value != null && (value.matches("^\\[.*\\]$")) || value.matches("^\\{.*\\}$")) {
var newNode = Jsons.deserialize(value);
final var newNode = Jsons.deserialize(value);
json.set(key, newNode);
} else {
json.put(key, value);
Expand All @@ -106,7 +106,7 @@ protected List<JsonNode> retrieveRecords(final TestDestinationEnv testEnv,

for (final S3ObjectSummary objectSummary : objectSummaries) {
try (final S3Object object = s3Client.getObject(objectSummary.getBucketName(), objectSummary.getKey());
final Reader in = new InputStreamReader(new GZIPInputStream(object.getObjectContent()), StandardCharsets.UTF_8)) {
final Reader in = getReader(object)) {
final Iterable<CSVRecord> records = CSVFormat.DEFAULT
.withQuoteMode(QuoteMode.NON_NUMERIC)
.withFirstRecordAsHeader()
Expand All @@ -119,4 +119,8 @@ protected List<JsonNode> retrieveRecords(final TestDestinationEnv testEnv,
return jsonRecords;
}

// Decoding hook for downloaded CSV objects. The uncompressed base case reads the
// object's content stream directly as UTF-8; the gzip test subclass overrides this
// to wrap the stream in a GZIPInputStream before decoding.
protected Reader getReader(final S3Object s3Object) throws IOException {
return new InputStreamReader(s3Object.getObjectContent(), StandardCharsets.UTF_8);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.gcs;

import com.amazonaws.services.s3.model.S3Object;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.destination.s3.csv.S3CsvFormatConfig.Flattening;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.zip.GZIPInputStream;

/**
 * Acceptance test for the GCS CSV destination with GZIP compression. Reuses the
 * plain CSV test, but omits the compression spec from the format config (which the
 * connector treats as GZIP by default) and decompresses downloaded objects before
 * parsing them.
 */
public class GcsCsvGzipDestinationAcceptanceTest extends GcsCsvDestinationAcceptanceTest {

  @Override
  protected JsonNode getFormatConfig() {
    // config without compression defaults to GZIP
    return Jsons.jsonNode(Map.of(
        "format_type", outputFormat,
        "flattening", Flattening.ROOT_LEVEL.getValue()));
  }

  // Fix: this method overrides the superclass decoding hook but was missing the
  // @Override annotation, so a signature drift in the parent would silently stop
  // the gzip path from being exercised.
  @Override
  protected Reader getReader(final S3Object s3Object) throws IOException {
    // Objects are gzipped, so decompress before decoding as UTF-8.
    return new InputStreamReader(new GZIPInputStream(s3Object.getObjectContent()), StandardCharsets.UTF_8);
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.zip.GZIPInputStream;

public class GcsJsonlDestinationAcceptanceTest extends GcsDestinationAcceptanceTest {

Expand All @@ -27,7 +26,9 @@ protected GcsJsonlDestinationAcceptanceTest() {

@Override
protected JsonNode getFormatConfig() {
return Jsons.jsonNode(Map.of("format_type", outputFormat));
return Jsons.jsonNode(Map.of(
"format_type", outputFormat,
"compression", Jsons.jsonNode(Map.of("compression_type", "No Compression"))));
}

@Override
Expand All @@ -41,8 +42,7 @@ protected List<JsonNode> retrieveRecords(final TestDestinationEnv testEnv,

for (final S3ObjectSummary objectSummary : objectSummaries) {
final S3Object object = s3Client.getObject(objectSummary.getBucketName(), objectSummary.getKey());
try (final BufferedReader reader =
new BufferedReader(new InputStreamReader(new GZIPInputStream(object.getObjectContent()), StandardCharsets.UTF_8))) {
try (final BufferedReader reader = getReader(object)) {
String line;
while ((line = reader.readLine()) != null) {
jsonRecords.add(Jsons.deserialize(line).get(JavaBaseConstants.COLUMN_NAME_DATA));
Expand All @@ -53,4 +53,8 @@ protected List<JsonNode> retrieveRecords(final TestDestinationEnv testEnv,
return jsonRecords;
}

// Decoding hook for downloaded JSONL objects: line-oriented UTF-8 reader over the
// raw object stream. Presumably overridden by a gzip test variant to insert a
// GZIPInputStream, mirroring the CSV tests — not visible in this chunk, so confirm.
protected BufferedReader getReader(final S3Object s3Object) throws IOException {
return new BufferedReader(new InputStreamReader(s3Object.getObjectContent(), StandardCharsets.UTF_8));
}

}
Loading

0 comments on commit 9a0442c

Please sign in to comment.