
[SPARK-24626] [SQL] Improve location size calculation in Analyze Table command #21608

Closed · 34 commits

Conversation (6 participants)
@Achuth17 (Contributor) commented Jun 21, 2018:

What changes were proposed in this pull request?

Currently, the Analyze Table command calculates the table size sequentially, one partition at a time. We can parallelize the size calculation across partitions.
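
A minimal sketch of the idea (hypothetical names: partitionLocations for the partition location URIs and sizeOfPartition for the per-location size calculation; the parallelism cap mirrors the code under review below):

    // Distribute the per-partition size calculation across the cluster
    // instead of looping over partitions on the driver.
    val numParallelism = Math.min(partitionLocations.size,
      Math.min(spark.sparkContext.defaultParallelism, 10000))
    val totalSize = spark.sparkContext
      .parallelize(partitionLocations, numParallelism)
      .map(location => sizeOfPartition(location)) // hypothetical helper
      .sum()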

Results: tested on a table with 100 partitions and data stored in S3.

With changes:

  • 10.429s
  • 10.557s
  • 10.439s
  • 9.893s


Without changes:

  • 110.034s
  • 99.510s
  • 100.743s
  • 99.106s

How was this patch tested?

Simple unit test.

@maropu (Member) commented Jun 22, 2018:

Does this PR improve actual performance values? (My question is: is this calculation a bottleneck?)

@Achuth17 (Author, Contributor) commented Jun 22, 2018:

Yes. In the case where the data is stored in S3, I noticed a significant difference.

Some rough numbers: when done serially for a table in S3 with 100 partitions, the calculateTotalSize method took about 90 seconds, versus 30-40 seconds when done in parallel.

@maropu (Member) commented Jun 22, 2018:

OK, can you put the results in the description? Also, can you make the title more precise? E.g., "Parallelize size computation in ANALYZE command".

Achuth17 changed the title from "[SPARK-24626] [SQL] Improve Analyze Table command" to "[SPARK-24626] [SQL] Improve location size calculation in Analyze Table command" on Jun 22, 2018.

@Achuth17 (Author, Contributor) commented Jul 4, 2018:

@maropu I have made the changes. What are the next steps?

@gatorsmile (Member) commented Jul 4, 2018:

ok to test

      }.sum
    val numParallelism = Math.min(partitions.size,
      Math.min(spark.sparkContext.defaultParallelism, 10000))
    spark.sparkContext.parallelize(partitions, numParallelism).mapPartitions {

@gatorsmile (Member) commented Jul 4, 2018:

The direction is right, but the implementation needs to be improved by using InMemoryFileIndex.bulkListLeafFiles.
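
For reference, the call shape this suggests (a sketch assembled from the snippets later in this thread; paths, pathFilter, and sessionState come from the surrounding command code):

    // List all leaf files under every partition path in one distributed pass,
    // then sum their lengths to get the total location size.
    val fileStatusSeq = InMemoryFileIndex.bulkListLeafFiles(
      paths, sessionState.newHadoopConf(), pathFilter, spark)
    val totalSize = fileStatusSeq.flatMap { case (_, files) => files }
      .map(_.getLen).sum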

@SparkQA commented Jul 5, 2018:

Test build #92628 has finished for PR 21608 at commit 82d5ef3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 6, 2018:

Test build #92655 has finished for PR 21608 at commit 1e1db66.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

        }
      }
      val fileStatusSeq = InMemoryFileIndex.bulkListLeafFiles(paths,
        sessionState.newHadoopConf(), pathFilter, spark).flatMap(x => x._2)

@Achuth17 (Author, Contributor) commented Jul 6, 2018:

The tests are passing, but this line is incorrect.

@gatorsmile @maropu, PathFilter is not serializable. How do we pass a PathFilter in a serializable manner? I checked the code, and one other way is to use FileInputFormat.getInputPathFilter/FileInputFormat.setInputPathFilter, but I couldn't get it to work.

Also, is it okay if we filter the Seq[(Path, Seq[FileStatus])] returned by bulkListLeafFiles and remove stagingDir files?

@Achuth17 (Author, Contributor) commented Jul 6, 2018:

The above approach might not work either. The earlier implementation had a check that prevented recursively listing files under certain directories (stagingDir), so a pathFilter alone might not be the right approach.

So I wanted to introduce a list of strings called filterDir as a new parameter to bulkListLeafFiles, which can be used to check whether a particular directory should be recursed into. Let me know if this approach looks okay.

@gatorsmile (Member) commented Jul 6, 2018:

class PathFilterIgnoreNonData(stagingDir: String) extends PathFilter with Serializable {
  override def accept(path: Path): Boolean = {
    val fileName = path.getName
    (!fileName.startsWith(stagingDir) &&
      // Ignore metadata files starting with "_"
      !fileName.startsWith("_"))
  }
}
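
The with Serializable mixin is the key detail here: Hadoop's PathFilter interface is not itself serializable, but bulkListLeafFiles ships the filter to executors, so the concrete filter class must be serializable for the distributed listing to work.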

@Achuth17 (Author, Contributor) commented Jul 7, 2018:

Thank you, I have made the changes. Can you review this?
Also, the bulkListLeafFiles approach seems to improve performance compared to my earlier implementation, and I have updated the results.

@SparkQA commented Jul 8, 2018:

Test build #92713 has finished for PR 21608 at commit f9b382d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 8, 2018:

Test build #92715 has finished for PR 21608 at commit 06a275b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 9, 2018:

Test build #92768 has finished for PR 21608 at commit 49b878b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

        calculateLocationSize(sessionState, catalogTable.identifier, p.storage.locationUri)
      }.sum
    val paths = partitions.map(x => new Path(x.storage.locationUri.get))
    val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")

@maropu (Member) commented Jul 11, 2018:

Since SerializablePathFilter is only used here, how about defining it as an anonymous class?

      val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
      val pathFilter = new PathFilter with Serializable {
        override def accept(path: Path): Boolean = ...
      }
      val fileStatusSeq = InMemoryFileIndex.bulkListLeafFiles(paths,
        sessionState.newHadoopConf(), pathFilter,
        spark).flatMap(_._2)

@Achuth17 (Author, Contributor) commented Jul 11, 2018:

Done.

@@ -162,7 +162,7 @@ object InMemoryFileIndex extends Logging {
    *
    * @return for each input path, the set of discovered files for the path
    */
-  private def bulkListLeafFiles(
+  def bulkListLeafFiles(

@maropu (Member) commented Jul 11, 2018:

Remove the unnecessary indent. Also, private[sql]?

@Achuth17 (Author, Contributor) commented Jul 11, 2018:

Done.

@SparkQA commented Jul 28, 2018:

Test build #93710 has finished for PR 21608 at commit 7a2aff4.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 28, 2018:

Test build #93731 has finished for PR 21608 at commit e727b69.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@Achuth17 (Author, Contributor) commented Aug 1, 2018:

@gatorsmile, I have made the changes.

@@ -1449,6 +1449,13 @@ object SQLConf {
     .intConf
     .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
     .createWithDefault(Deflater.DEFAULT_COMPRESSION)

+  val COMPUTE_STATS_LIST_FILES_IN_PARALLEL =
+    buildConf("spark.sql.execution.computeStatsListFilesInParallel")

@gatorsmile (Member) commented Aug 1, 2018:

How about spark.sql.parallelFileListingInCommands.enabled?

@Achuth17 (Author, Contributor) commented Aug 1, 2018:

Done.

  val COMPUTE_STATS_LIST_FILES_IN_PARALLEL =
    buildConf("spark.sql.execution.computeStatsListFilesInParallel")
      .internal()
      .doc("If True, File listing for compute statistics is done in parallel.")

@gatorsmile (Member) commented Aug 1, 2018:

When true, SQL commands use parallel file listing, as opposed to single thread listing. This usually speeds up commands that need to list many directories.

@Achuth17 (Author, Contributor) commented Aug 1, 2018:

Thanks! I have made this change.
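
Putting this thread together, a hedged sketch of what the finished conf definition plausibly looks like (the builder tail after .doc is assumed: a boolean conf defaulting to true, matching the migration note below that parallel listing is on by default; the val name is copied from a later excerpt in this conversation):

    val PARALLEL_FILE_LISTING_IN_COMMMANDS =
      buildConf("spark.sql.parallelFileListingInCommands.enabled")
        .internal()
        .doc("When true, SQL commands use parallel file listing, " +
          "as opposed to single thread listing. " +
          "This usually speeds up commands that need to list many directories.")
        .booleanConf
        .createWithDefault(true)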

@@ -78,7 +93,8 @@ object CommandUtils extends Logging {
     val size = if (fileStatus.isDirectory) {
       fs.listStatus(path)
         .map { status =>
-          if (!status.getPath.getName.startsWith(stagingDir)) {
+          if (!status.getPath.getName.startsWith(stagingDir) &&
+              DataSourceUtils.isDataPath(path)) {

@gatorsmile (Member) commented Aug 1, 2018:

This is also a behavior change. Could you document it too?

@Achuth17 (Author, Contributor) commented Aug 1, 2018:

Added a line to the migration doc.
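
For context, a sketch of the serial directory walk around this diff (the helper name getPathSize and the recursion shape are inferred for illustration, not quoted from the PR, and the filter is assumed to apply to each child path):

    def getPathSize(fs: FileSystem, path: Path): Long = {
      val fileStatus = fs.getFileStatus(path)
      if (fileStatus.isDirectory) {
        fs.listStatus(path).map { status =>
          // Skip Hive staging dirs and non-data files (e.g. "_SUCCESS")
          if (!status.getPath.getName.startsWith(stagingDir) &&
              DataSourceUtils.isDataPath(status.getPath)) {
            getPathSize(fs, status.getPath)
          } else {
            0L
          }
        }.sum
      } else {
        fileStatus.getLen
      }
    }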

@Achuth17 (Author, Contributor) commented Aug 1, 2018:

@gatorsmile, I have addressed the comments. Any other fix required?

@@ -1872,6 +1872,8 @@ working with timestamps in `pandas_udf`s to get the best performance, see
- In version 2.3 and earlier, Spark converts Parquet Hive tables by default but ignores table properties like `TBLPROPERTIES (parquet.compression 'NONE')`. This happens for ORC Hive table properties like `TBLPROPERTIES (orc.compress 'NONE')` in case of `spark.sql.hive.convertMetastoreOrc=true`, too. Since Spark 2.4, Spark respects Parquet/ORC specific table properties while converting Parquet/ORC Hive tables. As an example, `CREATE TABLE t(id int) STORED AS PARQUET TBLPROPERTIES (parquet.compression 'NONE')` would generate Snappy parquet files during insertion in Spark 2.3, and in Spark 2.4, the result would be uncompressed parquet files.
- Since Spark 2.0, Spark converts Parquet Hive tables by default for better performance. Since Spark 2.4, Spark converts ORC Hive tables by default, too. It means Spark uses its own ORC support by default instead of Hive SerDe. As an example, `CREATE TABLE t(id int) STORED AS ORC` would be handled with Hive SerDe in Spark 2.3, and in Spark 2.4, it would be converted into Spark's ORC data source table and ORC vectorization would be applied. To set `false` to `spark.sql.hive.convertMetastoreOrc` restores the previous behavior.
- In version 2.3 and earlier, CSV rows are considered as malformed if at least one column value in the row is malformed. CSV parser dropped such rows in the DROPMALFORMED mode or outputs an error in the FAILFAST mode. Since Spark 2.4, CSV row is considered as malformed only when it contains malformed column values requested from CSV datasource, other values can be ignored. As an example, CSV file contains the "id,name" header and one row "1234". In Spark 2.4, selection of the id column consists of a row with one column value 1234 but in Spark 2.3 and earlier it is empty in the DROPMALFORMED mode. To restore the previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to `false`.
- Since Spark 2.4, File listing for compute statistics is done in parallel by default. This can be disabled by setting `spark.sql.execution.computeStatsListFilesInParallel` to `False`.

@maropu (Member) commented Aug 2, 2018:

nit: File listing for compute statistics in the ANALYZE command...?

@Achuth17 (Author, Contributor) commented Aug 2, 2018:

Same as below.

@@ -1872,6 +1872,8 @@ working with timestamps in `pandas_udf`s to get the best performance, see
- In version 2.3 and earlier, Spark converts Parquet Hive tables by default but ignores table properties like `TBLPROPERTIES (parquet.compression 'NONE')`. This happens for ORC Hive table properties like `TBLPROPERTIES (orc.compress 'NONE')` in case of `spark.sql.hive.convertMetastoreOrc=true`, too. Since Spark 2.4, Spark respects Parquet/ORC specific table properties while converting Parquet/ORC Hive tables. As an example, `CREATE TABLE t(id int) STORED AS PARQUET TBLPROPERTIES (parquet.compression 'NONE')` would generate Snappy parquet files during insertion in Spark 2.3, and in Spark 2.4, the result would be uncompressed parquet files.
- Since Spark 2.0, Spark converts Parquet Hive tables by default for better performance. Since Spark 2.4, Spark converts ORC Hive tables by default, too. It means Spark uses its own ORC support by default instead of Hive SerDe. As an example, `CREATE TABLE t(id int) STORED AS ORC` would be handled with Hive SerDe in Spark 2.3, and in Spark 2.4, it would be converted into Spark's ORC data source table and ORC vectorization would be applied. To set `false` to `spark.sql.hive.convertMetastoreOrc` restores the previous behavior.
- In version 2.3 and earlier, CSV rows are considered as malformed if at least one column value in the row is malformed. CSV parser dropped such rows in the DROPMALFORMED mode or outputs an error in the FAILFAST mode. Since Spark 2.4, CSV row is considered as malformed only when it contains malformed column values requested from CSV datasource, other values can be ignored. As an example, CSV file contains the "id,name" header and one row "1234". In Spark 2.4, selection of the id column consists of a row with one column value 1234 but in Spark 2.3 and earlier it is empty in the DROPMALFORMED mode. To restore the previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to `false`.
- Since Spark 2.4, File listing for compute statistics is done in parallel by default. This can be disabled by setting `spark.sql.execution.computeStatsListFilesInParallel` to `False`.
- Since Spark 2.4, Metadata files (e.g. Parquet summary files) and temporary files are not counted as data files when calculating table size during Statistics computation.

@maropu (Member) commented Aug 2, 2018:

nit: ... during Statistics computation in the ANALYZE command.?

@Achuth17 (Author, Contributor) commented Aug 2, 2018:

@maropu The calculateTotalSize flow is invoked in other places too. For example, it is used in the updateTableStats method, which is called from a few places.

@maropu (Member) commented Aug 2, 2018:

ah, ok.

@SparkQA commented Aug 2, 2018:

Test build #93905 has finished for PR 21608 at commit ae64b9d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -1449,6 +1449,15 @@ object SQLConf {
     .intConf
     .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
     .createWithDefault(Deflater.DEFAULT_COMPRESSION)

+  val PARALLEL_FILE_LISTING_IN_COMMMANDS =
+    buildConf("spark.sql.parallelFileListingInCommands.enabled")

@maropu (Member) commented Aug 2, 2018:

Currently, in master, this listing happens only in SQL commands, so the name fits the case now. But if, in the future, we add a new interface for the listing (e.g., SparkSession.analyzeTable, SparkSession.analyzeColumn, ...), this name might confuse users a little. So, how about spark.sql.parallelFileListingInStatsComputation.enabled? @gatorsmile

@Achuth17 (Author, Contributor) commented Aug 2, 2018:

I can make this change.

@maropu (Member) commented Aug 2, 2018:

It's OK to follow Xiao's decision.

@SparkQA commented Aug 2, 2018:

Test build #93944 has finished for PR 21608 at commit 2e582f6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 2, 2018:

Test build #94043 has finished for PR 21608 at commit 1803f26.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 3, 2018:

Test build #94167 has finished for PR 21608 at commit 253af70.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@Achuth17 (Author, Contributor) commented Aug 3, 2018:

@maropu @gatorsmile The tests are failing in places unrelated to my changes. How can I resolve this?

@Achuth17 (Author, Contributor) commented Aug 3, 2018:

retest this please.

@maropu (Member) commented Aug 6, 2018:

retest this please

@SparkQA commented Aug 6, 2018:

Test build #94259 has finished for PR 21608 at commit 253af70.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@Achuth17 (Author, Contributor) commented Aug 6, 2018:

@maropu @gatorsmile, Any other change required?

  // counted as data files, so that they shouldn't participate partition discovery.
  private[sql] def isDataPath(path: Path): Boolean = {
    val name = path.getName
    !((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))
  }
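
Concretely (illustrative examples, not from the PR): part-00000.parquet is accepted; _SUCCESS, _metadata, and .hidden files are rejected; an underscore-prefixed partition directory such as _year=2018 is still accepted thanks to the name.contains("=") escape, which is exactly the condition questioned below.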

@gatorsmile (Member) commented Aug 7, 2018:

Let us not use the same one as in PartitioningAwareFileIndex.scala. In this case, the data-file check is not related to the condition name.contains("=").

@Achuth17 (Author, Contributor) commented Aug 7, 2018:

Should I use the earlier implementation with a simple if condition, and revert the changes made to PartitioningAwareFileIndex.scala and DataSourceUtils.scala?

@gatorsmile (Member) commented Aug 7, 2018:

Not sure what your earlier impl is. I would prefer to keep the original code in PartitioningAwareFileIndex.scala unchanged. Just add a utility function isDataPath in CommandUtils.scala. Does this sound good to you?

@Achuth17 (Author, Contributor) commented Aug 8, 2018:

Sounds good. I have made the changes.

@SparkQA commented Aug 8, 2018:

Test build #94397 has finished for PR 21608 at commit 70eddc8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile (Member) commented Aug 9, 2018:

Thanks! Merged to master.

asfgit closed this in d365397 on Aug 9, 2018.
