[Spark Runner] Prepare Spark 3 structured-streaming to shared base, adopt Flink-style version overrides #38233
Open: tkaymak wants to merge 1 commit into apache:master from tkaymak:spark-refactor-base
| Original file line number | Diff line number | Diff line change | ||
|---|---|---|---|---|
|
|
@@ -89,38 +89,102 @@ def hadoopVersions = [ | |||
|
|
||||
| hadoopVersions.each { kv -> configurations.create("hadoopVersion$kv.key") } | ||||
|
|
||||
| def sourceBase = "${project.projectDir}/../src" | ||||
| def sourceBaseCopy = "${project.buildDir}/sourcebase/src" | ||||
|
|
||||
| def useCopiedSourceSet = { scope, type, trigger -> | ||||
| def taskName = "copy${scope.capitalize()}${type.capitalize()}" | ||||
| trigger.dependsOn tasks.register(taskName, Copy) { | ||||
| from "$sourceBase/$scope/$type" | ||||
| into "$sourceBaseCopy/$scope/$type" | ||||
| duplicatesStrategy DuplicatesStrategy.INCLUDE | ||||
| /* | ||||
| * Per-version source overrides (mirrors runners/flink/flink_runner.gradle). | ||||
| * | ||||
| * Layout: | ||||
| * runners/spark/src/ -- shared base (lowest supported version uses these directly) | ||||
| * runners/spark/<major>/src/ -- version-specific overrides (later overrides win) | ||||
| * | ||||
| * The lowest supported `spark_major` builds straight from the shared base. | ||||
| * Higher versions copy <shared> + <previous majors> + <current> into a single | ||||
| * source-overrides directory using DuplicatesStrategy.INCLUDE so the current | ||||
| * version's files override earlier ones. | ||||
| */ | ||||
| def base_path = ".." | ||||
|
|
||||
| def overrides = { versions, type, group = 'java' -> | ||||
| // order matters: later entries override earlier ones during the Copy | ||||
| ["${base_path}/src/${type}/${group}"] + | ||||
| versions.collect { "${base_path}/${it}/src/${type}/${group}" } + | ||||
| ["./src/${type}/${group}"] | ||||
| } | ||||
|
|
||||
| def all_versions = spark_versions.split(",").collect { it.trim() } | ||||
| // Determine version order by list position rather than string comparison so two-digit | ||||
| // majors (e.g. "10") still sort after single-digit ones. | ||||
|
Contributor
This is a good catch. As a follow-up, we may fix the same logic in the Flink runner Gradle script: beam/runners/flink/flink_runner.gradle, line 37 in 4e24a9c
|
||||
| def spark_major_index = all_versions.indexOf(spark_major) | ||||
| if (spark_major_index < 0) { | ||||
| throw new GradleException( | ||||
| "spark_major='${spark_major}' is not listed in spark_versions='${spark_versions}' " + | ||||
| "(see root gradle.properties).") | ||||
| } | ||||
| def previous_versions = spark_major_index > 0 ? all_versions.subList(0, spark_major_index) : [] | ||||
|
|
||||
| def main_source_overrides = overrides(previous_versions, "main") | ||||
| def test_source_overrides = overrides(previous_versions, "test") | ||||
| def main_resources_overrides = overrides(previous_versions, "main", "resources") | ||||
| def test_resources_overrides = overrides(previous_versions, "test", "resources") | ||||
|
|
||||
| def sourceOverridesBase = project.layout.buildDirectory.dir('source-overrides/src').get() | ||||
|
|
||||
| def copySourceOverrides = tasks.register('copySourceOverrides', Copy) { copyTask -> | ||||
| copyTask.from main_source_overrides | ||||
| copyTask.into "${sourceOverridesBase}/main/java" | ||||
| copyTask.duplicatesStrategy DuplicatesStrategy.INCLUDE | ||||
| if (project.ext.has('excluded_files') && project.ext.excluded_files.containsKey('main')) { | ||||
| project.ext.excluded_files.main.each { f -> copyTask.exclude "**/${f}" } | ||||
| } | ||||
| // append copied sources to srcDirs | ||||
| sourceSets."$scope"."$type".srcDirs "$sourceBaseCopy/$scope/$type" | ||||
| } | ||||
|
|
||||
| if (copySourceBase) { | ||||
| // Copy source base into build directory. | ||||
| // While this is not necessary, having multiple source sets referencing the same shared base will typically confuse an IDE and harm developer experience. | ||||
| // The copySourceBase flag can be swapped without any implications and allows to pick a main version that is actively worked on. | ||||
| useCopiedSourceSet("main", "java", compileJava) | ||||
| useCopiedSourceSet("main", "resources", processResources) | ||||
| useCopiedSourceSet("test", "java", compileTestJava) | ||||
| useCopiedSourceSet("test", "resources", processTestResources) | ||||
| def copyResourcesOverrides = tasks.register('copyResourcesOverrides', Copy) { | ||||
| it.from main_resources_overrides | ||||
| it.into "${sourceOverridesBase}/main/resources" | ||||
| it.duplicatesStrategy DuplicatesStrategy.INCLUDE | ||||
| } | ||||
|
|
||||
| def copyTestSourceOverrides = tasks.register('copyTestSourceOverrides', Copy) { copyTask -> | ||||
| copyTask.from test_source_overrides | ||||
| copyTask.into "${sourceOverridesBase}/test/java" | ||||
| copyTask.duplicatesStrategy DuplicatesStrategy.INCLUDE | ||||
| if (project.ext.has('excluded_files') && project.ext.excluded_files.containsKey('test')) { | ||||
| project.ext.excluded_files.test.each { f -> copyTask.exclude "**/${f}" } | ||||
| } | ||||
| } | ||||
|
|
||||
| def copyTestResourcesOverrides = tasks.register('copyTestResourcesOverrides', Copy) { | ||||
| it.from test_resources_overrides | ||||
| it.into "${sourceOverridesBase}/test/resources" | ||||
| it.duplicatesStrategy DuplicatesStrategy.INCLUDE | ||||
| } | ||||
|
|
||||
| def use_override = (spark_major_index > 0) | ||||
| def sourceBase = "${project.projectDir}/../src" | ||||
|
|
||||
| if (use_override) { | ||||
| // Pin srcDirs to the Copy task providers so each higher version sees only its merged | ||||
| // overrides tree. Passing the TaskProviders here lets Gradle auto-wire task dependencies | ||||
| // for every consumer (compile, javadoc, sources jar, etc.) without manual dependsOn. | ||||
| sourceSets { | ||||
| main { | ||||
| java { srcDirs = [copySourceOverrides] } | ||||
| resources { srcDirs = [copyResourcesOverrides] } | ||||
| } | ||||
| test { | ||||
| java { srcDirs = [copyTestSourceOverrides] } | ||||
| resources { srcDirs = [copyTestResourcesOverrides] } | ||||
| } | ||||
| } | ||||
| } else { | ||||
| // append shared base sources to srcDirs | ||||
| // Lowest supported Spark version: build straight from the shared base, no copy step. | ||||
| sourceSets { | ||||
| main { | ||||
| java.srcDirs "${sourceBase}/main/java" | ||||
| resources.srcDirs "${sourceBase}/main/resources" | ||||
| java { srcDirs = ["${sourceBase}/main/java"] } | ||||
| resources { srcDirs = ["${sourceBase}/main/resources"] } | ||||
| } | ||||
| test { | ||||
| java.srcDirs "${sourceBase}/test/java" | ||||
| resources.srcDirs "${sourceBase}/test/resources" | ||||
| java { srcDirs = ["${sourceBase}/test/java"] } | ||||
| resources { srcDirs = ["${sourceBase}/test/resources"] } | ||||
|
tkaymak marked this conversation as resolved.
|
||||
| } | ||||
| } | ||||
| } | ||||
|
|
||||
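For readers following the diff, here is a minimal plain-Groovy sketch of how the ordered override directories could be resolved for one major version. It is not the build script itself: `layersFor` and the sample value `"3,4,10"` are hypothetical stand-ins for the `overrides` closure and the `spark_versions` property, and the paths are only the relative strings the diff builds.

```groovy
// Hypothetical stand-in for the spark_versions Gradle property.
def sparkVersions = "3,4,10"

// String comparison would order "10" before "3", which is why the diff
// relies on list position instead.
assert "10" < "3"

// Resolve the ordered source directories for one major version.
// Later entries take precedence when they are copied into one directory.
def layersFor = { String versions, String major, String type, String group = 'java' ->
    def all = versions.split(",").collect { it.trim() }
    def idx = all.indexOf(major)
    if (idx < 0) {
        throw new IllegalArgumentException(
            "spark_major='${major}' is not listed in spark_versions='${versions}'")
    }
    // Everything listed before the current major counts as a previous version.
    def previous = idx > 0 ? all.subList(0, idx) : []
    def dirs = ["../src/${type}/${group}"] +
        previous.collect { "../${it}/src/${type}/${group}" } +
        ["./src/${type}/${group}"]
    dirs.collect { it.toString() }
}

println layersFor(sparkVersions, "10", "main")
println layersFor(sparkVersions, "3", "test", "resources")
```

With position-based ordering, a two-digit major such as "10" picks up the layers of "3" and "4" before its own, while the first listed version gets only the shared base plus its own directory (which the diff then bypasses by building straight from the shared base).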
File renamed without changes. (Shown for 73 files.)
nit: rephrase the comments so they are plainly descriptive (e.g. "win" -> "takes precedence").
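To make the "takes precedence" wording concrete, here is a small plain-Groovy simulation of the layering described in the build script comment. It does not touch the file system or call Gradle; the layer names and file names are hypothetical, and the map assignment only emulates a later file replacing an earlier one at the same relative path, as the diff's comment describes for `DuplicatesStrategy.INCLUDE`.

```groovy
// Hypothetical layers, in copy order: shared base, a previous major, the current major.
def layers = [
    [name: "../src",   files: ["Runner.java", "Util.java"]],
    [name: "../4/src", files: ["Util.java"]],
    [name: "./src",    files: ["Runner.java", "Extra.java"]],
]

// Emulate copying every layer into one directory in order:
// a later layer takes precedence over an earlier one for the same path.
def mergeLayers = { List layerList ->
    def merged = [:]
    layerList.each { layer ->
        layer.files.each { path -> merged[path] = layer.name }
    }
    merged
}

def result = mergeLayers(layers)
result.each { path, source -> println "${path} <- ${source}" }
```

In this sketch `Runner.java` comes from the current version, `Util.java` from the previous major, and `Extra.java` exists only in the current version.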