From 43f809fd2ff619c901e05bc062ab70aa65371a46 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Thu, 15 Feb 2018 08:55:43 -0800
Subject: [PATCH 1/3] [SPARK-23390][SQL] Register task completion listeners
 first in ParquetFileFormat

---
 .../datasources/parquet/ParquetFileFormat.scala    | 17 ++++++++++-------
 1 file changed, 10 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index ba69f9a26c968..84e56c2830709 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -395,16 +395,19 @@ class ParquetFileFormat
         ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
       }
       val taskContext = Option(TaskContext.get())
-      val parquetReader = if (enableVectorizedReader) {
+      val iter = if (enableVectorizedReader) {
         val vectorizedReader = new VectorizedParquetRecordReader(
           convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity)
+        val recordReaderIterator = new RecordReaderIterator(vectorizedReader)
+        // Register a task completion listener before `initalization`.
+        taskContext.foreach(_.addTaskCompletionListener(_ => recordReaderIterator.close()))
         vectorizedReader.initialize(split, hadoopAttemptContext)
         logDebug(s"Appending $partitionSchema ${file.partitionValues}")
         vectorizedReader.initBatch(partitionSchema, file.partitionValues)
         if (returningBatch) {
           vectorizedReader.enableReturningBatches()
         }
-        vectorizedReader
+        recordReaderIterator
       } else {
         logDebug(s"Falling back to parquet-mr")
         // ParquetRecordReader returns UnsafeRow
@@ -414,16 +417,16 @@ class ParquetFileFormat
         } else {
           new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz))
         }
+        val recordReaderIterator = new RecordReaderIterator(reader)
+        // Register a task completion listener before `initalization`.
+        taskContext.foreach(_.addTaskCompletionListener(_ => recordReaderIterator.close()))
         reader.initialize(split, hadoopAttemptContext)
-        reader
+        recordReaderIterator
       }
 
-      val iter = new RecordReaderIterator(parquetReader)
-      taskContext.foreach(_.addTaskCompletionListener(_ => iter.close()))
 
       // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
-      if (parquetReader.isInstanceOf[VectorizedParquetRecordReader] &&
-          enableVectorizedReader) {
+      if (enableVectorizedReader) {
         iter.asInstanceOf[Iterator[InternalRow]]
       } else {
         val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
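Note: the core of this patch is the ordering change. VectorizedParquetRecordReader.initialize() can throw (for example on a corrupt or truncated footer), and before this change the close-on-task-completion listener was registered only after a reader had been fully initialized, so a failing initialize() leaked the half-opened reader. Registering the listener first guarantees cleanup on every path. A minimal self-contained sketch of the pattern, using hypothetical stand-ins (FakeTaskContext, FailingReader) rather than Spark's real TaskContext and RecordReaderIterator:

    import scala.collection.mutable.ArrayBuffer

    object ListenerOrderingSketch {
      // A reader whose initialize() fails, e.g. on a corrupt Parquet footer.
      final class FailingReader extends AutoCloseable {
        def initialize(): Unit = throw new RuntimeException("corrupt footer")
        override def close(): Unit = println("reader closed")
      }

      // Stand-in for TaskContext: completion listeners run when the task
      // ends, whether it succeeded or failed.
      final class FakeTaskContext {
        private val listeners = ArrayBuffer.empty[() => Unit]
        def addTaskCompletionListener(f: () => Unit): Unit = listeners += f
        def complete(): Unit = listeners.foreach(_.apply())
      }

      def main(args: Array[String]): Unit = {
        val ctx = new FakeTaskContext
        val reader = new FailingReader
        // The fix: register close() BEFORE initialize(), so the reader is
        // released even when initialization throws.
        ctx.addTaskCompletionListener(() => reader.close())
        try reader.initialize()
        catch { case e: RuntimeException => println(s"init failed: ${e.getMessage}") }
        ctx.complete() // prints "reader closed"; nothing leaks
      }
    }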
From e08d06c0e6c0cf23178d12baaa5eb00d55f9b456 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Fri, 16 Feb 2018 19:17:48 -0800
Subject: [PATCH 2/3] Address comments

---
 .../parquet/ParquetFileFormat.scala                | 25 ++++++++++---------------
 1 file changed, 10 insertions(+), 15 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 84e56c2830709..d37a942ebc82b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -395,19 +395,21 @@ class ParquetFileFormat
         ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
       }
       val taskContext = Option(TaskContext.get())
-      val iter = if (enableVectorizedReader) {
+      if (enableVectorizedReader) {
         val vectorizedReader = new VectorizedParquetRecordReader(
           convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity)
-        val recordReaderIterator = new RecordReaderIterator(vectorizedReader)
-        // Register a task completion listener before `initalization`.
-        taskContext.foreach(_.addTaskCompletionListener(_ => recordReaderIterator.close()))
+        val iter = new RecordReaderIterator(vectorizedReader)
+        // Register a task completion listener before `initialization`.
+        taskContext.foreach(_.addTaskCompletionListener(_ => iter.close()))
         vectorizedReader.initialize(split, hadoopAttemptContext)
         logDebug(s"Appending $partitionSchema ${file.partitionValues}")
         vectorizedReader.initBatch(partitionSchema, file.partitionValues)
         if (returningBatch) {
           vectorizedReader.enableReturningBatches()
         }
-        recordReaderIterator
+
+        // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
+        iter.asInstanceOf[Iterator[InternalRow]]
       } else {
         logDebug(s"Falling back to parquet-mr")
         // ParquetRecordReader returns UnsafeRow
@@ -417,18 +419,11 @@ class ParquetFileFormat
         } else {
           new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz))
         }
-        val recordReaderIterator = new RecordReaderIterator(reader)
-        // Register a task completion listener before `initalization`.
-        taskContext.foreach(_.addTaskCompletionListener(_ => recordReaderIterator.close()))
+        val iter = new RecordReaderIterator(reader)
+        // Register a task completion listener before `initialization`.
+        taskContext.foreach(_.addTaskCompletionListener(_ => iter.close()))
         reader.initialize(split, hadoopAttemptContext)
-        recordReaderIterator
-      }
 
-
-      // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
-      if (enableVectorizedReader) {
-        iter.asInstanceOf[Iterator[InternalRow]]
-      } else {
         val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
         val joinedRow = new JoinedRow()
         val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
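Note: this revision restructures the method so each branch builds and returns its own iterator. The vectorized branch now ends with the single iter.asInstanceOf[Iterator[InternalRow]] cast, and the dead parquetReader.isInstanceOf[VectorizedParquetRecordReader] check from the old shared tail is gone. The row-based branch flows straight into the partition-column append visible in the trailing context lines. A plain-Scala sketch of that append step (Spark actually uses JoinedRow plus a generated UnsafeProjection; the Row type and the values here are hypothetical):

    object PartitionAppendSketch {
      final case class Row(values: Vector[Any])

      def main(args: Array[String]): Unit = {
        // Constant partition values for the file being read, e.g. a date column.
        val partitionValues = Vector("2018-02-16")
        val dataRows = Iterator(Row(Vector(1, "a")), Row(Vector(2, "b")))
        // Equivalent of appendPartitionColumns(joinedRow(row, file.partitionValues)):
        // widen every data row with the file's partition columns.
        val withPartitions = dataRows.map(r => Row(r.values ++ partitionValues))
        withPartitions.foreach(println) // Row(Vector(1, a, 2018-02-16)), ...
      }
    }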
From 8bd02d8692708ab58e31e19a3682af3a94550369 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Sat, 17 Feb 2018 10:38:35 -0800
Subject: [PATCH 3/3] Address comment.

---
 .../sql/execution/datasources/parquet/ParquetFileFormat.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index d37a942ebc82b..476bd02374364 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -399,7 +399,7 @@ class ParquetFileFormat
         val vectorizedReader = new VectorizedParquetRecordReader(
           convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity)
         val iter = new RecordReaderIterator(vectorizedReader)
-        // Register a task completion listener before `initialization`.
+        // SPARK-23457 Register a task completion listener before `initialization`.
         taskContext.foreach(_.addTaskCompletionListener(_ => iter.close()))
         vectorizedReader.initialize(split, hadoopAttemptContext)
         logDebug(s"Appending $partitionSchema ${file.partitionValues}")
@@ -420,7 +420,7 @@ class ParquetFileFormat
           new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz))
         }
         val iter = new RecordReaderIterator(reader)
-        // Register a task completion listener before `initialization`.
+        // SPARK-23457 Register a task completion listener before `initialization`.
         taskContext.foreach(_.addTaskCompletionListener(_ => iter.close()))
         reader.initialize(split, hadoopAttemptContext)
 
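Note: the final revision only tags the two comments with SPARK-23457, the JIRA ticket tracking this listener-ordering fix, so readers of the code can trace the motivation. Any Parquet scan exercises the changed code path; a hypothetical smoke test (not part of the patch; assumes an existing SparkSession named spark and a placeholder file path):

    // The vectorized reader is on by default (spark.sql.parquet.enableVectorizedReader).
    // After this series, a failure inside initialize() no longer leaks the reader,
    // because the task completion listener closes it when the task ends.
    val df = spark.read.parquet("/tmp/example.parquet") // placeholder path
    println(df.count())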