[SPARK-12689][SQL] Migrate DDL parsing to the newly absorbed parser #10723

Closed. Wants to merge 20 commits; changes shown from 5 commits.
@@ -324,6 +324,8 @@ KW_ISOLATION: 'ISOLATION';
KW_LEVEL: 'LEVEL';
KW_SNAPSHOT: 'SNAPSHOT';
KW_AUTOCOMMIT: 'AUTOCOMMIT';
KW_REFRESH: 'REFRESH';
KW_OPTIONS: 'OPTIONS';
KW_WEEK: 'WEEK'|'WEEKS';
KW_MILLISECOND: 'MILLISECOND'|'MILLISECONDS';
KW_MICROSECOND: 'MICROSECOND'|'MICROSECONDS';

@@ -142,6 +142,7 @@ TOK_UNIONTYPE;
TOK_COLTYPELIST;
TOK_CREATEDATABASE;
TOK_CREATETABLE;
TOK_CREATETABLEUSING;
TOK_TRUNCATETABLE;
TOK_CREATEINDEX;
TOK_CREATEINDEX_INDEXTBLNAME;
@@ -371,6 +372,10 @@ TOK_TXN_READ_WRITE;
TOK_COMMIT;
TOK_ROLLBACK;
TOK_SET_AUTOCOMMIT;
TOK_REFRESHTABLE;
TOK_TABLEPROVIDER;
TOK_TABLEOPTIONS;
TOK_TABLEOPTION;
}


@@ -764,6 +769,7 @@ ddlStatement
| truncateTableStatement
| alterStatement
| descStatement
| refreshStatement
| showStatement
| metastoreCheck
| createViewStatement
@@ -890,12 +896,31 @@ createTableStatement
@init { pushMsg("create table statement", state); }
@after { popMsg(state); }
: KW_CREATE (temp=KW_TEMPORARY)? (ext=KW_EXTERNAL)? KW_TABLE ifNotExists? name=tableName
( like=KW_LIKE likeName=tableName
(
like=KW_LIKE likeName=tableName
tableRowFormat?
tableFileFormat?
tableLocation?
tablePropertiesPrefixed?
-> ^(TOK_CREATETABLE $name $temp? $ext? ifNotExists?
^(TOK_LIKETABLE $likeName?)
tableRowFormat?
tableFileFormat?
tableLocation?
tablePropertiesPrefixed?
)
|
tableProvider
tableOpts?
(KW_AS selectStatementWithCTE)?
-> ^(TOK_CREATETABLEUSING $name $temp? ifNotExists?
tableProvider
tableOpts?
selectStatementWithCTE?
)
| (LPAREN columnNameTypeList RPAREN)?
(p=tableProvider?)
tableOpts?
tableComment?
tablePartition?
tableBuckets?
@@ -905,8 +930,14 @@
tableLocation?
tablePropertiesPrefixed?
(KW_AS selectStatementWithCTE)?
)
-> ^(TOK_CREATETABLE $name $temp? $ext? ifNotExists?
-> {p != null}?
^(TOK_CREATETABLEUSING $name $temp? ifNotExists?
columnNameTypeList?
$p
tableOpts?
)
->
^(TOK_CREATETABLE $name $temp? $ext? ifNotExists?
^(TOK_LIKETABLE $likeName?)
columnNameTypeList?
tableComment?
@@ -918,7 +949,8 @@
tableLocation?
tablePropertiesPrefixed?
selectStatementWithCTE?
)
)
)
;

truncateTableStatement
@@ -1362,6 +1394,13 @@ tabPartColTypeExpr
: tableName partitionSpec? extColumnName? -> ^(TOK_TABTYPE tableName partitionSpec? extColumnName?)
;

refreshStatement
@init { pushMsg("refresh statement", state); }
@after { popMsg(state); }
:
KW_REFRESH KW_TABLE tableName -> ^(TOK_REFRESHTABLE tableName)
;
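
For example, REFRESH TABLE db.tbl should rewrite under this rule to ^(TOK_REFRESHTABLE ^(TOK_TABNAME db tbl)); the TOK_TABNAME shape follows the AST dump quoted in the review discussion below.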

descStatement
@init { pushMsg("describe statement", state); }
@after { popMsg(state); }
@@ -1757,6 +1796,29 @@ showStmtIdentifier
| StringLiteral
;

tableProvider
@init { pushMsg("table's provider", state); }
@after { popMsg(state); }
:
KW_USING provider=Identifier -> ^(TOK_TABLEPROVIDER $provider)
;

Contributor: Providers can have dotted names, e.g. org.apache.spark.sql.avro. Identifier won't work here; you would need something like: Identifier (DOT Identifier)*. I am not sure whether we should support quoted identifiers here.

Member Author: It looks like we just used unquoted identifiers for providers before.

optionKeyValue
@init { pushMsg("table's option specification", state); }
@after { popMsg(state); }
:
Identifier StringLiteral
-> ^(TOK_TABLEOPTION Identifier StringLiteral)
;

tableOpts
@init { pushMsg("table's options", state); }
@after { popMsg(state); }
:
KW_OPTIONS LPAREN optionKeyValue (COMMA optionKeyValue)* RPAREN
-> ^(TOK_TABLEOPTIONS optionKeyValue+)
;
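
For example, OPTIONS (path '/some/path', header 'true') should rewrite to ^(TOK_TABLEOPTIONS ^(TOK_TABLEOPTION path '/some/path') ^(TOK_TABLEOPTION header 'true')); the option names here are made up for illustration.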

tableComment
@init { pushMsg("table's comment", state); }
@after { popMsg(state); }
@@ -16,10 +16,13 @@
*/
package org.apache.spark.sql.execution

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.{CatalystQl, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.parser.{ASTNode, ParserConf, SimpleParserConf}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.types.StructType

private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends CatalystQl(conf) {
  /** Check if a command should not be explained. */
@@ -42,6 +45,84 @@
        getClauses(Seq("TOK_QUERY", "FORMATTED", "EXTENDED"), explainArgs)
      ExplainCommand(nodeToPlan(query), extended = extended.isDefined)

case Token("TOK_REFRESHTABLE", nameParts :: Nil) =>
val tableIdent = extractTableIdent(nameParts)
RefreshTable(tableIdent)

case Token("TOK_CREATETABLEUSING", createTableArgs) =>
val clauses = getClauses(
Seq("TEMPORARY", "TOK_IFNOTEXISTS", "TOK_TABNAME", "TOK_TABCOLLIST",
"TOK_TABLEPROVIDER", "TOK_TABLEOPTIONS", "TOK_QUERY"), createTableArgs)

val temp = clauses(0)
val allowExisting = clauses(1)
val Some(tabName) = clauses(2)
val tableCols = clauses(3)
val Some(tableProvider) = clauses(4)
val tableOpts = clauses(5)
val tableAs = clauses(6)

      val tableIdent: TableIdentifier = tabName match {
        case Token("TOK_TABNAME", Token(dbName, _) :: Token(tableName, _) :: Nil) =>
          new TableIdentifier(tableName, Some(dbName))
        case Token("TOK_TABNAME", Token(tableName, _) :: Nil) =>
          TableIdentifier(tableName)
      }

Contributor: Why don't we use extractTableIdent?

Contributor (on the Token(dbName, _) pattern): I am personally not big on using wildcards in pattern matches. This prevents us from catching a grammar problem early. Since most of the wildcards are actually empty lists, why not match those?

Member Author: I don't follow; I don't think I use wildcards in the pattern matches here?

Contributor: As soon as you use an underscore in a pattern match, for example Token(dbName, _), you are using a wildcard in the second position.

Member Author: Ah, I see; I just wasn't sure what you were pointing at. I was wondering because I don't have a case _ branch. Thanks.

      val columns = tableCols.map {
        case Token("TOK_TABCOLLIST", fields) => StructType(fields.map(nodeToStructField))
      }

      val provider = tableProvider match {
        case Token("TOK_TABLEPROVIDER", Token(provider, _) :: Nil) => provider
      }

Contributor: The TOK_TABLEPROVIDER node will have multiple children if the name is dotted, e.g. org.apache.spark.sql.avro, so we need to handle that case here.

Member Author: Yeah, one test failed because of this. I will fix it.
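
For illustration, a self-contained sketch of the dotted-name handling being discussed; the Token case class below is a hypothetical stand-in for catalyst's ASTNode, used only to make the example runnable:

    // Hypothetical mini-model of the parser's AST nodes, for illustration only.
    case class Token(text: String, children: List[Token])

    // Reassemble a possibly dotted provider name from all child tokens.
    def providerName(node: Token): String = node match {
      case Token("TOK_TABLEPROVIDER", parts) => parts.map(_.text).mkString(".")
    }

    assert(providerName(Token("TOK_TABLEPROVIDER",
      List("org", "apache", "spark", "sql", "avro").map(Token(_, Nil)))) ==
      "org.apache.spark.sql.avro")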
Contributor: You could make more use of pattern matching here. For example:

    val temp ::
        allowExisting ::
        Some(Token("TOK_TABNAME", Token(tableName, Nil)) ::
        tableCols ::
        Some(Token("TOK_TABLEPROVIDER", Token(provider, Nil) :: Nil)) ::
        tableOpts ::
        tableAs ::
        Nil = getClauses(Seq(
          "TEMPORARY",
          "TOK_IFNOTEXISTS",
          "TOK_TABNAME",
          "TOK_TABCOLLIST",
          "TOK_TABLEPROVIDER",
          "TOK_TABLEOPTIONS",
          "TOK_QUERY"), createTableArgs)

Member Author: Yeah, I was using that kind of match, but the compiler kept reporting a syntax problem.

Contributor: Just tried this:

    val Seq(
        temp,
        allowExisting,
        Some(Token("TOK_TABNAME", Token(tableName, Nil) :: Nil)),
        tableCols,
        Some(Token("TOK_TABLEPROVIDER", providerNameParts)),
        tableOpts,
        tableAs) = getClauses(Seq(
        "TEMPORARY",
        "TOK_IFNOTEXISTS",
        "TOK_TABNAME", "TOK_TABCOLLIST",
        "TOK_TABLEPROVIDER",
        "TOK_TABLEOPTIONS",
        "TOK_QUERY"), createTableArgs)

...and this compiles. [edit: posted the wrong code initially]

      val options = tableOpts.map { opts =>
        opts match {
          case Token("TOK_TABLEOPTIONS", options) =>
            options.map {
              case Token("TOK_TABLEOPTION", Token(key, _) :: Token(value, _) :: Nil) =>
                (key, value.replaceAll("^\'|^\"|\"$|\'$", ""))
              case _ => super.nodeToPlan(node)
            }.asInstanceOf[Seq[(String, String)]].toMap
        }
      }.getOrElse(Map.empty[String, String])

Contributor: Nit: you could also turn the Option into a sequence, flatMap over it, and create the Map as the result:

    val options = tableOpts.toSeq.flatMap {
      case Token("TOK_TABLEOPTIONS", options) =>
        options.map {
          case Token("TOK_TABLEOPTION", keysAndValue) =>
            val key = keysAndValue.init.map(_.text).mkString(".")
            val value = unquoteString(keysAndValue.last.text)
            key -> value
        }
    }.toMap

(note: code is not tested)

Contributor (on the replaceAll call): Why not use unquoteString? It does the same thing and is easier to read.

Member Author: I didn't know unquoteString existed. Thanks.

Contributor (on case _ => super.nodeToPlan(node)): Can you have a plan inside table options?

Member Author: No. This line should be removed.

Contributor (on the asInstanceOf cast): Why is this cast needed?
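
For illustration, a minimal sketch of the quote-stripping helper the reviewer is pointing at; unquote here is a hypothetical stand-in written for this example, not catalyst's actual unquoteString:

    // Strip one matching pair of surrounding single or double quotes, if present.
    def unquote(s: String): String =
      if (s.length >= 2 && s.head == s.last && (s.head == '\'' || s.head == '"')) {
        s.substring(1, s.length - 1)
      } else {
        s
      }

    assert(unquote("'/some/path'") == "/some/path")
    assert(unquote("\"true\"") == "true")
    assert(unquote("bare") == "bare")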

      val asClause = tableAs.map(nodeToPlan(_))

      if (temp.isDefined && allowExisting.isDefined) {
        throw new DDLException(
          "a CREATE TEMPORARY TABLE statement does not allow IF NOT EXISTS clause.")
      }

Contributor (on the DDLException): Do we still need a DDL exception? Why not throw an analysis exception?

Member Author: Actually the exception thrown here will be caught in CatalystQl, which will then throw an AnalysisException. But I agree that we can remove DDLException; only DDLParser uses it.

      if (asClause.isDefined) {
        val mode = if (allowExisting.isDefined) {
          SaveMode.Ignore
        } else if (temp.isDefined) {
          SaveMode.Overwrite
        } else {
          SaveMode.ErrorIfExists
        }

        CreateTableUsingAsSelect(
          tableIdent,
          provider,
          temp.isDefined,
          Array.empty[String],
          bucketSpec = None,
          mode,
          options,
          asClause.get)
      } else {
        CreateTableUsing(
          tableIdent,
          columns,
          provider,
          temp.isDefined,
          options,
          allowExisting.isDefined,
          managedIfNoPath = false)
      }
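
To see both branches end to end, a usage sketch; it assumes this PR's SparkQl is reachable (the class is private[sql], so from within the org.apache.spark.sql package), and the table, provider, and path names are made up:

    import org.apache.spark.sql.execution.SparkQl

    val parser = new SparkQl()

    // New refreshStatement rule -> TOK_REFRESHTABLE -> RefreshTable plan.
    parser.parsePlan("REFRESH TABLE db.tbl")

    // TOK_CREATETABLEUSING without AS SELECT should become CreateTableUsing.
    parser.parsePlan(
      "CREATE TABLE t (id INT) USING parquet OPTIONS (path '/some/path')")

    // With TEMPORARY and AS SELECT it should become CreateTableUsingAsSelect,
    // taking the SaveMode.Overwrite branch above.
    parser.parsePlan(
      "CREATE TEMPORARY TABLE t2 USING parquet OPTIONS (path '/some/path') " +
      "AS SELECT * FROM src")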

case Token("TOK_DESCTABLE", describeArgs) =>
// Reference: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
val Some(tableType) :: formatted :: extended :: pretty :: Nil =
Expand All @@ -52,26 +133,30 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly
nodeToDescribeFallback(node)
} else {
tableType match {
case Token("TOK_TABTYPE", Token("TOK_TABNAME", nameParts :: Nil) :: Nil) =>
case Token("TOK_TABTYPE", Token("TOK_TABNAME", nameParts) :: Nil) =>
Contributor: Why change this? You didn't touch the describe stuff in SparkSqlParser.g, right?

Member Author: Yes. I think it was incorrect from the beginning but never got caught, because we didn't reach this code before. I've tested it locally. Once all three commands are migrated, we can see this pass the tests.

Contributor: If we parse the following SQL using the parse driver, org.apache.spark.sql.catalyst.parser.ParseDriver.parsePlan("DESCRIBE EXTENDED tbl.a", null), we end up with the following AST:

    TOK_DESCTABLE 1, 0, 6, 18
    :- TOK_TABTYPE 1, 4, 6, 18
    :  +- TOK_TABNAME 1, 4, 6, 18
    :     :- tbl 1, 4, 4, 18
    :     +- a 1, 6, 6, 22
    +- EXTENDED 1, 2, 2, 9

This change picks that up, and the old code didn't (I am sure I tested this though :S). You can disable this in the DDL parser to see if it works now.

Contributor: Could we add a test for this? The Hive test suite apparently misses this one. I could also address it in a different PR.

Member Author: Actually we have a test for the describe table command in HiveQuerySuite. Do we need another test?
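
A sketch of the kind of regression test being discussed; the expected plan follows the DescribeCommand construction below, but the test itself is illustrative and not part of this PR:

    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
    import org.apache.spark.sql.execution.SparkQl
    import org.apache.spark.sql.execution.datasources

    // Parsing "DESCRIBE db.tbl" should produce a DescribeCommand over an
    // UnresolvedRelation for db.tbl (assuming plan equality holds for these
    // case classes).
    val plan = new SparkQl().parsePlan("DESCRIBE db.tbl")
    assert(plan == datasources.DescribeCommand(
      UnresolvedRelation(TableIdentifier("tbl", Some("db")), None),
      isExtended = false))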

          nameParts match {
-           case Token(".", dbName :: tableName :: Nil) =>
+           case Token(dbName, _) :: Token(tableName, _) :: Nil =>
              // It is describing a table with the format like "describe db.table".
              // TODO: Actually, a user may mean tableName.columnName. Need to resolve this
              // issue.
-             val tableIdent = extractTableIdent(nameParts)
+             val tableIdent = TableIdentifier(
+               cleanIdentifier(tableName), Some(cleanIdentifier(dbName)))
              datasources.DescribeCommand(
                UnresolvedRelation(tableIdent, None), isExtended = extended.isDefined)
-           case Token(".", dbName :: tableName :: colName :: Nil) =>
+           case Token(dbName, _) :: Token(tableName, _) :: Token(colName, _) :: Nil =>
              // It is describing a column with the format like "describe db.table column".
              nodeToDescribeFallback(node)
-           case tableName =>
+           case tableName :: Nil =>
              // It is describing a table with the format like "describe table".
              datasources.DescribeCommand(
                UnresolvedRelation(TableIdentifier(tableName.text), None),
                isExtended = extended.isDefined)
            case _ =>
              nodeToDescribeFallback(node)
          }
-       // All other cases.
-       case _ => nodeToDescribeFallback(node)
+       case _ =>
+         nodeToDescribeFallback(node)
      }
    }

Contributor (on TableIdentifier(tableName.text)): cleanIdentifier?

@@ -316,7 +316,7 @@ class HiveContext private[hive](
  }

  protected[sql] override def parseSql(sql: String): LogicalPlan = {
-   super.parseSql(substitutor.substitute(hiveconf, sql))
+   sqlParser.parsePlan(substitutor.substitute(hiveconf, sql))
  }
Contributor: How about gradually moving functionality from the DDL parser to SparkQl? That would allow us to test this in the meantime.

Member Author: DDLParser is still used in SQLContext. Do we want to completely remove it? I have already migrated three commands; I think we can test them all together.

override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =