diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala index 97fdc232be8ff..ae014becef755 100644 --- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala @@ -25,7 +25,7 @@ import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream} import org.apache.spark.SparkConf import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.util.{ShortCompressionCodecNameMapper, Utils} +import org.apache.spark.util.Utils /** * :: DeveloperApi :: @@ -53,14 +53,10 @@ private[spark] object CompressionCodec { || codec.isInstanceOf[LZ4CompressionCodec]) } - /** Maps the short versions of compression codec names to fully-qualified class names. */ - private val shortCompressionCodecNameMapper = new ShortCompressionCodecNameMapper { - override def lz4: Option[String] = Some(classOf[LZ4CompressionCodec].getName) - override def lzf: Option[String] = Some(classOf[LZFCompressionCodec].getName) - override def snappy: Option[String] = Some(classOf[SnappyCompressionCodec].getName) - } - - private val shortCompressionCodecMap = shortCompressionCodecNameMapper.getAsMap + private val shortCompressionCodecNames = Map( + "lz4" -> classOf[LZ4CompressionCodec].getName, + "lzf" -> classOf[LZFCompressionCodec].getName, + "snappy" -> classOf[SnappyCompressionCodec].getName) def getCodecName(conf: SparkConf): String = { conf.get(configKey, DEFAULT_COMPRESSION_CODEC) @@ -71,7 +67,7 @@ private[spark] object CompressionCodec { } def createCodec(conf: SparkConf, codecName: String): CompressionCodec = { - val codecClass = shortCompressionCodecNameMapper.get(codecName).getOrElse(codecName) + val codecClass = shortCompressionCodecNames.getOrElse(codecName.toLowerCase, codecName) val codec = try { val ctor = Utils.classForName(codecClass).getConstructor(classOf[SparkConf]) Some(ctor.newInstance(conf).asInstanceOf[CompressionCodec]) @@ -88,10 +84,10 @@ private[spark] object CompressionCodec { * If it is already a short name, just return it. */ def getShortName(codecName: String): String = { - if (shortCompressionCodecMap.contains(codecName)) { + if (shortCompressionCodecNames.contains(codecName)) { codecName } else { - shortCompressionCodecMap + shortCompressionCodecNames .collectFirst { case (k, v) if v == codecName => k } .getOrElse { throw new IllegalArgumentException(s"No short name for codec $codecName.") } } @@ -99,7 +95,7 @@ private[spark] object CompressionCodec { val FALLBACK_COMPRESSION_CODEC = "snappy" val DEFAULT_COMPRESSION_CODEC = "lz4" - val ALL_COMPRESSION_CODECS = shortCompressionCodecMap.values.toSeq + val ALL_COMPRESSION_CODECS = shortCompressionCodecNames.values.toSeq } /** diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 5bc827f88b427..e0c9bf02a1a20 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -60,51 +60,6 @@ private[spark] object CallSite { val empty = CallSite("", "") } -/** An utility class to map short compression codec names to qualified ones. */ -private[spark] class ShortCompressionCodecNameMapper { - - def get(codecName: String): Option[String] = codecName.toLowerCase match { - case "none" => none - case "uncompressed" => uncompressed - case "bzip2" => bzip2 - case "deflate" => deflate - case "gzip" => gzip - case "lzo" => lzo - case "lz4" => lz4 - case "lzf" => lzf - case "snappy" => snappy - case _ => None - } - - def getAsMap: Map[String, String] = { - Seq( - ("none", none), - ("uncompressed", uncompressed), - ("bzip2", bzip2), - ("deflate", deflate), - ("gzip", gzip), - ("lzo", lzo), - ("lz4", lz4), - ("lzf", lzf), - ("snappy", snappy) - ).flatMap { case (shortCodecName, codecName) => - if (codecName.isDefined) Some(shortCodecName, codecName.get) else None - }.toMap - } - - // To support short codec names, derived classes need to override the methods below that return - // corresponding qualified codec names. - def none: Option[String] = None - def uncompressed: Option[String] = None - def bzip2: Option[String] = None - def deflate: Option[String] = None - def gzip: Option[String] = None - def lzo: Option[String] = None - def lz4: Option[String] = None - def lzf: Option[String] = None - def snappy: Option[String] = None -} - /** * Various utility methods used by Spark. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala index 6fdebb5e3b2f6..9e913de48f72c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala @@ -21,25 +21,22 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.hadoop.io.compress.{BZip2Codec, DeflateCodec, GzipCodec, Lz4Codec, SnappyCodec} -import org.apache.spark.util.{ShortCompressionCodecNameMapper, Utils} +import org.apache.spark.util.Utils private[datasources] object CompressionCodecs { - - /** Maps the short versions of compression codec names to fully-qualified class names. */ - private val hadoopShortCodecNameMapper = new ShortCompressionCodecNameMapper { - override def bzip2: Option[String] = Some(classOf[BZip2Codec].getCanonicalName) - override def deflate: Option[String] = Some(classOf[DeflateCodec].getCanonicalName) - override def gzip: Option[String] = Some(classOf[GzipCodec].getCanonicalName) - override def lz4: Option[String] = Some(classOf[Lz4Codec].getCanonicalName) - override def snappy: Option[String] = Some(classOf[SnappyCodec].getCanonicalName) - } + private val shortCompressionCodecNames = Map( + "bzip2" -> classOf[BZip2Codec].getName, + "deflate" -> classOf[DeflateCodec].getName, + "gzip" -> classOf[GzipCodec].getName, + "lz4" -> classOf[Lz4Codec].getName, + "snappy" -> classOf[SnappyCodec].getName) /** * Return the full version of the given codec class. * If it is already a class name, just return it. */ def getCodecClassName(name: String): String = { - val codecName = hadoopShortCodecNameMapper.get(name).getOrElse(name) + val codecName = shortCompressionCodecNames.getOrElse(name.toLowerCase, name) try { // Validate the codec name Utils.classForName(codecName) @@ -47,8 +44,7 @@ private[datasources] object CompressionCodecs { } catch { case e: ClassNotFoundException => throw new IllegalArgumentException(s"Codec [$codecName] " + - s"is not available. Known codecs are " + - s"${hadoopShortCodecNameMapper.getAsMap.keys.mkString(", ")}.") + s"is not available. Known codecs are ${shortCompressionCodecNames.keys.mkString(", ")}.") } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala index d0a52781bfb27..c9f726ad1cf4a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala @@ -50,7 +50,7 @@ import org.apache.spark.sql.execution.datasources.{PartitionSpec, _} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.{DataType, StructType} -import org.apache.spark.util.{SerializableConfiguration, ShortCompressionCodecNameMapper, Utils} +import org.apache.spark.util.{SerializableConfiguration, Utils} private[sql] class DefaultSource extends BucketedHadoopFsRelationProvider with DataSourceRegister { @@ -284,8 +284,10 @@ private[sql] class ParquetRelation( conf.set( ParquetOutputFormat.COMPRESSION, ParquetRelation - .parquetShortCodecNameMapper.get(sqlContext.conf.parquetCompressionCodec) - .getOrElse(CompressionCodecName.UNCOMPRESSED.name())) + .shortParquetCompressionCodecNames + .getOrElse( + sqlContext.conf.parquetCompressionCodec.toUpperCase, + CompressionCodecName.UNCOMPRESSED).name()) new BucketedOutputWriterFactory { override def newInstance( @@ -901,12 +903,11 @@ private[sql] object ParquetRelation extends Logging { } } - /** Maps the short versions of compression codec names to qualified compression names. */ - val parquetShortCodecNameMapper = new ShortCompressionCodecNameMapper { - override def none: Option[String] = Some(CompressionCodecName.UNCOMPRESSED.name()) - override def uncompressed: Option[String] = Some(CompressionCodecName.UNCOMPRESSED.name()) - override def gzip: Option[String] = Some(CompressionCodecName.GZIP.name()) - override def lzo: Option[String] = Some(CompressionCodecName.LZO.name()) - override def snappy: Option[String] = Some(CompressionCodecName.SNAPPY.name()) - } + // The parquet compression short names + val shortParquetCompressionCodecNames = Map( + "none" -> CompressionCodecName.UNCOMPRESSED, + "uncompressed" -> CompressionCodecName.UNCOMPRESSED, + "snappy" -> CompressionCodecName.SNAPPY, + "gzip" -> CompressionCodecName.GZIP, + "lzo" -> CompressionCodecName.LZO) }