From 9d01bcdf9a3e9ce0e2dd27493184eb15b7e0f21e Mon Sep 17 00:00:00 2001 From: cafreeman Date: Thu, 5 Mar 2015 14:26:58 -0600 Subject: [PATCH 1/7] `dropTempTable` Add `dropTempTable` function and update tests to drop the test table at the end of every test. --- pkg/NAMESPACE | 1 + pkg/R/SQLContext.R | 17 +++++++++++++++++ pkg/inst/tests/test_sparkSQL.R | 7 +++++++ 3 files changed, 25 insertions(+) diff --git a/pkg/NAMESPACE b/pkg/NAMESPACE index 2471c6a52..4c58de97e 100644 --- a/pkg/NAMESPACE +++ b/pkg/NAMESPACE @@ -137,6 +137,7 @@ exportMethods("asc", export("cacheTable", "clearCache", "createExternalTable", + "dropTempTable", "jsonFile", "jsonRDD", "loadDF", diff --git a/pkg/R/SQLContext.R b/pkg/R/SQLContext.R index c9e811750..2ea9b0b15 100644 --- a/pkg/R/SQLContext.R +++ b/pkg/R/SQLContext.R @@ -230,6 +230,23 @@ clearCache <- function(sqlCtx) { callJMethod(sqlCtx, "clearCache") } +#' Drop Temporary Table +#' +#' Drops the temporary table with the given table name in the catalog. +#' If the table has been cached/persisted before, it's also unpersisted. +#' +#' @param sqlCtx SQLContext to use +#' @param tableName The name of the SparkSQL table to be dropped. +#' clearCache(sqlCtx) +#' } + +dropTempTable <- function(sqlCtx, tableName) { + if (class(tableName) != "character") { + stop("tableName must be a string.") + } + callJMethod(sqlCtx, "dropTempTable", tableName) +} + #' Load an DataFrame #' #' Returns the dataset in a data source as a DataFrame diff --git a/pkg/inst/tests/test_sparkSQL.R b/pkg/inst/tests/test_sparkSQL.R index 4fabec337..aa6e2b7f9 100644 --- a/pkg/inst/tests/test_sparkSQL.R +++ b/pkg/inst/tests/test_sparkSQL.R @@ -40,6 +40,7 @@ test_that("test cache, uncache and clearCache", { cacheTable(sqlCtx, "table1") uncacheTable(sqlCtx, "table1") clearCache(sqlCtx) + dropTempTable(sqlCtx, "table1") }) test_that("test tableNames and tables", { @@ -48,6 +49,7 @@ test_that("test tableNames and tables", { expect_true(length(tableNames(sqlCtx)) == 1) df <- tables(sqlCtx) expect_true(count(df) == 1) + dropTempTable(sqlCtx, "table1") }) test_that("registerTempTable() results in a queryable table and sql() results in a new DataFrame", { @@ -56,12 +58,17 @@ test_that("registerTempTable() results in a queryable table and sql() results in newdf <- sql(sqlCtx, "SELECT * FROM table1 where name = 'Michael'") expect_true(inherits(newdf, "DataFrame")) expect_true(count(newdf) == 1) + dropTempTable(sqlCtx, "table1") +}) }) test_that("table() returns a new DataFrame", { + df <- jsonFile(sqlCtx, jsonPath) + registerTempTable(df, "table1") tabledf <- table(sqlCtx, "table1") expect_true(inherits(tabledf, "DataFrame")) expect_true(count(tabledf) == 3) + dropTempTable(sqlCtx, "table1") }) test_that("toRDD() returns an RRDD", { From befbd3249d06b3a1f4e48e3b8e43107261ba8a0f Mon Sep 17 00:00:00 2001 From: cafreeman Date: Thu, 5 Mar 2015 14:28:40 -0600 Subject: [PATCH 2/7] `insertInto` --- pkg/NAMESPACE | 1 + pkg/R/DataFrame.R | 30 ++++++++++++++++++++++++++++++ pkg/inst/tests/test_sparkSQL.R | 26 ++++++++++++++++++++++++++ 3 files changed, 57 insertions(+) diff --git a/pkg/NAMESPACE b/pkg/NAMESPACE index 4c58de97e..2d5a5531e 100644 --- a/pkg/NAMESPACE +++ b/pkg/NAMESPACE @@ -91,6 +91,7 @@ exportMethods("columns", "explain", "filter", "head", + "insertInto", "isLocal", "limit", "orderBy", diff --git a/pkg/R/DataFrame.R b/pkg/R/DataFrame.R index b729d734c..1a84efdaf 100644 --- a/pkg/R/DataFrame.R +++ b/pkg/R/DataFrame.R @@ -263,6 +263,36 @@ setMethod("registerTempTable", callJMethod(x@sdf, 
"registerTempTable", tableName) }) +#' insertInto +#' +#' Insert the contents of a DataFrame into a table registered in the current SQL Context. +#' +#' @param x A SparkSQL DataFrame +#' @param tableName A character vector containing the name of the table +#' @param overwrite A logical argument indicating whether or not to overwrite +#' the existing rows in the table. +#' +#' @rdname insertInto +#' @export +#' @examples +#'\dontrun{ +#' sc <- sparkR.init() +#' sqlCtx <- sparkRSQL.init(sc) +#' df <- loadDF(sqlCtx, path, "parquet") +#' df2 <- loadDF(sqlCtx, path2, "parquet") +#' registerTempTable(df, "table1") +#' insertInto(df2, "table1", overwrite = TRUE) +#'} +setGeneric("insertInto", function(x, tableName, ...) { standardGeneric("insertInto") }) + +#' @rdname insertInto +#' @export +setMethod("insertInto", + signature(x = "DataFrame", tableName = "character"), + function(x, tableName, overwrite = FALSE) { + callJMethod(x@sdf, "insertInto", tableName, overwrite) + }) + #' Cache #' #' Persist with the default storage level (MEMORY_ONLY). diff --git a/pkg/inst/tests/test_sparkSQL.R b/pkg/inst/tests/test_sparkSQL.R index aa6e2b7f9..51463db41 100644 --- a/pkg/inst/tests/test_sparkSQL.R +++ b/pkg/inst/tests/test_sparkSQL.R @@ -60,6 +60,32 @@ test_that("registerTempTable() results in a queryable table and sql() results in expect_true(count(newdf) == 1) dropTempTable(sqlCtx, "table1") }) + +test_that("insertInto() on a registered table", { + df <- loadDF(sqlCtx, jsonPath, "json") + saveDF(df, parquetPath, "parquet", "overwrite") + dfParquet <- loadDF(sqlCtx, parquetPath, "parquet") + + lines <- c("{\"name\":\"Bob\", \"age\":24}", + "{\"name\":\"James\", \"age\":35}") + jsonPath2 <- tempfile(pattern="jsonPath2", fileext=".tmp") + parquetPath2 <- tempfile(pattern = "parquetPath2", fileext = ".parquet") + writeLines(lines, jsonPath2) + df2 <- loadDF(sqlCtx, jsonPath2, "json") + saveDF(df2, parquetPath2, "parquet", "overwrite") + dfParquet2 <- loadDF(sqlCtx, parquetPath2, "parquet") + + registerTempTable(dfParquet, "table1") + insertInto(dfParquet2, "table1") + expect_true(count(sql(sqlCtx, "select * from table1")) == 5) + expect_true(first(sql(sqlCtx, "select * from table1"))$name == "Michael") + dropTempTable(sqlCtx, "table1") + + registerTempTable(dfParquet, "table1") + insertInto(dfParquet2, "table1", overwrite = TRUE) + expect_true(count(sql(sqlCtx, "select * from table1")) == 2) + expect_true(first(sql(sqlCtx, "select * from table1"))$name == "Bob") + dropTempTable(sqlCtx, "table1") }) test_that("table() returns a new DataFrame", { From fef99de5d2f5db4310fe019873ae32e21ef90f8f Mon Sep 17 00:00:00 2001 From: cafreeman Date: Thu, 5 Mar 2015 15:14:30 -0600 Subject: [PATCH 3/7] `intersect`, `subtract`, `unionAll` --- pkg/NAMESPACE | 3 ++ pkg/R/DataFrame.R | 86 ++++++++++++++++++++++++++++++++++ pkg/inst/tests/test_sparkSQL.R | 26 ++++++++++ 3 files changed, 115 insertions(+) diff --git a/pkg/NAMESPACE b/pkg/NAMESPACE index 2d5a5531e..f964d6d1c 100644 --- a/pkg/NAMESPACE +++ b/pkg/NAMESPACE @@ -92,6 +92,7 @@ exportMethods("columns", "filter", "head", "insertInto", + "intersect", "isLocal", "limit", "orderBy", @@ -108,8 +109,10 @@ exportMethods("columns", "selectExpr", "showDF", "sortDF", + "subtract", "toJSON", "toRDD", + "unionAll", "where") exportClasses("Column") diff --git a/pkg/R/DataFrame.R b/pkg/R/DataFrame.R index 1a84efdaf..810ce81a6 100644 --- a/pkg/R/DataFrame.R +++ b/pkg/R/DataFrame.R @@ -942,6 +942,92 @@ setMethod("join", dataFrame(sdf) }) +#' UnionAll +#' +#' Return a new DataFrame 
containing the union of rows in this DataFrame
+#' and another DataFrame. This is equivalent to `UNION ALL` in SQL.
+#'
+#' @param x A Spark DataFrame
+#' @param y A Spark DataFrame
+#' @return A DataFrame containing the result of the union.
+#' @rdname unionAll
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' df1 <- jsonFile(sqlCtx, path)
+#' df2 <- jsonFile(sqlCtx, path2)
+#' unioned <- unionAll(df1, df2)
+#' }
+setGeneric("unionAll", function(x, y) { standardGeneric("unionAll") })
+
+#' @rdname unionAll
+#' @export
+setMethod("unionAll",
+          signature(x = "DataFrame", y = "DataFrame"),
+          function(x, y) {
+            unioned <- callJMethod(x@sdf, "unionAll", y@sdf)
+            dataFrame(unioned)
+          })
+
+#' Intersect
+#'
+#' Return a new DataFrame containing rows only in both this DataFrame
+#' and another DataFrame. This is equivalent to `INTERSECT` in SQL.
+#'
+#' @param x A Spark DataFrame
+#' @param y A Spark DataFrame
+#' @return A DataFrame containing the result of the intersection.
+#' @rdname intersect
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' df1 <- jsonFile(sqlCtx, path)
+#' df2 <- jsonFile(sqlCtx, path2)
+#' intersectDF <- intersect(df1, df2)
+#' }
+setGeneric("intersect", function(x, y) { standardGeneric("intersect") })
+
+#' @rdname intersect
+#' @export
+setMethod("intersect",
+          signature(x = "DataFrame", y = "DataFrame"),
+          function(x, y) {
+            intersected <- callJMethod(x@sdf, "intersect", y@sdf)
+            dataFrame(intersected)
+          })
+
+#' Subtract
+#'
+#' Return a new DataFrame containing rows in this DataFrame
+#' but not in another DataFrame. This is equivalent to `EXCEPT` in SQL.
+#'
+#' @param x A Spark DataFrame
+#' @param y A Spark DataFrame
+#' @return A DataFrame containing the result of the subtract operation.
+#' @rdname subtract
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' df1 <- jsonFile(sqlCtx, path)
+#' df2 <- jsonFile(sqlCtx, path2)
+#' subtractDF <- subtract(df1, df2)
+#' }
+setGeneric("subtract", function(x, y) { standardGeneric("subtract") })
+
+#' @rdname subtract
+#' @export
+setMethod("subtract",
+          signature(x = "DataFrame", y = "DataFrame"),
+          function(x, y) {
+            subtracted <- callJMethod(x@sdf, "except", y@sdf)
+            dataFrame(subtracted)
+          })
 
 #' Save the contents of the DataFrame to a data source
 #'
diff --git a/pkg/inst/tests/test_sparkSQL.R b/pkg/inst/tests/test_sparkSQL.R
index 51463db41..eba8c7f86 100644
--- a/pkg/inst/tests/test_sparkSQL.R
+++ b/pkg/inst/tests/test_sparkSQL.R
@@ -438,6 +438,32 @@ test_that("isLocal()", {
   expect_false(isLocal(df))
 })
 
+test_that("unionAll(), subtract(), and intersect() on a DataFrame", {
+  df <- jsonFile(sqlCtx, jsonPath)
+
+  lines <- c("{\"name\":\"Bob\", \"age\":24}",
+             "{\"name\":\"Andy\", \"age\":30}",
+             "{\"name\":\"James\", \"age\":35}")
+  jsonPath2 <- tempfile(pattern="sparkr-test", fileext=".tmp")
+  writeLines(lines, jsonPath2)
+  df2 <- loadDF(sqlCtx, jsonPath2, "json")
+
+  unioned <- unionAll(df, df2)
+  expect_true(inherits(unioned, "DataFrame"))
+  expect_true(count(unioned) == 6)
+  expect_true(first(unioned)$name == "Michael")
+
+  subtracted <- subtract(df, df2)
+  expect_true(inherits(subtracted, "DataFrame"))
+  expect_true(count(subtracted) == 2)
+  expect_true(first(subtracted)$name == "Justin")
+
+  intersected <- intersect(df, df2)
+  expect_true(inherits(intersected, "DataFrame"))
+  expect_true(count(intersected) == 1)
+  expect_true(first(intersected)$name == "Andy")
+})
+
 # TODO: Enable and test once the parquetFile PR has been merged
 # test_that("saveAsParquetFile() on DataFrame and works with parquetFile", {
 #   df <- jsonFile(sqlCtx, jsonPath)
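The three set operations above mirror their SQL counterparts (`UNION ALL`, `INTERSECT`, `EXCEPT`). A minimal usage sketch, assuming an initialized SparkR session and two hypothetical JSON files:

    sc <- sparkR.init()
    sqlCtx <- sparkRSQL.init(sc)
    df1 <- jsonFile(sqlCtx, "people.json")
    df2 <- jsonFile(sqlCtx, "more_people.json")

    # Rows from both inputs, duplicates kept, so the counts simply add up
    unioned <- unionAll(df1, df2)

    # Rows that appear in both inputs
    common <- intersect(df1, df2)

    # Rows of df1 that do not appear in df2
    remaining <- subtract(df1, df2)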
From c5fa3b9fd9278509f2c1c0683f30e5f23bcf3c0d Mon Sep 17 00:00:00 2001
From: cafreeman
Date: Thu, 5 Mar 2015 18:22:53 -0600
Subject: [PATCH 4/7] New `select` method

Added another version of `select` that takes a list as its argument
(instead of just specific column names or expressions).

---
 pkg/R/DataFrame.R | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/pkg/R/DataFrame.R b/pkg/R/DataFrame.R
index 810ce81a6..04d5b1922 100644
--- a/pkg/R/DataFrame.R
+++ b/pkg/R/DataFrame.R
@@ -771,6 +771,20 @@ setMethod("select", signature(x = "DataFrame", col = "Column"),
             dataFrame(sdf)
           })
 
+setMethod("select",
+          signature(x = "DataFrame", col = "list"),
+          function(x, col) {
+            cols <- lapply(col, function(c) {
+              if (class(c) == "Column") {
+                c@jc
+              } else {
+                col(c)@jc
+              }
+            })
+            sdf <- callJMethod(x@sdf, "select", listToSeq(cols))
+            dataFrame(sdf)
+          })
+
 #' SelectExpr
 #'
 #' Select from a DataFrame using a set of SQL expressions.
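The new list variant slots in beside the existing `select` signatures. A short sketch of how the two forms compose, assuming a hypothetical DataFrame `df` with columns `name` and `age`:

    # Existing form: select by Column expression
    select(df, df$age)

    # New in this patch: a list mixing Column expressions and column names
    select(df, list(df$name, "age"))

The list method is what makes `withColumnRenamed` in the next patch possible: it builds the aliased column list with `lapply` and hands the whole thing to `select` in a single call.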
From 7a5d6fdfa73efdc87dd022e14f5bac71a75d3a3e Mon Sep 17 00:00:00 2001 From: cafreeman Date: Thu, 5 Mar 2015 18:23:16 -0600 Subject: [PATCH 5/7] `withColumn` and `withColumnRenamed` --- pkg/NAMESPACE | 4 ++- pkg/R/DataFrame.R | 64 ++++++++++++++++++++++++++++++++++ pkg/inst/tests/test_sparkSQL.R | 12 +++++++ 3 files changed, 79 insertions(+), 1 deletion(-) diff --git a/pkg/NAMESPACE b/pkg/NAMESPACE index f964d6d1c..82b46d5c9 100644 --- a/pkg/NAMESPACE +++ b/pkg/NAMESPACE @@ -113,7 +113,9 @@ exportMethods("columns", "toJSON", "toRDD", "unionAll", - "where") + "where", + "withColumn", + "withColumnRenamed") exportClasses("Column") diff --git a/pkg/R/DataFrame.R b/pkg/R/DataFrame.R index 04d5b1922..054c72745 100644 --- a/pkg/R/DataFrame.R +++ b/pkg/R/DataFrame.R @@ -815,6 +815,70 @@ setMethod("selectExpr", dataFrame(sdf) }) +#' WithColumn +#' +#' Return a new DataFrame with the specified column added. +#' +#' @param x A DataFrame +#' @param colName A string containing the name of the new column. +#' @param col A Column expression. +#' @return A DataFrame with the new column added. +#' @rdname withColumn +#' @export +#' @examples +#'\dontrun{ +#' sc <- sparkR.init() +#' sqlCtx <- sparkRSQL.init(sc) +#' path <- "path/to/file.json" +#' df <- jsonFile(sqlCtx, path) +#' newDF <- withColumn(df, "newCol", df$col1 * 5) +#' } +setGeneric("withColumn", function(x, colName, col) { standardGeneric("withColumn") }) + +#' @rdname withColumn +#' @export +setMethod("withColumn", + signature(x = "DataFrame", colName = "character", col = "Column"), + function(x, colName, col) { + select(x, x$"*", alias(col, colName)) + }) + +#' WithColumnRenamed +#' +#' Rename an existing column in a DataFrame. +#' +#' @param x A DataFrame +#' @param existingCol The name of the column you want to change. +#' @param newCol The new column name. +#' @return A DataFrame with the column name changed. +#' @rdname withColumnRenamed +#' @export +#' @examples +#'\dontrun{ +#' sc <- sparkR.init() +#' sqlCtx <- sparkRSQL.init(sc) +#' path <- "path/to/file.json" +#' df <- jsonFile(sqlCtx, path) +#' newDF <- withColumnRenamed(df, "col1", "newCol1") +#' } +setGeneric("withColumnRenamed", function(x, existingCol, newCol) { + standardGeneric("withColumnRenamed") }) + +#' @rdname withColumnRenamed +#' @export +setMethod("withColumnRenamed", + signature(x = "DataFrame", existingCol = "character", newCol = "character"), + function(x, existingCol, newCol) { + cols <- lapply(columns(x), function(c) { + if (c == existingCol) { + alias(col(c), newCol) + } else { + col(c) + } + }) + select(x, cols) + }) + #' SortDF #' #' Sort a DataFrame by the specified column(s). 
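Both methods are thin wrappers over `select`: `withColumn` selects `x$"*"` plus the aliased new column, and `withColumnRenamed` rebuilds the full column list, aliasing only the column being renamed. A usage sketch, assuming a hypothetical DataFrame `df` with columns `name` and `age`:

    # Add a derived column; all original columns are preserved
    df2 <- withColumn(df, "agePlusTwo", df$age + 2)
    columns(df2)  # "name" "age" "agePlusTwo"

    # Rename one column, leaving the others untouched
    df3 <- withColumnRenamed(df, "age", "years")
    columns(df3)  # "name" "years"

Note that neither method mutates `df`; each returns a new DataFrame.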
diff --git a/pkg/inst/tests/test_sparkSQL.R b/pkg/inst/tests/test_sparkSQL.R
index eba8c7f86..792d68c19 100644
--- a/pkg/inst/tests/test_sparkSQL.R
+++ b/pkg/inst/tests/test_sparkSQL.R
@@ -464,6 +464,18 @@ test_that("unionAll(), subtract(), and intersect() on a DataFrame", {
   expect_true(first(intersected)$name == "Andy")
 })
 
+test_that("withColumn() and withColumnRenamed()", {
+  df <- jsonFile(sqlCtx, jsonPath)
+  newDF <- withColumn(df, "newAge", df$age + 2)
+  expect_true(length(columns(newDF)) == 3)
+  expect_true(columns(newDF)[3] == "newAge")
+  expect_true(first(filter(newDF, df$name != "Michael"))$newAge == 32)
+
+  newDF2 <- withColumnRenamed(df, "age", "newerAge")
+  expect_true(length(columns(newDF2)) == 2)
+  expect_true(columns(newDF2)[1] == "newerAge")
+})
+
 # TODO: Enable and test once the parquetFile PR has been merged
 # test_that("saveAsParquetFile() on DataFrame and works with parquetFile", {
 #   df <- jsonFile(sqlCtx, jsonPath)

From e60578ac52f07e6dfc2bd31f51c6a4ef5d7023d3 Mon Sep 17 00:00:00 2001
From: cafreeman
Date: Thu, 5 Mar 2015 23:21:29 -0600
Subject: [PATCH 6/7] update tests to guarantee row order

---
 pkg/inst/tests/test_sparkSQL.R | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/pkg/inst/tests/test_sparkSQL.R b/pkg/inst/tests/test_sparkSQL.R
index 9f7eb851f..20da4b600 100644
--- a/pkg/inst/tests/test_sparkSQL.R
+++ b/pkg/inst/tests/test_sparkSQL.R
@@ -78,13 +78,13 @@ test_that("insertInto() on a registered table", {
   registerTempTable(dfParquet, "table1")
   insertInto(dfParquet2, "table1")
   expect_true(count(sql(sqlCtx, "select * from table1")) == 5)
-  expect_true(first(sql(sqlCtx, "select * from table1"))$name == "Michael")
+  expect_true(first(sql(sqlCtx, "select * from table1 order by age"))$name == "Michael")
   dropTempTable(sqlCtx, "table1")
 
   registerTempTable(dfParquet, "table1")
   insertInto(dfParquet2, "table1", overwrite = TRUE)
   expect_true(count(sql(sqlCtx, "select * from table1")) == 2)
-  expect_true(first(sql(sqlCtx, "select * from table1"))$name == "Bob")
+  expect_true(first(sql(sqlCtx, "select * from table1 order by age"))$name == "Bob")
   dropTempTable(sqlCtx, "table1")
 })
 
@@ -473,17 +473,17 @@ test_that("unionAll(), subtract(), and intersect() on a DataFrame", {
   writeLines(lines, jsonPath2)
   df2 <- loadDF(sqlCtx, jsonPath2, "json")
 
-  unioned <- unionAll(df, df2)
+  unioned <- sortDF(unionAll(df, df2), df$age)
   expect_true(inherits(unioned, "DataFrame"))
   expect_true(count(unioned) == 6)
   expect_true(first(unioned)$name == "Michael")
 
-  subtracted <- subtract(df, df2)
+  subtracted <- sortDF(subtract(df, df2), desc(df$age))
   expect_true(inherits(subtracted, "DataFrame"))
   expect_true(count(subtracted) == 2)
   expect_true(first(subtracted)$name == "Justin")
 
-  intersected <- intersect(df, df2)
+  intersected <- sortDF(intersect(df, df2), df$age)
   expect_true(inherits(intersected, "DataFrame"))
   expect_true(count(intersected) == 1)
   expect_true(first(intersected)$name == "Andy")
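The fix above follows one pattern throughout: Spark makes no ordering guarantee for the rows of a DataFrame or a `sql()` result, so any assertion built on `first()` has to force an explicit order. A sketch, assuming a hypothetical DataFrame `df` with an `age` column:

    # Flaky: the first row depends on partitioning
    first(df)$name

    # Deterministic: sort before taking the first row
    first(sortDF(df, df$age))$name         # youngest
    first(sortDF(df, desc(df$age)))$name   # oldest

Queries that go through `sql()` get the same treatment with an explicit `ORDER BY` clause.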
From 15a713fda7972f31add2699132daf27b56264858 Mon Sep 17 00:00:00 2001
From: cafreeman
Date: Sun, 8 Mar 2015 20:21:49 -0500
Subject: [PATCH 7/7] Fix example for `dropTempTable`

---
 pkg/R/SQLContext.R | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/pkg/R/SQLContext.R b/pkg/R/SQLContext.R
index 2ea9b0b15..436d7501a 100644
--- a/pkg/R/SQLContext.R
+++ b/pkg/R/SQLContext.R
@@ -237,7 +237,13 @@
 #'
 #' @param sqlCtx SQLContext to use
 #' @param tableName The name of the SparkSQL table to be dropped.
-#' clearCache(sqlCtx)
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' df <- loadDF(sqlCtx, path, "parquet")
+#' registerTempTable(df, "table")
+#' dropTempTable(sqlCtx, "table")
 #' }
 
 dropTempTable <- function(sqlCtx, tableName) {
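With the series applied, the whole temp-table lifecycle introduced by these patches looks like the sketch below. The parquet paths and table name are hypothetical; as in the tests, `insertInto` targets a parquet-backed table:

    sc <- sparkR.init()
    sqlCtx <- sparkRSQL.init(sc)

    df <- loadDF(sqlCtx, "people.parquet", "parquet")
    registerTempTable(df, "people")

    extra <- loadDF(sqlCtx, "more_people.parquet", "parquet")
    insertInto(extra, "people")                    # append (overwrite = FALSE)
    insertInto(extra, "people", overwrite = TRUE)  # replace the existing rows

    # Drop the temporary table when done; a cached table is also unpersisted
    dropTempTable(sqlCtx, "people")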