
[SPARK-42457][CONNECT] Adding SparkSession#read #40025

Closed
wants to merge 6 commits

Conversation

@zhenlineo (Contributor) commented Feb 15, 2023

What changes were proposed in this pull request?

Add the SparkSession read API to read data into Spark via the Scala client:

```
DataFrameReader.format(...).option("key", "value").schema(...).load()
```
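For example, a minimal usage sketch (the format, schema, options, and path below are illustrative, not taken verbatim from this PR):

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Hypothetical example: read a CSV file through the Connect Scala client.
val df = spark.read
  .format("csv")
  .option("header", "true") // illustrative option
  .schema(StructType(
    StructField("name", StringType) ::
      StructField("age", IntegerType) :: Nil))
  .load("/tmp/people.csv") // illustrative path
```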

The following methods are skipped by the Scala Client on purpose:

```
[info]   deprecated method json(org.apache.spark.api.java.JavaRDD)org.apache.spark.sql.Dataset in class org.apache.spark.sql.DataFrameReader does not have a correspondent in client version
[info]   deprecated method json(org.apache.spark.rdd.RDD)org.apache.spark.sql.Dataset in class org.apache.spark.sql.DataFrameReader does not have a correspondent in client version
[info]   method json(org.apache.spark.sql.Dataset)org.apache.spark.sql.Dataset in class org.apache.spark.sql.DataFrameReader does not have a correspondent in client version
[info]   method csv(org.apache.spark.sql.Dataset)org.apache.spark.sql.Dataset in class org.apache.spark.sql.DataFrameReader does not have a correspondent in client version
```

Why are the changes needed?

To read data from CSV and other formats.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

E2E and golden tests.

```scala
// Copy the reader's options and paths into the DataSource proto builder.
extraOptions.foreach { case (k, v) => dataSourceBuilder.putOptions(k, v) }
paths.foreach(path => dataSourceBuilder.addPaths(path))
```
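For context, a rough sketch of how these lines might sit inside the client-side `DataFrameReader.load`; the surrounding method and builder accessors are assumptions based on the Spark Connect proto, not a verbatim quote of this diff:

```scala
// Sketch only: build a Read.DataSource relation on the client and leave all
// path/option reconciliation to the server-side planner.
def load(paths: String*): DataFrame = {
  sparkSession.newDataFrame { builder => // assumed session helper
    val dataSourceBuilder = builder.getReadBuilder.getDataSourceBuilder
    dataSourceBuilder.setFormat(source) // format set earlier via format(...)
    userSpecifiedSchema.foreach(s => dataSourceBuilder.setSchema(s.toDDL))
    extraOptions.foreach { case (k, v) => dataSourceBuilder.putOptions(k, v) }
    paths.foreach(path => dataSourceBuilder.addPaths(path))
  }
}
```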
Contributor

You can also set path/paths in the options. How does this work? Does the server reconcile all these path options?

zhenlineo (Contributor Author)

@hvanhovell I do not think we can do anything special here. The server-side planner will go through the original SQL code path and merge the paths based on the config settings, e.g.:

```scala
def load(paths: String*): DataFrame = {
  ....
  val legacyPathOptionBehavior = sparkSession.sessionState.conf.legacyPathOptionBehavior
  if (!legacyPathOptionBehavior &&
      (extraOptions.contains("path") || extraOptions.contains("paths")) && paths.nonEmpty) {
    throw QueryCompilationErrors.pathOptionNotSetCorrectlyWhenReadingError()
  }

  DataSource.lookupDataSourceV2(source, sparkSession.sessionState.conf).flatMap { provider =>
    DataSourceV2Utils.loadV2Source(sparkSession, provider, userSpecifiedSchema, extraOptions,
      source, paths: _*)
  }.getOrElse(loadV1Source(paths: _*))
}
```

If we merged them on the client, we would have to assume a default for the config value. So it is best to leave the merging of paths to the server-side SQL API.
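To illustrate with a hypothetical call (not from this PR): with the default, non-legacy behavior, setting a `path` option and also passing a path to `load()` makes the server raise the error from the snippet above.

```scala
// Hypothetical illustration of the server-side check quoted above: with
// spark.sql.legacy.pathOptionBehavior.enabled=false (the default), mixing a
// "path" option with an explicit load() argument fails on the server.
spark.read
  .format("csv")
  .option("path", "/data/a.csv") // conflicts with the argument below
  .load("/data/b.csv")           // server throws the path-option error
```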

Contributor

Alright, that works for me. Can you make sure we test this?

@hvanhovell (Contributor) left a comment

Looks good overall. Please provide clarity on path handling.

@hvanhovell (Contributor)

@zhenlineo can you make the PR description a bit more descriptive?

@hvanhovell (Contributor)

Oh and can you add the ticket to the title?

@amaliujia (Contributor)

So is it ok to not have e2e tests for the read API (and similarly for the write side)?

```scala
.schema(StructType(StructField("name", StringType) :: StructField("age", IntegerType) :: Nil))
.option("op1", "op1")
.options(Map("op2" -> "op2"))
.load(testDataPath.resolve("people.txt").toString)
```
Contributor

Why do we need the physical data files checked in? If we only compare the plans, don't we only need a fake data path?

@zhenlineo (Contributor Author) commented Feb 15, 2023

The server-side test ProtoToParsedPlanTestSuite runs this test with the data files. However, it does not verify the correctness of the data that is read (e.g. whether the content of the CSV is loaded correctly).

@amaliujia (Contributor) commented Feb 16, 2023

I might be wrong, but I am thinking that if you provide a schema in the proto, the server side might not need to load the data: it only reads the data directly when it needs to infer the schema, i.e. when no schema is set. I am not sure whether there are other cases in which the server side still has to read the data.
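For instance, a hypothetical read where the schema is set explicitly, so it travels with the proto and the server could in principle skip the inference scan:

```scala
// Hypothetical: a user-specified schema (carried in the proto, e.g. as DDL)
// should make schema inference, and hence an extra scan of the files, unnecessary.
val df = spark.read
  .schema("name STRING, age INT") // explicit schema, no inference needed
  .json("/tmp/people.json")       // illustrative path
```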

zhenlineo (Contributor Author)

Umm, I tried with a JSON file, but providing the schema does not seem to be enough. :(

Contributor

ah sure :)

@zhenlineo changed the title from [CONNECT] Adding SparkSession#read to [SPARK-42457][CONNECT] Adding SparkSession#read on Feb 15, 2023
@zhenlineo zhenlineo marked this pull request as ready for review February 15, 2023 22:31
@amaliujia (Contributor)

LGTM

Looks like you only need the following command to fix the style:

```
./build/mvn -Pscala-2.12 scalafmt:format -Dscalafmt.skip=false -Dscalafmt.validateOnly=false -Dscalafmt.changedOnly=false -pl connector/connect/common -pl connector/connect/server -pl connector/connect/client/jvm
```

@hvanhovell (Contributor) left a comment

LGTM

hvanhovell pushed a commit that referenced this pull request Feb 16, 2023
Closes #40025 from zhenlineo/session-read.

Authored-by: Zhen Li <zhenlineo@users.noreply.github.com>
Signed-off-by: Herman van Hovell <herman@databricks.com>
(cherry picked from commit 8d863e3)
Signed-off-by: Herman van Hovell <herman@databricks.com>
snmvaughan pushed a commit to snmvaughan/spark that referenced this pull request Jun 20, 2023