
[SPARK-18990][SQL] make DatasetBenchmark fairer for Dataset #16391

Closed · wants to merge 4 commits
75 changes: 42 additions & 33 deletions sql/core/src/test/scala/org/apache/spark/sql/DatasetBenchmark.scala
@@ -17,7 +17,6 @@

package org.apache.spark.sql

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.expressions.scalalang.typed
import org.apache.spark.sql.functions._
@@ -34,11 +33,13 @@ object DatasetBenchmark {
def backToBackMap(spark: SparkSession, numRows: Long, numChains: Int): Benchmark = {
import spark.implicits._

val df = spark.range(1, numRows).select($"id".as("l"), $"id".cast(StringType).as("s"))
val rdd = spark.sparkContext.range(0, numRows)
val ds = spark.range(0, numRows)
val df = ds.toDF("l")
val func = (l: Long) => l + 1

val benchmark = new Benchmark("back-to-back map", numRows)
val func = (d: Data) => Data(d.l + 1, d.s)

val rdd = spark.sparkContext.range(1, numRows).map(l => Data(l, l.toString))
benchmark.addCase("RDD") { iter =>
var res = rdd
var i = 0
@@ -53,14 +54,14 @@
var res = df
var i = 0
while (i < numChains) {
res = res.select($"l" + 1 as "l", $"s")
res = res.select($"l" + 1 as "l")
i += 1
}
res.queryExecution.toRdd.foreach(_ => Unit)
}

benchmark.addCase("Dataset") { iter =>
var res = df.as[Data]
var res = ds.as[Long]
var i = 0
while (i < numChains) {
res = res.map(func)
@@ -75,14 +76,14 @@
def backToBackFilter(spark: SparkSession, numRows: Long, numChains: Int): Benchmark = {
import spark.implicits._

val df = spark.range(1, numRows).select($"id".as("l"), $"id".cast(StringType).as("s"))
val rdd = spark.sparkContext.range(0, numRows)
val ds = spark.range(0, numRows)
val df = ds.toDF("l")
val func = (l: Long, i: Int) => l % (100L + i) == 0L
val funcs = 0.until(numChains).map { i => (l: Long) => func(l, i) }

val benchmark = new Benchmark("back-to-back filter", numRows)
val func = (d: Data, i: Int) => d.l % (100L + i) == 0L
val funcs = 0.until(numChains).map { i =>
(d: Data) => func(d, i)
}

val rdd = spark.sparkContext.range(1, numRows).map(l => Data(l, l.toString))
benchmark.addCase("RDD") { iter =>
var res = rdd
var i = 0
@@ -104,7 +105,7 @@
}

benchmark.addCase("Dataset") { iter =>
var res = df.as[Data]
var res = ds.as[Long]
var i = 0
while (i < numChains) {
res = res.filter(funcs(i))
@@ -133,24 +134,29 @@
def aggregate(spark: SparkSession, numRows: Long): Benchmark = {
import spark.implicits._

val df = spark.range(1, numRows).select($"id".as("l"), $"id".cast(StringType).as("s"))
val rdd = spark.sparkContext.range(0, numRows)
val ds = spark.range(0, numRows)
val df = ds.toDF("l")

val benchmark = new Benchmark("aggregate", numRows)
Member: It would be good to update aggregate.


val rdd = spark.sparkContext.range(1, numRows).map(l => Data(l, l.toString))
benchmark.addCase("RDD sum") { iter =>
rdd.aggregate(0L)(_ + _.l, _ + _)
rdd.map(l => (l % 10, l)).reduceByKey(_ + _).foreach(_ => Unit)
Contributor Author: This also tests the grouping performance, not only aggregation.

Member: Is there any reason to add a grouping operation?

Contributor Author: Aggregation without grouping is not a common use case.

Member: I see.

Contributor: Actually, I think we should also have a test case for aggregation without group by.
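A minimal sketch of what such a case could look like against the new Long-based ds (hypothetical, not part of this diff), mirroring the removed df.as[Data].select(typed.sumLong(...)) case:

benchmark.addCase("Dataset sum without groupBy") { iter =>
  // hypothetical case: typed aggregation over the whole Dataset, without grouping
  ds.as[Long].select(typed.sumLong[Long](identity)).queryExecution.toRdd.foreach(_ => Unit)
}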

}

benchmark.addCase("DataFrame sum") { iter =>
df.select(sum($"l")).queryExecution.toRdd.foreach(_ => Unit)
df.groupBy($"l" % 10).agg(sum($"l")).queryExecution.toRdd.foreach(_ => Unit)
}

benchmark.addCase("Dataset sum using Aggregator") { iter =>
df.as[Data].select(typed.sumLong((d: Data) => d.l)).queryExecution.toRdd.foreach(_ => Unit)
val result = ds.as[Long].groupByKey(_ % 10).agg(typed.sumLong[Long](identity))
result.queryExecution.toRdd.foreach(_ => Unit)
}

val complexDs = df.select($"l", $"l".cast(StringType).as("s")).as[Data]
benchmark.addCase("Dataset complex Aggregator") { iter =>
df.as[Data].select(ComplexAggregator.toColumn).queryExecution.toRdd.foreach(_ => Unit)
val result = complexDs.groupByKey(_.l % 10).agg(ComplexAggregator.toColumn)
result.queryExecution.toRdd.foreach(_ => Unit)
}

benchmark
@@ -170,36 +176,39 @@ object DatasetBenchmark {
val benchmark3 = aggregate(spark, numRows)

/*
OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 3.10.0-327.18.2.el7.x86_64
Intel Xeon E3-12xx v2 (Ivy Bridge)
Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.12.1
Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz

back-to-back map: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
RDD 3448 / 3646 29.0 34.5 1.0X
DataFrame 2647 / 3116 37.8 26.5 1.3X
Dataset 4781 / 5155 20.9 47.8 0.7X
RDD 3963 / 3976 25.2 39.6 1.0X
DataFrame 826 / 834 121.1 8.3 4.8X
Dataset 5178 / 5198 19.3 51.8 0.8X
Contributor Author: For "back-to-back map", the logic is so simple that the code generated by the Dataset is less efficient than the RDD version. The RDD just adds 1 to the input Long; the only overhead is boxing. The Dataset generates code like this:

boolean mapelements_isNull = true;
long mapelements_value = -1L;
if (!false) {
  mapelements_argValue = range_value;
  mapelements_isNull = false;
  if (!mapelements_isNull) {
    Object mapelements_funcResult = null;
    mapelements_funcResult = mapelements_obj.apply(mapelements_argValue);
    if (mapelements_funcResult == null) {
      mapelements_isNull = true;
    } else {
      mapelements_value = (Long) mapelements_funcResult;
    }
  }
}

The Dataset still has the boxing overhead, but its code is more verbose, and it also has to write the long to an unsafe row at the end, which is another overhead. These are the reasons why the Dataset is slower than the RDD in this simple case.

Member (@kiszk, Dec 24, 2016): IIUC, the signature of apply() is Object apply(Object), which introduces additional boxing overhead from long to Long. To reduce this boxing and unboxing overhead, we need a more concrete signature (e.g. long apply(long)) to call the lambda function.
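For illustration, a small standalone Scala snippet contrasting these two call paths (plain Scala, no Spark; the object name and values are made up):

object BoxingDemo {
  def main(args: Array[String]): Unit = {
    val f: Long => Long = _ + 1

    // Generic path: Function1 erases to Object apply(Object); the caller has to
    // pass a boxed java.lang.Long and receives a boxed result back.
    val generic = f.asInstanceOf[AnyRef => AnyRef]
    val boxed = generic(java.lang.Long.valueOf(41L))

    // Specialized path: with the static type Long => Long, scalac dispatches to
    // the primitive long apply$mcJJ$sp(long), so no boxing is needed.
    val primitive: Long = f(41L)

    println(s"$boxed $primitive")
  }
}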

Contributor Author: The method signature in Dataset is def map[U : Encoder](f: T => U); unless we create primitive-version methods, e.g. def map(f: T => Long), I can't think of an easy way to get the concrete signature.

BTW, I think the best solution is to analyze the byte code (class file) of the lambda function and turn it into expressions.

Member (@kiszk, Dec 25, 2016): I noticed that the Scala compiler automatically generates a primitive version. Current Spark eventually calls the primitive version through the generic Object apply(Object).

Here is a simple example. When we compile the following Dataset program, we can see the class below generated by scalac. Scalac automatically generates a primitive version int apply$mcII$sp(int) that can be called through int apply(int). We could infer this signature in Catalyst for simple cases.

Of course, I totally agree that the best solution is to analyze the byte code and turn it into expressions. This was already prototyped. Do you think it is a good time to make this prototype more robust now?

test("ds") {
  val ds = sparkContext.parallelize((1 to 10), 1).toDS
  ds.map(i => i * 7).show
}

$ javap -c Test\$\$anonfun\$5\$\$anonfun\$apply\$mcV\$sp\$1.class
Compiled from "Test.scala"
public final class org.apache.spark.sql.Test$$anonfun$5$$anonfun$apply$mcV$sp$1 extends scala.runtime.AbstractFunction1$mcII$sp implements scala.Serializable {
  public static final long serialVersionUID;

  public final int apply(int);
    Code:
       0: aload_0
       1: iload_1
       2: invokevirtual #18                 // Method apply$mcII$sp:(I)I
       5: ireturn

  public int apply$mcII$sp(int);
    Code:
       0: iload_1
       1: bipush        7
       3: imul
       4: ireturn

  public final java.lang.Object apply(java.lang.Object);
    Code:
       0: aload_0
       1: aload_1
       2: invokestatic  #29                 // Method scala/runtime/BoxesRunTime.unboxToInt:(Ljava/lang/Object;)I
       5: invokevirtual #31                 // Method apply:(I)I
       8: invokestatic  #35                 // Method scala/runtime/BoxesRunTime.boxToInteger:(I)Ljava/lang/Integer;
      11: areturn

  public org.apache.spark.sql.Test$$anonfun$5$$anonfun$apply$mcV$sp$1(org.apache.spark.sql.Test$$anonfun$5);
    Code:
       0: aload_0
       1: invokespecial #42                 // Method scala/runtime/AbstractFunction1$mcII$sp."<init>":()V
       4: return
}

Contributor Author: Ah, the Scala compiler is smart! I think we can create a ticket to optimize this, i.e. call the primitive apply version, and then update the benchmark result.

For byte code analysis, let's discuss it in that ticket later.

Member: Sure, I will create a JIRA ticket for this optimization.

For byte code analysis, let's restart the discussion in that JIRA entry.

*/
benchmark.run()

/*
OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 3.10.0-327.18.2.el7.x86_64
Intel Xeon E3-12xx v2 (Ivy Bridge)
Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.12.1
Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz

back-to-back filter: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
RDD 1346 / 1618 74.3 13.5 1.0X
DataFrame 59 / 72 1695.4 0.6 22.8X
Dataset 2777 / 2805 36.0 27.8 0.5X
RDD 533 / 587 187.6 5.3 1.0X
DataFrame 79 / 91 1269.0 0.8 6.8X
Dataset 550 / 559 181.7 5.5 1.0X
Contributor Author: For "back-to-back filter", the Dataset deserializes the input row to an object and applies the condition function. When the deserialization becomes a no-op, the Dataset runs almost the same code as the RDD case, so RDD and Dataset have similar performance here.

*/
benchmark2.run()

/*
Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.12.1
Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz

aggregate: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
RDD sum 1913 / 1942 52.3 19.1 1.0X
DataFrame sum 46 / 61 2157.7 0.5 41.3X
Dataset sum using Aggregator 4656 / 4758 21.5 46.6 0.4X
Dataset complex Aggregator 6636 / 7039 15.1 66.4 0.3X
RDD sum 2297 / 2440 43.5 23.0 1.0X
DataFrame sum 630 / 637 158.7 6.3 3.6X
Dataset sum using Aggregator 3129 / 3247 32.0 31.3 0.7X
Dataset complex Aggregator 12109 / 12142 8.3 121.1 0.2X
*/
benchmark3.run()
}
Expand Down