Merge pull request #204 from cafreeman/sparkr-sql
[SparkR-209] Remaining DataFrame methods + `dropTempTable`
shivaram committed Mar 9, 2015
2 parents 789be97 + 15a713f commit 09ff163
Showing 4 changed files with 294 additions and 1 deletion.
9 changes: 8 additions & 1 deletion pkg/NAMESPACE
@@ -92,6 +92,8 @@ exportMethods("columns",
"filter",
"groupBy",
"head",
"insertInto",
"intersect",
"isLocal",
"limit",
"orderBy",
@@ -108,9 +110,13 @@ exportMethods("columns",
"selectExpr",
"showDF",
"sortDF",
"subtract",
"toJSON",
"toRDD",
"where")
"unionAll",
"where",
"withColumn",
"withColumnRenamed")

exportClasses("Column")

@@ -141,6 +147,7 @@ exportMethods("agg")
export("cacheTable",
"clearCache",
"createExternalTable",
"dropTempTable",
"jsonFile",
"jsonRDD",
"loadDF",
194 changes: 194 additions & 0 deletions pkg/R/DataFrame.R
@@ -263,6 +263,36 @@ setMethod("registerTempTable",
callJMethod(x@sdf, "registerTempTable", tableName)
})

#' insertInto
#'
#' Insert the contents of a DataFrame into a table registered in the current SQL Context.
#'
#' @param x A SparkSQL DataFrame
#' @param tableName A character string giving the name of the table
#' @param overwrite A logical argument indicating whether or not to overwrite
#' the existing rows in the table.
#'
#' @rdname insertInto
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
#' df <- loadDF(sqlCtx, path, "parquet")
#' df2 <- loadDF(sqlCtx, path2, "parquet")
#' registerTempTable(df, "table1")
#' insertInto(df2, "table1", overwrite = TRUE)
#'}
setGeneric("insertInto", function(x, tableName, ...) { standardGeneric("insertInto") })

#' @rdname insertInto
#' @export
setMethod("insertInto",
signature(x = "DataFrame", tableName = "character"),
function(x, tableName, overwrite = FALSE) {
callJMethod(x@sdf, "insertInto", tableName, overwrite)
})

#' Cache
#'
#' Persist with the default storage level (MEMORY_ONLY).
@@ -768,6 +798,20 @@ setMethod("select", signature(x = "DataFrame", col = "Column"),
dataFrame(sdf)
})

setMethod("select",
signature(x = "DataFrame", col = "list"),
function(x, col) {
cols <- lapply(col, function(c) {
if (class(c) == "Column") {
c@jc
} else {
col(c)@jc
}
})
sdf <- callJMethod(x@sdf, "select", listToSeq(cols))
dataFrame(sdf)
})
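
This list variant of select() has no roxygen block of its own, so the following is a minimal usage sketch, assuming sqlCtx, jsonPath, and a DataFrame with "name" and "age" columns as in the package tests; the list may mix plain column-name strings with Column expressions:

df <- jsonFile(sqlCtx, jsonPath)
projected <- select(df, list("name", df$age + 1))  # string and Column mixed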

#' SelectExpr
#'
#' Select from a DataFrame using a set of SQL expressions.
@@ -798,6 +842,70 @@ setMethod("selectExpr",
dataFrame(sdf)
})

#' WithColumn
#'
#' Return a new DataFrame with the specified column added.
#'
#' @param x A DataFrame
#' @param colName A string containing the name of the new column.
#' @param col A Column expression.
#' @return A DataFrame with the new column added.
#' @rdname withColumn
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- jsonFile(sqlCtx, path)
#' newDF <- withColumn(df, "newCol", df$col1 * 5)
#' }
setGeneric("withColumn", function(x, colName, col) { standardGeneric("withColumn") })

#' @rdname withColumn
#' @export
setMethod("withColumn",
signature(x = "DataFrame", colName = "character", col = "Column"),
function(x, colName, col) {
select(x, x$"*", alias(col, colName))
})
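
Because withColumn() is implemented as a projection (a select() of every existing column plus the aliased expression), the two calls below should be equivalent; a sketch assuming a DataFrame df with a numeric age column, as in the package tests:

newDF <- withColumn(df, "newAge", df$age + 2)
sameDF <- select(df, df$"*", alias(df$age + 2, "newAge"))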

#' WithColumnRenamed
#'
#' Rename an existing column in a DataFrame.
#'
#' @param x A DataFrame
#' @param existingCol The name of the existing column to rename.
#' @param newCol The new column name.
#' @return A DataFrame with the column name changed.
#' @rdname withColumnRenamed
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- jsonFile(sqlCtx, path)
#' newDF <- withColumnRenamed(df, "col1", "newCol1")
#' }
setGeneric("withColumnRenamed", function(x, existingCol, newCol) {
standardGeneric("withColumnRenamed") })

#' @rdname withColumnRenamed
#' @export
setMethod("withColumnRenamed",
signature(x = "DataFrame", existingCol = "character", newCol = "character"),
function(x, existingCol, newCol) {
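            # Rebuild the full column list, aliasing the single entry whose name
            # matches existingCol; every other column passes through unchanged.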
cols <- lapply(columns(x), function(c) {
if (c == existingCol) {
alias(col(c), newCol)
} else {
col(c)
}
})
select(x, cols)
})

#' SortDF
#'
#' Sort a DataFrame by the specified column(s).
@@ -939,6 +1047,92 @@ setMethod("join",
dataFrame(sdf)
})

#' UnionAll
#'
#' Return a new DataFrame containing the union of rows in this DataFrame
#' and another DataFrame. This is equivalent to `UNION ALL` in SQL.
#'
#' @param x A Spark DataFrame
#' @param y A Spark DataFrame
#' @return A DataFrame containing the result of the union.
#' @rdname unionAll
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
#' df1 <- jsonFile(sqlCtx, path)
#' df2 <- jsonFile(sqlCtx, path2)
#' unioned <- unionAll(df1, df2)
#' }
setGeneric("unionAll", function(x, y) { standardGeneric("unionAll") })

#' @rdname unionAll
#' @export
setMethod("unionAll",
signature(x = "DataFrame", y = "DataFrame"),
function(x, y) {
unioned <- callJMethod(x@sdf, "unionAll", y@sdf)
dataFrame(unioned)
})

#' Intersect
#'
#' Return a new DataFrame containing only rows present in both this DataFrame
#' and another DataFrame. This is equivalent to `INTERSECT` in SQL.
#'
#' @param x A Spark DataFrame
#' @param y A Spark DataFrame
#' @return A DataFrame containing the result of the intersect.
#' @rdname intersect
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
#' df1 <- jsonFile(sqlCtx, path)
#' df2 <- jsonFile(sqlCtx, path2)
#' intersectDF <- intersect(df1, df2)
#' }
setGeneric("intersect", function(x, y) { standardGeneric("intersect") })

#' @rdname intersect
#' @export
setMethod("intersect",
signature(x = "DataFrame", y = "DataFrame"),
function(x, y) {
intersected <- callJMethod(x@sdf, "intersect", y@sdf)
dataFrame(intersected)
})

#' Subtract
#'
#' Return a new DataFrame containing rows in this DataFrame
#' but not in another DataFrame. This is equivalent to `EXCEPT` in SQL.
#'
#' @param x A Spark DataFrame
#' @param y A Spark DataFrame
#' @return A DataFrame containing the result of the subtract operation.
#' @rdname subtract
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
#' df1 <- jsonFile(sqlCtx, path)
#' df2 <- jsonFile(sqlCtx, path2)
#' subtractDF <- subtract(df1, df2)
#' }
setGeneric("subtract", function(x, y) { standardGeneric("subtract") })

#' @rdname subtract
#' @export
setMethod("subtract",
signature(x = "DataFrame", y = "DataFrame"),
function(x, y) {
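            # The JVM-side DataFrame method is named "except"; subtract is the
            # R-facing name for SQL EXCEPT semantics.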
subtracted <- callJMethod(x@sdf, "except", y@sdf)
dataFrame(subtracted)
})

#' Save the contents of the DataFrame to a data source
#'
21 changes: 21 additions & 0 deletions pkg/R/SQLContext.R
@@ -230,6 +230,27 @@ clearCache <- function(sqlCtx) {
callJMethod(sqlCtx, "clearCache")
}

#' Drop Temporary Table
#'
#' Drops the temporary table with the given table name in the catalog.
#' If the table has been cached/persisted before, it's also unpersisted.
#'
#' @param sqlCtx SQLContext to use
#' @param tableName The name of the SparkSQL table to be dropped.
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
#' df <- loadDF(sqlCtx, path, "parquet")
#' registerTempTable(df, "table")
#' dropTempTable(sqlCtx, "table")
#' }

dropTempTable <- function(sqlCtx, tableName) {
if (class(tableName) != "character") {
stop("tableName must be a string.")
}
callJMethod(sqlCtx, "dropTempTable", tableName)
}

#' Load a DataFrame
#'
#' Returns the dataset in a data source as a DataFrame
71 changes: 71 additions & 0 deletions pkg/inst/tests/test_sparkSQL.R
@@ -40,6 +40,7 @@ test_that("test cache, uncache and clearCache", {
cacheTable(sqlCtx, "table1")
uncacheTable(sqlCtx, "table1")
clearCache(sqlCtx)
dropTempTable(sqlCtx, "table1")
})

test_that("test tableNames and tables", {
@@ -48,6 +49,7 @@ test_that("test tableNames and tables", {
expect_true(length(tableNames(sqlCtx)) == 1)
df <- tables(sqlCtx)
expect_true(count(df) == 1)
dropTempTable(sqlCtx, "table1")
})

test_that("registerTempTable() results in a queryable table and sql() results in a new DataFrame", {
@@ -56,12 +58,43 @@
newdf <- sql(sqlCtx, "SELECT * FROM table1 where name = 'Michael'")
expect_true(inherits(newdf, "DataFrame"))
expect_true(count(newdf) == 1)
dropTempTable(sqlCtx, "table1")
})

test_that("insertInto() on a registered table", {
df <- loadDF(sqlCtx, jsonPath, "json")
saveDF(df, parquetPath, "parquet", "overwrite")
dfParquet <- loadDF(sqlCtx, parquetPath, "parquet")

lines <- c("{\"name\":\"Bob\", \"age\":24}",
"{\"name\":\"James\", \"age\":35}")
jsonPath2 <- tempfile(pattern = "jsonPath2", fileext = ".tmp")
parquetPath2 <- tempfile(pattern = "parquetPath2", fileext = ".parquet")
writeLines(lines, jsonPath2)
df2 <- loadDF(sqlCtx, jsonPath2, "json")
saveDF(df2, parquetPath2, "parquet", "overwrite")
dfParquet2 <- loadDF(sqlCtx, parquetPath2, "parquet")

registerTempTable(dfParquet, "table1")
insertInto(dfParquet2, "table1")
expect_true(count(sql(sqlCtx, "select * from table1")) == 5)
expect_true(first(sql(sqlCtx, "select * from table1 order by age"))$name == "Michael")
dropTempTable(sqlCtx, "table1")

registerTempTable(dfParquet, "table1")
insertInto(dfParquet2, "table1", overwrite = TRUE)
expect_true(count(sql(sqlCtx, "select * from table1")) == 2)
expect_true(first(sql(sqlCtx, "select * from table1 order by age"))$name == "Bob")
dropTempTable(sqlCtx, "table1")
})

test_that("table() returns a new DataFrame", {
df <- jsonFile(sqlCtx, jsonPath)
registerTempTable(df, "table1")
tabledf <- table(sqlCtx, "table1")
expect_true(inherits(tabledf, "DataFrame"))
expect_true(count(tabledf) == 3)
dropTempTable(sqlCtx, "table1")
})

test_that("toRDD() returns an RRDD", {
@@ -430,6 +463,44 @@ test_that("isLocal()", {
expect_false(isLocal(df))
})

test_that("unionAll(), subtract(), and intersect() on a DataFrame", {
df <- jsonFile(sqlCtx, jsonPath)

lines <- c("{\"name\":\"Bob\", \"age\":24}",
"{\"name\":\"Andy\", \"age\":30}",
"{\"name\":\"James\", \"age\":35}")
jsonPath2 <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
writeLines(lines, jsonPath2)
df2 <- loadDF(sqlCtx, jsonPath2, "json")

unioned <- sortDF(unionAll(df, df2), df$age)
expect_true(inherits(unioned, "DataFrame"))
expect_true(count(unioned) == 6)
expect_true(first(unioned)$name == "Michael")

subtracted <- sortDF(subtract(df, df2), desc(df$age))
expect_true(inherits(subtracted, "DataFrame"))
expect_true(count(subtracted) == 2)
expect_true(first(subtracted)$name == "Justin")

intersected <- sortDF(intersect(df, df2), df$age)
expect_true(inherits(intersected, "DataFrame"))
expect_true(count(intersected) == 1)
expect_true(first(intersected)$name == "Andy")
})

test_that("withColumn() and withColumnRenamed()", {
df <- jsonFile(sqlCtx, jsonPath)
newDF <- withColumn(df, "newAge", df$age + 2)
expect_true(length(columns(newDF)) == 3)
expect_true(columns(newDF)[3] == "newAge")
expect_true(first(filter(newDF, df$name != "Michael"))$newAge == 32)

newDF2 <- withColumnRenamed(df, "age", "newerAge")
expect_true(length(columns(newDF2)) == 2)
expect_true(columns(newDF2)[1] == "newerAge")
})

# TODO: Enable and test once the parquetFile PR has been merged
# test_that("saveAsParquetFile() on DataFrame and works with parquetFile", {
# df <- jsonFile(sqlCtx, jsonPath)