Skip to content

[FLINK-39391][cdc-connector] Propagate scan.snapshot.fetch.size to Debezium properties in Oracle, SqlServer, DB2, and Postgres connectors#4359

Open
paulo-t wants to merge 3 commits into
apache:masterfrom
paulo-t:fix/propagate-snapshot-fetch-size-to-debezium
Open

[FLINK-39391][cdc-connector] Propagate scan.snapshot.fetch.size to Debezium properties in Oracle, SqlServer, DB2, and Postgres connectors#4359
paulo-t wants to merge 3 commits into
apache:masterfrom
paulo-t:fix/propagate-snapshot-fetch-size-to-debezium

Conversation

@paulo-t
Copy link
Copy Markdown

@paulo-t paulo-t commented Apr 2, 2026

This closes FLINK-39391.

The user-configured scan.snapshot.fetch.size option has no effect on the actual JDBC fetchSize used during the incremental snapshot phase for Oracle, SqlServer, DB2, and Postgres connectors.

The value is correctly parsed and stored in JdbcSourceConfig.fetchSize, but is never written into the Debezium Properties object that the snapshot execution path actually reads.

For Oracle/SqlServer/DB2, the snapshot task calls connectorConfig.getQueryFetchSize(), which reads Debezium's query.fetch.size — defaulting to 0. This causes the JDBC driver to fall back to its own small default (e.g., Oracle defaults to fetchSize=10). On high-latency networks this results in order-of-magnitude performance degradation: ~440 rows/s instead of ~3,500 rows/s with 4 parallel readers on ~31ms RTT.

For Postgres, the snapshot task reads snapshot.fetch.size which defaults to 2000, so the impact is less severe but the user-configured value is still silently ignored.

MySQL is NOT affected — MySqlSourceConfigFactory already correctly sets props.setProperty("database.fetchSize", ...).

The fix adds a props.setProperty() call in each affected SourceConfigFactory.create() before the dbzProperties.putAll() block, so that explicit user overrides via debezium.query.fetch.size / debezium.snapshot.fetch.size still take precedence.

tianfengyu added 2 commits April 2, 2026 17:35
…bezium properties in Oracle, SqlServer, DB2, and Postgres connectors

The user-configured scan.snapshot.fetch.size is stored in
JdbcSourceConfig.fetchSize but was never written into the Debezium
Properties object. The snapshot execution path reads fetchSize from
Debezium ConnectorConfig (query.fetch.size for Oracle/SqlServer/DB2,
snapshot.fetch.size for Postgres) instead of JdbcSourceConfig, so the
user value was silently ignored.

For Oracle/SqlServer/DB2 this defaults to 0, which causes the JDBC
driver to use its own default (e.g. Oracle defaults to fetchSize=10).
On high-latency networks this leads to severe performance degradation
(observed ~9x slowdown at 31ms RTT vs 3ms RTT).

This commit propagates fetchSize into the corresponding Debezium
property before the user-defined debezium properties override, so
explicit debezium.query.fetch.size / debezium.snapshot.fetch.size
overrides still take precedence.

MySQL connector is not affected as it already sets database.fetchSize.

Made-with: Cursor
… to Debezium properties

Verify that scan.snapshot.fetch.size is correctly propagated to the
underlying Debezium query.fetch.size (Oracle/SqlServer/DB2) and
snapshot.fetch.size (Postgres) properties, including default value
propagation and user-provided debezium property override behavior.

Made-with: Cursor
@ThorneANN
Copy link
Copy Markdown
Contributor

ThorneANN commented May 21, 2026

Thank you for your contribution and i see that snapshot.fetch.size instand of query.fecth.size ,may be  the query.fecth.sizeconfig also can the same effect ?

@paulo-t
Copy link
Copy Markdown
Author

paulo-t commented May 21, 2026

Thanks for pointing this out.

I checked the latest upstream master before updating the PR. The issue has not been fixed there yet: Oracle, SQLServer, and DB2 scan fetch tasks still read connectorConfig.getQueryFetchSize(), while Postgres reads connectorConfig.getSnapshotFetchSize().

You are right that scan.snapshot.fetch.size should map to Debezium snapshot fetch-size semantics. Mapping it to query.fetch.size would make the current Oracle/SQLServer/DB2 code path effective, but it is not semantically correct.

I have updated the PR accordingly:

  • Oracle / SQLServer / DB2 scan fetch tasks now use connectorConfig.getSnapshotFetchSize().
  • Oracle / SQLServer / DB2 config factories now propagate Flink CDC scan.snapshot.fetch.size to Debezium snapshot.fetch.size.
  • The related unit tests now assert snapshot.fetch.size and getSnapshotFetchSize().
  • Postgres already used snapshot.fetch.size / getSnapshotFetchSize(), so it remains consistent with the updated behavior.

I also ran the related factory/config tests locally and they passed.

@ThorneANN
Copy link
Copy Markdown
Contributor

@yuxiqian cc

Copy link
Copy Markdown
Contributor

@ThorneANN ThorneANN left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants