From 25e92501614680bf681f641eeb5f714313f3838a Mon Sep 17 00:00:00 2001
From: Takeshi YAMAMURO
Date: Sat, 27 Feb 2016 13:38:35 +0900
Subject: [PATCH 1/6] Add a common utility class to map short names to fully-qualified codec names

---
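Note: a minimal, self-contained sketch of the pattern this patch introduces, trimmed to two codecs for illustration. The `hadoopMapper` instance and `resolve` helper below are illustrative only, not code from this patch: the base class answers None for every short name, each subsystem overrides just the codecs it supports, and unresolved names fall through unchanged, mirroring the fallback in `createCodec` and `getCodecClassName`.

  object ShortCodecMapperSketch extends App {
    // Base mapper: every short name resolves to None unless a subclass overrides it.
    class ShortCompressionCodecNameMapper {
      def get(codecName: String): Option[String] = codecName.toLowerCase match {
        case "gzip" => gzip
        case "snappy" => snappy
        case _ => None
      }
      def gzip: Option[String] = None
      def snappy: Option[String] = None
    }

    // Hypothetical subsystem mapper that supports only gzip and snappy.
    val hadoopMapper = new ShortCompressionCodecNameMapper {
      override def gzip: Option[String] = Some("org.apache.hadoop.io.compress.GzipCodec")
      override def snappy: Option[String] = Some("org.apache.hadoop.io.compress.SnappyCodec")
    }

    // Unresolved names fall through unchanged, so fully-qualified names still work.
    def resolve(name: String): String = hadoopMapper.get(name).getOrElse(name)

    assert(resolve("GZIP") == "org.apache.hadoop.io.compress.GzipCodec")
    assert(resolve("com.example.MyCodec") == "com.example.MyCodec")
  }

Using per-codec methods instead of a Map lets each subsystem state its supported set explicitly while sharing the name-matching logic in one place.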
 .../apache/spark/io/CompressionCodec.scala    | 22 +++++----
 .../scala/org/apache/spark/util/Utils.scala   | 45 +++++++++++++++++++
 .../apache/spark/sql/DataFrameWriter.scala    | 30 ++++++-------
 .../datasources/CompressionCodecs.scala       | 22 +++++----
 .../datasources/parquet/ParquetRelation.scala | 23 +++++-----
 .../datasources/text/TextSuite.scala          | 21 ++++-----
 6 files changed, 106 insertions(+), 57 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index ae014becef755..97fdc232be8ff 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -25,7 +25,7 @@ import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream}
 
 import org.apache.spark.SparkConf
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ShortCompressionCodecNameMapper, Utils}
 
 /**
  * :: DeveloperApi ::
@@ -53,10 +53,14 @@ private[spark] object CompressionCodec {
       || codec.isInstanceOf[LZ4CompressionCodec])
   }
 
-  private val shortCompressionCodecNames = Map(
-    "lz4" -> classOf[LZ4CompressionCodec].getName,
-    "lzf" -> classOf[LZFCompressionCodec].getName,
-    "snappy" -> classOf[SnappyCompressionCodec].getName)
+  /** Maps the short versions of compression codec names to fully-qualified class names. */
+  private val shortCompressionCodecNameMapper = new ShortCompressionCodecNameMapper {
+    override def lz4: Option[String] = Some(classOf[LZ4CompressionCodec].getName)
+    override def lzf: Option[String] = Some(classOf[LZFCompressionCodec].getName)
+    override def snappy: Option[String] = Some(classOf[SnappyCompressionCodec].getName)
+  }
+
+  private val shortCompressionCodecMap = shortCompressionCodecNameMapper.getAsMap
 
   def getCodecName(conf: SparkConf): String = {
     conf.get(configKey, DEFAULT_COMPRESSION_CODEC)
@@ -67,7 +71,7 @@ private[spark] object CompressionCodec {
   }
 
   def createCodec(conf: SparkConf, codecName: String): CompressionCodec = {
-    val codecClass = shortCompressionCodecNames.getOrElse(codecName.toLowerCase, codecName)
+    val codecClass = shortCompressionCodecNameMapper.get(codecName).getOrElse(codecName)
     val codec = try {
       val ctor = Utils.classForName(codecClass).getConstructor(classOf[SparkConf])
       Some(ctor.newInstance(conf).asInstanceOf[CompressionCodec])
@@ -84,10 +88,10 @@ private[spark] object CompressionCodec {
    * If it is already a short name, just return it.
    */
   def getShortName(codecName: String): String = {
-    if (shortCompressionCodecNames.contains(codecName)) {
+    if (shortCompressionCodecMap.contains(codecName)) {
       codecName
     } else {
-      shortCompressionCodecNames
+      shortCompressionCodecMap
         .collectFirst { case (k, v) if v == codecName => k }
         .getOrElse { throw new IllegalArgumentException(s"No short name for codec $codecName.") }
     }
@@ -95,7 +99,7 @@ private[spark] object CompressionCodec {
 
   val FALLBACK_COMPRESSION_CODEC = "snappy"
   val DEFAULT_COMPRESSION_CODEC = "lz4"
-  val ALL_COMPRESSION_CODECS = shortCompressionCodecNames.values.toSeq
+  val ALL_COMPRESSION_CODECS = shortCompressionCodecMap.values.toSeq
 }
 
 /**
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index e0c9bf02a1a20..5bc827f88b427 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -60,6 +60,51 @@ private[spark] object CallSite {
   val empty = CallSite("", "")
 }
 
+/** A utility class to map short compression codec names to qualified ones. */
+private[spark] class ShortCompressionCodecNameMapper {
+
+  def get(codecName: String): Option[String] = codecName.toLowerCase match {
+    case "none" => none
+    case "uncompressed" => uncompressed
+    case "bzip2" => bzip2
+    case "deflate" => deflate
+    case "gzip" => gzip
+    case "lzo" => lzo
+    case "lz4" => lz4
+    case "lzf" => lzf
+    case "snappy" => snappy
+    case _ => None
+  }
+
+  def getAsMap: Map[String, String] = {
+    Seq(
+      ("none", none),
+      ("uncompressed", uncompressed),
+      ("bzip2", bzip2),
+      ("deflate", deflate),
+      ("gzip", gzip),
+      ("lzo", lzo),
+      ("lz4", lz4),
+      ("lzf", lzf),
+      ("snappy", snappy)
+    ).flatMap { case (shortCodecName, codecName) =>
+      if (codecName.isDefined) Some(shortCodecName, codecName.get) else None
+    }.toMap
+  }
+
+  // To support short codec names, derived classes need to override the methods below that return
+  // corresponding qualified codec names.
+  def none: Option[String] = None
+  def uncompressed: Option[String] = None
+  def bzip2: Option[String] = None
+  def deflate: Option[String] = None
+  def gzip: Option[String] = None
+  def lzo: Option[String] = None
+  def lz4: Option[String] = None
+  def lzf: Option[String] = None
+  def snappy: Option[String] = None
+}
+
 /**
  * Various utility methods used by Spark.
  */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index d6bdd3d825565..c5839fc3b6488 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -1,19 +1,19 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package org.apache.spark.sql
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala
index bc8ef4ad7e236..abfbd09366be8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala
@@ -19,23 +19,28 @@ package org.apache.spark.sql.execution.datasources
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.SequenceFile.CompressionType
-import org.apache.hadoop.io.compress.{BZip2Codec, GzipCodec, Lz4Codec, SnappyCodec}
+import org.apache.hadoop.io.compress.{BZip2Codec, DeflateCodec, GzipCodec, Lz4Codec, SnappyCodec}
+import org.apache.spark.util.ShortCompressionCodecNameMapper
 
 import org.apache.spark.util.Utils
 
 private[datasources] object CompressionCodecs {
-  private val shortCompressionCodecNames = Map(
-    "bzip2" -> classOf[BZip2Codec].getName,
-    "gzip" -> classOf[GzipCodec].getName,
-    "lz4" -> classOf[Lz4Codec].getName,
-    "snappy" -> classOf[SnappyCodec].getName)
+
+  /** Maps the short versions of compression codec names to fully-qualified class names. */
+  private val hadoopShortCodecNameMapper = new ShortCompressionCodecNameMapper {
+    override def bzip2: Option[String] = Some(classOf[BZip2Codec].getCanonicalName)
+    override def deflate: Option[String] = Some(classOf[DeflateCodec].getCanonicalName)
+    override def gzip: Option[String] = Some(classOf[GzipCodec].getCanonicalName)
+    override def lz4: Option[String] = Some(classOf[Lz4Codec].getCanonicalName)
+    override def snappy: Option[String] = Some(classOf[SnappyCodec].getCanonicalName)
+  }
 
   /**
    * Return the full version of the given codec class.
    * If it is already a class name, just return it.
   */
   def getCodecClassName(name: String): String = {
-    val codecName = shortCompressionCodecNames.getOrElse(name.toLowerCase, name)
+    val codecName = hadoopShortCodecNameMapper.get(name).getOrElse(name)
     try {
       // Validate the codec name
       Utils.classForName(codecName)
@@ -43,7 +48,8 @@ private[datasources] object CompressionCodecs {
     } catch {
       case e: ClassNotFoundException =>
         throw new IllegalArgumentException(s"Codec [$codecName] " +
-          s"is not available. Known codecs are ${shortCompressionCodecNames.keys.mkString(", ")}.")
+          s"is not available. Known codecs are " +
+          s"${hadoopShortCodecNameMapper.getAsMap.keys.mkString(", ")}.")
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 184cbb2f296b0..d0a52781bfb27 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -50,7 +50,7 @@ import org.apache.spark.sql.execution.datasources.{PartitionSpec, _}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{DataType, StructType}
-import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.util.{SerializableConfiguration, ShortCompressionCodecNameMapper, Utils}
 
 private[sql] class DefaultSource extends BucketedHadoopFsRelationProvider with DataSourceRegister {
 
@@ -284,10 +284,8 @@ private[sql] class ParquetRelation(
     conf.set(
       ParquetOutputFormat.COMPRESSION,
       ParquetRelation
-        .shortParquetCompressionCodecNames
-        .getOrElse(
-          sqlContext.conf.parquetCompressionCodec.toUpperCase,
-          CompressionCodecName.UNCOMPRESSED).name())
+        .parquetShortCodecNameMapper.get(sqlContext.conf.parquetCompressionCodec)
+        .getOrElse(CompressionCodecName.UNCOMPRESSED.name()))
 
     new BucketedOutputWriterFactory {
       override def newInstance(
@@ -903,11 +901,12 @@ private[sql] object ParquetRelation extends Logging {
     }
   }
 
-  // The parquet compression short names
-  val shortParquetCompressionCodecNames = Map(
-    "NONE" -> CompressionCodecName.UNCOMPRESSED,
-    "UNCOMPRESSED" -> CompressionCodecName.UNCOMPRESSED,
-    "SNAPPY" -> CompressionCodecName.SNAPPY,
-    "GZIP" -> CompressionCodecName.GZIP,
-    "LZO" -> CompressionCodecName.LZO)
+  /** Maps the short versions of compression codec names to qualified compression names. */
+  val parquetShortCodecNameMapper = new ShortCompressionCodecNameMapper {
+    override def none: Option[String] = Some(CompressionCodecName.UNCOMPRESSED.name())
+    override def uncompressed: Option[String] = Some(CompressionCodecName.UNCOMPRESSED.name())
+    override def gzip: Option[String] = Some(CompressionCodecName.GZIP.name())
+    override def lzo: Option[String] = Some(CompressionCodecName.LZO.name())
+    override def snappy: Option[String] = Some(CompressionCodecName.SNAPPY.name())
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
index 6ae42a30fb00c..0337ead894525 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.text
 
-import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
+import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.util.Utils
@@ -58,18 +58,13 @@ class TextSuite extends QueryTest with SharedSQLContext {
   }
 
   test("SPARK-13503 Support to specify the option for compression codec for TEXT") {
-    val df = sqlContext.read.text(testFile).withColumnRenamed("value", "adwrasdf")
-
-    val tempFile = Utils.createTempDir()
-    tempFile.delete()
-    df.write
-      .option("compression", "gZiP")
-      .text(tempFile.getCanonicalPath)
-    val compressedFiles = tempFile.listFiles()
-    assert(compressedFiles.exists(_.getName.endsWith(".gz")))
-    verifyFrame(sqlContext.read.text(tempFile.getCanonicalPath))
-
-    Utils.deleteRecursively(tempFile)
+    Seq("bzip2", "deflate", "gzip").map { codecName =>
+      val tempDir = Utils.createTempDir()
+      val tempDirPath = tempDir.getAbsolutePath()
+      val df = sqlContext.read.text(testFile)
+      df.write.option("compression", codecName).mode(SaveMode.Overwrite).text(tempDirPath)
+      verifyFrame(sqlContext.read.text(tempDirPath))
+    }
   }
 
   private def testFile: String = {

From 2d7737b4afb82835173e463a7b8063eee64d644f Mon Sep 17 00:00:00 2001
From: Takeshi YAMAMURO
Date: Sat, 27 Feb 2016 14:26:09 +0900
Subject: [PATCH 2/6] Fix style errors

---
 .../spark/sql/execution/datasources/CompressionCodecs.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala
index abfbd09366be8..6fdebb5e3b2f6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala
@@ -20,9 +20,8 @@ package org.apache.spark.sql.execution.datasources
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.{BZip2Codec, DeflateCodec, GzipCodec, Lz4Codec, SnappyCodec}
-import org.apache.spark.util.ShortCompressionCodecNameMapper
 
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ShortCompressionCodecNameMapper, Utils}
 
 private[datasources] object CompressionCodecs {

From da879a6ab9b58f10fc5c2e2313bde3faccbe9bfa Mon Sep 17 00:00:00 2001
From: Takeshi YAMAMURO
Date: Sat, 27 Feb 2016 23:52:25 +0900
Subject: [PATCH 3/6] Add a test for an illegal codec given in an option

---
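Note: a rough sketch of the behavior this test pins down, under the assumption of a plain short-name map (the one-entry `shortNames` map here is illustrative; the real set lives in CompressionCodecs). An unknown name misses the short-name lookup, fails class loading, and surfaces as an IllegalArgumentException that lists the known codecs:

  object IllegalCodecSketch extends App {
    // Illustrative single entry; not the real mapping.
    val shortNames = Map("bzip2" -> "org.apache.hadoop.io.compress.BZip2Codec")

    def getCodecClassName(name: String): String = {
      val codecName = shortNames.getOrElse(name.toLowerCase, name)
      try {
        Class.forName(codecName) // validate that the resolved class is loadable
        codecName
      } catch {
        case _: ClassNotFoundException =>
          throw new IllegalArgumentException(s"Codec [$codecName] is not available. " +
            s"Known codecs are ${shortNames.keys.mkString(", ")}.")
      }
    }

    // Prints: Codec [illegal] is not available. Known codecs are bzip2.
    try getCodecClassName("illegal")
    catch { case e: IllegalArgumentException => println(e.getMessage) }
  }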
 .../sql/execution/datasources/text/TextSuite.scala | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
index 0337ead894525..3277dca9d5c54 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
@@ -58,13 +58,21 @@ class TextSuite extends QueryTest with SharedSQLContext {
   }
 
   test("SPARK-13503 Support to specify the option for compression codec for TEXT") {
+    val testDf = sqlContext.read.text(testFile)
+
     Seq("bzip2", "deflate", "gzip").map { codecName =>
       val tempDir = Utils.createTempDir()
       val tempDirPath = tempDir.getAbsolutePath()
-      val df = sqlContext.read.text(testFile)
-      df.write.option("compression", codecName).mode(SaveMode.Overwrite).text(tempDirPath)
+      testDf.write.option("compression", codecName).mode(SaveMode.Overwrite).text(tempDirPath)
       verifyFrame(sqlContext.read.text(tempDirPath))
     }
+
+    val errMsg = intercept[IllegalArgumentException] {
+      val tempDirPath = Utils.createTempDir().getAbsolutePath()
+      testDf.write.option("compression", "illegal").mode(SaveMode.Overwrite).text(tempDirPath)
+    }
+    assert(errMsg.getMessage === "Codec [illegal] is not available. " +
+      "Known codecs are bzip2, deflate, lz4, gzip, snappy.")
   }
 
   private def testFile: String = {

From c3f0140292561f6ccb3a66f366c0a589167bd0b6 Mon Sep 17 00:00:00 2001
From: Takeshi YAMAMURO
Date: Tue, 1 Mar 2016 17:27:18 +0900
Subject: [PATCH 4/6] Revert code

---
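Note: this is not a byte-for-byte revert. The restored `shortParquetCompressionCodecNames` now uses lowercase keys ("none", "gzip", ...) where the original map used uppercase ones, while the lookup site still calls `.toUpperCase` — PATCH 6/6 repairs that mismatch. A minimal sketch of the failure mode (map contents shortened to plain strings for illustration):

  object CaseMismatchSketch extends App {
    // Stand-in for shortParquetCompressionCodecNames, with lowercase keys.
    val codecs = Map("gzip" -> "GZIP", "snappy" -> "SNAPPY")

    // With .toUpperCase the lookup always misses and silently falls back:
    assert(codecs.getOrElse("gzip".toUpperCase, "UNCOMPRESSED") == "UNCOMPRESSED")
    // With .toLowerCase (PATCH 6/6) the configured codec is found:
    assert(codecs.getOrElse("GZip".toLowerCase, "UNCOMPRESSED") == "GZIP")
  }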
 .../apache/spark/io/CompressionCodec.scala    | 22 ++++-----
 .../scala/org/apache/spark/util/Utils.scala   | 45 -------------------
 .../datasources/CompressionCodecs.scala       | 22 ++++-----
 .../datasources/parquet/ParquetRelation.scala | 23 +++++-----
 4 files changed, 30 insertions(+), 82 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 97fdc232be8ff..ae014becef755 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -25,7 +25,7 @@ import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream}
 
 import org.apache.spark.SparkConf
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.util.{ShortCompressionCodecNameMapper, Utils}
+import org.apache.spark.util.Utils
 
 /**
  * :: DeveloperApi ::
@@ -53,14 +53,10 @@ private[spark] object CompressionCodec {
       || codec.isInstanceOf[LZ4CompressionCodec])
   }
 
-  /** Maps the short versions of compression codec names to fully-qualified class names. */
-  private val shortCompressionCodecNameMapper = new ShortCompressionCodecNameMapper {
-    override def lz4: Option[String] = Some(classOf[LZ4CompressionCodec].getName)
-    override def lzf: Option[String] = Some(classOf[LZFCompressionCodec].getName)
-    override def snappy: Option[String] = Some(classOf[SnappyCompressionCodec].getName)
-  }
-
-  private val shortCompressionCodecMap = shortCompressionCodecNameMapper.getAsMap
+  private val shortCompressionCodecNames = Map(
+    "lz4" -> classOf[LZ4CompressionCodec].getName,
+    "lzf" -> classOf[LZFCompressionCodec].getName,
+    "snappy" -> classOf[SnappyCompressionCodec].getName)
 
   def getCodecName(conf: SparkConf): String = {
     conf.get(configKey, DEFAULT_COMPRESSION_CODEC)
@@ -71,7 +67,7 @@ private[spark] object CompressionCodec {
   }
 
   def createCodec(conf: SparkConf, codecName: String): CompressionCodec = {
-    val codecClass = shortCompressionCodecNameMapper.get(codecName).getOrElse(codecName)
+    val codecClass = shortCompressionCodecNames.getOrElse(codecName.toLowerCase, codecName)
     val codec = try {
       val ctor = Utils.classForName(codecClass).getConstructor(classOf[SparkConf])
       Some(ctor.newInstance(conf).asInstanceOf[CompressionCodec])
@@ -88,10 +84,10 @@ private[spark] object CompressionCodec {
    * If it is already a short name, just return it.
    */
   def getShortName(codecName: String): String = {
-    if (shortCompressionCodecMap.contains(codecName)) {
+    if (shortCompressionCodecNames.contains(codecName)) {
       codecName
     } else {
-      shortCompressionCodecMap
+      shortCompressionCodecNames
         .collectFirst { case (k, v) if v == codecName => k }
         .getOrElse { throw new IllegalArgumentException(s"No short name for codec $codecName.") }
     }
@@ -99,7 +95,7 @@ private[spark] object CompressionCodec {
 
   val FALLBACK_COMPRESSION_CODEC = "snappy"
   val DEFAULT_COMPRESSION_CODEC = "lz4"
-  val ALL_COMPRESSION_CODECS = shortCompressionCodecMap.values.toSeq
+  val ALL_COMPRESSION_CODECS = shortCompressionCodecNames.values.toSeq
 }
 
 /**
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 5bc827f88b427..e0c9bf02a1a20 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -60,51 +60,6 @@ private[spark] object CallSite {
   val empty = CallSite("", "")
 }
 
-/** A utility class to map short compression codec names to qualified ones. */
-private[spark] class ShortCompressionCodecNameMapper {
-
-  def get(codecName: String): Option[String] = codecName.toLowerCase match {
-    case "none" => none
-    case "uncompressed" => uncompressed
-    case "bzip2" => bzip2
-    case "deflate" => deflate
-    case "gzip" => gzip
-    case "lzo" => lzo
-    case "lz4" => lz4
-    case "lzf" => lzf
-    case "snappy" => snappy
-    case _ => None
-  }
-
-  def getAsMap: Map[String, String] = {
-    Seq(
-      ("none", none),
-      ("uncompressed", uncompressed),
-      ("bzip2", bzip2),
-      ("deflate", deflate),
-      ("gzip", gzip),
-      ("lzo", lzo),
-      ("lz4", lz4),
-      ("lzf", lzf),
-      ("snappy", snappy)
-    ).flatMap { case (shortCodecName, codecName) =>
-      if (codecName.isDefined) Some(shortCodecName, codecName.get) else None
-    }.toMap
-  }
-
-  // To support short codec names, derived classes need to override the methods below that return
-  // corresponding qualified codec names.
-  def none: Option[String] = None
-  def uncompressed: Option[String] = None
-  def bzip2: Option[String] = None
-  def deflate: Option[String] = None
-  def gzip: Option[String] = None
-  def lzo: Option[String] = None
-  def lz4: Option[String] = None
-  def lzf: Option[String] = None
-  def snappy: Option[String] = None
-}
-
 /**
  * Various utility methods used by Spark.
  */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala
index 6fdebb5e3b2f6..9e913de48f72c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala
@@ -21,25 +21,22 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.{BZip2Codec, DeflateCodec, GzipCodec, Lz4Codec, SnappyCodec}
 
-import org.apache.spark.util.{ShortCompressionCodecNameMapper, Utils}
+import org.apache.spark.util.Utils
 
 private[datasources] object CompressionCodecs {
-
-  /** Maps the short versions of compression codec names to fully-qualified class names. */
-  private val hadoopShortCodecNameMapper = new ShortCompressionCodecNameMapper {
-    override def bzip2: Option[String] = Some(classOf[BZip2Codec].getCanonicalName)
-    override def deflate: Option[String] = Some(classOf[DeflateCodec].getCanonicalName)
-    override def gzip: Option[String] = Some(classOf[GzipCodec].getCanonicalName)
-    override def lz4: Option[String] = Some(classOf[Lz4Codec].getCanonicalName)
-    override def snappy: Option[String] = Some(classOf[SnappyCodec].getCanonicalName)
-  }
+  private val shortCompressionCodecNames = Map(
+    "bzip2" -> classOf[BZip2Codec].getName,
+    "deflate" -> classOf[DeflateCodec].getName,
+    "gzip" -> classOf[GzipCodec].getName,
+    "lz4" -> classOf[Lz4Codec].getName,
+    "snappy" -> classOf[SnappyCodec].getName)
 
   /**
    * Return the full version of the given codec class.
    * If it is already a class name, just return it.
   */
   def getCodecClassName(name: String): String = {
-    val codecName = hadoopShortCodecNameMapper.get(name).getOrElse(name)
+    val codecName = shortCompressionCodecNames.getOrElse(name.toLowerCase, name)
     try {
       // Validate the codec name
       Utils.classForName(codecName)
@@ -47,8 +44,7 @@ private[datasources] object CompressionCodecs {
     } catch {
       case e: ClassNotFoundException =>
         throw new IllegalArgumentException(s"Codec [$codecName] " +
-          s"is not available. Known codecs are " +
-          s"${hadoopShortCodecNameMapper.getAsMap.keys.mkString(", ")}.")
+          s"is not available. Known codecs are ${shortCompressionCodecNames.keys.mkString(", ")}.")
     }
   }

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index d0a52781bfb27..c9f726ad1cf4a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -50,7 +50,7 @@ import org.apache.spark.sql.execution.datasources.{PartitionSpec, _}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{DataType, StructType}
-import org.apache.spark.util.{SerializableConfiguration, ShortCompressionCodecNameMapper, Utils}
+import org.apache.spark.util.{SerializableConfiguration, Utils}
 
 private[sql] class DefaultSource extends BucketedHadoopFsRelationProvider with DataSourceRegister {
 
@@ -284,8 +284,10 @@ private[sql] class ParquetRelation(
     conf.set(
       ParquetOutputFormat.COMPRESSION,
       ParquetRelation
-        .parquetShortCodecNameMapper.get(sqlContext.conf.parquetCompressionCodec)
-        .getOrElse(CompressionCodecName.UNCOMPRESSED.name()))
+        .shortParquetCompressionCodecNames
+        .getOrElse(
+          sqlContext.conf.parquetCompressionCodec.toUpperCase,
+          CompressionCodecName.UNCOMPRESSED).name())
 
     new BucketedOutputWriterFactory {
       override def newInstance(
@@ -901,12 +903,11 @@ private[sql] object ParquetRelation extends Logging {
     }
   }
 
-  /** Maps the short versions of compression codec names to qualified compression names. */
-  val parquetShortCodecNameMapper = new ShortCompressionCodecNameMapper {
-    override def none: Option[String] = Some(CompressionCodecName.UNCOMPRESSED.name())
-    override def uncompressed: Option[String] = Some(CompressionCodecName.UNCOMPRESSED.name())
-    override def gzip: Option[String] = Some(CompressionCodecName.GZIP.name())
-    override def lzo: Option[String] = Some(CompressionCodecName.LZO.name())
-    override def snappy: Option[String] = Some(CompressionCodecName.SNAPPY.name())
-  }
+  // The parquet compression short names
+  val shortParquetCompressionCodecNames = Map(
+    "none" -> CompressionCodecName.UNCOMPRESSED,
+    "uncompressed" -> CompressionCodecName.UNCOMPRESSED,
+    "snappy" -> CompressionCodecName.SNAPPY,
+    "gzip" -> CompressionCodecName.GZIP,
+    "lzo" -> CompressionCodecName.LZO)
 }

From ac6b82c36f4cc5a40b23ec1cad81249fde32c3cf Mon Sep 17 00:00:00 2001
From: Takeshi YAMAMURO
Date: Wed, 2 Mar 2016 10:17:17 +0900
Subject: [PATCH 5/6] Use foreach not map

---
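Note: `map` in the test built a Seq[Unit] of discarded results; `foreach` states that the loop runs only for its side effects. A tiny illustration:

  object ForeachVsMapSketch extends App {
    // map allocates a Seq[Unit] nobody reads...
    val discarded = Seq("bzip2", "gzip").map(c => println(s"testing $c"))
    assert(discarded == Seq((), ()))
    // ...foreach runs the same side effects without building a result.
    Seq("bzip2", "gzip").foreach(c => println(s"testing $c"))
  }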
 .../apache/spark/sql/execution/datasources/text/TextSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
index 3277dca9d5c54..59e0e6a7cfa00 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
@@ -60,7 +60,7 @@ class TextSuite extends QueryTest with SharedSQLContext {
   test("SPARK-13503 Support to specify the option for compression codec for TEXT") {
     val testDf = sqlContext.read.text(testFile)
 
-    Seq("bzip2", "deflate", "gzip").map { codecName =>
+    Seq("bzip2", "deflate", "gzip").foreach { codecName =>
       val tempDir = Utils.createTempDir()
       val tempDirPath = tempDir.getAbsolutePath()
       testDf.write.option("compression", codecName).mode(SaveMode.Overwrite).text(tempDirPath)

From 1be5cc1a92a35bfa4274fd5195fd78d3bac1400c Mon Sep 17 00:00:00 2001
From: Takeshi YAMAMURO
Date: Wed, 2 Mar 2016 14:59:31 +0900
Subject: [PATCH 6/6] Fix bugs

---
 .../sql/execution/datasources/parquet/ParquetRelation.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index c9f726ad1cf4a..a1806221b7148 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -286,7 +286,7 @@ private[sql] class ParquetRelation(
       ParquetRelation
         .shortParquetCompressionCodecNames
         .getOrElse(
-          sqlContext.conf.parquetCompressionCodec.toUpperCase,
+          sqlContext.conf.parquetCompressionCodec.toLowerCase(),
           CompressionCodecName.UNCOMPRESSED).name())
 
     new BucketedOutputWriterFactory {
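End-to-end, the user-facing behavior this series settles on looks like the following hypothetical driver against the sqlContext-era API used in the tests (paths, master, and app name are illustrative; assumes a Spark build with these patches on the classpath):

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.sql.{SaveMode, SQLContext}

  object CompressionOptionExample extends App {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("codec-demo"))
    val sqlContext = new SQLContext(sc)

    val df = sqlContext.read.text("/tmp/input.txt") // input path is illustrative
    // Short codec names are matched case-insensitively; "illegal" here would
    // throw the IllegalArgumentException exercised in PATCH 3/6.
    df.write.option("compression", "gZiP").mode(SaveMode.Overwrite).text("/tmp/output")

    sc.stop()
  }

Short names resolve through the per-format codec maps, fully-qualified codec class names pass through unchanged, and unknown names fail fast with the known codecs listed.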