From d7da491713a83f25de5c07639de7985a96c801a6 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Sat, 20 Dec 2014 12:45:28 -0800 Subject: [PATCH 01/16] First draft of persistent tables. --- .../spark/sql/execution/SparkStrategies.scala | 9 +++ .../org/apache/spark/sql/sources/ddl.scala | 39 +++++++++--- .../apache/spark/sql/hive/HiveContext.scala | 12 +++- .../spark/sql/hive/HiveMetastoreCatalog.scala | 62 ++++++++++++++++++- .../spark/sql/hive/HiveStrategies.scala | 5 ++ .../org/apache/spark/sql/hive/TestHive.scala | 1 + .../spark/sql/hive/execution/commands.scala | 15 +++++ 7 files changed, 129 insertions(+), 14 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index ce878c137e627..ff4eae6b5660b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution +import org.apache.spark.sql.sources.{CreateTempTableUsing, CreateTableUsing} import org.apache.spark.sql.{SQLContext, Strategy, execution} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning._ @@ -312,6 +313,14 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case object CommandStrategy extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case r: RunnableCommand => ExecutedCommand(r) :: Nil + + case CreateTableUsing(tableName, provider, true, options) => + ExecutedCommand( + CreateTempTableUsing(tableName, provider, options)) :: Nil + + case CreateTableUsing(tableName, provider, false, options) => + sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.") + case logical.SetCommand(kv) => Seq(ExecutedCommand(execution.SetCommand(kv, plan.output))) case logical.ExplainCommand(logicalPlan, extended) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala index 8a66ac31f2dfb..40bef5c28533b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala @@ -72,9 +72,9 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro") */ protected lazy val createTable: Parser[LogicalPlan] = - CREATE ~ TEMPORARY ~ TABLE ~> ident ~ (USING ~> className) ~ (OPTIONS ~> options) ^^ { - case tableName ~ provider ~ opts => - CreateTableUsing(tableName, provider, opts) + (CREATE ~> TEMPORARY.? 
<~ TABLE) ~ ident ~ (USING ~> className) ~ (OPTIONS ~> options) ^^ { + case temp ~ tableName ~ provider ~ opts => + CreateTableUsing(tableName, provider, temp.isDefined, opts) } protected lazy val options: Parser[Map[String, String]] = @@ -85,12 +85,11 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi protected lazy val pair: Parser[(String, String)] = ident ~ stringLit ^^ { case k ~ v => (k,v) } } -private[sql] case class CreateTableUsing( - tableName: String, - provider: String, - options: Map[String, String]) extends RunnableCommand { - - def run(sqlContext: SQLContext) = { +object ResolvedDataSource { + def apply( + sqlContext: SQLContext, + provider: String, + options: Map[String, String]): ResolvedDataSource = { val loader = Utils.getContextOrSparkClassLoader val clazz: Class[_] = try loader.loadClass(provider) catch { case cnf: java.lang.ClassNotFoundException => @@ -102,7 +101,27 @@ private[sql] case class CreateTableUsing( val dataSource = clazz.newInstance().asInstanceOf[org.apache.spark.sql.sources.RelationProvider] val relation = dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options)) - sqlContext.baseRelationToSchemaRDD(relation).registerTempTable(tableName) + new ResolvedDataSource(clazz, relation) + } +} + +private[sql] case class ResolvedDataSource(provider: Class[_], relation: BaseRelation) + +private[sql] case class CreateTableUsing( + tableName: String, + provider: String, + temporary: Boolean, + options: Map[String, String]) extends Command + +private [sql] case class CreateTempTableUsing( + tableName: String, + provider: String, + options: Map[String, String]) extends RunnableCommand { + + def run(sqlContext: SQLContext) = { + val resolved = ResolvedDataSource(sqlContext, provider, options) + + sqlContext.baseRelationToSchemaRDD(resolved.relation).registerTempTable(tableName) Seq.empty } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 982e0593fcfd1..bffaa35655c62 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -116,6 +116,16 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { catalog.createTable("default", tableName, ScalaReflection.attributesFor[A], allowExisting) } + def refreshTable(tableName: String): Unit = { + // TODO: Database support... + catalog.refreshTable("default", tableName) + } + + protected[hive] def invalidateTable(tableName: String): Unit = { + // TODO: Database support... + catalog.invalidateTable("default", tableName) + } + /** * Analyzes the given table in the current database to generate statistics, which will be * used in query optimizations. 
@@ -340,8 +350,8 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { override def strategies: Seq[Strategy] = extraStrategies ++ Seq( DataSourceStrategy, - CommandStrategy, HiveCommandStrategy(self), + CommandStrategy, TakeOrdered, ParquetOperations, InMemoryScans, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index b31a3ec25096b..0136e355cf2bf 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -20,15 +20,16 @@ package org.apache.spark.sql.hive import java.io.IOException import java.util.{List => JList} +import com.google.common.cache.{CacheLoader, CacheBuilder} import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.sources.{LogicalRelation, ResolvedDataSource, BaseRelation} import scala.util.parsing.combinator.RegexParsers import org.apache.hadoop.util.ReflectionUtils import org.apache.hadoop.hive.metastore.TableType -import org.apache.hadoop.hive.metastore.api.FieldSchema -import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition} +import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition, SerDeInfo, FieldSchema} import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table, HiveException} import org.apache.hadoop.hive.ql.plan.CreateTableDesc import org.apache.hadoop.hive.serde.serdeConstants @@ -55,8 +56,60 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with /** Connection to hive metastore. Usages should lock on `this`. */ protected[hive] val client = Hive.get(hive.hiveconf) + // TODO: Use this everywhere instead of tuples or databaseName, tableName,. + /** A fully qualified identifier for a table (i.e., database.tableName) */ + case class TableIdent(database: String, name: String) { + def toLowerCase = TableIdent(database.toLowerCase, name.toLowerCase) + } + + /** A cache of Spark SQL data source tables that have been accessed. */ + protected[hive] val cachedDataSourceTables = CacheBuilder.newBuilder() + .maximumSize(1000) + .build( + new CacheLoader[TableIdent, LogicalPlan]() { + override def load(in: TableIdent): LogicalPlan = { + logDebug(s"Creating new cached data source for $in") + val table = client.getTable(in.database, in.name) + + // It does not appear that the ql client for the metastore has a way to enumerate all the + // SerDe properties directly... 
+ val method = classOf[Table].getDeclaredMethod("getSerdeInfo") + method.setAccessible(true) + val serdeInfo = method.invoke(table).asInstanceOf[SerDeInfo] + + val resolvedRelation = + ResolvedDataSource( + hive, + table.getProperty("spark.sql.sources.provider"), + serdeInfo.getParameters.toMap) + + LogicalRelation(resolvedRelation.relation) + } + }) + + def refreshTable(databaseName: String, tableName: String): Unit = { + cachedDataSourceTables.refresh(TableIdent(databaseName, tableName).toLowerCase) + } + + def invalidateTable(databaseName: String, tableName: String): Unit = { + cachedDataSourceTables.invalidate(TableIdent(databaseName, tableName).toLowerCase) + } + val caseSensitive: Boolean = false + def createDataSourceTable(tableName: String, provider: String, options: Map[String, String]) = { + val (dbName, tblName) = processDatabaseAndTableName("default", tableName) + val tbl = new Table(dbName, tblName) + + tbl.setProperty("spark.sql.sources.provider", provider) + options.foreach { case (key, value) => tbl.setSerdeParam(key, value) } + + // create the table + synchronized { + client.createTable(tbl, false) + } + } + def tableExists(db: Option[String], tableName: String): Boolean = { val (databaseName, tblName) = processDatabaseAndTableName( db.getOrElse(hive.sessionState.getCurrentDatabase), tableName) @@ -70,7 +123,10 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with val (databaseName, tblName) = processDatabaseAndTableName(db.getOrElse(hive.sessionState.getCurrentDatabase), tableName) val table = client.getTable(databaseName, tblName) - if (table.isView) { + + if (table.getProperty("spark.sql.sources.provider") != null) { + cachedDataSourceTables(TableIdent(databaseName, tblName).toLowerCase) + } else if (table.isView) { // if the unresolved relation is from hive view // parse the text into logic node. 
HiveQl.createPlanForView(table, alias) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index d3f6381b69a4d..6c906ad66dace 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.execution._ import org.apache.spark.sql.hive import org.apache.spark.sql.hive.execution._ import org.apache.spark.sql.parquet.ParquetRelation +import org.apache.spark.sql.sources.CreateTableUsing import org.apache.spark.sql.{SQLContext, SchemaRDD, Strategy} import scala.collection.JavaConversions._ @@ -219,6 +220,10 @@ private[hive] trait HiveStrategies { ExecutedCommand(DescribeCommand(planLater(o), describe.output)) :: Nil } + case CreateTableUsing(tableName, provider, false, options) => + ExecutedCommand( + CreateMetastoreDataSource(tableName, provider, options)) :: Nil + case _ => Nil } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala index b2149bd95a336..1f6a5441b643b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala @@ -394,6 +394,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { clearCache() loadedTables.clear() + catalog.cachedDataSourceTables.invalidateAll() catalog.client.getAllTables("default").foreach { t => logDebug(s"Deleting table $t") val table = catalog.client.getTable("default", t) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index 6fc4153f6a5df..4bc7309d391a1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -52,6 +52,8 @@ case class DropTable( override def run(sqlContext: SQLContext) = { val hiveContext = sqlContext.asInstanceOf[HiveContext] val ifExistsClause = if (ifExists) "IF EXISTS " else "" + hiveContext.tryUncacheQuery(hiveContext.table(tableName)) + hiveContext.invalidateTable(tableName) hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName") hiveContext.catalog.unregisterTable(None, tableName) Seq.empty[Row] @@ -85,3 +87,16 @@ case class AddFile(path: String) extends RunnableCommand { Seq.empty[Row] } } + +case class CreateMetastoreDataSource( + tableName: String, + provider: String, + options: Map[String, String]) extends RunnableCommand { + + override def run(sqlContext: SQLContext) = { + val hiveContext = sqlContext.asInstanceOf[HiveContext] + hiveContext.catalog.createDataSourceTable(tableName, provider, options) + + Seq.empty[Row] + } +} From 6edc71026c4a10cce338adaa7b807fef0ee2857b Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Sat, 20 Dec 2014 13:03:59 -0800 Subject: [PATCH 02/16] Add tests. 
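A rough, illustrative sketch of the end-to-end flow these tests cover (the SparkContext sc, the table name, and the JSON path are assumptions that simply mirror the test fixtures below, not part of the patch itself):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)

// Non-TEMPORARY tables are persisted to the Hive metastore, so this path requires a
// HiveContext; a plain SQLContext only accepts CREATE TEMPORARY TABLE ... USING ...
hiveContext.sql(
  """CREATE TABLE jsonTable
    |USING org.apache.spark.sql.json.DefaultSource
    |OPTIONS (path 'src/test/resources/data/files/sample.json')
  """.stripMargin)

// The resolved relation is cached by the catalog; changes to the underlying files are
// picked up explicitly rather than by re-resolving on every query.
hiveContext.refreshTable("jsonTable")

// Dropping the table removes the metastore entry and invalidates the cached relation.
hiveContext.sql("DROP TABLE jsonTable")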
--- .../sql/hive/MetastoreDataSourcesSuite.scala | 163 ++++++++++++++++++ 1 file changed, 163 insertions(+) create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala new file mode 100644 index 0000000000000..ee9933593d73b --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import java.io.File + +import org.apache.commons.io.FileUtils +import org.apache.spark.sql._ +import org.apache.spark.sql.hive.test.TestHive +import org.scalatest.BeforeAndAfterEach + +/* Implicits */ +import org.apache.spark.sql.hive.test.TestHive._ + +/** + * Tests for persisting tables created though the data sources API into the metastore. + */ +class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { + override def afterEach(): Unit = { + reset() + } + + test ("persistent JSON table") { + sql( + """ + |CREATE TABLE jsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path 'src/test/resources/data/files/sample.json' + |) + """.stripMargin) + + checkAnswer( + sql("SELECT * FROM jsonTable"), + jsonFile("src/test/resources/data/files/sample.json").collect().toSeq) + + } + + test("resolve shortened provider names") { + sql( + """ + |CREATE TABLE jsonTable + |USING org.apache.spark.sql.json + |OPTIONS ( + | path 'src/test/resources/data/files/sample.json' + |) + """.stripMargin) + + checkAnswer( + sql("SELECT * FROM jsonTable"), + jsonFile("src/test/resources/data/files/sample.json").collect().toSeq) + } + + test("drop table") { + sql( + """ + |CREATE TABLE jsonTable + |USING org.apache.spark.sql.json + |OPTIONS ( + | path 'src/test/resources/data/files/sample.json' + |) + """.stripMargin) + + checkAnswer( + sql("SELECT * FROM jsonTable"), + jsonFile("src/test/resources/data/files/sample.json").collect().toSeq) + + sql("DROP TABLE jsonTable") + + intercept[Exception] { + sql("SELECT * FROM jsonTable").collect() + } + } + + test("check change without refresh") { + val tempDir = File.createTempFile("sparksql", "json") + tempDir.delete() + sparkContext.parallelize(("a", "b") :: Nil).toJSON.saveAsTextFile(tempDir.getCanonicalPath) + + sql( + s""" + |CREATE TABLE jsonTable + |USING org.apache.spark.sql.json + |OPTIONS ( + | path '${tempDir.getCanonicalPath}' + |) + """.stripMargin) + + checkAnswer( + sql("SELECT * FROM jsonTable"), + ("a", "b") :: Nil) + + FileUtils.deleteDirectory(tempDir) + sparkContext.parallelize(("a", "b", "c") :: 
Nil).toJSON.saveAsTextFile(tempDir.getCanonicalPath) + + // Schema is cached so answer does not change. + checkAnswer( + sql("SELECT * FROM jsonTable"), + ("a", "b") :: Nil) + + refreshTable("jsonTable") + + // Check that the refresh worked + checkAnswer( + sql("SELECT * FROM jsonTable"), + ("a", "b", "c") :: Nil) + FileUtils.deleteDirectory(tempDir) + } + + test("drop, change, recreate") { + val tempDir = File.createTempFile("sparksql", "json") + tempDir.delete() + sparkContext.parallelize(("a", "b") :: Nil).toJSON.saveAsTextFile(tempDir.getCanonicalPath) + + sql( + s""" + |CREATE TABLE jsonTable + |USING org.apache.spark.sql.json + |OPTIONS ( + | path '${tempDir.getCanonicalPath}' + |) + """.stripMargin) + + checkAnswer( + sql("SELECT * FROM jsonTable"), + ("a", "b") :: Nil) + + FileUtils.deleteDirectory(tempDir) + sparkContext.parallelize(("a", "b", "c") :: Nil).toJSON.saveAsTextFile(tempDir.getCanonicalPath) + + sql("DROP TABLE jsonTable") + + sql( + s""" + |CREATE TABLE jsonTable + |USING org.apache.spark.sql.json + |OPTIONS ( + | path '${tempDir.getCanonicalPath}' + |) + """.stripMargin) + + // New table should reflect new schema. + checkAnswer( + sql("SELECT * FROM jsonTable"), + ("a", "b", "c") :: Nil) + FileUtils.deleteDirectory(tempDir) + } +} \ No newline at end of file From 1ea6e7bbf04c04f7c51884ca0ec819cddfaac10b Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Sun, 21 Dec 2014 14:23:34 -0800 Subject: [PATCH 03/16] Don't fail when trying to uncache a table that doesn't exist --- .../scala/org/apache/spark/sql/hive/execution/commands.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index 4bc7309d391a1..f38b44688256e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -52,7 +52,9 @@ case class DropTable( override def run(sqlContext: SQLContext) = { val hiveContext = sqlContext.asInstanceOf[HiveContext] val ifExistsClause = if (ifExists) "IF EXISTS " else "" - hiveContext.tryUncacheQuery(hiveContext.table(tableName)) + try hiveContext.tryUncacheQuery(hiveContext.table(tableName)) catch { + case _: org.apache.hadoop.hive.ql.metadata.InvalidTableException => + } hiveContext.invalidateTable(tableName) hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName") hiveContext.catalog.unregisterTable(None, tableName) From c00bb1bf25b8f9875fc3e8b58d007d67496f1b2f Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Mon, 22 Dec 2014 11:05:46 -0800 Subject: [PATCH 04/16] Don't use reflection to read options --- .../org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 0136e355cf2bf..fac47ec137a45 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -73,15 +73,13 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with // It does not appear that the ql client for the metastore has a way to enumerate all the // SerDe properties directly... 
- val method = classOf[Table].getDeclaredMethod("getSerdeInfo") - method.setAccessible(true) - val serdeInfo = method.invoke(table).asInstanceOf[SerDeInfo] + val options = table.getTTable.getSd.getSerdeInfo.getParameters.toMap val resolvedRelation = ResolvedDataSource( hive, table.getProperty("spark.sql.sources.provider"), - serdeInfo.getParameters.toMap) + options) LogicalRelation(resolvedRelation.relation) } From 2b5972353a47ca1577a0ddcd3aab5c9dbd1d10d4 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Mon, 22 Dec 2014 11:08:13 -0800 Subject: [PATCH 05/16] Set external when creating tables --- .../scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index fac47ec137a45..697170e00a8e6 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -102,6 +102,9 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with tbl.setProperty("spark.sql.sources.provider", provider) options.foreach { case (key, value) => tbl.setSerdeParam(key, value) } + tbl.setProperty("EXTERNAL", "TRUE") + tbl.setTableType(TableType.EXTERNAL_TABLE) + // create the table synchronized { client.createTable(tbl, false) From f47fda1f5e34dd73d7e5db9949eceb21cdd1ce89 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Wed, 7 Jan 2015 17:58:00 -0800 Subject: [PATCH 06/16] Unit tests. --- .../apache/spark/sql/execution/commands.scala | 3 +- .../sql/hive/MetastoreDataSourcesSuite.scala | 50 +++++++++++++++++-- 2 files changed, 46 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala index b8fa4b019953e..d828a5398d98d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala @@ -177,7 +177,6 @@ case class DescribeCommand( override val output: Seq[Attribute]) extends RunnableCommand { override def run(sqlContext: SQLContext) = { - Row("# Registered as a temporary table", null, null) +: - child.output.map(field => Row(field.name, field.dataType.toString, null)) + child.output.map(field => Row(field.name, field.dataType.toString, null)) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index ee9933593d73b..e3a54982d9a07 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -51,6 +51,45 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { } + test ("persistent JSON table with a user specified schema") { + sql( + """ + |CREATE TABLE jsonTable (a string, b String) + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path 'src/test/resources/data/files/sample.json' + |) + """.stripMargin) + + checkAnswer( + sql("SELECT * FROM jsonTable"), + jsonFile("src/test/resources/data/files/sample.json").collect().toSeq) + + } + + test ("persistent JSON table with a user specified schema with a subset of fields") { + // This works because JSON objects are self-describing 
and JSONRelation can get needed + // field values based on field names. + sql( + """ + |CREATE TABLE jsonTable (b String) + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path 'src/test/resources/data/files/sample.json' + |) + """.stripMargin) + + val expectedSchema = StructType(StructField("b", StringType, true) :: Nil) + + assert(expectedSchema == table("jsonTable").schema) + + checkAnswer( + sql("SELECT * FROM jsonTable"), + jsonFile("src/test/resources/data/files/sample.json").collect().map( + r => Seq(r.getString(1))).toSeq) + + } + test("resolve shortened provider names") { sql( """ @@ -106,19 +145,20 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { ("a", "b") :: Nil) FileUtils.deleteDirectory(tempDir) - sparkContext.parallelize(("a", "b", "c") :: Nil).toJSON.saveAsTextFile(tempDir.getCanonicalPath) + sparkContext.parallelize(("a1", "b1", "c1") :: Nil).toJSON.saveAsTextFile(tempDir.getCanonicalPath) - // Schema is cached so answer does not change. + // Schema is cached so the new column does not show. The updated values in existing columns + // will show. checkAnswer( sql("SELECT * FROM jsonTable"), - ("a", "b") :: Nil) + ("a1", "b1") :: Nil) refreshTable("jsonTable") // Check that the refresh worked checkAnswer( sql("SELECT * FROM jsonTable"), - ("a", "b", "c") :: Nil) + ("a1", "b1", "c1") :: Nil) FileUtils.deleteDirectory(tempDir) } @@ -160,4 +200,4 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { ("a", "b", "c") :: Nil) FileUtils.deleteDirectory(tempDir) } -} \ No newline at end of file +} From 8f8f1a167360bfab3198b086d4608f5b3517f249 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Wed, 7 Jan 2015 16:53:02 -0800 Subject: [PATCH 07/16] [SPARK-4574][SQL] Adding support for defining schema in foreign DDL commands. 
#3431 --- .../spark/sql/execution/SparkStrategies.scala | 6 +- .../apache/spark/sql/json/JSONRelation.scala | 16 +- .../apache/spark/sql/parquet/newParquet.scala | 33 ++-- .../org/apache/spark/sql/sources/ddl.scala | 136 +++++++++++-- .../apache/spark/sql/sources/interfaces.scala | 29 ++- .../spark/sql/sources/TableScanSuite.scala | 187 ++++++++++++++++++ .../spark/sql/hive/HiveMetastoreCatalog.scala | 136 +++++-------- .../spark/sql/hive/HiveStrategies.scala | 4 +- .../spark/sql/hive/execution/commands.scala | 4 +- .../sql/hive/HiveMetastoreCatalogSuite.scala | 5 +- 10 files changed, 422 insertions(+), 134 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index ff4eae6b5660b..9f9eaa0d1cb61 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -314,11 +314,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case r: RunnableCommand => ExecutedCommand(r) :: Nil - case CreateTableUsing(tableName, provider, true, options) => + case CreateTableUsing(tableName, userSpecifiedSchema, provider, true, options) => ExecutedCommand( - CreateTempTableUsing(tableName, provider, options)) :: Nil + CreateTempTableUsing(tableName, userSpecifiedSchema, provider, options)) :: Nil - case CreateTableUsing(tableName, provider, false, options) => + case CreateTableUsing(tableName, userSpecifiedSchema, provider, false, options) => sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.") case logical.SetCommand(kv) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala index fc70c183437f6..49a9382037a84 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala @@ -18,31 +18,37 @@ package org.apache.spark.sql.json import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.catalyst.types.StructType import org.apache.spark.sql.sources._ -private[sql] class DefaultSource extends RelationProvider { +private[sql] class DefaultSource extends SchemaRelationProvider { /** Returns a new base relation with the given parameters. 
*/ override def createRelation( sqlContext: SQLContext, - parameters: Map[String, String]): BaseRelation = { + parameters: Map[String, String], + schema: Option[StructType]): BaseRelation = { val fileName = parameters.getOrElse("path", sys.error("Option 'path' not specified")) val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0) - JSONRelation(fileName, samplingRatio)(sqlContext) + JSONRelation(fileName, samplingRatio, schema)(sqlContext) } } -private[sql] case class JSONRelation(fileName: String, samplingRatio: Double)( +private[sql] case class JSONRelation( + fileName: String, + samplingRatio: Double, + userSpecifiedSchema: Option[StructType])( @transient val sqlContext: SQLContext) extends TableScan { private def baseRDD = sqlContext.sparkContext.textFile(fileName) - override val schema = + override val schema = userSpecifiedSchema.getOrElse( JsonRDD.inferSchema( baseRDD, samplingRatio, sqlContext.columnNameOfCorruptRecord) + ) override def buildScan() = JsonRDD.jsonStringToRow(baseRDD, schema, sqlContext.columnNameOfCorruptRecord) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index 2e0c6c51c00e5..506be8ccde6b3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -22,37 +22,37 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} import org.apache.hadoop.conf.{Configurable, Configuration} import org.apache.hadoop.io.Writable import org.apache.hadoop.mapreduce.{JobContext, InputSplit, Job} -import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate - import parquet.hadoop.ParquetInputFormat import parquet.hadoop.util.ContextUtil import org.apache.spark.annotation.DeveloperApi import org.apache.spark.{Partition => SparkPartition, Logging} import org.apache.spark.rdd.{NewHadoopPartition, RDD} - -import org.apache.spark.sql.{SQLConf, Row, SQLContext} +import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.types.{StringType, IntegerType, StructField, StructType} +import org.apache.spark.sql.catalyst.types.{IntegerType, StructField, StructType} import org.apache.spark.sql.sources._ +import org.apache.spark.sql.{SQLConf, SQLContext} import scala.collection.JavaConversions._ + /** * Allows creation of parquet based tables using the syntax * `CREATE TEMPORARY TABLE ... USING org.apache.spark.sql.parquet`. Currently the only option * required is `path`, which should be the location of a collection of, optionally partitioned, * parquet files. */ -class DefaultSource extends RelationProvider { +class DefaultSource extends SchemaRelationProvider { /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, - parameters: Map[String, String]): BaseRelation = { + parameters: Map[String, String], + schema: Option[StructType]): BaseRelation = { val path = parameters.getOrElse("path", sys.error("'path' must be specified for parquet tables.")) - ParquetRelation2(path)(sqlContext) + ParquetRelation2(path, schema)(sqlContext) } } @@ -82,7 +82,9 @@ private[parquet] case class Partition(partitionValues: Map[String, Any], files: * discovery. 
*/ @DeveloperApi -case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext) +case class ParquetRelation2( + path: String, + userSpecifiedSchema: Option[StructType])(@transient val sqlContext: SQLContext) extends CatalystScan with Logging { def sparkContext = sqlContext.sparkContext @@ -133,12 +135,13 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext) override val sizeInBytes = partitions.flatMap(_.files).map(_.getLen).sum - val dataSchema = StructType.fromAttributes( // TODO: Parquet code should not deal with attributes. - ParquetTypesConverter.readSchemaFromFile( - partitions.head.files.head.getPath, - Some(sparkContext.hadoopConfiguration), - sqlContext.isParquetBinaryAsString)) - + val dataSchema = userSpecifiedSchema.getOrElse( + StructType.fromAttributes( // TODO: Parquet code should not deal with attributes. + ParquetTypesConverter.readSchemaFromFile( + partitions.head.files.head.getPath, + Some(sparkContext.hadoopConfiguration), + sqlContext.isParquetBinaryAsString)) + ) val dataIncludesKey = partitionKeys.headOption.map(dataSchema.fieldNames.contains(_)).getOrElse(true) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala index 40bef5c28533b..a8489f7fe0309 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala @@ -17,16 +17,15 @@ package org.apache.spark.sql.sources -import org.apache.spark.Logging -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.execution.RunnableCommand -import org.apache.spark.util.Utils - import scala.language.implicitConversions -import scala.util.parsing.combinator.lexical.StdLexical import scala.util.parsing.combinator.syntactical.StandardTokenParsers import scala.util.parsing.combinator.PackratParsers +import org.apache.spark.Logging +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.catalyst.types._ +import org.apache.spark.sql.execution.RunnableCommand +import org.apache.spark.util.Utils import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.SqlLexical @@ -44,6 +43,14 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi } } + def parseType(input: String): DataType = { + phrase(dataType)(new lexical.Scanner(input)) match { + case Success(r, x) => r + case x => + sys.error(s"Unsupported dataType: $x") + } + } + protected case class Keyword(str: String) protected implicit def asParser(k: Keyword): Parser[String] = @@ -55,6 +62,24 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi protected val USING = Keyword("USING") protected val OPTIONS = Keyword("OPTIONS") + // Data types. 
+ protected val STRING = Keyword("STRING") + protected val BINARY = Keyword("BINARY") + protected val BOOLEAN = Keyword("BOOLEAN") + protected val TINYINT = Keyword("TINYINT") + protected val SMALLINT = Keyword("SMALLINT") + protected val INT = Keyword("INT") + protected val BIGINT = Keyword("BIGINT") + protected val FLOAT = Keyword("FLOAT") + protected val DOUBLE = Keyword("DOUBLE") + protected val DECIMAL = Keyword("DECIMAL") + protected val DATE = Keyword("DATE") + protected val TIMESTAMP = Keyword("TIMESTAMP") + protected val VARCHAR = Keyword("VARCHAR") + protected val ARRAY = Keyword("ARRAY") + protected val MAP = Keyword("MAP") + protected val STRUCT = Keyword("STRUCT") + // Use reflection to find the reserved words defined in this class. protected val reservedWords = this.getClass @@ -67,15 +92,25 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi protected lazy val ddl: Parser[LogicalPlan] = createTable /** - * CREATE TEMPORARY TABLE avroTable + * `CREATE TEMPORARY TABLE avroTable + * USING org.apache.spark.sql.avro + * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")` + * or + * `CREATE TEMPORARY TABLE avroTable(intField int, stringField string...) * USING org.apache.spark.sql.avro - * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro") + * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")` */ protected lazy val createTable: Parser[LogicalPlan] = - (CREATE ~> TEMPORARY.? <~ TABLE) ~ ident ~ (USING ~> className) ~ (OPTIONS ~> options) ^^ { - case temp ~ tableName ~ provider ~ opts => - CreateTableUsing(tableName, provider, temp.isDefined, opts) + ( + (CREATE ~> TEMPORARY.? <~ TABLE) ~ ident + ~ (tableCols).? ~ (USING ~> className) ~ (OPTIONS ~> options) ^^ { + case temp ~ tableName ~ columns ~ provider ~ opts => + val userSpecifiedSchema = columns.flatMap(fields => Some(StructType(fields))) + CreateTableUsing(tableName, userSpecifiedSchema, provider, temp.isDefined, opts) } + ) + + protected lazy val tableCols: Parser[Seq[StructField]] = "(" ~> repsep(column, ",") <~ ")" protected lazy val options: Parser[Map[String, String]] = "(" ~> repsep(pair, ",") <~ ")" ^^ { case s: Seq[(String, String)] => s.toMap } @@ -83,11 +118,74 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi protected lazy val className: Parser[String] = repsep(ident, ".") ^^ { case s => s.mkString(".")} protected lazy val pair: Parser[(String, String)] = ident ~ stringLit ^^ { case k ~ v => (k,v) } + + protected lazy val column: Parser[StructField] = + ident ~ dataType ^^ { case columnName ~ typ => + StructField(cleanIdentifier(columnName), typ) + } + + protected lazy val primitiveType: Parser[DataType] = + STRING ^^^ StringType | + BINARY ^^^ BinaryType | + BOOLEAN ^^^ BooleanType | + TINYINT ^^^ ByteType | + SMALLINT ^^^ ShortType | + INT ^^^ IntegerType | + BIGINT ^^^ LongType | + FLOAT ^^^ FloatType | + DOUBLE ^^^ DoubleType | + fixedDecimalType | // decimal with precision/scale + DECIMAL ^^^ DecimalType.Unlimited | // decimal with no precision/scale + DATE ^^^ DateType | + TIMESTAMP ^^^ TimestampType | + VARCHAR ~ "(" ~ numericLit ~ ")" ^^^ StringType + + protected lazy val fixedDecimalType: Parser[DataType] = + (DECIMAL ~ "(" ~> numericLit) ~ ("," ~> numericLit <~ ")") ^^ { + case precision ~ scale => DecimalType(precision.toInt, scale.toInt) + } + + protected lazy val arrayType: Parser[DataType] = + ARRAY ~> "<" ~> dataType <~ ">" ^^ { + case tpe => ArrayType(tpe) + } + + protected lazy 
val mapType: Parser[DataType] = + MAP ~> "<" ~> dataType ~ "," ~ dataType <~ ">" ^^ { + case t1 ~ _ ~ t2 => MapType(t1, t2) + } + + protected lazy val structField: Parser[StructField] = + ident ~ ":" ~ dataType ^^ { + case fieldName ~ _ ~ tpe => StructField(cleanIdentifier(fieldName), tpe, nullable = true) + } + + protected lazy val structType: Parser[DataType] = + (STRUCT ~> "<" ~> repsep(structField, ",") <~ ">" ^^ { + case fields => new StructType(fields) + }) | + (STRUCT ~> "<>" ^^ { + case fields => new StructType(Nil) + }) + + private[sql] lazy val dataType: Parser[DataType] = + arrayType | + mapType | + structType | + primitiveType + + protected val escapedIdentifier = "`([^`]+)`".r + /** Strips backticks from ident if present */ + protected def cleanIdentifier(ident: String): String = ident match { + case escapedIdentifier(i) => i + case plainIdent => plainIdent + } } object ResolvedDataSource { def apply( sqlContext: SQLContext, + userSpecifiedSchema: Option[StructType], provider: String, options: Map[String, String]): ResolvedDataSource = { val loader = Utils.getContextOrSparkClassLoader @@ -98,8 +196,16 @@ object ResolvedDataSource { sys.error(s"Failed to load class for data source: $provider") } } - val dataSource = clazz.newInstance().asInstanceOf[org.apache.spark.sql.sources.RelationProvider] - val relation = dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options)) + val relation = clazz.newInstance match { + case dataSource: org.apache.spark.sql.sources.RelationProvider => + dataSource + .asInstanceOf[org.apache.spark.sql.sources.RelationProvider] + .createRelation(sqlContext, new CaseInsensitiveMap(options)) + case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider => + dataSource + .asInstanceOf[org.apache.spark.sql.sources.SchemaRelationProvider] + .createRelation(sqlContext, new CaseInsensitiveMap(options), userSpecifiedSchema) + } new ResolvedDataSource(clazz, relation) } @@ -109,17 +215,19 @@ private[sql] case class ResolvedDataSource(provider: Class[_], relation: BaseRel private[sql] case class CreateTableUsing( tableName: String, + userSpecifiedSchema: Option[StructType], provider: String, temporary: Boolean, options: Map[String, String]) extends Command private [sql] case class CreateTempTableUsing( tableName: String, + userSpecifiedSchema: Option[StructType], provider: String, options: Map[String, String]) extends RunnableCommand { def run(sqlContext: SQLContext) = { - val resolved = ResolvedDataSource(sqlContext, provider, options) + val resolved = ResolvedDataSource(sqlContext, userSpecifiedSchema, provider, options) sqlContext.baseRelationToSchemaRDD(resolved.relation).registerTempTable(tableName) Seq.empty diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index 02eff80456dbe..97157c868cc90 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.sources import org.apache.spark.annotation.{Experimental, DeveloperApi} import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{SQLConf, Row, SQLContext, StructType} +import org.apache.spark.sql.{Row, SQLContext, StructType} import org.apache.spark.sql.catalyst.expressions.{Expression, Attribute} /** @@ -44,6 +44,33 @@ trait RelationProvider { def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation } +/** 
+ * ::DeveloperApi:: + * Implemented by objects that produce relations for a specific kind of data source. When + * Spark SQL is given a DDL operation with + * 1. USING clause: to specify the implemented SchemaRelationProvider + * 2. User defined schema: users can define schema optionally when create table + * + * Users may specify the fully qualified class name of a given data source. When that class is + * not found Spark SQL will append the class name `DefaultSource` to the path, allowing for + * less verbose invocation. For example, 'org.apache.spark.sql.json' would resolve to the + * data source 'org.apache.spark.sql.json.DefaultSource' + * + * A new instance of this class with be instantiated each time a DDL call is made. + */ +@DeveloperApi +trait SchemaRelationProvider { + /** + * Returns a new base relation with the given parameters and user defined schema. + * Note: the parameters' keywords are case insensitive and this insensitivity is enforced + * by the Map that is passed to the function. + */ + def createRelation( + sqlContext: SQLContext, + parameters: Map[String, String], + schema: Option[StructType]): BaseRelation +} + /** * ::DeveloperApi:: * Represents a collection of tuples with a known schema. Classes that extend BaseRelation must diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala index 3cd7b0115d567..26191a8a5c769 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala @@ -17,7 +17,10 @@ package org.apache.spark.sql.sources +import java.sql.{Timestamp, Date} + import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.types.DecimalType class DefaultSource extends SimpleScanSource @@ -38,9 +41,77 @@ case class SimpleScan(from: Int, to: Int)(@transient val sqlContext: SQLContext) override def buildScan() = sqlContext.sparkContext.parallelize(from to to).map(Row(_)) } +class AllDataTypesScanSource extends SchemaRelationProvider { + override def createRelation( + sqlContext: SQLContext, + parameters: Map[String, String], + schema: Option[StructType]): BaseRelation = { + AllDataTypesScan(parameters("from").toInt, parameters("TO").toInt, schema)(sqlContext) + } +} + +case class AllDataTypesScan( + from: Int, + to: Int, + userSpecifiedSchema: Option[StructType])(@transient val sqlContext: SQLContext) + extends TableScan { + + override def schema = userSpecifiedSchema.get + + override def buildScan() = { + sqlContext.sparkContext.parallelize(from to to).map { i => + Row( + s"str_$i", + s"str_$i".getBytes(), + i % 2 == 0, + i.toByte, + i.toShort, + i, + i.toLong, + i.toFloat, + i.toDouble, + BigDecimal(i), + BigDecimal(i), + new Date(10000 + i), + new Timestamp(20000 + i), + s"varchar_$i", + Seq(i, i + 1), + Seq(Map(s"str_$i" -> Row(i.toLong))), + Map(i -> i.toString), + Map(Map(s"str_$i" -> i.toFloat) -> Row(i.toLong)), + Row(i, i.toString), + Row(Seq(s"str_$i", s"str_${i + 1}"), Row(Seq(new Date(30000 + i))))) + } + } +} + class TableScanSuite extends DataSourceTest { import caseInsensisitiveContext._ + var tableWithSchemaExpected = (1 to 10).map { i => + Row( + s"str_$i", + s"str_$i", + i % 2 == 0, + i.toByte, + i.toShort, + i, + i.toLong, + i.toFloat, + i.toDouble, + BigDecimal(i), + BigDecimal(i), + new Date(10000 + i), + new Timestamp(20000 + i), + s"varchar_$i", + Seq(i, i + 1), + Seq(Map(s"str_$i" -> Row(i.toLong))), + Map(i -> i.toString), 
+ Map(Map(s"str_$i" -> i.toFloat) -> Row(i.toLong)), + Row(i, i.toString), + Row(Seq(s"str_$i", s"str_${i + 1}"), Row(Seq(new Date(30000 + i))))) + }.toSeq + before { sql( """ @@ -51,6 +122,37 @@ class TableScanSuite extends DataSourceTest { | To '10' |) """.stripMargin) + + sql( + """ + |CREATE TEMPORARY TABLE tableWithSchema ( + |stringField stRIng, + |binaryField binary, + |booleanField boolean, + |byteField tinyint, + |shortField smaLlint, + |intField iNt, + |longField Bigint, + |floatField flOat, + |doubleField doubLE, + |decimalField1 decimal, + |decimalField2 decimal(9,2), + |dateField dAte, + |timestampField tiMestamp, + |varcharField varchaR(12), + |arrayFieldSimple Array, + |arrayFieldComplex Array>>, + |mapFieldSimple MAP, + |mapFieldComplex Map, Struct>, + |structFieldSimple StRuct, + |structFieldComplex StRuct, Value:struct>> + |) + |USING org.apache.spark.sql.sources.AllDataTypesScanSource + |OPTIONS ( + | From '1', + | To '10' + |) + """.stripMargin) } sqlTest( @@ -73,6 +175,91 @@ class TableScanSuite extends DataSourceTest { "SELECT a.i, b.i FROM oneToTen a JOIN oneToTen b ON a.i = b.i + 1", (2 to 10).map(i => Row(i, i - 1)).toSeq) + test("Schema and all fields") { + val expectedSchema = StructType( + StructField("stringField", StringType, true) :: + StructField("binaryField", BinaryType, true) :: + StructField("booleanField", BooleanType, true) :: + StructField("byteField", ByteType, true) :: + StructField("shortField", ShortType, true) :: + StructField("intField", IntegerType, true) :: + StructField("longField", LongType, true) :: + StructField("floatField", FloatType, true) :: + StructField("doubleField", DoubleType, true) :: + StructField("decimalField1", DecimalType.Unlimited, true) :: + StructField("decimalField2", DecimalType(9, 2), true) :: + StructField("dateField", DateType, true) :: + StructField("timestampField", TimestampType, true) :: + StructField("varcharField", StringType, true) :: + StructField("arrayFieldSimple", ArrayType(IntegerType), true) :: + StructField("arrayFieldComplex", + ArrayType( + MapType(StringType, StructType(StructField("key", LongType, true) :: Nil))), true) :: + StructField("mapFieldSimple", MapType(IntegerType, StringType), true) :: + StructField("mapFieldComplex", + MapType( + MapType(StringType, FloatType), + StructType(StructField("key", LongType, true) :: Nil)), true) :: + StructField("structFieldSimple", + StructType( + StructField("key", IntegerType, true) :: + StructField("Value", StringType, true) :: Nil), true) :: + StructField("structFieldComplex", + StructType( + StructField("key", ArrayType(StringType), true) :: + StructField("Value", + StructType( + StructField("value", ArrayType(DateType), true) :: Nil), true) :: Nil), true) :: Nil + ) + + assert(expectedSchema == table("tableWithSchema").schema) + + checkAnswer( + sql( + """SELECT + | stringField, + | cast(binaryField as string), + | booleanField, + | byteField, + | shortField, + | intField, + | longField, + | floatField, + | doubleField, + | decimalField1, + | decimalField2, + | dateField, + | timestampField, + | varcharField, + | arrayFieldSimple, + | arrayFieldComplex, + | mapFieldSimple, + | mapFieldComplex, + | structFieldSimple, + | structFieldComplex FROM tableWithSchema""".stripMargin), + tableWithSchemaExpected + ) + } + + sqlTest( + "SELECT count(*) FROM tableWithSchema", + 10) + + sqlTest( + "SELECT stringField FROM tableWithSchema", + (1 to 10).map(i => Row(s"str_$i")).toSeq) + + sqlTest( + "SELECT intField FROM tableWithSchema WHERE intField < 5", + (1 to 
4).map(Row(_)).toSeq) + + sqlTest( + "SELECT longField * 2 FROM tableWithSchema", + (1 to 10).map(i => Row(i * 2.toLong)).toSeq) + + sqlTest( + "SELECT structFieldSimple.key, arrayFieldSimple[1] FROM tableWithSchema a where intField=1", + Seq(Seq(1, 2))) test("Caching") { // Cached Query Execution diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 697170e00a8e6..5c832ec7f8ede 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -20,16 +20,12 @@ package org.apache.spark.sql.hive import java.io.IOException import java.util.{List => JList} -import com.google.common.cache.{CacheLoader, CacheBuilder} -import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.sources.{LogicalRelation, ResolvedDataSource, BaseRelation} -import scala.util.parsing.combinator.RegexParsers +import com.google.common.cache.{CacheLoader, CacheBuilder} import org.apache.hadoop.util.ReflectionUtils - import org.apache.hadoop.hive.metastore.TableType -import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition, SerDeInfo, FieldSchema} +import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition, FieldSchema} import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table, HiveException} import org.apache.hadoop.hive.ql.plan.CreateTableDesc import org.apache.hadoop.hive.serde.serdeConstants @@ -37,7 +33,6 @@ import org.apache.hadoop.hive.serde2.{Deserializer, SerDeException} import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe import org.apache.spark.Logging -import org.apache.spark.annotation.DeveloperApi import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.analysis.{Catalog, OverrideCatalog} import org.apache.spark.sql.catalyst.expressions._ @@ -45,6 +40,7 @@ import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.catalyst.types._ +import org.apache.spark.sql.sources.{LogicalRelation, ResolvedDataSource} import org.apache.spark.util.Utils /* Implicit conversions */ @@ -70,7 +66,13 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with override def load(in: TableIdent): LogicalPlan = { logDebug(s"Creating new cached data source for $in") val table = client.getTable(in.database, in.name) - + val schemaString = table.getProperty("spark.sql.sources.schema") + val userSpecifiedSchema = + if (schemaString == null) { + None + } else { + Some(DataType.fromJson(schemaString).asInstanceOf[StructType]) + } // It does not appear that the ql client for the metastore has a way to enumerate all the // SerDe properties directly... 
val options = table.getTTable.getSd.getSerdeInfo.getParameters.toMap @@ -78,6 +80,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with val resolvedRelation = ResolvedDataSource( hive, + userSpecifiedSchema, table.getProperty("spark.sql.sources.provider"), options) @@ -95,11 +98,18 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with val caseSensitive: Boolean = false - def createDataSourceTable(tableName: String, provider: String, options: Map[String, String]) = { + def createDataSourceTable( + tableName: String, + userSpecifiedSchema: Option[StructType], + provider: String, + options: Map[String, String]) = { val (dbName, tblName) = processDatabaseAndTableName("default", tableName) val tbl = new Table(dbName, tblName) tbl.setProperty("spark.sql.sources.provider", provider) + if (userSpecifiedSchema.isDefined) { + tbl.setProperty("spark.sql.sources.schema", userSpecifiedSchema.get.json) + } options.foreach { case (key, value) => tbl.setSerdeParam(key, value) } tbl.setProperty("EXTERNAL", "TRUE") @@ -443,88 +453,6 @@ private[hive] case class InsertIntoHiveTable( } } -/** - * :: DeveloperApi :: - * Provides conversions between Spark SQL data types and Hive Metastore types. - */ -@DeveloperApi -object HiveMetastoreTypes extends RegexParsers { - protected lazy val primitiveType: Parser[DataType] = - "string" ^^^ StringType | - "float" ^^^ FloatType | - "int" ^^^ IntegerType | - "tinyint" ^^^ ByteType | - "smallint" ^^^ ShortType | - "double" ^^^ DoubleType | - "bigint" ^^^ LongType | - "binary" ^^^ BinaryType | - "boolean" ^^^ BooleanType | - fixedDecimalType | // Hive 0.13+ decimal with precision/scale - "decimal" ^^^ DecimalType.Unlimited | // Hive 0.12 decimal with no precision/scale - "date" ^^^ DateType | - "timestamp" ^^^ TimestampType | - "varchar\\((\\d+)\\)".r ^^^ StringType - - protected lazy val fixedDecimalType: Parser[DataType] = - ("decimal" ~> "(" ~> "\\d+".r) ~ ("," ~> "\\d+".r <~ ")") ^^ { - case precision ~ scale => - DecimalType(precision.toInt, scale.toInt) - } - - protected lazy val arrayType: Parser[DataType] = - "array" ~> "<" ~> dataType <~ ">" ^^ { - case tpe => ArrayType(tpe) - } - - protected lazy val mapType: Parser[DataType] = - "map" ~> "<" ~> dataType ~ "," ~ dataType <~ ">" ^^ { - case t1 ~ _ ~ t2 => MapType(t1, t2) - } - - protected lazy val structField: Parser[StructField] = - "[a-zA-Z0-9_]*".r ~ ":" ~ dataType ^^ { - case name ~ _ ~ tpe => StructField(name, tpe, nullable = true) - } - - protected lazy val structType: Parser[DataType] = - "struct" ~> "<" ~> repsep(structField,",") <~ ">" ^^ { - case fields => new StructType(fields) - } - - protected lazy val dataType: Parser[DataType] = - arrayType | - mapType | - structType | - primitiveType - - def toDataType(metastoreType: String): DataType = parseAll(dataType, metastoreType) match { - case Success(result, _) => result - case failure: NoSuccess => sys.error(s"Unsupported dataType: $metastoreType") - } - - def toMetastoreType(dt: DataType): String = dt match { - case ArrayType(elementType, _) => s"array<${toMetastoreType(elementType)}>" - case StructType(fields) => - s"struct<${fields.map(f => s"${f.name}:${toMetastoreType(f.dataType)}").mkString(",")}>" - case MapType(keyType, valueType, _) => - s"map<${toMetastoreType(keyType)},${toMetastoreType(valueType)}>" - case StringType => "string" - case FloatType => "float" - case IntegerType => "int" - case ByteType => "tinyint" - case ShortType => "smallint" - case DoubleType => "double" - case 
LongType => "bigint" - case BinaryType => "binary" - case BooleanType => "boolean" - case DateType => "date" - case d: DecimalType => HiveShim.decimalMetastoreString(d) - case TimestampType => "timestamp" - case NullType => "void" - case udt: UserDefinedType[_] => toMetastoreType(udt.sqlType) - } -} - private[hive] case class MetastoreRelation (databaseName: String, tableName: String, alias: Option[String]) (val table: TTable, val partitions: Seq[TPartition]) @@ -582,7 +510,7 @@ private[hive] case class MetastoreRelation implicit class SchemaAttribute(f: FieldSchema) { def toAttribute = AttributeReference( f.getName, - HiveMetastoreTypes.toDataType(f.getType), + sqlContext.ddlParser.parseType(f.getType), // Since data can be dumped in randomly with no validation, everything is nullable. nullable = true )(qualifiers = Seq(alias.getOrElse(tableName))) @@ -602,3 +530,27 @@ private[hive] case class MetastoreRelation /** An attribute map for determining the ordinal for non-partition columns. */ val columnOrdinals = AttributeMap(attributes.zipWithIndex) } + +object HiveMetastoreTypes { + def toMetastoreType(dt: DataType): String = dt match { + case ArrayType(elementType, _) => s"array<${toMetastoreType(elementType)}>" + case StructType(fields) => + s"struct<${fields.map(f => s"${f.name}:${toMetastoreType(f.dataType)}").mkString(",")}>" + case MapType(keyType, valueType, _) => + s"map<${toMetastoreType(keyType)},${toMetastoreType(valueType)}>" + case StringType => "string" + case FloatType => "float" + case IntegerType => "int" + case ByteType => "tinyint" + case ShortType => "smallint" + case DoubleType => "double" + case LongType => "bigint" + case BinaryType => "binary" + case BooleanType => "boolean" + case DateType => "date" + case d: DecimalType => HiveShim.decimalMetastoreString(d) + case TimestampType => "timestamp" + case NullType => "void" + case udt: UserDefinedType[_] => toMetastoreType(udt.sqlType) + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 6c906ad66dace..d7a64f6a74547 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -220,9 +220,9 @@ private[hive] trait HiveStrategies { ExecutedCommand(DescribeCommand(planLater(o), describe.output)) :: Nil } - case CreateTableUsing(tableName, provider, false, options) => + case CreateTableUsing(tableName, userSpecifiedSchema, provider, false, options) => ExecutedCommand( - CreateMetastoreDataSource(tableName, provider, options)) :: Nil + CreateMetastoreDataSource(tableName, userSpecifiedSchema, provider, options)) :: Nil case _ => Nil } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index f38b44688256e..668c0ab45528c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.execution import org.apache.spark.annotation.DeveloperApi import org.apache.spark.sql.catalyst.expressions.Row +import org.apache.spark.sql.catalyst.types.StructType import org.apache.spark.sql.execution.RunnableCommand import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.SQLContext @@ -92,12 +93,13 @@ case class AddFile(path: String) extends 
RunnableCommand { case class CreateMetastoreDataSource( tableName: String, + userSpecifiedSchema: Option[StructType], provider: String, options: Map[String, String]) extends RunnableCommand { override def run(sqlContext: SQLContext) = { val hiveContext = sqlContext.asInstanceOf[HiveContext] - hiveContext.catalog.createDataSourceTable(tableName, provider, options) + hiveContext.catalog.createDataSourceTable(tableName, userSpecifiedSchema, provider, options) Seq.empty[Row] } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala index 86535f8dd4f58..041a36f1295ef 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.hive import org.scalatest.FunSuite import org.apache.spark.sql.catalyst.types.StructType +import org.apache.spark.sql.sources.DDLParser import org.apache.spark.sql.test.ExamplePointUDT class HiveMetastoreCatalogSuite extends FunSuite { @@ -27,7 +28,9 @@ class HiveMetastoreCatalogSuite extends FunSuite { test("struct field should accept underscore in sub-column name") { val metastr = "struct" - val datatype = HiveMetastoreTypes.toDataType(metastr) + val ddlParser = new DDLParser + + val datatype = ddlParser.parseType(metastr) assert(datatype.isInstanceOf[StructType]) } From 49bf1acc700d454f894edf55cd8fa88aee4d63da Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Wed, 7 Jan 2015 17:58:00 -0800 Subject: [PATCH 08/16] Unit tests. --- .../src/test/resources/data/files/sample.json | 3 +- .../sql/hive/MetastoreDataSourcesSuite.scala | 54 ++++++++++++++++--- 2 files changed, 48 insertions(+), 9 deletions(-) diff --git a/sql/hive/src/test/resources/data/files/sample.json b/sql/hive/src/test/resources/data/files/sample.json index 7b749791ef419..a2c2ffd5e0330 100644 --- a/sql/hive/src/test/resources/data/files/sample.json +++ b/sql/hive/src/test/resources/data/files/sample.json @@ -1 +1,2 @@ -{"a" : "2" ,"b" : "blah"} +{"a" : "2" ,"b" : "blah", "c_!@(3)":1} +{"" : {"d!" 
: [4, 5], "=" : [{"Dd2": null}, {"Dd2" : true}]}} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index e3a54982d9a07..069ad35e1a5ab 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -54,16 +54,22 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { test ("persistent JSON table with a user specified schema") { sql( """ - |CREATE TABLE jsonTable (a string, b String) + |CREATE TABLE jsonTable ( + |a string, + |b String, + |`c_!@(3)` int, + |`` Struct<`d!`:array, `=`:array>>) |USING org.apache.spark.sql.json.DefaultSource |OPTIONS ( | path 'src/test/resources/data/files/sample.json' |) """.stripMargin) + jsonFile("src/test/resources/data/files/sample.json").registerTempTable("expectedJsonTable") + checkAnswer( - sql("SELECT * FROM jsonTable"), - jsonFile("src/test/resources/data/files/sample.json").collect().toSeq) + sql("SELECT a, b, `c_!@(3)`, ``.`d!`, ``.`=` FROM jsonTable"), + sql("SELECT a, b, `c_!@(3)`, ``.`d!`, ``.`=` FROM expectedJsonTable").collect().toSeq) } @@ -72,21 +78,26 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { // field values based on field names. sql( """ - |CREATE TABLE jsonTable (b String) + |CREATE TABLE jsonTable (`` Struct<`=`:array>>, b String) |USING org.apache.spark.sql.json.DefaultSource |OPTIONS ( | path 'src/test/resources/data/files/sample.json' |) """.stripMargin) - val expectedSchema = StructType(StructField("b", StringType, true) :: Nil) + val innerStruct = StructType( + StructField("=", ArrayType(StructType(StructField("Dd2", BooleanType, true) :: Nil))) :: Nil) + val expectedSchema = StructType( + StructField("", innerStruct, true) :: + StructField("b", StringType, true) :: Nil) assert(expectedSchema == table("jsonTable").schema) + jsonFile("src/test/resources/data/files/sample.json").registerTempTable("expectedJsonTable") + checkAnswer( - sql("SELECT * FROM jsonTable"), - jsonFile("src/test/resources/data/files/sample.json").collect().map( - r => Seq(r.getString(1))).toSeq) + sql("SELECT b, ``.`=` FROM jsonTable"), + sql("SELECT b, ``.`=` FROM expectedJsonTable").collect().toSeq) } @@ -200,4 +211,31 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { ("a", "b", "c") :: Nil) FileUtils.deleteDirectory(tempDir) } + + test("invalidate cache and reload") { + sql( + """ + |CREATE TABLE jsonTable (`c_!@(3)` int) + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path 'src/test/resources/data/files/sample.json' + |) + """.stripMargin) + + jsonFile("src/test/resources/data/files/sample.json").registerTempTable("expectedJsonTable") + + checkAnswer( + sql("SELECT * FROM jsonTable"), + sql("SELECT `c_!@(3)` FROM expectedJsonTable").collect().toSeq) + + invalidateTable("jsonTable") + checkAnswer( + sql("SELECT * FROM jsonTable"), + sql("SELECT `c_!@(3)` FROM expectedJsonTable").collect().toSeq) + + invalidateTable("jsonTable") + val expectedSchema = StructType(StructField("c_!@(3)", IntegerType, true) :: Nil) + + assert(expectedSchema == table("jsonTable").schema) + } } From 172db80cf71ba4a853a42993e87bc52e6c08b94f Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Thu, 8 Jan 2015 17:21:32 -0800 Subject: [PATCH 09/16] Fix unit test. 
--- .../org/apache/spark/sql/hive/execution/HiveQuerySuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index fb6da33e88ef6..14a8c9c998be0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -615,7 +615,6 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { assertResult( Array( - Array("# Registered as a temporary table", null, null), Array("a", "IntegerType", null), Array("b", "StringType", null)) ) { From 06f9b0cc917ff4efcce2e9fbf9397e8f553ac601 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Sat, 10 Jan 2015 20:59:10 -0800 Subject: [PATCH 10/16] Revert unnecessary changes. --- .../apache/spark/sql/parquet/newParquet.scala | 33 +++++++++---------- 1 file changed, 15 insertions(+), 18 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index 506be8ccde6b3..2e0c6c51c00e5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -22,37 +22,37 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} import org.apache.hadoop.conf.{Configurable, Configuration} import org.apache.hadoop.io.Writable import org.apache.hadoop.mapreduce.{JobContext, InputSplit, Job} +import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate + import parquet.hadoop.ParquetInputFormat import parquet.hadoop.util.ContextUtil import org.apache.spark.annotation.DeveloperApi import org.apache.spark.{Partition => SparkPartition, Logging} import org.apache.spark.rdd.{NewHadoopPartition, RDD} -import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate + +import org.apache.spark.sql.{SQLConf, Row, SQLContext} import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.types.{IntegerType, StructField, StructType} +import org.apache.spark.sql.catalyst.types.{StringType, IntegerType, StructField, StructType} import org.apache.spark.sql.sources._ -import org.apache.spark.sql.{SQLConf, SQLContext} import scala.collection.JavaConversions._ - /** * Allows creation of parquet based tables using the syntax * `CREATE TEMPORARY TABLE ... USING org.apache.spark.sql.parquet`. Currently the only option * required is `path`, which should be the location of a collection of, optionally partitioned, * parquet files. */ -class DefaultSource extends SchemaRelationProvider { +class DefaultSource extends RelationProvider { /** Returns a new base relation with the given parameters. */ override def createRelation( sqlContext: SQLContext, - parameters: Map[String, String], - schema: Option[StructType]): BaseRelation = { + parameters: Map[String, String]): BaseRelation = { val path = parameters.getOrElse("path", sys.error("'path' must be specified for parquet tables.")) - ParquetRelation2(path, schema)(sqlContext) + ParquetRelation2(path)(sqlContext) } } @@ -82,9 +82,7 @@ private[parquet] case class Partition(partitionValues: Map[String, Any], files: * discovery. 
*/ @DeveloperApi -case class ParquetRelation2( - path: String, - userSpecifiedSchema: Option[StructType])(@transient val sqlContext: SQLContext) +case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext) extends CatalystScan with Logging { def sparkContext = sqlContext.sparkContext @@ -135,13 +133,12 @@ case class ParquetRelation2( override val sizeInBytes = partitions.flatMap(_.files).map(_.getLen).sum - val dataSchema = userSpecifiedSchema.getOrElse( - StructType.fromAttributes( // TODO: Parquet code should not deal with attributes. - ParquetTypesConverter.readSchemaFromFile( - partitions.head.files.head.getPath, - Some(sparkContext.hadoopConfiguration), - sqlContext.isParquetBinaryAsString)) - ) + val dataSchema = StructType.fromAttributes( // TODO: Parquet code should not deal with attributes. + ParquetTypesConverter.readSchemaFromFile( + partitions.head.files.head.getPath, + Some(sparkContext.hadoopConfiguration), + sqlContext.isParquetBinaryAsString)) + val dataIncludesKey = partitionKeys.headOption.map(dataSchema.fieldNames.contains(_)).getOrElse(true) From aeaf4b34f60808fe374f79098c70130fd3eb83ad Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Sun, 11 Jan 2015 17:08:10 -0800 Subject: [PATCH 11/16] Add comments. --- .../apache/spark/sql/sources/interfaces.scala | 17 +++++++++++------ .../sql/hive/MetastoreDataSourcesSuite.scala | 7 +++++-- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index 990f7e0e74bcf..2a7be23e37c74 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -24,8 +24,8 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, Attribute} /** * ::DeveloperApi:: * Implemented by objects that produce relations for a specific kind of data source. When - * Spark SQL is given a DDL operation with a USING clause specified, this interface is used to - * pass in the parameters specified by a user. + * Spark SQL is given a DDL operation with a USING clause specified (to specify the implemented + * RelationProvider), this interface is used to pass in the parameters specified by a user. * * Users may specify the fully qualified class name of a given data source. When that class is * not found Spark SQL will append the class name `DefaultSource` to the path, allowing for @@ -46,10 +46,10 @@ trait RelationProvider { /** * ::DeveloperApi:: - * Implemented by objects that produce relations for a specific kind of data source. When - * Spark SQL is given a DDL operation with - * 1. USING clause: to specify the implemented SchemaRelationProvider - * 2. User defined schema: users can define schema optionally when create table + * Implemented by objects that produce relations for a specific kind of data source + * with a given schema. When Spark SQL is given a DDL operation with a USING clause specified ( + * to specify the implemented SchemaRelationProvider) and a user defined schema, this interface + * is used to pass in the parameters specified by a user. * * Users may specify the fully qualified class name of a given data source. 
When that class is * not found Spark SQL will append the class name `DefaultSource` to the path, allowing for @@ -57,6 +57,11 @@ trait RelationProvider { * data source 'org.apache.spark.sql.json.DefaultSource' * * A new instance of this class with be instantiated each time a DDL call is made. + * + * The difference between a [[RelationProvider]] and a [[SchemaRelationProvider]] is that + * users need to provide a schema when using a SchemaRelationProvider. + * A relation provider can inherits both [[RelationProvider]] and [[SchemaRelationProvider]] + * if it can support both schema inference and user-specified schemas. */ @DeveloperApi trait SchemaRelationProvider { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 069ad35e1a5ab..f67460511516c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -19,10 +19,11 @@ package org.apache.spark.sql.hive import java.io.File +import org.scalatest.BeforeAndAfterEach + import org.apache.commons.io.FileUtils + import org.apache.spark.sql._ -import org.apache.spark.sql.hive.test.TestHive -import org.scalatest.BeforeAndAfterEach /* Implicits */ import org.apache.spark.sql.hive.test.TestHive._ @@ -228,7 +229,9 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { sql("SELECT * FROM jsonTable"), sql("SELECT `c_!@(3)` FROM expectedJsonTable").collect().toSeq) + // Discard the cached relation. invalidateTable("jsonTable") + checkAnswer( sql("SELECT * FROM jsonTable"), sql("SELECT `c_!@(3)` FROM expectedJsonTable").collect().toSeq) From 7fc4b564b96a1abf8a9d4ffdc6096f2f1679925d Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Sun, 11 Jan 2015 17:08:18 -0800 Subject: [PATCH 12/16] Add DDLStrategy and HiveDDLStrategy to plan DDLs based on the data source API. --- .../spark/sql/execution/SparkStrategies.scala | 4 +++- .../org/apache/spark/sql/hive/HiveContext.scala | 1 + .../org/apache/spark/sql/hive/HiveStrategies.scala | 14 ++++++++++---- 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index ad8395cb710c7..d91b1fbc69834 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -312,7 +312,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } } - case object DDLStrategy extends Strategy { + object DDLStrategy extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case CreateTableUsing(tableName, userSpecifiedSchema, provider, true, options) => ExecutedCommand( @@ -320,6 +320,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case CreateTableUsing(tableName, userSpecifiedSchema, provider, false, options) => sys.error("Tables created with SQLContext must be TEMPORARY. 
Use a HiveContext instead.") + + case _ => Nil } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index c3daf6c8d3f16..09ff4cc5ab437 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -350,6 +350,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { override def strategies: Seq[Strategy] = extraStrategies ++ Seq( DataSourceStrategy, HiveCommandStrategy(self), + HiveDDLStrategy, DDLStrategy, TakeOrdered, ParquetOperations, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index c82df0308003e..cdff82e3d04d2 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -209,6 +209,16 @@ private[hive] trait HiveStrategies { } } + object HiveDDLStrategy extends Strategy { + def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + case CreateTableUsing(tableName, userSpecifiedSchema, provider, false, options) => + ExecutedCommand( + CreateMetastoreDataSource(tableName, userSpecifiedSchema, provider, options)) :: Nil + + case _ => Nil + } + } + case class HiveCommandStrategy(context: HiveContext) extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case describe: DescribeCommand => @@ -221,10 +231,6 @@ private[hive] trait HiveStrategies { ExecutedCommand(RunnableDescribeCommand(planLater(o), describe.output)) :: Nil } - case CreateTableUsing(tableName, userSpecifiedSchema, provider, false, options) => - ExecutedCommand( - CreateMetastoreDataSource(tableName, userSpecifiedSchema, provider, options)) :: Nil - case _ => Nil } } From 5315dfcde71a5833c93bd8924019200993e87fa0 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Mon, 12 Jan 2015 12:59:42 -0800 Subject: [PATCH 13/16] rxin's comments. --- .../spark/sql/hive/HiveMetastoreCatalog.scala | 69 ++++++++++--------- .../spark/sql/hive/execution/commands.scala | 4 +- .../sql/hive/MetastoreDataSourcesSuite.scala | 3 - 3 files changed, 38 insertions(+), 38 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index a8951ae31a576..daeabb6c8bab8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.hive import java.io.IOException import java.util.{List => JList} -import com.google.common.cache.{CacheLoader, CacheBuilder} +import com.google.common.cache.{LoadingCache, CacheLoader, CacheBuilder} import org.apache.hadoop.util.ReflectionUtils import org.apache.hadoop.hive.metastore.TableType @@ -54,46 +54,47 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with // TODO: Use this everywhere instead of tuples or databaseName, tableName,. 
/** A fully qualified identifier for a table (i.e., database.tableName) */ - case class TableIdent(database: String, name: String) { - def toLowerCase = TableIdent(database.toLowerCase, name.toLowerCase) + case class QualifiedTableName(database: String, name: String) { + def toLowerCase = QualifiedTableName(database.toLowerCase, name.toLowerCase) } /** A cache of Spark SQL data source tables that have been accessed. */ - protected[hive] val cachedDataSourceTables = CacheBuilder.newBuilder() - .maximumSize(1000) - .build( - new CacheLoader[TableIdent, LogicalPlan]() { - override def load(in: TableIdent): LogicalPlan = { - logDebug(s"Creating new cached data source for $in") - val table = client.getTable(in.database, in.name) - val schemaString = table.getProperty("spark.sql.sources.schema") - val userSpecifiedSchema = - if (schemaString == null) { - None - } else { - Some(DataType.fromJson(schemaString).asInstanceOf[StructType]) - } - // It does not appear that the ql client for the metastore has a way to enumerate all the - // SerDe properties directly... - val options = table.getTTable.getSd.getSerdeInfo.getParameters.toMap - - val resolvedRelation = - ResolvedDataSource( - hive, - userSpecifiedSchema, - table.getProperty("spark.sql.sources.provider"), - options) - - LogicalRelation(resolvedRelation.relation) - } - }) + protected[hive] val cachedDataSourceTables: LoadingCache[QualifiedTableName, LogicalPlan] = { + val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() { + override def load(in: QualifiedTableName): LogicalPlan = { + logDebug(s"Creating new cached data source for $in") + val table = client.getTable(in.database, in.name) + val schemaString = table.getProperty("spark.sql.sources.schema") + val userSpecifiedSchema = + if (schemaString == null) { + None + } else { + Some(DataType.fromJson(schemaString).asInstanceOf[StructType]) + } + // It does not appear that the ql client for the metastore has a way to enumerate all the + // SerDe properties directly... + val options = table.getTTable.getSd.getSerdeInfo.getParameters.toMap + + val resolvedRelation = + ResolvedDataSource( + hive, + userSpecifiedSchema, + table.getProperty("spark.sql.sources.provider"), + options) + + LogicalRelation(resolvedRelation.relation) + } + } + + CacheBuilder.newBuilder().maximumSize(1000).build(cacheLoader) + } def refreshTable(databaseName: String, tableName: String): Unit = { - cachedDataSourceTables.refresh(TableIdent(databaseName, tableName).toLowerCase) + cachedDataSourceTables.refresh(QualifiedTableName(databaseName, tableName).toLowerCase) } def invalidateTable(databaseName: String, tableName: String): Unit = { - cachedDataSourceTables.invalidate(TableIdent(databaseName, tableName).toLowerCase) + cachedDataSourceTables.invalidate(QualifiedTableName(databaseName, tableName).toLowerCase) } val caseSensitive: Boolean = false @@ -143,7 +144,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with val table = client.getTable(databaseName, tblName) if (table.getProperty("spark.sql.sources.provider") != null) { - cachedDataSourceTables(TableIdent(databaseName, tblName).toLowerCase) + cachedDataSourceTables(QualifiedTableName(databaseName, tblName).toLowerCase) } else if (table.isView) { // if the unresolved relation is from hive view // parse the text into logic node. 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index 7a7b397b6c3b1..e70cdeaad4c09 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -53,7 +53,9 @@ case class DropTable( override def run(sqlContext: SQLContext) = { val hiveContext = sqlContext.asInstanceOf[HiveContext] val ifExistsClause = if (ifExists) "IF EXISTS " else "" - try hiveContext.tryUncacheQuery(hiveContext.table(tableName)) catch { + try { + hiveContext.tryUncacheQuery(hiveContext.table(tableName)) + } catch { case _: org.apache.hadoop.hive.ql.metadata.InvalidTableException => } hiveContext.invalidateTable(tableName) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index f67460511516c..d436142fad842 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -49,7 +49,6 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { checkAnswer( sql("SELECT * FROM jsonTable"), jsonFile("src/test/resources/data/files/sample.json").collect().toSeq) - } test ("persistent JSON table with a user specified schema") { @@ -71,7 +70,6 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { checkAnswer( sql("SELECT a, b, `c_!@(3)`, ``.`d!`, ``.`=` FROM jsonTable"), sql("SELECT a, b, `c_!@(3)`, ``.`d!`, ``.`=` FROM expectedJsonTable").collect().toSeq) - } test ("persistent JSON table with a user specified schema with a subset of fields") { @@ -99,7 +97,6 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { checkAnswer( sql("SELECT b, ``.`=` FROM jsonTable"), sql("SELECT b, ``.`=` FROM expectedJsonTable").collect().toSeq) - } test("resolve shortened provider names") { From 4456e9824fefae96609c7ee7fe1c413f06df7445 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Mon, 12 Jan 2015 15:39:19 -0800 Subject: [PATCH 14/16] Test data. --- .../src/test/resources/data/files/sample.json | 3 +-- sql/hive/src/test/resources/sample.json | 2 ++ .../sql/hive/MetastoreDataSourcesSuite.scala | 24 +++++++++---------- 3 files changed, 15 insertions(+), 14 deletions(-) create mode 100644 sql/hive/src/test/resources/sample.json diff --git a/sql/hive/src/test/resources/data/files/sample.json b/sql/hive/src/test/resources/data/files/sample.json index a2c2ffd5e0330..7b749791ef419 100644 --- a/sql/hive/src/test/resources/data/files/sample.json +++ b/sql/hive/src/test/resources/data/files/sample.json @@ -1,2 +1 @@ -{"a" : "2" ,"b" : "blah", "c_!@(3)":1} -{"" : {"d!" : [4, 5], "=" : [{"Dd2": null}, {"Dd2" : true}]}} +{"a" : "2" ,"b" : "blah"} diff --git a/sql/hive/src/test/resources/sample.json b/sql/hive/src/test/resources/sample.json new file mode 100644 index 0000000000000..a2c2ffd5e0330 --- /dev/null +++ b/sql/hive/src/test/resources/sample.json @@ -0,0 +1,2 @@ +{"a" : "2" ,"b" : "blah", "c_!@(3)":1} +{"" : {"d!" 
: [4, 5], "=" : [{"Dd2": null}, {"Dd2" : true}]}} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index d436142fad842..d8897ad791a2f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -42,13 +42,13 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { |CREATE TABLE jsonTable |USING org.apache.spark.sql.json.DefaultSource |OPTIONS ( - | path 'src/test/resources/data/files/sample.json' + | path 'src/test/resources/sample.json' |) """.stripMargin) checkAnswer( sql("SELECT * FROM jsonTable"), - jsonFile("src/test/resources/data/files/sample.json").collect().toSeq) + jsonFile("src/test/resources/sample.json").collect().toSeq) } test ("persistent JSON table with a user specified schema") { @@ -61,11 +61,11 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { |`` Struct<`d!`:array, `=`:array>>) |USING org.apache.spark.sql.json.DefaultSource |OPTIONS ( - | path 'src/test/resources/data/files/sample.json' + | path 'src/test/resources/sample.json' |) """.stripMargin) - jsonFile("src/test/resources/data/files/sample.json").registerTempTable("expectedJsonTable") + jsonFile("src/test/resources/sample.json").registerTempTable("expectedJsonTable") checkAnswer( sql("SELECT a, b, `c_!@(3)`, ``.`d!`, ``.`=` FROM jsonTable"), @@ -80,7 +80,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { |CREATE TABLE jsonTable (`` Struct<`=`:array>>, b String) |USING org.apache.spark.sql.json.DefaultSource |OPTIONS ( - | path 'src/test/resources/data/files/sample.json' + | path 'src/test/resources/sample.json' |) """.stripMargin) @@ -92,7 +92,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { assert(expectedSchema == table("jsonTable").schema) - jsonFile("src/test/resources/data/files/sample.json").registerTempTable("expectedJsonTable") + jsonFile("src/test/resources/sample.json").registerTempTable("expectedJsonTable") checkAnswer( sql("SELECT b, ``.`=` FROM jsonTable"), @@ -105,13 +105,13 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { |CREATE TABLE jsonTable |USING org.apache.spark.sql.json |OPTIONS ( - | path 'src/test/resources/data/files/sample.json' + | path 'src/test/resources/sample.json' |) """.stripMargin) checkAnswer( sql("SELECT * FROM jsonTable"), - jsonFile("src/test/resources/data/files/sample.json").collect().toSeq) + jsonFile("src/test/resources/sample.json").collect().toSeq) } test("drop table") { @@ -120,13 +120,13 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { |CREATE TABLE jsonTable |USING org.apache.spark.sql.json |OPTIONS ( - | path 'src/test/resources/data/files/sample.json' + | path 'src/test/resources/sample.json' |) """.stripMargin) checkAnswer( sql("SELECT * FROM jsonTable"), - jsonFile("src/test/resources/data/files/sample.json").collect().toSeq) + jsonFile("src/test/resources/sample.json").collect().toSeq) sql("DROP TABLE jsonTable") @@ -216,11 +216,11 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { |CREATE TABLE jsonTable (`c_!@(3)` int) |USING org.apache.spark.sql.json.DefaultSource |OPTIONS ( - | path 'src/test/resources/data/files/sample.json' + | path 'src/test/resources/sample.json' |) """.stripMargin) - 
jsonFile("src/test/resources/data/files/sample.json").registerTempTable("expectedJsonTable") + jsonFile("src/test/resources/sample.json").registerTempTable("expectedJsonTable") checkAnswer( sql("SELECT * FROM jsonTable"), From c07cbc6ca2f85f84511fe476a0f0a27d33fa7768 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Mon, 12 Jan 2015 16:38:04 -0800 Subject: [PATCH 15/16] Get the location of test file in a correct way. --- .../sql/hive/MetastoreDataSourcesSuite.scala | 39 ++++++++++--------- 1 file changed, 21 insertions(+), 18 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index d8897ad791a2f..ec9ebb4a775a3 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -24,6 +24,7 @@ import org.scalatest.BeforeAndAfterEach import org.apache.commons.io.FileUtils import org.apache.spark.sql._ +import org.apache.spark.util.Utils /* Implicits */ import org.apache.spark.sql.hive.test.TestHive._ @@ -36,24 +37,26 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { reset() } + val filePath = Utils.getSparkClassLoader.getResource("sample.json").getFile + test ("persistent JSON table") { sql( - """ + s""" |CREATE TABLE jsonTable |USING org.apache.spark.sql.json.DefaultSource |OPTIONS ( - | path 'src/test/resources/sample.json' + | path '${filePath}' |) """.stripMargin) checkAnswer( sql("SELECT * FROM jsonTable"), - jsonFile("src/test/resources/sample.json").collect().toSeq) + jsonFile(filePath).collect().toSeq) } test ("persistent JSON table with a user specified schema") { sql( - """ + s""" |CREATE TABLE jsonTable ( |a string, |b String, @@ -61,11 +64,11 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { |`` Struct<`d!`:array, `=`:array>>) |USING org.apache.spark.sql.json.DefaultSource |OPTIONS ( - | path 'src/test/resources/sample.json' + | path '${filePath}' |) """.stripMargin) - jsonFile("src/test/resources/sample.json").registerTempTable("expectedJsonTable") + jsonFile(filePath).registerTempTable("expectedJsonTable") checkAnswer( sql("SELECT a, b, `c_!@(3)`, ``.`d!`, ``.`=` FROM jsonTable"), @@ -76,11 +79,11 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { // This works because JSON objects are self-describing and JSONRelation can get needed // field values based on field names. 
sql( - """ + s""" |CREATE TABLE jsonTable (`` Struct<`=`:array>>, b String) |USING org.apache.spark.sql.json.DefaultSource |OPTIONS ( - | path 'src/test/resources/sample.json' + | path '${filePath}' |) """.stripMargin) @@ -92,7 +95,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { assert(expectedSchema == table("jsonTable").schema) - jsonFile("src/test/resources/sample.json").registerTempTable("expectedJsonTable") + jsonFile(filePath).registerTempTable("expectedJsonTable") checkAnswer( sql("SELECT b, ``.`=` FROM jsonTable"), @@ -101,32 +104,32 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { test("resolve shortened provider names") { sql( - """ + s""" |CREATE TABLE jsonTable |USING org.apache.spark.sql.json |OPTIONS ( - | path 'src/test/resources/sample.json' + | path '${filePath}' |) """.stripMargin) checkAnswer( sql("SELECT * FROM jsonTable"), - jsonFile("src/test/resources/sample.json").collect().toSeq) + jsonFile(filePath).collect().toSeq) } test("drop table") { sql( - """ + s""" |CREATE TABLE jsonTable |USING org.apache.spark.sql.json |OPTIONS ( - | path 'src/test/resources/sample.json' + | path '${filePath}' |) """.stripMargin) checkAnswer( sql("SELECT * FROM jsonTable"), - jsonFile("src/test/resources/sample.json").collect().toSeq) + jsonFile(filePath).collect().toSeq) sql("DROP TABLE jsonTable") @@ -212,15 +215,15 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { test("invalidate cache and reload") { sql( - """ + s""" |CREATE TABLE jsonTable (`c_!@(3)` int) |USING org.apache.spark.sql.json.DefaultSource |OPTIONS ( - | path 'src/test/resources/sample.json' + | path '${filePath}' |) """.stripMargin) - jsonFile("src/test/resources/sample.json").registerTempTable("expectedJsonTable") + jsonFile(filePath).registerTempTable("expectedJsonTable") checkAnswer( sql("SELECT * FROM jsonTable"), From 069c2354ca281916658695fabeb58bb8c331e18f Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Tue, 13 Jan 2015 11:48:42 -0800 Subject: [PATCH 16/16] Make exception messages user friendly. 
--- .../org/apache/spark/sql/sources/ddl.scala | 4 +-- .../spark/sql/sources/TableScanSuite.scala | 30 +++++++++++++++++++ 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala index e3627a6d29850..f8741e0082098 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala @@ -198,7 +198,7 @@ object ResolvedDataSource { .asInstanceOf[org.apache.spark.sql.sources.SchemaRelationProvider] .createRelation(sqlContext, new CaseInsensitiveMap(options), schema) case _ => - sys.error(s"${clazz.getCanonicalName} should extend SchemaRelationProvider.") + sys.error(s"${clazz.getCanonicalName} does not allow user-specified schemas.") } } case None => { @@ -208,7 +208,7 @@ object ResolvedDataSource { .asInstanceOf[org.apache.spark.sql.sources.RelationProvider] .createRelation(sqlContext, new CaseInsensitiveMap(options)) case _ => - sys.error(s"${clazz.getCanonicalName} should extend RelationProvider.") + sys.error(s"A schema needs to be specified when using ${clazz.getCanonicalName}.") } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala index 605190f5ae6a2..a1d2468b2573c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala @@ -314,4 +314,34 @@ class TableScanSuite extends DataSourceTest { sql("SELECT * FROM oneToTenDef"), (1 to 10).map(Row(_)).toSeq) } + + test("exceptions") { + // Make sure we do throw correct exception when users use a relation provider that + // only implements the RelationProvier or the SchemaRelationProvider. + val schemaNotAllowed = intercept[Exception] { + sql( + """ + |CREATE TEMPORARY TABLE relationProvierWithSchema (i int) + |USING org.apache.spark.sql.sources.SimpleScanSource + |OPTIONS ( + | From '1', + | To '10' + |) + """.stripMargin) + } + assert(schemaNotAllowed.getMessage.contains("does not allow user-specified schemas")) + + val schemaNeeded = intercept[Exception] { + sql( + """ + |CREATE TEMPORARY TABLE schemaRelationProvierWithoutSchema + |USING org.apache.spark.sql.sources.AllDataTypesScanSource + |OPTIONS ( + | From '1', + | To '10' + |) + """.stripMargin) + } + assert(schemaNeeded.getMessage.contains("A schema needs to be specified when using")) + } }
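
The scaladoc added to interfaces.scala earlier in this series notes that a provider can implement both RelationProvider and SchemaRelationProvider when it supports both schema inference and user-specified schemas. Below is a minimal sketch of that combination; it is illustrative only and not part of any patch above, the class names are hypothetical, it uses the same imports as the patches in this series (types in org.apache.spark.sql.catalyst.types), and it omits the scan-side interfaces needed to actually produce rows.

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.types.{StringType, StructField, StructType}
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, SchemaRelationProvider}

// Hypothetical data source that accepts an optional user-specified schema.
class BothProvidersSource extends RelationProvider with SchemaRelationProvider {

  // Called for CREATE TABLE t USING ... with no column list: the source infers its own schema.
  def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation = {
    val inferred = StructType(StructField("value", StringType, true) :: Nil)
    createRelation(sqlContext, parameters, inferred)
  }

  // Called for CREATE TABLE t (col1 string, ...) USING ...: honor the schema the user supplied.
  def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      schema: StructType): BaseRelation =
    new StaticSchemaRelation(sqlContext, schema)
}

// Minimal relation; a real source would also implement one of the scan interfaces.
class StaticSchemaRelation(
    @transient val sqlContext: SQLContext,
    val schema: StructType) extends BaseRelation

With both traits implemented, the two DDL forms exercised in MetastoreDataSourcesSuite (with and without a column list) resolve against the same provider class, and neither of the ResolvedDataSource errors introduced in PATCH 16/16 is triggered.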