From cb124e8b66d25d53f0b08e6f7ad1b72af9e379c4 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Sun, 19 Jun 2016 12:27:44 -0700 Subject: [PATCH 1/3] [SPARK-16059][R] Add `monotonically_increasing_id` function in SparkR --- R/pkg/NAMESPACE | 1 + R/pkg/R/functions.R | 27 +++++++++++++++++++++++ R/pkg/R/generics.R | 5 +++++ R/pkg/inst/tests/testthat/test_sparkSQL.R | 2 +- 4 files changed, 34 insertions(+), 1 deletion(-) diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index 82e56ca437299..0cfe1902794ab 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -218,6 +218,7 @@ exportMethods("%in%", "mean", "min", "minute", + "monotonically_increasing_id", "month", "months_between", "n", diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index a779127b379a0..ec029e5efbf73 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -911,6 +911,33 @@ setMethod("minute", column(jc) }) +#' monotonically_increasing_id +#' +#' Return a column that generates monotonically increasing 64-bit integers. +#' +#' The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive. +#' The current implementation puts the partition ID in the upper 31 bits, and the record number +#' within each partition in the lower 33 bits. The assumption is that the data frame has +#' less than 1 billion partitions, and each partition has less than 8 billion records. +#' +#' As an example, consider a SparkDataFrame with two partitions, each with 3 records. +#' This expression would return the following IDs: +#' 0, 1, 2, 8589934592 (1L << 33), 8589934593, 8589934594. +#' +#' This is equivalent to the MONOTONICALLY_INCREASING_ID function in SQL. +#' +#' @rdname monotonically_increasing_id +#' @name monotonically_increasing_id +#' @family misc_funcs +#' @export +#' @examples \dontrun{select(df, monotonically_increasing_id())} +setMethod("monotonically_increasing_id", + signature(x = "missing"), + function(x) { + jc <- callJStatic("org.apache.spark.sql.functions", "monotonically_increasing_id") + column(jc) + }) + #' month #' #' Extracts the month as an integer from a given date/timestamp/string. diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index 6e754afab6c6d..37d05560c3e00 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -993,6 +993,11 @@ setGeneric("md5", function(x) { standardGeneric("md5") }) #' @export setGeneric("minute", function(x) { standardGeneric("minute") }) +#' @rdname monotonically_increasing_id +#' @export +setGeneric("monotonically_increasing_id", + function(x) { standardGeneric("monotonically_increasing_id") }) + #' @rdname month #' @export setGeneric("month", function(x) { standardGeneric("month") }) diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index fcc2ab3ed6a2b..c5c5a069a817f 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -1047,7 +1047,7 @@ test_that("column functions", { c5 <- hour(c) + initcap(c) + last(c) + last_day(c) + length(c) c6 <- log(c) + (c) + log1p(c) + log2(c) + lower(c) + ltrim(c) + max(c) + md5(c) c7 <- mean(c) + min(c) + month(c) + negate(c) + quarter(c) - c8 <- reverse(c) + rint(c) + round(c) + rtrim(c) + sha1(c) + c8 <- reverse(c) + rint(c) + round(c) + rtrim(c) + sha1(c) + monotonically_increasing_id() c9 <- signum(c) + sin(c) + sinh(c) + size(c) + stddev(c) + soundex(c) + sqrt(c) + sum(c) c10 <- sumDistinct(c) + tan(c) + tanh(c) + toDegrees(c) + toRadians(c) c11 <- to_date(c) + trim(c) + unbase64(c) + unhex(c) + upper(c) From b7633b4f41d3ac46b42f8f098abd7cd06d077718 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Mon, 20 Jun 2016 04:08:14 -0700 Subject: [PATCH 2/3] Address comments. --- R/pkg/R/functions.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index ec029e5efbf73..00c7b67ffd2fd 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -933,7 +933,7 @@ setMethod("minute", #' @examples \dontrun{select(df, monotonically_increasing_id())} setMethod("monotonically_increasing_id", signature(x = "missing"), - function(x) { + function() { jc <- callJStatic("org.apache.spark.sql.functions", "monotonically_increasing_id") column(jc) }) From 2b661a0268cdf944bab47fd4b5f07f033760bf8d Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Mon, 20 Jun 2016 09:36:05 -0700 Subject: [PATCH 3/3] Use `SparkDataFrame` consistently. --- R/pkg/R/functions.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index 00c7b67ffd2fd..0fb38bc2891ad 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -917,7 +917,7 @@ setMethod("minute", #' #' The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive. #' The current implementation puts the partition ID in the upper 31 bits, and the record number -#' within each partition in the lower 33 bits. The assumption is that the data frame has +#' within each partition in the lower 33 bits. The assumption is that the SparkDataFrame has #' less than 1 billion partitions, and each partition has less than 8 billion records. #' #' As an example, consider a SparkDataFrame with two partitions, each with 3 records.