[SPARK-52339][SQL] Fix comparison of InMemoryFileIndex instances #51043
Closed
Conversation
HyukjinKwon approved these changes on May 29, 2025
looks making sense to me
This reverts commit 42c2acc.
Force-pushed from 8f0302d to a939125
cc @cloud-fan
cloud-fan approved these changes on Jun 23, 2025
thanks, merging to master/4.0!
cloud-fan pushed a commit that referenced this pull request on Jun 23, 2025
### What changes were proposed in this pull request?

This PR changes `InMemoryFileIndex#equals` to compare a non-distinct collection of root paths rather than a distinct set of root paths. Without this change, `InMemoryFileIndex#equals` considers the following two collections of root paths to be equal, even though they represent a different number of rows:

```
["/tmp/test", "/tmp/test"]
["/tmp/test", "/tmp/test", "/tmp/test"]
```

### Why are the changes needed?

The bug can cause correctness issues, e.g.:

```scala
// create test data
val data = Seq((1, 2), (2, 3)).toDF("a", "b")
data.write.mode("overwrite").csv("/tmp/test")

val fileList1 = List.fill(2)("/tmp/test")
val fileList2 = List.fill(3)("/tmp/test")

val df1 = spark.read.schema("a int, b int").csv(fileList1: _*)
val df2 = spark.read.schema("a int, b int").csv(fileList2: _*)

df1.count() // correctly returns 4
df2.count() // correctly returns 6

// the following is the same as above, except df1 is persisted
val df1 = spark.read.schema("a int, b int").csv(fileList1: _*).persist
val df2 = spark.read.schema("a int, b int").csv(fileList2: _*)

df1.count() // correctly returns 4
df2.count() // incorrectly returns 4!!
```

In the above example, df1 and df2 were created with a different number of paths: df1 has 2 and df2 has 3. But since the distinct set of root paths is the same (e.g., `Set("/tmp/test") == Set("/tmp/test")`), the two dataframes are considered equal. Thus, when df1 is persisted, df2 uses df1's cached plan. The same bug also causes inappropriate exchange reuse.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New unit test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #51043 from bersprockets/multi_path_issue.

Authored-by: Bruce Robbins <bersprockets@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit ccded6c)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
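The essence of the fix can be shown with a small self-contained sketch (hypothetical code, not Spark's actual implementation): comparing root paths as a distinct set loses multiplicity, while comparing sorted sequences stays order-insensitive but preserves how many times each path appears.

```scala
// Hypothetical sketch of the equality bug; not Spark's actual code.
object RootPathEquality {
  // Buggy behavior: comparing as distinct sets loses path multiplicity.
  def equalAsSets(a: Seq[String], b: Seq[String]): Boolean =
    a.toSet == b.toSet

  // Fixed behavior: comparing sorted sequences keeps multiplicity
  // while remaining insensitive to path order.
  def equalAsMultisets(a: Seq[String], b: Seq[String]): Boolean =
    a.sorted == b.sorted

  def main(args: Array[String]): Unit = {
    val twoPaths   = List.fill(2)("/tmp/test")
    val threePaths = List.fill(3)("/tmp/test")
    println(equalAsSets(twoPaths, threePaths))      // true: the bug
    println(equalAsMultisets(twoPaths, threePaths)) // false: the fix
  }
}
```

With set-based equality, the two-path and three-path indexes compare equal, which is exactly why the cache lookup in the repro above returns df1's plan for df2.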
@cloud-fan should the patch also go to 3.5?
We need a separate PR. @bersprockets can you open a 3.5 backport PR?
YanivKunda pushed a commit to YanivKunda/spark that referenced this pull request on Jun 23, 2025
bersprockets added a commit to bersprockets/spark that referenced this pull request on Jun 23, 2025
yaooqinn pushed a commit that referenced this pull request on Jun 24, 2025
### What changes were proposed in this pull request?

This is a back-port of #51043.

This PR changes `InMemoryFileIndex#equals` to compare a non-distinct collection of root paths rather than a distinct set of root paths. Without this change, `InMemoryFileIndex#equals` considers the following two collections of root paths to be equal, even though they represent a different number of rows:

```
["/tmp/test", "/tmp/test"]
["/tmp/test", "/tmp/test", "/tmp/test"]
```

### Why are the changes needed?

The bug can cause correctness issues, e.g.:

```scala
// create test data
val data = Seq((1, 2), (2, 3)).toDF("a", "b")
data.write.mode("overwrite").csv("/tmp/test")

val fileList1 = List.fill(2)("/tmp/test")
val fileList2 = List.fill(3)("/tmp/test")

val df1 = spark.read.schema("a int, b int").csv(fileList1: _*)
val df2 = spark.read.schema("a int, b int").csv(fileList2: _*)

df1.count() // correctly returns 4
df2.count() // correctly returns 6

// the following is the same as above, except df1 is persisted
val df1 = spark.read.schema("a int, b int").csv(fileList1: _*).persist
val df2 = spark.read.schema("a int, b int").csv(fileList2: _*)

df1.count() // correctly returns 4
df2.count() // incorrectly returns 4!!
```

In the above example, df1 and df2 were created with a different number of paths: df1 has 2 and df2 has 3. But since the distinct set of root paths is the same (e.g., `Set("/tmp/test") == Set("/tmp/test")`), the two dataframes are considered equal. Thus, when df1 is persisted, df2 uses df1's cached plan. The same bug also causes inappropriate exchange reuse.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New unit test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #51256 from bersprockets/multi_path_issue_br35.

Authored-by: Bruce Robbins <bersprockets@gmail.com>
Signed-off-by: Kent Yao <yao@apache.org>
yaooqinn added a commit that referenced this pull request on Jun 25, 2025
…ly when size matches

### What changes were proposed in this pull request?

A follow-up for #51043 that sorts paths in InMemoryFileIndex#equal only when size matches.

### Why are the changes needed?

Avoid potential perf regression.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing test from #51043

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #51263 from yaooqinn/SPARK-52339.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Kent Yao <yao@apache.org>
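The follow-up's idea can be sketched as guarding the O(n log n) sort behind a cheap length check (an illustrative sketch, not Spark's actual code): two path lists of different sizes can never be equal, so sorting is only paid for when the sizes match.

```scala
// Illustrative sketch of the follow-up optimization; names are hypothetical.
object PathsEqualCheck {
  // Short-circuit on length: && skips the sorts entirely when sizes differ.
  def pathsEqual(a: Seq[String], b: Seq[String]): Boolean =
    a.length == b.length && a.sorted == b.sorted

  def main(args: Array[String]): Unit = {
    println(pathsEqual(Seq("/b", "/a"), Seq("/a", "/b"))) // true: same multiset
    println(pathsEqual(Seq("/a"), Seq("/a", "/a")))       // false: lengths differ, no sort performed
  }
}
```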
yaooqinn added a commit that referenced this pull request on Jun 25, 2025
yaooqinn added a commit that referenced this pull request on Jun 25, 2025
haoyangeng-db pushed a commit to haoyangeng-db/apache-spark that referenced this pull request on Jun 25, 2025