From e13ce9f85af8a0f87af09e49de04cfda30bf9180 Mon Sep 17 00:00:00 2001
From: Tim Poterba
Date: Tue, 16 May 2017 15:29:46 -0400
Subject: [PATCH 1/3] Fix quadratic List indexing in ParquetWriteSupport

Fix quadratic List indexing in ParquetWriteSupport. Minimal solution is
to convert rootFieldWriters to a WrappedArray, which has O(1) indexing,
and restores complexity to linear.
---
 .../sql/execution/datasources/parquet/ParquetWriteSupport.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
index 38b0e33937f3c..34667c01bb793 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
@@ -90,7 +90,7 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit
       }
     }
 
-    this.rootFieldWriters = schema.map(_.dataType).map(makeWriter)
+    this.rootFieldWriters = schema.map(_.dataType).map(makeWriter).toArray
 
     val messageType = new ParquetSchemaConverter(configuration).convert(schema)
     val metadata = Map(ParquetReadSupport.SPARK_METADATA_KEY -> schemaString).asJava

From 4e2d5acf1bb7c6dca5f66264c74dc4514c0000e0 Mon Sep 17 00:00:00 2001
From: tpoterba
Date: Wed, 17 May 2017 00:04:31 -0400
Subject: [PATCH 2/3] fixed compiler error

---
 .../datasources/parquet/ParquetWriteSupport.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
index 34667c01bb793..ded195c4fae11 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
@@ -58,7 +58,7 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit
   private var schema: StructType = _
 
   // `ValueWriter`s for all fields of the schema
-  private var rootFieldWriters: Seq[ValueWriter] = _
+  private var rootFieldWriters: IndexedSeq[ValueWriter] = _
 
   // The Parquet `RecordConsumer` to which all `InternalRow`s are written
   private var recordConsumer: RecordConsumer = _
@@ -90,7 +90,7 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit
       }
     }
 
-    this.rootFieldWriters = schema.map(_.dataType).map(makeWriter).toArray
+    this.rootFieldWriters = schema.map(_.dataType).map(makeWriter).toArray[ValueWriter]
 
     val messageType = new ParquetSchemaConverter(configuration).convert(schema)
     val metadata = Map(ParquetReadSupport.SPARK_METADATA_KEY -> schemaString).asJava
@@ -116,7 +116,7 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit
   }
 
   private def writeFields(
-      row: InternalRow, schema: StructType, fieldWriters: Seq[ValueWriter]): Unit = {
+      row: InternalRow, schema: StructType, fieldWriters: IndexedSeq[ValueWriter]): Unit = {
     var i = 0
     while (i < row.numFields) {
       if (!row.isNullAt(i)) {
@@ -192,7 +192,7 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit
       makeDecimalWriter(precision, scale)
 
     case t: StructType =>
-      val fieldWriters = t.map(_.dataType).map(makeWriter)
+      val fieldWriters = t.map(_.dataType).map(makeWriter).toArray[ValueWriter]
       (row: SpecializedGetters, ordinal: Int) =>
         consumeGroup {
           writeFields(row.getStruct(ordinal, t.length), t, fieldWriters)

From 72c5487d39424fe82c1c6030246ca891355b176f Mon Sep 17 00:00:00 2001
From: tpoterba
Date: Wed, 17 May 2017 11:37:55 -0400
Subject: [PATCH 3/3] addressed comments

---
 .../execution/datasources/parquet/ParquetWriteSupport.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
index ded195c4fae11..63a8666f0d774 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
@@ -58,7 +58,7 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit
   private var schema: StructType = _
 
   // `ValueWriter`s for all fields of the schema
-  private var rootFieldWriters: IndexedSeq[ValueWriter] = _
+  private var rootFieldWriters: Array[ValueWriter] = _
 
   // The Parquet `RecordConsumer` to which all `InternalRow`s are written
   private var recordConsumer: RecordConsumer = _
@@ -116,7 +116,7 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit
   }
 
   private def writeFields(
-      row: InternalRow, schema: StructType, fieldWriters: IndexedSeq[ValueWriter]): Unit = {
+      row: InternalRow, schema: StructType, fieldWriters: Array[ValueWriter]): Unit = {
     var i = 0
     while (i < row.numFields) {
       if (!row.isNullAt(i)) {
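
A note on why the one-line fix works: `schema.map(_.dataType).map(makeWriter)` on a StructType builds a List-backed Seq, and `List#apply(i)` walks i nodes, so it costs O(i). Because `writeFields` calls `fieldWriters(i)` inside a while loop over every field, writing a single row costs O(n^2) in the number of fields; `.toArray` makes each lookup O(1) and restores the row write to O(n), exactly as the first commit message says. The standalone sketch below reproduces the access pattern outside Spark to show the difference. It is illustrative only: `QuadraticIndexingDemo`, the timing harness, and the simplified `ValueWriter` alias (`Int => Unit` here, whereas Spark's internal alias takes a `SpecializedGetters` and an ordinal) are stand-ins, not Spark code.

// Standalone sketch of the indexing pattern in ParquetWriteSupport.writeFields.
// `ValueWriter` is a simplified stand-in for Spark's internal type alias.
object QuadraticIndexingDemo {
  type ValueWriter = Int => Unit

  // Mirrors writeFields: an indexed while loop over the field writers.
  // On a List this apply is O(i) per call; on an Array it is O(1).
  def writeAll(numFields: Int, fieldWriters: Seq[ValueWriter]): Unit = {
    var i = 0
    while (i < numFields) {
      fieldWriters(i)(i)
      i += 1
    }
  }

  // Tiny timing helper for the demonstration.
  def time[A](label: String)(body: => A): A = {
    val start = System.nanoTime()
    val result = body
    println(f"$label: ${(System.nanoTime() - start) / 1e6}%.1f ms")
    result
  }

  def main(args: Array[String]): Unit = {
    val n = 20000
    val writers: List[ValueWriter] = List.fill(n)((_: Int) => ())

    time("List  (O(n^2) total)")(writeAll(n, writers))
    // .toArray is the patch's fix: array indexing is constant-time.
    time("Array (O(n) total)")(writeAll(n, writers.toArray[ValueWriter]))
  }
}

The third patch narrows the declared types from IndexedSeq[ValueWriter] to Array[ValueWriter] (per the "addressed comments" subject), which keeps the O(1) indexing while letting `fieldWriters(i)` compile to a direct array access instead of going through the Seq wrapper produced by the implicit Array-to-IndexedSeq conversion.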