From 2bb0af0d64e89b5b132ed593ec4a29d5f4c87910 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Tue, 29 Mar 2016 18:04:38 +0800
Subject: [PATCH 1/3] WIP

---
 .../scala/org/apache/spark/sql/sources/interfaces.scala | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 1e02354edf4c1..f5e850e73fbc0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -484,6 +484,15 @@ trait FileFormat {
   }
 }
 
+private[sql] object FileFormat {
+  def appendPartitionValues(
+      rows: Iterator[InternalRow],
+      output: Seq[Attribute],
+      partitionValues: InternalRow): Iterator[InternalRow] = {
+
+  }
+}
+
 /**
  * A collection of data files from a partitioned relation, along with the partition values in the
  * form of an [[InternalRow]].

From 01fbf1740ff0da27a99bf2decaa304250bde5ece Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Tue, 29 Mar 2016 18:18:41 +0800
Subject: [PATCH 2/3] Utility method for appending partition values

---
 .../execution/datasources/json/JSONRelation.scala |  6 +-----
 .../datasources/parquet/ParquetRelation.scala     | 15 +++++++--------
 .../org/apache/spark/sql/sources/interfaces.scala |  7 +++++--
 .../apache/spark/sql/hive/orc/OrcRelation.scala   | 12 ++++--------
 4 files changed, 17 insertions(+), 23 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index 21fc1224eff29..fa72bc88937ed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -137,7 +137,6 @@ class DefaultSource extends FileFormat with DataSourceRegister {
       .getOrElse(sqlContext.conf.columnNameOfCorruptRecord)
 
     val fullSchema = dataSchema.toAttributes ++ partitionSchema.toAttributes
-    val joinedRow = new JoinedRow()
 
     file => {
       val lines = new HadoopFileLinesReader(file, broadcastedConf.value.value).map(_.toString)
@@ -148,10 +147,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
         columnNameOfCorruptRecord,
         parsedOptions)
 
-      val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
-      rows.map { row =>
-        appendPartitionColumns(joinedRow(row, file.partitionValues))
-      }
+      FileFormat.appendPartitionValues(rows, fullSchema, file.partitionValues)
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index d6b84be267411..3a31241e86098 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -395,14 +395,13 @@ private[sql] class DefaultSource
         iter.asInstanceOf[Iterator[InternalRow]]
       } else {
         val fullSchema = dataSchema.toAttributes ++ partitionSchema.toAttributes
-        val joinedRow = new JoinedRow()
-        val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
-
-        // This is a horrible erasure hack... if we type the iterator above, then it actually check
-        // the type in next() and we get a class cast exception. If we make that function return
-        // Object, then we can defer the cast until later!
-        iter.asInstanceOf[Iterator[InternalRow]]
-          .map(d => appendPartitionColumns(joinedRow(d, file.partitionValues)))
+        FileFormat.appendPartitionValues(
+          // This is a horrible erasure hack... if we type the iterator above, then it actually
+          // check the type in next() and we get a class cast exception. If we make that function
+          // return Object, then we can defer the cast until later!
+          iter.asInstanceOf[Iterator[InternalRow]],
+          fullSchema,
+          file.partitionValues)
       }
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index f5e850e73fbc0..391688b0dd146 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -31,8 +31,9 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.{expressions, CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, expressions}
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.execution.FileRelation
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
@@ -489,7 +490,9 @@ private[sql] object FileFormat {
       rows: Iterator[InternalRow],
       output: Seq[Attribute],
       partitionValues: InternalRow): Iterator[InternalRow] = {
-
+    val joinedRow = new JoinedRow()
+    val appendPartitionColumns = GenerateUnsafeProjection.generate(output, output)
+    rows.map { row => appendPartitionColumns(joinedRow(row, partitionValues)) }
   }
 }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 7c4a0a0c0f09f..004d4311f4d5c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -174,14 +174,10 @@ private[sql] class DefaultSource
         file.filePath, conf, dataSchema, new RecordReaderIterator[OrcStruct](orcRecordReader)
       )
 
-      // Appends partition values
-      val fullOutput = dataSchema.toAttributes ++ partitionSchema.toAttributes
-      val joinedRow = new JoinedRow()
-      val appendPartitionColumns = GenerateUnsafeProjection.generate(fullOutput, fullOutput)
-
-      unsafeRowIterator.map { dataRow =>
-        appendPartitionColumns(joinedRow(dataRow, file.partitionValues))
-      }
+      FileFormat.appendPartitionValues(
+        unsafeRowIterator,
+        dataSchema.toAttributes ++ partitionSchema.toAttributes,
+        file.partitionValues)
     }
   }
 }

From 60a0ef0098d26e8bac5bccb54d9786eeb79ccc0b Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Tue, 29 Mar 2016 20:37:21 +0800
Subject: [PATCH 3/3] Fixes import order

---
 .../main/scala/org/apache/spark/sql/sources/interfaces.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 391688b0dd146..b6c511a6fa805 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -31,7 +31,7 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, expressions}
+import org.apache.spark.sql.catalyst.{expressions, CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.execution.FileRelation
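
Illustrative usage (not part of the patches above): a minimal sketch of how a FileFormat
implementation would call the new helper from buildReader. It assumes the same surrounding
scope as the diffs (dataSchema and partitionSchema as StructTypes, file: PartitionedFile);
readDataColumns is a hypothetical stand-in for the format-specific reader that yields rows
containing only the data columns. Because the helper runs
GenerateUnsafeProjection.generate(output, output) over the joined row, the output attributes
passed in must be the data attributes followed by the partition attributes, in that order.

// Sketch only. `readDataColumns` is hypothetical; `dataSchema`, `partitionSchema`,
// and `file: PartitionedFile` come from the enclosing `buildReader` scope.
val fullSchema = dataSchema.toAttributes ++ partitionSchema.toAttributes

// Rows read from the file contain only the data columns.
val rows: Iterator[InternalRow] = readDataColumns(file)

// Append the file's partition values to every row and project the joined row
// into an UnsafeRow whose layout matches `fullSchema`.
val withPartitionValues: Iterator[InternalRow] =
  FileFormat.appendPartitionValues(rows, fullSchema, file.partitionValues)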