Skip to content
Permalink
Browse files
[HUDI-3780] improve drop partitions (#5178)
  • Loading branch information
XuQianJin-Stars committed Apr 5, 2022
1 parent b28f0d6 commit 3449e86989f86121a9b9a93de602bc8497021a27
Showing 14 changed files with 216 additions and 160 deletions.
@@ -176,7 +176,7 @@ public void updatePartitionsToTable(String tableName, List<String> changedPartit
}

@Override
public void dropPartitionsToTable(String tableName, List<String> partitionsToDrop) {
public void dropPartitions(String tableName, List<String> partitionsToDrop) {
throw new UnsupportedOperationException("Not support dropPartitionsToTable yet.");
}

@@ -421,25 +421,6 @@ public Schema getLatestSchema(Schema writeSchema, boolean convertTableSchemaToAd
return latestSchema;
}


/**
* Get Last commit's Metadata.
*/
public Option<HoodieCommitMetadata> getLatestCommitMetadata() {
try {
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
if (timeline.lastInstant().isPresent()) {
HoodieInstant instant = timeline.lastInstant().get();
byte[] data = timeline.getInstantDetails(instant).get();
return Option.of(HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class));
} else {
return Option.empty();
}
} catch (Exception e) {
throw new HoodieException("Failed to get commit metadata", e);
}
}

/**
* Read the parquet schema from a parquet File.
*/
@@ -1336,4 +1336,22 @@ public static Set<String> getInflightAndCompletedMetadataPartitions(HoodieTableC
inflightAndCompletedPartitions.addAll(getCompletedMetadataPartitions(tableConfig));
return inflightAndCompletedPartitions;
}

/**
* Get Last commit's Metadata.
*/
public static Option<HoodieCommitMetadata> getLatestCommitMetadata(HoodieTableMetaClient metaClient) {
try {
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
if (timeline.lastInstant().isPresent()) {
HoodieInstant instant = timeline.lastInstant().get();
byte[] data = timeline.getInstantDetails(instant).get();
return Option.of(HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class));
} else {
return Option.empty();
}
} catch (Exception e) {
throw new HoodieException("Failed to get commit metadata", e);
}
}
}
@@ -177,12 +177,6 @@ public void addPartitionsToTable(final String tableName, final List<String> part
throw new UnsupportedOperationException("No support for addPartitionsToTable yet.");
}

@Override
public void dropPartitionsToTable(final String tableName, final List<String> partitionsToDrop) {
// bigQuery discovers the new partitions automatically, so do nothing.
throw new UnsupportedOperationException("No support for dropPartitionsToTable yet.");
}

