diff --git a/DESCRIPTION b/DESCRIPTION
index 1301c323..51eedc85 100644
--- a/DESCRIPTION
+++ b/DESCRIPTION
@@ -1,8 +1,8 @@
 Type: Package
 Package: disk.frame
 Title: Larger-than-RAM Disk-Based Data Manipulation Framework
-Version: 0.6.0
-Date: 2022-01-31
+Version: 0.7.2
+Date: 2022-03-07
 Authors@R: c(
     person("Dai", "ZJ", email = "zhuojia.dai@gmail.com", role = c("aut", "cre")),
     person("Jacky", "Poon", role = c("ctb"))
 )
@@ -30,9 +30,11 @@ Imports:
     bit64,
     benchmarkme,
     purrr (>= 0.3.2),
-    rlang
+    globals,
+    rlang,
+    arrow
 Depends:
-    R (>= 3.4),
+    R (>= 4.0),
     dplyr (>= 1.0.0)
 Suggests:
     nycflights13,
@@ -51,4 +53,4 @@ LinkingTo:
 RoxygenNote: 7.1.2
 Encoding: UTF-8
 URL: https://diskframe.com
-BugReports: https://github.com/xiaodaigh/disk.frame/issues
+BugReports: https://github.com/DiskFrame/disk.frame/issues
diff --git a/NAMESPACE b/NAMESPACE
index 2def4912..03a29bef 100644
--- a/NAMESPACE
+++ b/NAMESPACE
@@ -82,6 +82,7 @@ export(delete)
 export(df_ram_size)
 export(dfglm)
 export(disk.frame)
+export(disk.frame_to_parquet)
 export(distribute)
 export(evalparseglue)
 export(foverlaps.disk.frame)
@@ -113,6 +114,7 @@ export(nchunks)
 export(ncol)
 export(nrow)
 export(overwrite_check)
+export(partition_filter)
 export(quantile_df.chunk_agg.disk.frame)
 export(quantile_df.collected_agg.disk.frame)
 export(rbindlist.disk.frame)
@@ -138,6 +140,7 @@ import(fs)
 import(fst)
 import(stringr)
 importFrom(Rcpp,evalCpp)
+importFrom(arrow,write_parquet)
 importFrom(benchmarkme,get_ram)
 importFrom(bigreadr,get_split_files)
 importFrom(bigreadr,split_file)
@@ -165,6 +168,7 @@ importFrom(dplyr,full_join)
 importFrom(dplyr,glimpse)
 importFrom(dplyr,group_by)
 importFrom(dplyr,group_by_drop_default)
+importFrom(dplyr,group_map)
 importFrom(dplyr,group_vars)
 importFrom(dplyr,groups)
 importFrom(dplyr,inner_join)
@@ -195,6 +199,7 @@ importFrom(future,nbrOfWorkers)
 importFrom(future,plan)
 importFrom(future,sequential)
 importFrom(future.apply,future_lapply)
+importFrom(globals,findGlobals)
 importFrom(glue,glue)
 importFrom(jsonlite,fromJSON)
 importFrom(jsonlite,toJSON)
@@ -215,5 +220,6 @@ importFrom(utils,capture.output)
 importFrom(utils,head)
 importFrom(utils,memory.limit)
 importFrom(utils,tail)
+importFrom(utils,type.convert)
 importFrom(utils,unzip)
 useDynLib(disk.frame)
diff --git a/NEWS.md b/NEWS.md
index e075c6da..501a74ab 100644
--- a/NEWS.md
+++ b/NEWS.md
@@ -1,3 +1,19 @@
+# disk.frame 0.7.2
+* Bug fixes for GitHub issue 384
+
+# disk.frame 0.7.1
+* Bug fixes
+* Added the ability to convert to Parquet
+
+# disk.frame 0.7
+* Partitioning by folder
+* Updated the minimum R version to 4.0
+
+# disk.frame 0.6.1
+* Fixed a bug with data.table syntax
+* Auto-detection of srckeep in group by
+* Globals detection for group by and summarise
+
 # disk.frame 0.6
 * Much better NSE support in disk.frame!
 * removed `hard_arrange` and `hard_group_by`
diff --git a/R/chunk_mapper.r b/R/chunk_mapper.r
index 1b0d3ff7..dc6b0556 100644
--- a/R/chunk_mapper.r
+++ b/R/chunk_mapper.r
@@ -41,11 +41,36 @@ create_chunk_mapper <- function(chunk_fn, warning_msg = NULL, as.data.frame = FA
   if(!is.null(warning_msg)) {
     warning(warning_msg)
   }
+  dotdotdot = rlang::enexprs(...)
+ + # convert any quosure to labels + for (i in seq_along(dotdotdot)) { + dotdotdot[[i]] <- dotdotdot[[i]] %>% + rlang::quo_squash() + } + + args_str = mapply(function(name, val) { + rhs = deparse(val) %>% + paste0(collapse = "") + if(name != "") { + sprintf("%s=%s", name, rhs) + } else { + rhs + } + }, names(dotdotdot), dotdotdot) %>% + paste0(collapse = ", ") + + if (args_str == "") { + code = parse(text="chunk_fn(.disk.frame.chunk)")[[1]] + } else { + code = parse(text=sprintf("chunk_fn(.disk.frame.chunk, %s)", args_str))[[1]] + } + # you need to use list otherwise the names will be gone - code = substitute(chunk_fn(.disk.frame.chunk, ...)) + # code = substitute(chunk_fn(.disk.frame.chunk, ...)) - if (paste0(deparse(code), collapse="") == "chunk_fn(NULL)") { + if (paste0(deparse(code), collapse="") == "chunk_fn(.disk.frame.chunk, NULL)") { globals_and_pkgs = future::getGlobalsAndPackages(expression(chunk_fn())) } else { globals_and_pkgs = future::getGlobalsAndPackages(code) diff --git a/R/collect.r b/R/collect.r index cf5add15..98347581 100644 --- a/R/collect.r +++ b/R/collect.r @@ -25,14 +25,61 @@ #' @rdname collect collect.disk.frame <- function(x, ..., parallel = !is.null(attr(x,"recordings"))) { cids = get_chunk_ids(x, full.names = TRUE, strip_extension = FALSE) + # obtain filters from structure + partitioned_paths_or_not = get_partition_paths(x) + + if (partitioned_paths_or_not$is_partitioned) { + partitioned_paths = partitioned_paths_or_not$paths + if(length(partitioned_paths) >= 1) { + # filter the cids based on the paths + tmp = data.frame(paths = cids) %>% + mutate(dirname = dirname(paths)) %>% + inner_join(data.frame(dirname = sapply(partitioned_paths, tools::file_path_as_absolute)), by = "dirname") + cids = tmp$paths + } + } + if(nchunks(x) > 0) { if(parallel) { - tmp = future.apply::future_lapply(cids, function(.x) { - get_chunk(x, .x, full.names = TRUE) - }, future.seed = TRUE) + tmp<-future.apply::future_lapply(cids, function(.x, meh) { + if(partitioned_paths_or_not$is_partitioned) { + dirpath = dirname(.x) + tmp2 = partitioned_paths_or_not$df %>% + mutate(fullpath = file.path(attr(x, "path") %>% tools::file_path_as_absolute(), .disk.frame.sub.path)) + + tmp2a = tmp2 %>% + filter(fullpath == dirpath) %>% + mutate(.check=1) + + stopifnot(nrow(tmp2a) == 1) + + tmp3 = get_chunk.disk.frame(x, .x, full.names = TRUE, partitioned_info = tmp2a) + return(tmp3) + } else { + return(get_chunk.disk.frame(x, .x, full.names = TRUE)) + } + }, future.seed = NULL) return(rbindlist(tmp)) } else { - purrr::map_dfr(cids, ~get_chunk(x, .x, full.names = TRUE)) + tmp<-lapply(cids, function(.x, meh) { + if(partitioned_paths_or_not$is_partitioned) { + dirpath = dirname(.x) + tmp2 = partitioned_paths_or_not$df %>% + mutate(fullpath = file.path(attr(x, "path") %>% tools::file_path_as_absolute(), .disk.frame.sub.path)) + + tmp2a = tmp2 %>% + filter(fullpath == dirpath) %>% + mutate(.check=1) + + stopifnot(nrow(tmp2a) == 1) + + tmp3 = get_chunk.disk.frame(x, .x, full.names = TRUE, partitioned_info = tmp2a) + return(tmp3) + } else { + return(get_chunk.disk.frame(x, .x, full.names = TRUE)) + } + }) + return(rbindlist(tmp)) } } else { data.table() diff --git a/R/collect.summarized_disk.frame.r b/R/collect.summarized_disk.frame.r index edd686a4..58c7a500 100644 --- a/R/collect.summarized_disk.frame.r +++ b/R/collect.summarized_disk.frame.r @@ -13,6 +13,7 @@ #' @importFrom data.table data.table as.data.table #' @importFrom purrr map_dfr #' @importFrom dplyr collect select mutate +#' @importFrom globals 
findGlobals
#' @return collect return a data.frame/data.table
#' @examples
#' cars.df = as.disk.frame(cars)
#'
#' @export
#' @rdname collect
collect.summarized_disk.frame <-
  function(x, ..., parallel = !is.null(attr(x, "recordings"))) {
   dotdotdot <- attr(x, 'summarize_code')
+  group_by_vars = attr(x, "group_by_cols")
+
+  # look at the group-by and summarise code and figure out which columns need to be
+  # srckeep
+  df_to_find_cols = fst::read_fst(get_chunk_ids(x, full.names = TRUE)[1], from=1, to=1)
+
+  cols_in_summ = lapply(dotdotdot, function(one) {
+    globals::findGlobals(one, envir = list2env(df_to_find_cols, parent=globalenv()))
+  }) %>% unlist %>% unique
+
+  cols_in_group_by = lapply(group_by_vars, function(one) {
+    globals::findGlobals(one, envir = list2env(df_to_find_cols, parent=globalenv()))
+  }) %>% unlist %>% unique
+
+  cols_used = c(cols_in_summ, cols_in_group_by) %>% unique
+  src_keep_cols = intersect(names(df_to_find_cols), cols_used)
+
+  # are there any variables used in the group by or summarise that are not present in the original data?
+  # if yes then that indicates this could be more complicated e.g. a new var was created with mutate
+  extra_vars = setdiff(cols_used, names(df_to_find_cols))
+  if(length(extra_vars) > 0) {
+    warning(sprintf(
+      "These columns appear in the group-by or summarise but not in the original data set: %s. This is too complex for disk.frame to determine the `srckeep` automatically; you must set `srckeep` manually."
+      , paste0(extra_vars, collapse = ", ")))
+  } else {
+    x = srckeep(x, src_keep_cols)
+  }
+
+  # make a copy
   dotdotdot_chunk_agg <- dotdotdot
   dotdotdot_collected_agg <- dotdotdot
@@ -74,8 +104,18 @@ collect.summarized_disk.frame <-
     }
   }
 
+  group_by_globals_list = attr(x_as.disk.frame, "group_by_globals_and_pkgs")$globals
+
+  if(is.null(group_by_globals_list)) {
+    eval_clos = parent.frame()
+  } else {
+    eval_clos = list2env(group_by_globals_list, parent=parent.frame())
+  }
+
   # TODO add appropriate environment
-  tmp_df = eval(first_stage_code)
+  # tmp_df = eval(first_stage_code, envir=environment(), enclos = eval_clos)
+  tmp_df = eval(first_stage_code, group_by_globals_list)
+
   n_summ_args = length(dotdotdot_chunk_agg)
@@ -103,7 +143,9 @@ collect.summarized_disk.frame <-
     )
   }
 
-  tmp2 = collect(eval(parse(text = chunk_summ_code_str)))
+  summarize_globals_list = attr(x_as.disk.frame, "summarize_globals_and_pkgs")$globals
+
+  tmp2 = collect(eval(parse(text = chunk_summ_code_str), envir = summarize_globals_list))
 
   second_stage_code = eval(parse(text = sprintf(
     "quote(group_by(tmp2, %s))",
     paste0(rep_len("NULL", n_grp_args), collapse = ", ")
   )))
 
   if (n_grp_args >= 1) {
     for (i in 1:n_grp_args) {
-      second_stage_code[[i + 2]] = group_by_vars[[i]]
+      second_stage_code[[i + 2]] = group_by_vars[[i]] %>%
+        deparse() %>%
+        paste0(collapse="") %>%
+        sprintf("`%s`", .) %>%
+        parse(text=.)
%>% + .[[1]] } } diff --git a/R/data.table.r b/R/data.table.r index 0d12184f..9d648121 100644 --- a/R/data.table.r +++ b/R/data.table.r @@ -24,42 +24,7 @@ code = substitute(chunk[...]) - # sometimes the arguments could be empty - # in a recent version of globals that would cause a fail - # to avoid the fail remove them from the test - #dotdotdot_for_find_global = dotdotdot[!sapply(sapply(dotdotdot, as.character), function(x) all(unlist(x) == ""))] - - #ag = globals::findGlobals(dotdotdot_for_find_global) - #ag = setdiff(ag, "") # "" can cause issues with future # this line no longer needed - - - # you need to use list otherwise the names will be gone - if (paste0(deparse(code), collapse="") == "chunk_fn(NULL)") { - globals_and_pkgs = future::getGlobalsAndPackages(expression(chunk_fn())) - } else { - globals_and_pkgs = future::getGlobalsAndPackages(code) - } - - - global_vars = globals_and_pkgs$globals - - env = parent.frame() - - done = identical(env, emptyenv()) || identical(env, globalenv()) - - # keep adding global variables by moving up the environment chain - while(!done) { - tmp_globals_and_pkgs = future::getGlobalsAndPackages(code, envir = env) - new_global_vars = tmp_globals_and_pkgs$globals - for (name in setdiff(names(new_global_vars), names(global_vars))) { - global_vars[[name]] <- new_global_vars[[name]] - } - - done = identical(env, emptyenv()) || identical(env, globalenv()) - env = parent.env(env) - } - - globals_and_pkgs$globals = global_vars + globals_and_pkgs = find_globals_recursively(code, parent.frame()) res = future.apply::future_lapply(get_chunk_ids(df, full.names = TRUE), function(chunk_id) { #res = lapply(get_chunk_ids(df, full.names = TRUE), function(chunk_id) { diff --git a/R/disk.frame-to-parquet.r b/R/disk.frame-to-parquet.r new file mode 100644 index 00000000..d9ec2e45 --- /dev/null +++ b/R/disk.frame-to-parquet.r @@ -0,0 +1,35 @@ +#' A function to convert a disk.frame to parquet format +#' @importFrom arrow write_parquet +#' @export +#' @param df a disk.frame or a path to a disk.frame +#' @param outdir the path to save the parquet files +disk.frame_to_parquet <- function(df, outdir) { + if("disk.frame" %in% class(df)) { + path = attr(df, "path") + } else { + path = df + } + + path = normalizePath(path) + fst_files = normalizePath(list.files(path, "fst$", full.names = TRUE, recursive=TRUE)) + + if(!fs::dir_exists(outdir)) { + fs::dir_create(outdir) + } + + + future.apply::future_lapply(fst_files, function(fst_file) { + file_name = basename(fst_file) + file_name = paste0(stringr::str_sub(file_name, 1, nchar(file_name)-4), ".parquet") + path_name = normalizePath(dirname(fst_file)) + + # remove base directory from path + path_name = stringr::str_sub(path_name, nchar(path)+1) + + if(!fs::dir_exists(file.path(outdir, path_name))) { + fs::dir_create(file.path(outdir, path_name)) + } + + arrow::write_parquet(fst::read_fst(fst_file), file.path(outdir, path_name, file_name)) + }) +} \ No newline at end of file diff --git a/R/dplyr_verbs.r b/R/dplyr_verbs.r index 285c678f..49586774 100644 --- a/R/dplyr_verbs.r +++ b/R/dplyr_verbs.r @@ -137,6 +137,7 @@ chunk_ungroup = create_chunk_mapper(dplyr::ungroup) #ungroup.disk.frame( < - create_dplyr_mapper(dplyr::ungroup, , warning_msg="`ungroup.disk.frame` is now deprecated. Please use `chunk_ungroup` instead. This is in preparation for a more powerful `group_by` framework") + #' @export #' @rdname dplyr_verbs glimpse.disk.frame <- function(.data, ...) 
{ diff --git a/R/get_chunk.r b/R/get_chunk.r index 01c06e61..c0125985 100644 --- a/R/get_chunk.r +++ b/R/get_chunk.r @@ -4,6 +4,7 @@ #' @param keep the columns to keep #' @param full.names whether n is the full path to the chunks or just a relative path file name. Ignored if n is numeric #' @param ... passed to fst::read_fst or whichever read function is used in the backend +#' @param partitioned_info for internal use only. It's a data.frame used to help with filtering by partitions #' @export #' @examples #' cars.df = as.disk.frame(cars, nchunks = 2) @@ -24,7 +25,7 @@ get_chunk <- function(...) { #' @rdname get_chunk #' @importFrom fst read_fst #' @export -get_chunk.disk.frame <- function(df, n, keep = NULL, full.names = FALSE, ...) { +get_chunk.disk.frame <- function(df, n, keep = NULL, full.names = FALSE, ..., partitioned_info=NULL) { stopifnot("disk.frame" %in% class(df)) # keep_chunks = attr(df, "keep_chunks", exact=TRUE) @@ -44,9 +45,10 @@ get_chunk.disk.frame <- function(df, n, keep = NULL, full.names = FALSE, ...) { keep1_vars = paste0(keep1, collapse = ", ") keep_no_good_vars = setdiff(keep, keep1) %>% paste0(collapse = ", ") stop( - glue::glue( - "This disk.frame has a srckeep containing these variables {keep1_vars}. - You are trying to keep {keep_no_good_vars}, which are not available.")) + sprintf( + "This disk.frame has a `srckeep` containing these columns: `%s`. + You are trying to keep `%s`, which are not available.", + paste0(keep1_vars, collapse=", "), paste0(keep_no_good_vars, collapse=", "))) } keep = intersect(keep1, keep) if (!all(keep %in% keep1)) { @@ -78,19 +80,37 @@ get_chunk.disk.frame <- function(df, n, keep = NULL, full.names = FALSE, ...) { return(notbl) } - if (is.null(recordings)) { if(typeof(keep)=="closure") { - fst::read_fst(filename, as.data.table = TRUE,...) + tmp = fst::read_fst(filename, as.data.table = TRUE,...) } else { - fst::read_fst(filename, columns = keep, as.data.table = TRUE,...) + tmp = fst::read_fst(filename, columns = keep, as.data.table = TRUE,...) + } + + if(!is.null(partitioned_info)) { + res = tmp %>% + mutate(.check=1) %>% + full_join(partitioned_info %>% mutate(.check=1), by=".check") %>% + select(-.check, -fullpath, -.disk.frame.sub.path) + return(res) + } else{ + return(tmp) } } else { - if(typeof(keep)!="closure") { - play(fst::read_fst(filename, as.data.table = TRUE,...), recordings) + if(typeof(keep)=="closure") { + tmp_df_input = fst::read_fst(filename, as.data.table = TRUE,...) } else { - play(fst::read_fst(filename, columns = keep, as.data.table = TRUE,...), recordings) - + tmp_df_input = fst::read_fst(filename, columns = keep, as.data.table = TRUE,...) + } + + if(!is.null(partitioned_info)) { + res = tmp_df_input %>% + mutate(.check=1) %>% + full_join(partitioned_info %>% mutate(.check=1), by=".check") %>% + select(-.check, -fullpath, -.disk.frame.sub.path) + return(play(res, recordings)) + } else{ + return(play(tmp_df_input, recordings)) } } } diff --git a/R/get_chunk_ids.r b/R/get_chunk_ids.r index bbb3890a..9a023e90 100644 --- a/R/get_chunk_ids.r +++ b/R/get_chunk_ids.r @@ -20,19 +20,19 @@ #' # clean up cars.df #' delete(cars.df) get_chunk_ids <- function(df, ..., full.names = FALSE, strip_extension = TRUE) { - lf = list.files(attr(df,"path"), full.names = full.names, ...) 
+  stopifnot("disk.frame" %in% class(df))
+
+  lf = list.files(attr(df,"path"), full.names = full.names, ..., recursive = TRUE)
   if(full.names) {
     return(lf)
   }
-  purrr::map_chr(lf, ~{
-    tmp = stringr::str_split(.x,stringr::fixed("."), simplify = TRUE)
-    l = length(tmp)
-    if(l == 1) {
-      return(tmp)
-    } else if(strip_extension) {
-      paste0(tmp[-l], collapse="")
-    } else {
-      .x
+
+  # strip out the path or file name if required
+  sapply(lf, function(path) {
+    tmp = basename(path)
+    if (strip_extension) {
+      tmp = tools::file_path_sans_ext(tmp)
     }
+    return(tmp)
   })
 }
diff --git a/R/get_partition.r b/R/get_partition.r
new file mode 100644
index 00000000..b43e68f7
--- /dev/null
+++ b/R/get_partition.r
@@ -0,0 +1,71 @@
+#' Turn a string of the form /partition1=val/partition2=val2 into a data.frame
+#' @param path_strs The paths in string form to break into partition format
+split_string_into_df <- function(path_strs) {
+  paths = dirname(path_strs) %>% unique
+  list_of_partitions = stringr::str_split(paths, "/")
+
+  tmp = mapply(function(partition, path) {
+    part_val = stringr::str_split(partition, "=")
+    tmp = lapply(part_val, function(part_val) {
+      tmp = data.frame(partition = part_val[2])
+      names(tmp) = part_val[1]
+      tmp
+    }) %>%
+      do.call(cbind, .)
+
+    tmp$.disk.frame.sub.path = path
+
+    tmp
+  }, list_of_partitions, paths, SIMPLIFY = FALSE) %>% data.table::rbindlist()
+
+  tmp
+}
+
+if(F) {
+  df = disk.frame("C:/temp/ok.df") %>%
+    filter(partition == 1)
+}
+
+#' Get the partitioning structure of a folder
+#' @param df a disk.frame whose paths will be used to determine whether it is a
+#'   folder-partitioned disk.frame
+#' @importFrom utils type.convert
+get_partition_paths <- function(df) {
+  stopifnot("disk.frame" %in% class(df))
+  path = tools::file_path_as_absolute(attr(df, "path"))
+
+  allowed_paths = path
+
+  meta_path = file.path(path, ".metadata")
+
+  is_partitioned = FALSE
+  df_of_partitions = NULL
+
+  # if it's a partitioned structure allow more search paths than root
+  if (length(setdiff(list.dirs(path, recursive = FALSE), meta_path)) >= 1) {
+    lf = list.files(path, full.names = FALSE, pattern="fst", recursive=TRUE)
+
+    # create a data.frame of the paths so it can be filtered
+    df_of_partitions = split_string_into_df(lf)
+    # infer the types
+    df_of_partitions = utils::type.convert(df_of_partitions, as.is=TRUE)
+
+    # if there is a filter operation, filter the above to figure out which paths are allowed
+    allowed_paths = df_of_partitions$.disk.frame.sub.path
+
+    # filter for some paths if necessary
+    partition_filter_info = attr(df, "partition_filter")
+    if (!is.null(partition_filter_info)) {
+      # apply filter
+      df_of_partitions = eval(partition_filter_info$expr, list(dataframe=df_of_partitions))
+
+      allowed_paths = df_of_partitions$.disk.frame.sub.path
+    }
+    is_partitioned = TRUE
+  }
+
+  # now go through the allowed paths
+  list(paths=file.path(attr(df, "path"), allowed_paths), is_partitioned=is_partitioned, df = df_of_partitions)
+
+  # TODO check all files sit within the same structure
+}
diff --git a/R/is_disk.frame.r b/R/is_disk.frame.r
index 542bd38b..48f538da 100644
--- a/R/is_disk.frame.r
+++ b/R/is_disk.frame.r
@@ -10,24 +10,30 @@
 #' # clean up cars.df
 #' delete(cars.df)
 is_disk.frame <- function(df) {
-  ##browser
   if("disk.frame" %in% class(df)) {
     df = attr(df, "path", exact=TRUE)
   } else if(!"character" %in% class(df)) {
     # character then check the path
     return(FALSE)
   }
 
-  files <- fs::dir_ls(df, type="file", all = TRUE)
+  files <- fs::dir_ls(df, type="file", all = TRUE)
 
   # if all files are fst
   if(length(files)>0) {
     if(any(purrr::map_lgl(files, ~length(grep(glob2rx("*.fst"), .x)) == 0))) {
-      # some of the fiels do not have a .fst extension
+      # some of the files do not have a .fst extension
       return(FALSE)
     }
   }
 
   dirs = fs::dir_ls(df, type="directory", all = TRUE)
 
   if(length(dirs) > 1) {
+    # are the directories of the form name=val?
+    split_dirs = stringr::str_split(basename(setdiff(dirs, file.path(df, ".metadata"))), "=")
+    if (all(sapply(split_dirs, length) == 2)) {
+      # all folders are of the form name=val
+      # TODO check the folders recursively
+      return(TRUE)
+    }
     return(FALSE)
   } else if(length(dirs) == 1) {
     if(substr(dirs, nchar(dirs)-8,nchar(dirs)) != ".metadata") {
diff --git a/R/names.r b/R/names.r
index 05d047da..9f4393a3 100644
--- a/R/names.r
+++ b/R/names.r
@@ -25,17 +25,17 @@ names.disk.frame <- function(x, ...) {
 #' @export
 colnames.disk.frame <- function(x, ...) {
   res = attr(x, "path", exact=TRUE) %>%
-    list.files(full.names = TRUE)
+    list.files(full.names = TRUE, recursive=TRUE, pattern = "fst")
 
-  if(is.null(attr(x, "recordings"))) {
+  # if(is.null(attr(x, "recordings"))) {
     if(length(res) == 0) {
      return(vector("character"))
     }
     return(fst::metadata_fst(res[1])$columnNames)
-  } else {
-    tiny_example_data.frame = get_chunk(x, 1, from=1, to=1)
-    return(colnames(tiny_example_data.frame))
-  }
+  # } else {
+  #   tiny_example_data.frame = get_chunk(x, 1, from=1, to=1)
+  #   return(colnames(tiny_example_data.frame))
+  # }
 }
diff --git a/R/nchunks.r b/R/nchunks.r
index d40eb6e6..17c89888 100644
--- a/R/nchunks.r
+++ b/R/nchunks.r
@@ -36,7 +36,7 @@ nchunks.disk.frame <- function(df, skip.ready.check = FALSE, ...) {
   #if(!skip.ready.check) stopifnot(is_ready(df))
   fpath <- attr(df,"path", exact=TRUE)
   if(is.dir.disk.frame(df)) {
-    return(length(fs::dir_ls(fpath, type="file")))
+    return(length(list.files(fpath, pattern="fst", recursive = TRUE)))
   } else {
     return(1)
   }
diff --git a/R/ncol-nrow.r b/R/ncol-nrow.r
index 1addc2dc..3f9e17c2 100644
--- a/R/ncol-nrow.r
+++ b/R/ncol-nrow.r
@@ -29,7 +29,7 @@ nrow.disk.frame <- function(df, ...) {
   stopifnot(is_ready(df))
   path1 <- attr(df,"path", exact=TRUE)
   if(dir.exists(path1)) {
-    path2 <- list.files(path1,full.names = TRUE)
+    path2 <- list.files(path1, full.names = TRUE, recursive=TRUE, pattern = "fst")
     if(length(path2) == 0) {
       return(0)
     }
diff --git a/R/one-stage-verbs.R b/R/one-stage-verbs.R
index 27f62f2f..39d9b350 100644
--- a/R/one-stage-verbs.R
+++ b/R/one-stage-verbs.R
@@ -209,19 +209,31 @@ IQR_df.collected_agg.disk.frame <- function(listx, ...) {
 #' @rdname group_by
 #' @export
 summarise.grouped_disk.frame <- function(.data, ...) {
+  class(.data) <- c("summarized_disk.frame", "disk.frame")
+
   # get all components of the summarise
   dotdotdot = rlang::enexprs(...)
# convert any quosure to labels for (i in seq_along(dotdotdot)) { - if("quosure" %in% class(dotdotdot[[i]])) { - dotdotdot[[i]] <- rlang::sym(rlang::as_label(dotdotdot[[i]])) - } + dotdotdot[[i]] <- rlang::quo_squash(dotdotdot[[i]]) } - class(.data) <- c("summarized_disk.frame", "disk.frame") attr(.data, "summarize_code") = dotdotdot + # detect any global variables + args_str = sapply(dotdotdot, function(code) { + deparse(code) %>% + paste0(collapse="") + }) %>% paste(collapse = ", ") + + + attr(.data, "summarize_globals_and_pkgs") = + find_globals_recursively( + parse(text=sprintf("list(%s)", args_str))[[1]], + parent.frame() + ) + return(.data) } @@ -246,22 +258,28 @@ summarize.grouped_disk.frame = summarise.grouped_disk.frame #' @rdname group_by # learning from https://docs.dask.org/en/latest/dataframe-groupby.html group_by.disk.frame <- function(.data, ..., .add = FALSE, .drop = stop("disk.frame does not support `.drop` in `group_by` at this stage")) { - class(.data) <- c("grouped_disk.frame", "disk.frame") - # using rlang is a neccesary evil here as I need to deal with !!! that is supported by group_by etc + # using rlang is a necessary evil here as I need to deal with !!! that is supported by group_by etc group_by_cols = rlang::enexprs(...) # convert any quosure to labels for (i in seq_along(group_by_cols)) { - if("quosure" %in% class(group_by_cols[[i]])) { - group_by_cols[[i]] <- rlang::sym(rlang::as_label(group_by_cols[[i]])) - } + group_by_cols[[i]] <- group_by_cols[[i]] %>% + rlang::quo_squash() } - attr(.data, "group_by_cols") = group_by_cols + # detect any global variables + args_str = sapply(group_by_cols, function(code) { + deparse(code) %>% + paste0(collapse="") + }) %>% paste(collapse = ", ") + + + attr(.data, "group_by_globals_and_pkgs") = find_globals_recursively(parse(text=sprintf("list(%s)", args_str))[[1]], parent.frame()) + .data } @@ -270,19 +288,33 @@ group_by.disk.frame <- function(.data, ..., .add = FALSE, .drop = stop("disk.fra #' @importFrom dplyr summarize #' @rdname group_by summarize.disk.frame <- function(.data, ...) { + + class(.data) <- c("summarized_disk.frame", "disk.frame") + # get all components of the summarise dotdotdot = rlang::enexprs(...) # convert any quosure to labels for (i in seq_along(dotdotdot)) { - if("quosure" %in% class(dotdotdot[[i]])) { - dotdotdot[[i]] <- rlang::sym(rlang::as_label(dotdotdot[[i]])) - } + dotdotdot[[i]] <- dotdotdot[[i]] %>% + rlang::quo_squash() } - class(.data) <- c("summarized_disk.frame", "disk.frame") attr(.data, "summarize_code") = dotdotdot + # detect any global variables + args_str = sapply(dotdotdot, function(code) { + deparse(code) %>% + paste0(collapse="") + }) %>% paste(collapse = ", ") + + + attr(.data, "summarize_globals_and_pkgs") = + find_globals_recursively( + parse(text=sprintf("list(%s)", args_str))[[1]], + parent.frame() + ) + return(.data) } diff --git a/R/overwrite_check.r b/R/overwrite_check.r index f0651315..eabcf92b 100644 --- a/R/overwrite_check.r +++ b/R/overwrite_check.r @@ -15,7 +15,6 @@ #' # clean up #' fs::dir_delete(tf) overwrite_check <- function(outdir, overwrite) { - ##browser if (is.null(outdir)) { warning("outdir is NULL; no overwrite check is performed") return(NULL) @@ -30,7 +29,7 @@ overwrite_check <- function(outdir, overwrite) { fs::dir_delete(outdir) }, error = function(e) { message(e) - stop(glue::glue("Failed to delete the directory {outdir} in preparation for overwrite, this could be due to many reason and may be a genuine bug. 
Firstly, though, please ensure you do not have the folder open by Explorer (Windows) or other file management systems"))
+      stop(glue::glue("Failed to delete the directory {outdir} in preparation for overwrite, this could be due to many reasons and may be a genuine bug. Firstly, though, please ensure you do not have the folder open by Explorer (Windows) or other file management systems"))
     })
   }
diff --git a/R/partition-filter.r b/R/partition-filter.r
new file mode 100644
index 00000000..6d754f2e
--- /dev/null
+++ b/R/partition-filter.r
@@ -0,0 +1,13 @@
+#' Filter the dataset based on folder partitions
+#' @param x a disk.frame
+#' @param ... conditions for filtering the disk.frame at the (folder) partition level
+#' @importFrom dplyr filter
+#' @export
+partition_filter <- function(x, ...) {
+  expr = bquote(dplyr::filter(dataframe, .(substitute(...))))
+  globals = find_globals_recursively(expr, parent.frame())
+
+  attr(x, "partition_filter") = list(expr=expr, globals=globals)
+
+  return(x)
+}
diff --git a/R/recommend_nchunks.r b/R/recommend_nchunks.r
index 2b987600..bb59aec4 100644
--- a/R/recommend_nchunks.r
+++ b/R/recommend_nchunks.r
@@ -1,3 +1,4 @@
+
 #' Recommend number of chunks based on input size
 #' @description Computes the recommended number of chunks to break a data.frame
 #' into. It can accept filesizes in bytes (as integer) or a data.frame
@@ -22,7 +23,6 @@
 #' # recommend nchunks based on file size ONLY CSV is implemented at the moment
 #' recommend_nchunks(1024^3)
 recommend_nchunks <- function(df, type = "csv", minchunks = data.table::getDTthreads(), conservatism = 8, ram_size = df_ram_size()) {
-
   dfsize = 0
   if ("data.frame" %in% class(df)) {
     # the df's size in gigabytes
@@ -55,7 +55,6 @@
 #' # returns the RAM size in gigabyte (GB)
 #' df_ram_size()
 df_ram_size <- function() {
-  #browser()
   tryCatch({
     ram_size = NULL
     # the amount of memory available in gigabytes
@@ -119,10 +118,10 @@ df_ram_size <- function() {
   ram_size = benchmarkme::get_ram()/1024^3
 
   if(is.na(ram_size)) {
-      warning("RAM size can't be determined. Assume you have 16GB of RAM.")
-      warning("Please report this error at github.com/xiaodaigh/disk.frame/issues")
-      warning(glue::glue("Please include your operating system, R version, and if using RStudio the Rstudio version number"))
-      return(16)
+    warning("RAM size can't be determined. Assume you have 16GB of RAM.")
+    warning("Please report this error at github.com/xiaodaigh/disk.frame/issues")
+    warning(glue::glue("Please include your operating system, R version, and if using RStudio the RStudio version number"))
+    return(16)
   } else {
     ram_size = max(ram_size, 1, na.rm = TRUE)
     return(ram_size)
diff --git a/R/util.r b/R/util.r
index 8b760f0f..03987c4b 100644
--- a/R/util.r
+++ b/R/util.r
@@ -45,3 +45,36 @@ purrr_as_mapper <- function(.f) {
   }
   return(.f)
 }
+
+#' Find globals in an expression by searching up the environment chain
+#' @param code An expression to search for globals
+#' @param envir The environment from which to begin the search
+find_globals_recursively <- function(code, envir) {
+  globals_and_pkgs = future::getGlobalsAndPackages(code, envir)
+
+  global_vars = globals_and_pkgs$globals
+
+  done = identical(envir, emptyenv()) || identical(envir, globalenv())
+
+  if(done) return(globals_and_pkgs)
+
+  env = parent.env(envir)
+
+  done = identical(env, emptyenv()) || identical(env, globalenv())
+
+  # keep adding global variables by moving up the environment chain
+  while(!done) {
+    tmp_globals_and_pkgs = future::getGlobalsAndPackages(code, envir = env)
+    new_global_vars = tmp_globals_and_pkgs$globals
+    for (name in setdiff(names(new_global_vars), names(global_vars))) {
+      global_vars[[name]] <- new_global_vars[[name]]
+    }
+
+    done = identical(env, emptyenv()) || identical(env, globalenv())
+    env = parent.env(env)
+  }
+
+  globals_and_pkgs$globals = global_vars
+
+  return(globals_and_pkgs)
+}
\ No newline at end of file
diff --git a/R/write_disk.frame.r b/R/write_disk.frame.r
index cdfd0c36..7d38e0c4 100644
--- a/R/write_disk.frame.r
+++ b/R/write_disk.frame.r
@@ -7,13 +7,12 @@
 #' @param nchunks number of chunks
 #' @param overwrite overwrite output directory
 #' @param shardby the columns to shard by
+#' @param partitionby the columns to (folder) partition by
 #' @param compress compression ratio for fst files
-#' @param shardby_function splitting of chunks: "hash" for hash function or "sort" for semi-sorted chunks
-#' @param sort_splits for the "sort" shardby function, a dataframe with the split values.
-#' @param desc_vars for the "sort" shardby function, the variables to sort descending.
 #' @param ... passed to cmap.disk.frame
 #' @export
 #' @import fst fs
+#' @importFrom dplyr group_map
 #' @importFrom glue glue
 #' @examples
 #' cars.df = as.disk.frame(cars)
@@ -33,18 +32,67 @@ write_disk.frame <- function(
     nchunks.disk.frame(diskf),
     recommend_nchunks(diskf)),
   overwrite = FALSE,
-  shardby=NULL, compress = 50, shardby_function="hash", sort_splits=NULL, desc_vars=NULL,
+  shardby=NULL,
+  partitionby=NULL,
+  compress = 50,
   ...) {
   force(nchunks)
   overwrite_check(outdir, overwrite)
-
   if(is.null(outdir)) {
     stop("write_disk.frame error: outdir must not be NULL")
   }
 
   if(is_disk.frame(diskf)) {
-    if(is.null(shardby)) {
+    if(!is.null(partitionby)) {
+
+      # for each chunk, group by the partitionby columns and then write out a partitioned disk.frame for each chunk
+      list_of_paths = diskf %>%
+        cimap(~{
+          tmp_dir_to_write = tempfile(as.character(.y))
+          tmp = .x %>%
+            group_by(!!!syms(partitionby)) %>%
+            dplyr::group_map(~{
+              # convert group keys to path
+              tmp_path = lapply(names(.y), function(n) {
+                sprintf("%s=%s", n, .y[, n])
+              }) %>%
+                do.call(file.path, .)
+
+              final_tmp_path = file.path(tmp_dir_to_write, tmp_path)
+              as.disk.frame(.x, final_tmp_path, overwrite = FALSE)
+            })
+          return(tmp_dir_to_write)
+        }, lazy=FALSE)
+
+      # for each of the chunks, do a soft row-append
+      partitioned_files = lapply(list_of_paths, function(path) {
+        # each path is a partitioned disk.frame
+        files = list.files(path, full.names = TRUE, recursive=TRUE)
+        tmp = data.frame(partition_path = files %>%
+                           dirname %>%
+                           sapply(tools::file_path_as_absolute) %>%
+                           stringr::str_sub(nchar(path)+2))
+        tmp = tmp %>% mutate(path=path, files=files)
+
+        tmp
+      }) %>% rbindlist
+
+      partitioned_files %>%
+        group_by(partition_path) %>%
+        dplyr::group_map(function(df, grp) {
+          mapply(function(file, i) {
+            outfile = file.path(outdir, grp$partition_path, paste0(i, ".fst"))
+            if(!dir.exists(file.path(outdir, grp$partition_path))) {
+              fs::dir_create(file.path(outdir, grp$partition_path))
+            }
+            fs::file_move(file, outfile)
+          }, df$files, seq_along(df$files))
+        })
+
+      return(disk.frame(outdir))
+
+    } else if(is.null(shardby)) {
       path = attr(diskf, "path")
       files_shortname <- list.files(path)
       cids = get_chunk_ids(diskf, full.names = T, strip_extension = F)
@@ -69,8 +117,6 @@ write_disk.frame <- function(
       overwrite = TRUE,
       shardby = shardby,
       compress = compress,
-      shardby_function=shardby_function,
-      sort_splits=sort_splits, desc_vars=desc_vars,
       ...
     )
   }
diff --git a/R/zzz.r b/R/zzz.r
index 7c82cd92..02b5febf 100644
--- a/R/zzz.r
+++ b/R/zzz.r
@@ -5,15 +5,35 @@
 #' @importFrom future nbrOfWorkers
 #' @importFrom crayon red blue green
 .onAttach <- function(libname, pkgname) {
-  #setup_disk.frame()
-
   packageStartupMessage(
-    crayon::red(
+    crayon::blue(
+"\n\n Thank you for using {disk.frame}. However, {disk.frame} has been soft-deprecated. We recommend switching over to the {arrow} package for handling larger-than-RAM data. You can convert your existing disk.frames to the parquet format, which {arrow} can read, with:\n\n
+```
+disk.frame::disk.frame_to_parquet(path.to.your.disk.frame, parquet_path)
+```
+
+Once done, you can use {arrow}'s dataset features to manipulate the larger-than-RAM data using dplyr verbs. E.g.
+
+```
+dataset = arrow::open_dataset(parquet_path)
+
+dataset |>
+  mutate(...) |>
+  group_by(...) |>
+  summarize(...) |>
+  collect(...)
+```
+"
+    ),
+    crayon::red(
     glue::glue(
-      "\n\n## Message from disk.frame:
+      "\n\n## Message from disk.frame:
 We have {future::nbrOfWorkers()} workers to use with disk.frame.
-To change that, use setup_disk.frame(workers = n) or just setup_disk.frame() to use the defaults.")),
-    crayon::green("\n\n
+To change that, use setup_disk.frame(workers = n) or just setup_disk.frame() to use the defaults."
+ ) + ), + crayon::green( + "\n\n It is recommended that you run the following immediately to set up disk.frame with multiple workers in order to parallelize your operations:\n\n ```r # this will set up disk.frame with multiple workers @@ -21,7 +41,8 @@ setup_disk.frame() # this will allow unlimited amount of data to be passed from worker to worker options(future.globals.maxSize = Inf) ``` -\n\n")) +\n\n" + )) } globalVariables(c( @@ -58,7 +79,13 @@ globalVariables(c( "pathB", "w", "xid", - "yid")) + "yid", + "paths", + ".disk.frame.sub.path", + "fullpath", + ".check", + "partition_path" + )) #' @useDynLib disk.frame #' @importFrom Rcpp evalCpp diff --git a/cran-comments.md b/cran-comments.md index 05b563d8..245c6d74 100644 --- a/cran-comments.md +++ b/cran-comments.md @@ -1,14 +1,12 @@ -## Resubmission 1 for v0.6.0 -* removed the vignette builder which should remove the 1 NOTE - -## Submission for v0.6.0 -* Updated the API for many functions leading to semver update of functions +## Submission for v0.7 +* Implemented partition by folder +* Updated R version to v4.0 ## Test environments * local Windows 11 Pro install, R 4.1.2 -* local Windows 11 Pro install, R devel (as of 2022-01-31) +* local Windows 11 Pro install, R devel (as of 2022-02-01) * local Linux/Ubuntu install, R 4.1.2 -* local Linux/Ubuntu install, R devel (as of 2022-01-31) +* local Linux/Ubuntu install, R devel (as of 2022-02-01) ## R CMD check results There were no ERRORs nor WARNINGs nor NOTE when run locally. diff --git a/docs/404.html b/docs/404.html index 9ccac74e..49d167f0 100644 --- a/docs/404.html +++ b/docs/404.html @@ -32,7 +32,7 @@ diff --git a/docs/LICENSE-text.html b/docs/LICENSE-text.html index 5757cc4b..d3b2e167 100644 --- a/docs/LICENSE-text.html +++ b/docs/LICENSE-text.html @@ -17,7 +17,7 @@ diff --git a/docs/articles/01-intro.html b/docs/articles/01-intro.html index af353427..35e9a2b4 100644 --- a/docs/articles/01-intro.html +++ b/docs/articles/01-intro.html @@ -33,7 +33,7 @@ diff --git a/docs/articles/02-intro-disk-frame.html b/docs/articles/02-intro-disk-frame.html index 5526ed10..adfe01a1 100644 --- a/docs/articles/02-intro-disk-frame.html +++ b/docs/articles/02-intro-disk-frame.html @@ -33,7 +33,7 @@ @@ -555,26 +555,26 @@
[docs/: the pkgdown-generated site was regenerated for this release. The rendered-HTML diffs reduce to: version strings bumped from 0.6.0 to 0.7 across 404.html, LICENSE-text.html, the articles, the citation page, and index.html; article example outputs, benchmark timings, and temporary file paths re-rendered; news/index.html picks up the new NEWS entries; and the reference index gains pages for the new split_string_into_df(), get_partition_paths(), partition_filter(), and find_globals_recursively() functions. The get_chunk() reference documents the new partitioned_info argument, and the write_disk.frame() reference drops shardby_function, sort_splits, and desc_vars in favour of the new partitionby parameter.]
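To make the partition-path parsing concrete: `split_string_into_df()` (added in R/get_partition.r, internal and unexported, hence `:::`) turns Hive-style `name=value` folder components into columns, one row per distinct directory. A sketch of its behaviour — the file paths below are invented, and the printed result is what the implementation implies rather than a captured run.

```r
files = c("year=2021/cty=NY/1.fst",
          "year=2021/cty=NY/2.fst",
          "year=2022/cty=SF/1.fst")

# one row per unique directory; values are character at this stage
disk.frame:::split_string_into_df(files)
#>    year cty .disk.frame.sub.path
#> 1: 2021  NY     year=2021/cty=NY
#> 2: 2022  SF     year=2022/cty=SF
```

`get_partition_paths()` then passes this table through `utils::type.convert()`, so `year` becomes an integer before any `partition_filter()` condition is evaluated against it.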