[Feature Request] Support for Spark Connect #1570

Open
1 of 3 tasks
tomvanbussel opened this issue Jan 17, 2023 · 2 comments

@tomvanbussel
Collaborator

Feature request

Overview

The Spark community is adding a new interface, Spark Connect, in Spark 3.4. It promises several benefits by placing a gRPC layer between user code and the driver, which decouples user code from Spark itself. We should add a new implementation of the DeltaTable interface that is compatible with Spark Connect.

Motivation

Delta Connect is expected to bring the same benefits as Spark Connect:

  1. Easier upgrading to more recent versions of Spark and Delta, as the client interface is completely decoupled from the server.
  2. Simpler integration of Spark and Delta with developer tooling. IDEs no longer have to integrate with the full Spark and Delta implementation, and can instead integrate with a thin client.
  3. Support for languages other than Java/Scala and Python. Clients "merely" have to generate Protocol Buffers and therefore become simpler to implement.
  4. Spark and Delta will become more stable, as user code is no longer running in the same JVM as Spark's driver.
  5. Remote connectivity. Code can run anywhere now, as there is a gRPC layer between the user interface and the driver.

Further details

We can add support for Delta to Spark Connect by implementing the extension points that it provides. The extension field in the Relation and Command messages lets us add Delta-specific relations such as DescribeHistory and Delta-specific commands such as Vacuum. On the server side we can implement RelationPlugin and CommandPlugin to translate these Protobuf messages into LogicalPlan nodes in Spark.
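The extension mechanism described above can be illustrated with a toy model. This is only a sketch under stated assumptions: the `Extension` class, its `pack`/`is_a`/`unpack` methods, and the `handle_relation`/`handle_command` functions are hypothetical names invented here, and JSON stands in for Protobuf serialization. It is not Spark Connect's actual API; it only shows how a type-tagged envelope lets the server route a Delta relation or command to the right handler.

```python
import json
from dataclasses import asdict, dataclass


# Toy stand-ins for Delta-specific messages (names follow the issue text).
@dataclass
class DescribeHistory:
    table_name: str


@dataclass
class Vacuum:
    table_name: str


@dataclass
class Extension:
    """Hypothetical analogue of an extension field: a type tag plus opaque bytes."""

    type_url: str
    value: bytes

    @classmethod
    def pack(cls, message) -> "Extension":
        # Serialize the message and remember which type it was.
        payload = json.dumps(asdict(message)).encode("utf-8")
        return cls(type_url=type(message).__name__, value=payload)

    def is_a(self, message_type) -> bool:
        return self.type_url == message_type.__name__

    def unpack(self, message_type):
        if not self.is_a(message_type):
            raise TypeError(f"extension holds {self.type_url}, not {message_type.__name__}")
        return message_type(**json.loads(self.value.decode("utf-8")))


def handle_relation(ext: Extension) -> str:
    """Relations describe data to read, so the handler returns a (toy) plan."""
    if ext.is_a(DescribeHistory):
        message = ext.unpack(DescribeHistory)
        return f"plan: history of {message.table_name}"
    raise ValueError(f"unsupported relation: {ext.type_url}")


def handle_command(ext: Extension) -> str:
    """Commands are run for their side effect rather than for a result set."""
    if ext.is_a(Vacuum):
        message = ext.unpack(Vacuum)
        return f"executed: vacuum {message.table_name}"
    raise ValueError(f"unsupported command: {ext.type_url}")
```

In this model the client only needs to know how to build and pack the messages; everything that interprets them lives on the server side.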

Willingness to contribute

The Delta Lake Community encourages new feature contributions. Would you or another member of your organization be willing to contribute an implementation of this feature?

  • Yes. I can contribute this feature independently.
  • Yes. I would be willing to contribute this feature with guidance from the Delta Lake community.
  • No. I cannot contribute this feature at this time.
@tomvanbussel tomvanbussel added the enhancement New feature or request label Jan 17, 2023
@tdas
Contributor

tdas commented Jan 17, 2023

This would be a great addition. Unfortunately we probably have to wait for the Spark 3.4 release before doing any substantial implementation work. Nonetheless I would love to see a bit more design detail, especially about how to make the DeltaTable APIs work with this (since DeltaTable APIs hook on to logical/physical plans differently from SQL commands). Maybe we need to refactor some stuff internally to make it work?

@tomvanbussel
Collaborator Author

tomvanbussel commented Jan 18, 2023

@tdas Here's a short sketch of what the implementation would look like:

We start by creating Protobuf messages for every operation on DeltaTable. Here's a (simplified) example for DeltaTable.history():

message DescribeHistory {
  string table_name = 1;
}

Next we'll introduce pyspark.sql.connect.LogicalPlan nodes in the Python client that are used to generate the Protobuf messages:

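from pyspark.sql.connect.client import SparkConnectClient
from pyspark.sql.connect.plan import LogicalPlan

# `proto` is assumed to expose Spark Connect's Relation message together with
# the DescribeHistory message generated from the definition above.

class DescribeHistory(LogicalPlan):
  def __init__(self, tableName: str):
    super().__init__(None)  # leaf node, so there is no child plan
    self._tableName = tableName

  def plan(self, client: SparkConnectClient) -> proto.Relation:
    # Wrap the Delta message in the Relation's extension field.
    describe = proto.DescribeHistory()
    describe.table_name = self._tableName
    relation = proto.Relation()
    relation.extension.Pack(describe)
    return relation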

The client can then create pyspark.sql.connect.DataFrame instances using these plan nodes:

from pyspark.sql.connect.dataframe import DataFrame
from pyspark.sql.connect.session import SparkSession

class DeltaTable(object):
  def __init__(self, spark: SparkSession, tableName: str):
    self._spark = spark
    self._tableName = tableName

  def history(self) -> DataFrame:
    # Only builds the plan; nothing is sent to the server until the DataFrame is collected.
    return DataFrame.withPlan(DescribeHistory(self._tableName), session=self._spark)

  @classmethod
  def forName(cls, spark: SparkSession, tableName: str) -> "DeltaTable":
    return cls(spark, tableName)

Calling DataFrame.collect() will generate a Protobuf message for the entire plan, which is then sent to Spark's driver over a gRPC connection. Spark Connect's planner turns this Protobuf message into a LogicalPlan and executes it. Finally it sends the result back over gRPC as a stream of serialized Arrow batches (this is all implemented by Spark Connect). Internally, Spark Connect's planner delegates to Delta's planner plugin to transform the DescribeHistory message:

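import com.google.protobuf
import io.delta.tables.DeltaTable
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connect.planner.SparkConnectPlanner
import org.apache.spark.sql.connect.plugin.RelationPlugin

// `proto` is assumed to be the package generated from the DescribeHistory message above.
class DeltaRelationPlugin extends RelationPlugin {
  override def transform(relation: protobuf.Any, planner: SparkConnectPlanner): Option[LogicalPlan] = {
    if (relation.is(classOf[proto.DescribeHistory])) {
      // Unpack the Delta message and reuse the existing DeltaTable API to build the plan.
      val history = relation.unpack(classOf[proto.DescribeHistory])
      val deltaTable = DeltaTable.forName(planner.session, history.getTableName)
      Some(deltaTable.history().queryExecution.analyzed)
    } else {
      // Not a Delta relation, so leave it to other plugins.
      None
    }
  }
}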
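The round trip sketched in this comment can be modelled end to end without Spark. The following is a toy illustration under stated assumptions, not Spark Connect's implementation: `Message`, `Planner`, `LazyFrame`, and `delta_relation_plugin` are hypothetical names, a plain function call stands in for gRPC, and the returned history row is a fabricated placeholder. It shows two ideas from the text: nothing is sent until `collect()` is called, and the planner asks each plugin in turn until one recognises the message.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional

Rows = List[dict]
Plan = Callable[[], Rows]


@dataclass
class Message:
    """Toy serialized plan node: a type name plus its fields."""

    kind: str
    fields: dict


def delta_relation_plugin(message: Message) -> Optional[Plan]:
    """Return an executable plan, or None if the message is not ours."""
    if message.kind != "DescribeHistory":
        return None
    table = message.fields["table_name"]
    # Placeholder result; a real server would read the table's commit history.
    return lambda: [{"table": table, "version": 0, "operation": "CREATE TABLE"}]


class Planner:
    """Toy server: tries each registered plugin until one handles the message."""

    def __init__(self, plugins: List[Callable[[Message], Optional[Plan]]]):
        self._plugins = list(plugins)
        self.requests = 0  # counts how many messages reached the server

    def execute(self, message: Message) -> Rows:
        self.requests += 1
        for plugin in self._plugins:
            plan = plugin(message)
            if plan is not None:
                return plan()
        raise ValueError(f"no plugin handles {message.kind}")


class LazyFrame:
    """Toy client-side DataFrame: holds a plan and sends it only on collect()."""

    def __init__(self, message: Message, server: Planner):
        self._message = message
        self._server = server

    def collect(self) -> Rows:
        return self._server.execute(self._message)


server = Planner([delta_relation_plugin])
frame = LazyFrame(Message("DescribeHistory", {"table_name": "events"}), server)
requests_before_collect = server.requests  # still zero: building a frame sends nothing
rows = frame.collect()                     # the single round trip happens here
```

Returning `None` for unrecognised messages mirrors the `Option[LogicalPlan]` return type in the Scala sketch: it lets several plugins coexist without knowing about each other.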

@scottsand-db scottsand-db added this to the 4.0.0 milestone Jun 5, 2024
allisonport-db added a commit that referenced this issue Jun 12, 2024
## Description
Add a documentation page for [Delta Connect](#1570) in the Delta 4.0 preview.

## How was this patch tested?
N/A

---------

Co-authored-by: Allison Portis <allison.portis@databricks.com>
allisonport-db pushed a commit to allisonport-db/delta that referenced this issue Jun 12, 2024
(cherry picked from commit 4fac1f1)
allisonport-db pushed a commit to allisonport-db/delta that referenced this issue Jun 13, 2024
(cherry picked from commit 4fac1f1)