Skip to content

Commit

Permalink
[SPARK-29419][SQL] Fix Encoder thread-safety bug in createDataset(Seq)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

This PR fixes a thread-safety bug in `SparkSession.createDataset(Seq)`: if the caller-supplied `Encoder` is used in multiple threads then createDataset's usage of the encoder may lead to incorrect / corrupt results because the Encoder's internal mutable state will be updated from multiple threads.

Here is an example demonstrating the problem:

```scala
import org.apache.spark.sql._

val enc = implicitly[Encoder[(Int, Int)]]

val datasets = (1 to 100).par.map { _ =>
  val pairs = (1 to 100).map(x => (x, x))
  spark.createDataset(pairs)(enc)
}

datasets.reduce(_ union _).collect().foreach {
  pair => require(pair._1 == pair._2, s"Pair elements are mismatched: $pair")
}
```

Before this PR's change, the above example fails because Spark produces corrupted records where different input records' fields have been co-mingled.

This bug is similar to SPARK-22355 / #19577, a similar problem in `Dataset.collect()`.

The fix implemented here is based on #24735's updated version of the `Datataset.collect()` bugfix: use `.copy()`. For consistency, I used same [code comment](https://github.com/apache/spark/blob/d841b33ba3a9b0504597dbccd4b0d11fa810abf3/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L3414) / explanation as that PR.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Tested manually using the example listed above.

Thanks to smcnamara-stripe for identifying this bug.

Closes #26076 from JoshRosen/SPARK-29419.

Authored-by: Josh Rosen <rosenville@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
(cherry picked from commit f4499f6)
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
  • Loading branch information
JoshRosen authored and HyukjinKwon committed Mar 2, 2020
1 parent ff5ba49 commit 0d1664c
Showing 1 changed file with 2 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,8 @@ class SparkSession private(
@Experimental
@InterfaceStability.Evolving
def createDataset[T : Encoder](data: Seq[T]): Dataset[T] = {
val enc = encoderFor[T]
// `ExpressionEncoder` is not thread-safe, here we create a new encoder.
val enc = encoderFor[T].copy()
val attributes = enc.schema.toAttributes
val encoded = data.map(d => enc.toRow(d).copy())
val plan = new LocalRelation(attributes, encoded)
Expand Down

0 comments on commit 0d1664c

Please sign in to comment.