
[SPARK-7160][SQL] Support converting DataFrames to typed RDDs. #5713

Closed
wants to merge 1 commit

Conversation


@punya
Contributor

punya commented Apr 27, 2015

Is there a way to make the code paths for Scala types (Product subtypes) and Java types (beans) more similar, or to have a common core? It seems like otherwise we have to keep doing everything twice.

@rayortigas
Author

Hi @punya.

Re: Scala and Java types, are you talking about:

a) the code similarities between createToScalaConverter(DataType) and (createToProductConverter[T] + createToScalaConverter(universe.Type, DataType))? I think after introducing this typed path I would like to DRY up the code. Just wanted to propose a minimally invasive commit for now.

b) the general approach for Scala/Java-Catalyst conversions? I can see that ScalaReflection and JavaTypeInference do similar things, but don't know enough about the design to know whether this is ripe for refactoring. If it hasn't been done or scheduled, maybe someone should do a survey of the current state of things.

@rayortigas
Author

Now I see a thread on spark-dev (which I just joined) about availability/discoverability of design docs in general. :D If there isn't one for this conversion stuff, it is probably a candidate for a design doc.

@marmbrus
Contributor

ok to test

@SparkQA

SparkQA commented Apr 28, 2015

Test build #31174 has finished for PR 5713 at commit add51b6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • s"Incompatible class $
    • * case class Food(name: String, count: Int)
  • This patch does not change any dependencies.

@rxin
Contributor

rxin commented May 4, 2015

Thanks for doing this. We are fairly busy with 1.4 release deadline. I'd like to revisit this pull request in Spark 1.5 in the context of making user-defined types and closures more usable in DataFrames.

@rayortigas
Author

Thanks for the update, Reynold! I'm just glad it's in the queue. :D Looking forward to the 1.4 release and will keep an eye out for any feedback on this PR...

@JoshRosen
Contributor

Just a heads-up: I have a WIP patch that performs a significant refactoring of CatalystTypeConverters (#6222). I think my patch should make the changes here easier to implement and understand, so I'd love to get your feedback on my proposed refactorings.

@rayortigas
Author

@JoshRosen: As you saw from my comment on #6222 I think it looks good. As for this PR, yeah, it should be re-implemented on top of your patch.

I think the conversion would still use the type hints given by toTypedRDD[T], so I guess getConverterForType and CatalystTypeConverter.toScala could be overloaded, e.g.:

def toScala(universeType: universe.Type, @Nullable catalystValue: CatalystType): ScalaType

or given a default param, e.g.:

def toScala(
  @Nullable catalystValue: CatalystType,
  universeType: Option[universe.Type] = None
): ScalaType

And BigDecimalConverter would use the type hint to figure out whether to create a Java BigDecimal or a Scala one.

In any case, looks doable and should be cleaner. If you like, I can update this PR after you merge your patch.
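To illustrate the BigDecimal idea, here is a minimal sketch (not actual Spark code; the object name and signature are hypothetical and only mirror the overload proposed above):

import scala.reflect.runtime.{universe => ru}

// Sketch only: use an optional reflection type hint to decide whether the Scala-side
// value should stay a java.math.BigDecimal or be wrapped as a scala.math.BigDecimal.
object BigDecimalConverterSketch {
  def toScala(
      catalystValue: java.math.BigDecimal,
      universeType: Option[ru.Type] = None): Any =
    universeType match {
      case Some(t) if t <:< ru.typeOf[scala.math.BigDecimal] =>
        scala.math.BigDecimal(catalystValue)
      case _ =>
        catalystValue // default to the Java representation
    }
}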

@rayortigas
Author

Updated the PR after #6222 was merged...

  1. 6c36abf was a rebase on cafd505, so it's still non-invasive.
  2. 3711a3e92de5972492e40621861908bfc54afc85 was an attempt to integrate more tightly with cafd505.

@JoshRosen
Contributor

Jenkins, this is ok to test.

@SparkQA

SparkQA commented Jun 8, 2015

Test build #34456 has finished for PR 5713 at commit 3711a3e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • s"Incompatible class $
    • * case class Food(name: String, count: Int)

@SparkQA

SparkQA commented Aug 23, 2015

Test build #41415 has finished for PR 5713 at commit 0ee742d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • s"Incompatible class $
    • * case class Food(name: String, count: Int)

@rayortigas
Author

Hi @marmbrus, just repeating what I wrote at https://issues.apache.org/jira/browse/SPARK-7160, I updated this PR to bring it in sync with master.

I rebased onto yesterday's master and integrated with InternalRow. I then force-pushed the changes here, so now you'll only see the one commit.

Thanks,
Ray

val converter =
  CatalystTypeConverters.createToProductConverter[G](dataType.asInstanceOf[StructType])
converter(row.asInstanceOf[InternalRow])
}

This is also kind of a nit, but could you limit the scope of the intercept? I think this is right, but with the extra casts I'm worried the wrong thing might be throwing the ClassCastException.
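For context, a minimal sketch of what a narrower intercept might look like (the test name and values are illustrative, not the actual suite code):

import org.scalatest.FunSuite

class InterceptScopeSketch extends FunSuite {
  test("narrow intercept scope") {
    // Setup and other casts live outside the intercept block...
    val value: Any = "not an Integer"
    // ...so only the statement expected to fail is inside it.
    intercept[ClassCastException] {
      value.asInstanceOf[java.lang.Integer]
    }
  }
}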

@marmbrus
Contributor

marmbrus commented Sep 3, 2015

Sorry for the unreasonable delay reviewing this. Overall it looks pretty good to me. I have a few small comments, and there is a conflict (which is fortunately easy to resolve this time).

@marmbrus
Contributor

marmbrus commented Sep 3, 2015

Actually, I have one other concern. It doesn't seem that this works in the spark shell.

scala> val df = Seq((1,new Integer(1))).toDF("a", "b")
df: org.apache.spark.sql.DataFrame = [a: int, b: int]

scala> case class MyClass(a: Int, b: Int)
defined class MyClass

scala> df.toTypedRDD[MyClass].collect()
15/09/03 16:35:04 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
scala.ScalaReflectionException: class MyClass is an inner class, use reflectClass on an InstanceMirror to obtain its ClassMirror
    at scala.reflect.runtime.JavaMirrors$JavaMirror.ErrorInnerClass(JavaMirrors.scala:126)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.reflectClass(JavaMirrors.scala:193)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.reflectClass(JavaMirrors.scala:65)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter$.createToProductConverter(CatalystTypeConverters.scala:340)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$.createToProductConverter(CatalystTypeConverters.scala:592)
    at org.apache.spark.sql.DataFrame$$anonfun$41.apply(DataFrame.scala:1566)
    at org.apache.spark.sql.DataFrame$$anonfun$41.apply(DataFrame.scala:1565)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
15/09/03 16:35:04 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): scala.ScalaReflectionException: class MyClass is an inner class, use reflectClass on an InstanceMirror to obtain its ClassMirror
    at scala.reflect.runtime.JavaMirrors$JavaMirror.ErrorInnerClass(JavaMirrors.scala:126)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.reflectClass(JavaMirrors.scala:193)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.reflectClass(JavaMirrors.scala:65)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter$.createToProductConverter(CatalystTypeConverters.scala:340)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$.createToProductConverter(CatalystTypeConverters.scala:592)
    at org.apache.spark.sql.DataFrame$$anonfun$41.apply(DataFrame.scala:1566)
    at org.apache.spark.sql.DataFrame$$anonfun$41.apply(DataFrame.scala:1565)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)

Unfortunately, I'm not sure if there is an easy way around this.

@@ -887,4 +887,98 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
.select(struct($"b"))
.collect()
}

test("SPARK-7160: toTypedRDD[T] works with simple case class") {

We should probably move this into its own suite.

@marmbrus
Contributor

marmbrus commented Sep 4, 2015

After spending some more time trying to fix the above issue, I wonder if it wouldn't be better to make this a whole separate code path, instead of trying to handle both cases inside of catalyst converters. That class is already pretty hard to follow and adding a whole set of branches everywhere makes it even worse. Another concern is that the extra branches might affect the performance of the other case.

Thoughts?

@rayortigas
Author

First off, thanks for the code review, Michael.

Re: spark-shell, I thought I'd bumped into a known limitation of the REPL (e.g. https://groups.google.com/forum/#!msg/spark-users/bwAmbUgxWrA/HwP4Nv4adfEJ), but I should've raised it, so sorry. The workaround (in my head) was that people should just not use case classes defined within the REPL. Not sure what to do here; I'll have to think about it.

Re: a separate code path, I actually had it that way when I submitted the first version of this PR. But then Josh refactored the converters and it seemed reasonable to follow his refactored code paths, since it was easier to track (and probably maintain) the conversions for each type, so that was the second (previous) version of this PR. I can see arguments either way. If we don't expect to add more converters/conversions, then maybe a separate code path would be fine.

@marmbrus
Contributor

marmbrus commented Sep 4, 2015

Regarding the REPL issues, I'm wondering if there isn't some way to handle the common case, where the outer pointer that is getting added to the class isn't actually needed for anything. I was hoping there would be some way to just pass null in, but so far I have not succeeded.
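For anyone following along, a small standalone illustration (not Spark code) of that outer pointer: a case class defined inside another class, which is effectively what the REPL wrapper does to every definition, compiles to an inner class whose constructor takes a hidden reference to the enclosing instance.

class Wrapper {
  case class MyClass(a: Int, b: Int)
}

object OuterPointerDemo extends App {
  // The primary constructor's first parameter is the enclosing Wrapper instance,
  // which is why reflective construction fails unless an outer value can be supplied.
  val ctor = classOf[Wrapper#MyClass].getConstructors.head
  println(ctor.getParameterTypes.toList) // prints something like List(class Wrapper, int, int)
}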

Regarding the separate code path issue: my biggest motivation here is that I see a lot of things that aren't great from a performance perspective (the old code is the same). I think if this becomes popular we are going to have to code-generate the conversion functions. Having it separate would make this transition easier.

@rayortigas
Author

Got it. I'll refactor back to a separate code path then (and address the other issues you identified).

After that I'll try to figure out the REPL stuff. If you figure it out, then great. :D

Hope that works.

@rayortigas
Author

Hi @marmbrus, I updated this PR to use a separate code path as you requested.

I don't think I can solve the REPL thing, and I'm at the limits of my understanding of Scala reflection. FWIW, json4s seems to recommend just compiling case classes out-of-band and then importing them into the REPL for functionality similar to toTypedRDD (near the end of the readme).
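As a sketch of that json4s-style workaround (the package, file, and jar names below are hypothetical), the case class would be compiled ahead of time and put on the shell's classpath instead of being defined in the REPL:

// MyClass.scala -- compiled ahead of time and added to the shell classpath,
// e.g. via spark-shell --jars my-model.jar
package com.example.model

case class MyClass(a: Int, b: Int)

In the shell one would then import com.example.model.MyClass and call df.toTypedRDD[MyClass] as in the earlier example; since the class is no longer an inner class of the REPL wrapper, the reflection error above would not apply.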

@rayortigas
Author

btw I force-pushed when updating this PR, since the diff would've otherwise looked weird. I rebased on a commit from 9/25.

val message =
  s"""|Error constructing ${classTag.runtimeClass.getName}: ${e.getMessage};
      |paramTypes: ${paramTypes}, dataTypes: ${dataTypes},
      |convertedArgs: ${convertedArgs}""".stripMargin.replace("\n", " ")

Addressing feedback: this error message provides more detail when the constructor call fails.

@SparkQA

SparkQA commented Sep 28, 2015

Test build #43055 has finished for PR 5713 at commit 6d4bec2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • * case class Food(name: String, count: Int)

@FurcyPin

FurcyPin commented Nov 3, 2015

Hi @rayortigas,

I have been working on a similar feature.

So far I have something that works similarly but is a little less easy to use;
here is a sample demo:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types._

case class DemoCC(int: Int, boolean: Boolean, string: Option[String]) extends Serializable

object Demo {

  def main(args: Array[String]): Unit = {

    val sc = new SparkContext()
    val sqlContext = new SQLContext(sc)

    val inputData: Seq[Row] = Seq(
      Row(true, 1, "A", 2.0),
      Row(true, 2, null, 4.0),
      Row(false, 3, "C", 9.0)
    )
    val schema = StructType(Seq(
      StructField("boolean", BooleanType),
      StructField("int", IntegerType),
      StructField("string", StringType),
      StructField("double", DoubleType)
    ))

    val rdd: RDD[Row] = sc.parallelize(inputData, 3)

    val df: DataFrame = sqlContext.createDataFrame(rdd, schema)

    /* PermutationPlan and RowToCaseClassTransformer are my own (not yet open-sourced) classes.
       The permutationPlan can be serialized, so we generate it once and for all on the driver.
       This performs a preliminary check as well. */
    val permutationPlan = PermutationPlan[DemoCC](df)

    /* The transformer cannot be serialized because TypeTag is not (really) serializable in Scala 2.10. */
    @transient lazy val transformer = new RowToCaseClassTransformer[DemoCC](permutationPlan)

    /* Using "df.map(transformer)" instead would not work... */
    val res = df.map { r => transformer(r) }

    res.collect().foreach(println)
  }

}

I tried implementing dataframe.toCaseClassRDD[CaseClass] with an implicit conversion, but I bumped into a serialization exception because TypeTags are not (really) serializable in Scala 2.10 (https://issues.scala-lang.org/browse/SI-5919), which is why the transformer in the example has to be transient.

On the other hand, I tried to be as generic as possible: my implementation supports nested case classes and maps the DataFrame's column names to the case class's field names. Thus they do not need to be defined in the same order, and the case class may have fewer fields than the DataFrame (as shown in the example). It also throws an error at initialization if there is a type or name incompatibility.

I've just found this pull request, and would love to contribute, but I am not sure how to proceed.
I wanted to open source my code earlier, but I hoped to find a way to reduce the boilerplate first.

I would love to discuss this further with you. I am not sure this is the most suitable place to do so.

Regards,

Furcy

PS: thanks for the ScalaReflectionLock.synchronized; I bumped into the same issue and was looking for a solution ;-)

@marmbrus
Contributor

marmbrus commented Nov 3, 2015

Hey, I'm really sorry for letting this sit so long. I got wrapped up trying to get ready for Spark 1.6. It would be great if you could look at SPARK-9999, which adds a new method .as[Type] to DataFrame, converting it to a typed Dataset. The design doc attached to the JIRA explains more, but I think it accomplishes a lot of what you are both trying to do. As an added bonus, it also supports typed tuples and uses codegen under the covers for performance. Would love to get more people looking at the API and testing it out.
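For reference, a minimal sketch of that Dataset API using Spark 1.6-era syntax (the case class, names, and data below are illustrative):

import org.apache.spark.sql.{Dataset, SQLContext}

case class Person(name: String, age: Int)

object DatasetSketch {
  // Assumes an existing SQLContext; the DataFrame's column names and types must line up with Person.
  def demo(sqlContext: SQLContext): Unit = {
    import sqlContext.implicits._

    val df = sqlContext.createDataFrame(Seq(Person("Ann", 30), Person("Bob", 17)))
    val ds: Dataset[Person] = df.as[Person] // typed view over the same rows

    ds.filter(_.age >= 18).collect().foreach(println)
  }
}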

case class A(x: String, y: Int)
case class B(a: A, z: Double)
case class C(x: String, a: Seq[A])
case class D(x: String, a: Map[Int, A])

If there are any test cases here that aren't covered by ExpressionEncoderSuite, it would be awesome to add them.

@marmbrus
Contributor

Now that Spark 1.6 is almost released, I think we can close this issue. Thanks again for working on it.

asfgit closed this in ce5fd40 on Dec 17, 2015