From 014cfe9542f8ccdc634097fbebda21b9eb99ab7b Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 20 May 2018 23:16:02 +0200 Subject: [PATCH 1/2] Removing of unnecessary comments filtering. --- .../apache/spark/sql/DataFrameReader.scala | 2 +- .../datasources/csv/UnivocityParser.scala | 20 ++++++++++--------- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 917f0cb221412..be25227bcff24 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -504,7 +504,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { val parsed = linesWithoutHeader.mapPartitions { iter => val rawParser = new UnivocityParser(actualSchema, parsedOptions) val parser = new FailureSafeParser[String]( - input => Seq(rawParser.parse(input)), + input => rawParser.parse(input), parsedOptions.parseMode, schema, parsedOptions.columnNameOfCorruptRecord) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala index 99557a1ceb0c8..53bf41ee7c60c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala @@ -182,10 +182,12 @@ class UnivocityParser( * Parses a single CSV string and turns it into either one resulting row or no row (if the * the record is malformed). 
*/ - def parse(input: String): InternalRow = convert(tokenizer.parseLine(input)) + def parse(input: String): Seq[InternalRow] = convert(tokenizer.parseLine(input)) - private def convert(tokens: Array[String]): InternalRow = { - if (tokens.length != schema.length) { + private def convert(tokens: Array[String]): Seq[InternalRow] = { + if (tokens == null) { + Seq.empty + } else if (tokens.length != schema.length) { // If the number of tokens doesn't match the schema, we should treat it as a malformed record. // However, we still have chance to parse some of the tokens, by adding extra null tokens in // the tail if the number is smaller, or by dropping extra tokens if the number is larger. @@ -196,7 +198,7 @@ class UnivocityParser( } def getPartialResult(): Option[InternalRow] = { try { - Some(convert(checkedTokens)) + convert(checkedTokens).headOption } catch { case _: BadRecordException => None } @@ -215,7 +217,7 @@ class UnivocityParser( row(i) = valueConverters(from).apply(tokens(from)) i += 1 } - row + Seq(row) } catch { case NonFatal(e) => // For corrupted records with the number of tokens same as the schema, @@ -249,7 +251,7 @@ private[csv] object UnivocityParser { schema: StructType): Iterator[InternalRow] = { val tokenizer = parser.tokenizer val safeParser = new FailureSafeParser[Array[String]]( - input => Seq(parser.convert(input)), + input => parser.convert(input), parser.options.parseMode, schema, parser.options.columnNameOfCorruptRecord) @@ -300,11 +302,11 @@ private[csv] object UnivocityParser { lines } - val filteredLines: Iterator[String] = - CSVUtils.filterCommentAndEmpty(linesWithoutHeader, options) + val filteredLines: Iterator[String] = linesWithoutHeader + // CSVUtils.filterCommentAndEmpty(linesWithoutHeader, options) val safeParser = new FailureSafeParser[String]( - input => Seq(parser.parse(input)), + input => parser.parse(input), parser.options.parseMode, schema, parser.options.columnNameOfCorruptRecord) From 
36522689f9579ec05e7d69d1d7bd1f507f6bdbc0 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 20 May 2018 23:18:20 +0200 Subject: [PATCH 2/2] Clean up removal of comment filtering --- .../sql/execution/datasources/csv/UnivocityParser.scala | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala index 53bf41ee7c60c..ad3b4bd0dae06 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala @@ -302,14 +302,11 @@ private[csv] object UnivocityParser { lines } - val filteredLines: Iterator[String] = linesWithoutHeader - // CSVUtils.filterCommentAndEmpty(linesWithoutHeader, options) - val safeParser = new FailureSafeParser[String]( input => parser.parse(input), parser.options.parseMode, schema, parser.options.columnNameOfCorruptRecord) - filteredLines.flatMap(safeParser.parse) + linesWithoutHeader.flatMap(safeParser.parse) } }