From 420ad2be74f20653ba470edebd54f71213bdb030 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Tue, 11 Aug 2015 00:57:10 +0800
Subject: [PATCH 1/3] Fixes converting unannotated Parquet lists

---
 .../parquet/CatalystRowConverter.scala        | 151 ++++++++++++++----
 .../parquet/CatalystSchemaConverter.scala     |   7 +-
 .../resources/nested-array-struct.parquet     | Bin 0 -> 775 bytes
 .../test/resources/old-repeated-int.parquet   | Bin 0 -> 389 bytes
 .../resources/old-repeated-message.parquet    | Bin 0 -> 600 bytes
 .../src/test/resources/old-repeated.parquet   | Bin 0 -> 432 bytes
 .../parquet-thrift-compat.snappy.parquet      | Bin
 .../resources/proto-repeated-string.parquet   | Bin 0 -> 411 bytes
 .../resources/proto-repeated-struct.parquet   | Bin 0 -> 608 bytes
 .../proto-struct-with-array-many.parquet      | Bin 0 -> 802 bytes
 .../resources/proto-struct-with-array.parquet | Bin 0 -> 1576 bytes
 .../parquet/ParquetSchemaSuite.scala          |  30 ++++
 .../ParquetProtobufCompatibilitySuite.scala   |  91 +++++++++++
 13 files changed, 246 insertions(+), 33 deletions(-)
 create mode 100644 sql/core/src/test/resources/nested-array-struct.parquet
 create mode 100644 sql/core/src/test/resources/old-repeated-int.parquet
 create mode 100644 sql/core/src/test/resources/old-repeated-message.parquet
 create mode 100644 sql/core/src/test/resources/old-repeated.parquet
 mode change 100755 => 100644 sql/core/src/test/resources/parquet-thrift-compat.snappy.parquet
 create mode 100644 sql/core/src/test/resources/proto-repeated-string.parquet
 create mode 100644 sql/core/src/test/resources/proto-repeated-struct.parquet
 create mode 100644 sql/core/src/test/resources/proto-struct-with-array-many.parquet
 create mode 100644 sql/core/src/test/resources/proto-struct-with-array.parquet
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetProtobufCompatibilitySuite.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
index 3542dfbae1292..ab5a6ddd41cfc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
@@ -21,11 +21,11 @@ import java.math.{BigDecimal, BigInteger}
 import java.nio.ByteOrder
 
 import scala.collection.JavaConversions._
-import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.parquet.column.Dictionary
 import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
+import org.apache.parquet.schema.OriginalType.LIST
 import org.apache.parquet.schema.Type.Repetition
 import org.apache.parquet.schema.{GroupType, PrimitiveType, Type}
 
@@ -42,6 +42,12 @@ import org.apache.spark.unsafe.types.UTF8String
  * values to an [[ArrayBuffer]].
  */
 private[parquet] trait ParentContainerUpdater {
+  /** Called before a record field is converted */
+  def start(): Unit = ()
+
+  /** Called after a record field has been converted */
+  def end(): Unit = ()
+
   def set(value: Any): Unit = ()
   def setBoolean(value: Boolean): Unit = set(value)
   def setByte(value: Byte): Unit = set(value)
@@ -55,6 +61,32 @@ private[parquet] trait ParentContainerUpdater {
 
 /** A no-op updater used for the root converter (which doesn't have a parent). */
 private[parquet] object NoopUpdater extends ParentContainerUpdater
 
+private[parquet] trait HasParentContainerUpdater {
+  def updater: ParentContainerUpdater
+}
+
+/**
+ * A convenient converter class for Parquet group types with a [[HasParentContainerUpdater]].
+ */
+private[parquet] abstract class CatalystGroupConverter(val updater: ParentContainerUpdater)
+  extends GroupConverter with HasParentContainerUpdater
+
+/**
+ * Parquet converter for Parquet primitive types. Note that not all Spark SQL atomic types
+ * are handled by this converter. Parquet primitive types are only a subset of those of Spark
+ * SQL. For example, BYTE, SHORT, and INT in Spark SQL are all covered by INT32 in Parquet.
+ */
+private[parquet] class CatalystPrimitiveConverter(val updater: ParentContainerUpdater)
+  extends PrimitiveConverter with HasParentContainerUpdater {
+
+  override def addBoolean(value: Boolean): Unit = updater.setBoolean(value)
+  override def addInt(value: Int): Unit = updater.setInt(value)
+  override def addLong(value: Long): Unit = updater.setLong(value)
+  override def addFloat(value: Float): Unit = updater.setFloat(value)
+  override def addDouble(value: Double): Unit = updater.setDouble(value)
+  override def addBinary(value: Binary): Unit = updater.set(value.getBytes)
+}
+
 /**
  * A [[CatalystRowConverter]] is used to convert Parquet "structs" into Spark SQL [[InternalRow]]s.
  * Since any Parquet record is also a struct, this converter can also be used as root converter.
@@ -70,7 +102,7 @@ private[parquet] class CatalystRowConverter(
     parquetType: GroupType,
     catalystType: StructType,
     updater: ParentContainerUpdater)
-  extends GroupConverter {
+  extends CatalystGroupConverter(updater) {
 
   /**
    * Updater used together with field converters within a [[CatalystRowConverter]]. It propagates
@@ -89,13 +121,11 @@ private[parquet] class CatalystRowConverter(
 
   /**
    * Represents the converted row object once an entire Parquet record is converted.
-   *
-   * @todo Uses [[UnsafeRow]] for better performance.
    */
   val currentRow = new SpecificMutableRow(catalystType.map(_.dataType))
 
   // Converters for each field.
- private val fieldConverters: Array[Converter] = { + private val fieldConverters: Array[Converter with HasParentContainerUpdater] = { parquetType.getFields.zip(catalystType).zipWithIndex.map { case ((parquetFieldType, catalystField), ordinal) => // Converted field value should be set to the `ordinal`-th cell of `currentRow` @@ -105,11 +135,19 @@ private[parquet] class CatalystRowConverter( override def getConverter(fieldIndex: Int): Converter = fieldConverters(fieldIndex) - override def end(): Unit = updater.set(currentRow) + override def end(): Unit = { + var i = 0 + while (i < currentRow.numFields) { + fieldConverters(i).updater.end() + i += 1 + } + updater.set(currentRow) + } override def start(): Unit = { var i = 0 while (i < currentRow.numFields) { + fieldConverters(i).updater.start() currentRow.setNullAt(i) i += 1 } @@ -122,20 +160,20 @@ private[parquet] class CatalystRowConverter( private def newConverter( parquetType: Type, catalystType: DataType, - updater: ParentContainerUpdater): Converter = { + updater: ParentContainerUpdater): Converter with HasParentContainerUpdater = { catalystType match { case BooleanType | IntegerType | LongType | FloatType | DoubleType | BinaryType => new CatalystPrimitiveConverter(updater) case ByteType => - new PrimitiveConverter { + new CatalystPrimitiveConverter(updater) { override def addInt(value: Int): Unit = updater.setByte(value.asInstanceOf[ByteType#InternalType]) } case ShortType => - new PrimitiveConverter { + new CatalystPrimitiveConverter(updater) { override def addInt(value: Int): Unit = updater.setShort(value.asInstanceOf[ShortType#InternalType]) } @@ -148,7 +186,7 @@ private[parquet] class CatalystRowConverter( case TimestampType => // TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that. - new PrimitiveConverter { + new CatalystPrimitiveConverter(updater) { // Converts nanosecond timestamps stored as INT96 override def addBinary(value: Binary): Unit = { assert( @@ -164,13 +202,23 @@ private[parquet] class CatalystRowConverter( } case DateType => - new PrimitiveConverter { + new CatalystPrimitiveConverter(updater) { override def addInt(value: Int): Unit = { // DateType is not specialized in `SpecificMutableRow`, have to box it here. updater.set(value.asInstanceOf[DateType#InternalType]) } } + // A repeated field that is neither contained by a `LIST`- or `MAP`-annotated group nor + // annotated by `LIST` or `MAP` should be interpreted as a required list of required + // elements where the element type is the type of the field. + case t: ArrayType if parquetType.getOriginalType != LIST => + if (parquetType.isPrimitive) { + new RepeatedPrimitiveConverter(parquetType, t.elementType, updater) + } else { + new RepeatedGroupConverter(parquetType, t.elementType, updater) + } + case t: ArrayType => new CatalystArrayConverter(parquetType.asGroupType(), t, updater) @@ -195,27 +243,11 @@ private[parquet] class CatalystRowConverter( } } - /** - * Parquet converter for Parquet primitive types. Note that not all Spark SQL atomic types - * are handled by this converter. Parquet primitive types are only a subset of those of Spark - * SQL. For example, BYTE, SHORT, and INT in Spark SQL are all covered by INT32 in Parquet. 
-   */
-  private final class CatalystPrimitiveConverter(updater: ParentContainerUpdater)
-    extends PrimitiveConverter {
-
-    override def addBoolean(value: Boolean): Unit = updater.setBoolean(value)
-    override def addInt(value: Int): Unit = updater.setInt(value)
-    override def addLong(value: Long): Unit = updater.setLong(value)
-    override def addFloat(value: Float): Unit = updater.setFloat(value)
-    override def addDouble(value: Double): Unit = updater.setDouble(value)
-    override def addBinary(value: Binary): Unit = updater.set(value.getBytes)
-  }
-
   /**
    * Parquet converter for strings. A dictionary is used to minimize string decoding cost.
    */
   private final class CatalystStringConverter(updater: ParentContainerUpdater)
-    extends PrimitiveConverter {
+    extends CatalystPrimitiveConverter(updater) {
 
     private var expandedDictionary: Array[UTF8String] = null
 
@@ -242,7 +274,7 @@ private[parquet] class CatalystRowConverter(
   private final class CatalystDecimalConverter(
       decimalType: DecimalType,
       updater: ParentContainerUpdater)
-    extends PrimitiveConverter {
+    extends CatalystPrimitiveConverter(updater) {
 
     // Converts decimals stored as INT32
     override def addInt(value: Int): Unit = {
@@ -306,7 +338,7 @@ private[parquet] class CatalystRowConverter(
       parquetSchema: GroupType,
       catalystSchema: ArrayType,
       updater: ParentContainerUpdater)
-    extends GroupConverter {
+    extends CatalystGroupConverter(updater) {
 
     private var currentArray: ArrayBuffer[Any] = _
 
@@ -383,7 +415,7 @@ private[parquet] class CatalystRowConverter(
       parquetType: GroupType,
       catalystType: MapType,
       updater: ParentContainerUpdater)
-    extends GroupConverter {
+    extends CatalystGroupConverter(updater) {
 
     private var currentKeys: ArrayBuffer[Any] = _
     private var currentValues: ArrayBuffer[Any] = _
@@ -446,4 +478,61 @@ private[parquet] class CatalystRowConverter(
       }
     }
   }
+
+  private trait RepeatedConverter {
+    private var currentArray: ArrayBuffer[Any] = _
+
+    protected def newArrayUpdater(updater: ParentContainerUpdater) = new ParentContainerUpdater {
+      override def start(): Unit = currentArray = ArrayBuffer.empty[Any]
+      override def end(): Unit = updater.set(new GenericArrayData(currentArray.toArray))
+      override def set(value: Any): Unit = currentArray += value
+    }
+  }
+
+  /**
+   * A primitive converter for converting unannotated repeated primitive values to required arrays
+   * of required primitive values.
+ */ + private final class RepeatedPrimitiveConverter( + parquetType: Type, + catalystType: DataType, + parentUpdater: ParentContainerUpdater) + extends PrimitiveConverter with RepeatedConverter with HasParentContainerUpdater { + + val updater: ParentContainerUpdater = newArrayUpdater(parentUpdater) + + private val elementConverter: PrimitiveConverter = + newConverter(parquetType, catalystType, updater).asPrimitiveConverter() + + override def addBoolean(value: Boolean): Unit = elementConverter.addBoolean(value) + override def addInt(value: Int): Unit = elementConverter.addInt(value) + override def addLong(value: Long): Unit = elementConverter.addLong(value) + override def addFloat(value: Float): Unit = elementConverter.addFloat(value) + override def addDouble(value: Double): Unit = elementConverter.addDouble(value) + override def addBinary(value: Binary): Unit = elementConverter.addBinary(value) + + override def setDictionary(dict: Dictionary): Unit = elementConverter.setDictionary(dict) + override def hasDictionarySupport: Boolean = elementConverter.hasDictionarySupport + override def addValueFromDictionary(id: Int): Unit = elementConverter.addValueFromDictionary(id) + } + + /** + * A group converter for converting unannotated repeated group values to required arrays of + * required struct values. + */ + private final class RepeatedGroupConverter( + parquetType: Type, + catalystType: DataType, + parentUpdater: ParentContainerUpdater) + extends GroupConverter with HasParentContainerUpdater with RepeatedConverter { + + val updater: ParentContainerUpdater = newArrayUpdater(parentUpdater) + + private val elementConverter: GroupConverter = + newConverter(parquetType, catalystType, updater).asGroupConverter() + + override def getConverter(field: Int): Converter = elementConverter.getConverter(field) + override def end(): Unit = elementConverter.end() + override def start(): Unit = elementConverter.start() + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala index a3fc74cf7929b..275646e8181ad 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala @@ -100,8 +100,11 @@ private[parquet] class CatalystSchemaConverter( StructField(field.getName, convertField(field), nullable = false) case REPEATED => - throw new AnalysisException( - s"REPEATED not supported outside LIST or MAP. Type: $field") + // A repeated field that is neither contained by a `LIST`- or `MAP`-annotated group nor + // annotated by `LIST` or `MAP` should be interpreted as a required list of required + // elements where the element type is the type of the field. 
+ val arrayType = ArrayType(convertField(field), containsNull = false) + StructField(field.getName, arrayType, nullable = false) } } diff --git a/sql/core/src/test/resources/nested-array-struct.parquet b/sql/core/src/test/resources/nested-array-struct.parquet new file mode 100644 index 0000000000000000000000000000000000000000..41a43fa35d39685e56ba4849a16cba4bb1aa86ae GIT binary patch literal 775 zcmaKr-%G+!6vvO#=8u;i;*JSE3{j~lAyWtuV%0Q3*U(Y)B(q&>u(@?NBZ;2+Px=dc z?6I>s%N6u+cQ4=bIp1@3cBjdsBLbvCDhGte15a`Q8~~)V;d2WY3V@LYX{-@GMj#!6 z`;fvdgDblto20oxMhs#hdKzt*4*3w}iuUE6PW?b*Zs1NAv%1WfvAnT@2NhLn_L#fy zHFWX={q7*H z93@^0*O&w#e53@lJ`hFEV2=wL)V**Pb(8vc%<=-4iJz&t;n22J{%<(t!px$!DZLaV zDaOCYR1UR;Go`F89pTwFrqpgr1NlrDOs+J&f2GO;)PtpmW%OH3neOEccXHoWyO`6Bl5({+ea14&qL7DtETw`(h_426$BxCYApN S1!5siKXe$p;U(Ab7x)6M)yB5~ literal 0 HcmV?d00001 diff --git a/sql/core/src/test/resources/old-repeated-int.parquet b/sql/core/src/test/resources/old-repeated-int.parquet new file mode 100644 index 0000000000000000000000000000000000000000..520922f73ebb75950c4e65dec6689af817cb7a33 GIT binary patch literal 389 zcmZWmO-sW-5FMkeB_3tN1_Ca@_7p=~Z@EPbSg5juTs)Ocvz0>9#NEw7ivQhdDcI1< z%;U}1booNUUY~PehCwzvumZho_zD!@TVtKoQHivd-PzgS{O4oWwfk)ZsDnB!ByvMUB7gt@Rk50{GMw}6DWn-w!#F0CGZwP` zh81XVct9peJU!6=O6yx`ZywS;EGVOwOOIsCr3p)d)y(vgv`4bce)5D7UiKlH0l6Ga0+m-EijTt zM!X)Rd#pSj~EkCao0il-K=62)db~64ZC7r8d1AwIhzFkoG;e&AbpZW#pJ&{5H literal 0 HcmV?d00001 diff --git a/sql/core/src/test/resources/old-repeated.parquet b/sql/core/src/test/resources/old-repeated.parquet new file mode 100644 index 0000000000000000000000000000000000000000..213f1a90291b30a8a3161b51c38f008f3ae9f6e5 GIT binary patch literal 432 zcmZWm!D@p*5ZxNF!5+)X3PMGioUA16Ei?y9g$B|h;-#ms>Ld--Xm{5`DgF13A<&42 znSH!BJNtMWhsm50I-@h68VC$(I7}ZALYRJm-NGUo*2p;a%Z@xEJgH{;FE=Sj6^mNc zS-TAqXn-pyRtNP8Qt};84d*60yAuBru{7JUo$1)Y6%&KlJ(Uv6u!JS1zOP)cw zaM$5ewB9699EEB0jJ*18aDDn7N1N4K`fzXlnuJ~V?c^nwk}Yeo3wXox4+#3Y!pMU2 V+-`?%2{TWZ?kYh(G4~k%>JK8=aDe~- literal 0 HcmV?d00001 diff --git a/sql/core/src/test/resources/parquet-thrift-compat.snappy.parquet b/sql/core/src/test/resources/parquet-thrift-compat.snappy.parquet old mode 100755 new mode 100644 diff --git a/sql/core/src/test/resources/proto-repeated-string.parquet b/sql/core/src/test/resources/proto-repeated-string.parquet new file mode 100644 index 0000000000000000000000000000000000000000..8a7eea601d0164b1177ab90efb7116c6e4c900da GIT binary patch literal 411 zcmY*W!D_-l5S^$E62wc{umKMtR4+{f-b!vM4Q)Y6&|G?w#H^aKansF;gi?C#*Yq1Z zv6e;{_C4Mk<_)t^FrN}2UmBK6hDddy19SkO`+9soFOY8;=b|A8A$itAvJoQdBBnKK zKy>)#|`4$W^3YtqL)WN5pTmWh1ZGv$>{f|s#sCG%1VN%LJ&FyD4siH@<(8PDu@ z!?sWEU$)ao`yyr1x2MQ?k}~ewv*0eAE$3kr261?gx~fYY8oxy0auLs;o*#@41L)=X h7Au}q6}>(e6`sLs-{PvZ8BpWYeN#xd)c_*=mLHLcZu|fM literal 0 HcmV?d00001 diff --git a/sql/core/src/test/resources/proto-repeated-struct.parquet b/sql/core/src/test/resources/proto-repeated-struct.parquet new file mode 100644 index 0000000000000000000000000000000000000000..c29eee35c350e84efa2818febb2528309a8ac3ea GIT binary patch literal 608 zcma))-D<)x6vvP811a8(bSZdI%9Js*tjca=2puciK%r=F1_P}cr%>B2jf^q&4ts<> z%r1PaC9^8^%A4e$li&GFTzg<)z+K#J;DQh(TmnDm&bFDCfsEak0$H6=|yp$CW-$_F@l={DK5j1GEpn8)DX!>A+15G`Fpg} zMZREE-l#~cYPa=r6<4%c3AD?tzx2bP7Sypiu9pGoi(^0p+XD*$Y;s4$HpQOVL*dnD3MsZWzL<}ml5ZY`6p``6p3uzOR6cOQizQ3hI@;TGm z8hy+Rm>Hl!&aiC8oEI4i7`u<#dC`r5%q<5r!9eDg1IFy*c2RU=AalzBO)!wT<$y7e zS0C?=T^uJ)6ePiDIW^oM?BO`}o-pLWHOS&h@*H8x zD56?ZuNu`Fl-0Tj)YHv=x(@J>C=j)+#mj%Z_4kgdp}D_<3b zc(o7;z363$6CCr9}+-E 
j<-Z;KUL2!lIhl~NDb+dIHUN;6ire!D{E~S&5gg5S?rsql%Icnq5|qgD{N<#x?oCY2!n{ZAEKv64iDOJsH{F!~)4uB-v0( z@BJM;_owufA5^-l4@Z{ekVAVh>zOxi-n`LDMyq>_0q^0x8b74l?^SjJrp%q&06h8-y4iMdSJ@MDH4c~HjX3j($=&sN1 zW|q&!OYxG3d&~^8@dlzhDa$1b0`rz(6tkBD*J153G=T1;gzF$B0g1WSKnPOy6M$~KQ|W5 z5A#DO!$$Zca>TK`;67WBib&?m76{4rqTmNgwH)UCc)*uPhjcg;fc!=Tfl{N?GyS_6 z3+tZPeSOS=k#BjS>(f75Q`2EhwX*hMsK_@Kv&ZT;SydBky3mDR6_J}cL*_TtV}7>H zA+wumr}b9v46coS`}(TY;qmaR$9wg^82X@n)jvIvzps*~J`|Fl Date: Tue, 11 Aug 2015 01:24:14 +0800 Subject: [PATCH 2/3] Updates .rat-excludes --- .rat-excludes | 1 + 1 file changed, 1 insertion(+) diff --git a/.rat-excludes b/.rat-excludes index 72771465846b8..9165872b9fb27 100644 --- a/.rat-excludes +++ b/.rat-excludes @@ -94,3 +94,4 @@ INDEX gen-java.* .*avpr org.apache.spark.sql.sources.DataSourceRegister +.*parquet From ace6df766d5799e6fc8409bf50c6c17321a774d0 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Tue, 11 Aug 2015 08:02:45 +0800 Subject: [PATCH 3/3] Moves ParquetProtobufCompatibilitySuite --- .../parquet/ParquetProtobufCompatibilitySuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) rename sql/core/src/test/scala/org/apache/spark/sql/{ => execution/datasources}/parquet/ParquetProtobufCompatibilitySuite.scala (96%) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetProtobufCompatibilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetProtobufCompatibilitySuite.scala similarity index 96% rename from sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetProtobufCompatibilitySuite.scala rename to sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetProtobufCompatibilitySuite.scala index 34db9e57058fc..981334cf771cf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetProtobufCompatibilitySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetProtobufCompatibilitySuite.scala @@ -15,10 +15,10 @@ * limitations under the License. */ -package org.apache.spark.sql.parquet +package org.apache.spark.sql.execution.datasources.parquet import org.apache.spark.sql.test.TestSQLContext -import org.apache.spark.sql.{Row, DataFrame, SQLContext} +import org.apache.spark.sql.{DataFrame, Row, SQLContext} class ParquetProtobufCompatibilitySuite extends ParquetCompatibilityTest { override def sqlContext: SQLContext = TestSQLContext
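
For reference, a minimal sketch of the reader behavior enabled by PATCH 1/3, using one of
the test resources added above. This is illustrative only: the column name `f` and the
printed values are assumptions rather than contents read from the file's footer, and the
snippet targets the Spark 1.5-era `SQLContext` API.

    import org.apache.spark.sql.SQLContext

    // `old-repeated-int.parquet` was written with an unannotated repeated field,
    // i.e. a schema along the lines of
    //
    //   message Record {
    //     repeated int32 f;
    //   }
    //
    // Previously the schema converter rejected such files with
    // "REPEATED not supported outside LIST or MAP"; with this series the field
    // is read back as a required array of required ints.
    def readOldRepeatedInt(sqlContext: SQLContext): Unit = {
      val df = sqlContext.read.parquet("sql/core/src/test/resources/old-repeated-int.parquet")
      df.printSchema()
      // root
      //  |-- f: array (nullable = false)
      //  |    |-- element: integer (containsNull = false)
      df.collect().foreach(println)  // e.g. [WrappedArray(1, 2, 3)]
    }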