[SPARK-9043] Serialize key, value and combiner classes in ShuffleDependency #7403

Closed · massie wants to merge 4 commits

Conversation

massie (Contributor) commented Jul 14, 2015

ShuffleManager implementations are currently not given type information for
the key, value and combiner classes. Serialization of shuffle objects relies
on objects being JavaSerializable, with methods defined for reading/writing
the object or, alternatively, serialization via Kryo which uses reflection.

Serialization systems like Avro, Thrift and Protobuf generate classes with
zero argument constructors and explicit schema information
(e.g. IndexedRecords in Avro have get, put and getSchema methods).

By serializing the key, value and combiner class names in ShuffleDependency,
shuffle implementations will have access to schema information when
registerShuffle() is called.
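
As an illustration only (ShuffleDep below is a hypothetical stand-in for ShuffleDependency, not the actual Spark class), ClassTag context bounds are what make these class names recoverable at runtime:

import scala.reflect.ClassTag

// With ClassTag context bounds in scope, the dependency can record the
// runtime class names of the key, value and combiner types, which a
// ShuffleManager can inspect when registerShuffle() is called.
class ShuffleDep[K: ClassTag, V: ClassTag, C: ClassTag] {
  val keyClassName: String = reflect.classTag[K].runtimeClass.getName
  val valueClassName: String = reflect.classTag[V].runtimeClass.getName
  val combinerClassName: String = reflect.classTag[C].runtimeClass.getName
}

object ShuffleDepDemo extends App {
  val dep = new ShuffleDep[String, Int, Seq[Int]]
  println(dep.keyClassName)      // java.lang.String
  println(dep.combinerClassName) // scala.collection.Seq
}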


massie commented Jul 14, 2015

Jenkins, test this please.


massie commented Jul 14, 2015

There was a timeout fetching from the git repo. Having Jenkins try again.

@@ -65,7 +67,7 @@ abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
* @param mapSideCombine whether to perform partial aggregation (also known as map-side combine)
*/
@DeveloperApi
-class ShuffleDependency[K, V, C](
+class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
Review comment (Contributor):

FYI, this is a binary-incompatible change to a DeveloperApi.

massie (Author) replied:

According to the source for DeveloperApi, a Developer API is unstable and can change between minor releases.

@JoshRosen

I don't think that we can make this change as-is since we can't break binary compatibility for stable public APIs like PairRDDFunctions.


massie commented Jul 14, 2015

@JoshRosen What suggestions do you have for making the key, value and combiner class names available in the ShuffleDependency in a way that is binary compatible?

I had assumed that this would be part of the next major release, where binary-incompatible changes might be allowed.


massie commented Jul 14, 2015

Also, the comment in the DeveloperApi annotation reads...

/**
 * A lower-level, unstable API intended for developers.
 *
 * Developer API's might change or be removed in minor versions of Spark.
 *
 * NOTE: If there exists a Scaladoc comment that immediately precedes this annotation, the first
 * line of the comment must be ":: DeveloperApi ::" with no trailing blank line. This is because
 * of the known issue that Scaladoc displays only either the annotation or the comment, whichever
 * comes first.
 */


massie commented Jul 14, 2015

The PairRDDFunctions class isn't labeled DeveloperApi so what is the assumed level of stability?

Luckily, the original PairRDDFunctions has ClassTags for the key and value; however, the combine* methods didn't have info on the combiner classes. Not sure how to work around that. Ideas?


SparkQA commented Jul 14, 2015

Test build #37268 has finished for PR 7403 at commit 8c980c4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


rxin commented Jul 15, 2015

@massie Although I'm in favor of collecting more type information (e.g. even type tags), no labeling means it should be stable across the entire Spark 1.x release...


massie commented Jul 15, 2015

@JoshRosen @rxin I just pushed an update that reverts the PairRDDFunctions ClassTag implicits to their original form.

The only remaining sticking point is the three combineByKey methods in PairRDDFunctions which require a ClassTag in order to get type information about the combiner class. Does adding an implicit to a method break binary compatibility? (I'm certain that removing an implicit would)


SparkQA commented Jul 15, 2015

Test build #37380 has finished for PR 7403 at commit 1be4402.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    • class CoGroupedRDD[K: ClassTag](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]],
    • class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](


massie commented Jul 15, 2015

To answer my own question, adding a ClassTag, of course, breaks binary compatibility since it changes the method signature. Sorry about the unnecessary comment chatter.
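
For illustration (a sketch, not the actual Spark signatures): a Scala implicit parameter list compiles to ordinary extra JVM parameters, so adding one changes the method descriptor that old callers were linked against:

import scala.reflect.ClassTag

trait Example[V] {
  // Old signature: erases to  combineByKey(Function1): Unit
  def combineByKey[C](createCombiner: V => C): Unit

  // Adding an implicit would make the same method erase to
  //   combineByKey(Function1, ClassTag): Unit
  // -- a different descriptor, so previously compiled callers no longer link.
  // def combineByKey[C](createCombiner: V => C)(implicit ct: ClassTag[C]): Unit
}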


massie commented Jul 15, 2015

I just pushed an update (5c58b4df9b) which ensures that we keep binary compatibility in PairRDDFunctions. The old combineByKey methods now call into the new combineByKey2 methods passing in a null for the combiner ClassTag.
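
A minimal sketch of that delegation pattern (names and signatures simplified for illustration, not the real PairRDDFunctions API):

import scala.reflect.ClassTag

class PairFns[K, V] {
  // New method: carries the combiner ClassTag through to the shuffle layer.
  def combineByKey2[C](createCombiner: V => C)(implicit ct: ClassTag[C]): Unit = {
    // Option(null) is None, so a null tag simply yields no class name.
    val combinerClassName = Option(ct).map(_.runtimeClass.getName)
    println(s"combiner class: $combinerClassName")
  }

  // Old signature kept intact for binary compatibility; passing a null
  // ClassTag means no combiner class information reaches the shuffle.
  def combineByKey[C](createCombiner: V => C): Unit =
    combineByKey2(createCombiner)(null)
}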

If this is an approach that is agreeable, I'll expand the tests and finalize the work.

Any suggestions on a better name for the new combiner methods? I just called it combineByKey2, but I'm sure there's a better name.

Thanks for the review help @JoshRosen and @rxin.


SparkQA commented Jul 16, 2015

Test build #37423 has finished for PR 7403 at commit 5c58b4d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    • class CoGroupedRDD[K: ClassTag](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]],
    • class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](


massie commented Jul 17, 2015

There is another approach that would work here too.

Currently, RDD has the following implicit method for the PairRDDFunctions...

object RDD {

  implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
                                          (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
    new PairRDDFunctions(rdd)
  }
}

Code compiled with this looks like the following,

val rdd: com.github.massie.RDD = new com.github.massie.RDD((ClassTag.apply(classOf[scala.Tuple2]): scala.reflect.ClassTag));
massie.this.RDD.rddToPairRDDFunctions(rdd,
  (ClassTag.apply(classOf[java.lang.String]): scala.reflect.ClassTag),
  (ClassTag.apply(classOf[java.lang.String]): scala.reflect.ClassTag),
  scala.math.Ordering$String).combineByKey()

To keep binary compatibility, we just need to keep rddToPairRDDFunctions, remove the implicit, and create a new implicit, e.g.

object RDD {

  def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
                                 (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
    new PairRDDFunctions(rdd)
  }

  implicit def rddToNewPairRDDFunctions[K, V](rdd: RDD[(K, V)])
                                 (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): NewPairRDDFunctions[K, V] = {
    new NewPairRDDFunctions(rdd)
  }
  ...
}

Old code will still run since rddToPairRDDFunctions exists and PairRDDFunctions has not been modified. New code will build against the new NewPairRDDFunctions class, which includes the ClassTag information in the combineByKey methods.

As a small side note, PairRDDFunctions would wrap NewPairRDDFunctions so we avoid copy-pasted code. The combineByKey functions in PairRDDFunctions would pass in a null ClassTag. That means old applications will work as before but will not provide combiner class information to the shuffle.

The advantage of this approach is that the method names stay the same (no need for a combineByKey2 or the like). A downside is that old code will run against the new binary but may not compile, if no ClassTag is available when combineByKey is called.


SparkQA commented Jul 22, 2015

Test build #37988 has finished for PR 7403 at commit 5aba066.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


massie commented Jul 23, 2015

This is ready for review when someone has the time. All unit tests pass.

  • Only classes that are marked @DeveloperApi or are private[spark] have been altered to include ClassTags (breaking compatibility)
  • PairRDDFunctions has three new combineByKeyWithClassTag methods that mirror the original combineByKey signatures
  • All internal calls to combineByKey have been moved to combineByKeyWithClassTag. Only users that call combineByKey directly will not have combiner class type information provided to the shuffle.
  • I've added a warning message when combineByKey is used
  • As an aside, I wrote the code to use the implicit trick I talked about, and it turned out to complicate the code quite a lot.

* The key, value and combiner classes are serialized so that shuffle manager
* implementations can use the information to build
*/
val keyClassName: String = reflect.classTag[K].runtimeClass.getName
Review comment (Contributor):

How is this used? It might require the key class to have a 0-arg ctor, right?

massie (Author) replied:

Here's an example of how I use this in the Parquet shuffle manager to create a schema for the (key, value) or (key, combiner) pairs for the shuffle files.
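
A hedged sketch of that kind of use, recovering an Avro schema from a serialized class name (the object and method names here are hypothetical, not taken from the PR):

import org.apache.avro.Schema
import org.apache.avro.specific.SpecificRecord

object ShuffleSchemas {
  // Avro-generated classes have a zero-arg constructor and a getSchema
  // method, so the class name alone is enough to rebuild the schema.
  def schemaFor(className: String): Option[Schema] = {
    val cls = Class.forName(className)
    if (classOf[SpecificRecord].isAssignableFrom(cls)) {
      val record = cls.getDeclaredConstructor().newInstance().asInstanceOf[SpecificRecord]
      Some(record.getSchema)
    } else {
      None // not an Avro record; fall back to a non-schema-aware path
    }
  }
}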


SparkQA commented Jul 24, 2015

Test build #38310 has finished for PR 7403 at commit d07b771.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    • class CoGroupedRDD[K: ClassTag](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]],
    • class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](


massie commented Jul 24, 2015

Jenkins, test this please.



SparkQA commented Jul 24, 2015

Test build #38367 has finished for PR 7403 at commit d07b771.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    • class CoGroupedRDD[K: ClassTag](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]],
    • class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](


massie commented Jul 24, 2015

Jenkins tests pass. The other failures were Jenkins hiccups.

@@ -48,7 +48,7 @@ import org.apache.spark.serializer.Serializer
* you can use `rdd1`'s partitioner/partition size and not worry about running
* out of memory because of the size of `rdd2`.
*/
private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag](
Comment from massie (Author):

These ClassTags should not be removed. I just pushed an update that reverts this line change.


SparkQA commented Jul 25, 2015

Test build #38403 has finished for PR 7403 at commit bf0344e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    • class CoGroupedRDD[K: ClassTag](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]],
    • class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](


SparkQA commented Jul 25, 2015

Test build #38406 has finished for PR 7403 at commit 59abe01.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    • class CoGroupedRDD[K: ClassTag](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]],
    • class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](


massie commented Jul 25, 2015

The error isn't related to this PR...

[info] *** 1 SUITE ABORTED ***
[error] Error: Total 21, Failed 0, Errors 1, Passed 20
[error] Error during tests:
[error]     org.apache.spark.streaming.kinesis.KinesisBackedBlockRDDSuite

... looks like something related to Kinesis-backed streaming.

Maybe related to

commit 93eb2acfb287807355ba5d77989d239fdd6e2c30
Author: Tathagata Das <tathagata.das1565@gmail.com>
Date:   Sun Jul 19 20:34:30 2015 -0700

Maybe @tdas knows what's causing the failure?

@@ -75,7 +76,8 @@ private[spark] class CoGroupPartition(
* @param part partitioner used to partition the shuffle output
*/
@DeveloperApi
-class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner)
+class CoGroupedRDD[K: ClassTag](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]],
+    part: Partitioner)
Review comment (Contributor):

style

massie (Author) replied:

I missed this in my style changes. Fixing now.


massie commented Sep 1, 2015

@andrewor14 I noticed that I missed a few of the style fixes that you recommended. I just pushed 41d2a3c which fixes them.

Thanks for reviewing this PR. I appreciate it.


SparkQA commented Sep 1, 2015

Test build #41897 has finished for PR 7403 at commit 41d2a3c.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    • class CoGroupedRDD[K: ClassTag](
    • class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](


SparkQA commented Sep 2, 2015

Test build #41893 has finished for PR 7403 at commit 56792cb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    • class CoGroupedRDD[K: ClassTag](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]],
    • class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](


massie commented Sep 8, 2015

@rxin and @andrewor14 - Is there anything more that needs to be done before this PR is ready to be merged? I've made all recommended changes.

There is an open question about whether the class names should be (a) public variables on the Dependency or (b) parameters passed to registerShuffle. Option A is better if we later want to inspect the Dependency tree with all class types; option B is better if we don't want the class names to be public. Of course, option B would require a change to the ShuffleManager API and this PR.

This PR is ready to go for Option A. Let me know your thoughts.
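
For concreteness, a rough sketch of the two options (both shapes are simplified and hypothetical, not proposed signatures):

// Option A: class names are fields on the dependency itself, visible to
// anyone walking the dependency tree.
class ShuffleDependencyA[K, V, C](
    val keyClassName: String,
    val valueClassName: String,
    val combinerClassName: Option[String])

// Option B: class names stay off the public Dependency and are handed to
// the shuffle implementation only at registration time.
trait ShuffleManagerB {
  def registerShuffle(
      shuffleId: Int,
      keyClassName: String,
      valueClassName: String,
      combinerClassName: Option[String]): Unit
}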


massie commented Sep 9, 2015

I just rebased this on top of upstream/master in order to fix some conflicts that arose as master changed underneath this PR.

@rxin @andrewor14 This PR has been open for almost two months. Can you please let me know if you see any remaining work that needs to be done before merging?

* Simplified version of combineByKey that hash-partitions the output RDD.
* This method is here for backward compatibility. It
* does not provide combiner classtag information to
* the shuffle.
Review comment (Contributor):

The Javadoc should start with a description of what the method does. We should use the old one and note, after the first sentence, that it exists for backward compatibility.
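
For example, the restructured doc comment might read (a suggestion, not the merged text):

/**
 * Simplified version of combineByKey that hash-partitions the output RDD.
 * This method exists for backward compatibility; it does not provide
 * combiner ClassTag information to the shuffle.
 */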

@andrewor14

@massie The changes here look fine. The only thing I'm still not sure about is the fact that everything is public. You pointed out that ShuffleDependency is a DeveloperApi, but things like combineByKeyWithClassTag are not and are fully public. My view is that if we keep as many things private as possible in the initial version of this patch, we can always expose more of them gradually in the future, but we can't go the other way. E.g. your other patch #7265 doesn't require any of these things to be public.

Other than that, I don't have strong opinions one way or the other about this patch. I think it's a good addition but I'm wary of the many public API changes in this patch. If @rxin thinks it's in a good shape then we should merge it.


SparkQA commented Sep 9, 2015

Test build #42210 has finished for PR 7403 at commit ed1afac.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    • class SparkHadoopWriter(jobConf: JobConf)
    • class BlockRDD[T: ClassTag](sc: SparkContext, @transient val blockIds: Array[BlockId])
    • class CoGroupedRDD[K: ClassTag](
    • class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
    • class ZippedWithIndexRDD[T: ClassTag](prev: RDD[T]) extends RDD[(T, Long)](prev)
    • class DefaultSource extends RelationProvider with DataSourceRegister
    • class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
    • abstract class InputDStream[T: ClassTag] (ssc_ : StreamingContext)
    • abstract class ReceiverInputDStream[T: ClassTag](ssc_ : StreamingContext)


massie commented Sep 9, 2015

Thanks for the response, @andrewor14. I'll update this PR to make the class names private. As you say, we can make them public in a future PR, if needed. You're right that #7265 doesn't need them to be public.


SparkQA commented Sep 10, 2015

Test build #42220 has finished for PR 7403 at commit 2906e74.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    • class SparkHadoopWriter(jobConf: JobConf)
    • class BlockRDD[T: ClassTag](sc: SparkContext, @transient val blockIds: Array[BlockId])
    • class CoGroupedRDD[K: ClassTag](
    • class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
    • class ZippedWithIndexRDD[T: ClassTag](prev: RDD[T]) extends RDD[(T, Long)](prev)
    • case class Instance(w: Double, a: Vector, b: Double)
    • class DefaultSource extends RelationProvider with DataSourceRegister
    • class WeibullGenerator(
    • class IndexToString(JavaTransformer, HasInputCol, HasOutputCol):
    • class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
    • abstract class InputDStream[T: ClassTag] (ssc_ : StreamingContext)
    • abstract class ReceiverInputDStream[T: ClassTag](ssc_ : StreamingContext)


massie commented Sep 10, 2015

The class names are private now and all tests pass.

To provide a little more explanation of why the combineByKeyWithClassTag methods are needed (instead of just naming them combineByKey):

import scala.reflect.ClassTag

import org.apache.spark.SparkFunSuite

class PlaySuite extends SparkFunSuite {

  def combine[C](c: C): Unit = {
    println("Running combine without ClassTag")
  }

  def combine[C](c: C)(implicit ct: ClassTag[C]): Unit = {
    println("Running combine with ClassTag")
  }

  test("Example") {
    combine(42)
  }

}

Causes the compiler to throw the error:

Error:(34, 5) ambiguous reference to overloaded definition,
both method combine in class PlaySuite of type [C](c: C)(implicit ct: scala.reflect.ClassTag[C])Unit
and  method combine in class PlaySuite of type [C](c: C)Unit
match argument types (Int)
    combine(42)
    ^

There's unfortunately no way to add ClassTags without either breaking compatibility or adding these new methods.

I hope that you find this PR is ready to merge now. Please let me know if you see anything else that needs to be done.

@andrewor14

@massie This looks OK to me. The only thing is that I find the name combineByKeyWithClassTag kind of awkward, and because it's a fully public API we won't be able to change the signature or the name in the future.

I understand that it won't compile if you just call it combineByKey, but I wonder if we should at least mark these @DeveloperApi or something. Actually we could even mark them as private[spark] and still benefit from the feature here, since reduceByKey and all the other things internally call this method. Please correct me if necessary.

I'll defer the judgment to @rxin.


massie commented Sep 10, 2015

@andrewor14 I agree the name combineByKeyWithClassTag is awkward. I'm open to changing it to a less awkward name, if you or anyone else can suggest one.

We can't make the combineByKeyWithClassTag methods private, since developers may want to use them directly (as they can with the combineByKey methods), although most will likely use the higher-level methods.

I hope that in Spark 2.0, when we're able to fix this API, we can simply add the ClassTag to combineByKey and have no need for the *WithClassTag methods.


rxin commented Sep 10, 2015

Let's add an experimental annotation to it, and then merge this.


SparkQA commented Sep 10, 2015

Test build #42297 has finished for PR 7403 at commit adcdfaf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    • class CoGroupedRDD[K: ClassTag](
    • class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](


rxin commented Sep 11, 2015

LGTM.


rxin commented Sep 11, 2015

I've merged this. Thanks @massie

@asfgit asfgit closed this in 0eabea8 Sep 11, 2015
@massie massie deleted the shuffle-classtags branch September 11, 2015 00:31