# Benchmark: Polymo vs Spark UDF (Paginated API)

This notebook spins up a local FastAPI service that simulates a slow, paginated REST endpoint (every request sleeps for 50 ms). Polymo reads the collection in pages of 200 records, while a baseline Spark UDF calls the API once per row.

## Prerequisites

```bash
pip install "polymo[builder]"
```
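PySpark 4.x is required because the notebook registers Polymo through the Python Data Source API (`spark.dataSource.register`). The benchmark code also imports `fastapi`, `uvicorn`, and `requests`.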
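A quick guard you can run before the benchmark, assuming the version string starts with the major number (as `pyspark.__version__` does, e.g. `4.0.1`). The helper name `pyspark_major_ok` is hypothetical and not part of PySpark or Polymo.

```python
def pyspark_major_ok(version: str, minimum_major: int = 4) -> bool:
    """Return True if a version string such as '4.0.1' has a major >= minimum_major."""
    major = int(version.split(".")[0])
    return major >= minimum_major


try:
    import pyspark

    # Fails fast with a readable message instead of a later AttributeError
    # on spark.dataSource.register.
    if not pyspark_major_ok(pyspark.__version__):
        print(f"PySpark {pyspark.__version__} found; this notebook needs 4.x")
    else:
        print(f"PySpark {pyspark.__version__} OK")
except ImportError:
    print("pyspark is not installed")
```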

In [7]:
import threading
import time
from pathlib import Path
from time import perf_counter
import tempfile
import textwrap

import requests
from fastapi import FastAPI, HTTPException
import uvicorn

from pyspark.sql import SparkSession

from polymo import ApiReader

DATA = [{"id": i, "value": f"record-{i}"} for i in range(1, 1001)]

app = FastAPI()

PAGE_DELAY = 0.05  # seconds

def _lookup_item(item_id: int) -> dict:
    for row in DATA:
        if row["id"] == item_id:
            return row
    raise HTTPException(status_code=404, detail="Not found")

@app.get("/items")
def list_items(_offset: int = 0, _limit: int = 200):
    time.sleep(PAGE_DELAY)
    start = max(_offset, 0)
    end = min(start + _limit, len(DATA))
    return {
        "data": DATA[start:end],
        "meta": {
            "total_records": len(DATA),
            "offset": start,
            "limit": _limit,
        },
    }

@app.get("/item")
def get_item_query(item_id: int):
    time.sleep(PAGE_DELAY)
    return _lookup_item(item_id)

@app.get("/item/{item_id}")
def get_item(item_id: int):
    time.sleep(PAGE_DELAY)
    return _lookup_item(item_id)


config = uvicorn.Config(app, host="127.0.0.1", port=9876, log_level="warning")
server = uvicorn.Server(config)

thread = threading.Thread(target=server.run, daemon=True)
thread.start()
while not server.started:
    time.sleep(0.1)

spark = SparkSession.builder.appName("polymo-benchmark").master("local[8]").getOrCreate()
spark.sparkContext.setLogLevel("WARN")
spark.dataSource.register(ApiReader)

25/10/04 23:10:21 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [8]:
config_yaml = textwrap.dedent('''
version: 0.1
source:
  type: rest
  base_url: http://127.0.0.1:9876
stream:
  name: items
  path: /items
  params:
    _limit: 200
  pagination:
    type: offset
    limit_param: _limit
    offset_param: _offset
    page_size: 200
    total_records_path:
      - meta
      - total_records
  record_selector:
    field_path:
      - data
  partition:
    strategy: pagination
''').strip()


config_dir = Path(tempfile.mkdtemp(prefix="polymo-bench-"))
CONFIG_PATH = config_dir / "mock_api.yml"
CONFIG_PATH.write_text(config_yaml)


375
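The configuration asks for offset pagination: `_offset` and `_limit` are sent as query parameters, and the total count is read from `meta.total_records`. The sketch below shows the request loop such a configuration implies, assuming the reader stops once the offset reaches `total_records`. `fake_list_items` and `read_offset_paginated` are hypothetical helpers that mimic the mock `/items` endpoint in memory; they are not Polymo's API, and Polymo's own paginator may use a different stop condition.

```python
from typing import Callable, Dict, List, Tuple


def fake_list_items(offset: int, limit: int, data: List[dict]) -> Dict:
    """Mimic the mock /items endpoint: slice `data` and report the total."""
    start = max(offset, 0)
    end = min(start + limit, len(data))
    return {"data": data[start:end], "meta": {"total_records": len(data)}}


def read_offset_paginated(
    fetch: Callable[[int, int], Dict], page_size: int
) -> Tuple[List[dict], List[int]]:
    """Hypothetical offset paginator: request pages until offset >= total_records."""
    records: List[dict] = []
    offsets: List[int] = []
    offset = 0
    total = None
    while total is None or offset < total:
        page = fetch(offset, page_size)
        offsets.append(offset)
        total = page["meta"]["total_records"]  # read from meta.total_records
        records.extend(page["data"])           # record_selector: data
        offset += page_size
    return records, offsets


data = [{"id": i, "value": f"record-{i}"} for i in range(1, 1001)]
records, offsets = read_offset_paginated(
    lambda off, lim: fake_list_items(off, lim, data), page_size=200
)
print(len(records), offsets)  # 1000 [0, 200, 400, 600, 800]
```

Five requests cover all 1,000 records, which is the batching effect the benchmark measures.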

In [9]:
from pyspark.sql.functions import col, expr, from_json
from pyspark.sql.types import StructField, StructType, IntegerType, StringType
import statistics
from typing import List

def benchmark_polymo(config_path: Path, *, label: str, repeats: int = 3) -> dict:
    timings: List[float] = []
    rows = 0
    for _ in range(repeats):
        start = perf_counter()
        df = (
            spark.read.format('polymo')
            .option('config_path', str(config_path))
            .load()
        )
        rows = df.count()
        # The noop sink forces every row to be materialized without writing output.
        df.write.mode('overwrite').format('noop').save()
        timings.append(perf_counter() - start)
    return {'approach': label, 'rows': rows, 'seconds': statistics.median(timings)}


def fetch_item(item_id: int) -> str:
    # Must match the port the uvicorn server listens on (9876).
    url = f'http://127.0.0.1:9876/item/{item_id}'
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    return response.text

def benchmark_udf(repeats: int = 3) -> dict:
    ids = spark.range(1, len(DATA) + 1).toDF('id')
    spark.udf.register('fetch_item', fetch_item, StringType())
    schema = StructType([
        StructField('id', IntegerType()),
        StructField('value', StringType()),
    ])
    timings: List[float] = []
    rows = 0
    for _ in range(repeats):
        start = perf_counter()
        # One HTTP call per row; parse each JSON response into an (id, value) struct.
        udf_df = ids.select(from_json(expr('fetch_item(id)'), schema).alias('row'))
        rows = udf_df.count()
        udf_df.write.mode('overwrite').format('noop').save()
        timings.append(perf_counter() - start)
    return {'approach': 'Spark UDF (per-row requests)', 'rows': rows, 'seconds': statistics.median(timings)}


results = [
    benchmark_polymo(CONFIG_PATH, label='Polymo DataSource (paginated items)'),
    benchmark_udf()
]
results

[{'approach': 'Polymo DataSource (paginated items)',
  'rows': 1000,
  'seconds': 0.715621250012191},
 {'approach': 'Spark UDF (per-row requests)',
  'rows': 1000,
  'seconds': 7.729226415976882}]

In [10]:
results = [
    benchmark_udf(),
    benchmark_polymo(CONFIG_PATH, label='Polymo DataSource (paginated items)'),
]
results


25/10/04 23:10:47 WARN SimpleFunctionRegistry: The function fetch_item replaced a previously registered function.

[{'approach': 'Spark UDF (per-row requests)',
  'rows': 1000,
  'seconds': 7.62242816700018},
 {'approach': 'Polymo DataSource (paginated items)',
  'rows': 1000,
  'seconds': 0.7210472909791861}]

In [11]:
summary_df = spark.createDataFrame(results)
summary_df = summary_df.withColumn(
    "throughput_rows_per_sec",
    col("rows") / col("seconds")
)
summary_df.orderBy("approach").show(truncate=False)


+-----------------------------------+----+------------------+-----------------------+
|approach                           |rows|seconds           |throughput_rows_per_sec|
+-----------------------------------+----+------------------+-----------------------+
|Polymo DataSource (paginated items)|1000|0.7210472909791861|1386.8715859704496     |
|Spark UDF (per-row requests)       |1000|7.62242816700018  |131.1917906067394      |
+-----------------------------------+----+------------------+-----------------------+
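The throughput column is just `rows / seconds`; dividing the two median timings gives the speedup directly. A plain-Python check using the numbers printed above (copied by hand into `reported`, so a rerun of the benchmark will produce different values):

```python
# Median timings copied from the second `results` output above.
reported = [
    {"approach": "Spark UDF (per-row requests)", "rows": 1000, "seconds": 7.62242816700018},
    {"approach": "Polymo DataSource (paginated items)", "rows": 1000, "seconds": 0.7210472909791861},
]
by_name = {r["approach"]: r for r in reported}
udf = by_name["Spark UDF (per-row requests)"]
polymo = by_name["Polymo DataSource (paginated items)"]

for r in reported:
    print(f'{r["approach"]}: {r["rows"] / r["seconds"]:.1f} rows/s')
# Spark UDF (per-row requests): 131.2 rows/s
# Polymo DataSource (paginated items): 1386.9 rows/s

speedup = udf["seconds"] / polymo["seconds"]
print(f"speedup: {speedup:.1f}x")  # speedup: 10.6x
```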



## Interpretation

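With a 50 ms delay per request and 1,000 records, the number of HTTP calls dominates the runtime. Reading the collection endpoint with pagination fetches 200 rows per call, so the whole dataset arrives in five requests, and the Polymo DataSource finishes roughly ten times faster than the UDF (about 0.72 s versus 7.6 s in the runs above). The Spark UDF makes one request per row: Spark runs its partitions in parallel across the local cores, but within each partition the Python worker issues those calls one after another, so it lags well behind the paginated DataSource. Adjust `PAGE_DELAY`, the number of records in `DATA`, or the Spark parallelism (`local[8]`) to mirror your API.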
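The gap can be estimated from request counts alone. The sketch below computes the minimum time spent waiting on the mock API's `time.sleep`, assuming the UDF's 1,000 requests are spread evenly over the 8 local cores and the 5 page requests run one after another. `min_request_time` is a hypothetical helper; it ignores Spark scheduling, Python worker startup, and serialization, and each timed iteration in the benchmark also runs both `count()` and a `noop` write, so measured times sit above these bounds.

```python
import math


def min_request_time(n_requests: int, delay_s: float, concurrency: int = 1) -> float:
    """Lower bound on time spent in the server-side sleep.

    Assumes requests are split evenly across `concurrency` workers, each
    issuing its share sequentially; all other overhead is ignored.
    """
    waves = math.ceil(n_requests / concurrency)
    return waves * delay_s


PAGE_DELAY = 0.05  # seconds, as in the mock API
N_RECORDS = 1000
PAGE_SIZE = 200
CORES = 8  # matches master("local[8]")

udf_bound = min_request_time(N_RECORDS, PAGE_DELAY, concurrency=CORES)
pages = math.ceil(N_RECORDS / PAGE_SIZE)
paginated_serial = min_request_time(pages, PAGE_DELAY, concurrency=1)

print(f"UDF: {N_RECORDS} requests, at least {udf_bound:.2f} s of sleep")
print(f"Paginated: {pages} requests, {paginated_serial:.2f} s of sleep if fetched serially")
```

Even under generous assumptions for the UDF, per-row calls carry roughly 25 times more server-side waiting than the five paged calls, which is consistent in direction with the measured gap.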


In [12]:

server.should_exit = True
thread.join(timeout=5)
spark.stop()
