Skip to content

Conversation

@chenghao-intel
Copy link
Contributor

Aggregation function min/max in catalyst will create expression tree for each single row, however, the expression tree creation is quite expensive in a multithreading env currently. Hence we got a very bad performance for the min/max.
Here is the benchmark that I've done in my local.

Master Previous Result (ms) Current Result (ms)
local 3645 3416
local[6] 3602 1002

The Benchmark source code.

case class Record(key: Int, value: Int)

object TestHive2 extends HiveContext(new SparkContext("local[6]", "TestSQLContext", new SparkConf()))

object DataPrepare extends App {
  import TestHive2._

  val rdd = sparkContext.parallelize((1 to 10000000).map(i => Record(i % 3000, i)), 12)

  runSqlHive("SHOW TABLES")
  runSqlHive("DROP TABLE if exists a")
  runSqlHive("DROP TABLE if exists result")
  rdd.registerAsTable("records")

  runSqlHive("""CREATE TABLE a (key INT, value INT)
                 | ROW FORMAT SERDE 
                 | 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe' 
                 | STORED AS RCFILE
               """.stripMargin)
  runSqlHive("""CREATE TABLE result (key INT, value INT)
                 | ROW FORMAT SERDE 
                 | 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe' 
                 | STORED AS RCFILE
               """.stripMargin)

  hql(s"""from records 
             | insert into table a
             | select key, value
           """.stripMargin)
}

object PerformanceTest extends App {
  import TestHive2._

  hql("SHOW TABLES")
  hql("set spark.sql.shuffle.partitions=12")

  val cmd = "select min(value), max(value) from a group by key"

  val results = ("Result1", benchmark(cmd)) :: 
                ("Result2", benchmark(cmd)) :: 
                ("Result3", benchmark(cmd)) :: Nil
  results.foreach { case (prompt, result) => {
      println(s"$prompt: took ${result._1} ms (${result._2} records)")
    }
  }

  def benchmark(cmd: String) = {
    val begin = System.currentTimeMillis()
    val count = hql(cmd).count
    val end = System.currentTimeMillis()
    ((end - begin), count)
  }
}

@SparkQA
Copy link

SparkQA commented Aug 25, 2014

QA tests have started for PR 2113 at commit 03c6d4f.

  • This patch merges cleanly.

@chenghao-intel chenghao-intel changed the title [SPARK-3197] [SQL] Reduce the Expressions creation for aggregation function (min/max) [SPARK-3197] [SQL] Reduce the Expression tree object creations for aggregation function (min/max) Aug 25, 2014
@SparkQA
Copy link

SparkQA commented Aug 25, 2014

QA tests have finished for PR 2113 at commit 03c6d4f.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class MutableLiteral(var value: Any, dataType: DataType, nullable: Boolean = true)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I don't think this needs to be @transient (this function will never be serialized), and it could also be a val.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, yes, you're right, I will update this.

@chenghao-intel chenghao-intel force-pushed the aggregation_expression_optimization branch from 03c6d4f to db40395 Compare August 27, 2014 00:48
@chenghao-intel
Copy link
Contributor Author

test this please

@chenghao-intel
Copy link
Contributor Author

Jenkins, retest this please.

@SparkQA
Copy link

SparkQA commented Aug 27, 2014

QA tests have started for PR 2113 at commit db40395.

  • This patch merges cleanly.

@SparkQA
Copy link

SparkQA commented Aug 27, 2014

QA tests have finished for PR 2113 at commit db40395.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class MutableLiteral(var value: Any, dataType: DataType, nullable: Boolean = true)

@marmbrus
Copy link
Contributor

Thanks! I've merged this to master and 1.1.

asfgit pushed a commit that referenced this pull request Aug 27, 2014
…gregation function (min/max)

Aggregation function min/max in catalyst will create expression tree for each single row, however, the expression tree creation is quite expensive in a multithreading env currently. Hence we got a very bad performance for the min/max.
Here is the benchmark that I've done in my local.

Master | Previous Result (ms) | Current Result (ms)
------------ | ------------- | -------------
local | 3645 | 3416
local[6] | 3602 | 1002

