Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-18990][SQL] make DatasetBenchmark fairer for Dataset #16391

Closed
wants to merge 4 commits into from

Conversation

cloud-fan
Copy link
Contributor

What changes were proposed in this pull request?

Currently DatasetBenchmark use case class Data(l: Long, s: String) as the record type of RDD and Dataset, which introduce serialization overhead only to Dataset and is unfair.

This PR use Long as the record type, to be fairer for Dataset

How was this patch tested?

existing tests

@@ -220,7 +220,7 @@ object DecimalLiteral {
/**
* In order to do type checking, use Literal.create() instead of constructor
*/
case class Literal (value: Any, dataType: DataType) extends LeafExpression with CodegenFallback {
case class Literal (value: Any, dataType: DataType) extends LeafExpression {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these are unrelated changes. When I looked at the generated codes, I found Literal generated verbose codes, so I simplified it a little bit.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it better to split into two PRs? To put two changes into one PR may not be easy to understand diffs of performance results.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok I reverted. The result is almost the same so I didn't update

benchmark.addCase("RDD sum") { iter =>
rdd.aggregate(0L)(_ + _.l, _ + _)
rdd.map(l => (l % 10, l)).reduceByKey(_ + _).foreach(_ => Unit)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also test the grouping performance, not only aggregating.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any reason to add grouping operation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aggregate without grouping is not a common use case

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i see

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I think we should also have a test case for aggregation without group by.

Dataset 4781 / 5155 20.9 47.8 0.7X
RDD 3963 / 3976 25.2 39.6 1.0X
DataFrame 826 / 834 121.1 8.3 4.8X
Dataset 5178 / 5198 19.3 51.8 0.8X
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for "back-to-back map", the logic is so simple that the code generated by Dataset is less efficient than RDD. RDD just adds 1 to the input Long, the only overhead is boxing, while Dataset generates code like this:

boolean mapelements_isNull = true;
long mapelements_value = -1L;
if (!false) {
  mapelements_argValue = range_value;
  mapelements_isNull = false;
  if (!mapelements_isNull) {
    Object mapelements_funcResult = null;
    mapelements_funcResult = mapelements_obj.apply(mapelements_argValue);
    if (mapelements_funcResult == null) {
      mapelements_isNull = true;
    } else {
      mapelements_value = (Long) mapelements_funcResult;
    }
  }
}

Dataset still has the boxing overhead, but its code is more verbose. And Dataset has to write the long to un unsafe row at last, which is another overhead. These are the reasons why Dataset is slower than RDD for this simple case.

Copy link
Member

@kiszk kiszk Dec 24, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, an signature of apply() is Object apply(Object). It also introduces additional boxing overhead from long to Long.
To reduce these boxing and unboxing overhead, we need to use more concrete signature (e.g. long apply(long) to call a lambda function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the method signature in Dataset is: def map[U : Encoder](f: T => U), unless we create primitive version methods, e.g. def map(f: T => Long), I can't think of an easy way to get the concrete signature.

BTW, I think the best solution is to analyze the byte code(class file) of the lambda function, and turn it into expressions.

Copy link
Member

@kiszk kiszk Dec 25, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed that Scala compiler automatically generates primitive version. Current Spark eventually calls primitive version thru generic version Object apply(Object).

Here is a simple example. When we compile the following Dataset program, we can find that the following class is generated by scalac. Scalac automatically generates a primitive version int apply$mcII$sp(int) that can be called by int apply(int).
We could infer this signature in Catalyst for simple cases.

Of course, I totally agree that the best solution is to analyze byte code and turn it into expression. This was already prototyped. Do you think it is good time to make this prototype more robust now?

test("ds") {
  val ds = sparkContext.parallelize((1 to 10), 1).toDS
  ds.map(i => i * 7).show
}

$ javap -c Test\$\$anonfun\$5\$\$anonfun\$apply\$mcV\$sp\$1.class
Compiled from "Test.scala"
public final class org.apache.spark.sql.Test$$anonfun$5$$anonfun$apply$mcV$sp$1 extends scala.runtime.AbstractFunction1$mcII$sp implements scala.Serializable {
  public static final long serialVersionUID;

  public final int apply(int);
    Code:
       0: aload_0
       1: iload_1
       2: invokevirtual #18                 // Method apply$mcII$sp:(I)I
       5: ireturn

  public int apply$mcII$sp(int);
    Code:
       0: iload_1
       1: bipush        7
       3: imul
       4: ireturn

  public final java.lang.Object apply(java.lang.Object);
    Code:
       0: aload_0
       1: aload_1
       2: invokestatic  #29                 // Method scala/runtime/BoxesRunTime.unboxToInt:(Ljava/lang/Object;)I
       5: invokevirtual #31                 // Method apply:(I)I
       8: invokestatic  #35                 // Method scala/runtime/BoxesRunTime.boxToInteger:(I)Ljava/lang/Integer;
      11: areturn

  public org.apache.spark.sql.Test$$anonfun$5$$anonfun$apply$mcV$sp$1(org.apache.spark.sql.Test$$anonfun$5);
    Code:
       0: aload_0
       1: invokespecial #42                 // Method scala/runtime/AbstractFunction1$mcII$sp."<init>":()V
       4: return
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, scala compiler is smart! I think we can create a ticket to optimize this, i.e. call the primitive apply version, and update the benchmark result.

For byte code analysis, let's discuss about it in the ticket later.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I will create a JIRA ticket for this optimization.

For byte code analysis, let's restart discuss about it the JIRA entry.

Dataset 2777 / 2805 36.0 27.8 0.5X
RDD 533 / 587 187.6 5.3 1.0X
DataFrame 79 / 91 1269.0 0.8 6.8X
Dataset 550 / 559 181.7 5.5 1.0X
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For "back-to-back filter", Dataset will deserialize the input row to an object and apply the condition function. When the deserialization becomes no-op, Dataset runs almost the same RDD code like the RDD case. So in this case, RDD and Dataset has similar performance.

RDD sum 1950 / 1995 51.3 19.5 1.0X
DataFrame sum 587 / 611 170.2 5.9 3.3X
Dataset sum using Aggregator 3014 / 3222 33.2 30.1 0.6X
Dataset complex Aggregator 32650 / 34505 3.1 326.5 0.1X
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For "aggregate", Dataset use AppendColumnsExec to generate the grouping key, which will do an extra copy(the unsafe row joiner). This makes Dataset slower than RDD.

val rdd = spark.sparkContext.range(0, numRows)
val ds = spark.range(0, numRows)
val df = ds.toDF("l")

val benchmark = new Benchmark("aggregate", numRows)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to update aggregate.

@cloud-fan
Copy link
Contributor Author

cc @yhuai @hvanhovell

@SparkQA
Copy link

SparkQA commented Dec 23, 2016

Test build #70552 has finished for PR 16391 at commit 43da0af.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class Literal (value: Any, dataType: DataType) extends LeafExpression

@SparkQA
Copy link

SparkQA commented Dec 23, 2016

Test build #70553 has finished for PR 16391 at commit ff89719.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class Literal (value: Any, dataType: DataType) extends LeafExpression with CodegenFallback

@SparkQA
Copy link

SparkQA commented Dec 27, 2016

Test build #70619 has finished for PR 16391 at commit 2a4d22d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Copy link
Contributor Author

thanks for the review, merging to master!

@asfgit asfgit closed this in a05cc42 Dec 27, 2016
@yhuai
Copy link
Contributor

yhuai commented Dec 27, 2016

Seems there is no explicit LGTM. I am reverting this change from master.

@yhuai
Copy link
Contributor

yhuai commented Dec 27, 2016

Just reverted from master.

btw, I think we can keep existing cases and then add new cases.

@inouehrs
Copy link
Contributor

I agree that it is quite nice to have multiple record types in the benchmark to reveal the source of overheads!
I also observed the relative performances of RDD, DataFrame, Dataset become quite different when only a primitive is used as the record type as discussed in an old PR (f1e49f3).

cmonkey pushed a commit to cmonkey/spark that referenced this pull request Dec 29, 2016
## What changes were proposed in this pull request?

Currently `DatasetBenchmark` use `case class Data(l: Long, s: String)` as the record type of `RDD` and `Dataset`, which introduce serialization overhead only to `Dataset` and is unfair.

This PR use `Long` as the record type, to be fairer for `Dataset`

## How was this patch tested?

existing tests

Author: Wenchen Fan <wenchen@databricks.com>

Closes apache#16391 from cloud-fan/benchmark.
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
## What changes were proposed in this pull request?

Currently `DatasetBenchmark` use `case class Data(l: Long, s: String)` as the record type of `RDD` and `Dataset`, which introduce serialization overhead only to `Dataset` and is unfair.

This PR use `Long` as the record type, to be fairer for `Dataset`

## How was this patch tested?

existing tests

Author: Wenchen Fan <wenchen@databricks.com>

Closes apache#16391 from cloud-fan/benchmark.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
5 participants