Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Custom convert incoming long for date and datetime types #36263

1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ Maven and Gradle will automatically reference the correct (pinned) version of th

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.26.0 | 2024-03-19 | [\#36263](https://github.com/airbytehq/airbyte/pull/36263) | Improve conversion of debezium Date type for some edge case in mssql. |
| 0.25.0 | 2024-03-18 | [\#36203](https://github.com/airbytehq/airbyte/pull/36203) | Wiring of Transformer to StagingConsumerFactory and JdbcBufferedConsumerFactory; import changes for Kotlin conversion; State message logs to debug |
| 0.24.1 | 2024-03-13 | [\#36022](https://github.com/airbytehq/airbyte/pull/36022) | Move log4j2-test.xml to test fixtures, away from runtime classpath. |
| 0.24.0 | 2024-03-13 | [\#35944](https://github.com/airbytehq/airbyte/pull/35944) | Add `_airbyte_meta` in raw table and test fixture updates |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ public static String convertToDate(final Object date) {
} else if (date instanceof LocalDate d) {
// Incremental mode
return resolveEra(d, d.format(DATE_FORMATTER));
} else if (date instanceof final Integer d) {
return LocalDate.ofEpochDay(d).format(DATE_FORMATTER);
} else {
if (!loggedUnknownDateClass) {
LOGGER.info("Unknown class for Date data type" + date.getClass());
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.25.0
version=0.26.0
4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/source-mssql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.23.20'
cdkVersionRequired = '0.26.0'
features = ['db-sources']
useLocalCdk = false
useLocalCdk = true
}

java {
Expand Down
4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/source-mssql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerImageTag: 4.0.2
dockerImageTag: 4.0.3
dockerRepository: airbyte/source-mssql
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
githubIssueLabel: source-mssql
Expand All @@ -22,7 +22,7 @@ data:
oss:
enabled: true
releaseStage: generally_available
supportLevel: community
supportLevel: certified
tags:
- language:java
releases:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,9 @@
import io.debezium.spi.converter.RelationalColumn;
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.Base64;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.*;
import microsoft.sql.DateTimeOffset;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.slf4j.Logger;
Expand All @@ -43,7 +39,7 @@ public class MssqlDebeziumConverter implements CustomConverter<SchemaBuilder, Re
private static final String DATETIME_FORMAT_MICROSECONDS = "yyyy-MM-dd'T'HH:mm:ss[.][SSSSSS]";

@Override
public void configure(Properties props) {}
public void configure(final Properties props) {}

@Override
public void converterFor(final RelationalColumn field,
Expand Down Expand Up @@ -77,7 +73,7 @@ private void registerGeometry(final RelationalColumn field,
if (input instanceof byte[]) {
try {
return Geometry.deserialize((byte[]) input).toString();
} catch (SQLServerException e) {
} catch (final SQLServerException e) {
LOGGER.error(e.getMessage());
}
}
Expand All @@ -98,7 +94,7 @@ private void registerGeography(final RelationalColumn field,
if (input instanceof byte[]) {
try {
return Geography.deserialize((byte[]) input).toString();
} catch (SQLServerException e) {
} catch (final SQLServerException e) {
LOGGER.error(e.getMessage());
}
}
Expand Down Expand Up @@ -129,9 +125,21 @@ private void registerDatetime(final RelationalColumn field,
if (Objects.isNull(input)) {
return DebeziumConverterUtils.convertDefaultValue(field);
}
if (input instanceof final Timestamp d) {
final LocalDateTime localDateTime = d.toLocalDateTime();
return localDateTime.format(DateTimeFormatter.ofPattern(DATETIME_FORMAT_MICROSECONDS));
}

if (input instanceof final Long d) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it the case that for Mssql that all timestamp 'numbers' are always with nanosecond precision?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're only seeing this Long input when values are converted as part of acquiring schema history.
Dates on actual row records are still seen as incoming TimeStamp. None of this is documented anywhere as far as I can tell.

We need to take a close look at some point as this area is uneven in terms of implementation.
And some of the code here is almost three years old.

// During schema history creation datetime input arrives in the form of epoch nanosecond
// This is needed for example for a column defined as:
// [TransactionDate] DATETIME2 (7) DEFAULT ('2024-01-01T00:00:00.0000000') NOT NULL
final Instant instant = Instant.ofEpochMilli(d / 1000 / 1000);
final LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.of("UTC"));
return localDateTime.format(DateTimeFormatter.ofPattern(DATETIME_FORMAT_MICROSECONDS));
}

final LocalDateTime localDateTime = ((Timestamp) input).toLocalDateTime();
return localDateTime.format(DateTimeFormatter.ofPattern(DATETIME_FORMAT_MICROSECONDS));
return input.toString();
});

}
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/mssql.md
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ WHERE actor_definition_id ='b5ea17b1-f170-46dc-bc31-cc744ca984c1' AND (configura

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:------------------------------------------------------------------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
| 4.0.3 | 2024-03-19 | [36263](https://github.com/airbytehq/airbyte/pull/36263) | Fix a failure seen in CDC with tables containing default values. |
| 4.0.2 | 2024-03-06 | [35792](https://github.com/airbytehq/airbyte/pull/35792) | Initial sync will now send record count in state message. |
| 4.0.1 | 2024-03-12 | [36011](https://github.com/airbytehq/airbyte/pull/36011) | Read correctly null values of columns with default value in CDC. |
| 4.0.0 | 2024-03-06 | [35873](https://github.com/airbytehq/airbyte/pull/35873) | Terabyte-sized tables support, reliability improvements, bug fixes. |
Expand Down
Loading