@@ -2155,6 +2155,15 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val ADD_PARTITION_BATCH_SIZE =
buildConf("spark.sql.addPartitionInBatch.size")
.internal()
.doc("The number of partitions to be handled in one turn when use " +
"`AlterTableAddPartitionCommand` to add partitions into table. The smaller " +
"batch size is, the less memory is required for the real handler, e.g. Hive Metastore.")
.intConf
@dongjoon-hyun (Member) commented on Jan 22, 2020:
Since this becomes a variable now, we need to add `.checkValue(` to prevent accidental user mistakes.

Member:
Maybe, > 0? Do you think we have a reasonable upper bound for this, too?

Member:
Lower bound checking might be enough, done in ac8b771.

.createWithDefault(100)
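
Following the review thread above, a sketch of how the lower-bound check added in ac8b771 presumably slots into the definition; the error-message wording here is an assumption, not quoted from the commit:

    .intConf
    // Reject non-positive batch sizes; assumed message text, see ac8b771 for the real check
    .checkValue(_ > 0, "The value of spark.sql.addPartitionInBatch.size must be positive")
    .createWithDefault(100)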

/**
* Holds information about keys that have been deprecated.
*
@@ -107,7 +107,7 @@ case class AnalyzePartitionCommand(
// Update the metastore if newly computed statistics are different from those
// recorded in the metastore.
val newPartitions = partitions.flatMap { p =>
- val newTotalSize = CommandUtils.calculateLocationSize(
+ val newTotalSize = CommandUtils.calculateSingleLocationSize(
sessionState, tableMeta.identifier, p.storage.locationUri)
val newRowCount = rowCounts.get(p.spec)
val newStats = CommandUtils.compareAndGetNewStats(p.stats, newTotalSize, newRowCount)
Expand Up @@ -37,6 +37,17 @@ import org.apache.spark.sql.execution.datasources.{DataSourceUtils, InMemoryFile
import org.apache.spark.sql.internal.{SessionState, SQLConf}
import org.apache.spark.sql.types._

/**
* For the purpose of calculating total directory sizes, use this filter to
* ignore some irrelevant files.
* @param stagingDir hive staging dir
*/
class PathFilterIgnoreNonData(stagingDir: String) extends PathFilter with Serializable {
override def accept(path: Path): Boolean = {
val fileName = path.getName
!fileName.startsWith(stagingDir) && DataSourceUtils.isDataFile(fileName)
}
}
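
For intuition, a quick sketch of what this filter accepts, assuming `DataSourceUtils.isDataFile` rejects names that start with `_` or `.` (metadata and hidden files); the paths are made up:

    val filter = new PathFilterIgnoreNonData(".hive-staging")
    filter.accept(new Path("/wh/t/part-00000.snappy.parquet"))   // true: a data file
    filter.accept(new Path("/wh/t/_SUCCESS"))                    // false: metadata file
    filter.accept(new Path("/wh/t/.hive-staging_hive_2020-01"))  // false: staging artifact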

object CommandUtils extends Logging {

@@ -60,32 +71,21 @@ object CommandUtils extends Logging {
val sessionState = spark.sessionState
val startTime = System.nanoTime()
val totalSize = if (catalogTable.partitionColumnNames.isEmpty) {
- calculateLocationSize(sessionState, catalogTable.identifier, catalogTable.storage.locationUri)
+ calculateSingleLocationSize(sessionState, catalogTable.identifier,
+   catalogTable.storage.locationUri)
} else {
// Calculate table size as a sum of the visible partitions. See SPARK-21079
val partitions = sessionState.catalog.listPartitions(catalogTable.identifier)
logInfo(s"Starting to calculate sizes for ${partitions.length} partitions.")
- if (spark.sessionState.conf.parallelFileListingInStatsComputation) {
-   val paths = partitions.map(x => new Path(x.storage.locationUri.get))
-   val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
-   val pathFilter = new PathFilter with Serializable {
-     override def accept(path: Path): Boolean = isDataPath(path, stagingDir)
-   }
-   val fileStatusSeq = InMemoryFileIndex.bulkListLeafFiles(
-     paths, sessionState.newHadoopConf(), pathFilter, spark, areRootPaths = true)
-   fileStatusSeq.flatMap(_._2.map(_.getLen)).sum
- } else {
-   partitions.map { p =>
-     calculateLocationSize(sessionState, catalogTable.identifier, p.storage.locationUri)
-   }.sum
- }
+ val paths = partitions.map(_.storage.locationUri)
+ calculateTotalLocationSize(spark, catalogTable.identifier, paths)
}
logInfo(s"It took ${(System.nanoTime() - startTime) / (1000 * 1000)} ms to calculate" +
s" the total size for table ${catalogTable.identifier}.")
totalSize
}

- def calculateLocationSize(
+ def calculateSingleLocationSize(
sessionState: SessionState,
identifier: TableIdentifier,
locationUri: Option[URI]): Long = {
@@ -137,6 +137,38 @@ object CommandUtils extends Logging {
size
}

def calculateTotalLocationSize(
sparkSession: SparkSession,
tid: TableIdentifier,
paths: Seq[Option[URI]]): Long = {
if (sparkSession.sessionState.conf.parallelFileListingInStatsComputation) {
calculateLocationsSizesParallel(sparkSession, paths.map(_.map(new Path(_))))
} else {
paths.map(p => calculateSingleLocationSize(sparkSession.sessionState, tid, p)).sum
}
}
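
A minimal usage sketch of the new helper; the table name and URIs are hypothetical, and the flag consulted is the `parallelFileListingInStatsComputation` conf seen above:

    // Partitions without a location (None) simply contribute 0 bytes.
    val total = CommandUtils.calculateTotalLocationSize(
      spark,
      TableIdentifier("events"),
      Seq(Some(new URI("hdfs:///wh/events/dt=2020-01-01")),
        Some(new URI("hdfs:///wh/events/dt=2020-01-02")),
        None))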

/**
 * Launch a job to list all leaf files in `paths` and compute their total size.
 * @param sparkSession the [[SparkSession]]
 * @param paths the `Seq` of `Option[Path]`s
 * @return total size of all partitions
 */
def calculateLocationsSizesParallel(
Member:
ditto. Locations and Sizes.

Member:
Renamed to calculateLocationSizeParallel, done in ac8b771, please check.

sparkSession: SparkSession,
paths: Seq[Option[Path]]): Long = {
val stagingDir = sparkSession.sessionState.conf
.getConfString("hive.exec.stagingdir", ".hive-staging")
val filter = new PathFilterIgnoreNonData(stagingDir)
val sizes = InMemoryFileIndex.bulkListLeafFiles(paths.flatten,
sparkSession.sessionState.newHadoopConf(), filter, sparkSession, areRootPaths = true).map {
case (_, files) => files.map(_.getLen).sum
}
// `sizes` is aligned with the defined paths in `paths.flatten`; partitions without
// a location are skipped by the listing and contribute 0 bytes, so the total is
// simply the sum.
sizes.sum
}
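
To see why the sum lines up, a tiny worked example (the sizes are made up):

    // paths         = Seq(Some(a), None, Some(b))
    // paths.flatten = Seq(a, b)        -> listed in a single job
    // sizes         = Seq(10L, 32L)    -> bytes under a and b respectively
    // result        = 42L              -> the None path contributes nothing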

def compareAndGetNewStats(
oldStats: Option[CatalogStatistics],
newTotalSize: BigInt,
@@ -43,7 +43,7 @@ import org.apache.spark.sql.connector.catalog.TableCatalog
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningUtils}
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter
- import org.apache.spark.sql.internal.HiveSerDe
+ import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
import org.apache.spark.sql.types._
import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}

@@ -481,22 +481,15 @@ case class AlterTableAddPartitionCommand(
// Hive metastore may not have enough memory to handle millions of partitions in a single RPC.
// Also, the request to the metastore times out when adding a lot of partitions in one shot.
// We should split them into smaller batches.
- val batchSize = 100
+ val batchSize = conf.getConf(SQLConf.ADD_PARTITION_BATCH_SIZE)
parts.toIterator.grouped(batchSize).foreach { batch =>
catalog.createPartitions(table.identifier, batch, ignoreIfExists = ifNotExists)
}
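
The batching itself is plain `Iterator.grouped`; a quick illustration with made-up numbers:

    // With 250 partitions and batchSize = 100, createPartitions is invoked three
    // times, on groups of 100, 100, and 50 partitions.
    (1 to 250).iterator.grouped(100).map(_.size).toList  // List(100, 100, 50)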

if (table.stats.nonEmpty) {
if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) {
- def calculatePartSize(part: CatalogTablePartition) = CommandUtils.calculateLocationSize(
-   sparkSession.sessionState, table.identifier, part.storage.locationUri)
- val threshold = sparkSession.sparkContext.conf.get(RDD_PARALLEL_LISTING_THRESHOLD)
- val partSizes = if (parts.length > threshold) {
-   ThreadUtils.parmap(parts, "gatheringNewPartitionStats", 8)(calculatePartSize)
- } else {
-   parts.map(calculatePartSize)
- }
- val addedSize = partSizes.sum
+ val addedSize = CommandUtils.calculateTotalLocationSize(sparkSession, table.identifier,
+   parts.map(_.storage.locationUri))
if (addedSize > 0) {
val newStats = CatalogStatistics(sizeInBytes = table.stats.get.sizeInBytes + addedSize)
catalog.alterTableStats(table.identifier, Some(newStats))