The Benchmark source code.
```
case class Record(key: Int, value: Int)

object TestHive2 extends HiveContext(new SparkContext("local[6]", "TestSQLContext", new SparkConf()))

object DataPrepare extends App {
  import TestHive2._

  val rdd = sparkContext.parallelize((1 to 10000000).map(i => Record(i % 3000, i)), 12)

  runSqlHive("SHOW TABLES")
  runSqlHive("DROP TABLE if exists a")
  runSqlHive("DROP TABLE if exists result")
  rdd.registerAsTable("records")

  runSqlHive("""CREATE TABLE a (key INT, value INT)
                 | ROW FORMAT SERDE
                 | 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe'
                 | STORED AS RCFILE
               """.stripMargin)
  runSqlHive("""CREATE TABLE result (key INT, value INT)
                 | ROW FORMAT SERDE
                 | 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe'
                 | STORED AS RCFILE
               """.stripMargin)

  hql(s"""from records
             | insert into table a
             | select key, value
           """.stripMargin)
}

object PerformanceTest extends App {
  import TestHive2._

  hql("SHOW TABLES")
  hql("set spark.sql.shuffle.partitions=12")

  val cmd = "select min(value), max(value) from a group by key"

  val results = ("Result1", benchmark(cmd)) ::
                ("Result2", benchmark(cmd)) ::
                ("Result3", benchmark(cmd)) :: Nil
  results.foreach { case (prompt, result) => {
      println(s"$prompt: took ${result._1} ms (${result._2} records)")
    }
  }

  def benchmark(cmd: String) = {
    val begin = System.currentTimeMillis()
    val count = hql(cmd).count
    val end = System.currentTimeMillis()
    ((end - begin), count)
  }
}
```

Author: Cheng Hao <hao.cheng@intel.com>

Closes #2113 from chenghao-intel/aggregation_expression_optimization and squashes the following commits:

db40395 [Cheng Hao] remove the transient and add val for the expression property
d56167d [Cheng Hao] Reduce the Expressions creation

(cherry picked from commit 4238c17)
Signed-off-by: Michael Armbrust <michael@databricks.com>
@asfgit asfgit closed this in 4238c17 Aug 27, 2014
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014
…gregation function (min/max)

Aggregation function min/max in catalyst will create expression tree for each single row, however, the expression tree creation is quite expensive in a multithreading env currently. Hence we got a very bad performance for the min/max.
Here is the benchmark that I've done in my local.

Master | Previous Result (ms) | Current Result (ms)
------------ | ------------- | -------------
local | 3645 | 3416
local[6] | 3602 | 1002

The Benchmark source code.
```
case class Record(key: Int, value: Int)

object TestHive2 extends HiveContext(new SparkContext("local[6]", "TestSQLContext", new SparkConf()))

object DataPrepare extends App {
  import TestHive2._

  val rdd = sparkContext.parallelize((1 to 10000000).map(i => Record(i % 3000, i)), 12)

  runSqlHive("SHOW TABLES")
  runSqlHive("DROP TABLE if exists a")
  runSqlHive("DROP TABLE if exists result")
  rdd.registerAsTable("records")

  runSqlHive("""CREATE TABLE a (key INT, value INT)
                 | ROW FORMAT SERDE
                 | 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe'
                 | STORED AS RCFILE
               """.stripMargin)
  runSqlHive("""CREATE TABLE result (key INT, value INT)
                 | ROW FORMAT SERDE
                 | 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe'
                 | STORED AS RCFILE
               """.stripMargin)

  hql(s"""from records
             | insert into table a
             | select key, value
           """.stripMargin)
}

object PerformanceTest extends App {
  import TestHive2._

  hql("SHOW TABLES")
  hql("set spark.sql.shuffle.partitions=12")

  val cmd = "select min(value), max(value) from a group by key"

  val results = ("Result1", benchmark(cmd)) ::
                ("Result2", benchmark(cmd)) ::
                ("Result3", benchmark(cmd)) :: Nil
  results.foreach { case (prompt, result) => {
      println(s"$prompt: took ${result._1} ms (${result._2} records)")
    }
  }

  def benchmark(cmd: String) = {
    val begin = System.currentTimeMillis()
    val count = hql(cmd).count
    val end = System.currentTimeMillis()
    ((end - begin), count)
  }
}
```

Author: Cheng Hao <hao.cheng@intel.com>

Closes apache#2113 from chenghao-intel/aggregation_expression_optimization and squashes the following commits:

db40395 [Cheng Hao] remove the transient and add val for the expression property
d56167d [Cheng Hao] Reduce the Expressions creation
@chenghao-intel chenghao-intel deleted the aggregation_expression_optimization branch October 9, 2014 04:51
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chenghao-intel , will this cause twice evaluation of expr on the same input line? why not eval once and store in a tmp val? Did I miss something?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you're right! I think now it can be replaced by MinOf.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, just start to read aggregate code :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yjshen , the new aggregation has been merged, you have a lot more to read now :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it would be a long journey, but I could ask the author whenever I have a question ;)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants