
Refactor the configuration for bloom filters #2464

Closed
asfimport opened this issue Feb 26, 2020 · 7 comments

asfimport commented Feb 26, 2020

Refactor the Hadoop configuration for bloom filters according to PARQUET-1784.

Reporter: Gabor Szadovszky / @gszadovszky
Assignee: Gabor Szadovszky / @gszadovszky

Note: This issue was originally created as PARQUET-1805. Please see the migration documentation for further details.


Yuming Wang / @wangyum:
It seems the previous configuration was better; enabling the bloom filter seriously affects write performance:

val numRows = 1024 * 1024 * 15
val df = spark.range(numRows).selectExpr(
  "id",
  "cast(id as string) as s",
  "cast(id as timestamp) as ts",
  "cast(cast(id as timestamp) as date) as td",
  "cast(id as decimal) as dec")
val benchmark = new org.apache.spark.benchmark.Benchmark(
  "Benchmark bloom filter write",
  numRows,
  minNumIters = 5)
Seq(false, true).foreach { bloomFilterEnabled =>
  val name = s"Write parquet ${if (bloomFilterEnabled) "(bloom filter)" else ""}"
  benchmark.addCase(name) { _ =>
    withSQLConf(org.apache.parquet.hadoop.ParquetOutputFormat.BLOOM_FILTER_ENABLED -> s"$bloomFilterEnabled") {
      df.write.mode("overwrite").parquet("/tmp/spark/parquet")
    }
  }
}
benchmark.run()

Java HotSpot(TM) 64-Bit Server VM 1.8.0_251-b08 on Mac OS X 10.15.7
Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
Benchmark bloom filter write:             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Write parquet                                      5531           6001         503          2.8         351.6       1.0X
Write parquet (bloom filter)                      10529          11633        1113          1.5         669.4       0.5X


Gabor Szadovszky / @gszadovszky:
@wangyum, I think this performance issue is not related to this jira but to the whole bloom filter feature (PARQUET-41). If you turn on bloom filter writing for all columns, it will impact write performance. (You may check the related configuration parameters at https://github.com/apache/parquet-mr/tree/master/parquet-hadoop for details.)

I am not an expert on this feature, and maybe we can improve the write performance, but generating bloom filters will always have some performance impact. It is up to the user to decide whether this cost is worth the potential benefit at read time. That is why it is highly recommended to specify exactly which columns require bloom filters, and to set the other bloom filter parameters as well.

@chenjunjiedada, any comments on this?


Junjie Chen / @chenjunjiedada:
I think what @wangyum is concerned about is that we enable bloom filters for all columns when parquet.bloom.filter.enabled is set to true. That behaviour is a bit odd if we have a table with a heap of columns. We could instead use parquet.bloom.filter.enabled#column.path to enable the bloom filter for a specific column after setting parquet.bloom.filter.enabled.


Gabor Szadovszky / @gszadovszky:
Oh, I see, thanks @chenjunjiedada. I felt it was more logical this way: the "major" configuration applies to all columns, and the "column-specific" one overrides it. Since the "major" one is false by default, you only need to enable the bloom filters for the required columns one by one. You do not even need to set parquet.bloom.filter.enabled; the column-specific keys alone are enough. We have tried to describe this in the README.
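
To illustrate, a minimal configuration under this scheme would be just the column-specific keys (assuming columns named ts and dec, as in the benchmark above; the global switch defaults to false, so it can be omitted entirely):

set parquet.bloom.filter.enabled#ts=true;
set parquet.bloom.filter.enabled#dec=true;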


Yuming Wang / @wangyum:
Thank you @gszadovszky @chenjunjiedada. This is what I want:

set parquet.bloom.filter.enabled=false;
set parquet.bloom.filter.enabled#ts=true;
set parquet.bloom.filter.enabled#dec=true;

Benchmark code and results:

val numRows = 1024 * 1024 * 15
val df = spark.range(numRows).selectExpr(
  "id",
  "cast(id as string) as s",
  "cast(id as timestamp) as ts",
  "cast(cast(id as timestamp) as date) as td",
  "cast(id as decimal) as dec")
val benchmark = new org.apache.spark.benchmark.Benchmark(
  "Benchmark bloom filter write",
  numRows,
  minNumIters = 5)

benchmark.addCase("default") { _ =>
  withSQLConf() {
    df.write.mode("overwrite").parquet("/tmp/spark/parquet")
  }
}

benchmark.addCase("Build bloom filter for ts column") { _ =>
  withSQLConf(
    org.apache.parquet.hadoop.ParquetOutputFormat.BLOOM_FILTER_ENABLED -> "false",
    org.apache.parquet.hadoop.ParquetOutputFormat.BLOOM_FILTER_ENABLED + "#ts" -> "true") {
    df.write.mode("overwrite").parquet("/tmp/spark/parquet")
  }
}

benchmark.addCase("Build bloom filter for ts and dec column") { _ =>
  withSQLConf(
    org.apache.parquet.hadoop.ParquetOutputFormat.BLOOM_FILTER_ENABLED -> "false",
    org.apache.parquet.hadoop.ParquetOutputFormat.BLOOM_FILTER_ENABLED + "#ts" -> "true",
    org.apache.parquet.hadoop.ParquetOutputFormat.BLOOM_FILTER_ENABLED + "#dec" -> "true") {
    df.write.mode("overwrite").parquet("/tmp/spark/parquet")
  }
}

benchmark.addCase("Build bloom filter for all column") { _ =>
  withSQLConf(
    org.apache.parquet.hadoop.ParquetOutputFormat.BLOOM_FILTER_ENABLED -> "true") {
    df.write.mode("overwrite").parquet("/tmp/spark/parquet")
  }
}
benchmark.run()

Java HotSpot(TM) 64-Bit Server VM 1.8.0_251-b08 on Mac OS X 10.15.7
Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
Benchmark bloom filter write:             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
default                                            5207           5314          72          3.0         331.1       1.0X
Build bloom filter for ts column                   5808           6065         245          2.7         369.2       0.9X
Build bloom filter for ts and dec column           6685           6776          79          2.4         425.0       0.8X
Build bloom filter for all column                  9077           9889         629          1.7         577.1       0.6X

cc @dongjoon-hyun


Gabor Szadovszky / @gszadovszky:
@wangyum, sorry, but I don't understand what the problem is here. Could you please describe it in more detail? (If you think it is really a bug, please create a separate jira.)


Yuming Wang / @wangyum:
Thank you @gszadovszky. No issue for now.
