
[SPARK-17110] Fix StreamCorruptedException in BlockManager.getRemoteValues() #14952

Closed
JoshRosen wants to merge 4 commits into master from JoshRosen:SPARK-17110

Conversation

JoshRosen
Contributor

What changes were proposed in this pull request?

This patch fixes a java.io.StreamCorruptedException error affecting remote reads of cached values when certain data types are used. The problem stems from #11801 / SPARK-13990, a patch that has Spark automatically pick the "best" serializer when caching RDDs. When PySpark cached a PythonRDD, it was stored as an RDD[Array[Byte]], so the automatic serializer selection picked KryoSerializer for replication and block transfer. However, the getRemoteValues() / getRemoteBytes() code path did not pass the proper class tags needed to select that same serializer during deserialization, so Java serialization was used instead of Kryo, producing the StreamCorruptedException.

We already fixed a related bug in #14311, which dealt with the same issue in block replication. Prior to that patch, it seems we had no tests ensuring that block replication actually succeeded. Similarly, prior to this fix we had no tests that performed remote reads of cached data, which is why this bug was able to remain latent for so long.

This patch addresses the bug by modifying BlockManager's get() and getRemoteValues() methods to accept ClassTags, allowing the proper class tag to be threaded through the getOrElseUpdate code path (which is used by rdd.iterator).
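
For illustration, here is a minimal sketch of the kind of ClassTag threading described above. The object and method names are hypothetical stand-ins that only echo BlockManager; this is not the actual Spark implementation:

```scala
import scala.reflect.ClassTag

// Simplified sketch (not the real BlockManager): the ClassTag captured at the
// call site is threaded down to the remote-read path, so deserialization can
// pick the same serializer (e.g. Kryo for Array[Byte]) that wrote the block.
object ClassTagThreadingSketch {
  def getOrElseUpdate[T](blockId: String, classTag: ClassTag[T],
      makeIterator: () => Iterator[T]): Iterator[T] =
    get(blockId)(classTag).getOrElse(makeIterator())

  def get[T: ClassTag](blockId: String): Option[Iterator[T]] =
    getRemoteValues[T](blockId)

  private def getRemoteValues[T: ClassTag](blockId: String): Option[Iterator[T]] = {
    // Here implicitly[ClassTag[T]] is the caller's element type rather than
    // ClassTag[Any], so a serializer manager could choose Kryo vs. Java correctly.
    println(s"deserializing $blockId as ${implicitly[ClassTag[T]]}")
    None
  }
}
```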

How was this patch tested?

Extended the caching tests in DistributedSuite to exercise the getRemoteValues path, plus manual testing to verify that the PySpark bug reproduction in SPARK-17110 is fixed.
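
A rough, hypothetical outline of what such a caching test can look like (illustrative only; this is not the actual DistributedSuite code, and the storage level and cluster sizing are assumptions):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel

// Hypothetical outline of a remote-read caching test: cache an RDD of Array[Byte]
// on a two-executor local cluster, then run a stage whose tasks must fetch some
// cached partitions from the other executor via getRemoteValues().
val sc = new SparkContext("local-cluster[2,1,1024]", "remote-read-test")
try {
  val data = sc.parallelize(1 to 1000, 10).map(i => Array.fill(16)(i.toByte))
  data.persist(StorageLevel.MEMORY_ONLY_SER)
  data.count() // materialize the cached blocks across both executors
  // Coalescing to a single partition forces one task to read blocks that live on
  // the remote executor; before this fix that read could fail with
  // java.io.StreamCorruptedException when Kryo-written bytes were read with Java.
  assert(data.coalesce(1).count() == 1000L)
} finally {
  sc.stop()
}
```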

@JoshRosen
Contributor Author

/cc @ericl

@SparkQA

SparkQA commented Sep 3, 2016

Test build #64908 has finished for PR 14952 at commit 9eb75f5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

  getRemoteBytes(blockId).map { data =>
    val values =
-     serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))
+     serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))(ct)
Contributor

Is it possible for dataDeserializeStream to require a classtag to be explicitly passed?

Contributor

I'm not saying this should definitely be done one way or the other, but I'm curious why you have a preference for the extra code and more verbose API that come with making the classTag an explicit parameter.

Contributor

It seems like it is easy to accidentally forget to pass a correct classtag, since this has happened twice already.

Contributor

How do you forget to pass a correct ClassTag when the compiler is enforcing its presence via the context bound?

Contributor Author

In this case, the problem is that the type parameter was inferred as Any.
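
As a standalone illustration of that failure mode (hypothetical names, not the Spark code): once the element type has been widened to Any, a ClassTag context bound is still satisfied, just with ClassTag[Any], so nothing fails at compile time:

```scala
import scala.reflect.ClassTag

// The serializer choice depends on the runtime class carried by the ClassTag.
def serialize[T: ClassTag](values: Iterator[T]): Array[Byte] = {
  println(s"serializing as ${implicitly[ClassTag[T]]}")
  Array.empty[Byte]
}

// Once the element type has been widened to Any (as in an untyped block interface),
// the compiler infers T = Any, supplies ClassTag[Any], and compiles happily --
// but the wrong serializer would be chosen at runtime.
val widened: Iterator[Any] = Iterator(Array[Byte](1, 2, 3))
serialize(widened) // T is silently inferred as Any

// Threading an explicit ClassTag from the caller keeps the real element type.
def store[T: ClassTag](values: Iterator[T]): Array[Byte] = serialize[T](values)
```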

@SparkQA

SparkQA commented Sep 4, 2016

Test build #64925 has finished for PR 14952 at commit 68db68d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ericl
Contributor

ericl commented Sep 6, 2016

looks good

@JoshRosen
Contributor Author

I'm going to merge this into master and branch-2.0 as an immediate fix for the PySpark caching issue.

asfgit pushed a commit that referenced this pull request Sep 6, 2016
[SPARK-17110] Fix StreamCorruptedException in BlockManager.getRemoteValues()

Author: Josh Rosen <joshrosen@databricks.com>

Closes #14952 from JoshRosen/SPARK-17110.

(cherry picked from commit 29cfab3)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>
asfgit closed this in 29cfab3 on Sep 6, 2016
JoshRosen deleted the SPARK-17110 branch on September 6, 2016 at 22:10