From 4157771715a235fe5ffad970764b805fd74f45d5 Mon Sep 17 00:00:00 2001
From: Vinitha Gankidi
Date: Wed, 1 Nov 2017 13:09:44 -0700
Subject: [PATCH 1/2] [SPARK-22411][SQL] Disable the heuristic to calculate
 max partition size when dynamic allocation is enabled and use the value
 specified by the property spark.sql.files.maxPartitionBytes instead

---
 .../spark/sql/execution/DataSourceScanExec.scala  | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index e9f65031143b7..5fcdf8b7b90a4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -424,11 +424,18 @@ case class FileSourceScanExec(
     val defaultMaxSplitBytes =
       fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
     val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
-    val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
-    val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
-    val bytesPerCore = totalBytes / defaultParallelism
 
-    val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+    val maxSplitBytes =
+      if (Utils.isDynamicAllocationEnabled(fsRelation.sparkSession.sparkContext.getConf)) {
+      defaultMaxSplitBytes
+      } else {
+      val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
+      val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
+      val bytesPerCore = totalBytes / defaultParallelism
+      val maxPartitionBytes =
+        Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+      maxPartitionBytes
+      }
     logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
       s"open cost is considered as scanning $openCostInBytes bytes.")
 
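The heuristic this patch disables derives the split size from `defaultParallelism`, which under dynamic allocation tracks however many executor cores happen to be registered at planning time, so the same scan can be split differently from run to run. Below is a standalone sketch, not part of the patch, of that same arithmetic; the object name, parameter list, and file sizes are illustrative assumptions, chosen only to show how strongly the result depends on the momentary core count:

```scala
// Standalone sketch of the bin-packing size heuristic that the patch
// bypasses under dynamic allocation. Names mirror those used in
// DataSourceScanExec; the sizes below are made-up illustration values.
object MaxSplitBytesSketch {
  def maxSplitBytes(
      defaultMaxSplitBytes: Long,  // spark.sql.files.maxPartitionBytes (128 MB default)
      openCostInBytes: Long,       // spark.sql.files.openCostInBytes (4 MB default)
      defaultParallelism: Int,     // varies with currently-registered executor cores
      fileSizes: Seq[Long]): Long = {
    // Each file is charged its length plus a fixed "open cost".
    val totalBytes = fileSizes.map(_ + openCostInBytes).sum
    val bytesPerCore = totalBytes / defaultParallelism
    // Never exceed the configured cap, never go below the open cost.
    Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
  }

  def main(args: Array[String]): Unit = {
    val mb = 1024L * 1024L
    val files = Seq.fill(10)(64L * mb)  // ten 64 MB files: 680 MB with open costs
    // With 200 cores, bytesPerCore is ~3.4 MB, below the open cost, so splits
    // collapse to 4 MB and the scan gets many small partitions.
    println(maxSplitBytes(128 * mb, 4 * mb, 200, files))  // 4194304
    // With 8 cores, bytesPerCore is 85 MB, so the heuristic picks 85 MB.
    println(maxSplitBytes(128 * mb, 4 * mb, 8, files))    // 89128960
  }
}
```

With 200 cores the per-core share falls below the 4 MB open cost and every split collapses to 4 MB; with 8 cores the heuristic settles at 85 MB. Pinning `maxSplitBytes` to `spark.sql.files.maxPartitionBytes` when dynamic allocation is enabled removes this run-to-run variance.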
From d3aff40637da077f4f9cee655d6fb24b0b204a5d Mon Sep 17 00:00:00 2001
From: Vinitha Gankidi
Date: Sun, 12 Nov 2017 15:49:43 -0800
Subject: [PATCH 2/2] SPARK-22411 Fixing indentation and adding a comment

---
 .../spark/sql/execution/DataSourceScanExec.scala  | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 5fcdf8b7b90a4..f6e6dc5954e3d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -425,16 +425,17 @@ case class FileSourceScanExec(
       fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
     val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
 
+    // Ignore bytesPerCore when dynamic allocation is enabled. See SPARK-22411
     val maxSplitBytes =
       if (Utils.isDynamicAllocationEnabled(fsRelation.sparkSession.sparkContext.getConf)) {
-      defaultMaxSplitBytes
+        defaultMaxSplitBytes
       } else {
-      val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
-      val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
-      val bytesPerCore = totalBytes / defaultParallelism
-      val maxPartitionBytes =
-        Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
-      maxPartitionBytes
+        val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
+        val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
+        val bytesPerCore = totalBytes / defaultParallelism
+        val maxPartitionBytes =
+          Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+        maxPartitionBytes
       }
     logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
       s"open cost is considered as scanning $openCostInBytes bytes.")
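With both patches applied, a scan planned under dynamic allocation takes its split size straight from the session conf. A hypothetical driver program exercising the new behavior is sketched below; the 64 MB value and the input path are placeholders, not taken from the patch, and the conf keys are the standard Spark 2.x ones:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical usage: with this patch and dynamic allocation enabled, file
// splits are capped at exactly spark.sql.files.maxPartitionBytes instead of
// a value derived from the fluctuating executor count.
object MaxPartitionBytesExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SPARK-22411 example")
      .config("spark.dynamicAllocation.enabled", "true")
      .config("spark.shuffle.service.enabled", "true")  // required by dynamic allocation in Spark 2.x
      .config("spark.sql.files.maxPartitionBytes", (64L * 1024 * 1024).toString)  // placeholder: 64 MB
      .getOrCreate()

    // The scan below plans its file splits at 64 MB regardless of how many
    // executors are registered when the query is planned.
    val df = spark.read.parquet("/path/to/table")  // placeholder path
    println(df.rdd.getNumPartitions)
  }
}
```

Without the patch, the partition count printed at the end could change between runs as executors come and go; with it, only the configured cap and the file layout matter.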