Skip to content

Commit

Permalink
Revert codes
Browse files Browse the repository at this point in the history
  • Loading branch information
maropu committed Mar 1, 2016
1 parent da879a6 commit c3f0140
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 82 deletions.
22 changes: 9 additions & 13 deletions core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream}

import org.apache.spark.SparkConf
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.{ShortCompressionCodecNameMapper, Utils}
import org.apache.spark.util.Utils

/**
* :: DeveloperApi ::
Expand Down Expand Up @@ -53,14 +53,10 @@ private[spark] object CompressionCodec {
|| codec.isInstanceOf[LZ4CompressionCodec])
}

/** Maps the short versions of compression codec names to fully-qualified class names. */
private val shortCompressionCodecNameMapper = new ShortCompressionCodecNameMapper {
override def lz4: Option[String] = Some(classOf[LZ4CompressionCodec].getName)
override def lzf: Option[String] = Some(classOf[LZFCompressionCodec].getName)
override def snappy: Option[String] = Some(classOf[SnappyCompressionCodec].getName)
}

private val shortCompressionCodecMap = shortCompressionCodecNameMapper.getAsMap
// Short codec names paired with the fully-qualified names of their implementing classes.
private val shortCompressionCodecNames: Map[String, String] = Map(
  ("lz4", classOf[LZ4CompressionCodec].getName),
  ("lzf", classOf[LZFCompressionCodec].getName),
  ("snappy", classOf[SnappyCompressionCodec].getName))

