lonboard/_geoarrow/ops/bbox.py (55 additions, 28 deletions)
@@ -3,6 +3,7 @@
 from __future__ import annotations

 import math
+from concurrent.futures import ThreadPoolExecutor
 from dataclasses import dataclass

 import numpy as np
@@ -63,61 +64,87 @@ def _coords_bbox(arr: Array) -> Bbox:

 def _total_bounds_nest_0(column: ChunkedArray) -> Bbox:
     bbox = Bbox()
-    for coords in column.chunks:
-        bbox.update(_coords_bbox(coords))
+
+    with ThreadPoolExecutor() as executor:
+        bboxes = list(executor.map(_coords_bbox, column.chunks))
+
+    for other in bboxes:
+        bbox.update(other)

     return bbox
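Every _total_bounds_* function in this file gets the same rewrite: the sequential chunk loop becomes a map over a thread pool, and the partial boxes are folded into one result afterward. Below is a minimal self-contained sketch of that fan-out/fold pattern; the Bbox dataclass and coords_bbox function are simplified stand-ins (assumed to track running min/max values), not lonboard's actual implementation.

    from concurrent.futures import ThreadPoolExecutor
    from dataclasses import dataclass
    import math

    import numpy as np

    @dataclass
    class Bbox:
        minx: float = math.inf
        miny: float = math.inf
        maxx: float = -math.inf
        maxy: float = -math.inf

        def update(self, other: "Bbox") -> None:
            # Fold step: merge one partial result into the running bounds.
            self.minx = min(self.minx, other.minx)
            self.miny = min(self.miny, other.miny)
            self.maxx = max(self.maxx, other.maxx)
            self.maxy = max(self.maxy, other.maxy)

    def coords_bbox(coords: np.ndarray) -> Bbox:
        # Map step: one bbox per chunk of (N, 2) coordinates.
        return Bbox(
            minx=coords[:, 0].min(),
            miny=coords[:, 1].min(),
            maxx=coords[:, 0].max(),
            maxy=coords[:, 1].max(),
        )

    chunks = [np.random.default_rng(i).uniform(-180, 180, (1_000, 2)) for i in range(8)]
    bbox = Bbox()
    with ThreadPoolExecutor() as executor:
        for partial in executor.map(coords_bbox, chunks):
            bbox.update(partial)

Min/max merging is commutative and associative, so the order in which partial boxes are folded does not matter.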


 def _total_bounds_nest_1(column: ChunkedArray) -> Bbox:
     bbox = Bbox()
     flat_array = list_flatten(column)
-    for coords in flat_array:
-        bbox.update(_coords_bbox(coords))
+
+    with ThreadPoolExecutor() as executor:
+        bboxes = list(executor.map(_coords_bbox, flat_array))
+
+    for other in bboxes:
+        bbox.update(other)

     return bbox


 def _total_bounds_nest_2(column: ChunkedArray) -> Bbox:
     bbox = Bbox()
     flat_array = list_flatten(list_flatten(column))
-    for coords in flat_array:
-        bbox.update(_coords_bbox(coords))
+
+    with ThreadPoolExecutor() as executor:
+        bboxes = list(executor.map(_coords_bbox, flat_array))
+
+    for other in bboxes:
+        bbox.update(other)

     return bbox


 def _total_bounds_nest_3(column: ChunkedArray) -> Bbox:
     bbox = Bbox()
     flat_array = list_flatten(list_flatten(list_flatten(column)))
-    for coords in flat_array:
-        bbox.update(_coords_bbox(coords))
+
+    with ThreadPoolExecutor() as executor:
+        bboxes = list(executor.map(_coords_bbox, flat_array))
+
+    for other in bboxes:
+        bbox.update(other)

     return bbox
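The four nest variants differ only in how many list levels list_flatten peels off before the raw coordinates are reached, matching GeoArrow's nesting: points have no list level, linestrings and multipoints one, polygons and multilinestrings two, multipolygons three. Flattening drops geometry boundaries but never drops or duplicates coordinates, so the total bounds are unchanged. A toy pure-Python illustration (not lonboard code):

    # A chunk of a MultiPolygon-style column: three list levels above the coords.
    column = [
        [  # one multipolygon
            [  # one polygon
                [(0.0, 0.0), (4.0, 0.0), (4.0, 3.0), (0.0, 0.0)],  # one ring
            ],
        ],
    ]

    def flatten_once(nested):
        return [inner for outer in nested for inner in outer]

    coords = flatten_once(flatten_once(flatten_once(column)))
    # coords is now a flat list of (x, y) pairs; the bbox is still (0, 0, 4, 3).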


+def _coords_bbox_struct(chunk: Array) -> Bbox:
+    """Compute the bounds of a single geoarrow.box chunk."""
+    is_2d = len(chunk.field.type.fields) == 4
+    is_3d = len(chunk.field.type.fields) == 6
+
+    if is_2d:
+        minx = np.min(struct_field(chunk, 0))
+        miny = np.min(struct_field(chunk, 1))
+        maxx = np.max(struct_field(chunk, 2))
+        maxy = np.max(struct_field(chunk, 3))
+    elif is_3d:
+        minx = np.min(struct_field(chunk, 0))
+        miny = np.min(struct_field(chunk, 1))
+        maxx = np.max(struct_field(chunk, 3))
+        maxy = np.max(struct_field(chunk, 4))
+    else:
+        raise ValueError(
+            f"Unexpected box type with {len(chunk.field.type.fields)} fields.\n"
+            "Only 2D and 3D boxes are supported.",
+        )
+
+    return Bbox(minx=minx, miny=miny, maxx=maxx, maxy=maxy)
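_coords_bbox_struct leans on the geoarrow.box field order: 2D boxes store (xmin, ymin, xmax, ymax) and 3D boxes store (xmin, ymin, zmin, xmax, ymax, zmax), which is why the 3D branch reads fields 3 and 4 for the x/y maxima. Here is a sketch of the same reduction over a plain pyarrow StructArray standing in for a 2D box chunk; the field accessor differs from lonboard's struct_field helper:

    import numpy as np
    import pyarrow as pa

    # Two boxes: (0, 0, 1, 3) and (2, 1, 5, 4).
    boxes = pa.StructArray.from_arrays(
        [
            pa.array([0.0, 2.0]),  # xmin
            pa.array([0.0, 1.0]),  # ymin
            pa.array([1.0, 5.0]),  # xmax
            pa.array([3.0, 4.0]),  # ymax
        ],
        names=["xmin", "ymin", "xmax", "ymax"],
    )

    # Total bounds: min over the mins, max over the maxes.
    minx = np.min(boxes.field(0))
    miny = np.min(boxes.field(1))
    maxx = np.max(boxes.field(2))
    maxy = np.max(boxes.field(3))
    print(minx, miny, maxx, maxy)  # 0.0 0.0 5.0 4.0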


 def _total_bounds_box(column: ChunkedArray) -> Bbox:
     """Compute the total bounds of a geoarrow.box column."""
     bbox = Bbox()
-    for chunk in column.chunks:
-        is_2d = len(chunk.field.type.fields) == 4
-        is_3d = len(chunk.field.type.fields) == 6
-
-        if is_2d:
-            minx = np.min(struct_field(chunk, 0))
-            miny = np.min(struct_field(chunk, 1))
-            maxx = np.max(struct_field(chunk, 2))
-            maxy = np.max(struct_field(chunk, 3))
-        elif is_3d:
-            minx = np.min(struct_field(chunk, 0))
-            miny = np.min(struct_field(chunk, 1))
-            maxx = np.max(struct_field(chunk, 3))
-            maxy = np.max(struct_field(chunk, 4))
-        else:
-            raise ValueError(
-                f"Unexpected box type with {len(chunk.field.type.fields)} fields.\n"
-                "Only 2D and 3D boxes are supported.",
-            )
-        bbox.update(Bbox(minx=minx, miny=miny, maxx=maxx, maxy=maxy))
+
+    with ThreadPoolExecutor() as executor:
+        bboxes = list(executor.map(_coords_bbox_struct, column.chunks))
+
+    for other in bboxes:
+        bbox.update(other)

     return bbox
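Threads give real parallelism here only because the per-chunk work is dominated by numpy reductions, which release the GIL; a single-chunk column degenerates to the sequential path plus a little pool overhead. ThreadPoolExecutor() defaults to min(32, os.cpu_count() + 4) workers on CPython 3.8+. To gauge the effect on your own data, a small harness like the following can compare the two paths (assumed setup, not part of this PR; real gains depend on chunk count and sizes):

    import time
    from concurrent.futures import ThreadPoolExecutor

    import numpy as np

    chunks = [np.random.default_rng(i).uniform(-180, 180, (500_000, 2)) for i in range(8)]

    def chunk_bounds(c):
        return c[:, 0].min(), c[:, 1].min(), c[:, 0].max(), c[:, 1].max()

    t0 = time.perf_counter()
    seq = [chunk_bounds(c) for c in chunks]
    t1 = time.perf_counter()
    with ThreadPoolExecutor() as executor:
        par = list(executor.map(chunk_bounds, chunks))
    t2 = time.perf_counter()

    assert seq == par  # identical partial results; only wall time differs
    print(f"sequential: {t1 - t0:.3f}s  threaded: {t2 - t1:.3f}s")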
lonboard/_serialization.py (8 additions, 5 deletions)
@@ -1,6 +1,7 @@
 from __future__ import annotations

 import math
+from concurrent.futures import ThreadPoolExecutor
 from io import BytesIO
 from typing import TYPE_CHECKING, Any, overload
@@ -87,13 +88,15 @@ def write_parquet_batch(record_batch: RecordBatch) -> bytes:


 def serialize_table_to_parquet(table: Table, *, max_chunksize: int) -> list[bytes]:
-    buffers: list[bytes] = []
     assert max_chunksize > 0

-    for record_batch in table.rechunk(max_chunksize=max_chunksize).to_batches():
-        buffers.append(write_parquet_batch(record_batch))
-
-    return buffers
+    with ThreadPoolExecutor() as executor:
+        return list(
+            executor.map(
+                write_parquet_batch,
+                table.rechunk(max_chunksize=max_chunksize).to_batches(),
+            ),
+        )


 def serialize_pyarrow_column(
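The serialization change follows the same recipe: each record batch is encoded to its own Parquet buffer, so the workers share no state, and executor.map preserves input order, meaning the returned buffer list still matches the rechunked batch order. A standalone sketch of the pattern; write_parquet_batch here is an assumed simplification, not lonboard's exact function:

    from concurrent.futures import ThreadPoolExecutor
    from io import BytesIO

    import pyarrow as pa
    import pyarrow.parquet as pq

    def write_parquet_batch(record_batch: pa.RecordBatch) -> bytes:
        # Encode one batch as an independent Parquet buffer.
        bio = BytesIO()
        with pq.ParquetWriter(bio, record_batch.schema) as writer:
            writer.write_batch(record_batch)
        return bio.getvalue()

    table = pa.table({"x": list(range(10_000))})
    batches = table.to_batches(max_chunksize=2_048)

    with ThreadPoolExecutor() as executor:
        # map preserves order: buffers[i] corresponds to batches[i].
        buffers = list(executor.map(write_parquet_batch, batches))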