From 1d4d40c0e6a7318cf6a2f2f783c3c110b63cd724 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Fri, 15 Jul 2016 12:37:45 +0900
Subject: [PATCH 1/3] Support conversion from a compatible schema for the
 Parquet data source when data types do not match

---
 .../parquet/ParquetCompatibleConverter.scala  | 87 +++++++++++++++++++
 .../parquet/ParquetRowConverter.scala         |  7 +-
 .../datasources/parquet/ParquetIOSuite.scala  | 26 ++++++
 3 files changed, 119 insertions(+), 1 deletion(-)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibleConverter.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibleConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibleConverter.scala
new file mode 100644
index 0000000000000..8d4c8e25c2bc0
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibleConverter.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.parquet.io.api.Converter
+
+import org.apache.spark.sql.catalyst.analysis.TypeCoercion
+import org.apache.spark.sql.types._
+
+private[parquet] object ParquetCompatibleConverter {
+
+  // The logic for setting and adding a value in `ParquetPrimitiveConverter` is separated
+  // into `NumericValueUpdater` and `NumericCompatibleConverter` so that the value can be
+  // converted to a desired type.
+  // `NumericValueUpdater` updates the input `Number` via `ParentContainerUpdater`, after
+  // converting it to the appropriate value type for `ParentContainerUpdater`.
+  private type NumericValueUpdater = Number => Unit
+
+  // This is a wrapper for `NumericValueUpdater`. It returns a converter that adds the value
+  // from `NumericValueUpdater`.
+  private type NumericCompatibleConverter = NumericValueUpdater => ParquetPrimitiveConverter
+
+  private def makeNumericCompatibleConverter(
+      guessedType: DataType,
+      updater: ParentContainerUpdater): NumericCompatibleConverter = guessedType match {
+    case ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType =>
+      (valueUpdater: NumericValueUpdater) =>
+        new ParquetPrimitiveConverter(updater) {
+          override def addInt(value: Int): Unit = valueUpdater(value)
+          override def addLong(value: Long): Unit = valueUpdater(value)
+          override def addFloat(value: Float): Unit = valueUpdater(value)
+          override def addDouble(value: Double): Unit = valueUpdater(value)
+        }
+  }
+
+  private def makeNumericCompatibleUpdater(
+      catalystType: DataType,
+      updater: ParentContainerUpdater): NumericValueUpdater = catalystType match {
+    case ByteType => (v: Number) => updater.setByte(v.byteValue())
+    case ShortType => (v: Number) => updater.setShort(v.shortValue())
+    case IntegerType => (v: Number) => updater.setInt(v.intValue())
+    case LongType => (v: Number) => updater.setLong(v.longValue())
+    case FloatType => (v: Number) => updater.setFloat(v.floatValue())
+    case DoubleType => (v: Number) => updater.setDouble(v.doubleValue())
+  }
+
+  private def isNumericCompatible(catalystType: DataType, guessedType: DataType): Boolean = {
+    // Both should be numeric types and catalyst type should be wider.
+    val isNumeric = Seq(catalystType, guessedType).forall(TypeCoercion.numericPrecedence.contains)
+
+    // We use compatible converter only if `guessedType` is
+    // smaller than `catalystType`. If they are equal, it falls back to normal converter.
+    val isCompatible = TypeCoercion.numericPrecedence.lastIndexWhere(_ == catalystType) >
+      TypeCoercion.numericPrecedence.lastIndexWhere(_ == guessedType)
+
+    isNumeric && isCompatible
+  }
+
+  def makeCompatibleConverter(
+      guessedType: DataType,
+      catalystType: DataType,
+      updater: ParentContainerUpdater): Option[Converter with HasParentContainerUpdater] = {
+    // These should be numeric types and compatible.
+    if (isNumericCompatible(catalystType, guessedType)) {
+      val compatibleConverter = makeNumericCompatibleConverter(guessedType, updater)
+      val compatibleUpdater = makeNumericCompatibleUpdater(catalystType, updater)
+      Some(compatibleConverter(compatibleUpdater))
+    } else {
+      None
+    }
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 32e6c60cd9766..a15e9964d8d4f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -213,7 +213,7 @@
       catalystType: DataType,
       updater: ParentContainerUpdater): Converter with HasParentContainerUpdater = {
 
-    catalystType match {
+    def makeConverter(): Converter with HasParentContainerUpdater = catalystType match {
       case BooleanType | IntegerType | LongType | FloatType | DoubleType | BinaryType =>
         new ParquetPrimitiveConverter(updater)
 
@@ -311,6 +311,11 @@
           s"Unable to create Parquet converter for data type ${t.json} " +
             s"whose Parquet type is $parquetType")
     }
+
+    val guessedType = schemaConverter.convertField(parquetType)
+    ParquetCompatibleConverter
+      .makeCompatibleConverter(guessedType, catalystType, updater)
+      .getOrElse(makeConverter())
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 57a0af1dda971..e29c4aaaa7271 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -767,6 +767,32 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
       assert(option.compressionCodecClassName == "UNCOMPRESSED")
     }
   }
+
+  test("SPARK-16544 Support Parquet schema compatibility with numeric types") {
+    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> false.toString) {
+      withTempPath { file =>
+        val data = (1 to 4).map(i => (i.toByte, i.toShort, i, i.toLong, i.toFloat))
+
+        spark.createDataFrame(data).toDF("a", "b", "c", "d", "e")
+          .write.parquet(file.getCanonicalPath)
+
+        val schema = StructType(
+          StructField("a", ShortType, true) ::
+          StructField("b", IntegerType, true) ::
+          StructField("c", LongType, true) ::
+          StructField("d", FloatType, true) ::
+          StructField("e", DoubleType, true) :: Nil)
+
+        val df = spark.read.schema(schema).parquet(file.getAbsolutePath)
+
+        val expectedDf = data.map { case (a, b, c, d, e) =>
+          (a.toShort, b.toInt, c.toLong, d.toFloat, e.toDouble)
+        }.toDF("a", "b", "c", "d", "e")
+
+        checkAnswer(df, expectedDf)
+      }
+    }
+  }
 }
 
 class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)

From c6865bf765757db005b1015978fea82b0024756f Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Mon, 10 Apr 2017 17:28:11 +0900
Subject: [PATCH 2/3] Rename to ParquetCompatibleConversion

---
 ...patibleConverter.scala => ParquetCompatibleConversion.scala} | 2 +-
 .../sql/execution/datasources/parquet/ParquetRowConverter.scala | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
 rename sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/{ParquetCompatibleConverter.scala => ParquetCompatibleConversion.scala} (98%)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibleConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibleConversion.scala
similarity index 98%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibleConverter.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibleConversion.scala
index 8d4c8e25c2bc0..a99f320d1ff0c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibleConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibleConversion.scala
@@ -22,7 +22,7 @@ import org.apache.parquet.io.api.Converter
 import org.apache.spark.sql.catalyst.analysis.TypeCoercion
 import org.apache.spark.sql.types._
 
-private[parquet] object ParquetCompatibleConverter {
+private[parquet] object ParquetCompatibleConversion {
 
   // The logic for setting and adding a value in `ParquetPrimitiveConverter` is separated
   // into `NumericValueUpdater` and `NumericCompatibleConverter` so that the value can be
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index a15e9964d8d4f..3385bd853ae27 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -313,7 +313,7 @@
     }
 
     val guessedType = schemaConverter.convertField(parquetType)
-    ParquetCompatibleConverter
+    ParquetCompatibleConversion
      .makeCompatibleConverter(guessedType, catalystType, updater)
      .getOrElse(makeConverter())
   }

From cbf8a224e9cb5744fd340a4f835bdf07cfdf5543 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Tue, 11 Apr 2017 19:16:33 +0900
Subject: [PATCH 3/3] Minor cleanup

---
 .../parquet/ParquetRowConverter.scala         |  5 +-
 ...on.scala => ParquetUpcastConversion.scala} | 64 +++++++++----------
 .../datasources/parquet/ParquetIOSuite.scala  |  7 +-
 3 files changed, 36 insertions(+), 40 deletions(-)
 rename sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/{ParquetCompatibleConversion.scala => ParquetUpcastConversion.scala} (51%)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 3385bd853ae27..a65f900f3879e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -312,9 +312,8 @@
             s"whose Parquet type is $parquetType")
     }
 
-    val guessedType = schemaConverter.convertField(parquetType)
-    ParquetCompatibleConversion
-      .makeCompatibleConverter(guessedType, catalystType, updater)
+    ParquetUpCastConversion
+      .findUpCastConverter(schemaConverter.convertField(parquetType), catalystType, updater)
       .getOrElse(makeConverter())
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibleConversion.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUpcastConversion.scala
similarity index 51%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibleConversion.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUpcastConversion.scala
index a99f320d1ff0c..7febd250aab11 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibleConversion.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUpcastConversion.scala
@@ -22,33 +22,29 @@ import org.apache.parquet.io.api.Converter
 import org.apache.spark.sql.catalyst.analysis.TypeCoercion
 import org.apache.spark.sql.types._
 
-private[parquet] object ParquetCompatibleConversion {
+private[parquet] object ParquetUpCastConversion {
 
-  // The logic for setting and adding a value in `ParquetPrimitiveConverter` is separated
-  // into `NumericValueUpdater` and `NumericCompatibleConverter` so that the value can be
-  // converted to a desired type.
-  // `NumericValueUpdater` updates the input `Number` via `ParentContainerUpdater`, after
-  // converting it to the appropriate value type for `ParentContainerUpdater`.
+  // The logic for setting and adding a value in `ParquetPrimitiveConverter` is separated so that
+  // the value can be converted to a desired type. `NumericValueUpdater` updates the input numbers
+  // via `ParentContainerUpdater`.
   private type NumericValueUpdater = Number => Unit
 
-  // This is a wrapper for `NumericValueUpdater`. It returns a converter that adds the value
-  // from `NumericValueUpdater`.
-  private type NumericCompatibleConverter = NumericValueUpdater => ParquetPrimitiveConverter
-
-  private def makeNumericCompatibleConverter(
+  private def makeNumericConverter(
       guessedType: DataType,
-      updater: ParentContainerUpdater): NumericCompatibleConverter = guessedType match {
-    case ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType =>
-      (valueUpdater: NumericValueUpdater) =>
-        new ParquetPrimitiveConverter(updater) {
-          override def addInt(value: Int): Unit = valueUpdater(value)
-          override def addLong(value: Long): Unit = valueUpdater(value)
-          override def addFloat(value: Float): Unit = valueUpdater(value)
-          override def addDouble(value: Double): Unit = valueUpdater(value)
-        }
+      updater: ParentContainerUpdater): NumericValueUpdater => ParquetPrimitiveConverter = {
+    guessedType match {
+      case ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType =>
+        (valueUpdater: NumericValueUpdater) =>
+          new ParquetPrimitiveConverter(updater) {
+            override def addInt(value: Int): Unit = valueUpdater(value)
+            override def addLong(value: Long): Unit = valueUpdater(value)
+            override def addFloat(value: Float): Unit = valueUpdater(value)
+            override def addDouble(value: Double): Unit = valueUpdater(value)
+          }
+    }
   }
 
-  private def makeNumericCompatibleUpdater(
+  private def makeNumericUpdater(
       catalystType: DataType,
       updater: ParentContainerUpdater): NumericValueUpdater = catalystType match {
     case ByteType => (v: Number) => updater.setByte(v.byteValue())
@@ -59,27 +55,29 @@ private[parquet] object ParquetCompatibleConversion {
     case DoubleType => (v: Number) => updater.setDouble(v.doubleValue())
   }
 
-  private def isNumericCompatible(catalystType: DataType, guessedType: DataType): Boolean = {
-    // Both should be numeric types and catalyst type should be wider.
+  private def isUpCastableNumeric(catalystType: DataType, guessedType: DataType): Boolean = {
+
+    // Both should be numeric types and `catalystType` should be wider.
     val isNumeric = Seq(catalystType, guessedType).forall(TypeCoercion.numericPrecedence.contains)
 
-    // We use compatible converter only if `guessedType` is
-    // smaller than `catalystType`. If they are equal, it falls back to normal converter.
-    val isCompatible = TypeCoercion.numericPrecedence.lastIndexWhere(_ == catalystType) >
+    // We use the up-cast converter only if `guessedType` is narrower than `catalystType`.
+    // If they are equal, it falls back to a normal converter.
+    val isUpCastable =
+      TypeCoercion.numericPrecedence.lastIndexWhere(_ == catalystType) >
      TypeCoercion.numericPrecedence.lastIndexWhere(_ == guessedType)
 
-    isNumeric && isCompatible
+    isNumeric && isUpCastable
   }
 
-  def makeCompatibleConverter(
+  def findUpCastConverter(
       guessedType: DataType,
       catalystType: DataType,
       updater: ParentContainerUpdater): Option[Converter with HasParentContainerUpdater] = {
-    // These should be numeric types and compatible.
-    if (isNumericCompatible(catalystType, guessedType)) {
-      val compatibleConverter = makeNumericCompatibleConverter(guessedType, updater)
-      val compatibleUpdater = makeNumericCompatibleUpdater(catalystType, updater)
-      Some(compatibleConverter(compatibleUpdater))
+    // These should be numeric types and up-castable.
+    if (isUpCastableNumeric(catalystType, guessedType)) {
+      val converter = makeNumericConverter(guessedType, updater)
+      val valueUpdater = makeNumericUpdater(catalystType, updater)
+      Some(converter(valueUpdater))
     } else {
       None
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index e29c4aaaa7271..a02380f132de4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -773,8 +773,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
       withTempPath { file =>
         val data = (1 to 4).map(i => (i.toByte, i.toShort, i, i.toLong, i.toFloat))
 
-        spark.createDataFrame(data).toDF("a", "b", "c", "d", "e")
-          .write.parquet(file.getCanonicalPath)
+        data.toDF("a", "b", "c", "d", "e").write.parquet(file.getCanonicalPath)
 
         val schema = StructType(
           StructField("a", ShortType, true) ::
@@ -785,11 +784,11 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
 
         val df = spark.read.schema(schema).parquet(file.getAbsolutePath)
 
-        val expectedDf = data.map { case (a, b, c, d, e) =>
+        val expected = data.map { case (a, b, c, d, e) =>
           (a.toShort, b.toInt, c.toLong, d.toFloat, e.toDouble)
         }.toDF("a", "b", "c", "d", "e")
 
-        checkAnswer(df, expectedDf)
+        checkAnswer(df, expected)
       }
     }
   }
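
A minimal usage sketch of the behavior this series enables (not part of the
patches themselves; the path and column name are illustrative, and a running
SparkSession named `spark` is assumed). As in the test above, only the
non-vectorized Parquet read path goes through the new converter:

  import org.apache.spark.sql.types._

  // The patched converter is only exercised by the non-vectorized reader.
  spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

  // Write a Parquet file whose physical type for "id" is INT32.
  spark.range(3).selectExpr("CAST(id AS INT) AS id").write.parquet("/tmp/ints")

  // Read it back with a wider user-specified schema; each INT32 value is
  // up-cast to LongType instead of the read failing on the type mismatch.
  val schema = StructType(StructField("id", LongType, nullable = true) :: Nil)
  spark.read.schema(schema).parquet("/tmp/ints").show()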