From 349a47b1551d1bd73c6c21cbcdebb08f01b8cd45 Mon Sep 17 00:00:00 2001
From: Hossein
Date: Mon, 10 Oct 2016 13:56:55 -0700
Subject: [PATCH 1/8] Support for deserializing NA as Date and Time

---
 .../scala/org/apache/spark/api/r/SerDe.scala  | 23 ++++++++++++++-----
 1 file changed, 17 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
index e4932a4192d39..ce04177584b25 100644
--- a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
@@ -24,10 +24,12 @@ import java.sql.{Date, Time, Timestamp}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.WrappedArray
 
+import org.apache.spark.internal.Logging
+
 /**
  * Utility functions to serialize, deserialize objects to / from R
  */
-private[spark] object SerDe {
+private[spark] object SerDe extends Logging {
   type ReadObject = (DataInputStream, Char) => Object
   type WriteObject = (DataOutputStream, Object) => Boolean
@@ -125,15 +127,24 @@ private[spark] object SerDe {
   }
 
   def readDate(in: DataInputStream): Date = {
-    Date.valueOf(readString(in))
+    val inStr = readString(in)
+    if (inStr == "NA") {
+      null
+    } else {
+      Date.valueOf(inStr)
+    }
   }
 
   def readTime(in: DataInputStream): Timestamp = {
     val seconds = in.readDouble()
-    val sec = Math.floor(seconds).toLong
-    val t = new Timestamp(sec * 1000L)
-    t.setNanos(((seconds - sec) * 1e9).toInt)
-    t
+    if (java.lang.Double.isNaN(seconds)) {
+      null
+    } else {
+      val sec = Math.floor(seconds).toLong
+      val t = new Timestamp(sec * 1000L)
+      t.setNanos(((seconds - sec) * 1e9).toInt)
+      t
+    }
   }
 
   def readBytesArr(in: DataInputStream): Array[Array[Byte]] = {
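
Note: the patch above encodes two wire-format facts that the rest of the series
builds on. An R Date crosses to the JVM as a string, with the literal "NA" when
the value is missing, and a POSIXct crosses as a double of epoch seconds, so a
missing time reads back as NaN. readTime then splits the double into whole
seconds (millis for the Timestamp constructor) and the fractional remainder
(setNanos). The following standalone Scala sketch mirrors that logic outside
the Spark build; the object name and the epoch value are illustrative:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
    import java.sql.Timestamp

    object ReadTimeSketch {
      // Same decision as the patched readTime: NaN (R's NA) maps to null,
      // otherwise whole seconds become millis and the fraction becomes nanos.
      def readTime(in: DataInputStream): Timestamp = {
        val seconds = in.readDouble()
        if (java.lang.Double.isNaN(seconds)) {
          null
        } else {
          val sec = Math.floor(seconds).toLong
          val t = new Timestamp(sec * 1000L)
          t.setNanos(((seconds - sec) * 1e9).toInt)
          t
        }
      }

      def main(args: Array[String]): Unit = {
        val buf = new ByteArrayOutputStream()
        val out = new DataOutputStream(buf)
        out.writeDouble(1476057600.25) // 2016-10-10 00:00:00.25 UTC
        out.writeDouble(Double.NaN)    // how a missing POSIXct arrives
        val in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray))
        println(readTime(in).getNanos) // 250000000: the 0.25s fraction kept as nanos
        println(readTime(in))          // null, surfaced back to R as NA
      }
    }
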
From 83726fc96e198703e18a02682fc4004cce3bae00 Mon Sep 17 00:00:00 2001
From: Hossein
Date: Mon, 10 Oct 2016 14:39:19 -0700
Subject: [PATCH 2/8] Added unit tests

---
 R/pkg/inst/tests/testthat/test_sparkSQL.R             | 11 +++++++++++
 .../src/main/scala/org/apache/spark/api/r/SerDe.scala |  4 +---
 2 files changed, 12 insertions(+), 3 deletions(-)

diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 6d8cfad5c1f93..6d18b6a0eb9d3 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -379,6 +379,17 @@ test_that("create DataFrame with different data types", {
   expect_equal(collect(df), data.frame(l, stringsAsFactors = FALSE))
 })
 
+test_that("SPARK-17811: can create DataFrame containing NA as date and time", {
+  df <- data.frame(
+    id: 1:2,
+    time = c(as.POSIXlt("2016-01-10"), NA),
+    date = c(as.Date("2016-10-01"), NA))
+
+  DF <- collect(createDataFrame(df))
+  expect_true(is.na(DF$date[2]))
+  expect_true(is.na(DF$time[2]))
+})
+
 test_that("create DataFrame with complex types", {
   e <- new.env()
   assign("n", 3L, envir = e)

diff --git a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
index ce04177584b25..09968d4e7e23b 100644
--- a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
@@ -24,12 +24,10 @@ import java.sql.{Date, Time, Timestamp}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.WrappedArray
 
-import org.apache.spark.internal.Logging
-
 /**
  * Utility functions to serialize, deserialize objects to / from R
  */
-private[spark] object SerDe extends Logging {
+private[spark] object SerDe {
   type ReadObject = (DataInputStream, Char) => Object
   type WriteObject = (DataOutputStream, Object) => Boolean

From 9e621ebb1b4d9ac20fa294937ebe87e88730f3c9 Mon Sep 17 00:00:00 2001
From: Hossein
Date: Mon, 10 Oct 2016 19:01:05 -0700
Subject: [PATCH 3/8] Fixed syntax bug in test

---
 R/pkg/DESCRIPTION                         | 7 ++++---
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 2 +-
 2 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION
index 5a83883089e0e..d06c07ac41015 100644
--- a/R/pkg/DESCRIPTION
+++ b/R/pkg/DESCRIPTION
@@ -11,7 +11,8 @@ Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"),
                     email = "felixcheung@apache.org"),
              person(family = "The Apache Software Foundation", role = c("aut", "cph")))
 URL: http://www.apache.org/ http://spark.apache.org/
-BugReports: https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark#ContributingtoSpark-ContributingBugReports
+BugReports: https://cwiki.apache.org/confluence/display/SPARK/Contributing+to
+    +Spark#ContributingtoSpark-ContributingBugReports
 Depends:
     R (>= 3.0),
     methods
@@ -25,10 +26,10 @@ Collate:
     'schema.R'
     'generics.R'
     'jobj.R'
-    'column.R'
-    'group.R'
     'RDD.R'
     'pairRDD.R'
+    'column.R'
+    'group.R'
     'DataFrame.R'
     'SQLContext.R'
     'WindowSpec.R'

diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 6d18b6a0eb9d3..3c90a7c87358f 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -381,7 +381,7 @@ test_that("create DataFrame with different data types", {
 
 test_that("SPARK-17811: can create DataFrame containing NA as date and time", {
   df <- data.frame(
-    id: 1:2,
+    id = 1:2,
     time = c(as.POSIXlt("2016-01-10"), NA),
     date = c(as.Date("2016-10-01"), NA))
 

From 6e1d60f0c26d60ae0281f41e379d01f98f2424ec Mon Sep 17 00:00:00 2001
From: Hossein
Date: Tue, 11 Oct 2016 08:58:22 -0700
Subject: [PATCH 4/8] Better test coverage

---
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 3c90a7c87358f..bfdf70c7bf847 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -387,7 +387,9 @@ test_that("SPARK-17811: can create DataFrame containing NA as date and time", {
 
   DF <- collect(createDataFrame(df))
   expect_true(is.na(DF$date[2]))
+  expect_equal(DF$date[1], as.Date("2016-10-01"))
   expect_true(is.na(DF$time[2]))
+  expect_equal(DF$time[1], as.POSIXlt("2016-01-10"))
 })
 
 test_that("create DataFrame with complex types", {
   e <- new.env()
   assign("n", 3L, envir = e)

From 82ec5c81e0c0e48bfb6008dcdeef544457cfc014 Mon Sep 17 00:00:00 2001
From: Hossein
Date: Tue, 11 Oct 2016 09:03:12 -0700
Subject: [PATCH 5/8] Reverted changes to DESCRIPTION

---
 R/pkg/DESCRIPTION | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION
index d06c07ac41015..5a83883089e0e 100644
--- a/R/pkg/DESCRIPTION
+++ b/R/pkg/DESCRIPTION
@@ -11,8 +11,7 @@ Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"),
                     email = "felixcheung@apache.org"),
              person(family = "The Apache Software Foundation", role = c("aut", "cph")))
 URL: http://www.apache.org/ http://spark.apache.org/
-BugReports: https://cwiki.apache.org/confluence/display/SPARK/Contributing+to
-    +Spark#ContributingtoSpark-ContributingBugReports
+BugReports: https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark#ContributingtoSpark-ContributingBugReports
 Depends:
     R (>= 3.0),
     methods
@@ -26,10 +25,10 @@ Collate:
     'schema.R'
     'generics.R'
     'jobj.R'
-    'RDD.R'
-    'pairRDD.R'
     'column.R'
     'group.R'
+    'RDD.R'
+    'pairRDD.R'
     'DataFrame.R'
     'SQLContext.R'
     'WindowSpec.R'
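
Note: taken together, PATCH 2/8 through 4/8 pin down the round trip the series
is after: real values must survive collect() unchanged and NAs must come back
as NA. On the JVM side, the Date half of that contract is just the "NA"
sentinel introduced in PATCH 1/8. A minimal sketch of that decision once the
string is off the wire (parseRDate is an illustrative helper name, not a
Spark API):

    import java.sql.Date

    object ReadDateSketch {
      // The patched readDate decision, after the string has been decoded:
      // R spells a missing Date as the literal string "NA".
      def parseRDate(inStr: String): Date =
        if (inStr == "NA") null else Date.valueOf(inStr)

      def main(args: Array[String]): Unit = {
        println(parseRDate("2016-10-01")) // 2016-10-01, matches DF$date[1]
        println(parseRDate("NA"))         // null, collected back into R as NA
      }
    }
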
From 59827a19db93604120dc7229f6ed82777b4cd354 Mon Sep 17 00:00:00 2001
From: Hossein
Date: Tue, 11 Oct 2016 11:55:07 -0700
Subject: [PATCH 6/8] Catching NegativeArraySizeException for windows

---
 .../scala/org/apache/spark/api/r/SerDe.scala | 36 ++++++++++++-------
 1 file changed, 23 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
index 09968d4e7e23b..1e092bff12c8f 100644
--- a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
@@ -125,23 +125,33 @@ private[spark] object SerDe {
   }
 
   def readDate(in: DataInputStream): Date = {
-    val inStr = readString(in)
-    if (inStr == "NA") {
-      null
-    } else {
-      Date.valueOf(inStr)
+    try {
+      val inStr = readString(in)
+      if (inStr == "NA") {
+        null
+      } else {
+        Date.valueOf(inStr)
+      }
+    } catch {
+      // On Windows we get NegativeArraySizeException for NAs in R
+      case _: NegativeArraySizeException => null
     }
   }
 
   def readTime(in: DataInputStream): Timestamp = {
-    val seconds = in.readDouble()
-    if (java.lang.Double.isNaN(seconds)) {
-      null
-    } else {
-      val sec = Math.floor(seconds).toLong
-      val t = new Timestamp(sec * 1000L)
-      t.setNanos(((seconds - sec) * 1e9).toInt)
-      t
+    try {
+      val seconds = in.readDouble()
+      if (java.lang.Double.isNaN(seconds)) {
+        null
+      } else {
+        val sec = Math.floor(seconds).toLong
+        val t = new Timestamp(sec * 1000L)
+        t.setNanos(((seconds - sec) * 1e9).toInt)
+        t
+      }
+    } catch {
+      // On Windows we get NegativeArraySizeException for NAs in R
+      case _: NegativeArraySizeException => null
     }
   }

From 28b5070a8d2f8190ac3ed32992e39f260e3a6248 Mon Sep 17 00:00:00 2001
From: Hossein
Date: Thu, 13 Oct 2016 10:23:21 -0700
Subject: [PATCH 7/8] Passing the exception in SerDe

---
 .../scala/org/apache/spark/api/r/SerDe.scala | 36 +++++++------------
 1 file changed, 13 insertions(+), 23 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
index 1e092bff12c8f..09968d4e7e23b 100644
--- a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
@@ -125,33 +125,23 @@ private[spark] object SerDe {
   }
 
   def readDate(in: DataInputStream): Date = {
-    try {
-      val inStr = readString(in)
-      if (inStr == "NA") {
-        null
-      } else {
-        Date.valueOf(inStr)
-      }
-    } catch {
-      // On Windows we get NegativeArraySizeException for NAs in R
-      case _: NegativeArraySizeException => null
+    val inStr = readString(in)
+    if (inStr == "NA") {
+      null
+    } else {
+      Date.valueOf(inStr)
     }
   }
 
   def readTime(in: DataInputStream): Timestamp = {
-    try {
-      val seconds = in.readDouble()
-      if (java.lang.Double.isNaN(seconds)) {
-        null
-      } else {
-        val sec = Math.floor(seconds).toLong
-        val t = new Timestamp(sec * 1000L)
-        t.setNanos(((seconds - sec) * 1e9).toInt)
-        t
-      }
-    } catch {
-      // On Windows we get NegativeArraySizeException for NAs in R
-      case _: NegativeArraySizeException => null
+    val seconds = in.readDouble()
+    if (java.lang.Double.isNaN(seconds)) {
+      null
+    } else {
+      val sec = Math.floor(seconds).toLong
+      val t = new Timestamp(sec * 1000L)
+      t.setNanos(((seconds - sec) * 1e9).toInt)
+      t
    }
  }
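
Note: context for PATCH 6/8 and its revert in PATCH 7/8. Per the comment in
the diff, some R builds on Windows do not send the "NA" sentinel for a missing
Date, and the read fails with a NegativeArraySizeException, the symptom of
allocating a byte buffer from a negative length prefix. PATCH 6/8 swallowed
the exception; PATCH 7/8 let it propagate again while the root cause was
discussed. A minimal reproduction of the failure mode, assuming a simplified
length-prefixed string framing rather than SparkR's exact protocol:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

    object NegativeLengthSketch {
      // A simplified length-prefixed string read: the buffer is allocated
      // straight from the length prefix, so a negative prefix throws.
      def readLengthPrefixed(in: DataInputStream): String = {
        val len = in.readInt()
        val bytes = new Array[Byte](len) // NegativeArraySizeException if len < 0
        in.readFully(bytes)
        new String(bytes, "UTF-8")
      }

      def main(args: Array[String]): Unit = {
        val buf = new ByteArrayOutputStream()
        new DataOutputStream(buf).writeInt(-1) // the malformed NA length prefix
        val in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray))
        try {
          readLengthPrefixed(in)
        } catch {
          case e: NegativeArraySizeException => println(s"caught: $e")
        }
      }
    }

Whether to swallow or to propagate was the open question here: returning null
keeps collect() usable on Windows at the cost of hiding a protocol bug, which
is why the final patch pairs the catch with a TODO.
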
From 20d0234be212b84c916a13bbe3d343fa05118547 Mon Sep 17 00:00:00 2001
From: Hossein
Date: Wed, 19 Oct 2016 15:17:32 -0700
Subject: [PATCH 8/8] Adding back catching for NegativeArraySizeException with
 a TODO

---
 .../scala/org/apache/spark/api/r/SerDe.scala | 36 ++++++++++++-------
 1 file changed, 23 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
index 09968d4e7e23b..550e075a95129 100644
--- a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
@@ -125,23 +125,33 @@ private[spark] object SerDe {
   }
 
   def readDate(in: DataInputStream): Date = {
-    val inStr = readString(in)
-    if (inStr == "NA") {
-      null
-    } else {
-      Date.valueOf(inStr)
+    try {
+      val inStr = readString(in)
+      if (inStr == "NA") {
+        null
+      } else {
+        Date.valueOf(inStr)
+      }
+    } catch {
+      // TODO: SPARK-18011 with some versions of R deserializing NA from R results in NASE
+      case _: NegativeArraySizeException => null
     }
   }
 
   def readTime(in: DataInputStream): Timestamp = {
-    val seconds = in.readDouble()
-    if (java.lang.Double.isNaN(seconds)) {
-      null
-    } else {
-      val sec = Math.floor(seconds).toLong
-      val t = new Timestamp(sec * 1000L)
-      t.setNanos(((seconds - sec) * 1e9).toInt)
-      t
+    try {
+      val seconds = in.readDouble()
+      if (java.lang.Double.isNaN(seconds)) {
+        null
+      } else {
+        val sec = Math.floor(seconds).toLong
+        val t = new Timestamp(sec * 1000L)
+        t.setNanos(((seconds - sec) * 1e9).toInt)
+        t
+      }
+    } catch {
+      // TODO: SPARK-18011 with some versions of R deserializing NA from R results in NASE
+      case _: NegativeArraySizeException => null
    }
  }
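
Note: end state after PATCH 8/8. readDate and readTime map all three signals
of a missing value (the "NA" string, a NaN double, and the
NegativeArraySizeException tracked by SPARK-18011) to null, which SparkR then
surfaces as NA. The sketch below exercises that final readDate shape end to
end; frame and the local readString are simplified stand-ins for SparkR's
actual string framing:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
    import java.sql.Date

    object FinalReadDateSketch {
      // Simplified framing: an int length prefix followed by UTF-8 bytes.
      private def readString(in: DataInputStream): String = {
        val bytes = new Array[Byte](in.readInt()) // throws on a negative prefix
        in.readFully(bytes)
        new String(bytes, "UTF-8")
      }

      // Shape of readDate after PATCH 8/8: sentinel check plus the TODO'd catch.
      def readDate(in: DataInputStream): Date = {
        try {
          val inStr = readString(in)
          if (inStr == "NA") null else Date.valueOf(inStr)
        } catch {
          case _: NegativeArraySizeException => null // see SPARK-18011
        }
      }

      private def frame(s: String): Array[Byte] = {
        val buf = new ByteArrayOutputStream()
        val out = new DataOutputStream(buf)
        val b = s.getBytes("UTF-8")
        out.writeInt(b.length)
        out.write(b)
        buf.toByteArray
      }

      private def stream(bytes: Array[Byte]) =
        new DataInputStream(new ByteArrayInputStream(bytes))

      def main(args: Array[String]): Unit = {
        println(readDate(stream(frame("2016-10-01")))) // 2016-10-01
        println(readDate(stream(frame("NA"))))         // null: the sentinel path
        val bad = new ByteArrayOutputStream()
        new DataOutputStream(bad).writeInt(-1)         // the Windows failure mode
        println(readDate(stream(bad.toByteArray)))     // null instead of a crash
      }
    }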