
[SPARK-13926] Automatically use Kryo serializer when shuffling RDDs with simple types #11755

Closed

Conversation

JoshRosen (Contributor)

Because ClassTags are available when constructing ShuffledRDD, we can use them to automatically select Kryo for shuffle serialization when the RDD's types are known to be compatible with Kryo.

This patch introduces SerializerManager, a component which picks the "best" serializer for a shuffle given the elements' ClassTags. It will automatically pick a Kryo serializer for ShuffledRDDs whose key, value, and/or combiner types are primitives, arrays of primitives, or strings. In the future we can use this class as a narrow extension point to integrate specialized serializers for other types, such as ByteBuffers.
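The selection logic described above can be sketched roughly as follows. This is an illustrative, simplified version, not the actual Spark code; the object and method names here are hypothetical:

```scala
import scala.reflect.ClassTag

// Hypothetical sketch: choose Kryo when every shuffled type is a primitive,
// an array of primitives, or a String; otherwise fall back to the default
// (Java) serializer.
object SerializerChoice {
  // classOf[Int] etc. resolve to the primitive runtime classes, which is
  // also what ClassTag[Int].runtimeClass returns.
  private val primitiveAndStringClasses: Set[Class[_]] = Set(
    classOf[Boolean], classOf[Byte], classOf[Char], classOf[Short],
    classOf[Int], classOf[Long], classOf[Float], classOf[Double],
    classOf[String])

  private def canUseKryo(ct: ClassTag[_]): Boolean = {
    val clazz = ct.runtimeClass
    primitiveAndStringClasses.contains(clazz) ||
      (clazz.isArray && clazz.getComponentType.isPrimitive)
  }

  /** True when both the key and value types are known to be Kryo-safe. */
  def useKryoForShuffle(keyTag: ClassTag[_], valueTag: ClassTag[_]): Boolean =
    canUseKryo(keyTag) && canUseKryo(valueTag)
}
```

For example, a shuffle of `(Int, Array[Double])` pairs would qualify for Kryo under this rule, while a shuffle of arbitrary case classes would not.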

In a planned follow-up patch, I will extend the BlockManager APIs so that we can use similar automatic serializer selection when caching RDDs (this is a little trickier because the ClassTags need to be threaded through many more places).

@SparkQA

SparkQA commented Mar 16, 2016

Test build #53283 has finished for PR 11755 at commit 876f038.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 16, 2016

Test build #53287 has finished for PR 11755 at commit ca923b5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 16, 2016

Test build #53302 has finished for PR 11755 at commit 51205ee.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -83,7 +83,7 @@ private[spark] class DirectTaskResult[T](
     } else {
       // This should not run when holding a lock because it may cost dozens of seconds for a large
       // value.
-      val resultSer = SparkEnv.get.serializer.newInstance()
+      val resultSer = SparkEnv.get. serializer.newInstance()
Contributor
Seems an accidental change.

@nongli
Contributor

nongli commented Mar 16, 2016

LGTM

@SparkQA

SparkQA commented Mar 16, 2016

Test build #53353 has finished for PR 11755 at commit 45b0c0b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class DecisionTreeClassificationModelWriter(instance: DecisionTreeClassificationModel)
    • class DecisionTreeRegressionModelWriter(instance: DecisionTreeRegressionModel)
    • case class SplitData(
    • case class NodeData(
    • class Estimator(Params):
    • class Transformer(Params):
    • class Model(Transformer):
    • class LogisticRegressionModel(JavaModel, MLWritable, MLReadable):
    • class NaiveBayesModel(JavaModel, MLWritable, MLReadable):
    • class PipelineMLWriter(JavaMLWriter, JavaWrapper):
    • class PipelineMLReader(JavaMLReader):
    • class PipelineModelMLWriter(JavaMLWriter, JavaWrapper):
    • class PipelineModelMLReader(JavaMLReader):
    • case class SQLTable(

@JoshRosen
Contributor Author

Jenkins, retest this please.

@SparkQA

SparkQA commented Mar 16, 2016

Test build #53362 has finished for PR 11755 at commit 45b0c0b.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class DecisionTreeClassificationModelWriter(instance: DecisionTreeClassificationModel)
    • class DecisionTreeRegressionModelWriter(instance: DecisionTreeRegressionModel)
    • case class SplitData(
    • case class NodeData(
    • class Estimator(Params):
    • class Transformer(Params):
    • class Model(Transformer):
    • class LogisticRegressionModel(JavaModel, MLWritable, MLReadable):
    • class NaiveBayesModel(JavaModel, MLWritable, MLReadable):
    • class PipelineMLWriter(JavaMLWriter, JavaWrapper):
    • class PipelineMLReader(JavaMLReader):
    • class PipelineModelMLWriter(JavaMLWriter, JavaWrapper):
    • class PipelineModelMLReader(JavaMLReader):
    • case class SQLTable(

@JoshRosen
Contributor Author

Jenkins, retest this please.

@SparkQA

SparkQA commented Mar 17, 2016

Test build #53365 has finished for PR 11755 at commit 45b0c0b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class DecisionTreeClassificationModelWriter(instance: DecisionTreeClassificationModel)
    • class DecisionTreeRegressionModelWriter(instance: DecisionTreeRegressionModel)
    • case class SplitData(
    • case class NodeData(
    • class Estimator(Params):
    • class Transformer(Params):
    • class Model(Transformer):
    • class LogisticRegressionModel(JavaModel, MLWritable, MLReadable):
    • class NaiveBayesModel(JavaModel, MLWritable, MLReadable):
    • class PipelineMLWriter(JavaMLWriter, JavaWrapper):
    • class PipelineMLReader(JavaMLReader):
    • class PipelineModelMLWriter(JavaMLWriter, JavaWrapper):
    • class PipelineModelMLReader(JavaMLReader):
    • case class SQLTable(

@rxin
Contributor

rxin commented Mar 17, 2016

Merging in master!

@asfgit asfgit closed this in de1a84e Mar 17, 2016
@JoshRosen JoshRosen deleted the automatically-pick-best-serializer branch March 17, 2016 06:07
asfgit pushed a commit that referenced this pull request Mar 22, 2016
Building on the `SerializerManager` introduced in SPARK-13926 / #11755, this patch modifies Spark's BlockManager to use RDDs' ClassTags in order to select the best serializer to use when caching RDD blocks.

When storing a local block, the BlockManager `put()` methods use implicits to record ClassTags and stores those tags in the blocks' BlockInfo records. When reading a local block, the stored ClassTag is used to pick the appropriate serializer. When a block is stored with replication, the class tag is written into the block transfer metadata and will also be stored in the remote BlockManager.

There are two or three places where we don't properly pass ClassTags, including TorrentBroadcast and BlockRDD. I think this happens to work because the missing ClassTag always happens to be `ClassTag.Any`, but it might be worth looking more carefully at those places to see whether we should be more explicit.
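The ClassTag threading described above can be sketched like this. This is a simplified illustration with hypothetical stand-in types (`BlockInfo` and the store here are not the real Spark classes):

```scala
import scala.reflect.ClassTag

// Hypothetical sketch: a context-bound ClassTag is captured implicitly at
// the put() call site and stored in the block's metadata, so that the
// matching serializer can be chosen at read time.
final case class BlockInfo(tag: ClassTag[_], bytes: Array[Byte])

class TinyBlockStore {
  private val blocks = scala.collection.mutable.Map.empty[String, BlockInfo]

  // The [T: ClassTag] context bound records the caller's element type
  // without the caller having to pass it explicitly.
  def put[T: ClassTag](id: String, bytes: Array[Byte]): Unit =
    blocks(id) = BlockInfo(implicitly[ClassTag[T]], bytes)

  // At read time, the stored tag drives serializer selection.
  def tagFor(id: String): Option[ClassTag[_]] = blocks.get(id).map(_.tag)
}
```

Note that a caller that doesn't supply a concrete type (as in the TorrentBroadcast and BlockRDD cases mentioned below) ends up recording `ClassTag.Any`, which is why those gaps happen to be harmless.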

Author: Josh Rosen <joshrosen@databricks.com>

Closes #11801 from JoshRosen/pick-best-serializer-for-caching.
roygao94 pushed two commits to roygao94/spark that referenced this pull request Mar 22, 2016