Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions R/chunk_mapper.r
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,31 @@ create_chunk_mapper <- function(chunk_fn, warning_msg = NULL, as.data.frame = FA
warning(warning_msg)
}

dotdotdot = rlang::enexprs(...)

# convert any quosure to labels
for (i in seq_along(dotdotdot)) {
dotdotdot[[i]] <- dotdotdot[[i]] %>%
rlang::quo_squash()
}

args_str = mapply(function(name, val) {
rhs = deparse(val) %>%
paste0(collapse = "")
if(name != "") {
sprintf("%s=%s", name, rhs)
} else {
rhs
}
}, names(dotdotdot), dotdotdot) %>%
paste0(collapse = ", ")

code = parse(text=sprintf("chunk_fn(.disk.frame.chunk, %s)", args_str))[[1]]

# you need to use list otherwise the names will be gone
code = substitute(chunk_fn(.disk.frame.chunk, ...))
# code = substitute(chunk_fn(.disk.frame.chunk, ...))

if (paste0(deparse(code), collapse="") == "chunk_fn(NULL)") {
if (paste0(deparse(code), collapse="") == "chunk_fn(.disk.frame.chunk, NULL)") {
globals_and_pkgs = future::getGlobalsAndPackages(expression(chunk_fn()))
} else {
globals_and_pkgs = future::getGlobalsAndPackages(code)
Expand Down
14 changes: 4 additions & 10 deletions R/one-stage-verbs.R
Original file line number Diff line number Diff line change
Expand Up @@ -209,17 +209,14 @@ IQR_df.collected_agg.disk.frame <- function(listx, ...) {
#' @rdname group_by
#' @export
summarise.grouped_disk.frame <- function(.data, ...) {

class(.data) <- c("summarized_disk.frame", "disk.frame")

# get all components of the summarise
dotdotdot = rlang::enexprs(...)

# convert any quosure to labels
for (i in seq_along(dotdotdot)) {
dotdotdot[[i]] <- rlang::as_label(dotdotdot[[i]]) %>%
parse(text = .) %>%
.[[1]]
dotdotdot[[i]] <- rlang::quo_squash(dotdotdot[[i]])
}

attr(.data, "summarize_code") = dotdotdot
Expand Down Expand Up @@ -269,9 +266,7 @@ group_by.disk.frame <- function(.data, ..., .add = FALSE, .drop = stop("disk.fra
# convert any quosure to labels
for (i in seq_along(group_by_cols)) {
group_by_cols[[i]] <- group_by_cols[[i]] %>%
rlang::as_label() %>%
parse(text=.) %>%
.[[1]]
rlang::quo_squash()
}

attr(.data, "group_by_cols") = group_by_cols
Expand Down Expand Up @@ -301,9 +296,8 @@ summarize.disk.frame <- function(.data, ...) {

# convert any quosure to labels
for (i in seq_along(dotdotdot)) {
dotdotdot[[i]] <- rlang::as_label(dotdotdot[[i]]) %>%
parse(text=.) %>%
.[[1]]
dotdotdot[[i]] <- dotdotdot[[i]] %>%
rlang::quo_squash()
}

attr(.data, "summarize_code") = dotdotdot
Expand Down
1 change: 0 additions & 1 deletion R/recommend_nchunks.r
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ recommend_nchunks <- function(df, type = "csv", minchunks = data.table::getDTthr
#' # returns the RAM size in gigabyte (GB)
#' df_ram_size()
df_ram_size <- function() {
#browser()
tryCatch({
ram_size = NULL
# the amount of memory available in gigabytes
Expand Down
6 changes: 4 additions & 2 deletions tests/testthat/test-data-table.r
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,12 @@ test_that("data.table global vars", {
}

# Check function with data.table object
grep_storm_name(storms_dt, "^A")
a = grep_storm_name(storms_dt, "^A")

# Check function with diskframe object
grep_storm_name(storms_df, "^A")
b = grep_storm_name(storms_df, "^A")

expect_equal(a, b)
})

teardown({
Expand Down
130 changes: 77 additions & 53 deletions tests/testthat/test-dplyr-verbs.r
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,31 @@ setup({

test_that("testing select", {
b = disk.frame(file.path(tempdir(), "tmp_b_dv.df"))
df = b %>%
select(a) %>%

df = b %>%
select(a) %>%
collect

expect_equal(ncol(df), 1)
})

test_that("testing rename", {
b = disk.frame(file.path(tempdir(), "tmp_b_dv.df"))
df = b %>%
rename(a_new_name = a) %>%

df = b %>%
rename(a_new_name = a) %>%
collect

expect_setequal(colnames(df), c("a_new_name", "b"))
})

test_that("testing filter", {
b = disk.frame(file.path(tempdir(), "tmp_b_dv.df"))
df = b %>%
filter(a <= 100, b <= 10) %>%

df = b %>%
filter(a <= 100, b <= 10) %>%
collect

expect_setequal(nrow(df), 10)
})

Expand All @@ -44,131 +44,155 @@ test_that("testing filter - global vars", {
filter(a <= one_hundred, b <= 10) %>%
collect


expect_setequal(nrow(df), 10)


# testing_env = new.env()
#
# assign("one_hundred", 100, testing_env)
# assign("b", disk.frame(file.path(tempdir(), "tmp_b_dv.df")), testing_env)
#
# eval(parse(text=code), envir = testing_env, enclos=emptyenv())
})

test_that("testing mutate", {
b = disk.frame(file.path(tempdir(), "tmp_b_dv.df"))
df = b %>%
mutate(d = a + b) %>%


df = b %>%
mutate(d = a + b) %>%
collect

expect_setequal(sum(df$d), sum(df$a, df$b))
df = b %>%

df = b %>%
mutate(e = rank(desc(a))) %>%
collect

expect_equal(nrow(df), 100)

# need to test
value <- as.disk.frame(tibble(char = LETTERS,
num = 1:26))
df2 = value %>%
dplyr::mutate(b = case_when(
char %in% c("A", "B", "C") ~ "1",
TRUE ~ char)) %>%
TRUE ~ char)) %>%
collect

expect_equal(ncol(df2), 3)

# testing
fn = function(a, b) {
a+b
}

df3 = value %>%
dplyr::mutate(b = fn(num, num)) %>%
collect

expect_equal(ncol(df3), 3)


global_var = 100

df4 = value %>%
dplyr::mutate(b = fn(num, num), d = global_var*2) %>%
collect

expect_equal(ncol(df4), 4)
expect_true(all(df4$d == 200))
})

test_that("testing mutate user-defined function", {
b = disk.frame(file.path(tempdir(), "tmp_b_dv.df"))


udf = function(a1, b1) {
a1 + b1
}

df = b %>%
mutate(d = udf(a,b)) %>%
collect

expect_setequal(sum(df$d), sum(df$a, df$b))
})

test_that("testing transmute", {
b = disk.frame(file.path(tempdir(), "tmp_b_dv.df"))
df = b %>%
transmute(d = a + b) %>%

df = b %>%
transmute(d = a + b) %>%
collect

expect_setequal(names(df), c("d"))
})

test_that("testing arrange", {
b = disk.frame(file.path(tempdir(), "tmp_b_dv.df"))

expect_warning(df <- b %>%
mutate(random_unif = runif(dplyr::n())) %>%
mutate(random_unif = runif(dplyr::n())) %>%
arrange(desc(random_unif)))

df <- b %>%
mutate(random_unif = runif(dplyr::n())) %>%
mutate(random_unif = runif(dplyr::n())) %>%
chunk_arrange(desc(random_unif))

x = purrr::map_lgl(1:nchunks(df), ~{
is.unsorted(.x) == FALSE
})

expect_true(all(x))
})

test_that("testing chunk_summarise", {
b = disk.frame(file.path(tempdir(), "tmp_b_dv.df"))

df = b %>%
chunk_summarise(suma = sum(a)) %>%
collect %>%
chunk_summarise(suma = sum(a)) %>%
collect %>%
summarise(suma = sum(suma))

expect_equal(df$suma, collect(b)$a %>% sum)
})

test_that("testing mutate within function works", {
test_f <- function(params, x_df){
x_df %>% mutate(aha = params[1]*cyl + params[2]*disp)
}

expect_true("aha" %in% names(test_f(c(1, 2), mtcars)))
})

test_that("filter failure: prevent github #191 regression", {
flights_df = as.disk.frame(nycflights13::flights)

# expect error due to syntax error
expect_error(flights_df %>%
filter(tailnum %in% paste0(unique(nycflights13::flights$tailnum)[1:60]), "") %>%
expect_error(flights_df %>%
filter(tailnum %in% paste0(unique(nycflights13::flights$tailnum)[1:60]), "") %>%
collect)

delete(flights_df)
})

test_that("testing {{}}", {
ok <- function(input_data, col) {
val = 2
input_data %>%
mutate({{col}} + val, val2 = speed+dist) %>%
collect
}

a = ok(as.disk.frame(cars), dist)

b = ok(cars, dist)

expect_equal(a, b)
})


teardown({
fs::dir_delete(file.path(tempdir(), "tmp_b_dv.df"))
Expand Down