Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ rstest = "0.24.0"
serde = { version = "1" }
serde_json = { version = "1" }
serde_with = { version = "1" }
tempfile = { version = "3"}
thiserror = { version = "2" }
tokio = { version = "1.44" }
url = "2.5.4"
Expand Down
1 change: 1 addition & 0 deletions python/sedonadb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ arrow-array = { workspace = true }
async-trait = { workspace = true }
datafusion = { workspace = true }
datafusion-common = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-ffi = { workspace = true }
futures = { workspace = true }
pyo3 = { version = "0.25.1" }
Expand Down
64 changes: 63 additions & 1 deletion python/sedonadb/python/sedonadb/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import TYPE_CHECKING, Union, Optional, Any

from pathlib import Path
from typing import TYPE_CHECKING, Union, Optional, Any, Iterable

from sedonadb._options import global_options

Expand Down Expand Up @@ -263,6 +265,66 @@ def to_pandas(
else:
return table.to_pandas()

def to_parquet(
    self,
    path: Union[str, Path],
    *,
    partition_by: Optional[Union[str, Iterable[str]]] = None,
    sort_by: Optional[Union[str, Iterable[str]]] = None,
    single_file_output: Optional[bool] = None,
):
    """Write this DataFrame as one or more (Geo)Parquet files

    When the frame contains geometry columns, GeoParquet metadata is
    included in the output so that capable readers can restore the
    Geometry/Geography types on read.

    Args:
        path: Destination file name or directory for the Parquet output.
        partition_by: Column name(s) used for hive-style partitioning of
            the output. Accepts a single name or an iterable of names.
        sort_by: Column name(s) to sort the output by. Only ascending
            sort is currently supported.
        single_file_output: Force writing exactly one Parquet file (True)
            or one file per partition into a directory (False). When not
            given, a single file is produced if `partition_by` is
            unspecified and `path` ends with `.parquet`.

    Examples:

        >>> import sedonadb
        >>> import tempfile
        >>> con = sedonadb.connect()
        >>> td = tempfile.TemporaryDirectory()
        >>> url = "https://github.com/apache/sedona-testing/raw/refs/heads/main/data/parquet/geoparquet-1.1.0.parquet"
        >>> con.read_parquet(url).to_parquet(f"{td.name}/tmp.parquet")

    """
    target = Path(path)

    # Resolve the default before normalizing partition_by, because the
    # default depends on whether partitioning was requested at all.
    if single_file_output is None:
        single_file_output = partition_by is None and str(target).endswith(
            ".parquet"
        )

    def _to_name_list(value):
        # Normalize None / single string / iterable of strings to a list.
        if value is None:
            return []
        if isinstance(value, str):
            return [value]
        return list(value)

    self._impl.to_parquet(
        self._ctx,
        str(target),
        _to_name_list(partition_by),
        _to_name_list(sort_by),
        single_file_output,
    )

def show(
self,
limit: Optional[int] = 10,
Expand Down
41 changes: 40 additions & 1 deletion python/sedonadb/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,16 @@ use arrow_array::ffi_stream::FFI_ArrowArrayStream;
use arrow_array::RecordBatchReader;
use arrow_schema::Schema;
use datafusion::catalog::MemTable;
use datafusion::logical_expr::SortExpr;
use datafusion::prelude::DataFrame;
use datafusion_common::Column;
use datafusion_expr::Expr;
use datafusion_ffi::table_provider::FFI_TableProvider;
use pyo3::prelude::*;
use pyo3::types::PyCapsule;
use sedona::context::SedonaDataFrame;
use sedona::context::{SedonaDataFrame, SedonaWriteOptions};
use sedona::show::{DisplayMode, DisplayTableOptions};
use sedona_geoparquet::options::TableGeoParquetOptions;
use sedona_schema::schema::SedonaSchema;
use tokio::runtime::Runtime;

Expand Down Expand Up @@ -119,6 +123,41 @@ impl InternalDataFrame {
))
}

fn to_parquet<'py>(
&self,
py: Python<'py>,
ctx: &InternalContext,
path: String,
partition_by: Vec<String>,
sort_by: Vec<String>,
single_file_output: bool,
) -> Result<(), PySedonaError> {
// sort_by needs to be SortExpr. A Vec<String> can unambiguously be interpreted as
// field names (ascending), but other types of expressions aren't supported here yet.
let sort_by_expr = sort_by
.into_iter()
.map(|name| {
let column = Expr::Column(Column::new_unqualified(name));
SortExpr::new(column, true, false)
})
.collect::<Vec<_>>();

let options = SedonaWriteOptions::new()
.with_partition_by(partition_by)
.with_sort_by(sort_by_expr)
.with_single_file_output(single_file_output);
let writer_options = TableGeoParquetOptions::default();

wait_for_future(
py,
&self.runtime,
self.inner
.clone()
.write_geoparquet(&ctx.inner, &path, options, Some(writer_options)),
)??;
Ok(())
}

