From 3a9ef7550e201a74759f45cca56595c107100df8 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Thu, 7 Apr 2016 09:26:28 +0900 Subject: [PATCH 1/4] Parse unescaped quotes --- dev/deps/spark-deps-hadoop-2.2 | 3 ++- dev/deps/spark-deps-hadoop-2.3 | 3 ++- dev/deps/spark-deps-hadoop-2.4 | 3 ++- dev/deps/spark-deps-hadoop-2.6 | 3 ++- dev/deps/spark-deps-hadoop-2.7 | 3 ++- sql/core/pom.xml | 2 +- .../sql/execution/datasources/csv/CSVParser.scala | 1 + sql/core/src/test/resources/unescaped-quotes.csv | 2 ++ .../sql/execution/datasources/csv/CSVSuite.scala | 12 ++++++++++++ 9 files changed, 26 insertions(+), 6 deletions(-) create mode 100644 sql/core/src/test/resources/unescaped-quotes.csv diff --git a/dev/deps/spark-deps-hadoop-2.2 b/dev/deps/spark-deps-hadoop-2.2 index 2c24366cc3a15..f192cb1667f98 100644 --- a/dev/deps/spark-deps-hadoop-2.2 +++ b/dev/deps/spark-deps-hadoop-2.2 @@ -175,7 +175,8 @@ stax-api-1.0.1.jar stream-2.7.0.jar stringtemplate-3.2.1.jar super-csv-2.2.0.jar -univocity-parsers-1.5.6.jar +univocity-parsers-2.0.2.jar +unused-1.0.0.jar xbean-asm5-shaded-4.4.jar xmlenc-0.52.jar xz-1.0.jar diff --git a/dev/deps/spark-deps-hadoop-2.3 b/dev/deps/spark-deps-hadoop-2.3 index e9cb0d8f3eac2..e07b1fca1e7e1 100644 --- a/dev/deps/spark-deps-hadoop-2.3 +++ b/dev/deps/spark-deps-hadoop-2.3 @@ -166,7 +166,8 @@ stax-api-1.0.1.jar stream-2.7.0.jar stringtemplate-3.2.1.jar super-csv-2.2.0.jar -univocity-parsers-1.5.6.jar +univocity-parsers-2.0.2.jar +unused-1.0.0.jar xbean-asm5-shaded-4.4.jar xmlenc-0.52.jar xz-1.0.jar diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4 index d8d1840da5531..06791438d8739 100644 --- a/dev/deps/spark-deps-hadoop-2.4 +++ b/dev/deps/spark-deps-hadoop-2.4 @@ -167,7 +167,8 @@ stax-api-1.0.1.jar stream-2.7.0.jar stringtemplate-3.2.1.jar super-csv-2.2.0.jar -univocity-parsers-1.5.6.jar +univocity-parsers-2.0.2.jar +unused-1.0.0.jar xbean-asm5-shaded-4.4.jar xmlenc-0.52.jar xz-1.0.jar diff --git 
a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6 index 8beede1e38d28..cdbadfcf0071c 100644 --- a/dev/deps/spark-deps-hadoop-2.6 +++ b/dev/deps/spark-deps-hadoop-2.6 @@ -173,7 +173,8 @@ stax-api-1.0.1.jar stream-2.7.0.jar stringtemplate-3.2.1.jar super-csv-2.2.0.jar -univocity-parsers-1.5.6.jar +univocity-parsers-2.0.2.jar +unused-1.0.0.jar xbean-asm5-shaded-4.4.jar xercesImpl-2.9.1.jar xmlenc-0.52.jar diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7 index a9d814f944872..3e4cae3510f5b 100644 --- a/dev/deps/spark-deps-hadoop-2.7 +++ b/dev/deps/spark-deps-hadoop-2.7 @@ -174,7 +174,8 @@ stax-api-1.0.1.jar stream-2.7.0.jar stringtemplate-3.2.1.jar super-csv-2.2.0.jar -univocity-parsers-1.5.6.jar +univocity-parsers-2.0.2.jar +unused-1.0.0.jar xbean-asm5-shaded-4.4.jar xercesImpl-2.9.1.jar xmlenc-0.52.jar diff --git a/sql/core/pom.xml b/sql/core/pom.xml index 708670b2923fc..8b1017042cd93 100644 --- a/sql/core/pom.xml +++ b/sql/core/pom.xml @@ -39,7 +39,7 @@ com.univocity univocity-parsers - 1.5.6 + 2.0.2 jar diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala index 5570b2c173e1b..c3d863f547dab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala @@ -47,6 +47,7 @@ private[sql] abstract class CsvReader(params: CSVOptions, headers: Seq[String]) settings.setMaxColumns(params.maxColumns) settings.setNullValue(params.nullValue) settings.setMaxCharsPerColumn(params.maxCharsPerColumn) + settings.setParseUnescapedQuotesUntilDelimiter(true) if (headers != null) settings.setHeaders(headers: _*) new CsvParser(settings) diff --git a/sql/core/src/test/resources/unescaped-quotes.csv b/sql/core/src/test/resources/unescaped-quotes.csv new file mode 100644 index 
0000000000000..7c68055575de0 --- /dev/null +++ b/sql/core/src/test/resources/unescaped-quotes.csv @@ -0,0 +1,2 @@ +"a"b,ccc,ddd +ab,cc"c,ddd" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 58d9d69d9a8a5..9baae80f15981 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -45,6 +45,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { private val disableCommentsFile = "disable_comments.csv" private val boolFile = "bool.csv" private val simpleSparseFile = "simple_sparse.csv" + private val unescapedQuotesFile = "unescaped-quotes.csv" private def testFile(fileName: String): String = { Thread.currentThread().getContextClassLoader.getResource(fileName).toString @@ -140,6 +141,17 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { verifyCars(cars, withHeader = true) } + test("parse unescaped quotes with maxCharsPerColumn") { + val rows = sqlContext.read + .format("csv") + .option("maxCharsPerColumn", "4") + .load(testFile(unescapedQuotesFile)) + + val expectedRows = Seq(Row("\"a\"b", "ccc", "ddd"), Row("ab", "cc\"c", "ddd\"")) + + checkAnswer(rows, expectedRows) + } + test("bad encoding name") { val exception = intercept[UnsupportedCharsetException] { sqlContext From 3165d1d2aca61280ba123adc694dcb078c8a6d50 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Thu, 7 Apr 2016 09:28:12 +0900 Subject: [PATCH 2/4] Remove unintentionally added dependencies --- dev/deps/spark-deps-hadoop-2.2 | 1 - dev/deps/spark-deps-hadoop-2.3 | 1 - dev/deps/spark-deps-hadoop-2.4 | 1 - dev/deps/spark-deps-hadoop-2.6 | 1 - dev/deps/spark-deps-hadoop-2.7 | 1 - 5 files changed, 5 deletions(-) diff --git a/dev/deps/spark-deps-hadoop-2.2 
index f192cb1667f98..2794b3d235cea 100644 --- a/dev/deps/spark-deps-hadoop-2.2 +++ b/dev/deps/spark-deps-hadoop-2.2 @@ -176,7 +176,6 @@ stream-2.7.0.jar stringtemplate-3.2.1.jar super-csv-2.2.0.jar univocity-parsers-2.0.2.jar -unused-1.0.0.jar xbean-asm5-shaded-4.4.jar xmlenc-0.52.jar xz-1.0.jar diff --git a/dev/deps/spark-deps-hadoop-2.3 b/dev/deps/spark-deps-hadoop-2.3 index e07b1fca1e7e1..4906fe9cfae47 100644 --- a/dev/deps/spark-deps-hadoop-2.3 +++ b/dev/deps/spark-deps-hadoop-2.3 @@ -167,7 +167,6 @@ stream-2.7.0.jar stringtemplate-3.2.1.jar super-csv-2.2.0.jar univocity-parsers-2.0.2.jar -unused-1.0.0.jar xbean-asm5-shaded-4.4.jar xmlenc-0.52.jar xz-1.0.jar diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4 index 06791438d8739..23ff5cfa2ea48 100644 --- a/dev/deps/spark-deps-hadoop-2.4 +++ b/dev/deps/spark-deps-hadoop-2.4 @@ -168,7 +168,6 @@ stream-2.7.0.jar stringtemplate-3.2.1.jar super-csv-2.2.0.jar univocity-parsers-2.0.2.jar -unused-1.0.0.jar xbean-asm5-shaded-4.4.jar xmlenc-0.52.jar xz-1.0.jar diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6 index cdbadfcf0071c..9b5a5643f392a 100644 --- a/dev/deps/spark-deps-hadoop-2.6 +++ b/dev/deps/spark-deps-hadoop-2.6 @@ -174,7 +174,6 @@ stream-2.7.0.jar stringtemplate-3.2.1.jar super-csv-2.2.0.jar univocity-parsers-2.0.2.jar -unused-1.0.0.jar xbean-asm5-shaded-4.4.jar xercesImpl-2.9.1.jar xmlenc-0.52.jar diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7 index 3e4cae3510f5b..1dca2fc55ad30 100644 --- a/dev/deps/spark-deps-hadoop-2.7 +++ b/dev/deps/spark-deps-hadoop-2.7 @@ -175,7 +175,6 @@ stream-2.7.0.jar stringtemplate-3.2.1.jar super-csv-2.2.0.jar univocity-parsers-2.0.2.jar -unused-1.0.0.jar xbean-asm5-shaded-4.4.jar xercesImpl-2.9.1.jar xmlenc-0.52.jar From cca836ae38dbd79165b9919a222049703e06be06 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Thu, 7 Apr 2016 10:03:15 +0900 Subject: [PATCH 3/4] Add a test for exception --- 
.../execution/datasources/csv/CSVParser.scala | 30 +++++++++++++++---- .../execution/datasources/csv/CSVSuite.scala | 13 ++++++++ 2 files changed, 38 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala index c3d863f547dab..f3c32644efd97 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.csv import java.io.{ByteArrayOutputStream, OutputStreamWriter, StringReader} import java.nio.charset.StandardCharsets +import com.univocity.parsers.common.TextParsingException import com.univocity.parsers.csv.{CsvParser, CsvParserSettings, CsvWriter, CsvWriterSettings} import org.apache.spark.internal.Logging @@ -104,10 +105,18 @@ private[sql] class LineCsvReader(params: CSVOptions) * @return array of strings where each string is a field in the CSV record */ def parseLine(line: String): Array[String] = { - parser.beginParsing(new StringReader(line)) - val parsed = parser.parseNext() - parser.stopParsing() - parsed + try { + parser.parseLine(line) + } catch { + case e: TextParsingException => + val contents = e.getParsedContent + val length = e.getParsedContent.length + val colIndex = e.getColumnIndex + throw new RuntimeException(s"Length of parsed input exceeds the maximum number of " + + s"characters defined at maxCharsPerColumn. Please increase the value " + + s"for maxCharsPerColumn option. 
Column index: [$colIndex] " + + s"maxCharsPerColumn value: [$length] Parsed content: [$contents]") + } } } @@ -135,7 +144,18 @@ private[sql] class BulkCsvReader( override def next(): Array[String] = { val curRecord = nextRecord if(curRecord != null) { - nextRecord = parser.parseNext() + try { + nextRecord = parser.parseNext() + } catch { + case e: TextParsingException => + val contents = e.getParsedContent + val length = e.getParsedContent.length + val colIndex = e.getColumnIndex + throw new RuntimeException(s"Length of parsed input exceeds the maximum number of " + + s"characters defined at maxCharsPerColumn. Please increase the value " + + s"for maxCharsPerColumn option. Column index: [$colIndex] " + + s"maxCharsPerColumn value: [$length] Parsed content: [$contents]") + } } else { throw new NoSuchElementException("next record is null") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 9baae80f15981..38c9a8ed5ca19 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -152,6 +152,19 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { checkAnswer(rows, expectedRows) } + test("throws an wrapped exception for maxCharsPerColumn") { + val exception = intercept[RuntimeException] { + val rows = sqlContext.read + .format("csv") + .option("maxCharsPerColumn", "2") + .load(testFile(unescapedQuotesFile)) + .collect() + } + + assert(exception.getMessage.contains("Length of parsed input exceeds the " + + "maximum number of characters defined at maxCharsPerColumn.")) + } + test("bad encoding name") { val exception = intercept[UnsupportedCharsetException] { sqlContext From d554c311efacd26cc08c9dd352a85da9d5e4990f Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Thu, 7 Apr 
2016 10:20:52 +0900 Subject: [PATCH 4/4] Remove exception handling as it does not distinguish this exception --- .../execution/datasources/csv/CSVParser.scala | 30 ++++--------------- .../execution/datasources/csv/CSVSuite.scala | 13 -------- 2 files changed, 5 insertions(+), 38 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala index f3c32644efd97..c3d863f547dab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.datasources.csv import java.io.{ByteArrayOutputStream, OutputStreamWriter, StringReader} import java.nio.charset.StandardCharsets -import com.univocity.parsers.common.TextParsingException import com.univocity.parsers.csv.{CsvParser, CsvParserSettings, CsvWriter, CsvWriterSettings} import org.apache.spark.internal.Logging @@ -105,18 +104,10 @@ private[sql] class LineCsvReader(params: CSVOptions) * @return array of strings where each string is a field in the CSV record */ def parseLine(line: String): Array[String] = { - try { - parser.parseLine(line) - } catch { - case e: TextParsingException => - val contents = e.getParsedContent - val length = e.getParsedContent.length - val colIndex = e.getColumnIndex - throw new RuntimeException(s"Length of parsed input exceeds the maximum number of " + - s"characters defined at maxCharsPerColumn. Please increase the value " + - s"for maxCharsPerColumn option. 
Column index: [$colIndex] " + - s"maxCharsPerColumn value: [$length] Parsed content: [$contents]") - } + parser.beginParsing(new StringReader(line)) + val parsed = parser.parseNext() + parser.stopParsing() + parsed } } @@ -144,18 +135,7 @@ private[sql] class BulkCsvReader( override def next(): Array[String] = { val curRecord = nextRecord if(curRecord != null) { - try { - nextRecord = parser.parseNext() - } catch { - case e: TextParsingException => - val contents = e.getParsedContent - val length = e.getParsedContent.length - val colIndex = e.getColumnIndex - throw new RuntimeException(s"Length of parsed input exceeds the maximum number of " + - s"characters defined at maxCharsPerColumn. Please increase the value " + - s"for maxCharsPerColumn option. Column index: [$colIndex] " + - s"maxCharsPerColumn value: [$length] Parsed content: [$contents]") - } + nextRecord = parser.parseNext() } else { throw new NoSuchElementException("next record is null") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 38c9a8ed5ca19..9baae80f15981 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -152,19 +152,6 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { checkAnswer(rows, expectedRows) } - test("throws an wrapped exception for maxCharsPerColumn") { - val exception = intercept[RuntimeException] { - val rows = sqlContext.read - .format("csv") - .option("maxCharsPerColumn", "2") - .load(testFile(unescapedQuotesFile)) - .collect() - } - - assert(exception.getMessage.contains("Length of parsed input exceeds the " + - "maximum number of characters defined at maxCharsPerColumn.")) - } - test("bad encoding name") { val exception = intercept[UnsupportedCharsetException] { 
sqlContext