From 9fa8c8f1f3d2e519541ae988e009116615375b34 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 29 Jul 2016 13:46:33 -0700 Subject: [PATCH 1/7] start refactoring to builder --- .../sql/execution/datasources/parquet/ParquetIOSuite.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index fc9ce6bb3041b..ca5212a48644e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -326,7 +326,11 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { """.stripMargin) val writeSupport = new TestGroupWriteSupport(schema) - val writer = new ParquetWriter[Group](path, writeSupport) + object ParquetWriterBuilder extends ParquetWriter.Builder[Group, Builder[Group]] { + @Override final val file = path + @Override final val writeSupport = writeSupport + } + val writer = ParquetWriterBuilder.build() (0 until 10).foreach { i => val record = new SimpleGroup(schema) From 1ba76430eaf64804fe983ed50d967dbc58c5f8da Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 29 Jul 2016 23:04:52 -0700 Subject: [PATCH 2/7] Use the builder --- .../datasources/parquet/ParquetIOSuite.scala | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index ca5212a48644e..4e14de99e061e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -325,12 +325,20 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { |} """.stripMargin) - val writeSupport = new TestGroupWriteSupport(schema) - object ParquetWriterBuilder extends ParquetWriter.Builder[Group, Builder[Group]] { - @Override final val file = path - @Override final val writeSupport = writeSupport + val testWriteSupport = new TestGroupWriteSupport(schema) + case class ParquetWriterBuilder() extends + ParquetWriter.Builder[Group, ParquetWriterBuilder](path) { + final val writeSupport = testWriteSupport + @Override def getWriteSupport(conf: org.apache.hadoop.conf.Configuration) = { + writeSupport + } + + @Override def self() = { + this + } } - val writer = ParquetWriterBuilder.build() + + val writer = new ParquetWriterBuilder().build() (0 until 10).foreach { i => val record = new SimpleGroup(schema) From e10c147db6fa9250bbaf16faee944dd22d741ada Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sat, 30 Jul 2016 00:01:31 -0700 Subject: [PATCH 3/7] Cleanup some other deprecated parquet writer constructors --- .../parquet/ParquetAvroCompatibilitySuite.scala | 5 +++-- .../parquet/ParquetCompatibilityTest.scala | 15 +++++++++++++-- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala index 6509e04e85167..1b99fbedca047 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala @@ -27,6 +27,7 @@ import org.apache.avro.Schema import org.apache.avro.generic.IndexedRecord import org.apache.hadoop.fs.Path import org.apache.parquet.avro.AvroParquetWriter +import org.apache.parquet.hadoop.ParquetWriter import org.apache.spark.sql.Row import org.apache.spark.sql.execution.datasources.parquet.test.avro._ @@ -35,14 +36,14 @@ import org.apache.spark.sql.test.SharedSQLContext class ParquetAvroCompatibilitySuite extends ParquetCompatibilityTest with SharedSQLContext { private def withWriter[T <: IndexedRecord] (path: String, schema: Schema) - (f: AvroParquetWriter[T] => Unit): Unit = { + (f: ParquetWriter[T] => Unit): Unit = { logInfo( s"""Writing Avro records with the following Avro schema into Parquet file: | |${schema.toString(true)} """.stripMargin) - val writer = new AvroParquetWriter[T](new Path(path), schema) + val writer = AvroParquetWriter.builder[T](new Path(path)).withSchema(schema).build() try f(writer) finally writer.close() } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala index 57cd70e1911c3..e8b12973fa619 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala @@ -119,8 +119,19 @@ private[sql] object ParquetCompatibilityTest { metadata: Map[String, String], recordWriters: (RecordConsumer => Unit)*): Unit = { val messageType = MessageTypeParser.parseMessageType(schema) - val writeSupport = new DirectWriteSupport(messageType, metadata) - val parquetWriter = new ParquetWriter[RecordConsumer => Unit](new Path(path), writeSupport) + val testWriteSupport = new DirectWriteSupport(messageType, metadata) + case class ParquetWriterBuilder() extends + ParquetWriter.Builder[RecordConsumer => Unit, ParquetWriterBuilder](new Path(path)) { + final val writeSupport = testWriteSupport + @Override def getWriteSupport(conf: org.apache.hadoop.conf.Configuration) = { + writeSupport + } + + @Override def self() = { + this + } + } + val parquetWriter = new ParquetWriterBuilder().build() try recordWriters.foreach(parquetWriter.write) finally parquetWriter.close() } } From 5e92e6f59f959772fdc276b40939d4830a70512f Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sat, 30 Jul 2016 19:18:20 -0700 Subject: [PATCH 4/7] Simplify a little bit (remove final val writeSupport from inner classes since we just expose getWriteSupport) --- .../datasources/parquet/ParquetCompatibilityTest.scala | 3 +-- .../sql/execution/datasources/parquet/ParquetIOSuite.scala | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala index e8b12973fa619..39e146b297586 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala @@ -122,9 +122,8 @@ private[sql] object ParquetCompatibilityTest { val testWriteSupport = new DirectWriteSupport(messageType, metadata) case class ParquetWriterBuilder() extends ParquetWriter.Builder[RecordConsumer => Unit, ParquetWriterBuilder](new Path(path)) { - final val writeSupport = testWriteSupport @Override def getWriteSupport(conf: org.apache.hadoop.conf.Configuration) = { - writeSupport + testWriteSupport } @Override def self() = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 4e14de99e061e..aac47ac6b82cf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -328,9 +328,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { val testWriteSupport = new TestGroupWriteSupport(schema) case class ParquetWriterBuilder() extends ParquetWriter.Builder[Group, ParquetWriterBuilder](path) { - final val writeSupport = testWriteSupport @Override def getWriteSupport(conf: org.apache.hadoop.conf.Configuration) = { - writeSupport + testWriteSupport } @Override def self() = { From 6f35d6b4bac25f8d45f01c13b96f7bdcc5098f38 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Sun, 31 Jul 2016 18:26:23 -0700 Subject: [PATCH 5/7] These don't need to be case classes - switch to regular --- .../datasources/parquet/ParquetCompatibilityTest.scala | 2 +- .../sql/execution/datasources/parquet/ParquetIOSuite.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala index e8b12973fa619..d83e0d588f9fb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala @@ -120,7 +120,7 @@ private[sql] object ParquetCompatibilityTest { recordWriters: (RecordConsumer => Unit)*): Unit = { val messageType = MessageTypeParser.parseMessageType(schema) val testWriteSupport = new DirectWriteSupport(messageType, metadata) - case class ParquetWriterBuilder() extends + class ParquetWriterBuilder() extends ParquetWriter.Builder[RecordConsumer => Unit, ParquetWriterBuilder](new Path(path)) { final val writeSupport = testWriteSupport @Override def getWriteSupport(conf: org.apache.hadoop.conf.Configuration) = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 4e14de99e061e..a2ad6f84f9437 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -326,7 +326,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { """.stripMargin) val testWriteSupport = new TestGroupWriteSupport(schema) - case class ParquetWriterBuilder() extends + class ParquetWriterBuilder() extends ParquetWriter.Builder[Group, ParquetWriterBuilder](path) { final val writeSupport = testWriteSupport @Override def getWriteSupport(conf: org.apache.hadoop.conf.Configuration) = { From 38749d99125ad22df06563db3f3559b9193b38e2 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Wed, 3 Aug 2016 12:13:46 -0700 Subject: [PATCH 6/7] Switch to correct override and add a comment explaining why we are using the builder pattern --- .../parquet/ParquetCompatibilityTest.scala | 14 +++++++------- .../datasources/parquet/ParquetIOSuite.scala | 13 +++++++------ 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala index 2486504ecbd60..a43a856d16ac7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala @@ -120,15 +120,15 @@ private[sql] object ParquetCompatibilityTest { recordWriters: (RecordConsumer => Unit)*): Unit = { val messageType = MessageTypeParser.parseMessageType(schema) val testWriteSupport = new DirectWriteSupport(messageType, metadata) + /** + * Provide a builder for constructing a parquet writer - after PARQUET-248 directly constructing + * the writer is deprecated and should be done through a builder. The default builders include + * Avro - but for raw Parquet writing we must create our own builder. + */ class ParquetWriterBuilder() extends ParquetWriter.Builder[RecordConsumer => Unit, ParquetWriterBuilder](new Path(path)) { - @Override def getWriteSupport(conf: org.apache.hadoop.conf.Configuration) = { - testWriteSupport - } - - @Override def self() = { - this - } + override def getWriteSupport(conf: Configuration) = testWriteSupport + override def self() = this } val parquetWriter = new ParquetWriterBuilder().build() try recordWriters.foreach(parquetWriter.write) finally parquetWriter.close() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index cb8e2343c1bff..d71e29a241fa7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -326,15 +326,16 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { """.stripMargin) val testWriteSupport = new TestGroupWriteSupport(schema) + /** + * Provide a builder for constructing a parquet writer - after PARQUET-248 directly + * constructing the writer is deprecated and should be done through a builder. The default + * builders include Avro - but for raw Parquet writing we must create our own builder. + */ class ParquetWriterBuilder() extends ParquetWriter.Builder[Group, ParquetWriterBuilder](path) { - @Override def getWriteSupport(conf: org.apache.hadoop.conf.Configuration) = { - testWriteSupport - } + override def getWriteSupport(conf: org.apache.hadoop.conf.Configuration) = testWriteSupport - @Override def self() = { - this - } + override def self() = this } val writer = new ParquetWriterBuilder().build() From 1a37b8ffc518dff94117e6129660c1c585412518 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Wed, 3 Aug 2016 12:30:49 -0700 Subject: [PATCH 7/7] already import configuration in this file - use short name --- .../sql/execution/datasources/parquet/ParquetIOSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index d71e29a241fa7..0f74094699abd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -333,7 +333,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { */ class ParquetWriterBuilder() extends ParquetWriter.Builder[Group, ParquetWriterBuilder](path) { - override def getWriteSupport(conf: org.apache.hadoop.conf.Configuration) = testWriteSupport + override def getWriteSupport(conf: Configuration) = testWriteSupport override def self() = this }