[SPARK-17930][CORE] The SerializerInstance instance used when deserializing a TaskResult is not reused #15512

Closed
witgo wants to merge 2 commits into apache:master from witgo:SPARK-17930

Conversation

@witgo (Contributor) commented Oct 17, 2016

What changes were proposed in this pull request?

The following code is called when a DirectTaskResult instance is deserialized:

  def value(): T = {
    if (valueObjectDeserialized) {
      valueObject
    } else {
      // Each deserialization creates a new instance of SerializerInstance, which is very time-consuming
      val resultSer = SparkEnv.get.serializer.newInstance()
      valueObject = resultSer.deserialize(valueBytes)
      valueObjectDeserialized = true
      valueObject
    }
  }

When a stage has many tasks, reusing the SerializerInstance can improve scheduling performance by roughly a factor of three.
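
A minimal sketch of the reuse (names such as taskResultSerializer are illustrative; this shows the direction of the change, not necessarily its exact code): cache one SerializerInstance per result-fetching thread and pass it into value(), so newInstance() is no longer called once per task result.

  // One SerializerInstance per fetching thread. SerializerInstance is not
  // thread-safe, so a ThreadLocal gives reuse without locking.
  private val taskResultSerializer = new ThreadLocal[SerializerInstance] {
    override def initialValue(): SerializerInstance =
      SparkEnv.get.serializer.newInstance()
  }

  // value() accepts an optional caller-provided instance and only falls
  // back to newInstance() when none is supplied:
  def value(resultSer: SerializerInstance = null): T = {
    if (valueObjectDeserialized) {
      valueObject
    } else {
      val ser = if (resultSer == null) SparkEnv.get.serializer.newInstance() else resultSer
      valueObject = ser.deserialize(valueBytes)
      valueObjectDeserialized = true
      valueObject
    }
  }

  // A call site in the result getter would then become:
  //   directResult.value(taskResultSerializer.get())

The ThreadLocal matters because task results are deserialized on a thread pool, so a single shared instance would not be safe.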

The test data is TPC-DS 2T (Parquet), and the SQL statement is as follows (query 2):

select  i_item_id, 
        avg(ss_quantity) agg1,
        avg(ss_list_price) agg2,
        avg(ss_coupon_amt) agg3,
        avg(ss_sales_price) agg4 
 from store_sales, customer_demographics, date_dim, item, promotion
 where ss_sold_date_sk = d_date_sk and
       ss_item_sk = i_item_sk and
       ss_cdemo_sk = cd_demo_sk and
       ss_promo_sk = p_promo_sk and
       cd_gender = 'M' and 
       cd_marital_status = 'M' and
       cd_education_status = '4 yr Degree' and
       (p_channel_email = 'N' or p_channel_event = 'N') and
       d_year = 2001 
 group by i_item_id
 order by i_item_id
 limit 100;

spark-defaults.conf file:

spark.master                           yarn-client
spark.executor.instances               20
spark.driver.memory                    16g
spark.executor.memory                  30g
spark.executor.cores                   5
spark.default.parallelism              100 
spark.sql.shuffle.partitions           100000 
spark.serializer                       org.apache.spark.serializer.KryoSerializer
spark.driver.maxResultSize              0
spark.rpc.netty.dispatcher.numThreads   8
spark.executor.extraJavaOptions          -XX:+UseG1GC -XX:+UseStringDeduplication -XX:G1HeapRegionSize=16M -XX:MetaspaceSize=256M 
spark.cleaner.referenceTracking.blocking true
spark.cleaner.referenceTracking.blocking.shuffle true

Performance test results are as follows:

SPARK-17930 | ed14633
----------- | -------
54.5 s | 231.7 s

How was this patch tested?

Existing tests.

@witgo witgo changed the title The SerializerInstance instance used when deserializing a TaskResult is not reused [SPARK-17930][CORE]The SerializerInstance instance used when deserializing a TaskResult is not reused Oct 17, 2016
@srowen (Member) commented Oct 17, 2016

Hm, if the benchmark you give generalizes, that is certainly compelling. I guess I'm surprised that instantiating the object can be so expensive relative to deserialization, since it only happens once per task.

But it is a fairly simple change. A ThreadLocal avoids thread-safety issues, though I do wonder whether the serializers can hold on to state that would make this a source of memory leaks?

Maybe ... @squito or @zsxwing has a thought?


SparkQA commented Oct 17, 2016

Test build #67063 has finished for PR 15512 at commit 037871d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito (Contributor) commented Oct 17, 2016

serializing will create buffers, but since these are only used for deserializing, I don't think there should even be any buffers created. I guess the time saved is all the registration which can be skipped? https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala#L85. I suppose in this case, this is the result of spark.sql.shuffle.partitions=100000, so you end up calling this method a lot.

The only wrinkle I can see here is if reference-tracking is turned on (which it is, by default). But I think this is taken care of anyway by the way borrowKryo works.
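
For reference, the borrow pattern mentioned above looks roughly like this (paraphrased from KryoSerializerInstance in KryoSerializer.scala; the exact code varies by version):

  // KryoSerializerInstance caches at most one Kryo and resets it before
  // every reuse, so reference-tracking state from a previous use cannot
  // leak into the next one.
  private[serializer] def borrowKryo(): Kryo = {
    if (cachedKryo != null) {
      val kryo = cachedKryo
      kryo.reset() // clears auto-reset state, including the reference table
      cachedKryo = null
      kryo
    } else {
      ks.newKryo()
    }
  }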

@squito (Contributor) commented Oct 17, 2016

Jenkins, retest this please

@zsxwing (Member) left a comment

Looks pretty good, just some nits.

Member:

nit: please keep the comment.

@witgo (Contributor, author):

Done.

Member:

nit: I prefer the following code, because Option(...).getOrElse doesn't improve readability but creates unnecessary objects.

  // resultSer here is the (possibly null) instance passed in by the caller;
  // the local needs a different name so it does not shadow the parameter
  val ser = if (resultSer == null) SparkEnv.get.serializer.newInstance() else resultSer
  valueObject = ser.deserialize(valueBytes)
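
For contrast, the Option form being rejected would be something like (illustrative):

  // Allocates a Some(...) wrapper on every call just to branch on null:
  val ser = Option(resultSer).getOrElse(SparkEnv.get.serializer.newInstance())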

@witgo (Contributor, author):

Done.


SparkQA commented Oct 17, 2016

Test build #67065 has finished for PR 15512 at commit 037871d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor:

nit: Would be nice to add a comment here saying "force deserialization of referenced value" or some such
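
In context, that might look something like the following (illustrative; taskResultSerializer and deserializedResult are names assumed from the sketch above, not necessarily the patch's own):

  // force deserialization of referenced value
  val result = deserializedResult.value(taskResultSerializer.get())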

@witgo (Contributor, author):

Done.

@witgo (Contributor, author) commented Oct 18, 2016

@squito I also think that the time saved is all the registration that can be skipped, but I did not verify it.
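
An unscientific way to check that hypothesis (a standalone sketch, not part of the patch): time newInstance(), which builds a Kryo and runs every registration, against reusing a single instance.

  import org.apache.spark.SparkConf
  import org.apache.spark.serializer.KryoSerializer

  val ser = new KryoSerializer(new SparkConf())

  // Cost of creating (and using) a fresh SerializerInstance each time:
  val t0 = System.nanoTime()
  (1 to 10000).foreach(_ => ser.newInstance().serialize(1))
  println(s"10000 x newInstance(): ${(System.nanoTime() - t0) / 1e6} ms")

  // Cost of reusing one instance across all calls:
  val shared = ser.newInstance()
  val t1 = System.nanoTime()
  (1 to 10000).foreach(_ => shared.serialize(1))
  println(s"10000 x reuse:         ${(System.nanoTime() - t1) / 1e6} ms")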


SparkQA commented Oct 18, 2016

Test build #67102 has finished for PR 15512 at commit 7d73691.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing (Member) commented Oct 18, 2016

LGTM. Merging to master. Thanks!

@asfgit asfgit closed this in 4518642 Oct 18, 2016
@witgo witgo deleted the SPARK-17930 branch October 19, 2016 01:21
zzcclp added a commit to zzcclp/spark that referenced this pull request Oct 20, 2016
robert3005 pushed a commit to palantir/spark that referenced this pull request Nov 1, 2016
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017