[SPARK-4574][SQL] Adding support for defining schema in foreign DDL commands.

Adding support for defining a schema in foreign DDL commands. Foreign DDL currently supports commands like:
```
CREATE TEMPORARY TABLE avroTable
USING org.apache.spark.sql.avro
OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")
```
With this PR, users can define the schema explicitly instead of inferring it from the file, so a DDL command like the following is also supported:
```
CREATE TEMPORARY TABLE avroTable(a int, b string)
USING org.apache.spark.sql.avro
OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")
```
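
Since the new DDL parser also understands decimal, array, map, and struct column types (see the `dataType` grammar in `ddl.scala` below), the same commands can be issued from Scala. A rough sketch against the JSON data source, which this patch also teaches to accept a user-defined schema; the table name, columns, and path are hypothetical, and whether every declared type can be materialized is up to the underlying source:
```scala
// Sketch only: assumes an existing SQLContext named sqlContext and a JSON file at the given path.
sqlContext.sql(
  """CREATE TEMPORARY TABLE jsonTable (
    |  id bigint,
    |  price decimal(10, 2),
    |  tags array<string>,
    |  attributes map<string, string>,
    |  address struct<city:string, zip:int>
    |)
    |USING org.apache.spark.sql.json
    |OPTIONS (path "src/test/resources/sample.json")""".stripMargin)

// The declared schema is used verbatim; no sampling-based inference takes place.
sqlContext.sql("SELECT * FROM jsonTable").schema
```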

Author: scwf <wangfei1@huawei.com>
Author: Yin Huai <yhuai@databricks.com>
Author: Fei Wang <wangfei1@huawei.com>
Author: wangfei <wangfei1@huawei.com>

Closes apache#3431 from scwf/ddl and squashes the following commits:

7e79ce5 [Fei Wang] Merge pull request apache#22 from yhuai/pr3431yin
38f634e [Yin Huai] Remove Option from createRelation.
65e9c73 [Yin Huai] Revert all changes since applying a given schema has not been testd.
a852b10 [scwf] remove cleanIdentifier
f336a16 [Fei Wang] Merge pull request apache#21 from yhuai/pr3431yin
baf79b5 [Yin Huai] Test special characters quoted by backticks.
50a03b0 [Yin Huai] Use JsonRDD.nullTypeToStringType to convert NullType to StringType.
1eeb769 [Fei Wang] Merge pull request apache#20 from yhuai/pr3431yin
f5c22b0 [Yin Huai] Refactor code and update test cases.
f1cffe4 [Yin Huai] Revert "minor refactory"
b621c8f [scwf] minor refactory
d02547f [scwf] fix HiveCompatibilitySuite test failure
8dfbf7a [scwf] more tests for complex data type
ddab984 [Fei Wang] Merge pull request apache#19 from yhuai/pr3431yin
91ad91b [Yin Huai] Parse data types in DDLParser.
cf982d2 [scwf] fixed test failure
445b57b [scwf] address comments
02a662c [scwf] style issue
44eb70c [scwf] fix decimal parser issue
83b6fc3 [scwf] minor fix
9bf12f8 [wangfei] adding test case
7787ec7 [wangfei] added SchemaRelationProvider
0ba70df [wangfei] draft version
scwf authored and orenmazor committed Feb 4, 2015
1 parent cf5686b commit 37a7955
Showing 6 changed files with 400 additions and 113 deletions.
sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
@@ -18,31 +18,48 @@
package org.apache.spark.sql.json

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.types.StructType
import org.apache.spark.sql.sources._

private[sql] class DefaultSource extends RelationProvider {
/** Returns a new base relation with the given parameters. */
private[sql] class DefaultSource extends RelationProvider with SchemaRelationProvider {

/** Returns a new base relation with the parameters. */
override def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation = {
val fileName = parameters.getOrElse("path", sys.error("Option 'path' not specified"))
val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)

JSONRelation(fileName, samplingRatio)(sqlContext)
JSONRelation(fileName, samplingRatio, None)(sqlContext)
}

/** Returns a new base relation with the given schema and parameters. */
override def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String],
schema: StructType): BaseRelation = {
val fileName = parameters.getOrElse("path", sys.error("Option 'path' not specified"))
val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)

JSONRelation(fileName, samplingRatio, Some(schema))(sqlContext)
}
}

private[sql] case class JSONRelation(fileName: String, samplingRatio: Double)(
private[sql] case class JSONRelation(
fileName: String,
samplingRatio: Double,
userSpecifiedSchema: Option[StructType])(
@transient val sqlContext: SQLContext)
extends TableScan {

private def baseRDD = sqlContext.sparkContext.textFile(fileName)

override val schema =
JsonRDD.inferSchema(
baseRDD,
samplingRatio,
sqlContext.columnNameOfCorruptRecord)
override val schema = userSpecifiedSchema.getOrElse(
JsonRDD.nullTypeToStringType(
JsonRDD.inferSchema(
baseRDD,
samplingRatio,
sqlContext.columnNameOfCorruptRecord)))

override def buildScan() =
JsonRDD.jsonStringToRow(baseRDD, schema, sqlContext.columnNameOfCorruptRecord)
138 changes: 125 additions & 13 deletions sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -17,16 +17,15 @@

package org.apache.spark.sql.sources

import org.apache.spark.Logging
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.RunnableCommand
import org.apache.spark.util.Utils

import scala.language.implicitConversions
import scala.util.parsing.combinator.lexical.StdLexical
import scala.util.parsing.combinator.syntactical.StandardTokenParsers
import scala.util.parsing.combinator.PackratParsers

import org.apache.spark.Logging
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.execution.RunnableCommand
import org.apache.spark.util.Utils
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.SqlLexical

@@ -44,6 +43,14 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers
}
}

def parseType(input: String): DataType = {
phrase(dataType)(new lexical.Scanner(input)) match {
case Success(r, x) => r
case x =>
sys.error(s"Unsupported dataType: $x")
}
}

protected case class Keyword(str: String)

protected implicit def asParser(k: Keyword): Parser[String] =
@@ -55,6 +62,24 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers
protected val USING = Keyword("USING")
protected val OPTIONS = Keyword("OPTIONS")

// Data types.
protected val STRING = Keyword("STRING")
protected val BINARY = Keyword("BINARY")
protected val BOOLEAN = Keyword("BOOLEAN")
protected val TINYINT = Keyword("TINYINT")
protected val SMALLINT = Keyword("SMALLINT")
protected val INT = Keyword("INT")
protected val BIGINT = Keyword("BIGINT")
protected val FLOAT = Keyword("FLOAT")
protected val DOUBLE = Keyword("DOUBLE")
protected val DECIMAL = Keyword("DECIMAL")
protected val DATE = Keyword("DATE")
protected val TIMESTAMP = Keyword("TIMESTAMP")
protected val VARCHAR = Keyword("VARCHAR")
protected val ARRAY = Keyword("ARRAY")
protected val MAP = Keyword("MAP")
protected val STRUCT = Keyword("STRUCT")

// Use reflection to find the reserved words defined in this class.
protected val reservedWords =
this.getClass
@@ -67,26 +92,92 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers
protected lazy val ddl: Parser[LogicalPlan] = createTable

/**
* CREATE TEMPORARY TABLE avroTable
* `CREATE TEMPORARY TABLE avroTable
* USING org.apache.spark.sql.avro
* OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")
* OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
* or
* `CREATE TEMPORARY TABLE avroTable(intField int, stringField string...)
* USING org.apache.spark.sql.avro
* OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
*/
protected lazy val createTable: Parser[LogicalPlan] =
CREATE ~ TEMPORARY ~ TABLE ~> ident ~ (USING ~> className) ~ (OPTIONS ~> options) ^^ {
case tableName ~ provider ~ opts =>
CreateTableUsing(tableName, provider, opts)
(
CREATE ~ TEMPORARY ~ TABLE ~> ident
~ (tableCols).? ~ (USING ~> className) ~ (OPTIONS ~> options) ^^ {
case tableName ~ columns ~ provider ~ opts =>
val userSpecifiedSchema = columns.flatMap(fields => Some(StructType(fields)))
CreateTableUsing(tableName, userSpecifiedSchema, provider, opts)
}
)

protected lazy val tableCols: Parser[Seq[StructField]] = "(" ~> repsep(column, ",") <~ ")"

protected lazy val options: Parser[Map[String, String]] =
"(" ~> repsep(pair, ",") <~ ")" ^^ { case s: Seq[(String, String)] => s.toMap }

protected lazy val className: Parser[String] = repsep(ident, ".") ^^ { case s => s.mkString(".")}

protected lazy val pair: Parser[(String, String)] = ident ~ stringLit ^^ { case k ~ v => (k,v) }

protected lazy val column: Parser[StructField] =
ident ~ dataType ^^ { case columnName ~ typ =>
StructField(columnName, typ)
}

protected lazy val primitiveType: Parser[DataType] =
STRING ^^^ StringType |
BINARY ^^^ BinaryType |
BOOLEAN ^^^ BooleanType |
TINYINT ^^^ ByteType |
SMALLINT ^^^ ShortType |
INT ^^^ IntegerType |
BIGINT ^^^ LongType |
FLOAT ^^^ FloatType |
DOUBLE ^^^ DoubleType |
fixedDecimalType | // decimal with precision/scale
DECIMAL ^^^ DecimalType.Unlimited | // decimal with no precision/scale
DATE ^^^ DateType |
TIMESTAMP ^^^ TimestampType |
VARCHAR ~ "(" ~ numericLit ~ ")" ^^^ StringType

protected lazy val fixedDecimalType: Parser[DataType] =
(DECIMAL ~ "(" ~> numericLit) ~ ("," ~> numericLit <~ ")") ^^ {
case precision ~ scale => DecimalType(precision.toInt, scale.toInt)
}

protected lazy val arrayType: Parser[DataType] =
ARRAY ~> "<" ~> dataType <~ ">" ^^ {
case tpe => ArrayType(tpe)
}

protected lazy val mapType: Parser[DataType] =
MAP ~> "<" ~> dataType ~ "," ~ dataType <~ ">" ^^ {
case t1 ~ _ ~ t2 => MapType(t1, t2)
}

protected lazy val structField: Parser[StructField] =
ident ~ ":" ~ dataType ^^ {
case fieldName ~ _ ~ tpe => StructField(fieldName, tpe, nullable = true)
}

protected lazy val structType: Parser[DataType] =
(STRUCT ~> "<" ~> repsep(structField, ",") <~ ">" ^^ {
case fields => new StructType(fields)
}) |
(STRUCT ~> "<>" ^^ {
case fields => new StructType(Nil)
})

private[sql] lazy val dataType: Parser[DataType] =
arrayType |
mapType |
structType |
primitiveType
}
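
To illustrate the grammar above, `parseType` turns a type string into a Catalyst `DataType`. A rough sketch of the expected results (hedged: `DDLParser` is `private[sql]`, so code like this would have to live under an `org.apache.spark.sql` package, e.g. in a test suite):
```scala
val parser = new DDLParser

parser.parseType("int")                         // IntegerType
parser.parseType("decimal(10, 2)")              // DecimalType(10, 2)
parser.parseType("varchar(32)")                 // StringType (the length is accepted but dropped)
parser.parseType("array<map<string, bigint>>")  // ArrayType(MapType(StringType, LongType))
parser.parseType("struct<a:int, b:string>")     // StructType with fields a (IntegerType) and b (StringType)
```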

private[sql] case class CreateTableUsing(
tableName: String,
userSpecifiedSchema: Option[StructType],
provider: String,
options: Map[String, String]) extends RunnableCommand {

@@ -99,8 +190,29 @@ private[sql] case class CreateTableUsing(
sys.error(s"Failed to load class for data source: $provider")
}
}
val dataSource = clazz.newInstance().asInstanceOf[org.apache.spark.sql.sources.RelationProvider]
val relation = dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options))

val relation = userSpecifiedSchema match {
case Some(schema: StructType) => {
clazz.newInstance match {
case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider =>
dataSource
.asInstanceOf[org.apache.spark.sql.sources.SchemaRelationProvider]
.createRelation(sqlContext, new CaseInsensitiveMap(options), schema)
case _ =>
sys.error(s"${clazz.getCanonicalName} should extend SchemaRelationProvider.")
}
}
case None => {
clazz.newInstance match {
case dataSource: org.apache.spark.sql.sources.RelationProvider =>
dataSource
.asInstanceOf[org.apache.spark.sql.sources.RelationProvider]
.createRelation(sqlContext, new CaseInsensitiveMap(options))
case _ =>
sys.error(s"${clazz.getCanonicalName} should extend RelationProvider.")
}
}
}

sqlContext.baseRelationToSchemaRDD(relation).registerTempTable(tableName)
Seq.empty
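
In short, the presence of a column list in the DDL decides which provider interface `run` requires; a hedged usage sketch (table names and path are hypothetical):
```scala
// No column list: dispatched to RelationProvider.createRelation(sqlContext, options).
sqlContext.sql(
  """CREATE TEMPORARY TABLE inferredTable
    |USING org.apache.spark.sql.json
    |OPTIONS (path "examples/people.json")""".stripMargin)

// Column list present: dispatched to SchemaRelationProvider.createRelation(sqlContext, options, schema).
// A source that only implements RelationProvider fails here with
// "<class name> should extend SchemaRelationProvider."
sqlContext.sql(
  """CREATE TEMPORARY TABLE declaredTable (name string, age int)
    |USING org.apache.spark.sql.json
    |OPTIONS (path "examples/people.json")""".stripMargin)
```
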
sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.sources

import org.apache.spark.annotation.{Experimental, DeveloperApi}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SQLConf, Row, SQLContext, StructType}
import org.apache.spark.sql.{Row, SQLContext, StructType}
import org.apache.spark.sql.catalyst.expressions.{Expression, Attribute}

/**
@@ -44,6 +44,33 @@ trait RelationProvider {
def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation
}

/**
* ::DeveloperApi::
* Implemented by objects that produce relations for a specific kind of data source. When
* Spark SQL is given a DDL operation with
* 1. USING clause: to specify the implemented SchemaRelationProvider
* 2. User-defined schema: users can optionally define a schema when creating a table
*
* Users may specify the fully qualified class name of a given data source. When that class is
* not found Spark SQL will append the class name `DefaultSource` to the path, allowing for
* less verbose invocation. For example, 'org.apache.spark.sql.json' would resolve to the
* data source 'org.apache.spark.sql.json.DefaultSource'
*
* A new instance of this class will be instantiated each time a DDL call is made.
*/
@DeveloperApi
trait SchemaRelationProvider {
/**
* Returns a new base relation with the given parameters and user defined schema.
* Note: the parameters' keywords are case insensitive and this insensitivity is enforced
* by the Map that is passed to the function.
*/
def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String],
schema: StructType): BaseRelation
}
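
For reference, a minimal sketch of a source that satisfies both traits, mirroring the JSON `DefaultSource` change earlier in this diff; the package, class names, and fallback schema below are hypothetical:
```scala
package org.example.sources  // hypothetical package

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.catalyst.types.{StringType, StructField, StructType}
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, SchemaRelationProvider, TableScan}

class DefaultSource extends RelationProvider with SchemaRelationProvider {

  // Invoked when the DDL omits the column list.
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation =
    LineRelation(checkPath(parameters), userSpecifiedSchema = None)(sqlContext)

  // Invoked when the DDL declares columns; the given schema is used as-is.
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      schema: StructType): BaseRelation =
    LineRelation(checkPath(parameters), userSpecifiedSchema = Some(schema))(sqlContext)

  private def checkPath(parameters: Map[String, String]): String =
    parameters.getOrElse("path", sys.error("Option 'path' not specified"))
}

case class LineRelation(
    path: String,
    userSpecifiedSchema: Option[StructType])(
    @transient val sqlContext: SQLContext)
  extends TableScan {

  // Fall back to a single string column when no schema was declared (illustrative only).
  override val schema: StructType =
    userSpecifiedSchema.getOrElse(StructType(StructField("value", StringType, nullable = true) :: Nil))

  // Expose each line of the file as a one-column Row.
  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.textFile(path).map(Row(_))
}
```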

/**
* ::DeveloperApi::
* Represents a collection of tuples with a known schema. Classes that extend BaseRelation must