fn show<'py>(
&self,
py: Python<'py>,
Expand Down
37 changes: 37 additions & 0 deletions python/sedonadb/tests/io/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import tempfile
import shapely
import geopandas
import geopandas.testing
from pyarrow import parquet
from pathlib import Path
from sedonadb.testing import geom_or_null, SedonaDB, DuckDB, skip_if_not_exists
Expand Down Expand Up @@ -238,3 +239,39 @@ def test_read_geoparquet_prune_polygons(sedona_testing, predicate):
"""
)
eng.assert_result(result, gdf)


@pytest.mark.parametrize("name", ["water-junc", "water-point"])
def test_write_geoparquet_geometry(con, geoarrow_data, name):
    # Round-trip some non-trivial geometry files through our writer and
    # verify the result matches what GeoPandas reads from the original.
    source = geoarrow_data / "ns-water" / "files" / f"ns-water_{name}_geo.parquet"
    skip_if_not_exists(source)

    expected = (
        geopandas.read_parquet(source)
        .sort_values(by="OBJECTID")
        .reset_index(drop=True)
    )

    with tempfile.TemporaryDirectory() as td:
        out_path = Path(td) / "tmp.parquet"
        con.create_data_frame(expected).to_parquet(out_path)

        actual = geopandas.read_parquet(out_path)
        geopandas.testing.assert_geodataframe_equal(actual, expected)


def test_write_geoparquet_geography(con, geoarrow_data):
    # Round-trip check for geography: no other reader/writer supports it,
    # so we compare against our own reader rather than GeoPandas.
    source = (
        geoarrow_data
        / "natural-earth"
        / "files"
        / "natural-earth_countries-geography_geo.parquet"
    )
    skip_if_not_exists(source)

    expected = con.read_parquet(source).to_arrow_table()

    with tempfile.TemporaryDirectory() as td:
        out_path = Path(td) / "tmp.parquet"
        con.create_data_frame(expected).to_parquet(out_path)

        assert con.read_parquet(out_path).to_arrow_table() == expected
51 changes: 51 additions & 0 deletions python/sedonadb/tests/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
import geoarrow.types as gat
import geopandas.testing
import pandas as pd
from pathlib import Path
import pyarrow as pa
import pytest
import sedonadb
import tempfile


def test_dataframe_from_dataframe(con):
Expand Down Expand Up @@ -281,6 +283,55 @@ def test_dataframe_to_pandas(con):
)


def test_dataframe_to_parquet(con):
    df = con.sql(
        "SELECT * FROM (VALUES ('one', 1), ('two', 2), ('three', 3)) AS t(a, b)"
    )
    expected = pd.DataFrame({"a": ["one", "two", "three"], "b": [1, 2, 3]})

    with tempfile.TemporaryDirectory() as td:
        # A destination ending in .parquet defaults to single-file output
        single_file = Path(td) / "tmp.parquet"
        df.to_parquet(single_file)

        assert single_file.exists()
        assert single_file.is_file()
        pd.testing.assert_frame_equal(pd.read_parquet(single_file), expected)

        # A destination without the .parquet suffix defaults to a directory
        out_dir = Path(td) / "tmp"
        df.to_parquet(out_dir)

        assert out_dir.exists()
        assert out_dir.is_dir()
        pd.testing.assert_frame_equal(pd.read_parquet(out_dir), expected)

        # partition_by produces hive-style partitioned output; the partition
        # column comes back as a categorical after the round trip
        part_dir = Path(td) / "tmp_partitioned"
        df.to_parquet(part_dir, partition_by=["a"])
        assert part_dir.exists()
        assert part_dir.is_dir()
        roundtrip = pd.read_parquet(part_dir).sort_values("b").reset_index(drop=True)
        pd.testing.assert_frame_equal(
            roundtrip,
            pd.DataFrame(
                {"b": [1, 2, 3], "a": pd.Categorical(["one", "two", "three"])}
            ),
        )

        # sort_by orders rows (ascending) before writing
        sorted_file = Path(td) / "tmp_ordered.parquet"
        df.to_parquet(sorted_file, sort_by=["a"])
        pd.testing.assert_frame_equal(
            pd.read_parquet(sorted_file),
            pd.DataFrame({"a": ["one", "three", "two"], "b": [1, 3, 2]}),
        )


def test_show(con, capsys):
con.sql("SELECT 1 as one").show()
expected = """
Expand Down
3 changes: 2 additions & 1 deletion rust/sedona-geoparquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ default = []
sedona-testing = { path = "../sedona-testing" }
url = { workspace = true }
rstest = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true }

[dependencies]
async-trait = { workspace = true }
Expand All @@ -59,4 +61,3 @@ sedona-schema = { path = "../sedona-schema" }
serde = { workspace = true }
serde_json = { workspace = true }
serde_with = { workspace = true }
tokio = { workspace = true }
Loading
Loading