
[SPARK-21024][SQL] CSV parse mode handles Univocity parser exceptions#18250

Closed
maropu wants to merge 2 commits into apache:master from maropu:SPARK-21024

Conversation

@maropu
Member

@maropu maropu commented Jun 9, 2017

What changes were proposed in this pull request?

This PR fixes the code so that the CSV parse modes handle Univocity parser exceptions.
The current master cannot skip illegal records that make the Univocity parser throw:

scala> Seq("0,1", "0,1,2,3").toDF().write.text("/Users/maropu/Desktop/data")
scala> val df = spark.read.format("csv").schema("a int, b int").option("maxColumns", "3").load("/Users/maropu/Desktop/data")
scala> df.show

com.univocity.parsers.common.TextParsingException: java.lang.ArrayIndexOutOfBoundsException - 3
Hint: Number of columns processed may have exceeded limit of 3 columns. Use settings.setMaxColumns(int) to define the maximum number of columns your input can have
Ensure your configuration is correct, with delimiters, quotes and escape sequences that match the input format you are trying to parse
Parser Configuration: CsvParserSettings:
        Auto configuration enabled=true
        Autodetect column delimiter=false
        Autodetect quotes=false
        Column reordering enabled=true
        Empty value=null
        Escape unquoted values=false
        ...

at com.univocity.parsers.common.AbstractParser.handleException(AbstractParser.java:339)
at com.univocity.parsers.common.AbstractParser.handleEOF(AbstractParser.java:195)
at com.univocity.parsers.common.AbstractParser.parseLine(AbstractParser.java:544)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser.parse(UnivocityParser.scala:191)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$5.apply(UnivocityParser.scala:308)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$5.apply(UnivocityParser.scala:308)
at org.apache.spark.sql.execution.datasources.FailureSafeParser.parse(FailureSafeParser.scala:60)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$parseIterator$1.apply(UnivocityParser.scala:312)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$parseIterator$1.apply(UnivocityParser.scala:312)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
...
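In outline, the proposed handling catches Univocity's TextParsingException per record and re-throws it as Spark's internal BadRecordException, which FailureSafeParser (visible in the stack trace above) already translates into the configured parse mode. A minimal sketch, not the literal diff; `parseSafely`, `tokenizer`, and `convert` are stand-ins for the corresponding members of UnivocityParser:

import com.univocity.parsers.common.TextParsingException
import com.univocity.parsers.csv.CsvParser
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.BadRecordException
import org.apache.spark.unsafe.types.UTF8String

// Sketch: convert Univocity's failure into a BadRecordException so that
// FailureSafeParser can apply PERMISSIVE / DROPMALFORMED / FAILFAST
// semantics instead of crashing the whole task.
def parseSafely(
    tokenizer: CsvParser,
    convert: Array[String] => InternalRow)(input: String): InternalRow = {
  try {
    convert(tokenizer.parseLine(input))
  } catch {
    case e: TextParsingException =>
      throw BadRecordException(() => UTF8String.fromString(input), () => None, e)
  }
}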

How was this patch tested?

Added tests in CSVSuite.

@SparkQA

SparkQA commented Jun 9, 2017

Test build #77839 has finished for PR 18250 at commit 5304fbd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@HyukjinKwon HyukjinKwon left a comment

Sounds okay to me.

import scala.util.Try
import scala.util.control.NonFatal

import com.univocity.parsers.common.TextParsingException

Member

It looks like this one can be removed.

Member Author

oh, I missed that. Thanks!

.option("maxColumns", "2")
.option("mode", "PERMISSIVE")
.load(path.getAbsolutePath)
checkAnswer(df, Row(0, 1) :: Row(null, null) :: Nil)

Member

Should we maybe also check what is put in the malformed column?

Member Author

okay, I'll set the columnNameOfCorruptRecord column.
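(For reference, a reader configured that way might look like the following; the schema and corrupt-record column name here are illustrative, not taken from the patch.)

// Illustrative: with a user-specified schema, the corrupt-record column
// must be declared as a string field for PERMISSIVE mode to populate it.
val df = spark.read.format("csv")
  .schema("a INT, b INT, _corrupt_record STRING")
  .option("maxColumns", "2")
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .load(path.getAbsolutePath)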

@HyukjinKwon
Member

@maropu, just to make sure, do you mind if I ask you to test this with wholeFile too? IIRC, it calls a different code path for parsing tokens.

@maropu
Member Author

maropu commented Jun 9, 2017

oh, sure. I'll add tests.

@maropu
Member Author

maropu commented Jun 13, 2017

@HyukjinKwon I tried to fix this case for wholeFile mode too, but I couldn't do it easily because the CSVParser in wholeFile mode holds state while parsing InputStream data. Once the parser hits an exception, IIUC it cannot restart parsing the input, so fixing this in wholeFile mode might require refactoring the current logic there. IMHO, when a parsing exception happens in wholeFile mode, we can't easily restart because the position to resume parsing from is unclear. WDYT?
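(To illustrate that statefulness with a standalone, hypothetical Univocity snippet, not code from this PR: once the stream-based parser throws mid-stream, there is no well-defined position to resume from.)

import java.io.StringReader
import com.univocity.parsers.common.TextParsingException
import com.univocity.parsers.csv.{CsvParser, CsvParserSettings}

// Hedged sketch of the stream-based (wholeFile-style) parsing path.
val settings = new CsvParserSettings()
settings.setMaxColumns(2)
val parser = new CsvParser(settings)
parser.beginParsing(new StringReader("0,1\n0,1,2,3\n0,1"))
try {
  var row = parser.parseNext()   // Array("0", "1")
  while (row != null) {
    row = parser.parseNext()     // throws TextParsingException on "0,1,2,3"
  }
} catch {
  case _: TextParsingException =>
    // By now the parser has consumed input past the start of the bad
    // record, so there is no reliable offset to restart from, unlike the
    // per-line path, where the next line can simply go to parseLine again.
}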

.option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
.option("wholeFile", wholeFile)
.load(path.getAbsolutePath)
checkAnswer(df, Row(0, 1, null) :: Row(null, null, "0,1,2,") :: Nil)

Member Author

@HyukjinKwon Weird behaviour... when we set maxColumns in a Univocity parser, currentParsedContent seems to return (maxColumns + 1) elements of the input.
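(A hypothetical standalone check of that observation; the expected output matches the corrupt-record value asserted in the test above.)

import com.univocity.parsers.common.TextParsingException
import com.univocity.parsers.csv.{CsvParser, CsvParserSettings}

// With maxColumns = 2, parsing "0,1,2,3" fails, and the content recorded
// at the point of failure spans maxColumns + 1 fields.
val settings = new CsvParserSettings()
settings.setMaxColumns(2)
val parser = new CsvParser(settings)
try {
  parser.parseLine("0,1,2,3")
} catch {
  case _: TextParsingException =>
    // Presumably prints "0,1,2," per the behaviour described above.
    println(parser.getContext.currentParsedContent())
}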

@SparkQA

SparkQA commented Jun 13, 2017

Test build #77956 has finished for PR 18250 at commit b8c4462.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@giaosudau

I found that if you use the inferSchema option, it throws the same error.
We should handle this case too (see the repro sketch below).
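(A reproduction along those lines, for reference; with inferSchema enabled, the inference pass parses the input itself, so presumably the same TextParsingException surfaces there before the mode handling added in this PR applies.)

// Hypothetical repro per the report above: the failure happens while
// inferring the schema, not while reading rows with a known schema.
val df = spark.read.format("csv")
  .option("inferSchema", "true")
  .option("maxColumns", "2")
  .option("mode", "PERMISSIVE")
  .load(path.getAbsolutePath)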

@maropu
Member Author

maropu commented Jun 14, 2017

Yeah, I think so. But to fix the inferSchema case, we need some more modifications. So, I feel we should first fix this case (schema specified by the user), then fix that one in a follow-up.

@maropu
Member Author

maropu commented Jun 25, 2017

@gatorsmile @HyukjinKwon ping

@HyukjinKwon
Member

I am sorry, @maropu. I can't think of a good way to handle this for now... I'll come back after thinking about it more, maybe.

@maropu
Member Author

maropu commented Jul 24, 2017

ok, thanks!

@maropu
Member Author

maropu commented Jul 24, 2017

I think we can come back to this once we have a better way to handle it, so I'll close this for now (we'd better keep this discussion in the JIRA).
