diff --git a/lonboard/_geoarrow/ops/bbox.py b/lonboard/_geoarrow/ops/bbox.py index 220d2663..4cbbcd82 100644 --- a/lonboard/_geoarrow/ops/bbox.py +++ b/lonboard/_geoarrow/ops/bbox.py @@ -3,6 +3,7 @@ from __future__ import annotations import math +from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass import numpy as np @@ -63,8 +64,12 @@ def _coords_bbox(arr: Array) -> Bbox: def _total_bounds_nest_0(column: ChunkedArray) -> Bbox: bbox = Bbox() - for coords in column.chunks: - bbox.update(_coords_bbox(coords)) + + with ThreadPoolExecutor() as executor: + bboxes = list(executor.map(_coords_bbox, column.chunks)) + + for other in bboxes: + bbox.update(other) return bbox @@ -72,8 +77,12 @@ def _total_bounds_nest_0(column: ChunkedArray) -> Bbox: def _total_bounds_nest_1(column: ChunkedArray) -> Bbox: bbox = Bbox() flat_array = list_flatten(column) - for coords in flat_array: - bbox.update(_coords_bbox(coords)) + + with ThreadPoolExecutor() as executor: + bboxes = list(executor.map(_coords_bbox, flat_array)) + + for other in bboxes: + bbox.update(other) return bbox @@ -81,8 +90,12 @@ def _total_bounds_nest_1(column: ChunkedArray) -> Bbox: def _total_bounds_nest_2(column: ChunkedArray) -> Bbox: bbox = Bbox() flat_array = list_flatten(list_flatten(column)) - for coords in flat_array: - bbox.update(_coords_bbox(coords)) + + with ThreadPoolExecutor() as executor: + bboxes = list(executor.map(_coords_bbox, flat_array)) + + for other in bboxes: + bbox.update(other) return bbox @@ -90,34 +103,48 @@ def _total_bounds_nest_2(column: ChunkedArray) -> Bbox: def _total_bounds_nest_3(column: ChunkedArray) -> Bbox: bbox = Bbox() flat_array = list_flatten(list_flatten(list_flatten(column))) - for coords in flat_array: - bbox.update(_coords_bbox(coords)) + + with ThreadPoolExecutor() as executor: + bboxes = list(executor.map(_coords_bbox, flat_array)) + + for other in bboxes: + bbox.update(other) return bbox +def _coords_bbox_struct(chunk: Array) -> Bbox: + """Compute the bounds of a single geoarrow.box chunk.""" + is_2d = len(chunk.field.type.fields) == 4 + is_3d = len(chunk.field.type.fields) == 6 + + if is_2d: + minx = np.min(struct_field(chunk, 0)) + miny = np.min(struct_field(chunk, 1)) + maxx = np.max(struct_field(chunk, 2)) + maxy = np.max(struct_field(chunk, 3)) + elif is_3d: + minx = np.min(struct_field(chunk, 0)) + miny = np.min(struct_field(chunk, 1)) + maxx = np.max(struct_field(chunk, 3)) + maxy = np.max(struct_field(chunk, 4)) + else: + raise ValueError( + f"Unexpected box type with {len(chunk.field.type.fields)} fields.\n" + "Only 2D and 3D boxes are supported.", + ) + + return Bbox(minx=minx, miny=miny, maxx=maxx, maxy=maxy) + + def _total_bounds_box(column: ChunkedArray) -> Bbox: """Compute the total bounds of a geoarrow.box column.""" bbox = Bbox() - for chunk in column.chunks: - is_2d = len(chunk.field.type.fields) == 4 - is_3d = len(chunk.field.type.fields) == 6 - - if is_2d: - minx = np.min(struct_field(chunk, 0)) - miny = np.min(struct_field(chunk, 1)) - maxx = np.max(struct_field(chunk, 2)) - maxy = np.max(struct_field(chunk, 3)) - elif is_3d: - minx = np.min(struct_field(chunk, 0)) - miny = np.min(struct_field(chunk, 1)) - maxx = np.max(struct_field(chunk, 3)) - maxy = np.max(struct_field(chunk, 4)) - else: - raise ValueError( - f"Unexpected box type with {len(chunk.field.type.fields)} fields.\n" - "Only 2D and 3D boxes are supported.", - ) - bbox.update(Bbox(minx=minx, miny=miny, maxx=maxx, maxy=maxy)) + + with ThreadPoolExecutor() as executor: + bboxes = list(executor.map(_coords_bbox_struct, column.chunks)) + + for other in bboxes: + bbox.update(other) return bbox diff --git a/lonboard/_serialization.py b/lonboard/_serialization.py index 3ed0d38f..04a3fd10 100644 --- a/lonboard/_serialization.py +++ b/lonboard/_serialization.py @@ -1,6 +1,7 @@ from __future__ import annotations import math +from concurrent.futures import ThreadPoolExecutor from io import BytesIO from typing import TYPE_CHECKING, Any, overload @@ -87,13 +88,15 @@ def write_parquet_batch(record_batch: RecordBatch) -> bytes: def serialize_table_to_parquet(table: Table, *, max_chunksize: int) -> list[bytes]: - buffers: list[bytes] = [] assert max_chunksize > 0 - for record_batch in table.rechunk(max_chunksize=max_chunksize).to_batches(): - buffers.append(write_parquet_batch(record_batch)) - - return buffers + with ThreadPoolExecutor() as executor: + return list( + executor.map( + write_parquet_batch, + table.rechunk(max_chunksize=max_chunksize).to_batches(), + ), + ) def serialize_pyarrow_column(