[SPARK-35912][SQL] Fix nullability of spark.read.json
#33212
Conversation
Can one of the admins verify this patch?
I don't agree with this. It's a correctness bug if an integer value is null in the JSON file but Spark reads it as 0. We should fail at runtime. We can either do this null check in the Jackson parser, or leverage the
Yeah, I think we should fail.
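For illustration, the correctness issue under discussion can be sketched like this (a minimal, hypothetical repro assuming a local `SparkSession`; names and setup are illustrative, not taken from the PR):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val spark = SparkSession.builder().master("local[1]").getOrCreate()
import spark.implicits._

// Declare `a` as non-nullable, but feed a record where it is null.
val schema = StructType(Seq(StructField("a", IntegerType, nullable = false)))
val ds = Seq("""{"a": null}""").toDS

// Before the fix, the null slot in the unsafe row could surface as the
// integer default 0 instead of failing -- the silent corruption the
// reviewers want to turn into a runtime error.
spark.read.schema(schema).json(ds).show()
```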
```diff
@@ -404,6 +407,19 @@ class JacksonParser(
         }
       }

+      // When the input schema is setting to `nullable = false`, make sure the field is not null.
+      var index = 0
+      while (badRecordException.isEmpty && !skipRow && index < schema.length) {
```
Can we fix `parseJsonToken` instead by passing a `nullable` bool flag?
Let me try.
Since a missing field never reaches `fieldConverter`, it seems hard to do it there.
spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
Lines 404 to 419 in 2537fe8
```scala
while (!skipRow && nextUntil(parser, JsonToken.END_OBJECT)) {
  schema.getFieldIndex(parser.getCurrentName) match {
    case Some(index) =>
      try {
        row.update(index, fieldConverters(index).apply(parser))
        skipRow = structFilters.skipRow(row, index)
      } catch {
        case e: SparkUpgradeException => throw e
        case NonFatal(e) if isRoot =>
          badRecordException = badRecordException.orElse(Some(e))
          parser.skipChildren()
      }
    case None =>
      parser.skipChildren()
  }
}
```
So there are two cases that can lead to a null result:
- the JSON value is null
- the JSON field is missing
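The two cases can be made concrete with inputs like these (hypothetical records against a schema with fields `c1` and `c2`):

```scala
// Case 1: the JSON value is explicitly null -- the field converter runs
// and writes a null into the row.
val explicitNull = """{"c1": 1, "c2": null}"""

// Case 2: the JSON field is missing entirely -- the parser never calls
// the field converter, so the row slot simply keeps its initial null.
val missingField = """{"c1": 1}"""
```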
And for malformed fields, we also set the value to null and put the JSON string into a special column.
We may not return the JSON string; the reason is shown in this test:
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
cc @MaxGekk FYI
+1 for failing the unexpected null case.
```scala
assert(exceptionMsg1.contains(
  "the null value found when parsing non-nullable field c2."))
val exceptionMsg2 = intercept[SparkException] {
  load("PERMISSIVE", schema, jsonString).collect
```
@cloud-fan Since the field is non-nullable, do we still return `Row(1, null)` in PERMISSIVE mode? If yes, this may cause the cast-struct problem we talked about before: the field is non-nullable but `row.isNullAt(index)` is true.
Yea, I think we can't apply permissive mode here. Failing looks fine to me.
Actually, `PERMISSIVE` mode should not fail at runtime. Shall we fail at the very beginning if the mode is `PERMISSIVE` and the schema contains non-nullable fields?
Done, added a non-nullable check for PERMISSIVE mode when we initialize `FailureSafeParser`.
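A sketch of what such an up-front check could look like (hypothetical helper and message; the actual PR code may differ):

```scala
import org.apache.spark.sql.types.StructType

// Fail fast at parser-construction time rather than per record:
// PERMISSIVE mode promises not to throw at runtime, so a schema with
// non-nullable fields cannot be honored under it.
def disableNotNullableForPermissiveMode(schema: StructType, isPermissive: Boolean): Unit = {
  if (isPermissive) {
    schema.fields.find(!_.nullable).foreach { f =>
      throw new IllegalArgumentException(
        s"Field ${f.name} is not nullable but PERMISSIVE mode only works with nullable fields.")
    }
  }
}
```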
```scala
          s"the null value found when parsing non-nullable field ${schema(index).name}.")
      }
      if (!checkedIndexSet.contains(index)) {
        skipRow = structFilters.skipRow(row, index)
```
is this change related to the nullability issue?
Yes. When the input JSON string is missing a field, `JacksonParser` should be able to skip rows through this null-valued field.
This was not done before your PR? The missing field is always filled with a null value, and we can use it to filter rows. This doesn't seem related to this PR.
For example:

```scala
val missingFieldInput = """{"c1":1}"""
val schema = StructType(Seq(
  StructField("c1", IntegerType, nullable = true),
  StructField("c2", IntegerType, nullable = true)
))
val filters = Seq(IsNotNull("c2"))
val options = new JSONOptions(Map.empty[String, String], "GMT", "")
val parser = new JacksonParser(schema, options, false, filters)
val createParser = CreateJacksonParser.string _
val result = parser.parse(missingFieldInput, createParser, UTF8String.fromString)
println(result)
```

Before this PR:

```
List([1,null])
```

After this PR:

```
List()
```
Please let me know if I've missed out anything.
This is not a nullability issue, but a performance improvement. Spark will evaluate the filter again and drop the rows. Can we exclude this part from this PR? We need to backport the nullability fix but not the performance improvement.
Updated.
Could you update the title and description, too?
```scala
private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord)
private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord))
private val resultRow = new GenericInternalRow(schema.length)
private val nullResult = new GenericInternalRow(schema.length)

// As PERMISSIVE mode should not fail at runtime, so fail if the mode is PERMISSIVE and schema
// contains non-nullable fields.
private def disableNotNullableForPermissiveMode: Unit = {
```
nit: this method has a side effect, so it's better to define it with parentheses.
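The Scala convention being referenced: a side-effecting method is declared (and called) with empty parentheses, while a pure accessor may omit them. A minimal sketch with hypothetical names:

```scala
// Side-effecting: declare with () so call sites read as an action.
def disableNotNullableForPermissiveMode(): Unit = {
  // ... validation that may throw ...
}

// Pure accessor: omitting parentheses is idiomatic.
def fieldCount: Int = 42
```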
```scala
  load("PERMISSIVE", schema, jsonString).collect
}
val expectedMsg2 =
  "Field c2 is not nullable but PERMISSIVE mode only works with nullable fields."
```
Can we add a migration guide for it? This is a breaking change, as `PERMISSIVE` is the default mode.
Also cc @HyukjinKwon for this breaking change.
Another choice is to not respect the user-given schema and always turn it into a nullable schema, if the mode is PERMISSIVE.
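Spark keeps such a transformation internally (`DataType.asNullable`, which is `private[spark]`); a user-level equivalent can be sketched as a recursive rewrite — an illustrative helper, assuming only struct, array, and map containers need recursion:

```scala
import org.apache.spark.sql.types._

// Recursively mark every field (and nested element) as nullable.
def toNullable(dt: DataType): DataType = dt match {
  case st: StructType =>
    StructType(st.fields.map(f => f.copy(dataType = toNullable(f.dataType), nullable = true)))
  case ArrayType(et, _)    => ArrayType(toNullable(et), containsNull = true)
  case MapType(kt, vt, _)  => MapType(toNullable(kt), toNullable(vt), valueContainsNull = true)
  case other               => other
}
```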
Force-pushed from 77239e4 to 484a17e.
```diff
@@ -22,6 +22,10 @@ license: |
 * Table of contents
 {:toc}

+## Upgrading from Spark SQL 3.2 to 3.3
+
+- In Spark 3.3, Spark will fail when parsing a JSON/CSV string with `PERMISSIVE` mode if the schema contains non-nullable fields. You can set the mode to `FAILFAST`/`DROPMALFORMED` if you want to read JSON/CSV with a schema that contains non-nullable fields.
```
This change also affects parsing CSV strings; I will add a unit test for CSV in a separate PR. cc @cloud-fan @HyukjinKwon
```scala
  badRecordException.isEmpty && !skipRow && options.parseMode != PermissiveMode
if (checkNotNullable) {
  var index = 0
  while (index < schema.length) {
    if (!schema(index).nullable && row.isNullAt(index)) {
      throw new IllegalSchemaArgumentException(
        s"field ${schema(index).name} is not nullable but it's missing in one record.")
    }
    index += 1
  }
}
```
Could we extract this part as a private method like this?
```scala
private lazy val checkNotNullableInRow = if (options.parseMode != PermissiveMode) {
  (row: GenericInternalRow, schema: StructType, skipRow: Boolean,
      runtimeExceptionOption: Option[Throwable]) => {
    val checkNotNullable =
      runtimeExceptionOption.isEmpty && !skipRow && options.parseMode != PermissiveMode
    if (checkNotNullable) {
      ...
    }
  }
} else {
  (_: GenericInternalRow, _: StructType, _: Boolean, _: Option[Throwable]) => {}
}
```
kindly ping @HyukjinKwon
The check failure is not related to this PR.
```diff
@@ -405,10 +405,18 @@ class JacksonParser(
       schema.getFieldIndex(parser.getCurrentName) match {
         case Some(index) =>
           try {
-            row.update(index, fieldConverters(index).apply(parser))
+            val fieldValue = fieldConverters(index).apply(parser)
```
I... am not sure we really need such a complicated fix to address the nullability mismatch (which is rather a corner case), to be honest. I wonder if there's a simpler approach, e.g. simply warning on non-nullable columns?
Just to be clear, I don't mind if other committers prefer to fix it.
That's another proposal I mentioned earlier: if the user-given schema is non-nullable, we just turn it into a nullable schema and don't fail.
Thank you for your suggestions, I'll raise a new PR.
### What changes were proposed in this pull request?

Rework [PR](#33212) with suggestions. This PR makes `spark.read.json()` have the same behavior as the Datasource API `spark.read.format("json").load("path")`: Spark should turn a non-nullable schema into a nullable one when using the API `spark.read.json()` by default.

Here is an example:

```scala
val schema = StructType(Seq(StructField("value", StructType(Seq(
  StructField("x", IntegerType, nullable = false),
  StructField("y", IntegerType, nullable = false)
)), nullable = true)))

val testDS = Seq("""{"value":{"x":1}}""").toDS
spark.read
  .schema(schema)
  .json(testDS)
  .printSchema()

spark.read
  .schema(schema)
  .format("json")
  .load("/tmp/json/t1")
  .printSchema()
// root
// |-- value: struct (nullable = true)
// |    |-- x: integer (nullable = true)
// |    |-- y: integer (nullable = true)
```

Before this PR:

```
// output of spark.read.json()
root
 |-- value: struct (nullable = true)
 |    |-- x: integer (nullable = false)
 |    |-- y: integer (nullable = false)
```

After this PR:

```
// output of spark.read.json()
root
 |-- value: struct (nullable = true)
 |    |-- x: integer (nullable = true)
 |    |-- y: integer (nullable = true)
```

- `spark.read.csv()` also has the same problem.
- The Datasource API `spark.read.format("json").load("path")` does this logic when resolving the relation: https://github.com/apache/spark/blob/c77acf0bbc25341de2636649fdd76f9bb4bdf4ed/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L415-L421

### Does this PR introduce _any_ user-facing change?

Yes. `spark.read.json()` and `spark.read.csv()` do not respect the user-given schema and always turn it into a nullable schema by default.

### How was this patch tested?

New test.

Closes #33436 from cfmcgrady/SPARK-35912-v3.

Authored-by: Fu Chen <cfmcgrady@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
What changes were proposed in this pull request?
Rework PR with suggestions.
This PR changes the behavior of `spark.read.json` when fields are marked non-nullable: the `JacksonParser` should fail and throw an exception instead of setting a null value.
Here is an example:
Output before this PR:
Output with the different parse modes after this PR:
- throws an `IllegalSchemaArgumentException` which contains the message:
- throws an `IllegalSchemaArgumentException` which contains the message:
Does this PR introduce any user-facing change?
Yes. When reading JSON with a missing (or null) value for a non-nullable field, Spark will fail and throw an exception in PERMISSIVE/FAILFAST modes, and drop the record in DROPMALFORMED mode.
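Assuming the behavior described above, the three parse modes can be exercised like this (illustrative sketch only; the session setup and inputs are hypothetical, not from the PR's test suite):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val spark = SparkSession.builder().master("local[1]").getOrCreate()
import spark.implicits._

val input = Seq("""{"c1": 1}""").toDS  // c2 is missing
val strictSchema = StructType(Seq(
  StructField("c1", IntegerType, nullable = false),
  StructField("c2", IntegerType, nullable = false)))

// PERMISSIVE (the default) and FAILFAST: fail with an exception,
// since c2 is non-nullable but absent from the record.
spark.read.schema(strictSchema).option("mode", "FAILFAST").json(input).collect()

// DROPMALFORMED: the offending record is silently dropped instead.
spark.read.schema(strictSchema).option("mode", "DROPMALFORMED").json(input).collect()
```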
How was this patch tested?
New test.