[SPARK-21839][SQL] Support SQL config for ORC compression #19055
dongjoon-hyun wants to merge 10 commits into apache:master from dongjoon-hyun:SPARK-21839
Conversation
|
Test build #81139 has finished for PR 19055 at commit
|
|
Hi, @cloud-fan and @gatorsmile . |
|
Test build #81140 has finished for PR 19055 at commit
|
|
FYI, this PR can be reviewed before the other ORC PRs. |
|
Hi, @gatorsmile . |
|
|
/**
 * Compression codec to use. By default snappy compression.
 * Compression codec to use. By default use the value specified in SQLConf.
This is confusing. You can remove "By default use the value specified in SQLConf."
Sure, I will. Historically, I brought this from ParquetOptions.
val compressionCodecClassName: String = {
  // `orc.compress` is an ORC configuration. So, here we respect this as an option but
  // `compression` has higher precedence than `orc.compress`. It means if both are set,
  // we will use `compression`.
Instead, update this paragraph to explain the priority.
    "uncompressed, snappy, zlib.")
  .stringConf
  .transform(_.toLowerCase(Locale.ROOT))
  .checkValues(Set("uncompressed", "snappy", "zlib"))
Why is this inconsistent with `shortOrcCompressionCodecNames`?
// The ORC compression short names
private val shortOrcCompressionCodecNames = Map(
"none" -> "NONE",
"uncompressed" -> "NONE",
"snappy" -> "SNAPPY",
"zlib" -> "ZLIB",
"lzo" -> "LZO")
}
lzo will be added later. For none, I thought its usage in SQLConf is discouraged intentionally, like in Parquet. I wanted to show the same behavior as Parquet.
I added "none" here. Thanks!
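The short-name table above can be sketched in Python to show the case-insensitive normalization and the `none`/`uncompressed` aliasing discussed here. This is a hypothetical illustration, not Spark's actual Scala code; the function name and error message are made up for the example.

```python
# Hypothetical Python rendering of shortOrcCompressionCodecNames; not Spark code.
SHORT_ORC_COMPRESSION_CODEC_NAMES = {
    "none": "NONE",
    "uncompressed": "NONE",  # both user-facing names map to ORC's NONE codec
    "snappy": "SNAPPY",
    "zlib": "ZLIB",
    "lzo": "LZO",
}

def to_orc_codec(name: str) -> str:
    # User-facing names are matched case-insensitively.
    try:
        return SHORT_ORC_COMPRESSION_CODEC_NAMES[name.lower()]
    except KeyError:
        raise ValueError(f"Codec [{name}] is not available.")
```

Under this sketch, `to_orc_codec("Uncompressed")` and `to_orc_codec("none")` both resolve to `NONE`, which is why the config's `checkValues` set and this table need to stay in sync.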
|
I am not familiar with ORC. The above is just a quick look at the changes made in this PR. |
val compressionCodecClassName: String = {
  // `orc.compress` is an ORC configuration. So, here we respect this as an option but
  // `compression` has higher precedence than `orc.compress`. It means if both are set,
  // we will use `compression`.
I guess we should update here too:
and I think we should prioritise `compression` > `spark.sql.orc.compression.codec` > `orc.compress` to be consistent.
Ur, there is a technical issue.
`spark.sql.orc.compression.codec` has a default value, `snappy`. So, `orc.compress` cannot be used in that order.
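The resolution order the thread settles on can be sketched as follows. This is a hypothetical Python illustration of the logic, not Spark's actual API; the point is that the SQL config carries the default value, so it must sit last in the chain, otherwise `orc.compress` would never be consulted.

```python
# Hypothetical sketch of the option-resolution order:
# `compression` option > `orc.compress` option > spark.sql.orc.compression.codec.
SQL_CONF_DEFAULT = "snappy"  # default of spark.sql.orc.compression.codec

def resolve_compression(options: dict, sql_conf_codec: str = SQL_CONF_DEFAULT) -> str:
    # Fall through left to right; the SQL config is always set (it has a
    # default), so putting it anywhere but last would shadow `orc.compress`.
    codec = (options.get("compression")
             or options.get("orc.compress")
             or sql_conf_codec)
    return codec.lower()
```

For example, `resolve_compression({"compression": "none", "orc.compress": "ZLIB"})` yields `none`, matching the stated precedence.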
    "uncompressed, snappy, zlib.")
  .stringConf
  .transform(_.toLowerCase(Locale.ROOT))
  .checkValues(Set("uncompressed", "snappy", "zlib"))
Hm, but lzo still seeks org.apache.hadoop.hive.ql.io.orc.LzoCodec though. Wouldn't it be better to leave it as it is intended for now, if I understood correctly? It should be rare, but disallowing lzo actually closes off the chance of a custom use allowed via the .option() API.
I think none was introduced as a compression key for .option() first for Parquet, and it was matched for consistency. I think we have somehow kept none specific to the .option() API.
Yep. "none" is added back. "lzo" is not supported; it fails in the current master branch. We have an ignored test case for that LZO failure.
 * Acceptable values are defined in [[shortOrcCompressionCodecNames]].
 */
val compressionCodec: String = {
val compressionCodecClassName: String = {
I guess you matched this to ParquetOptions but actually I think it's ParquetOptions that should be changed. I found this minor nit only after it got merged - #15580 (comment)
Oh, may I change Parquet at this time?
Anyway, I'll revert this for the other data sources.
|
Thank you for review, @gatorsmile and @HyukjinKwon . I'll update the PR. |
|
Test build #81178 has finished for PR 19055 at commit
|
|
Thank you again, @gatorsmile and @HyukjinKwon . I updated the PR. |
    "uncompressed, snappy, zlib.")
  .stringConf
  .transform(_.toLowerCase(Locale.ROOT))
  .checkValues(Set("none", "uncompressed", "snappy", "zlib"))
@dongjoon-hyun, I think my only main concern is the inconsistency between the compression option and this config. If lzo were an unknown key and it directly threw an exception (or even if this had not been there in the first place), I would have been okay, but it looks like it attempts to find org.apache.hadoop.hive.ql.io.orc.LzoCodec:
java.lang.IllegalArgumentException: LZO is not available.
at org.apache.hadoop.hive.ql.io.orc.WriterImpl.createCodec(WriterImpl.java:331)
at org.apache.hadoop.hive.ql.io.orc.WriterImpl.<init>(WriterImpl.java:201)
at org.apache.hadoop.hive.ql.io.orc.OrcFile.createWriter(OrcFile.java:464)
at org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat$OrcRecordWriter.write(OrcOutputFormat.java:74)
at org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat$OrcRecordWriter.write(OrcOutputFormat.java:55)
...
...
case LZO:
try {
Class<? extends CompressionCodec> lzo =
(Class<? extends CompressionCodec>)
JavaUtils.loadClass("org.apache.hadoop.hive.ql.io.orc.LzoCodec");
return lzo.newInstance();
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException("LZO is not available.", e);
} catch (InstantiationException e) {
throw new IllegalArgumentException("Problem initializing LZO", e);
} catch (IllegalAccessException e) {
throw new IllegalArgumentException("Insufficient access to LZO", e);
}
...
This appears to mean that if we provide org.apache.hadoop.hive.ql.io.orc.LzoCodec in the classpath somehow (as an extreme case, one implemented by a user), it should work, if I read this correctly. This should be rare, though.
Does your set of ORC-related PRs eventually support lzo correctly?
I see. You mean user-provided Hive or libraries. It makes sense. I will add lzo too here. Thank you for pointing that out.
|
Test build #81199 has finished for PR 19055 at commit
|
|
Retest this please. |
|
|
test("SPARK-21839: Add SQL config for ORC compression") {
  val conf = sqlContext.sessionState.conf
  assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == "SNAPPY")
Add a comment here to explain the test scenario.
// to test the default of spark.sql.orc.compression.codec is snappy
    new OrcOptions(Map("orc.compress" -> "zlib"), conf).compressionCodec == "ZLIB")
}
|
|
Seq("NONE", "SNAPPY", "ZLIB", "LZO").foreach { c =>
// to test all the valid options of spark.sql.orc.compression.codec
"none", "uncompressed", "snappy", "zlib", "lzo".
You missed one of them.
Ur, it's intentional. It's tested above; "UNCOMPRESSED" is replaced with "NONE", so I omitted it here.
To add that, we need an if condition. I'll add it.
withSQLConf(SQLConf.ORC_COMPRESSION.key -> "uncompressed") {
  assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == "NONE")
  assert(
    new OrcOptions(Map("orc.compress" -> "zlib"), conf).compressionCodec == "ZLIB")
Also please add another scenario where users specify `compression`.
|
Thank you for review again, @gatorsmile . I fixed them. |
// Test all the valid options of spark.sql.orc.compression.codec
Seq("NONE", "UNCOMPRESSED", "SNAPPY", "ZLIB", "LZO").foreach { c =>
  withSQLConf(SQLConf.ORC_COMPRESSION.key -> c) {
    if (c == "UNCOMPRESSED") {
val expected = if ... else c
assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == expected)
Yep! It's much better.
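The suggested refactor can be sketched as follows. This is a hypothetical Python rendering of the Scala test loop, not the actual test; `UNCOMPRESSED` is the only name whose expected codec differs from its own upper-cased form, so computing `expected` once avoids branching inside the assertion.

```python
# Hypothetical rendering of the suggested test refactor: compute `expected`
# once with a conditional expression instead of an if/else around the assert.
NORMALIZED = {"NONE": "NONE", "UNCOMPRESSED": "NONE",
              "SNAPPY": "SNAPPY", "ZLIB": "ZLIB", "LZO": "LZO"}

results = []
for c in ["NONE", "UNCOMPRESSED", "SNAPPY", "ZLIB", "LZO"]:
    expected = "NONE" if c == "UNCOMPRESSED" else c
    # One assertion shape covers all five codec names.
    results.append(NORMALIZED[c] == expected)
```

Every entry in `results` is true, which mirrors the single-assert form `assert(... == expected)` proposed above.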
|
Test build #81218 has finished for PR 19055 at commit
|
|
Test build #81219 has finished for PR 19055 at commit
|
|
Test build #81220 has finished for PR 19055 at commit
|
HyukjinKwon left a comment:
LGTM except for minor comments. @gatorsmile, do you maybe have more comments?
package org.apache.spark.sql.hive.orc

import java.io.File
import java.util.Locale
  .get("compression")
  .orElse(orcCompressionConf)
  .getOrElse("snappy").toLowerCase(Locale.ROOT)
  .getOrElse(sqlConf.orcCompressionCodec)
Could we update the default values for consistency with the Parquet one?:
spark/python/pyspark/sql/readwriter.py
Line 855 in 51620e2
The default value is snappy, isn't it?
.createWithDefault("snappy")
I was thinking like:
spark/python/pyspark/sql/readwriter.py
Lines 751 to 753 in 51620e2
Wouldn't we use the value set in spark.sql.parquet.compression.codec by default if compression is unset via option API?
Actually, I thought the purpose of this configuration is rather for setting the default compression codec for ORC datasource ..
Yes. This is the priority. If `compression` and `orc.compress` are unset via option, we use SQLConf:
`compression` -> `orc.compress` -> `spark.sql.orc.compression.codec`
The main purpose of this PR is to support users to control ORC compression by using SQLConf, too.
The default codec is unchanged and the priority is the same. Also, all previous user-given options are respected.
Ah, it looks like I should have been clearer. I meant fixing the comment for the default value from snappy to spark.sql.orc.compression.codec.
Thank you! I'll fix that.
|
Thank you very much for review, @HyukjinKwon ! 👍 |
|
According to your review comments, I updated the comment , too. |
|
Test build #81244 has finished for PR 19055 at commit
|
|
Test build #81246 has finished for PR 19055 at commit
|
|
Test build #81247 has finished for PR 19055 at commit
|
// `orc.compress` is an ORC configuration. So, here we respect this as an option but
// `compression` has higher precedence than `orc.compress`. It means if both are set,
// we will use `compression`.
// `compression`, `orc.compress`, and `spark.sql.orc.compression.codec` is used in order.
is used in order -> are in order of precedence from highest to lowest
|
LGTM except a minor comment. |
|
Thank you, @gatorsmile . I fixed it, too. |
|
Test build #81265 has finished for PR 19055 at commit
|
|
Merged to master. |
|
Thank you for merging, @HyukjinKwon ! |
|
Also, thank you again, @gatorsmile ! |
What changes were proposed in this pull request?

This PR aims to support `spark.sql.orc.compression.codec` like Parquet's `spark.sql.parquet.compression.codec`. Users can use SQLConf to control ORC compression, too.

How was this patch tested?

Pass the Jenkins with new and updated test cases.