
[SPARK-26745][SQL] Skip empty lines in JSON-derived DataFrames when skipParsing optimization in effect #23665

Closed · wants to merge 17 commits

@sumitsu (Contributor) commented on Jan 28, 2019

What changes were proposed in this pull request?

This PR updates FailureSafeParser to allow text-input data sources to optionally specify a "fast" emptiness check for records, to be applied in cases where full parsing is disabled (i.e. where skipParsing==true: non-multiline + permissive-mode + empty schema).

TextInputJsonDataSource is updated such that it creates FailureSafeParser with an emptiness check which filters out blank (or all-whitespace) lines. This behavior resolves SPARK-26745 by preventing count() from including blank lines (which the full parser ignores) under conditions where skipParsing is enabled.
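The emptiness check amounts to dropping whitespace-only records before they reach the counter. The actual patch is Scala inside FailureSafeParser; the sketch below illustrates the idea in plain Python with a hypothetical function name:

```python
def unparsed_record_is_non_empty(line: str) -> bool:
    # Fast check: a record counts only if it contains a non-whitespace
    # character. This mirrors what the full JSON parser does implicitly
    # (it emits no record for a blank line) without the cost of parsing.
    return line.strip() != ""

lines = ['{"a": 1}', "", "   ", '{"a": 2}', "\t"]
# With skipParsing enabled, count only lines passing the fast check.
count = sum(1 for line in lines if unparsed_record_is_non_empty(line))
print(count)  # 2
```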

How was this patch tested?

Existing JsonSuite unit tests, supplemented by a new test case which verifies that pre-parsing and post-parsing count() values are equal for JSON-derived DataFrames.

JsonBenchmark performance test results

The JsonBenchmark performance suite was executed on the following branches:

  • this PR (branch: sumitsu/spark:json_emptyline_count)
  • apache/spark:master branch
  • apache/spark:master branch, modified to not use the SPARK-24959 optimization

The no-optimization code base was simulated by hard-coding the skipParsing flag in FailureSafeParser to false for the test.

Compared with the no-optimization scenario, this PR appears to preserve most of the SPARK-24959 optimization performance gains, but there is a small performance regression compared with master.


test environment:

Java HotSpot(TM) 64-Bit Server VM 1.8.0_162-b12 on Mac OS X 10.12.6
Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz

with changes in this PR (branch: sumitsu/spark:json_emptyline_count)

```
JSON schema inferring:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
No encoding                               397395 / 422450          0.3        3973.9       1.0X
UTF-8 is set                              430505 / 436580          0.2        4305.1       0.9X

count a short column:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
No encoding                                 18986 / 19018          5.3         189.9       1.0X
UTF-8 is set                                18848 / 18954          5.3         188.5       1.0X

count a wide column:                     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
No encoding                                 39076 / 39130          0.3        3907.6       1.0X
UTF-8 is set                                39383 / 39455          0.3        3938.3       1.0X

Select a subset of 10 columns:           Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Select 10 columns + count()                 14586 / 14904          0.7        1458.6       1.0X
Select 1 column + count()                   10969 / 10992          0.9        1096.9       1.3X
count()                                       2740 / 2755          3.6         274.0       5.3X

creation of JSON parser per line:        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Short column without encoding                 6822 / 6870          1.5         682.2       1.0X
Short column with UTF-8                       8901 / 8937          1.1         890.1       0.8X
Wide column without encoding              140199 / 140659          0.1       14019.9       0.0X
Wide column with UTF-8                    158228 / 158439          0.1       15822.8       0.0X
```

apache/spark:master branch

```
JSON schema inferring:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
No encoding                               376210 / 378100          0.3        3762.1       1.0X
UTF-8 is set                              410952 / 414711          0.2        4109.5       0.9X

count a short column:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
No encoding                                 12871 / 12904          7.8         128.7       1.0X
UTF-8 is set                                12857 / 12932          7.8         128.6       1.0X

count a wide column:                     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
No encoding                                 38650 / 38680          0.3        3865.0       1.0X
UTF-8 is set                                38751 / 38774          0.3        3875.1       1.0X

Select a subset of 10 columns:           Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Select 10 columns + count()                 14570 / 14986          0.7        1457.0       1.0X
Select 1 column + count()                   11410 / 11757          0.9        1141.0       1.3X
count()                                       2346 / 2367          4.3         234.6       6.2X

creation of JSON parser per line:        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Short column without encoding                 6596 / 6708          1.5         659.6       1.0X
Short column with UTF-8                       8867 / 8902          1.1         886.7       0.7X
Wide column without encoding              139712 / 139725          0.1       13971.2       0.0X
Wide column with UTF-8                    156809 / 156832          0.1       15680.9       0.0X
```

optimization disabled

```
JSON schema inferring:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
No encoding                               375309 / 376301          0.3        3753.1       1.0X
UTF-8 is set                              442666 / 448741          0.2        4426.7       0.8X

count a short column:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
No encoding                                 39014 / 39036          2.6         390.1       1.0X
UTF-8 is set                                66988 / 67107          1.5         669.9       0.6X

count a wide column:                     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
No encoding                                 62555 / 62712          0.2        6255.5       1.0X
UTF-8 is set                                85354 / 85509          0.1        8535.4       0.7X

Select a subset of 10 columns:           Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Select 10 columns + count()                 17173 / 17249          0.6        1717.3       1.0X
Select 1 column + count()                   11503 / 11514          0.9        1150.3       1.5X
count()                                     13806 / 13849          0.7        1380.6       1.2X

creation of JSON parser per line:        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Short column without encoding                 6388 / 6432          1.6         638.8       1.0X
Short column with UTF-8                       8910 / 8923          1.1         891.0       0.7X
Wide column without encoding              135854 / 135964          0.1       13585.4       0.0X
Wide column with UTF-8                    154108 / 154186          0.1       15410.8       0.0X
```

sumitsu added some commits Jan 21, 2019


@maropu (Member) commented on Jan 28, 2019

ok to test

```scala
    }
  } else {
    rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
  }
```

@maropu (Member), Jan 28, 2019:

Do we need to modify this file? Since this is an issue in the JSON code, I think it's better to handle this case on the JSON parser side. Can't we handle this the same way as the CSV one? e.g.,

```scala
val filteredLines: Iterator[String] = CSVExprUtils.filterCommentAndEmpty(lines, options)
```
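For reference, the CSV-side filtering mentioned here is essentially a pre-parse pass that drops blank and comment lines. A rough plain-Python equivalent (hypothetical sketch, not Spark's actual implementation):

```python
def filter_comment_and_empty(lines, comment_char="#"):
    # Drop whitespace-only lines and lines starting with the configured
    # comment character, before any real parsing happens.
    for line in lines:
        stripped = line.strip()
        if stripped and not stripped.startswith(comment_char):
            yield line

filtered = list(filter_comment_and_empty(['a,b', '', '# note', '  ', '1,2']))
print(filtered)  # ['a,b', '1,2']
```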

@sumitsu (Author, Contributor), Jan 28, 2019:

One of my earlier revisions worked the way you suggest (if I've understood your point correctly); I changed it in order to avoid redundant empty-line filtering in the case where the full parser has to run anyway (i.e. where skipParsing == false).

What do you think? Is that a valid optimization, or is it better to do it on the JSON side as in 13942b8 to avoid changes to FailureSafeParser?

@maropu (Member), Jan 28, 2019:

I had this in mind: maropu@f4df907. But you should defer to the people who are familiar with this part, @HyukjinKwon and @MaxGekk.

```scala
val withEmptyLineData = Array(Map("a" -> 1, "b" -> 2, "c" -> 3),
  Map("a" -> 4, "b" -> 5, "c" -> 6),
  Map("a" -> 7, "b" -> 8, "c" -> 9))
val df = spark.read.json("src/test/resources/test-data/with-empty-line.json")
```

@maropu (Member), Jan 28, 2019:

Please use testFile.

```scala
assert(df.count() === withEmptyLineData.length,
  "JSON DataFrame parsed-count should exclude whitespace-only lines")
val collected = df.collect().map(_.getValuesMap(Seq("a", "b", "c")))
assert(collected === withEmptyLineData)
```

@maropu (Member), Jan 28, 2019:

Please check out checkAnswer.

```scala
assert(df.count() === withEmptyLineData.length,
  "JSON DataFrame unparsed-count should exclude whitespace-only lines")
// cache and collect to check that count stays stable under those operations
df.cache()
```

@maropu (Member), Jan 28, 2019:

We don't need this cache.

```scala
rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
}
if (skipParsing) {
  if (unparsedRecordIsNonEmpty(input)) {
```

@HyukjinKwon (Member), Jan 28, 2019:

I think we should rather revert #21909. #21909 was a band-aid fix, and this is another band-aid fix on top of it.

JacksonParser itself can produce no record or multiple records. The previous code path assumed that it always produces a single record, and the current fix checks the input again outside of JacksonParser.

There is another problem from #21909: it also looks like it produces incorrect counts when the input JSON is an array:

```bash
$ cat tmp.json
[{"a": 1}, {"a": 2}]
```

Current master:

```scala
scala> spark.read.json("tmp.json").show()
+---+
|  a|
+---+
|  1|
|  2|
+---+

scala> spark.read.json("tmp.json").count()
res1: Long = 1
```

Spark 2.3.1:

```scala
scala> spark.read.json("tmp.json").show()
+---+
|  a|
+---+
|  1|
|  2|
+---+

scala> spark.read.json("tmp.json").count()
res1: Long = 2
```
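The mismatch comes from one physical line producing more than one record once it is actually parsed. A plain-Python sketch of that asymmetry (Spark uses Jackson; the stdlib `json` module here is only for illustration):

```python
import json

def records_in_line(line: str) -> int:
    # A single input line can yield 0, 1, or many records once parsed.
    stripped = line.strip()
    if not stripped:
        return 0  # blank line: no record
    parsed = json.loads(stripped)
    return len(parsed) if isinstance(parsed, list) else 1

print(records_in_line('[{"a": 1}, {"a": 2}]'))  # 2: array of two objects
print(records_in_line('{"a": 1}'))              # 1
print(records_in_line('[]'))                    # 0
```

A count shortcut that assumes one line equals one record returns 1 for the array line, which is exactly the `count() = 1` versus two `show()` rows discrepancy above.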

@HyukjinKwon (Member), Jan 28, 2019:

cc @cloud-fan and @gatorsmile; if you don't mind, I would like to revert #21909. WDYT?

@cloud-fan (Contributor), Jan 28, 2019:

That's a good catch! I think the idea of the count optimization still makes sense, but our parser is so flexible that there are many corner cases we need to think of.

cc @MaxGekk, how hard do you think it is to fix? If it's too hard, +1 to revert it. A safer approach may be to enable this count optimization only under certain cases that are 100% safe (a whitelist).

@HyukjinKwon (Member), Jan 28, 2019:

Yea, that's a safer approach if possible, but to do that we would have to manually check the input, as the current PR does. It doesn't look like a good idea to me to check input outside of JacksonParser.

The problem is, we cannot distinguish the cases below without parsing:

```
[{...}, {...}]
[]
{...}
# empty string
```

One line (input: IN) can yield 0 records, 1 record, or multiple records.

@MaxGekk (Contributor), Jan 28, 2019:

The case where a user sets a StructType for arrays can be excluded from the count optimization in advance.

Regarding empty (blank) strings: before #23543 they were considered bad records (they appeared in the results), and count() produced pretty consistent results.

As you know, in the case of count we have an empty StructType as the required schema. That means we don't have to fully parse the input and convert all fields to the desired types; it also means count() can "count" bad records. So we cannot compare the number of rows in show() output to count(); there are always cases where the numbers of rows differ. I think we should answer a more generic question: which input conforms to an empty StructType()? After that, it should be clear what kind of optimization can be applied to count(). Possible answers:

  • any string readable by Hadoop's LineReader
  • any text with length > 0 (we can do cheap filtering: linesReader.filter(_.getLength > 0))
  • any valid UTF-8 string (Hadoop's LineReader does not check that)
  • anything parsable by the Jackson parser
  • anything on which FailureSafeParser doesn't produce bad records (in PERMISSIVE mode)
  • a string containing an opening { and a closing }, with anything in between
  • a valid JSON record (parsable by Jackson in our case)
  • valid JSON with correct field values (convertible to the specified types)
  • something else

Until we answer the above question, reverting #21909 just moves us from one "bad" behavior to another "bad" behavior.
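These candidate definitions genuinely disagree on real input. A small hypothetical Python comparison of three of them (cheap length filter, blank-line filter, parsability) applied to the same lines:

```python
import json

def parses_as_json(line: str) -> bool:
    # Stand-in for "parsable by Jackson": accept anything json.loads accepts.
    try:
        json.loads(line)
        return True
    except ValueError:
        return False

lines = ['{"a": 1}', "", "   ", "not json", '{"a": "x"}']

by_length   = sum(1 for l in lines if len(l) > 0)         # length > 0
by_blank    = sum(1 for l in lines if l.strip())          # non-blank
by_parsable = sum(1 for l in lines if parses_as_json(l))  # parsable JSON

print(by_length, by_blank, by_parsable)  # 4 3 2
```

Each definition yields a different count() for the same file, which is why the choice has to be made explicit before any optimization is applied.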

@cloud-fan (Contributor), Jan 28, 2019:

```
[{...}, {...}] => 2
[] => 0
{...} => 1
# empty string => 0
```

I think the key here is that one line can produce 0, 1, or more records; how do we speed that up when we only care about counts? It looks to me like we can enable the count optimization only for {...}, and fall back to parsing for the other cases. @MaxGekk, do you think this is applicable? If it's a simple fix, let's do it for branch 2.4 as well; otherwise +1 for reverting it from 2.4 and re-doing it on master.
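The whitelist-plus-fallback idea can be sketched as follows (plain Python with hypothetical names; a real implementation would live next to JacksonParser and would need a more careful single-object check):

```python
import json

def fast_count(lines):
    total = 0
    for raw in lines:
        stripped = raw.strip()
        if not stripped:
            continue  # empty string => 0 records, no parsing needed
        if stripped.startswith("{") and stripped.endswith("}"):
            total += 1  # looks like a single object: count without full parse
        else:
            # arrays and anything else: fall back to real parsing
            parsed = json.loads(stripped)
            total += len(parsed) if isinstance(parsed, list) else 1
    return total

print(fast_count(['{"a": 1}', '', '[{"a": 1}, {"a": 2}]', '[]']))  # 3
```

Note the heuristic still counts a malformed line like `{oops}` as one record, which is consistent with PERMISSIVE mode (it becomes a bad record) but shows why "100% safe" needs a precise definition.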

@MaxGekk (Contributor), Jan 28, 2019:

Like, how are we going to explain this to users?

Outside of datasources, count() has very well-defined semantics: the number of rows matching the required schema. In the case of count(), the required schema is StructType(Array()). From the user's point of view, count() doesn't require any field to be present in a counted row.

If you want to see the same number of rows in the show() output as count() returns, you need to push the full datasource schema as the required schema; otherwise there is always some malformed input on which you will see different results.

@cloud-fan (Contributor), Jan 28, 2019:

> otherwise there is always some malformed input on which you will see different results

I think for permissive mode the results (at least the counts) are always the same even if some input is malformed? Otherwise, it seems like users only want to count the number of lines, and they should read the JSON files as text and count those.

@HyukjinKwon (Member), Jan 29, 2019:

re: #23665 (comment)

This is reasoning about the count (see below, continues).

> I think for permissive mode the results (at least the counts) are always the same even if some input is malformed?

To be 100% sure about correct results, we should always parse everything, although we do it the current way for optimization, and it has started to produce some inconsistent results.

Yes, so we don't convert 100%. In that case, we should at least parse against StructType(), which I guess means an empty object {...}. I think that's what JSON did before the PRs pointed out above.

> Otherwise, it seems like users only want to count the number of lines, and they should read the JSON files as text and count those.

Yes, I agree. It shouldn't behave like the text source plus count(). Let's revert anyway; I don't think this behaviour is ideal.

For other behaviours, I was thinking about making a README.md that whitelists behaviours for both CSV and JSON for Spark developers, somewhere under the JSON and CSV directories. It's a bit of a grunt job, but it sounds like it should be done. I could do this together with @MaxGekk since he has worked on this area a lot as well.

@HyukjinKwon (Member), Jan 29, 2019:

> It looks to me that we can enable the count optimization only for {...}, and fallback to parsing for other cases. @MaxGekk do you think this is applicable? If it's a simple fix, let's do it for branch 2.4 as well, otherwise +1 for reverting it from 2.4 and re-do it at master.

We can, but we would have to add an if-else for each input plus checks on some characters, since the point is to avoid parsing.

@HyukjinKwon (Member) commented on Jan 28, 2019

@sumitsu, if we agree upon reverting it (at #23667), let's convert this PR into a test-only PR. We can add the test.

@SparkQA commented on Jan 28, 2019

Test build #101739 has finished for PR 23665 at commit cd2f30c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

sumitsu added a commit to sumitsu/spark that referenced this pull request Jan 28, 2019

@sumitsu (Contributor, Author) commented on Jan 28, 2019

Thanks @HyukjinKwon; I've created a new PR (#23674) which includes only the test components of this PR and incorporates @maropu's comments above regarding code style on that test case.

@HyukjinKwon (Member) commented on Jan 31, 2019

Closing this since that's reverted.

asfgit pushed a commit that referenced this pull request Feb 1, 2019

[SPARK-26745][SPARK-24959][SQL][BRANCH-2.4] Revert count optimization in JSON datasource by SPARK-24959

## What changes were proposed in this pull request?

This PR reverts JSON count optimization part of #21909.

We cannot distinguish the cases below without parsing:

```
[{...}, {...}]
```

```
[]
```

```
{...}
```

```bash
# empty string
```

when we `count()`. One line (input: IN) can be, 0 record, 1 record and multiple records and this is dependent on each input.

See also #23665 (comment).

## How was this patch tested?

Manually tested.

Closes #23708 from HyukjinKwon/SPARK-26745-backport.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>

asfgit pushed a commit that referenced this pull request Feb 6, 2019

[SPARK-26745][SQL][TESTS] JsonSuite test case: empty line -> 0 record count

## What changes were proposed in this pull request?

This PR consists of the `test` components of #23665 only, minus the associated patch from that PR.

It adds a new unit test to `JsonSuite` which verifies that the `count()` returned from a `DataFrame` loaded from JSON containing empty lines does not include those empty lines in the record count. The test runs `count` prior to otherwise reading data from the `DataFrame`, so as to catch future cases where a pre-parsing optimization might result in `count` results inconsistent with existing behavior.

This PR is intended to be deployed alongside #23667; `master` currently causes the test to fail, as described in [SPARK-26745](https://issues.apache.org/jira/browse/SPARK-26745).

## How was this patch tested?

Manual testing, existing `JsonSuite` unit tests.

Closes #23674 from sumitsu/json_emptyline_count_test.

Authored-by: Branden Smith <branden.smith@publicismedia.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>

stczwd added a commit to stczwd/spark that referenced this pull request Feb 18, 2019

[SPARK-26745][SQL] Revert count optimization in JSON datasource by SPARK-24959

Closes apache#23667 from HyukjinKwon/revert-SPARK-24959.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>

stczwd added a commit to stczwd/spark that referenced this pull request Feb 18, 2019

[SPARK-26745][SQL][TESTS] JsonSuite test case: empty line -> 0 record count

Closes apache#23674 from sumitsu/json_emptyline_count_test.

Authored-by: Branden Smith <branden.smith@publicismedia.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>

wangyum added a commit to wangyum/spark that referenced this pull request Mar 14, 2019

[SPARK-26745][SQL] Revert count optimization in JSON datasource by SPARK-24959

Closes apache#23667 from HyukjinKwon/revert-SPARK-24959.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>

wangyum added a commit to wangyum/spark that referenced this pull request Mar 14, 2019

[SPARK-26745][SQL][TESTS] JsonSuite test case: empty line -> 0 record count

Closes apache#23674 from sumitsu/json_emptyline_count_test.

Authored-by: Branden Smith <branden.smith@publicismedia.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>

cloudera-hudson pushed a commit to cloudera/spark that referenced this pull request Mar 29, 2019

[SPARK-26745][SPARK-24959][SQL][BRANCH-2.4] Revert count optimization in JSON datasource by SPARK-24959

Closes #23708 from HyukjinKwon/SPARK-26745-backport.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 2a8343121e62aabe5c69d1e20fbb2c01e2e520e7)
(cherry picked from commit 30d40e7d99c8ec0fb19e5f7c36c23e308abc9bce)

Cloudera ID: CDH-77419

Change-Id: Ib427dceba9dc97ce0d4db196d8f0f0866eb1b2c6

dongjoon-hyun added a commit that referenced this pull request Apr 27, 2019

[SPARK-26745][SQL][TESTS] JsonSuite test case: empty line -> 0 record count

Closes #23674 from sumitsu/json_emptyline_count_test.

Authored-by: Branden Smith <branden.smith@publicismedia.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 63bced9)
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>