Skip to content

[SPARK-52339][SQL] Fix comparison of InMemoryFileIndex instances #51043

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from

Conversation

bersprockets
Copy link
Contributor

What changes were proposed in this pull request?

This PR changes InMemoryFileIndex#equals to compare a non-distinct collection of root paths rather than a distinct set of root paths. Without this change, InMemoryFileIndex#equals considers the following two collections of root paths to be equal, even though they represent a different number of rows:

["/tmp/test", "/tmp/test"]
["/tmp/test", "/tmp/test", "/tmp/test"]

Why are the changes needed?

The bug can cause correctness issues, e.g.

// create test data
val data = Seq((1, 2), (2, 3)).toDF("a", "b")
data.write.mode("overwrite").csv("/tmp/test")

val fileList1 = List.fill(2)("/tmp/test")
val fileList2 = List.fill(3)("/tmp/test")

val df1 = spark.read.schema("a int, b int").csv(fileList1: _*)
val df2 = spark.read.schema("a int, b int").csv(fileList2: _*)

df1.count() // correctly returns 4
df2.count() // correctly returns 6

// the following is the same as above, except df1 is persisted
val df1 = spark.read.schema("a int, b int").csv(fileList1: _*).persist
val df2 = spark.read.schema("a int, b int").csv(fileList2: _*)

df1.count() // correctly returns 4
df2.count() // incorrectly returns 4!!

In the above example, df1 and df2 were created with a different number of paths: df1 has 2, and df2 has 3. But since the distinct set of root paths is the same (e.g., Set("/tmp/test") == Set("/tmp/test")), the two dataframes are considered equal. Thus, when df1 is persisted, df2 uses df1's cached plan.

The same bug also causes inappropriate exchange reuse.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

New unit test.

Was this patch authored or co-authored using generative AI tooling?

No.

@github-actions github-actions bot added the SQL label May 28, 2025
Copy link
Member

@HyukjinKwon HyukjinKwon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks making sense to me

@HyukjinKwon
Copy link
Member

cc @cloud-fan

@cloud-fan
Copy link
Contributor

thanks, merging to master/4.0!

@cloud-fan cloud-fan closed this in ccded6c Jun 23, 2025
cloud-fan pushed a commit that referenced this pull request Jun 23, 2025
### What changes were proposed in this pull request?

This PR changes `InMemoryFileIndex#equals` to compare a non-distinct collection of root paths rather than a distinct set of root paths. Without this change, `InMemoryFileIndex#equals` considers the following two collections of root paths to be equal, even though they represent a different number of rows:
```
["/tmp/test", "/tmp/test"]
["/tmp/test", "/tmp/test", "/tmp/test"]
```

### Why are the changes needed?

The bug can cause correctness issues, e.g.
```
// create test data
val data = Seq((1, 2), (2, 3)).toDF("a", "b")
data.write.mode("overwrite").csv("/tmp/test")

val fileList1 = List.fill(2)("/tmp/test")
val fileList2 = List.fill(3)("/tmp/test")

val df1 = spark.read.schema("a int, b int").csv(fileList1: _*)
val df2 = spark.read.schema("a int, b int").csv(fileList2: _*)

df1.count() // correctly returns 4
df2.count() // correctly returns 6

// the following is the same as above, except df1 is persisted
val df1 = spark.read.schema("a int, b int").csv(fileList1: _*).persist
val df2 = spark.read.schema("a int, b int").csv(fileList2: _*)

df1.count() // correctly returns 4
df2.count() // incorrectly returns 4!!
```
In the above example, df1 and df2 were created with a different number of paths: df1 has 2, and df2 has 3. But since the distinct set of root paths is the same (e.g., `Set("/tmp/test") == Set("/tmp/test"))`, the two dataframes are considered equal. Thus, when df1 is persisted, df2 uses df1's cached plan.

The same bug also causes inappropriate exchange reuse.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New unit test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #51043 from bersprockets/multi_path_issue.

Authored-by: Bruce Robbins <bersprockets@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit ccded6c)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@pan3793
Copy link
Member

pan3793 commented Jun 23, 2025

@cloud-fan should the patch also go branch-3.5?

@cloud-fan
Copy link
Contributor

We need a separate PR. @bersprockets can you open a 3.5 backport PR?

YanivKunda pushed a commit to YanivKunda/spark that referenced this pull request Jun 23, 2025
### What changes were proposed in this pull request?

