[SPARK-14388] [SQL] Implement CREATE TABLE #12363

Closed
wants to merge 21 commits
@@ -272,8 +272,7 @@ createFileFormat
;

fileFormat
: INPUTFORMAT inFmt=STRING OUTPUTFORMAT outFmt=STRING (SERDE serdeCls=STRING)?
(INPUTDRIVER inDriver=STRING OUTPUTDRIVER outDriver=STRING)? #tableFileFormat
: INPUTFORMAT inFmt=STRING OUTPUTFORMAT outFmt=STRING (SERDE serdeCls=STRING)? #tableFileFormat
| identifier #genericFileFormat
;

@@ -220,14 +220,30 @@ case class CatalogTable(
tableType: CatalogTableType,
storage: CatalogStorageFormat,
schema: Seq[CatalogColumn],
partitionColumns: Seq[CatalogColumn] = Seq.empty,
sortColumns: Seq[CatalogColumn] = Seq.empty,
numBuckets: Int = 0,
partitionColumnNames: Seq[String] = Seq.empty,
sortColumnNames: Seq[String] = Seq.empty,
bucketColumnNames: Seq[String] = Seq.empty,
numBuckets: Int = -1,
createTime: Long = System.currentTimeMillis,
lastAccessTime: Long = System.currentTimeMillis,
lastAccessTime: Long = -1,
properties: Map[String, String] = Map.empty,
viewOriginalText: Option[String] = None,
viewText: Option[String] = None) {
viewText: Option[String] = None,
comment: Option[String] = None) {

// Verify that the provided columns are part of the schema
private val colNames = schema.map(_.name).toSet
private def requireSubsetOfSchema(cols: Seq[String], colType: String): Unit = {
require(cols.toSet.subsetOf(colNames), s"$colType columns (${cols.mkString(", ")}) " +
s"must be a subset of schema (${colNames.mkString(", ")}) in table '$identifier'")
}
requireSubsetOfSchema(partitionColumnNames, "partition")
requireSubsetOfSchema(sortColumnNames, "sort")
requireSubsetOfSchema(bucketColumnNames, "bucket")

/** Columns this table is partitioned by. */
def partitionColumns: Seq[CatalogColumn] =
schema.filter { c => partitionColumnNames.contains(c.name) }

/** Return the database this table was specified to belong to, assuming it exists. */
def database: String = identifier.database.getOrElse {
@@ -553,8 +553,12 @@ abstract class CatalogTestUtils {
identifier = TableIdentifier(name, database),
tableType = CatalogTableType.EXTERNAL_TABLE,
storage = storageFormat,
schema = Seq(CatalogColumn("col1", "int"), CatalogColumn("col2", "string")),
partitionColumns = Seq(CatalogColumn("a", "int"), CatalogColumn("b", "string")))
schema = Seq(
CatalogColumn("col1", "int"),
CatalogColumn("col2", "string"),
CatalogColumn("a", "int"),
CatalogColumn("b", "string")),
partitionColumnNames = Seq("a", "b"))
}

def newFunc(name: String, database: Option[String] = None): CatalogFunction = {
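For context, a minimal sketch of how the new name-based fields are meant to line up with the schema, reusing the shapes from the CatalogTestUtils hunk above (`storageFormat` stands in for whatever CatalogStorageFormat instance the caller already has; the identifiers are made up):

val table = CatalogTable(
  identifier = TableIdentifier("tbl", Some("db")),
  tableType = CatalogTableType.EXTERNAL_TABLE,
  storage = storageFormat,
  schema = Seq(CatalogColumn("col1", "int"), CatalogColumn("a", "int")),
  partitionColumnNames = Seq("a"))  // ok: "a" is part of the schema

// By contrast, partitionColumnNames = Seq("missing") would fail the new
// requireSubsetOfSchema check with an IllegalArgumentException, because
// "missing" is not a column of `schema`.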
@@ -179,7 +179,9 @@ class SparkSqlAstBuilder extends AstBuilder {
}
}

/** Type to keep track of a table header. */
/**
* Type to keep track of a table header: (identifier, isTemporary, ifNotExists, isExternal).
*/
type TableHeader = (TableIdentifier, Boolean, Boolean, Boolean)

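As a small, hypothetical illustration of the tuple's shape (the values here are made up), a caller can destructure it positionally:

val header: TableHeader =
  (TableIdentifier("tbl", Some("db")), /* isTemporary */ false, /* ifNotExists */ true, /* isExternal */ false)
val (ident, isTemporary, ifNotExists, isExternal) = header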
/**
@@ -616,10 +618,7 @@ class SparkSqlAstBuilder extends AstBuilder {
case s: GenericFileFormatContext =>
(Seq.empty[String], Option(s.identifier.getText))
case s: TableFileFormatContext =>
val elements = Seq(s.inFmt, s.outFmt) ++
Option(s.serdeCls).toSeq ++
Option(s.inDriver).toSeq ++
Option(s.outDriver).toSeq
val elements = Seq(s.inFmt, s.outFmt) ++ Option(s.serdeCls).toSeq
(elements.map(string), None)
}
AlterTableSetFileFormat(
@@ -773,22 +772,6 @@ class SparkSqlAstBuilder extends AstBuilder {
.map(_.identifier.getText))
}

/**
* Create a skew specification. This contains three components:
* - The Skewed Columns
* - Values for which are skewed. The size of each entry must match the number of skewed columns.
* - A store in directory flag.
*/
override def visitSkewSpec(
ctx: SkewSpecContext): (Seq[String], Seq[Seq[String]], Boolean) = withOrigin(ctx) {
val skewedValues = if (ctx.constantList != null) {
Seq(visitConstantList(ctx.constantList))
} else {
visitNestedConstantList(ctx.nestedConstantList)
}
(visitIdentifierList(ctx.identifierList), skewedValues, ctx.DIRECTORIES != null)
}

/**
* Convert a nested constants list into a sequence of string sequences.
*/
@@ -224,29 +224,6 @@ case class DropTable(
}
}

/**
* A command that renames a table/view.
*
* The syntax of this command is:
* {{{
* ALTER TABLE table1 RENAME TO table2;
* ALTER VIEW view1 RENAME TO view2;
* }}}
*/
case class AlterTableRename(
oldName: TableIdentifier,
newName: TableIdentifier)
extends RunnableCommand {

override def run(sqlContext: SQLContext): Seq[Row] = {
val catalog = sqlContext.sessionState.catalog
catalog.invalidateTable(oldName)
catalog.renameTable(oldName, newName)
Seq.empty[Row]
}

}

/**
* A command that sets table/view properties.
*
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.command

import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTable


// TODO: move the rest of the table commands from ddl.scala to this file

/**
* A command to create a table.
*
* Note: This is currently used only for creating Hive tables.
* This is not intended for temporary tables.
*
* The syntax of using this command in SQL is:
* {{{
* CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
* [(col1 data_type [COMMENT col_comment], ...)]
* [COMMENT table_comment]
* [PARTITIONED BY (col3 data_type [COMMENT col_comment], ...)]
* [CLUSTERED BY (col1, ...) [SORTED BY (col1 [ASC|DESC], ...)] INTO num_buckets BUCKETS]
* [SKEWED BY (col1, col2, ...) ON ((col_value, col_value, ...), ...)
* [STORED AS DIRECTORIES]
* [ROW FORMAT row_format]
* [STORED AS file_format | STORED BY storage_handler_class [WITH SERDEPROPERTIES (...)]]
* [LOCATION path]
* [TBLPROPERTIES (property_name=property_value, ...)]
* [AS select_statement];
* }}}
*/
case class CreateTable(table: CatalogTable, ifNotExists: Boolean) extends RunnableCommand {

override def run(sqlContext: SQLContext): Seq[Row] = {
sqlContext.sessionState.catalog.createTable(table, ifNotExists)
Seq.empty[Row]
}

}


/**
* A command that renames a table/view.
*
* The syntax of this command is:
* {{{
* ALTER TABLE table1 RENAME TO table2;
* ALTER VIEW view1 RENAME TO view2;
* }}}
*/
case class AlterTableRename(
oldName: TableIdentifier,
newName: TableIdentifier)
extends RunnableCommand {

override def run(sqlContext: SQLContext): Seq[Row] = {
val catalog = sqlContext.sessionState.catalog
catalog.invalidateTable(oldName)
catalog.renameTable(oldName, newName)
Seq.empty[Row]
}

}
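For illustration, a hedged sketch of driving these two commands through SQL (assuming a Hive-backed SQLContext, since CreateTable is currently only used for Hive tables; the database, table, and column names are made up):

// CREATE TABLE statements are routed to the CreateTable command above.
sqlContext.sql(
  """CREATE TABLE IF NOT EXISTS db1.tbl (key INT, value STRING)
    |PARTITIONED BY (ds STRING)
    |STORED AS PARQUET
  """.stripMargin)

// ALTER TABLE ... RENAME TO ... is routed to AlterTableRename.
sqlContext.sql("ALTER TABLE db1.tbl RENAME TO db1.tbl2")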
@@ -440,37 +440,25 @@ class DDLCommandSuite extends PlanTest {
}

test("alter table: set file format") {
val sql1 =
"""
|ALTER TABLE table_name SET FILEFORMAT INPUTFORMAT 'test'
|OUTPUTFORMAT 'test' SERDE 'test' INPUTDRIVER 'test' OUTPUTDRIVER 'test'
""".stripMargin
val sql2 = "ALTER TABLE table_name SET FILEFORMAT INPUTFORMAT 'test' " +
val sql1 = "ALTER TABLE table_name SET FILEFORMAT INPUTFORMAT 'test' " +
"OUTPUTFORMAT 'test' SERDE 'test'"
val sql3 = "ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') " +
val sql2 = "ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') " +
"SET FILEFORMAT PARQUET"
val parsed1 = parser.parsePlan(sql1)
val parsed2 = parser.parsePlan(sql2)
val parsed3 = parser.parsePlan(sql3)
val tableIdent = TableIdentifier("table_name", None)
val expected1 = AlterTableSetFileFormat(
tableIdent,
None,
List("test", "test", "test", "test", "test"),
List("test", "test", "test"),
None)(sql1)
val expected2 = AlterTableSetFileFormat(
tableIdent,
None,
List("test", "test", "test"),
None)(sql2)
val expected3 = AlterTableSetFileFormat(
tableIdent,
Some(Map("dt" -> "2008-08-08", "country" -> "us")),
Seq(),
Some("PARQUET"))(sql3)
Some("PARQUET"))(sql2)
comparePlans(parsed1, expected1)
comparePlans(parsed2, expected2)
comparePlans(parsed3, expected3)
}

test("alter table: set location") {
@@ -380,8 +380,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
}

// TODO: add a test case for DROP DATABASE with RESTRICT once we can create tables in SQLContext

test("show tables") {
withTempTable("show1a", "show2b") {
sql(
@@ -162,7 +162,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {

runCliWithin(3.minute)(
"CREATE TABLE hive_test(key INT, val STRING);"
-> "OK",
-> "",
"SHOW TABLES;"
-> "hive_test",
s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE hive_test;"
@@ -187,7 +187,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
"USE hive_test_db;"
-> "",
"CREATE TABLE hive_test(key INT, val STRING);"
-> "OK",
-> "",
"SHOW TABLES;"
-> "hive_test"
)
@@ -210,9 +210,9 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
"""CREATE TABLE t1(key string, val string)
|ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe';
""".stripMargin
-> "OK",
-> "",
"CREATE TABLE sourceTable (key INT, val STRING);"
-> "OK",
-> "",
s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE sourceTable;"
-> "OK",
"INSERT INTO TABLE t1 SELECT key, val FROM sourceTable;"