Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions r/sedonadb/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ export(sd_read_parquet)
export(sd_sql)
export(sd_to_view)
export(sd_view)
export(sd_write_parquet)
export(sedonadb_adbc)
importFrom(nanoarrow,as_nanoarrow_array_stream)
importFrom(nanoarrow,infer_nanoarrow_schema)
Expand Down
8 changes: 8 additions & 0 deletions r/sedonadb/R/000-wrappers.R
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,13 @@ class(`InternalContext`) <- c("InternalContext__bundle", "savvy_sedonadb__sealed
}
}

# Builds the `to_parquet` method for an InternalDataFrame: the returned closure
# captures `self` and forwards every argument to the native routine.
`InternalDataFrame_to_parquet` <- function(self) {
  function(`ctx`, `path`, `partition_by`, `sort_by`, `single_file_output`, `overwrite_bbox_columns`, `geoparquet_version` = NULL) {
    # The native routine receives the pointer extracted from the context wrapper,
    # not the wrapper object itself.
    ctx_ptr <- .savvy_extract_ptr(`ctx`, "InternalContext")
    invisible(.Call(
      savvy_InternalDataFrame_to_parquet__impl,
      `self`,
      ctx_ptr,
      `path`,
      `partition_by`,
      `sort_by`,
      `single_file_output`,
      `overwrite_bbox_columns`,
      `geoparquet_version`
    ))
  }
}

