[CARMEL-3513] Add compact table command (#22)
* [CARMEL-3513] Add compact table command

* tiny fix

* trivial fix

* tiny fix

* fix code style

* fix ut

* fix ut
wangshisan authored and GitHub Enterprise committed Sep 22, 2020
1 parent 8ba4354 commit 67bf542
Showing 12 changed files with 913 additions and 9 deletions.
@@ -257,6 +257,8 @@ statement
| SHOW PRINCIPALS identifier #showRolePrincipals
| SHOW SESSION #showSession
| unsupportedHiveNativeCommands .*? #failNativeCommand
| COMPACT TABLE target=tableIdentifier partitionSpec?
(INTO fileNum=INTEGER_VALUE identifier)? #compactTable
;

optionSpec
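For illustration, the new COMPACT TABLE grammar rule above is meant to accept statements like the following. The table name `sales` and partition column `dt` are placeholders, and the snippet assumes a session whose SQL parser includes this rule:

// Illustrative only: compact a whole table, optionally restricted to a partition
// and/or a target number of output files.
spark.sql("COMPACT TABLE sales")
spark.sql("COMPACT TABLE sales INTO 10 files")
spark.sql("COMPACT TABLE sales PARTITION (dt = '2020-09-22') INTO 10 files")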
@@ -21,6 +21,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.{AliasIdentifier, InternalRow}
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateFunction}
import org.apache.spark.sql.catalyst.plans._
@@ -1099,6 +1100,19 @@ case class CollectMetrics(
override def output: Seq[Attribute] = child.output
}

/**
* Logical plan for the COMPACT TABLE command.
*
* @param table the table to be compacted, as an unresolved relation.
* @param partition the target partitions to compact; empty means the whole table.
* @param targetFileNum the target number of output files, if specified.
*/
case class CompactTable(
table: LogicalPlan,
partition: TablePartitionSpec,
targetFileNum: Option[Int]) extends LogicalPlan {
override def children: Seq[LogicalPlan] = Seq(table)
override def output: Seq[Attribute] = Seq.empty
}

/**
* The AT keyword specifies that the request is inclusive of any changes
* made by a statement or transaction with timestamp or version equal to the specified parameter.
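As a small sketch of how the new CompactTable node fits together with the parser change further down, a statement such as COMPACT TABLE db.t PARTITION (dt = '2020-09-22') INTO 10 files would produce a node roughly like this (the identifiers are placeholders):

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation

// Unresolved relation, partition spec, and optional target file number.
val plan = CompactTable(
  UnresolvedRelation(TableIdentifier("t", Some("db"))),
  Map("dt" -> "2020-09-22"),
  Some(10))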
@@ -2671,6 +2671,17 @@ object SQLConf {
.checkValue(_ > 0, "The timeout value must be positive")
.createWithDefault(10L)

val SMALL_FILE_SIZE_THRESHOLD = buildConf("spark.sql.smallFile.size.threshold")
.doc("File of which size under this number will be treated as small file. ")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(80 * 1024 * 1024)

val TARGET_MERGE_FILE_SIZE =
buildConf("spark.sql.mergeFileSize")
.doc("The target post-shuffle input size in bytes of a task.")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(256 * 1024 * 1024)

val MAX_RETURNED_ROW_COUNT =
buildConf("spark.sql.max.result.count")
.doc("Max returned result row count per SQL to protect driver from OOM, this will only" +
@@ -3548,6 +3559,10 @@ class SQLConf extends Serializable with Logging {
def maxNumberForTemporaryTablesPerSession: Long =
getConf(StaticSQLConf.TEMPORARY_TABLE_MAX_NUM_PER_SESSION)

def smallFileSizeThreshold: Long = getConf(SMALL_FILE_SIZE_THRESHOLD)

def mergedFileSize: Long = getConf(TARGET_MERGE_FILE_SIZE)

def maxTaskResultSize: Int = getConf(TASK_MAX_RETURN_SIZE)

def shuffleAdjustPartitionNumThresholdMin: Double =
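The two new entries can be tuned per session like any other SQLConf setting. A hedged example (the values are arbitrary, not recommendations):

// Treat files below 64 MB as small, and aim for roughly 512 MB merged files.
spark.conf.set("spark.sql.smallFile.size.threshold", 64L * 1024 * 1024)
spark.conf.set("spark.sql.mergeFileSize", 512L * 1024 * 1024)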
@@ -26,10 +26,12 @@ import org.antlr.v4.runtime.tree.TerminalNode

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.catalog.{CatalogIndex, _}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser._
import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{DescribeIndexContext, _}
import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{CompactTableContext, _}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.command.{AlterIndexCommand, CreateIndexCommand, DescribeIndexCommand, DropIndexCommand, ShowIndexesCommand, _}
import org.apache.spark.sql.execution.datasources._
@@ -876,6 +878,28 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
ShowRolePrincipalsStatement(ctx.identifier().getText)
}

override def visitCompactTable(ctx: CompactTableContext): LogicalPlan = withOrigin(ctx) {
if (ctx.identifier() != null) {
ctx.identifier().getText.toLowerCase(Locale.ROOT) match {
case "files" =>
case other => operationNotAllowed(s"COMPACT to resource type '$other'", ctx)
}
}
val table = UnresolvedRelation(visitTableIdentifier(ctx.target))
val partitionSpec = if (ctx.partitionSpec() != null) {
visitPartitionSpec(ctx.partitionSpec()).map {
case (col, value) =>
(col, value.getOrElse(throw new ParseException(
s"Invalid partition spec: no value given for partition column '$col'",
ctx.partitionSpec())))
}
} else Map.empty[String, String]
val filesNum =
if (ctx.INTEGER_VALUE() != null) Option(ctx.INTEGER_VALUE.getText.toInt) else None
filesNum.foreach { n =>
if (n <= 0) {
throw new IllegalArgumentException(s"Target file number $n must be positive!")
}
}
CompactTable(table, partitionSpec, filesNum)
}

override def visitDropIndex(ctx: DropIndexContext): LogicalPlan = withOrigin(ctx) {
DropIndexCommand(
visitTableIdentifier(ctx.tableIdentifier), ctx.identifier().getText, ctx.EXISTS()==null)
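A minimal sketch of what visitCompactTable yields, assuming `parser` is a SparkSqlParser built on this SparkSqlAstBuilder:

// Produces CompactTable(UnresolvedRelation(db.t), Map(dt -> 2020-09-22), Some(10)).
val plan = parser.parsePlan(
  "COMPACT TABLE db.t PARTITION (dt = '2020-09-22') INTO 10 files")

// Only `files` is accepted as the unit after the file count; anything else,
// e.g. "COMPACT TABLE db.t INTO 10 buckets", is rejected via operationNotAllowed.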
@@ -153,6 +153,80 @@ object CommandUtils extends Logging {
}
}

def calculateTotalSizeAndSmallFileNum(
spark: SparkSession,
catalogTable: CatalogTable): (BigInt, Int) = {
val sessionState = spark.sessionState
if (catalogTable.partitionColumnNames.isEmpty || DDLUtils.isTemporaryTable(catalogTable)) {
val res = calculateLocationSizeAndSmallFileNum(sessionState,
catalogTable.identifier, catalogTable.storage.locationUri)
(BigInt(res._1), res._2)
} else {
val partitions = sessionState.catalog.listPartitions(catalogTable.identifier)
partitions.map { p =>
calculateLocationSizeAndSmallFileNum(sessionState,
catalogTable.identifier, p.storage.locationUri)
}.foldLeft((BigInt(0), 0))((cur, inc) => (cur._1 + inc._1, cur._2 + inc._2))
}
}

def calculateLocationSizeAndSmallFileNum(
sessionState: SessionState,
identifier: TableIdentifier,
locationUri: Option[URI]): (Long, Int) = {
// This method is mainly based on
// org.apache.hadoop.hive.ql.stats.StatsUtils.getFileSizeForTable(HiveConf, Table)
// in Hive 0.13 (except that we do not use fs.getContentSummary).
// TODO: Generalize statistics collection.
// TODO: Can we use fs.getContentSummary in the future? It seems to return the
// wrong size on Jenkins, so we walk the file tree ourselves to compute the
// total size and the number of small files.
val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")

def getPathSizeAndSmallFileNum(fs: FileSystem, path: Path): (Long, Int) = {
val fileStatus = fs.getFileStatus(path)
val res = if (fileStatus.isDirectory) {
fs.listStatus(path)
.map { status =>
if (!status.getPath.getName.startsWith(stagingDir)) {
getPathSizeAndSmallFileNum(fs, status.getPath)
} else {
(0L, 0)
}
}.foldLeft((0L, 0))((cur, inc) => (cur._1 + inc._1, cur._2 + inc._2))
} else {
val len = fileStatus.getLen
// Ignore empty files, like _SUCCESS.
val smallFileNum = if (len > 0 && len < sessionState.conf.smallFileSizeThreshold) 1 else 0
(len, smallFileNum)
}

res
}

val startTime = System.nanoTime()
logInfo(s"Starting to calculate the total file size and " +
s"small files number under path $locationUri.")
val sizeAndNum = locationUri.map { p =>
val path = new Path(p)
try {
val fs = path.getFileSystem(sessionState.newHadoopConf())
getPathSizeAndSmallFileNum(fs, path)
} catch {
case NonFatal(e) =>
logWarning(
s"Failed to get the size and small files number of table ${identifier.table} in the " +
s"database ${identifier.database} because of ${e.toString}", e)
(0L, 0)
}
}.getOrElse((0L, 0))
val durationInMs = (System.nanoTime() - startTime) / (1000 * 1000)
logInfo(s"It took $durationInMs ms to calculate the total file size" +
s" and small files number under path $locationUri.")

sizeAndNum
}

/**
* Launch a Job to list all leaf files in `paths` and compute the total size
* for each path.
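calculateTotalSizeAndSmallFileNum returns the total data size and the number of small files for a table (summed over its partitions). As a hedged illustration of how such statistics might be combined with spark.sql.mergeFileSize to pick a default target file number when none is supplied, the helper below is an assumption for illustration, not code from this commit:

// Hypothetical helper: round the table size up to whole merged files, minimum one file.
def defaultTargetFileNum(totalSize: BigInt, mergedFileSize: Long): Int =
  math.max(1, ((totalSize + mergedFileSize - 1) / mergedFileSize).toInt)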
@@ -73,7 +73,7 @@ struct<>
-- !query output
org.apache.spark.sql.catalyst.parser.ParseException

mismatched input ''embedded'' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DENY', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', 'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'UPLOAD', 'USE', 'VALUES', 'WITH'}(line 6, pos 34)
mismatched input ''embedded'' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', 'COMPACT', 'CREATE', 'DELETE', 'DENY', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', 'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'UPLOAD', 'USE', 'VALUES', 'WITH'}(line 6, pos 34)

== SQL ==
/*
@@ -108,7 +108,7 @@ struct<>
-- !query output
org.apache.spark.sql.catalyst.parser.ParseException

mismatched input '<EOF>' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DENY', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', 'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'UPLOAD', 'USE', 'VALUES', 'WITH'}(line 1, pos 37)
mismatched input '<EOF>' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', 'COMPACT', 'CREATE', 'DELETE', 'DENY', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', 'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'UPLOAD', 'USE', 'VALUES', 'WITH'}(line 1, pos 37)

== SQL ==
/* and this is the end of the file */
@@ -2346,6 +2346,28 @@ class AuthorizationSuite extends HiveThriftServer2Test with BeforeAndAfterEach {
})
}

test("compact table") {
withMultipleConnectionJdbcStatement("b_carmel", "user1",
"b_carmel", "user1")()()()() ({
statement =>
assert(statement.execute("set role admin"))
assert(statement.execute("create table t1 (a int) using parquet"))
assert(statement.execute("insert into t1 values(1)"))
assert(statement.execute("insert into t1 values(2)"))
}, { statement =>
val ex = intercept[SQLException] {
statement.execute("compact table t1 into 1 files")
}
assert(ex.getMessage.contains("Permission denied: Principal [name=user1, type=USER] " +
"does not have following privileges for operation QUERY [[SELECT] on Object "))
}, { statement =>
assert(statement.execute("grant select on table t1 to user user1"))
assert(statement.execute("grant insert on table t1 to user user1"))
}, { statement =>
assert(statement.execute("compact table t1 into 1 files"))
})
}

test("view database rules: can't create table and must use reference dbs's table") {
withMultipleConnectionJdbcStatement(
"b_carmel")("dbt", "dbv")("dbt.t", "default.t")("dbv.v")()({
@@ -80,6 +80,7 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session
new FallBackFileSourceV2(session) +:
new ResolveSessionCatalog(
catalogManager, conf, catalog.isTempView, catalog.isTempFunction) +:
CompactTableAnalysis(session) +:
customResolutionRules

override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
@@ -114,7 +115,8 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session
override val sparkSession: SparkSession = session

override def extraPlanningStrategies: Seq[Strategy] =
super.extraPlanningStrategies ++ customPlanningStrategies ++ Seq(HiveTableScans, Scripts)
super.extraPlanningStrategies ++ customPlanningStrategies ++
Seq(HiveTableScans, Scripts, CompactTable)
}
}

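The CompactTableAnalysis rule and the CompactTable planning strategy registered above are defined in files not shown in this view. Purely as a conceptual sketch of what compaction amounts to (not the implementation in this commit), rewriting a table into a given number of files can be thought of as:

import org.apache.spark.sql.SparkSession

// Conceptual only: stage the data, then overwrite the original table with
// `fileNum` partitions' worth of files. Error handling, partition selection
// and atomicity are ignored; the staging table name is hypothetical.
def naiveCompact(spark: SparkSession, table: String, fileNum: Int): Unit = {
  val staging = table + "_compact_staging"
  spark.table(table).repartition(fileNum).write.saveAsTable(staging)
  spark.table(staging).write.mode("overwrite").insertInto(table)
  spark.sql(s"DROP TABLE $staging")
}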
