Skip to content

Commit

Permalink
Custom convert incoming long for date and datetime types (#36263)
Browse files Browse the repository at this point in the history
  • Loading branch information
rodireich committed Mar 19, 2024
1 parent 411115b commit e5aea95
Show file tree
Hide file tree
Showing 7 changed files with 143 additions and 131 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ Maven and Gradle will automatically reference the correct (pinned) version of th

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.26.0 | 2024-03-19 | [\#36263](https://github.com/airbytehq/airbyte/pull/36263) | Improve conversion of debezium Date type for some edge case in mssql. |
| 0.25.0 | 2024-03-18 | [\#36203](https://github.com/airbytehq/airbyte/pull/36203) | Wiring of Transformer to StagingConsumerFactory and JdbcBufferedConsumerFactory; import changes for Kotlin conversion; State message logs to debug |
| 0.24.1 | 2024-03-13 | [\#36022](https://github.com/airbytehq/airbyte/pull/36022) | Move log4j2-test.xml to test fixtures, away from runtime classpath. |
| 0.24.0 | 2024-03-13 | [\#35944](https://github.com/airbytehq/airbyte/pull/35944) | Add `_airbyte_meta` in raw table and test fixture updates |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ public static String convertToDate(final Object date) {
} else if (date instanceof LocalDate d) {
// Incremental mode
return resolveEra(d, d.format(DATE_FORMATTER));
} else if (date instanceof final Integer d) {
return LocalDate.ofEpochDay(d).format(DATE_FORMATTER);
} else {
if (!loggedUnknownDateClass) {
LOGGER.info("Unknown class for Date data type" + date.getClass());
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.25.0
version=0.26.0
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.23.20'
cdkVersionRequired = '0.26.0'
features = ['db-sources']
useLocalCdk = false
}
Expand Down
4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/source-mssql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerImageTag: 4.0.2
dockerImageTag: 4.0.3
dockerRepository: airbyte/source-mssql
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
githubIssueLabel: source-mssql
Expand All @@ -22,7 +22,7 @@ data:
oss:
enabled: true
releaseStage: generally_available
supportLevel: community
supportLevel: certified
tags:
- language:java
releases:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,9 @@
import io.debezium.spi.converter.RelationalColumn;
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.Base64;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.*;
import microsoft.sql.DateTimeOffset;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.slf4j.Logger;
Expand All @@ -43,7 +39,7 @@ public class MssqlDebeziumConverter implements CustomConverter<SchemaBuilder, Re
private static final String DATETIME_FORMAT_MICROSECONDS = "yyyy-MM-dd'T'HH:mm:ss[.][SSSSSS]";

@Override
public void configure(Properties props) {}
public void configure(final Properties props) {}

@Override
public void converterFor(final RelationalColumn field,
Expand Down Expand Up @@ -77,7 +73,7 @@ private void registerGeometry(final RelationalColumn field,
if (input instanceof byte[]) {
try {
return Geometry.deserialize((byte[]) input).toString();
} catch (SQLServerException e) {
} catch (final SQLServerException e) {
LOGGER.error(e.getMessage());
}
}
Expand All @@ -98,7 +94,7 @@ private void registerGeography(final RelationalColumn field,
if (input instanceof byte[]) {
try {
return Geography.deserialize((byte[]) input).toString();
} catch (SQLServerException e) {
} catch (final SQLServerException e) {
LOGGER.error(e.getMessage());
}
}
Expand Down Expand Up @@ -129,9 +125,21 @@ private void registerDatetime(final RelationalColumn field,
if (Objects.isNull(input)) {
return DebeziumConverterUtils.convertDefaultValue(field);
}
if (input instanceof final Timestamp d) {
final LocalDateTime localDateTime = d.toLocalDateTime();
return localDateTime.format(DateTimeFormatter.ofPattern(DATETIME_FORMAT_MICROSECONDS));
}

if (input instanceof final Long d) {
// During schema history creation datetime input arrives in the form of epoch nanosecond
// This is needed for example for a column defined as:
// [TransactionDate] DATETIME2 (7) DEFAULT ('2024-01-01T00:00:00.0000000') NOT NULL
final Instant instant = Instant.ofEpochMilli(d / 1000 / 1000);
final LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.of("UTC"));
return localDateTime.format(DateTimeFormatter.ofPattern(DATETIME_FORMAT_MICROSECONDS));
}

final LocalDateTime localDateTime = ((Timestamp) input).toLocalDateTime();
return localDateTime.format(DateTimeFormatter.ofPattern(DATETIME_FORMAT_MICROSECONDS));
return input.toString();
});

}
Expand Down
Loading

0 comments on commit e5aea95

Please sign in to comment.