def getCodecName(conf: SparkConf): String = {
conf.get(configKey, DEFAULT_COMPRESSION_CODEC)
Expand All @@ -71,7 +67,7 @@ private[spark] object CompressionCodec {
}

def createCodec(conf: SparkConf, codecName: String): CompressionCodec = {
val codecClass = shortCompressionCodecNameMapper.get(codecName).getOrElse(codecName)
val codecClass = shortCompressionCodecNames.getOrElse(codecName.toLowerCase, codecName)
val codec = try {
val ctor = Utils.classForName(codecClass).getConstructor(classOf[SparkConf])
Some(ctor.newInstance(conf).asInstanceOf[CompressionCodec])
Expand All @@ -88,18 +84,18 @@ private[spark] object CompressionCodec {
* If it is already a short name, just return it.
*/
def getShortName(codecName: String): String = {
if (shortCompressionCodecMap.contains(codecName)) {
if (shortCompressionCodecNames.contains(codecName)) {
codecName
} else {
shortCompressionCodecMap
shortCompressionCodecNames
.collectFirst { case (k, v) if v == codecName => k }
.getOrElse { throw new IllegalArgumentException(s"No short name for codec $codecName.") }
}
}

val FALLBACK_COMPRESSION_CODEC = "snappy"
val DEFAULT_COMPRESSION_CODEC = "lz4"
val ALL_COMPRESSION_CODECS = shortCompressionCodecMap.values.toSeq
val ALL_COMPRESSION_CODECS = shortCompressionCodecNames.values.toSeq
}

/**
Expand Down
45 changes: 0 additions & 45 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -60,51 +60,6 @@ private[spark] object CallSite {
val empty = CallSite("", "")
}

/** A utility class to map short compression codec names to qualified ones. */
private[spark] class ShortCompressionCodecNameMapper {

  /**
   * Looks up the qualified codec name registered for a short codec name.
   *
   * Matching is case-insensitive. The input is lower-cased with `Locale.ROOT` so the result does
   * not depend on the JVM default locale (with a Turkish default locale, "GZIP".toLowerCase
   * yields "gzıp", which would never match "gzip").
   *
   * @param codecName short codec name, in any letter case
   * @return the qualified name, or `None` if the short name is unknown or has no qualified name
   */
  def get(codecName: String): Option[String] =
    codecName.toLowerCase(java.util.Locale.ROOT) match {
      case "none" => none
      case "uncompressed" => uncompressed
      case "bzip2" => bzip2
      case "deflate" => deflate
      case "gzip" => gzip
      case "lzo" => lzo
      case "lz4" => lz4
      case "lzf" => lzf
      case "snappy" => snappy
      case _ => None
    }

  /**
   * Returns all short names that currently have a qualified name, keyed by the short name.
   * Short names whose accessor returns `None` are left out.
   */
  def getAsMap: Map[String, String] = {
    Seq(
      "none" -> none,
      "uncompressed" -> uncompressed,
      "bzip2" -> bzip2,
      "deflate" -> deflate,
      "gzip" -> gzip,
      "lzo" -> lzo,
      "lz4" -> lz4,
      "lzf" -> lzf,
      "snappy" -> snappy
    ).collect { case (shortCodecName, Some(codecName)) => shortCodecName -> codecName }.toMap
  }

  // To support short codec names, derived classes need to override the methods below that return
  // corresponding qualified codec names.
  def none: Option[String] = None
  def uncompressed: Option[String] = None
  def bzip2: Option[String] = None
  def deflate: Option[String] = None
  def gzip: Option[String] = None
  def lzo: Option[String] = None
  def lz4: Option[String] = None
  def lzf: Option[String] = None
  def snappy: Option[String] = None
}

/**
* Various utility methods used by Spark.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,34 +21,30 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.{BZip2Codec, DeflateCodec, GzipCodec, Lz4Codec, SnappyCodec}

import org.apache.spark.util.{ShortCompressionCodecNameMapper, Utils}
import org.apache.spark.util.Utils

private[datasources] object CompressionCodecs {

/** Maps the short versions of compression codec names to fully-qualified class names. */
private val hadoopShortCodecNameMapper = new ShortCompressionCodecNameMapper {
override def bzip2: Option[String] = Some(classOf[BZip2Codec].getCanonicalName)
override def deflate: Option[String] = Some(classOf[DeflateCodec].getCanonicalName)
override def gzip: Option[String] = Some(classOf[GzipCodec].getCanonicalName)
override def lz4: Option[String] = Some(classOf[Lz4Codec].getCanonicalName)
override def snappy: Option[String] = Some(classOf[SnappyCodec].getCanonicalName)
}
// Short Hadoop codec names paired with the fully-qualified names of their codec classes.
private val shortCompressionCodecNames: Map[String, String] = Map(
  ("bzip2", classOf[BZip2Codec].getName),
  ("deflate", classOf[DeflateCodec].getName),
  ("gzip", classOf[GzipCodec].getName),
  ("lz4", classOf[Lz4Codec].getName),
  ("snappy", classOf[SnappyCodec].getName))

/**
* Return the full version of the given codec class.
* If it is already a class name, just return it.
*/
def getCodecClassName(name: String): String = {
val codecName = hadoopShortCodecNameMapper.get(name).getOrElse(name)
val codecName = shortCompressionCodecNames.getOrElse(name.toLowerCase, name)
try {
// Validate the codec name
Utils.classForName(codecName)
codecName
} catch {
case e: ClassNotFoundException =>
throw new IllegalArgumentException(s"Codec [$codecName] " +
s"is not available. Known codecs are " +
s"${hadoopShortCodecNameMapper.getAsMap.keys.mkString(", ")}.")
s"is not available. Known codecs are ${shortCompressionCodecNames.keys.mkString(", ")}.")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ import org.apache.spark.sql.execution.datasources.{PartitionSpec, _}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.util.{SerializableConfiguration, ShortCompressionCodecNameMapper, Utils}
import org.apache.spark.util.{SerializableConfiguration, Utils}

private[sql] class DefaultSource extends BucketedHadoopFsRelationProvider with DataSourceRegister {

Expand Down Expand Up @@ -284,8 +284,10 @@ private[sql] class ParquetRelation(
conf.set(
ParquetOutputFormat.COMPRESSION,
ParquetRelation
.parquetShortCodecNameMapper.get(sqlContext.conf.parquetCompressionCodec)
.getOrElse(CompressionCodecName.UNCOMPRESSED.name()))
.shortParquetCompressionCodecNames
.getOrElse(
sqlContext.conf.parquetCompressionCodec.toUpperCase,
CompressionCodecName.UNCOMPRESSED).name())

new BucketedOutputWriterFactory {
override def newInstance(
Expand Down Expand Up @@ -901,12 +903,11 @@ private[sql] object ParquetRelation extends Logging {
}
}

/** Maps the short versions of compression codec names to qualified compression names. */
val parquetShortCodecNameMapper = new ShortCompressionCodecNameMapper {
override def none: Option[String] = Some(CompressionCodecName.UNCOMPRESSED.name())
override def uncompressed: Option[String] = Some(CompressionCodecName.UNCOMPRESSED.name())
override def gzip: Option[String] = Some(CompressionCodecName.GZIP.name())
override def lzo: Option[String] = Some(CompressionCodecName.LZO.name())
override def snappy: Option[String] = Some(CompressionCodecName.SNAPPY.name())
}
// The parquet compression short names. Keys are upper-case because the lookup upper-cases the
// configured codec name before consulting this map; lower-case keys would never match and every
// setting would silently fall back to UNCOMPRESSED.
val shortParquetCompressionCodecNames = Map(
  "NONE" -> CompressionCodecName.UNCOMPRESSED,
  "UNCOMPRESSED" -> CompressionCodecName.UNCOMPRESSED,
  "SNAPPY" -> CompressionCodecName.SNAPPY,
  "GZIP" -> CompressionCodecName.GZIP,
  "LZO" -> CompressionCodecName.LZO)
}

0 comments on commit c3f0140

Please sign in to comment.