This PR changes `InMemoryFileIndex#equals` to compare a non-distinct collection of root paths rather than a distinct set of root paths. Without this change, `InMemoryFileIndex#equals` considers the following two collections of root paths to be equal, even though they represent a different number of rows:
```
["/tmp/test", "/tmp/test"]
["/tmp/test", "/tmp/test", "/tmp/test"]
```

### Why are the changes needed?

The bug can cause correctness issues, e.g.
```
// create test data
val data = Seq((1, 2), (2, 3)).toDF("a", "b")
data.write.mode("overwrite").csv("/tmp/test")

val fileList1 = List.fill(2)("/tmp/test")
val fileList2 = List.fill(3)("/tmp/test")

val df1 = spark.read.schema("a int, b int").csv(fileList1: _*)
val df2 = spark.read.schema("a int, b int").csv(fileList2: _*)

df1.count() // correctly returns 4
df2.count() // correctly returns 6

// the following is the same as above, except df1 is persisted
val df1 = spark.read.schema("a int, b int").csv(fileList1: _*).persist
val df2 = spark.read.schema("a int, b int").csv(fileList2: _*)

df1.count() // correctly returns 4
df2.count() // incorrectly returns 4!!
```
In the above example, df1 and df2 were created with a different number of paths: df1 has 2, and df2 has 3. But since the distinct set of root paths is the same (e.g., `Set("/tmp/test") == Set("/tmp/test"))`, the two dataframes are considered equal. Thus, when df1 is persisted, df2 uses df1's cached plan.

The same bug also causes inappropriate exchange reuse.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New unit test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#51043 from bersprockets/multi_path_issue.

Authored-by: Bruce Robbins <bersprockets@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
bersprockets added a commit to bersprockets/spark that referenced this pull request Jun 23, 2025
### What changes were proposed in this pull request?

This PR changes `InMemoryFileIndex#equals` to compare a non-distinct collection of root paths rather than a distinct set of root paths. Without this change, `InMemoryFileIndex#equals` considers the following two collections of root paths to be equal, even though they represent a different number of rows:
```
["/tmp/test", "/tmp/test"]
["/tmp/test", "/tmp/test", "/tmp/test"]
```

### Why are the changes needed?

The bug can cause correctness issues, e.g.
```
// create test data
val data = Seq((1, 2), (2, 3)).toDF("a", "b")
data.write.mode("overwrite").csv("/tmp/test")

val fileList1 = List.fill(2)("/tmp/test")
val fileList2 = List.fill(3)("/tmp/test")

val df1 = spark.read.schema("a int, b int").csv(fileList1: _*)
val df2 = spark.read.schema("a int, b int").csv(fileList2: _*)

df1.count() // correctly returns 4
df2.count() // correctly returns 6

// the following is the same as above, except df1 is persisted
val df1 = spark.read.schema("a int, b int").csv(fileList1: _*).persist
val df2 = spark.read.schema("a int, b int").csv(fileList2: _*)

df1.count() // correctly returns 4
df2.count() // incorrectly returns 4!!
```
In the above example, df1 and df2 were created with a different number of paths: df1 has 2, and df2 has 3. But since the distinct set of root paths is the same (e.g., `Set("/tmp/test") == Set("/tmp/test"))`, the two dataframes are considered equal. Thus, when df1 is persisted, df2 uses df1's cached plan.

The same bug also causes inappropriate exchange reuse.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New unit test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#51043 from bersprockets/multi_path_issue.

Authored-by: Bruce Robbins <bersprockets@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
yaooqinn pushed a commit that referenced this pull request Jun 24, 2025
### What changes were proposed in this pull request?

This is a back-port of #51043.

This PR changes `InMemoryFileIndex#equals` to compare a non-distinct collection of root paths rather than a distinct set of root paths. Without this change, `InMemoryFileIndex#equals` considers the following two collections of root paths to be equal, even though they represent a different number of rows:
```
["/tmp/test", "/tmp/test"]
["/tmp/test", "/tmp/test", "/tmp/test"]
```

### Why are the changes needed?

The bug can cause correctness issues, e.g.
```
// create test data
val data = Seq((1, 2), (2, 3)).toDF("a", "b")
data.write.mode("overwrite").csv("/tmp/test")

val fileList1 = List.fill(2)("/tmp/test")
val fileList2 = List.fill(3)("/tmp/test")

val df1 = spark.read.schema("a int, b int").csv(fileList1: _*)
val df2 = spark.read.schema("a int, b int").csv(fileList2: _*)

df1.count() // correctly returns 4
df2.count() // correctly returns 6

// the following is the same as above, except df1 is persisted
val df1 = spark.read.schema("a int, b int").csv(fileList1: _*).persist
val df2 = spark.read.schema("a int, b int").csv(fileList2: _*)

df1.count() // correctly returns 4
df2.count() // incorrectly returns 4!!
```
In the above example, df1 and df2 were created with a different number of paths: df1 has 2, and df2 has 3. But since the distinct set of root paths is the same (e.g., `Set("/tmp/test") == Set("/tmp/test"))`, the two dataframes are considered equal. Thus, when df1 is persisted, df2 uses df1's cached plan.

The same bug also causes inappropriate exchange reuse.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New unit test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #51256 from bersprockets/multi_path_issue_br35.

Authored-by: Bruce Robbins <bersprockets@gmail.com>
Signed-off-by: Kent Yao <yao@apache.org>
yaooqinn added a commit that referenced this pull request Jun 25, 2025
…ly when size matches

### What changes were proposed in this pull request?

A follow-up for #51043 that sorts paths in InMemoryFileIndex#equal only when size matches

### Why are the changes needed?

Avoid potential perf regression.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

Existing test from #51043

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #51263 from yaooqinn/SPARK-52339.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Kent Yao <yao@apache.org>
yaooqinn added a commit that referenced this pull request Jun 25, 2025
…ly when size matches

### What changes were proposed in this pull request?

A follow-up for #51043 that sorts paths in InMemoryFileIndex#equal only when size matches

### Why are the changes needed?

Avoid potential perf regression.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

Existing test from #51043

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #51263 from yaooqinn/SPARK-52339.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Kent Yao <yao@apache.org>
(cherry picked from commit 1cfe07c)
Signed-off-by: Kent Yao <yao@apache.org>
yaooqinn added a commit that referenced this pull request Jun 25, 2025
…ly when size matches

### What changes were proposed in this pull request?

A follow-up for #51043 that sorts paths in InMemoryFileIndex#equal only when size matches

### Why are the changes needed?

Avoid potential perf regression.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

Existing test from #51043

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #51263 from yaooqinn/SPARK-52339.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Kent Yao <yao@apache.org>
(cherry picked from commit 1cfe07c)
Signed-off-by: Kent Yao <yao@apache.org>
haoyangeng-db pushed a commit to haoyangeng-db/apache-spark that referenced this pull request Jun 25, 2025
### What changes were proposed in this pull request?

This PR changes `InMemoryFileIndex#equals` to compare a non-distinct collection of root paths rather than a distinct set of root paths. Without this change, `InMemoryFileIndex#equals` considers the following two collections of root paths to be equal, even though they represent a different number of rows:
```
["/tmp/test", "/tmp/test"]
["/tmp/test", "/tmp/test", "/tmp/test"]
```

### Why are the changes needed?

The bug can cause correctness issues, e.g.
```
// create test data
val data = Seq((1, 2), (2, 3)).toDF("a", "b")
data.write.mode("overwrite").csv("/tmp/test")

val fileList1 = List.fill(2)("/tmp/test")
val fileList2 = List.fill(3)("/tmp/test")

val df1 = spark.read.schema("a int, b int").csv(fileList1: _*)
val df2 = spark.read.schema("a int, b int").csv(fileList2: _*)

df1.count() // correctly returns 4
df2.count() // correctly returns 6

// the following is the same as above, except df1 is persisted
val df1 = spark.read.schema("a int, b int").csv(fileList1: _*).persist
val df2 = spark.read.schema("a int, b int").csv(fileList2: _*)

df1.count() // correctly returns 4
df2.count() // incorrectly returns 4!!
```
In the above example, df1 and df2 were created with a different number of paths: df1 has 2, and df2 has 3. But since the distinct set of root paths is the same (e.g., `Set("/tmp/test") == Set("/tmp/test"))`, the two dataframes are considered equal. Thus, when df1 is persisted, df2 uses df1's cached plan.

The same bug also causes inappropriate exchange reuse.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New unit test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#51043 from bersprockets/multi_path_issue.

Authored-by: Bruce Robbins <bersprockets@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants