
[SPARK-24626] [SQL] Improve location size calculation in Analyze Table command #21608

Closed · wants to merge 34 commits
Changes from 5 commits
Commits (34)
7007901
Init changes
arajagopal17 Jun 21, 2018
82d5ef3
Removing logs.
arajagopal17 Jun 21, 2018
9f2fed0
testing new approach
arajagopal17 Jul 5, 2018
3358bbe
Refactoring.
arajagopal17 Jul 5, 2018
1e1db66
Using bulkListLeafFiles.
arajagopal17 Jul 5, 2018
00014c0
Iter changes
arajagopal17 Jul 6, 2018
429a793
working iteration
Achuth17 Jul 7, 2018
9d3054a
remove unnecessary filter.
Achuth17 Jul 7, 2018
9acab38
nit fixes
Achuth17 Jul 7, 2018
efa5fbd
nit fixes
Achuth17 Jul 7, 2018
f9b382d
nit fixes
Achuth17 Jul 7, 2018
06a275b
nit fix
Achuth17 Jul 8, 2018
49b878b
Rename and using case class.
Achuth17 Jul 9, 2018
c3421ed
Fixing incorrect table size issue.
Achuth17 Jul 11, 2018
9a306a5
making bulkListLeafFiles private[sql]
Achuth17 Jul 11, 2018
bd7a8fe
nit
Achuth17 Jul 11, 2018
deb29de
Moving pathfilter to anonymous class
Achuth17 Jul 11, 2018
98ee81b
Added a simple test to verify the accuracy of table size calculation
Achuth17 Jul 12, 2018
107f4c6
Fixing test to verify parallelism.
Achuth17 Jul 19, 2018
27b68d3
Using isDataPath for metadatafile filter.
Achuth17 Jul 21, 2018
5b46593
Merge branch 'master' into improveAnalyze
Achuth17 Jul 24, 2018
4c405c5
Moving isDataPath to DataSourceUtils.scala
Achuth17 Jul 24, 2018
da5eaba
Nit fixes.
Achuth17 Jul 24, 2018
99a4a0f
Fix test name.
Achuth17 Jul 24, 2018
e03e2ee
Gating changes behind a flag, Making changes to migration doc.
Achuth17 Jul 27, 2018
31afcfb
Merging master.
Achuth17 Jul 27, 2018
15ff68d
merge fixes.
Achuth17 Jul 27, 2018
7a2aff4
Setting feature enabled by default.
Achuth17 Jul 28, 2018
e727b69
Small doc fix.
Achuth17 Jul 28, 2018
ae64b9d
Doc changes and fixing flag name.
Achuth17 Aug 1, 2018
2e582f6
Addressing review comments.
Achuth17 Aug 2, 2018
1803f26
Merge branch 'master' into improveAnalyze
Achuth17 Aug 2, 2018
253af70
Fixing merge conflicts.
Achuth17 Aug 3, 2018
70eddc8
Revert PartitionAwareFileIndex changes and add a generic method to Da…
Achuth17 Aug 7, 2018
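For context, the code paths changed in the diff below are exercised by the ANALYZE TABLE command; a minimal way to trigger the size calculation (table name is hypothetical, run from a spark-shell session where `spark` is the active SparkSession):

// NOSCAN skips the row count, so only sizeInBytes is computed,
// which is the value produced by CommandUtils.calculateTotalSize below.
spark.sql("ANALYZE TABLE mydb.my_partitioned_table COMPUTE STATISTICS NOSCAN")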
@@ -47,7 +47,7 @@ case class AnalyzeColumnCommand(
if (tableMeta.tableType == CatalogTableType.VIEW) {
throw new AnalysisException("ANALYZE TABLE is not supported on views.")
}
- val sizeInBytes = CommandUtils.calculateTotalSize(sessionState, tableMeta)
+ val sizeInBytes = CommandUtils.calculateTotalSize(sparkSession, tableMeta)

// Compute stats for each column
val (rowCount, newColStats) = computeColumnStats(sparkSession, tableIdentWithDB, columnNames)
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Literal}
import org.apache.spark.sql.execution.datasources.PartitioningUtils
+ import org.apache.spark.util.SerializableConfiguration

/**
* Analyzes a given set of partitions to generate per-partition statistics, which will be used in
@@ -39,7 +39,7 @@ case class AnalyzeTableCommand(
}

// Compute stats for the whole table
- val newTotalSize = CommandUtils.calculateTotalSize(sessionState, tableMeta)
+ val newTotalSize = CommandUtils.calculateTotalSize(sparkSession, tableMeta)
val newRowCount =
if (noscan) None else Some(BigInt(sparkSession.table(tableIdentWithDB).count()))

@@ -21,12 +21,13 @@ import java.net.URI

import scala.util.control.NonFatal

- import org.apache.hadoop.fs.{FileSystem, Path}
+ import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition}
+ import org.apache.spark.sql.execution.datasources.InMemoryFileIndex
import org.apache.spark.sql.internal.SessionState


@@ -38,7 +39,7 @@ object CommandUtils extends Logging {
val catalog = sparkSession.sessionState.catalog
if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) {
val newTable = catalog.getTableMetadata(table.identifier)
- val newSize = CommandUtils.calculateTotalSize(sparkSession.sessionState, newTable)
+ val newSize = CommandUtils.calculateTotalSize(sparkSession, newTable)
val newStats = CatalogStatistics(sizeInBytes = newSize)
catalog.alterTableStats(table.identifier, Some(newStats))
} else {
@@ -47,15 +48,26 @@
}
}

- def calculateTotalSize(sessionState: SessionState, catalogTable: CatalogTable): BigInt = {
+ def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): BigInt = {

+ val sessionState = spark.sessionState
+ val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")

if (catalogTable.partitionColumnNames.isEmpty) {
- calculateLocationSize(sessionState, catalogTable.identifier, catalogTable.storage.locationUri)
+ calculateLocationSize(sessionState, catalogTable.identifier,
+   catalogTable.storage.locationUri)
} else {
// Calculate table size as a sum of the visible partitions. See SPARK-21079
val partitions = sessionState.catalog.listPartitions(catalogTable.identifier)
- partitions.map { p =>
-   calculateLocationSize(sessionState, catalogTable.identifier, p.storage.locationUri)
- }.sum
+ val paths = partitions.map(x => new Path(x.storage.locationUri.get.getPath))
+ val pathFilter = new PathFilter {
+   override def accept(path: Path): Boolean = {
+     !path.getName.startsWith(stagingDir)
+   }
+ }
+ val fileStatusSeq = InMemoryFileIndex.bulkListLeafFiles(paths,
+   sessionState.newHadoopConf(), pathFilter, spark).flatMap(x => x._2)
Contributor Author


The tests are passing but this line is incorrect.

@gatorsmile @maropu, PathFilter is not serializable; how do we pass a PathFilter in a serializable manner? I checked the code, and one other option is to use FileInputFormat.getInputPathFilter/FileInputFormat.setInputPathFilter, but I couldn't get it to work.

Also, is it okay if we filter the Seq[(Path, Seq[FileStatus])] returned by bulkListLeafFiles and remove stagingDir files?
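For illustration, filtering the result of bulkListLeafFiles after the fact (the second option mentioned above) might look roughly like this; a sketch only, with the helper name invented here:

import org.apache.hadoop.fs.{FileStatus, Path}

// Sketch: sum file sizes from the Seq[(Path, Seq[FileStatus])] returned by
// bulkListLeafFiles, dropping anything under the Hive staging directory.
def sizeExcludingStaging(listed: Seq[(Path, Seq[FileStatus])], stagingDir: String): Long =
  listed
    .flatMap { case (_, statuses) => statuses }
    .filterNot(_.getPath.getName.startsWith(stagingDir))
    .map(_.getLen)
    .sum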

@Achuth17 (Contributor Author) · Jul 6, 2018


The above approach might not work either. In the earlier implementation there was a check that prevented recursively listing files from certain directories (stagingDir), so having a pathFilter might not be the right approach.

So I wanted to introduce a list of strings called filterDir as a new parameter to bulkListLeafFiles, which can be used to check whether a particular directory should be recursed into. Let me know if this approach looks okay.
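A minimal sketch of the directory check such a filterDir parameter would perform (names are hypothetical; the thread below discusses a serializable PathFilter as an alternative):

import org.apache.hadoop.fs.Path

// Sketch: recurse into a child directory only if its name does not start with
// any of the configured prefixes, e.g. Seq(".hive-staging").
def shouldRecurse(dir: Path, filterDir: Seq[String]): Boolean =
  !filterDir.exists(prefix => dir.getName.startsWith(prefix))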

@gatorsmile (Member) · Jul 6, 2018


class PathFilterIgnoreNonData(stagingDir: String) extends PathFilter with Serializable {
  override def accept(path: Path): Boolean = {
    val fileName = path.getName
    (!fileName.startsWith(stagingDir) &&
      // Ignore metadata files starting with "_"
      !fileName.startsWith("_"))
  }
}
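For illustration, the suggested filter could be wired into the bulkListLeafFiles call shown in the diff above roughly like this (a sketch; paths, sessionState, stagingDir and spark are the values already in scope in the new calculateTotalSize):

// Sketch: replace the anonymous PathFilter with the serializable one above.
val pathFilter = new PathFilterIgnoreNonData(stagingDir)
val fileStatusSeq = InMemoryFileIndex.bulkListLeafFiles(
  paths, sessionState.newHadoopConf(), pathFilter, spark).flatMap(_._2)
val totalSize = fileStatusSeq.map(_.getLen).sum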

@Achuth17 (Contributor Author) · Jul 7, 2018


Thank you; I have made the changes. Can you review this?
Also, the bulkListLeafFiles approach seems to improve performance compared to my earlier implementation, and I have updated the results.

+ fileStatusSeq.map(fileStatus => fileStatus.getLen).sum
}
}

@@ -162,7 +162,7 @@ object InMemoryFileIndex extends Logging {
*
* @return for each input path, the set of discovered files for the path
*/
- private def bulkListLeafFiles(
+ def bulkListLeafFiles(
Member


Remove the unnecessary indent. Also, private[sql]?

Contributor Author


Done.

paths: Seq[Path],
hadoopConf: Configuration,
filter: PathFilter,