`InternalDataFrame_to_view` <- function(self) {
function(`ctx`, `table_ref`, `overwrite`) {
`ctx` <- .savvy_extract_ptr(`ctx`, "InternalContext")
Expand All @@ -202,6 +209,7 @@ class(`InternalContext`) <- c("InternalContext__bundle", "savvy_sedonadb__sealed
e$`show` <- `InternalDataFrame_show`(ptr)
e$`to_arrow_schema` <- `InternalDataFrame_to_arrow_schema`(ptr)
e$`to_arrow_stream` <- `InternalDataFrame_to_arrow_stream`(ptr)
e$`to_parquet` <- `InternalDataFrame_to_parquet`(ptr)
e$`to_view` <- `InternalDataFrame_to_view`(ptr)

class(e) <- c("InternalDataFrame", "savvy_sedonadb__sealed")
Expand Down
77 changes: 77 additions & 0 deletions r/sedonadb/R/dataframe.R
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,83 @@ sd_preview <- function(.data, n = NULL, ascii = NULL, width = NULL) {
invisible(.data)
}

#' Write DataFrame to (Geo)Parquet files
#'
#' Writes this DataFrame to one or more (Geo)Parquet files. When the input has
#' geometry columns, GeoParquet metadata is included so that suitable readers can
#' restore Geometry/Geography types on read and may be able to skip row groups
#' when a query only needs part of the file.
#'
#' @inheritParams sd_count
#' @param path A filename or directory to which parquet file(s) should be written
#' @param partition_by A character vector of column names to partition by. If
#'   non-empty, hive-style partitioning is applied to the output
#' @param sort_by A character vector of column names to sort by. Currently only
#'   ascending sort is supported
#' @param single_file_output Use TRUE or FALSE to force writing a single Parquet
#'   file vs. writing one file per partition to a directory. By default,
#'   a single file is written if `partition_by` is unspecified and
#'   `path` ends with `.parquet`
#' @param geoparquet_version GeoParquet metadata version to write if the output
#'   contains one or more geometry columns. The default ("1.0") is the most
#'   widely supported and results in geometry columns being recognized by many
#'   readers; however, it only includes statistics at the file level.
#'   Use "1.1" to compute an additional bounding box column for every geometry
#'   column in the output: some readers can use these columns to prune row
#'   groups when files contain an effective spatial ordering. The extra columns
#'   appear just before their geometry column and are named
#'   "[geom_col_name]_bbox" for all geometry columns except "geometry", whose
#'   bounding box column name is just "bbox"
#' @param overwrite_bbox_columns Use TRUE to overwrite any bounding box columns
#'   that already exist in the input. This is useful in a read -> modify
#'   -> write scenario to ensure these columns are up-to-date. If FALSE
#'   (the default), an error will be raised if a bbox column already exists
#'
#' @returns The input, invisibly
#' @export
#'
#' @examples
#' tmp_parquet <- tempfile(fileext = ".parquet")
#'
#' sd_sql("SELECT ST_SetSRID(ST_Point(1, 2), 4326) as geom") |>
#'   sd_write_parquet(tmp_parquet)
#'
#' sd_read_parquet(tmp_parquet)
#' unlink(tmp_parquet)
#'
sd_write_parquet <- function(.data,
                             path,
                             partition_by = character(0),
                             sort_by = character(0),
                             single_file_output = NULL,
                             geoparquet_version = "1.0",
                             overwrite_bbox_columns = FALSE) {
  # When the caller did not choose, write a single file only for an
  # unpartitioned write whose target path ends in ".parquet"
  if (is.null(single_file_output)) {
    is_unpartitioned <- length(partition_by) == 0
    single_file_output <- is_unpartitioned && grepl("\\.parquet$", path)
  }

  # Only the listed versions are accepted; NULL skips this check and is
  # forwarded to the writer unchanged
  if (!is.null(geoparquet_version)) {
    supported_versions <- c("1.0", "1.1")
    if (!geoparquet_version %in% supported_versions) {
      stop("geoparquet_version must be '1.0' or '1.1'")
    }
  }

  # Hand off to the Rust-backed data frame method
  internal_df <- .data$df
  internal_df$to_parquet(
    ctx = .data$ctx,
    path = path,
    partition_by = partition_by,
    sort_by = sort_by,
    single_file_output = single_file_output,
    overwrite_bbox_columns = overwrite_bbox_columns,
    geoparquet_version = geoparquet_version
  )

  invisible(.data)
}

new_sedonadb_dataframe <- function(ctx, internal_df) {
structure(list(ctx = ctx, df = internal_df), class = "sedonadb_dataframe")
Expand Down
67 changes: 67 additions & 0 deletions r/sedonadb/man/sd_write_parquet.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 14 additions & 1 deletion r/sedonadb/src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
// under the License.

#include <Rinternals.h>
#include <stdint.h>

#include <R_ext/Parse.h>
#include <stdint.h>

#include "rust/api.h"

Expand Down Expand Up @@ -155,6 +155,17 @@ SEXP savvy_InternalDataFrame_to_arrow_stream__impl(SEXP self__,
return handle_result(res);
}

// .Call entry point behind InternalDataFrame$to_parquet(): forwards all eight
// arguments unchanged to the Rust FFI function and passes its return value
// through handle_result().
SEXP savvy_InternalDataFrame_to_parquet__impl(
    SEXP self__, SEXP c_arg__ctx, SEXP c_arg__path, SEXP c_arg__partition_by,
    SEXP c_arg__sort_by, SEXP c_arg__single_file_output,
    SEXP c_arg__overwrite_bbox_columns, SEXP c_arg__geoparquet_version) {
  return handle_result(savvy_InternalDataFrame_to_parquet__ffi(
      self__, c_arg__ctx, c_arg__path, c_arg__partition_by, c_arg__sort_by,
      c_arg__single_file_output, c_arg__overwrite_bbox_columns,
      c_arg__geoparquet_version));
}

SEXP savvy_InternalDataFrame_to_view__impl(SEXP self__, SEXP c_arg__ctx,
SEXP c_arg__table_ref,
SEXP c_arg__overwrite) {
Expand Down Expand Up @@ -198,6 +209,8 @@ static const R_CallMethodDef CallEntries[] = {
(DL_FUNC)&savvy_InternalDataFrame_to_arrow_schema__impl, 2},
{"savvy_InternalDataFrame_to_arrow_stream__impl",
(DL_FUNC)&savvy_InternalDataFrame_to_arrow_stream__impl, 2},
{"savvy_InternalDataFrame_to_parquet__impl",
(DL_FUNC)&savvy_InternalDataFrame_to_parquet__impl, 8},
{"savvy_InternalDataFrame_to_view__impl",
(DL_FUNC)&savvy_InternalDataFrame_to_view__impl, 4},
{NULL, NULL, 0}};
Expand Down
1 change: 1 addition & 0 deletions r/sedonadb/src/rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ arrow-schema = { workspace = true }
arrow-array = { workspace = true }
datafusion = { workspace = true }
datafusion-common = { workspace = true }
datafusion-expr = { workspace = true }
savvy = "*"
savvy-ffi = "*"
sedona = { path = "../../../../rust/sedona" }
Expand Down
4 changes: 4 additions & 0 deletions r/sedonadb/src/rust/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ SEXP savvy_InternalDataFrame_show__ffi(SEXP self__, SEXP c_arg__ctx,
SEXP c_arg__ascii, SEXP c_arg__limit);
SEXP savvy_InternalDataFrame_to_arrow_schema__ffi(SEXP self__, SEXP c_arg__out);
SEXP savvy_InternalDataFrame_to_arrow_stream__ffi(SEXP self__, SEXP c_arg__out);
// FFI entry point for the data frame's to_parquet method; init.c calls it from
// savvy_InternalDataFrame_to_parquet__impl with the same eight arguments.
// NOTE(review): presumably generated by savvy from the Rust method - confirm
// before hand-editing.
SEXP savvy_InternalDataFrame_to_parquet__ffi(
SEXP self__, SEXP c_arg__ctx, SEXP c_arg__path, SEXP c_arg__partition_by,
SEXP c_arg__sort_by, SEXP c_arg__single_file_output,
SEXP c_arg__overwrite_bbox_columns, SEXP c_arg__geoparquet_version);
SEXP savvy_InternalDataFrame_to_view__ffi(SEXP self__, SEXP c_arg__ctx,
SEXP c_arg__table_ref,
SEXP c_arg__overwrite);
66 changes: 64 additions & 2 deletions r/sedonadb/src/rust/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@ use arrow_array::ffi::FFI_ArrowSchema;
use arrow_array::ffi_stream::FFI_ArrowArrayStream;
use arrow_array::{RecordBatchIterator, RecordBatchReader};
use datafusion::catalog::MemTable;
use datafusion::prelude::DataFrame;
use datafusion::{logical_expr::SortExpr, prelude::DataFrame};
use datafusion_common::Column;
use datafusion_expr::Expr;
use savvy::{savvy, savvy_err, Result};
use sedona::context::SedonaDataFrame;
use sedona::context::{SedonaDataFrame, SedonaWriteOptions};
use sedona::reader::SedonaStreamReader;
use sedona::show::{DisplayMode, DisplayTableOptions};
use sedona_geoparquet::options::{GeoParquetVersion, TableGeoParquetOptions};
use sedona_schema::schema::SedonaSchema;
use tokio::runtime::Runtime;

Expand Down Expand Up @@ -191,4 +194,63 @@ impl InternalDataFrame {

savvy::Sexp::try_from(out_string)
}

#[allow(clippy::too_many_arguments)]
fn to_parquet(
&self,
ctx: &InternalContext,
path: &str,
partition_by: savvy::Sexp,
sort_by: savvy::Sexp,
single_file_output: bool,
overwrite_bbox_columns: bool,
geoparquet_version: Option<&str>,
) -> savvy::Result<()> {
let partition_by_strsxp = savvy::StringSexp::try_from(partition_by)?;
let partition_by_vec = partition_by_strsxp
.iter()
.map(|s| s.to_string())
.collect::<Vec<_>>();

let sort_by_strsxp = savvy::StringSexp::try_from(sort_by)?;
let sort_by_vec = sort_by_strsxp
.iter()
.map(|s| s.to_string())
.collect::<Vec<_>>();

let sort_by_expr = sort_by_vec
.iter()
.map(|name| {
let column = Expr::Column(Column::new_unqualified(name));
SortExpr::new(column, true, false)
})
.collect::<Vec<_>>();

let options = SedonaWriteOptions::new()
.with_partition_by(partition_by_vec)
.with_sort_by(sort_by_expr)
.with_single_file_output(single_file_output);

let mut writer_options = TableGeoParquetOptions::new();
writer_options.overwrite_bbox_columns = overwrite_bbox_columns;
if let Some(geoparquet_version) = geoparquet_version {
writer_options.geoparquet_version = geoparquet_version
.parse()
.map_err(|e| savvy::Error::new(format!("Invalid geoparquet_version: {e}")))?;
} else {
writer_options.geoparquet_version = GeoParquetVersion::Omitted;
}

let inner = self.inner.clone();
let inner_context = ctx.inner.clone();
let path_owned = path.to_string();

wait_for_future_captured_r(&self.runtime, async move {
inner
.write_geoparquet(&inner_context, &path_owned, options, Some(writer_options))
.await
})??;

Ok(())
}
}
Loading
Loading