🎉 New destination: S3 (#3672)
* Update README icon links

* Update airbyte-specification doc

* Extend base connector

* Remove redundant region

* Separate warning from info

* Implement s3 destination

* Run format

* Clarify logging message

* Rename variables and functions

* Update documentation

* Rename and annotate interface

* Inject formatter factory

* Remove part size

* Fix spec field names and add unit tests

* Add unit tests for csv output formatter

* Format code

* Complete acceptance test and fix bugs

* Fix uuid

* Remove generator template files

They belong to another PR.

* Add unhappy test case

* Check in airbyte state message

* Adjust stream transfer manager parameters

* Use underscore in filename

* Create csv sheet generator to handle data processing

* Format code

* Add partition id to filename

* Rename date format variable
tuliren committed Jun 3, 2021
1 parent baf5d4d commit c13b988
Showing 57 changed files with 2,056 additions and 56 deletions.
1 change: 1 addition & 0 deletions .github/workflows/publish-command.yml
@@ -101,6 +101,7 @@ jobs:
ZENDESK_TALK_TEST_CREDS: ${{ secrets.ZENDESK_TALK_TEST_CREDS }}
ZOOM_INTEGRATION_TEST_CREDS: ${{ secrets.ZOOM_INTEGRATION_TEST_CREDS }}
PLAID_INTEGRATION_TEST_CREDS: ${{ secrets.PLAID_INTEGRATION_TEST_CREDS }}
+ DESTINATION_S3_INTEGRATION_TEST_CREDS: ${{ secrets.DESTINATION_S3_INTEGRATION_TEST_CREDS }}
- run: |
docker login -u airbytebot -p ${DOCKER_PASSWORD}
./tools/integrations/manage.sh publish airbyte-integrations/${{ github.event.inputs.connector }}
1 change: 1 addition & 0 deletions .github/workflows/test-command.yml
@@ -105,6 +105,7 @@ jobs:
ZENDESK_TALK_TEST_CREDS: ${{ secrets.ZENDESK_TALK_TEST_CREDS }}
ZOOM_INTEGRATION_TEST_CREDS: ${{ secrets.ZOOM_INTEGRATION_TEST_CREDS }}
PLAID_INTEGRATION_TEST_CREDS: ${{ secrets.PLAID_INTEGRATION_TEST_CREDS }}
+ DESTINATION_S3_INTEGRATION_TEST_CREDS: ${{ secrets.DESTINATION_S3_INTEGRATION_TEST_CREDS }}
- run: |
./tools/bin/ci_integration_test.sh ${{ github.event.inputs.connector }}
name: test ${{ github.event.inputs.connector }}
2 changes: 1 addition & 1 deletion README.md
@@ -1,6 +1,6 @@
# Introduction

- ![GitHub Workflow Status](https://img.shields.io/github/workflow/status/airbytehq/airbyte/Airbyte%20CI) ![License](https://img.shields.io/github/license/airbytehq/airbyte)
+ [![GitHub Workflow Status](https://img.shields.io/github/workflow/status/airbytehq/airbyte/Airbyte%20CI)](https://github.com/airbytehq/airbyte/actions/workflows/gradle.yml) [![License](https://img.shields.io/github/license/airbytehq/airbyte)](./LICENSE)

![](docs/.gitbook/assets/airbyte_horizontal_color_white-background.svg)

@@ -0,0 +1,7 @@
+ {
+   "destinationDefinitionId": "4816b78f-1489-44c1-9060-4b19d5fa9362",
+   "name": "S3",
+   "dockerRepository": "airbyte/destination-s3",
+   "dockerImageTag": "0.1.0",
+   "documentationUrl": "https://docs.airbyte.io/integrations/destinations/s3"
+ }
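
For orientation: the seed definition above only maps the destination's UUID to a Docker image and docs URL. The user-facing configuration lives in the connector's own spec.json (the file the "Fix spec field names" commit touches), which is not shown in this excerpt. A purely illustrative sketch of that shape — the field names here are guesses, not the authoritative destination-s3 spec:

{
  "documentationUrl": "https://docs.airbyte.io/integrations/destinations/s3",
  "connectionSpecification": {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "title": "S3 Destination Spec",
    "type": "object",
    "required": ["s3_bucket_name", "s3_bucket_region", "access_key_id", "secret_access_key"],
    "properties": {
      "s3_bucket_name": { "type": "string" },
      "s3_bucket_region": { "type": "string" },
      "access_key_id": { "type": "string", "airbyte_secret": true },
      "secret_access_key": { "type": "string", "airbyte_secret": true }
    }
  }
}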
7 changes: 7 additions & 0 deletions airbyte-config/init/src/main/resources/icons/s3.svg
@@ -24,6 +24,11 @@
dockerRepository: airbyte/destination-snowflake
dockerImageTag: 0.3.7
documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake
+ - destinationDefinitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
+   name: S3
+   dockerRepository: airbyte/destination-s3
+   dockerImageTag: 0.1.0
+   documentationUrl: https://docs.airbyte.io/integrations/destinations/s3
- destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
name: Redshift
dockerRepository: airbyte/destination-redshift
@@ -74,7 +74,11 @@ public void accept(AirbyteMessage msg) throws Exception {

@Override
public void close() throws Exception {
LOGGER.info("hasFailed: {}.", hasFailed);
if (hasFailed) {
LOGGER.warn("Airbyte message consumer: failed.");
} else {
LOGGER.info("Airbyte message consumer: succeeded.");
}
close(hasFailed);
}

@@ -136,10 +136,17 @@ public abstract class DestinationAcceptanceTest {
* @param streamName - name of the stream for which we are retrieving records.
* @param namespace - the destination namespace records are located in. Null if not applicable.
* Usually a JDBC schema.
+ * @param streamSchema - schema of the stream to be retrieved. This is only necessary for
+ *        destinations in which data types cannot be accurately inferred (e.g. in CSV destination,
+ *        every value is a string).
* @return All of the records in the destination at the time this method is invoked.
* @throws Exception - can throw any exception, test framework will handle.
*/
- protected abstract List<JsonNode> retrieveRecords(TestDestinationEnv testEnv, String streamName, String namespace) throws Exception;
+ protected abstract List<JsonNode> retrieveRecords(TestDestinationEnv testEnv,
+                                                   String streamName,
+                                                   String namespace,
+                                                   JsonNode streamSchema)
+     throws Exception;

/**
* Returns a destination's default schema. The default implementation assumes this corresponds to
@@ -221,10 +228,10 @@ protected boolean implementsRecordSizeLimitChecks() {
}

/**
- * Same idea as {@link #retrieveRecords(TestDestinationEnv, String, String)}. Except this method
- * should pull records from the table that contains the normalized records and convert them back
- * into the data as it would appear in an {@link AirbyteRecordMessage}. Only need to override this
- * method if {@link #implementsBasicNormalization} returns true.
+ * Same idea as {@link #retrieveRecords(TestDestinationEnv, String, String, JsonNode)}. Except this
+ * method should pull records from the table that contains the normalized records and convert them
+ * back into the data as it would appear in an {@link AirbyteRecordMessage}. Only need to override
+ * this method if {@link #implementsBasicNormalization} returns true.
*
* @param testEnv - information about the test environment.
* @param streamName - name of the stream for which we are retrieving records.
@@ -882,7 +889,7 @@ private void retrieveRawRecordsAndAssertSameMessages(AirbyteCatalog catalog, Lis
for (final AirbyteStream stream : catalog.getStreams()) {
final String streamName = stream.getName();
final String schema = stream.getNamespace() != null ? stream.getNamespace() : defaultSchema;
- List<AirbyteRecordMessage> msgList = retrieveRecords(testEnv, streamName, schema)
+ List<AirbyteRecordMessage> msgList = retrieveRecords(testEnv, streamName, schema, stream.getJsonSchema())
.stream()
.map(data -> new AirbyteRecordMessage().withStream(streamName).withNamespace(schema).withData(data))
.collect(Collectors.toList());
@@ -922,7 +929,7 @@ private void assertSameData(List<JsonNode> expected, List<JsonNode> actual) {
final Iterator<Entry<String, JsonNode>> expectedDataIterator = expectedData.fields();
LOGGER.info("Expected row {}", expectedData);
LOGGER.info("Actual row {}", actualData);
- assertEquals(expectedData.size(), actualData.size());
+ assertEquals(expectedData.size(), actualData.size(), "Unequal row size");
while (expectedDataIterator.hasNext()) {
final Entry<String, JsonNode> expectedEntry = expectedDataIterator.next();
final JsonNode expectedValue = expectedEntry.getValue();
@@ -1025,6 +1032,13 @@ public Path getLocalRoot() {
return localRoot;
}

+ @Override
+ public String toString() {
+   return "TestDestinationEnv{" +
+       "localRoot=" + localRoot +
+       '}';
+ }

}

}
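
The new streamSchema parameter above exists for destinations that erase type information: as the Javadoc notes, a CSV destination stores every value as a string. A minimal sketch of how an acceptance test could use the schema to restore types before comparing records — this helper and its name are illustrative, not part of this PR:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

// Hypothetical helper: cast a row whose values were all serialized as strings
// back to the types declared in the stream's JSON schema.
static JsonNode castRowTypes(JsonNode stringRow, JsonNode streamSchema) {
  final ObjectNode typedRow = new ObjectMapper().createObjectNode();
  final JsonNode properties = streamSchema.get("properties");
  stringRow.fields().forEachRemaining(entry -> {
    final String type = properties.get(entry.getKey()).get("type").asText();
    final String value = entry.getValue().asText();
    switch (type) {
      case "integer": typedRow.put(entry.getKey(), Long.parseLong(value)); break;
      case "number": typedRow.put(entry.getKey(), Double.parseDouble(value)); break;
      case "boolean": typedRow.put(entry.getKey(), Boolean.parseBoolean(value)); break;
      default: typedRow.put(entry.getKey(), value); break;
    }
  });
  return typedRow;
}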
@@ -43,7 +43,7 @@
import com.google.cloud.bigquery.WriteChannelConfiguration;
import com.google.common.base.Charsets;
import io.airbyte.commons.json.Jsons;
- import io.airbyte.commons.resources.MoreResources;
+ import io.airbyte.integrations.BaseConnector;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.base.Destination;
@@ -56,7 +56,6 @@
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
- import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.DestinationSyncMode;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@@ -70,7 +69,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

- public class BigQueryDestination implements Destination {
+ public class BigQueryDestination extends BaseConnector implements Destination {

private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDestination.class);
static final String CONFIG_DATASET_ID = "dataset_id";
@@ -88,13 +87,6 @@ public BigQueryDestination() {
namingResolver = new StandardNameTransformer();
}

- @Override
- public ConnectorSpecification spec() throws IOException {
-   // return a jsonschema representation of the spec for the integration.
-   final String resourceString = MoreResources.readResource("spec.json");
-   return Jsons.deserialize(resourceString, ConnectorSpecification.class);
- }

@Override
public AirbyteConnectionStatus check(JsonNode config) {
try {
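
Both BigQueryDestination (above) and CsvDestination (below) delete the same spec() boilerplate when they switch to extending BaseConnector, per the "Extend base connector" commit. The base class itself is not part of this excerpt; a plausible sketch, inferred from the duplicated code being removed (the Integration supertype and throws clause are assumptions):

import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.protocol.models.ConnectorSpecification;

// Sketch of the shared base class: hoists the spec.json loading that each
// connector previously re-implemented verbatim.
public abstract class BaseConnector implements Integration {

  @Override
  public ConnectorSpecification spec() throws Exception {
    // Every connector bundles a spec.json resource describing its configuration.
    final String resourceString = MoreResources.readResource("spec.json");
    return Jsons.deserialize(resourceString, ConnectorSpecification.class);
  }

}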
@@ -115,7 +115,11 @@ protected List<JsonNode> retrieveNormalizedRecords(TestDestinationEnv testEnv, S
}

@Override
- protected List<JsonNode> retrieveRecords(TestDestinationEnv env, String streamName, String namespace) throws Exception {
+ protected List<JsonNode> retrieveRecords(TestDestinationEnv env,
+                                          String streamName,
+                                          String namespace,
+                                          JsonNode streamSchema)
+     throws Exception {
return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namingResolver.getIdentifier(namespace))
.stream()
.map(node -> node.get(JavaBaseConstants.COLUMN_NAME_DATA).asText())
@@ -27,7 +27,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import io.airbyte.commons.json.Jsons;
- import io.airbyte.commons.resources.MoreResources;
+ import io.airbyte.integrations.BaseConnector;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.CommitOnStateAirbyteMessageConsumer;
import io.airbyte.integrations.base.Destination;
@@ -41,7 +41,6 @@
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
- import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.DestinationSyncMode;
import java.io.FileWriter;
import java.io.IOException;
@@ -59,7 +58,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

- public class CsvDestination implements Destination {
+ public class CsvDestination extends BaseConnector implements Destination {

private static final Logger LOGGER = LoggerFactory.getLogger(CsvDestination.class);

Expand All @@ -71,12 +70,6 @@ public CsvDestination() {
namingResolver = new StandardNameTransformer();
}

- @Override
- public ConnectorSpecification spec() throws IOException {
-   final String resourceString = MoreResources.readResource("spec.json");
-   return Jsons.deserialize(resourceString, ConnectorSpecification.class);
- }

@Override
public AirbyteConnectionStatus check(JsonNode config) {
try {
@@ -74,7 +74,11 @@ protected JsonNode getFailCheckConfig() {
public void testCheckConnectionInvalidCredentials() {}

@Override
- protected List<JsonNode> retrieveRecords(TestDestinationEnv testEnv, String streamName, String namespace) throws Exception {
+ protected List<JsonNode> retrieveRecords(TestDestinationEnv testEnv,
+                                          String streamName,
+                                          String namespace,
+                                          JsonNode streamSchema)
+     throws Exception {
final List<Path> allOutputs = Files.list(testEnv.getLocalRoot().resolve(RELATIVE_PATH)).collect(Collectors.toList());
final Optional<Path> streamOutput =
allOutputs.stream().filter(path -> path.getFileName().toString().contains(new StandardNameTransformer().getRawTableName(streamName)))
@@ -198,7 +198,7 @@ private void closeAndWaitForUpload() throws IOException {
LOGGER.info("All data for {} stream uploaded.", streamName);
}

- public static void attemptWriteToPersistence(S3Config s3Config) {
+ public static void attemptS3WriteAndDelete(S3Config s3Config) {
final String outputTableName = "_airbyte_connection_test_" + UUID.randomUUID().toString().replaceAll("-", "");
attemptWriteAndDeleteS3Object(s3Config, outputTableName);
}
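
The rename from attemptWriteToPersistence to attemptS3WriteAndDelete above spells out what the connection check verifies: the supplied credentials can both create and remove an object in the bucket. The underlying attemptWriteAndDeleteS3Object helper is not shown in this excerpt; a sketch of what it plausibly does with the AWS SDK (the S3Config accessors are assumed names):

import com.amazonaws.services.s3.AmazonS3;

// Hypothetical body of attemptWriteAndDeleteS3Object: round-trip a throwaway
// object so a bad credential, bucket, or policy fails fast during check().
private static void attemptWriteAndDeleteS3Object(S3Config s3Config, String outputTableName) {
  final AmazonS3 s3Client = s3Config.getS3Client();     // assumed accessor
  final String bucketName = s3Config.getBucketName();   // assumed accessor
  s3Client.putObject(bucketName, outputTableName, "check-content");
  s3Client.deleteObject(bucketName, outputTableName);
}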
@@ -78,7 +78,11 @@ protected JsonNode getFailCheckConfig() {
}

@Override
- protected List<JsonNode> retrieveRecords(TestDestinationEnv env, String streamName, String namespace) throws Exception {
+ protected List<JsonNode> retrieveRecords(TestDestinationEnv env,
+                                          String streamName,
+                                          String namespace,
+                                          JsonNode streamSchema)
+     throws Exception {
return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namingResolver.getIdentifier(namespace))
.stream()
.map(r -> Jsons.deserialize(r.get(JavaBaseConstants.COLUMN_NAME_DATA).asText()))
