[SPARK-5193][SQL] Tighten up SQLContext API #4049

Closed · wants to merge 3 commits
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql

import org.apache.spark.annotation.Experimental

/**
* Holder for experimental methods for the bravest. We make NO guarantee about the stability
* of methods here, nor about their binary and source compatibility.
*/
@Experimental
class ExperimentalMethods protected[sql](sqlContext: SQLContext) {

/**
* Allows extra strategies to be injected into the query planner at runtime. Note this API
* should be considered experimental and is not intended to be stable across releases.
*/
@Experimental
var extraStrategies: Seq[Strategy] = Nil

}
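
As an illustrative sketch only (not part of this patch), here is how a caller might register an extra strategy through the new holder; NoopStrategy, the object names, and the local master URL are hypothetical, and Strategy is assumed to be the org.apache.spark.sql.Strategy alias referenced elsewhere in this diff:

import org.apache.spark.SparkContext
import org.apache.spark.sql.{SQLContext, Strategy}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

// A strategy that never matches: returning Nil lets the planner fall through to
// the built-in strategies. A real strategy would pattern-match on the logical plan
// and emit SparkPlan nodes.
object NoopStrategy extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
}

object ExtraStrategyExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "extra-strategy-example")
    val sqlContext = new SQLContext(sc)
    // After this change the hook lives on the experimental holder rather than on SQLContext itself.
    sqlContext.experimental.extraStrategies = Seq(NoopStrategy)
    sc.stop()
  }
}
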
sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala (152 changes: 93 additions, 59 deletions)
@@ -17,15 +17,16 @@

package org.apache.spark.sql

import java.beans.Introspector
import java.util.Properties

import scala.collection.immutable
import scala.language.implicitConversions
import scala.reflect.runtime.universe.TypeTag

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.analysis._
@@ -36,9 +37,9 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.execution._
import org.apache.spark.sql.json._
import org.apache.spark.sql.parquet.ParquetRelation
import org.apache.spark.sql.sources.{BaseRelation, DDLParser, DataSourceStrategy, LogicalRelation}
import org.apache.spark.sql.sources.{LogicalRelation, BaseRelation, DDLParser, DataSourceStrategy}
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

/**
* :: AlphaComponent ::
@@ -59,7 +60,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
self =>

// Note that this is a lazy val so we can override the default value in subclasses.
private[sql] lazy val conf: SQLConf = new SQLConf
protected[sql] lazy val conf: SQLConf = new SQLConf

/** Set Spark SQL configuration properties. */
def setConf(props: Properties): Unit = conf.setConf(props)
@@ -117,15 +118,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
case _ =>
}

/**
* :: DeveloperApi ::
* Allows catalyst LogicalPlans to be executed as a SchemaRDD. Note that the LogicalPlan
* interface is considered internal, and thus not guaranteed to be stable. As a result, using
* them directly is not recommended.
*/
@DeveloperApi
implicit def logicalPlanToSparkQuery(plan: LogicalPlan): SchemaRDD = new SchemaRDD(this, plan)

/**
* Creates a SchemaRDD from an RDD of case classes.
*
@@ -139,8 +131,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
new SchemaRDD(this, LogicalRDD(attributeSeq, rowRDD)(self))
}

implicit def baseRelationToSchemaRDD(baseRelation: BaseRelation): SchemaRDD = {
logicalPlanToSparkQuery(LogicalRelation(baseRelation))
/**
* Convert a [[BaseRelation]] created for external data sources into a [[SchemaRDD]].
*/
def baseRelationToSchemaRDD(baseRelation: BaseRelation): SchemaRDD = {
new SchemaRDD(this, LogicalRelation(baseRelation))
}

/**
@@ -181,6 +176,43 @@ class SQLContext(@transient val sparkContext: SparkContext)
new SchemaRDD(this, logicalPlan)
}

/**
* Applies a schema to an RDD of Java Beans.
*
* WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
* SELECT * queries will return the columns in an undefined order.
*/
def applySchema(rdd: RDD[_], beanClass: Class[_]): SchemaRDD = {
  val attributeSeq = getSchema(beanClass)
  val className = beanClass.getName
  val rowRdd = rdd.mapPartitions { iter =>
    // BeanInfo is not serializable so we must rediscover it remotely for each partition.
    val localBeanInfo = Introspector.getBeanInfo(
      Class.forName(className, true, Utils.getContextOrSparkClassLoader))
    val extractors =
      localBeanInfo.getPropertyDescriptors.filterNot(_.getName == "class").map(_.getReadMethod)

    iter.map { row =>
      new GenericRow(
        extractors.zip(attributeSeq).map { case (e, attr) =>
          DataTypeConversions.convertJavaToCatalyst(e.invoke(row), attr.dataType)
        }.toArray[Any]
      ) : Row
    }
  }
  new SchemaRDD(this, LogicalRDD(attributeSeq, rowRdd)(this))
}

/**
* Applies a schema to an RDD of Java Beans.
*
* WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
* SELECT * queries will return the columns in an undefined order.
*/
def applySchema(rdd: JavaRDD[_], beanClass: Class[_]): SchemaRDD = {
  applySchema(rdd.rdd, beanClass)
}

/**
* Loads a Parquet file, returning the result as a [[SchemaRDD]].
*
@@ -259,41 +291,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
applySchema(rowRDD, appliedSchema)
}

/**
* :: Experimental ::
* Creates an empty parquet file with the schema of class `A`, which can be registered as a table.
* This registered table can be used as the target of future `insertInto` operations.
*
* {{{
* val sqlContext = new SQLContext(...)
* import sqlContext._
*
* case class Person(name: String, age: Int)
* createParquetFile[Person]("path/to/file.parquet").registerTempTable("people")
* sql("INSERT INTO people SELECT 'michael', 29")
* }}}
*
* @tparam A A case class type that describes the desired schema of the parquet file to be
* created.
* @param path The path where the directory containing parquet metadata should be created.
* Data inserted into this table will also be stored at this location.
* @param allowExisting When false, an exception will be thrown if this directory already exists.
* @param conf A Hadoop configuration object that can be used to specify options to the parquet
* output format.
*
* @group userf
*/
@Experimental
def createParquetFile[A <: Product : TypeTag](
path: String,
allowExisting: Boolean = true,
conf: Configuration = new Configuration()): SchemaRDD = {
new SchemaRDD(
this,
ParquetRelation.createEmpty(
path, ScalaReflection.attributesFor[A], allowExisting, conf, this))
}

/**
* Registers the given RDD as a temporary table in the catalog. Temporary tables exist only
* during the lifetime of this instance of SQLContext.
@@ -336,12 +333,10 @@ class SQLContext(@transient val sparkContext: SparkContext)
new SchemaRDD(this, catalog.lookupRelation(Seq(tableName)))

/**
* :: DeveloperApi ::
* Allows extra strategies to be injected into the query planner at runtime. Note this API
* should be considered experimental and is not intended to be stable across releases.
* A collection of methods that are considered experimental, but can be used to hook into
* the query planner for advanced functionality.
*/
@DeveloperApi
var extraStrategies: Seq[Strategy] = Nil
val experimental: ExperimentalMethods = new ExperimentalMethods(this)

protected[sql] class SparkPlanner extends SparkStrategies {
val sparkContext: SparkContext = self.sparkContext
@@ -353,7 +348,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
def numPartitions = self.conf.numShufflePartitions

def strategies: Seq[Strategy] =
extraStrategies ++ (
experimental.extraStrategies ++ (
DataSourceStrategy ::
DDLStrategy ::
TakeOrdered ::
@@ -479,14 +474,14 @@
* have the same format as the one generated by `toString` in scala.
* It is only used by PySpark.
*/
private[sql] def parseDataType(dataTypeString: String): DataType = {
protected[sql] def parseDataType(dataTypeString: String): DataType = {
DataType.fromJson(dataTypeString)
}

/**
* Apply a schema defined by the schemaString to an RDD. It is only used by PySpark.
*/
private[sql] def applySchemaToPythonRDD(
protected[sql] def applySchemaToPythonRDD(
rdd: RDD[Array[Any]],
schemaString: String): SchemaRDD = {
val schema = parseDataType(schemaString).asInstanceOf[StructType]
@@ -496,7 +491,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
/**
* Apply a schema defined by the schema to an RDD. It is only used by PySpark.
*/
private[sql] def applySchemaToPythonRDD(
protected[sql] def applySchemaToPythonRDD(
rdd: RDD[Array[Any]],
schema: StructType): SchemaRDD = {

@@ -527,4 +522,43 @@ class SQLContext(@transient val sparkContext: SparkContext)

new SchemaRDD(this, LogicalRDD(schema.toAttributes, rowRdd)(self))
}

/**
* Returns a Catalyst schema for the given Java bean class.
*/
protected def getSchema(beanClass: Class[_]): Seq[AttributeReference] = {
  // TODO: All of this could probably be moved to Catalyst as it is mostly not Spark specific.
  val beanInfo = Introspector.getBeanInfo(beanClass)

  // Note: The ordering of elements may differ from when the schema is inferred in Scala.
  // This is because beanInfo.getPropertyDescriptors gives no guarantees about
  // element ordering.
  val fields = beanInfo.getPropertyDescriptors.filterNot(_.getName == "class")
  fields.map { property =>
    val (dataType, nullable) = property.getPropertyType match {
      case c: Class[_] if c.isAnnotationPresent(classOf[SQLUserDefinedType]) =>
        (c.getAnnotation(classOf[SQLUserDefinedType]).udt().newInstance(), true)
      case c: Class[_] if c == classOf[java.lang.String] => (StringType, true)
      case c: Class[_] if c == java.lang.Short.TYPE => (ShortType, false)
      case c: Class[_] if c == java.lang.Integer.TYPE => (IntegerType, false)
      case c: Class[_] if c == java.lang.Long.TYPE => (LongType, false)
      case c: Class[_] if c == java.lang.Double.TYPE => (DoubleType, false)
      case c: Class[_] if c == java.lang.Byte.TYPE => (ByteType, false)
      case c: Class[_] if c == java.lang.Float.TYPE => (FloatType, false)
      case c: Class[_] if c == java.lang.Boolean.TYPE => (BooleanType, false)

      case c: Class[_] if c == classOf[java.lang.Short] => (ShortType, true)
      case c: Class[_] if c == classOf[java.lang.Integer] => (IntegerType, true)
      case c: Class[_] if c == classOf[java.lang.Long] => (LongType, true)
      case c: Class[_] if c == classOf[java.lang.Double] => (DoubleType, true)
      case c: Class[_] if c == classOf[java.lang.Byte] => (ByteType, true)
      case c: Class[_] if c == classOf[java.lang.Float] => (FloatType, true)
      case c: Class[_] if c == classOf[java.lang.Boolean] => (BooleanType, true)
      case c: Class[_] if c == classOf[java.math.BigDecimal] => (DecimalType(), true)
      case c: Class[_] if c == classOf[java.sql.Date] => (DateType, true)
      case c: Class[_] if c == classOf[java.sql.Timestamp] => (TimestampType, true)
    }
    AttributeReference(property.getName, dataType, nullable)()
  }
}
}
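
To illustrate how the new applySchema overloads and getSchema fit together, here is a hedged sketch; the Person class, its fields, and the local master URL are made up, and the comments note the nullability getSchema would infer for each property type:

import scala.beans.BeanProperty

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

// A JavaBeans-style class written in Scala; @BeanProperty generates the
// getters/setters that Introspector.getBeanInfo discovers.
class Person extends Serializable {
  @BeanProperty var name: String = _             // String        -> StringType,  nullable = true
  @BeanProperty var age: Int = _                 // primitive int -> IntegerType, nullable = false
  @BeanProperty var score: java.lang.Double = _  // boxed Double  -> DoubleType,  nullable = true
}

object BeanSchemaExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "bean-schema-example")
    val sqlContext = new SQLContext(sc)

    val p = new Person
    p.setName("michael")
    p.setAge(29)
    p.setScore(java.lang.Double.valueOf(95.5))

    val people = sqlContext.applySchema(sc.parallelize(Seq(p)), classOf[Person])
    people.registerTempTable("people")
    // Field order comes from BeanInfo, so prefer explicit column lists over SELECT *.
    sqlContext.sql("SELECT name, age FROM people").collect().foreach(println)
    sc.stop()
  }
}
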
@@ -20,11 +20,11 @@ package org.apache.spark.sql.execution
import org.apache.spark.Logging
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SchemaRDD, SQLConf, SQLContext}
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions.{Row, Attribute}
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.{SQLConf, SQLContext}

/**
* A logical command that is executed for its side-effects. `RunnableCommand`s are
@@ -137,14 +137,12 @@ case class CacheTableCommand(
isLazy: Boolean) extends RunnableCommand {

override def run(sqlContext: SQLContext) = {
import sqlContext._

plan.foreach(_.registerTempTable(tableName))
cacheTable(tableName)
plan.foreach(p => new SchemaRDD(sqlContext, p).registerTempTable(tableName))
sqlContext.cacheTable(tableName)

if (!isLazy) {
// Performs eager caching
table(tableName).count()
sqlContext.table(tableName).count()
}

Seq.empty[Row]
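
For context, a sketch of the SQL statements that are routed through CacheTableCommand, assuming an existing sqlContext and an already-registered people table (table and column names are hypothetical):

sqlContext.sql("CACHE TABLE people")       // eager by default: triggers table("people").count()
sqlContext.sql("CACHE LAZY TABLE people")  // isLazy = true: materialized on first use
// With an AS SELECT clause, plan is defined and its result is registered before caching:
sqlContext.sql("CACHE TABLE teenagers AS SELECT name FROM people WHERE age < 20")
sqlContext.sql("UNCACHE TABLE people")
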
@@ -22,7 +22,7 @@ import scala.util.parsing.combinator.syntactical.StandardTokenParsers
import scala.util.parsing.combinator.PackratParsers

import org.apache.spark.Logging
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.{SchemaRDD, SQLContext}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.SqlLexical
import org.apache.spark.sql.execution.RunnableCommand
@@ -234,8 +234,7 @@ private [sql] case class CreateTempTableUsing(

def run(sqlContext: SQLContext) = {
val resolved = ResolvedDataSource(sqlContext, userSpecifiedSchema, provider, options)

sqlContext.baseRelationToSchemaRDD(resolved.relation).registerTempTable(tableName)
new SchemaRDD(sqlContext, LogicalRelation(resolved.relation)).registerTempTable(tableName)
Seq.empty
}
}
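
Similarly, a hedged example of the data source DDL that ends up in CreateTempTableUsing; the Parquet provider and the example path are assumptions for illustration:

// Assuming an existing sqlContext:
sqlContext.sql(
  """CREATE TEMPORARY TABLE parquetPeople
    |USING org.apache.spark.sql.parquet
    |OPTIONS (path "examples/src/main/resources/people.parquet")
  """.stripMargin)
sqlContext.sql("SELECT * FROM parquetPeople").collect()
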
@@ -17,8 +17,11 @@

package org.apache.spark.sql.test

import scala.language.implicitConversions

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLConf, SQLContext}
import org.apache.spark.sql.{SchemaRDD, SQLConf, SQLContext}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

/** A SQLContext that can be used for local testing. */
object TestSQLContext
@@ -29,7 +32,16 @@
new SparkConf().set("spark.sql.testkey", "true"))) {

/** Fewer partitions to speed up testing. */
private[sql] override lazy val conf: SQLConf = new SQLConf {
protected[sql] override lazy val conf: SQLConf = new SQLConf {
override def numShufflePartitions: Int = this.getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt
}

/**
* Turn a logical plan into a SchemaRDD. This should be removed once we have an easier way to
* construct a SchemaRDD directly out of local data without relying on implicits.
*/
protected[sql] implicit def logicalPlanToSparkQuery(plan: LogicalPlan): SchemaRDD = {
new SchemaRDD(this, plan)
}

}
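
A minimal sketch of what the implicit above enables in tests (object and method names are hypothetical); since the conversion is protected[sql], the caller has to live in the org.apache.spark.sql package:

package org.apache.spark.sql

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.test.TestSQLContext._

object PlanAsRdd {
  // With logicalPlanToSparkQuery in scope, a LogicalPlan can be used wherever a
  // SchemaRDD is expected; the conversion wraps it in new SchemaRDD(TestSQLContext, plan).
  def asSchemaRDD(plan: LogicalPlan): SchemaRDD = plan
}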