branch-4.1: [pick](streamingjob) pick streaming-job batch #63079 #63402 #63404 #63471 #63480 #63490 #63514 #63618 #63812

Merged
yiguolei merged 8 commits into apache:branch-4.1 from JNSimba:cherrypick-streaming-job-4.1-20260528
May 29, 2026

Conversation

@JNSimba JNSimba commented May 28, 2026

Cherry-picked from:

JNSimba and others added 8 commits May 28, 2026 15:36
…ache#63079)

## Summary

`StreamingInsertJob` (CDC FROM-TO and `cdc_stream` TVF paths) used to
call `splitChunks()` synchronously inside `CREATE STREAMING JOB`, asking
cdc_client to cut every chunk of every table before returning. On
large/non-uniform PK tables this can take 30+ minutes — far beyond the
BE→cdc_client BRPC 60s timeout, and the SQL client blocks the whole
time.

This PR makes splitting tick-driven by the FE scheduler:

- `CREATE` returns immediately; no more synchronous `splitChunks()`.
- Each scheduler tick `advanceSplits()` issues one short fetchSplits RPC
(default `batchSize=100`) and pushes that batch into `remainingSplits`.
Tasks dispatch as soon as the first batch lands, so end-to-end
first-byte latency stays close to flink-cdc's.
- cdc_client is **stateless** — every RPC reconstructs `ChunkSplitter`
from the `(currentSplittingTable, nextSplitStart, nextSplitId)` triple
supplied by FE; flink-cdc internals are untouched (uses the public
`ChunkSplitter` API only).
- Crash recovery uses three sources of truth:
  - editlog persists `committedSplitProgress` (3-field `SplitProgress`) +
    existing `chunkHighWatermarkMap` / `binlogOffsetPersist`
  - `streaming_job_meta` system table holds full `chunk_list` JSON per
    table (UPSERT each `advanceSplits`)
  - cdc_client memory holds nothing
- Both FROM-TO (multi-table) and TVF (single-table) paths share the same
`SourceOffsetProvider#initSplitProgress` / `noMoreSplits` /
`advanceSplits` interface;
`StreamingJobSchedulerTask.handlePendingState` pre-advances one batch so
the first task doesn't wait a full `max_interval`.
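
The tick-driven flow described above can be sketched roughly as follows. This is a simplified, self-contained illustration and not the Doris implementation: `SplitProgress` only mirrors the three fields named in the summary, while `TickDrivenSplitDemo`, `Batch`, `runTicks`, the integer PK range, and the chunk descriptor strings are hypothetical stand-ins. The `fetchSplits` method here plays the role of the stateless cdc_client call, which rebuilds everything it needs from the progress passed in.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class TickDrivenSplitDemo {
    /** Mirrors the three-field progress triple from the summary; field types are assumptions. */
    static final class SplitProgress {
        final String currentSplittingTable;
        final long nextSplitStart;
        final int nextSplitId;

        SplitProgress(String currentSplittingTable, long nextSplitStart, int nextSplitId) {
            this.currentSplittingTable = currentSplittingTable;
            this.nextSplitStart = nextSplitStart;
            this.nextSplitId = nextSplitId;
        }
    }

    /** One batch of chunk descriptors plus the progress to resume from on the next tick. */
    static final class Batch {
        final List<String> splits;
        final SplitProgress next;
        final boolean noMoreSplits;

        Batch(List<String> splits, SplitProgress next, boolean noMoreSplits) {
            this.splits = splits;
            this.next = next;
            this.noMoreSplits = noMoreSplits;
        }
    }

    /** Hypothetical stateless splitter: cuts at most batchSize chunks starting from the given progress. */
    static Batch fetchSplits(SplitProgress p, long maxPk, long chunkSize, int batchSize) {
        List<String> splits = new ArrayList<>();
        long start = p.nextSplitStart;
        int id = p.nextSplitId;
        while (splits.size() < batchSize && start <= maxPk) {
            long end = Math.min(start + chunkSize - 1, maxPk);
            splits.add(p.currentSplittingTable + ":" + id + "[" + start + "," + end + "]");
            start = end + 1;
            id++;
        }
        SplitProgress next = new SplitProgress(p.currentSplittingTable, start, id);
        return new Batch(splits, next, start > maxPk);
    }

    /** Calls fetchSplits once per simulated scheduler tick; returns the number of ticks used. */
    static int runTicks(Queue<String> remainingSplits, long maxPk, long chunkSize, int batchSize) {
        SplitProgress progress = new SplitProgress("orders", 1L, 0);
        int ticks = 0;
        boolean done = false;
        while (!done) {
            Batch batch = fetchSplits(progress, maxPk, chunkSize, batchSize);
            remainingSplits.addAll(batch.splits); // tasks could start consuming after the first batch
            progress = batch.next;                // in the real job this would be persisted
            done = batch.noMoreSplits;
            ticks++;
        }
        return ticks;
    }

    public static void main(String[] args) {
        Queue<String> remainingSplits = new ArrayDeque<>();
        int ticks = runTicks(remainingSplits, 1_000L, 100L, 4);
        System.out.println(ticks + " ticks, " + remainingSplits.size() + " splits");
    }
}
```

With a PK range of 1..1000, a chunk size of 100, and a batch size of 4, the loop needs three ticks to produce ten splits. Because no state lives in the splitter, restarting from any persisted `SplitProgress` resumes at the same chunk boundary.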

(cherry picked from commit be49e67)
…ses for mysql/pg (apache#63404)

## Summary

- Update the `.out` values for cdc data-type / boundary regression
  cases (mysql `enum_set`, `integer_boundary`, `json_types`; pg
  `array_boundary`, `jsonb_types`, `time_types`, `uuid`) so the expected
  output matches the actual cdc-streamed values.
- Tune a few `.groovy` cases:
  - `pg_array_boundary`: narrow the `multi_dim` column to 1D data; remove
    the `array<text>` element with embedded `"` whose comparison is
    non-deterministic.
  - `pg_streaming_postgres_job`: drop the inline LIKE-wildcard "decoy
    table" coverage (will return in a dedicated case once the underlying FE
    handling is addressed separately).
  - `mysql_enum_set` / `mysql_integer_boundary` / `mysql_json_types` /
    `mysql_partition`: minor data adjustments to keep the suite stable.
- Add a pre-generated `.out` for
  `test_streaming_mysql_job_charset_and_strings` whose groovy already
  exists.

(cherry picked from commit 825f5ce)
…dml/id-gap/decimal/datetime pk) and fix split-bound java.time deserialize (apache#63471)

### What problem does this PR solve?

Extend CDC streaming-job regression coverage across MySQL/Postgres for
primary-key related scenarios that the existing suites did not exercise;
plus a small cdc-client follow-up to apache#63219 so split-bound restore
handles the modern java.time types that some JDBC drivers return by
default.

#### Regression cases added

| Case | Tables | Guarded by |
|---|---|---|
| `*_composite_pk` | composite PK + full-PK mapping table | composite PK chunk split + INSERT/UPDATE/DELETE locating across all PK columns; tenant/org-boundary chunk slicing |
| `*_snapshot_with_concurrent_dml` | 1000-row table + unrelated decoy | source-side INSERT/UPDATE/DELETE during snapshot phase converges correctly; unrelated table outside include_tables not leaked |
| `*_id_gap_completeness` | dense id 1~100 + outlier id 10000000 + post-outlier 10000001~10000100 | snapshot reader covers tail rows past the chunk-time max without dropping; binlog DML across all three id regions applies correctly |
| `*_decimal_pk` | DECIMAL(20,4) PK + (MySQL only) BIGINT UNSIGNED PK | evenly-path BigDecimal / BigInteger arithmetic in chunk splitter; bigint unsigned values up to 2^64-1 |
| `mysql_*_datetime_pk` | DATETIME(6) PK + composite (DATETIME, id) PK | java.time.LocalDateTime split-bound JSON round-trip; composite temporal PK locating |
(cherry picked from commit 090b476)
…pache#63480)

## Summary

Small fixes accumulated during a code review of the streaming-job
module. None of these change runtime semantics for currently-supported
scenarios; they tighten validation, fix a typo, and harden a concurrent
field.

- `StreamingInsertJob.cleanup()`: log was filling `dbId=%s` with
`getJobId()`. Fix to `getDbId()`.
- `StreamingInsertJob.getShowSQL()`: typo `TO DATABSE` → `TO DATABASE`
in the synthesized SQL shown by SHOW JOBS.
- `JdbcSourceOffsetProvider.currentOffset`: mark `volatile`. The write
in `updateOffset()` happens outside `splitsLock` while
`hasMoreDataToConsume()` / `getShowCurrentOffset()` read it without
holding the lock.
- `DataSourceConfigValidator.isValidValue`: `snapshot_split_size` /
`snapshot_parallelism` previously only checked null/empty. Reject
non-numeric and non-positive values up front so users get a clear
CREATE-time error instead of a context-less `NumberFormatException` at
runtime.
- `CdcStreamTableValuedFunction.validate`: previously only checked
`jdbc_url` / `type` / `table` / `offset`. Now also rejects missing
`database` (MySQL), missing `schema` (PostgreSQL), and unsupported
`type` values, so the error surfaces at TVF parse time rather than later
inside `getRemoteDbName`.
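
As a rough illustration of the stricter `snapshot_split_size` / `snapshot_parallelism` check, a positive-integer validation could look like the sketch below. The class and method names are hypothetical, and the real `DataSourceConfigValidator.isValidValue` may differ in numeric type and error reporting.

```java
final class PositiveIntCheckDemo {
    /** Returns true only when the value parses as a base-10 integer greater than zero. */
    static boolean isPositiveInteger(String value) {
        if (value == null || value.trim().isEmpty()) {
            return false; // the previous null/empty check
        }
        try {
            return Integer.parseInt(value.trim()) > 0; // rejects 0 and negatives
        } catch (NumberFormatException e) {
            return false; // rejects non-numeric input up front instead of failing at runtime
        }
    }

    public static void main(String[] args) {
        System.out.println(isPositiveInteger("8096")); // true
        System.out.println(isPositiveInteger("abc"));  // false
    }
}
```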

(cherry picked from commit 3543341)
…ldcards in JdbcPostgreSQLClient (apache#63402)

### What problem does this PR solve?

`JdbcPostgreSQLClient.getJdbcColumnsInfo` calls
`DatabaseMetaData.getColumns(catalog, schemaPattern, tableNamePattern,
columnNamePattern)`. Per the JDBC spec the 3rd argument is a **SQL LIKE
pattern**, so literal `_` / `%` characters in the requested table name
are interpreted as wildcards by the Postgres driver. When a streaming
job is created with `include_tables = "user_info_pg_normal1"` and a
neighbour table like `userXinfo_pg_normal1` happens to coexist in the
same schema, the metadata query returns columns from **both** tables.
The combined result then trips `CREATE TABLE` on the Doris side with
errors such as `errCode = 2, detailMessage = Duplicate column name
'name'`, or pollutes the auto-created table schema with stray columns.

The repro is trivial: in the same Postgres schema create

- `user_info_pg_normal1(name varchar, age int2)` — the table we want to
capture
- `userXinfo_pg_normal1(name varchar, weight float8)` — a decoy whose
name only differs from the target by a single character that `_` matches

then run `CREATE JOB ... include_tables = "user_info_pg_normal1"`.
Without the fix the schema fetched for the target leaks `weight` (or
`Duplicate column name 'name'`, depending on column order).

Fix: after fetching the `ResultSet`, drop rows whose `TABLE_NAME` does
not exactly equal the requested `remoteTableName`. We deliberately do
**not** escape `_` / `%` at the source — relying on
`DatabaseMetaData.getSearchStringEscape()` is driver-version dependent
(older Oracle drivers don't honour escape sequences in `getTables`),
while filtering on the consumer side is deterministic and
driver-agnostic.
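
The effect of the consumer-side filter can be illustrated without a database. The sketch below is an assumption-laden stand-in: `likeMatches` is a simplified LIKE matcher (no escape handling) used only to show why the decoy table is returned, and `columnsOf` models the exact-name filter over in-memory `{TABLE_NAME, COLUMN_NAME}` pairs rather than a real `ResultSet`. Neither name comes from the patch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

final class ExactTableFilterDemo {
    /** Simplified SQL LIKE: '_' matches one character, '%' matches any run of characters. */
    static boolean likeMatches(String pattern, String name) {
        StringBuilder regex = new StringBuilder();
        for (char c : pattern.toCharArray()) {
            if (c == '_') {
                regex.append('.');
            } else if (c == '%') {
                regex.append(".*");
            } else {
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.matches(regex.toString(), name);
    }

    /** Keeps only the columns whose TABLE_NAME equals the requested table exactly. */
    static List<String> columnsOf(List<String[]> rows, String requestedTable) {
        List<String> columns = new ArrayList<>();
        for (String[] row : rows) {
            if (requestedTable.equals(row[0])) {
                columns.add(row[1]);
            }
        }
        return columns;
    }

    public static void main(String[] args) {
        String target = "user_info_pg_normal1";
        // Rows as the metadata query would return them when '_' acts as a wildcard.
        List<String[]> rows = Arrays.asList(
                new String[] {"user_info_pg_normal1", "name"},
                new String[] {"user_info_pg_normal1", "age"},
                new String[] {"userXinfo_pg_normal1", "name"},
                new String[] {"userXinfo_pg_normal1", "weight"});
        System.out.println(likeMatches(target, "userXinfo_pg_normal1")); // true: the decoy matches
        System.out.println(columnsOf(rows, target));                     // [name, age]
    }
}
```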

Scope:

- Only `JdbcPostgreSQLClient` is patched. This is the path used by
Postgres streaming jobs (the failing case). MySQL streaming jobs were
checked against the same decoy pattern and do not reproduce the bug
because MySQL Connector/J doesn't pull neighbour rows here in practice —
so `JdbcMySQLClient` is left untouched in this PR.
- The JDBC catalog path lives in a separate module
(`fe-connector-jdbc/.../JdbcConnectorClient`) and is **not** part of
this PR. It already does partial escape but intentionally skips `_` /
`%` for driver-compatibility reasons; a follow-up can apply the same
after-the-fact filter there.

(cherry picked from commit 41581e5)
… modes and pg slot lifecycle (apache#63514)

## Summary

Add 5 regression cases for CDC operational invariants:

- `test_streaming_mysql_job_offset_earliest`: earliest replays binlog
  without snapshot
- `test_streaming_postgres_job_publication` (Tests 5/6/7): fail-fast
  validation on missing slot / publication / required tables
- `test_streaming_postgres_job_drop_during_snapshot`: DROP
  mid-snapshot cleans up auto slot/pub
- `test_streaming_postgres_job_special_offset_restart_fe`: JSON LSN
  offset survives FE restart
- `test_streaming_postgres_job_slot_lsn_advance`: confirmed_flush_lsn
  advances, freezes on PAUSE, resumes after RESUME

`drop_during_snapshot` surfaced a real cdc_client concurrency bug:
HTTP `/api/close` raced the async-write task thread on
`JdbcIncrementalSourceReader` snapshot-reader lists (CME), making
`PostgresSourceReader.close` abort before dropping the auto-managed
slot/publication. Fix: thread-safe collections (CopyOnWriteArrayList /
ConcurrentHashMap.newKeySet) + serialize reader state mutations
(`prepareSnapshotSplits` / `prepareStreamSplit` /
`finishSplitRecords` / `close`) on the reader monitor;
`pollRecords` stays monitor-free so a blocking poll never fences
`close` out. Mirrored in `MySqlSourceReader`.
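
The collection half of that fix can be demonstrated in isolation. The sketch below is not the cdc_client code: `ReaderListDemo` and `throwsCme` are hypothetical, and the mutation happens on the same thread so the outcome is deterministic, whereas the real bug involved a separate `/api/close` thread. It only shows that iterating an `ArrayList` while it is cleared fails with `ConcurrentModificationException`, while `CopyOnWriteArrayList` iterates over a snapshot and does not.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

final class ReaderListDemo {
    /** Clears the list while iterating it, as a racing close() might; returns true if a CME was thrown. */
    static boolean throwsCme(List<String> readers) {
        try {
            for (String reader : readers) {
                readers.clear(); // structural modification during iteration
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<String> plain = new ArrayList<>(Arrays.asList("reader-1", "reader-2"));
        List<String> cow = new CopyOnWriteArrayList<>(Arrays.asList("reader-1", "reader-2"));
        System.out.println(throwsCme(plain)); // true
        System.out.println(throwsCme(cow));   // false
    }
}
```

Thread-safe collections alone do not order compound state changes, which is presumably why the fix also serializes the mutating methods on the reader monitor.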

## Test plan
- [ ] regression-test runs locally

(cherry picked from commit 7fa4d2e)
…in cdc-client (apache#63618)

### What problem does this PR solve?

Problem Summary:

When a Postgres CDC streaming job ingests rows whose timestamp / date
columns hold historical values (pre-1970 with sub-millisecond precision,
or pre-1582 / pre-1901 dates), two independent bugs in cdc-client cause
data corruption or task crash:

1. `DebeziumJsonDeserializer.convertTimestamp` uses signed `/` and `%`
on negative `micros` / `nanos`, producing a negative `nanoOfMillisecond`
and tripping Flink `TimestampData`'s `checkArgument(nanoOfMillisecond >=
0)`. Result: the ingestion task crashes whenever a pre-1970 timestamp
with sub-millisecond precision flows through (e.g. `1969-12-31
23:59:59.999123`).

2. The snapshot path reads column values via `rs.getObject()`, which
routes through PG JDBC's `TimestampUtils` + `GregorianCalendar`. For
pre-1582 timestamps the Julian/proleptic cutover shifts values by N
days; for pre-1901 timestamps the JVM time zone's LMT offset shifts
values by the LMT difference (e.g. ~343s in `Asia/Shanghai`). Result:
the same PG value (e.g. `0001-01-01 00:00:00`) yields different doris
values depending on whether the row was synced via snapshot or via
binlog.

This PR fixes both:

1. Use `Math.floorDiv` / `Math.floorMod` so the millisecond / nanosecond
split stays valid for negative epoch values.
2. Dispatch `TIMESTAMP` / `TIMESTAMPTZ` / `DATE` columns through
`LocalDateTime` / `OffsetDateTime` / `LocalDate` in the snapshot reader,
bypassing `GregorianCalendar` entirely. Preserve the legacy
`Timestamp(Long.MAX/MIN_VALUE)` sentinel for `+/-infinity`.
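
The first fix is easy to check by hand for the example value `1969-12-31 23:59:59.999123`, which is -877 microseconds relative to the epoch. The sketch below compares truncating and floor-based arithmetic; the class and method names are hypothetical and the real `convertTimestamp` may use different units or signatures.

```java
final class NegativeEpochSplitDemo {
    /** Truncating split: returns {epochMillis, nanoOfMillisecond}; the nanos part goes negative before 1970. */
    static long[] truncatingSplit(long epochMicros) {
        return new long[] {epochMicros / 1_000L, (epochMicros % 1_000L) * 1_000L};
    }

    /** Floor-based split: nanoOfMillisecond always stays within [0, 999_999]. */
    static long[] floorSplit(long epochMicros) {
        return new long[] {
            Math.floorDiv(epochMicros, 1_000L),
            Math.floorMod(epochMicros, 1_000L) * 1_000L
        };
    }

    public static void main(String[] args) {
        long micros = -877L; // 1969-12-31 23:59:59.999123 UTC
        long[] bad = truncatingSplit(micros);
        long[] good = floorSplit(micros);
        System.out.println(bad[0] + " ms, " + bad[1] + " ns");   // 0 ms, -877000 ns
        System.out.println(good[0] + " ms, " + good[1] + " ns"); // -1 ms, 123000 ns
    }
}
```

The floor-based result reads as millisecond -1 (23:59:59.999) plus 123000 ns, which reassembles to the original .999123 fraction and satisfies a non-negative `nanoOfMillisecond` check.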

(cherry picked from commit a0e0ee5)
…per-reader assignment (apache#63490)

## Summary

- Add an optional `server_id` source property for MySQL CDC streaming
jobs. Accepts a single value (e.g. `5400`) or a range (e.g.
`5400-5408`). When unset, the value is derived from the jobId hash so
existing jobs keep their current server_id when `snapshot_parallelism =
1`.
- Fix a latent collision: when `snapshot_parallelism > 1` and
source-side DML happens during snapshot, all parallel
`SnapshotSplitReader` instances previously shared the same server_id and
their backfill BinaryLogClient connections kicked each other out of
MySQL's dump-thread slot, dropping binlog events between low and high
watermark. Each subtask now gets a distinct server_id from the resolved
range; the single binlog reader uses the range start.
- Cross-field check: reject `server_id` range width smaller than
`snapshot_parallelism` at job startup with a clear fix-it suggestion.
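
A minimal sketch of the range handling described above, under stated assumptions: the `ServerIdRangeDemo` class, its method names, and the "range start plus subtask index" assignment are illustrative guesses, since the summary only says that each subtask gets a distinct id from the range and that the binlog reader uses the range start.

```java
final class ServerIdRangeDemo {
    /** Parses "5400" or "5400-5408" into an inclusive {start, end} pair. */
    static long[] parseRange(String spec) {
        String[] parts = spec.trim().split("-");
        if (parts.length > 2) {
            throw new IllegalArgumentException("server_id must be a value or a range: " + spec);
        }
        long start = Long.parseLong(parts[0].trim());
        long end = parts.length == 2 ? Long.parseLong(parts[1].trim()) : start;
        if (start <= 0 || end < start) {
            throw new IllegalArgumentException("invalid server_id range: " + spec);
        }
        return new long[] {start, end};
    }

    /** Cross-field check: the range must be at least as wide as snapshot_parallelism. */
    static void checkWidth(long[] range, int snapshotParallelism) {
        long width = range[1] - range[0] + 1;
        if (width < snapshotParallelism) {
            throw new IllegalArgumentException("server_id range holds " + width
                    + " ids but snapshot_parallelism is " + snapshotParallelism
                    + "; widen the range or lower the parallelism");
        }
    }

    /** Assumed assignment: subtask i uses start + i, so no two parallel readers share an id. */
    static long serverIdFor(long[] range, int subtaskIndex) {
        return range[0] + subtaskIndex;
    }

    public static void main(String[] args) {
        long[] range = parseRange("5400-5408");
        checkWidth(range, 4);
        System.out.println(serverIdFor(range, 0)); // 5400
        System.out.println(serverIdFor(range, 3)); // 5403
    }
}
```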

(cherry picked from commit 7c4dfe9)
@JNSimba JNSimba requested a review from yiguolei as a code owner May 28, 2026 07:39
@hello-stephen

Thank you for your contribution to Apache Doris.
Don't know what should be done next? See How to process your PR.

Please clearly describe your PR:

  1. What problem was fixed (it's best to include specific error reporting information). How it was fixed.
  2. Which behaviors were modified. What was the previous behavior, what is it now, why was it modified, and what possible impacts might there be.
  3. What features were added. Why was this function added?
  4. Which code was refactored and why was this part of the code refactored?
  5. Which functions were optimized and what is the difference before and after the optimization?


JNSimba commented May 28, 2026

run buildall

@JNSimba changed the title from "branch-4.1: pick streaming-job batch #63079 #63402 #63404 #63471 #63480 #63490 #63514 #63618" to "branch-4.1: [pick](streamingjob) pick streaming-job batch #63079 #63402 #63404 #63471 #63480 #63490 #63514 #63618" on May 28, 2026
@hello-stephen

FE UT Coverage Report

Increment line coverage 39.59% (173/437) 🎉
Increment coverage report
Complete coverage report

@hello-stephen

FE Regression Coverage Report

Increment line coverage 3.14% (14/446) 🎉
Increment coverage report
Complete coverage report

@yiguolei yiguolei merged commit 7d84e2e into apache:branch-4.1 May 29, 2026
29 of 33 checks passed