Skip to content

[FLINK-40064][python][e2e] Migrate PyFlink DataStream e2e test from Kafka to FileSource/FileSink#28629

Merged
MartijnVisser merged 2 commits into
apache:masterfrom
MartijnVisser:pyflink-e2e-filesystem
Jul 3, 2026
Merged

[FLINK-40064][python][e2e] Migrate PyFlink DataStream e2e test from Kafka to FileSource/FileSink#28629
MartijnVisser merged 2 commits into
apache:masterfrom
MartijnVisser:pyflink-e2e-filesystem

Conversation

@MartijnVisser

@MartijnVisser MartijnVisser commented Jul 3, 2026

Copy link
Copy Markdown
Contributor

What is the purpose of the change

FLINK-40048 removed the flink-sql-connector-kafka dependency and its maven-dependency-plugin copy from flink-sql-client-test, so no Kafka sql-jar is staged into target/sql-jars anymore. test_pyflink.sh still executed KAFKA_SQL_JAR=$(find "$SQL_JARS_DIR" | grep "kafka") under set -Eeuo pipefail, which now exits 1 and aborts the "PyFlink end-to-end test" nightly leg (e2e_4_ci) before any test runs (first seen in build 76685).

The jar and all Kafka scaffolding in the script only served the "Test PyFlink DataStream job" case, disabled since FLINK-36185 because data_stream_job.py used the legacy FlinkKafkaConsumer/FlinkKafkaProducer. This PR rewrites that job to the in-repo filesystem connector and re-enables the case, fixing the nightly leg and restoring PyFlink DataStream coverage (KeyedProcessFunction + event-time timers) dormant since September 2024. No active Kafka e2e coverage is lost; PyFlink-Kafka e2e coverage belongs in the externalized flink-connector-kafka repository.

Brief change log

  • Rewrite data_stream_job.py onto the bounded FileSource and unified FileSink; JSON lines are parsed in a map function
  • Disable periodic watermarks so all timers fire at MIN_VALUE + 1500 on the end-of-input MAX_WATERMARK, making the output deterministic (the same output the Kafka-based test asserted)
  • Use blocking env.execute(); the committer commits all pending part files at end of input, so no checkpointing is needed
  • Re-enable the DataStream case in test_pyflink.sh with a sorted diff over the committed part files, and remove the Kafka scaffolding (kafka_sql_common.sh stays; test_confluent_schema_registry.sh still sources it)

Verifying this change

This change added tests and can be verified as follows:

  • Ran the rewritten job twice standalone on a local minicluster (source-built PyFlink, Python 3.12); both runs produced exactly the 16 expected lines, byte-identical, with a cleanly committed part file and no in-progress leftovers
  • The re-enabled case fails loudly on missing or wrong output (empty part-file set diffs against the expected lines and exits 1)
  • The e2e_4 leg runs the full script in CI

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

Was generative AI tooling used to co-author this PR?
  • Yes (Claude Code (Fable 5))

Generated-by: Claude Code (Fable 5)

…afka to FileSource/FileSink

Rewrite the job on the bounded FileSource and unified FileSink and
re-enable the case.

Generated-by: Claude Code (Fable 5)
@flinkbot

flinkbot commented Jul 3, 2026

Copy link
Copy Markdown
Collaborator

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build


watermark_strategy = WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(5))\
.with_timestamp_assigner(KafkaRowTimestampAssigner())
watermark_strategy = WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(5)) \

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does setting the auto watermark interval above 0 with a wm strategy with a 5 second duration do the same thing as WatermarkStrategy.no_watermarks()? Could be a simplification if so, and wouldnt need to set the env config?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Almost: with the interval above 0 they are not quite equivalent, because the bounded-out-of-orderness generator emits periodically, so on a slow read the watermark could advance mid-stream and change which timestamps the timers register at. But you are right that no_watermarks() is the cleaner way to get exactly the interval-0 behavior: it never emits during processing, the timestamp assigner still chains onto it (needed for the ctx.timestamp() assertions), and the end-of-input MAX_WATERMARK is forwarded by TimestampsAndWatermarksOperator regardless of the strategy, so the timers still fire. Applied in fe0a7c4 and re-verified locally (same 16 lines, identical across runs).

…eriodic watermarks

Generated-by: Claude Code (Fable 5)
@github-actions github-actions Bot added the community-reviewed PR has been reviewed by the community. label Jul 3, 2026
@MartijnVisser MartijnVisser merged commit 8cc44d1 into apache:master Jul 3, 2026
@MartijnVisser MartijnVisser deleted the pyflink-e2e-filesystem branch July 3, 2026 21:10
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

community-reviewed PR has been reviewed by the community.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants