[SPARK-26372][SQL] Don't reuse value from previous row when parsing bad CSV input field

## What changes were proposed in this pull request?

CSV parsing accidentally reuses the previous row's good value when an input field is bad, because the row buffer is reused across records. See the example in the Jira ticket.

This PR ensures that the associated column is set to null when an input field cannot be converted.
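The failure mode can be illustrated with a minimal, self-contained sketch. `RowReuseSketch`, `MutableRow`, and `parse` below are hypothetical stand-ins, not Spark's actual `UnivocityParser` internals: a single mutable row buffer is reused for every record, so a field that fails to convert silently keeps the previous record's value unless it is explicitly cleared, which is what the added `setNullAt` call does.

```scala
import scala.util.control.NonFatal

// Hypothetical sketch of the row-reuse bug, not Spark's actual classes.
object RowReuseSketch {
  // Simplified stand-in for a mutable internal row.
  final class MutableRow(size: Int) {
    private val values = new Array[Any](size)
    def update(i: Int, v: Any): Unit = values(i) = v
    def setNullAt(i: Int): Unit = values(i) = null
    def get(i: Int): Any = values(i)
  }

  // Parses every record into the SAME reused two-column row.
  // Without the setNullAt call, a failed conversion would leave the
  // previous record's value in place for that column.
  def parse(records: Seq[Array[String]], convert: String => Any): Seq[Seq[Any]] = {
    val row = new MutableRow(2) // reused across records
    records.map { tokens =>
      var i = 0
      while (i < tokens.length) {
        try {
          row(i) = convert(tokens(i))
        } catch {
          case NonFatal(_) => row.setNullAt(i) // the fix: clear the stale value
        }
        i += 1
      }
      Seq(row.get(0), row.get(1))
    }
  }
}
```

With a converter that rejects the malformed date, the second record's bad column comes back as null instead of the first record's date, mirroring the behavior the real test below verifies.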

## How was this patch tested?

Added a new test.
Ran all SQL unit tests (testOnly org.apache.spark.sql.*).
Ran the PySpark tests for pyspark-sql.

Closes #23323 from bersprockets/csv-bad-field.

Authored-by: Bruce Robbins <bersprockets@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
bersprockets authored and holdenk committed Jan 5, 2019
1 parent ffbc6b1 commit 4c2af74
Showing 3 changed files with 22 additions and 0 deletions.

@@ -239,6 +239,7 @@ class UnivocityParser(
      } catch {
        case NonFatal(e) =>
          badRecordException = badRecordException.orElse(Some(e))
          row.setNullAt(i)
      }
      i += 1
    }
2 changes: 2 additions & 0 deletions sql/core/src/test/resources/test-data/bad_after_good.csv
@@ -0,0 +1,2 @@
"good record",1999-08-01
"bad record",1999-088-01
@@ -63,6 +63,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
  private val datesFile = "test-data/dates.csv"
  private val unescapedQuotesFile = "test-data/unescaped-quotes.csv"
  private val valueMalformedFile = "test-data/value-malformed.csv"
  private val badAfterGoodFile = "test-data/bad_after_good.csv"

  /** Verifies data and schema. */
  private def verifyCars(
@@ -2012,4 +2013,22 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
      assert(!files.exists(_.getName.endsWith("csv")))
    }
  }

  test("Do not reuse last good value for bad input field") {
    val schema = StructType(
      StructField("col1", StringType) ::
      StructField("col2", DateType) ::
      Nil
    )
    val rows = spark.read
      .schema(schema)
      .format("csv")
      .load(testFile(badAfterGoodFile))

    val expectedRows = Seq(
      Row("good record", java.sql.Date.valueOf("1999-08-01")),
      Row("bad record", null))

    checkAnswer(rows, expectedRows)
  }
}
