From 1e181bc12652f531a979ac32bafcd0a8f967d33e Mon Sep 17 00:00:00 2001 From: windpiger Date: Tue, 7 Feb 2017 23:05:22 +0800 Subject: [PATCH 1/4] [WIP][SPARK-19488][SQL]fix csv infer schema when the field is Nan/Inf etc --- .../sql/execution/datasources/csv/CSVInferSchema.scala | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala index 065bf53574366..249a001f4612a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala @@ -98,6 +98,11 @@ private[csv] object CSVInferSchema { } } + private def isInfOrNan(field: String, options: CSVOptions): Boolean = { + val infOrNan = Seq(options.nanValue, options.negativeInf, options.positiveInf) + infOrNan.contains(field) + } + private def tryParseInteger(field: String, options: CSVOptions): DataType = { if ((allCatch opt field.toInt).isDefined) { IntegerType @@ -133,7 +138,7 @@ private[csv] object CSVInferSchema { } private def tryParseDouble(field: String, options: CSVOptions): DataType = { - if ((allCatch opt field.toDouble).isDefined) { + if ((allCatch opt field.toDouble).isDefined || isInfOrNan(field, options)) { DoubleType } else { tryParseTimestamp(field, options) From 8ac2baf0b4cd5d4b973f7f8db15affde1938c881 Mon Sep 17 00:00:00 2001 From: windpiger Date: Tue, 7 Feb 2017 23:26:03 +0800 Subject: [PATCH 2/4] add unit test --- .../execution/datasources/csv/CSVInferSchemaSuite.scala | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala index 8620bb9f65b97..d99ed2195954a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala @@ -131,4 +131,12 @@ class CSVInferSchemaSuite extends SparkFunSuite { assert(CSVInferSchema.inferField(DecimalType(20, 0), "2015-12-01 00:00:00", options) == StringType) } + + test("DoubleType should be infered when a user defined nanValue/negativeInf/positiveInf") { + val options = new CSVOptions(Map("nanValue" -> "nan", "negativeInf" -> "-inf", + "positiveInf" -> "inf")) + assert(CSVInferSchema.inferField(NullType, "nan", options) == DoubleType) + assert(CSVInferSchema.inferField(NullType, "inf", options) == DoubleType) + assert(CSVInferSchema.inferField(NullType, "-inf", options) == DoubleType) + } } From e0481f4c8bd5a0634f18e2ff9aa01cd7f2912108 Mon Sep 17 00:00:00 2001 From: windpiger Date: Tue, 7 Feb 2017 23:28:27 +0800 Subject: [PATCH 3/4] add a comment --- .../sql/execution/datasources/csv/CSVInferSchemaSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala index d99ed2195954a..f57f17e0a3aa8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala @@ -132,7 +132,8 @@ class CSVInferSchemaSuite extends SparkFunSuite { == StringType) } - test("DoubleType should be infered when a user defined nanValue/negativeInf/positiveInf") { + test("DoubleType should be infered when user defined nanValue/negativeInf/positiveInf" + + "are provided") { val options = new CSVOptions(Map("nanValue" -> "nan", "negativeInf" -> "-inf", "positiveInf" -> "inf")) assert(CSVInferSchema.inferField(NullType, "nan", options) == DoubleType) From 452df9dcc8be46bbd5fba8fb84efa7d673415a92 Mon Sep 17 00:00:00 2001 From: windpiger Date: Wed, 8 Feb 2017 11:13:47 +0800 Subject: [PATCH 4/4] optimize some code --- .../spark/sql/execution/datasources/csv/CSVInferSchema.scala | 3 +-- .../sql/execution/datasources/csv/CSVInferSchemaSuite.scala | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala index 249a001f4612a..a349d0e104a99 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala @@ -99,8 +99,7 @@ private[csv] object CSVInferSchema { } private def isInfOrNan(field: String, options: CSVOptions): Boolean = { - val infOrNan = Seq(options.nanValue, options.negativeInf, options.positiveInf) - infOrNan.contains(field) + field == options.nanValue || field == options.negativeInf || field == options.positiveInf } private def tryParseInteger(field: String, options: CSVOptions): DataType = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala index f57f17e0a3aa8..d8c6c25504781 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala @@ -132,8 +132,7 @@ class CSVInferSchemaSuite extends SparkFunSuite { == StringType) } - test("DoubleType should be infered when user defined nanValue/negativeInf/positiveInf" + - "are provided") { + test("DoubleType should be infered when user defined nan/inf are provided") { val options = new CSVOptions(Map("nanValue" -> "nan", "negativeInf" -> "-inf", "positiveInf" -> "inf")) assert(CSVInferSchema.inferField(NullType, "nan", options) == DoubleType)