Commit

added SchemaRelationProvider
scwf committed Dec 30, 2014
1 parent 0ba70df commit 7787ec7
Showing 7 changed files with 41 additions and 12 deletions.
@@ -21,12 +21,12 @@ import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.types.StructType
 import org.apache.spark.sql.sources._
 
-private[sql] class DefaultSource extends RelationProvider {
+private[sql] class DefaultSource extends SchemaRelationProvider {
   /** Returns a new base relation with the given parameters. */
   override def createRelation(
       sqlContext: SQLContext,
       parameters: Map[String, String],
-      schema: Option[StructType]): BaseRelation = {
+      schema: Option[StructType] = None): BaseRelation = {
     val fileName = parameters.getOrElse("path", sys.error("Option 'path' not specified"))
     val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
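
The effect of the new `= None` default is visible at the call site: the JSON source can now be asked for a relation with or without a user-supplied schema. A minimal sketch, assuming a `SQLContext` named `sqlContext` is already in scope (the option values are illustrative):

```scala
import org.apache.spark.sql.catalyst.types.{StringType, StructField, StructType}

val source = new DefaultSource()
val options = Map("path" -> "/tmp/people.json", "samplingRatio" -> "0.5")

// No schema argument: the source falls back to sampling the JSON files.
val inferred = source.createRelation(sqlContext, options)

// Explicit schema: the user-defined StructType is passed through instead.
val declared = source.createRelation(sqlContext, options,
  Some(StructType(StructField("name", StringType, nullable = true) :: Nil)))
```
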
@@ -43,12 +43,12 @@ import scala.collection.JavaConversions._
  * required is `path`, which should be the location of a collection of, optionally partitioned,
  * parquet files.
  */
-class DefaultSource extends RelationProvider {
+class DefaultSource extends SchemaRelationProvider {
   /** Returns a new base relation with the given parameters. */
   override def createRelation(
       sqlContext: SQLContext,
       parameters: Map[String, String],
-      schema: Option[StructType]): BaseRelation = {
+      schema: Option[StructType] = None): BaseRelation = {
     val path =
       parameters.getOrElse("path", sys.error("'path' must be specified for parquet tables."))
@@ -217,7 +217,8 @@ private[sql] case class CreateTableUsing(
             sys.error(s"Failed to load class for data source: $provider")
         }
     }
-    val dataSource = clazz.newInstance().asInstanceOf[org.apache.spark.sql.sources.RelationProvider]
+    val dataSource =
+      clazz.newInstance().asInstanceOf[org.apache.spark.sql.sources.SchemaRelationProvider]
     val relation = dataSource.createRelation(
       sqlContext, new CaseInsensitiveMap(options), Some(StructType(tableCols)))
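
For orientation, this is the code path behind `CREATE TEMPORARY TABLE ... USING ...`: when the statement lists columns, they reach this point as `tableCols` and are wrapped in `Some(StructType(tableCols))` above. A hedged sketch of such a statement (table name, columns, and path are made up for illustration):

```scala
// Runs through the DDL parser and ends up in CreateTableUsing above.
sqlContext.sql(
  """CREATE TEMPORARY TABLE people (name string, age int)
    |USING org.apache.spark.sql.json
    |OPTIONS (path '/tmp/people.json')
  """.stripMargin)
```
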
@@ -41,10 +41,35 @@ trait RelationProvider {
    * Note: the parameters' keywords are case insensitive and this insensitivity is enforced
    * by the Map that is passed to the function.
    */
   def createRelation(
       sqlContext: SQLContext,
       parameters: Map[String, String]): BaseRelation
 }
 
+/**
+ * ::DeveloperApi::
+ * Implemented by objects that produce relations for a specific kind of data source. When
+ * Spark SQL is given a DDL operation with a USING clause and, optionally, a user-defined
+ * schema, this interface is used to pass in the parameters specified by a user.
+ *
+ * Users may specify the fully qualified class name of a given data source. When that class is
+ * not found, Spark SQL will append the class name `DefaultSource` to the path, allowing for
+ * less verbose invocation. For example, 'org.apache.spark.sql.json' would resolve to the
+ * data source 'org.apache.spark.sql.json.DefaultSource'.
+ *
+ * A new instance of this class will be instantiated each time a DDL call is made.
+ */
+@DeveloperApi
+trait SchemaRelationProvider {
+  /**
+   * Returns a new base relation with the given parameters and user-defined schema.
+   * Note: the parameters' keywords are case insensitive and this insensitivity is enforced
+   * by the Map that is passed to the function.
+   */
+  def createRelation(
+      sqlContext: SQLContext,
+      parameters: Map[String, String],
-      schema: Option[StructType]): BaseRelation
+      schema: Option[StructType] = None): BaseRelation
+}
 
 /**
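
The `DefaultSource` fallback described in the scaladoc boils down to a two-step class load. A simplified sketch of that resolution, condensed from the `CreateTableUsing` hunk earlier (which additionally wraps the final failure in `sys.error`); `provider` is the string from the USING clause:

```scala
// First try the provider as a fully qualified class name; if that fails,
// retry with ".DefaultSource" appended, so that USING org.apache.spark.sql.json
// resolves to org.apache.spark.sql.json.DefaultSource.
val loader = Thread.currentThread().getContextClassLoader
val clazz: Class[_] =
  try loader.loadClass(provider)
  catch {
    case _: ClassNotFoundException => loader.loadClass(provider + ".DefaultSource")
  }
```
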
@@ -21,10 +21,11 @@ import scala.language.existentials
 
 import org.apache.spark.sql._
 
-class FilteredScanSource extends RelationProvider {
+class FilteredScanSource extends SchemaRelationProvider {
   override def createRelation(
       sqlContext: SQLContext,
-      parameters: Map[String, String]): BaseRelation = {
+      parameters: Map[String, String],
+      schema: Option[StructType] = None): BaseRelation = {
     SimpleFilteredScan(parameters("from").toInt, parameters("to").toInt)(sqlContext)
   }
 }
@@ -19,10 +19,11 @@ package org.apache.spark.sql.sources
 
 import org.apache.spark.sql._
 
-class PrunedScanSource extends RelationProvider {
+class PrunedScanSource extends SchemaRelationProvider {
   override def createRelation(
       sqlContext: SQLContext,
-      parameters: Map[String, String]): BaseRelation = {
+      parameters: Map[String, String],
+      schema: Option[StructType] = None): BaseRelation = {
     SimplePrunedScan(parameters("from").toInt, parameters("to").toInt)(sqlContext)
   }
 }
@@ -21,10 +21,11 @@ import org.apache.spark.sql._
 
 class DefaultSource extends SimpleScanSource
 
-class SimpleScanSource extends RelationProvider {
+class SimpleScanSource extends SchemaRelationProvider {
   override def createRelation(
       sqlContext: SQLContext,
-      parameters: Map[String, String]): BaseRelation = {
+      parameters: Map[String, String],
+      schema: Option[StructType] = None): BaseRelation = {
     SimpleScan(parameters("from").toInt, parameters("TO").toInt)(sqlContext)
   }
 }
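
These test sources are exercised through the same DDL path as any other provider. A sketch of how a suite might register `SimpleScanSource` (the exact statement in the test files may differ; `sql` is assumed to be the suite's query helper). Note that `parameters("TO")` above only works because the options map arriving here is the `CaseInsensitiveMap` built in `CreateTableUsing`:

```scala
sql(
  """CREATE TEMPORARY TABLE oneToTen
    |USING org.apache.spark.sql.sources.SimpleScanSource
    |OPTIONS (from '1', to '10')
  """.stripMargin)
```
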
