From 52f7942fefbe01408532441f8fd751efc23fea09 Mon Sep 17 00:00:00 2001 From: wudi Date: Mon, 25 May 2026 18:12:46 +0800 Subject: [PATCH 1/5] [fix](streaming-job) handle pre-1970 sub-millisecond timestamps in cdc-client deserializer --- .../deserialize/DebeziumJsonDeserializer.java | 26 +++++-- .../DebeziumJsonDeserializerTest.java | 76 +++++++++++++++++++ 2 files changed, 94 insertions(+), 8 deletions(-) create mode 100644 fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializerTest.java diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java index 7876597660e0b8..b2f2e26bfa7218 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java @@ -307,15 +307,25 @@ private Object convertTimestamp(String typeName, Object dbzObj) { case Timestamp.SCHEMA_NAME: return TimestampData.fromEpochMillis((Long) dbzObj).toTimestamp().toString(); case MicroTimestamp.SCHEMA_NAME: - long micro = (long) dbzObj; - return TimestampData.fromEpochMillis(micro / 1000, (int) (micro % 1000 * 1000)) - .toTimestamp() - .toString(); + { + // floorDiv/floorMod keep nanoOfMillisecond non-negative for pre-1970 + // values. 
+ long micro = (long) dbzObj; + long millis = Math.floorDiv(micro, 1000L); + int nanos = (int) Math.floorMod(micro, 1000L) * 1000; + return TimestampData.fromEpochMillis(millis, nanos) + .toTimestamp() + .toString(); + } case NanoTimestamp.SCHEMA_NAME: - long nano = (long) dbzObj; - return TimestampData.fromEpochMillis(nano / 1000_000, (int) (nano % 1000_000)) - .toTimestamp() - .toString(); + { + long nano = (long) dbzObj; + long millis = Math.floorDiv(nano, 1_000_000L); + int nanos = (int) Math.floorMod(nano, 1_000_000L); + return TimestampData.fromEpochMillis(millis, nanos) + .toTimestamp() + .toString(); + } } } LocalDateTime localDateTime = TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone); diff --git a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializerTest.java b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializerTest.java new file mode 100644 index 00000000000000..ac4d7b58226493 --- /dev/null +++ b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializerTest.java @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.cdcclient.source.deserialize; + +import org.junit.jupiter.api.Test; + +import java.lang.reflect.Method; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import io.debezium.time.MicroTimestamp; +import io.debezium.time.NanoTimestamp; + +/** Unit tests for {@link DebeziumJsonDeserializer}. */ +class DebeziumJsonDeserializerTest { + + private final DebeziumJsonDeserializer deserializer = new DebeziumJsonDeserializer(); + + // ─── convertTimestamp ───────────────────────────────────────────────────── + + @Test + void microTimestamp_negativeSubMillisecond_doesNotThrow() { + // micros = -877 ⇒ 1969-12-31 23:59:59.999123. Signed `/` `%` produced a + // negative nanoOfMillisecond and tripped TimestampData's >= 0 check. + Object out = invokeConvertTimestamp(MicroTimestamp.SCHEMA_NAME, -877L); + assertEquals("1969-12-31 23:59:59.999123", out.toString()); + } + + @Test + void microTimestamp_positive_unchanged() { + Object out = invokeConvertTimestamp(MicroTimestamp.SCHEMA_NAME, 1_234_567L); + assertEquals("1970-01-01 00:00:01.234567", out.toString()); + } + + @Test + void microTimestamp_negativeIntegerMillis_unchanged() { + // micros = -1000 ⇒ 1969-12-31 23:59:59.999, negative but no sub-millisecond + // (the old code happened to produce the right result here; protect that path). + Object out = invokeConvertTimestamp(MicroTimestamp.SCHEMA_NAME, -1000L); + assertEquals("1969-12-31 23:59:59.999", out.toString()); + } + + @Test + void nanoTimestamp_negativeSubMillisecond_doesNotThrow() { + // nanos = -877_000 ⇒ 1969-12-31 23:59:59.999123. 
+ Object out = invokeConvertTimestamp(NanoTimestamp.SCHEMA_NAME, -877_000L); + assertEquals("1969-12-31 23:59:59.999123", out.toString()); + } + + private Object invokeConvertTimestamp(String typeName, Object dbzObj) { + try { + Method m = + DebeziumJsonDeserializer.class.getDeclaredMethod( + "convertTimestamp", String.class, Object.class); + m.setAccessible(true); + return m.invoke(deserializer, typeName, dbzObj); + } catch (ReflectiveOperationException e) { + throw new RuntimeException(e); + } + } +} From f464e62a538469e07eb358e3bed4c525d8e59e5e Mon Sep 17 00:00:00 2001 From: wudi Date: Mon, 25 May 2026 18:12:55 +0800 Subject: [PATCH 2/5] [fix](streaming-job) align pg snapshot timestamp/date values with binlog path --- .../connection/PostgresConnection.java | 905 ++++++++++++++++++ .../source/fetch/PostgresScanFetchTask.java | 391 ++++++++ ...tgres_job_snapshot_historical_dates.groovy | 225 +++++ 3 files changed, 1521 insertions(+) create mode 100644 fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java create mode 100644 fs_brokers/cdc_client/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java create mode 100644 regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_historical_dates.groovy diff --git a/fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java b/fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java new file mode 100644 index 00000000000000..2f6ca5756dde4b --- /dev/null +++ b/fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java @@ -0,0 +1,905 @@ +/* + * Copyright Debezium Authors. 
+ * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ + +package io.debezium.connector.postgresql.connection; + +import com.zaxxer.hikari.pool.HikariProxyConnection; +import io.debezium.DebeziumException; +import io.debezium.annotation.VisibleForTesting; +import io.debezium.config.Configuration; +import io.debezium.connector.postgresql.PgOid; +import io.debezium.connector.postgresql.PostgresConnectorConfig; +import io.debezium.connector.postgresql.PostgresSchema; +import io.debezium.connector.postgresql.PostgresType; +import io.debezium.connector.postgresql.PostgresValueConverter; +import io.debezium.connector.postgresql.TypeRegistry; +import io.debezium.connector.postgresql.spi.SlotState; +import io.debezium.data.SpecialValueDecimal; +import io.debezium.jdbc.JdbcConfiguration; +import io.debezium.jdbc.JdbcConnection; +import io.debezium.relational.Column; +import io.debezium.relational.ColumnEditor; +import io.debezium.relational.Table; +import io.debezium.relational.TableId; +import io.debezium.relational.Tables; +import io.debezium.schema.DatabaseSchema; +import io.debezium.util.Clock; +import io.debezium.util.Metronome; +import org.apache.kafka.connect.errors.ConnectException; +import org.postgresql.core.BaseConnection; +import org.postgresql.jdbc.PgConnection; +import org.postgresql.jdbc.TimestampUtils; +import org.postgresql.replication.LogSequenceNumber; +import org.postgresql.util.PGmoney; +import org.postgresql.util.PSQLState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.Charset; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Timestamp; +import java.time.Duration; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; +import java.util.Objects; +import 
java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.regex.Pattern; + +/** + * Copied from Flink CDC 3.6.0 + * + *

Line 820~854: modified getColumnValue method to fix FLINK-39748. + */ +public class PostgresConnection extends JdbcConnection { + + public static final String CONNECTION_STREAMING = "Debezium Streaming"; + public static final String CONNECTION_SLOT_INFO = "Debezium Slot Info"; + public static final String CONNECTION_DROP_SLOT = "Debezium Drop Slot"; + public static final String CONNECTION_VALIDATE_CONNECTION = "Debezium Validate Connection"; + public static final String CONNECTION_HEARTBEAT = "Debezium Heartbeat"; + public static final String CONNECTION_GENERAL = "Debezium General"; + + private static final Pattern FUNCTION_DEFAULT_PATTERN = + Pattern.compile("^[(]?[A-Za-z0-9_.]+\\((?:.+(?:, ?.+)*)?\\)"); + private static final Pattern EXPRESSION_DEFAULT_PATTERN = + Pattern.compile("\\(+(?:.+(?:[+ - * / < > = ~ ! @ # % ^ & | ` ?] ?.+)+)+\\)"); + private static Logger LOGGER = LoggerFactory.getLogger(PostgresConnection.class); + + private static final String URL_PATTERN = + "jdbc:postgresql://${" + + JdbcConfiguration.HOSTNAME + + "}:${" + + JdbcConfiguration.PORT + + "}/${" + + JdbcConfiguration.DATABASE + + "}"; + protected static final ConnectionFactory FACTORY = + JdbcConnection.patternBasedFactory( + URL_PATTERN, + org.postgresql.Driver.class.getName(), + PostgresConnection.class.getClassLoader(), + JdbcConfiguration.PORT.withDefault( + PostgresConnectorConfig.PORT.defaultValueAsString())); + + /** + * Obtaining a replication slot may fail if there's a pending transaction. We're retrying to get + * a slot for 30 min. + */ + private static final int MAX_ATTEMPTS_FOR_OBTAINING_REPLICATION_SLOT = 900; + + private static final Duration PAUSE_BETWEEN_REPLICATION_SLOT_RETRIEVAL_ATTEMPTS = + Duration.ofSeconds(2); + + private final TypeRegistry typeRegistry; + private final PostgresDefaultValueConverter defaultValueConverter; + + /** + * Creates a Postgres connection using the supplied configuration. 
If necessary this connection + * is able to resolve data type mappings. Such a connection requires a {@link + * PostgresValueConverter}, and will provide its own {@link TypeRegistry}. Usually only one such + * connection per connector is needed. + * + * @param config {@link Configuration} instance, may not be null. + * @param valueConverterBuilder supplies a configured {@link PostgresValueConverter} for a given + * {@link TypeRegistry} + * @param connectionUsage a symbolic name of the connection to be tracked in monitoring tools + */ + public PostgresConnection( + JdbcConfiguration config, + PostgresValueConverterBuilder valueConverterBuilder, + String connectionUsage) { + this(config, valueConverterBuilder, connectionUsage, FACTORY); + } + + /** + * Creates a Postgres connection using the supplied configuration. If necessary this connection + * is able to resolve data type mappings. Such a connection requires a {@link + * PostgresValueConverter}, and will provide its own {@link TypeRegistry}. Usually only one such + * connection per connector is needed. + * + * @param config {@link Configuration} instance, may not be null. 
+ * @param valueConverterBuilder supplies a configured {@link PostgresValueConverter} for a given + * {@link TypeRegistry} + * @param connectionUsage a symbolic name of the connection to be tracked in monitoring tools + */ + public PostgresConnection( + JdbcConfiguration config, + PostgresValueConverterBuilder valueConverterBuilder, + String connectionUsage, + ConnectionFactory factory) { + super( + addDefaultSettings(config, connectionUsage), + factory, + PostgresConnection::validateServerVersion, + null, + "\"", + "\""); + + if (Objects.isNull(valueConverterBuilder)) { + this.typeRegistry = null; + this.defaultValueConverter = null; + } else { + this.typeRegistry = new TypeRegistry(this); + + final PostgresValueConverter valueConverter = + valueConverterBuilder.build(this.typeRegistry); + this.defaultValueConverter = + new PostgresDefaultValueConverter(valueConverter, this.getTimestampUtils()); + } + } + + /** + * Create a Postgres connection using the supplied configuration and {@link TypeRegistry} + * + * @param config {@link Configuration} instance, may not be null. 
+ * @param typeRegistry an existing/already-primed {@link TypeRegistry} instance + * @param connectionUsage a symbolic name of the connection to be tracked in monitoring tools + */ + public PostgresConnection( + PostgresConnectorConfig config, TypeRegistry typeRegistry, String connectionUsage) { + super( + addDefaultSettings(config.getJdbcConfig(), connectionUsage), + FACTORY, + PostgresConnection::validateServerVersion, + null, + "\"", + "\""); + if (Objects.isNull(typeRegistry)) { + this.typeRegistry = null; + this.defaultValueConverter = null; + } else { + this.typeRegistry = typeRegistry; + final PostgresValueConverter valueConverter = + PostgresValueConverter.of(config, this.getDatabaseCharset(), typeRegistry); + this.defaultValueConverter = + new PostgresDefaultValueConverter(valueConverter, this.getTimestampUtils()); + } + } + + /** + * Creates a Postgres connection using the supplied configuration. The connector is the regular + * one without datatype resolution capabilities. + * + * @param config {@link Configuration} instance, may not be null. 
+ * @param connectionUsage a symbolic name of the connection to be tracked in monitoring tools + */ + public PostgresConnection(JdbcConfiguration config, String connectionUsage) { + this(config, null, connectionUsage); + } + + /** Return an unwrapped PgConnection instead of HikariProxyConnection */ + @Override + public synchronized Connection connection() throws SQLException { + Connection conn = connection(true); + if (conn instanceof HikariProxyConnection) { + // assuming HikariCP use org.postgresql.jdbc.PgConnection + return conn.unwrap(PgConnection.class); + } + return conn; + } + + static JdbcConfiguration addDefaultSettings( + JdbcConfiguration configuration, String connectionUsage) { + // we require Postgres 9.4 as the minimum server version since that's where logical + // replication was first introduced + return JdbcConfiguration.adapt( + configuration + .edit() + .with("assumeMinServerVersion", "9.4") + .with("ApplicationName", connectionUsage) + .build()); + } + + /** + * Returns a JDBC connection string for the current configuration. + * + * @return a {@code String} where the variables in {@code urlPattern} are replaced with values + * from the configuration + */ + public String connectionString() { + return connectionString(URL_PATTERN); + } + + /** + * Prints out information about the REPLICA IDENTITY status of a table. This in turn determines + * how much information is available for UPDATE and DELETE operations for logical replication. + * + * @param tableId the identifier of the table + * @return the replica identity information; never null + * @throws SQLException if there is a problem obtaining the replica identity information for the + * given table + */ + public ServerInfo.ReplicaIdentity readReplicaIdentityInfo(TableId tableId) throws SQLException { + String statement = + "SELECT relreplident FROM pg_catalog.pg_class c " + + "LEFT JOIN pg_catalog.pg_namespace n ON c.relnamespace=n.oid " + + "WHERE n.nspname=? 
and c.relname=?"; + String schema = + tableId.schema() != null && tableId.schema().length() > 0 + ? tableId.schema() + : "public"; + StringBuilder replIdentity = new StringBuilder(); + prepareQuery( + statement, + stmt -> { + stmt.setString(1, schema); + stmt.setString(2, tableId.table()); + }, + rs -> { + if (rs.next()) { + replIdentity.append(rs.getString(1)); + } else { + LOGGER.warn( + "Cannot determine REPLICA IDENTITY information for table '{}'", + tableId); + } + }); + return ServerInfo.ReplicaIdentity.parseFromDB(replIdentity.toString()); + } + + /** + * Returns the current state of the replication slot + * + * @param slotName the name of the slot + * @param pluginName the name of the plugin used for the desired slot + * @return the {@link SlotState} or null, if no slot state is found + * @throws SQLException + */ + public SlotState getReplicationSlotState(String slotName, String pluginName) + throws SQLException { + ServerInfo.ReplicationSlot slot; + try { + slot = readReplicationSlotInfo(slotName, pluginName); + if (slot.equals(ServerInfo.ReplicationSlot.INVALID)) { + return null; + } else { + return slot.asSlotState(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new ConnectException( + "Interrupted while waiting for valid replication slot info", e); + } + } + + /** + * Fetches the state of a replication stage given a slot name and plugin name + * + * @param slotName the name of the slot + * @param pluginName the name of the plugin used for the desired slot + * @return the {@link ServerInfo.ReplicationSlot} object or a {@link + * ServerInfo.ReplicationSlot#INVALID} if the slot is not valid + * @throws SQLException is thrown by the underlying JDBC + */ + private ServerInfo.ReplicationSlot fetchReplicationSlotInfo(String slotName, String pluginName) + throws SQLException { + final String database = database(); + final ServerInfo.ReplicationSlot slot = + queryForSlot( + slotName, + database, + pluginName, + rs -> { 
+ if (rs.next()) { + boolean active = rs.getBoolean("active"); + final Lsn confirmedFlushedLsn = + parseConfirmedFlushLsn(slotName, pluginName, database, rs); + if (confirmedFlushedLsn == null) { + return null; + } + Lsn restartLsn = + parseRestartLsn(slotName, pluginName, database, rs); + if (restartLsn == null) { + return null; + } + final Long xmin = rs.getLong("catalog_xmin"); + return new ServerInfo.ReplicationSlot( + active, confirmedFlushedLsn, restartLsn, xmin); + } else { + LOGGER.debug( + "No replication slot '{}' is present for plugin '{}' and database '{}'", + slotName, + pluginName, + database); + return ServerInfo.ReplicationSlot.INVALID; + } + }); + return slot; + } + + /** + * Fetches a replication slot, repeating the query until either the slot is created or until the + * max number of attempts has been reached + * + *

To fetch the slot without the retries, use the {@link + * PostgresConnection#fetchReplicationSlotInfo} call + * + * @param slotName the slot name + * @param pluginName the name of the plugin + * @return the {@link ServerInfo.ReplicationSlot} object or a {@link + * ServerInfo.ReplicationSlot#INVALID} if the slot is not valid + * @throws SQLException is thrown by the underyling jdbc driver + * @throws InterruptedException is thrown if we don't return an answer within the set number of + * retries + */ + @VisibleForTesting + ServerInfo.ReplicationSlot readReplicationSlotInfo(String slotName, String pluginName) + throws SQLException, InterruptedException { + final String database = database(); + final Metronome metronome = + Metronome.parker(PAUSE_BETWEEN_REPLICATION_SLOT_RETRIEVAL_ATTEMPTS, Clock.SYSTEM); + + for (int attempt = 1; attempt <= MAX_ATTEMPTS_FOR_OBTAINING_REPLICATION_SLOT; attempt++) { + final ServerInfo.ReplicationSlot slot = fetchReplicationSlotInfo(slotName, pluginName); + if (slot != null) { + LOGGER.info("Obtained valid replication slot {}", slot); + return slot; + } + LOGGER.warn( + "Cannot obtain valid replication slot '{}' for plugin '{}' and database '{}' [during attempt {} out of {}, concurrent tx probably blocks taking snapshot.", + slotName, + pluginName, + database, + attempt, + MAX_ATTEMPTS_FOR_OBTAINING_REPLICATION_SLOT); + metronome.pause(); + } + + throw new ConnectException( + "Unable to obtain valid replication slot. " + + "Make sure there are no long-running transactions running in parallel as they may hinder the allocation of the replication slot when starting this connector"); + } + + protected ServerInfo.ReplicationSlot queryForSlot( + String slotName, + String database, + String pluginName, + ResultSetMapper map) + throws SQLException { + return prepareQueryAndMap( + "select * from pg_replication_slots where slot_name = ? and database = ? 
and plugin = ?", + statement -> { + statement.setString(1, slotName); + statement.setString(2, database); + statement.setString(3, pluginName); + }, + map); + } + + /** + * Obtains the LSN to resume streaming from. On PG 9.5 there is no confirmed_flushed_lsn yet, so + * restart_lsn will be read instead. This may result in more records to be re-read after a + * restart. + */ + private Lsn parseConfirmedFlushLsn( + String slotName, String pluginName, String database, ResultSet rs) { + Lsn confirmedFlushedLsn = null; + + try { + confirmedFlushedLsn = + tryParseLsn(slotName, pluginName, database, rs, "confirmed_flush_lsn"); + } catch (SQLException e) { + LOGGER.info("unable to find confirmed_flushed_lsn, falling back to restart_lsn"); + try { + confirmedFlushedLsn = + tryParseLsn(slotName, pluginName, database, rs, "restart_lsn"); + } catch (SQLException e2) { + throw new ConnectException( + "Neither confirmed_flush_lsn nor restart_lsn could be found"); + } + } + + return confirmedFlushedLsn; + } + + private Lsn parseRestartLsn(String slotName, String pluginName, String database, ResultSet rs) { + Lsn restartLsn = null; + try { + restartLsn = tryParseLsn(slotName, pluginName, database, rs, "restart_lsn"); + } catch (SQLException e) { + throw new ConnectException("restart_lsn could be found"); + } + + return restartLsn; + } + + private Lsn tryParseLsn( + String slotName, String pluginName, String database, ResultSet rs, String column) + throws ConnectException, SQLException { + Lsn lsn = null; + + String lsnStr = rs.getString(column); + if (lsnStr == null) { + return null; + } + try { + lsn = Lsn.valueOf(lsnStr); + } catch (Exception e) { + throw new ConnectException( + "Value " + + column + + " in the pg_replication_slots table for slot = '" + + slotName + + "', plugin = '" + + pluginName + + "', database = '" + + database + + "' is not valid. 
This is an abnormal situation and the database status should be checked."); + } + if (!lsn.isValid()) { + throw new ConnectException("Invalid LSN returned from database"); + } + return lsn; + } + + /** + * Drops a replication slot that was created on the DB + * + * @param slotName the name of the replication slot, may not be null + * @return {@code true} if the slot was dropped, {@code false} otherwise + */ + public boolean dropReplicationSlot(String slotName) { + final int ATTEMPTS = 3; + for (int i = 0; i < ATTEMPTS; i++) { + try { + execute("select pg_drop_replication_slot('" + slotName + "')"); + return true; + } catch (SQLException e) { + // slot is active + if (PSQLState.OBJECT_IN_USE.getState().equals(e.getSQLState())) { + if (i < ATTEMPTS - 1) { + LOGGER.debug( + "Cannot drop replication slot '{}' because it's still in use", + slotName); + } else { + LOGGER.warn( + "Cannot drop replication slot '{}' because it's still in use", + slotName); + return false; + } + } else if (PSQLState.UNDEFINED_OBJECT.getState().equals(e.getSQLState())) { + LOGGER.debug("Replication slot {} has already been dropped", slotName); + return false; + } else { + LOGGER.error("Unexpected error while attempting to drop replication slot", e); + return false; + } + } + try { + Metronome.parker(Duration.ofSeconds(1), Clock.system()).pause(); + } catch (InterruptedException e) { + } + } + return false; + } + + /** + * Drops the debezium publication that was created. 
+ * + * @param publicationName the publication name, may not be null + * @return {@code true} if the publication was dropped, {@code false} otherwise + */ + public boolean dropPublication(String publicationName) { + try { + LOGGER.debug("Dropping publication '{}'", publicationName); + execute("DROP PUBLICATION " + publicationName); + return true; + } catch (SQLException e) { + if (PSQLState.UNDEFINED_OBJECT.getState().equals(e.getSQLState())) { + LOGGER.debug("Publication {} has already been dropped", publicationName); + } else { + LOGGER.error("Unexpected error while attempting to drop publication", e); + } + return false; + } + } + + @Override + public synchronized void close() { + try { + super.close(); + } catch (SQLException e) { + LOGGER.error("Unexpected error while closing Postgres connection", e); + } + } + + /** + * Returns the PG id of the current active transaction + * + * @return a PG transaction identifier, or null if no tx is active + * @throws SQLException if anything fails. + */ + public Long currentTransactionId() throws SQLException { + AtomicLong txId = new AtomicLong(0); + query( + "select (case pg_is_in_recovery() when 't' then 0 else txid_current() end) AS pg_current_txid", + rs -> { + if (rs.next()) { + txId.compareAndSet(0, rs.getLong(1)); + } + }); + long value = txId.get(); + return value > 0 ? value : null; + } + + /** + * Returns the current position in the server tx log. + * + * @return a long value, never negative + * @throws SQLException if anything unexpected fails. + */ + public long currentXLogLocation() throws SQLException { + AtomicLong result = new AtomicLong(0); + int majorVersion = connection().getMetaData().getDatabaseMajorVersion(); + query( + majorVersion >= 10 + ? 
"select (case pg_is_in_recovery() when 't' then pg_last_wal_receive_lsn() else pg_current_wal_lsn() end) AS pg_current_wal_lsn" + : "select * from pg_current_xlog_location()", + rs -> { + if (!rs.next()) { + throw new IllegalStateException( + "there should always be a valid xlog position"); + } + result.compareAndSet(0, LogSequenceNumber.valueOf(rs.getString(1)).asLong()); + }); + return result.get(); + } + + /** + * Returns information about the PG server to which this instance is connected. + * + * @return a {@link ServerInfo} instance, never {@code null} + * @throws SQLException if anything fails + */ + public ServerInfo serverInfo() throws SQLException { + ServerInfo serverInfo = new ServerInfo(); + query( + "SELECT version(), current_user, current_database()", + rs -> { + if (rs.next()) { + serverInfo + .withServer(rs.getString(1)) + .withUsername(rs.getString(2)) + .withDatabase(rs.getString(3)); + } + }); + String username = serverInfo.username(); + if (username != null) { + query( + "SELECT oid, rolname, rolsuper, rolinherit, rolcreaterole, rolcreatedb, rolcanlogin, rolreplication FROM pg_roles " + + "WHERE pg_has_role('" + + username + + "', oid, 'member')", + rs -> { + while (rs.next()) { + String roleInfo = + "superuser: " + + rs.getBoolean(3) + + ", replication: " + + rs.getBoolean(8) + + ", inherit: " + + rs.getBoolean(4) + + ", create role: " + + rs.getBoolean(5) + + ", create db: " + + rs.getBoolean(6) + + ", can log in: " + + rs.getBoolean(7); + String roleName = rs.getString(2); + serverInfo.addRole(roleName, roleInfo); + } + }); + } + return serverInfo; + } + + public Charset getDatabaseCharset() { + try { + return Charset.forName(((BaseConnection) connection()).getEncoding().name()); + } catch (SQLException e) { + throw new DebeziumException("Couldn't obtain encoding for database " + database(), e); + } + } + + public TimestampUtils getTimestampUtils() { + try { + return ((PgConnection) this.connection()).getTimestampUtils(); + } catch 
(SQLException e) { + throw new DebeziumException( + "Couldn't get timestamp utils from underlying connection", e); + } + } + + private static void validateServerVersion(Statement statement) throws SQLException { + DatabaseMetaData metaData = statement.getConnection().getMetaData(); + int majorVersion = metaData.getDatabaseMajorVersion(); + int minorVersion = metaData.getDatabaseMinorVersion(); + if (majorVersion < 9 || (majorVersion == 9 && minorVersion < 4)) { + throw new SQLException("Cannot connect to a version of Postgres lower than 9.4"); + } + } + + @Override + public String quotedColumnIdString(String columnName) { + if (columnName.contains("\"")) { + columnName = columnName.replaceAll("\"", "\"\""); + } + + return super.quotedColumnIdString(columnName); + } + + @Override + protected int resolveNativeType(String typeName) { + return getTypeRegistry().get(typeName).getRootType().getOid(); + } + + @Override + protected int resolveJdbcType(int metadataJdbcType, int nativeType) { + // Special care needs to be taken for columns that use user-defined domain type data types + // where resolution of the column's JDBC type needs to be that of the root type instead of + // the actual column to properly influence schema building and value conversion. 
+ return getTypeRegistry().get(nativeType).getRootType().getJdbcId(); + } + + @Override + protected Optional readTableColumn( + ResultSet columnMetadata, TableId tableId, Tables.ColumnNameFilter columnFilter) + throws SQLException { + return doReadTableColumn(columnMetadata, tableId, columnFilter); + } + + public Optional readColumnForDecoder( + ResultSet columnMetadata, TableId tableId, Tables.ColumnNameFilter columnNameFilter) + throws SQLException { + return doReadTableColumn(columnMetadata, tableId, columnNameFilter) + .map(ColumnEditor::create); + } + + private Optional doReadTableColumn( + ResultSet columnMetadata, TableId tableId, Tables.ColumnNameFilter columnFilter) + throws SQLException { + // FLINK-38965: Filter out columns from other tables that might be returned due to + // PostgreSQL LIKE wildcard matching. The underscore '_' matches any single character, + // and '%' matches any sequence of characters. For example: + // - When querying 'user_sink', the pattern may also match 'userbsink' (due to '_') + // - When querying 'user%data' (where % is literal), it may match 'user_test_data' (due to + // '%') + final String resultTableName = columnMetadata.getString(3); + if (!tableId.table().equals(resultTableName)) { + return Optional.empty(); + } + + final String columnName = columnMetadata.getString(4); + if (columnFilter == null + || columnFilter.matches( + tableId.catalog(), tableId.schema(), tableId.table(), columnName)) { + final ColumnEditor column = Column.editor().name(columnName); + column.type(columnMetadata.getString(6)); + + // first source the length/scale from the column metadata provided by the driver + // this may be overridden below if the column type is a user-defined domain type + column.length(columnMetadata.getInt(7)); + if (columnMetadata.getObject(9) != null) { + column.scale(columnMetadata.getInt(9)); + } + + column.optional(isNullable(columnMetadata.getInt(11))); + column.position(columnMetadata.getInt(17)); + 
column.autoIncremented("YES".equalsIgnoreCase(columnMetadata.getString(23))); + + String autogenerated = null; + try { + autogenerated = columnMetadata.getString(24); + } catch (SQLException e) { + // ignore, some drivers don't have this index - e.g. Postgres + } + column.generated("YES".equalsIgnoreCase(autogenerated)); + + // Lookup the column type from the TypeRegistry + // For all types, we need to set the Native and Jdbc types by using the root-type + final PostgresType nativeType = getTypeRegistry().get(column.typeName()); + column.nativeType(nativeType.getRootType().getOid()); + column.jdbcType(nativeType.getRootType().getJdbcId()); + + // For domain types, the postgres driver is unable to traverse a nested unbounded + // hierarchy of types and report the right length/scale of a given type. We use + // the TypeRegistry to accomplish this since it is capable of traversing the type + // hierarchy upward to resolve length/scale regardless of hierarchy depth. + if (TypeRegistry.DOMAIN_TYPE == nativeType.getJdbcId()) { + column.length(nativeType.getDefaultLength()); + column.scale(nativeType.getDefaultScale()); + } + + final String defaultValueExpression = columnMetadata.getString(13); + if (defaultValueExpression != null + && getDefaultValueConverter().supportConversion(column.typeName())) { + column.defaultValueExpression(defaultValueExpression); + } + + return Optional.of(column); + } + + return Optional.empty(); + } + + public PostgresDefaultValueConverter getDefaultValueConverter() { + Objects.requireNonNull( + defaultValueConverter, "Connection does not provide default value converter"); + return defaultValueConverter; + } + + public TypeRegistry getTypeRegistry() { + Objects.requireNonNull(typeRegistry, "Connection does not provide type registry"); + return typeRegistry; + } + + @Override + public > Object getColumnValue( + ResultSet rs, int columnIndex, Column column, Table table, T schema) + throws SQLException { + try { + final ResultSetMetaData 
metaData = rs.getMetaData(); + final String columnTypeName = metaData.getColumnTypeName(columnIndex); + final PostgresType type = + ((PostgresSchema) schema).getTypeRegistry().get(columnTypeName); + + LOGGER.trace("Type of incoming data is: {}", type.getOid()); + LOGGER.trace("ColumnTypeName is: {}", columnTypeName); + LOGGER.trace("Type is: {}", type); + + if (type.isArrayType()) { + return rs.getArray(columnIndex); + } + + switch (type.getOid()) { + case PgOid.MONEY: + // TODO author=Horia Chiorean date=14/11/2016 description=workaround for + // https://github.com/pgjdbc/pgjdbc/issues/100 + final String sMoney = rs.getString(columnIndex); + if (sMoney == null) { + return sMoney; + } + if (sMoney.startsWith("-")) { + // PGmoney expects negative values to be provided in the format of + // "($XXXXX.YY)" + final String negativeMoney = "(" + sMoney.substring(1) + ")"; + return new PGmoney(negativeMoney).val; + } + return new PGmoney(sMoney).val; + case PgOid.BIT: + return rs.getString(columnIndex); + case PgOid.NUMERIC: + final String s = rs.getString(columnIndex); + if (s == null) { + return s; + } + + Optional value = PostgresValueConverter.toSpecialValue(s); + return value.isPresent() + ? value.get() + : new SpecialValueDecimal(rs.getBigDecimal(columnIndex)); + case PgOid.TIME: + // To handle time 24:00:00 supported by TIME columns, read the column as a + // string. + case PgOid.TIMETZ: + // In order to guarantee that we resolve TIMETZ columns with proper microsecond + // precision, + // read the column as a string instead and then re-parse inside the converter. + return rs.getString(columnIndex); + case PgOid.TIMESTAMP: + { + // LocalDateTime bypasses GregorianCalendar's Julian/Gregorian cutover + // (1582-10-15), which shifts pre-cutover values by N days + // (e.g. 0001-01-01 by 2 days). 
+ LocalDateTime ldt = rs.getObject(columnIndex, LocalDateTime.class); + if (ldt == null) { + return null; + } + // PG +/-infinity surfaces as Timestamp(Long.MAX/MIN_VALUE) via the legacy + // rs.getObject() path; preserve that contract for downstream converters. + if (ldt == LocalDateTime.MAX) { + return new Timestamp(Long.MAX_VALUE); + } + if (ldt == LocalDateTime.MIN) { + return new Timestamp(Long.MIN_VALUE); + } + return ldt; + } + case PgOid.TIMESTAMPTZ: + { + OffsetDateTime odt = rs.getObject(columnIndex, OffsetDateTime.class); + if (odt == null) { + return null; + } + if (odt == OffsetDateTime.MAX) { + return new Timestamp(Long.MAX_VALUE); + } + if (odt == OffsetDateTime.MIN) { + return new Timestamp(Long.MIN_VALUE); + } + return odt; + } + case PgOid.DATE: + return rs.getObject(columnIndex, LocalDate.class); + default: + Object x = rs.getObject(columnIndex); + if (x != null) { + LOGGER.trace( + "rs getobject returns class: {}; rs getObject value is: {}", + x.getClass(), + x); + } + return x; + } + } catch (SQLException e) { + // not a known type + return super.getColumnValue(rs, columnIndex, column, table, schema); + } + } + + @Override + protected String[] supportedTableTypes() { + return new String[] {"VIEW", "MATERIALIZED VIEW", "TABLE", "PARTITIONED TABLE"}; + } + + @Override + protected boolean isTableType(String tableType) { + return "TABLE".equals(tableType) || "PARTITIONED TABLE".equals(tableType); + } + + @Override + protected boolean isTableUniqueIndexIncluded(String indexName, String columnName) { + if (columnName != null) { + return !FUNCTION_DEFAULT_PATTERN.matcher(columnName).matches() + && !EXPRESSION_DEFAULT_PATTERN.matcher(columnName).matches(); + } + return false; + } + + /** + * Retrieves all {@code TableId}s in a given database catalog, including partitioned tables. 
+ * + * @param catalogName the catalog/database name + * @return set of all table ids for existing table objects + * @throws SQLException if a database exception occurred + */ + public Set getAllTableIds(String catalogName) throws SQLException { + return readTableNames(catalogName, null, null, new String[] {"TABLE", "PARTITIONED TABLE"}); + } + + @FunctionalInterface + public interface PostgresValueConverterBuilder { + PostgresValueConverter build(TypeRegistry registry); + } +} diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java b/fs_brokers/cdc_client/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java new file mode 100644 index 00000000000000..6c26cf4e74ac33 --- /dev/null +++ b/fs_brokers/cdc_client/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTask.java @@ -0,0 +1,391 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.connectors.postgres.source.fetch; + +import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit; +import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit; +import org.apache.flink.cdc.connectors.base.source.reader.external.AbstractScanFetchTask; +import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask; +import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig; +import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetUtils; +import org.apache.flink.cdc.connectors.postgres.source.utils.PostgresQueryUtils; +import org.apache.flink.util.FlinkRuntimeException; + +import io.debezium.connector.postgresql.PostgresConnectorConfig; +import io.debezium.connector.postgresql.PostgresEventDispatcher; +import io.debezium.connector.postgresql.PostgresOffsetContext; +import io.debezium.connector.postgresql.PostgresPartition; +import io.debezium.connector.postgresql.PostgresSchema; +import io.debezium.connector.postgresql.connection.PostgresConnection; +import io.debezium.connector.postgresql.connection.PostgresReplicationConnection; +import io.debezium.connector.postgresql.connection.ReplicationConnection; +import io.debezium.connector.postgresql.spi.SlotState; +import io.debezium.pipeline.EventDispatcher; +import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource; +import io.debezium.pipeline.source.spi.SnapshotProgressListener; +import io.debezium.pipeline.spi.SnapshotResult; +import io.debezium.relational.Column; +import io.debezium.relational.RelationalSnapshotChangeEventSource; +import io.debezium.relational.SnapshotChangeRecordEmitter; +import io.debezium.relational.Table; +import io.debezium.relational.TableId; +import io.debezium.util.Clock; +import io.debezium.util.ColumnUtils; +import io.debezium.util.Strings; +import io.debezium.util.Threads; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import 
java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +import static io.debezium.connector.postgresql.PostgresObjectUtils.waitForReplicationSlotReady; +import static io.debezium.connector.postgresql.Utils.refreshSchema; + +/** + * Copied from Flink Cdc 3.6.0 + * + *

Line 333~336: modified createDataEventsForTable to fix FLINK-39748. + */ +public class PostgresScanFetchTask extends AbstractScanFetchTask { + + private static final Logger LOG = LoggerFactory.getLogger(PostgresScanFetchTask.class); + + public PostgresScanFetchTask(SnapshotSplit split) { + super(split); + } + + @Override + public void execute(Context context) throws Exception { + + PostgresSourceFetchTaskContext ctx = (PostgresSourceFetchTaskContext) context; + PostgresSourceConfig sourceConfig = (PostgresSourceConfig) context.getSourceConfig(); + try { + // create slot here, because a slot can only read wal-log after its own creation. + // if skip backfill, no need to create slot here + maybeCreateSlotForBackFillReadTask( + ctx.getConnection(), + ctx.getReplicationConnection(), + sourceConfig.getSlotNameForBackfillTask(), + ctx.getPluginName(), + sourceConfig.isSkipSnapshotBackfill()); + super.execute(context); + } finally { + // remove slot after snapshot slit finish + maybeDropSlotForBackFillReadTask( + (PostgresReplicationConnection) ctx.getReplicationConnection(), + sourceConfig.isSkipSnapshotBackfill()); + } + } + + @Override + protected void executeDataSnapshot(Context context) throws Exception { + PostgresSourceFetchTaskContext ctx = (PostgresSourceFetchTaskContext) context; + + PostgresSnapshotSplitReadTask snapshotSplitReadTask = + new PostgresSnapshotSplitReadTask( + ctx.getConnection(), + ctx.getDbzConnectorConfig(), + ctx.getDatabaseSchema(), + ctx.getOffsetContext(), + ctx.getEventDispatcher(), + ctx.getSnapshotChangeEventSourceMetrics(), + snapshotSplit); + + StoppableChangeEventSourceContext changeEventSourceContext = + new StoppableChangeEventSourceContext(); + SnapshotResult snapshotResult = + snapshotSplitReadTask.execute( + changeEventSourceContext, ctx.getPartition(), ctx.getOffsetContext()); + + if (!snapshotResult.isCompletedOrSkipped()) { + taskRunning = false; + throw new IllegalStateException( + String.format("Read snapshot for postgres 
split %s fail", snapshotResult)); + } + } + + @Override + protected void executeBackfillTask(Context context, StreamSplit backfillStreamSplit) + throws Exception { + PostgresSourceFetchTaskContext ctx = (PostgresSourceFetchTaskContext) context; + + final PostgresOffsetContext.Loader loader = + new PostgresOffsetContext.Loader(ctx.getDbzConnectorConfig()); + final PostgresOffsetContext postgresOffsetContext = + PostgresOffsetUtils.getPostgresOffsetContext( + loader, backfillStreamSplit.getStartingOffset()); + + final PostgresStreamFetchTask.StreamSplitReadTask backfillReadTask = + new PostgresStreamFetchTask.StreamSplitReadTask( + ctx.getDbzConnectorConfig(), + ctx.getSnapShotter(), + ctx.getConnection(), + ctx.getEventDispatcher(), + ctx.getWaterMarkDispatcher(), + ctx.getErrorHandler(), + ctx.getTaskContext().getClock(), + ctx.getDatabaseSchema(), + ctx.getTaskContext(), + ctx.getReplicationConnection(), + backfillStreamSplit); + LOG.info( + "Execute backfillReadTask for split {} with slot name {}", + snapshotSplit, + ((PostgresSourceConfig) ctx.getSourceConfig()).getSlotNameForBackfillTask()); + backfillReadTask.execute( + new StoppableChangeEventSourceContext(), ctx.getPartition(), postgresOffsetContext); + } + + /** + * Create a slot before snapshot reading so that the slot can track the WAL log during the + * snapshot reading phase. 
+ */ + private void maybeCreateSlotForBackFillReadTask( + PostgresConnection jdbcConnection, + ReplicationConnection replicationConnection, + String slotName, + String pluginName, + boolean skipSnapshotBackfill) { + // if skip backfill, no need to create slot here + if (skipSnapshotBackfill) { + return; + } + + try { + SlotState slotInfo = null; + try { + slotInfo = jdbcConnection.getReplicationSlotState(slotName, pluginName); + } catch (SQLException e) { + LOG.info("Unable to load info of replication slot, will try to create the slot"); + } + if (slotInfo == null) { + try { + replicationConnection.createReplicationSlot().orElse(null); + } catch (SQLException ex) { + String message = "Creation of replication slot failed"; + if (ex.getMessage().contains("already exists")) { + message += + "; when setting up multiple connectors for the same database host, please make sure to use a distinct replication slot name for each."; + } + throw new FlinkRuntimeException(message, ex); + } + } + waitForReplicationSlotReady(30, jdbcConnection, slotName, pluginName); + } catch (Throwable t) { + throw new FlinkRuntimeException(t); + } + } + + /** Drop slot for backfill task and close replication connection. */ + private void maybeDropSlotForBackFillReadTask( + PostgresReplicationConnection replicationConnection, boolean skipSnapshotBackfill) { + // if skip backfill, no need to create slot here + if (skipSnapshotBackfill) { + return; + } + + try { + replicationConnection.close(true); + } catch (Throwable t) { + LOG.error("Unexpected error while dropping replication slot", t); + throw new FlinkRuntimeException(t); + } + } + + /** A SnapshotChangeEventSource implementation for Postgres to read snapshot split. 
*/ + public static class PostgresSnapshotSplitReadTask + extends AbstractSnapshotChangeEventSource { + private static final Logger LOG = + LoggerFactory.getLogger(PostgresSnapshotSplitReadTask.class); + + private final PostgresConnection jdbcConnection; + private final PostgresConnectorConfig connectorConfig; + private final PostgresEventDispatcher eventDispatcher; + private final SnapshotSplit snapshotSplit; + private final PostgresOffsetContext offsetContext; + private final PostgresSchema databaseSchema; + private final SnapshotProgressListener snapshotProgressListener; + private final Clock clock; + + public PostgresSnapshotSplitReadTask( + PostgresConnection jdbcConnection, + PostgresConnectorConfig connectorConfig, + PostgresSchema databaseSchema, + PostgresOffsetContext previousOffset, + PostgresEventDispatcher eventDispatcher, + SnapshotProgressListener snapshotProgressListener, + SnapshotSplit snapshotSplit) { + super(connectorConfig, snapshotProgressListener); + this.jdbcConnection = jdbcConnection; + this.connectorConfig = connectorConfig; + this.snapshotProgressListener = snapshotProgressListener; + this.databaseSchema = databaseSchema; + this.eventDispatcher = eventDispatcher; + this.snapshotSplit = snapshotSplit; + this.offsetContext = previousOffset; + this.clock = Clock.SYSTEM; + } + + @Override + protected SnapshotResult doExecute( + ChangeEventSourceContext context, + PostgresOffsetContext previousOffset, + SnapshotContext snapshotContext, + SnapshottingTask snapshottingTask) + throws Exception { + final PostgresSnapshotContext ctx = (PostgresSnapshotContext) snapshotContext; + ctx.offset = offsetContext; + + refreshSchema(databaseSchema, jdbcConnection, true); + createDataEvents(ctx, snapshotSplit.getTableId()); + + return SnapshotResult.completed(ctx.offset); + } + + private void createDataEvents(PostgresSnapshotContext snapshotContext, TableId tableId) + throws InterruptedException { + EventDispatcher.SnapshotReceiver snapshotReceiver = + 
eventDispatcher.getSnapshotChangeEventReceiver(); + LOG.info("Snapshotting table {}", tableId); + createDataEventsForTable( + snapshotContext, + snapshotReceiver, + Objects.requireNonNull(databaseSchema.tableFor(tableId))); + snapshotReceiver.completeSnapshot(); + } + + /** Dispatches the data change events for the records of a single table. */ + private void createDataEventsForTable( + PostgresSnapshotContext snapshotContext, + EventDispatcher.SnapshotReceiver snapshotReceiver, + Table table) + throws InterruptedException { + + long exportStart = clock.currentTimeInMillis(); + LOG.info( + "Exporting data from split '{}' of table {}", + snapshotSplit.splitId(), + table.id()); + + List uuidFields = + snapshotSplit.getSplitKeyType().getFieldNames().stream() + .filter(field -> table.columnWithName(field).typeName().equals("uuid")) + .collect(Collectors.toList()); + + List columnNames = + table.columns().stream() + .map(column -> jdbcConnection.quotedColumnIdString(column.name())) + .collect(Collectors.toList()); + final String selectSql = + PostgresQueryUtils.buildSplitScanQuery( + snapshotSplit.getTableId(), + snapshotSplit.getSplitKeyType(), + snapshotSplit.getSplitStart() == null, + snapshotSplit.getSplitEnd() == null, + columnNames, + uuidFields); + LOG.debug( + "For split '{}' of table {} using select statement: '{}'", + snapshotSplit.splitId(), + table.id(), + selectSql); + + try (PreparedStatement selectStatement = + PostgresQueryUtils.readTableSplitDataStatement( + jdbcConnection, + selectSql, + snapshotSplit.getSplitStart() == null, + snapshotSplit.getSplitEnd() == null, + snapshotSplit.getSplitStart(), + snapshotSplit.getSplitEnd(), + snapshotSplit.getSplitKeyType().getFieldCount(), + connectorConfig.getSnapshotFetchSize()); + ResultSet rs = selectStatement.executeQuery()) { + + ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(rs, table); + long rows = 0; + Threads.Timer logTimer = getTableScanLogTimer(); + + while (rs.next()) { + rows++; + final 
Object[] row = new Object[columnArray.getGreatestColumnPosition()]; + for (int i = 0; i < columnArray.getColumns().length; i++) { + Column col = columnArray.getColumns()[i]; + row[col.position() - 1] = + jdbcConnection.getColumnValue( + rs, i + 1, col, table, databaseSchema); + } + if (logTimer.expired()) { + long stop = clock.currentTimeInMillis(); + LOG.info( + "Exported {} records for split '{}' after {}", + rows, + snapshotSplit.splitId(), + Strings.duration(stop - exportStart)); + snapshotProgressListener.rowsScanned( + snapshotContext.partition, table.id(), rows); + logTimer = getTableScanLogTimer(); + } + snapshotContext.offset.event(table.id(), clock.currentTime()); + SnapshotChangeRecordEmitter emitter = + new SnapshotChangeRecordEmitter<>( + snapshotContext.partition, snapshotContext.offset, row, clock); + eventDispatcher.dispatchSnapshotEvent( + snapshotContext.partition, table.id(), emitter, snapshotReceiver); + } + LOG.info( + "Finished exporting {} records for split '{}', total duration '{}'", + rows, + snapshotSplit.splitId(), + Strings.duration(clock.currentTimeInMillis() - exportStart)); + } catch (SQLException e) { + throw new FlinkRuntimeException( + "Snapshotting of table " + table.id() + " failed", e); + } + } + + private Threads.Timer getTableScanLogTimer() { + return Threads.timer(clock, LOG_INTERVAL); + } + + @Override + protected SnapshottingTask getSnapshottingTask( + PostgresPartition partition, PostgresOffsetContext previousOffset) { + return new SnapshottingTask(false, true); + } + + @Override + protected PostgresSnapshotContext prepare(PostgresPartition partition) throws Exception { + return new PostgresSnapshotContext(partition); + } + + private static class PostgresSnapshotContext + extends RelationalSnapshotChangeEventSource.RelationalSnapshotContext< + PostgresPartition, PostgresOffsetContext> { + + public PostgresSnapshotContext(PostgresPartition partition) throws SQLException { + super(partition, ""); + } + } + } +} diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_historical_dates.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_historical_dates.groovy new file mode 100644 index 00000000000000..7d9cb844d388c5 --- /dev/null +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_historical_dates.groovy @@ -0,0 +1,225 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +import org.awaitility.Awaitility + +import static java.util.concurrent.TimeUnit.SECONDS + +/** + * Verify snapshot and binlog paths produce identical values for historical + * dates that previously drifted in the snapshot path (PG JDBC's + * GregorianCalendar + JVM-zone LMT). + * + * Phases: + * 1. snapshot batch (ids 1..N) inserted in postgres before the job starts; + * after sync, assert values in doris match the original input. + * 2. binlog INSERT batch (ids 11..10+N) with the same boundary values; + * assert each binlog row equals its snapshot counterpart cell-for-cell. + * 3. binlog UPDATE: rewrite id=1's columns to a different boundary value + * and assert the streamed change lands. + * 4. 
binlog DELETE: remove id=2 and assert it disappears in doris. + */ +suite("test_streaming_postgres_job_snapshot_historical_dates", + "p0,external,pg,external_docker,external_docker_pg,nondatalake") { + def jobName = "test_streaming_postgres_job_snapshot_historical_dates_name" + def currentDb = (sql "select database()")[0][0] + def table1 = "streaming_pg_historical_dates" + def pgDB = "postgres" + def pgSchema = "cdc_test" + def pgUser = "postgres" + def pgPassword = "123456" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${table1} force""" + + String enabled = context.config.otherConfigs.get("enableJdbcTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + return + } + + String pg_port = context.config.otherConfigs.get("pg_14_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar" + + // Boundary rows. 
Picked to exercise: + // - Julian/Gregorian cutover (0001-01-01 ⇒ 2-day drift, 1582-10-04/15 boundary) + // - LMT offset for pre-1901 values in zones like Asia/Shanghai (1900-12-31 vs 1901-01-02) + // - sub-millisecond precision on a pre-1970 value (negative micros, bug B) + // - NULL across all three columns + def boundaryRows = [ + [ts: "0001-01-01 00:00:00.000123", tstz: "0001-01-01 00:00:00.000123+00", date: "0001-01-01"], + [ts: "1582-10-04 12:34:56.000000", tstz: "1582-10-04 12:34:56+00", date: "1582-10-04"], + [ts: "1582-10-15 00:00:00.000000", tstz: "1582-10-15 00:00:00+00", date: "1582-10-15"], + [ts: "1900-12-31 23:59:59.999000", tstz: "1900-12-31 23:59:59.999+00", date: "1900-12-31"], + [ts: "1901-01-02 00:00:00.000000", tstz: "1901-01-02 00:00:00+09", date: "1901-01-02"], + [ts: "1969-12-31 23:59:59.999123", tstz: "1969-12-31 23:59:59.999123+00", date: "1969-12-31"], + [ts: null, tstz: null, date: null], + ] + def rowsPerBatch = boundaryRows.size() + def snapshotIdBase = 1 + def binlogIdBase = 11 + + def buildInsertValues = { int idBase -> + boundaryRows.withIndex().collect { row, i -> + def id = idBase + i + def tsLit = row.ts == null ? "NULL" : "TIMESTAMP '${row.ts}'" + def tstzLit = row.tstz == null ? "NULL" : "TIMESTAMPTZ '${row.tstz}'" + def dateLit = row.date == null ? 
"NULL" : "DATE '${row.date}'" + "(${id}, ${tsLit}, ${tstzLit}, ${dateLit})" + }.join(",\n ") + } + + def dumpJobOnFailure = { + log.info("show job: " + sql("""select * from jobs("type"="insert") where Name='${jobName}'""")) + log.info("show task: " + sql("""select * from tasks("type"="insert") where JobName='${jobName}'""")) + } + + // ── postgres setup + snapshot batch ─────────────────────────────────────── + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}""" + sql """ + CREATE TABLE ${pgDB}.${pgSchema}.${table1} ( + id bigint PRIMARY KEY, + ts_col timestamp(6), + tstz_col timestamp(6) with time zone, + date_col date + ) + """ + sql """ + INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES + ${buildInsertValues(snapshotIdBase)} + """ + } + + // ── start streaming job (offset=initial ⇒ snapshot + binlog) ────────────── + sql """CREATE JOB ${jobName} + ON STREAMING + FROM POSTGRES ( + "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}?timezone=UTC", + "driver_url" = "${driver_url}", + "driver_class" = "org.postgresql.Driver", + "user" = "${pgUser}", + "password" = "${pgPassword}", + "database" = "${pgDB}", + "schema" = "${pgSchema}", + "include_tables" = "${table1}", + "offset" = "initial" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + + // ── phase 1: snapshot ───────────────────────────────────────────────────── + try { + Awaitility.await().atMost(300, SECONDS).pollInterval(2, SECONDS).until { + def cnt = sql """SELECT count(1) FROM ${currentDb}.${table1}""" + log.info("snapshot row count: ${cnt}") + cnt.size() == 1 && (cnt.get(0).get(0) as long) == (long) rowsPerBatch + } + } catch (Exception ex) { + dumpJobOnFailure() + throw ex + } + + qt_snapshot """SELECT id, ts_col, tstz_col, date_col FROM ${currentDb}.${table1} ORDER BY id""" + + // ── phase 2: binlog INSERT 
──────────────────────────────────────────────── + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """ + INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES + ${buildInsertValues(binlogIdBase)} + """ + } + try { + Awaitility.await().atMost(300, SECONDS).pollInterval(2, SECONDS).until { + def cnt = sql """SELECT count(1) FROM ${currentDb}.${table1}""" + cnt.size() == 1 && (cnt.get(0).get(0) as long) == (long) (rowsPerBatch * 2) + } + } catch (Exception ex) { + dumpJobOnFailure() + throw ex + } + + qt_binlog_insert """SELECT id, ts_col, tstz_col, date_col FROM ${currentDb}.${table1} + WHERE id >= ${binlogIdBase} ORDER BY id""" + + // Parity: snapshot row i must equal binlog row i+10 cell-for-cell. + def snapshotRows = sql """SELECT ts_col, tstz_col, date_col FROM ${currentDb}.${table1} + WHERE id < ${binlogIdBase} ORDER BY id""" + def binlogRows = sql """SELECT ts_col, tstz_col, date_col FROM ${currentDb}.${table1} + WHERE id >= ${binlogIdBase} ORDER BY id""" + assert snapshotRows.size() == rowsPerBatch + assert binlogRows.size() == rowsPerBatch + for (int i = 0; i < rowsPerBatch; i++) { + def s = snapshotRows.get(i) + def b = binlogRows.get(i) + assert s.get(0)?.toString() == b.get(0)?.toString() : + "ts_col mismatch at row ${i}: snapshot=${s.get(0)} binlog=${b.get(0)}" + assert s.get(1)?.toString() == b.get(1)?.toString() : + "tstz_col mismatch at row ${i}: snapshot=${s.get(1)} binlog=${b.get(1)}" + assert s.get(2)?.toString() == b.get(2)?.toString() : + "date_col mismatch at row ${i}: snapshot=${s.get(2)} binlog=${b.get(2)}" + } + + // ── phase 3: binlog UPDATE ──────────────────────────────────────────────── + // Rewrite id=1 (originally 0001-01-01) to a different boundary value via UPDATE. 
+ def updatedTs = "1582-10-15 12:00:00.000123" + def updatedTstz = "1582-10-15 12:00:00.000123+00" + def updatedDate = "1900-12-31" + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """ + UPDATE ${pgDB}.${pgSchema}.${table1} + SET ts_col = TIMESTAMP '${updatedTs}', + tstz_col = TIMESTAMPTZ '${updatedTstz}', + date_col = DATE '${updatedDate}' + WHERE id = 1 + """ + } + try { + Awaitility.await().atMost(300, SECONDS).pollInterval(2, SECONDS).until { + def row = sql """SELECT cast(date_col as string) FROM ${currentDb}.${table1} WHERE id = 1""" + row.size() == 1 && row.get(0).get(0) == updatedDate + } + } catch (Exception ex) { + dumpJobOnFailure() + throw ex + } + qt_binlog_update """SELECT id, ts_col, tstz_col, date_col FROM ${currentDb}.${table1} + WHERE id = 1""" + + // ── phase 4: binlog DELETE ──────────────────────────────────────────────── + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """DELETE FROM ${pgDB}.${pgSchema}.${table1} WHERE id = 2""" + } + try { + Awaitility.await().atMost(300, SECONDS).pollInterval(2, SECONDS).until { + def cnt = sql """SELECT count(1) FROM ${currentDb}.${table1} WHERE id = 2""" + cnt.size() == 1 && (cnt.get(0).get(0) as long) == 0L + } + } catch (Exception ex) { + dumpJobOnFailure() + throw ex + } + qt_binlog_after_delete """SELECT id, ts_col, tstz_col, date_col FROM ${currentDb}.${table1} + ORDER BY id""" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" +} From 6d724a4f2d88b7cd57363a14f0773ad65e6c19fe Mon Sep 17 00:00:00 2001 From: wudi Date: Mon, 25 May 2026 21:18:05 +0800 Subject: [PATCH 3/5] [fix](streaming-job) add pg historical-date golden file and align all_type timetz output --- .../test_streaming_postgres_job_all_type.out | 4 +- ...postgres_job_snapshot_historical_dates.out | 37 +++++++++++++++++++ 2 files changed, 39 insertions(+), 2 deletions(-) create mode 100644 
regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_historical_dates.out diff --git a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.out b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.out index abdc4038774d5d..feeb6757d2d2b7 100644 --- a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.out +++ b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_all_type.out @@ -34,9 +34,9 @@ xml_col text Yes false \N NONE hstore_col text Yes false \N NONE -- !select_all_types_null -- -1 1 100 1000 1.23 4.56 12345.678901 char varchar text value true 2024-01-01 12:00 12:00:00Z 2024-01-01T12:00 2024-01-01T04:00 P0Y0M1DT0H0M0S 3q2+7w== 11111111-2222-3333-4444-555555555555 {"a":1} {"b": 2} 192.168.1.1 192.168.0.0/24 08:00:2b:01:02:03 qg== Cg== [1, 2, 3] ["a", "b", "c"] {"coordinates":[1,2],"type":"Point","srid":0} 08:00:2b:01:02:03:04:05 1 {"a":"1","b":"2"} +1 1 100 1000 1.23 4.56 12345.678901 char varchar text value true 2024-01-01 12:00 04:00:00Z 2024-01-01T12:00 2024-01-01T04:00 P0Y0M1DT0H0M0S 3q2+7w== 11111111-2222-3333-4444-555555555555 {"a":1} {"b": 2} 192.168.1.1 192.168.0.0/24 08:00:2b:01:02:03 qg== Cg== [1, 2, 3] ["a", "b", "c"] {"coordinates":[1,2],"type":"Point","srid":0} 08:00:2b:01:02:03:04:05 1 {"a":"1","b":"2"} -- !select_all_types_null2 -- -1 1 100 1000 1.23 4.56 12345.678901 char varchar text value true 2024-01-01 12:00 12:00:00Z 2024-01-01T12:00 2024-01-01T04:00 P0Y0M1DT0H0M0S 3q2+7w== 11111111-2222-3333-4444-555555555555 {"a":1} {"b": 2} 192.168.1.1 192.168.0.0/24 08:00:2b:01:02:03 qg== Cg== [1, 2, 3] ["a", "b", "c"] {"coordinates":[1,2],"type":"Point","srid":0} 08:00:2b:01:02:03:04:05 1 {"a":"1","b":"2"} +1 1 100 1000 1.23 4.56 12345.678901 char varchar text value true 2024-01-01 12:00 04:00:00Z 2024-01-01T12:00 2024-01-01T04:00 P0Y0M1DT0H0M0S 3q2+7w== 11111111-2222-3333-4444-555555555555 {"a":1} {"b": 2} 
192.168.1.1 192.168.0.0/24 08:00:2b:01:02:03 qg== Cg== [1, 2, 3] ["a", "b", "c"] {"coordinates":[1,2],"type":"Point","srid":0} 08:00:2b:01:02:03:04:05 1 {"a":"1","b":"2"} 2 2 200 2000 7.89 0.12 99999.000001 char2 varchar2 another text false 2025-01-01 23:59:59 23:59:59Z 2025-01-01T23:59:59 2025-01-01T23:59:59 P0Y0M0DT2H0M0S 3q2+7w== 11111111-2222-3333-4444-555555555556 {"x":10} {"y": 20} 10.0.0.1 10.0.0.0/16 08:00:2b:aa:bb:cc 8A== Dw== [10, 20] ["x", "y"] {"coordinates":[3,4],"type":"Point","srid":0} 08:00:2b:aa:bb:cc:dd:ee 2 {"x":"10","y":"20"} diff --git a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_historical_dates.out b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_historical_dates.out new file mode 100644 index 00000000000000..25df93f3b530ea --- /dev/null +++ b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_historical_dates.out @@ -0,0 +1,37 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !snapshot -- +1 0001-01-01T00:00:00.000123 0001-01-01T00:00:00.000123 0001-01-01 +2 1582-10-04T12:34:56 1582-10-04T12:34:56 1582-10-04 +3 1582-10-15T00:00 1582-10-15T00:00 1582-10-15 +4 1900-12-31T23:59:59.999 1900-12-31T23:59:59.999 1900-12-31 +5 1901-01-02T00:00 1901-01-01T15:00 1901-01-02 +6 1969-12-31T23:59:59.999123 1969-12-31T23:59:59.999123 1969-12-31 +7 \N \N \N + +-- !binlog_insert -- +11 0001-01-01T00:00:00.000123 0001-01-01T00:00:00.000123 0001-01-01 +12 1582-10-04T12:34:56 1582-10-04T12:34:56 1582-10-04 +13 1582-10-15T00:00 1582-10-15T00:00 1582-10-15 +14 1900-12-31T23:59:59.999 1900-12-31T23:59:59.999 1900-12-31 +15 1901-01-02T00:00 1901-01-01T15:00 1901-01-02 +16 1969-12-31T23:59:59.999123 1969-12-31T23:59:59.999123 1969-12-31 +17 \N \N \N + +-- !binlog_update -- +1 1582-10-15T12:00:00.000123 1582-10-15T12:00:00.000123 1900-12-31 + +-- !binlog_after_delete -- +1 1582-10-15T12:00:00.000123 1582-10-15T12:00:00.000123 1900-12-31 +3 1582-10-15T00:00 1582-10-15T00:00 1582-10-15 +4 1900-12-31T23:59:59.999 1900-12-31T23:59:59.999 1900-12-31 +5 1901-01-02T00:00 1901-01-01T15:00 1901-01-02 +6 1969-12-31T23:59:59.999123 1969-12-31T23:59:59.999123 1969-12-31 +7 \N \N \N +11 0001-01-01T00:00:00.000123 0001-01-01T00:00:00.000123 0001-01-01 +12 1582-10-04T12:34:56 1582-10-04T12:34:56 1582-10-04 +13 1582-10-15T00:00 1582-10-15T00:00 1582-10-15 +14 1900-12-31T23:59:59.999 1900-12-31T23:59:59.999 1900-12-31 +15 1901-01-02T00:00 1901-01-01T15:00 1901-01-02 +16 1969-12-31T23:59:59.999123 1969-12-31T23:59:59.999123 1969-12-31 +17 \N \N \N + From db12d6fa9918c5729795b0c717e9f360100913de Mon Sep 17 00:00:00 2001 From: wudi Date: Tue, 26 May 2026 11:40:21 +0800 Subject: [PATCH 4/5] [improvement](streaming-job) include column/value context in cdc-client convert error --- .../deserialize/DebeziumJsonDeserializer.java | 23 +++++++- ...postgres_job_snapshot_historical_dates.out | 54 +++++++++++-------- 
...tgres_job_snapshot_historical_dates.groovy | 4 ++ 3 files changed, 56 insertions(+), 25 deletions(-) diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java index b2f2e26bfa7218..dfbd91cb652082 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java @@ -17,10 +17,12 @@ package org.apache.doris.cdcclient.source.deserialize; +import org.apache.doris.cdcclient.exception.CdcClientException; import org.apache.doris.cdcclient.utils.ConfigUtil; import org.apache.doris.job.cdc.DataSourceConfigKeys; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils; import org.apache.flink.cdc.debezium.utils.TemporalConversions; import org.apache.flink.table.data.TimestampData; @@ -161,6 +163,7 @@ private String extractAfterRow(Struct value, Schema valueSchema, Set exc if (!excludeColumns.contains(field.name())) { Object valueConverted = convert( + field.name(), field.schema(), after.getWithoutDefault(field.name())); record.put(field.name(), valueConverted); @@ -185,6 +188,7 @@ private String extractBeforeRow(Struct value, Schema valueSchema, Set ex if (!excludeColumns.contains(field.name())) { Object valueConverted = convert( + field.name(), field.schema(), before.getWithoutDefault(field.name())); record.put(field.name(), valueConverted); @@ -194,7 +198,22 @@ private String extractBeforeRow(Struct value, Schema valueSchema, Set ex return objectMapper.writeValueAsString(record); } - private Object convert(Schema fieldSchema, Object dbzObj) { + private Object convert(String fieldName, Schema fieldSchema, 
Object dbzObj) { + try { + return convertInternal(fieldSchema, dbzObj); + } catch (Exception e) { + String msg = + String.format( + "Failed to convert column '%s' value=%s: %s", + fieldName, + dbzObj, + ExceptionUtils.getMessage(e)); + LOG.error(msg, e); + throw new RuntimeException(msg); + } + } + + private Object convertInternal(Schema fieldSchema, Object dbzObj) { if (dbzObj == null) { return null; } @@ -374,7 +393,7 @@ private Object convertToArray(Schema fieldSchema, Object dbzObj) { Schema elementSchema = fieldSchema.valueSchema(); List result = new ArrayList<>(); for (Object element : (List) dbzObj) { - result.add(element == null ? null : convert(elementSchema, element)); + result.add(element == null ? null : convertInternal(elementSchema, element)); } return result; } diff --git a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_historical_dates.out b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_historical_dates.out index 25df93f3b530ea..0df4f0ca7371d1 100644 --- a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_historical_dates.out +++ b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_historical_dates.out @@ -1,37 +1,45 @@ -- This file is automatically generated. 
You should know what you did if you want to edit this -- !snapshot -- 1 0001-01-01T00:00:00.000123 0001-01-01T00:00:00.000123 0001-01-01 -2 1582-10-04T12:34:56 1582-10-04T12:34:56 1582-10-04 -3 1582-10-15T00:00 1582-10-15T00:00 1582-10-15 -4 1900-12-31T23:59:59.999 1900-12-31T23:59:59.999 1900-12-31 -5 1901-01-02T00:00 1901-01-01T15:00 1901-01-02 -6 1969-12-31T23:59:59.999123 1969-12-31T23:59:59.999123 1969-12-31 -7 \N \N \N +2 0500-06-15T10:00 0500-06-15T15:00 0500-06-15 +3 1582-10-04T12:34:56 1582-10-04T12:34:56 1582-10-04 +4 1582-10-15T00:00 1582-10-15T00:00 1582-10-15 +5 1800-07-20T03:30 1800-07-19T22:00 1800-07-20 +6 1900-12-31T23:59:59.999 1900-12-31T23:59:59.999 1900-12-31 +7 1901-01-02T00:00 1901-01-01T15:00 1901-01-02 +8 1969-12-31T23:59:59.999123 1969-12-31T23:59:59.999123 1969-12-31 +9 \N \N \N -- !binlog_insert -- 11 0001-01-01T00:00:00.000123 0001-01-01T00:00:00.000123 0001-01-01 -12 1582-10-04T12:34:56 1582-10-04T12:34:56 1582-10-04 -13 1582-10-15T00:00 1582-10-15T00:00 1582-10-15 -14 1900-12-31T23:59:59.999 1900-12-31T23:59:59.999 1900-12-31 -15 1901-01-02T00:00 1901-01-01T15:00 1901-01-02 -16 1969-12-31T23:59:59.999123 1969-12-31T23:59:59.999123 1969-12-31 -17 \N \N \N +12 0500-06-15T10:00 0500-06-15T15:00 0500-06-15 +13 1582-10-04T12:34:56 1582-10-04T12:34:56 1582-10-04 +14 1582-10-15T00:00 1582-10-15T00:00 1582-10-15 +15 1800-07-20T03:30 1800-07-19T22:00 1800-07-20 +16 1900-12-31T23:59:59.999 1900-12-31T23:59:59.999 1900-12-31 +17 1901-01-02T00:00 1901-01-01T15:00 1901-01-02 +18 1969-12-31T23:59:59.999123 1969-12-31T23:59:59.999123 1969-12-31 +19 \N \N \N -- !binlog_update -- 1 1582-10-15T12:00:00.000123 1582-10-15T12:00:00.000123 1900-12-31 -- !binlog_after_delete -- 1 1582-10-15T12:00:00.000123 1582-10-15T12:00:00.000123 1900-12-31 -3 1582-10-15T00:00 1582-10-15T00:00 1582-10-15 -4 1900-12-31T23:59:59.999 1900-12-31T23:59:59.999 1900-12-31 -5 1901-01-02T00:00 1901-01-01T15:00 1901-01-02 -6 1969-12-31T23:59:59.999123 1969-12-31T23:59:59.999123 
1969-12-31 -7 \N \N \N +3 1582-10-04T12:34:56 1582-10-04T12:34:56 1582-10-04 +4 1582-10-15T00:00 1582-10-15T00:00 1582-10-15 +5 1800-07-20T03:30 1800-07-19T22:00 1800-07-20 +6 1900-12-31T23:59:59.999 1900-12-31T23:59:59.999 1900-12-31 +7 1901-01-02T00:00 1901-01-01T15:00 1901-01-02 +8 1969-12-31T23:59:59.999123 1969-12-31T23:59:59.999123 1969-12-31 +9 \N \N \N 11 0001-01-01T00:00:00.000123 0001-01-01T00:00:00.000123 0001-01-01 -12 1582-10-04T12:34:56 1582-10-04T12:34:56 1582-10-04 -13 1582-10-15T00:00 1582-10-15T00:00 1582-10-15 -14 1900-12-31T23:59:59.999 1900-12-31T23:59:59.999 1900-12-31 -15 1901-01-02T00:00 1901-01-01T15:00 1901-01-02 -16 1969-12-31T23:59:59.999123 1969-12-31T23:59:59.999123 1969-12-31 -17 \N \N \N +12 0500-06-15T10:00 0500-06-15T15:00 0500-06-15 +13 1582-10-04T12:34:56 1582-10-04T12:34:56 1582-10-04 +14 1582-10-15T00:00 1582-10-15T00:00 1582-10-15 +15 1800-07-20T03:30 1800-07-19T22:00 1800-07-20 +16 1900-12-31T23:59:59.999 1900-12-31T23:59:59.999 1900-12-31 +17 1901-01-02T00:00 1901-01-01T15:00 1901-01-02 +18 1969-12-31T23:59:59.999123 1969-12-31T23:59:59.999123 1969-12-31 +19 \N \N \N diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_historical_dates.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_historical_dates.groovy index 7d9cb844d388c5..db8b748abbda5a 100644 --- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_historical_dates.groovy +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_snapshot_historical_dates.groovy @@ -60,13 +60,17 @@ suite("test_streaming_postgres_job_snapshot_historical_dates", // Boundary rows. 
Picked to exercise: // - Julian/Gregorian cutover (0001-01-01 ⇒ 2-day drift, 1582-10-04/15 boundary) + // - negative tz offset before the cutover (0500-06-15 with -05) + // - half-hour tz offset pre-1900 (1800-07-20 with +05:30) // - LMT offset for pre-1901 values in zones like Asia/Shanghai (1900-12-31 vs 1901-01-02) // - sub-millisecond precision on a pre-1970 value (negative micros, bug B) // - NULL across all three columns def boundaryRows = [ [ts: "0001-01-01 00:00:00.000123", tstz: "0001-01-01 00:00:00.000123+00", date: "0001-01-01"], + [ts: "0500-06-15 10:00:00.000000", tstz: "0500-06-15 10:00:00-05", date: "0500-06-15"], [ts: "1582-10-04 12:34:56.000000", tstz: "1582-10-04 12:34:56+00", date: "1582-10-04"], [ts: "1582-10-15 00:00:00.000000", tstz: "1582-10-15 00:00:00+00", date: "1582-10-15"], + [ts: "1800-07-20 03:30:00.000000", tstz: "1800-07-20 03:30:00+05:30", date: "1800-07-20"], [ts: "1900-12-31 23:59:59.999000", tstz: "1900-12-31 23:59:59.999+00", date: "1900-12-31"], [ts: "1901-01-02 00:00:00.000000", tstz: "1901-01-02 00:00:00+09", date: "1901-01-02"], [ts: "1969-12-31 23:59:59.999123", tstz: "1969-12-31 23:59:59.999123+00", date: "1969-12-31"], From fea5db9110034824628fc722de83558a66072d80 Mon Sep 17 00:00:00 2001 From: wudi Date: Tue, 26 May 2026 11:42:03 +0800 Subject: [PATCH 5/5] [improvement](streaming-job) drop unused import and tidy cdc-client convert error format --- .../source/deserialize/DebeziumJsonDeserializer.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java index dfbd91cb652082..6881dd70d40a93 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java +++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java @@ -17,7 +17,6 @@ package org.apache.doris.cdcclient.source.deserialize; -import org.apache.doris.cdcclient.exception.CdcClientException; import org.apache.doris.cdcclient.utils.ConfigUtil; import org.apache.doris.job.cdc.DataSourceConfigKeys; @@ -205,9 +204,7 @@ private Object convert(String fieldName, Schema fieldSchema, Object dbzObj) { String msg = String.format( "Failed to convert column '%s' value=%s: %s", - fieldName, - dbzObj, - ExceptionUtils.getMessage(e)); + fieldName, dbzObj, ExceptionUtils.getMessage(e)); LOG.error(msg, e); throw new RuntimeException(msg); }