diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
new file mode 100644
index 000000000000..78081220e1ae
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.catalog
+
+import org.apache.hudi.HoodieWriterUtils._
+import org.apache.hudi.common.config.DFSPropertiesConfiguration
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.util.ValidationUtils
+import org.apache.hudi.keygen.ComplexKeyGenerator
+import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlUtils}
+import org.apache.spark.sql.hudi.HoodieSqlUtils._
+import org.apache.spark.sql.types.{StructField, StructType}
+
+import java.util.{Locale, Properties}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/**
+ * A wrapper of the hoodie CatalogTable instance and the hoodie table.
+ */
+class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) extends Logging {
+
+  assert(table.provider.map(_.toLowerCase(Locale.ROOT)).orNull == "hudi", "It's not a Hudi table")
+
+  private val hadoopConf = spark.sessionState.newHadoopConf
+
+  /**
+   * database.table in catalog
+   */
+  val catalogTableName = table.qualifiedName
+
+  /**
+   * Properties defined in the catalog.
+   */
+  val catalogProperties: Map[String, String] = table.storage.properties ++ table.properties
+
+  /**
+   * The hoodie table's location.
+   * When creating a managed hoodie table, `catalog.defaultTablePath` is used.
+   */
+  val tableLocation: String = HoodieSqlUtils.getTableLocation(table, spark)
+
+  /**
+   * Whether the hoodie table already exists.
+   */
+  val hoodieTableExists: Boolean = tableExistsInPath(tableLocation, hadoopConf)
+
+  /**
+   * Meta Client.
+   */
+  lazy val metaClient: HoodieTableMetaClient = HoodieTableMetaClient.builder()
+    .setBasePath(tableLocation)
+    .setConf(hadoopConf)
+    .build()
+
+  /**
+   * Hoodie Table Config
+   */
+  lazy val tableConfig: HoodieTableConfig = metaClient.getTableConfig
+
+  /**
+   * The name of the table.
+   */
+  lazy val tableName: String = tableConfig.getTableName
+
+  /**
+   * The type of the table.
+   */
+  lazy val tableType: HoodieTableType = tableConfig.getTableType
+
+  /**
+   * The name of the table type.
+   */
+  lazy val tableTypeName: String = tableType.name()
+
+  /**
+   * Record Key Field List (Primary Key List)
+   */
+  lazy val primaryKeys: Array[String] = tableConfig.getRecordKeyFields.orElse(Array.empty)
+
+  /**
+   * PreCombine Field
+   */
+  lazy val preCombineKey: Option[String] = Option(tableConfig.getPreCombineField)
+
+  /**
+   * Partition Fields
+   */
+  lazy val partitionFields: Array[String] = tableConfig.getPartitionFields.orElse(Array.empty)
+
+  /**
+   * The schema of the table.
+   * Each StructField is made nullable.
+   */
+  lazy val tableSchema: StructType = {
+    val originSchema = getTableSqlSchema(metaClient, includeMetadataFields = true).get
+    StructType(originSchema.map(_.copy(nullable = true)))
+  }
+
+  /**
+   * The schema without hoodie meta fields
+   */
+  lazy val tableSchemaWithoutMetaFields: StructType = HoodieSqlUtils.removeMetaFields(tableSchema)
+
+  /**
+   * The schema of data fields
+   */
+  lazy val dataSchema: StructType = {
+    StructType(tableSchema.filterNot(f => partitionFields.contains(f.name)))
+  }
+
+  /**
+   * The schema of data fields, excluding hoodie meta fields
+   */
+  lazy val dataSchemaWithoutMetaFields: StructType = HoodieSqlUtils.removeMetaFields(dataSchema)
+
+  /**
+   * The schema of partition fields
+   */
+  lazy val partitionSchema: StructType = StructType(tableSchema.filter(f => partitionFields.contains(f.name)))
+
+  /**
+   * All the partition paths
+   */
+  def getAllPartitionPaths: Seq[String] = HoodieSqlUtils.getAllPartitionPaths(spark, table)
+
+  /**
+   * Initialize the hoodie table for create table (as select).
+   */
+  def initHoodieTable(): Unit = {
+    logInfo(s"Init hoodie.properties for ${table.identifier.unquotedString}")
+    val (finalSchema, tableConfigs) = parseSchemaAndConfigs()
+
+    // Save all the table config to the hoodie.properties.
+    val properties = new Properties()
+    properties.putAll(tableConfigs.asJava)
+
+    HoodieTableMetaClient.withPropertyBuilder()
+      .fromProperties(properties)
+      .setTableName(table.identifier.table)
+      .setTableCreateSchema(SchemaConverters.toAvroType(finalSchema).toString())
+      .setPartitionFields(table.partitionColumnNames.mkString(","))
+      .initTable(hadoopConf, tableLocation)
+  }
+
+  /**
+   * @return the schema and the table parameters, where no parameter is sql-styled.
+ */ + private def parseSchemaAndConfigs(): (StructType, Map[String, String]) = { + val globalProps = DFSPropertiesConfiguration.getGlobalProps.asScala.toMap + val globalTableConfigs = mappingSparkDatasourceConfigsToTableConfigs(globalProps) + val globalSqlOptions = HoodieOptionConfig.mappingTableConfigToSqlOption(globalTableConfigs) + + val sqlOptions = HoodieOptionConfig.withDefaultSqlOptions(globalSqlOptions ++ catalogProperties) + + // get final schema and parameters + val (finalSchema, tableConfigs) = (table.tableType, hoodieTableExists) match { + case (CatalogTableType.EXTERNAL, true) => + val existingTableConfig = tableConfig.getProps.asScala.toMap + val currentTableConfig = globalTableConfigs ++ existingTableConfig + val catalogTableProps = HoodieOptionConfig.mappingSqlOptionToTableConfig(catalogProperties) + validateTableConfig(spark, catalogTableProps, convertMapToHoodieConfig(existingTableConfig)) + + val options = extraTableConfig(spark, hoodieTableExists, currentTableConfig) ++ + HoodieOptionConfig.mappingSqlOptionToTableConfig(sqlOptions) ++ currentTableConfig + + ValidationUtils.checkArgument(tableSchema.nonEmpty || table.schema.nonEmpty, + s"Missing schema for Create Table: $catalogTableName") + val schema = if (tableSchema.nonEmpty) { + tableSchema + } else { + addMetaFields(table.schema) + } + + (schema, options) + + case (_, false) => + ValidationUtils.checkArgument(table.schema.nonEmpty, + s"Missing schema for Create Table: $catalogTableName") + val schema = table.schema + val options = extraTableConfig(spark, isTableExists = false, globalTableConfigs) ++ + HoodieOptionConfig.mappingSqlOptionToTableConfig(sqlOptions) + (addMetaFields(schema), options) + + case (CatalogTableType.MANAGED, true) => + throw new AnalysisException(s"Can not create the managed table('$catalogTableName')" + + s". 
The associated location('$tableLocation') already exists.") + } + HoodieOptionConfig.validateTable(spark, finalSchema, + HoodieOptionConfig.mappingTableConfigToSqlOption(tableConfigs)) + + val resolver = spark.sessionState.conf.resolver + val dataSchema = finalSchema.filterNot { f => + table.partitionColumnNames.exists(resolver(_, f.name)) + } + verifyDataSchema(table.identifier, table.tableType, dataSchema) + + (finalSchema, tableConfigs) + } + + private def extraTableConfig(sparkSession: SparkSession, isTableExists: Boolean, + originTableConfig: Map[String, String] = Map.empty): Map[String, String] = { + val extraConfig = mutable.Map.empty[String, String] + if (isTableExists) { + val allPartitionPaths = getAllPartitionPaths + if (originTableConfig.contains(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key)) { + extraConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key) = + originTableConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key) + } else { + extraConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key) = + String.valueOf(isHiveStyledPartitioning(allPartitionPaths, table)) + } + if (originTableConfig.contains(HoodieTableConfig.URL_ENCODE_PARTITIONING.key)) { + extraConfig(HoodieTableConfig.URL_ENCODE_PARTITIONING.key) = + originTableConfig(HoodieTableConfig.URL_ENCODE_PARTITIONING.key) + } else { + extraConfig(HoodieTableConfig.URL_ENCODE_PARTITIONING.key) = + String.valueOf(isUrlEncodeEnabled(allPartitionPaths, table)) + } + } else { + extraConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key) = "true" + extraConfig(HoodieTableConfig.URL_ENCODE_PARTITIONING.key) = HoodieTableConfig.URL_ENCODE_PARTITIONING.defaultValue() + } + + if (originTableConfig.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)) { + extraConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) = + HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator( + originTableConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)) + } else { + extraConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) = classOf[ComplexKeyGenerator].getCanonicalName + } + extraConfig.toMap + } + + // This code is forked from org.apache.spark.sql.hive.HiveExternalCatalog#verifyDataSchema + private def verifyDataSchema(tableIdentifier: TableIdentifier, tableType: CatalogTableType, + dataSchema: Seq[StructField]): Unit = { + if (tableType != CatalogTableType.VIEW) { + val invalidChars = Seq(",", ":", ";") + def verifyNestedColumnNames(schema: StructType): Unit = schema.foreach { f => + f.dataType match { + case st: StructType => verifyNestedColumnNames(st) + case _ if invalidChars.exists(f.name.contains) => + val invalidCharsString = invalidChars.map(c => s"'$c'").mkString(", ") + val errMsg = "Cannot create a table having a nested column whose name contains " + + s"invalid characters ($invalidCharsString) in Hive metastore. Table: $tableIdentifier; " + + s"Column: ${f.name}" + throw new AnalysisException(errMsg) + case _ => + } + } + + dataSchema.foreach { f => + f.dataType match { + // Checks top-level column names + case _ if f.name.contains(",") => + throw new AnalysisException("Cannot create a table having a column whose name " + + s"contains commas in Hive metastore. 
Table: $tableIdentifier; Column: ${f.name}") + // Checks nested column names + case st: StructType => + verifyNestedColumnNames(st) + case _ => + } + } + } + } +} + +object HoodieCatalogTable { + + def apply(sparkSession: SparkSession, tableIdentifier: TableIdentifier): HoodieCatalogTable = { + val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier) + HoodieCatalogTable(sparkSession, catalogTable) + } + + def apply(sparkSession: SparkSession, catalogTable: CatalogTable): HoodieCatalogTable = { + new HoodieCatalogTable(sparkSession, catalogTable) + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala index 16f83f482c29..bc9f14978c29 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala @@ -18,10 +18,9 @@ package org.apache.spark.sql.hudi import org.apache.hudi.DataSourceWriteOptions -import org.apache.hudi.common.model.DefaultHoodieRecordPayload +import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieTableType} import org.apache.hudi.common.table.HoodieTableConfig import org.apache.hudi.common.util.ValidationUtils - import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types.StructType @@ -43,27 +42,27 @@ object HoodieOptionConfig { val SQL_VALUE_TABLE_TYPE_MOR = "mor" - val SQL_KEY_TABLE_PRIMARY_KEY: HoodieOption[String] = buildConf() + val SQL_KEY_TABLE_PRIMARY_KEY: HoodieSQLOption[String] = buildConf() .withSqlKey("primaryKey") .withHoodieKey(DataSourceWriteOptions.RECORDKEY_FIELD.key) .withTableConfigKey(HoodieTableConfig.RECORDKEY_FIELDS.key) .defaultValue(DataSourceWriteOptions.RECORDKEY_FIELD.defaultValue()) .build() - val SQL_KEY_TABLE_TYPE: HoodieOption[String] = buildConf() + val SQL_KEY_TABLE_TYPE: HoodieSQLOption[String] = buildConf() .withSqlKey("type") .withHoodieKey(DataSourceWriteOptions.TABLE_TYPE.key) .withTableConfigKey(HoodieTableConfig.TYPE.key) .defaultValue(SQL_VALUE_TABLE_TYPE_COW) .build() - val SQL_KEY_PRECOMBINE_FIELD: HoodieOption[String] = buildConf() + val SQL_KEY_PRECOMBINE_FIELD: HoodieSQLOption[String] = buildConf() .withSqlKey("preCombineField") .withHoodieKey(DataSourceWriteOptions.PRECOMBINE_FIELD.key) .withTableConfigKey(HoodieTableConfig.PRECOMBINE_FIELD.key) .build() - val SQL_PAYLOAD_CLASS: HoodieOption[String] = buildConf() + val SQL_PAYLOAD_CLASS: HoodieSQLOption[String] = buildConf() .withSqlKey("payloadClass") .withHoodieKey(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key) .withTableConfigKey(HoodieTableConfig.PAYLOAD_CLASS_NAME.key) @@ -75,8 +74,8 @@ object HoodieOptionConfig { */ private lazy val keyMapping: Map[String, String] = { HoodieOptionConfig.getClass.getDeclaredFields - .filter(f => f.getType == classOf[HoodieOption[_]]) - .map(f => {f.setAccessible(true); f.get(HoodieOptionConfig).asInstanceOf[HoodieOption[_]]}) + .filter(f => f.getType == classOf[HoodieSQLOption[_]]) + .map(f => {f.setAccessible(true); f.get(HoodieOptionConfig).asInstanceOf[HoodieSQLOption[_]]}) .map(option => option.sqlKeyName -> option.hoodieKeyName) .toMap } @@ -87,8 +86,8 @@ object HoodieOptionConfig { */ private lazy val keyTableConfigMapping: Map[String, String] = { HoodieOptionConfig.getClass.getDeclaredFields - .filter(f => f.getType == classOf[HoodieOption[_]]) - .map(f => 
{f.setAccessible(true); f.get(HoodieOptionConfig).asInstanceOf[HoodieOption[_]]}) + .filter(f => f.getType == classOf[HoodieSQLOption[_]]) + .map(f => {f.setAccessible(true); f.get(HoodieOptionConfig).asInstanceOf[HoodieSQLOption[_]]}) .filter(_.tableConfigKey.isDefined) .map(option => option.sqlKeyName -> option.tableConfigKey.get) .toMap @@ -142,19 +141,15 @@ object HoodieOptionConfig { options.map(kv => tableConfigKeyToSqlKey.getOrElse(kv._1, kv._1) -> reverseValueMapping.getOrElse(kv._2, kv._2)) } - private lazy val defaultSqlOptions: Map[String, String] = { + val defaultSqlOptions: Map[String, String] = { HoodieOptionConfig.getClass.getDeclaredFields - .filter(f => f.getType == classOf[HoodieOption[_]]) - .map(f => {f.setAccessible(true); f.get(HoodieOptionConfig).asInstanceOf[HoodieOption[_]]}) + .filter(f => f.getType == classOf[HoodieSQLOption[_]]) + .map(f => {f.setAccessible(true); f.get(HoodieOptionConfig).asInstanceOf[HoodieSQLOption[_]]}) .filter(option => option.tableConfigKey.isDefined && option.defaultValue.isDefined) .map(option => option.sqlKeyName -> option.defaultValue.get.toString) .toMap } - private lazy val defaultTableConfig: Map[String, String] = { - mappingSqlOptionToHoodieParam(defaultSqlOptions) - } - /** * Get the primary key from the table options. * @param options @@ -189,76 +184,80 @@ object HoodieOptionConfig { // extract primaryKey, preCombineField, type options def extractSqlOptions(options: Map[String, String]): Map[String, String] = { + val sqlOptions = mappingTableConfigToSqlOption(options) val targetOptions = keyMapping.keySet -- Set(SQL_PAYLOAD_CLASS.sqlKeyName) - options.filterKeys(targetOptions.contains) + sqlOptions.filterKeys(targetOptions.contains) } // validate primaryKey, preCombineField and type options - def validateTable(spark: SparkSession, schema: StructType, options: Map[String, String]): Unit = { + def validateTable(spark: SparkSession, schema: StructType, sqlOptions: Map[String, String]): Unit = { val resolver = spark.sessionState.conf.resolver // validate primary key - val primaryKeys = options.get(SQL_KEY_TABLE_PRIMARY_KEY.sqlKeyName) + val primaryKeys = sqlOptions.get(SQL_KEY_TABLE_PRIMARY_KEY.sqlKeyName) .map(_.split(",").filter(_.length > 0)) ValidationUtils.checkArgument(primaryKeys.nonEmpty, "No `primaryKey` is specified.") primaryKeys.get.foreach { primaryKey => ValidationUtils.checkArgument(schema.exists(f => resolver(f.name, primaryKey)), - s"Can't find primary key `$primaryKey` in ${schema.treeString}.") + s"Can't find primaryKey `$primaryKey` in ${schema.treeString}.") } // validate precombine key - val precombineKey = options.get(SQL_KEY_PRECOMBINE_FIELD.sqlKeyName) + val precombineKey = sqlOptions.get(SQL_KEY_PRECOMBINE_FIELD.sqlKeyName) if (precombineKey.isDefined && precombineKey.get.nonEmpty) { ValidationUtils.checkArgument(schema.exists(f => resolver(f.name, precombineKey.get)), - s"Can't find precombine key `${precombineKey.get}` in ${schema.treeString}.") + s"Can't find preCombineKey `${precombineKey.get}` in ${schema.treeString}.") } // validate table type - val tableType = options.get(SQL_KEY_TABLE_TYPE.sqlKeyName) + val tableType = sqlOptions.get(SQL_KEY_TABLE_TYPE.sqlKeyName) ValidationUtils.checkArgument(tableType.nonEmpty, "No `type` is specified.") ValidationUtils.checkArgument( - tableType.get.equalsIgnoreCase(HoodieOptionConfig.SQL_VALUE_TABLE_TYPE_COW) || - tableType.get.equalsIgnoreCase(HoodieOptionConfig.SQL_VALUE_TABLE_TYPE_MOR), - s"'type' must be '${HoodieOptionConfig.SQL_VALUE_TABLE_TYPE_COW}' or " + 
- s"'${HoodieOptionConfig.SQL_VALUE_TABLE_TYPE_MOR}'") + tableType.get.equalsIgnoreCase(SQL_VALUE_TABLE_TYPE_COW) || + tableType.get.equalsIgnoreCase(SQL_VALUE_TABLE_TYPE_MOR), + s"'type' must be '$SQL_VALUE_TABLE_TYPE_COW' or '$SQL_VALUE_TABLE_TYPE_MOR'") } - def buildConf[T](): HoodieOptions[T] = { - new HoodieOptions[T] + def buildConf[T](): HoodieSQLOptionBuilder[T] = { + new HoodieSQLOptionBuilder[T] } } -case class HoodieOption[T](sqlKeyName: String, hoodieKeyName: String, - defaultValue: Option[T], tableConfigKey: Option[String] = None) +case class HoodieSQLOption[T]( + sqlKeyName: String, + hoodieKeyName: String, + tableConfigKey: Option[String], + defaultValue: Option[T] +) -class HoodieOptions[T] { +class HoodieSQLOptionBuilder[T] { private var sqlKeyName: String = _ private var hoodieKeyName: String =_ private var tableConfigKey: String =_ private var defaultValue: T =_ - def withSqlKey(sqlKeyName: String): HoodieOptions[T] = { + def withSqlKey(sqlKeyName: String): HoodieSQLOptionBuilder[T] = { this.sqlKeyName = sqlKeyName this } - def withHoodieKey(hoodieKeyName: String): HoodieOptions[T] = { + def withHoodieKey(hoodieKeyName: String): HoodieSQLOptionBuilder[T] = { this.hoodieKeyName = hoodieKeyName this } - def withTableConfigKey(tableConfigKey: String): HoodieOptions[T] = { + def withTableConfigKey(tableConfigKey: String): HoodieSQLOptionBuilder[T] = { this.tableConfigKey = tableConfigKey this } - def defaultValue(defaultValue: T): HoodieOptions[T] = { + def defaultValue(defaultValue: T): HoodieSQLOptionBuilder[T] = { this.defaultValue = defaultValue this } - def build(): HoodieOption[T] = { - HoodieOption(sqlKeyName, hoodieKeyName, Option(defaultValue), Option(tableConfigKey)) + def build(): HoodieSQLOption[T] = { + HoodieSQLOption(sqlKeyName, hoodieKeyName, Option(tableConfigKey), Option(defaultValue)) } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala index e1c3a5a8efe1..accf8ac3be49 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala @@ -80,13 +80,14 @@ object HoodieSqlUtils extends SparkAdapterSupport { } } - def getTableSqlSchema(metaClient: HoodieTableMetaClient): Option[StructType] = { + def getTableSqlSchema(metaClient: HoodieTableMetaClient, + includeMetadataFields: Boolean = false): Option[StructType] = { val schemaResolver = new TableSchemaResolver(metaClient) - val avroSchema = try Some(schemaResolver.getTableAvroSchema(false)) + val avroSchema = try Some(schemaResolver.getTableAvroSchema(includeMetadataFields)) catch { case _: Throwable => None } - avroSchema.map(AvroConversionUtils.convertAvroSchemaToStructType).map(removeMetaFields) + avroSchema.map(AvroConversionUtils.convertAvroSchemaToStructType) } def getAllPartitionPaths(spark: SparkSession, table: CatalogTable): Seq[String] = { @@ -309,4 +310,21 @@ object HoodieSqlUtils extends SparkAdapterSupport { + s"Supported time format are: 'yyyy-MM-dd: HH:mm:ss' or 'yyyy-MM-dd' or 'yyyyMMddHHmmss'") } } + + def formatName(sparkSession: SparkSession, name: String): String = { + if (sparkSession.sessionState.conf.caseSensitiveAnalysis) name else name.toLowerCase(Locale.ROOT) + } + + /** + * Check if this is a empty table path. 
+ */ + def isEmptyPath(tablePath: String, conf: Configuration): Boolean = { + val basePath = new Path(tablePath) + val fs = basePath.getFileSystem(conf) + if (fs.exists(basePath)) { + fs.listStatus(basePath).isEmpty + } else { + true + } + } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala index e7d77e7598ae..a2fa1829b852 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala @@ -18,26 +18,24 @@ package org.apache.spark.sql.hudi.command import java.nio.charset.StandardCharsets + import org.apache.avro.Schema import org.apache.hudi.common.model.{HoodieCommitMetadata, WriteOperationType} -import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.timeline.HoodieInstant.State import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant} import org.apache.hudi.common.util.{CommitUtils, Option} import org.apache.hudi.table.HoodieSparkTable - -import scala.collection.JavaConverters._ import org.apache.hudi.{AvroConversionUtils, DataSourceUtils, HoodieWriterUtils} + import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.CatalogTable +import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable} import org.apache.spark.sql.execution.command.{DDLUtils, RunnableCommand} -import org.apache.spark.sql.hudi.HoodieSqlUtils -import org.apache.spark.sql.hudi.HoodieSqlUtils.getTableLocation import org.apache.spark.sql.types.{StructField, StructType} import org.apache.spark.sql.util.SchemaUtils +import scala.collection.JavaConverters._ import scala.util.control.NonFatal /** @@ -51,31 +49,32 @@ case class AlterHoodieTableAddColumnsCommand( override def run(sparkSession: SparkSession): Seq[Row] = { if (colsToAdd.nonEmpty) { val resolver = sparkSession.sessionState.conf.resolver - val table = sparkSession.sessionState.catalog.getTableMetadata(tableId) + val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableId) + val tableSchema = hoodieCatalogTable.tableSchema val existsColumns = - colsToAdd.map(_.name).filter(col => table.schema.fieldNames.exists(f => resolver(f, col))) + colsToAdd.map(_.name).filter(col => tableSchema.fieldNames.exists(f => resolver(f, col))) if (existsColumns.nonEmpty) { throw new AnalysisException(s"Columns: [${existsColumns.mkString(",")}] already exists in the table," + - s" table columns is: [${HoodieSqlUtils.removeMetaFields(table.schema).fieldNames.mkString(",")}]") + s" table columns is: [${hoodieCatalogTable.tableSchemaWithoutMetaFields.fieldNames.mkString(",")}]") } // Get the new schema - val newSqlSchema = StructType(table.schema.fields ++ colsToAdd) + val newSqlSchema = StructType(tableSchema.fields ++ colsToAdd) val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableId.table) val newSchema = AvroConversionUtils.convertStructTypeToAvroSchema(newSqlSchema, structName, nameSpace) // Commit with new schema to change the table schema - AlterHoodieTableAddColumnsCommand.commitWithSchema(newSchema, table, 
sparkSession) + AlterHoodieTableAddColumnsCommand.commitWithSchema(newSchema, hoodieCatalogTable, sparkSession) // Refresh the new schema to meta - val newDataSchema = StructType(table.dataSchema.fields ++ colsToAdd) - refreshSchemaInMeta(sparkSession, table, newDataSchema) + val newDataSchema = StructType(hoodieCatalogTable.dataSchema.fields ++ colsToAdd) + refreshSchemaInMeta(sparkSession, hoodieCatalogTable.table, newDataSchema) } Seq.empty[Row] } private def refreshSchemaInMeta(sparkSession: SparkSession, table: CatalogTable, - newSqlSchema: StructType): Unit = { + newSqlSchema: StructType): Unit = { try { sparkSession.catalog.uncacheTable(tableId.quotedString) } catch { @@ -98,25 +97,22 @@ object AlterHoodieTableAddColumnsCommand { /** * Generate an empty commit with new schema to change the table's schema. * @param schema The new schema to commit. - * @param table The hoodie table. + * @param hoodieCatalogTable The hoodie catalog table. * @param sparkSession The spark session. */ - def commitWithSchema(schema: Schema, table: CatalogTable, sparkSession: SparkSession): Unit = { - val path = getTableLocation(table, sparkSession) + def commitWithSchema(schema: Schema, hoodieCatalogTable: HoodieCatalogTable, + sparkSession: SparkSession): Unit = { val jsc = new JavaSparkContext(sparkSession.sparkContext) val client = DataSourceUtils.createHoodieClient( jsc, schema.toString, - path, - table.identifier.table, - HoodieWriterUtils.parametersWithWriteDefaults(table.storage.properties ++ table.properties).asJava + hoodieCatalogTable.tableLocation, + hoodieCatalogTable.tableName, + HoodieWriterUtils.parametersWithWriteDefaults(hoodieCatalogTable.catalogProperties).asJava ) - val hadoopConf = sparkSession.sessionState.newHadoopConf() - val metaClient = HoodieTableMetaClient.builder().setBasePath(path).setConf(hadoopConf).build() - - val commitActionType = CommitUtils.getCommitActionType(WriteOperationType.INSERT, metaClient.getTableType) + val commitActionType = CommitUtils.getCommitActionType(WriteOperationType.INSERT, hoodieCatalogTable.tableType) val instantTime = HoodieActiveTimeline.createNewInstantTime client.startCommitWithTime(instantTime, commitActionType) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala index d9569ceb53b4..7aa1d8383325 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala @@ -18,14 +18,16 @@ package org.apache.spark.sql.hudi.command import org.apache.avro.Schema + import org.apache.hudi.AvroConversionUtils import org.apache.hudi.avro.HoodieAvroUtils import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.exception.HoodieException + import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable import org.apache.spark.sql.execution.command.RunnableCommand -import org.apache.spark.sql.hudi.HoodieSqlUtils.getTableLocation import org.apache.spark.sql.types.{StructField, StructType} import scala.util.control.NonFatal @@ -34,22 +36,21 @@ import scala.util.control.NonFatal * Command for 
alter hudi table's column type. */ case class AlterHoodieTableChangeColumnCommand( - tableName: TableIdentifier, + tableIdentifier: TableIdentifier, columnName: String, newColumn: StructField) extends RunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { - val catalog = sparkSession.sessionState.catalog - val table = catalog.getTableMetadata(tableName) - val resolver = sparkSession.sessionState.conf.resolver + val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableIdentifier) + val resolver = sparkSession.sessionState.conf.resolver if (!resolver(columnName, newColumn.name)) { throw new AnalysisException(s"Can not support change column name for hudi table currently.") } // Get the new schema - val newSqlSchema = StructType( - table.schema.fields.map { field => + val newTableSchema = StructType( + hoodieCatalogTable.tableSchema.fields.map { field => if (resolver(field.name, columnName)) { newColumn } else { @@ -57,34 +58,30 @@ case class AlterHoodieTableChangeColumnCommand( } }) val newDataSchema = StructType( - table.dataSchema.fields.map { field => + hoodieCatalogTable.dataSchema.fields.map { field => if (resolver(field.name, columnName)) { newColumn } else { field } }) - val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableName.table) - val newSchema = AvroConversionUtils.convertStructTypeToAvroSchema(newSqlSchema, structName, nameSpace) + val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableIdentifier.table) + val newSchema = AvroConversionUtils.convertStructTypeToAvroSchema(newTableSchema, structName, nameSpace) - val path = getTableLocation(table, sparkSession) - val hadoopConf = sparkSession.sessionState.newHadoopConf() - val metaClient = HoodieTableMetaClient.builder().setBasePath(path) - .setConf(hadoopConf).build() // Validate the compatibility between new schema and origin schema. - validateSchema(newSchema, metaClient) + validateSchema(newSchema, hoodieCatalogTable.metaClient) // Commit new schema to change the table schema - AlterHoodieTableAddColumnsCommand.commitWithSchema(newSchema, table, sparkSession) + AlterHoodieTableAddColumnsCommand.commitWithSchema(newSchema, hoodieCatalogTable, sparkSession) try { - sparkSession.catalog.uncacheTable(tableName.quotedString) + sparkSession.catalog.uncacheTable(tableIdentifier.quotedString) } catch { case NonFatal(e) => - log.warn(s"Exception when attempting to uncache table ${tableName.quotedString}", e) + log.warn(s"Exception when attempting to uncache table ${tableIdentifier.quotedString}", e) } - sparkSession.catalog.refreshTable(tableName.unquotedString) + sparkSession.catalog.refreshTable(tableIdentifier.unquotedString) // Change the schema in the meta using new data schema. 
- catalog.alterTableDataSchema(tableName, newDataSchema) + sparkSession.sessionState.catalog.alterTableDataSchema(tableIdentifier, newDataSchema) Seq.empty[Row] } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala index 84e5961c4ba5..a3dfdd6a0d04 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableDropPartitionCommand.scala @@ -17,15 +17,16 @@ package org.apache.spark.sql.hudi.command -import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkSqlWriter, HoodieWriterUtils} +import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkSqlWriter} import org.apache.hudi.DataSourceWriteOptions._ -import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} import org.apache.hudi.common.util.PartitionPathEncodeUtils import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME + import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.Resolver import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec +import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable import org.apache.spark.sql.execution.command.{DDLUtils, RunnableCommand} import org.apache.spark.sql.hudi.HoodieSqlUtils._ @@ -35,24 +36,19 @@ case class AlterHoodieTableDropPartitionCommand( extends RunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { - val catalog = sparkSession.sessionState.catalog - val table = catalog.getTableMetadata(tableIdentifier) - DDLUtils.verifyAlterTableType(catalog, table, isView = false) + val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableIdentifier) + DDLUtils.verifyAlterTableType( + sparkSession.sessionState.catalog, hoodieCatalogTable.table, isView = false) - val path = getTableLocation(table, sparkSession) - val hadoopConf = sparkSession.sessionState.newHadoopConf() - val metaClient = HoodieTableMetaClient.builder().setBasePath(path).setConf(hadoopConf).build() - val partitionColumns = metaClient.getTableConfig.getPartitionFields val normalizedSpecs: Seq[Map[String, String]] = specs.map { spec => normalizePartitionSpec( spec, - partitionColumns.get(), - table.identifier.quotedString, + hoodieCatalogTable.partitionFields, + hoodieCatalogTable.tableName, sparkSession.sessionState.conf.resolver) } - val parameters = buildHoodieConfig(sparkSession, path, partitionColumns.get, normalizedSpecs) - + val parameters = buildHoodieConfig(sparkSession, hoodieCatalogTable, normalizedSpecs) HoodieSparkSqlWriter.write( sparkSession.sqlContext, SaveMode.Append, @@ -65,15 +61,14 @@ extends RunnableCommand { private def buildHoodieConfig( sparkSession: SparkSession, - path: String, - partitionColumns: Seq[String], + hoodieCatalogTable: HoodieCatalogTable, normalizedSpecs: Seq[Map[String, String]]): Map[String, String] = { - val table = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier) - val allPartitionPaths = getAllPartitionPaths(sparkSession, table) + val table = hoodieCatalogTable.table + val allPartitionPaths = hoodieCatalogTable.getAllPartitionPaths val enableHiveStylePartitioning = 
isHiveStyledPartitioning(allPartitionPaths, table) val enableEncodeUrl = isUrlEncodeEnabled(allPartitionPaths, table) val partitionsToDelete = normalizedSpecs.map { spec => - partitionColumns.map{ partitionColumn => + hoodieCatalogTable.partitionFields.map{ partitionColumn => val encodedPartitionValue = if (enableEncodeUrl) { PartitionPathEncodeUtils.escapePathName(spec(partitionColumn)) } else { @@ -87,22 +82,16 @@ extends RunnableCommand { }.mkString("/") }.mkString(",") - val metaClient = HoodieTableMetaClient.builder() - .setBasePath(path) - .setConf(sparkSession.sessionState.newHadoopConf) - .build() - val tableConfig = metaClient.getTableConfig - - withSparkConf(sparkSession, table.storage.properties) { + withSparkConf(sparkSession, Map.empty) { Map( - "path" -> path, - TBL_NAME.key -> tableIdentifier.table, - TABLE_TYPE.key -> tableConfig.getTableType.name, + "path" -> hoodieCatalogTable.tableLocation, + TBL_NAME.key -> hoodieCatalogTable.tableName, + TABLE_TYPE.key -> hoodieCatalogTable.tableTypeName, OPERATION.key -> DataSourceWriteOptions.DELETE_PARTITION_OPERATION_OPT_VAL, PARTITIONS_TO_DELETE.key -> partitionsToDelete, - RECORDKEY_FIELD.key -> tableConfig.getRecordKeyFieldProp, - PRECOMBINE_FIELD.key -> tableConfig.getPreCombineField, - PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp + RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","), + PRECOMBINE_FIELD.key -> hoodieCatalogTable.preCombineKey.getOrElse(""), + PARTITIONPATH_FIELD.key -> hoodieCatalogTable.partitionFields.mkString(",") ) } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableRenameCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableRenameCommand.scala index ec71a9d3c1aa..c7b5bdc202f6 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableRenameCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableRenameCommand.scala @@ -18,10 +18,11 @@ package org.apache.spark.sql.hudi.command import org.apache.hudi.common.table.HoodieTableMetaClient + import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable import org.apache.spark.sql.execution.command.AlterTableRenameCommand -import org.apache.spark.sql.hudi.HoodieSqlUtils.getTableLocation /** * Command for alter hudi table's table name. @@ -34,18 +35,15 @@ class AlterHoodieTableRenameCommand( override def run(sparkSession: SparkSession): Seq[Row] = { if (newName != oldName) { - val catalog = sparkSession.sessionState.catalog - val table = catalog.getTableMetadata(oldName) - val path = getTableLocation(table, sparkSession) - val hadoopConf = sparkSession.sessionState.newHadoopConf() - val metaClient = HoodieTableMetaClient.builder().setBasePath(path) - .setConf(hadoopConf).build() + val hoodieCatalogTable = HoodieCatalogTable(sparkSession, oldName) + // Init table with new name. HoodieTableMetaClient.withPropertyBuilder() - .fromProperties(metaClient.getTableConfig.getProps(true)) + .fromProperties(hoodieCatalogTable.tableConfig.getProps) .setTableName(newName.table) - .initTable(hadoopConf, path) + .initTable(hadoopConf, hoodieCatalogTable.tableLocation) + // Call AlterTableRenameCommand#run to rename table in meta. 
super.run(sparkSession) } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala index 2244e7212348..ce6237ec9934 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala @@ -25,11 +25,11 @@ import org.apache.hudi.hive.util.ConfigUtils import org.apache.hudi.sql.InsertMode import org.apache.spark.sql.{Row, SaveMode, SparkSession} -import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType} +import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, HoodieCatalogTable} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.command.DataWritingCommand -import org.apache.spark.sql.hudi.HoodieSqlUtils.getTableLocation +import org.apache.spark.sql.hudi.HoodieSqlUtils import scala.collection.JavaConverters._ @@ -64,18 +64,23 @@ case class CreateHoodieTableAsSelectCommand( // scalastyle:on } } - val tablePath = getTableLocation(table, sparkSession) - val hadoopConf = sparkSession.sessionState.newHadoopConf() - assert(CreateHoodieTableCommand.isEmptyPath(tablePath, hadoopConf), - s"Path '$tablePath' should be empty for CTAS") // ReOrder the query which move the partition columns to the last of the project list val reOrderedQuery = reOrderPartitionColumn(query, table.partitionColumnNames) val tableWithSchema = table.copy(schema = reOrderedQuery.schema) + val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableWithSchema) + val tablePath = hoodieCatalogTable.tableLocation + val hadoopConf = sparkSession.sessionState.newHadoopConf() + assert(HoodieSqlUtils.isEmptyPath(tablePath, hadoopConf), + s"Path '$tablePath' should be empty for CTAS") + // Execute the insert query try { - val tblProperties = table.storage.properties ++ table.properties + // init hoodie table + hoodieCatalogTable.initHoodieTable() + + val tblProperties = hoodieCatalogTable.catalogProperties val options = Map( DataSourceWriteOptions.HIVE_CREATE_MANAGED_TABLE.key -> (table.tableType == CatalogTableType.MANAGED).toString, DataSourceWriteOptions.HIVE_TABLE_SERDE_PROPERTIES.key -> ConfigUtils.configToString(tblProperties.asJava), @@ -89,11 +94,8 @@ case class CreateHoodieTableAsSelectCommand( // If write success, create the table in catalog if it has not synced to the // catalog by the meta sync. 
if (!sparkSession.sessionState.catalog.tableExists(tableIdentWithDB)) { - // Create the table - val createTableCommand = CreateHoodieTableCommand(tableWithSchema, mode == SaveMode.Ignore) - val path = getTableLocation(table, sparkSession) - val (finalSchema, _, tableSqlOptions) = createTableCommand.parseSchemaAndConfigs(sparkSession, path, ctas = true) - createTableCommand.createTableInCatalog(sparkSession, finalSchema, tableSqlOptions) + // create catalog table for this hoodie table + CreateHoodieTableCommand.createTableInCatalog(sparkSession, hoodieCatalogTable, mode == SaveMode.Ignore) } } else { // failed to insert data, clear table path clearTablePath(tablePath, hadoopConf) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala index e449a91a1279..bbbecca8236e 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala @@ -17,37 +17,27 @@ package org.apache.spark.sql.hudi.command -import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.hudi.{DataSourceWriteOptions, HoodieWriterUtils, SparkAdapterSupport} -import org.apache.hudi.HoodieWriterUtils._ -import org.apache.hudi.common.config.DFSPropertiesConfiguration + +import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport} import org.apache.hudi.common.model.HoodieFileFormat -import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} +import org.apache.hudi.common.table.HoodieTableConfig import org.apache.hudi.hadoop.HoodieParquetInputFormat import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils -import org.apache.hudi.keygen.ComplexKeyGenerator -import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory -import org.apache.spark.internal.Logging -import org.apache.spark.sql.avro.SchemaConverters -import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, TableAlreadyExistsException} -import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType} +import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.execution.command.RunnableCommand import org.apache.spark.sql.hive.HiveClientUtils import org.apache.spark.sql.hive.HiveExternalCatalog._ -import org.apache.spark.sql.hudi.HoodieOptionConfig +import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlUtils} import org.apache.spark.sql.hudi.HoodieSqlUtils._ -import org.apache.spark.sql.hudi.command.CreateHoodieTableCommand.checkTableConfigEqual import org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD -import org.apache.spark.sql.types.{StructField, StructType} -import org.apache.spark.sql.{AnalysisException, Row, SparkSession} +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.{SPARK_VERSION, SparkConf} -import java.util.{Locale, Properties} -import scala.collection.JavaConversions.mapAsJavaMap import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.control.NonFatal @@ -58,10 +48,6 @@ import 
scala.util.control.NonFatal case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean) extends RunnableCommand with SparkAdapterSupport { - val tableName = formatName(table.identifier.table) - - val tblProperties = table.storage.properties ++ table.properties - override def run(sparkSession: SparkSession): Seq[Row] = { val tableIsExists = sparkSession.sessionState.catalog.tableExists(table.identifier) if (tableIsExists) { @@ -74,94 +60,50 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean } } - // get schema with meta fields, table config if hudi table exists, options including - // table configs and properties of the catalog table - val path = getTableLocation(table, sparkSession) - val (finalSchema, existingTableConfig, tableSqlOptions) = parseSchemaAndConfigs(sparkSession, path) - - // Init the hoodie.properties - initTableIfNeed(sparkSession, path, finalSchema, existingTableConfig, tableSqlOptions) + val hoodieCatalogTable = HoodieCatalogTable(sparkSession, table) + // check if there are conflict between table configs defined in hoodie table and properties defined in catalog. + CreateHoodieTableCommand.validateTblProperties(hoodieCatalogTable) + // init hoodie table + hoodieCatalogTable.initHoodieTable() try { - // Create table in the catalog - createTableInCatalog(sparkSession, finalSchema, tableSqlOptions) + // create catalog table for this hoodie table + CreateHoodieTableCommand.createTableInCatalog(sparkSession, hoodieCatalogTable, ignoreIfExists) } catch { case NonFatal(e) => logWarning(s"Failed to create catalog table in metastore: ${e.getMessage}") } - Seq.empty[Row] } +} - def parseSchemaAndConfigs(sparkSession: SparkSession, path: String, ctas: Boolean = false) - : (StructType, Map[String, String], Map[String, String]) = { - val resolver = sparkSession.sessionState.conf.resolver - val conf = sparkSession.sessionState.newHadoopConf - // if CTAS, we treat the table we just created as nonexistent - val isTableExists = if (ctas) false else tableExistsInPath(path, conf) - var existingTableConfig = Map.empty[String, String] - val globalProps = DFSPropertiesConfiguration.getGlobalProps.asScala.toMap - val globalSqlProps = HoodieOptionConfig.mappingTableConfigToSqlOption( - HoodieWriterUtils.mappingSparkDatasourceConfigsToTableConfigs(globalProps)) - val sqlOptions = HoodieOptionConfig.withDefaultSqlOptions(globalSqlProps ++ tblProperties) - val catalogTableProps = HoodieOptionConfig.mappingSqlOptionToTableConfig(tblProperties) - - // get final schema and parameters - val (finalSchema, tableSqlOptions) = (table.tableType, isTableExists) match { - case (CatalogTableType.EXTERNAL, true) => - // If this is an external table & the table has already exists in the location, - // load the schema from the table meta. 
- val metaClient = HoodieTableMetaClient.builder() - .setBasePath(path) - .setConf(conf) - .build() - val tableSchema = getTableSqlSchema(metaClient) - existingTableConfig = metaClient.getTableConfig.getProps.asScala.toMap - validateTableConfig(sparkSession, catalogTableProps, convertMapToHoodieConfig(existingTableConfig)) - - val options = extraTableConfig(sparkSession, isTableExists, existingTableConfig) ++ - sqlOptions ++ HoodieOptionConfig.mappingTableConfigToSqlOption(existingTableConfig) - - val userSpecifiedSchema = table.schema - val schema = if (userSpecifiedSchema.isEmpty && tableSchema.isDefined) { - tableSchema.get - } else if (userSpecifiedSchema.nonEmpty) { - userSpecifiedSchema - } else { - throw new IllegalArgumentException(s"Missing schema for Create Table: $tableName") - } - - (addMetaFields(schema), options) - - case (_, false) => - assert(table.schema.nonEmpty, s"Missing schema for Create Table: $tableName") - val schema = table.schema - val options = extraTableConfig(sparkSession, isTableExists = false) ++ sqlOptions - (addMetaFields(schema), options) +object CreateHoodieTableCommand { - case (CatalogTableType.MANAGED, true) => - throw new AnalysisException(s"Can not create the managed table('$tableName')" + - s". The associated location('$path') already exists.") - } - HoodieOptionConfig.validateTable(sparkSession, finalSchema, tableSqlOptions) + def validateTblProperties(hoodieCatalogTable: HoodieCatalogTable): Unit = { + if (hoodieCatalogTable.hoodieTableExists) { + val originTableConfig = hoodieCatalogTable.tableConfig.getProps.asScala.toMap + val tableOptions = hoodieCatalogTable.catalogProperties - val dataSchema = finalSchema.filterNot { f => - table.partitionColumnNames.exists(resolver(_, f.name)) + checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.PRECOMBINE_FIELD.key) + checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.PARTITION_FIELDS.key) + checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.RECORDKEY_FIELDS.key) + checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) + checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.URL_ENCODE_PARTITIONING.key) + checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key) } - verifyDataSchema(table.identifier, table.tableType, dataSchema) - - (finalSchema, existingTableConfig, tableSqlOptions) } - def createTableInCatalog(sparkSession: SparkSession, finalSchema: StructType, - options: Map[String, String]): Unit = { + def createTableInCatalog(sparkSession: SparkSession, + hoodieCatalogTable: HoodieCatalogTable, ignoreIfExists: Boolean): Unit = { + val table = hoodieCatalogTable.table assert(table.tableType != CatalogTableType.VIEW) - assert(table.provider.isDefined) - val sessionState = sparkSession.sessionState - val path = getTableLocation(table, sparkSession) - val conf = sparkSession.sessionState.newHadoopConf() - val tableType = HoodieOptionConfig.getTableType(options) + val catalog = sparkSession.sessionState.catalog + val path = hoodieCatalogTable.tableLocation + val tableConfig = hoodieCatalogTable.tableConfig + val properties = tableConfig.getProps.asScala.toMap + + val tableType = tableConfig.getTableType.name() val inputFormat = tableType match { case DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL => classOf[HoodieParquetInputFormat].getCanonicalName @@ -173,7 +115,7 @@ case class CreateHoodieTableCommand(table: CatalogTable, 
ignoreIfExists: Boolean val serdeFormat = HoodieInputFormatUtils.getSerDeClassName(HoodieFileFormat.PARQUET) // only parameters irrelevant to hudi can be set to storage.properties - val storageProperties = HoodieOptionConfig.deleteHoodieOptions(options) + val storageProperties = HoodieOptionConfig.deleteHoodieOptions(properties) val newStorage = new CatalogStorageFormat( Some(new Path(path).toUri), Some(inputFormat), @@ -182,17 +124,18 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean table.storage.compressed, storageProperties + ("path" -> path)) - val newDatabaseName = formatName(table.identifier.database - .getOrElse(sessionState.catalog.getCurrentDatabase)) + val tablName = HoodieSqlUtils.formatName(sparkSession, table.identifier.table) + val newDatabaseName = HoodieSqlUtils.formatName(sparkSession, table.identifier.database + .getOrElse(catalog.getCurrentDatabase)) val newTableIdentifier = table.identifier - .copy(table = tableName, database = Some(newDatabaseName)) + .copy(table = tablName, database = Some(newDatabaseName)) // append pk, preCombineKey, type to the properties of table - val newTblProperties = table.storage.properties ++ table.properties ++ HoodieOptionConfig.extractSqlOptions(options) + val newTblProperties = hoodieCatalogTable.catalogProperties ++ HoodieOptionConfig.extractSqlOptions(properties) val newTable = table.copy( identifier = newTableIdentifier, - schema = finalSchema, + schema = hoodieCatalogTable.tableSchema, storage = newStorage, createVersion = SPARK_VERSION, properties = newTblProperties @@ -201,9 +144,9 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean // Create table in the catalog val enableHive = isEnableHive(sparkSession) if (enableHive) { - createHiveDataSourceTable(newTable, sparkSession) + createHiveDataSourceTable(sparkSession, newTable, ignoreIfExists) } else { - sessionState.catalog.createTable(newTable, ignoreIfExists = false, validateLocation = false) + catalog.createTable(newTable, ignoreIfExists = false, validateLocation = false) } } @@ -215,7 +158,8 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean * @param table * @param sparkSession */ - private def createHiveDataSourceTable(table: CatalogTable, sparkSession: SparkSession): Unit = { + private def createHiveDataSourceTable(sparkSession: SparkSession, table: CatalogTable, + ignoreIfExists: Boolean): Unit = { val dbName = table.identifier.database.get // check database val dbExists = sparkSession.sessionState.catalog.databaseExists(dbName) @@ -237,43 +181,6 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean client.createTable(tableWithDataSourceProps, ignoreIfExists) } - private def formatName(name: String): String = { - if (conf.caseSensitiveAnalysis) name else name.toLowerCase(Locale.ROOT) - } - - // This code is forked from org.apache.spark.sql.hive.HiveExternalCatalog#verifyDataSchema - private def verifyDataSchema(tableName: TableIdentifier, tableType: CatalogTableType, - dataSchema: Seq[StructField]): Unit = { - if (tableType != CatalogTableType.VIEW) { - val invalidChars = Seq(",", ":", ";") - def verifyNestedColumnNames(schema: StructType): Unit = schema.foreach { f => - f.dataType match { - case st: StructType => verifyNestedColumnNames(st) - case _ if invalidChars.exists(f.name.contains) => - val invalidCharsString = invalidChars.map(c => s"'$c'").mkString(", ") - val errMsg = "Cannot create a table having a nested column whose name contains " + 
- s"invalid characters ($invalidCharsString) in Hive metastore. Table: $tableName; " + - s"Column: ${f.name}" - throw new AnalysisException(errMsg) - case _ => - } - } - - dataSchema.foreach { f => - f.dataType match { - // Checks top-level column names - case _ if f.name.contains(",") => - throw new AnalysisException("Cannot create a table having a column whose name " + - s"contains commas in Hive metastore. Table: $tableName; Column: ${f.name}") - // Checks nested column names - case st: StructType => - verifyNestedColumnNames(st) - case _ => - } - } - } - } - // This code is forked from org.apache.spark.sql.hive.HiveExternalCatalog#tableMetaToTableProps private def tableMetaToTableProps(sparkConf: SparkConf, table: CatalogTable, schema: StructType): Map[String, String] = { @@ -323,93 +230,15 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean properties.toMap } - def extraTableConfig(sparkSession: SparkSession, isTableExists: Boolean, - originTableConfig: Map[String, String] = Map.empty): Map[String, String] = { - val extraConfig = mutable.Map.empty[String, String] - if (isTableExists) { - val allPartitionPaths = getAllPartitionPaths(sparkSession, table) - if (originTableConfig.contains(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key)) { - extraConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key) = - originTableConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key) - } else { - extraConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key) = - String.valueOf(isHiveStyledPartitioning(allPartitionPaths, table)) - } - if (originTableConfig.contains(HoodieTableConfig.URL_ENCODE_PARTITIONING.key)) { - extraConfig(HoodieTableConfig.URL_ENCODE_PARTITIONING.key) = - originTableConfig(HoodieTableConfig.URL_ENCODE_PARTITIONING.key) - } else { - extraConfig(HoodieTableConfig.URL_ENCODE_PARTITIONING.key) = - String.valueOf(isUrlEncodeEnabled(allPartitionPaths, table)) - } - } else { - extraConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key) = "true" - extraConfig(HoodieTableConfig.URL_ENCODE_PARTITIONING.key) = HoodieTableConfig.URL_ENCODE_PARTITIONING.defaultValue - } - - if (originTableConfig.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)) { - extraConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) = - HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator( - originTableConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)) - } else { - extraConfig(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) = classOf[ComplexKeyGenerator].getCanonicalName - } - extraConfig.toMap - } - - /** - * Init the hoodie.properties. 
- */ - def initTableIfNeed(sparkSession: SparkSession, - location: String, - schema: StructType, + private def checkTableConfigEqual( originTableConfig: Map[String, String], - sqlOptions: Map[String, String]): Unit = { - - logInfo(s"Init hoodie.properties for $tableName") - val conf = sparkSession.sessionState.newHadoopConf() - - val tableOptions = HoodieOptionConfig.mappingSqlOptionToTableConfig(sqlOptions) - checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.PRECOMBINE_FIELD.key) - checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.PARTITION_FIELDS.key) - checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.RECORDKEY_FIELDS.key) - checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) - checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.URL_ENCODE_PARTITIONING.key) - checkTableConfigEqual(originTableConfig, tableOptions, HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key) - // Save all the table config to the hoodie.properties. - val parameters = HoodieWriterUtils.mappingSparkDatasourceConfigsToTableConfigs(originTableConfig ++ tableOptions) - val properties = new Properties() - properties.putAll(parameters.asJava) - HoodieTableMetaClient.withPropertyBuilder() - .fromProperties(properties) - .setTableName(tableName) - .setTableCreateSchema(SchemaConverters.toAvroType(schema).toString()) - .setPartitionFields(table.partitionColumnNames.mkString(",")) - .initTable(conf, location) - } -} - -object CreateHoodieTableCommand extends Logging { - - def checkTableConfigEqual(originTableConfig: Map[String, String], - newTableConfig: Map[String, String], configKey: String): Unit = { + newTableConfig: Map[String, String], + configKey: String): Unit = { if (originTableConfig.contains(configKey) && newTableConfig.contains(configKey)) { assert(originTableConfig(configKey) == newTableConfig(configKey), s"Table config: $configKey in the create table is: ${newTableConfig(configKey)}, is not the same with the value in " + s"hoodie.properties, which is: ${originTableConfig(configKey)}. Please keep the same.") } } - - /** - * Check if this is a empty table path. 
- */ - def isEmptyPath(tablePath: String, conf: Configuration): Boolean = { - val basePath = new Path(tablePath) - val fs = basePath.getFileSystem(conf) - if (fs.exists(basePath)) { - fs.listStatus(basePath).isEmpty - } else { - true - } - } } + diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala index 98cc4dd726ea..1decb5dcf14c 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala @@ -17,14 +17,14 @@ package org.apache.spark.sql.hudi.command -import org.apache.hudi.DataSourceWriteOptions._ -import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.DataSourceWriteOptions.{OPERATION, _} import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME import org.apache.hudi.hive.ddl.HiveSyncMode import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport} import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable import org.apache.spark.sql.catalyst.plans.logical.DeleteFromTable import org.apache.spark.sql.execution.command.RunnableCommand import org.apache.spark.sql.hudi.HoodieSqlUtils._ @@ -57,26 +57,20 @@ case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends Runnab } private def buildHoodieConfig(sparkSession: SparkSession): Map[String, String] = { - val targetTable = sparkSession.sessionState.catalog.getTableMetadata(tableId) - val tblProperties = targetTable.storage.properties ++ targetTable.properties - val path = getTableLocation(targetTable, sparkSession) - val conf = sparkSession.sessionState.newHadoopConf() - val metaClient = HoodieTableMetaClient.builder() - .setBasePath(path) - .setConf(conf) - .build() - val tableConfig = metaClient.getTableConfig - val tableSchema = getTableSqlSchema(metaClient).get + val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableId) + val path = hoodieCatalogTable.tableLocation + val tableConfig = hoodieCatalogTable.tableConfig + val tableSchema = hoodieCatalogTable.tableSchema val partitionColumns = tableConfig.getPartitionFieldProp.split(",").map(_.toLowerCase) val partitionSchema = StructType(tableSchema.filter(f => partitionColumns.contains(f.name))) val primaryColumns = tableConfig.getRecordKeyFields.get() assert(primaryColumns.nonEmpty, s"There are no primary key defined in table $tableId, cannot execute delete operator") - withSparkConf(sparkSession, tblProperties) { + withSparkConf(sparkSession, hoodieCatalogTable.catalogProperties) { Map( "path" -> path, - TBL_NAME.key -> tableId.table, + TBL_NAME.key -> tableConfig.getTableName, HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable, URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitoning, KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName, diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala index ac3fce531568..635aa64a099d 100644 --- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala @@ -22,7 +22,6 @@ import org.apache.avro.generic.{GenericRecord, IndexedRecord} import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord} -import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} import org.apache.hudi.common.util.{Option => HOption} import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME @@ -34,15 +33,13 @@ import org.apache.hudi.sql.InsertMode import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkSqlWriter} import org.apache.spark.internal.Logging -import org.apache.spark.sql.catalyst.catalog.CatalogTable +import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable} import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} import org.apache.spark.sql.execution.command.RunnableCommand import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.hudi.HoodieSqlUtils._ -import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSqlUtils} import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession} import java.util.Properties @@ -53,10 +50,10 @@ import scala.collection.JavaConverters._ * Command for insert into hoodie table. */ case class InsertIntoHoodieTableCommand( - logicalRelation: LogicalRelation, - query: LogicalPlan, - partition: Map[String, Option[String]], - overwrite: Boolean) + logicalRelation: LogicalRelation, + query: LogicalPlan, + partition: Map[String, Option[String]], + overwrite: Boolean) extends RunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { @@ -83,14 +80,18 @@ object InsertIntoHoodieTableCommand extends Logging { * @param refreshTable Whether to refresh the table after insert finished. * @param extraOptions Extra options for insert. 
*/ - def run(sparkSession: SparkSession, table: CatalogTable, query: LogicalPlan, - insertPartitions: Map[String, Option[String]], - overwrite: Boolean, refreshTable: Boolean = true, - extraOptions: Map[String, String] = Map.empty): Boolean = { + def run(sparkSession: SparkSession, + table: CatalogTable, + query: LogicalPlan, + insertPartitions: Map[String, Option[String]], + overwrite: Boolean, + refreshTable: Boolean = true, + extraOptions: Map[String, String] = Map.empty): Boolean = { - val config = buildHoodieInsertConfig(table, sparkSession, overwrite, insertPartitions, extraOptions) + val hoodieCatalogTable = new HoodieCatalogTable(sparkSession, table) + val config = buildHoodieInsertConfig(hoodieCatalogTable, sparkSession, overwrite, insertPartitions, extraOptions) - val mode = if (overwrite && table.partitionColumnNames.isEmpty) { + val mode = if (overwrite && hoodieCatalogTable.partitionFields.isEmpty) { // insert overwrite non-partition table SaveMode.Overwrite } else { @@ -98,7 +99,7 @@ object InsertIntoHoodieTableCommand extends Logging { SaveMode.Append } val conf = sparkSession.sessionState.conf - val alignedQuery = alignOutputFields(query, table, insertPartitions, conf) + val alignedQuery = alignOutputFields(query, hoodieCatalogTable, insertPartitions, conf) // If we create dataframe using the Dataset.ofRows(sparkSession, alignedQuery), // The nullable attribute of fields will lost. // In order to pass the nullable attribute to the inputDF, we specify the schema @@ -120,18 +121,18 @@ object InsertIntoHoodieTableCommand extends Logging { /** * Aligned the type and name of query's output fields with the result table's fields. * @param query The insert query which to aligned. - * @param table The result table. + * @param hoodieCatalogTable The result hoodie catalog table. * @param insertPartitions The insert partition map. * @param conf The SQLConf. 
* @return */ private def alignOutputFields( query: LogicalPlan, - table: CatalogTable, + hoodieCatalogTable: HoodieCatalogTable, insertPartitions: Map[String, Option[String]], conf: SQLConf): LogicalPlan = { - val targetPartitionSchema = table.partitionSchema + val targetPartitionSchema = hoodieCatalogTable.partitionSchema val staticPartitionValues = insertPartitions.filter(p => p._2.isDefined).mapValues(_.get) assert(staticPartitionValues.isEmpty || @@ -139,20 +140,22 @@ object InsertIntoHoodieTableCommand extends Logging { s"Required partition columns is: ${targetPartitionSchema.json}, Current static partitions " + s"is: ${staticPartitionValues.mkString("," + "")}") - assert(staticPartitionValues.size + query.output.size == table.schema.size, - s"Required select columns count: ${removeMetaFields(table.schema).size}, " + + val queryOutputWithoutMetaFields = removeMetaFields(query.output) + assert(staticPartitionValues.size + queryOutputWithoutMetaFields.size + == hoodieCatalogTable.tableSchemaWithoutMetaFields.size, + s"Required select columns count: ${hoodieCatalogTable.tableSchemaWithoutMetaFields.size}, " + s"Current select columns(including static partition column) count: " + - s"${staticPartitionValues.size + removeMetaFields(query.output).size},columns: " + - s"(${(removeMetaFields(query.output).map(_.name) ++ staticPartitionValues.keys).mkString(",")})") - val queryDataFields = if (staticPartitionValues.isEmpty) { // insert dynamic partition - query.output.dropRight(targetPartitionSchema.fields.length) + s"${staticPartitionValues.size + queryOutputWithoutMetaFields.size},columns: " + + s"(${(queryOutputWithoutMetaFields.map(_.name) ++ staticPartitionValues.keys).mkString(",")})") + + val queryDataFieldsWithoutMetaFields = if (staticPartitionValues.isEmpty) { // insert dynamic partition + queryOutputWithoutMetaFields.dropRight(targetPartitionSchema.fields.length) } else { // insert static partition - query.output + queryOutputWithoutMetaFields } - val targetDataSchema = table.dataSchema // Align for the data fields of the query - val dataProjects = queryDataFields.zip(targetDataSchema.fields).map { - case (dataAttr, targetField) => + val dataProjectsWithoutMetaFields = queryDataFieldsWithoutMetaFields.zip( + hoodieCatalogTable.dataSchemaWithoutMetaFields.fields).map { case (dataAttr, targetField) => val castAttr = castIfNeeded(dataAttr.withNullability(targetField.nullable), targetField.dataType, conf) Alias(castAttr, targetField.name)() @@ -161,9 +164,9 @@ object InsertIntoHoodieTableCommand extends Logging { val partitionProjects = if (staticPartitionValues.isEmpty) { // insert dynamic partitions // The partition attributes is followed the data attributes in the query // So we init the partitionAttrPosition with the data schema size. 
- var partitionAttrPosition = targetDataSchema.size + var partitionAttrPosition = hoodieCatalogTable.dataSchemaWithoutMetaFields.size targetPartitionSchema.fields.map(f => { - val partitionAttr = query.output(partitionAttrPosition) + val partitionAttr = queryOutputWithoutMetaFields(partitionAttrPosition) partitionAttrPosition = partitionAttrPosition + 1 val castAttr = castIfNeeded(partitionAttr.withNullability(f.nullable), f.dataType, conf) Alias(castAttr, f.name)() @@ -176,9 +179,7 @@ object InsertIntoHoodieTableCommand extends Logging { Alias(castAttr, f.name)() }) } - // Remove the hoodie meta fields from the projects as we do not need these to write - val withoutMetaFieldDataProjects = dataProjects.filter(c => !HoodieSqlUtils.isMetaField(c.name)) - val alignedProjects = withoutMetaFieldDataProjects ++ partitionProjects + val alignedProjects = dataProjectsWithoutMetaFields ++ partitionProjects Project(alignedProjects, query) } @@ -187,65 +188,44 @@ object InsertIntoHoodieTableCommand extends Logging { * @return */ private def buildHoodieInsertConfig( - table: CatalogTable, + hoodieCatalogTable: HoodieCatalogTable, sparkSession: SparkSession, isOverwrite: Boolean, insertPartitions: Map[String, Option[String]] = Map.empty, extraOptions: Map[String, String]): Map[String, String] = { if (insertPartitions.nonEmpty && - (insertPartitions.keys.toSet != table.partitionColumnNames.toSet)) { + (insertPartitions.keys.toSet != hoodieCatalogTable.partitionFields.toSet)) { throw new IllegalArgumentException(s"Insert partition fields" + s"[${insertPartitions.keys.mkString(" " )}]" + - s" not equal to the defined partition in table[${table.partitionColumnNames.mkString(",")}]") - } - val path = getTableLocation(table, sparkSession) - val conf = sparkSession.sessionState.newHadoopConf() - val isTableExists = tableExistsInPath(path, conf) - val (tableConfig, tableSchema) = if (isTableExists) { - val metaClient = HoodieTableMetaClient.builder() - .setBasePath(path) - .setConf(conf) - .build() - (metaClient.getTableConfig, getTableSqlSchema(metaClient).get) - } else { - (new HoodieTableConfig(), table.schema) - } - val partitionColumns = tableConfig.getPartitionFieldProp - val partitionSchema = if (null == partitionColumns || partitionColumns.isEmpty) { - table.partitionSchema - } else { - StructType(tableSchema.filter(f => partitionColumns.contains(f.name))) + s" not equal to the defined partition in table[${hoodieCatalogTable.partitionFields.mkString(",")}]") } + val path = hoodieCatalogTable.tableLocation + val tableType = hoodieCatalogTable.tableTypeName + val tableConfig = hoodieCatalogTable.tableConfig + val tableSchema = hoodieCatalogTable.tableSchema - val options = table.storage.properties ++ table.properties ++ tableConfig.getProps.asScala.toMap ++ extraOptions + val options = hoodieCatalogTable.catalogProperties ++ tableConfig.getProps.asScala.toMap ++ extraOptions val parameters = withSparkConf(sparkSession, options)() - val tableName = Option(tableConfig.getTableName).getOrElse(table.identifier.table) - val tableType = Option(tableConfig.getTableType.name).getOrElse(TABLE_TYPE.defaultValue) - val primaryColumns = tableConfig.getRecordKeyFields.orElse(HoodieOptionConfig.getPrimaryColumns(options)) - val preCombineColumn = Option(tableConfig.getPreCombineField) - .getOrElse(HoodieOptionConfig.getPreCombineField(options).getOrElse("")) - val partitionFields = Option(tableConfig.getPartitionFieldProp) - .getOrElse(table.partitionColumnNames.mkString(",")) + val preCombineColumn = 
hoodieCatalogTable.preCombineKey.getOrElse("") + val partitionFields = hoodieCatalogTable.partitionFields.mkString(",") + val hiveStylePartitioningEnable = Option(tableConfig.getHiveStylePartitioningEnable).getOrElse("true") val urlEncodePartitioning = Option(tableConfig.getUrlEncodePartitoning).getOrElse("false") val keyGeneratorClassName = Option(tableConfig.getKeyGeneratorClassName) .getOrElse(classOf[ComplexKeyGenerator].getCanonicalName) - val dropDuplicate = sparkSession.conf - .getOption(INSERT_DROP_DUPS.key) - .getOrElse(INSERT_DROP_DUPS.defaultValue) - .toBoolean - val enableBulkInsert = parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key, DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean - val hasPrecombineColumn = preCombineColumn.nonEmpty - val isPartitionedTable = table.partitionColumnNames.nonEmpty + val dropDuplicate = sparkSession.conf + .getOption(INSERT_DROP_DUPS.key).getOrElse(INSERT_DROP_DUPS.defaultValue).toBoolean + val insertMode = InsertMode.of(parameters.getOrElse(DataSourceWriteOptions.SQL_INSERT_MODE.key, DataSourceWriteOptions.SQL_INSERT_MODE.defaultValue())) val isNonStrictMode = insertMode == InsertMode.NON_STRICT - + val isPartitionedTable = hoodieCatalogTable.partitionFields.nonEmpty + val hasPrecombineColumn = preCombineColumn.nonEmpty val operation = (enableBulkInsert, isOverwrite, dropDuplicate, isNonStrictMode, isPartitionedTable) match { case (true, _, _, false, _) => @@ -284,14 +264,14 @@ object InsertIntoHoodieTableCommand extends Logging { Map( "path" -> path, TABLE_TYPE.key -> tableType, - TBL_NAME.key -> tableName, + TBL_NAME.key -> hoodieCatalogTable.tableName, PRECOMBINE_FIELD.key -> preCombineColumn, OPERATION.key -> operation, HIVE_STYLE_PARTITIONING.key -> hiveStylePartitioningEnable, URL_ENCODE_PARTITIONING.key -> urlEncodePartitioning, KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName, SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> keyGeneratorClassName, - RECORDKEY_FIELD.key -> primaryColumns.mkString(","), + RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","), PARTITIONPATH_FIELD.key -> partitionFields, PAYLOAD_CLASS_NAME.key -> payloadClassName, ENABLE_ROW_WRITER.key -> enableBulkInsert.toString, @@ -299,14 +279,14 @@ object InsertIntoHoodieTableCommand extends Logging { META_SYNC_ENABLED.key -> enableHive.toString, HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(), HIVE_USE_JDBC.key -> "false", - HIVE_DATABASE.key -> table.identifier.database.getOrElse("default"), - HIVE_TABLE.key -> table.identifier.table, + HIVE_DATABASE.key -> hoodieCatalogTable.table.identifier.database.getOrElse("default"), + HIVE_TABLE.key -> hoodieCatalogTable.table.identifier.table, HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true", HIVE_PARTITION_FIELDS.key -> partitionFields, HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName, HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> "200", HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "200", - SqlKeyGenerator.PARTITION_SCHEMA -> table.partitionSchema.toDDL + SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL ) } } @@ -324,7 +304,7 @@ class ValidateDuplicateKeyPayload(record: GenericRecord, orderingVal: Comparable } override def combineAndGetUpdateValue(currentValue: IndexedRecord, - schema: Schema, properties: Properties): HOption[IndexedRecord] = { + schema: Schema, properties: Properties): HOption[IndexedRecord] = { val key = 
currentValue.asInstanceOf[GenericRecord].get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString throw new HoodieDuplicateKeyException(key) } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala index 251ebc32d828..a746f83737a1 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala @@ -19,26 +19,29 @@ package org.apache.spark.sql.hudi.command import org.apache.avro.Schema import org.apache.hudi.DataSourceWriteOptions._ -import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.util.StringUtils import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME import org.apache.hudi.hive.MultiPartKeysValueExtractor import org.apache.hudi.hive.ddl.HiveSyncMode import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, HoodieSparkSqlWriter, HoodieWriterUtils, SparkAdapterSupport} + import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.Resolver +import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, BoundReference, Cast, EqualTo, Expression, Literal} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.command.RunnableCommand import org.apache.spark.sql.hudi.HoodieSqlUtils._ import org.apache.spark.sql.hudi.command.payload.ExpressionPayload import org.apache.spark.sql.hudi.command.payload.ExpressionPayload._ -import org.apache.spark.sql.hudi.{HoodieOptionConfig, SerDeUtils} +import org.apache.spark.sql.hudi.SerDeUtils import org.apache.spark.sql.types.{BooleanType, StructType} + import java.util.Base64 + /** * The Command for hoodie MergeIntoTable. 
* The match on condition must contain the row key fields currently, so that we can use Hoodie @@ -78,12 +81,9 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab private lazy val targetTableSchemaWithoutMetaFields = removeMetaFields(mergeInto.targetTable.schema).fields - private lazy val targetTable = - sparkSession.sessionState.catalog.getTableMetadata(targetTableIdentify) + private lazy val hoodieCatalogTable = HoodieCatalogTable(sparkSession, targetTableIdentify) - private lazy val tblProperties = targetTable.storage.properties ++ targetTable.properties - - private lazy val targetTableType = HoodieOptionConfig.getTableType(tblProperties) + private lazy val targetTableType = hoodieCatalogTable.tableTypeName /** * @@ -126,7 +126,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab assert(updateActions.size <= 1, s"Only support one updateAction currently, current update action count is: ${updateActions.size}") val updateAction = updateActions.headOption - HoodieOptionConfig.getPreCombineField(tblProperties).map(preCombineField => { + hoodieCatalogTable.preCombineKey.map(preCombineField => { val sourcePreCombineField = updateAction.map(u => u.assignments.filter { case Assignment(key: AttributeReference, _) => key.name.equalsIgnoreCase(preCombineField) @@ -149,9 +149,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab this.sparkSession = sparkSession // Create the write parameters - val parameters = buildMergeIntoConfig(mergeInto) - - val sourceDF = buildSourceDF(sparkSession) + val parameters = buildMergeIntoConfig(hoodieCatalogTable) if (mergeInto.matchedActions.nonEmpty) { // Do the upsert executeUpsert(sourceDF, parameters) @@ -180,7 +178,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab * row key and pre-combine field. * */ - private def buildSourceDF(sparkSession: SparkSession): DataFrame = { + private lazy val sourceDF: DataFrame = { var sourceDF = Dataset.ofRows(sparkSession, mergeInto.sourceTable) targetKey2SourceExpression.foreach { case (targetColumn, sourceExpression) @@ -429,33 +427,24 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab /** * Create the config for hoodie writer. - * @param mergeInto - * @return */ - private def buildMergeIntoConfig(mergeInto: MergeIntoTable): Map[String, String] = { + private def buildMergeIntoConfig(hoodieCatalogTable: HoodieCatalogTable): Map[String, String] = { val targetTableDb = targetTableIdentify.database.getOrElse("default") val targetTableName = targetTableIdentify.identifier - val path = getTableLocation(targetTable, sparkSession) - val conf = sparkSession.sessionState.newHadoopConf() - val metaClient = HoodieTableMetaClient.builder() - .setBasePath(path) - .setConf(conf) - .build() - val tableConfig = metaClient.getTableConfig - val tableSchema = getTableSqlSchema(metaClient).get + val path = hoodieCatalogTable.tableLocation + val tableConfig = hoodieCatalogTable.tableConfig + val tableSchema = hoodieCatalogTable.tableSchema val partitionColumns = tableConfig.getPartitionFieldProp.split(",").map(_.toLowerCase) val partitionSchema = StructType(tableSchema.filter(f => partitionColumns.contains(f.name))) - val options = tblProperties - val preCombineColumn = Option(tableConfig.getPreCombineField).getOrElse("") // Enable the hive sync by default if spark have enable the hive metastore. 
val enableHive = isEnableHive(sparkSession) - withSparkConf(sparkSession, options) { + withSparkConf(sparkSession, hoodieCatalogTable.catalogProperties) { Map( "path" -> path, RECORDKEY_FIELD.key -> tableConfig.getRecordKeyFieldProp, - PRECOMBINE_FIELD.key -> preCombineColumn, + PRECOMBINE_FIELD.key -> hoodieCatalogTable.preCombineKey.getOrElse(""), TBL_NAME.key -> targetTableName, PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp, PAYLOAD_CLASS_NAME.key -> classOf[ExpressionPayload].getCanonicalName, diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/ShowHoodieTablePartitionsCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/ShowHoodieTablePartitionsCommand.scala index 1c1f4b73d0da..d27ba6acd07a 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/ShowHoodieTablePartitionsCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/ShowHoodieTablePartitionsCommand.scala @@ -17,22 +17,22 @@ package org.apache.spark.sql.hudi.command -import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.util.PartitionPathEncodeUtils + import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec +import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.execution.command.RunnableCommand import org.apache.spark.sql.execution.datasources.PartitioningUtils -import org.apache.spark.sql.hudi.HoodieSqlUtils._ import org.apache.spark.sql.types.StringType /** * Command for show hudi table's partitions. 
*/ case class ShowHoodieTablePartitionsCommand( - tableName: TableIdentifier, + tableIdentifier: TableIdentifier, specOpt: Option[TablePartitionSpec]) extends RunnableCommand { @@ -41,28 +41,17 @@ extends RunnableCommand { } override def run(sparkSession: SparkSession): Seq[Row] = { - val catalog = sparkSession.sessionState.catalog - val resolver = sparkSession.sessionState.conf.resolver - val catalogTable = catalog.getTableMetadata(tableName) - val tablePath = getTableLocation(catalogTable, sparkSession) - - val hadoopConf = sparkSession.sessionState.newHadoopConf() - val metaClient = HoodieTableMetaClient.builder().setBasePath(tablePath) - .setConf(hadoopConf).build() - val schemaOpt = getTableSqlSchema(metaClient) - val partitionColumnNamesOpt = metaClient.getTableConfig.getPartitionFields - if (partitionColumnNamesOpt.isPresent && partitionColumnNamesOpt.get.nonEmpty - && schemaOpt.isDefined && schemaOpt.nonEmpty) { + val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableIdentifier) - val partitionColumnNames = partitionColumnNamesOpt.get - val schema = schemaOpt.get - val allPartitionPaths: Seq[String] = getAllPartitionPaths(sparkSession, catalogTable) + val schemaOpt = hoodieCatalogTable.tableSchema + val partitionColumnNamesOpt = hoodieCatalogTable.tableConfig.getPartitionFields + if (partitionColumnNamesOpt.isPresent && partitionColumnNamesOpt.get.nonEmpty && schemaOpt.nonEmpty) { if (specOpt.isEmpty) { - allPartitionPaths.map(Row(_)) + hoodieCatalogTable.getAllPartitionPaths.map(Row(_)) } else { val spec = specOpt.get - allPartitionPaths.filter { partitionPath => + hoodieCatalogTable.getAllPartitionPaths.filter { partitionPath => val part = PartitioningUtils.parsePathFragment(partitionPath) spec.forall { case (col, value) => PartitionPathEncodeUtils.escapePartitionValue(value) == part.getOrElse(col, null) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala index a57a3d1bd457..2090185f3150 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala @@ -18,45 +18,37 @@ package org.apache.spark.sql.hudi.command import org.apache.hudi.common.table.HoodieTableMetaClient + import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec +import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable import org.apache.spark.sql.execution.command.TruncateTableCommand -import org.apache.spark.sql.hudi.HoodieSqlUtils.getTableLocation /** * Command for truncate hudi table. 
*/ class TruncateHoodieTableCommand( - tableName: TableIdentifier, + tableIdentifier: TableIdentifier, partitionSpec: Option[TablePartitionSpec]) - extends TruncateTableCommand(tableName, partitionSpec) { + extends TruncateTableCommand(tableIdentifier, partitionSpec) { override def run(sparkSession: SparkSession): Seq[Row] = { - val table = sparkSession.sessionState.catalog.getTableMetadata(tableName) - val path = getTableLocation(table, sparkSession) - val hadoopConf = sparkSession.sessionState.newHadoopConf() - // If we have not specified the partition, truncate will delete all the - // data in the table path include the hoodi.properties. In this case we - // should reInit the table. - val needReInitTable = partitionSpec.isEmpty + val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableIdentifier) + val properties = hoodieCatalogTable.tableConfig.getProps + val tablePath = hoodieCatalogTable.tableLocation - val tableProperties = if (needReInitTable) { - // Create MetaClient - val metaClient = HoodieTableMetaClient.builder().setBasePath(path) - .setConf(hadoopConf).build() - Some(metaClient.getTableConfig.getProps(true)) - } else { - None - } // Delete all data in the table directory super.run(sparkSession) - if (tableProperties.isDefined) { + // If we have not specified the partition, truncate will delete all the data in the table path + // include the hoodi.properties. In this case we should reInit the table. + if (partitionSpec.isEmpty) { + val hadoopConf = sparkSession.sessionState.newHadoopConf() // ReInit hoodie.properties HoodieTableMetaClient.withPropertyBuilder() - .fromProperties(tableProperties.get) - .initTable(hadoopConf, path) + .fromProperties(properties) + .initTable(hadoopConf, hoodieCatalogTable.tableLocation) } Seq.empty[Row] } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala index 0c7d1ef0b4cb..a6a0cb117805 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala @@ -20,17 +20,18 @@ package org.apache.spark.sql.hudi.command import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.SparkAdapterSupport import org.apache.hudi.common.model.HoodieRecord -import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME import org.apache.hudi.hive.MultiPartKeysValueExtractor import org.apache.hudi.hive.ddl.HiveSyncMode import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression} import org.apache.spark.sql.catalyst.plans.logical.{Assignment, UpdateTable} import org.apache.spark.sql.execution.command.RunnableCommand import org.apache.spark.sql.hudi.HoodieSqlUtils._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{StructField, StructType} import scala.collection.JavaConverters._ @@ -43,9 +44,7 @@ case class UpdateHoodieTableCommand(updateTable: UpdateTable) extends RunnableCo override def run(sparkSession: SparkSession): Seq[Row] = { logInfo(s"start execute update command for $tableId") - def cast(exp:Expression, field: StructField): 
Expression = { - castIfNeeded(exp, field.dataType, sparkSession.sqlContext.conf) - } + val sqlConf = sparkSession.sessionState.conf val name2UpdateValue = updateTable.assignments.map { case Assignment(attr: AttributeReference, value) => attr.name -> value @@ -61,9 +60,9 @@ case class UpdateHoodieTableCommand(updateTable: UpdateTable) extends RunnableCo val projects = updateExpressions.zip(removeMetaFields(table.schema).fields).map { case (attr: AttributeReference, field) => - Column(cast(attr, field)) + Column(cast(attr, field, sqlConf)) case (exp, field) => - Column(Alias(cast(exp, field), field.name)()) + Column(Alias(cast(exp, field, sqlConf), field.name)()) } var df = Dataset.ofRows(sparkSession, table) @@ -83,30 +82,21 @@ case class UpdateHoodieTableCommand(updateTable: UpdateTable) extends RunnableCo } private def buildHoodieConfig(sparkSession: SparkSession): Map[String, String] = { - val targetTable = sparkSession.sessionState.catalog.getTableMetadata(tableId) - val tblProperties = targetTable.storage.properties ++ targetTable.properties - val path = getTableLocation(targetTable, sparkSession) - val conf = sparkSession.sessionState.newHadoopConf() - val metaClient = HoodieTableMetaClient.builder() - .setBasePath(path) - .setConf(conf) - .build() - val tableConfig = metaClient.getTableConfig - val tableSchema = getTableSqlSchema(metaClient).get - val partitionColumns = tableConfig.getPartitionFieldProp.split(",").map(_.toLowerCase) - val partitionSchema = StructType(tableSchema.filter(f => partitionColumns.contains(f.name))) - val primaryColumns = tableConfig.getRecordKeyFields.get() + val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableId) + val catalogProperties = hoodieCatalogTable.catalogProperties + val tableConfig = hoodieCatalogTable.tableConfig + val preCombineColumn = Option(tableConfig.getPreCombineField).getOrElse("") - assert(primaryColumns.nonEmpty, + assert(hoodieCatalogTable.primaryKeys.nonEmpty, s"There are no primary key in table $tableId, cannot execute update operator") val enableHive = isEnableHive(sparkSession) - withSparkConf(sparkSession, tblProperties) { + withSparkConf(sparkSession, catalogProperties) { Map( - "path" -> path, - RECORDKEY_FIELD.key -> primaryColumns.mkString(","), + "path" -> hoodieCatalogTable.tableLocation, + RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","), PRECOMBINE_FIELD.key -> preCombineColumn, - TBL_NAME.key -> tableId.table, + TBL_NAME.key -> hoodieCatalogTable.tableName, HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable, URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitoning, KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName, @@ -122,8 +112,12 @@ case class UpdateHoodieTableCommand(updateTable: UpdateTable) extends RunnableCo HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName, HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true", HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "200", - SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL + SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL ) } } + + def cast(exp:Expression, field: StructField, sqlConf: SQLConf): Expression = { + castIfNeeded(exp, field.dataType, sqlConf) + } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala index 
8cc092c2e5f0..f8326386d340 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestAlterTableDropPartition.scala @@ -47,7 +47,7 @@ class TestAlterTableDropPartition extends TestHoodieSqlBase { spark.sql(s"""insert into $tableName values (1, "z3", "v1", "2021-10-01"), (2, "l4", "v1", "2021-10-02")""") checkExceptionContain(s"alter table $tableName drop partition (dt='2021-10-01')")( - s"dt is not a valid partition column in table `default`.`$tableName`.") + s"dt is not a valid partition column in table") } Seq(false, true).foreach { urlencode => diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala index 5cd66fd4838d..4c0c60385104 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieOptionConfig.scala @@ -138,7 +138,7 @@ class TestHoodieOptionConfig extends HoodieClientTestBase { val e2 = intercept[IllegalArgumentException] { HoodieOptionConfig.validateTable(spark, schema, sqlOptions2) } - assertTrue(e2.getMessage.contains("Can't find primary key")) + assertTrue(e2.getMessage.contains("Can't find primaryKey")) // preCombine field not found val sqlOptions3 = baseSqlOptions ++ Map( @@ -149,7 +149,7 @@ class TestHoodieOptionConfig extends HoodieClientTestBase { val e3 = intercept[IllegalArgumentException] { HoodieOptionConfig.validateTable(spark, schema, sqlOptions3) } - assertTrue(e3.getMessage.contains("Can't find precombine key")) + assertTrue(e3.getMessage.contains("Can't find preCombineKey")) // miss type parameter val sqlOptions4 = baseSqlOptions ++ Map( diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala index 09711cc8f1be..ee1e2e6f42cf 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.hudi -import org.apache.hudi.common.model.HoodieTableType import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.exception.HoodieDuplicateKeyException @@ -265,10 +264,6 @@ class TestInsertTable extends TestHoodieSqlBase { test("Test insert for uppercase table name") { withTempDir{ tmp => val tableName = s"H_$generateTableName" - HoodieTableMetaClient.withPropertyBuilder() - .setTableName(tableName) - .setTableType(HoodieTableType.COPY_ON_WRITE.name()) - .initTable(spark.sessionState.newHadoopConf(), tmp.getCanonicalPath) spark.sql( s""" @@ -285,6 +280,11 @@ class TestInsertTable extends TestHoodieSqlBase { checkAnswer(s"select id, name, price from $tableName")( Seq(1, "a1", 10.0) ) + val metaClient = HoodieTableMetaClient.builder() + .setBasePath(tmp.getCanonicalPath) + .setConf(spark.sessionState.newHadoopConf()) + .build() + assertResult(metaClient.getTableConfig.getTableName)(tableName) } }
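A note on the access pattern that the command changes above share: each SQL command now resolves table location, table config, schema, and key fields through the HoodieCatalogTable wrapper instead of building a HoodieTableMetaClient and re-deriving the schema inside every command. The following is a minimal sketch of that usage only, assuming the companion apply(SparkSession, TableIdentifier) used by the commands above; the object name HoodieCatalogTableUsageSketch, the helper buildBasicConfig, and the particular subset of write options are illustrative and are not part of this patch.

import org.apache.hudi.DataSourceWriteOptions.{PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD}
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable

object HoodieCatalogTableUsageSketch {
  // Illustrative helper: gather the write options a SQL command typically needs,
  // reading everything through HoodieCatalogTable rather than a hand-built meta client.
  def buildBasicConfig(spark: SparkSession, tableId: TableIdentifier): Map[String, String] = {
    val hoodieCatalogTable = HoodieCatalogTable(spark, tableId)
    Map(
      "path" -> hoodieCatalogTable.tableLocation,
      TBL_NAME.key -> hoodieCatalogTable.tableName,
      RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
      PRECOMBINE_FIELD.key -> hoodieCatalogTable.preCombineKey.getOrElse(""),
      PARTITIONPATH_FIELD.key -> hoodieCatalogTable.partitionFields.mkString(",")
    )
  }
}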