
[SPARK-22355][SQL] Dataset.collect is not threadsafe #19577

Closed
cloud-fan wants to merge 2 commits into apache:master from cloud-fan/encoder

Conversation

cloud-fan (Contributor)

What changes were proposed in this pull request?

It's possible for users to create a `Dataset` and call `collect` on it from many threads at the same time. Currently `Dataset#collect` just calls `encoder.fromRow` to convert Spark rows to objects of type `T`, and this encoder is per-Dataset. This means `Dataset#collect` is not thread-safe, because the encoder uses a projection that writes its output to a reusable row.

This PR fixes the problem by creating a new projection on each call to `Dataset#collect`, so that every method call gets its own reusable row instead of sharing one per Dataset.
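For illustration only (not part of this PR), a hypothetical repro of the race, assuming a Spark shell where `spark` and its implicits are in scope, could look like this:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import spark.implicits._

// One Dataset means one bound encoder, and thus one shared reusable output row.
val ds = (1 to 1000).map(x => (x, x)).toDS()

// Decode through that shared encoder from many threads at once.
val attempts = (1 to 10).map { _ =>
  Future {
    ds.collect().foreach { case (a, b) =>
      // Before this fix, concurrent writes to the shared row can surface
      // here as mismatched pairs.
      require(a == b, s"corrupted pair: ($a, $b)")
    }
  }
}
attempts.foreach(Await.result(_, 1.minute))
```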

How was this patch tested?

N/A

cloud-fan (Contributor, Author):

cc @zsxwing @viirya @kiszk


SparkQA commented Oct 26, 2017

Test build #83063 has finished for PR 19577 at commit cecea8c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```diff
@@ -3102,7 +3103,12 @@ class Dataset[T] private[sql](
    * Collect all elements from a spark plan.
    */
   private def collectFromPlan(plan: SparkPlan): Array[T] = {
-    plan.executeCollect().map(boundEnc.fromRow)
+    val objProj = GenerateSafeProjection.generate(deserializer :: Nil)
```
viirya (Member) commented on Oct 26, 2017:

`fromRow` catches `Exception` during decoding. Shall we also catch it here?

cloud-fan (Contributor, Author) replied:

It just rethrows the exception, so it's not a big deal.

viirya (Member) replied:

Ok. Looks good.
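Pieced together from the hunk above, the fixed method plausibly reads as follows; this is a sketch, not the verbatim patch, and it assumes the `get(0, null)` idiom is safe because `SpecificInternalRow` ignores the data-type argument of `get`:

```scala
private def collectFromPlan(plan: SparkPlan): Array[T] = {
  // The projection writes its output to a reusable InternalRow, so applying it
  // is not thread-safe. Generating it per call, rather than per Dataset, keeps
  // concurrent collect() invocations from sharing that row.
  val objProj = GenerateSafeProjection.generate(deserializer :: Nil)
  plan.executeCollect().map { row =>
    // The projection yields a SpecificInternalRow whose get() ignores the
    // data-type argument, so null is acceptable here.
    objProj(row).get(0, null).asInstanceOf[T]
  }
}
```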

```diff
@@ -2661,7 +2657,12 @@ class Dataset[T] private[sql](
    */
   def toLocalIterator(): java.util.Iterator[T] = {
     withAction("toLocalIterator", queryExecution) { plan =>
-      plan.executeToIterator().map(boundEnc.fromRow).asJava
+      val objProj = GenerateSafeProjection.generate(deserializer :: Nil)
```
viirya (Member) commented on Oct 26, 2017:

It would be better to add a comment explaining that we keep the projection inside the method for thread safety.

A member replied:

+1
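Applying the same pattern, with the comment requested above spelled out, `toLocalIterator` would plausibly become:

```scala
def toLocalIterator(): java.util.Iterator[T] = {
  withAction("toLocalIterator", queryExecution) { plan =>
    // Generate the projection inside the method so each call gets its own
    // reusable output row; sharing one projection per Dataset is what made
    // this method non-thread-safe.
    val objProj = GenerateSafeProjection.generate(deserializer :: Nil)
    plan.executeToIterator().map { row =>
      objProj(row).get(0, null).asInstanceOf[T]
    }.asJava
  }
}
```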

viirya (Member) commented on Oct 26, 2017:

Nice catch! LGTM with two minor comments.

kiszk (Member) commented on Oct 26, 2017:

Good catch, LGTM.
Here is my observation: this problem occurs because the `SpecificInternalRow` is created per Dataset, so it can be shared by multiple threads. To avoid the sharing, this PR assigns a fresh projection to a local variable via `val objProj = GenerateSafeProjection.generate(deserializer :: Nil)`.
It is not easy to create a test case for this.
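The sharing hazard kiszk describes can be illustrated without Spark at all: any decoder that hands back one shared mutable buffer corrupts its results even single-threaded, and racing threads only make it worse. A self-contained sketch, with `SharedRowDemo` standing in for the per-Dataset `SpecificInternalRow`:

```scala
object SharedRowDemo {
  // Plays the role of the encoder's single reusable SpecificInternalRow.
  private val reusedRow = new Array[Int](2)

  def decode(pair: (Int, Int)): Array[Int] = {
    reusedRow(0) = pair._1
    reusedRow(1) = pair._2
    reusedRow // every call hands back the SAME mutable object
  }

  def main(args: Array[String]): Unit = {
    val pairs = Seq((1, 1), (2, 2), (3, 3))

    // All three results alias reusedRow, so only the last write survives.
    val aliased = pairs.map(decode)
    assert(aliased.forall(_.sameElements(Array(3, 3))))

    // Copying each result keeps the elements independent of the shared buffer.
    val copied = pairs.map(p => decode(p).clone())
    assert(copied.map(_.toSeq) == Seq(Seq(1, 1), Seq(2, 2), Seq(3, 3)))
  }
}
```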


SparkQA commented Oct 26, 2017

Test build #83091 has finished for PR 19577 at commit dfcdb23.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

asfgit pushed a commit that referenced this pull request Oct 27, 2017
It's possible for users to create a `Dataset` and call `collect` on it from many threads at the same time. Currently `Dataset#collect` just calls `encoder.fromRow` to convert Spark rows to objects of type T, and this encoder is per-Dataset. This means `Dataset#collect` is not thread-safe, because the encoder uses a projection that writes its output to a reusable row.

This PR fixes the problem by creating a new projection on each call to `Dataset#collect`, so that every method call gets its own reusable row instead of sharing one per Dataset.

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #19577 from cloud-fan/encoder.

(cherry picked from commit 5c3a1f3)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
gatorsmile (Member) commented on Oct 27, 2017:

Thanks! Merged to master/2.2. cc @zsxwing

asfgit closed this in 5c3a1f3 on Oct 27, 2017
MatthewRBruce pushed a commit to Shopify/spark that referenced this pull request Jul 31, 2018
HyukjinKwon pushed a commit that referenced this pull request Mar 2, 2020
### What changes were proposed in this pull request?

This PR fixes a thread-safety bug in `SparkSession.createDataset(Seq)`: if the caller-supplied `Encoder` is used in multiple threads then createDataset's usage of the encoder may lead to incorrect / corrupt results because the Encoder's internal mutable state will be updated from multiple threads.

Here is an example demonstrating the problem:

```scala
import org.apache.spark.sql._

val enc = implicitly[Encoder[(Int, Int)]]

val datasets = (1 to 100).par.map { _ =>
  val pairs = (1 to 100).map(x => (x, x))
  spark.createDataset(pairs)(enc)
}

datasets.reduce(_ union _).collect().foreach {
  pair => require(pair._1 == pair._2, s"Pair elements are mismatched: $pair")
}
```

Before this PR's change, the above example fails because Spark produces corrupted records where different input records' fields have been co-mingled.

This bug is similar to SPARK-22355 / #19577, which fixed an analogous problem in `Dataset.collect()`.

The fix implemented here is based on #24735's updated version of the `Dataset.collect()` bugfix: use `.copy()`. For consistency, I used the same [code comment](https://github.com/apache/spark/blob/d841b33ba3a9b0504597dbccd4b0d11fa810abf3/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L3414) / explanation as that PR.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Tested manually using the example listed above.

Thanks to smcnamara-stripe for identifying this bug.

Closes #26076 from JoshRosen/SPARK-29419.

Authored-by: Josh Rosen <rosenville@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
(cherry picked from commit f4499f6)
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
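For reference, the `.copy()` approach mentioned above amounts to materializing each serialized row before the encoder's reusable output row is overwritten. A rough sketch of the pattern inside `createDataset` (simplified against 2.4-era internals; an assumption-laden sketch, not the verbatim patch):

```scala
def createDataset[T : Encoder](data: Seq[T]): Dataset[T] = {
  val enc = encoderFor[T]
  val attributes = enc.schema.toAttributes
  // Without .copy(), every element could alias the encoder's single reusable
  // InternalRow, so later serializations (or other threads) corrupt earlier ones.
  val encoded = data.map(d => enc.toRow(d).copy())
  val plan = new LocalRelation(attributes, encoded)
  Dataset[T](self, plan)
}
```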
sjincho pushed a commit to sjincho/spark that referenced this pull request Apr 15, 2020