Skip to content

Commit

Permalink
[SPARK-26227][R] from_[csv|json] should accept schema_of_[csv|json] in R API
Browse files Browse the repository at this point in the history

## What changes were proposed in this pull request?

**1. Document `from_csv(..., schema_of_csv(...))` support:**

```R
csv <- "Amsterdam,2018"
df <- sql(paste0("SELECT '", csv, "' as csv"))
head(select(df, from_csv(df$csv, schema_of_csv(csv))))
```

```
    from_csv(csv)
1 Amsterdam, 2018
```

**2. Allow `from_json(..., schema_of_json(...))`**

Before:

```R
df2 <- sql("SELECT named_struct('name', 'Bob') as people")
df2 <- mutate(df2, people_json = to_json(df2$people))
head(select(df2, from_json(df2$people_json, schema_of_json(head(df2)$people_json))))
```

```
Error in (function (classes, fdef, mtable)  :
  unable to find an inherited method for function ‘from_json’ for signature ‘"Column", "Column"’
```

After:

```R
df2 <- sql("SELECT named_struct('name', 'Bob') as people")
df2 <- mutate(df2, people_json = to_json(df2$people))
head(select(df2, from_json(df2$people_json, schema_of_json(head(df2)$people_json))))
```

```
  from_json(people_json)
1                    Bob
```

**3. (While I'm here) Allow `structType` as schema for `from_csv` support to match with `from_json`.**

Before:

```R
csv <- "Amsterdam,2018"
df <- sql(paste0("SELECT '", csv, "' as csv"))
head(select(df, from_csv(df$csv, structType("city STRING, year INT"))))
```

```
Error in (function (classes, fdef, mtable)  :
  unable to find an inherited method for function ‘from_csv’ for signature ‘"Column", "structType"’
```

After:

```R
csv <- "Amsterdam,2018"
df <- sql(paste0("SELECT '", csv, "' as csv"))
head(select(df, from_csv(df$csv, structType("city STRING, year INT"))))
```

```
    from_csv(csv)
1 Amsterdam, 2018
```

## How was this patch tested?

Manually tested and unit tests were added.

Closes #23184 from HyukjinKwon/SPARK-26227-1.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
  • Loading branch information
HyukjinKwon committed Jan 2, 2019
1 parent 5da5587 commit 39a0493
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 23 deletions.
60 changes: 40 additions & 20 deletions R/pkg/R/functions.R
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,9 @@ NULL
#' \itemize{
#' \item \code{from_json}: a structType object to use as the schema to use
#' when parsing the JSON string. Since Spark 2.3, the DDL-formatted string is
#' also supported for the schema.
#' \item \code{from_csv}: a DDL-formatted string
#' also supported for the schema. Since Spark 3.0, \code{schema_of_json} or
#' the DDL-formatted string literal can also be accepted.
#' \item \code{from_csv}: a structType object, DDL-formatted string or \code{schema_of_csv}
#' }
#' @param ... additional argument(s).
#' \itemize{
Expand Down Expand Up @@ -2254,40 +2255,54 @@ setMethod("date_format", signature(y = "Column", x = "character"),
column(jc)
})

setClassUnion("characterOrstructTypeOrColumn", c("character", "structType", "Column"))

#' @details
#' \code{from_json}: Parses a column containing a JSON string into a Column of \code{structType}
#' with the specified \code{schema} or array of \code{structType} if \code{as.json.array} is set
#' to \code{TRUE}. If the string is unparseable, the Column will contain the value NA.
#'
#' @rdname column_collection_functions
#' @param as.json.array indicating if input string is JSON array of objects or a single object.
#' @aliases from_json from_json,Column,characterOrstructType-method
#' @aliases from_json from_json,Column,characterOrstructTypeOrColumn-method
#' @examples
#'
#' \dontrun{
#' df2 <- sql("SELECT named_struct('date', cast('2000-01-01' as date)) as d")
#' df2 <- mutate(df2, d2 = to_json(df2$d, dateFormat = 'dd/MM/yyyy'))
#' schema <- structType(structField("date", "string"))
#' head(select(df2, from_json(df2$d2, schema, dateFormat = 'dd/MM/yyyy')))

