
[SPARK-25935][SQL] Prevent null rows from JSON parser #22938

Closed
wants to merge 9 commits into apache:master from MaxGekk:json-nulls

Conversation

MaxGekk (Member) commented Nov 4, 2018

What changes were proposed in this pull request?

An input without valid JSON tokens at the root level will be treated as a bad record and handled according to mode. Previously, such input was converted to null. After the changes, in the PERMISSIVE mode the input is converted to a row of nulls that matches the schema. This allows removing the code in the from_json function that could produce null result rows.
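To illustrate the change, a minimal sketch (assuming a Spark session `spark` with its implicits imported; the before/after comments reflect the description above):

import org.apache.spark.sql.functions.from_json
import spark.implicits._

// Whitespace-only input has no root-level JSON token.
val df = Seq(" ").toDF("value")

df.select(from_json($"value", "a INT, b STRING", Map.empty[String, String])).show()
// Spark 2.4 and earlier: null
// After this change (default PERMISSIVE mode): Row(null, null)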

How was this patch tested?

It was tested by existing test suites. I had to modify some of them (JsonSuite, for example) because previously bad input was silently ignored; now such input is handled according to the specified mode.

HyukjinKwon (Member)

add to whitelist

HyukjinKwon (Member)

ok to test

SparkQA commented Nov 4, 2018

Test build #98439 has finished for PR 22938 at commit 0589d91.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -240,16 +240,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
Seq(Row("1"), Row("2")))
}

test("SPARK-11226 Skip empty line in json file") {
Member Author

I removed the test because it is no longer relevant to the default PERMISSIVE mode, and SQLQuerySuite is not the right place for it.

Member

Where is it moved to then? Does that mean we don't have a regression test for SPARK-11226 anymore?

SparkQA commented Nov 4, 2018

Test build #98444 has finished for PR 22938 at commit 0589d91.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Nov 4, 2018

Test build #98448 has finished for PR 22938 at commit d1bad7c.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Nov 4, 2018

Test build #98452 has finished for PR 22938 at commit c4d6a80.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Nov 6, 2018

Test build #98527 has finished for PR 22938 at commit a7e016a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

MaxGekk (Member, Author) commented Nov 7, 2018

@HyukjinKwon Are you ok with the changes?

attilapiros (Contributor) commented Nov 7, 2018

@MaxGekk I checked out your PR and played with it a little: I created a new unit test as a copy of "from_json - input=array, schema=array, output=array" with invalid JSON.

I expected to get an InternalRow(null) for an array schema but I got null.

After debugging a little, I found the reason: result is InternalRow(null) after val result = rows.next() in the convertRow method, and calling getArray(0) on it gives back null. The same is true for getMap(0) (it also gives back null).

Please fix these and add a small unit test for each.

HyukjinKwon (Member)

Yea, looks fine in general. Will take a look within this week or weekends.

HyukjinKwon (Member)

@attilapiros, mind showing rough small test code for it please? Just want to see if this is something we should fix or not.

attilapiros (Contributor)

@HyukjinKwon Sure, the test would be for an invalid JSON array:

test("from_json - input=invalid JSON array, schema=array, output=array") {
    val input = """[{"a": 1}, {a": 2}]"""
    val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
    val output = InternalRow(1) :: InternalRow(2) :: Nil
    checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), InternalRow(null))
  }

I corrupted the JSON by removing the opening `"` from the key of the second element.

Running this test fails with:

Incorrect evaluation (codegen off): from_json(ArrayType(StructType(StructField(a,IntegerType,true)),true), [{"a": 1}, {a": 2}], Some(GMT)), actual: null, expected: [null]
ScalaTestFailureLocation: org.apache.spark.sql.catalyst.expressions.ExpressionEvalHelper$class at (ExpressionEvalHelper.scala:191)
org.scalatest.exceptions.TestFailedException: Incorrect evaluation (codegen off): from_json(ArrayType(StructType(StructField(a,IntegerType,true)),true), [{"a": 1}, {a": 2}], Some(GMT)), actual: null, expected: [null]
	at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:528)
	at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
	at org.scalatest.Assertions$class.fail(Assertions.scala:1089)
	at org.scalatest.FunSuite.fail(FunSuite.scala:1560)

MaxGekk (Member, Author) commented Nov 7, 2018

I guess the problem is in FailureSafeParser, in particular:

private val nullResult = new GenericInternalRow(schema.length)

FailureSafeParser was created to safely parse structs, not arrays and maps. I think we need to prepare nullResult properly for arrays and maps. I will look at it. Thank you @attilapiros for the example.
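A tiny sketch (not the actual Spark code path) of why that struct-shaped null result leaks through for array and map schemas:

import org.apache.spark.sql.catalyst.expressions.GenericInternalRow

// FailureSafeParser keeps a single all-null row sized by the (wrapped) schema.
val nullResult = new GenericInternalRow(1) // Row(null)

// Reading field 0 as a typed value just returns the stored null, which is
// exactly what getArray(0) and getMap(0) gave back in the test above.
assert(nullResult.getArray(0) == null)
assert(nullResult.getMap(0) == null)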

MaxGekk (Member, Author) commented Nov 7, 2018

At least it doesn't fail on the cases https://github.com/apache/spark/pull/22938/files#diff-6626026091295ad8c0dfb66ecbcd04b1R568 and https://github.com/apache/spark/pull/22938/files#diff-6626026091295ad8c0dfb66ecbcd04b1R565, which this PR actually addresses. So I am getting exactly one row from FailureSafeParser.

MaxGekk (Member, Author) commented Nov 7, 2018

I made a fix for broken arrays and maps in JsonToStructs because, inside FailureSafeParser, it is not clear where the call came from. I am still not sure that wrapping the actual type in a StructType before passing it to FailureSafeParser was the right decision in #22237 /cc @cloud-fan. Doing this, we cannot distinguish ArrayType/MapType as the root type from StructType(StructField(ArrayType/MapType)), and so cannot return the appropriate null result in the case of a bad record.

SparkQA commented Nov 7, 2018

Test build #98555 has finished for PR 22938 at commit 5c17aef.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Nov 7, 2018

Test build #98565 has finished for PR 22938 at commit 9a38626.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -15,6 +15,8 @@ displayTitle: Spark SQL Upgrading Guide

- Since Spark 3.0, the `from_json` function supports two modes - `PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The default mode became `PERMISSIVE`. In previous versions, the behavior of `from_json` did not conform to either `PERMISSIVE` or `FAILFAST`, especially in the processing of malformed JSON records. For example, the JSON string `{"a" 1}` with the schema `a INT` is converted to `null` by previous versions, but Spark 3.0 converts it to `Row(null)`.

- In Spark version 2.4 and earlier, the JSON data source and the `from_json` function produced `null`s if there was no valid root JSON token in the input (` ` for example). Since Spark 3.0, such input is treated as a bad record and handled according to the specified mode. For example, in the `PERMISSIVE` mode the ` ` input is converted to `Row(null, null)` if the specified schema is `key STRING, value INT`.
Contributor

Just out of curiosity, how can the JSON data source return null rows?

Member Author

When we use the data source, we can specify the schema as a StructType only. In that case, we get either a Seq[InternalRow] or Nil from JacksonParser, which is flatMapped, or a BadRecordException, which is converted to an Iterator[InternalRow]. It seems there is no way to get null rows. The difference between the JSON datasource and the JSON functions is that the former doesn't (and cannot) do flattening. So, the Nil case has to be handled specially (this PR addresses that case).
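For illustration, a simplified sketch (with a hypothetical stand-in for JacksonParser.parse) of how the flatMapped path drops empty results rather than emitting null rows:

// Stand-in for JacksonParser.parse: Nil when no root token is found.
def parse(line: String): Seq[String] =
  if (line.trim.isEmpty) Nil else Seq(line.trim)

val lines = Seq("""{"a": 1}""", " ", """{"a": 2}""")

// flatMap silently swallows the Nil case: the " " line disappears from the
// output instead of becoming a null row.
println(lines.flatMap(parse)) // List({"a": 1}, {"a": 2})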

Contributor

In Spark version 2.4 and earlier, the JSON data source and the from_json function produced nulls

Shall we update this? According to what you said, the JSON data source can't produce null.

case _: StructType => (row: InternalRow) => row
case _: ArrayType => (row: InternalRow) =>
  if (row.isNullAt(0)) {
    new GenericArrayData(Array())
Contributor

I think this is the place where from_json differs from the JSON data source. A data source must produce data as rows, while from_json can return an array or a map.

I think the previous behavior also makes sense. For array/map, we don't have the corrupted column, and returning null is reasonable. Actually, I prefer null over an empty array/map, but we need more discussion about this behavior.

Member Author

I also thought about what is better to return here: null or an empty Array/MapData. In the case of StructType we return a Row in the PERMISSIVE mode. For consistency, should we return an empty array/map in this mode too?

Maybe we could introduce a special mode in which we return null for a bad record? For now that is easy to do since we use FailureSafeParser.

Member Author

but we need more discussion about this behavior.

@cloud-fan Should I send an email to the dev list or we can discuss this here?

Member Author

I could revert the recent commits and prepare a separate PR for the behaviour change. WDYT?

Member

I think it's okay to return null for map and array. Let's make the changes so that it returns null for map and array.

Member Author

I have discarded the recent changes.

SparkQA commented Nov 9, 2018

Test build #98655 has finished for PR 22938 at commit 84de402.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Nov 10, 2018

Test build #98660 has finished for PR 22938 at commit 9132af3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Nov 11, 2018

Test build #98701 has finished for PR 22938 at commit 35b3013.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

  assert(!rows.hasNext)
  castRow(result)
} else {
  throw new IllegalArgumentException("Expected one row from JSON parser.")
Contributor

This can only happen when we have a bug, right?

Member Author

Right, it must not happen.

@@ -1115,6 +1115,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
Row(null, null, null),
Row(null, null, null),
Row(null, null, null),
Row(null, null, null),
Contributor

So for the JSON data source, the previous behavior was to skip the row even in PERMISSIVE mode. Shall we clearly mention it in the migration guide?

Member Author

So for the JSON data source, the previous behavior was to skip the row even in PERMISSIVE mode.

Yes, we skipped such rows whenever the Jackson parser wasn't able to find any root token, so not only empty strings and whitespace fell into that category.

Shall we clearly mention it in the migration guide?

Sure.
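
For illustration, a hedged end-to-end sketch of that data source behavior (the path and Spark session are assumed; the row counts follow the description above):

import spark.implicits._

val path = "/tmp/spark-25935-demo" // hypothetical location
Seq("""{"key": "a", "value": 1}""", " ").toDS()
  .write.mode("overwrite").text(path)

val df = spark.read.schema("key STRING, value INT").json(path)
df.show()
// Spark 2.4 and earlier: one row; the whitespace-only line is silently skipped.
// After this PR (PERMISSIVE): two rows; the second is Row(null, null).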

@@ -1813,6 +1817,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
val path = dir.getCanonicalPath
primitiveFieldAndType
.toDF("value")
.repartition(1)
Contributor

why is the repartition required?

Member Author

As far as I remember, I added the repartition(1) here and in other places to eliminate empty files, which are produced by empty partitions. Probably we could avoid writing empty files, at least in the case of text-based datasources, but in any case let's look at TextOutputWriter, for example. It creates an output stream for its file in the constructor:

private val writer = CodecStreams.createOutputStream(context, new Path(path))

and closes that file on close() even when nothing was written to it. So, even if we didn't write anything, it creates an empty file.

On the read side, when the Jackson parser tries to read the empty file, it cannot detect any JSON token at the root level and returns null from nextToken(), for which I now throw a bad record exception -> Row(...) in PERMISSIVE mode.
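A quick way to check that Jackson behavior (assuming jackson-core on the classpath, which Spark already ships):

import com.fasterxml.jackson.core.JsonFactory

val parser = new JsonFactory().createParser("   ")
// Whitespace-only input has no root-level token, so nextToken() returns null;
// after this PR that case is raised as a bad record instead of being skipped.
assert(parser.nextToken() == null)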

MaxGekk (Member, Author) commented Nov 15, 2018

@cloud-fan @HyukjinKwon Do you agree with the proposed changes, or is there anything that blocks the PR for now?

MaxGekk (Member, Author) commented Nov 21, 2018

@HyukjinKwon @cloud-fan May I ask you to look at this PR one more time?

cloud-fan (Contributor)

LGTM except for the migration guide: the JSON data source can't produce null rows; it skips them even in PERMISSIVE mode.

SparkQA commented Nov 22, 2018

Test build #99143 has finished for PR 22938 at commit 6a8cac3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

cloud-fan (Contributor)

thanks, merging to master!

SparkQA commented Nov 22, 2018

Test build #99144 has finished for PR 22938 at commit 1ef2d5b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

asfgit closed this in 38628dd Nov 22, 2018
@@ -1892,7 +1898,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
.text(path)

val jsonDF = spark.read.option("multiLine", true).option("mode", "PERMISSIVE").json(path)
assert(jsonDF.count() === corruptRecordCount)
assert(jsonDF.count() === corruptRecordCount + 1) // null row for empty file
Member

Wait, does this mean that it reads an empty record from an empty file after this change?

Member

If that's true, we should not do this. Empty files can currently be generated in many cases, and the behaviour is not well defined. If we rely on this behaviour, it will cause weird behaviours or bugs that are hard to fix.

Contributor

shall we skip empty files for all the file-based data sources?

HyukjinKwon (Member) commented Nov 22, 2018

Sorry for the late response. The change looks good to me too in general but I had two questions (see also #22938 (comment)).

cloud-fan pushed a commit that referenced this pull request Jan 15, 2019
## What changes were proposed in this pull request?

This PR reverts #22938 per discussion in #23325

Closes #23325

Closes #23543 from MaxGekk/return-nulls-from-json-parser.

Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
## What changes were proposed in this pull request?

An input without valid JSON tokens at the root level will be treated as a bad record and handled according to `mode`. Previously, such input was converted to `null`. After the changes, in the `PERMISSIVE` mode the input is converted to a row of `null`s that matches the schema. This allows removing the code in the `from_json` function that could produce `null` result rows.

## How was this patch tested?

It was tested by existing test suites. I had to modify some of them (`JsonSuite`, for example) because previously bad input was silently ignored; now such input is handled according to the specified `mode`.

Closes apache#22938 from MaxGekk/json-nulls.

Lead-authored-by: Maxim Gekk <max.gekk@gmail.com>
Co-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
## What changes were proposed in this pull request?

This PR reverts apache#22938 per discussion in apache#23325

Closes apache#23325

Closes apache#23543 from MaxGekk/return-nulls-from-json-parser.

Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
MaxGekk deleted the json-nulls branch August 17, 2019 13:33