From 5304fbd29f3c91fd8f37561e6f041db3a0289844 Mon Sep 17 00:00:00 2001
From: Takeshi Yamamuro
Date: Thu, 8 Jun 2017 08:42:23 -0700
Subject: [PATCH 1/2] Handle parser exceptions

---
 .../datasources/csv/UnivocityParser.scala    | 11 ++++++++-
 .../execution/datasources/csv/CSVSuite.scala | 23 +++++++++++++++++++
 2 files changed, 33 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index c3657acb7d86..ec986eee00e0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -25,6 +25,7 @@ import java.util.Locale
 import scala.util.Try
 import scala.util.control.NonFatal
 
+import com.univocity.parsers.common.TextParsingException
 import com.univocity.parsers.csv.CsvParser
 
 import org.apache.spark.internal.Logging
@@ -188,7 +189,15 @@ class UnivocityParser(
    * Parses a single CSV string and turns it into either one resulting row or no row (if the
    * the record is malformed).
    */
-  def parse(input: String): InternalRow = convert(tokenizer.parseLine(input))
+  def parse(input: String): InternalRow = {
+    val parsedTokens = try {
+      tokenizer.parseLine(input)
+    } catch {
+      case NonFatal(e) =>
+        throw BadRecordException(() => getCurrentInput, () => None, e)
+    }
+    convert(parsedTokens)
+  }
 
   private def convert(tokens: Array[String]): InternalRow = {
     if (tokens.length != schema.length) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 352dba79a4c0..c905b88c3c9f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1174,4 +1174,27 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       }
     }
   }
+
+  test("SPARK-21024 CSV parser mode controls parser exceptions") {
+    withTempPath { path =>
+      Seq("0,1", "0,1,2,3").toDF().write.text(path.getAbsolutePath)
+
+      val msg = intercept[SparkException] {
+        spark.read.format("csv")
+          .schema("a INT, b INT")
+          .option("maxColumns", "3")
+          .option("mode", "FAILFAST")
+          .load(path.getAbsolutePath)
+          .collect
+      }.getMessage
+      assert(msg.contains("Number of columns processed may have exceeded limit of 3 columns."))
+
+      val df = spark.read.format("csv")
+        .schema(s"a INT, b INT")
+        .option("maxColumns", "2")
+        .option("mode", "PERMISSIVE")
+        .load(path.getAbsolutePath)
+      checkAnswer(df, Row(0, 1) :: Row(null, null) :: Nil)
+    }
+  }
 }

From b8c4462f965e29df339a345d8d428753c9ef732e Mon Sep 17 00:00:00 2001
From: Takeshi Yamamuro
Date: Fri, 9 Jun 2017 22:47:22 +0900
Subject: [PATCH 2/2] Apply review comments

---
 .../datasources/csv/UnivocityParser.scala    |  1 -
 .../execution/datasources/csv/CSVSuite.scala | 36 +++++++++++++++++++----------
 2 files changed, 21 insertions(+), 16 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index ec986eee00e0..d7a5738a5a26 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -25,7 +25,6 @@ import java.util.Locale
 import scala.util.Try
 import scala.util.control.NonFatal
 
-import com.univocity.parsers.common.TextParsingException
 import com.univocity.parsers.csv.CsvParser
 
 import org.apache.spark.internal.Logging
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index c905b88c3c9f..23377e03d843 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1179,22 +1179,28 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     withTempPath { path =>
       Seq("0,1", "0,1,2,3").toDF().write.text(path.getAbsolutePath)
 
-      val msg = intercept[SparkException] {
-        spark.read.format("csv")
-          .schema("a INT, b INT")
-          .option("maxColumns", "3")
-          .option("mode", "FAILFAST")
+      Seq(false).foreach { wholeFile =>
+        val msg = intercept[SparkException] {
+          spark.read.format("csv")
+            .schema("a INT, b INT")
+            .option("maxColumns", "2")
+            .option("mode", "FAILFAST")
+            .option("wholeFile", wholeFile)
+            .load(path.getAbsolutePath)
+            .collect
+        }.getMessage
+        assert(msg.contains("Number of columns processed may have exceeded limit of 2 columns."))
+
+        val columnNameOfCorruptRecord = "_unparsed"
+        val df = spark.read.format("csv")
+          .schema(s"a INT, b INT, $columnNameOfCorruptRecord STRING")
+          .option("maxColumns", "2")
+          .option("mode", "PERMISSIVE")
+          .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
+          .option("wholeFile", wholeFile)
           .load(path.getAbsolutePath)
-          .collect
-      }.getMessage
-      assert(msg.contains("Number of columns processed may have exceeded limit of 3 columns."))
-
-      val df = spark.read.format("csv")
-        .schema(s"a INT, b INT")
-        .option("maxColumns", "2")
-        .option("mode", "PERMISSIVE")
-        .load(path.getAbsolutePath)
-      checkAnswer(df, Row(0, 1) :: Row(null, null) :: Nil)
+        checkAnswer(df, Row(0, 1, null) :: Row(null, null, "0,1,2,") :: Nil)
+      }
     }
   }
 }
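For reference, a minimal usage sketch (not part of either commit) of the behaviour these patches exercise: once univocity parser errors are wrapped in BadRecordException, a line that overflows maxColumns follows the CSV parse mode instead of escaping as a raw TextParsingException. It assumes a spark-shell session (a SparkSession named `spark` in scope) and a hypothetical input file /tmp/rows.csv containing the two lines "0,1" and "0,1,2,3"; the path and column names are illustrative only.

  import org.apache.spark.SparkException

  // PERMISSIVE: the overlong second line is routed into the corrupt-record column.
  val permissive = spark.read
    .schema("a INT, b INT, _corrupt STRING")
    .option("maxColumns", "2")
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "_corrupt")
    .csv("/tmp/rows.csv")
  permissive.show()  // expect (0, 1, null) and (null, null, <raw malformed text>)

  // FAILFAST: the same line now fails the query with a SparkException whose
  // message carries the underlying parser error, instead of an unhandled
  // univocity exception escaping from the task.
  try {
    spark.read
      .schema("a INT, b INT")
      .option("maxColumns", "2")
      .option("mode", "FAILFAST")
      .csv("/tmp/rows.csv")
      .collect()
  } catch {
    case e: SparkException => println(s"malformed record rejected: ${e.getMessage}")
  }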