Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions CRAN-RELEASE

This file was deleted.

14 changes: 6 additions & 8 deletions DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
Type: Package
Package: disk.frame
Title: Larger-than-RAM Disk-Based Data Manipulation Framework
Version: 0.5.0
Date: 2021-05-09
Version: 0.5.99.999
Date: 2022-01-26
Authors@R: c(
person("Dai", "ZJ", email = "zhuojia.dai@gmail.com", role = c("aut", "cre")),
person("Jacky", "Poon", role = c("ctb"))
Expand All @@ -17,25 +17,23 @@ License: MIT + file LICENSE
Imports:
Rcpp (>= 0.12.13),
glue (>= 1.3.1),
rlang (>= 0.4.0),
future.apply (>= 1.3.0),
fs (>= 1.3.1),
jsonlite (>= 1.6),
pryr (>= 0.1.4),
stringr (>= 1.4.0),
fst (>= 0.8.0),
globals (>= 0.12.4),
future (>= 1.14.0),
data.table (>= 1.12.2),
crayon (>= 1.3.4),
bigreadr (>= 0.2.0),
furrr (>= 0.2.2),
bit64,
benchmarkme
benchmarkme,
purrr (>= 0.3.2),
rlang
Depends:
R (>= 3.4),
dplyr (>= 1.0.0),
purrr (>= 0.3.2)
dplyr (>= 1.0.0)
Suggests:
testthat (>= 2.1.0),
nycflights13,
Expand Down
35 changes: 3 additions & 32 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,17 @@ S3method(colnames,disk.frame)
S3method(compute,disk.frame)
S3method(delayed,disk.frame)
S3method(distinct,disk.frame)
S3method(do,disk.frame)
S3method(filter,disk.frame)
S3method(full_join,disk.frame)
S3method(get_chunk,disk.frame)
S3method(glimpse,disk.frame)
S3method(group_by,disk.frame)
S3method(group_vars,disk.frame)
S3method(groups,disk.frame)
S3method(hard_arrange,data.frame)
S3method(hard_arrange,disk.frame)
S3method(hard_group_by,data.frame)
S3method(hard_group_by,disk.frame)
S3method(head,disk.frame)
S3method(imap,default)
S3method(imap_dfr,default)
S3method(imap_dfr,disk.frame)
S3method(inner_join,disk.frame)
S3method(lazy,disk.frame)
S3method(left_join,disk.frame)
S3method(map,default)
S3method(map,disk.frame)
S3method(map2,default)
S3method(map2,disk.frame)
S3method(map_dfr,default)
S3method(map_dfr,disk.frame)
S3method(merge,disk.frame)
S3method(mutate,disk.frame)
S3method(names,disk.frame)
Expand All @@ -67,22 +53,22 @@ S3method(transmute,disk.frame)
export(IQR_df.chunk_agg.disk.frame)
export(IQR_df.collected_agg.disk.frame)
export(add_chunk)
export(add_tally.disk.frame)
export(all_df.chunk_agg.disk.frame)
export(all_df.collected_agg.disk.frame)
export(any_df.chunk_agg.disk.frame)
export(any_df.collected_agg.disk.frame)
export(as.disk.frame)
export(bind_rows.disk.frame)
export(ceremony_text)
export(chunk_arrange)
export(chunk_distinct)
export(chunk_group_by)
export(chunk_lapply)
export(chunk_summarise)
export(chunk_summarize)
export(chunk_ungroup)
export(cimap)
export(cimap_dfr)
export(clapply)
export(cmap)
export(cmap2)
export(cmap_dfr)
Expand All @@ -102,18 +88,12 @@ export(foverlaps.disk.frame)
export(gen_datatable_synthetic)
export(get_chunk)
export(get_chunk_ids)
export(hard_arrange)
export(hard_group_by)
export(imap)
export(imap_dfr)
export(insert_ceremony)
export(is_disk.frame)
export(lazy)
export(length_df.chunk_agg.disk.frame)
export(length_df.collected_agg.disk.frame)
export(make_glm_streaming_fn)
export(map)
export(map2)
export(map_by_chunk_id)
export(max_df.chunk_agg.disk.frame)
export(max_df.collected_agg.disk.frame)
Expand Down Expand Up @@ -148,7 +128,6 @@ export(shardkey_equal)
export(show_boilerplate)
export(show_ceremony)
export(srckeep)
export(srckeepchunks)
export(sum_df.chunk_agg.disk.frame)
export(sum_df.collected_agg.disk.frame)
export(var_df.chunk_agg.disk.frame)
Expand All @@ -172,10 +151,8 @@ importFrom(data.table,foverlaps)
importFrom(data.table,fread)
importFrom(data.table,rbindlist)
importFrom(data.table,setDT)
importFrom(data.table,setkey)
importFrom(data.table,setkeyv)
importFrom(data.table,timetaken)
importFrom(dplyr,add_tally)
importFrom(dplyr,anti_join)
importFrom(dplyr,arrange)
importFrom(dplyr,bind_rows)
Expand Down Expand Up @@ -218,7 +195,6 @@ importFrom(future,nbrOfWorkers)
importFrom(future,plan)
importFrom(future,sequential)
importFrom(future.apply,future_lapply)
importFrom(globals,findGlobals)
importFrom(glue,glue)
importFrom(jsonlite,fromJSON)
importFrom(jsonlite,toJSON)
Expand All @@ -230,19 +206,14 @@ importFrom(purrr,map2)
importFrom(purrr,map_chr)
importFrom(purrr,map_dfr)
importFrom(purrr,map_lgl)
importFrom(rlang,enquos)
importFrom(rlang,eval_tidy)
importFrom(rlang,quo)
importFrom(rlang,enexpr)
importFrom(stats,median)
importFrom(stats,quantile)
importFrom(stats,runif)
importFrom(stringr,fixed)
importFrom(utils,capture.output)
importFrom(utils,head)
importFrom(utils,memory.limit)
importFrom(utils,methods)
importFrom(utils,setTxtProgressBar)
importFrom(utils,tail)
importFrom(utils,txtProgressBar)
importFrom(utils,unzip)
useDynLib(disk.frame)
5 changes: 3 additions & 2 deletions R/add_chunk.r
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,10 @@ add_chunk <- function(df, chunk, chunk_id = NULL, full.names = FALSE, ...) {

data.table::setDT(check_vars)
if(nrow(check_vars[is.na(new_chunk)]) > 0) {
vars_strings = paste0(check_vars[is.na(new_chunk), colnames], collapse=',\n ')
warning(
glue::glue(
"these variables are in the disk.frame but not in the new chunk: \n {paste0(check_vars[is.na(new_chunk), colnames], collapse=',\n ')}"))
sprintf(
"these variables are in the disk.frame but not in the new chunk: \n %s", vars_strings))
}
if(nrow(check_vars[is.na(existing_df)]) > 0){
warning(glue::glue("these variables are in the new chunk but not in the existing disk.frame: {paste0(check_vars[is.na(existing_df), colnames], collapse=', ')}"))
Expand Down
15 changes: 7 additions & 8 deletions R/anti_join.r
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
#' @param .progress Show progress or not. Defaults to FALSE
#' @param ... same as dplyr's joins
#' @rdname join
#' @importFrom rlang quo enquos
#' @importFrom dplyr anti_join left_join full_join semi_join inner_join
#' @return disk.frame or data.frame/data.table
#' @export
Expand All @@ -29,11 +28,11 @@ anti_join.disk.frame <- function(x, y, by=NULL, copy=FALSE, ..., outdir = tempfi
overwrite_check(outdir, overwrite)

if("data.frame" %in% class(y)) {
quo_dotdotdot = enquos(...)
cmap_dfr.disk.frame(x, ~{
code = quo(anti_join(.x, y, by = by, copy = copy, !!!quo_dotdotdot))
rlang::eval_tidy(code)
tmp = cmap.disk.frame(x, ~{
anti_join(.x, y, by = by, copy = copy, ...)
}, .progress = .progress)

return(tmp)
} else if("disk.frame" %in% class(y)) {
if(is.null(merge_by_chunk_id)) {
stop("both x and y are disk.frames. You need to specify merge_by_chunk_id = TRUE or FALSE explicitly")
Expand All @@ -47,12 +46,12 @@ anti_join.disk.frame <- function(x, y, by=NULL, copy=FALSE, ..., outdir = tempfi
if (merge_by_chunk_id == FALSE) {
warning("merge_by_chunk_id = FALSE. This will take significantly longer and the preparations needed are performed eagerly which may lead to poor performance. Consider making y a data.frame or set merge_by_chunk_id = TRUE for better performance.")

x = hard_group_by(x, by, nchunks = max(ncy,ncx), overwrite = TRUE)
y = hard_group_by(y, by, nchunks = max(ncy,ncx), overwrite = TRUE)
ncxy = max(ncy,ncx)
x = rechunk(x, shardby=by, nchunks = ncxy, outdir=tempfile(fileext = ".jdf"), overwrite = FALSE)
y = rechunk(y, shardby=by, nchunks =ncxy, outdir=tempfile(fileext = ".jdf"), overwrite = FALSE)
return(anti_join.disk.frame(x, y, by, copy = copy, outdir = outdir, merge_by_chunk_id = TRUE, overwrite = overwrite))
} else if ((identical(shardkey(x)$shardkey, "") & identical(shardkey(y)$shardkey, "")) | identical(shardkey(x), shardkey(y))) {
res = cmap2.disk.frame(x, y, ~{
#res = cmap2(x, y, ~{
if(is.null(.y)) {
return(.x)
} else if (is.null(.x)) {
Expand Down
1 change: 0 additions & 1 deletion R/as.disk.frame.r
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#' delete(cars_new_location.df)
#' delete(cars_chunks.df)
as.disk.frame <- function(df, outdir = tempfile(fileext = ".df"), nchunks = recommend_nchunks(df), overwrite = FALSE, shardby = NULL, compress = 50,...) {

stopifnot("data.frame" %in% class(df))
overwrite_check(outdir, overwrite)
data.table::setDT(df)
Expand Down
6 changes: 6 additions & 0 deletions R/bind_rows.r
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#' Bind rows
#' @param ...
#' @export
bind_rows.disk.frame <- function(...) {
rbindlist.disk.frame(list(...))
}
84 changes: 37 additions & 47 deletions R/chunk_mapper.r
Original file line number Diff line number Diff line change
Expand Up @@ -31,59 +31,49 @@
#' @param chunk_fn The dplyr function to create a mapper for
#' @param warning_msg The warning message to display when invoking the mapper
#' @param as.data.frame force the input chunk of a data.frame; needed for dtplyr
#' @importFrom rlang enquos quo
#' @export
create_chunk_mapper <- function(chunk_fn, warning_msg = NULL, as.data.frame = TRUE) {
return_func <- function(.data, ...) {
if (!is.null(warning_msg)) {
create_chunk_mapper <- function(chunk_fn, warning_msg = NULL, as.data.frame = FALSE) {
if(as.data.frame) {
warning("`as.data.frame` is deprecated in create_chunk_mapper")
}

return(function(.data, ...) {
if(!is.null(warning_msg)) {
warning(warning_msg)
}

# you need to use list otherwise the names will be gone
code = substitute(chunk_fn(.disk.frame.chunk, ...))

quo_dotdotdot = rlang::enquos(...)
if (paste0(deparse(code), collapse="") == "chunk_fn(NULL)") {
globals_and_pkgs = future::getGlobalsAndPackages(expression(chunk_fn()))
} else {
globals_and_pkgs = future::getGlobalsAndPackages(code)
}

# this is designed to capture any global stuff
vars_and_pkgs = future::getGlobalsAndPackages(quo_dotdotdot)
data_for_eval_tidy = force(vars_and_pkgs$globals)

res = cmap(.data, ~{

this_env = environment()

if(length(data_for_eval_tidy) > 0) {
for(i in 1:length(data_for_eval_tidy)) {
assign(names(data_for_eval_tidy)[i], data_for_eval_tidy[[i]], pos = this_env)
}
}

lapply(quo_dotdotdot, function(x) {
attr(x, ".Environment") = this_env
})

if(as.data.frame) {
if("grouped_df" %in% class(.x)) {
code = rlang::quo(chunk_fn(.x, !!!quo_dotdotdot))
} else {
code = rlang::quo(chunk_fn(as.data.frame(.x), !!!quo_dotdotdot))
}
} else {
code = rlang::quo(chunk_fn(.x, !!!quo_dotdotdot))
global_vars = globals_and_pkgs$globals

env = parent.frame()

done = identical(env, emptyenv()) || identical(env, globalenv())

# keep adding global variables by moving up the environment chain
while(!done) {
tmp_globals_and_pkgs = future::getGlobalsAndPackages(code, envir = env)
new_global_vars = tmp_globals_and_pkgs$globals
for (name in setdiff(names(new_global_vars), names(global_vars))) {
global_vars[[name]] <- new_global_vars[[name]]
}

# ZJ: we need both approaches. TRUST ME
# TODO better NSE at some point need dist
tryCatch({
return(rlang::eval_tidy(code))
}, error = function(e) {
as_label_code = rlang::as_label(code)
if(as_label_code == "chunk_fn(...)") {
stop(glue::glue("disk.frame has detected a syntax error in \n\n`{code}`\n\n. If you believe your syntax is correct, raise an issue at https://github.com/xiaodaigh/disk.frame with a MWE"))
} else {
# likely to be dealing with data.tables
return(eval(parse(text=as_label_code), envir = this_env))
}
})
}, lazy = TRUE)
}
return_func
}
done = identical(env, emptyenv()) || identical(env, globalenv())
env = parent.env(env)
}

globals_and_pkgs$globals = global_vars

attr(.data, "recordings") = c(attr(.data, "recordings"), list(globals_and_pkgs))

.data
})
}
Empty file added R/clapply.r
Empty file.
Loading