diff --git a/Cargo.lock b/Cargo.lock index ce037d7b..78618f24 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5244,6 +5244,7 @@ dependencies = [ "arrow-schema", "datafusion", "datafusion-common", + "datafusion-expr", "savvy", "savvy-ffi", "sedona", diff --git a/r/sedonadb/NAMESPACE b/r/sedonadb/NAMESPACE index 262b6dd4..5e781497 100644 --- a/r/sedonadb/NAMESPACE +++ b/r/sedonadb/NAMESPACE @@ -27,6 +27,7 @@ export(sd_read_parquet) export(sd_sql) export(sd_to_view) export(sd_view) +export(sd_write_parquet) export(sedonadb_adbc) importFrom(nanoarrow,as_nanoarrow_array_stream) importFrom(nanoarrow,infer_nanoarrow_schema) diff --git a/r/sedonadb/R/000-wrappers.R b/r/sedonadb/R/000-wrappers.R index 7f717992..15ca6597 100644 --- a/r/sedonadb/R/000-wrappers.R +++ b/r/sedonadb/R/000-wrappers.R @@ -184,6 +184,13 @@ class(`InternalContext`) <- c("InternalContext__bundle", "savvy_sedonadb__sealed } } +`InternalDataFrame_to_parquet` <- function(self) { + function(`ctx`, `path`, `partition_by`, `sort_by`, `single_file_output`, `overwrite_bbox_columns`, `geoparquet_version` = NULL) { + `ctx` <- .savvy_extract_ptr(`ctx`, "InternalContext") + invisible(.Call(savvy_InternalDataFrame_to_parquet__impl, `self`, `ctx`, `path`, `partition_by`, `sort_by`, `single_file_output`, `overwrite_bbox_columns`, `geoparquet_version`)) + } +} + `InternalDataFrame_to_view` <- function(self) { function(`ctx`, `table_ref`, `overwrite`) { `ctx` <- .savvy_extract_ptr(`ctx`, "InternalContext") @@ -202,6 +209,7 @@ class(`InternalContext`) <- c("InternalContext__bundle", "savvy_sedonadb__sealed e$`show` <- `InternalDataFrame_show`(ptr) e$`to_arrow_schema` <- `InternalDataFrame_to_arrow_schema`(ptr) e$`to_arrow_stream` <- `InternalDataFrame_to_arrow_stream`(ptr) + e$`to_parquet` <- `InternalDataFrame_to_parquet`(ptr) e$`to_view` <- `InternalDataFrame_to_view`(ptr) class(e) <- c("InternalDataFrame", "savvy_sedonadb__sealed") diff --git a/r/sedonadb/R/dataframe.R b/r/sedonadb/R/dataframe.R index 
321961c7..0e5b511f 100644 --- a/r/sedonadb/R/dataframe.R +++ b/r/sedonadb/R/dataframe.R @@ -182,6 +182,83 @@ sd_preview <- function(.data, n = NULL, ascii = NULL, width = NULL) { invisible(.data) } +#' Write DataFrame to (Geo)Parquet files +#' +#' Write this DataFrame to one or more (Geo)Parquet files. For input that contains +#' geometry columns, GeoParquet metadata is written such that suitable readers can +#' recreate Geometry/Geography types when reading the output and potentially read +#' fewer row groups when only a subset of the file is needed for a given query. +#' +#' @inheritParams sd_count +#' @param path A filename or directory to which parquet file(s) should be written +#' @param partition_by A character vector of column names to partition by. If non-empty, +#' applies hive-style partitioning to the output +#' @param sort_by A character vector of column names to sort by. Currently only +#' ascending sort is supported +#' @param single_file_output Use TRUE or FALSE to force writing a single Parquet +#' file vs. writing one file per partition to a directory. By default, +#' a single file is written if `partition_by` is unspecified and +#' `path` ends with `.parquet` +#' @param geoparquet_version GeoParquet metadata version to write if output contains +#' one or more geometry columns. The default ("1.0") is the most widely +#' supported and will result in geometry columns being recognized in many +#' readers; however, only includes statistics at the file level. +#' Use "1.1" to compute an additional bounding box column +#' for every geometry column in the output: some readers can use these columns +#' to prune row groups when files contain an effective spatial ordering. 
+#' The extra columns will appear just before their geometry column and +#' will be named "\[geom_col_name\]_bbox" for all geometry columns except +#' "geometry", whose bounding box column name is just "bbox" +#' @param overwrite_bbox_columns Use TRUE to overwrite any bounding box columns +#' that already exist in the input. This is useful in a read -> modify +#' -> write scenario to ensure these columns are up-to-date. If FALSE +#' (the default), an error will be raised if a bbox column already exists +#' +#' @returns The input, invisibly +#' @export +#' +#' @examples +#' tmp_parquet <- tempfile(fileext = ".parquet") +#' +#' sd_sql("SELECT ST_SetSRID(ST_Point(1, 2), 4326) as geom") |> +#' sd_write_parquet(tmp_parquet) +#' +#' sd_read_parquet(tmp_parquet) +#' unlink(tmp_parquet) +#' +sd_write_parquet <- function(.data, + path, + partition_by = character(0), + sort_by = character(0), + single_file_output = NULL, + geoparquet_version = "1.0", + overwrite_bbox_columns = FALSE) { + + # Determine single_file_output default based on path and partition_by + if (is.null(single_file_output)) { + single_file_output <- length(partition_by) == 0 && grepl("\\.parquet$", path) + } + + # Validate geoparquet_version + if (!is.null(geoparquet_version)) { + if (!geoparquet_version %in% c("1.0", "1.1")) { + stop("geoparquet_version must be '1.0' or '1.1'") + } + } + + # Call the underlying Rust method + .data$df$to_parquet( + ctx = .data$ctx, + path = path, + partition_by = partition_by, + sort_by = sort_by, + single_file_output = single_file_output, + overwrite_bbox_columns = overwrite_bbox_columns, + geoparquet_version = geoparquet_version + ) + + invisible(.data) +} new_sedonadb_dataframe <- function(ctx, internal_df) { structure(list(ctx = ctx, df = internal_df), class = "sedonadb_dataframe") diff --git a/r/sedonadb/man/sd_write_parquet.Rd new file mode 100644 index 00000000..11f59a84 --- /dev/null +++ b/r/sedonadb/man/sd_write_parquet.Rd @@ 
-0,0 +1,67 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/dataframe.R +\name{sd_write_parquet} +\alias{sd_write_parquet} +\title{Write DataFrame to (Geo)Parquet files} +\usage{ +sd_write_parquet( + .data, + path, + partition_by = character(0), + sort_by = character(0), + single_file_output = NULL, + geoparquet_version = "1.0", + overwrite_bbox_columns = FALSE +) +} +\arguments{ +\item{.data}{A sedonadb_dataframe} + +\item{path}{A filename or directory to which parquet file(s) should be written} + +\item{partition_by}{A character vector of column names to partition by. If non-empty, +applies hive-style partitioning to the output} + +\item{sort_by}{A character vector of column names to sort by. Currently only +ascending sort is supported} + +\item{single_file_output}{Use TRUE or FALSE to force writing a single Parquet +file vs. writing one file per partition to a directory. By default, +a single file is written if \code{partition_by} is unspecified and +\code{path} ends with \code{.parquet}} + +\item{geoparquet_version}{GeoParquet metadata version to write if output contains +one or more geometry columns. The default ("1.0") is the most widely +supported and will result in geometry columns being recognized in many +readers; however, only includes statistics at the file level. +Use "1.1" to compute an additional bounding box column +for every geometry column in the output: some readers can use these columns +to prune row groups when files contain an effective spatial ordering. +The extra columns will appear just before their geometry column and +will be named "[geom_col_name]_bbox" for all geometry columns except +"geometry", whose bounding box column name is just "bbox"} + +\item{overwrite_bbox_columns}{Use TRUE to overwrite any bounding box columns +that already exist in the input. This is useful in a read -> modify +-> write scenario to ensure these columns are up-to-date. 
If FALSE +(the default), an error will be raised if a bbox column already exists} +} +\value{ +The input, invisibly +} +\description{ +Write this DataFrame to one or more (Geo)Parquet files. For input that contains +geometry columns, GeoParquet metadata is written such that suitable readers can +recreate Geometry/Geography types when reading the output and potentially read +fewer row groups when only a subset of the file is needed for a given query. +} +\examples{ +tmp_parquet <- tempfile(fileext = ".parquet") + +sd_sql("SELECT ST_SetSRID(ST_Point(1, 2), 4326) as geom") |> + sd_write_parquet(tmp_parquet) + +sd_read_parquet(tmp_parquet) +unlink(tmp_parquet) + +} diff --git a/r/sedonadb/src/init.c b/r/sedonadb/src/init.c index e4f02bdb..69886654 100644 --- a/r/sedonadb/src/init.c +++ b/r/sedonadb/src/init.c @@ -16,9 +16,9 @@ // under the License. #include +#include #include -#include #include "rust/api.h" @@ -155,6 +155,17 @@ SEXP savvy_InternalDataFrame_to_arrow_stream__impl(SEXP self__, return handle_result(res); } +SEXP savvy_InternalDataFrame_to_parquet__impl( + SEXP self__, SEXP c_arg__ctx, SEXP c_arg__path, SEXP c_arg__partition_by, + SEXP c_arg__sort_by, SEXP c_arg__single_file_output, + SEXP c_arg__overwrite_bbox_columns, SEXP c_arg__geoparquet_version) { + SEXP res = savvy_InternalDataFrame_to_parquet__ffi( + self__, c_arg__ctx, c_arg__path, c_arg__partition_by, c_arg__sort_by, + c_arg__single_file_output, c_arg__overwrite_bbox_columns, + c_arg__geoparquet_version); + return handle_result(res); +} + SEXP savvy_InternalDataFrame_to_view__impl(SEXP self__, SEXP c_arg__ctx, SEXP c_arg__table_ref, SEXP c_arg__overwrite) { @@ -198,6 +209,8 @@ static const R_CallMethodDef CallEntries[] = { (DL_FUNC)&savvy_InternalDataFrame_to_arrow_schema__impl, 2}, {"savvy_InternalDataFrame_to_arrow_stream__impl", (DL_FUNC)&savvy_InternalDataFrame_to_arrow_stream__impl, 2}, + {"savvy_InternalDataFrame_to_parquet__impl", + (DL_FUNC)&savvy_InternalDataFrame_to_parquet__impl, 8}, 
{"savvy_InternalDataFrame_to_view__impl", (DL_FUNC)&savvy_InternalDataFrame_to_view__impl, 4}, {NULL, NULL, 0}}; diff --git a/r/sedonadb/src/rust/Cargo.toml b/r/sedonadb/src/rust/Cargo.toml index 280c3caf..98dc5487 100644 --- a/r/sedonadb/src/rust/Cargo.toml +++ b/r/sedonadb/src/rust/Cargo.toml @@ -28,6 +28,7 @@ arrow-schema = { workspace = true } arrow-array = { workspace = true } datafusion = { workspace = true } datafusion-common = { workspace = true } +datafusion-expr = { workspace = true } savvy = "*" savvy-ffi = "*" sedona = { path = "../../../../rust/sedona" } diff --git a/r/sedonadb/src/rust/api.h b/r/sedonadb/src/rust/api.h index 303d30a5..e3ba8a15 100644 --- a/r/sedonadb/src/rust/api.h +++ b/r/sedonadb/src/rust/api.h @@ -44,6 +44,10 @@ SEXP savvy_InternalDataFrame_show__ffi(SEXP self__, SEXP c_arg__ctx, SEXP c_arg__ascii, SEXP c_arg__limit); SEXP savvy_InternalDataFrame_to_arrow_schema__ffi(SEXP self__, SEXP c_arg__out); SEXP savvy_InternalDataFrame_to_arrow_stream__ffi(SEXP self__, SEXP c_arg__out); +SEXP savvy_InternalDataFrame_to_parquet__ffi( + SEXP self__, SEXP c_arg__ctx, SEXP c_arg__path, SEXP c_arg__partition_by, + SEXP c_arg__sort_by, SEXP c_arg__single_file_output, + SEXP c_arg__overwrite_bbox_columns, SEXP c_arg__geoparquet_version); SEXP savvy_InternalDataFrame_to_view__ffi(SEXP self__, SEXP c_arg__ctx, SEXP c_arg__table_ref, SEXP c_arg__overwrite); diff --git a/r/sedonadb/src/rust/src/dataframe.rs b/r/sedonadb/src/rust/src/dataframe.rs index 274aa493..909b2d72 100644 --- a/r/sedonadb/src/rust/src/dataframe.rs +++ b/r/sedonadb/src/rust/src/dataframe.rs @@ -21,11 +21,14 @@ use arrow_array::ffi::FFI_ArrowSchema; use arrow_array::ffi_stream::FFI_ArrowArrayStream; use arrow_array::{RecordBatchIterator, RecordBatchReader}; use datafusion::catalog::MemTable; -use datafusion::prelude::DataFrame; +use datafusion::{logical_expr::SortExpr, prelude::DataFrame}; +use datafusion_common::Column; +use datafusion_expr::Expr; use savvy::{savvy, savvy_err, 
Result}; -use sedona::context::SedonaDataFrame; +use sedona::context::{SedonaDataFrame, SedonaWriteOptions}; use sedona::reader::SedonaStreamReader; use sedona::show::{DisplayMode, DisplayTableOptions}; +use sedona_geoparquet::options::{GeoParquetVersion, TableGeoParquetOptions}; use sedona_schema::schema::SedonaSchema; use tokio::runtime::Runtime; @@ -191,4 +194,63 @@ impl InternalDataFrame { savvy::Sexp::try_from(out_string) } + + #[allow(clippy::too_many_arguments)] + fn to_parquet( + &self, + ctx: &InternalContext, + path: &str, + partition_by: savvy::Sexp, + sort_by: savvy::Sexp, + single_file_output: bool, + overwrite_bbox_columns: bool, + geoparquet_version: Option<&str>, + ) -> savvy::Result<()> { + let partition_by_strsxp = savvy::StringSexp::try_from(partition_by)?; + let partition_by_vec = partition_by_strsxp + .iter() + .map(|s| s.to_string()) + .collect::<Vec<_>>(); + + let sort_by_strsxp = savvy::StringSexp::try_from(sort_by)?; + let sort_by_vec = sort_by_strsxp + .iter() + .map(|s| s.to_string()) + .collect::<Vec<_>>(); + + let sort_by_expr = sort_by_vec + .iter() + .map(|name| { + let column = Expr::Column(Column::new_unqualified(name)); + SortExpr::new(column, true, false) + }) + .collect::<Vec<_>>(); + + let options = SedonaWriteOptions::new() + .with_partition_by(partition_by_vec) + .with_sort_by(sort_by_expr) + .with_single_file_output(single_file_output); + + let mut writer_options = TableGeoParquetOptions::new(); + writer_options.overwrite_bbox_columns = overwrite_bbox_columns; + if let Some(geoparquet_version) = geoparquet_version { + writer_options.geoparquet_version = geoparquet_version + .parse() + .map_err(|e| savvy::Error::new(format!("Invalid geoparquet_version: {e}")))?; + } else { + writer_options.geoparquet_version = GeoParquetVersion::Omitted; + } + + let inner = self.inner.clone(); + let inner_context = ctx.inner.clone(); + let path_owned = path.to_string(); + + wait_for_future_captured_r(&self.runtime, async move { + inner + 
.write_geoparquet(&inner_context, &path_owned, options, Some(writer_options)) + .await + })??; + + Ok(()) + } } diff --git a/r/sedonadb/tests/testthat/test-dataframe.R b/r/sedonadb/tests/testthat/test-dataframe.R index f530f654..a74eab34 100644 --- a/r/sedonadb/tests/testthat/test-dataframe.R +++ b/r/sedonadb/tests/testthat/test-dataframe.R @@ -135,3 +135,139 @@ test_that("dataframe print limits max output based on options", { expect_output(print(df), "| a r... |") }) }) + +test_that("sd_write_parquet can write simple data", { + df <- sd_sql( + "SELECT * FROM (VALUES ('one', 1), ('two', 2), ('three', 3)) AS t(a, b)" + ) + + # Test single file output (path ending with .parquet) + tmp_parquet_file <- tempfile(fileext = ".parquet") + on.exit(unlink(tmp_parquet_file)) + + result <- sd_write_parquet(df, tmp_parquet_file) + expect_identical(result, df) # Should return the input invisibly + expect_true(file.exists(tmp_parquet_file)) + + # Read back and verify contents + df_roundtrip <- sd_read_parquet(tmp_parquet_file) + expect_identical( + sd_collect(df_roundtrip), + data.frame( + a = c("one", "two", "three"), + b = c(1, 2, 3) + ) + ) +}) + +test_that("sd_write_parquet can write to directory", { + df <- sd_sql( + "SELECT * FROM (VALUES ('one', 1), ('two', 2), ('three', 3)) AS t(a, b)" + ) + + # Test directory output (path not ending with .parquet) + tmp_parquet_dir <- tempfile() + on.exit(unlink(tmp_parquet_dir, recursive = TRUE)) + + sd_write_parquet(df, tmp_parquet_dir) + expect_true(dir.exists(tmp_parquet_dir)) + + # Read back and verify contents + df_roundtrip <- sd_read_parquet(tmp_parquet_dir) + expect_identical( + sd_collect(df_roundtrip), + data.frame( + a = c("one", "two", "three"), + b = c(1, 2, 3) + ) + ) +}) + +test_that("sd_write_parquet can partition data", { + df <- sd_sql( + "SELECT * FROM (VALUES ('one', 1), ('two', 2), ('three', 3)) AS t(a, b)" + ) + + tmp_parquet_dir <- tempfile() + on.exit(unlink(tmp_parquet_dir, recursive = TRUE)) + + 
sd_write_parquet(df, tmp_parquet_dir, partition_by = "a") + + # Read back and verify partitioning worked + roundtrip_data <- sd_read_parquet(tmp_parquet_dir) |> + sd_collect() + + # Should have the same data (order might be different due to partitioning) + expect_setequal(roundtrip_data$b, c(1L, 2L, 3L)) +}) + +test_that("sd_write_parquet can sort data", { + df <- sd_sql( + "SELECT * FROM (VALUES ('two', 2), ('one', 1), ('three', 3)) AS t(a, b)" + ) + + tmp_parquet_file <- tempfile(fileext = ".parquet") + on.exit(unlink(tmp_parquet_file)) + + sd_write_parquet(df, tmp_parquet_file, sort_by = "a") + + # Read back and verify sorting + roundtrip_data <- sd_read_parquet(tmp_parquet_file) |> + sd_collect() + + expect_identical( + roundtrip_data, + data.frame( + a = c("one", "three", "two"), + b = c(1, 3, 2) + ) + ) +}) + +test_that("sd_write_parquet can write geometry data", { + df <- sd_sql( + "SELECT ST_SetSRID(ST_Point(1, 2), 4326) as geom, 'test' as name" + ) + + tmp_parquet_file <- tempfile(fileext = ".parquet") + on.exit(unlink(tmp_parquet_file)) + + sd_write_parquet(df, tmp_parquet_file) + + # Read back and verify geometry is preserved + roundtrip_data <- sd_read_parquet(tmp_parquet_file) |> + sd_collect() + + expect_identical( + wk::as_wkt(roundtrip_data$geom), + wk::wkt("POINT (1 2)", crs = "OGC:CRS84") + ) +}) + +test_that("sd_write_parquet validates geoparquet_version parameter", { + df <- sd_sql( + "SELECT ST_SetSRID(ST_Point(1, 2), 4326) as geom, 'test' as name" + ) + tmp_parquet_file <- tempfile(fileext = ".parquet") + on.exit(unlink(tmp_parquet_file)) + + # GeoParquet 1.0 shouldn't add any columns + sd_write_parquet(df, tmp_parquet_file, geoparquet_version = "1.0") + expect_identical( + sd_read_parquet(tmp_parquet_file) |> colnames(), + c("geom", "name") + ) + + # GeoParquet 1.1 should add a geom_bbox column + sd_write_parquet(df, tmp_parquet_file, geoparquet_version = "1.1") + expect_identical( + sd_read_parquet(tmp_parquet_file) |> colnames(), + 
c("geom_bbox", "geom", "name") + ) + + # Invalid version should error + expect_error( + sd_write_parquet(df, tmp_parquet_file, geoparquet_version = "2.0"), + "geoparquet_version must be" + ) +})