
[SPARK-35912][SQL] Fix nullability of spark.read.json/spark.read.csv #33436

Closed
wants to merge 3 commits into from

Conversation

cfmcgrady
Contributor

@cfmcgrady cfmcgrady commented Jul 20, 2021

What changes were proposed in this pull request?

Rework PR with suggestions.

This PR makes spark.read.json() have the same behavior as the Datasource API spark.read.format("json").load("path"): Spark should turn a non-nullable user-specified schema into a nullable one by default when using spark.read.json().

Here is an example:

  val schema = StructType(Seq(StructField("value",
    StructType(Seq(
      StructField("x", IntegerType, nullable = false),
      StructField("y", IntegerType, nullable = false)
    )),
    nullable = true
  )))

  val testDS = Seq("""{"value":{"x":1}}""").toDS
  spark.read
    .schema(schema)
    .json(testDS)
    .printSchema()

  // assuming /tmp/json/t1 contains the same JSON record as testDS
  spark.read
    .schema(schema)
    .format("json")
    .load("/tmp/json/t1")
    .printSchema()
  // root
  //  |-- value: struct (nullable = true)
  //  |    |-- x: integer (nullable = true)
  //  |    |-- y: integer (nullable = true)

Before this PR:

// output of spark.read.json()
root
 |-- value: struct (nullable = true)
 |    |-- x: integer (nullable = false)
 |    |-- y: integer (nullable = false)

After this PR:

// output of spark.read.json()
root
 |-- value: struct (nullable = true)
 |    |-- x: integer (nullable = true)
 |    |-- y: integer (nullable = true)

  • spark.read.csv() has the same problem.
  • The Datasource API spark.read.format("json").load("path") applies this logic when resolving the relation:

HadoopFsRelation(
  fileCatalog,
  partitionSchema = partitionSchema,
  dataSchema = dataSchema.asNullable,
  bucketSpec = bucketSpec,
  format,
  caseInsensitiveOptions)(sparkSession)
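The dataSchema.asNullable call above recursively marks every field in the schema nullable. As an illustration only (using a toy dict-based schema representation, not Spark's StructType API), the transform can be sketched like this:

```python
# Toy sketch of StructType.asNullable: recursively mark every field nullable.
# The dict-based schema here is illustrative, not Spark's actual API.

def as_nullable(field):
    """Return a copy of a schema field with nullable=True at every level."""
    out = dict(field, nullable=True)
    if field["type"] == "struct":
        out["fields"] = [as_nullable(f) for f in field["fields"]]
    return out

# Mirrors the schema from the example above: a struct with two
# non-nullable integer fields x and y.
schema = {
    "name": "value", "type": "struct", "nullable": True,
    "fields": [
        {"name": "x", "type": "integer", "nullable": False},
        {"name": "y", "type": "integer", "nullable": False},
    ],
}

relaxed = as_nullable(schema)
print([f["nullable"] for f in relaxed["fields"]])  # -> [True, True]
```

Note the original schema object is left untouched; the relaxed copy is what the resolved relation carries.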

Does this PR introduce any user-facing change?

Yes. spark.read.json() and spark.read.csv() no longer respect the nullability of the user-given schema and always turn it into a nullable schema by default.

How was this patch tested?

New test.

@github-actions github-actions bot added the SQL label Jul 20, 2021
@cfmcgrady
Contributor Author

cc @cloud-fan @HyukjinKwon @maropu

@cloud-fan
Contributor

ok to test

@SparkQA

SparkQA commented Jul 20, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45834/

@SparkQA

SparkQA commented Jul 20, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45834/

@srowen
Member

srowen commented Jul 20, 2021

For my information - is the purpose here to then cause a failure at runtime when the null values are read, and the original schema is applied, which forbids null?

@cloud-fan
Contributor

is the purpose here to then cause a failure at runtime when the null values are read

No, the original schema won't be applied. This PR turns it into a nullable schema, which is the same behavior as reading JSON/CSV from files.

@SparkQA

SparkQA commented Jul 20, 2021

Test build #141318 has finished for PR 33436 at commit 3974a43.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@github-actions github-actions bot added the DOCS label Jul 21, 2021
@SparkQA

SparkQA commented Jul 21, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45890/

@SparkQA

SparkQA commented Jul 21, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45890/

@SparkQA

SparkQA commented Jul 21, 2021

Test build #141374 has finished for PR 33436 at commit 741d0c9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 21, 2021

Kubernetes integration test unable to build dist.

exiting with code: 1
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45913/

@SparkQA

SparkQA commented Jul 21, 2021

Test build #141395 has finished for PR 33436 at commit 56ceec4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Can one of the admins verify this patch?

@HyukjinKwon
Member

Merged to master.

@cfmcgrady
Contributor Author

Thank you for the review. @cloud-fan @HyukjinKwon @srowen

@@ -22,6 +22,10 @@ license: |
* Table of contents
{:toc}

## Upgrading from Spark SQL 3.2 to 3.3

- Since Spark 3.3, Spark turns a non-nullable schema into nullable for API `DataFrameReader.schema(schema: StructType).json(jsonDataset: Dataset[String])` and `DataFrameReader.schema(schema: StructType).csv(csvDataset: Dataset[String])` when the schema is specified by the user and contains non-nullable fields.
Member

I underestimated this problem: it can actually change the results.

import org.apache.spark.sql.types._
val ds = Seq("a,", "a,b").toDS
spark.read.schema(
  StructType(
    StructField("f1", StringType, nullable = false) ::
    StructField("f2", StringType, nullable = false) :: Nil)
  ).option("mode", "FAILFAST").csv(ds).show()

Before:

+---+---+
| f1| f2|
+---+---+
|  a|  b|
+---+---+

After:

+---+----+
| f1|  f2|
+---+----+
|  a|null|
|  a|   b|
+---+----+

I think we should at least add a legacy configuration... let me make a quick follow-up.


HyukjinKwon added a commit that referenced this pull request May 5, 2022
…ng nullability in DataFrame.schema.csv/json(ds)

### What changes were proposed in this pull request?

This PR is a follow-up of #33436 that adds a legacy configuration. It's found that the change can break a valid use case (https://github.com/apache/spark/pull/33436/files#r863271189):

```scala
import org.apache.spark.sql.types._
val ds = Seq("a,", "a,b").toDS
spark.read.schema(
  StructType(
    StructField("f1", StringType, nullable = false) ::
    StructField("f2", StringType, nullable = false) :: Nil)
  ).option("mode", "DROPMALFORMED").csv(ds).show()
```

**Before:**

```
+---+---+
| f1| f2|
+---+---+
|  a|  b|
+---+---+
```

**After:**

```
+---+----+
| f1|  f2|
+---+----+
|  a|null|
|  a|   b|
+---+----+
```

This PR adds a configuration to restore **Before** behaviour.

### Why are the changes needed?

To avoid breaking valid use cases.

### Does this PR introduce _any_ user-facing change?

Yes, it adds a new configuration `spark.sql.legacy.respectNullabilityInTextDatasetConversion` (`false` by default) to respect the nullability in `DataFrameReader.schema(schema).csv(dataset)` and `DataFrameReader.schema(schema).json(dataset)` when the user-specified schema is provided.

### How was this patch tested?

Unit tests were added.

Closes #36435 from HyukjinKwon/SPARK-35912.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
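For reference, the legacy flag introduced by this follow-up can restore the pre-3.3 behaviour at runtime. A minimal config fragment, assuming a running SparkSession named `spark`:

```scala
// Restore pre-3.3 behaviour: respect user-specified nullability in
// DataFrameReader.schema(...).csv(ds) / .json(ds).
spark.conf.set("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true")
```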
HyukjinKwon added a commit that referenced this pull request May 5, 2022
…ng nullability in DataFrame.schema.csv/json(ds)

(cherry picked from commit 6689b97)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>