From 6e07cc7d6c6809b0ca90c8330ad02183da6aff32 Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Thu, 12 Nov 2015 16:18:46 +0800 Subject: [PATCH 01/12] [SPARK-11691][SQL] Allow to specify compression codec in HadoopFsRelation when saving --- .../apache/spark/sql/DataFrameWriter.scala | 14 +++++++++++ .../datasources/DataSourceStrategy.scala | 2 +- .../InsertIntoHadoopFsRelation.scala | 10 ++++---- .../datasources/ResolvedDataSource.scala | 10 +++++++- .../datasources/WriterContainer.scala | 23 +++++++++++++++---- .../datasources/text/TextSuite.scala | 13 ++++++++++- 6 files changed, 60 insertions(+), 12 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index d6bdd3d825565..5c391199c8538 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -21,6 +21,8 @@ import java.util.Properties import scala.collection.JavaConverters._ +import org.apache.hadoop.io.compress.CompressionCodec + import org.apache.spark.annotation.Experimental import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation @@ -30,6 +32,8 @@ import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils import org.apache.spark.sql.execution.streaming.StreamExecution import org.apache.spark.sql.sources.HadoopFsRelation + + /** * :: Experimental :: * Interface used to write a [[DataFrame]] to external storage systems (e.g. file systems, @@ -151,6 +155,7 @@ final class DataFrameWriter private[sql](df: DataFrame) { } /** +<<<<<<< b72611f20a03c790b6fd341b6ffdb3b5437609ee * Buckets the output by the given columns. If specified, the output is laid out on the file * system similar to Hive's bucketing scheme. * @@ -177,6 +182,15 @@ final class DataFrameWriter private[sql](df: DataFrame) { this.sortColumnNames = Option(colName +: colNames) this } + /* + * Specify the compression codec when saving it on hdfs + * + * @since 1.7.0 + */ + def compress(codec: Class[_ <: CompressionCodec]): DataFrameWriter = { + this.extraOptions += ("compression.codec" -> codec.getCanonicalName) + this + } /** * Saves the content of the [[DataFrame]] at the specified path. 
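The compress() method added above only records the codec's class name under the "compression.codec" key in extraOptions; the codec is picked up later in ResolvedDataSource and applied through the mapred.output.compress* properties in BaseWriterContainer (see the later hunks of this patch). A minimal usage sketch of the API as introduced here, assuming a SQLContext named sqlContext and an output directory outPath (both placeholders); the TextSuite test added at the end of this patch drives the API the same way:

    import org.apache.hadoop.io.compress.GzipCodec
    import org.apache.spark.sql.SaveMode

    // Read plain text, then write it back gzip-compressed via the new compress() API.
    val df = sqlContext.read.text("/path/to/input.txt")
    df.write
      .compress(classOf[GzipCodec])   // stored as extraOptions("compression.codec")
      .mode(SaveMode.Overwrite)
      .text(outPath)
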
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index c24967abeb33e..dee1db5043c45 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -151,7 +151,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { case i @ logical.InsertIntoTable( l @ LogicalRelation(t: HadoopFsRelation, _, _), part, query, overwrite, false) => val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append - execution.ExecutedCommand(InsertIntoHadoopFsRelation(t, query, mode)) :: Nil + execution.ExecutedCommand(InsertIntoHadoopFsRelation(t, query, mode, None)) :: Nil case _ => Nil } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala index 2d3e1714d2b7b..a7aadbd24bf99 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources import java.io.IOException import org.apache.hadoop.fs.Path +import org.apache.hadoop.io.compress.CompressionCodec import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat @@ -32,7 +33,6 @@ import org.apache.spark.sql.execution.{RunnableCommand, SQLExecution} import org.apache.spark.sql.sources._ import org.apache.spark.util.Utils - /** * A command for writing data to a [[HadoopFsRelation]]. Supports both overwriting and appending. * Writing to dynamic partitions is also supported. 
Each [[InsertIntoHadoopFsRelation]] issues a @@ -58,7 +58,8 @@ import org.apache.spark.util.Utils private[sql] case class InsertIntoHadoopFsRelation( @transient relation: HadoopFsRelation, @transient query: LogicalPlan, - mode: SaveMode) + mode: SaveMode, + codec: Option[Class[_ <: CompressionCodec]]) extends RunnableCommand { override def run(sqlContext: SQLContext): Seq[Row] = { @@ -126,7 +127,7 @@ private[sql] case class InsertIntoHadoopFsRelation( """.stripMargin) val writerContainer = if (partitionColumns.isEmpty && relation.maybeBucketSpec.isEmpty) { - new DefaultWriterContainer(relation, job, isAppend) + new DefaultWriterContainer(relation, job, isAppend, codec) } else { val output = df.queryExecution.executedPlan.output val (partitionOutput, dataOutput) = @@ -140,7 +141,8 @@ private[sql] case class InsertIntoHadoopFsRelation( output, PartitioningUtils.DEFAULT_PARTITION_NAME, sqlContext.conf.getConf(SQLConf.PARTITION_MAX_FILES), - isAppend) + isAppend, + codec) } // This call shouldn't be put into the `try` block below because it only initializes and diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala index eec9070beed65..6801a3d712b14 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala @@ -24,6 +24,7 @@ import scala.language.{existentials, implicitConversions} import scala.util.{Failure, Success, Try} import org.apache.hadoop.fs.Path +import org.apache.hadoop.io.compress.{CompressionCodec, CompressionCodecFactory} import org.apache.hadoop.util.StringUtils import org.apache.spark.Logging @@ -289,11 +290,18 @@ object ResolvedDataSource extends Logging { // For partitioned relation r, r.schema's column ordering can be different from the column // ordering of data.logicalPlan (partition columns are all moved after data column). This // will be adjusted within InsertIntoHadoopFsRelation. 
+ + val codec = options.get("compression.codec").flatMap(e => + Some(new CompressionCodecFactory(sqlContext.sparkContext.hadoopConfiguration) + .getCodecClassByName(e).asInstanceOf[Class[CompressionCodec]]) + ) + sqlContext.executePlan( InsertIntoHadoopFsRelation( r, data.logicalPlan, - mode)).toRdd + mode, + codec)).toRdd r case _ => sys.error(s"${clazz.getCanonicalName} does not allow create table as select.") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala index 7e5c8f2f48d6b..1247a9eee96c0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala @@ -19,7 +19,11 @@ package org.apache.spark.sql.execution.datasources import java.util.{Date, UUID} +import scala.collection.JavaConverters._ + import org.apache.hadoop.fs.Path +import org.apache.hadoop.io.SequenceFile.CompressionType +import org.apache.hadoop.io.compress.{CompressionCodec, SnappyCodec} import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter} import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl @@ -39,7 +43,8 @@ import org.apache.spark.util.SerializableConfiguration private[sql] abstract class BaseWriterContainer( @transient val relation: HadoopFsRelation, @transient private val job: Job, - isAppend: Boolean) + isAppend: Boolean, + codec: Option[Class[_ <: CompressionCodec]]) extends Logging with Serializable { protected val dataSchema = relation.dataSchema @@ -207,6 +212,11 @@ private[sql] abstract class BaseWriterContainer( serializableConf.value.set("mapred.task.id", taskAttemptId.toString) serializableConf.value.setBoolean("mapred.task.is.map", true) serializableConf.value.setInt("mapred.task.partition", 0) + for (c <- codec) { + serializableConf.value.set("mapred.output.compress", "true") + serializableConf.value.set("mapred.output.compression.codec", c.getCanonicalName) + serializableConf.value.set("mapred.output.compression.type", CompressionType.BLOCK.toString) + } } def commitTask(): Unit = { @@ -239,8 +249,10 @@ private[sql] abstract class BaseWriterContainer( private[sql] class DefaultWriterContainer( relation: HadoopFsRelation, job: Job, - isAppend: Boolean) - extends BaseWriterContainer(relation, job, isAppend) { + isAppend: Boolean, + codec: Option[Class[_ <: CompressionCodec]]) + extends BaseWriterContainer(relation, job, isAppend, + codec: Option[Class[_ <: CompressionCodec]]) { def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = { executorSideSetup(taskContext) @@ -308,8 +320,9 @@ private[sql] class DynamicPartitionWriterContainer( inputSchema: Seq[Attribute], defaultPartitionName: String, maxOpenFiles: Int, - isAppend: Boolean) - extends BaseWriterContainer(relation, job, isAppend) { + isAppend: Boolean, + codec: Option[Class[_ <: CompressionCodec]]) + extends BaseWriterContainer(relation, job, isAppend, codec) { private val bucketSpec = relation.maybeBucketSpec diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala index f95272530d585..d42ae2e127702 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala @@ -17,7 +17,10 @@ package org.apache.spark.sql.execution.datasources.text -import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row} +import com.google.common.io.Files +import org.apache.hadoop.io.compress.GzipCodec + +import org.apache.spark.sql._ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{StringType, StructType} import org.apache.spark.util.Utils @@ -57,6 +60,14 @@ class TextSuite extends QueryTest with SharedSQLContext { } } + test("compression") { + val tempDirPath = Files.createTempDir().getAbsolutePath; + val df = sqlContext.read.text(testFile) + df.show() + df.write.compress(classOf[GzipCodec]).mode(SaveMode.Overwrite).text(tempDirPath) + verifyFrame(sqlContext.read.text(tempDirPath)) + } + private def testFile: String = { Thread.currentThread().getContextClassLoader.getResource("text-suite.txt").toString } From e67c3b91e62d61b1f8cf87299e4bf21a16304349 Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Thu, 12 Nov 2015 17:50:28 +0800 Subject: [PATCH 02/12] fix compilation issue --- .../test/scala/org/apache/spark/sql/hive/parquetSuites.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index a6ca7d0386b22..e6204c37193dd 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -306,7 +306,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt") df.queryExecution.sparkPlan match { - case ExecutedCommand(InsertIntoHadoopFsRelation(_: ParquetRelation, _, _)) => // OK + case ExecutedCommand(InsertIntoHadoopFsRelation(_: ParquetRelation, _, _, _)) => // OK case o => fail("test_insert_parquet should be converted to a " + s"${classOf[ParquetRelation].getCanonicalName} and " + s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan. " + @@ -336,7 +336,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array") df.queryExecution.sparkPlan match { - case ExecutedCommand(InsertIntoHadoopFsRelation(r: ParquetRelation, _, _)) => // OK + case ExecutedCommand(InsertIntoHadoopFsRelation(r: ParquetRelation, _, _, _)) => // OK case o => fail("test_insert_parquet should be converted to a " + s"${classOf[ParquetRelation].getCanonicalName} and " + s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan." 
+ From 905e7fa5976a49b628b0d0a114cd25d783a216b0 Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Wed, 2 Dec 2015 11:24:32 +0800 Subject: [PATCH 03/12] address comments --- .../src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 3 +-- .../spark/sql/execution/datasources/ResolvedDataSource.scala | 1 + .../spark/sql/execution/datasources/text/TextSuite.scala | 1 - 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 5c391199c8538..4ca12bd5e2178 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -33,7 +33,6 @@ import org.apache.spark.sql.execution.streaming.StreamExecution import org.apache.spark.sql.sources.HadoopFsRelation - /** * :: Experimental :: * Interface used to write a [[DataFrame]] to external storage systems (e.g. file systems, @@ -185,7 +184,7 @@ final class DataFrameWriter private[sql](df: DataFrame) { /* * Specify the compression codec when saving it on hdfs * - * @since 1.7.0 + * @since 1.6.0 */ def compress(codec: Class[_ <: CompressionCodec]): DataFrameWriter = { this.extraOptions += ("compression.codec" -> codec.getCanonicalName) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala index 6801a3d712b14..fc9a3d60880a9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala @@ -26,6 +26,7 @@ import scala.util.{Failure, Success, Try} import org.apache.hadoop.fs.Path import org.apache.hadoop.io.compress.{CompressionCodec, CompressionCodecFactory} import org.apache.hadoop.util.StringUtils +import org.apache.hadoop.io.compress.{CompressionCodec, CompressionCodecFactory} import org.apache.spark.Logging import org.apache.spark.deploy.SparkHadoopUtil diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala index d42ae2e127702..77b787e0789df 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala @@ -63,7 +63,6 @@ class TextSuite extends QueryTest with SharedSQLContext { test("compression") { val tempDirPath = Files.createTempDir().getAbsolutePath; val df = sqlContext.read.text(testFile) - df.show() df.write.compress(classOf[GzipCodec]).mode(SaveMode.Overwrite).text(tempDirPath) verifyFrame(sqlContext.read.text(tempDirPath)) } From 9d7c38b88c062f8bfb5d07aa47d698041dc2239b Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Wed, 2 Dec 2015 13:03:08 +0800 Subject: [PATCH 04/12] minor change --- .../spark/sql/execution/datasources/WriterContainer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala index 1247a9eee96c0..ddb9e9abeecaa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala @@ -212,7 +212,7 @@ private[sql] abstract class BaseWriterContainer( serializableConf.value.set("mapred.task.id", taskAttemptId.toString) serializableConf.value.setBoolean("mapred.task.is.map", true) serializableConf.value.setInt("mapred.task.partition", 0) - for (c <- codec) { + codec.foreach { c => serializableConf.value.set("mapred.output.compress", "true") serializableConf.value.set("mapred.output.compression.codec", c.getCanonicalName) serializableConf.value.set("mapred.output.compression.type", CompressionType.BLOCK.toString) From 691a8f4acbe191e989fbb51fd7a885638949b58a Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Thu, 3 Dec 2015 13:20:07 +0800 Subject: [PATCH 05/12] change back to 1.7.0 --- .../src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 4ca12bd5e2178..b524feb5c9a47 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -184,7 +184,7 @@ final class DataFrameWriter private[sql](df: DataFrame) { /* * Specify the compression codec when saving it on hdfs * - * @since 1.6.0 + * @since 1.7.0 */ def compress(codec: Class[_ <: CompressionCodec]): DataFrameWriter = { this.extraOptions += ("compression.codec" -> codec.getCanonicalName) From 04bcd938bac4e0e0a5a666853dc35b79b27b7656 Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Fri, 29 Jan 2016 12:57:24 +0800 Subject: [PATCH 06/12] fix review comments --- .../src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 1 - .../spark/sql/execution/datasources/DataSourceStrategy.scala | 2 +- .../sql/execution/datasources/InsertIntoHadoopFsRelation.scala | 2 +- .../spark/sql/execution/datasources/ResolvedDataSource.scala | 1 - 4 files changed, 2 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index b524feb5c9a47..517848b87cf35 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -154,7 +154,6 @@ final class DataFrameWriter private[sql](df: DataFrame) { } /** -<<<<<<< b72611f20a03c790b6fd341b6ffdb3b5437609ee * Buckets the output by the given columns. If specified, the output is laid out on the file * system similar to Hive's bucketing scheme. 
* diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index dee1db5043c45..c24967abeb33e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -151,7 +151,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { case i @ logical.InsertIntoTable( l @ LogicalRelation(t: HadoopFsRelation, _, _), part, query, overwrite, false) => val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append - execution.ExecutedCommand(InsertIntoHadoopFsRelation(t, query, mode, None)) :: Nil + execution.ExecutedCommand(InsertIntoHadoopFsRelation(t, query, mode)) :: Nil case _ => Nil } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala index a7aadbd24bf99..53d5e441d7488 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala @@ -59,7 +59,7 @@ private[sql] case class InsertIntoHadoopFsRelation( @transient relation: HadoopFsRelation, @transient query: LogicalPlan, mode: SaveMode, - codec: Option[Class[_ <: CompressionCodec]]) + codec: Option[Class[_ <: CompressionCodec]] = None) extends RunnableCommand { override def run(sqlContext: SQLContext): Seq[Row] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala index fc9a3d60880a9..6801a3d712b14 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala @@ -26,7 +26,6 @@ import scala.util.{Failure, Success, Try} import org.apache.hadoop.fs.Path import org.apache.hadoop.io.compress.{CompressionCodec, CompressionCodecFactory} import org.apache.hadoop.util.StringUtils -import org.apache.hadoop.io.compress.{CompressionCodec, CompressionCodecFactory} import org.apache.spark.Logging import org.apache.spark.deploy.SparkHadoopUtil From bf84998dc0f19358d87fb4d195391988b5165d32 Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Fri, 29 Jan 2016 13:36:14 +0800 Subject: [PATCH 07/12] fix code style --- .../src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 517848b87cf35..8193f755ca931 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -32,7 +32,6 @@ import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils import org.apache.spark.sql.execution.streaming.StreamExecution import org.apache.spark.sql.sources.HadoopFsRelation - /** * :: Experimental :: * Interface used to write a [[DataFrame]] to external storage systems (e.g. 
file systems, From 80eb810dca051b3b94a7d34ef808f6ad5eebf15e Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Fri, 29 Jan 2016 16:35:01 +0800 Subject: [PATCH 08/12] minor fix --- .../main/scala/org/apache/spark/sql/DataFrameWriter.scala | 4 ++-- .../execution/datasources/InsertIntoHadoopFsRelation.scala | 1 + .../spark/sql/execution/datasources/WriterContainer.scala | 5 ++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 8193f755ca931..189e1d420068a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -180,9 +180,9 @@ final class DataFrameWriter private[sql](df: DataFrame) { this } /* - * Specify the compression codec when saving it on hdfs + * Specify the compression codec when saving it on hdfs. * - * @since 1.7.0 + * @since 2.0.0 */ def compress(codec: Class[_ <: CompressionCodec]): DataFrameWriter = { this.extraOptions += ("compression.codec" -> codec.getCanonicalName) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala index 53d5e441d7488..95aaa3541dd84 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala @@ -33,6 +33,7 @@ import org.apache.spark.sql.execution.{RunnableCommand, SQLExecution} import org.apache.spark.sql.sources._ import org.apache.spark.util.Utils + /** * A command for writing data to a [[HadoopFsRelation]]. Supports both overwriting and appending. * Writing to dynamic partitions is also supported. 
Each [[InsertIntoHadoopFsRelation]] issues a diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala index ddb9e9abeecaa..cc8d7b51f822b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala @@ -44,7 +44,7 @@ private[sql] abstract class BaseWriterContainer( @transient val relation: HadoopFsRelation, @transient private val job: Job, isAppend: Boolean, - codec: Option[Class[_ <: CompressionCodec]]) + codec: Option[Class[_ <: CompressionCodec]] = None) extends Logging with Serializable { protected val dataSchema = relation.dataSchema @@ -251,8 +251,7 @@ private[sql] class DefaultWriterContainer( job: Job, isAppend: Boolean, codec: Option[Class[_ <: CompressionCodec]]) - extends BaseWriterContainer(relation, job, isAppend, - codec: Option[Class[_ <: CompressionCodec]]) { + extends BaseWriterContainer(relation, job, isAppend, codec) { def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = { executorSideSetup(taskContext) From 6eb4191562fff2b5450f1b406164eaf800e2ac60 Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO Date: Wed, 24 Feb 2016 01:19:54 +0900 Subject: [PATCH 09/12] Apply comments --- .../apache/spark/sql/DataFrameWriter.scala | 41 +++++++------------ .../datasources/ResolvedDataSource.scala | 16 ++++---- .../datasources/WriterContainer.scala | 11 ++--- .../datasources/text/TextSuite.scala | 9 ++-- 4 files changed, 31 insertions(+), 46 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 189e1d420068a..c5839fc3b6488 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -1,19 +1,19 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.spark.sql @@ -21,8 +21,6 @@ import java.util.Properties import scala.collection.JavaConverters._ -import org.apache.hadoop.io.compress.CompressionCodec - import org.apache.spark.annotation.Experimental import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation @@ -179,15 +177,6 @@ final class DataFrameWriter private[sql](df: DataFrame) { this.sortColumnNames = Option(colName +: colNames) this } - /* - * Specify the compression codec when saving it on hdfs. - * - * @since 2.0.0 - */ - def compress(codec: Class[_ <: CompressionCodec]): DataFrameWriter = { - this.extraOptions += ("compression.codec" -> codec.getCanonicalName) - this - } /** * Saves the content of the [[DataFrame]] at the specified path. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala index 6801a3d712b14..7e69bdbacb6b9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala @@ -24,7 +24,7 @@ import scala.language.{existentials, implicitConversions} import scala.util.{Failure, Success, Try} import org.apache.hadoop.fs.Path -import org.apache.hadoop.io.compress.{CompressionCodec, CompressionCodecFactory} +import org.apache.hadoop.io.compress.CompressionCodecFactory import org.apache.hadoop.util.StringUtils import org.apache.spark.Logging @@ -287,15 +287,17 @@ object ResolvedDataSource extends Logging { bucketSpec, caseInsensitiveOptions) + val codec = options + .get("compressionCodec") + .map { codecName => + val hadoopConf = sqlContext.sparkContext.hadoopConfiguration + Option(new CompressionCodecFactory(hadoopConf).getCodecClassByName(codecName)) + } + .getOrElse(None) + // For partitioned relation r, r.schema's column ordering can be different from the column // ordering of data.logicalPlan (partition columns are all moved after data column). This // will be adjusted within InsertIntoHadoopFsRelation. 
- - val codec = options.get("compression.codec").flatMap(e => - Some(new CompressionCodecFactory(sqlContext.sparkContext.hadoopConfiguration) - .getCodecClassByName(e).asInstanceOf[Class[CompressionCodec]]) - ) - sqlContext.executePlan( InsertIntoHadoopFsRelation( r, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala index cc8d7b51f822b..8ea287bcaa305 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala @@ -19,11 +19,8 @@ package org.apache.spark.sql.execution.datasources import java.util.{Date, UUID} -import scala.collection.JavaConverters._ - import org.apache.hadoop.fs.Path -import org.apache.hadoop.io.SequenceFile.CompressionType -import org.apache.hadoop.io.compress.{CompressionCodec, SnappyCodec} +import org.apache.hadoop.io.compress.CompressionCodec import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter} import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl @@ -212,10 +209,10 @@ private[sql] abstract class BaseWriterContainer( serializableConf.value.set("mapred.task.id", taskAttemptId.toString) serializableConf.value.setBoolean("mapred.task.is.map", true) serializableConf.value.setInt("mapred.task.partition", 0) - codec.foreach { c => + codec.map { codecClass => serializableConf.value.set("mapred.output.compress", "true") - serializableConf.value.set("mapred.output.compression.codec", c.getCanonicalName) - serializableConf.value.set("mapred.output.compression.type", CompressionType.BLOCK.toString) + serializableConf.value.set("mapred.output.compression.codec", codecClass.getCanonicalName) + serializableConf.value.set("mapred.output.compression.type", "BLOCK") } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala index 77b787e0789df..5cebb319c79ac 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala @@ -17,10 +17,7 @@ package org.apache.spark.sql.execution.datasources.text -import com.google.common.io.Files -import org.apache.hadoop.io.compress.GzipCodec - -import org.apache.spark.sql._ +import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row, SaveMode} import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{StringType, StructType} import org.apache.spark.util.Utils @@ -61,9 +58,9 @@ class TextSuite extends QueryTest with SharedSQLContext { } test("compression") { - val tempDirPath = Files.createTempDir().getAbsolutePath; + val tempDirPath = Utils.createTempDir().getAbsolutePath val df = sqlContext.read.text(testFile) - df.write.compress(classOf[GzipCodec]).mode(SaveMode.Overwrite).text(tempDirPath) + df.write.option("compressionCodec", "GzipCodec").mode(SaveMode.Overwrite).text(tempDirPath) verifyFrame(sqlContext.read.text(tempDirPath)) } From 76142406b7f967e441fd97548f11410cb1d651ea Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO Date: Wed, 24 Feb 2016 17:54:03 +0900 Subject: [PATCH 10/12] Change variable names --- .../datasources/InsertIntoHadoopFsRelation.scala | 6 +++--- 
.../sql/execution/datasources/WriterContainer.scala | 12 ++++++------ 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala index 95aaa3541dd84..f54e2c2114541 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala @@ -60,7 +60,7 @@ private[sql] case class InsertIntoHadoopFsRelation( @transient relation: HadoopFsRelation, @transient query: LogicalPlan, mode: SaveMode, - codec: Option[Class[_ <: CompressionCodec]] = None) + compressionCodec: Option[Class[_ <: CompressionCodec]] = None) extends RunnableCommand { override def run(sqlContext: SQLContext): Seq[Row] = { @@ -128,7 +128,7 @@ private[sql] case class InsertIntoHadoopFsRelation( """.stripMargin) val writerContainer = if (partitionColumns.isEmpty && relation.maybeBucketSpec.isEmpty) { - new DefaultWriterContainer(relation, job, isAppend, codec) + new DefaultWriterContainer(relation, job, isAppend, compressionCodec) } else { val output = df.queryExecution.executedPlan.output val (partitionOutput, dataOutput) = @@ -143,7 +143,7 @@ private[sql] case class InsertIntoHadoopFsRelation( PartitioningUtils.DEFAULT_PARTITION_NAME, sqlContext.conf.getConf(SQLConf.PARTITION_MAX_FILES), isAppend, - codec) + compressionCodec) } // This call shouldn't be put into the `try` block below because it only initializes and diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala index 8ea287bcaa305..a8aca95e0bc93 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala @@ -41,7 +41,7 @@ private[sql] abstract class BaseWriterContainer( @transient val relation: HadoopFsRelation, @transient private val job: Job, isAppend: Boolean, - codec: Option[Class[_ <: CompressionCodec]] = None) + compressionCodec: Option[Class[_ <: CompressionCodec]] = None) extends Logging with Serializable { protected val dataSchema = relation.dataSchema @@ -209,7 +209,7 @@ private[sql] abstract class BaseWriterContainer( serializableConf.value.set("mapred.task.id", taskAttemptId.toString) serializableConf.value.setBoolean("mapred.task.is.map", true) serializableConf.value.setInt("mapred.task.partition", 0) - codec.map { codecClass => + compressionCodec.map { codecClass => serializableConf.value.set("mapred.output.compress", "true") serializableConf.value.set("mapred.output.compression.codec", codecClass.getCanonicalName) serializableConf.value.set("mapred.output.compression.type", "BLOCK") @@ -247,8 +247,8 @@ private[sql] class DefaultWriterContainer( relation: HadoopFsRelation, job: Job, isAppend: Boolean, - codec: Option[Class[_ <: CompressionCodec]]) - extends BaseWriterContainer(relation, job, isAppend, codec) { + compressionCodec: Option[Class[_ <: CompressionCodec]]) + extends BaseWriterContainer(relation, job, isAppend, compressionCodec) { def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = { executorSideSetup(taskContext) @@ -317,8 +317,8 @@ private[sql] class DynamicPartitionWriterContainer( 
defaultPartitionName: String, maxOpenFiles: Int, isAppend: Boolean, - codec: Option[Class[_ <: CompressionCodec]]) - extends BaseWriterContainer(relation, job, isAppend, codec) { + compressionCodec: Option[Class[_ <: CompressionCodec]]) + extends BaseWriterContainer(relation, job, isAppend, compressionCodec) { private val bucketSpec = relation.maybeBucketSpec From 9364b53a84ac287d05a0bc732b26f56f0b783bf6 Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO Date: Wed, 24 Feb 2016 19:36:43 +0900 Subject: [PATCH 11/12] Add shortcut names for compression codecs --- .../datasources/ResolvedDataSource.scala | 20 ++++++++++++++----- .../datasources/text/TextSuite.scala | 10 ++++++---- 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala index 7e69bdbacb6b9..82fe4eb417971 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala @@ -24,7 +24,7 @@ import scala.language.{existentials, implicitConversions} import scala.util.{Failure, Success, Try} import org.apache.hadoop.fs.Path -import org.apache.hadoop.io.compress.CompressionCodecFactory +import org.apache.hadoop.io.compress._ import org.apache.hadoop.util.StringUtils import org.apache.spark.Logging @@ -50,6 +50,13 @@ object ResolvedDataSource extends Logging { "org.apache.spark.sql.parquet.DefaultSource" -> classOf[parquet.DefaultSource].getCanonicalName ) + /** Maps the short versions of compression codec names to fully-qualified class names. */ + private val shortCompressionCodecNames = Map( + "bzip2" -> classOf[BZip2Codec].getCanonicalName, + "deflate" -> classOf[DeflateCodec].getCanonicalName, + "gzip" -> classOf[GzipCodec].getCanonicalName, + "snappy" -> classOf[SnappyCodec].getCanonicalName) + /** Given a provider name, look up the data source class definition. 
*/ def lookupDataSource(provider0: String): Class[_] = { val provider = backwardCompatibilityMap.getOrElse(provider0, provider0) @@ -287,11 +294,14 @@ object ResolvedDataSource extends Logging { bucketSpec, caseInsensitiveOptions) - val codec = options + val compressionCodec = options .get("compressionCodec") .map { codecName => - val hadoopConf = sqlContext.sparkContext.hadoopConfiguration - Option(new CompressionCodecFactory(hadoopConf).getCodecClassByName(codecName)) + val codecFactory = new CompressionCodecFactory( + sqlContext.sparkContext.hadoopConfiguration) + val resolvedCodecName = shortCompressionCodecNames.getOrElse( + codecName.toLowerCase, codecName) + Option(codecFactory.getCodecClassByName(resolvedCodecName)) } .getOrElse(None) @@ -303,7 +313,7 @@ object ResolvedDataSource extends Logging { r, data.logicalPlan, mode, - codec)).toRdd + compressionCodec)).toRdd r case _ => sys.error(s"${clazz.getCanonicalName} does not allow create table as select.") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala index 5cebb319c79ac..0e5ce75f16587 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala @@ -58,10 +58,12 @@ class TextSuite extends QueryTest with SharedSQLContext { } test("compression") { - val tempDirPath = Utils.createTempDir().getAbsolutePath - val df = sqlContext.read.text(testFile) - df.write.option("compressionCodec", "GzipCodec").mode(SaveMode.Overwrite).text(tempDirPath) - verifyFrame(sqlContext.read.text(tempDirPath)) + Seq("bzip2", "deflate", "gzip", "snappy").map { codecName => + val tempDirPath = Utils.createTempDir().getAbsolutePath + val df = sqlContext.read.text(testFile) + df.write.option("compressionCodec", codecName).mode(SaveMode.Overwrite).text(tempDirPath) + verifyFrame(sqlContext.read.text(tempDirPath)) + } } private def testFile: String = { From 8cfbaea0e249d917009d511e537f13843fec1071 Mon Sep 17 00:00:00 2001 From: Takeshi YAMAMURO Date: Thu, 25 Feb 2016 14:49:15 +0900 Subject: [PATCH 12/12] Add an utility class to map short codec names to qualified ones --- .../apache/spark/io/CompressionCodec.scala | 22 +++++---- .../scala/org/apache/spark/util/Utils.scala | 45 +++++++++++++++++++ .../datasources/ResolvedDataSource.scala | 16 +++---- .../datasources/parquet/ParquetRelation.scala | 23 +++++----- .../datasources/text/TextSuite.scala | 2 +- 5 files changed, 78 insertions(+), 30 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala index ae014becef755..97fdc232be8ff 100644 --- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala @@ -25,7 +25,7 @@ import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream} import org.apache.spark.SparkConf import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.util.Utils +import org.apache.spark.util.{ShortCompressionCodecNameMapper, Utils} /** * :: DeveloperApi :: @@ -53,10 +53,14 @@ private[spark] object CompressionCodec { || codec.isInstanceOf[LZ4CompressionCodec]) } - private val shortCompressionCodecNames = Map( - "lz4" -> classOf[LZ4CompressionCodec].getName, - "lzf" -> classOf[LZFCompressionCodec].getName, - "snappy" -> 
classOf[SnappyCompressionCodec].getName) + /** Maps the short versions of compression codec names to fully-qualified class names. */ + private val shortCompressionCodecNameMapper = new ShortCompressionCodecNameMapper { + override def lz4: Option[String] = Some(classOf[LZ4CompressionCodec].getName) + override def lzf: Option[String] = Some(classOf[LZFCompressionCodec].getName) + override def snappy: Option[String] = Some(classOf[SnappyCompressionCodec].getName) + } + + private val shortCompressionCodecMap = shortCompressionCodecNameMapper.getAsMap def getCodecName(conf: SparkConf): String = { conf.get(configKey, DEFAULT_COMPRESSION_CODEC) @@ -67,7 +71,7 @@ private[spark] object CompressionCodec { } def createCodec(conf: SparkConf, codecName: String): CompressionCodec = { - val codecClass = shortCompressionCodecNames.getOrElse(codecName.toLowerCase, codecName) + val codecClass = shortCompressionCodecNameMapper.get(codecName).getOrElse(codecName) val codec = try { val ctor = Utils.classForName(codecClass).getConstructor(classOf[SparkConf]) Some(ctor.newInstance(conf).asInstanceOf[CompressionCodec]) @@ -84,10 +88,10 @@ private[spark] object CompressionCodec { * If it is already a short name, just return it. */ def getShortName(codecName: String): String = { - if (shortCompressionCodecNames.contains(codecName)) { + if (shortCompressionCodecMap.contains(codecName)) { codecName } else { - shortCompressionCodecNames + shortCompressionCodecMap .collectFirst { case (k, v) if v == codecName => k } .getOrElse { throw new IllegalArgumentException(s"No short name for codec $codecName.") } } @@ -95,7 +99,7 @@ private[spark] object CompressionCodec { val FALLBACK_COMPRESSION_CODEC = "snappy" val DEFAULT_COMPRESSION_CODEC = "lz4" - val ALL_COMPRESSION_CODECS = shortCompressionCodecNames.values.toSeq + val ALL_COMPRESSION_CODECS = shortCompressionCodecMap.values.toSeq } /** diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index e0c9bf02a1a20..967593e737af7 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -60,6 +60,51 @@ private[spark] object CallSite { val empty = CallSite("", "") } +/** An utility class to map short compression codec names to qualified ones. */ +private[spark] class ShortCompressionCodecNameMapper { + + def get(codecName: String): Option[String] = codecName.toLowerCase match { + case "none" => none + case "uncompressed" => uncompressed + case "bzip2" => bzip2 + case "deflate" => deflate + case "gzip" => gzip + case "lzo" => lzo + case "lz4" => lz4 + case "lzf" => lzf + case "snappy" => snappy + case _ => None + } + + def getAsMap: Map[String, String] = { + Seq( + ("none", none), + ("uncompressed", uncompressed), + ("bzip2", bzip2), + ("deflate", deflate), + ("gzip", gzip), + ("lzo", lzo), + ("lz4", lz4), + ("lzf", lzf), + ("snappy", snappy) + ).flatMap { case (shortCodecName, codecName) => + if (codecName.isDefined) Some(shortCodecName, codecName.get) else None + }.toMap + } + + // To support short codec names, derived classes need to override the methods below that return + // corresponding qualified codec names. 
+ def none: Option[String] = None + def uncompressed: Option[String] = None + def bzip2: Option[String] = None + def deflate: Option[String] = None + def gzip: Option[String] = None + def lzo: Option[String] = None + def lz4: Option[String] = None + def lzf: Option[String] = None + def snappy: Option[String] = None +} + /** * Various utility methods used by Spark. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala index 82fe4eb417971..d882d19ea758d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext} import org.apache.spark.sql.execution.streaming.{Sink, Source} import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.{CalendarIntervalType, StructType} -import org.apache.spark.util.Utils +import org.apache.spark.util.{ShortCompressionCodecNameMapper, Utils} case class ResolvedDataSource(provider: Class[_], relation: BaseRelation) @@ -51,11 +51,12 @@ object ResolvedDataSource extends Logging { ) /** Maps the short versions of compression codec names to fully-qualified class names. */ - private val shortCompressionCodecNames = Map( - "bzip2" -> classOf[BZip2Codec].getCanonicalName, - "deflate" -> classOf[DeflateCodec].getCanonicalName, - "gzip" -> classOf[GzipCodec].getCanonicalName, - "snappy" -> classOf[SnappyCodec].getCanonicalName) + private val hadoopShortCodecNameMapper = new ShortCompressionCodecNameMapper { + override def bzip2: Option[String] = Some(classOf[BZip2Codec].getCanonicalName) + override def deflate: Option[String] = Some(classOf[DeflateCodec].getCanonicalName) + override def gzip: Option[String] = Some(classOf[GzipCodec].getCanonicalName) + override def snappy: Option[String] = Some(classOf[SnappyCodec].getCanonicalName) + } /** Given a provider name, look up the data source class definition. 
*/ def lookupDataSource(provider0: String): Class[_] = { @@ -299,8 +300,7 @@ object ResolvedDataSource extends Logging { .map { codecName => val codecFactory = new CompressionCodecFactory( sqlContext.sparkContext.hadoopConfiguration) - val resolvedCodecName = shortCompressionCodecNames.getOrElse( - codecName.toLowerCase, codecName) + val resolvedCodecName = hadoopShortCodecNameMapper.get(codecName).getOrElse(codecName) Option(codecFactory.getCodecClassByName(resolvedCodecName)) } .getOrElse(None) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala index 1e686d41f41db..a4ff92fcf1a15 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala @@ -49,7 +49,7 @@ import org.apache.spark.sql.catalyst.util.LegacyTypeStringParser import org.apache.spark.sql.execution.datasources.{PartitionSpec, _} import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.{DataType, StructType} -import org.apache.spark.util.{SerializableConfiguration, Utils} +import org.apache.spark.util.{SerializableConfiguration, ShortCompressionCodecNameMapper, Utils} private[sql] class DefaultSource extends BucketedHadoopFsRelationProvider with DataSourceRegister { @@ -283,10 +283,8 @@ private[sql] class ParquetRelation( conf.set( ParquetOutputFormat.COMPRESSION, ParquetRelation - .shortParquetCompressionCodecNames - .getOrElse( - sqlContext.conf.parquetCompressionCodec.toUpperCase, - CompressionCodecName.UNCOMPRESSED).name()) + .parquetShortCodecNameMapper.get(sqlContext.conf.parquetCompressionCodec) + .getOrElse(CompressionCodecName.UNCOMPRESSED.name())) new BucketedOutputWriterFactory { override def newInstance( @@ -902,11 +900,12 @@ private[sql] object ParquetRelation extends Logging { } } - // The parquet compression short names - val shortParquetCompressionCodecNames = Map( - "NONE" -> CompressionCodecName.UNCOMPRESSED, - "UNCOMPRESSED" -> CompressionCodecName.UNCOMPRESSED, - "SNAPPY" -> CompressionCodecName.SNAPPY, - "GZIP" -> CompressionCodecName.GZIP, - "LZO" -> CompressionCodecName.LZO) + /** Maps the short versions of compression codec names to qualified compression names. 
*/ + val parquetShortCodecNameMapper = new ShortCompressionCodecNameMapper { + override def none: Option[String] = Some(CompressionCodecName.UNCOMPRESSED.name()) + override def uncompressed: Option[String] = Some(CompressionCodecName.UNCOMPRESSED.name()) + override def gzip: Option[String] = Some(CompressionCodecName.GZIP.name()) + override def lzo: Option[String] = Some(CompressionCodecName.LZO.name()) + override def snappy: Option[String] = Some(CompressionCodecName.SNAPPY.name()) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala index 0e5ce75f16587..67122ca68130a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala @@ -58,7 +58,7 @@ class TextSuite extends QueryTest with SharedSQLContext { } test("compression") { - Seq("bzip2", "deflate", "gzip", "snappy").map { codecName => + Seq("bzip2", "deflate", "gzip").map { codecName => val tempDirPath = Utils.createTempDir().getAbsolutePath val df = sqlContext.read.text(testFile) df.write.option("compressionCodec", codecName).mode(SaveMode.Overwrite).text(tempDirPath)
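After the final revision, the user-facing entry point is a plain writer option rather than a dedicated compress() method: ResolvedDataSource reads the "compressionCodec" option, resolves short names ("bzip2", "deflate", "gzip", "snappy") through the new ShortCompressionCodecNameMapper, and passes the resulting codec class to InsertIntoHadoopFsRelation. A minimal usage sketch mirroring the updated TextSuite test, with sqlContext and outPath again as placeholders:

    import org.apache.spark.sql.SaveMode

    // Short codec names are resolved to Hadoop codec classes by ResolvedDataSource;
    // names it does not recognize fall through to CompressionCodecFactory, so a
    // fully-qualified codec class name should also work.
    val df = sqlContext.read.text("/path/to/input.txt")
    df.write
      .option("compressionCodec", "gzip")
      .mode(SaveMode.Overwrite)
      .text(outPath)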