Skip to content

Commit

Permalink
Snowflake destination : Enable DAT tests (#12912)
Browse files Browse the repository at this point in the history
* enable new DAT for Snowflake and fix timezone transformation

* Make Snowflake in line with new DAT tests.
+ move method of putting text values into node to util a class
+ format

* Rollback to DestinationAcceptanceTest as a parent class.
  • Loading branch information
DoNotPanicUA committed May 17, 2022
1 parent abc038e commit 28ccd06
Show file tree
Hide file tree
Showing 10 changed files with 155 additions and 39 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.standardtest.destination;

import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.json.Jsons;

public class DestinationAcceptanceTestUtils {

public static void putStringIntoJson(String stringValue, String fieldName, ObjectNode node) {
if (stringValue != null && (stringValue.startsWith("[") && stringValue.endsWith("]")
|| stringValue.startsWith("{") && stringValue.endsWith("}"))) {
node.set(fieldName, Jsons.deserialize(stringValue));
} else {
node.put(fieldName, stringValue);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.json.Jsons;
import java.util.Arrays;
import org.jooq.Record;

Expand All @@ -24,12 +23,7 @@ protected JsonNode getJsonFromRecord(Record record) {
switch (field.getDataType().getTypeName()) {
case "varchar", "nvarchar", "jsonb", "other":
var stringValue = (value != null ? value.toString() : null);
if (stringValue != null && (stringValue.replaceAll("[^\\x00-\\x7F]", "").matches("^\\[.*\\]$")
|| stringValue.replaceAll("[^\\x00-\\x7F]", "").matches("^\\{.*\\}$"))) {
node.set(field.getName(), Jsons.deserialize(stringValue));
} else {
node.put(field.getName(), stringValue);
}
DestinationAcceptanceTestUtils.putStringIntoJson(stringValue, field.getName(), node);
break;
default:
node.put(field.getName(), (value != null ? value.toString() : null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
public class SnowflakeCopyAzureBlobStorageDestination extends CopyDestination {

@Override
public AirbyteMessageConsumer getConsumer(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final Consumer<AirbyteMessage> outputRecordCollector) {
public AirbyteMessageConsumer getConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector) {
final DataSource dataSource = getDataSource(config);
return CopyConsumerFactory.create(
outputRecordCollector,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,4 +144,5 @@ private static boolean isPurgeStagingData(final JsonNode config) {
return loadingMethod.get("purge_staging_data").asBoolean();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,7 @@ protected String getCopyQuery(final String stagingPath,
dstTableName,
generateBucketPath(stagingPath),
credentialConfig.getAccessKeyId(),
credentialConfig.getSecretAccessKey()
);
credentialConfig.getSecretAccessKey());
}

private String generateBucketPath(final String stagingPath) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import io.airbyte.db.factory.DataSourceFactory;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,19 @@
import io.airbyte.config.StandardCheckConnectionOutput.Status;
import io.airbyte.db.factory.DataSourceFactory;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.NamingConventionTransformer;
import io.airbyte.integrations.standardtest.destination.DataArgumentsProvider;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Path;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.*;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import org.junit.jupiter.api.Disabled;
Expand Down Expand Up @@ -62,6 +57,26 @@ protected JsonNode getConfig() {
return config;
}

@Override
protected TestDataComparator getTestDataComparator() {
return new SnowflakeTestDataComparator();
}

@Override
protected boolean supportBasicDataTypeTest() {
return true;
}

@Override
protected boolean supportArrayDataTypeTest() {
return true;
}

@Override
protected boolean supportObjectDataTypeTest() {
return true;
}

public JsonNode getStaticConfig() {
final JsonNode insertConfig = Jsons.deserialize(IOs.readFile(Path.of("secrets/insert_config.json")));
Preconditions.checkArgument(!SnowflakeDestinationResolver.isS3Copy(insertConfig));
Expand All @@ -84,7 +99,7 @@ protected List<JsonNode> retrieveRecords(final TestDestinationEnv env,
throws Exception {
return retrieveRecordsFromTable(NAME_TRANSFORMER.getRawTableName(streamName), NAME_TRANSFORMER.getNamespace(namespace))
.stream()
.map(j -> Jsons.deserialize(j.get(JavaBaseConstants.COLUMN_NAME_DATA.toUpperCase()).asText()))
.map(r -> r.get(JavaBaseConstants.COLUMN_NAME_DATA.toUpperCase()))
.collect(Collectors.toList());
}

Expand Down Expand Up @@ -118,29 +133,13 @@ protected List<JsonNode> retrieveNormalizedRecords(final TestDestinationEnv test
throws Exception {
final String tableName = NAME_TRANSFORMER.getIdentifier(streamName);
final String schema = NAME_TRANSFORMER.getNamespace(namespace);
// Temporarily disabling the behavior of the ExtendedNameTransformer, see (issue #1785) so we don't
// use quoted names
// if (!tableName.startsWith("\"")) {
// // Currently, Normalization always quote tables identifiers
// tableName = "\"" + tableName + "\"";
// }
return retrieveRecordsFromTable(tableName, schema);
}

@Override
protected List<String> resolveIdentifier(final String identifier) {
final List<String> result = new ArrayList<>();
final String resolved = NAME_TRANSFORMER.getIdentifier(identifier);
result.add(identifier);
result.add(resolved);
if (!resolved.startsWith("\"")) {
result.add(resolved.toLowerCase());
result.add(resolved.toUpperCase());
}
return result;
}

private List<JsonNode> retrieveRecordsFromTable(final String tableName, final String schema) throws SQLException {
TimeZone timeZone = TimeZone.getTimeZone("UTC");
TimeZone.setDefault(timeZone);

return database.bufferedResultSetQuery(
connection -> {
try (final ResultSet tableInfo = connection.createStatement()
Expand All @@ -149,11 +148,12 @@ private List<JsonNode> retrieveRecordsFromTable(final String tableName, final St
// check that we're creating permanent tables. DBT defaults to transient tables, which have
// `TRANSIENT` as the value for the `kind` column.
assertEquals("TABLE", tableInfo.getString("kind"));
connection.createStatement().execute("ALTER SESSION SET TIMEZONE = 'UTC';");
return connection.createStatement()
.executeQuery(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schema, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT));
}
},
JdbcUtils.getDefaultSourceOperations()::rowToJson);
new SnowflakeTestSourceOperations()::rowToJson);
}

// for each test we create a new schema in the database. run the test in there and then remove it.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.snowflake;

import com.fasterxml.jackson.databind.JsonNode;
Expand All @@ -15,4 +19,5 @@ public JsonNode getStaticConfig() {
Preconditions.checkArgument(!SnowflakeDestinationResolver.isGcsCopy(copyConfig));
return copyConfig;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.snowflake;

import io.airbyte.integrations.destination.NamingConventionTransformer;
import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

public class SnowflakeTestDataComparator extends AdvancedTestDataComparator {

public static final NamingConventionTransformer NAME_TRANSFORMER = new SnowflakeSQLNameTransformer();

private static final String SNOWFLAKE_DATETIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss'Z'";
private static final String SNOWFLAKE_DATE_FORMAT = "yyyy-MM-dd";
private static final String POSTGRES_DATETIME_WITH_TZ_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSSSS'Z'";

@Override
protected List<String> resolveIdentifier(final String identifier) {
final List<String> result = new ArrayList<>();
final String resolved = NAME_TRANSFORMER.getIdentifier(identifier);
result.add(identifier);
result.add(resolved);
if (!resolved.startsWith("\"")) {
result.add(resolved.toLowerCase());
result.add(resolved.toUpperCase());
}
return result;
}

private LocalDate parseDate(String dateValue) {
if (dateValue != null) {
var format = (dateValue.matches(".+Z") ? SNOWFLAKE_DATETIME_FORMAT : SNOWFLAKE_DATE_FORMAT);
return LocalDate.parse(dateValue, DateTimeFormatter.ofPattern(format));
} else {
return null;
}
}

private LocalDate parseLocalDate(String dateTimeValue) {
if (dateTimeValue != null) {
var format = (dateTimeValue.matches(".+Z") ? POSTGRES_DATETIME_WITH_TZ_FORMAT : AIRBYTE_DATETIME_FORMAT);
return LocalDate.parse(dateTimeValue, DateTimeFormatter.ofPattern(format));
} else {
return null;
}
}

@Override
protected boolean compareDateTimeValues(String expectedValue, String actualValue) {
var destinationDate = parseLocalDate(actualValue);
var expectedDate = LocalDate.parse(expectedValue, DateTimeFormatter.ofPattern(AIRBYTE_DATETIME_FORMAT));
return expectedDate.equals(destinationDate);
}

@Override
protected boolean compareDateValues(String expectedValue, String actualValue) {
var destinationDate = parseDate(actualValue);
var expectedDate = LocalDate.parse(expectedValue, DateTimeFormatter.ofPattern(AIRBYTE_DATE_FORMAT));
return expectedDate.equals(destinationDate);
}

@Override
protected ZonedDateTime parseDestinationDateWithTz(String destinationValue) {
return ZonedDateTime.of(LocalDateTime.parse(destinationValue, DateTimeFormatter.ofPattern(POSTGRES_DATETIME_WITH_TZ_FORMAT)), ZoneOffset.UTC);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.snowflake;

import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.db.jdbc.JdbcSourceOperations;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTestUtils;
import java.sql.ResultSet;
import java.sql.SQLException;

public class SnowflakeTestSourceOperations extends JdbcSourceOperations {

@Override
protected void putString(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException {
DestinationAcceptanceTestUtils.putStringIntoJson(resultSet.getString(index), columnName, node);
}

}

0 comments on commit 28ccd06

Please sign in to comment.