From de591a34da1e241b56d50bee314dfa4d1ffa5170 Mon Sep 17 00:00:00 2001
From: Takeshi YAMAMURO
Date: Thu, 12 May 2016 22:25:04 +0900
Subject: [PATCH 1/5] Set the number of partitions for reading parquet schema

---
 .../execution/datasources/parquet/ParquetFileFormat.scala | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index ff7962df22452..ca4d52e1defb3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -795,11 +795,16 @@ private[sql] object ParquetFileFormat extends Logging {
     // side, and resemble fake `FileStatus`es there.
     val partialFileStatusInfo = filesToTouch.map(f => (f.getPath.toString, f.getLen))
 
+    // Set the number of partitions to prevent following schema reads from generating many tasks
+    // in case of a small number of parquet files.
+    val numParallelism = Math.min(partialFileStatusInfo.size,
+      sparkSession.sparkContext.defaultParallelism)
+
     // Issues a Spark job to read Parquet schema in parallel.
     val partiallyMergedSchemas =
       sparkSession
         .sparkContext
-        .parallelize(partialFileStatusInfo)
+        .parallelize(partialFileStatusInfo, numParallelism)
         .mapPartitions { iterator =>
           // Resembles fake `FileStatus`es with serialized path and length information.
           val fakeFileStatuses = iterator.map { case (path, length) =>

From 8ee44067efdf64fda0189bc1f207acc3dd167e85 Mon Sep 17 00:00:00 2001
From: Takeshi YAMAMURO
Date: Tue, 17 May 2016 08:41:34 +0900
Subject: [PATCH 2/5] Fix a bug

---
 .../sql/execution/datasources/parquet/ParquetFileFormat.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index ca4d52e1defb3..9d9cd06c70ed2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -797,7 +797,7 @@ private[sql] object ParquetFileFormat extends Logging {
 
     // Set the number of partitions to prevent following schema reads from generating many tasks
     // in case of a small number of parquet files.
-    val numParallelism = Math.min(partialFileStatusInfo.size,
+    val numParallelism = Math.min(partialFileStatusInfo.size + 1,
       sparkSession.sparkContext.defaultParallelism)
 
     // Issues a Spark job to read Parquet schema in parallel.
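Note on patch 2/5: the subject only says "Fix a bug", but the "+ 1" points at the empty-input case. With zero Parquet files to touch, Math.min(0, defaultParallelism) yields 0, and Spark rejects a zero-partition RDD as soon as a job runs over it. Below is a minimal standalone sketch of that failure mode; it is hypothetical illustration, not part of the patch (ZeroPartitionRepro and the local[4] master are made up for the example):

import org.apache.spark.sql.SparkSession

// Hypothetical repro, not part of the patch: shows why the first revision,
// Math.min(size, defaultParallelism), breaks when there are no files to touch.
object ZeroPartitionRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[4]")
      .appName("zero-partition-repro")
      .getOrCreate()
    val sc = spark.sparkContext

    // Stand-in for filesToTouch.map(f => (f.getPath.toString, f.getLen))
    // when no Parquet footers are left to read.
    val partialFileStatusInfo = Seq.empty[(String, Long)]

    // Patch 1/5: min(0, defaultParallelism) == 0; running a job over the RDD
    // fails with "java.lang.IllegalArgumentException: Positive number of
    // partitions required" once ParallelCollectionRDD computes its slices.
    val broken = Math.min(partialFileStatusInfo.size, sc.defaultParallelism)
    // sc.parallelize(partialFileStatusInfo, broken).count()  // would throw

    // Patch 2/5: "+ 1" keeps the slice count strictly positive.
    val fixed = Math.min(partialFileStatusInfo.size + 1, sc.defaultParallelism)
    println(sc.parallelize(partialFileStatusInfo, fixed).count())  // 0, no error

    spark.stop()
  }
}

The "+ 1" guarantees at least one (possibly empty) partition; patch 5/5 later reaches the same guarantee without the addition.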
From 4f3ee3cccba78911530767feef99a07794428b73 Mon Sep 17 00:00:00 2001
From: Takeshi YAMAMURO
Date: Sat, 11 Jun 2016 13:05:13 +0900
Subject: [PATCH 3/5] Update how to decide numParallelism

---
 .../sql/execution/datasources/parquet/ParquetFileFormat.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 9d9cd06c70ed2..b34a1979ddf00 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -797,8 +797,7 @@ private[sql] object ParquetFileFormat extends Logging {
 
     // Set the number of partitions to prevent following schema reads from generating many tasks
     // in case of a small number of parquet files.
-    val numParallelism = Math.min(partialFileStatusInfo.size + 1,
-      sparkSession.sparkContext.defaultParallelism)
+    val numParallelism = Math.min(partialFileStatusInfo.size + 1, 10000)
 
     // Issues a Spark job to read Parquet schema in parallel.
     val partiallyMergedSchemas =

From 4ebd7d154bbe793dde98cecceceaf77938b2b617 Mon Sep 17 00:00:00 2001
From: Takeshi YAMAMURO
Date: Tue, 14 Jun 2016 22:41:06 +0900
Subject: [PATCH 4/5] Update

---
 .../sql/execution/datasources/parquet/ParquetFileFormat.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index b34a1979ddf00..9d9cd06c70ed2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -797,7 +797,8 @@ private[sql] object ParquetFileFormat extends Logging {
 
     // Set the number of partitions to prevent following schema reads from generating many tasks
     // in case of a small number of parquet files.
-    val numParallelism = Math.min(partialFileStatusInfo.size + 1, 10000)
+    val numParallelism = Math.min(partialFileStatusInfo.size + 1,
+      sparkSession.sparkContext.defaultParallelism)
 
     // Issues a Spark job to read Parquet schema in parallel.
     val partiallyMergedSchemas =

From 7165a2b35cf904a28bf1cc6f3feca906e644e09d Mon Sep 17 00:00:00 2001
From: Takeshi YAMAMURO
Date: Wed, 15 Jun 2016 01:13:09 +0900
Subject: [PATCH 5/5] Fix a bug of potential overflows

---
 .../sql/execution/datasources/parquet/ParquetFileFormat.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 9d9cd06c70ed2..5f5bc24cebf56 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -797,7 +797,7 @@ private[sql] object ParquetFileFormat extends Logging {
 
     // Set the number of partitions to prevent following schema reads from generating many tasks
     // in case of a small number of parquet files.
-    val numParallelism = Math.min(partialFileStatusInfo.size + 1,
+    val numParallelism = Math.min(Math.max(partialFileStatusInfo.size, 1),
       sparkSession.sparkContext.defaultParallelism)
 
     // Issues a Spark job to read Parquet schema in parallel.
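Note on patch 5/5: Math.max(partialFileStatusInfo.size, 1) fixes two things at once. It keeps the positive-partition guarantee from patch 2/5 for an empty file list, and it removes the integer overflow that "size + 1" would hit if the list length ever reached Int.MaxValue, where the sum wraps to a negative partition count. A plain-Scala sketch of the arithmetic follows; OverflowRepro is hypothetical, and the constant 8 stands in for sparkSession.sparkContext.defaultParallelism:

// Hypothetical sketch, not part of the patch: the arithmetic behind
// "Fix a bug of potential overflows", with a constant standing in for
// sparkSession.sparkContext.defaultParallelism.
object OverflowRepro {
  def main(args: Array[String]): Unit = {
    val defaultParallelism = 8
    val size = Int.MaxValue  // a pathological but representable file count

    // Patches 2/5 through 4/5: size + 1 wraps to Int.MinValue, and Math.min
    // then picks the negative value as the partition count.
    val overflowed = Math.min(size + 1, defaultParallelism)
    println(overflowed)  // -2147483648

    // Patch 5/5: clamp with no addition at all; this also still covers the
    // size == 0 case that motivated "+ 1".
    val safe = Math.min(Math.max(size, 1), defaultParallelism)
    println(safe)  // 8

    // The clamp lands in [1, defaultParallelism] for every non-negative size.
    assert(Seq(0, 1, 7, 8, 9, Int.MaxValue).forall { n =>
      val p = Math.min(Math.max(n, 1), defaultParallelism)
      1 <= p && p <= defaultParallelism
    })
  }
}

Math.min(Math.max(size, 1), defaultParallelism) clamps the partition count into [1, defaultParallelism], which is exactly the invariant parallelize needs, with no intermediate value that can overflow.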