
[SPARK-18560][CORE][STREAMING][WIP] Receiver data can not be deserialized properly. #15992

Closed
wants to merge 3 commits

Conversation

uncleGen
Contributor

@uncleGen uncleGen commented Nov 23, 2016

What changes were proposed in this pull request?

My Spark Streaming job runs correctly on Spark 1.6.1, but it fails on the Spark master branch with the following exception:

16/11/22 19:20:15 ERROR executor.Executor: Exception in task 4.3 in stage 6.0 (TID 87)
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994
	at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
	at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
	at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:243)
	at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
	at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1760)
	at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1150)
	at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1150)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1943)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1943)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Digging into the relevant implementation, I found that the type of the data received by the Receiver is erased. In Spark 2.x, the framework chooses an appropriate serializer, either JavaSerializer or KryoSerializer, based on the data type.

On the receiver side, the data type has been erased to Object, so the framework chooses JavaSerializer, per the following code:

def canUseKryo(ct: ClassTag[_]): Boolean = {
  primitiveAndPrimitiveArrayClassTags.contains(ct) || ct == stringClassTag
}

def getSerializer(ct: ClassTag[_]): Serializer = {
  if (canUseKryo(ct)) {
    kryoSerializer
  } else {
    defaultSerializer
  }
}

On the task side, the correct data type is available, so the framework chooses KryoSerializer when the type is among the following supported types:


private[this] val stringClassTag: ClassTag[String] = implicitly[ClassTag[String]]

private[this] val primitiveAndPrimitiveArrayClassTags: Set[ClassTag[_]] = {
  val primitiveClassTags = Set[ClassTag[_]](
    ClassTag.Boolean,
    ClassTag.Byte,
    ClassTag.Char,
    ClassTag.Double,
    ClassTag.Float,
    ClassTag.Int,
    ClassTag.Long,
    ClassTag.Null,
    ClassTag.Short
  )
  val arrayClassTags = primitiveClassTags.map(_.wrap)
  primitiveClassTags ++ arrayClassTags
}

In my case, the data type is Array[Byte].
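To make the mismatch concrete, here is a minimal, self-contained Scala sketch: the canUseKryo logic is reproduced from the snippets above, while the receiverSideTag/taskSideTag names and the demo object are illustrative, not Spark's.

import scala.reflect.ClassTag

object SerializerPickDemo extends App {
  // Same tag set as in SerializerManager above: primitives, primitive arrays, String.
  val primitiveClassTags = Set[ClassTag[_]](
    ClassTag.Boolean, ClassTag.Byte, ClassTag.Char, ClassTag.Double,
    ClassTag.Float, ClassTag.Int, ClassTag.Long, ClassTag.Null, ClassTag.Short)
  val primitiveAndPrimitiveArrayClassTags =
    primitiveClassTags ++ primitiveClassTags.map(_.wrap)
  val stringClassTag: ClassTag[String] = implicitly[ClassTag[String]]

  def canUseKryo(ct: ClassTag[_]): Boolean =
    primitiveAndPrimitiveArrayClassTags.contains(ct) || ct == stringClassTag

  // Receiver side: the element type is erased, so only ClassTag[Any] is available.
  val receiverSideTag: ClassTag[_] = implicitly[ClassTag[Any]]
  // Task side: the real element type, Array[Byte], is known.
  val taskSideTag: ClassTag[_] = implicitly[ClassTag[Array[Byte]]]

  println(canUseKryo(receiverSideTag)) // false -> block written with JavaSerializer
  println(canUseKryo(taskSideTag))     // true  -> block read with KryoSerializer -> KryoException
}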

This problem stems from SPARK-13990, a patch to have Spark automatically pick the "best" serializer when caching RDDs.

How was this patch tested?

Updated an existing unit test.

@uncleGen
Contributor Author

cc @JoshRosen

@SparkQA

SparkQA commented Nov 23, 2016

Test build #69063 has finished for PR 15992 at commit cbc4fc6.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • abstract class Receiver[T](val storageLevel: StorageLevel)(implicit t: ClassTag[T])

@SparkQA

SparkQA commented Nov 23, 2016

Test build #69064 has finished for PR 15992 at commit 489766e.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 23, 2016

Test build #69066 has finished for PR 15992 at commit 9e89e16.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 23, 2016

Test build #69067 has finished for PR 15992 at commit ec36a0e.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • abstract class Receiver[T](val storageLevel: StorageLevel)(implicit t: ClassTag[T])

@SparkQA

SparkQA commented Nov 23, 2016

Test build #69070 has finished for PR 15992 at commit 6f64afd.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 23, 2016

Test build #69072 has finished for PR 15992 at commit 037f10d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@uncleGen uncleGen changed the title [SPARK-18560][CORE][STREAMING] Receiver data can not be dataSerialized properly. [SPARK-18560][CORE][STREAMING] Receiver data can not be deSerialized properly. Nov 24, 2016
@uncleGen uncleGen changed the title [SPARK-18560][CORE][STREAMING] Receiver data can not be deSerialized properly. [SPARK-18560][CORE][STREAMING] Receiver data can not be deserialized properly. Nov 24, 2016
@uncleGen
Contributor Author

cc @zsxwing and @JoshRosen

@@ -83,7 +84,12 @@ import org.apache.spark.storage.StorageLevel
  * }}}
  */
 @DeveloperApi
-abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable {
+abstract class Receiver[T](val storageLevel: StorageLevel)(implicit t: ClassTag[T])
Contributor

I'm wondering: will this change the signature of Receiver and break users' existing code?

Contributor Author

As far as I know, this change will indeed break custom receiver implementations in Java, as reflected by the update to JavaCustomReceiver.java. That said, I did not encounter any problems in my local tests.

Contributor

I see. In that case we should think carefully about this change, and at the very least keep it compatible with existing code.

Contributor Author

It is really tricky to add a ClassTag in so many places to support the automatically-pick-the-best-serializer feature.
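For context, a minimal self-contained sketch of why the implicit ClassTag breaks Java subclasses (simplified stand-ins, not the actual Spark classes): Scala fills in the tag implicitly, while Java has no implicits and must pass it explicitly.

import scala.reflect.ClassTag

// Simplified stand-in for the proposed Receiver signature; a plain String
// replaces StorageLevel to keep the sketch self-contained.
abstract class Receiver[T](val storageLevel: String)(implicit t: ClassTag[T])
  extends Serializable {
  // The framework can now recover the element type on the receiver side.
  def elementClass: Class[_] = t.runtimeClass
}

// Scala: the compiler supplies ClassTag[Array[Byte]] implicitly.
class ByteArrayReceiver extends Receiver[Array[Byte]]("MEMORY_ONLY")

// Java has no implicits, so a Java subclass must pass the tag explicitly,
// e.g. ClassTag$.MODULE$.apply(byte[].class) -- the kind of source
// incompatibility seen in the JavaCustomReceiver.java update.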

@jerryshao
Contributor

I'm wondering if we could treat Spark Streaming as a special case and turn off the type-based automatic serializer selection there. The current fix introduces a lot of changes in the streaming code and may break users' existing code. Just my two cents.

@uncleGen
Contributor Author

uncleGen commented Nov 24, 2016

@jerryshao yep, the simplest fix is to turn it off. This is essentially a follow-up patch to #11755. I tend to favor supporting the automatically-pick-the-best-serializer feature here for better performance, sooner rather than later, provided it does not do major damage to existing code.

@uncleGen
Contributor Author

@JoshRosen and @zsxwing waiting for your feedback.

@uncleGen
Contributor Author

Is no one going to review this PR? @srowen, could you please ask someone to review it? Thanks!

@srowen
Member

srowen commented Nov 28, 2016

I am not familiar enough with this code to review it. I do think @JoshRosen is the right person, given https://issues.apache.org/jira/browse/SPARK-13990, and I believe he has said he will start reviewing again this week, after the holiday.

@zsxwing
Member

zsxwing commented Nov 28, 2016

Sorry for the delay.

My high-level comment is that this is a critical bug and we also need to fix it in 2.0.x. However, we usually don't want to break APIs between maintenance releases. So I think the first step should be to add a configuration at https://github.com/apache/spark/pull/11755/files#diff-82b3c6a9530bd021cfb3d3f15056be41R49 and disable the feature for Streaming.
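A minimal sketch of that first step, assuming a boolean switch threaded through getSerializer (the autoPick parameter name and the stub serializers here are illustrative, not the actual change, which landed in #16052):

import scala.reflect.ClassTag

object AutoPickSketch {
  // Stubs standing in for Spark's serializers, to keep the sketch self-contained.
  trait Serializer
  object KryoStub extends Serializer
  object JavaStub extends Serializer

  // canUseKryo as quoted earlier, reduced to the String case for brevity.
  def canUseKryo(ct: ClassTag[_]): Boolean = ct == implicitly[ClassTag[String]]

  // Callers that handle receiver blocks pass autoPick = false, so the write
  // path and the read path fall back to the same default serializer regardless
  // of how much type information each side has.
  def getSerializer(ct: ClassTag[_], autoPick: Boolean): Serializer =
    if (autoPick && canUseKryo(ct)) KryoStub else JavaStub
}

With a switch like this, streaming call sites can opt receiver blocks out of the Kryo auto-pick, keeping the write and read paths consistent without any API change.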

@uncleGen
Contributor Author

uncleGen commented Nov 29, 2016

@zsxwing that does indeed seem like a well-advised solution. I will open another PR first to add a configuration for streaming. IMHO, this PR should stay WIP for further discussion and optimization. What is your opinion?

@uncleGen uncleGen changed the title [SPARK-18560][CORE][STREAMING] Receiver data can not be deserialized properly. [SPARK-18560][CORE][STREAMING][WIP] Receiver data can not be deserialized properly. Nov 29, 2016
asfgit pushed a commit that referenced this pull request Nov 30, 2016
…rk Streaming

## What changes were proposed in this pull request?

#15992 provided a solution to fix the bug, i.e. **receiver data cannot be deserialized properly**. As zsxwing said, it is a critical bug, but we should not break APIs between maintenance releases. It may be a rational choice to disable the automatic Kryo serializer selection for Spark Streaming as a first step. I will continue #15992 to optimize the solution.

## How was this patch tested?

Existing unit tests.

Author: uncleGen <hustyugm@gmail.com>

Closes #16052 from uncleGen/SPARK-18617.

(cherry picked from commit 56c82ed)
Signed-off-by: Reynold Xin <rxin@databricks.com>
asfgit pushed a commit that referenced this pull request Nov 30, 2016
…rk Streaming

@zsxwing
Member

zsxwing commented Nov 30, 2016

I think this PR has one issue: there are two places that require a ClassTag. The first is creating the Receiver; the other is calling receiverStream. If they are inconsistent (e.g., creating the Receiver in Java code but calling receiverStream from Scala code), it will crash.
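To illustrate that concern with plain ClassTags (names hypothetical): a Receiver constructed from Java effectively carries an erased tag, while receiverStream called from Scala infers the real element type, so the two places can disagree on the serializer just as in the original bug.

import scala.reflect.ClassTag

object TagMismatchDemo extends App {
  // Tag a Java-constructed Receiver would effectively carry (erased):
  val tagAtReceiverCreation: ClassTag[_] = ClassTag.AnyRef
  // Tag inferred at a Scala receiverStream[String](...) call site:
  val tagAtReceiverStream: ClassTag[_] = implicitly[ClassTag[String]]

  // The tags disagree, so one side would pick JavaSerializer and the other
  // KryoSerializer -- the same write/read mismatch as the original bug.
  assert(tagAtReceiverCreation != tagAtReceiverStream)
}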

robert3005 pushed a commit to palantir/spark that referenced this pull request Dec 2, 2016
…rk Streaming

@zsxwing
Member

zsxwing commented Dec 5, 2016

@uncleGen could you close this one, please? The current workaround in #15992 seems sufficient. You can create a new one if you have a better solution for the Java APIs.

@uncleGen uncleGen closed this Dec 6, 2016
robert3005 pushed a commit to palantir/spark that referenced this pull request Dec 15, 2016
…rk Streaming

uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
…rk Streaming
