Avoid table creation in logical plan analyzing
chenghao-intel committed Sep 11, 2014
1 parent c27718f commit 9a57abc
Showing 8 changed files with 106 additions and 17 deletions.
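For orientation: before this commit, the analyzer rewrote the CTAS node (`InsertIntoCreatedTable`) into an eager `createTable` call plus an `InsertIntoTable`, so merely analyzing the query created the table in the metastore. The renamed `CreateTableAsSelect` now stays purely logical, and the table is created only when the physical command executes. A hedged sketch of the two user-facing routes (`hiveContext` and `peopleSchemaRDD` are hypothetical names, Hive dialect assumed):

```scala
// Both routes now produce a logical CreateTableAsSelect node;
// no metastore side effect happens during analysis.
hiveContext.sql("CREATE TABLE targets AS SELECT key, value FROM src") // HiveQl path
peopleSchemaRDD.saveAsTable("targets_copy")                           // SchemaRDDLike path
```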
@@ -114,11 +114,12 @@ case class InsertIntoTable(
   }
 }
 
-case class InsertIntoCreatedTable(
+case class CreateTableAsSelect(
     databaseName: Option[String],
     tableName: String,
     child: LogicalPlan) extends UnaryNode {
   override def output = child.output
+  override lazy val resolved = (databaseName != None && childrenResolved)
 }
 
 case class WriteToFile(
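The new `resolved` override is what lets the analyzer defer work: the node reports itself unresolved until a database name has been filled in, so resolution rules keep firing on it. A self-contained sketch with simplified stand-ins (not catalyst's real classes):

```scala
// Simplified model of the resolved-guard; illustration only.
trait Plan { def resolved: Boolean }
case class Relation(name: String) extends Plan { val resolved = true }

case class CreateTableAsSelect(
    databaseName: Option[String],
    tableName: String,
    child: Plan) extends Plan {
  // Mirrors the diff: unresolved until the analyzer supplies a database.
  override lazy val resolved = databaseName != None && child.resolved
}

object ResolvedDemo extends App {
  val raw = CreateTableAsSelect(None, "t", Relation("src"))
  println(raw.resolved)                                       // false
  println(raw.copy(databaseName = Some("default")).resolved)  // true
}
```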
@@ -54,7 +54,7 @@ private[sql] trait SchemaRDDLike {
   @transient protected[spark] val logicalPlan: LogicalPlan = baseLogicalPlan match {
     // For various commands (like DDL) and queries with side effects, we force query optimization to
     // happen right away to let these side effects take place eagerly.
-    case _: Command | _: InsertIntoTable | _: InsertIntoCreatedTable | _: WriteToFile =>
+    case _: Command | _: InsertIntoTable | _: CreateTableAsSelect | _: WriteToFile =>
       queryExecution.toRdd
       SparkLogicalPlan(queryExecution.executedPlan)(sqlContext)
     case _ =>
@@ -124,7 +124,7 @@ private[sql] trait SchemaRDDLike {
    */
   @Experimental
   def saveAsTable(tableName: String): Unit =
-    sqlContext.executePlan(InsertIntoCreatedTable(None, tableName, logicalPlan)).toRdd
+    sqlContext.executePlan(CreateTableAsSelect(None, tableName, logicalPlan)).toRdd
 
   /** Returns the schema as a string in the tree format.
    *
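A short usage note: `saveAsTable` now plans `CreateTableAsSelect(None, tableName, logicalPlan)`; the `None` database is resolved to the session's current database by the Hive catalog rule shown below. Hypothetical usage (`people` stands for any SchemaRDD):

```scala
// Builds and eagerly executes CreateTableAsSelect(None, "people_snapshot", plan).
people.saveAsTable("people_snapshot")
```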
@@ -54,8 +54,8 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       db: Option[String],
       tableName: String,
       alias: Option[String]): LogicalPlan = synchronized {
-    val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
-    val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)
+    val (databaseName, tblName) = processDatabaseAndTableName(
+      db.getOrElse(hive.sessionState.getCurrentDatabase), tableName)
     val table = client.getTable(databaseName, tblName)
     val partitions: Seq[Partition] =
       if (table.isPartitioned) {
@@ -112,17 +112,11 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       // Wait until children are resolved.
       case p: LogicalPlan if !p.childrenResolved => p
 
-      case InsertIntoCreatedTable(db, tableName, child) =>
+      case CreateTableAsSelect(db, tableName, child) =>
         val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
         val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)
 
-        createTable(databaseName, tblName, child.output)
-
-        InsertIntoTable(
-          lookupRelation(Some(databaseName), tblName, None),
-          Map.empty,
-          child,
-          overwrite = false)
+        CreateTableAsSelect(Some(databaseName), tableName, child)
     }
   }
 
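Reusing the simplified `CreateTableAsSelect` from the earlier sketch, the rule's new behavior can be modeled as a pure rewrite (the `currentDatabase` argument is hypothetical): it qualifies the database name and does nothing else; the `createTable`/`lookupRelation` side effects are gone from analysis.

```scala
// Simplified model of the rewrite; catalyst runs it inside a resolution rule.
def qualifyCtas(plan: CreateTableAsSelect, currentDatabase: String): CreateTableAsSelect =
  plan.databaseName match {
    case Some(_) => plan                                            // already qualified
    case None    => plan.copy(databaseName = Some(currentDatabase)) // no metastore call
  }
```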
@@ -489,7 +489,7 @@ private[hive] object HiveQl {
 
       val (db, tableName) = extractDbNameTableName(tableNameParts)
 
-      InsertIntoCreatedTable(db, tableName, nodeToPlan(query))
+      CreateTableAsSelect(db, tableName, nodeToPlan(query))
 
     // If it's not a "CREATE TABLE AS" like above, then just pass it back to Hive as a native command.
     case Token("TOK_CREATETABLE", _) => NativePlaceholder
@@ -165,6 +165,16 @@ private[hive] trait HiveStrategies {
             InMemoryRelation(_, _, _,
               HiveTableScan(_, table, _)), partition, child, overwrite) =>
         InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
+      case logical.CreateTableAsSelect(database, tableName, child) =>
+        val query = planLater(child)
+        CreateTableAsSelect(
+          database.get,
+          tableName,
+          query,
+          InsertIntoHiveTable(_: MetastoreRelation,
+            Map(),
+            query,
+            true)(hiveContext)) :: Nil
       case _ => Nil
     }
   }
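`InsertIntoHiveTable(_: MetastoreRelation, Map(), query, true)(hiveContext)` is Scala placeholder syntax: it builds a `MetastoreRelation => InsertIntoHiveTable` function rather than the operator itself, so the physical command can create the table first and construct the insert afterwards. A self-contained sketch of the idiom (placeholder types, not Spark's):

```scala
object PartialDemo extends App {
  case class TableRef(name: String)
  case class Insert(rel: TableRef, overwrite: Boolean)

  // `Insert(_: TableRef, true)` is a function still awaiting its relation.
  val mkInsert: TableRef => Insert = Insert(_: TableRef, true)
  println(mkInsert(TableRef("default.t")))  // Insert(TableRef(default.t),true)
}
```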
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LowerCaseSchema
+import org.apache.spark.sql.execution.{SparkPlan, Command, LeafNode}
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.hive.MetastoreRelation
+
+/**
+ * :: Experimental ::
+ * Create a table and insert the query result into it.
+ * @param database the database name of the new relation
+ * @param tableName the table name of the new relation
+ * @param insertIntoRelation a function that builds the `InsertIntoHiveTable` operator for a
+ *          given `MetastoreRelation`; the query result will be inserted into that table.
+ * TODO Add more table-creation properties, e.g. SerDe, StorageHandler, in-memory cache, etc.
+ */
+@Experimental
+case class CreateTableAsSelect(
+    database: String,
+    tableName: String,
+    query: SparkPlan,
+    insertIntoRelation: MetastoreRelation => InsertIntoHiveTable)
+  extends LeafNode with Command {
+
+  def output = Seq.empty
+
+  // Lazily create the table and look up its metastore relation.
+  private[this] lazy val metastoreRelation: MetastoreRelation = {
+    // Create the table.
+    val sc = sqlContext.asInstanceOf[HiveContext]
+    sc.catalog.createTable(database, tableName, query.output, false)
+    // Get the metastore relation.
+    sc.catalog.lookupRelation(Some(database), tableName, None) match {
+      case LowerCaseSchema(r: MetastoreRelation) => r
+      case o: MetastoreRelation => o
+    }
+  }
+
+  override protected[sql] lazy val sideEffectResult: Seq[Row] = {
+    insertIntoRelation(metastoreRelation).execute
+    Seq.empty[Row]
+  }
+
+  override def execute(): RDD[Row] = {
+    sideEffectResult
+    sparkContext.emptyRDD[Row]
+  }
+
+  override def argString: String = {
+    s"[Database:$database, TableName: $tableName, InsertIntoHiveTable]\n" + query.toString
+  }
+}
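Because the create-then-lookup side effect above lives in a `lazy val`, it runs at most once, and only when `sideEffectResult` first forces it; re-running `execute()` will not attempt to recreate the table. A self-contained sketch of the lazy-initialization semantics relied on here:

```scala
object LazyDemo extends App {
  lazy val table: String = { println("creating table (runs once)"); "default.t" }
  println(table)  // forces the initializer: prints both lines
  println(table)  // prints only "default.t"; the side effect does not repeat
}
```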
@@ -53,9 +53,9 @@ case class InsertIntoHiveTable(
     (@transient sc: HiveContext)
   extends UnaryNode {
 
-  val outputClass = newSerializer(table.tableDesc).getSerializedClass
-  @transient private val hiveContext = new Context(sc.hiveconf)
-  @transient private val db = Hive.get(sc.hiveconf)
+  @transient lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass
+  @transient private lazy val hiveContext = new Context(sc.hiveconf)
+  @transient private lazy val db = Hive.get(sc.hiveconf)
 
   private def newSerializer(tableDesc: TableDesc): Serializer = {
     val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
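A plausible reading of this change: with CTAS in the picture, an `InsertIntoHiveTable` node should not touch serializers or Hive state at construction or planning time, and after deserialization a `@transient lazy val` is recomputed on first use instead of arriving `null` as a plain `@transient val` would. A self-contained sketch of the pattern (the "session" setup is hypothetical):

```scala
class Operator(table: String) extends Serializable {
  // Not computed at construction and not serialized with the instance;
  // recomputed lazily wherever it is first dereferenced.
  @transient private lazy val session: String = {
    println(s"opening session for $table")  // stands in for expensive Hive setup
    s"session-$table"
  }
  def run(): String = session
}
```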
@@ -18,6 +18,10 @@
 package org.apache.spark.sql.hive.execution
 
 import org.apache.spark.sql.QueryTest
+
+import org.apache.spark.sql.{SQLConf, QueryTest, Row}
+import org.apache.spark.sql.execution.{BroadcastHashJoin, ShuffledHashJoin}
+import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
 
 case class Nested1(f1: Nested2)
@@ -54,4 +54,11 @@ class SQLQuerySuite extends QueryTest {
       sql("SELECT f1.f2.f3 FROM nested"),
       1)
   }
+
+  test("test CTAS") {
+    checkAnswer(sql("CREATE TABLE test_ctas_123 AS SELECT key, value FROM src"), Seq.empty[Row])
+    checkAnswer(
+      sql("SELECT key, value FROM test_ctas_123 ORDER BY key"),
+      sql("SELECT key, value FROM src ORDER BY key").collect().toSeq)
+  }
 }
