
[SPARK-3247][SQL] An API for adding data sources to Spark SQL #2475

Closed
wants to merge 20 commits

Conversation

marmbrus
Contributor

This PR introduces a new set of APIs to Spark SQL to allow other developers to add support for reading data from new sources in org.apache.spark.sql.sources.

New sources must implement the interface BaseRelation, which is responsible for describing the schema of the data. BaseRelations have three Scan subclasses, which are responsible for producing an RDD containing row objects. The various Scan interfaces allow for optimizations such as column pruning and filter push-down when the underlying data source can handle these operations.

By implementing a class that inherits from RelationProvider, these data sources can be accessed using pure SQL. I've used this functionality to update the JSON support so it can now be used as follows:

CREATE TEMPORARY TABLE jsonTableSQL
USING org.apache.spark.sql.json
OPTIONS (
  path '/home/michael/data.json'
)

Further example usage can be found in the test cases: https://github.com/marmbrus/spark/tree/foreign/sql/core/src/test/scala/org/apache/spark/sql/sources

There is also a library that uses this new API to read Avro data, available here:
https://github.com/marmbrus/sql-avro
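
To make the shape of the API concrete, here is a minimal sketch of a source in the spirit of those test cases. The relation, provider, and option names (IntegerRangeRelation, IntegerRangeProvider, from, to) are invented for illustration; only BaseRelation, TableScan, and RelationProvider come from the PR itself:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.sources._

// Produces the integers in [from, to] as single-column rows.
case class IntegerRangeRelation(from: Int, to: Int)(
    @transient val sqlContext: SQLContext)
  extends TableScan {

  override def schema =
    StructType(StructField("i", IntegerType, nullable = false) :: Nil)

  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.parallelize(from to to).map(Row(_))
}

// The entry point named in the USING clause: turns the OPTIONS map into a
// relation bound to the given SQLContext.
class IntegerRangeProvider extends RelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]) =
    IntegerRangeRelation(parameters("from").toInt, parameters("to").toInt)(sqlContext)
}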

@SparkQA

SparkQA commented Sep 21, 2014

QA tests have started for PR 2475 at commit 47d542c.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Sep 21, 2014

QA tests have finished for PR 2475 at commit 47d542c.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 23, 2014

QA tests have started for PR 2475 at commit c249907.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Sep 23, 2014

QA tests have finished for PR 2475 at commit c249907.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class CacheCommand(tableName: String, doCache: Boolean) extends Command
    • case class CacheTableAsSelectCommand(tableName: String, plan: LogicalPlan) extends Command
    • case class StructField(name: String, dataType: DataType, nullable: Boolean = true)
    • implicit class AvroContext(sqlContext: SQLContext)
    • case class AvroRelation(location: String)(val sqlContext: SQLContext)
    • trait RunnableCommand extends logical.Command
    • case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan
    • class DDLParser extends StandardTokenParsers with PackratParsers with Logging
    • protected case class Keyword(str: String)
    • case class CreateForeignTable(
    • trait DataSource
    • abstract class BaseRelation
    • trait TableScan
    • trait PrunedScan
    • trait FilteredScan
    • case class LogicalRelation(relation: BaseRelation)

@SparkQA

SparkQA commented Sep 23, 2014

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/20692/

@@ -209,6 +209,7 @@ object Catalyst {

object SQL {
  lazy val settings = Seq(
    libraryDependencies += "org.apache.avro" % "avro" % "1.7.7",
Contributor

we should check whether this conflicts with anything we already have ...

Contributor Author

This is only here as an illustration. This library will likely be its own package, and I'll remove this before merging.

@rxin
Contributor

rxin commented Sep 27, 2014

Moving some of the existing data sources (Parquet or JSON) over to this API would be a great way to test the API's design.

@yhuai

@liancheng
Contributor

From a user's perspective, mixing all kinds of data sources is cool, but as a counterpart to TableScan we also need a TableInsert trait to close the loop. Usually a foreign data source should implement TableScan, and optionally TableInsert, which should generally be easier to implement than the former.
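
A hypothetical shape for such a trait, to make the suggestion concrete (the name and signature are illustrative, not part of this PR):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

// Hypothetical: a relation mixing this in accepts rows written back to the
// underlying source; overwrite = true replaces existing data.
trait TableInsert {
  def insert(data: RDD[Row], overwrite: Boolean): Unit
}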

@SparkQA

SparkQA commented Sep 30, 2014

QA tests have started for PR 2475 at commit 0e45dea.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Sep 30, 2014

QA tests have finished for PR 2475 at commit 0e45dea.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 7, 2014

QA tests have started for PR 2475 at commit b818ee8.

  • This patch does not merge cleanly!

@SparkQA

SparkQA commented Oct 7, 2014

QA tests have finished for PR 2475 at commit b818ee8.

  • This patch fails unit tests.
  • This patch does not merge cleanly!

@@ -254,6 +254,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
    def numPartitions = self.numPartitions

    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
      case l @ foreign.LogicalRelation(t: foreign.TableScan) =>
        ExistingRdd(l.output, t.buildScan()) :: Nil
Contributor

Will we add rules for PrunedScan and FilteredScan in this PR?
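
(For illustration, such a rule might mirror the TableScan case above; this is a guess at the shape, not code from the patch:)

// Hypothetical: push the names of the needed columns into the scan. A real
// rule would first collect the project list sitting above the relation.
case l @ foreign.LogicalRelation(t: foreign.PrunedScan) =>
  ExistingRdd(l.output, t.buildScan(l.output.map(_.name).toArray)) :: Nil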

asfgit pushed a commit that referenced this pull request Oct 10, 2014
This PR is a follow-up to #2590, and tries to introduce a top-level SQL parser entry point for all SQL dialects supported by Spark SQL.

A top-level parser `SparkSQLParser` is introduced to handle the syntaxes that all SQL dialects should recognize (e.g. `CACHE TABLE`, `UNCACHE TABLE`, `SET`, etc.). For any syntax this parser doesn't recognize directly, it falls back to a specified function that tries to parse arbitrary input to a `LogicalPlan`. This function is typically another parser combinator like `SqlParser`. DDL syntaxes introduced in #2475 can be moved here.

The `ExtendedHiveQlParser` now only handles Hive-specific extensions.

Also took the chance to refactor/reformat `SqlParser` for better readability.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #2698 from liancheng/gen-sql-parser and squashes the following commits:

ceada76 [Cheng Lian] Minor styling fixes
9738934 [Cheng Lian] Minor refactoring, removes optional trailing ";" in the parser
bb2ab12 [Cheng Lian] SET property value can be empty string
ce8860b [Cheng Lian] Passes test suites
e86968e [Cheng Lian] Removes debugging code
8bcace5 [Cheng Lian] Replaces digit.+ to rep1(digit) (Scala style checking doesn't like it)
d15d54f [Cheng Lian] Unifies SQL and HiveQL parsers
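
(A rough sketch of the fallback design described above; illustrative only, not the committed code:)

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// The top-level parser recognizes syntax common to all dialects itself and
// hands anything else to a dialect-specific fallback (e.g. SqlParser).
class SparkSQLParser(fallback: String => LogicalPlan) {
  def parse(input: String): LogicalPlan =
    parseCommonSyntax(input).getOrElse(fallback(input))

  // Would recognize CACHE TABLE, UNCACHE TABLE, SET, etc.; stubbed here.
  private def parseCommonSyntax(input: String): Option[LogicalPlan] = None
}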
@SparkQA

SparkQA commented Oct 30, 2014

Test build #22509 has finished for PR 2475 at commit 0fd3a07.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class StructField(name: String, dataType: DataType, nullable: Boolean = true)
    • trait RunnableCommand extends logical.Command
    • case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan
    • protected case class Keyword(str: String)
    • sys.error(s"Failed to load class for data source: $provider")
    • trait RelationProvider
    • abstract class BaseRelation
    • trait TableScan
    • trait PrunedScan
    • trait FilteredScan

@SparkQA

SparkQA commented Oct 31, 2014

Test build #22641 has finished for PR 2475 at commit 70da6d9.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] extends Logging
    • case class StructField(name: String, dataType: DataType, nullable: Boolean = true)
    • trait RunnableCommand extends logical.Command
    • case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan
    • protected case class Keyword(str: String)
    • sys.error(s"Failed to load class for data source: $provider")
    • case class EqualTo(attribute: String, value: Any) extends Filter
    • trait RelationProvider
    • abstract class BaseRelation
    • abstract class TableScan extends BaseRelation
    • abstract class PrunedScan extends BaseRelation
    • abstract class FilteredScan extends BaseRelation

@marmbrus
Contributor Author

Okay, thanks for all the comments.

I've done the following:

  • Changed the various base relations into abstract classes instead of traits.
  • Removed Expression, Attribute, and Seq from all function signatures, using Filter (a simple class used only in this package), String, and Array instead. This should make binary compatibility / Java interop easier, and it also takes care of things like a = 1 vs 1 = a for the user. Right now this only supports equality, but that will be easy to extend. This comes at the cost of expressivity, but it's probably worth it for long-term compatibility. (See the sketch after this list.)
  • Added test cases that verify that pushdown of filters and column pruning are happening as expected.
  • Pulled Strategy out of the Planner, so it is now possible to integrate very tightly if so desired. This interface is appropriately commented as being unstable and only for experimentation.
  • Addressed a bunch of other small comments.
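
A sketch of the resulting shape, reconstructed from the class list in the test output below (signatures are illustrative):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

// Source-local filter hierarchy, decoupled from Catalyst's Expression.
abstract class Filter
case class EqualTo(attribute: String, value: Any) extends Filter

// (BaseRelation, as elsewhere in this PR, carries sqlContext and schema.)
// Columns are requested by plain String names and predicates arrive as
// Filters, so no Catalyst types leak into the public API.
abstract class PrunedFilteredScan extends BaseRelation {
  def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
}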

@SparkQA

SparkQA commented Oct 31, 2014

Test build #22646 has finished for PR 2475 at commit a70d602.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] extends Logging
    • case class StructField(name: String, dataType: DataType, nullable: Boolean = true)
    • trait RunnableCommand extends logical.Command
    • case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan
    • protected case class Keyword(str: String)
    • sys.error(s"Failed to load class for data source: $provider")
    • case class EqualTo(attribute: String, value: Any) extends Filter
    • trait RelationProvider
    • abstract class BaseRelation
    • abstract class TableScan extends BaseRelation
    • abstract class PrunedScan extends BaseRelation
    • abstract class PrunedFilteredScan extends BaseRelation

@SparkQA

SparkQA commented Oct 31, 2014

Test build #22653 has finished for PR 2475 at commit e3e690e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] extends Logging
    • case class StructField(name: String, dataType: DataType, nullable: Boolean = true)
    • trait RunnableCommand extends logical.Command
    • case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan
    • protected case class Keyword(str: String)
    • sys.error(s"Failed to load class for data source: $provider")
    • case class EqualTo(attribute: String, value: Any) extends Filter
    • trait RelationProvider
    • abstract class BaseRelation
    • abstract class TableScan extends BaseRelation
    • abstract class PrunedScan extends BaseRelation
    • abstract class PrunedFilteredScan extends BaseRelation


package org.apache.spark.sql.sources

abstract sealed class Filter
Contributor

If you make it sealed, will this lead to compatibility issues across Spark versions (if someone has code compiled when there was only one subclass of this, but they link to some version of Spark with 2 of them)? At one point I thought sealed classes got their own numeric ID to avoid isInstanceOf checks, but I'm not sure that really happens. @heathermiller do you know how this works?

Contributor Author

Removed sealed to be safe.

@mateiz
Contributor

mateiz commented Nov 1, 2014

The new API for sources looks good to me, thanks for making the changes. It will be easy to plug in a lot of neat data sources here.


abstract sealed class Filter

case class EqualTo(attribute: String, value: Any) extends Filter
Contributor

+1 on this change to decouple it from the expression hierarchy.

presumably we would want > and < as well?

Contributor Author

Yeah, I wanted to sketch it out first to make sure everyone was on board. I've added more types now.

@rxin
Contributor

rxin commented Nov 1, 2014

I like the new API a lot. Java support. Long term binary compatibility!

Conflicts:
	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
	sql/core/src/main/scala/org/apache/spark/sql/package.scala
@SparkQA

SparkQA commented Nov 1, 2014

Test build #22711 has finished for PR 2475 at commit 1d41bb5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 2, 2014

Test build #22713 has finished for PR 2475 at commit ab2c31f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Conflicts:
	sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@SparkQA

SparkQA commented Nov 2, 2014

Test build #22731 has finished for PR 2475 at commit 1ed6010.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class DecimalType(DataType):
    • case class UnscaledValue(child: Expression) extends UnaryExpression
    • case class MakeDecimal(child: Expression, precision: Int, scale: Int) extends UnaryExpression
    • case class MutableLiteral(var value: Any, dataType: DataType, nullable: Boolean = true)
    • abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] extends Logging
    • case class PrecisionInfo(precision: Int, scale: Int)
    • case class DecimalType(precisionInfo: Option[PrecisionInfo]) extends FractionalType
    • final class Decimal extends Ordered[Decimal] with Serializable
    • trait DecimalIsConflicted extends Numeric[Decimal]
    • trait RunnableCommand extends logical.Command
    • case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan
    • protected case class Keyword(str: String)
    • sys.error(s"Failed to load class for data source: $provider")
    • case class EqualTo(attribute: String, value: Any) extends Filter
    • case class GreaterThan(attribute: String, value: Any) extends Filter
    • case class GreaterThanOrEqual(attribute: String, value: Any) extends Filter
    • case class LessThan(attribute: String, value: Any) extends Filter
    • case class LessThanOrEqual(attribute: String, value: Any) extends Filter
    • trait RelationProvider
    • abstract class BaseRelation
    • abstract class TableScan extends BaseRelation
    • abstract class PrunedScan extends BaseRelation
    • abstract class PrunedFilteredScan extends BaseRelation

@rxin
Contributor

rxin commented Nov 2, 2014

LGTM. You'd want to update the PR description before merging it.

@rxin
Contributor

rxin commented Nov 2, 2014

Jenkins, retest this please.

@SparkQA

SparkQA commented Nov 2, 2014

Test build #22765 has finished for PR 2475 at commit 1ed6010.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] extends Logging
    • trait RunnableCommand extends logical.Command
    • case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan
    • protected case class Keyword(str: String)
    • sys.error(s"Failed to load class for data source: $provider")
    • case class EqualTo(attribute: String, value: Any) extends Filter
    • case class GreaterThan(attribute: String, value: Any) extends Filter
    • case class GreaterThanOrEqual(attribute: String, value: Any) extends Filter
    • case class LessThan(attribute: String, value: Any) extends Filter
    • case class LessThanOrEqual(attribute: String, value: Any) extends Filter
    • trait RelationProvider
    • abstract class BaseRelation
    • abstract class TableScan extends BaseRelation
    • abstract class PrunedScan extends BaseRelation
    • abstract class PrunedFilteredScan extends BaseRelation

case class GreaterThan(attribute: String, value: Any) extends Filter
case class GreaterThanOrEqual(attribute: String, value: Any) extends Filter
case class LessThan(attribute: String, value: Any) extends Filter
case class LessThanOrEqual(attribute: String, value: Any) extends Filter
Contributor

Why do we only support push-down of filters with an attribute and a literal? It looks to me that filters with two attributes are also pushable, like 'a < 'b.
cc @liancheng

Contributor

I guess it's because most underlying data sources (like Parquet and ORC) don't support predicates like a < b. JDBC might be the only Spark SQL data source out there that supports atomic predicates involving more than one attribute?

cc @rxin @marmbrus

Contributor Author

This was only intended to be an initial set of things that we could push down. We can always add more here.
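
(Illustrative: each current Filter pairs one attribute name with a literal, so there is simply no constructor for an attribute-to-attribute comparison:)

// Using the LessThan case class from the diff above.
val pushable = LessThan("a", 10)   // attribute vs. literal: expressible
// 'a < 'b (attribute vs. attribute) has no representation, since `value`
// holds a literal rather than a column reference.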
