
Commit

S3 and GCS destinations: Updating processing data types for Avro/Parquet formats (#13483)

* S3 destination: Updating processing data types for Avro/Parquet formats

* S3 destination: handle comparing data types

* S3 destination: clean code

* S3 destination: clean code

* S3 destination: handle case with unexpected json schema type

* S3 destination: clean code

* S3 destination: Extract the same logic for Avro/Parquet formats to separate parent class

* S3 destination: clean code

* S3 destination: clean code

* GCS destination: Update data types processing for Avro/Parquet formats

* GCS destination: clean redundant code

* S3 destination: handle case with numbers inside array

* S3 destination: clean code

* S3 destination: add unit test

* S3 destination: update unit test cases with number types.

* S3 destination: update unit tests.

* S3 destination: bump version for s3 and gcs

* auto-bump connector version

* auto-bump connector version

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
sashaNeshcheret and octavia-squidington-iii committed Jun 14, 2022
1 parent ccd053d commit 8e54f4f
Showing 21 changed files with 689 additions and 39 deletions.
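
At a high level, the new acceptance tests derive, for each number field declared in a stream's JSON schema, the set of Avro types the Avro/Parquet writers are expected to produce, and compare it with the types actually found in the persisted files. The standalone sketch below is not part of the commit; the class name and sample values are illustrative, and it assumes the JsonSchemaType accessors used by GcsAvroParquetDestinationAcceptanceTest later in this diff.

import io.airbyte.integrations.destination.s3.avro.JsonSchemaType;
import java.util.Arrays;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.avro.Schema.Type;

public class ExpectedAvroTypeSketch {

  // For a JSON-schema field such as {"type": "number", "airbyte_type": "integer"},
  // collect the Avro types the writers are expected to produce for it.
  static Set<Type> expectedAvroTypes(final String jsonSchemaType, final String airbyteType) {
    return Arrays.stream(JsonSchemaType.values())
        .filter(value -> value.getJsonSchemaType().equals(jsonSchemaType))
        .filter(value -> Objects.equals(airbyteType, value.getJsonSchemaAirbyteType()))
        .map(JsonSchemaType::getAvroType)
        .collect(Collectors.toSet());
  }

  public static void main(final String[] args) {
    System.out.println(expectedAvroTypes("number", "integer")); // the integral mapping
    System.out.println(expectedAvroTypes("number", null));      // the default number mapping
  }
}
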
@@ -100,7 +100,7 @@
- name: Google Cloud Storage (GCS)
destinationDefinitionId: ca8f6566-e555-4b40-943a-545bf123117a
dockerRepository: airbyte/destination-gcs
- dockerImageTag: 0.2.6
+ dockerImageTag: 0.2.7
documentationUrl: https://docs.airbyte.io/integrations/destinations/gcs
icon: googlecloudstorage.svg
resourceRequirements:
@@ -244,7 +244,7 @@
- name: S3
destinationDefinitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
dockerRepository: airbyte/destination-s3
- dockerImageTag: 0.3.6
+ dockerImageTag: 0.3.7
documentationUrl: https://docs.airbyte.io/integrations/destinations/s3
icon: s3.svg
resourceRequirements:
@@ -1486,7 +1486,7 @@
- "overwrite"
- "append"
supportsNamespaces: true
- dockerImage: "airbyte/destination-gcs:0.2.6"
- dockerImage: "airbyte/destination-gcs:0.2.7"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/gcs"
connectionSpecification:
@@ -3895,7 +3895,7 @@
supported_destination_sync_modes:
- "append"
- "overwrite"
- dockerImage: "airbyte/destination-s3:0.3.6"
- dockerImage: "airbyte/destination-s3:0.3.7"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/s3"
connectionSpecification:
@@ -0,0 +1,26 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.standardtest.destination;

import java.util.stream.Stream;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;

public class NumberDataTypeTestArgumentProvider implements ArgumentsProvider {

public static final String NUMBER_DATA_TYPE_TEST_CATALOG = "number_data_type_test_catalog.json";
public static final String NUMBER_DATA_TYPE_TEST_MESSAGES = "number_data_type_test_messages.txt";
public static final String NUMBER_DATA_TYPE_ARRAY_TEST_CATALOG = "number_data_type_array_test_catalog.json";
public static final String NUMBER_DATA_TYPE_ARRAY_TEST_MESSAGES = "number_data_type_array_test_messages.txt";

@Override
public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
return Stream.of(
Arguments.of(NUMBER_DATA_TYPE_TEST_CATALOG, NUMBER_DATA_TYPE_TEST_MESSAGES),
Arguments.of(NUMBER_DATA_TYPE_ARRAY_TEST_CATALOG, NUMBER_DATA_TYPE_ARRAY_TEST_MESSAGES));
}

}
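
The new GcsAvroParquetDestinationAcceptanceTest (added later in this diff) consumes this provider through JUnit 5's @ArgumentsSource. A minimal standalone sketch of that wiring, with a hypothetical test class name:

import io.airbyte.integrations.standardtest.destination.NumberDataTypeTestArgumentProvider;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;

class NumberDataTypeProviderUsageSketch {

  // JUnit invokes this test once per argument pair supplied by the provider:
  // (number_data_type_test_catalog.json, number_data_type_test_messages.txt) and
  // (number_data_type_array_test_catalog.json, number_data_type_array_test_messages.txt).
  @ParameterizedTest
  @ArgumentsSource(NumberDataTypeTestArgumentProvider.class)
  void runsOncePerCatalogAndMessagePair(final String catalogFileName, final String messagesFileName) {
    System.out.println(catalogFileName + " -> " + messagesFileName);
  }
}
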
@@ -0,0 +1,38 @@
{
"streams": [
{
"name": "array_test_1",
"json_schema": {
"properties": {
"array_number": {
"type": ["array"],
"items": {
"type": "number"
}
},
"array_float": {
"type": ["array"],
"items": {
"type": "number",
"airbyte_type": "float"
}
},
"array_integer": {
"type": ["array"],
"items": {
"type": "number",
"airbyte_type": "integer"
}
},
"array_big_integer": {
"type": ["array"],
"items": {
"type": "number",
"airbyte_type": "big_integer"
}
}
}
}
}
]
}
@@ -0,0 +1,2 @@
{"type": "RECORD", "record": {"stream": "array_test_1", "emitted_at": 1602637589100, "data": { "array_number" : [-12345.678, 100000000000000000.1234],"array_float" : [-12345.678, 0, 1000000000000000000000000000000000000000000000000000.1234], "array_integer" : [42, 0, 12345], "array_big_integer" : [0, 1141241234124123141241234124] }}}
{"type": "STATE", "state": { "data": {"start_date": "2022-02-14"}}}
@@ -0,0 +1,47 @@
{
"streams": [
{
"name": "int_test",
"json_schema": {
"properties": {
"data": {
"type": "number",
"airbyte_type": "integer"
}
}
}
},
{
"name": "big_integer_test",
"json_schema": {
"properties": {
"data": {
"type": "number",
"airbyte_type": "big_integer"
}
}
}
},
{
"name": "float_test",
"json_schema": {
"properties": {
"data": {
"type": "number",
"airbyte_type": "float"
}
}
}
},
{
"name": "default_number_test",
"json_schema": {
"properties": {
"data": {
"type": "number"
}
}
}
}
]
}
@@ -0,0 +1,13 @@
{"type": "RECORD", "record": {"stream": "int_test", "emitted_at": 1602637589100, "data": { "data" : 42 }}}
{"type": "RECORD", "record": {"stream": "int_test", "emitted_at": 1602637589200, "data": { "data" : 0 }}}
{"type": "RECORD", "record": {"stream": "int_test", "emitted_at": 1602637589300, "data": { "data" : -12345 }}}
{"type": "RECORD", "record": {"stream": "big_integer_test", "emitted_at": 1602637589100, "data": { "data" : 1231123412412314 }}}
{"type": "RECORD", "record": {"stream": "big_integer_test", "emitted_at": 1602637589200, "data": { "data" : 0 }}}
{"type": "RECORD", "record": {"stream": "big_integer_test", "emitted_at": 1602637589300, "data": { "data" : -1234 }}}
{"type": "RECORD", "record": {"stream": "float_test", "emitted_at": 1602637589100, "data": { "data" : 56.78 }}}
{"type": "RECORD", "record": {"stream": "float_test", "emitted_at": 1602637589200, "data": { "data" : 0 }}}
{"type": "RECORD", "record": {"stream": "float_test", "emitted_at": 1602637589300, "data": { "data" : -12345.678 }}}
{"type": "RECORD", "record": {"stream": "default_number_test", "emitted_at": 1602637589100, "data": { "data" : 10000000000000000000000.1234 }}}
{"type": "RECORD", "record": {"stream": "default_number_test", "emitted_at": 1602637589200, "data": { "data" : 0 }}}
{"type": "RECORD", "record": {"stream": "default_number_test", "emitted_at": 1602637589300, "data": { "data" : -12345.678 }}}
{"type": "STATE", "state": { "data": {"start_date": "2022-02-14"}}}
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/destination-gcs/Dockerfile
@@ -16,5 +16,5 @@ ENV APPLICATION destination-gcs

COPY --from=build /airbyte /airbyte

- LABEL io.airbyte.version=0.2.6
+ LABEL io.airbyte.version=0.2.7
LABEL io.airbyte.name=airbyte/destination-gcs
@@ -14,15 +14,19 @@
import io.airbyte.integrations.destination.s3.avro.JsonFieldNameUpdater;
import io.airbyte.integrations.destination.s3.util.AvroRecordHelper;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.avro.Schema.Type;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.SeekableByteArrayInput;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericDatumReader;

- public class GcsAvroDestinationAcceptanceTest extends GcsDestinationAcceptanceTest {
+ public class GcsAvroDestinationAcceptanceTest extends GcsAvroParquetDestinationAcceptanceTest {

protected GcsAvroDestinationAcceptanceTest() {
super(S3Format.AVRO);
@@ -71,4 +75,25 @@ protected List<JsonNode> retrieveRecords(final TestDestinationEnv testEnv,
return jsonRecords;
}

@Override
protected Map<String, Set<Type>> retrieveDataTypesFromPersistedFiles(final String streamName, final String namespace) throws Exception {

final List<S3ObjectSummary> objectSummaries = getAllSyncedObjects(streamName, namespace);
Map<String, Set<Type>> resultDataTypes = new HashMap<>();

for (final S3ObjectSummary objectSummary : objectSummaries) {
final S3Object object = s3Client.getObject(objectSummary.getBucketName(), objectSummary.getKey());
try (final DataFileReader<Record> dataFileReader = new DataFileReader<>(
new SeekableByteArrayInput(object.getObjectContent().readAllBytes()),
new GenericDatumReader<>())) {
while (dataFileReader.hasNext()) {
final GenericData.Record record = dataFileReader.next();
Map<String, Set<Type>> actualDataTypes = getTypes(record);
resultDataTypes.putAll(actualDataTypes);
}
}
}
return resultDataTypes;
}

}
@@ -0,0 +1,146 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.gcs;

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.integrations.destination.s3.S3Format;
import io.airbyte.integrations.destination.s3.avro.JsonSchemaType;
import io.airbyte.integrations.standardtest.destination.NumberDataTypeTestArgumentProvider;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericData.Record;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;

public abstract class GcsAvroParquetDestinationAcceptanceTest extends GcsDestinationAcceptanceTest {

protected GcsAvroParquetDestinationAcceptanceTest(S3Format s3Format) {
super(s3Format);
}

@ParameterizedTest
@ArgumentsSource(NumberDataTypeTestArgumentProvider.class)
public void testNumberDataType(String catalogFileName, String messagesFileName) throws Exception {
final AirbyteCatalog catalog = readCatalogFromFile(catalogFileName);
final List<AirbyteMessage> messages = readMessagesFromFile(messagesFileName);

final JsonNode config = getConfig();
final String defaultSchema = getDefaultSchema(config);
final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog);
runSyncAndVerifyStateOutput(config, messages, configuredCatalog, false);

for (final AirbyteStream stream : catalog.getStreams()) {
final String streamName = stream.getName();
final String schema = stream.getNamespace() != null ? stream.getNamespace() : defaultSchema;

Map<String, Set<Type>> actualSchemaTypes = retrieveDataTypesFromPersistedFiles(streamName, schema);
Map<String, Set<Type>> expectedSchemaTypes = retrieveExpectedDataTypes(stream);

assertEquals(expectedSchemaTypes, actualSchemaTypes);
}
}

private Map<String, Set<Type>> retrieveExpectedDataTypes(AirbyteStream stream) {
Iterable<String> iterableNames = () -> stream.getJsonSchema().get("properties").fieldNames();
Map<String, JsonNode> nameToNode = StreamSupport.stream(iterableNames.spliterator(), false)
.collect(Collectors.toMap(
Function.identity(),
name -> getJsonNode(stream, name)));

return nameToNode
.entrySet()
.stream()
.collect(Collectors.toMap(
Entry::getKey,
entry -> getExpectedSchemaType(entry.getValue())));
}

private JsonNode getJsonNode(AirbyteStream stream, String name) {
JsonNode properties = stream.getJsonSchema().get("properties");
if (properties.size() == 1) {
return properties.get("data");
}
return properties.get(name).get("items");
}

private Set<Type> getExpectedSchemaType(JsonNode fieldDefinition) {
final JsonNode typeProperty = fieldDefinition.get("type");
final JsonNode airbyteTypeProperty = fieldDefinition.get("airbyte_type");
final String airbyteTypePropertyText = airbyteTypeProperty == null ? null : airbyteTypeProperty.asText();
return Arrays.stream(JsonSchemaType.values())
.filter(
value -> value.getJsonSchemaType().equals(typeProperty.asText()) && compareAirbyteTypes(airbyteTypePropertyText, value))
.map(JsonSchemaType::getAvroType)
.collect(Collectors.toSet());
}

private boolean compareAirbyteTypes(String airbyteTypePropertyText, JsonSchemaType value) {
if (airbyteTypePropertyText == null) {
return value.getJsonSchemaAirbyteType() == null;
}
return airbyteTypePropertyText.equals(value.getJsonSchemaAirbyteType());
}

private AirbyteCatalog readCatalogFromFile(final String catalogFilename) throws IOException {
return Jsons.deserialize(MoreResources.readResource(catalogFilename), AirbyteCatalog.class);
}

private List<AirbyteMessage> readMessagesFromFile(final String messagesFilename) throws IOException {
return MoreResources.readResource(messagesFilename).lines()
.map(record -> Jsons.deserialize(record, AirbyteMessage.class)).collect(Collectors.toList());
}

protected abstract Map<String, Set<Type>> retrieveDataTypesFromPersistedFiles(final String streamName, final String namespace) throws Exception;

protected Map<String, Set<Type>> getTypes(Record record) {

List<Field> fieldList = record
.getSchema()
.getFields()
.stream()
.filter(field -> !field.name().startsWith("_airbyte"))
.toList();

if (fieldList.size() == 1) {
return fieldList
.stream()
.collect(
Collectors.toMap(
Field::name,
field -> field.schema().getTypes().stream().map(Schema::getType).filter(type -> !type.equals(Type.NULL))
.collect(Collectors.toSet())));
} else {
return fieldList
.stream()
.collect(
Collectors.toMap(
Field::name,
field -> field.schema().getTypes()
.stream().filter(type -> !type.getType().equals(Type.NULL))
.flatMap(type -> type.getElementType().getTypes().stream()).map(Schema::getType).filter(type -> !type.equals(Type.NULL))
.collect(Collectors.toSet())));
}
}

}
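
To make the shape of getTypes' output concrete, the standalone sketch below (not part of the commit) rebuilds the single-field case by hand: an Avro record whose only user field is a nullable long, plus an _airbyte metadata column. Filtering out the _airbyte field and the NULL union branch leaves {data=[LONG]}. The schema layout is an assumption for illustration only.

import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecordBuilder;

public class GetTypesOutputSketch {

  public static void main(final String[] args) {
    // A record schema shaped like what the destination is assumed to write for the
    // "int_test" stream: one _airbyte metadata column plus a nullable "data" field.
    final Schema schema = SchemaBuilder.record("int_test").fields()
        .name("_airbyte_ab_id").type().stringType().noDefault()
        .name("data").type().unionOf().nullType().and().longType().endUnion().nullDefault()
        .endRecord();

    final GenericData.Record record = new GenericRecordBuilder(schema)
        .set("_airbyte_ab_id", "some-uuid")
        .set("data", 42L)
        .build();

    // Same extraction getTypes performs for the single-field case:
    // drop _airbyte* fields, then drop the NULL branch of each union.
    final Map<String, Set<Schema.Type>> types = record.getSchema().getFields().stream()
        .filter(field -> !field.name().startsWith("_airbyte"))
        .collect(Collectors.toMap(
            Schema.Field::name,
            field -> field.schema().getTypes().stream()
                .map(Schema::getType)
                .filter(type -> type != Schema.Type.NULL)
                .collect(Collectors.toSet())));

    System.out.println(types); // {data=[LONG]}
  }
}
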