Skip to content

Commit

Permalink
πŸ› πŸ“ Source Snowflake: fix coercion-to-UTC for timestamp datasource (#…
Browse files Browse the repository at this point in the history
…31631)

Co-authored-by: stephane-airbyte <stephane-airbyte@users.noreply.github.com>
  • Loading branch information
stephane-airbyte and stephane-airbyte committed Oct 24, 2023
1 parent 6d01784 commit df9da28
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1093,9 +1093,17 @@ def compare_records(
missing_expected = set(expected) - set(actual)

if missing_expected:
extra = set(actual) - set(expected)
msg = f"Stream {stream_name}: All expected records must be produced"
detailed_logger.info(msg)
detailed_logger.log_json_list(missing_expected)
detailed_logger.info("missing expected:")
detailed_logger.log_json_list(sorted(missing_expected, key=lambda record: record["ID"]))
detailed_logger.info("expected:")
detailed_logger.log_json_list(sorted(expected, key=lambda record: record["ID"]))
detailed_logger.info("actual:")
detailed_logger.log_json_list(sorted(actual, key=lambda record: record["ID"]))
detailed_logger.info("extra:")
detailed_logger.log_json_list(sorted(extra, key=lambda record: record["ID"]))
pytest.fail(msg)

if not extra_records:
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: e2d65910-8c8b-40a1-ae7d-ee2416b2bfa2
dockerImageTag: 0.2.1
dockerImageTag: 0.2.2
dockerRepository: airbyte/source-snowflake
documentationUrl: https://docs.airbyte.com/integrations/sources/snowflake
githubIssueLabel: source-snowflake
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.cdk.db.DataTypeUtils;
import io.airbyte.cdk.db.jdbc.DateTimeConverter;
import io.airbyte.cdk.db.jdbc.JdbcSourceOperations;
import io.airbyte.protocol.models.JsonSchemaType;
Expand All @@ -24,6 +25,9 @@
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -121,11 +125,22 @@ protected void setDate(final PreparedStatement preparedStatement, final int para
preparedStatement.setDate(parameterIndex, Date.valueOf(date));
}

private static final DateTimeFormatter SNOWFLAKE_TIMESTAMPTZ_FORMATTER = new DateTimeFormatterBuilder()
.parseCaseInsensitive()
.append(DateTimeFormatter.ISO_LOCAL_DATE)
.appendLiteral(' ')
.append(DateTimeFormatter.ISO_LOCAL_TIME)
.optionalStart()
.appendLiteral(' ')
.append(DateTimeFormatter.ofPattern("XX"))
.toFormatter();

@Override
protected void putTimestampWithTimezone(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index)
throws SQLException {
final Timestamp timestamp = resultSet.getTimestamp(index);
node.put(columnName, DateTimeConverter.convertToTimestampWithTimezone(timestamp));
final String timestampAsString = resultSet.getString(index);
OffsetDateTime timestampWithOffset = OffsetDateTime.parse(timestampAsString, SNOWFLAKE_TIMESTAMPTZ_FORMATTER);
node.put(columnName, timestampWithOffset.format(DataTypeUtils.TIMESTAMPTZ_FORMATTER));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,30 +285,38 @@ protected void initTests() {
TestDataHolder.builder()
.sourceType("TIMESTAMP")
.airbyteType(JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE)
.addInsertValues("null", "'2018-03-22 12:00:00.123'", "'2018-03-22 12:00:00.123456'")
.addExpectedValues(null, "2018-03-22T12:00:00.123", "2018-03-22T12:00:00.123456")
.build());
.addInsertValues("null", "'2018-03-26 12:00:00.123'", "'2018-03-26 12:00:00.123456'")
.addExpectedValues(null, "2018-03-26T12:00:00.123", "2018-03-26T12:00:00.123456")
.build());// This is very brittle. A change of parameters on the customer's account could change the values
// returned by snowflake
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("TIMESTAMP_LTZ")
.airbyteType(JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE)
.addInsertValues("null", "'2018-03-22 12:00:00.123 +05:00'", "'2018-03-22 12:00:00.123456 +05:00'")
.addExpectedValues(null, "2018-03-22T07:00:00.123000Z", "2018-03-22T07:00:00.123456Z")
.build());
.addInsertValues("null", "'2018-03-25 12:00:00.123 +05:00'", "'2018-03-25 12:00:00.123456 +05:00'")
.addExpectedValues(null, "2018-03-25T00:00:00.123000-07:00", "2018-03-25T00:00:00.123000-07:00")
// We moved from +5 to -7 timezone, so 12:00 becomes 00:00.
// Snowflake default timestamp precision is TIME(3), so we lose anything past ms
.build());// This is extremely brittle. A change of parameters on the customer's account,
// or a change of timezone where this code is executed (!!) could change the values returned by
// snowflake
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("TIMESTAMP_NTZ")
.airbyteType(JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE)
.addInsertValues("null", "'2018-03-22 12:00:00.123 +05:00'", "'2018-03-22 12:00:00.123456 +05:00'")
.addExpectedValues(null, "2018-03-22T12:00:00.123", "2018-03-22T12:00:00.123456")
.build());
.addInsertValues("null", "'2018-03-24 12:00:00.123 +05:00'", "'2018-03-24 12:00:00.123456 +05:00'")
.addExpectedValues(null, "2018-03-24T12:00:00.123", "2018-03-24T12:00:00.123456")
.build()); // This is very brittle. A change of parameters on the customer's account could change the values
// returned by snowflake
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("TIMESTAMP_TZ")
.airbyteType(JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE)
.addInsertValues("null", "'2018-03-22 12:00:00.123 +05:00'", "'2018-03-22 12:00:00.123456 +05:00'")
.addExpectedValues(null, "2018-03-22T07:00:00.123000Z", "2018-03-22T07:00:00.123456Z")
.build());
.addInsertValues("null", "'2018-03-23 12:00:00.123 +05:00'", "'2018-03-23 12:00:00.123456 +05:00'")
.addExpectedValues(null, "2018-03-23T12:00:00.123000+05:00", "2018-03-23T12:00:00.123000+05:00")
// Snowflake default timestamp-to-string conversion is TIME(3), so we lose anything past ms
.build());// This is very brittle. A change of parameters on the customer's account could change the values
// returned by snowflake

// Semi-structured Data Types
addDataTypeTestData(
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/snowflake.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ To read more please check official [Snowflake documentation](https://docs.snowfl

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------|
| 0.2.2 | 2023-10-20 | [31613](https://github.com/airbytehq/airbyte/pull/31613) | Fixed handling of TIMESTAMP_TZ columns. upgrade |
| 0.2.1 | 2023-10-11 | [31252](https://github.com/airbytehq/airbyte/pull/31252) | Snowflake JDBC version upgrade |
| 0.2.0 | 2023-06-26 | [27737](https://github.com/airbytehq/airbyte/pull/27737) | License Update: Elv2 |
| 0.1.36 | 2023-06-20 | [27212](https://github.com/airbytehq/airbyte/pull/27212) | Fix silent exception swallowing in StreamingJdbcDatabase |
Expand Down

0 comments on commit df9da28

Please sign in to comment.