
[SPARK-2443][SQL] Fix slow read from partitioned tables #1390

Closed
wants to merge 2 commits

Conversation

concretevitamin
Contributor

This simply incorporates Shark's #329 into Spark SQL. Implementation credit to @chiragaggarwal.

@marmbrus @rxin @chenghao-intel

Benchmarks

Generated a local text file with 10M rows of simple key-value pairs. The data is loaded as a table through Hive. Results are obtained on my local machine using hive/console.

Without the fix:

Type | Non-partitioned | Partitioned (1 part)
------------ | ------------ | -------------
First run | 9.52s end-to-end (1.64s Spark job) | 36.6s (28.3s)
Stabilized runs | 1.21s (1.18s) | 27.6s (27.5s)

With this fix:

Type | Non-partitioned | Partitioned (1 part)
------------ | ------------ | -------------
First run | 9.57s (1.46s) | 9.30s (1.45s)
Stabilized runs | 1.13s (1.10s) | 1.18s (1.15s)
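For context, a minimal sketch of how such a run could be driven from hive/console, assuming a tab-delimited input file; the table name, path, and query here are hypothetical, since the exact statements were not included above.

```
// Hypothetical sketch only: the actual DDL, file path, and query behind
// these numbers were not included in this description.
hql("CREATE TABLE kv (key INT, value STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'")
hql("LOAD DATA LOCAL INPATH '/tmp/kv_10m.txt' INTO TABLE kv")  // assumed path

def time[A](body: => A): A = {
  val start = System.currentTimeMillis()
  val result = body
  println(s"${System.currentTimeMillis() - start} ms")
  result
}

// The first run pays warm-up costs; repeated runs correspond to the
// "Stabilized runs" rows above.
time { hql("SELECT COUNT(*) FROM kv").collect() }
```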

@concretevitamin
Contributor Author

Jenkins, test this please.

@SparkQA

SparkQA commented Jul 13, 2014

QA tests have started for PR 1390. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16598/consoleFull

@SparkQA

SparkQA commented Jul 13, 2014

QA results for PR 1390:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16598/consoleFull

@concretevitamin
Contributor Author

Jenkins, retest this please.

@SparkQA

SparkQA commented Jul 13, 2014

QA tests have started for PR 1390. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16599/consoleFull

@SparkQA

SparkQA commented Jul 13, 2014

QA results for PR 1390:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16599/consoleFull

```
  val broadcastedHiveConf = _broadcastedHiveConf
  val localDeserializer = partDeserializer

  val hivePartitionRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)
- hivePartitionRDD.mapPartitions { iter =>
+ hivePartitionRDD.mapPartitions { case iter =>
```
Contributor

is the pattern matching here necessary?

Contributor Author

I initially thought that in a function context, `{ case x => ... }` would be optimized to `{ x => ... }`. I ran `scalac -print` on a simple program to confirm that this is not the case.

Contributor

oh really? how does the generated bytecode differ?

Contributor Author

https://gist.github.com/concretevitamin/272fe413dcc06b8cbe9c

It seems the `case` version does generate more instructions.
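For reference, a minimal self-contained sketch of the two forms being compared here (hypothetical example, not the PR's code); compiling each with `scalac -print` shows the `case` variant going through an extra pattern match on the argument:

```
object CaseLambda {
  val xs = List(1, 2, 3)

  // Plain lambda: the generated Function1.apply returns x + 1 directly.
  val plain = xs.map { x => x + 1 }

  // Pattern-matching lambda: still a Function1, but the body is compiled as
  // a match on the argument, so the generated apply carries extra
  // pattern-matching instructions even for this trivial, irrefutable case.
  val cased = xs.map { case x => x + 1 }
}
```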

@rxin
Contributor

rxin commented Jul 13, 2014

@yhuai can you take a look?

@yhuai
Contributor

yhuai commented Jul 13, 2014

I am reviewing it. Will comment later today.

@chenghao-intel
Contributor

The code looks good to me. However, I think we can avoid the workaround (deserializing with the partition SerDe and then serializing again with the table SerDe) that adapts to the higher-level table scan (TableScanOperator in Shark), which has to provide a unique ObjectInspector to the downstream operators.

Unlike TableScanOperator, HiveTableScan in Spark-Hive doesn't rely on an ObjectInspector, and its output type is GenericMutableRow, so I think we could do the object conversion (from raw type to Row object) directly.
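A minimal sketch of the direct conversion being suggested, assuming the partition's ObjectInspector is a StructObjectInspector; the names here are illustrative, not the PR's code:

```
import scala.collection.JavaConverters._

import org.apache.hadoop.hive.serde2.objectinspector.{PrimitiveObjectInspector, StructObjectInspector}

// Illustrative only: pull field values straight out of the raw SerDe object
// via the partition's ObjectInspector, instead of deserializing with the
// partition SerDe and re-serializing with the table SerDe.
def rawToValues(raw: AnyRef, oi: StructObjectInspector): Seq[Any] =
  oi.getAllStructFieldRefs.asScala.map { fieldRef =>
    val data = oi.getStructFieldData(raw, fieldRef)
    fieldRef.getFieldObjectInspector match {
      case p: PrimitiveObjectInspector => p.getPrimitiveJavaObject(data)
      case _ => data // complex types left unconverted in this sketch
    }
  }
```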

@chenghao-intel
Contributor

And since the Hive SerDe actually provides lazy parsing, we need to support column pruning when converting raw objects to Rows.

Sorry, these are just some high-level comments.

@yhuai
Contributor

yhuai commented Jul 14, 2014

@chenghao-intel I am not sure I understand your comment on column pruning. I think for a Hive table, we should use ColumnProjectionUtils to set needed columns. So, RCFile and ORC can just read needed columns from HDFS.
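For illustration, a small sketch of what setting the needed columns could look like; `setNeededColumns` is a hypothetical helper, and the exact ColumnProjectionUtils method names vary across Hive versions:

```
import java.util.{ArrayList => JArrayList}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils

// Sketch only: record the ids of the columns a scan actually needs so that
// columnar inputs (RCFile, ORC) can skip the other columns at read time.
def setNeededColumns(conf: Configuration, columnIds: Seq[Int]): Unit = {
  val ids = new JArrayList[Integer]()
  columnIds.foreach(id => ids.add(id))
  ColumnProjectionUtils.appendReadColumnIDs(conf, ids)
}
```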

```
case _: LazyStruct => convertedRow
case _: HiveColumnarStruct => convertedRow
case _ => tableSerDe.deserialize(
  tableSerDe.asInstanceOf[Serializer].serialize(convertedRow, tblConvertedOI))
```
Contributor

As mentioned by @chenghao-intel, can we avoid it?

@yhuai
Contributor

yhuai commented Jul 14, 2014

In general, I suggest adding more comments to explain what we are doing here, because this part of the code is pretty Hive-specific.

@marmbrus
Contributor

Thanks for reviewing this, everyone. I'm all for commenting and cleaning things up here, but if possible I'd like to merge this in today. There are a couple of people blocked on this, as it's a pretty severe performance bug. How about we just add some TODOs that can be taken care of in a follow-up PR?

@concretevitamin
Contributor Author

@yhuai suggested a much simpler fix -- I benchmarked it and it gave the same performance boost. I am closing this and opening a new PR.

@concretevitamin
Contributor Author

New PR here: #1408

asfgit pushed a commit that referenced this pull request Jul 14, 2014
This fix obtains a performance boost comparable to [PR #1390](#1390) by moving an array update and deserializer initialization out of a potentially very long loop. Suggested by yhuai. The results below are updated for this fix.

## Benchmarks
Generated a local text file with 10M rows of simple key-value pairs. The data is loaded as a table through Hive. Results are obtained on my local machine using hive/console.

Without the fix:

Type | Non-partitioned | Partitioned (1 part)
------------ | ------------ | -------------
First run | 9.52s end-to-end (1.64s Spark job) | 36.6s (28.3s)
Stabilized runs | 1.21s (1.18s) | 27.6s (27.5s)

With this fix:

Type | Non-partitioned | Partitioned (1 part)
------------ | ------------ | -------------
First run | 9.57s (1.46s) | 11.0s (1.69s)
Stabilized runs | 1.13s (1.10s) | 1.23s (1.19s)

Author: Zongheng Yang <zongheng.y@gmail.com>

Closes #1408 from concretevitamin/slow-read-2 and squashes the following commits:

d86e437 [Zongheng Yang] Move update & initialization out of potentially long loop.
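For readers following along, a self-contained sketch of the pattern behind this simpler fix, with illustrative names rather than Spark's actual code: hoist loop-invariant work (deserializer creation, partition-value updates) out of the per-row loop.

```
object HoistInvariantWork {
  class Deserializer { def deserialize(v: String): String = v } // stand-in for a Hive SerDe

  // Before: a deserializer is constructed on every row, dominating the loop.
  def slowScan(iter: Iterator[String]): Iterator[String] =
    iter.map { v => new Deserializer().deserialize(v) }

  // After: construct it once per partition and reuse it inside the loop.
  // The same applies to writing partition-column values into the output row;
  // they are constant within a partition, so update the row once up front.
  def fastScan(iter: Iterator[String]): Iterator[String] = {
    val deserializer = new Deserializer()
    iter.map(deserializer.deserialize)
  }
}
```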
asfgit pushed a commit that referenced this pull request Jul 14, 2014

(cherry picked from commit d60b09b)
Signed-off-by: Michael Armbrust <michael@databricks.com>
harishreedharan pushed a commit to harishreedharan/spark that referenced this pull request Jul 29, 2014
In HiveTableScan.scala, one ObjectInspector was created for all of the partition-based records, which can cause a ClassCastException if the object inspector is not identical between the table and its partitions.

This is the follow-up to:
apache#1408
apache#1390

I ran a micro-benchmark locally with 15,000,000 records in total, and got the results below:

With This Patch  |  Partition-Based Table  |  Non-Partition-Based Table
------------ | ------------- | -------------
No  |  1927 ms  |  1885 ms
Yes  | 1541 ms  |  1524 ms

It shows this patch also improves performance.

PS: the benchmark code is attached below. (Thanks liancheng.)
```
package org.apache.spark.sql.hive

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql._

object HiveTableScanPrepare extends App {
  case class Record(key: String, value: String)

  val sparkContext = new SparkContext(
    new SparkConf()
      .setMaster("local")
      .setAppName(getClass.getSimpleName.stripSuffix("$")))

  val hiveContext = new LocalHiveContext(sparkContext)

  val rdd = sparkContext.parallelize((1 to 3000000).map(i => Record(s"$i", s"val_$i")))

  import hiveContext._

  hql("SHOW TABLES")
  hql("DROP TABLE if exists part_scan_test")
  hql("DROP TABLE if exists scan_test")
  hql("DROP TABLE if exists records")
  rdd.registerAsTable("records")

  hql("""CREATE TABLE part_scan_test (key STRING, value STRING) PARTITIONED BY (part1 string, part2 STRING)
                 | ROW FORMAT SERDE
                 | 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe'
                 | STORED AS RCFILE
               """.stripMargin)
  hql("""CREATE TABLE scan_test (key STRING, value STRING)
                 | ROW FORMAT SERDE
                 | 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe'
                 | STORED AS RCFILE
               """.stripMargin)

  for (part1 <- 2000 until 2001) {
    for (part2 <- 1 to 5) {
      hql(s"""from records
                 | insert into table part_scan_test PARTITION (part1='$part1', part2='2010-01-$part2')
                 | select key, value
               """.stripMargin)
      hql(s"""from records
                 | insert into table scan_test select key, value
               """.stripMargin)
    }
  }
}

object HiveTableScanTest extends App {
  val sparkContext = new SparkContext(
    new SparkConf()
      .setMaster("local")
      .setAppName(getClass.getSimpleName.stripSuffix("$")))

  val hiveContext = new LocalHiveContext(sparkContext)

  import hiveContext._

  hql("SHOW TABLES")
  val part_scan_test = hql("select key, value from part_scan_test")
  val scan_test = hql("select key, value from scan_test")

  val r_part_scan_test = (0 to 5).map(i => benchmark(part_scan_test))
  val r_scan_test = (0 to 5).map(i => benchmark(scan_test))
  println("Scanning Partition-Based Table")
  r_part_scan_test.foreach(printResult)
  println("Scanning Non-Partition-Based Table")
  r_scan_test.foreach(printResult)

  def printResult(result: (Long, Long)) {
    println(s"Duration: ${result._1} ms Result: ${result._2}")
  }

  def benchmark(srdd: SchemaRDD) = {
    val begin = System.currentTimeMillis()
    val result = srdd.count()
    val end = System.currentTimeMillis()
    ((end - begin), result)
  }
}
```

Author: Cheng Hao <hao.cheng@intel.com>

Closes apache#1439 from chenghao-intel/hadoop_table_scan and squashes the following commits:

888968f [Cheng Hao] Fix issues in code style
27540ba [Cheng Hao] Fix the TableScan Bug while partition serde differs
40a24a7 [Cheng Hao] Add Unit Test