From d0953bff0c55fa3bb96b419daa83c07ba16ba66f Mon Sep 17 00:00:00 2001
From: Alex Reichenbach <8087473+Reichenbachian@users.noreply.github.com>
Date: Mon, 2 Feb 2026 19:52:14 +0000
Subject: [PATCH 1/2] Batch polars parquet uploads

---
 src/structify/resources/polars.py | 37 ++++++++++++++++++++++++---------------
 1 file changed, 22 insertions(+), 15 deletions(-)

diff --git a/src/structify/resources/polars.py b/src/structify/resources/polars.py
index b9d5e7192..942270d51 100644
--- a/src/structify/resources/polars.py
+++ b/src/structify/resources/polars.py
@@ -1120,8 +1120,8 @@ def match(
         if not request_cost_confirmation_if_needed(self._client, len(df), "match"):
             raise Exception("User cancelled matching operation")
 
-        self._upload_df(df, dataset_name, "table1")
-        self._upload_df(reference_df, dataset_name, "table2")
+        self._upload_df(df, dataset_name, "table1", batch_size=df.height)
+        self._upload_df(reference_df, dataset_name, "table2", batch_size=reference_df.height)
 
         # Wait for all embeddings to be added
         tqdm_marker = tqdm(total=df.height + reference_df.height, desc="Waiting for embeddings")
@@ -1186,20 +1186,27 @@ def _upload_df(
         df: pl.DataFrame,
         dataset_name: str,
         table_name: str,
+        *,
+        batch_size: int = 10_000,
     ) -> None:
-        parquet_bytes = io.BytesIO()
-        df.write_parquet(parquet_bytes)
-        parquet_bytes.seek(0)
-
-        response = self._client._client.post(
-            "/entity/upload_parquet",
-            params={"dataset": dataset_name, "table_name": table_name},
-            files={"file": ("data.parquet", parquet_bytes.getvalue(), "application/octet-stream")},
-            headers=self._client.auth_headers,
-        )
-        response.raise_for_status()
-
-        pass
+        if df.is_empty():
+            return
+        if batch_size <= 0:
+            raise ValueError("batch_size must be positive")
+
+        for offset in range(0, df.height, batch_size):
+            chunk = df.slice(offset, batch_size)
+            parquet_bytes = io.BytesIO()
+            chunk.write_parquet(parquet_bytes)
+            parquet_bytes.seek(0)
+
+            response = self._client._client.post(
+                "/entity/upload_parquet",
+                params={"dataset": dataset_name, "table_name": table_name},
+                files={"file": ("data.parquet", parquet_bytes.getvalue(), "application/octet-stream")},
+                headers=self._client.auth_headers,
+            )
+            response.raise_for_status()
 
 
 class PolarsResourceWithRawResponse:

From 896b943d10700acaf3c307f8abc59dd8420eb8fa Mon Sep 17 00:00:00 2001
From: Alex Reichenbach <8087473+Reichenbachian@users.noreply.github.com>
Date: Mon, 2 Feb 2026 20:00:44 +0000
Subject: [PATCH 2/2] Make polars upload batch size a constant

---
 src/structify/resources/polars.py | 14 ++++++--------
 1 file changed, 6 insertions(+), 8 deletions(-)

diff --git a/src/structify/resources/polars.py b/src/structify/resources/polars.py
index 942270d51..ddcfab5a1 100644
--- a/src/structify/resources/polars.py
+++ b/src/structify/resources/polars.py
@@ -1120,8 +1120,8 @@ def match(
         if not request_cost_confirmation_if_needed(self._client, len(df), "match"):
             raise Exception("User cancelled matching operation")
 
-        self._upload_df(df, dataset_name, "table1", batch_size=df.height)
-        self._upload_df(reference_df, dataset_name, "table2", batch_size=reference_df.height)
+        self._upload_df(df, dataset_name, "table1")
+        self._upload_df(reference_df, dataset_name, "table2")
 
         # Wait for all embeddings to be added
         tqdm_marker = tqdm(total=df.height + reference_df.height, desc="Waiting for embeddings")
@@ -1186,16 +1186,14 @@ def _upload_df(
         df: pl.DataFrame,
         dataset_name: str,
         table_name: str,
-        *,
-        batch_size: int = 10_000,
     ) -> None:
+        BATCH_UPLOAD_SIZE = 10_000
+
         if df.is_empty():
             return
-        if batch_size <= 0:
-            raise ValueError("batch_size must be positive")
 
-        for offset in range(0, df.height, batch_size):
-            chunk = df.slice(offset, batch_size)
+        for offset in range(0, df.height, BATCH_UPLOAD_SIZE):
+            chunk = df.slice(offset, BATCH_UPLOAD_SIZE)
             parquet_bytes = io.BytesIO()
             chunk.write_parquet(parquet_bytes)
             parquet_bytes.seek(0)