Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ case class AlterTableSerDePropertiesCommand(
tableName: TableIdentifier,
serdeClassName: Option[String],
serdeProperties: Option[Map[String, String]],
partition: Option[Map[String, String]])
partSpec: Option[TablePartitionSpec])
extends RunnableCommand {

// should never happen if we parsed things correctly
Expand All @@ -306,15 +306,29 @@ case class AlterTableSerDePropertiesCommand(
"ALTER TABLE SERDEPROPERTIES")
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
// Do not support setting serde for datasource tables
// For datasource tables, disallow setting serde or specifying partition
if (partSpec.isDefined && DDLUtils.isDatasourceTable(table)) {
throw new AnalysisException("Operation not allowed: ALTER TABLE SET " +
"[SERDE | SERDEPROPERTIES] for a specific partition is not supported " +
"for tables created with the datasource API")
}
if (serdeClassName.isDefined && DDLUtils.isDatasourceTable(table)) {
throw new AnalysisException("Operation not allowed: ALTER TABLE SET SERDE is " +
"not supported for tables created with the datasource API")
}
val newTable = table.withNewStorage(
serde = serdeClassName.orElse(table.storage.serde),
serdeProperties = table.storage.serdeProperties ++ serdeProperties.getOrElse(Map()))
catalog.alterTable(newTable)
if (partSpec.isEmpty) {
val newTable = table.withNewStorage(
serde = serdeClassName.orElse(table.storage.serde),
serdeProperties = table.storage.serdeProperties ++ serdeProperties.getOrElse(Map()))
catalog.alterTable(newTable)
} else {
val spec = partSpec.get
val part = catalog.getPartition(tableName, spec)
val newPart = part.copy(storage = part.storage.copy(
serde = serdeClassName.orElse(part.storage.serde),
serdeProperties = part.storage.serdeProperties ++ serdeProperties.getOrElse(Map())))
catalog.alterPartitions(tableName, Seq(newPart))
}
Seq.empty[Row]
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,14 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
testSetSerde(isDatasourceTable = true)
}

test("alter table: set serde partition") {
testSetSerdePartition(isDatasourceTable = false)
}

test("alter table: set serde partition (datasource table)") {
testSetSerdePartition(isDatasourceTable = true)
}

test("alter table: bucketing is not supported") {
val catalog = spark.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
Expand Down Expand Up @@ -931,6 +939,62 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
assert(e.getMessage.contains(DATASOURCE_PREFIX + "foo"))
}

private def testSetSerdePartition(isDatasourceTable: Boolean): Unit = {
val catalog = spark.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
val spec = Map("a" -> "1", "b" -> "2")
createDatabase(catalog, "dbx")
createTable(catalog, tableIdent)
createTablePartition(catalog, spec, tableIdent)
createTablePartition(catalog, Map("a" -> "1", "b" -> "3"), tableIdent)
createTablePartition(catalog, Map("a" -> "2", "b" -> "2"), tableIdent)
createTablePartition(catalog, Map("a" -> "2", "b" -> "3"), tableIdent)
if (isDatasourceTable) {
convertToDatasourceTable(catalog, tableIdent)
}
assert(catalog.getPartition(tableIdent, spec).storage.serde.isEmpty)
assert(catalog.getPartition(tableIdent, spec).storage.serdeProperties.isEmpty)
// set table serde and/or properties (should fail on datasource tables)
if (isDatasourceTable) {
val e1 = intercept[AnalysisException] {
sql("ALTER TABLE dbx.tab1 PARTITION (a=1, b=2) SET SERDE 'whatever'")
}
val e2 = intercept[AnalysisException] {
sql("ALTER TABLE dbx.tab1 PARTITION (a=1, b=2) SET SERDE 'org.apache.madoop' " +
"WITH SERDEPROPERTIES ('k' = 'v', 'kay' = 'vee')")
}
assert(e1.getMessage.contains("datasource"))
assert(e2.getMessage.contains("datasource"))
} else {
sql("ALTER TABLE dbx.tab1 PARTITION (a=1, b=2) SET SERDE 'org.apache.jadoop'")
assert(catalog.getPartition(tableIdent, spec).storage.serde == Some("org.apache.jadoop"))
assert(catalog.getPartition(tableIdent, spec).storage.serdeProperties.isEmpty)
sql("ALTER TABLE dbx.tab1 PARTITION (a=1, b=2) SET SERDE 'org.apache.madoop' " +
"WITH SERDEPROPERTIES ('k' = 'v', 'kay' = 'vee')")
assert(catalog.getPartition(tableIdent, spec).storage.serde == Some("org.apache.madoop"))
assert(catalog.getPartition(tableIdent, spec).storage.serdeProperties ==
Map("k" -> "v", "kay" -> "vee"))
}
// set serde properties only
maybeWrapException(isDatasourceTable) {
sql("ALTER TABLE dbx.tab1 PARTITION (a=1, b=2) " +
"SET SERDEPROPERTIES ('k' = 'vvv', 'kay' = 'vee')")
assert(catalog.getPartition(tableIdent, spec).storage.serdeProperties ==
Map("k" -> "vvv", "kay" -> "vee"))
}
// set things without explicitly specifying database
catalog.setCurrentDatabase("dbx")
maybeWrapException(isDatasourceTable) {
sql("ALTER TABLE tab1 PARTITION (a=1, b=2) SET SERDEPROPERTIES ('kay' = 'veee')")
assert(catalog.getPartition(tableIdent, spec).storage.serdeProperties ==
Map("k" -> "vvv", "kay" -> "veee"))
}
// table to alter does not exist
intercept[AnalysisException] {
sql("ALTER TABLE does_not_exist SET SERDEPROPERTIES ('x' = 'y')")
}
}

private def testAddPartitions(isDatasourceTable: Boolean): Unit = {
val catalog = spark.sessionState.catalog
val tableIdent = TableIdentifier("tab1", Some("dbx"))
Expand Down