diff --git a/Cargo.lock b/Cargo.lock index 2337df69..f5e3ca57 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4810,6 +4810,7 @@ dependencies = [ "sedona-tg", "serde", "serde_json", + "tempfile", "tokio", "url", ] @@ -4987,6 +4988,7 @@ dependencies = [ "serde", "serde_json", "serde_with", + "tempfile", "tokio", "url", ] @@ -5167,6 +5169,7 @@ dependencies = [ "async-trait", "datafusion", "datafusion-common", + "datafusion-expr", "datafusion-ffi", "futures", "libmimalloc-sys", diff --git a/Cargo.toml b/Cargo.toml index 0a8e803a..a5cf2866 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -110,6 +110,7 @@ rstest = "0.24.0" serde = { version = "1" } serde_json = { version = "1" } serde_with = { version = "1" } +tempfile = { version = "3"} thiserror = { version = "2" } tokio = { version = "1.44" } url = "2.5.4" diff --git a/python/sedonadb/Cargo.toml b/python/sedonadb/Cargo.toml index 1ddb4c06..98379bde 100644 --- a/python/sedonadb/Cargo.toml +++ b/python/sedonadb/Cargo.toml @@ -36,6 +36,7 @@ arrow-array = { workspace = true } async-trait = { workspace = true } datafusion = { workspace = true } datafusion-common = { workspace = true } +datafusion-expr = { workspace = true } datafusion-ffi = { workspace = true } futures = { workspace = true } pyo3 = { version = "0.25.1" } diff --git a/python/sedonadb/python/sedonadb/dataframe.py b/python/sedonadb/python/sedonadb/dataframe.py index 182a008c..b390e77c 100644 --- a/python/sedonadb/python/sedonadb/dataframe.py +++ b/python/sedonadb/python/sedonadb/dataframe.py @@ -14,7 +14,9 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -from typing import TYPE_CHECKING, Union, Optional, Any + +from pathlib import Path +from typing import TYPE_CHECKING, Union, Optional, Any, Iterable from sedonadb._options import global_options @@ -263,6 +265,66 @@ def to_pandas( else: return table.to_pandas() + def to_parquet( + self, + path: Union[str, Path], + *, + partition_by: Optional[Union[str, Iterable[str]]] = None, + sort_by: Optional[Union[str, Iterable[str]]] = None, + single_file_output: Optional[bool] = None, + ): + """Write this DataFrame to one or more (Geo)Parquet files + + For input that contains geometry columns, GeoParquet metadata is written + such that suitable readers can recreate Geometry/Geography types when + reading the output. + + + Args: + path: A filename or directory to which parquet file(s) should be written. + partition_by: A vector of column names to partition by. If non-empty, + applies hive-style partitioning to the output. + sort_by: A vector of column names to sort by. Currently only ascending + sort is supported. + single_file_output: Use True or False to force writing a single Parquet + file vs. writing one file per partition to a directory. By default, + a single file is written if `partition_by` is unspecified and + `path` ends with `.parquet`. 
+
+        Examples:
+
+            >>> import sedonadb
+            >>> import tempfile
+            >>> con = sedonadb.connect()
+            >>> td = tempfile.TemporaryDirectory()
+            >>> url = "https://github.com/apache/sedona-testing/raw/refs/heads/main/data/parquet/geoparquet-1.1.0.parquet"
+            >>> con.read_parquet(url).to_parquet(f"{td.name}/tmp.parquet")
+
+        """
+
+        path = Path(path)
+
+        if single_file_output is None:
+            single_file_output = partition_by is None and str(path).endswith(".parquet")
+
+        if isinstance(partition_by, str):
+            partition_by = [partition_by]
+        elif partition_by is not None:
+            partition_by = list(partition_by)
+        else:
+            partition_by = []
+
+        if isinstance(sort_by, str):
+            sort_by = [sort_by]
+        elif sort_by is not None:
+            sort_by = list(sort_by)
+        else:
+            sort_by = []
+
+        self._impl.to_parquet(
+            self._ctx, str(path), partition_by, sort_by, single_file_output
+        )
+
     def show(
         self,
         limit: Optional[int] = 10,
diff --git a/python/sedonadb/src/dataframe.rs b/python/sedonadb/src/dataframe.rs
index 51f7a4b8..aae953b3 100644
--- a/python/sedonadb/src/dataframe.rs
+++ b/python/sedonadb/src/dataframe.rs
@@ -22,12 +22,16 @@ use arrow_array::ffi_stream::FFI_ArrowArrayStream;
 use arrow_array::RecordBatchReader;
 use arrow_schema::Schema;
 use datafusion::catalog::MemTable;
+use datafusion::logical_expr::SortExpr;
 use datafusion::prelude::DataFrame;
+use datafusion_common::Column;
+use datafusion_expr::Expr;
 use datafusion_ffi::table_provider::FFI_TableProvider;
 use pyo3::prelude::*;
 use pyo3::types::PyCapsule;
-use sedona::context::SedonaDataFrame;
+use sedona::context::{SedonaDataFrame, SedonaWriteOptions};
 use sedona::show::{DisplayMode, DisplayTableOptions};
+use sedona_geoparquet::options::TableGeoParquetOptions;
 use sedona_schema::schema::SedonaSchema;
 use tokio::runtime::Runtime;
@@ -119,6 +123,41 @@ impl InternalDataFrame {
         ))
     }

+    fn to_parquet<'py>(
+        &self,
+        py: Python<'py>,
+        ctx: &InternalContext,
+        path: String,
+        partition_by: Vec<String>,
+        sort_by: Vec<String>,
+        single_file_output: bool,
+    ) -> Result<(), PySedonaError> {
+        // sort_by needs to be SortExpr. A Vec<String> can unambiguously be interpreted as
+        // field names (ascending), but other types of expressions aren't supported here yet.
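+        // For example, a hypothetical `sort_by` of ["a", "b"] becomes
+        // `col("a") ASC NULLS LAST, col("b") ASC NULLS LAST` below.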
+        let sort_by_expr = sort_by
+            .into_iter()
+            .map(|name| {
+                let column = Expr::Column(Column::new_unqualified(name));
+                SortExpr::new(column, true, false)
+            })
+            .collect::<Vec<_>>();
+
+        let options = SedonaWriteOptions::new()
+            .with_partition_by(partition_by)
+            .with_sort_by(sort_by_expr)
+            .with_single_file_output(single_file_output);
+        let writer_options = TableGeoParquetOptions::default();
+
+        wait_for_future(
+            py,
+            &self.runtime,
+            self.inner
+                .clone()
+                .write_geoparquet(&ctx.inner, &path, options, Some(writer_options)),
+        )??;
+        Ok(())
+    }
+
     fn show<'py>(
         &self,
         py: Python<'py>,
diff --git a/python/sedonadb/tests/io/test_parquet.py b/python/sedonadb/tests/io/test_parquet.py
index 42fcaf55..a90e2542 100644
--- a/python/sedonadb/tests/io/test_parquet.py
+++ b/python/sedonadb/tests/io/test_parquet.py
@@ -19,6 +19,7 @@
 import tempfile
 import shapely
 import geopandas
+import geopandas.testing
 from pyarrow import parquet
 from pathlib import Path
 from sedonadb.testing import geom_or_null, SedonaDB, DuckDB, skip_if_not_exists
@@ -238,3 +239,39 @@ def test_read_geoparquet_prune_polygons(sedona_testing, predicate):
         """
     )
     eng.assert_result(result, gdf)
+
+
+@pytest.mark.parametrize("name", ["water-junc", "water-point"])
+def test_write_geoparquet_geometry(con, geoarrow_data, name):
+    # Checks a read and write of some non-trivial files and ensures we match GeoPandas
+    path = geoarrow_data / "ns-water" / "files" / f"ns-water_{name}_geo.parquet"
+    skip_if_not_exists(path)
+
+    gdf = geopandas.read_parquet(path).sort_values(by="OBJECTID").reset_index(drop=True)
+
+    with tempfile.TemporaryDirectory() as td:
+        tmp_parquet = Path(td) / "tmp.parquet"
+        con.create_data_frame(gdf).to_parquet(tmp_parquet)
+
+        gdf_roundtrip = geopandas.read_parquet(tmp_parquet)
+        geopandas.testing.assert_geodataframe_equal(gdf_roundtrip, gdf)
+
+
+def test_write_geoparquet_geography(con, geoarrow_data):
+    # Checks a read and write of geography (roundtrip, since nobody else can read/write)
+    path = (
+        geoarrow_data
+        / "natural-earth"
+        / "files"
+        / "natural-earth_countries-geography_geo.parquet"
+    )
+    skip_if_not_exists(path)
+
+    table = con.read_parquet(path).to_arrow_table()
+
+    with tempfile.TemporaryDirectory() as td:
+        tmp_parquet = Path(td) / "tmp.parquet"
+        con.create_data_frame(table).to_parquet(tmp_parquet)
+
+        table_roundtrip = con.read_parquet(tmp_parquet).to_arrow_table()
+        assert table_roundtrip == table
diff --git a/python/sedonadb/tests/test_dataframe.py b/python/sedonadb/tests/test_dataframe.py
index 5386d249..bd79b2ad 100644
--- a/python/sedonadb/tests/test_dataframe.py
+++ b/python/sedonadb/tests/test_dataframe.py
@@ -18,9 +18,11 @@
 import geoarrow.types as gat
 import geopandas.testing
 import pandas as pd
+from pathlib import Path
 import pyarrow as pa
 import pytest
 import sedonadb
+import tempfile


 def test_dataframe_from_dataframe(con):
@@ -281,6 +283,55 @@ def test_dataframe_to_pandas(con):
     )
+
+def test_dataframe_to_parquet(con):
+    df = con.sql(
+        "SELECT * FROM (VALUES ('one', 1), ('two', 2), ('three', 3)) AS t(a, b)"
+    )
+
+    with tempfile.TemporaryDirectory() as td:
+        # Defaults with a path that ends with .parquet (single file)
+        tmp_parquet_file = Path(td) / "tmp.parquet"
+        df.to_parquet(tmp_parquet_file)
+
+        assert tmp_parquet_file.exists()
+        assert tmp_parquet_file.is_file()
+        pd.testing.assert_frame_equal(
+            pd.read_parquet(tmp_parquet_file),
+            pd.DataFrame({"a": ["one", "two", "three"], "b": [1, 2, 3]}),
+        )
+
+        # Defaults with a path that doesn't end in .parquet (directory)
+        tmp_parquet_dir = 
Path(td) / "tmp" + df.to_parquet(tmp_parquet_dir) + + assert tmp_parquet_dir.exists() + assert tmp_parquet_dir.is_dir() + pd.testing.assert_frame_equal( + pd.read_parquet(tmp_parquet_dir), + pd.DataFrame({"a": ["one", "two", "three"], "b": [1, 2, 3]}), + ) + + # With partition_by + tmp_parquet_dir = Path(td) / "tmp_partitioned" + df.to_parquet(tmp_parquet_dir, partition_by=["a"]) + assert tmp_parquet_dir.exists() + assert tmp_parquet_dir.is_dir() + pd.testing.assert_frame_equal( + pd.read_parquet(tmp_parquet_dir).sort_values("b").reset_index(drop=True), + pd.DataFrame( + {"b": [1, 2, 3], "a": pd.Categorical(["one", "two", "three"])} + ), + ) + + # With order_by + tmp_parquet = Path(td) / "tmp_ordered.parquet" + df.to_parquet(tmp_parquet, sort_by=["a"]) + pd.testing.assert_frame_equal( + pd.read_parquet(tmp_parquet), + pd.DataFrame({"a": ["one", "three", "two"], "b": [1, 3, 2]}), + ) + + def test_show(con, capsys): con.sql("SELECT 1 as one").show() expected = """ diff --git a/rust/sedona-geoparquet/Cargo.toml b/rust/sedona-geoparquet/Cargo.toml index 6c1ffa1e..d11acd01 100644 --- a/rust/sedona-geoparquet/Cargo.toml +++ b/rust/sedona-geoparquet/Cargo.toml @@ -34,6 +34,8 @@ default = [] sedona-testing = { path = "../sedona-testing" } url = { workspace = true } rstest = { workspace = true } +tempfile = { workspace = true } +tokio = { workspace = true } [dependencies] async-trait = { workspace = true } @@ -59,4 +61,3 @@ sedona-schema = { path = "../sedona-schema" } serde = { workspace = true } serde_json = { workspace = true } serde_with = { workspace = true } -tokio = { workspace = true } diff --git a/rust/sedona-geoparquet/src/format.rs b/rust/sedona-geoparquet/src/format.rs index 02904e5b..94576aba 100644 --- a/rust/sedona-geoparquet/src/format.rs +++ b/rust/sedona-geoparquet/src/format.rs @@ -14,12 +14,13 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. + use std::{any::Any, collections::HashMap, sync::Arc}; use arrow_schema::{Schema, SchemaRef}; use async_trait::async_trait; use datafusion::{ - config::{ConfigOptions, TableParquetOptions}, + config::ConfigOptions, datasource::{ file_format::{ file_compression_type::FileCompressionType, @@ -32,7 +33,7 @@ use datafusion::{ }, }; use datafusion_catalog::{memory::DataSourceExec, Session}; -use datafusion_common::{not_impl_err, plan_err, GetExt, Result, Statistics}; +use datafusion_common::{plan_err, GetExt, Result, Statistics}; use datafusion_physical_expr::{LexRequirement, PhysicalExpr}; use datafusion_physical_plan::{ filter_pushdown::FilterPushdownPropagation, metrics::ExecutionPlanMetricsSet, ExecutionPlan, @@ -47,6 +48,8 @@ use sedona_schema::extension_type::ExtensionType; use crate::{ file_opener::{storage_schema_contains_geo, GeoParquetFileOpener}, metadata::{GeoParquetColumnEncoding, GeoParquetMetadata}, + options::{GeoParquetVersion, TableGeoParquetOptions}, + writer::create_geoparquet_writer_physical_plan, }; use datafusion::datasource::physical_plan::ParquetSource; use datafusion::datasource::schema_adapter::SchemaAdapterFactory; @@ -55,9 +58,10 @@ use datafusion::datasource::schema_adapter::SchemaAdapterFactory; /// /// A DataFusion FormatFactory provides a means to allow creating a table /// or referencing one from a SQL context like COPY TO. 
-#[derive(Debug)] +#[derive(Debug, Default)] pub struct GeoParquetFormatFactory { inner: ParquetFormatFactory, + options: Option, } impl GeoParquetFormatFactory { @@ -66,13 +70,15 @@ impl GeoParquetFormatFactory { pub fn new() -> Self { Self { inner: ParquetFormatFactory::new(), + options: None, } } /// Creates an instance of [GeoParquetFormatFactory] with customized default options - pub fn new_with_options(options: TableParquetOptions) -> Self { + pub fn new_with_options(options: TableGeoParquetOptions) -> Self { Self { - inner: ParquetFormatFactory::new_with_options(options), + inner: ParquetFormatFactory::new_with_options(options.inner.clone()), + options: Some(options), } } } @@ -83,9 +89,24 @@ impl FileFormatFactory for GeoParquetFormatFactory { state: &dyn Session, format_options: &HashMap, ) -> Result> { - let inner_format = self.inner.create(state, format_options)?; + let mut options_mut = self.options.clone().unwrap_or_default(); + let mut format_options_mut = format_options.clone(); + options_mut.geoparquet_version = + if let Some(version_string) = format_options_mut.remove("geoparquet_version") { + match version_string.as_str() { + "1.0" => GeoParquetVersion::V1_0, + "1.1" => GeoParquetVersion::V1_1, + "2.0" => GeoParquetVersion::V2_0, + _ => GeoParquetVersion::default(), + } + } else { + GeoParquetVersion::default() + }; + + let inner_format = self.inner.create(state, &format_options_mut)?; if let Some(parquet_format) = inner_format.as_any().downcast_ref::() { - Ok(Arc::new(GeoParquetFormat::new(parquet_format))) + options_mut.inner = parquet_format.options().clone(); + Ok(Arc::new(GeoParquetFormat::new(options_mut))) } else { sedona_internal_err!( "Unexpected format from ParquetFormatFactory: {:?}", @@ -115,27 +136,19 @@ impl GetExt for GeoParquetFormatFactory { /// FileFormat is to be able to be used in a ListingTable (i.e., multi file table). /// Here we also use it to implement a basic `TableProvider` that give us most if /// not all of the features of the underlying Parquet reader. -#[derive(Debug)] +#[derive(Debug, Default)] pub struct GeoParquetFormat { - inner: ParquetFormat, + options: TableGeoParquetOptions, } impl GeoParquetFormat { /// Create a new instance of the file format - pub fn new(inner: &ParquetFormat) -> Self { - // For GeoParquet we currently inspect metadata at the Arrow level, - // so we need this to be exposed by the underlying reader. Depending on - // what exactly we're doing, we might need the underlying metadata or might - // need it to be omitted. - Self { - inner: ParquetFormat::new().with_options(inner.options().clone()), - } + pub fn new(options: TableGeoParquetOptions) -> Self { + Self { options } } -} -impl Default for GeoParquetFormat { - fn default() -> Self { - Self::new(&ParquetFormat::default()) + fn inner(&self) -> ParquetFormat { + ParquetFormat::new().with_options(self.options.inner.clone()) } } @@ -146,18 +159,18 @@ impl FileFormat for GeoParquetFormat { } fn get_ext(&self) -> String { - self.inner.get_ext() + ParquetFormatFactory::new().get_ext() } fn get_ext_with_compression( &self, file_compression_type: &FileCompressionType, ) -> Result { - self.inner.get_ext_with_compression(file_compression_type) + self.inner().get_ext_with_compression(file_compression_type) } fn compression_type(&self) -> Option { - self.inner.compression_type() + self.inner().compression_type() } async fn infer_schema( @@ -169,7 +182,8 @@ impl FileFormat for GeoParquetFormat { // First, try the underlying format without schema metadata. 
This should work // for regular Parquet reads and will at least ensure that the underlying schemas // are compatible. - let inner_schema_without_metadata = self.inner.infer_schema(state, store, objects).await?; + let inner_schema_without_metadata = + self.inner().infer_schema(state, store, objects).await?; // Collect metadata separately. We can in theory do our own schema // inference too to save an extra server request, but then we have to @@ -180,7 +194,7 @@ impl FileFormat for GeoParquetFormat { fetch_parquet_metadata( store.as_ref(), object, - self.inner.metadata_size_hint(), + self.inner().metadata_size_hint(), None, ) }) @@ -252,7 +266,7 @@ impl FileFormat for GeoParquetFormat { // We don't do anything special here to insert GeoStatistics because pruning // happens elsewhere. These might be useful for a future optimizer or analyzer // pass that can insert optimizations based on geometry type. - self.inner + self.inner() .infer_stats(state, store, table_schema, object) .await } @@ -266,11 +280,11 @@ impl FileFormat for GeoParquetFormat { // DataSourceExec is backed by a GeoParquetFileSource instead of a ParquetFileSource let mut metadata_size_hint = None; - if let Some(metadata) = self.inner.metadata_size_hint() { + if let Some(metadata) = self.inner().metadata_size_hint() { metadata_size_hint = Some(metadata); } - let mut source = GeoParquetFileSource::new(self.inner.options().clone()); + let mut source = GeoParquetFileSource::new(self.options.clone()); if let Some(metadata_size_hint) = metadata_size_hint { source = source.with_metadata_size_hint(metadata_size_hint) @@ -287,17 +301,17 @@ impl FileFormat for GeoParquetFormat { async fn create_writer_physical_plan( &self, - _input: Arc, + input: Arc, _state: &dyn Session, - _conf: FileSinkConfig, - _order_requirements: Option, + conf: FileSinkConfig, + order_requirements: Option, ) -> Result> { - not_impl_err!("GeoParquet writer not implemented") + create_geoparquet_writer_physical_plan(input, conf, order_requirements, &self.options) } fn file_source(&self) -> Arc { Arc::new( - GeoParquetFileSource::try_from_file_source(self.inner.file_source(), None, None) + GeoParquetFileSource::try_from_file_source(self.inner().file_source(), None, None) .unwrap(), ) } @@ -320,9 +334,9 @@ pub struct GeoParquetFileSource { impl GeoParquetFileSource { /// Create a new file source based on [TableParquetOptions] - pub fn new(options: TableParquetOptions) -> Self { + pub fn new(options: TableGeoParquetOptions) -> Self { Self { - inner: ParquetSource::new(options), + inner: ParquetSource::new(options.inner.clone()), metadata_size_hint: None, predicate: None, } @@ -351,7 +365,6 @@ impl GeoParquetFileSource { ) -> Result { if let Some(parquet_source) = inner.as_any().downcast_ref::() { let mut parquet_source = parquet_source.clone(); - // Extract the predicate from the existing source if it exists so we can keep a copy of it let new_predicate = match (parquet_source.predicate().cloned(), predicate) { (None, None) => None, @@ -520,6 +533,7 @@ mod test { use arrow_array::RecordBatch; use arrow_schema::DataType; + use datafusion::config::TableParquetOptions; use datafusion::datasource::physical_plan::ParquetSource; use datafusion::datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory}; use datafusion::{ @@ -534,6 +548,7 @@ mod test { use rstest::rstest; use sedona_schema::crs::lnglat; use sedona_schema::datatypes::{Edges, SedonaType, WKB_GEOMETRY}; + use sedona_schema::schema::SedonaSchema; use sedona_testing::create::create_scalar; use 
sedona_testing::data::{geoarrow_data_dir, test_geoparquet}; @@ -559,14 +574,11 @@ mod test { ["wkt", "geometry"] ); - let sedona_types: Result> = df + let sedona_types = df .schema() - .as_arrow() - .fields() - .iter() - .map(|f| SedonaType::from_storage_field(f)) - .collect(); - let sedona_types = sedona_types.unwrap(); + .sedona_types() + .collect::>>() + .unwrap(); assert_eq!(sedona_types.len(), 2); assert_eq!(sedona_types[0], SedonaType::Arrow(DataType::Utf8View)); assert_eq!( @@ -578,13 +590,11 @@ mod test { // the correct schema let batches = df.collect().await.unwrap(); assert_eq!(batches.len(), 1); - let sedona_types: Result> = batches[0] + let sedona_types = batches[0] .schema() - .fields() - .iter() - .map(|f| SedonaType::from_storage_field(f)) - .collect(); - let sedona_types = sedona_types.unwrap(); + .sedona_types() + .collect::>>() + .unwrap(); assert_eq!(sedona_types.len(), 2); assert_eq!(sedona_types[0], SedonaType::Arrow(DataType::Utf8View)); assert_eq!( @@ -632,14 +642,11 @@ mod test { .select(vec![col("wkt")]) .unwrap(); - let sedona_types: Result> = df + let sedona_types = df .schema() - .as_arrow() - .fields() - .iter() - .map(|f| SedonaType::from_storage_field(f)) - .collect(); - let sedona_types = sedona_types.unwrap(); + .sedona_types() + .collect::>>() + .unwrap(); assert_eq!(sedona_types.len(), 1); assert_eq!(sedona_types[0], SedonaType::Arrow(DataType::Utf8View)); } @@ -653,14 +660,11 @@ mod test { .await .unwrap(); - let sedona_types: Result> = df + let sedona_types = df .schema() - .as_arrow() - .fields() - .iter() - .map(|f| SedonaType::from_storage_field(f)) - .collect(); - let sedona_types = sedona_types.unwrap(); + .sedona_types() + .collect::>>() + .unwrap(); assert_eq!(sedona_types.len(), 2); assert_eq!(sedona_types[0], SedonaType::Arrow(DataType::Utf8View)); assert_eq!( diff --git a/rust/sedona-geoparquet/src/lib.rs b/rust/sedona-geoparquet/src/lib.rs index c8b16a88..732d8682 100644 --- a/rust/sedona-geoparquet/src/lib.rs +++ b/rust/sedona-geoparquet/src/lib.rs @@ -17,4 +17,6 @@ mod file_opener; pub mod format; mod metadata; +pub mod options; pub mod provider; +mod writer; diff --git a/rust/sedona-geoparquet/src/metadata.rs b/rust/sedona-geoparquet/src/metadata.rs index b09b62bb..406d1958 100644 --- a/rust/sedona-geoparquet/src/metadata.rs +++ b/rust/sedona-geoparquet/src/metadata.rs @@ -63,6 +63,12 @@ pub enum GeoParquetColumnEncoding { MultiPolygon, } +impl Default for GeoParquetColumnEncoding { + fn default() -> Self { + Self::WKB + } +} + impl Display for GeoParquetColumnEncoding { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { use GeoParquetColumnEncoding::*; @@ -276,8 +282,18 @@ pub struct GeoParquetMetadata { pub columns: HashMap, } +impl Default for GeoParquetMetadata { + fn default() -> Self { + Self { + version: "1.0.0".to_string(), + primary_column: Default::default(), + columns: Default::default(), + } + } +} + /// GeoParquet column metadata -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize, Default)] pub struct GeoParquetColumnMetadata { /// Name of the geometry encoding format. 
As of GeoParquet 1.1, `"WKB"`,
 /// `"linestring"`, `"polygon"`, `"multipoint"`, `"multilinestring"`, and `"multipolygon"` are
diff --git a/rust/sedona-geoparquet/src/options.rs b/rust/sedona-geoparquet/src/options.rs
new file mode 100644
index 00000000..93965b1e
--- /dev/null
+++ b/rust/sedona-geoparquet/src/options.rs
@@ -0,0 +1,58 @@
+use datafusion::config::TableParquetOptions;
+
+/// [TableParquetOptions] wrapper with GeoParquet-specific options
+#[derive(Debug, Default, Clone)]
+pub struct TableGeoParquetOptions {
+    /// Inner [TableParquetOptions]
+    pub inner: TableParquetOptions,
+    /// [GeoParquetVersion] to use when writing GeoParquet files
+    pub geoparquet_version: GeoParquetVersion,
+}
+
+impl From<TableParquetOptions> for TableGeoParquetOptions {
+    fn from(value: TableParquetOptions) -> Self {
+        Self {
+            inner: value,
+            geoparquet_version: GeoParquetVersion::default(),
+        }
+    }
+}
+
+/// The GeoParquet version to write for output with spatial columns
+#[derive(Debug, Clone, Copy)]
+pub enum GeoParquetVersion {
+    /// Write GeoParquet 1.0 metadata
+    ///
+    /// GeoParquet 1.0 has the widest support among readers and writers; however,
+    /// it does not include row-group level statistics.
+    V1_0,
+
+    /// Write GeoParquet 1.1 metadata and an optional bounding box column
+    ///
+    /// A bbox column will be included for any column where the Parquet options would
+    /// have otherwise written statistics (which they do by default).
+    /// This option may be more computationally expensive; however, it will result in
+    /// row-group level statistics that some readers (e.g., SedonaDB) can use to prune
+    /// row groups on read.
+    V1_1,
+
+    /// Write GeoParquet 2.0
+    ///
+    /// The GeoParquet 2.0 option is identical to GeoParquet 1.0 except that the underlying
+    /// storage of spatial columns is Parquet native geometry, where the Parquet writer will
+    /// include native statistics according to the underlying Parquet options. Some readers
+    /// (e.g., SedonaDB) can use these statistics to prune row groups on read.
+    V2_0,
+
+    /// Do not write GeoParquet metadata
+    ///
+    /// This option suppresses GeoParquet metadata; however, spatial types will be written as
+    /// Parquet native Geometry/Geography when this is supported by the underlying writer.
+ Omitted, +} + +impl Default for GeoParquetVersion { + fn default() -> Self { + Self::V1_0 + } +} diff --git a/rust/sedona-geoparquet/src/provider.rs b/rust/sedona-geoparquet/src/provider.rs index 5b5e059d..c76d7f56 100644 --- a/rust/sedona-geoparquet/src/provider.rs +++ b/rust/sedona-geoparquet/src/provider.rs @@ -173,7 +173,8 @@ impl ReadOptions<'_> for GeoParquetReadOptions<'_> { let mut options = self.inner.to_listing_options(config, table_options); if let Some(parquet_format) = options.format.as_any().downcast_ref::() { - options.format = Arc::new(GeoParquetFormat::new(parquet_format)); + let geoparquet_options = parquet_format.options().clone().into(); + options.format = Arc::new(GeoParquetFormat::new(geoparquet_options)); return options; } diff --git a/rust/sedona-geoparquet/src/writer.rs b/rust/sedona-geoparquet/src/writer.rs new file mode 100644 index 00000000..eb5e1406 --- /dev/null +++ b/rust/sedona-geoparquet/src/writer.rs @@ -0,0 +1,248 @@ +use std::sync::Arc; + +use datafusion::{ + config::TableParquetOptions, + datasource::{ + file_format::parquet::ParquetSink, physical_plan::FileSinkConfig, sink::DataSinkExec, + }, +}; +use datafusion_common::{exec_datafusion_err, exec_err, not_impl_err, Result}; +use datafusion_expr::dml::InsertOp; +use datafusion_physical_expr::LexRequirement; +use datafusion_physical_plan::ExecutionPlan; +use sedona_common::sedona_internal_err; +use sedona_schema::{ + crs::lnglat, + datatypes::{Edges, SedonaType}, + schema::SedonaSchema, +}; + +use crate::{ + metadata::{GeoParquetColumnMetadata, GeoParquetMetadata}, + options::{GeoParquetVersion, TableGeoParquetOptions}, +}; + +pub fn create_geoparquet_writer_physical_plan( + input: Arc, + conf: FileSinkConfig, + order_requirements: Option, + options: &TableGeoParquetOptions, +) -> Result> { + if conf.insert_op != InsertOp::Append { + return not_impl_err!("Overwrites are not implemented yet for Parquet"); + } + + // If there is no geometry, just use the inner implementation + let output_geometry_column_indices = conf.output_schema().geometry_column_indices()?; + if output_geometry_column_indices.is_empty() { + return create_inner_writer(input, conf, order_requirements, options.inner.clone()); + } + + // We have geometry and/or geography! Collect the GeoParquetMetadata we'll need to write + let mut metadata = GeoParquetMetadata::default(); + + // Check the version + match options.geoparquet_version { + GeoParquetVersion::V1_0 => { + metadata.version = "1.0.0".to_string(); + } + _ => { + return not_impl_err!( + "GeoParquetVersion {:?} is not yet supported", + options.geoparquet_version + ); + } + }; + + let field_names = conf + .output_schema() + .fields() + .iter() + .map(|f| f.name()) + .collect::>(); + + // Apply primary column + if let Some(output_geometry_primary) = conf.output_schema().primary_geometry_column_index()? 
{ + metadata.primary_column = field_names[output_geometry_primary].clone(); + } + + // Apply all columns + for i in output_geometry_column_indices { + let f = conf.output_schema().field(i); + let sedona_type = SedonaType::from_storage_field(f)?; + let mut column_metadata = GeoParquetColumnMetadata::default(); + + let (edge_type, crs) = match sedona_type { + SedonaType::Wkb(edge_type, crs) | SedonaType::WkbView(edge_type, crs) => { + (edge_type, crs) + } + _ => return sedona_internal_err!("Unexpected type: {sedona_type}"), + }; + + // Assign edge type if needed + match edge_type { + Edges::Planar => {} + Edges::Spherical => { + column_metadata.edges = Some("spherical".to_string()); + } + } + + // Assign crs + if crs == lnglat() { + // Do nothing, lnglat is the meaning of an omitted CRS + } else if let Some(crs) = crs { + column_metadata.crs = Some(crs.to_json().parse().map_err(|e| { + exec_datafusion_err!("Failed to parse CRS for column '{}' {e}", f.name()) + })?); + } else { + return exec_err!( + "Can't write GeoParquet from null CRS\nUse ST_SetSRID({}, ...) to assign it one", + f.name() + ); + } + + // Add to metadata + metadata + .columns + .insert(f.name().to_string(), column_metadata); + } + + // Apply to the Parquet options + let mut parquet_options = options.inner.clone(); + parquet_options.key_value_metadata.insert( + "geo".to_string(), + Some( + serde_json::to_string(&metadata).map_err(|e| { + exec_datafusion_err!("Failed to serialize GeoParquet metadata: {e}") + })?, + ), + ); + + // Create the sink + let sink = Arc::new(ParquetSink::new(conf, parquet_options)); + Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _) +} + +fn create_inner_writer( + input: Arc, + conf: FileSinkConfig, + order_requirements: Option, + options: TableParquetOptions, +) -> Result> { + // Create the sink + let sink = Arc::new(ParquetSink::new(conf, options)); + Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _) +} + +#[cfg(test)] +mod test { + use std::iter::zip; + + use datafusion::datasource::file_format::format_as_file_type; + use datafusion::prelude::DataFrame; + use datafusion::{ + execution::SessionStateBuilder, + prelude::{col, SessionContext}, + }; + use datafusion_expr::LogicalPlanBuilder; + use sedona_testing::data::test_geoparquet; + use tempfile::tempdir; + + use crate::format::GeoParquetFormatFactory; + + use super::*; + + fn setup_context() -> SessionContext { + let mut state = SessionStateBuilder::new().build(); + state + .register_file_format(Arc::new(GeoParquetFormatFactory::new()), true) + .unwrap(); + SessionContext::new_with_state(state).enable_url_table() + } + + async fn test_dataframe_roundtrip(ctx: SessionContext, df: DataFrame) { + // It's a bit verbose to trigger this without helpers + let format = GeoParquetFormatFactory::new(); + let file_type = format_as_file_type(Arc::new(format)); + let tmpdir = tempdir().unwrap(); + + let df_batches = df.clone().collect().await.unwrap(); + + let tmp_parquet = tmpdir.path().join("foofy_spatial.parquet"); + + let plan = LogicalPlanBuilder::copy_to( + df.into_unoptimized_plan(), + tmp_parquet.to_string_lossy().into(), + file_type, + Default::default(), + vec![], + ) + .unwrap() + .build() + .unwrap(); + + DataFrame::new(ctx.state(), plan).collect().await.unwrap(); + + let df_parquet_batches = ctx + .table(tmp_parquet.to_string_lossy().to_string()) + .await + .unwrap() + .collect() + .await + .unwrap(); + + assert_eq!(df_parquet_batches.len(), df_batches.len()); + + // Check types, since the schema may not 
compare byte-for-byte equal (CRSes) + let df_parquet_sedona_types = df_parquet_batches[0] + .schema() + .sedona_types() + .collect::>>() + .unwrap(); + let df_sedona_types = df_batches[0] + .schema() + .sedona_types() + .collect::>>() + .unwrap(); + assert_eq!(df_parquet_sedona_types, df_sedona_types); + + // Check batches without metadata + for (df_parquet_batch, df_batch) in zip(df_parquet_batches, df_batches) { + assert_eq!(df_parquet_batch.columns(), df_batch.columns()) + } + } + + #[tokio::test] + async fn writer_without_spatial() { + let example = test_geoparquet("example", "geometry").unwrap(); + let ctx = setup_context(); + + // Deselect all geometry columns + let df = ctx + .table(&example) + .await + .unwrap() + .select(vec![col("wkt")]) + .unwrap(); + + test_dataframe_roundtrip(ctx, df).await; + } + + #[tokio::test] + async fn writer_with_geometry() { + let example = test_geoparquet("example", "geometry").unwrap(); + let ctx = setup_context(); + let df = ctx.table(&example).await.unwrap(); + + test_dataframe_roundtrip(ctx, df).await; + } + + #[tokio::test] + async fn writer_with_geography() { + let example = test_geoparquet("natural-earth", "countries-geography").unwrap(); + let ctx = setup_context(); + let df = ctx.table(&example).await.unwrap(); + + test_dataframe_roundtrip(ctx, df).await; + } +} diff --git a/rust/sedona/Cargo.toml b/rust/sedona/Cargo.toml index 2be1be84..c7cb95a2 100644 --- a/rust/sedona/Cargo.toml +++ b/rust/sedona/Cargo.toml @@ -40,6 +40,7 @@ spatial-join = ["dep:sedona-spatial-join"] s2geography = ["dep:sedona-s2geography"] [dev-dependencies] +tempfile = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread"] } rstest = { workspace = true } diff --git a/rust/sedona/src/context.rs b/rust/sedona/src/context.rs index 3ca8fc61..07fabf87 100644 --- a/rust/sedona/src/context.rs +++ b/rust/sedona/src/context.rs @@ -22,7 +22,10 @@ use crate::{ random_geometry_provider::RandomGeometryFunction, show::{show_batches, DisplayTableOptions}, }; +use arrow_array::RecordBatch; use async_trait::async_trait; +use datafusion::dataframe::DataFrameWriteOptions; +use datafusion::datasource::file_format::format_as_file_type; use datafusion::{ common::{plan_datafusion_err, plan_err}, error::{DataFusionError, Result}, @@ -30,11 +33,15 @@ use datafusion::{ prelude::{DataFrame, SessionConfig, SessionContext}, sql::parser::{DFParser, Statement}, }; +use datafusion_common::not_impl_err; +use datafusion_expr::dml::InsertOp; use datafusion_expr::sqlparser::dialect::{dialect_from_str, Dialect}; +use datafusion_expr::{LogicalPlanBuilder, SortExpr}; use parking_lot::Mutex; use sedona_common::option::add_sedona_option_extension; use sedona_expr::aggregate_udf::SedonaAccumulatorRef; use sedona_expr::{function_set::FunctionSet, scalar_udf::ScalarKernelRef}; +use sedona_geoparquet::options::TableGeoParquetOptions; use sedona_geoparquet::{ format::GeoParquetFormatFactory, provider::{geoparquet_listing_table, GeoParquetReadOptions}, @@ -269,6 +276,14 @@ pub trait SedonaDataFrame { limit: Option, options: DisplayTableOptions<'a>, ) -> Result; + + async fn write_geoparquet( + self, + ctx: &SedonaContext, + path: &str, + options: SedonaWriteOptions, + writer_options: Option, + ) -> Result>; } #[async_trait] @@ -287,6 +302,120 @@ impl SedonaDataFrame for DataFrame { show_batches(ctx, &mut out, schema, batches, options)?; String::from_utf8(out).map_err(|e| DataFusionError::External(Box::new(e))) } + + async fn write_geoparquet( + self, + ctx: &SedonaContext, + path: &str, + 
options: SedonaWriteOptions,
+        writer_options: Option<TableGeoParquetOptions>,
+    ) -> Result<Vec<RecordBatch>, DataFusionError> {
+        if options.insert_op != InsertOp::Append {
+            return not_impl_err!(
+                "{} is not implemented for DataFrame::write_geoparquet.",
+                options.insert_op
+            );
+        }
+
+        let format = if let Some(parquet_opts) = writer_options {
+            Arc::new(GeoParquetFormatFactory::new_with_options(parquet_opts))
+        } else {
+            Arc::new(GeoParquetFormatFactory::new())
+        };
+
+        let file_type = format_as_file_type(format);
+
+        let plan = if options.sort_by.is_empty() {
+            self.into_unoptimized_plan()
+        } else {
+            LogicalPlanBuilder::from(self.into_unoptimized_plan())
+                .sort(options.sort_by)?
+                .build()?
+        };
+
+        let plan = LogicalPlanBuilder::copy_to(
+            plan,
+            path.into(),
+            file_type,
+            Default::default(),
+            options.partition_by,
+        )?
+        .build()?;
+
+        DataFrame::new(ctx.ctx.state(), plan).collect().await
+    }
+}
+
+/// A Sedona-specific copy of [DataFrameWriteOptions]
+///
+/// This is needed because [DataFrameWriteOptions] has private fields, so we
+/// can't use it in our interfaces. This object can be converted to a
+/// [DataFrameWriteOptions] using `.into()`.
+pub struct SedonaWriteOptions {
+    /// Controls how new data should be written to the table, determining whether
+    /// to append, overwrite, or replace existing data.
+    pub insert_op: InsertOp,
+    /// Controls if all partitions should be coalesced into a single output file.
+    /// Generally will have slower performance when set to true.
+    pub single_file_output: bool,
+    /// Sets which columns should be used for hive-style partitioned writes by name.
+    /// Can be set to empty vec![] for non-partitioned writes.
+    pub partition_by: Vec<String>,
+    /// Sets which columns should be used for sorting the output by name.
+    /// Can be set to empty vec![] for non-sorted writes.
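+    /// For example, an ascending sort on a column name can be expressed as
+    /// `SortExpr::new(Expr::Column(Column::new_unqualified("name")), true, false)`.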
+ pub sort_by: Vec, +} + +impl From for DataFrameWriteOptions { + fn from(value: SedonaWriteOptions) -> Self { + DataFrameWriteOptions::new() + .with_insert_operation(value.insert_op) + .with_single_file_output(value.single_file_output) + .with_partition_by(value.partition_by) + .with_sort_by(value.sort_by) + } +} + +impl SedonaWriteOptions { + /// Create a new SedonaWriteOptions with default values + pub fn new() -> Self { + SedonaWriteOptions { + insert_op: InsertOp::Append, + single_file_output: false, + partition_by: vec![], + sort_by: vec![], + } + } + + /// Set the insert operation + pub fn with_insert_operation(mut self, insert_op: InsertOp) -> Self { + self.insert_op = insert_op; + self + } + + /// Set the single_file_output value to true or false + pub fn with_single_file_output(mut self, single_file_output: bool) -> Self { + self.single_file_output = single_file_output; + self + } + + /// Sets the partition_by columns for output partitioning + pub fn with_partition_by(mut self, partition_by: Vec) -> Self { + self.partition_by = partition_by; + self + } + + /// Sets the sort_by columns for output sorting + pub fn with_sort_by(mut self, sort_by: Vec) -> Self { + self.sort_by = sort_by; + self + } +} + +impl Default for SedonaWriteOptions { + fn default() -> Self { + Self::new() + } } // Because Dialect/dialect_from_str is not marked as Send, using the async @@ -325,6 +454,7 @@ mod tests { datatypes::{Edges, SedonaType}, }; use sedona_testing::data::test_geoparquet; + use tempfile::tempdir; use super::*; @@ -375,6 +505,23 @@ mod tests { ); } + #[tokio::test] + async fn write_geoparquet() { + let tmpdir = tempdir().unwrap(); + let tmp_parquet = tmpdir.path().join("tmp.parquet"); + let ctx = SedonaContext::new(); + ctx.sql("SELECT 1 as one") + .await + .unwrap() + .write_parquet( + &tmp_parquet.to_string_lossy(), + DataFrameWriteOptions::default(), + None, + ) + .await + .unwrap(); + } + #[tokio::test] async fn geoparquet_format() { // Make sure that our context can be set up to identify and read
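
For reference, the new Python `DataFrame.to_parquet()` API introduced in this diff can be exercised end to end roughly as follows. This is a minimal sketch based on the docstring and tests above; the output paths and the `category`/`value` columns are hypothetical.

```python
import tempfile
from pathlib import Path

import sedonadb

con = sedonadb.connect()
df = con.sql(
    "SELECT * FROM (VALUES ('a', 1), ('b', 2), ('a', 3)) AS t(category, value)"
)

with tempfile.TemporaryDirectory() as td:
    # Single (Geo)Parquet file: the default when the path ends with .parquet
    df.to_parquet(Path(td) / "values.parquet")

    # Hive-style partitioned directory (one subdirectory per distinct `category`),
    # with rows sorted ascending by `value` within the output
    df.to_parquet(Path(td) / "values", partition_by="category", sort_by="value")
```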