# Streaming Large Datasets with pyhdb-rs

When working with datasets that don't fit in memory, pyhdb-rs provides
**streaming Arrow batches** for efficient processing.

## Memory-Efficient Data Pipeline

```
HANA (100M rows) → Stream batches (64K rows each) → Process → Aggregate
                   ↑ constant memory usage!
```

In [None]:
import os
import time

import polars as pl
import pyarrow as pa
from pyhdb_rs import connect

# Connection URI shared by every example below (None when the variable is unset).
HANA_URL = os.getenv("HANA_TEST_URI")

## RecordBatchReader for Streaming

The `execute_arrow_batches()` method returns a PyArrow RecordBatchReader
that yields batches on demand, keeping memory usage constant.

In [None]:
with connect(HANA_URL) as conn, conn.cursor() as cursor:
    # Nothing is materialized here: the reader pulls 64K-row batches on demand,
    # even when the table holds billions of rows.
    reader = cursor.execute_arrow_batches(
        "SELECT * FROM TRANSACTION_HISTORY",
        batch_size=65536,
    )
    print(f"Schema: {reader.schema}")

    # Only one batch needs to be alive at a time; replace the counter with real work.
    total_rows = 0
    for record_batch in reader:
        total_rows = total_rows + record_batch.num_rows

    print(f"Processed {total_rows:,} rows")

## Streaming Aggregation

Compute aggregates over huge datasets without loading everything into memory.

In [None]:
def streaming_aggregate(cursor, query: str, agg_column: str, batch_size: int = 65536) -> dict:
    """
    Compute sum, count, mean, min and max of one column over a streaming result set.

    Batches are pulled from ``cursor.execute_arrow_batches`` one at a time and
    folded into running totals, so memory usage is O(batch_size) regardless of
    the total number of rows.

    Args:
        cursor: An open pyhdb-rs cursor.
        query: SQL query whose result contains ``agg_column``.
        agg_column: Name of the numeric column to aggregate.
        batch_size: Rows per Arrow batch requested from the reader.

    Returns:
        Dict with keys ``sum``, ``count`` (number of non-null values), ``mean``
        (0 when there are no values), ``min`` (``inf`` when there are no values)
        and ``max`` (``-inf`` when there are no values).
    """
    reader = cursor.execute_arrow_batches(query, batch_size=batch_size)

    total_sum = 0.0
    total_count = 0
    min_val = float("inf")
    max_val = float("-inf")

    for batch in reader:
        # Cast to Float64: DECIMAL columns would otherwise produce decimal.Decimal
        # aggregates, which cannot be added to the float accumulator.
        col = pl.from_arrow(batch)[agg_column].cast(pl.Float64)

        # Polars sum/min/max skip nulls, so count only non-null values too;
        # this keeps mean == sum / count (like SQL AVG).
        non_null = len(col) - col.null_count()
        if non_null == 0:
            # Empty or all-null batch: min()/max() would return None.
            continue

        total_sum += col.sum()
        total_count += non_null
        min_val = min(min_val, col.min())
        max_val = max(max_val, col.max())

    return {
        "sum": total_sum,
        "count": total_count,
        "mean": total_sum / total_count if total_count > 0 else 0,
        "min": min_val,
        "max": max_val,
    }


# Example usage: summary statistics of net amounts for fiscal year 2026
with connect(HANA_URL) as conn, conn.cursor() as cursor:
    stats = streaming_aggregate(
        cursor,
        query="SELECT NET_AMOUNT FROM TRANSACTIONS WHERE FISCAL_YEAR = 2026",
        agg_column="NET_AMOUNT",
    )

print("Statistics:")
for stat_name, stat_value in stats.items():
    print(f"  {stat_name}: {stat_value:,.2f}")

## Grouped Streaming Aggregation

When you need group-by aggregates over large data, aggregate each batch and then merge the per-group partial sums and counts into running totals.

In [None]:
from collections import defaultdict


def streaming_group_by(
    cursor, query: str, group_col: str, agg_col: str, batch_size: int = 65536
) -> pl.DataFrame:
    """
    Streaming group-by aggregation (sum, count and mean of ``agg_col`` per group).

    Each Arrow batch is aggregated with Polars, and the per-group partial results
    are merged into running totals. Memory usage is therefore O(groups), not O(rows).

    Args:
        cursor: An open pyhdb-rs cursor.
        query: SQL query whose result contains ``group_col`` and ``agg_col``.
        group_col: Column to group by.
        agg_col: Numeric column to aggregate.
        batch_size: Rows per Arrow batch requested from the reader.

    Returns:
        DataFrame with columns ``group_col``, ``sum``, ``count`` (number of
        non-null ``agg_col`` values) and ``mean`` (``None`` for groups whose
        values are all null), sorted by ``sum`` descending.
    """
    reader = cursor.execute_arrow_batches(query, batch_size=batch_size)

    # Running totals per group key.
    group_sums = defaultdict(float)
    group_counts = defaultdict(int)

    # Cast to Float64 so DECIMAL columns can be added to the float totals, and
    # count only non-null values so the mean ignores nulls (like SQL AVG).
    value = pl.col(agg_col).cast(pl.Float64)
    partial_aggs = [
        value.sum().alias("sum"),
        value.is_not_null().sum().alias("count"),
    ]

    for batch in reader:
        batch_agg = pl.from_arrow(batch).group_by(group_col).agg(partial_aggs)

        # Merge this batch's partial results into the global accumulators.
        for row in batch_agg.iter_rows(named=True):
            key = row[group_col]
            group_sums[key] += row["sum"]
            group_counts[key] += row["count"]

    # Look counts up by key rather than relying on two dicts iterating in step.
    keys = list(group_sums)
    sums = [group_sums[k] for k in keys]
    counts = [group_counts[k] for k in keys]

    return pl.DataFrame(
        {
            group_col: keys,
            "sum": sums,
            "count": counts,
            "mean": [s / c if c else None for s, c in zip(sums, counts)],
        }
    ).sort("sum", descending=True)


# Example: revenue per sales region across ~100M transactions
with connect(HANA_URL) as conn, conn.cursor() as cursor:
    result = streaming_group_by(
        cursor,
        query="SELECT SALES_REGION, NET_AMOUNT FROM TRANSACTIONS",
        group_col="SALES_REGION",
        agg_col="NET_AMOUNT",
    )

print(result)

## Chunked Export to Parquet

Export large query results directly to Parquet files, one batch at a time, without loading the full result into memory.

In [None]:
import pyarrow.parquet as pq


def export_to_parquet(
    cursor,
    query: str,
    output_path: str,
    batch_size: int = 65536,
    compression: str = "snappy",
):
    """
    Stream query results directly into a Parquet file.

    Each Arrow batch is written as it arrives, so memory usage is O(batch_size),
    not O(total_rows). The writer is created from the reader's schema before the
    first batch is read. A query that returns no rows therefore still produces a
    valid (empty) Parquet file with the correct columns.

    If an error occurs part-way through, the writer is still closed, but a
    partially written file is left at ``output_path``.

    Args:
        cursor: An open pyhdb-rs cursor.
        query: SQL query to export.
        output_path: Destination Parquet file path.
        batch_size: Rows per Arrow batch requested from the reader.
        compression: Parquet compression codec passed to ``ParquetWriter``.

    Returns:
        Number of rows written.
    """
    reader = cursor.execute_arrow_batches(query, batch_size=batch_size)
    total_rows = 0

    # The context manager closes the writer (finalizing the Parquet footer)
    # even if reading or writing a batch raises.
    with pq.ParquetWriter(output_path, reader.schema, compression=compression) as writer:
        for batch in reader:
            writer.write_table(pa.Table.from_batches([batch]))
            total_rows += batch.num_rows

            # Progress indicator (overwrites the same console line)
            print(f"\rExported {total_rows:,} rows...", end="", flush=True)

    print(f"\nCompleted: {total_rows:,} rows → {output_path}")
    return total_rows


# Export example: fiscal-year 2026 sales items to a local Parquet file
with connect(HANA_URL) as conn, conn.cursor() as cursor:
    export_to_parquet(
        cursor,
        query="SELECT * FROM SALES_ITEMS WHERE FISCAL_YEAR = 2026",
        output_path="sales_2026.parquet",
    )

## Parallel Processing with Multiple Connections

For maximum throughput, use multiple connections to process partitioned data.

In [None]:
from concurrent.futures import ThreadPoolExecutor, as_completed


def process_partition(partition_id: int, total_partitions: int) -> dict:
    """
    Aggregate NET_AMOUNT per SALES_REGION for one modulo slice of TRANSACTIONS.

    Rows belong to the slice where ``MOD(TRANSACTION_ID, total_partitions)``
    equals ``partition_id``. The call opens and closes its own connection, so
    several calls can run concurrently in separate threads.

    Returns:
        ``{"partition": partition_id, "data": <Polars DataFrame built from the
        SALES_REGION / TOTAL result>}``.
    """
    partition_sql = f"""
            SELECT SALES_REGION, SUM(NET_AMOUNT) as TOTAL
            FROM TRANSACTIONS
            WHERE MOD(TRANSACTION_ID, {total_partitions}) = {partition_id}
            GROUP BY SALES_REGION
        """

    with connect(HANA_URL) as conn, conn.cursor() as cursor:
        arrow_result = cursor.execute_arrow(partition_sql)
        partition_df = pl.from_arrow(arrow_result)
        return {"partition": partition_id, "data": partition_df}


# Process 8 partitions in parallel; each worker thread opens its own connection.
NUM_PARTITIONS = 8
results = []
# perf_counter is monotonic, so the elapsed time is not skewed by clock adjustments.
start = time.perf_counter()

with ThreadPoolExecutor(max_workers=NUM_PARTITIONS) as executor:
    futures = {
        executor.submit(process_partition, i, NUM_PARTITIONS): i for i in range(NUM_PARTITIONS)
    }

    # Partitions arrive in completion order; result() re-raises any worker error.
    for future in as_completed(futures):
        result = future.result()
        results.append(result["data"])
        print(f"Partition {result['partition']} complete")

# Each partition holds partial per-region totals; summing them gives the global
# totals. Sort so the printed order does not depend on thread completion order.
combined = (
    pl.concat(results)
    .group_by("SALES_REGION")
    .agg(pl.col("TOTAL").sum())
    .sort("SALES_REGION")
)
elapsed = time.perf_counter() - start

print(f"\nCompleted in {elapsed:.2f}s")
print(combined)

## Incremental Processing Pattern

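Process data incrementally by fetching only rows newer than a persisted watermark (the latest `EVENT_TIME` seen so far). A watermark alone does not give exactly-once semantics. If you save it before downstream processing finishes, rows can be lost when that processing fails (at-most-once). If you save it afterwards, rows may be processed again after a crash (at-least-once).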

In [None]:
import json
from pathlib import Path


class IncrementalProcessor:
    """
    Process data incrementally, remembering a watermark between runs.

    The watermark is the largest ``EVENT_TIME`` seen so far. It is stored as JSON
    in ``checkpoint_file``, so each run only fetches rows newer than it.

    Delivery semantics depend on when the watermark is persisted:

    * ``process_new_data(cursor)`` (default ``commit=True``) saves it before the
      caller has processed the rows. If downstream processing then fails, those
      rows are skipped on the next run (at-most-once).
    * ``process_new_data(cursor, commit=False)`` followed by ``commit()`` after
      processing succeeds means the rows are fetched again after a crash
      (at-least-once).
    """

    DEFAULT_WATERMARK = "1970-01-01 00:00:00"

    def __init__(self, checkpoint_file: str):
        self.checkpoint_file = Path(checkpoint_file)
        self.watermark = self._load_watermark()
        # Watermark produced by the last process_new_data() call, not yet saved.
        self._pending_watermark = None

    def _load_watermark(self) -> str:
        """Return the saved watermark, or the epoch when no checkpoint exists yet."""
        if self.checkpoint_file.exists():
            data = json.loads(self.checkpoint_file.read_text())
            return data.get("watermark", self.DEFAULT_WATERMARK)
        return self.DEFAULT_WATERMARK

    def _save_watermark(self, watermark: str):
        """
        Persist ``watermark`` atomically.

        Writing to a temporary file and renaming it over the checkpoint means a
        crash mid-write leaves the previous checkpoint intact. Otherwise the next
        run could fail to parse a truncated JSON file.
        """
        tmp_path = self.checkpoint_file.with_name(self.checkpoint_file.name + ".tmp")
        tmp_path.write_text(json.dumps({"watermark": watermark}))
        tmp_path.replace(self.checkpoint_file)

    def commit(self):
        """Persist the watermark from the last ``process_new_data`` call, if any."""
        if self._pending_watermark is not None:
            self._save_watermark(self._pending_watermark)
            self.watermark = self._pending_watermark
            self._pending_watermark = None

    def process_new_data(self, cursor, commit: bool = True) -> pl.DataFrame:
        """
        Fetch EVENTS rows whose EVENT_TIME is greater than the current watermark.

        Args:
            cursor: An open pyhdb-rs cursor.
            commit: If True (default), save the new watermark immediately.
                If False, keep it pending until ``commit()`` is called.

        Returns:
            The new rows as a Polars DataFrame ordered by EVENT_TIME
            (empty when there is nothing new).
        """
        print(f"Processing data since: {self.watermark}")

        # The watermark comes from a local file and is interpolated into SQL;
        # double any single quotes so it cannot terminate the string literal.
        watermark_literal = self.watermark.replace("'", "''")
        df = pl.from_arrow(cursor.execute_arrow(f"""
            SELECT *
            FROM EVENTS
            WHERE EVENT_TIME > '{watermark_literal}'
            ORDER BY EVENT_TIME
        """))

        if len(df) > 0:
            # Advance to the latest event seen in this run.
            new_watermark = str(df["EVENT_TIME"].max())
            self._pending_watermark = new_watermark
            if commit:
                self.commit()
            print(f"Processed {len(df)} new events, watermark: {new_watermark}")
        else:
            print("No new data")

        return df


# Example usage
processor = IncrementalProcessor("checkpoint.json")
with connect(HANA_URL) as conn, conn.cursor() as cursor:
    new_data = processor.process_new_data(cursor, commit=False)
    if len(new_data) > 0:
        # Process new data...
        print(new_data.head())
    # Persist the watermark only after processing has succeeded (at-least-once).
    processor.commit()

## Memory Monitoring

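Monitor memory usage during streaming operations with `tracemalloc`. Note that `tracemalloc` only tracks allocations made through Python's allocator, so Arrow buffers allocated by native code may not be included in the numbers.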

In [None]:
import tracemalloc


def measure_streaming_memory(cursor, query: str, batch_size: int = 65536):
    """
    Measure Python-heap memory usage while streaming a query result.

    Uses ``tracemalloc``, which only tracks allocations made through Python's
    memory allocator. Arrow buffers allocated by native code may not be counted.
    NOTE(review): confirm whether pyhdb-rs batch buffers show up here before
    relying on the absolute numbers.

    Args:
        cursor: An open pyhdb-rs cursor.
        query: SQL query to stream.
        batch_size: Rows per Arrow batch requested from the reader.

    Returns:
        Dict with ``total_rows``, ``peak_memory_mb`` (peak traced memory during
        the call) and ``bytes_per_row`` (peak memory divided by total rows;
        0 when there are no rows).
    """
    # Only start/stop tracing if it is not already active, so an outer
    # tracemalloc session is not silently switched off.
    started_here = not tracemalloc.is_tracing()
    if started_here:
        tracemalloc.start()

    try:
        reader = cursor.execute_arrow_batches(query, batch_size=batch_size)

        total_rows = 0
        for batch in reader:
            total_rows += batch.num_rows

        # The traced peak is a high-water mark, so reading it once after the
        # loop captures the maximum reached while any batch was alive.
        _, peak_memory = tracemalloc.get_traced_memory()
    finally:
        # Stop tracing even if the query or iteration raised.
        if started_here:
            tracemalloc.stop()

    return {
        "total_rows": total_rows,
        "peak_memory_mb": peak_memory / 1024 / 1024,
        "bytes_per_row": peak_memory / total_rows if total_rows > 0 else 0,
    }


# Compare memory usage: stream the full history table and report the peak
with connect(HANA_URL) as conn, conn.cursor() as cursor:
    stats = measure_streaming_memory(
        cursor,
        query="SELECT * FROM TRANSACTION_HISTORY",
    )

print(f"Processed {stats['total_rows']:,} rows")
print(f"Peak memory: {stats['peak_memory_mb']:.2f} MB")
print(f"Memory per row: {stats['bytes_per_row']:.2f} bytes")

## Best Practices for Large Data

1. **Use streaming** (`execute_arrow_batches`) for datasets > 1M rows
2. **Push filters to HANA** - let the database do the heavy lifting
3. **Tune batch size** - 64K rows is a good default, adjust based on row width
4. **Use Polars lazy mode** for complex transformations
5. **Partition queries** for parallel processing
6. **Export directly to Parquet** for long-term storage