From 2d55a61c7b5e59f2442fa20d8dc5bec8eceda650 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Sat, 6 Aug 2016 11:05:25 +0900
Subject: [PATCH 1/7] [SPARK-16610][SQL] Do not ignore `orc.compress` when `compression` option is unset

---
 .../spark/sql/hive/orc/OrcFileFormat.scala |  3 +-
 .../spark/sql/hive/orc/OrcOptions.scala    | 28 +++++++++++++------
 .../spark/sql/hive/orc/OrcQuerySuite.scala | 23 +++++++++++++++
 3 files changed, 45 insertions(+), 9 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index 1d3c4663c3399..98ff08f9e1836 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -67,10 +67,11 @@ private[sql] class OrcFileFormat
       job: Job,
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
-    val orcOptions = new OrcOptions(options)
     val configuration = job.getConfiguration
 
+    val orcOptions = new OrcOptions(options, configuration)
+
     configuration.set(OrcRelation.ORC_COMPRESSION, orcOptions.compressionCodec)
     configuration match {
       case conf: JobConf =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
index 91cf0dc960d58..797e37e910407 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
@@ -17,27 +17,39 @@
 
 package org.apache.spark.sql.hive.orc
 
+import org.apache.hadoop.conf.Configuration
+
 /**
  * Options for the ORC data source.
  */
-private[orc] class OrcOptions(
-    @transient private val parameters: Map[String, String])
+private[orc] class OrcOptions(
+    @transient private val parameters: Map[String, String],
+    @transient private val conf: Configuration)
   extends Serializable {
 
   import OrcOptions._
 
   /**
-   * Compression codec to use. By default snappy compression.
+   * Compression codec to use. By default use the value specified in Hadoop configuration.
    * Acceptable values are defined in [[shortOrcCompressionCodecNames]].
    */
   val compressionCodec: String = {
-    val codecName = parameters.getOrElse("compression", "snappy").toLowerCase
-    if (!shortOrcCompressionCodecNames.contains(codecName)) {
-      val availableCodecs = shortOrcCompressionCodecNames.keys.map(_.toLowerCase)
-      throw new IllegalArgumentException(s"Codec [$codecName] " +
-        s"is not available. Available codecs are ${availableCodecs.mkString(", ")}.")
+    val default = conf.get(OrcRelation.ORC_COMPRESSION, "NONE")
+
+    // Because the ORC configuration value in `default` is not guaranteed to be the same
+    // with keys in `shortOrcCompressionCodecNames` in Spark, this value should not be
+    // used as the key for `shortOrcCompressionCodecNames` but just a return value.
+    parameters.get("compression") match {
+      case None => default
+      case Some(name) =>
+        val lowerCaseName = name.toLowerCase
+        if (!shortOrcCompressionCodecNames.contains(lowerCaseName)) {
+          val availableCodecs = shortOrcCompressionCodecNames.keys.map(_.toLowerCase)
+          throw new IllegalArgumentException(s"Codec [$lowerCaseName] " +
+            s"is not available. Available codecs are ${availableCodecs.mkString(", ")}.")
+        }
+        shortOrcCompressionCodecNames(lowerCaseName)
     }
-    shortOrcCompressionCodecNames(codecName)
   }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 49e963ee1294d..91d6b247baea2 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -161,6 +161,29 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
     }
   }
 
+  test("SPARK-16610: Respect orc.compress configuration when compression is unset") {
+    // Respect `orc.compress`.
+    withTempPath { file =>
+      spark.range(0, 10).write
+        .option("orc.compress", "ZLIB")
+        .orc(file.getCanonicalPath)
+      val expectedCompressionKind =
+        OrcFileOperator.getFileReader(file.getCanonicalPath).get.getCompression
+      assert("ZLIB" === expectedCompressionKind.name())
+    }
+
+    // `compression` overrides `orc.compress`.
+    withTempPath { file =>
+      spark.range(0, 10).write
+        .option("compression", "ZLIB")
+        .option("orc.compress", "SNAPPY")
+        .orc(file.getCanonicalPath)
+      val expectedCompressionKind =
+        OrcFileOperator.getFileReader(file.getCanonicalPath).get.getCompression
+      assert("ZLIB" === expectedCompressionKind.name())
+    }
+  }
+
   // Hive supports zlib, snappy and none for Hive 1.2.1.
   test("Compression options for writing to an ORC file (SNAPPY, ZLIB and NONE)") {
     withTempPath { file =>
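
The test added above doubles as user-facing documentation of the intended behavior. A minimal standalone sketch of the same interaction follows; the session setup and output paths are illustrative only and are not part of the patch:

    import org.apache.spark.sql.SparkSession

    // Sketch only: session setup and paths are assumptions for illustration.
    val spark = SparkSession.builder()
      .appName("orc-compression-sketch")
      .enableHiveSupport()
      .getOrCreate()

    // With only the ORC-native setting, files should come out ZLIB-compressed
    // after this patch (previously `orc.compress` was silently ignored).
    spark.range(0, 10).write
      .option("orc.compress", "ZLIB")
      .orc("/tmp/orc_native_setting")

    // When both are given, the Spark-level `compression` option wins.
    spark.range(0, 10).write
      .option("compression", "zlib")
      .option("orc.compress", "SNAPPY")
      .orc("/tmp/orc_both_settings")
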
From 4f2731370621e1fd9b25105a8f0184c98a7465f7 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Sat, 6 Aug 2016 11:08:28 +0900
Subject: [PATCH 2/7] Use SNAPPY as default

---
 .../main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
index 797e37e910407..fa9866c8250a4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
@@ -34,7 +34,7 @@ private[orc] class OrcOptions(
    * Acceptable values are defined in [[shortOrcCompressionCodecNames]].
    */
   val compressionCodec: String = {
-    val default = conf.get(OrcRelation.ORC_COMPRESSION, "NONE")
+    val default = conf.get(OrcRelation.ORC_COMPRESSION, "SNAPPY")
 
     // Because the ORC configuration value in `default` is not guaranteed to be the same
     // with keys in `shortOrcCompressionCodecNames` in Spark, this value should not be
From 1ad44eca2d796202c894c262efad666249a7b942 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Sat, 6 Aug 2016 11:09:59 +0900
Subject: [PATCH 3/7] Fix indentation

---
 .../main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index 98ff08f9e1836..376b045e537b4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -67,7 +67,6 @@ private[sql] class OrcFileFormat
       job: Job,
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
-
     val configuration = job.getConfiguration
 
     val orcOptions = new OrcOptions(options, configuration)

From af1a3b837a3d384ba2387e2db0b5ae975870b21a Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Sat, 6 Aug 2016 11:11:38 +0900
Subject: [PATCH 4/7] Add a comment for default value

---
 .../main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
index fa9866c8250a4..ddea379f386d5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
@@ -31,6 +31,7 @@ private[orc] class OrcOptions(
 
   /**
    * Compression codec to use. By default use the value specified in Hadoop configuration.
+   * If `orc.compress` is unset, then we use snappy.
    * Acceptable values are defined in [[shortOrcCompressionCodecNames]].
    */
   val compressionCodec: String = {
From 3f70f25546ee359d65b45cb7b0a9306303b7f523 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Mon, 8 Aug 2016 09:44:42 +0900
Subject: [PATCH 5/7] Revert the change to look up Hadoop configuration but just add an alias

---
 .../spark/sql/hive/orc/OrcFileFormat.scala |  4 +--
 .../spark/sql/hive/orc/OrcOptions.scala    | 31 ++++++------------
 2 files changed, 11 insertions(+), 24 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index 376b045e537b4..1d3c4663c3399 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -67,9 +67,9 @@ private[sql] class OrcFileFormat
       job: Job,
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
-    val configuration = job.getConfiguration
+    val orcOptions = new OrcOptions(options)
 
-    val orcOptions = new OrcOptions(options, configuration)
+    val configuration = job.getConfiguration
 
     configuration.set(OrcRelation.ORC_COMPRESSION, orcOptions.compressionCodec)
     configuration match {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
index ddea379f386d5..2b654fb6349fe 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
@@ -17,40 +17,27 @@
 
 package org.apache.spark.sql.hive.orc
 
-import org.apache.hadoop.conf.Configuration
-
 /**
  * Options for the ORC data source.
  */
-private[orc] class OrcOptions(
-    @transient private val parameters: Map[String, String],
-    @transient private val conf: Configuration)
+private[orc] class OrcOptions(@transient private val parameters: Map[String, String])
   extends Serializable {
 
   import OrcOptions._
 
   /**
-   * Compression codec to use. By default use the value specified in Hadoop configuration.
-   * If `orc.compress` is unset, then we use snappy.
+   * Compression codec to use. By default snappy compression.
    * Acceptable values are defined in [[shortOrcCompressionCodecNames]].
    */
   val compressionCodec: String = {
-    val default = conf.get(OrcRelation.ORC_COMPRESSION, "SNAPPY")
-
-    // Because the ORC configuration value in `default` is not guaranteed to be the same
-    // with keys in `shortOrcCompressionCodecNames` in Spark, this value should not be
-    // used as the key for `shortOrcCompressionCodecNames` but just a return value.
-    parameters.get("compression") match {
-      case None => default
-      case Some(name) =>
-        val lowerCaseName = name.toLowerCase
-        if (!shortOrcCompressionCodecNames.contains(lowerCaseName)) {
-          val availableCodecs = shortOrcCompressionCodecNames.keys.map(_.toLowerCase)
-          throw new IllegalArgumentException(s"Codec [$lowerCaseName] " +
-            s"is not available. Available codecs are ${availableCodecs.mkString(", ")}.")
-        }
-        shortOrcCompressionCodecNames(lowerCaseName)
+    val codecName = parameters.getOrElse(
+      "compression", parameters.getOrElse("orc.compress", "snappy")).toLowerCase
+    if (!shortOrcCompressionCodecNames.contains(codecName)) {
+      val availableCodecs = shortOrcCompressionCodecNames.keys.map(_.toLowerCase)
+      throw new IllegalArgumentException(s"Codec [$codecName] " +
+        s"is not available. Available codecs are ${availableCodecs.mkString(", ")}.")
     }
+    shortOrcCompressionCodecNames(codecName)
   }
 }
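
The reverted version keeps the whole lookup inside `OrcOptions`, treating `orc.compress` as a plain option alias rather than consulting the Hadoop configuration. A standalone sketch of just that resolution order, lifted out of the class for illustration (the helper name is ours, not the patch's):

    // Resolution order after this patch: `compression` first, then
    // `orc.compress`, then the hard-coded "snappy" default.
    def resolveCodecName(parameters: Map[String, String]): String =
      parameters.getOrElse(
        "compression", parameters.getOrElse("orc.compress", "snappy")).toLowerCase

    assert(resolveCodecName(Map.empty) == "snappy")
    assert(resolveCodecName(Map("orc.compress" -> "ZLIB")) == "zlib")
    assert(resolveCodecName(Map("compression" -> "NONE", "orc.compress" -> "ZLIB")) == "none")
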
From be04706d76a6525ef3b0f1da7d52d84d612fdebd Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Mon, 8 Aug 2016 09:49:50 +0900
Subject: [PATCH 6/7] Update the names of test cases

---
 .../scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 91d6b247baea2..b13878d578603 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -161,7 +161,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
     }
   }
 
-  test("SPARK-16610: Respect orc.compress configuration when compression is unset") {
+  test("SPARK-16610: Respect orc.compress option when compression is unset") {
     // Respect `orc.compress`.
     withTempPath { file =>
       spark.range(0, 10).write

From e4d6999ca610ca7f3d0994c312c8821d5433aa83 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Tue, 9 Aug 2016 09:40:35 +0900
Subject: [PATCH 7/7] Clean codes and add some more comments

---
 .../org/apache/spark/sql/hive/orc/OrcOptions.scala | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
index 2b654fb6349fe..c2a126d3bf9c0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
@@ -30,8 +30,14 @@ private[orc] class OrcOptions(@transient private val parameters: Map[String, Str
    * Acceptable values are defined in [[shortOrcCompressionCodecNames]].
    */
   val compressionCodec: String = {
-    val codecName = parameters.getOrElse(
-      "compression", parameters.getOrElse("orc.compress", "snappy")).toLowerCase
+    // `orc.compress` is a ORC configuration. So, here we respect this as an option but
+    // `compression` has higher precedence than `orc.compress`. It means if both are set,
+    // we will use `compression`.
+    val orcCompressionConf = parameters.get(OrcRelation.ORC_COMPRESSION)
+    val codecName = parameters
+      .get("compression")
+      .orElse(orcCompressionConf)
+      .getOrElse("snappy").toLowerCase
     if (!shortOrcCompressionCodecNames.contains(codecName)) {
       val availableCodecs = shortOrcCompressionCodecNames.keys.map(_.toLowerCase)
       throw new IllegalArgumentException(s"Codec [$codecName] " +
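
Patch 7 expresses the same precedence through `Option` chaining, which reads more directly as "first defined value wins". A standalone sketch of the equivalence, using the literal key "orc.compress" where the patch uses `OrcRelation.ORC_COMPRESSION` (the earlier patches indicate that constant corresponds to this key):

    // Same lookup as patch 5's nested getOrElse, rewritten with Option chaining:
    // orElse keeps the first defined value; getOrElse supplies the default.
    def resolveCodecName(parameters: Map[String, String]): String = {
      val orcCompressionConf = parameters.get("orc.compress")
      parameters
        .get("compression")
        .orElse(orcCompressionConf)
        .getOrElse("snappy")
        .toLowerCase
    }

    // Both spellings agree on every input, e.g. `compression` still wins:
    assert(resolveCodecName(Map("compression" -> "ZLIB", "orc.compress" -> "SNAPPY")) == "zlib")
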