snowflake s3 copy & redshift s3 refactor (#2921)
* snowflake s3 copy

* refactor (some tests still need updating)

* revert accidentally removing files

* re-add purge

* use BaseConnector

* getConnection logs error

* use generic configs for copiers/suppliers/consumers

* use stream copier terminology

* remove weird delegate generics

* some test changes

* remove non-CI test that doesn't have a good equivalent at the moment

* misc

* finally fixed

* tests and fix

* add credentials

* fix redshift build

* respond to comments

* fix check

* bump versions for redshift and snowflake

* fix creds
jrhizor committed Apr 26, 2021
1 parent 11e70a7 commit a3b4444
Showing 41 changed files with 1,536 additions and 854 deletions.
1 change: 1 addition & 0 deletions .github/workflows/publish-command.yml
@@ -78,6 +78,7 @@ jobs:
SLACK_TEST_CREDS: ${{ secrets.SLACK_TEST_CREDS }}
SMARTSHEETS_TEST_CREDS: ${{ secrets.SMARTSHEETS_TEST_CREDS }}
SNOWFLAKE_INTEGRATION_TEST_CREDS: ${{ secrets.SNOWFLAKE_INTEGRATION_TEST_CREDS }}
+  SNOWFLAKE_S3_COPY_INTEGRATION_TEST_CREDS: ${{ secrets.SNOWFLAKE_S3_COPY_INTEGRATION_TEST_CREDS }}
SOURCE_MARKETO_SINGER_INTEGRATION_TEST_CONFIG: ${{ secrets.SOURCE_MARKETO_SINGER_INTEGRATION_TEST_CONFIG }}
SOURCE_RECURLY_INTEGRATION_TEST_CREDS: ${{ secrets.SOURCE_RECURLY_INTEGRATION_TEST_CREDS }}
STRIPE_INTEGRATION_TEST_CREDS: ${{ secrets.STRIPE_INTEGRATION_TEST_CREDS }}
1 change: 1 addition & 0 deletions .github/workflows/test-command.yml
@@ -78,6 +78,7 @@ jobs:
SLACK_TEST_CREDS: ${{ secrets.SLACK_TEST_CREDS }}
SMARTSHEETS_TEST_CREDS: ${{ secrets.SMARTSHEETS_TEST_CREDS }}
SNOWFLAKE_INTEGRATION_TEST_CREDS: ${{ secrets.SNOWFLAKE_INTEGRATION_TEST_CREDS }}
+  SNOWFLAKE_S3_COPY_INTEGRATION_TEST_CREDS: ${{ secrets.SNOWFLAKE_S3_COPY_INTEGRATION_TEST_CREDS }}
SOURCE_MARKETO_SINGER_INTEGRATION_TEST_CONFIG: ${{ secrets.SOURCE_MARKETO_SINGER_INTEGRATION_TEST_CONFIG }}
SOURCE_RECURLY_INTEGRATION_TEST_CREDS: ${{ secrets.SOURCE_RECURLY_INTEGRATION_TEST_CREDS }}
STRIPE_INTEGRATION_TEST_CREDS: ${{ secrets.STRIPE_INTEGRATION_TEST_CREDS }}
@@ -2,6 +2,6 @@
"destinationDefinitionId": "424892c4-daac-4491-b35d-c6688ba547ba",
"name": "Snowflake",
"dockerRepository": "airbyte/destination-snowflake",
"dockerImageTag": "0.3.0",
"dockerImageTag": "0.3.1",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/snowflake"
}
@@ -2,7 +2,7 @@
"destinationDefinitionId": "f7a7d195-377f-cf5b-70a5-be6b819019dc",
"name": "Redshift",
"dockerRepository": "airbyte/destination-redshift",
"dockerImageTag": "0.3.0",
"dockerImageTag": "0.3.1",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/redshift",
"icon": "redshift.svg"
}
@@ -22,12 +22,12 @@
- destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
name: Snowflake
dockerRepository: airbyte/destination-snowflake
-  dockerImageTag: 0.3.0
+  dockerImageTag: 0.3.1
documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake
- destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
name: Redshift
dockerRepository: airbyte/destination-redshift
-  dockerImageTag: 0.3.0
+  dockerImageTag: 0.3.1
documentationUrl: https://docs.airbyte.io/integrations/destinations/redshift
icon: redshift.svg
- destinationDefinitionId: af7c921e-5892-4ff2-b6c1-4a5ab258fb7e
@@ -34,7 +34,7 @@
public class ExtendedNameTransformer extends StandardNameTransformer {

@Override
-  protected String convertStreamName(String input) {
+  public String convertStreamName(String input) {
return super.convertStreamName(input);
}

4 changes: 4 additions & 0 deletions airbyte-integrations/connectors/destination-jdbc/build.gradle
@@ -9,6 +9,10 @@ application {
}

dependencies {
+  implementation 'org.apache.commons:commons-lang3:3.11'
+  implementation 'org.apache.commons:commons-csv:1.4'
+  implementation 'com.github.alexmojaki:s3-stream-upload:2.2.2'

implementation project(':airbyte-db')
implementation project(':airbyte-integrations:bases:base-java')
implementation project(':airbyte-protocol:models')
@@ -25,24 +25,21 @@
package io.airbyte.integrations.destination.jdbc;

import com.fasterxml.jackson.databind.JsonNode;
-import io.airbyte.commons.json.Jsons;
-import io.airbyte.commons.resources.MoreResources;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.JdbcUtils;
+import io.airbyte.integrations.BaseConnector;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.destination.NamingConventionTransformer;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
-import io.airbyte.protocol.models.ConnectorSpecification;
-import java.io.IOException;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

-public abstract class AbstractJdbcDestination implements Destination {
+public abstract class AbstractJdbcDestination extends BaseConnector implements Destination {

private static final Logger LOGGER = LoggerFactory.getLogger(AbstractJdbcDestination.class);

@@ -58,13 +55,6 @@ public AbstractJdbcDestination(final String driverClass,
this.sqlOperations = sqlOperations;
}

-  @Override
-  public ConnectorSpecification spec() throws IOException {
-    // return a JsonSchema representation of the spec for the integration.
-    final String resourceString = MoreResources.readResource("spec.json");
-    return Jsons.deserialize(resourceString, ConnectorSpecification.class);
-  }

@Override
public AirbyteConnectionStatus check(JsonNode config) {

144 changes: 144 additions & 0 deletions airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyConsumer.java
@@ -0,0 +1,144 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.integrations.destination.jdbc.copy;

import com.google.common.base.Preconditions;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;

public class CopyConsumer<T> extends FailureTrackingAirbyteMessageConsumer {

private final String configuredSchema;
private final T config;
private final ConfiguredAirbyteCatalog catalog;
private final JdbcDatabase db;
private final StreamCopierFactory<T> streamCopierFactory;
private final SqlOperations sqlOperations;
private final ExtendedNameTransformer nameTransformer;
private final Map<AirbyteStreamNameNamespacePair, StreamCopier> pairToCopier;

public CopyConsumer(String configuredSchema,
T config,
ConfiguredAirbyteCatalog catalog,
JdbcDatabase db,
StreamCopierFactory<T> streamCopierFactory,
SqlOperations sqlOperations,
ExtendedNameTransformer nameTransformer) {
this.configuredSchema = configuredSchema;
this.config = config;
this.catalog = catalog;
this.db = db;
this.streamCopierFactory = streamCopierFactory;
this.sqlOperations = sqlOperations;
this.nameTransformer = nameTransformer;
this.pairToCopier = new HashMap<>();

var definedSyncModes = catalog.getStreams().stream()
.map(ConfiguredAirbyteStream::getDestinationSyncMode)
.noneMatch(Objects::isNull);
Preconditions.checkState(definedSyncModes, "Undefined destination sync mode.");
}

@Override
protected void startTracked() {
var stagingFolder = UUID.randomUUID().toString();
for (var configuredStream : catalog.getStreams()) {
var stream = configuredStream.getStream();
var pair = AirbyteStreamNameNamespacePair.fromAirbyteSteam(stream);
var syncMode = configuredStream.getDestinationSyncMode();
var copier = streamCopierFactory.create(configuredSchema, config, stagingFolder, syncMode, stream, nameTransformer, db, sqlOperations);

pairToCopier.put(pair, copier);
}
}

@Override
protected void acceptTracked(AirbyteRecordMessage message) throws Exception {
var pair = AirbyteStreamNameNamespacePair.fromRecordMessage(message);
if (!pairToCopier.containsKey(pair)) {
throw new IllegalArgumentException(
String.format("Message contained record from a stream that was not in the catalog. \ncatalog: %s , \nmessage: %s",
Jsons.serialize(catalog), Jsons.serialize(message)));
}

var id = UUID.randomUUID();
var data = Jsons.serialize(message.getData());
var emittedAt = Timestamp.from(Instant.ofEpochMilli(message.getEmittedAt()));

pairToCopier.get(pair).write(id, data, emittedAt);
}

/**
 * Although 'close' suggests a focus on clean-up, this method also loads the staged files into the
 * warehouse. First, move the staged files into temporary tables, then merge the temporary tables
 * into the final destination tables. Lastly, perform the actual clean-up: best-effort removal of
 * the staged files and temporary tables.
 */
public void close(boolean hasFailed) throws Exception {
closeAsOneTransaction(new ArrayList<>(pairToCopier.values()), hasFailed, db);
}

public void closeAsOneTransaction(List<StreamCopier> streamCopiers, boolean hasFailed, JdbcDatabase db) throws Exception {
try {
StringBuilder mergeCopiersToFinalTableQuery = new StringBuilder();
for (var copier : streamCopiers) {
copier.closeStagingUploader(hasFailed);

if (!hasFailed) {
copier.createTemporaryTable();
copier.copyStagingFileToTemporaryTable();
copier.createDestinationSchema();
var destTableName = copier.createDestinationTable();
var mergeQuery = copier.generateMergeStatement(destTableName);
mergeCopiersToFinalTableQuery.append(mergeQuery);
}
}

if (!hasFailed) {
sqlOperations.executeTransaction(db, mergeCopiersToFinalTableQuery.toString());
}
} finally {
for (var copier : streamCopiers) {
copier.removeFileAndDropTmpTable();
}
}
}

}
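
For orientation, the StreamCopier contract that CopyConsumer drives ships in a separate file of this PR that is not shown here. The sketch below is reconstructed from the call sites in acceptTracked and closeAsOneTransaction above; the exact signatures (parameter names, throws clauses) are assumptions, not the committed interface.

// Inferred sketch of StreamCopier, not the committed source; each signature is
// an assumption derived from how CopyConsumer invokes the method.
package io.airbyte.integrations.destination.jdbc.copy;

import java.sql.Timestamp;
import java.util.UUID;

public interface StreamCopier {

  // Buffer one serialized record into the staging file for this stream.
  void write(UUID id, String jsonDataString, Timestamp emittedAt) throws Exception;

  // Flush and close the staging upload; called on both success and failure.
  void closeStagingUploader(boolean hasFailed) throws Exception;

  void createTemporaryTable() throws Exception;

  void copyStagingFileToTemporaryTable() throws Exception;

  void createDestinationSchema() throws Exception;

  // Returns the final table name that the merge statement will target.
  String createDestinationTable() throws Exception;

  // Returns SQL that CopyConsumer appends into one cross-stream transaction.
  String generateMergeStatement(String destTableName) throws Exception;

  // Best-effort clean-up of the staging file and the temporary table.
  void removeFileAndDropTmpTable() throws Exception;

}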
80 changes: 80 additions & 0 deletions airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyDestination.java
@@ -0,0 +1,80 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.integrations.destination.jdbc.copy;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.BaseConnector;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class CopyDestination extends BaseConnector implements Destination {

private static final Logger LOGGER = LoggerFactory.getLogger(CopyDestination.class);

/**
 * A self-contained method that writes a file to the staging persistence for testing. This method
 * should try to clean up after itself by deleting the file it creates.
 */
public abstract void checkPersistence(JsonNode config) throws Exception;

public abstract ExtendedNameTransformer getNameTransformer();

public abstract JdbcDatabase getDatabase(JsonNode config) throws Exception;

public abstract SqlOperations getSqlOperations();

@Override
public AirbyteConnectionStatus check(JsonNode config) {
try {
checkPersistence(config);
} catch (Exception e) {
LOGGER.error("Exception attempting to access the staging persistence: ", e);
return new AirbyteConnectionStatus()
.withStatus(AirbyteConnectionStatus.Status.FAILED)
.withMessage("Could not connect to the staging persistence with the provided configuration. \n" + e.getMessage());
}

try {
var nameTransformer = getNameTransformer();
var outputSchema = nameTransformer.convertStreamName(config.get("schema").asText());
JdbcDatabase database = getDatabase(config);
AbstractJdbcDestination.attemptSQLCreateAndDropTableOperations(outputSchema, database, nameTransformer, getSqlOperations());

return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED);
} catch (Exception e) {
LOGGER.error("Exception attempting to connect to the warehouse: ", e);
return new AirbyteConnectionStatus()
.withStatus(AirbyteConnectionStatus.Status.FAILED)
.withMessage("Could not connect to the warehouse with the provided configuration. \n" + e.getMessage());
}
}

}
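
As a usage note, CopyDestination is a template: a concrete warehouse connector supplies only the four abstract members, and the check() above composes them (staging persistence first, then the warehouse connection). The subclass below is a hypothetical illustration; ExampleCopyDestination and its method bodies are assumptions, not code from this PR.

// Hypothetical subclass for illustration only; not part of this PR.
package io.airbyte.integrations.destination.jdbc.copy;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.jdbc.SqlOperations;

public class ExampleCopyDestination extends CopyDestination {

  @Override
  public void checkPersistence(JsonNode config) throws Exception {
    // A real connector would write a small test file to the staging store
    // (e.g. an S3 bucket from the config) and delete it again, per the
    // contract documented above.
  }

  @Override
  public ExtendedNameTransformer getNameTransformer() {
    return new ExtendedNameTransformer();
  }

  @Override
  public JdbcDatabase getDatabase(JsonNode config) throws Exception {
    // Build the warehouse connection from the config; the concrete factory
    // call is warehouse-specific, so it is elided in this sketch.
    throw new UnsupportedOperationException("sketch only");
  }

  @Override
  public SqlOperations getSqlOperations() {
    // Warehouse-specific DDL/DML implementation used by the check above.
    throw new UnsupportedOperationException("sketch only");
  }

}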
