Skip to content

Commit

Permalink
[HUDI-2811] [MINOR] alter table add columns
Browse files Browse the repository at this point in the history
  • Loading branch information
YannByron committed Dec 11, 2021
1 parent 5d6ffa6 commit 2e84bd7
Showing 1 changed file with 6 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,21 +64,22 @@ case class AlterHoodieTableAddColumnsCommand(
s" table columns is: [${hoodieCatalogTable.tableSchemaWithoutMetaFields.fieldNames.mkString(",")}]")
}
// Get the new schema
val newSqlSchema = StructType(tableSchema.fields ++ colsToAdd)
val newSqlSchema = StructType(hoodieCatalogTable.dataSchema.fields ++ colsToAdd ++ hoodieCatalogTable.partitionSchema.fields)
val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableId.table)
val newSchema = AvroConversionUtils.convertStructTypeToAvroSchema(newSqlSchema, structName, nameSpace)

// Commit with new schema to change the table schema
AlterHoodieTableAddColumnsCommand.commitWithSchema(newSchema, hoodieCatalogTable, sparkSession)

// Refresh the new schema to meta
refreshSchemaInMeta(sparkSession, hoodieCatalogTable.table, newSchema)
val newDataSchema = StructType(hoodieCatalogTable.dataSchema.fields ++ colsToAdd)
refreshSchemaInMeta(sparkSession, hoodieCatalogTable.table, newDataSchema)
}
Seq.empty[Row]
}

private def refreshSchemaInMeta(sparkSession: SparkSession, table: CatalogTable,
newSchema: Schema): Unit = {
newSqlDataSchema: StructType): Unit = {
try {
sparkSession.catalog.uncacheTable(tableId.quotedString)
} catch {
Expand All @@ -87,16 +88,12 @@ case class AlterHoodieTableAddColumnsCommand(
}
sparkSession.catalog.refreshTable(table.identifier.unquotedString)

val newSqlSchema = AvroConversionUtils.convertAvroSchemaToStructType(newSchema)
val newTable = table.copy(schema = newSqlSchema)
SchemaUtils.checkColumnNameDuplication(
newTable.dataSchema.map(_.name),
newSqlDataSchema.map(_.name),
"in the table definition of " + table.identifier,
conf.caseSensitiveAnalysis)

DDLUtils.checkDataColNames(newTable)

sparkSession.sessionState.catalog.alterTableDataSchema(tableId, newTable.dataSchema)
sparkSession.sessionState.catalog.alterTableDataSchema(tableId, newSqlDataSchema)
}
}

Expand Down

0 comments on commit 2e84bd7

Please sign in to comment.