Bug fix: malformed input + permissive mode + columnNameOfCorruptRecord option
MaxGekk committed Jan 13, 2020
1 parent 18389b0 commit e302fa4
Showing 2 changed files with 47 additions and 10 deletions.
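
For context: the commit fixes PERMISSIVE-mode CSV parsing when filters are pushed down into the parser and the input contains a malformed field. A row with, say, an unparseable timestamp should still come back with its convertible columns populated and the raw line in the columnNameOfCorruptRecord column; before this commit the combination misbehaved, and the new test below pins down the expected rows. A minimal sketch of the user-facing scenario, mirroring that test (the spark-shell session and the file path /tmp/example.csv are assumed for illustration, not part of the commit):

// Sketch only: assumes a spark-shell session (so `spark` is in scope)
// and a CSV file /tmp/example.csv containing:
//   c0,c1,c2
//   0,2019-123-14 20:35:30,999
//   1,2019-12-14 20:35:30,999
import spark.implicits._

val df = spark.read
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "c3")
  .option("header", true)
  .option("timestampFormat", "uuuu-MM-dd HH:mm:ss")
  .schema("c0 integer, c1 timestamp, c2 integer, c3 string")
  .csv("/tmp/example.csv")

// With the fix, the malformed first row survives a pushed-down filter on c2:
// its c1 is null and c3 carries the raw line, while the valid row has c3 = null.
df.where($"c2" === 999).select($"c0", $"c1", $"c3").show(false)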
UnivocityParser.scala
@@ -283,15 +283,15 @@ class UnivocityParser(
       i += 1
     }
 
-    var skipValueConversion = false
+    var skipRow = false
     var badRecordException: Option[Throwable] = None
     i = 0
-    while (!skipValueConversion && i < parsedSchema.length) {
+    while (!skipRow && i < parsedSchema.length) {
       try {
         val convertedValue = valueConverters(i).apply(getToken(tokens, i))
         parsedRow(i) = convertedValue
         if (csvFilters.skipRow(parsedRow, i)) {
-          skipValueConversion = true
+          skipRow = true
         } else {
           val requiredIndex = parsedToRequiredIndex(i)
           if (requiredIndex != -1) {
@@ -300,20 +300,20 @@ class UnivocityParser(
         }
       } catch {
         case NonFatal(e) =>
-          badRecordException = Some(e)
-          skipValueConversion = true
+          badRecordException = badRecordException.orElse(Some(e))
+          requiredSingleRow.setNullAt(i)
       }
       i += 1
     }
-    if (skipValueConversion) {
+    if (skipRow) {
+      noRows
+    } else {
       if (badRecordException.isDefined) {
         throw BadRecordException(
           () => getCurrentInput, () => requiredRow.headOption, badRecordException.get)
       } else {
-        noRows
+        requiredRow
       }
-    } else {
-      requiredRow
     }
   }
 }
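
The behavioral change above: on a conversion failure the parser no longer bails out of the loop. It nulls the failed position, keeps converting the remaining fields so pushed-down filters can still be evaluated, remembers only the first exception via orElse, and throws BadRecordException only for rows that were not already skipped by a filter. A standalone sketch of that first-error-wins accumulation pattern (illustrative only, not commit code):

// Standalone Scala sketch of `badRecordException.orElse(Some(e))`:
// only the first failure is kept while conversion continues.
var firstError: Option[Throwable] = None
val converted = Seq("1", "oops", "3", "oops").map { token =>
  try Some(token.toInt)
  catch {
    case e: NumberFormatException =>
      firstError = firstError.orElse(Some(e)) // later errors are ignored
      None                                    // the field is nulled, loop goes on
  }
}
// converted == Seq(Some(1), None, Some(3), None)
// firstError holds the exception from the first "oops" only.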
CSVSuite.scala
@@ -36,7 +36,7 @@ import org.apache.log4j.{AppenderSkeleton, LogManager}
 import org.apache.log4j.spi.LoggingEvent
 
 import org.apache.spark.{SparkException, TestUtils}
-import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
+import org.apache.spark.sql.{AnalysisException, Column, DataFrame, QueryTest, Row}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
@@ -2242,4 +2242,41 @@ class CSVSuite extends QueryTest with SharedSparkSession with TestCsvData {
       }
     }
   }
+
+  test("filters push down - malformed input in PERMISSIVE mode") {
+    val invalidTs = "2019-123-14 20:35:30"
+    val invalidRow = s"0,$invalidTs,999"
+    val validTs = "2019-12-14 20:35:30"
+    Seq(true, false).foreach { filterPushdown =>
+      withSQLConf(SQLConf.CSV_FILTER_PUSHDOWN_ENABLED.key -> filterPushdown.toString) {
+        withTempPath { path =>
+          Seq(
+            "c0,c1,c2",
+            invalidRow,
+            s"1,$validTs,999").toDF("data")
+            .repartition(1)
+            .write.text(path.getAbsolutePath)
+          def checkReadback(condition: Column, expected: Seq[Row]): Unit = {
+            val readback = spark.read
+              .option("mode", "PERMISSIVE")
+              .option("columnNameOfCorruptRecord", "c3")
+              .option("header", true)
+              .option("timestampFormat", "uuuu-MM-dd HH:mm:ss")
+              .schema("c0 integer, c1 timestamp, c2 integer, c3 string")
+              .csv(path.getAbsolutePath)
+              .where(condition)
+              .select($"c0", $"c1", $"c3")
+            checkAnswer(readback, expected)
+          }
+
+          checkReadback(
+            condition = $"c2" === 999,
+            expected = Seq(Row(0, null, invalidRow), Row(1, Timestamp.valueOf(validTs), null)))
+          checkReadback(
+            condition = $"c2" === 999 && $"c1" > "1970-01-01 00:00:00",
+            expected = Seq(Row(1, Timestamp.valueOf(validTs), null)))
+        }
+      }
+    }
+  }
 }
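
A note on the second checkReadback call: the malformed row parses c1 as null, and in SQL three-valued logic a comparison against null is never true, so the pushed-down predicate on c1 excludes the corrupt row and only the valid row remains. A minimal in-memory illustration of that behavior (data and names assumed for illustration, not part of the test; again assumes a spark-shell session):

// Illustrative only: a null timestamp never satisfies c1 > <literal>,
// which is why the corrupt row disappears under the second condition.
import java.sql.Timestamp
import spark.implicits._

val rows = Seq(
  (0, None: Option[Timestamp]),
  (1, Some(Timestamp.valueOf("2019-12-14 20:35:30")))
).toDF("c0", "c1")

rows.where($"c1" > "1970-01-01 00:00:00").show()
// Only the row with c0 = 1 is returned.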
