
[SPARK-27635][SQL] Prevent from splitting too many partitions smaller than row group size in Parquet file format #24527

Closed
wants to merge 1 commit

Conversation

LantaoJin
Contributor

What changes were proposed in this pull request?

The scenario: multiple jobs are submitted concurrently with Spark dynamic allocation enabled. The issue arises when determining RDD partition numbers: when more CPU cores are available, Spark tries to split the RDD into more pieces. But since the files are stored in Parquet format, the Parquet row group is the basic unit for reading data. Splitting the RDD into too many small pieces doesn't make sense: jobs will launch too many partitions and never complete.
[Screenshot: Screen Shot 2019-05-05 at 5 45 15 PM]

Forcing the default parallelism to a fixed number (for example, 200) can work around the issue.
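
For reference, a minimal sketch of that workaround (the session builder and the value 200 are illustrative, not taken from the actual job):

import org.apache.spark.sql.SparkSession

// Sketch only: pin spark.default.parallelism so that a growing number of
// available cores does not shrink the computed split size.
val spark = SparkSession.builder()
  .appName("fixed-parallelism-workaround") // illustrative app name
  .config("spark.default.parallelism", "200")
  .getOrCreate()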

How was this patch tested?

Existing UTs

@AmplabJenkins

Can one of the admins verify this patch?

    fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes // parquet.block.size
  case _ =>
    FilePartition.maxSplitBytes(fsRelation.sparkSession, selectedPartitions)
}
Member


Hi, @LantaoJin. It would be very helpful if you could provide a test case for the following claim.

Splitting the RDD into too many small pieces doesn't make sense: jobs will launch too many partitions and never complete.

Contributor Author

@LantaoJin May 6, 2019


It may be hard to provide a UT. This case only happens in one of our jobs, which uses multiple threads to read from one HDFS folder and write to different target HDFS folders with different filters. With DRA enabled, the job launched nearly 2000 executors with nearly 8000 active tasks. After the job had run for a while, the task number of the filter/scan stages increased from 200 to over 5000, and we saw many log lines like these:

19/04/29 06:13:48 INFO FileSourceScanExec: Planning scan with bin packing, max size: 129026539 bytes, open cost is considered as scanning 4194304 bytes.
19/04/29 06:13:48 INFO FileSourceScanExec: Planning scan with bin packing, max size: 129026539 bytes, open cost is considered as scanning 4194304 bytes.
19/04/29 06:13:49 INFO FileSourceScanExec: Planning scan with bin packing, max size: 129026539 bytes, open cost is considered as scanning 4194304 bytes.

which later changed to:

19/04/29 06:15:49 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4474908 bytes, open cost is considered as scanning 4194304 bytes.
19/04/29 06:16:15 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
19/04/29 06:16:21 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
19/04/29 06:16:23 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
19/04/29 06:16:23 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.

This issue goes away in four cases:

  1. Set "spark.default.parallelism" to a fixed value.
  2. Disable DRA and set num-executors to a low value.
  3. The app cannot acquire enough resources to launch many executors.
  4. Run the jobs one by one instead of in multiple threads.

All of the above prevent the app from requesting too many partitions, because fewer cores are available:

  override def defaultParallelism(): Int = { //  if not set, more resources, more cores
    conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
  }

  def maxSplitBytes(
      sparkSession: SparkSession,
      selectedPartitions: Seq[PartitionDirectory]): Long = {
    val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
    val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
    val defaultParallelism = sparkSession.sparkContext.defaultParallelism
    val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
    val bytesPerCore = totalBytes / defaultParallelism // more cores, less bytesPerCore
    // less bytesPerCore, less maxSplitBytes
    Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
  }

  def splitFiles(
      sparkSession: SparkSession,
      file: FileStatus,
      filePath: Path,
      isSplitable: Boolean,
      maxSplitBytes: Long,
      partitionValues: InternalRow): Seq[PartitionedFile] = {
    if (isSplitable) {
      (0L until file.getLen by maxSplitBytes).map { offset => // less maxSplitBytes, more partitions
        val remaining = file.getLen - offset
        val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
        val hosts = getBlockHosts(getBlockLocations(file), offset, size)
        PartitionedFile(partitionValues, filePath.toUri.toString, offset, size, hosts)
      }
    } else {
      Seq(getPartitionedFile(file, filePath, partitionValues))
    }
  }
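
To make the interaction above concrete, here is a small arithmetic sketch (the input size and core counts are assumptions chosen to roughly match the log lines, not values taken from the PR):

// Illustrative only: shows how a large core count drives maxSplitBytes down
// to openCostInBytes, far below a 128 MB Parquet row group.
object SplitSizeSketch {
  def maxSplitBytes(totalBytes: Long, defaultParallelism: Int): Long = {
    val defaultMaxSplitBytes = 128L * 1024 * 1024 // spark.sql.files.maxPartitionBytes
    val openCostInBytes = 4L * 1024 * 1024        // spark.sql.files.openCostInBytes
    val bytesPerCore = totalBytes / defaultParallelism
    math.min(defaultMaxSplitBytes, math.max(openCostInBytes, bytesPerCore))
  }

  def main(args: Array[String]): Unit = {
    val totalBytes = 30L * 1024 * 1024 * 1024     // assume ~30 GB of input
    // 200 cores: bytesPerCore (~154 MB) exceeds the 128 MB cap, so splits stay at 128 MB.
    println(maxSplitBytes(totalBytes, 200))       // 134217728
    // ~8000 cores after DRA ramps up: bytesPerCore (~3.8 MB) falls below the open cost,
    // so splits bottom out at 4 MB and each 128 MB row group is cut into ~32 pieces.
    println(maxSplitBytes(totalBytes, 8000))      // 4194304
  }
}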

FilePartition.maxSplitBytes(fsRelation.sparkSession, selectedPartitions)
val maxSplitBytes = relation.fileFormat match {
  case _: ParquetSource =>
    fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes // parquet.block.size
Member


Shouldn't this be something provided by the ParquetSource?

Contributor Author


Actually, ParquetSource here is an alias of ParquetFileFormat:

import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}

Contributor Author


Sorry, please ignore. Currently, FileFormat doesn't provide this; how to split is determined in DataSourceScanExec.

Member

@dongjoon-hyun left a comment


Hi, @LantaoJin.

First, I'm not sure about adding ParquetSource-only logic.

Second, Spark computes the split size as follows, so if you set openCostInBytes=128MB you will get the same result as this PR:

min(spark.sql.files.maxPartitionBytes, max(spark.sql.files.openCostInBytes, bytesPerCore))
= min(128MB, max(128MB, bytesPerCore))
= 128MB

Could you try that way? If you agree, I'd like to close this PR and issue as INVALID.
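
For reference, a minimal sketch of applying that suggestion (assuming an existing SparkSession named spark):

// Sketch of the reviewer's suggestion: raise the open cost to 128 MB so that
// min(maxPartitionBytes, max(openCostInBytes, bytesPerCore)) stays at 128 MB
// no matter how small bytesPerCore becomes.
spark.conf.set("spark.sql.files.openCostInBytes", (128L * 1024 * 1024).toString)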

cc @gatorsmile

@LantaoJin
Contributor Author

@dongjoon-hyun Yes, I see. Close.

@LantaoJin closed this Jun 10, 2019
@dongjoon-hyun
Member

Thank you, @LantaoJin.