public boolean datasetExists() {
Dataset dataset = bigquery.getDataset(DatasetId.of(syncConfig.projectId, syncConfig.datasetName));
return dataset != null;
@@ -236,6 +230,12 @@ public void updatePartitionsToTable(final String tableName, final List<String> c
throw new UnsupportedOperationException("No support for updatePartitionsToTable yet.");
}

@Override
public void dropPartitions(String tableName, List<String> partitionsToDrop) {
// bigQuery discovers the new partitions automatically, so do nothing.
throw new UnsupportedOperationException("No support for dropPartitions yet.");
}

@Override
public void close() {
// bigQuery has no connection close method, so do nothing.
@@ -110,6 +110,11 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
*/
lazy val partitionFields: Array[String] = tableConfig.getPartitionFields.orElse(Array.empty)

/**
* BaseFileFormat
*/
lazy val baseFileFormat: String = metaClient.getTableConfig.getBaseFileFormat.name()

/**
* The schema of table.
* Make StructField nullable and fill the comments in.
@@ -17,30 +17,38 @@

package org.apache.spark.sql.hudi

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
import org.apache.hudi.hive.ddl.HiveSyncMode
import org.apache.hudi.hive.{HiveSyncConfig, MultiPartKeysValueExtractor}
import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.hudi.sql.InsertMode
import org.apache.hudi.sync.common.HoodieSyncConfig
import org.apache.hudi.{DataSourceWriteOptions, HoodieWriterUtils}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{isEnableHive, withSparkConf}
import org.apache.spark.sql.hudi.command.{SqlKeyGenerator, ValidateDuplicateKeyPayload}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType

import scala.collection.JavaConverters.propertiesAsScalaMapConverter
import java.util
import java.util.Locale

import scala.collection.JavaConverters._

trait ProvidesHoodieConfig extends Logging {

def buildHoodieConfig(hoodieCatalogTable: HoodieCatalogTable): Map[String, String] = {
val sparkSession: SparkSession = hoodieCatalogTable.spark
val catalogProperties = hoodieCatalogTable.catalogProperties
val tableConfig = hoodieCatalogTable.tableConfig
val tableId = hoodieCatalogTable.table.identifier

// NOTE: Here we fallback to "" to make sure that null value is not overridden with
// default value ("ts")
@@ -51,6 +59,10 @@ trait ProvidesHoodieConfig extends Logging {
s"There are no primary key in table ${hoodieCatalogTable.table.identifier}, cannot execute update operator")
val enableHive = isEnableHive(sparkSession)

val hoodieProps = getHoodieProps(catalogProperties, tableConfig, sparkSession.sqlContext.conf)

val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable)

withSparkConf(sparkSession, catalogProperties) {
Map.apply(
"path" -> hoodieCatalogTable.tableLocation,
@@ -63,15 +75,14 @@ trait ProvidesHoodieConfig extends Logging {
SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
OPERATION.key -> UPSERT_OPERATION_OPT_VAL,
PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
META_SYNC_ENABLED.key -> enableHive.toString,
HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
HIVE_USE_JDBC.key -> "false",
HIVE_DATABASE.key -> tableId.database.getOrElse("default"),
HIVE_TABLE.key -> tableId.table,
HIVE_PARTITION_FIELDS.key -> tableConfig.getPartitionFieldProp,
HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName,
HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "200",
HoodieSyncConfig.META_SYNC_ENABLED.key -> enableHive.toString,
HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.syncMode,
HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.databaseName,
HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.tableName,
HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> tableConfig.getPartitionFieldProp,
HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.partitionValueExtractorClass,
HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.supportTimestamp.toString,
HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "200"),
SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL
)
.filter { case(_, v) => v != null }
@@ -98,10 +109,12 @@ trait ProvidesHoodieConfig extends Logging {
val path = hoodieCatalogTable.tableLocation
val tableType = hoodieCatalogTable.tableTypeName
val tableConfig = hoodieCatalogTable.tableConfig
val tableSchema = hoodieCatalogTable.tableSchema
val catalogProperties = hoodieCatalogTable.catalogProperties

val hoodieProps = getHoodieProps(catalogProperties, tableConfig, sparkSession.sqlContext.conf, extraOptions)
val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable)

val options = hoodieCatalogTable.catalogProperties ++ tableConfig.getProps.asScala.toMap ++ extraOptions
val parameters = withSparkConf(sparkSession, options)()
val parameters = withSparkConf(sparkSession, catalogProperties)()

val partitionFieldsStr = hoodieCatalogTable.partitionFields.mkString(",")

@@ -161,7 +174,7 @@ trait ProvidesHoodieConfig extends Logging {

val enableHive = isEnableHive(sparkSession)

withSparkConf(sparkSession, options) {
withSparkConf(sparkSession, catalogProperties) {
Map(
"path" -> path,
TABLE_TYPE.key -> tableType,
@@ -177,20 +190,124 @@ trait ProvidesHoodieConfig extends Logging {
PAYLOAD_CLASS_NAME.key -> payloadClassName,
ENABLE_ROW_WRITER.key -> enableBulkInsert.toString,
HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> String.valueOf(hasPrecombineColumn),
HIVE_PARTITION_FIELDS.key -> partitionFieldsStr,
META_SYNC_ENABLED.key -> enableHive.toString,
HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
HIVE_USE_JDBC.key -> "false",
HIVE_DATABASE.key -> hoodieCatalogTable.table.identifier.database.getOrElse("default"),
HIVE_TABLE.key -> hoodieCatalogTable.table.identifier.table,
HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName,
HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "200",
HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "200",
HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFieldsStr,
HoodieSyncConfig.META_SYNC_ENABLED.key -> enableHive.toString,
HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.syncMode,
HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.databaseName,
HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.tableName,
HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.supportTimestamp.toString,
HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.partitionValueExtractorClass,
HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "200"),
HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "200"),
SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL
)
.filter { case (_, v) => v != null }
}
}

def buildHoodieDropPartitionsConfig(
sparkSession: SparkSession,
hoodieCatalogTable: HoodieCatalogTable,
partitionsToDrop: String): Map[String, String] = {
val partitionFields = hoodieCatalogTable.partitionFields.mkString(",")
val enableHive = isEnableHive(sparkSession)
val catalogProperties = hoodieCatalogTable.catalogProperties
val tableConfig = hoodieCatalogTable.tableConfig

val hoodieProps = getHoodieProps(catalogProperties, tableConfig, sparkSession.sqlContext.conf)
val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable)

withSparkConf(sparkSession, catalogProperties) {
Map(
"path" -> hoodieCatalogTable.tableLocation,
TBL_NAME.key -> hoodieCatalogTable.tableName,
TABLE_TYPE.key -> hoodieCatalogTable.tableTypeName,
OPERATION.key -> DataSourceWriteOptions.DELETE_PARTITION_OPERATION_OPT_VAL,
PARTITIONS_TO_DELETE.key -> partitionsToDrop,
RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
PRECOMBINE_FIELD.key -> hoodieCatalogTable.preCombineKey.getOrElse(""),
PARTITIONPATH_FIELD.key -> partitionFields,
HoodieSyncConfig.META_SYNC_ENABLED.key -> enableHive.toString,
HiveSyncConfig.HIVE_SYNC_ENABLED.key -> enableHive.toString,
HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.syncMode,
HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.databaseName,
HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.tableName,
HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.supportTimestamp.toString,
HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFields,
HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.partitionValueExtractorClass
)
.filter { case (_, v) => v != null }
}
}

def buildHoodieDeleteTableConfig(hoodieCatalogTable: HoodieCatalogTable,
sparkSession: SparkSession): Map[String, String] = {
val path = hoodieCatalogTable.tableLocation
val catalogProperties = hoodieCatalogTable.catalogProperties
val tableConfig = hoodieCatalogTable.tableConfig
val tableSchema = hoodieCatalogTable.tableSchema
val partitionColumns = tableConfig.getPartitionFieldProp.split(",").map(_.toLowerCase(Locale.ROOT))
val partitionSchema = StructType(tableSchema.filter(f => partitionColumns.contains(f.name)))

assert(hoodieCatalogTable.primaryKeys.nonEmpty,
s"There are no primary key defined in table ${hoodieCatalogTable.table.identifier}, cannot execute delete operation")

val hoodieProps = getHoodieProps(catalogProperties, tableConfig, sparkSession.sqlContext.conf)
val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable)

withSparkConf(sparkSession, hoodieCatalogTable.catalogProperties) {
Map(
"path" -> path,
RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
TBL_NAME.key -> tableConfig.getTableName,
HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
OPERATION.key -> DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL,
PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.syncMode,
HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.supportTimestamp.toString,
HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key, "200"),
SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL
)
}
}

def getHoodieProps(catalogProperties: Map[String, String], tableConfig: HoodieTableConfig, conf: SQLConf, extraOptions: Map[String, String] = Map.empty): TypedProperties = {
val options: Map[String, String] = catalogProperties ++ tableConfig.getProps.asScala.toMap ++ conf.getAllConfs ++ extraOptions
val hoodieConfig = HoodieWriterUtils.convertMapToHoodieConfig(options)
hoodieConfig.getProps
}

def buildHiveSyncConfig(props: TypedProperties, hoodieCatalogTable: HoodieCatalogTable): HiveSyncConfig = {
val hiveSyncConfig: HiveSyncConfig = new HiveSyncConfig
hiveSyncConfig.basePath = hoodieCatalogTable.tableLocation
hiveSyncConfig.baseFileFormat = hoodieCatalogTable.baseFileFormat
hiveSyncConfig.usePreApacheInputFormat = props.getBoolean(HiveSyncConfig.HIVE_USE_PRE_APACHE_INPUT_FORMAT.key, HiveSyncConfig.HIVE_USE_PRE_APACHE_INPUT_FORMAT.defaultValue.toBoolean)
hiveSyncConfig.databaseName = hoodieCatalogTable.table.identifier.database.getOrElse("default")
if (props.containsKey(HoodieSyncConfig.META_SYNC_TABLE_NAME.key)) {
hiveSyncConfig.tableName = props.getString(HoodieSyncConfig.META_SYNC_TABLE_NAME.key)
} else {
hiveSyncConfig.tableName = hoodieCatalogTable.table.identifier.table
}
hiveSyncConfig.syncMode = props.getString(HiveSyncConfig.HIVE_SYNC_MODE.key, HiveSyncMode.HMS.name())
hiveSyncConfig.hiveUser = props.getString(HiveSyncConfig.HIVE_USER.key, HiveSyncConfig.HIVE_USER.defaultValue)
hiveSyncConfig.hivePass = props.getString(HiveSyncConfig.HIVE_PASS.key, HiveSyncConfig.HIVE_PASS.defaultValue)
hiveSyncConfig.jdbcUrl = props.getString(HiveSyncConfig.HIVE_URL.key, HiveSyncConfig.HIVE_URL.defaultValue)
hiveSyncConfig.metastoreUris = props.getString(HiveSyncConfig.METASTORE_URIS.key, HiveSyncConfig.METASTORE_URIS.defaultValue)
hiveSyncConfig.partitionFields = props.getStringList(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key, ",", new util.ArrayList[String])
hiveSyncConfig.partitionValueExtractorClass = props.getString(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key, classOf[MultiPartKeysValueExtractor].getName)
if (props.containsKey(HiveSyncConfig.HIVE_SYNC_MODE.key)) hiveSyncConfig.syncMode = props.getString(HiveSyncConfig.HIVE_SYNC_MODE.key)
hiveSyncConfig.autoCreateDatabase = props.getString(HiveSyncConfig.HIVE_AUTO_CREATE_DATABASE.key, HiveSyncConfig.HIVE_AUTO_CREATE_DATABASE.defaultValue).toBoolean
hiveSyncConfig.ignoreExceptions = props.getString(HiveSyncConfig.HIVE_IGNORE_EXCEPTIONS.key, HiveSyncConfig.HIVE_IGNORE_EXCEPTIONS.defaultValue).toBoolean
hiveSyncConfig.skipROSuffix = props.getString(HiveSyncConfig.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE.key, HiveSyncConfig.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE.defaultValue).toBoolean
hiveSyncConfig.supportTimestamp = props.getString(HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key, "true").toBoolean
hiveSyncConfig.isConditionalSync = props.getString(HoodieSyncConfig.META_SYNC_CONDITIONAL_SYNC.key, HoodieSyncConfig.META_SYNC_CONDITIONAL_SYNC.defaultValue).toBoolean
hiveSyncConfig.bucketSpec = if (props.getBoolean(HiveSyncConfig.HIVE_SYNC_BUCKET_SYNC.key, HiveSyncConfig.HIVE_SYNC_BUCKET_SYNC.defaultValue)) HiveSyncConfig.getBucketSpec(props.getString(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key), props.getInteger(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key))
else null
if (props.containsKey(HiveExternalCatalog.CREATED_SPARK_VERSION)) hiveSyncConfig.sparkVersion = props.getString(HiveExternalCatalog.CREATED_SPARK_VERSION)
hiveSyncConfig.syncComment = props.getString(DataSourceWriteOptions.HIVE_SYNC_COMMENT.key, DataSourceWriteOptions.HIVE_SYNC_COMMENT.defaultValue).toBoolean
hiveSyncConfig
}
}

0 comments on commit 3449e86

Please sign in to comment.