Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
maropu committed Jun 25, 2018
1 parent a5849ad commit 59a7f14
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,19 @@ class UnivocityParser(
}
}

// Parsing function selected once, based on the required schema. When
// `columnPruning` is enabled and only partition attributes are scanned,
// `schema` is empty, so tokenizing the input can be skipped entirely.
private lazy val doParse: String => InternalRow = if (schema.isEmpty) {
  (_: String) => InternalRow.empty
} else {
  (input: String) => convert(tokenizer.parseLine(input))
}

/**
 * Parses a single CSV string and turns it into either one resulting row or no row (if the
 * record is malformed).
 */
def parse(input: String): InternalRow = convert(tokenizer.parseLine(input))
def parse(input: String): InternalRow = doParse(input)

private def convert(tokens: Array[String]): InternalRow = {
if (tokens.length != schema.length) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1602,4 +1602,14 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
assert(testAppender2.events.asScala
.exists(msg => msg.getRenderedMessage.contains("CSV header does not conform to the schema")))
}

test("SPARK-24645 skip parsing when columnPruning enabled and partitions scanned only") {
  withSQLConf(SQLConf.CSV_PARSER_COLUMN_PRUNING.key -> "true") {
    withTempPath { path =>
      val dir = path.getAbsolutePath
      // Write 10 rows partitioned by p = id % 2; the CSV data files themselves
      // contain only the `id` column.
      spark.range(10).selectExpr("id % 2 AS p", "id").write.partitionBy("p").csv(dir)
      // Selecting only the partition column `p` leaves the CSV parsing schema
      // empty, exercising the skip-parsing fast path. Assert the aggregate so
      // the test checks correctness rather than mere absence of exceptions:
      // p is 1 for the five odd ids, hence sum(p) == 5.
      val sum = spark.read.csv(dir).selectExpr("sum(p)").collect().head.getLong(0)
      assert(sum == 5L)
    }
  }
}
}

0 comments on commit 59a7f14

Please sign in to comment.