From 9f15f246c330b0ea142b7dedf141eab3409d9b59 Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Tue, 18 Aug 2015 17:04:43 +0800 Subject: [PATCH 01/10] [SPARK-10048][SPARKR] Support arbitrary nested Java array in serde. --- R/pkg/R/deserialize.R | 23 +++++- .../scala/org/apache/spark/api/r/SerDe.scala | 82 +++++++++++-------- 2 files changed, 68 insertions(+), 37 deletions(-) diff --git a/R/pkg/R/deserialize.R b/R/pkg/R/deserialize.R index 33bf13ec9e784..1b669e21c43b9 100644 --- a/R/pkg/R/deserialize.R +++ b/R/pkg/R/deserialize.R @@ -48,6 +48,7 @@ readTypedObject <- function(con, type) { "r" = readRaw(con), "D" = readDate(con), "t" = readTime(con), + "a" = readArray(con), "l" = readList(con), "n" = NULL, "j" = getJobj(readString(con)), @@ -85,8 +86,7 @@ readTime <- function(con) { as.POSIXct(t, origin = "1970-01-01") } -# We only support lists where all elements are of same type -readList <- function(con) { +readArray <- function(con) { type <- readType(con) len <- readInt(con) if (len > 0) { @@ -100,6 +100,25 @@ readList <- function(con) { } } +# Read a list. Types of each element may be different. +# Null objects are read as NA. +readList <- function(con) { + len <- readInt(con) + if (len > 0) { + l <- vector("list", len) + for (i in 1:len) { + elem <- readObject(con) + if (is.null(elem)) { + elem <- NA + } + l[[i]] <- elem + } + l + } else { + list() + } +} + readRaw <- function(con) { dataLen <- readInt(con) readBin(con, raw(), as.integer(dataLen), endian = "big") diff --git a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala index 3c89f24473744..646de816e1c60 100644 --- a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala +++ b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala @@ -200,6 +200,9 @@ private[spark] object SerDe { case "date" => dos.writeByte('D') case "time" => dos.writeByte('t') case "raw" => dos.writeByte('r') + // Array of primitive types + case "array" => dos.writeByte('a') + // Array of objects case "list" => dos.writeByte('l') case "jobj" => dos.writeByte('j') case _ => throw new IllegalArgumentException(s"Invalid type $typeStr") @@ -211,26 +214,35 @@ private[spark] object SerDe { writeType(dos, "void") } else { value.getClass.getName match { + case "java.lang.Character" => + writeType(dos, "character") + writeString(dos, value.asInstanceOf[Character].toString) case "java.lang.String" => writeType(dos, "character") writeString(dos, value.asInstanceOf[String]) - case "long" | "java.lang.Long" => + case "java.lang.Long" => writeType(dos, "double") writeDouble(dos, value.asInstanceOf[Long].toDouble) - case "float" | "java.lang.Float" => + case "java.lang.Float" => writeType(dos, "double") writeDouble(dos, value.asInstanceOf[Float].toDouble) - case "decimal" | "java.math.BigDecimal" => + case "java.math.BigDecimal" => writeType(dos, "double") val javaDecimal = value.asInstanceOf[java.math.BigDecimal] writeDouble(dos, scala.math.BigDecimal(javaDecimal).toDouble) - case "double" | "java.lang.Double" => + case "java.lang.Double" => writeType(dos, "double") writeDouble(dos, value.asInstanceOf[Double]) - case "int" | "java.lang.Integer" => + case "java.lang.Byte" => + writeType(dos, "integer") + writeInt(dos, value.asInstanceOf[Byte].toInt) + case "java.lang.Short" => + writeType(dos, "integer") + writeInt(dos, value.asInstanceOf[Short].toInt) + case "java.lang.Integer" => writeType(dos, "integer") writeInt(dos, value.asInstanceOf[Int]) - case "boolean" | "java.lang.Boolean" => + case "java.lang.Boolean" => 
writeType(dos, "logical") writeBoolean(dos, value.asInstanceOf[Boolean]) case "java.sql.Date" => @@ -242,43 +254,48 @@ private[spark] object SerDe { case "java.sql.Timestamp" => writeType(dos, "time") writeTime(dos, value.asInstanceOf[Timestamp]) + + // Handle arrays + + // Array of primitive types + + // Special handling for byte array case "[B" => writeType(dos, "raw") writeBytes(dos, value.asInstanceOf[Array[Byte]]) - // TODO: Types not handled right now include - // byte, char, short, float - // Handle arrays - case "[Ljava.lang.String;" => - writeType(dos, "list") - writeStringArr(dos, value.asInstanceOf[Array[String]]) + case "[C" => + writeType(dos, "array") + writeStringArr(dos, value.asInstanceOf[Array[Char]].map(_.toString)) + case "[S" => + writeType(dos, "array") + writeIntArr(dos, value.asInstanceOf[Array[Short]].map(_.toInt)) case "[I" => - writeType(dos, "list") + writeType(dos, "array") writeIntArr(dos, value.asInstanceOf[Array[Int]]) case "[J" => - writeType(dos, "list") + writeType(dos, "array") writeDoubleArr(dos, value.asInstanceOf[Array[Long]].map(_.toDouble)) + case "[F" => + writeType(dos, "array") + writeDoubleArr(dos, value.asInstanceOf[Array[Float]].map(_.toDouble)) case "[D" => - writeType(dos, "list") + writeType(dos, "array") writeDoubleArr(dos, value.asInstanceOf[Array[Double]]) case "[Z" => - writeType(dos, "list") + writeType(dos, "array") writeBooleanArr(dos, value.asInstanceOf[Array[Boolean]]) - case "[[B" => + + // Array of objects, null objects use "void" type + case c if c.startsWith("[") => writeType(dos, "list") - writeBytesArr(dos, value.asInstanceOf[Array[Array[Byte]]]) - case otherName => - // Handle array of objects - if (otherName.startsWith("[L")) { - val objArr = value.asInstanceOf[Array[Object]] - writeType(dos, "list") - writeType(dos, "jobj") - dos.writeInt(objArr.length) - objArr.foreach(o => writeJObj(dos, o)) - } else { - writeType(dos, "jobj") - writeJObj(dos, value) - } + val array = value.asInstanceOf[Array[Object]] + writeInt(dos, array.length) + array.foreach(elem => writeObject(dos, elem)) + + case _ => + writeType(dos, "jobj") + writeJObj(dos, value) } } } @@ -350,11 +367,6 @@ private[spark] object SerDe { value.foreach(v => writeString(out, v)) } - def writeBytesArr(out: DataOutputStream, value: Array[Array[Byte]]): Unit = { - writeType(out, "raw") - out.writeInt(value.length) - value.foreach(v => writeBytes(out, v)) - } } private[r] object SerializationFormats { From a5c11d9e3a1834f0187399eefe85e125265af0eb Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Tue, 18 Aug 2015 20:55:21 +0800 Subject: [PATCH 02/10] Improve collect() to hold data of complex types. 
--- R/pkg/R/DataFrame.R | 55 +++++++++++++++---- .../org/apache/spark/sql/api/r/SQLUtils.scala | 25 +-------- 2 files changed, 45 insertions(+), 35 deletions(-) diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index 895603235011e..c47b19cdc67a8 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -628,18 +628,49 @@ setMethod("dim", setMethod("collect", signature(x = "DataFrame"), function(x, stringsAsFactors = FALSE) { - # listCols is a list of raw vectors, one per column - listCols <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "dfToCols", x@sdf) - cols <- lapply(listCols, function(col) { - objRaw <- rawConnection(col) - numRows <- readInt(objRaw) - col <- readCol(objRaw, numRows) - close(objRaw) - col - }) - names(cols) <- columns(x) - do.call(cbind.data.frame, list(cols, stringsAsFactors = stringsAsFactors)) - }) + names <- columns(x) + ncol <- length(names) + if (ncol <= 0) { + # empty data.frame with 0 columns and 0 rows + data.frame() + } else { + # listCols is a list of columns + listCols <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "dfToCols", x@sdf) + stopifnot(length(listCols) == ncol) + + # An empty data.frame with 0 columns and number of rows as collected + nrow <- length(listCols[[1]]) + if (nrow <= 0) { + df <- data.frame() + } else { + df <- data.frame(row.names = c(1 : nrow)) + } + + # Append columns one by one + for (colIndex in 1 : ncol) { + # Note: appending a column of list type into a data.frame so that + # data of complex type can be held. But getting a cell from a column + # of list type returns a list instead of a vector. So for columns of + # non-complex type, append them as vector. + col <- listCols[[colIndex]] + if (length(col) <= 0) { + df[[names[colIndex]]] <- col + } else { + # TODO: more robust check on column of primitive types + vec <- do.call(c, col) + if (class(vec) != "list") { + df[[names[colIndex]]] <- vec + } else { + # For columns of complex type, be careful to access them. + # Get a column of complex type returns a list. + # Get a cell from a column of complex type returns a list instead of a vector. 
+ df[[names[colIndex]]] <- col + } + } + } + df + } + }) #' Limit #' diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala index 92861ab038f19..a449b3d28399e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala @@ -106,19 +106,12 @@ private[r] object SQLUtils { bos.toByteArray() } - def dfToCols(df: DataFrame): Array[Array[Byte]] = { + def dfToCols(df: DataFrame): Array[Array[Any]] = { // localDF is Array[Row] val localDF = df.collect() val numCols = df.columns.length - // dfCols is Array[Array[Any]] - val dfCols = convertRowsToColumns(localDF, numCols) - dfCols.map { col => - colToRBytes(col) - } - } - - def convertRowsToColumns(localDF: Array[Row], numCols: Int): Array[Array[Any]] = { + // result is Array[Array[Any]] (0 until numCols).map { colIdx => localDF.map { row => row(colIdx) @@ -126,20 +119,6 @@ private[r] object SQLUtils { }.toArray } - def colToRBytes(col: Array[Any]): Array[Byte] = { - val numRows = col.length - val bos = new ByteArrayOutputStream() - val dos = new DataOutputStream(bos) - - SerDe.writeInt(dos, numRows) - - col.map { item => - val obj: Object = item.asInstanceOf[Object] - SerDe.writeObject(dos, obj) - } - bos.toByteArray() - } - def saveMode(mode: String): SaveMode = { mode match { case "append" => SaveMode.Append From f3239eb9220d8e316416a03af770695381404cb2 Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Wed, 19 Aug 2015 09:09:11 +0800 Subject: [PATCH 03/10] Remove unuseful readCol() function. --- R/pkg/R/deserialize.R | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/R/pkg/R/deserialize.R b/R/pkg/R/deserialize.R index 1b669e21c43b9..f1bfaa3fa7d36 100644 --- a/R/pkg/R/deserialize.R +++ b/R/pkg/R/deserialize.R @@ -192,17 +192,3 @@ readRow <- function(inputCon) { list() } } - -# Take a single column as Array[Byte] and deserialize it into an atomic vector -readCol <- function(inputCon, numRows) { - if (numRows > 0) { - # sapply can not work with POSIXlt - do.call(c, lapply(1:numRows, function(x) { - value <- readObject(inputCon) - # Replace NULL with NA so we can coerce to vectors - if (is.null(value)) NA else value - })) - } else { - vector() - } -} From 0ddc8e20855ce6269a8d95427729aa1ed36f1236 Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Wed, 19 Aug 2015 14:17:34 +0800 Subject: [PATCH 04/10] Improve SerDe for conversion between DataFrame and RDD. --- R/pkg/R/deserialize.R | 37 ++++++------------- R/pkg/R/serialize.R | 10 +---- R/pkg/inst/worker/worker.R | 4 +- .../org/apache/spark/sql/api/r/SQLUtils.scala | 10 ++--- 4 files changed, 19 insertions(+), 42 deletions(-) diff --git a/R/pkg/R/deserialize.R b/R/pkg/R/deserialize.R index f1bfaa3fa7d36..d8eef47f6763b 100644 --- a/R/pkg/R/deserialize.R +++ b/R/pkg/R/deserialize.R @@ -151,18 +151,19 @@ readDeserialize <- function(con) { } } -readDeserializeRows <- function(inputCon) { - # readDeserializeRows will deserialize a DataOutputStream composed of - # a list of lists. Since the DOS is one continuous stream and - # the number of rows varies, we put the readRow function in a while loop - # that termintates when the next row is empty. +readMultipleObjects <- function(inputCon) { + # readMultipleObjects will read multiple continuous objects from + # a DataOutputStream. 
There is no preceding field telling the count + # of the objects, so the number of objects varies, we try to read + # all objects in a loop until the end of the stream. data <- list() while(TRUE) { - row <- readRow(inputCon) - if (length(row) == 0) { + # If reaching the end of the stream, type returned should be "". + type <- readType(inputCon) + if (type == "") { break } - data[[length(data) + 1L]] <- row + data[[length(data) + 1L]] <- readTypedObject(inputCon, type) } data # this is a list of named lists now } @@ -174,21 +175,5 @@ readRowList <- function(obj) { # deserialize the row. rawObj <- rawConnection(obj, "r+") on.exit(close(rawObj)) - readRow(rawObj) -} - -readRow <- function(inputCon) { - numCols <- readInt(inputCon) - if (length(numCols) > 0 && numCols > 0) { - lapply(1:numCols, function(x) { - obj <- readObject(inputCon) - if (is.null(obj)) { - NA - } else { - obj - } - }) # each row is a list now - } else { - list() - } -} + readObject(rawObj) +} \ No newline at end of file diff --git a/R/pkg/R/serialize.R b/R/pkg/R/serialize.R index 311021e5d8473..e3676f57f907f 100644 --- a/R/pkg/R/serialize.R +++ b/R/pkg/R/serialize.R @@ -110,18 +110,10 @@ writeRowSerialize <- function(outputCon, rows) { serializeRow <- function(row) { rawObj <- rawConnection(raw(0), "wb") on.exit(close(rawObj)) - writeRow(rawObj, row) + writeGenericList(rawObj, row) rawConnectionValue(rawObj) } -writeRow <- function(con, row) { - numCols <- length(row) - writeInt(con, numCols) - for (i in 1:numCols) { - writeObject(con, row[[i]]) - } -} - writeRaw <- function(con, batch) { writeInt(con, length(batch)) writeBin(batch, con, endian = "big") diff --git a/R/pkg/inst/worker/worker.R b/R/pkg/inst/worker/worker.R index 7e3b5fc403b25..0c3b0d1f4be20 100644 --- a/R/pkg/inst/worker/worker.R +++ b/R/pkg/inst/worker/worker.R @@ -94,7 +94,7 @@ if (isEmpty != 0) { } else if (deserializer == "string") { data <- as.list(readLines(inputCon)) } else if (deserializer == "row") { - data <- SparkR:::readDeserializeRows(inputCon) + data <- SparkR:::readMultipleObjects(inputCon) } # Timing reading input data for execution inputElap <- elapsedSecs() @@ -120,7 +120,7 @@ if (isEmpty != 0) { } else if (deserializer == "string") { data <- readLines(inputCon) } else if (deserializer == "row") { - data <- SparkR:::readDeserializeRows(inputCon) + data <- SparkR:::readMultipleObjects(inputCon) } # Timing reading input data for execution inputElap <- elapsedSecs() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala index a449b3d28399e..03b36921291cb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala @@ -98,11 +98,11 @@ private[r] object SQLUtils { val bos = new ByteArrayOutputStream() val dos = new DataOutputStream(bos) - SerDe.writeInt(dos, row.length) - (0 until row.length).map { idx => - val obj: Object = row(idx).asInstanceOf[Object] - SerDe.writeObject(dos, obj) - } + val cols = + (0 until row.length).map { idx => + row(idx).asInstanceOf[Object] + }.toArray + SerDe.writeObject(dos, cols) bos.toByteArray() } From 42286d186ebd03b6ff7999a1135d83e0be60a229 Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Thu, 20 Aug 2015 11:00:29 +0800 Subject: [PATCH 05/10] Address comments. 
--- R/pkg/R/DataFrame.R | 2 +- R/pkg/R/deserialize.R | 2 +- .../src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala | 5 +---- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index c47b19cdc67a8..4540f58a2c1e8 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -643,7 +643,7 @@ setMethod("collect", if (nrow <= 0) { df <- data.frame() } else { - df <- data.frame(row.names = c(1 : nrow)) + df <- data.frame(row.names = 1 : nrow) } # Append columns one by one diff --git a/R/pkg/R/deserialize.R b/R/pkg/R/deserialize.R index d8eef47f6763b..6cf628e3007de 100644 --- a/R/pkg/R/deserialize.R +++ b/R/pkg/R/deserialize.R @@ -176,4 +176,4 @@ readRowList <- function(obj) { rawObj <- rawConnection(obj, "r+") on.exit(close(rawObj)) readObject(rawObj) -} \ No newline at end of file +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala index 03b36921291cb..bc9e28324ab4e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala @@ -98,10 +98,7 @@ private[r] object SQLUtils { val bos = new ByteArrayOutputStream() val dos = new DataOutputStream(bos) - val cols = - (0 until row.length).map { idx => - row(idx).asInstanceOf[Object] - }.toArray + val cols = (0 until row.length).map { idx => row(idx).asInstanceOf[Object]}.toArray SerDe.writeObject(dos, cols) bos.toByteArray() } From d6b739af53dad46b744ddda0a15a576d4b9c77cc Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Mon, 24 Aug 2015 14:58:03 +0800 Subject: [PATCH 06/10] Add test cases for SerDe. --- .../scala/org/apache/spark/api/r/RBackendHandler.scala | 7 +++++++ core/src/main/scala/org/apache/spark/api/r/SerDe.scala | 4 ++++ .../main/scala/org/apache/spark/sql/api/r/SQLUtils.scala | 2 +- 3 files changed, 12 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala index 6ce02e2ea336a..bb82f3285f1d9 100644 --- a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala +++ b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala @@ -53,6 +53,13 @@ private[r] class RBackendHandler(server: RBackend) if (objId == "SparkRHandler") { methodName match { + // This function is for test-purpose only + case "echo" => + val args = readArgs(numArgs, dis) + assert(numArgs == 1) + + writeInt(dos, 0) + writeObject(dos, args(0)) case "stopBackend" => writeInt(dos, 0) writeType(dos, "void") diff --git a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala index 646de816e1c60..0a7a2b6709016 100644 --- a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala +++ b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala @@ -149,6 +149,10 @@ private[spark] object SerDe { case 'b' => readBooleanArr(dis) case 'j' => readStringArr(dis).map(x => JVMObjectTracker.getObject(x)) case 'r' => readBytesArr(dis) + case 'l' => { + val len = readInt(dis) + (0 until len).map(_ => readList(dis)).toArray + } case _ => throw new IllegalArgumentException(s"Invalid array type $arrType") } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala index bc9e28324ab4e..7f3defec3d42e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala @@ -98,7 +98,7 @@ private[r] object SQLUtils { val bos = new ByteArrayOutputStream() val dos = new DataOutputStream(bos) - val cols = (0 until row.length).map { idx => row(idx).asInstanceOf[Object]}.toArray + val cols = (0 until row.length).map(row(_).asInstanceOf[Object]).toArray SerDe.writeObject(dos, cols) bos.toByteArray() } From fd0d0869fb672246a39fa076076a431510c894c0 Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Mon, 24 Aug 2015 14:59:58 +0800 Subject: [PATCH 07/10] Forgot the test file for SerDe. --- R/pkg/inst/tests/test_Serde.R | 65 +++++++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) create mode 100644 R/pkg/inst/tests/test_Serde.R diff --git a/R/pkg/inst/tests/test_Serde.R b/R/pkg/inst/tests/test_Serde.R new file mode 100644 index 0000000000000..fb2b4f66701e4 --- /dev/null +++ b/R/pkg/inst/tests/test_Serde.R @@ -0,0 +1,65 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +context("SerDe functionality") + +test_that("SerDe of primitive types", { + x <- callJStatic("SparkRHandler", "echo", 1L) + expect_equal(x, 1L) + expect_equal(class(x), "integer") + + x <- callJStatic("SparkRHandler", "echo", 1) + expect_equal(x, 1) + expect_equal(class(x), "numeric") + + x <- callJStatic("SparkRHandler", "echo", TRUE) + expect_true(x) + expect_equal(class(x), "logical") + + x <- callJStatic("SparkRHandler", "echo", "abc") + expect_equal(x, "abc") + expect_equal(class(x), "character") +}) + +test_that("SerDe of list of primitive types", { + x <- list(1L, 2L, 3L) + y <- callJStatic("SparkRHandler", "echo", x) + expect_equal(x, y) + expect_equal(class(y[[1]]), "integer") + + x <- list(1, 2, 3) + y <- callJStatic("SparkRHandler", "echo", x) + expect_equal(x, y) + expect_equal(class(y[[1]]), "numeric") + + x <- list(TRUE, FALSE) + y <- callJStatic("SparkRHandler", "echo", x) + expect_equal(x, y) + expect_equal(class(y[[1]]), "logical") + + x <- list("a", "b", "c") + y <- callJStatic("SparkRHandler", "echo", x) + expect_equal(x, y) + expect_equal(class(y[[1]]), "character") +}) + +test_that("SerDe of list of lists", { + x <- list(list(1L, 2L, 3L), list(1, 2, 3), + list(TRUE, FALSE), list("a", "b", "c")) + y <- callJStatic("SparkRHandler", "echo", x) + expect_equal(x, y) +}) From 9025c9fb808738354872b90e76ee4773167e1530 Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Tue, 25 Aug 2015 09:45:38 +0800 Subject: [PATCH 08/10] Add spark context initialization into test_Serde.R. 
--- R/pkg/inst/tests/test_Serde.R | 2 ++ 1 file changed, 2 insertions(+) diff --git a/R/pkg/inst/tests/test_Serde.R b/R/pkg/inst/tests/test_Serde.R index fb2b4f66701e4..4c18919179e08 100644 --- a/R/pkg/inst/tests/test_Serde.R +++ b/R/pkg/inst/tests/test_Serde.R @@ -17,6 +17,8 @@ context("SerDe functionality") +sc <- sparkR.init() + test_that("SerDe of primitive types", { x <- callJStatic("SparkRHandler", "echo", 1L) expect_equal(x, 1L) From 0d82eae42b3cb48a86cb750a315d51f0cedb14d7 Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Tue, 25 Aug 2015 13:04:21 +0800 Subject: [PATCH 09/10] Add test cases for empty lists in test_Serde. --- R/pkg/inst/tests/test_Serde.R | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/R/pkg/inst/tests/test_Serde.R b/R/pkg/inst/tests/test_Serde.R index 4c18919179e08..e192271b31e91 100644 --- a/R/pkg/inst/tests/test_Serde.R +++ b/R/pkg/inst/tests/test_Serde.R @@ -57,6 +57,11 @@ test_that("SerDe of list of primitive types", { y <- callJStatic("SparkRHandler", "echo", x) expect_equal(x, y) expect_equal(class(y[[1]]), "character") + + # Empty list + x<-list() + y <- callJStatic("SparkRHandler", "echo", x) + expect_equal(x, y) }) test_that("SerDe of list of lists", { @@ -64,4 +69,9 @@ test_that("SerDe of list of lists", { list(TRUE, FALSE), list("a", "b", "c")) y <- callJStatic("SparkRHandler", "echo", x) expect_equal(x, y) + + # List of empty lists + x <- list(list(), list()) + y <- callJStatic("SparkRHandler", "echo", x) + expect_equal(x, y) }) From eae33411f0cf597072560fa6176174dda4f75d24 Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Tue, 25 Aug 2015 16:17:45 +0800 Subject: [PATCH 10/10] Fix coding style. --- R/pkg/inst/tests/test_Serde.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/pkg/inst/tests/test_Serde.R b/R/pkg/inst/tests/test_Serde.R index e192271b31e91..009db85da2beb 100644 --- a/R/pkg/inst/tests/test_Serde.R +++ b/R/pkg/inst/tests/test_Serde.R @@ -59,7 +59,7 @@ test_that("SerDe of list of primitive types", { expect_equal(class(y[[1]]), "character") # Empty list - x<-list() + x <- list() y <- callJStatic("SparkRHandler", "echo", x) expect_equal(x, y) })
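As a quick end-to-end illustration of what the new tests exercise, nested R
lists now round-trip through the JVM serde via the test-only "echo" handler.
A sketch using the package-internal callJStatic, assuming a backend started
with sparkR.init(); it mirrors test_Serde.R above rather than adding new API:

    sc <- sparkR.init()
    x <- list(list(1L, 2L, 3L), list(1, 2, 3),
              list(TRUE, FALSE), list("a", "b", "c"), list())
    y <- SparkR:::callJStatic("SparkRHandler", "echo", x)
    # Equivalent to expect_equal(x, y) in the tests.
    stopifnot(isTRUE(all.equal(x, y)))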