[SPARK-5193][SQL] Tighten up SQLContext API
1. Removed 2 implicits (logicalPlanToSparkQuery and baseRelationToSchemaRDD)
2. Moved extraStrategies into ExperimentalMethods.
3. Made private methods protected[sql] so they don't show up in javadocs.
rxin committed Jan 14, 2015
1 parent d5eeb35 commit 4a38c9b
Showing 7 changed files with 68 additions and 38 deletions.
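
Before the per-file diffs, a rough before/after sketch of the user-visible effect of changes (1) and (2) above; sqlContext, MyStrategy, and someBaseRelation are illustrative names, not part of this commit:

// Before this commit:
//   sqlContext.extraStrategies = MyStrategy :: Nil       // DeveloperApi var directly on SQLContext
//   val query: SchemaRDD = someBaseRelation               // via the implicit baseRelationToSchemaRDD
// After this commit:
sqlContext.experimental.extraStrategies = MyStrategy :: Nil  // the hook now lives on ExperimentalMethods
// The two implicit conversions are removed; SchemaRDDs are constructed explicitly inside spark-sql instead.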
36 changes: 36 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql

import org.apache.spark.annotation.Experimental

/**
* Holder for experimental methods for the bravest. We make NO guarantee about the binary or
* source compatibility of methods here.
*/
@Experimental
class ExperimentalMethods protected[sql](sqlContext: SQLContext) {

/**
* Allows extra strategies to be injected into the query planner at runtime. Note this API
* should be considered experimental and is not intended to be stable across releases.
*/
@Experimental
var extraStrategies: Seq[Strategy] = Nil

}
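
A minimal sketch of registering a custom strategy through the new holder. It assumes the Strategy alias from the org.apache.spark.sql package object is visible to user code; NoopStrategy and the local SparkContext setup are illustrative:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, Strategy}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

// A strategy that never matches anything; it only demonstrates the injection point.
object NoopStrategy extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
}

val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("experimental-demo"))
val sqlContext = new SQLContext(sc)

// The planner prepends these to its built-in strategies (see the strategies override further down).
sqlContext.experimental.extraStrategies = NoopStrategy :: Nil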
33 changes: 9 additions & 24 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.execution._
import org.apache.spark.sql.json._
import org.apache.spark.sql.parquet.ParquetRelation
import org.apache.spark.sql.sources.{BaseRelation, DDLParser, DataSourceStrategy, LogicalRelation}
import org.apache.spark.sql.sources.{DDLParser, DataSourceStrategy}
import org.apache.spark.sql.types._

/**
@@ -59,7 +59,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
self =>

// Note that this is a lazy val so we can override the default value in subclasses.
private[sql] lazy val conf: SQLConf = new SQLConf
protected[sql] lazy val conf: SQLConf = new SQLConf

/** Set Spark SQL configuration properties. */
def setConf(props: Properties): Unit = conf.setConf(props)
@@ -117,15 +117,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
case _ =>
}

/**
* :: DeveloperApi ::
* Allows catalyst LogicalPlans to be executed as a SchemaRDD. Note that the LogicalPlan
* interface is considered internal, and thus not guaranteed to be stable. As a result, using
* them directly is not recommended.
*/
@DeveloperApi
implicit def logicalPlanToSparkQuery(plan: LogicalPlan): SchemaRDD = new SchemaRDD(this, plan)

/**
* Creates a SchemaRDD from an RDD of case classes.
*
@@ -139,10 +130,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
new SchemaRDD(this, LogicalRDD(attributeSeq, rowRDD)(self))
}

implicit def baseRelationToSchemaRDD(baseRelation: BaseRelation): SchemaRDD = {
logicalPlanToSparkQuery(LogicalRelation(baseRelation))
}

/**
* :: DeveloperApi ::
* Creates a [[SchemaRDD]] from an [[RDD]] containing [[Row]]s by applying a schema to this RDD.
@@ -336,12 +323,10 @@ class SQLContext(@transient val sparkContext: SparkContext)
new SchemaRDD(this, catalog.lookupRelation(Seq(tableName)))

/**
* :: DeveloperApi ::
* Allows extra strategies to be injected into the query planner at runtime. Note this API
* should be consider experimental and is not intended to be stable across releases.
* A collection of methods that are considered experimental, but can be used to hook into
* the query planner for advanced functionalities.
*/
@DeveloperApi
var extraStrategies: Seq[Strategy] = Nil
val experimental: ExperimentalMethods = new ExperimentalMethods(this)

protected[sql] class SparkPlanner extends SparkStrategies {
val sparkContext: SparkContext = self.sparkContext
@@ -353,7 +338,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
def numPartitions = self.conf.numShufflePartitions

def strategies: Seq[Strategy] =
extraStrategies ++ (
experimental.extraStrategies ++ (
DataSourceStrategy ::
DDLStrategy ::
TakeOrdered ::
@@ -479,14 +464,14 @@ class SQLContext(@transient val sparkContext: SparkContext)
* have the same format as the one generated by `toString` in scala.
* It is only used by PySpark.
*/
private[sql] def parseDataType(dataTypeString: String): DataType = {
protected[sql] def parseDataType(dataTypeString: String): DataType = {
DataType.fromJson(dataTypeString)
}

/**
* Apply a schema defined by the schemaString to an RDD. It is only used by PySpark.
*/
private[sql] def applySchemaToPythonRDD(
protected[sql] def applySchemaToPythonRDD(
rdd: RDD[Array[Any]],
schemaString: String): SchemaRDD = {
val schema = parseDataType(schemaString).asInstanceOf[StructType]
@@ -496,7 +481,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
/**
* Apply a schema defined by the schema to an RDD. It is only used by PySpark.
*/
private[sql] def applySchemaToPythonRDD(
protected[sql] def applySchemaToPythonRDD(
rdd: RDD[Array[Any]],
schema: StructType): SchemaRDD = {

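Note that the createSchemaRDD implicit, whose doc comment ("Creates a SchemaRDD from an RDD of case classes") appears as unchanged context earlier in this file's diff, is not touched by this commit and remains the supported way to build a SchemaRDD from local case-class data. A minimal sketch, with Person and the local SparkContext as illustrative assumptions:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

case class Person(name: String, age: Int)

val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("schema-rdd-demo"))
val sqlContext = new SQLContext(sc)
import sqlContext.createSchemaRDD  // implicit RDD[Person] => SchemaRDD, still present after this commit

val people = sc.parallelize(Seq(Person("Alice", 34), Person("Bob", 21)))
people.registerTempTable("people")  // the implicit converts RDD[Person] to SchemaRDD here
sqlContext.sql("SELECT name FROM people WHERE age > 30").collect()
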
sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -20,11 +20,11 @@ package org.apache.spark.sql.execution
import org.apache.spark.Logging
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SchemaRDD, SQLConf, SQLContext}
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions.{Row, Attribute}
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.{SQLConf, SQLContext}

/**
* A logical command that is executed for its side-effects. `RunnableCommand`s are
@@ -137,14 +137,12 @@ case class CacheTableCommand(
isLazy: Boolean) extends RunnableCommand {

override def run(sqlContext: SQLContext) = {
import sqlContext._

plan.foreach(_.registerTempTable(tableName))
cacheTable(tableName)
plan.foreach(p => new SchemaRDD(sqlContext, p).registerTempTable(tableName))
sqlContext.cacheTable(tableName)

if (!isLazy) {
// Performs eager caching
table(tableName).count()
sqlContext.table(tableName).count()
}

Seq.empty[Row]
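For reference, CacheTableCommand backs the CACHE TABLE statement; a usage sketch against an existing sqlContext, with illustrative table and query names:

// Eager form: run() registers the optional plan, caches the table, then forces a count().
sqlContext.sql("CACHE TABLE error_logs AS SELECT * FROM logs WHERE level = 'ERROR'")
// Lazy form: isLazy is true, so the eager count() is skipped and the data is materialized on first use.
sqlContext.sql("CACHE LAZY TABLE recent_logs AS SELECT * FROM logs")
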
sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -22,7 +22,7 @@ import scala.util.parsing.combinator.syntactical.StandardTokenParsers
import scala.util.parsing.combinator.PackratParsers

import org.apache.spark.Logging
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.{SchemaRDD, SQLContext}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.SqlLexical
import org.apache.spark.sql.execution.RunnableCommand
@@ -234,8 +234,7 @@ private [sql] case class CreateTempTableUsing(

def run(sqlContext: SQLContext) = {
val resolved = ResolvedDataSource(sqlContext, userSpecifiedSchema, provider, options)

sqlContext.baseRelationToSchemaRDD(resolved.relation).registerTempTable(tableName)
new SchemaRDD(sqlContext, LogicalRelation(resolved.relation)).registerTempTable(tableName)
Seq.empty
}
}
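CreateTempTableUsing backs the data source DDL path; a usage sketch, assuming the built-in JSON source and an illustrative file path:

sqlContext.sql(
  """CREATE TEMPORARY TABLE people
    |USING org.apache.spark.sql.json
    |OPTIONS (path 'examples/src/main/resources/people.json')""".stripMargin)
sqlContext.table("people").count()
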
sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
@@ -17,8 +17,11 @@

package org.apache.spark.sql.test

import scala.language.implicitConversions

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLConf, SQLContext}
import org.apache.spark.sql.{SchemaRDD, SQLConf, SQLContext}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

/** A SQLContext that can be used for local testing. */
object TestSQLContext
@@ -29,7 +32,16 @@ object TestSQLContext
new SparkConf().set("spark.sql.testkey", "true"))) {

/** Fewer partitions to speed up testing. */
private[sql] override lazy val conf: SQLConf = new SQLConf {
protected[sql] override lazy val conf: SQLConf = new SQLConf {
override def numShufflePartitions: Int = this.getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt
}

/**
* Turn a logical plan into a SchemaRDD. This should be removed once we have an easier way to
* construct SchemaRDD directly out of local data without relying on implicits.
*/
protected[sql] implicit def logicalPlanToSparkQuery(plan: LogicalPlan): SchemaRDD = {
new SchemaRDD(this, plan)
}

}
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -71,7 +71,7 @@ class LocalHiveContext(sc: SparkContext) extends HiveContext(sc) {
class HiveContext(sc: SparkContext) extends SQLContext(sc) {
self =>

private[sql] override lazy val conf: SQLConf = new SQLConf {
protected[sql] override lazy val conf: SQLConf = new SQLConf {
override def dialect: String = getConf(SQLConf.DIALECT, "hiveql")
}

@@ -348,7 +348,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
val hivePlanner = new SparkPlanner with HiveStrategies {
val hiveContext = self

override def strategies: Seq[Strategy] = extraStrategies ++ Seq(
override def strategies: Seq[Strategy] = experimental.extraStrategies ++ Seq(
DataSourceStrategy,
HiveCommandStrategy(self),
HiveDDLStrategy,
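The Hive planner picks up the same hook, so the ExperimentalMethods sketch above applies unchanged to a HiveContext (hiveContext is an illustrative name):

hiveContext.experimental.extraStrategies = NoopStrategy :: Nil  // prepended ahead of the Hive strategies
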
sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -102,7 +102,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
new this.QueryExecution { val logical = plan }

/** Fewer partitions to speed up testing. */
private[sql] override lazy val conf: SQLConf = new SQLConf {
protected[sql] override lazy val conf: SQLConf = new SQLConf {
override def numShufflePartitions: Int = getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt
override def dialect: String = getConf(SQLConf.DIALECT, "hiveql")
}
