[runners-spark] Add Spark 4 runner#38255
Conversation
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces support for Apache Spark 4 in the Beam Spark runner. By leveraging the existing shared base and per-version override plumbing, the implementation remains lightweight and avoids duplicating the Spark 3 source tree. The changes include necessary build system updates, compatibility adjustments for Scala 2.13, and the addition of required CI infrastructure to ensure stability for the new Spark 4 runner. Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Ignored Files
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. Footnotes
|
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces an experimental Spark 4 structured streaming runner for Java, built against Spark 4.0.2 and Scala 2.13, requiring Java 17. The changes include new modules for the Spark 4 runner and job server, updates to shared Spark source for Scala 2.12/2.13 compatibility, and various dependency adjustments. Feedback includes a critical fix for handling multi-windowed data in EncoderHelpers, removing redundant or unchecked casts in BoundedDatasetFactory and GroupByKeyTranslatorBatch, and addressing fragile reflection and implementation details in EncoderFactory.
Iterables.getOnlyElement(windows) crashes with IllegalArgumentException when a WindowedValue is associated with more than one window (e.g. after a sliding window assignment). Compute the max maxTimestamp() across all associated windows instead, falling back to a clear error if the iterable is unexpectedly empty. Applied identically to the shared base and the Spark 4 override. Flagged by Gemini Code Assist on PR apache#38255.
source.split returns List<? extends BoundedSource<T>>, which already satisfies the subsequent stream usage. The cast was unchecked and would trip heap-pollution warnings. Applied identically to the shared base and the Spark 4 override. Flagged by Gemini Code Assist on PR apache#38255.
…slatorBatch The (Iterator<V>) cast inside fun2 is redundant: fun2's signature infers the iterator type. The shared base translator at the analogous call site already calls iterableOnce(it) without a cast. Flagged by Gemini Code Assist on PR apache#38255.
…cument trait setter Replace getConstructors()[0] (JVM-defined ordering, not stable) with a helper that picks the widest public constructor. The downstream switch already dispatches on parameter count to pick the right argument shape per Spark version, so this just makes the choice deterministic. Also document the org$apache$spark...$_setter_$isStruct_$eq method — it is the synthetic setter the Scala compiler emits for trait val fields, required when implementing AgnosticEncoders.StructEncoder from Java. Both flagged by Gemini Code Assist on PR apache#38255.
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces the experimental Spark 4 structured streaming runner for Java, built against Spark 4.0.2 and Scala 2.13. It adds new modules for the Spark 4 runner and job-server, updates build configurations to support Java 17, and implements necessary translation logic and encoders for Spark 4 compatibility. Additionally, existing Spark runner code was refactored to maintain compatibility across Spark 3 and 4, including updates to Scala interop and Kryo registration. Review feedback identified minor typos in documentation and constant names, as well as a potential logic inconsistency in the reflective invocation of Spark constructors that could lead to fragility across versions.
Three trivial typos flagged on PR apache#38255 round 2 review, applied identically to the shared base and the Spark 4 override: - CombinePerKeyTranslatorBatch: "other there other missing features?" -> "are there other missing features?" - GroupByKeyTranslatorBatch: "build-in" -> "built-in" - EncoderHelpers: PRIMITIV_TYPES -> PRIMITIVE_TYPES (constant + caller)
In EncoderFactory.invoke(Expression obj, ...), the switch was keyed on STATIC_INVOKE_CONSTRUCTOR.getParameterCount() but the body actually calls INVOKE_CONSTRUCTOR. This worked by coincidence: across the supported Spark 3.x versions both constructors happen to share the same parameter counts at the same dispatch points. A future Spark release where the two diverge would silently pick the wrong branch. Switch on INVOKE_CONSTRUCTOR.getParameterCount() to match the constructor that is actually invoked, and align with the convention used by newInstance() further down. In the Spark 4 override this also lets us collapse the `case 8: case 9:` fallthrough back to a single `case 8:`, since INVOKE_CONSTRUCTOR remains 8 params in Spark 4 even though STATIC_INVOKE_CONSTRUCTOR grew to 9. Applied identically to the shared base and the Spark 4 override. Flagged by Gemini Code Assist on PR apache#38255.
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces an experimental Spark 4 runner for the Java SDK, built against Spark 4.0.2 and Scala 2.13, and requiring Java 17. The changes include the addition of new modules for the Spark 4 runner and its job server, updates to build configurations, and significant refactoring of shared Spark runner code to ensure compatibility across Scala 2.12 and 2.13. Feedback is provided regarding a contradiction in the release notes where the runner is described as a "structured streaming runner" despite currently only supporting batch mode.
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces an experimental Spark 4 runner for the Java SDK, supporting batch processing with Spark 4.0.2 and Scala 2.13 on Java 17. It adds new modules for the Spark 4 runner and job server, while updating shared Spark source code to maintain compatibility with both Spark 3 and Spark 4. Key changes include the implementation of Spark 4-specific translators and encoders, and the migration from JavaConversions to JavaConverters. A redundant unchecked cast was identified in GroupByKeyHelpers.java where v.getWindows() can be used directly.
|
End-to-end smoke test (Spark 4.0.2 / Scala 2.13 / Java 17): Built A couple of items worth flagging before merge:
|
I remember this happened for Spark 3 as well: #26985 Yeah we can follow up on documentation or exclude dependencies |
|
Two follow-ups pushed (
PTAL when you get a chance or have some spare time, @Abacn |
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces an experimental Spark 4 runner for the Java SDK, supporting batch processing with Spark 4.0.2 and Scala 2.13 on Java 17. It adds the :runners:spark:4 module and job-server components while updating shared Spark source code to ensure compatibility across Scala versions. Feedback was provided to correct a terminology inconsistency in an error message within the EncoderHelpers class to use "encoder" instead of "coder".
|
Re-tested at |
|
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
|
The PreCommit Java failure on This is the known Flink flake tracked in #21333, and the PR does not touch any Flink code. Pushed an empty commit ( |
The PreCommit Java failure on the previous run was a single timeout in FlinkRequiresStableInputTest.testParDoRequiresStableInputPortable (:runners:flink:1.17:test) — known flake tracked in apache#21333. This PR does not touch any Flink code. Squash or drop on rebase before merge.
…oDynamicWithStrictTimeout Wall-clock-timing test (100ms inter-message + 150ms strict batch timeout) in sdks/java/io/amazon-web-services2 SQS — unrelated to this PR (no AWS2/SQS/Direct-runner files touched), and master is green for the same PreCommit on 6106b30.
`Java Wordcount Direct Runner (windows-latest)` failed at the :buildSrc configure step with HTTP 403 fetching legacy Spotless 5.6.1 transitive deps from repo.maven.apache.org (spotless-lib:2.7.0, durian-*:1.2.0, jgit:5.8.0). Network/infra flake — PR doesn't touch examples or buildSrc, master 'Java Tests' workflow consistently green.
Both checks failed on the prior empty retry commit (e19b80c). Reproduced locally at e19b80c: spotlessCheck and Spark checkStyleMain/Test all pass. PR doesn't touch any GCP IO code, and both checks were green on the immediately preceding branch commits (5abbb21, 604037f) and on master (6106b30, e01f711). Treating as infra flakes; squash before merge.
…r paths Address codecov/patch on PR apache#38255 by exercising the new branches added for Scala 2.13 / null-safe error logging: - Refactor SparkRunnerKryoRegistrator's nested Scala-array Class.forName fallback into a small @VisibleForTesting findFirstAvailableClass helper and add unit tests for first-hit, fallback, no-match, and empty-input paths. - Add EvaluationContextTest covering the catch (RuntimeException) / catch (Exception) blocks in evaluate() and collect(), including the null-message path that motivated the String.valueOf wrap.
4ce7cbf to
2bf3607
Compare
…38271) * [runners-spark] Use robust constructor resolution in EncoderFactory Replace `(Constructor<X>) X.class.getConstructors()[0]` for StaticInvoke, Invoke, and NewInstance with a `primaryConstructor()` helper that picks the constructor with the most parameters. Class.getConstructors() returns constructors in JVM-defined order that is not guaranteed stable, so resolving the widest constructor explicitly makes the lookup robust to future Spark releases that add overloaded constructors. Today this is a no-op: StaticInvoke / Invoke / NewInstance only have one public constructor each in Spark 3.1.x through 3.5.x, so getConstructors()[0] and the widest constructor resolve to the same one. The change is purely defensive. Same fix has already landed in the Spark 4 override in PR #38255 (commit 9c071c5, flagged by Gemini Code Assist round 1). Porting it to the shared base keeps both code paths consistent. * Address Gemini review: guard primaryConstructor against empty ctors If Class.getConstructors() returns empty (e.g. all public constructors removed in a future Spark version), throw an IllegalStateException with the class name instead of letting ctors[0] surface as a cryptic ArrayIndexOutOfBoundsException during static initialization.
|
Hi @Abacn — Phase 1 #38324 is in and this branch is now rebased on master at 1. Shared-base Scala 2.12/2.13 compat tweaks + codecov tests 2. Spark 4 CI workflows 3. Spark 4 runner module + wiring Happy to ship in this order, or bundle CI back into the module PR if you'd rather one fewer round-trip. Let me know what fits in your opinion. |
|
Thanks, I was hoping to test the change manually as part of review but haven't get into it yet. Once the tests run we can see if we can merge this as is or more work need to be done. |
|
Absolutely! No time pressure, I just wanted to signal that I am ready to split this if this is too big. |
| kryo.register(scalaArrayClass); | ||
| } else { | ||
| LOG.warn( | ||
| "Neither scala.collection.mutable.ArraySeq$ofRef (Scala 2.13) nor " |
There was a problem hiding this comment.
Should we throw here, in case warning silently ignored.
| tasks.register("sparkVersionsTest") { | ||
| group = "Verification" | ||
| dependsOn sparkVersions.collect{k,v -> "sparkVersion${k}Test"} | ||
| } |
There was a problem hiding this comment.
Check failure in my side:
* What went wrong:
Execution failed for task ':runners:spark:4:analyzeClassesDependencies'.
> Dependency analysis found issues.
usedUndeclaredArtifacts
- org.apache.spark:spark-connect-shims_2.13:4.0.2@jar
| // Additional supported Spark 4.x versions for compatibility tests. | ||
| // Can be expanded as new patch releases are published. | ||
| def sparkVersions = [ | ||
| // "402": "4.0.2", // primary version; already tested via the default build |
There was a problem hiding this comment.
Since this is empty, we can remove the SparkVersion tests in build.gradle and beam_PreCommit_Java_Spark4_Versions is not needed for now. This makes PR simpler as well.
| // To address breaking interfaces between various version of Spark 3, expressions are | ||
| // created reflectively. This is fine as it's just needed once to create the query plan. | ||
| switch (STATIC_INVOKE_CONSTRUCTOR.getParameterCount()) { | ||
| switch (INVOKE_CONSTRUCTOR.getParameterCount()) { |
There was a problem hiding this comment.
I think this has reverted a workaround commented above, or addressing an existing bug? If this is needed for Spark 4, consider make this patch spark 4 specific (put the modified source into spark/4/src)
|
|
||
| @Override | ||
| public PipelineResult.State cancel() throws IOException { | ||
| pipelineExecution.cancel(true); |
|
|
||
| * New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)). | ||
| * New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)). | ||
| * Experimental Spark 4 runner added (Java), built against Spark 4.0.2 / Scala 2.13 and requiring Java 17. Currently supports batch only; streaming is not yet supported ([#36841](https://github.com/apache/beam/issues/36841)). |
There was a problem hiding this comment.
We can add to CHANGES.md latter when ValidatesRunner tests all setup and confirmed working.
| exclude 'Dockerfile' | ||
| } | ||
|
|
||
| task copySpark4Dockerfile(type: Copy) { |
There was a problem hiding this comment.
Execution failed for task ':runners:spark:4:job-server:container:dockerPrepare'.
Entry Dockerfile is a duplicate but no duplicate handling strategy has been set. Please refer to https://docs.gradle.org/8.14.3/dsl/org.gradle.api.tasks.Copy.html#org.gradle.api.tasks.Copy:duplicatesStrategy for details.
Needs some logic to handle dockerfile file pickup here. I can defer the runners/spark/job-server/container module to portable runner support later
- SparkRunnerKryoRegistrator: throw IllegalStateException instead of
LOG.warn when neither ArraySeq$ofRef (Scala 2.13) nor WrappedArray$ofRef
(Scala 2.12) is on the classpath, so the missing class isn't silently
ignored. Drops the now-unused Logger field and slf4j imports.
- spark_runner.gradle: declare org.apache.spark:spark-connect-shims_2.13
as a provided dep gated on isSparkAtLeast("4.0.0"). Spark 4 splits the
Connect shim classes out of spark-sql; with enableStrictDependencies
this surfaced as analyzeClassesDependencies usedUndeclaredArtifacts.
The artifact does not exist for Spark 3, so the gate prevents Spark 3
resolution failures.
- runners/spark/4/build.gradle: drop the empty sparkVersions test
scaffolding (no additional Spark 4.x patch versions to test against
yet) and delete the now-unused
.github/workflows/beam_PreCommit_Java_Spark4_Versions.yml workflow
+ its README.md row.
- EncoderFactory (shared base): revert the line 94 switch to
STATIC_INVOKE_CONSTRUCTOR.getParameterCount(), keeping Spark 3 behavior
byte-for-byte unchanged. Spark 4's complete EncoderFactory override
under runners/spark/4/src/.../EncoderFactory.java is unaffected.
- CHANGES.md: drop the Highlights line for Spark 4. Will re-add when
ValidatesRunner tests are set up and confirmed working, matching the
Phase 1 apache#38324 pattern.
- runners/spark/4/job-server/container: delete the entire module
(build.gradle + Dockerfile) and remove its include() from
settings.gradle.kts. Per @Abacn's offer to defer the container module
to portable runner support later. The fat-jar :runners:spark:4:job-server
module is kept.
| ## Highlights | ||
|
|
||
| * New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)). | ||
| * New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)). |
There was a problem hiding this comment.
Possibly a rebase error. Just revert the change on this branch.
There was a problem hiding this comment.
Restored the placeholder Java SDK line in #38489. Sorry for the rebase noise.
| @@ -0,0 +1,3 @@ | |||
| { | |||
| "comment": "Modify this file in a trivial way to cause this test suite to run" | |||
There was a problem hiding this comment.
Both trigger files are not needed (one not effective yet; another one is a leftover of deleted workflow)
There was a problem hiding this comment.
Deleted both trigger files in #38489. Note that since this comment, #38478 renamed the Spark4StructuredStreaming ValidatesRunner workflow to beam_PostCommit_Java_ValidatesRunner_Spark4.yml, so the second trigger JSON is now doubly orphaned. Left the renamed workflow YAML in place — happy to add a stub trigger JSON for it if you'd prefer, but my read of your guidance on #38453 is that we don't add those for new workflows.
|
The latest a few comments are plain text and not affect builds. Merging for now so we can move forward. |
|
@Abacn Sorry I was away for Friday/Monday - I will pick this up |
…pache#38271) * [runners-spark] Use robust constructor resolution in EncoderFactory Replace `(Constructor<X>) X.class.getConstructors()[0]` for StaticInvoke, Invoke, and NewInstance with a `primaryConstructor()` helper that picks the constructor with the most parameters. Class.getConstructors() returns constructors in JVM-defined order that is not guaranteed stable, so resolving the widest constructor explicitly makes the lookup robust to future Spark releases that add overloaded constructors. Today this is a no-op: StaticInvoke / Invoke / NewInstance only have one public constructor each in Spark 3.1.x through 3.5.x, so getConstructors()[0] and the widest constructor resolve to the same one. The change is purely defensive. Same fix has already landed in the Spark 4 override in PR apache#38255 (commit 9c071c5, flagged by Gemini Code Assist round 1). Porting it to the shared base keeps both code paths consistent. * Address Gemini review: guard primaryConstructor against empty ctors If Class.getConstructors() returns empty (e.g. all public constructors removed in a future Spark version), throw an IllegalStateException with the class name instead of letting ctors[0] surface as a cryptic ArrayIndexOutOfBoundsException during static initialization.
* build: add Spark 4.0.2 version property and Scala 2.13 support
Add spark4_version (4.0.2) to BeamModulePlugin alongside the existing
spark3_version. Update spark_runner.gradle to conditionally select the
correct Scala library (2.13 vs 2.12), Jackson module, Kafka test
dependency, and require Java 17 when building against Spark 4.
Register the new :runners:spark:4 module in settings.gradle.kts.
These changes are purely additive — all conditionals gate on
spark_version.startsWith("4") or spark_scala_version == '2.13', leaving
the Spark 3 build path untouched.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
* refactor: make shared Spark source compatible with Scala 2.12 and 2.13
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
* build: add runners/spark/4/ build configuration
Add the Gradle build file for the Spark 4 structured streaming runner.
The module mirrors runners/spark/3/ — it inherits the shared RDD-base
source from runners/spark/src/ via copySourceBase and adds its own
Structured Streaming implementation in src/main/java.
Key differences from the Spark 3 build:
- Uses spark4_version (4.0.2) with Scala 2.13.
- Excludes DStream-based streaming tests (Spark 4 supports only
structured streaming batch).
- Unconditionally adds --add-opens JVM flags required by Kryo on
Java 17 (Spark 4's minimum).
- Binds Spark driver to 127.0.0.1 for macOS compatibility.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
* feat: add Spark 4 structured streaming runner source
Add the Spark 4 structured streaming runner implementation and tests.
Most files are adapted from the Spark 3 structured streaming runner
with targeted changes for Spark 4 / Scala 2.13 API compatibility.
Key Spark 4-specific changes (diff against runners/spark/3/src/):
EncoderFactory — Replaced the direct ExpressionEncoder constructor
(removed in Spark 4) with BeamAgnosticEncoder, a named class
implementing both AgnosticExpressionPathEncoder (for expression
delegation via toCatalyst/fromCatalyst) and AgnosticEncoders
.StructEncoder (so Dataset.select(TypedColumn) creates an N-attribute
plan, preventing FIELD_NUMBER_MISMATCH). The toCatalyst/fromCatalyst
methods substitute the provided input expression via transformUp,
enabling correct nesting inside composite encoders like
Encoders.tuple().
EncoderHelpers — Added toExpressionEncoder() helper to handle Spark 4
built-in encoders that are AgnosticEncoder subclasses rather than
ExpressionEncoder.
GroupByKeyTranslatorBatch — Migrated from internal catalyst Expression
API (CreateNamedStruct, Literal$) to public Column API (struct(),
lit(), array()), as required by Spark 4.
BoundedDatasetFactory — Use classic.Dataset$.MODULE$.ofRows() as
Dataset moved to org.apache.spark.sql.classic in Spark 4.
ScalaInterop — Replace WrappedArray.ofRef (removed in Scala 2.13)
with JavaConverters.asScalaBuffer().toList() in seqOf().
GroupByKeyHelpers, CombinePerKeyTranslatorBatch — Replace
TraversableOnce with IterableOnce (Scala 2.13 rename).
SparkStructuredStreamingPipelineResult — Replace sparkproject.guava
with Beam's vendored Guava.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
* ci: add Spark 4 PreCommit and PostCommit workflows
Add GitHub Actions workflows for the Spark 4 runner module:
- beam_PreCommit_Java_Spark4_Versions: runs sparkVersionsTest on
changes to runners/spark/**. Currently a no-op (the sparkVersions
map is empty) but scaffolds future patch version coverage.
- beam_PostCommit_Java_ValidatesRunner_Spark4StructuredStreaming: runs
the structured streaming test suite on Java 17.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
* Add PreCommit Java Spark4 Versions workflow
* Add cancellation support to Spark pipeline execution
* Remove unused endOfData() call in close method
Remove endOfData() call in close method.
* build: add Spark 4 job-server and container modules
Add job-server and container build configurations for Spark 4,
mirroring the existing Spark 3 job-server setup. The container
uses eclipse-temurin:17 (Spark 4 requires Java 17). The shared
spark_job_server.gradle gains a requireJavaVersion conditional
for Spark 4 parent projects.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
* build: remove spark.driver.host workaround from Spark 4 build
The hostname binding hack is no longer needed now that the local
machine resolves its hostname to 127.0.0.1 via /etc/hosts.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
* docs: add Spark 4 runner entry to CHANGES.md
Called out in /ultrareview as a missing contributor checklist item.
Adds a Highlight line and a New Features / Improvements entry under
the 2.74.0 Unreleased section, referencing issue apache#36841.
* docs: explain classic.SparkSession downcast in BoundedDatasetFactory
Per /ultrareview feedback: the one-line comment didn't make clear why
the cast is safe. Expand it to note that SparkSession.builder() always
returns a classic.SparkSession at runtime, which is why the downcast
avoids reflection.
* fix: log warning when neither WrappedArray nor ArraySeq class is found
Per /ultrareview feedback: the fallback branch silently swallowed the
second ClassNotFoundException. In practice one of the two classes is
always present (Scala 2.12 vs 2.13 stdlib), but a silent skip could
mask a broken classpath. Emit a LOG.warn instead.
* build: compare spark_version numerically via isSparkAtLeast helper
Per /ultrareview feedback: the five `"$spark_version" >= "3.5.0"` checks
were lexicographic string comparisons. They happened to work for 3.5.0
and 4.0.2 only because '4' > '3' as chars — a future "3.10.0" release
would compare less than "3.5.0" and silently drop the Spark 3.5+
dependencies and exclusions.
Introduce an `isSparkAtLeast` closure that tokenizes on `.` and `-`,
keeps numeric parts, and compares component-by-component. Replace all
five call sites.
* [Spark Runner] Slim Spark 4 to override-only files
With spark_runner.gradle now layering per-major source overrides on top
of the shared base, runners/spark/4/src/ no longer needs to duplicate
62 byte-identical structured-streaming files. Keep only the 11 files
that actually differ for Spark 4 / Scala 2.13. Switch the build.gradle
to spark_major = '4' (the new mechanism) and bump spark_versions to 3,4.
Compiled output unchanged — the deleted files are reproduced identically
inside build/source-overrides by the Copy task.
* [Spark Runner] Use java.io.Serializable in DoFnRunnerFactory base
scala.Serializable was removed in Scala 2.13. java.io.Serializable
works identically on both Scala 2.12 and 2.13, so this can live in
the shared base instead of needing a Spark-4-only override file.
* [Spark Runner] Null-guard error message logging in EvaluationContext base
Wrap Throwables.getRootCause(e).getMessage() in String.valueOf(...)
to make the error logging robust to a null root-cause message. The
behaviour change applies equally to Spark 3 and Spark 4, so the
fix lives in the shared base and the Spark-4 override is dropped.
* [Spark Runner] Cancel execution future and use Beam-vendored Guava in PipelineResult
Two changes that previously lived only in the Spark-4 override and
are equally valid for Spark 3:
1. cancel() now actually cancels the executing future
(pipelineExecution.cancel(true)) in addition to setting the state
to CANCELLED. Without this, calling cancel() left the pipeline
running silently — a real bug, not a Spark-4 specific concern.
2. Switch from Spark's shaded guava (org.sparkproject.guava) to the
Beam-vendored guava that is already on the classpath. Spark 4
no longer exposes the sparkproject guava package; using the
vendored one removes the version coupling for both runners.
* ci: re-trigger to clear flaky UnboundedScheduledExecutorServiceTest
Empty commit to re-run CI. The only failure on the prior head was
UnboundedScheduledExecutorServiceTest.testThreadsAreAddedOnlyAsNeededWithContention,
a known flake (apache#31590) — the test itself acknowledges
contention-induced extra threads in its inline comment. Squash or
drop on rebase before merge.
* [Spark Runner] Fix maxTimestamp to handle multi-window values
Iterables.getOnlyElement(windows) crashes with IllegalArgumentException
when a WindowedValue is associated with more than one window (e.g. after
a sliding window assignment). Compute the max maxTimestamp() across all
associated windows instead, falling back to a clear error if the iterable
is unexpectedly empty.
Applied identically to the shared base and the Spark 4 override. Flagged
by Gemini Code Assist on PR apache#38255.
* [Spark Runner] Drop unchecked cast in BoundedDatasetFactory.split
source.split returns List<? extends BoundedSource<T>>, which already
satisfies the subsequent stream usage. The cast was unchecked and would
trip heap-pollution warnings. Applied identically to the shared base
and the Spark 4 override. Flagged by Gemini Code Assist on PR apache#38255.
* [Spark Runner] Drop redundant Iterator cast in Spark 4 GroupByKeyTranslatorBatch
The (Iterator<V>) cast inside fun2 is redundant: fun2's signature
infers the iterator type. The shared base translator at the analogous
call site already calls iterableOnce(it) without a cast. Flagged by
Gemini Code Assist on PR apache#38255.
* [Spark Runner] Spark 4 EncoderFactory: stable constructor lookup + document trait setter
Replace getConstructors()[0] (JVM-defined ordering, not stable) with a
helper that picks the widest public constructor. The downstream switch
already dispatches on parameter count to pick the right argument shape
per Spark version, so this just makes the choice deterministic.
Also document the org$apache$spark...$_setter_$isStruct_$eq method —
it is the synthetic setter the Scala compiler emits for trait val fields,
required when implementing AgnosticEncoders.StructEncoder from Java.
Both flagged by Gemini Code Assist on PR apache#38255.
* [Spark Runner] Fix Javadoc/comment typos flagged by Gemini
Three trivial typos flagged on PR apache#38255 round 2 review, applied
identically to the shared base and the Spark 4 override:
- CombinePerKeyTranslatorBatch: "other there other missing features?"
-> "are there other missing features?"
- GroupByKeyTranslatorBatch: "build-in" -> "built-in"
- EncoderHelpers: PRIMITIV_TYPES -> PRIMITIVE_TYPES (constant + caller)
* [Spark Runner] Switch EncoderFactory.invoke on the right constructor
In EncoderFactory.invoke(Expression obj, ...), the switch was keyed on
STATIC_INVOKE_CONSTRUCTOR.getParameterCount() but the body actually
calls INVOKE_CONSTRUCTOR. This worked by coincidence: across the
supported Spark 3.x versions both constructors happen to share the
same parameter counts at the same dispatch points. A future Spark
release where the two diverge would silently pick the wrong branch.
Switch on INVOKE_CONSTRUCTOR.getParameterCount() to match the
constructor that is actually invoked, and align with the convention
used by newInstance() further down. In the Spark 4 override this also
lets us collapse the `case 8: case 9:` fallthrough back to a single
`case 8:`, since INVOKE_CONSTRUCTOR remains 8 params in Spark 4 even
though STATIC_INVOKE_CONSTRUCTOR grew to 9.
Applied identically to the shared base and the Spark 4 override.
Flagged by Gemini Code Assist on PR apache#38255.
* Update CHANGES.md
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
* [Spark 4] Drop redundant Collection cast in GroupByKeyHelpers
WindowedValue#getWindows() returns Collection<? extends BoundedWindow>,
which is already an Iterable and can be passed straight to
ScalaInterop.scalaIterator(...). The intermediate local variable and the
unchecked cast to Collection<BoundedWindow> were redundant.
Applied in both the shared base and the Spark 4 override.
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
* [Spark 4] Add module README with slf4j-jdk14 known-issue note
Documents the Spark 4 runner's requirements (Java 17, Scala 2.13,
Spark 4.0.x, batch-only) and the slf4j-jdk14 ↔ jul-to-slf4j conflict
that is the Spark 4 manifestation of apache#26985 (fixed for Spark 3 in
apache#27001). The shared spark_runner.gradle already excludes slf4j-jdk14
for in-tree builds; this note tells downstream consumers to mirror the
exclude when assembling their own runtime classpath against
beam-runners-spark-4.
* [runners-spark] Address Gemini nits: use encoder terminology in exception messages
* Trigger Build
* ci: re-trigger to clear flaky FlinkRequiresStableInputTest
The PreCommit Java failure on the previous run was a single timeout in
FlinkRequiresStableInputTest.testParDoRequiresStableInputPortable
(:runners:flink:1.17:test) — known flake tracked in apache#21333. This PR
does not touch any Flink code. Squash or drop on rebase before merge.
* ci: re-trigger to clear flaky SqsIOWriteBatchesTest.testWriteBatchesToDynamicWithStrictTimeout
Wall-clock-timing test (100ms inter-message + 150ms strict batch
timeout) in sdks/java/io/amazon-web-services2 SQS — unrelated to
this PR (no AWS2/SQS/Direct-runner files touched), and master is
green for the same PreCommit on 6106b30.
* ci: re-trigger to clear Maven Central 403 on Windows wordcount
`Java Wordcount Direct Runner (windows-latest)` failed at the
:buildSrc configure step with HTTP 403 fetching legacy Spotless
5.6.1 transitive deps from repo.maven.apache.org
(spotless-lib:2.7.0, durian-*:1.2.0, jgit:5.8.0). Network/infra
flake — PR doesn't touch examples or buildSrc, master 'Java Tests'
workflow consistently green.
* ci: re-trigger to clear flaky Spotless + GCP IO Direct PreCommits
Both checks failed on the prior empty retry commit (e19b80c).
Reproduced locally at e19b80c: spotlessCheck and Spark
checkStyleMain/Test all pass. PR doesn't touch any GCP IO code,
and both checks were green on the immediately preceding branch
commits (5abbb21, 604037f) and on master (6106b30, e01f711).
Treating as infra flakes; squash before merge.
* [runners-spark] Cover Scala-array fallback and EvaluationContext error paths
Address codecov/patch on PR apache#38255 by exercising the new branches added for
Scala 2.13 / null-safe error logging:
- Refactor SparkRunnerKryoRegistrator's nested Scala-array Class.forName
fallback into a small @VisibleForTesting findFirstAvailableClass helper
and add unit tests for first-hit, fallback, no-match, and empty-input
paths.
- Add EvaluationContextTest covering the catch (RuntimeException) /
catch (Exception) blocks in evaluate() and collect(), including the
null-message path that motivated the String.valueOf wrap.
* [runners-spark] spotless: inline two findFirstAvailableClass calls in test
* flaky SqsIOWriteBatchesTest retry
* flaky ExampleEchoPipelineTest retry
* rebase cleanup: drop duplicate isSparkAtLeast helper now in master via apache#38324
* Address @Abacn 2026-05-07 review
- SparkRunnerKryoRegistrator: throw IllegalStateException instead of
LOG.warn when neither ArraySeq$ofRef (Scala 2.13) nor WrappedArray$ofRef
(Scala 2.12) is on the classpath, so the missing class isn't silently
ignored. Drops the now-unused Logger field and slf4j imports.
- spark_runner.gradle: declare org.apache.spark:spark-connect-shims_2.13
as a provided dep gated on isSparkAtLeast("4.0.0"). Spark 4 splits the
Connect shim classes out of spark-sql; with enableStrictDependencies
this surfaced as analyzeClassesDependencies usedUndeclaredArtifacts.
The artifact does not exist for Spark 3, so the gate prevents Spark 3
resolution failures.
- runners/spark/4/build.gradle: drop the empty sparkVersions test
scaffolding (no additional Spark 4.x patch versions to test against
yet) and delete the now-unused
.github/workflows/beam_PreCommit_Java_Spark4_Versions.yml workflow
+ its README.md row.
- EncoderFactory (shared base): revert the line 94 switch to
STATIC_INVOKE_CONSTRUCTOR.getParameterCount(), keeping Spark 3 behavior
byte-for-byte unchanged. Spark 4's complete EncoderFactory override
under runners/spark/4/src/.../EncoderFactory.java is unaffected.
- CHANGES.md: drop the Highlights line for Spark 4. Will re-add when
ValidatesRunner tests are set up and confirmed working, matching the
Phase 1 apache#38324 pattern.
- runners/spark/4/job-server/container: delete the entire module
(build.gradle + Dockerfile) and remove its include() from
settings.gradle.kts. Per @Abacn's offer to defer the container module
to portable runner support later. The fat-jar :runners:spark:4:job-server
module is kept.
---------
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
|
Once validates runner tests (setup in #38478) is green we can announce Spark4 support for Java |
… remaining review comments from #38255 (#38489) * [runners-spark] Spark 4 follow-up: address remaining review comments from #38255 Addresses two trailing review comments left on PR #38255 at the approved/merged head SHA that were never folded in before merge: - Restore the `feature Y added to Java SDK` placeholder line in CHANGES.md Highlights (lost to a rebase). Comment: #38255 (comment) - Delete two orphaned `.github/trigger_files/*.json`: - `beam_PreCommit_Java_Spark4_Versions.json`: its workflow YAML was deleted in #38255. - `beam_PostCommit_Java_ValidatesRunner_Spark4StructuredStreaming.json`: flagged as "not effective yet"; since then, #38478 renamed the associated workflow YAML to `beam_PostCommit_Java_ValidatesRunner_Spark4.yml`, so this trigger JSON is doubly orphaned. Comment: #38255 (comment) The renamed workflow YAML stays in place; per the trigger-files convention these JSONs are only added when an author wants to manually trigger a specific PostCommit. * Update CHANGES.md Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * Fix CHANGES.md URL: /pull/N -> /issues/N for validateChanges * Update CHANGES.md Co-authored-by: Yi Hu <huuyyi@gmail.com> --------- Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> Co-authored-by: Yi Hu <huuyyi@gmail.com>
Addresses #36841. Replaces #38212.
Builds on the shared base + per-version overrides plumbing introduced in #38233 (merged). With that in place, Spark 4 support is reduced to:
runners/spark/4/src/(no duplication of files that match the Spark 3 baseline)runners/spark/4/build.gradle, job-server module, container Dockerfilerunners/spark/src/gradle.properties:spark_versions=3,4The diff is smaller compared to #38212, which predated the refactor and duplicated the entire Spark 3 source tree.
cc @Abacn — this is the slim follow-up we discussed.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>instead.CHANGES.mdwith noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.