#' df2 <- sql("SELECT named_struct('name', 'Bob') as people")
#' df2 <- mutate(df2, people_json = to_json(df2$people))
#' schema <- structType(structField("name", "string"))
#' head(select(df2, from_json(df2$people_json, schema)))
#' head(select(df2, from_json(df2$people_json, "name STRING")))}
#' head(select(df2, from_json(df2$people_json, "name STRING")))
#' head(select(df2, from_json(df2$people_json, schema_of_json(head(df2)$people_json))))}
#' @note from_json since 2.2.0
setMethod("from_json", signature(x = "Column", schema = "characterOrstructType"),
setMethod("from_json", signature(x = "Column", schema = "characterOrstructTypeOrColumn"),
function(x, schema, as.json.array = FALSE, ...) {
if (is.character(schema)) {
schema <- structType(schema)
jschema <- structType(schema)$jobj
} else if (class(schema) == "structType") {
jschema <- schema$jobj
} else {
jschema <- schema@jc
}

if (as.json.array) {
jschema <- callJStatic("org.apache.spark.sql.types.DataTypes",
"createArrayType",
schema$jobj)
} else {
jschema <- schema$jobj
# This case is R-specifically different. Unlike Scala and Python side,
# R side has 'as.json.array' option to indicate if the schema should be
# treated as struct or element type of array in order to make it more
# R-friendly.
if (class(schema) == "Column") {
jschema <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
"createArrayType",
jschema)
} else {
jschema <- callJStatic("org.apache.spark.sql.types.DataTypes",
"createArrayType",
jschema)
}
}
options <- varargsToStrEnv(...)
jc <- callJStatic("org.apache.spark.sql.functions",
Expand Down Expand Up @@ -2328,22 +2343,27 @@ setMethod("schema_of_json", signature(x = "characterOrColumn"),
#' If the string is unparseable, the Column will contain the value NA.
#'
#' @rdname column_collection_functions
#' @aliases from_csv from_csv,Column,character-method
#' @aliases from_csv from_csv,Column,characterOrstructTypeOrColumn-method
#' @examples
#'
#' \dontrun{
#' df <- sql("SELECT 'Amsterdam,2018' as csv")
#' csv <- "Amsterdam,2018"
#' df <- sql(paste0("SELECT '", csv, "' as csv"))
#' schema <- "city STRING, year INT"
#' head(select(df, from_csv(df$csv, schema)))}
#' head(select(df, from_csv(df$csv, schema)))
#' head(select(df, from_csv(df$csv, structType(schema))))
#' head(select(df, from_csv(df$csv, schema_of_csv(csv))))}
#' @note from_csv since 3.0.0
setMethod("from_csv", signature(x = "Column", schema = "characterOrColumn"),
setMethod("from_csv", signature(x = "Column", schema = "characterOrstructTypeOrColumn"),
function(x, schema, ...) {
if (class(schema) == "Column") {
jschema <- schema@jc
} else if (is.character(schema)) {
if (class(schema) == "structType") {
schema <- callJMethod(schema$jobj, "toDDL")
}

if (is.character(schema)) {
jschema <- callJStatic("org.apache.spark.sql.functions", "lit", schema)
} else {
stop("schema argument should be a column or character")
jschema <- schema@jc
}
options <- varargsToStrEnv(...)
jc <- callJStatic("org.apache.spark.sql.functions",
Expand Down
16 changes: 14 additions & 2 deletions R/pkg/tests/fulltests/test_sparkSQL.R
Original file line number Diff line number Diff line change
Expand Up @@ -1626,6 +1626,12 @@ test_that("column functions", {
expect_equal(c[[1]][[1]]$a, 1)
c <- collect(select(df, alias(from_csv(df$col, lit("a INT")), "csv")))
expect_equal(c[[1]][[1]]$a, 1)
c <- collect(select(df, alias(from_csv(df$col, structType("a INT")), "csv")))
expect_equal(c[[1]][[1]]$a, 1)
c <- collect(select(df, alias(from_csv(df$col, schema_of_csv("1")), "csv")))
expect_equal(c[[1]][[1]]$`_c0`, 1)
c <- collect(select(df, alias(from_csv(df$col, schema_of_csv(lit("1"))), "csv")))
expect_equal(c[[1]][[1]]$`_c0`, 1)

df <- as.DataFrame(list(list("col" = "1")))
c <- collect(select(df, schema_of_csv("Amsterdam,2018")))
Expand All @@ -1651,7 +1657,9 @@ test_that("column functions", {
expect_equal(j[order(j$json), ][1], "{\"age\":16,\"height\":176.5}")
df <- as.DataFrame(j)
schemas <- list(structType(structField("age", "integer"), structField("height", "double")),
"age INT, height DOUBLE")
"age INT, height DOUBLE",
schema_of_json("{\"age\":16,\"height\":176.5}"),
schema_of_json(lit("{\"age\":16,\"height\":176.5}")))
for (schema in schemas) {
s <- collect(select(df, alias(from_json(df$json, schema), "structcol")))
expect_equal(ncol(s), 1)
Expand Down Expand Up @@ -1691,7 +1699,11 @@ test_that("column functions", {
# check if array type in string is correctly supported.
jsonArr <- "[{\"name\":\"Bob\"}, {\"name\":\"Alice\"}]"
df <- as.DataFrame(list(list("people" = jsonArr)))
for (schema in list(structType(structField("name", "string")), "name STRING")) {
schemas <- list(structType(structField("name", "string")),
"name STRING",
schema_of_json("{\"name\":\"Alice\"}"),
schema_of_json(lit("{\"name\":\"Bob\"}")))
for (schema in schemas) {
arr <- collect(select(df, alias(from_json(df$people, schema, as.json.array = TRUE), "arrcol")))
expect_equal(ncol(arr), 1)
expect_equal(nrow(arr), 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.catalyst.expressions.{ExprUtils, GenericRowWithSchema}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.execution.command.ShowTablesCommand
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
Expand Down Expand Up @@ -233,4 +233,8 @@ private[sql] object SQLUtils extends Logging {
}
sparkSession.sessionState.catalog.listTables(db).map(_.table).toArray
}

def createArrayType(column: Column): ArrayType = {
new ArrayType(ExprUtils.evalTypeExpr(column.expr), true)
}
}

0 comments on commit 39a0493

Please sign in to comment.