From 00927128459cf5d77aea9bf375163e347974d994 Mon Sep 17 00:00:00 2001 From: Subodh Kant Chaturvedi Date: Wed, 10 Aug 2022 21:48:57 +0530 Subject: [PATCH] fix postgres data handling from WAL logs in CDC mode (#15481) * fix postgres data handling from WAL logs in CDC mode * format * use formatter for dates also (#15485) * format * change test structure * change log to debug Co-authored-by: Edward Gao --- .../java/io/airbyte/db/DataTypeUtils.java | 5 + ...bstractJdbcCompatibleSourceOperations.java | 23 +-- .../io/airbyte/db/jdbc/DateTimeConverter.java | 131 ++++++++++++++ .../debezium/internals/DateTimeConverter.java | 122 ------------- .../internals/DebeziumConverterUtils.java | 3 + .../debezium/internals/PostgresConverter.java | 5 +- .../source/AbstractSourceConnectorTest.java | 5 +- .../AbstractSourceDatabaseTypeTest.java | 32 +++- .../postgres/PostgresCdcProperties.java | 8 +- .../postgres/PostgresSourceOperations.java | 16 +- .../AbstractPostgresSourceDatatypeTest.java | 34 ++-- ...alSnapshotPostgresSourceDatatypeTest.java} | 3 +- .../CdcWalLogsPostgresSourceDatatypeTest.java | 169 ++++++++++++++++++ 13 files changed, 387 insertions(+), 169 deletions(-) create mode 100644 airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/DateTimeConverter.java delete mode 100644 airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/DateTimeConverter.java rename airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/{CdcPostgresSourceDatatypeTest.java => CdcInitialSnapshotPostgresSourceDatatypeTest.java} (96%) create mode 100644 airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcWalLogsPostgresSourceDatatypeTest.java diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/DataTypeUtils.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/DataTypeUtils.java index 
692d4ea050f11..6e82d77a8fc46 100644 --- a/airbyte-db/db-lib/src/main/java/io/airbyte/db/DataTypeUtils.java +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/DataTypeUtils.java @@ -16,6 +16,10 @@ import java.time.format.DateTimeFormatter; import java.util.function.Function; +/** + * TODO : Replace all the DateTime related logic of this class with + * {@link io.airbyte.db.jdbc.DateTimeConverter} + */ public class DataTypeUtils { public static final String DATE_FORMAT_PATTERN = "yyyy-MM-dd'T'HH:mm:ss'Z'"; @@ -27,6 +31,7 @@ public class DataTypeUtils { public static final DateTimeFormatter TIMETZ_FORMATTER = DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSSXXX"); public static final DateTimeFormatter TIMESTAMPTZ_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSSXXX"); public static final DateTimeFormatter OFFSETDATETIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSS XXX"); + public static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd"); // wrap SimpleDateFormat in a function because SimpleDateFormat is not threadsafe as a static final. public static DateFormat getDateFormat() { diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/AbstractJdbcCompatibleSourceOperations.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/AbstractJdbcCompatibleSourceOperations.java index 8d4560cf4335a..eea2286e8f342 100644 --- a/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/AbstractJdbcCompatibleSourceOperations.java +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/AbstractJdbcCompatibleSourceOperations.java @@ -258,18 +258,19 @@ public String getFullyQualifiedTableNameWithQuoting(final Connection connection, return schemaName != null ? enquoteIdentifier(connection, schemaName) + "." 
+ quotedTableName : quotedTableName; } - protected ObjectType getObject(ResultSet resultSet, int index, Class clazz) throws SQLException { + protected ObjectType getObject(final ResultSet resultSet, final int index, final Class clazz) throws SQLException { return resultSet.getObject(index, clazz); } - protected void putTimeWithTimezone(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException { - OffsetTime timetz = getObject(resultSet, index, OffsetTime.class); + protected void putTimeWithTimezone(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException { + final OffsetTime timetz = getObject(resultSet, index, OffsetTime.class); node.put(columnName, timetz.format(TIMETZ_FORMATTER)); } - protected void putTimestampWithTimezone(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException { - OffsetDateTime timestamptz = getObject(resultSet, index, OffsetDateTime.class); - LocalDate localDate = timestamptz.toLocalDate(); + protected void putTimestampWithTimezone(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) + throws SQLException { + final OffsetDateTime timestamptz = getObject(resultSet, index, OffsetDateTime.class); + final LocalDate localDate = timestamptz.toLocalDate(); node.put(columnName, resolveEra(localDate, timestamptz.format(TIMESTAMPTZ_FORMATTER))); } @@ -283,7 +284,7 @@ protected void putTimestampWithTimezone(ObjectNode node, String columnName, Resu * * You most likely would prefer to call one of the overloaded methods, which accept temporal types. 
*/ - public static String resolveEra(boolean isBce, String value) { + public static String resolveEra(final boolean isBce, final String value) { String mangledValue = value; if (isBce) { if (mangledValue.startsWith("-")) { @@ -296,11 +297,11 @@ public static String resolveEra(boolean isBce, String value) { return mangledValue; } - public static boolean isBce(LocalDate date) { + public static boolean isBce(final LocalDate date) { return date.getEra().equals(IsoEra.BCE); } - public static String resolveEra(LocalDate date, String value) { + public static String resolveEra(final LocalDate date, final String value) { return resolveEra(isBce(date), value); } @@ -311,14 +312,14 @@ public static String resolveEra(LocalDate date, String value) { * This is technically kind of sketchy due to ancient timestamps being weird (leap years, etc.), but * my understanding is that {@link #ONE_CE} has the same weirdness, so it cancels out. */ - public static String resolveEra(Date date, String value) { + public static String resolveEra(final Date date, final String value) { return resolveEra(date.before(ONE_CE), value); } /** * See {@link #resolveEra(Date, String)} for explanation. */ - public static String resolveEra(Timestamp timestamp, String value) { + public static String resolveEra(final Timestamp timestamp, final String value) { return resolveEra(timestamp.before(ONE_CE), value); } diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/DateTimeConverter.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/DateTimeConverter.java new file mode 100644 index 0000000000000..68e8208a4f072 --- /dev/null +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/DateTimeConverter.java @@ -0,0 +1,131 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.db.jdbc; + +import static io.airbyte.db.DataTypeUtils.DATE_FORMATTER; +import static io.airbyte.db.DataTypeUtils.TIMESTAMPTZ_FORMATTER; +import static io.airbyte.db.DataTypeUtils.TIMESTAMP_FORMATTER; +import static io.airbyte.db.DataTypeUtils.TIMETZ_FORMATTER; +import static io.airbyte.db.DataTypeUtils.TIME_FORMATTER; +import static io.airbyte.db.jdbc.AbstractJdbcCompatibleSourceOperations.resolveEra; +import static java.time.ZoneOffset.UTC; + +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.OffsetTime; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DateTimeConverter { + + private static final Logger LOGGER = LoggerFactory.getLogger(DateTimeConverter.class); + public static final DateTimeFormatter TIME_WITH_TIMEZONE_FORMATTER = DateTimeFormatter.ofPattern( + "HH:mm:ss[.][SSSSSSSSS][SSSSSSS][SSSSSS][SSSSS][SSSS][SSS][SS][S][''][XXX][XX][X]"); + + public static String convertToTimeWithTimezone(final Object time) { + if (time instanceof final java.time.OffsetTime timetz) { + return timetz.format(TIMETZ_FORMATTER); + } + final OffsetTime timetz = OffsetTime.parse(time.toString(), TIME_WITH_TIMEZONE_FORMATTER); + return timetz.format(TIMETZ_FORMATTER); + } + + public static String convertToTimestampWithTimezone(final Object timestamp) { + if (timestamp instanceof final Timestamp t) { + // In snapshot mode, debezium produces a java.sql.Timestamp object for the TIMESTAMPTZ type. + // Conceptually, a timestamp with timezone is an Instant. But t.toInstant() actually mangles the + // value for ancient dates, because leap years weren't applied consistently in ye olden days. 
+ // Additionally, toInstant() (and toLocalDateTime()) actually lose the era indicator, so we can't + // rely on their getEra() methods. + // So we have special handling for this case, which sidesteps the toInstant conversion. + final ZonedDateTime timestamptz = t.toLocalDateTime().atZone(UTC); + final String value = timestamptz.format(TIMESTAMPTZ_FORMATTER); + return resolveEra(t, value); + } else if (timestamp instanceof final OffsetDateTime t) { + return resolveEra(t.toLocalDate(), t.format(TIMESTAMPTZ_FORMATTER)); + } else if (timestamp instanceof final ZonedDateTime timestamptz) { + return resolveEra(timestamptz.toLocalDate(), timestamptz.format(TIMESTAMPTZ_FORMATTER)); + } else { + // This case probably isn't strictly necessary, but I'm leaving it just in case there's some weird + // situation that I'm not aware of. + final Instant instant = Instant.parse(timestamp.toString()); + final OffsetDateTime offsetDateTime = OffsetDateTime.ofInstant(instant, UTC); + final ZonedDateTime timestamptz = ZonedDateTime.from(offsetDateTime); + final LocalDate localDate = timestamptz.toLocalDate(); + final String value = timestamptz.format(TIMESTAMPTZ_FORMATTER); + return resolveEra(localDate, value); + } + } + + /** + * See {@link #convertToTimestampWithTimezone(Object)} for explanation of the weird things happening + * here. 
+ */ + public static String convertToTimestamp(final Object timestamp) { + if (timestamp instanceof final Timestamp t) { + // Snapshot mode + final LocalDateTime localDateTime = t.toLocalDateTime(); + final String value = localDateTime.format(TIMESTAMP_FORMATTER); + return resolveEra(t, value); + } else if (timestamp instanceof final Instant i) { + // Incremental mode + return resolveEra(i.atZone(UTC).toLocalDate(), i.atOffset(UTC).toLocalDateTime().format(TIMESTAMP_FORMATTER)); + } else { + final LocalDateTime localDateTime = LocalDateTime.parse(timestamp.toString()); + final LocalDate date = localDateTime.toLocalDate(); + final String value = localDateTime.format(TIMESTAMP_FORMATTER); + return resolveEra(date, value); + } + } + + /** + * See {@link #convertToTimestampWithTimezone(Object)} for explanation of the weird things happening + * here. + */ + public static String convertToDate(final Object date) { + if (date instanceof final Date d) { + // Snapshot mode + final LocalDate localDate = ((Date) date).toLocalDate(); + return resolveEra(d, localDate.format(DATE_FORMATTER)); + } else if (date instanceof LocalDate d) { + // Incremental mode + return resolveEra(d, d.format(DATE_FORMATTER)); + } else { + final LocalDate localDate = LocalDate.parse(date.toString()); + return resolveEra(localDate, localDate.format(DATE_FORMATTER)); + } + } + + public static String convertToTime(final Object time) { + if (time instanceof final Time sqlTime) { + return sqlTime.toLocalTime().format(TIME_FORMATTER); + } else if (time instanceof final LocalTime localTime) { + return localTime.format(TIME_FORMATTER); + } else if (time instanceof java.time.Duration) { + long value = ((Duration) time).toNanos(); + if (value >= 0 && value <= TimeUnit.DAYS.toNanos(1)) { + return LocalTime.ofNanoOfDay(value).format(TIME_FORMATTER); + } else { + final long updatedValue = 0 > value ? 
Math.abs(value) : TimeUnit.DAYS.toNanos(1); + LOGGER.debug("Time values must use number of milliseconds greater than 0 and less than 86400000000000 but its {}, converting to {} ", value, + updatedValue); + return LocalTime.ofNanoOfDay(updatedValue).format(TIME_FORMATTER); + } + } else { + return LocalTime.parse(time.toString()).format(TIME_FORMATTER); + } + } + +} diff --git a/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/DateTimeConverter.java b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/DateTimeConverter.java deleted file mode 100644 index 719eda0995fcc..0000000000000 --- a/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/DateTimeConverter.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Copyright (c) 2022 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.debezium.internals; - -import static io.airbyte.db.DataTypeUtils.TIMESTAMPTZ_FORMATTER; -import static io.airbyte.db.DataTypeUtils.TIMESTAMP_FORMATTER; -import static io.airbyte.db.DataTypeUtils.TIMETZ_FORMATTER; -import static io.airbyte.db.DataTypeUtils.TIME_FORMATTER; -import static io.airbyte.db.jdbc.AbstractJdbcCompatibleSourceOperations.isBce; -import static io.airbyte.db.jdbc.AbstractJdbcCompatibleSourceOperations.resolveEra; - -import java.sql.Date; -import java.sql.Timestamp; -import java.time.Instant; -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.time.LocalTime; -import java.time.OffsetDateTime; -import java.time.OffsetTime; -import java.time.ZoneOffset; -import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatter; - -public class DateTimeConverter { - - public static final DateTimeFormatter TIME_WITH_TIMEZONE_FORMATTER = DateTimeFormatter.ofPattern( - "HH:mm:ss[.][SSSSSSSSS][SSSSSSS][SSSSSS][SSSSS][SSSS][SSS][SS][S][''][XXX][XX][X]"); - - public static String 
convertToTimeWithTimezone(Object time) { - OffsetTime timetz = OffsetTime.parse(time.toString(), TIME_WITH_TIMEZONE_FORMATTER); - return timetz.format(TIMETZ_FORMATTER); - } - - public static String convertToTimestampWithTimezone(Object timestamp) { - if (timestamp instanceof Timestamp t) { - // In snapshot mode, debezium produces a java.sql.Timestamp object for the TIMESTAMPTZ type. - // Conceptually, a timestamp with timezone is an Instant. But t.toInstant() actually mangles the - // value for ancient dates, because leap years weren't applied consistently in ye olden days. - // Additionally, toInstant() (and toLocalDateTime()) actually lose the era indicator, so we can't - // rely on their getEra() methods. - // So we have special handling for this case, which sidesteps the toInstant conversion. - ZonedDateTime timestamptz = t.toLocalDateTime().atZone(ZoneOffset.UTC); - String value = timestamptz.format(TIMESTAMPTZ_FORMATTER); - return resolveEra(t, value); - } else if (timestamp instanceof OffsetDateTime t) { - // In incremental mode, debezium emits java.time.OffsetDateTime objects. - // java.time classes have a year 0, but the standard AD/BC system does not. For example, - // "0001-01-01 BC" is represented as LocalDate("0000-01-01"). - // We just subtract one year to hack around this difference. - LocalDate localDate = t.toLocalDate(); - if (isBce(localDate)) { - t = t.minusYears(1); - } - return resolveEra(localDate, t.toString()); - } else { - // This case probably isn't strictly necessary, but I'm leaving it just in case there's some weird - // situation that I'm not aware of. 
- Instant instant = Instant.parse(timestamp.toString()); - OffsetDateTime offsetDateTime = OffsetDateTime.ofInstant(instant, ZoneOffset.UTC); - ZonedDateTime timestamptz = ZonedDateTime.from(offsetDateTime); - LocalDate localDate = timestamptz.toLocalDate(); - String value = timestamptz.format(TIMESTAMPTZ_FORMATTER); - return resolveEra(localDate, value); - } - } - - /** - * See {@link #convertToTimestampWithTimezone(Object)} for explanation of the weird things happening - * here. - */ - public static String convertToTimestamp(Object timestamp) { - if (timestamp instanceof Timestamp t) { - // Snapshot mode - LocalDateTime localDateTime = t.toLocalDateTime(); - String value = localDateTime.format(TIMESTAMP_FORMATTER); - return resolveEra(t, value); - } else if (timestamp instanceof Instant i) { - // Incremental mode - LocalDate localDate = i.atZone(ZoneOffset.UTC).toLocalDate(); - if (isBce(localDate)) { - // i.minus(1, ChronoUnit.YEARS) would be nice, but it throws an exception because you can't subtract - // YEARS from an Instant - i = i.atZone(ZoneOffset.UTC).minusYears(1).toInstant(); - } - return resolveEra(localDate, i.toString()); - } else { - LocalDateTime localDateTime = LocalDateTime.parse(timestamp.toString()); - final LocalDate date = localDateTime.toLocalDate(); - String value = localDateTime.format(TIMESTAMP_FORMATTER); - return resolveEra(date, value); - } - } - - /** - * See {@link #convertToTimestampWithTimezone(Object)} for explanation of the weird things happening - * here. 
- */ - public static Object convertToDate(Object date) { - if (date instanceof Date d) { - // Snapshot mode - LocalDate localDate = ((Date) date).toLocalDate(); - return resolveEra(d, localDate.toString()); - } else if (date instanceof LocalDate d) { - // Incremental mode - if (isBce(d)) { - d = d.minusYears(1); - } - return resolveEra(d, d.toString()); - } else { - LocalDate localDate = LocalDate.parse(date.toString()); - return resolveEra(localDate, localDate.toString()); - } - } - - public static String convertToTime(Object time) { - LocalTime localTime = LocalTime.parse(time.toString()); - return localTime.format(TIME_FORMATTER); - } - -} diff --git a/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumConverterUtils.java b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumConverterUtils.java index ab0a9e6cde163..e5499cb2fe4ce 100644 --- a/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumConverterUtils.java +++ b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumConverterUtils.java @@ -23,6 +23,9 @@ private DebeziumConverterUtils() { throw new UnsupportedOperationException(); } + /** + * TODO : Replace usage of this method with {@link io.airbyte.db.jdbc.DateTimeConverter} + */ public static String convertDate(final Object input) { /** * While building this custom converter we were not sure what type debezium could return cause there diff --git a/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java index e3b7889a2c843..c985b09cf420b 100644 --- 
a/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java +++ b/airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java @@ -4,6 +4,7 @@ package io.airbyte.integrations.debezium.internals; +import io.airbyte.db.jdbc.DateTimeConverter; import io.debezium.spi.converter.CustomConverter; import io.debezium.spi.converter.RelationalColumn; import java.math.BigDecimal; @@ -21,7 +22,7 @@ public class PostgresConverter implements CustomConverter DateTimeConverter.convertToDate(x); case "TIME" -> DateTimeConverter.convertToTime(x); case "INTERVAL" -> convertInterval((PGInterval) x); - default -> DebeziumConverterUtils.convertDate(x); + default -> throw new IllegalArgumentException("Unknown field type " + fieldType.toUpperCase(Locale.ROOT)); }; }); } diff --git a/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/AbstractSourceConnectorTest.java b/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/AbstractSourceConnectorTest.java index 2da738d5c0438..ecea6f96332e6 100644 --- a/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/AbstractSourceConnectorTest.java +++ b/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/AbstractSourceConnectorTest.java @@ -110,15 +110,14 @@ public void setUpInternal() throws Exception { localRoot = Files.createTempDirectory(testDir, "output"); environment = new TestDestinationEnv(localRoot); workerConfigs = new WorkerConfigs(new EnvConfigs()); - - setupEnvironment(environment); - processFactory = new DockerProcessFactory( workerConfigs, workspaceRoot, workspaceRoot.toString(), localRoot.toString(), "host"); + + setupEnvironment(environment); } @AfterEach diff --git 
a/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.java b/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.java index a804b2dc243b3..7f5adf34bc2b1 100644 --- a/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.java +++ b/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.java @@ -13,6 +13,7 @@ import io.airbyte.db.Database; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; +import io.airbyte.protocol.models.AirbyteStateMessage; import io.airbyte.protocol.models.AirbyteStream; import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; @@ -22,6 +23,7 @@ import io.airbyte.protocol.models.JsonSchemaType; import io.airbyte.protocol.models.SyncMode; import java.io.IOException; +import java.sql.SQLException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -42,7 +44,7 @@ public abstract class AbstractSourceDatabaseTypeTest extends AbstractSourceConne private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSourceDatabaseTypeTest.class); - private final List testDataHolders = new ArrayList<>(); + protected final List testDataHolders = new ArrayList<>(); /** * The column name will be used for a PK column in the test tables. 
Override it if default name is @@ -179,7 +181,7 @@ private void setupDatabaseInternal() throws Exception { * * @return configured catalog */ - private ConfiguredAirbyteCatalog getConfiguredCatalog() { + protected ConfiguredAirbyteCatalog getConfiguredCatalog() { return new ConfiguredAirbyteCatalog().withStreams( testDataHolders .stream() @@ -242,4 +244,30 @@ protected void printMarkdownTestTable() { LOGGER.info(getMarkdownTestTable()); } + protected ConfiguredAirbyteStream createDummyTableWithData(final Database database) throws SQLException { + database.query(ctx -> { + ctx.fetch("CREATE TABLE " + getNameSpace() + ".random_dummy_table(id INTEGER PRIMARY KEY, test_column VARCHAR(63));"); + ctx.fetch("INSERT INTO " + getNameSpace() + ".random_dummy_table VALUES (2, 'Random Data');"); + return null; + }); + + return new ConfiguredAirbyteStream().withSyncMode(SyncMode.INCREMENTAL) + .withCursorField(Lists.newArrayList("id")) + .withDestinationSyncMode(DestinationSyncMode.APPEND) + .withStream(CatalogHelpers.createAirbyteStream( + "random_dummy_table", + getNameSpace(), + Field.of("id", JsonSchemaType.INTEGER), + Field.of("test_column", JsonSchemaType.STRING)) + .withSourceDefinedCursor(true) + .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) + .withSourceDefinedPrimaryKey(List.of(List.of("id")))); + + } + + protected List extractStateMessages(final List messages) { + return messages.stream().filter(r -> r.getType() == Type.STATE).map(AirbyteMessage::getState) + .collect(Collectors.toList()); + } + } diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcProperties.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcProperties.java index 2e47c6bc14872..659a315895242 100644 --- 
a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcProperties.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcProperties.java @@ -13,7 +13,12 @@ public class PostgresCdcProperties { static Properties getDebeziumDefaultProperties(final JsonNode config) { final Properties props = commonProperties(); props.setProperty("plugin.name", PostgresUtils.getPluginValue(config.get("replication_method"))); - props.setProperty("snapshot.mode", "initial"); + if (config.has("snapshot_mode")) { + //The parameter `snapshot_mode` is passed in test to simulate reading the WAL Logs directly and skip initial snapshot + props.setProperty("snapshot.mode", config.get("snapshot_mode").asText()); + } else { + props.setProperty("snapshot.mode", "initial"); + } props.setProperty("slot.name", config.get("replication_method").get("replication_slot").asText()); props.setProperty("publication.name", config.get("replication_method").get("publication").asText()); @@ -29,6 +34,7 @@ private static Properties commonProperties() { props.setProperty("converters", "datetime"); props.setProperty("datetime.type", PostgresConverter.class.getName()); + props.setProperty("include.unknown.datatypes", "true"); return props; } diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java index 03a7a6d910e1e..7051b2936ee74 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java @@ -20,6 +20,7 @@ import io.airbyte.commons.jackson.MoreMappers; 
import io.airbyte.commons.json.Jsons; import io.airbyte.db.DataTypeUtils; +import io.airbyte.db.jdbc.DateTimeConverter; import io.airbyte.db.jdbc.JdbcSourceOperations; import io.airbyte.protocol.models.JsonSchemaType; import java.math.BigDecimal; @@ -214,26 +215,17 @@ public void setJsonField(final ResultSet resultSet, final int colIndex, final Ob @Override protected void putDate(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException { - LocalDate date = getObject(resultSet, index, LocalDate.class); - if (isBce(date)) { - // java.time uses a year 0, but the standard AD/BC system does not. So we just subtract one to hack - // around this difference. - date = date.minusYears(1); - } - node.put(columnName, resolveEra(date, date.toString())); + node.put(columnName, DateTimeConverter.convertToDate(getObject(resultSet, index, LocalDate.class))); } @Override protected void putTime(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException { - final LocalTime time = getObject(resultSet, index, LocalTime.class); - node.put(columnName, time.format(TIME_FORMATTER)); + node.put(columnName, DateTimeConverter.convertToTime(getObject(resultSet, index, LocalTime.class))); } @Override protected void putTimestamp(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException { - final LocalDateTime timestamp = getObject(resultSet, index, LocalDateTime.class); - final LocalDate date = timestamp.toLocalDate(); - node.put(columnName, resolveEra(date, timestamp.format(TIMESTAMP_FORMATTER))); + node.put(columnName, DateTimeConverter.convertToTimestamp(resultSet.getTimestamp(index))); } @Override diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractPostgresSourceDatatypeTest.java 
b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractPostgresSourceDatatypeTest.java index afe2854743aa5..5ea93aded9850 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractPostgresSourceDatatypeTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractPostgresSourceDatatypeTest.java @@ -398,21 +398,6 @@ protected void initTests() { .build()); } - // time with time zone - for (final String fullSourceType : Set.of("timetz", "time with time zone")) { - addDataTypeTestData( - TestDataHolder.builder() - .sourceType("timetz") - .fullSourceDataType(fullSourceType) - .airbyteType(JsonSchemaType.STRING_TIME_WITH_TIMEZONE) - .addInsertValues("null", "'13:00:01'", "'13:00:00+8'", "'13:00:03-8'", "'13:00:04Z'", "'13:00:05.012345Z+8'", "'13:00:06.00000Z-8'") - // A time value without time zone will use the time zone set on the database, which is Z-7, - // so 13:00:01 is returned as 13:00:01-07. 
- .addExpectedValues(null, "13:00:01.000000-07:00", "13:00:00.000000+08:00", "13:00:03.000000-08:00", "13:00:04.000000Z", - "13:00:05.012345-08:00", "13:00:06.000000+08:00") - .build()); - } - // timestamp without time zone for (final String fullSourceType : Set.of("timestamp", "timestamp without time zone")) { addDataTypeTestData( @@ -555,6 +540,25 @@ protected void initTests() { {"ISBN-13":"978-1449370000","weight":"11.2 ounces","paperback":"243","publisher":"postgresqltutorial.com","language":"English"}""", null) .build()); + + addTimeWithTimeZoneTest(); + } + + protected void addTimeWithTimeZoneTest() { + // time with time zone + for (final String fullSourceType : Set.of("timetz", "time with time zone")) { + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("timetz") + .fullSourceDataType(fullSourceType) + .airbyteType(JsonSchemaType.STRING_TIME_WITH_TIMEZONE) + .addInsertValues("null", "'13:00:01'", "'13:00:00+8'", "'13:00:03-8'", "'13:00:04Z'", "'13:00:05.012345Z+8'", "'13:00:06.00000Z-8'") + // A time value without time zone will use the time zone set on the database, which is Z-7, + // so 13:00:01 is returned as 13:00:01-07. 
+ .addExpectedValues(null, "13:00:01.000000-07:00", "13:00:00.000000+08:00", "13:00:03.000000-08:00", "13:00:04.000000Z", + "13:00:05.012345-08:00", "13:00:06.000000+08:00") + .build()); + } } } diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceDatatypeTest.java b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcInitialSnapshotPostgresSourceDatatypeTest.java similarity index 96% rename from airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceDatatypeTest.java rename to airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcInitialSnapshotPostgresSourceDatatypeTest.java index 37abb3a3c5444..bc9ff87b0522e 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceDatatypeTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcInitialSnapshotPostgresSourceDatatypeTest.java @@ -18,7 +18,7 @@ import org.testcontainers.containers.PostgreSQLContainer; import org.testcontainers.utility.MountableFile; -public class CdcPostgresSourceDatatypeTest extends AbstractPostgresSourceDatatypeTest { +public class CdcInitialSnapshotPostgresSourceDatatypeTest extends AbstractPostgresSourceDatatypeTest { private static final String SCHEMA_NAME = "test"; private static final String SLOT_NAME_BASE = "debezium_slot"; @@ -55,6 +55,7 @@ protected Database setupDatabase() throws Exception { .put("replication_method", replicationMethod) .put("is_test", true) .put(JdbcUtils.SSL_KEY, false) + .put("snapshot_mode", 
"initial_only") .build()); dslContext = DSLContextFactory.create( diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcWalLogsPostgresSourceDatatypeTest.java b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcWalLogsPostgresSourceDatatypeTest.java new file mode 100644 index 0000000000000..fb671420dd414 --- /dev/null +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcWalLogsPostgresSourceDatatypeTest.java @@ -0,0 +1,169 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.io.airbyte.integration_tests.sources; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableMap; +import io.airbyte.commons.json.Jsons; +import io.airbyte.db.Database; +import io.airbyte.db.factory.DSLContextFactory; +import io.airbyte.db.factory.DatabaseDriver; +import io.airbyte.db.jdbc.JdbcUtils; +import io.airbyte.integrations.standardtest.source.TestDataHolder; +import io.airbyte.integrations.standardtest.source.TestDestinationEnv; +import io.airbyte.integrations.util.HostPortResolver; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteStateMessage; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.JsonSchemaType; +import java.util.List; +import java.util.Set; +import org.jooq.SQLDialect; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.utility.MountableFile; + +public class CdcWalLogsPostgresSourceDatatypeTest extends AbstractPostgresSourceDatatypeTest { + + private static final String SCHEMA_NAME = "test"; + private static final String 
SLOT_NAME_BASE = "debezium_slot"; + private static final String PUBLICATION = "publication"; + private static final int INITIAL_WAITING_SECONDS = 5; + private JsonNode stateAfterFirstSync; + + @Override + protected List runRead(ConfiguredAirbyteCatalog configuredCatalog) throws Exception { + if (stateAfterFirstSync == null) { + throw new RuntimeException("stateAfterFirstSync is null"); + } + return super.runRead(configuredCatalog, stateAfterFirstSync); + } + + @Override + protected void setupEnvironment(TestDestinationEnv environment) throws Exception { + final Database database = setupDatabase(); + initTests(); + for (final TestDataHolder test : testDataHolders) { + database.query(ctx -> { + ctx.fetch(test.getCreateSqlQuery()); + return null; + }); + } + + final ConfiguredAirbyteStream dummyTableWithData = createDummyTableWithData(database); + final ConfiguredAirbyteCatalog catalog = getConfiguredCatalog(); + catalog.getStreams().add(dummyTableWithData); + + final List allMessages = super.runRead(catalog); + if (allMessages.size() != 2) { + throw new RuntimeException("First sync should only generate 2 records"); + } + final List stateAfterFirstBatch = extractStateMessages(allMessages); + if (stateAfterFirstBatch == null || stateAfterFirstBatch.isEmpty()) { + throw new RuntimeException("stateAfterFirstBatch should not be null or empty"); + } + stateAfterFirstSync = Jsons.jsonNode(stateAfterFirstBatch); + if (stateAfterFirstSync == null) { + throw new RuntimeException("stateAfterFirstSync should not be null"); + } + for (final TestDataHolder test : testDataHolders) { + database.query(ctx -> { + test.getInsertSqlQueries().forEach(ctx::fetch); + return null; + }); + } + } + + @Override + protected Database setupDatabase() throws Exception { + + container = new PostgreSQLContainer<>("postgres:14-alpine") + .withCopyFileToContainer(MountableFile.forClasspathResource("postgresql.conf"), + "/etc/postgresql/postgresql.conf") + .withCommand("postgres -c 
config_file=/etc/postgresql/postgresql.conf"); + container.start(); + + /** + * The replication method below sets the replication slot and the publication. NOTE(review): confirm + * that {@link io.airbyte.integrations.source.postgres.PostgresSource#isCdc(JsonNode)} returns true + * for this config, so that the tests in this class run through the cdc path. + */ + final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder() + .put("method", "CDC") + .put("replication_slot", SLOT_NAME_BASE) + .put("publication", PUBLICATION) + .put("initial_waiting_seconds", INITIAL_WAITING_SECONDS) + .build()); + config = Jsons.jsonNode(ImmutableMap.builder() + .put(JdbcUtils.HOST_KEY, HostPortResolver.resolveHost(container)) + .put(JdbcUtils.PORT_KEY, HostPortResolver.resolvePort(container)) + .put(JdbcUtils.DATABASE_KEY, container.getDatabaseName()) + .put(JdbcUtils.SCHEMAS_KEY, List.of(SCHEMA_NAME)) + .put(JdbcUtils.USERNAME_KEY, container.getUsername()) + .put(JdbcUtils.PASSWORD_KEY, container.getPassword()) + .put("replication_method", replicationMethod) + .put("is_test", true) + .put(JdbcUtils.SSL_KEY, false) + .build()); + + dslContext = DSLContextFactory.create( + config.get(JdbcUtils.USERNAME_KEY).asText(), + config.get(JdbcUtils.PASSWORD_KEY).asText(), + DatabaseDriver.POSTGRESQL.getDriverClassName(), + String.format(DatabaseDriver.POSTGRESQL.getUrlFormatString(), + container.getHost(), + container.getFirstMappedPort(), + config.get(JdbcUtils.DATABASE_KEY).asText()), + SQLDialect.POSTGRES); + final Database database = new Database(dslContext); + + database.query(ctx -> { + ctx.execute( + "SELECT pg_create_logical_replication_slot('" + SLOT_NAME_BASE + "', 'pgoutput');"); + ctx.execute("CREATE PUBLICATION " + PUBLICATION + " FOR ALL TABLES;"); + ctx.execute("CREATE EXTENSION hstore;"); + return null; + }); + + database.query(ctx -> ctx.fetch("CREATE SCHEMA TEST;")); + database.query(ctx -> ctx.fetch("CREATE TYPE mood AS ENUM ('sad', 'ok', 'happy');")); + database.query(ctx -> ctx.fetch("CREATE TYPE 
inventory_item AS (\n" + + " name text,\n" + + " supplier_id integer,\n" + + " price numeric\n" + + ");")); + + database.query(ctx -> ctx.fetch("SET TIMEZONE TO 'MST'")); + return database; + } + + @Override + protected void tearDown(final TestDestinationEnv testEnv) { + dslContext.close(); + container.close(); + } + + public boolean testCatalog() { + return true; + } + + @Override + protected void addTimeWithTimeZoneTest() { + // time with time zone + for (final String fullSourceType : Set.of("timetz", "time with time zone")) { + addDataTypeTestData( + TestDataHolder.builder() + .sourceType("timetz") + .fullSourceDataType(fullSourceType) + .airbyteType(JsonSchemaType.STRING_TIME_WITH_TIMEZONE) + .addInsertValues("null", "'13:00:01'", "'13:00:00+8'", "'13:00:03-8'", "'13:00:04Z'", "'13:00:05.012345Z+8'", "'13:00:06.00000Z-8'") + // A time value without time zone will use the time zone set on the database, which is Z-7 (MST). + // The expected values here are all normalized to UTC, so 13:00:01 is expected as 20:00:01Z. + .addExpectedValues(null, "20:00:01.000000Z", "05:00:00.000000Z", "21:00:03.000000Z", "13:00:04.000000Z", "21:00:05.012345Z", + "05:00:06.000000Z") + .build()); + } + } +}