
[SPARK-18122][SQL][WIP]Fallback to Kryo for unsupported encoder for class's subfield #15918

Closed
wants to merge 8 commits

Conversation

windpiger
Contributor

What changes were proposed in this pull request?

An UnsupportedOperationException is thrown when a class has a subfield whose type has no supported Encoder.

This PR falls back to Kryo serialization/deserialization for such subfields.

Before the fix:

case class MyClass(a: String, b: Option[Set[Int]])
val ds = Seq(MyClass("a", Some(Set(1))), MyClass("b", Some(Set(2)))).toDS

java.lang.UnsupportedOperationException: No Encoder found for Set[scala.Int]
- option value class: "scala.collection.immutable.Set"
- field (class: "scala.Option", name: "b")
- root class: "lineacce9ce6ecc049489e2e74fa620679d927.$read.$iw.$iw.$iw.$iw.MyClass"
	at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:598)

After the fix:

case class MyClass(a: String, b: Option[Set[Int]])
val ds = Seq(MyClass("a", Some(Set(1))), MyClass("b", Some(Set(2)))).toDS
ds: org.apache.spark.sql.Dataset[MyClass] = [a: string, b: binary]
ds.foreach{
| r => val x = r.b
| x.get.foreach(println)
| }
1
2

How was this patch tested?

Unit tests added.

@windpiger windpiger changed the title [SPARK-18122][SQL]Fallback to Kryo for unsupported encoder for class's subfield [SPARK-18122][SQL][WIP]Fallback to Kryo for unsupported encoder for class's subfield Nov 17, 2016
@SparkQA

SparkQA commented Nov 17, 2016

Test build #68764 has finished for PR 15918 at commit bb11c93.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@srowen
Member

srowen commented Nov 17, 2016

I think this introduces a big behavior change, right? Now objects are serialized with a hybrid of two serializers? I am not sure this is a good idea.

@windpiger
Contributor Author

Yes, and there is one aspect to be concerned about:
if the Set[Int] is serialized to binary and then val ds1 = ds.select("b") is run, the column type of ds1 is BinaryType, and ds1 cannot be deserialized back to Set[Int].
This is WIP; I will continue to work on it.
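
A minimal sketch of that concern, continuing the MyClass example from the description:

val ds1 = ds.select("b")
ds1.printSchema()
// root
//  |-- b: binary (nullable = true)
// The column is plain BinaryType; nothing in the schema records that the
// bytes are a Kryo-serialized Set[Int], so ds1 cannot be deserialized back
// by default.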

@srowen
Member

srowen commented Nov 25, 2016

There's no obvious way to implement this, IMHO. I am not sure mixing two serializations would make sense. I would close these for now.

@rxin
Contributor

rxin commented Nov 27, 2016

FWIW I think there is some value here, but I agree that changing the default behavior can be surprising and bad.

@koertkuipers
Contributor

@srowen and @rxin, what is the default behavior that is changed here? I see a current situation where an implicit encoder is provided that simply cannot handle the task at hand, and this leads to failure.

Either the implicits for ExpressionEncoder need to be more narrow, so that they do not claim types they cannot handle (and then other implicit encoders can be used), or they need to be able to handle these types, for example by falling back to Kryo as is suggested in this JIRA.

Currently implicitly[Encoder[Option[Set[Int]]]] gives you an ExpressionEncoder that cannot handle it. That is undesirable and makes it difficult for the user to provide an alternative implicit.

I proposed making the ExpressionEncoders more narrow (that seemed the easier fix to me at first), but @marmbrus preferred the approach of falling back to Kryo and broadening them. See:
http://apache-spark-developers-list.1001551.n3.nabble.com/getting-encoder-implicits-to-be-more-accurate-td19561.html
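
For illustration, the kind of workaround available today is to bypass implicit resolution entirely and pass a Kryo-backed encoder explicitly (a minimal sketch, assuming a SparkSession named spark):

import org.apache.spark.sql.{Encoder, Encoders}

// A Kryo-backed encoder for the type the ExpressionEncoder cannot handle.
val kryoEnc: Encoder[Option[Set[Int]]] = Encoders.kryo[Option[Set[Int]]]

// Passing the encoder explicitly means the implicit ExpressionEncoder that
// wrongly claims the type is never consulted:
val ds = spark.createDataset(Seq(Option(Set(1)), Option(Set(2))))(kryoEnc)
// ds: Dataset[Option[Set[Int]]], stored as a single binary column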

@marmbrus
Contributor

marmbrus commented Nov 30, 2016

I agree with @koertkuipers that the only change in behavior is that cases that used to throw an error will now not throw an error. If done right (I haven't looked deeply at the PR itself yet), no case that is currently working should change.

It is maybe slightly odd to mix serialization types, but that's kind of already happening today if you use the Kryo serializer: you are taking Kryo-encoded data and putting it as a binary value into a Tungsten row. The change here makes it possible to do the same in cases where the incompatible object is nested within a compatible object. Currently you are forced into all or nothing (i.e. even if only a single field is incompatible, you must treat the whole object as an opaque binary blob).

The one possible compatibility concern I can see is that if, in the future, we add support for a previously unsupported type, the schema will change from BinaryType to something else. However, given that there are very few operations you can do on binary, and this format is not persisted or guaranteed to be compatible across Spark versions, this actually seems okay.

Thoughts?
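
A minimal sketch of the all-or-nothing situation described above (assuming a SparkSession named spark; MyClass is the case class from the PR description):

import org.apache.spark.sql.Encoders

case class MyClass(a: String, b: Option[Set[Int]])

// Today, because the single field `b` is unsupported, the entire object has
// to be encoded with Kryo and lands in one opaque binary column:
val ds = spark.createDataset(Seq(MyClass("a", Some(Set(1)))))(Encoders.kryo[MyClass])
ds.printSchema()
// root
//  |-- value: binary (nullable = true)

The fallback in this PR would instead keep a as a regular string column and confine the binary blob to b, as shown in the "after" output in the description.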

@marmbrus
Contributor

marmbrus commented Dec 1, 2016

We should probably add a flag (maybe even off by default). The error message can tell you to turn on the flag if you are okay with the fallback.
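
For concreteness, such a flag could look something like the following (the config key below is invented for illustration; the PR as reviewed here does not define one):

// Hypothetical key, off by default:
spark.conf.set("spark.sql.encoders.kryoFallback.enabled", "true")

// With the flag off, the existing error could mention it, e.g.:
// UnsupportedOperationException: No Encoder found for Set[scala.Int];
// set spark.sql.encoders.kryoFallback.enabled=true to fall back to Kryo.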

@koertkuipers
Contributor

If we do a flag, I would also prefer that the current implicits be more narrow when the flag is not set, if possible.

@marmbrus
Contributor

marmbrus commented Dec 2, 2016

I don't think you can limit the implicit. What type would pick up case classes, but not case classes that contain invalid things? I think you would need a macro for this kind of introspection. (I'd be happy to be proven wrong with a PR.)

I'd recommend you only import the implicits you need rather than using the wildcard.
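
A sketch of that selective-import approach (localSeqToDatasetHolder and Encoders.kryo are Spark 2.x API names; verify against your version):

// Instead of the wildcard `import spark.implicits._`, import only the
// conversion you need, leaving Encoder resolution to your own implicit:
import spark.implicits.localSeqToDatasetHolder

implicit val setEnc: org.apache.spark.sql.Encoder[Option[Set[Int]]] =
  org.apache.spark.sql.Encoders.kryo[Option[Set[Int]]]

val ds = Seq(Option(Set(1)), Option(Set(2))).toDS()  // resolves to setEnc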

@koertkuipers
Contributor

koertkuipers commented Dec 3, 2016 via email

@marmbrus
Contributor

@windpiger, were you still working on this? I think it would be a useful feature if we can get the tests to pass.

@windpiger
Contributor Author

Oh sorry, recently I was busy with other work. I will continue working on this soon and finish it.

@windpiger
Contributor Author

retest this please

@SparkQA

SparkQA commented Feb 14, 2017

Test build #72873 has finished for PR 15918 at commit bb11c93.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 15, 2017

Test build #72922 has started for PR 15918 at commit 18d4ba5.

@SparkQA

SparkQA commented Feb 15, 2017

Test build #72921 has finished for PR 15918 at commit 4aac7dd.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 15, 2017

Test build #72924 has started for PR 15918 at commit cbe91bf.

@windpiger
Contributor Author

retest this please

@SparkQA

SparkQA commented Feb 15, 2017

Test build #72927 has finished for PR 15918 at commit cbe91bf.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 15, 2017

Test build #72936 has finished for PR 15918 at commit adf31b2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@windpiger windpiger changed the title [SPARK-18122][SQL][WIP]Fallback to Kryo for unsupported encoder for class's subfield [SPARK-18122][SQL]Fallback to Kryo for unsupported encoder for class's subfield Feb 16, 2017
@windpiger windpiger changed the title [SPARK-18122][SQL]Fallback to Kryo for unsupported encoder for class's subfield [SPARK-18122][SQL][WIP]Fallback to Kryo for unsupported encoder for class's subfield Feb 16, 2017
@JasonMWhite
Contributor

@windpiger I see you flipped it back to WIP. What else needs to be done?

@windpiger
Contributor Author

This change to schemaFor affects the default behavior.
For example, consider udf in functions.scala:
if an input type is not supported, inputTypes used to be Nil, which passed the subsequent expression type check; after the fallback to Kryo with BinaryType, the type check fails (BinaryType not being equal to the expected type). So I kept the logic that throws an exception in schemaFor for non-nested unsupported types, while complex nested types (including Map etc.) fall back to Kryo.

Even though I use schemaForDefaultBinaryType rather than schemaFor in deserializerFor, which I think is fine for the serde situation itself, I am not sure whether the change in schemaFor will affect other logic, like the udf case described above.
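
For context, the udf path in functions.scala that this refers to looks roughly like the following in Spark 2.x (a paraphrased sketch, not the exact source; imports omitted):

// Paraphrased sketch of the udf builder in functions.scala:
def udf[RT: TypeTag, A1: TypeTag](f: A1 => RT): UserDefinedFunction = {
  // If schemaFor throws for an unsupported input type, inputTypes is None
  // and the analyzer skips the input-type check entirely.
  val inputTypes = Try(ScalaReflection.schemaFor[A1].dataType :: Nil).toOption
  UserDefinedFunction(f, ScalaReflection.schemaFor[RT].dataType, inputTypes)
}
// If schemaFor instead silently returned BinaryType via the Kryo fallback,
// the check would run and could fail, which is why the exception is kept
// for non-nested unsupported types.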

@HyukjinKwon
Member

Hi @windpiger, is this still WIP?
