Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
Type: Package
Package: disk.frame
Title: Larger-than-RAM Disk-Based Data Manipulation Framework
Version: 0.6.0
Date: 2022-01-31
Version: 0.7.2
Date: 2022-03-07
Authors@R: c(
person("Dai", "ZJ", email = "zhuojia.dai@gmail.com", role = c("aut", "cre")),
person("Jacky", "Poon", role = c("ctb"))
Expand Down Expand Up @@ -30,9 +30,11 @@ Imports:
bit64,
benchmarkme,
purrr (>= 0.3.2),
rlang
globals,
rlang,
arrow
Depends:
R (>= 3.4),
R (>= 4.0),
dplyr (>= 1.0.0)
Suggests:
nycflights13,
Expand All @@ -51,4 +53,4 @@ LinkingTo:
RoxygenNote: 7.1.2
Encoding: UTF-8
URL: https://diskframe.com
BugReports: https://github.com/xiaodaigh/disk.frame/issues
BugReports: https://github.com/DiskFrame/disk.frame/issues
6 changes: 6 additions & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ export(delete)
export(df_ram_size)
export(dfglm)
export(disk.frame)
export(disk.frame_to_parquet)
export(distribute)
export(evalparseglue)
export(foverlaps.disk.frame)
Expand Down Expand Up @@ -113,6 +114,7 @@ export(nchunks)
export(ncol)
export(nrow)
export(overwrite_check)
export(partition_filter)
export(quantile_df.chunk_agg.disk.frame)
export(quantile_df.collected_agg.disk.frame)
export(rbindlist.disk.frame)
Expand All @@ -138,6 +140,7 @@ import(fs)
import(fst)
import(stringr)
importFrom(Rcpp,evalCpp)
importFrom(arrow,write_parquet)
importFrom(benchmarkme,get_ram)
importFrom(bigreadr,get_split_files)
importFrom(bigreadr,split_file)
Expand Down Expand Up @@ -165,6 +168,7 @@ importFrom(dplyr,full_join)
importFrom(dplyr,glimpse)
importFrom(dplyr,group_by)
importFrom(dplyr,group_by_drop_default)
importFrom(dplyr,group_map)
importFrom(dplyr,group_vars)
importFrom(dplyr,groups)
importFrom(dplyr,inner_join)
Expand Down Expand Up @@ -195,6 +199,7 @@ importFrom(future,nbrOfWorkers)
importFrom(future,plan)
importFrom(future,sequential)
importFrom(future.apply,future_lapply)
importFrom(globals,findGlobals)
importFrom(glue,glue)
importFrom(jsonlite,fromJSON)
importFrom(jsonlite,toJSON)
Expand All @@ -215,5 +220,6 @@ importFrom(utils,capture.output)
importFrom(utils,head)
importFrom(utils,memory.limit)
importFrom(utils,tail)
importFrom(utils,type.convert)
importFrom(utils,unzip)
useDynLib(disk.frame)
16 changes: 16 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
# disk.frame 0.7.2
* Bug fixes for GitHub issue #384

# disk.frame 0.7.1
* Bugfixes
* Add ability to convert to Parquet

# disk.frame 0.7
* Partitioned by folder
* Updated R version to 4

# disk.frame 0.6.1
* Fixed bug with data.table syntax
* Auto detection of srckeep in group by
* Global detection for group by and summarise

# disk.frame 0.6
* Much better NSE support in disk.frame!
* removed `hard_arrange` and `hard_group_by`
Expand Down
29 changes: 27 additions & 2 deletions R/chunk_mapper.r
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,36 @@ create_chunk_mapper <- function(chunk_fn, warning_msg = NULL, as.data.frame = FA
if(!is.null(warning_msg)) {
warning(warning_msg)
}
dotdotdot = rlang::enexprs(...)

# convert any quosure to labels
for (i in seq_along(dotdotdot)) {
dotdotdot[[i]] <- dotdotdot[[i]] %>%
rlang::quo_squash()
}

args_str = mapply(function(name, val) {
rhs = deparse(val) %>%
paste0(collapse = "")
if(name != "") {
sprintf("%s=%s", name, rhs)
} else {
rhs
}
}, names(dotdotdot), dotdotdot) %>%
paste0(collapse = ", ")

if (args_str == "") {
code = parse(text="chunk_fn(.disk.frame.chunk)")[[1]]
} else {
code = parse(text=sprintf("chunk_fn(.disk.frame.chunk, %s)", args_str))[[1]]
}


# you need to use list otherwise the names will be gone
code = substitute(chunk_fn(.disk.frame.chunk, ...))
# code = substitute(chunk_fn(.disk.frame.chunk, ...))

if (paste0(deparse(code), collapse="") == "chunk_fn(NULL)") {
if (paste0(deparse(code), collapse="") == "chunk_fn(.disk.frame.chunk, NULL)") {
globals_and_pkgs = future::getGlobalsAndPackages(expression(chunk_fn()))
} else {
globals_and_pkgs = future::getGlobalsAndPackages(code)
Expand Down
55 changes: 51 additions & 4 deletions R/collect.r
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,61 @@
#' @rdname collect
collect.disk.frame <- function(x, ..., parallel = !is.null(attr(x,"recordings"))) {
cids = get_chunk_ids(x, full.names = TRUE, strip_extension = FALSE)
# obtain filters from structure
partitioned_paths_or_not = get_partition_paths(x)

if (partitioned_paths_or_not$is_partitioned) {
partitioned_paths = partitioned_paths_or_not$paths
if(length(partitioned_paths) >= 1) {
# filter the cids based on the paths
tmp = data.frame(paths = cids) %>%
mutate(dirname = dirname(paths)) %>%
inner_join(data.frame(dirname = sapply(partitioned_paths, tools::file_path_as_absolute)), by = "dirname")
cids = tmp$paths
}
}

if(nchunks(x) > 0) {
if(parallel) {
tmp = future.apply::future_lapply(cids, function(.x) {
get_chunk(x, .x, full.names = TRUE)
}, future.seed = TRUE)
tmp<-future.apply::future_lapply(cids, function(.x, meh) {
if(partitioned_paths_or_not$is_partitioned) {
dirpath = dirname(.x)
tmp2 = partitioned_paths_or_not$df %>%
mutate(fullpath = file.path(attr(x, "path") %>% tools::file_path_as_absolute(), .disk.frame.sub.path))

tmp2a = tmp2 %>%
filter(fullpath == dirpath) %>%
mutate(.check=1)

stopifnot(nrow(tmp2a) == 1)

tmp3 = get_chunk.disk.frame(x, .x, full.names = TRUE, partitioned_info = tmp2a)
return(tmp3)
} else {
return(get_chunk.disk.frame(x, .x, full.names = TRUE))
}
}, future.seed = NULL)
return(rbindlist(tmp))
} else {
purrr::map_dfr(cids, ~get_chunk(x, .x, full.names = TRUE))
tmp<-lapply(cids, function(.x, meh) {
if(partitioned_paths_or_not$is_partitioned) {
dirpath = dirname(.x)
tmp2 = partitioned_paths_or_not$df %>%
mutate(fullpath = file.path(attr(x, "path") %>% tools::file_path_as_absolute(), .disk.frame.sub.path))

tmp2a = tmp2 %>%
filter(fullpath == dirpath) %>%
mutate(.check=1)

stopifnot(nrow(tmp2a) == 1)

tmp3 = get_chunk.disk.frame(x, .x, full.names = TRUE, partitioned_info = tmp2a)
return(tmp3)
} else {
return(get_chunk.disk.frame(x, .x, full.names = TRUE))
}
})
return(rbindlist(tmp))
}
} else {
data.table()
Expand Down
53 changes: 50 additions & 3 deletions R/collect.summarized_disk.frame.r
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#' @importFrom data.table data.table as.data.table
#' @importFrom purrr map_dfr
#' @importFrom dplyr collect select mutate
#' @importFrom globals findGlobals
#' @return collect return a data.frame/data.table
#' @examples
#' cars.df = as.disk.frame(cars)
Expand All @@ -26,7 +27,36 @@
collect.summarized_disk.frame <-
function(x, ..., parallel = !is.null(attr(x, "recordings"))) {
dotdotdot <- attr(x, 'summarize_code')
group_by_vars = attr(x, "group_by_cols")

# look at the group by and summaries codes and figure out which columns need to be
# srckeep
df_to_find_cols = fst::read_fst(get_chunk_ids(x, full.names = TRUE)[1], from=1, to=1)

cols_in_summ = lapply(dotdotdot, function(one) {
globals::findGlobals(one, envir = list2env(df_to_find_cols, parent=globalenv()))
}) %>% unlist %>% unique

cols_in_group_by = lapply(group_by_vars, function(one) {
globals::findGlobals(one, envir = list2env(df_to_find_cols, parent=globalenv()))
}) %>% unlist %>% unique

cols_used = c(cols_in_summ, cols_in_group_by) %>% unique
src_keep_cols = intersect(names(df_to_find_cols), cols_used)

# are there any variables used in the group by or summarise that is not present in the original data?
# if yes then that indicates this could be more complicated e.g. a new var was created with mutate
extra_vars = setdiff(cols_used, names(df_to_find_cols))
if(length(extra_vars) > 0) {
warning(sprintf(
"These columns that appear in the group-by and summarise does not appear in the original data set: %s. This set of action is too hard for disk.frame to figure out the `srckeep` automatically, you must do the `srckeep` manually."
, paste0(extra_vars, collapse = ", ")))
} else {
x = srckeep(x, src_keep_cols)
}



# make a copy
dotdotdot_chunk_agg <- dotdotdot
dotdotdot_collected_agg <- dotdotdot
Expand Down Expand Up @@ -74,8 +104,18 @@ collect.summarized_disk.frame <-
}
}

group_by_globals_list = attr(x_as.disk.frame, "group_by_globals_and_pkgs")$globals

if(is.null(group_by_globals_list)) {
eval_clos = parent.frame()
} else {
eval_clos = list2env(group_by_globals_list, parent=parent.frame())
}

# TODO add appropriate environment
tmp_df = eval(first_stage_code)
# tmp_df = eval(first_stage_code, envir=environment(), enclos = eval_clos
tmp_df = eval(first_stage_code, group_by_globals_list)


n_summ_args = length(dotdotdot_chunk_agg)

Expand Down Expand Up @@ -103,15 +143,22 @@ collect.summarized_disk.frame <-
)
}

tmp2 = collect(eval(parse(text = chunk_summ_code_str)))
summarize_globals_list = attr(x_as.disk.frame, "summarize_globals_and_pkgs")$globals

tmp2 = collect(eval(parse(text = chunk_summ_code_str), envir = summarize_globals_list))

second_stage_code = eval(parse(text = sprintf(
"quote(group_by(tmp2, %s))", paste0(rep_len("NULL", n_grp_args), collapse = ", ")
)))

if (n_grp_args >= 1) {
for (i in 1:n_grp_args) {
second_stage_code[[i + 2]] = group_by_vars[[i]]
second_stage_code[[i + 2]] = group_by_vars[[i]] %>%
deparse() %>%
paste0(collapse="") %>%
sprintf("`%s`", .) %>%
parse(text=.) %>%
.[[1]]
}
}

Expand Down
37 changes: 1 addition & 36 deletions R/data.table.r
Original file line number Diff line number Diff line change
Expand Up @@ -24,42 +24,7 @@

code = substitute(chunk[...])

# sometimes the arguments could be empty
# in a recent version of globals that would cause a fail
# to avoid the fail remove them from the test
#dotdotdot_for_find_global = dotdotdot[!sapply(sapply(dotdotdot, as.character), function(x) all(unlist(x) == ""))]

#ag = globals::findGlobals(dotdotdot_for_find_global)
#ag = setdiff(ag, "") # "" can cause issues with future # this line no longer needed


# you need to use list otherwise the names will be gone
if (paste0(deparse(code), collapse="") == "chunk_fn(NULL)") {
globals_and_pkgs = future::getGlobalsAndPackages(expression(chunk_fn()))
} else {
globals_and_pkgs = future::getGlobalsAndPackages(code)
}


global_vars = globals_and_pkgs$globals

env = parent.frame()

done = identical(env, emptyenv()) || identical(env, globalenv())

# keep adding global variables by moving up the environment chain
while(!done) {
tmp_globals_and_pkgs = future::getGlobalsAndPackages(code, envir = env)
new_global_vars = tmp_globals_and_pkgs$globals
for (name in setdiff(names(new_global_vars), names(global_vars))) {
global_vars[[name]] <- new_global_vars[[name]]
}

done = identical(env, emptyenv()) || identical(env, globalenv())
env = parent.env(env)
}

globals_and_pkgs$globals = global_vars
globals_and_pkgs = find_globals_recursively(code, parent.frame())

res = future.apply::future_lapply(get_chunk_ids(df, full.names = TRUE), function(chunk_id) {
#res = lapply(get_chunk_ids(df, full.names = TRUE), function(chunk_id) {
Expand Down
35 changes: 35 additions & 0 deletions R/disk.frame-to-parquet.r
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#' Convert a disk.frame to the parquet format
#'
#' Writes every `.fst` chunk file of the disk.frame (including chunks inside
#' partition sub-directories) as a correspondingly named `.parquet` file under
#' `outdir`, mirroring the on-disk directory layout.
#'
#' @importFrom arrow write_parquet
#' @export
#' @param df a disk.frame or a path to a disk.frame
#' @param outdir the path to save the parquet files
#' @return invisibly, a list of the per-chunk results from
#'   \code{arrow::write_parquet}
disk.frame_to_parquet <- function(df, outdir) {
  # accept either a disk.frame object or a path to one on disk
  if (inherits(df, "disk.frame")) {
    path <- attr(df, "path")
  } else {
    path <- df
  }

  path <- normalizePath(path)
  # match only genuine ".fst" chunk files; the unescaped pattern "fst$" would
  # also pick up any file merely ending in the letters "fst"
  fst_files <- normalizePath(
    list.files(path, pattern = "\\.fst$", full.names = TRUE, recursive = TRUE)
  )

  if (!fs::dir_exists(outdir)) {
    fs::dir_create(outdir)
  }

  res <- future.apply::future_lapply(fst_files, function(fst_file) {
    # swap the ".fst" extension for ".parquet"
    file_name <- sub("\\.fst$", ".parquet", basename(fst_file))
    path_name <- normalizePath(dirname(fst_file))

    # keep only the partition sub-path relative to the disk.frame root, so the
    # output directory structure mirrors the input's
    path_name <- stringr::str_sub(path_name, nchar(path) + 1)

    if (!fs::dir_exists(file.path(outdir, path_name))) {
      fs::dir_create(file.path(outdir, path_name))
    }

    arrow::write_parquet(
      fst::read_fst(fst_file),
      file.path(outdir, path_name, file_name)
    )
  })

  invisible(res)
}
1 change: 1 addition & 0 deletions R/dplyr_verbs.r
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ chunk_ungroup = create_chunk_mapper(dplyr::ungroup)
#ungroup.disk.frame( < - create_dplyr_mapper(dplyr::ungroup, , warning_msg="`ungroup.disk.frame` is now deprecated. Please use `chunk_ungroup` instead. This is in preparation for a more powerful `group_by` framework")



#' @export
#' @rdname dplyr_verbs
glimpse.disk.frame <- function(.data, ...) {
Expand Down
Loading