From 4a38c9b15ecc04f2ae2f285af5742608fc91549b Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Wed, 14 Jan 2015 12:47:49 -0800
Subject: [PATCH] [SPARK-5193][SQL] Tighten up SQLContext API

1. Removed 2 implicits (logicalPlanToSparkQuery and baseRelationToSchemaRDD)
2. Moved extraStrategies into ExperimentalMethods.
3. Made private methods protected[sql] so they don't show up in javadocs.
---
 .../spark/sql/ExperimentalMethods.scala        | 36 +++++++++++++++++++
 .../org/apache/spark/sql/SQLContext.scala      | 33 +++++------------
 .../apache/spark/sql/execution/commands.scala  | 10 +++---
 .../org/apache/spark/sql/sources/ddl.scala     |  5 ++-
 .../spark/sql/test/TestSQLContext.scala        | 16 +++++++--
 .../apache/spark/sql/hive/HiveContext.scala    |  4 +--
 .../org/apache/spark/sql/hive/TestHive.scala   |  2 +-
 7 files changed, 68 insertions(+), 38 deletions(-)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala b/sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala
new file mode 100644
index 0000000000000..f0e6a8f332188
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.annotation.Experimental
+
+/**
+ * Holder for experimental methods for the bravest. We make NO guarantee about the stability
+ * regarding binary compatibility and source compatibility of methods here.
+ */
+@Experimental
+class ExperimentalMethods protected[sql](sqlContext: SQLContext) {
+
+  /**
+   * Allows extra strategies to be injected into the query planner at runtime. Note this API
+   * should be considered experimental and is not intended to be stable across releases.
+   */
+  @Experimental
+  var extraStrategies: Seq[Strategy] = Nil
+
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index d9f3b3a53f582..2c84c1557c090 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.json._
 import org.apache.spark.sql.parquet.ParquetRelation
-import org.apache.spark.sql.sources.{BaseRelation, DDLParser, DataSourceStrategy, LogicalRelation}
+import org.apache.spark.sql.sources.{DDLParser, DataSourceStrategy}
 import org.apache.spark.sql.types._
 
 /**
@@ -59,7 +59,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
   self =>
 
   // Note that this is a lazy val so we can override the default value in subclasses.
-  private[sql] lazy val conf: SQLConf = new SQLConf
+  protected[sql] lazy val conf: SQLConf = new SQLConf
 
   /** Set Spark SQL configuration properties. */
   def setConf(props: Properties): Unit = conf.setConf(props)
@@ -117,15 +117,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
     case _ =>
   }
 
-  /**
-   * :: DeveloperApi ::
-   * Allows catalyst LogicalPlans to be executed as a SchemaRDD. Note that the LogicalPlan
-   * interface is considered internal, and thus not guaranteed to be stable. As a result, using
-   * them directly is not recommended.
-   */
-  @DeveloperApi
-  implicit def logicalPlanToSparkQuery(plan: LogicalPlan): SchemaRDD = new SchemaRDD(this, plan)
-
   /**
    * Creates a SchemaRDD from an RDD of case classes.
   *
@@ -139,10 +130,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
     new SchemaRDD(this, LogicalRDD(attributeSeq, rowRDD)(self))
   }
 
-  implicit def baseRelationToSchemaRDD(baseRelation: BaseRelation): SchemaRDD = {
-    logicalPlanToSparkQuery(LogicalRelation(baseRelation))
-  }
-
   /**
    * :: DeveloperApi ::
    * Creates a [[SchemaRDD]] from an [[RDD]] containing [[Row]]s by applying a schema to this RDD.
@@ -336,12 +323,10 @@ class SQLContext(@transient val sparkContext: SparkContext)
     new SchemaRDD(this, catalog.lookupRelation(Seq(tableName)))
 
   /**
-   * :: DeveloperApi ::
-   * Allows extra strategies to be injected into the query planner at runtime. Note this API
-   * should be consider experimental and is not intended to be stable across releases.
+   * A collection of methods that are considered experimental, but can be used to hook into
+   * the query planner for advanced functionality.
    */
-  @DeveloperApi
-  var extraStrategies: Seq[Strategy] = Nil
+  val experimental: ExperimentalMethods = new ExperimentalMethods(this)
 
   protected[sql] class SparkPlanner extends SparkStrategies {
     val sparkContext: SparkContext = self.sparkContext
@@ -353,7 +338,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
     def numPartitions = self.conf.numShufflePartitions
 
     def strategies: Seq[Strategy] =
-      extraStrategies ++ (
+      experimental.extraStrategies ++ (
       DataSourceStrategy ::
       DDLStrategy ::
       TakeOrdered ::
@@ -479,14 +464,14 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * have the same format as the one generated by `toString` in scala.
    * It is only used by PySpark.
    */
-  private[sql] def parseDataType(dataTypeString: String): DataType = {
+  protected[sql] def parseDataType(dataTypeString: String): DataType = {
     DataType.fromJson(dataTypeString)
   }
 
   /**
    * Apply a schema defined by the schemaString to an RDD. It is only used by PySpark.
   */
-  private[sql] def applySchemaToPythonRDD(
+  protected[sql] def applySchemaToPythonRDD(
       rdd: RDD[Array[Any]],
       schemaString: String): SchemaRDD = {
     val schema = parseDataType(schemaString).asInstanceOf[StructType]
@@ -496,7 +481,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
   /**
    * Apply a schema defined by the schema to an RDD. It is only used by PySpark.
   */
-  private[sql] def applySchemaToPythonRDD(
+  protected[sql] def applySchemaToPythonRDD(
       rdd: RDD[Array[Any]],
       schema: StructType): SchemaRDD = {
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index af6b07bd6c2f4..52a31f01a4358 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -20,11 +20,11 @@ package org.apache.spark.sql.execution
 import org.apache.spark.Logging
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{SchemaRDD, SQLConf, SQLContext}
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions.{Row, Attribute}
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.{SQLConf, SQLContext}
 
 /**
  * A logical command that is executed for its side-effects. `RunnableCommand`s are
@@ -137,14 +137,12 @@ case class CacheTableCommand(
     isLazy: Boolean) extends RunnableCommand {
 
   override def run(sqlContext: SQLContext) = {
-    import sqlContext._
-
-    plan.foreach(_.registerTempTable(tableName))
-    cacheTable(tableName)
+    plan.foreach(p => new SchemaRDD(sqlContext, p).registerTempTable(tableName))
+    sqlContext.cacheTable(tableName)
 
     if (!isLazy) {
       // Performs eager caching
-      table(tableName).count()
+      sqlContext.table(tableName).count()
     }
 
     Seq.empty[Row]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index 4cc9641c4d9e0..381298caba6f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -22,7 +22,7 @@ import scala.util.parsing.combinator.syntactical.StandardTokenParsers
 import scala.util.parsing.combinator.PackratParsers
 
 import org.apache.spark.Logging
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.{SchemaRDD, SQLContext}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.SqlLexical
 import org.apache.spark.sql.execution.RunnableCommand
@@ -234,8 +234,7 @@ private [sql] case class CreateTempTableUsing(
 
   def run(sqlContext: SQLContext) = {
     val resolved = ResolvedDataSource(sqlContext, userSpecifiedSchema, provider, options)
-
-    sqlContext.baseRelationToSchemaRDD(resolved.relation).registerTempTable(tableName)
+    new SchemaRDD(sqlContext, LogicalRelation(resolved.relation)).registerTempTable(tableName)
     Seq.empty
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
index 8c80be106f3cb..f9c082216085d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
@@ -17,8 +17,11 @@
 
 package org.apache.spark.sql.test
 
+import scala.language.implicitConversions
+
 import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.sql.{SQLConf, SQLContext}
+import org.apache.spark.sql.{SchemaRDD, SQLConf, SQLContext}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 
 /** A SQLContext that can be used for local testing. */
 object TestSQLContext
@@ -29,7 +32,16 @@ object TestSQLContext
       new SparkConf().set("spark.sql.testkey", "true"))) {
 
   /** Fewer partitions to speed up testing. */
-  private[sql] override lazy val conf: SQLConf = new SQLConf {
+  protected[sql] override lazy val conf: SQLConf = new SQLConf {
     override def numShufflePartitions: Int = this.getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt
   }
+
+  /**
+   * Turn a logical plan into a SchemaRDD. This should be removed once we have an easier way to
+   * construct SchemaRDD directly out of local data without relying on implicits.
+   */
+  protected[sql] implicit def logicalPlanToSparkQuery(plan: LogicalPlan): SchemaRDD = {
+    new SchemaRDD(this, plan)
+  }
+
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index bf56e60cf995f..a9a20a54bebe8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -71,7 +71,7 @@ class LocalHiveContext(sc: SparkContext) extends HiveContext(sc) {
 class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   self =>
 
-  private[sql] override lazy val conf: SQLConf = new SQLConf {
+  protected[sql] override lazy val conf: SQLConf = new SQLConf {
     override def dialect: String = getConf(SQLConf.DIALECT, "hiveql")
   }
 
@@ -348,7 +348,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   val hivePlanner = new SparkPlanner with HiveStrategies {
     val hiveContext = self
 
-    override def strategies: Seq[Strategy] = extraStrategies ++ Seq(
+    override def strategies: Seq[Strategy] = experimental.extraStrategies ++ Seq(
       DataSourceStrategy,
       HiveCommandStrategy(self),
       HiveDDLStrategy,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
index 52e1f0d94fbd4..47431cef03e13 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
@@ -102,7 +102,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
     new this.QueryExecution { val logical = plan }
 
   /** Fewer partitions to speed up testing. */
-  private[sql] override lazy val conf: SQLConf = new SQLConf {
+  protected[sql] override lazy val conf: SQLConf = new SQLConf {
     override def numShufflePartitions: Int = getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt
     override def dialect: String = getConf(SQLConf.DIALECT, "hiveql")
   }
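
Caller-facing summary: planner extensions now hang off the experimental holder, so code that
previously assigned to sqlContext.extraStrategies assigns to
sqlContext.experimental.extraStrategies instead; both planners still prepend these strategies
ahead of the built-in ones (the `experimental.extraStrategies ++ ...` hunks above). A minimal
sketch of the new hook follows; DummyStrategy is a made-up placeholder, and the sketch assumes
the `Strategy` type exposed by the org.apache.spark.sql package is visible to the calling code:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.{SQLContext, Strategy}
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.execution.SparkPlan

    // Placeholder strategy for illustration only: it matches no logical operators,
    // so planning always falls through to the built-in strategies.
    object DummyStrategy extends Strategy {
      def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
    }

    val sc = new SparkContext(new SparkConf().setAppName("example").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)

    // Before this patch: sqlContext.extraStrategies = DummyStrategy :: Nil
    sqlContext.experimental.extraStrategies = DummyStrategy :: Nil

Call sites that relied on the two removed implicits construct the SchemaRDD explicitly instead,
as the commands.scala and ddl.scala hunks do with `new SchemaRDD(sqlContext, plan)`.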