
[SPARK-23457][SQL] Register task completion listeners first in ParquetFileFormat #20619

Closed · wants to merge 3 commits

Conversation

@dongjoon-hyun (Member) commented Feb 15, 2018

What changes were proposed in this pull request?

ParquetFileFormat leaks opened files in some cases. This PR prevents that by registering task completion listeners first, before initialization.

Caused by: sbt.ForkMain$ForkError: java.lang.Throwable: null
	at org.apache.spark.DebugFilesystem$.addOpenStream(DebugFilesystem.scala:36)
	at org.apache.spark.DebugFilesystem.open(DebugFilesystem.scala:70)
	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
	at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:538)
	at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:149)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:133)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReaderWithPartitionValues$1.apply(ParquetFileFormat.scala:400)
	at 
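
For illustration, a minimal sketch of the reordering. Variable names such as `split`, `hadoopAttemptContext`, and `capacity` come from the surrounding `buildReaderWithPartitionValues` method and are abbreviated here; this is a sketch of the idea, not the literal patch.

  // Old order: initialize first, register the completion listener afterwards.
  // If initialization throws, the listener is never registered and the opened
  // Parquet stream is never closed.
  //
  // New order: register the listener first, then initialize. A failure during
  // initialization is now followed by the listener closing the iterator (and
  // the underlying file) when the task completes.
  val vectorizedReader = new VectorizedParquetRecordReader(
    convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity)
  val iter = new RecordReaderIterator(vectorizedReader)
  taskContext.foreach(_.addTaskCompletionListener(_ => iter.close()))
  vectorizedReader.initialize(split, hadoopAttemptContext)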

How was this patch tested?

Manual. The following test case reproduces the same leakage: setting the vectorized reader's batch size to Int.MaxValue makes the reader fail with an OutOfMemoryError while allocating its columnar batch, after the Parquet file has already been opened, so without the earlier listener registration the open stream is leaked.

  test("SPARK-23457 Register task completion listeners first in ParquetFileFormat") {
    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_BATCH_SIZE.key -> s"${Int.MaxValue}") {
      withTempDir { dir =>
        val basePath = dir.getCanonicalPath
        Seq(0).toDF("a").write.format("parquet").save(new Path(basePath, "first").toString)
        Seq(1).toDF("a").write.format("parquet").save(new Path(basePath, "second").toString)
        val df = spark.read.parquet(
          new Path(basePath, "first").toString,
          new Path(basePath, "second").toString)
        val e = intercept[SparkException] {
          df.collect()
        }
        assert(e.getCause.isInstanceOf[OutOfMemoryError])
      }
    }
  }

@dongjoon-hyun (Member Author)

Hi, @cloud-fan and @gatorsmile.
This is the same kind of opened-file-leak PR, this time for ParquetFileFormat. Could you review it?


// UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
if (parquetReader.isInstanceOf[VectorizedParquetRecordReader] &&
enableVectorizedReader) {
if (enableVectorizedReader) {
Member

Would it be possible to merge this if-statement into the above if-statement?

Member Author

Yep, it looks possible. I'll update it together with any other changes after getting more reviews. Thanks, @kiszk.

Contributor

yea it seems more reasonable to merge this if-else now.
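
For illustration, roughly the merged shape under discussion. This is only a sketch: the initBatch/enableReturningBatches setup calls are assumptions about the surrounding method, not lines quoted from this diff.

  if (enableVectorizedReader) {
    vectorizedReader.initialize(split, hadoopAttemptContext)
    // Assumed columnar-batch setup; the exact calls in ParquetFileFormat may differ.
    vectorizedReader.initBatch(partitionSchema, file.partitionValues)
    if (returningBatch) {
      vectorizedReader.enableReturningBatches()
    }
    // UnsafeRowParquetRecordReader appends the columns internally to avoid another
    // copy, so no separate `if (enableVectorizedReader)` check is needed below.
  } else {
    // Row-based reader path, unchanged.
  }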

@gatorsmile (Member)

}

val iter = new RecordReaderIterator(parquetReader)
taskContext.foreach(_.addTaskCompletionListener(_ => iter.close()))
Member Author

According to the reported leakage, this is too late: if initialization throws before this line is reached, the listener is never registered and the opened stream is never closed.

@SparkQA commented Feb 15, 2018

Test build #87482 has finished for PR 20619 at commit 43f809f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk (Member) commented Feb 16, 2018

Moving the registrations to the new (earlier) places looks good to me.

val vectorizedReader = new VectorizedParquetRecordReader(
  convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity)
val recordReaderIterator = new RecordReaderIterator(vectorizedReader)
// Register a task completion listener before `initialization`.
Member

could new VectorizedParquetRecordReader or new RecordReaderIterator fail?

Member Author

Those constructors didn't look heavy to me.

Member

ok

@cloud-fan (Contributor)

can we provide a manual test like the OOM one in your ORC PR?

@dongjoon-hyun (Member Author)

Yep. I'll try this, too, @cloud-fan.

@dongjoon-hyun dongjoon-hyun changed the title [SPARK-23390][SQL] Register task completion listerners first in ParquetFileFormat [SPARK-23390][SQL] Register task completion listeners first in ParquetFileFormat Feb 17, 2018
@dongjoon-hyun (Member Author)

The reproducible test case has been added to the PR description, and the code has been updated according to @kiszk's and @cloud-fan's comments.

@cloud-fan (Contributor)

LGTM

@dongjoon-hyun (Member Author) commented Feb 17, 2018

Thank you for the last-minute review before your vacation. I'm lucky. :)

@gatorsmile (Member)

He is already on vacation. : )

@SparkQA commented Feb 17, 2018

Test build #87516 has finished for PR 20619 at commit e08d06c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun (Member Author) commented Feb 17, 2018

Oh... so it was a review from vacation.

@dongjoon-hyun (Member Author)

Retest this please.

@dongjoon-hyun (Member Author)

The failure is irrelevant to this PR.

org.apache.spark.sql.hive.client.HiveClientSuites.(It is not a test it is a sbt.testing.NestedSuiteSelector)

@SparkQA commented Feb 17, 2018

Test build #87518 has finished for PR 20619 at commit e08d06c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile (Member)

retest this please

@dongjoon-hyun (Member Author)

Thank you for retriggering, @gatorsmile .

@SparkQA commented Feb 17, 2018

Test build #87520 has finished for PR 20619 at commit e08d06c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk (Member) commented Feb 17, 2018

Would it be worth adding this JIRA number in a comment, as we did for ORC?

@kiszk (Member) commented Feb 17, 2018

retest this please

@SparkQA commented Feb 17, 2018

Test build #87523 has finished for PR 20619 at commit e08d06c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk (Member) commented Feb 17, 2018

Umm, we still see the following exception in the log ...

Caused by: sbt.ForkMain$ForkError: java.lang.Throwable: null
	at org.apache.spark.DebugFilesystem$.addOpenStream(DebugFilesystem.scala:36)
	at org.apache.spark.DebugFilesystem.open(DebugFilesystem.scala:70)
	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:766)
	at org.apache.orc.impl.RecordReaderUtils$DefaultDataReader.open(RecordReaderUtils.java:173)
	at org.apache.orc.impl.RecordReaderImpl.<init>(RecordReaderImpl.java:254)
	at org.apache.orc.impl.ReaderImpl.rows(ReaderImpl.java:633)
	at org.apache.spark.sql.execution.datasources.orc.OrcColumnarBatchReader.initialize(OrcColumnarBatchReader.java:140)
	at org.apache.spark.sql.execution.datasources.orc.OrcFileFormat$$anonfun$buildReaderWithPartitionValues$2.apply(OrcFileFormat.scala:197)
	at org.apache.spark.sql.execution.datasources.orc.OrcFileFormat$$anonfun$buildReaderWithPartitionValues$2.apply(OrcFileFormat.scala:161)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:125)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:179)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:106)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.scan_nextBatch$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1834)
	at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1162)
	at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1162)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2063)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2063)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	... 3 more

@dongjoon-hyun (Member Author)

Yep, @kiszk. @mgaido91 also reported that, so I'm investigating it further.

However, that doesn't mean this approach is not proper. You can see the manual test case examples in the previous ORC-related PR and in this PR. This approach definitely reduces the number of points of failure.

For the remaining issue, I think we may need a different approach in a different code path.

@dongjoon-hyun (Member Author) commented Feb 17, 2018

For the following, I'll create another one.

Would it be worth adding this JIRA number in a comment, as we did for ORC?

@dongjoon-hyun dongjoon-hyun changed the title [SPARK-23390][SQL] Register task completion listeners first in ParquetFileFormat [SPARK-23457][SQL] Register task completion listeners first in ParquetFileFormat Feb 17, 2018
@mgaido91 (Contributor)

LGTM

@dongjoon-hyun (Member Author)

Thank you for the review, @mgaido91.

@kiszk (Member) commented Feb 17, 2018

LGTM with one minor comment

@dongjoon-hyun (Member Author)

Thank you, @kiszk. I added SPARK-23390 to the PR description.

Would it be worth adding this JIRA number in a comment, as we did for ORC?

@dongjoon-hyun (Member Author)

Retest this please.

@dongjoon-hyun (Member Author)

Oh, @kiszk. The following actually meant a comment in the code. Sorry, I misunderstood.

Would it be worth adding this JIRA number in a comment, as we did for ORC?

val vectorizedReader = new VectorizedParquetRecordReader(
  convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity)
val iter = new RecordReaderIterator(vectorizedReader)
// SPARK-23457 Register a task completion listener before `initialization`.
Member Author

Now, SPARK-23457 is added.

@SparkQA commented Feb 17, 2018

Test build #87527 has finished for PR 20619 at commit e08d06c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Feb 17, 2018

Test build #87528 has finished for PR 20619 at commit 8bd02d8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun (Member Author)

The final failure is irrelevant to this PR.

org.apache.spark.sql.sources.CreateTableAsSelectSuite.(It is not a test it is a sbt.testing.SuiteSelector)

@dongjoon-hyun (Member Author)

Retest this please.

@SparkQA commented Feb 18, 2018

Test build #87533 has finished for PR 20619 at commit 8bd02d8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk (Member) commented Feb 18, 2018

retest this please

@SparkQA commented Feb 18, 2018

Test build #87534 has finished for PR 20619 at commit 8bd02d8.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk (Member) commented Feb 18, 2018

retest this please

@SparkQA commented Feb 18, 2018

Test build #87535 has finished for PR 20619 at commit 8bd02d8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya (Member) commented Feb 18, 2018

retest this please.

@SparkQA commented Feb 18, 2018

Test build #87537 has finished for PR 20619 at commit 8bd02d8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor)

thanks, merging to master!

@asfgit asfgit closed this in f5850e7 Feb 20, 2018
@dongjoon-hyun (Member Author)

Thank you all!

@dongjoon-hyun dongjoon-hyun deleted the SPARK-23390 branch February 20, 2018 06:08
@dongjoon-hyun (Member Author)

Hi, @cloud-fan .
Since 2.3 is announced, can we have this in branch-2.3 for Apache Spark 2.3.1?

@cloud-fan (Contributor)

Yea, please go ahead.

@dongjoon-hyun (Member Author)

Thank you, @cloud-fan !
