## Arrow Client

In [45]:
import time
import requests
import pyarrow as pa

In [64]:
def read_arrow_stream_from_url_batches(url: str) -> pa.Table:
    """
    Reads an Arrow stream from a URL and returns it as a pyarrow Table.
    """
    with requests.get(url, stream=True) as response:
        response.raise_for_status()

        # Read raw binary stream in chunks
        chunks = bytearray()
        for chunk in response.iter_content(
            chunk_size=130058908
        ):  # chunk_size is based on the size of a single Arrow RecordBatch
            if chunk:
                chunks.extend(chunk)

        # Use pyarrow to load from bytes
        buffer = memoryview(chunks)
        t1 = time.time()
        reader = pa.ipc.open_stream(buffer)
        table = reader.read_all()
        t2 = time.time()
        print(f"Time taken to deserialize Arrow stream: {t2 - t1:.3f} seconds")
        return table, t2-t1

In [65]:
nrows = 1_000_000
t1 = time.time()
arrow_table = read_arrow_stream_from_url_batches(
    f"http://localhost:8000/rows/arrow/lineitem?nrows={nrows}"
)[0]
t2 = time.time()
print(f"Time taken to read {nrows} from Arrow stream: {t2 - t1:.3f} seconds")
arrow_table

Time taken to deserialize Arrow stream: 0.002 seconds
Time taken to read 1000000 from Arrow stream: 0.968 seconds


pyarrow.Table
l_orderkey: int64
l_partkey: int64
l_suppkey: int64
l_linenumber: int64
l_quantity: decimal128(15, 2)
l_extendedprice: decimal128(15, 2)
l_discount: decimal128(15, 2)
l_tax: decimal128(15, 2)
l_returnflag: string
l_linestatus: string
l_shipdate: date32[day]
l_commitdate: date32[day]
l_receiptdate: date32[day]
l_shipinstruct: string
l_shipmode: string
l_comment: string
----
l_orderkey: [[1,1,1,1,1,...,999939,999939,999939,999939,999939]]
l_partkey: [[1551894,673091,636998,21315,240267,...,34552,711982,272928,1099732,1185143]]
l_suppkey: [[76910,73092,36999,46316,15274,...,59553,61997,22933,49753,85144]]
l_linenumber: [[1,2,3,4,5,...,1,2,3,4,5]]
l_quantity: [[17.00,36.00,8.00,28.00,24.00,...,20.00,10.00,5.00,18.00,2.00]]
l_extendedprice: [[33078.94,38306.16,15479.68,34616.68,28974.00,...,29731.00,19939.50,9504.55,31170.24,2456.18]]
l_discount: [[0.04,0.09,0.10,0.09,0.10,...,0.09,0.00,0.02,0.07,0.01]]
l_tax: [[0.02,0.06,0.02,0.06,0.04,...,0.08,0.06,0.05,0.01,0.06]]
l_returnf

In [113]:
arrow_table.num_columns, arrow_table.num_rows, arrow_table.nbytes

(16, 15000000, 2591777478)

## JSON Client

In [67]:
def json_request_handler(url: str) -> dict:
    response = requests.get(url)
    response.raise_for_status()
    # response.json() already parses de JSON response into a Python dictionary
    t1 = time.time()
    result_dict = response.json()
    t2 = time.time()
    print(f"Time taken to deserialize JSON response: {t2 - t1:.3f} seconds")
    return result_dict, t2-t1

In [68]:
nrows = 1_000_000
t1 = time.time()
data = json_request_handler(f"http://localhost:8000/rows/json/orders?nrows={nrows}")[0]
t2 = time.time()
print(f"Time taken to process {nrows} with JSON: {t2 - t1:.3f} seconds")

Time taken to deserialize JSON response: 1.201 seconds
Time taken to process 1000000 with JSON: 5.458 seconds


In [9]:
len(data["o_orderkey"])

1000000

## JSON vs Arrow with Lineitem data

In [69]:
def benchmark():
    benchmark_data = []
    for nrows in [1_000, 10_000, 100_000, 1_000_000, 10_000_000]:
        t1 = time.time()
        t_arrow_deser = read_arrow_stream_from_url_batches(
            f"http://localhost:8000/rows/arrow/lineitem?nrows={nrows}"
        )[1]
        t2 = time.time()
        time_arrow = t2 - t1
        print(f"Time taken to read {nrows} from Arrow stream: {t2 - t1:.3f} seconds")

        t1 = time.time()
        t_json_deser = json_request_handler(f"http://localhost:8000/rows/json/lineitem?nrows={nrows}")[1]
        t2 = time.time()
        time_json = t2 - t1
        print(f"Time taken to read {nrows} with JSON: {t2 - t1:.3f} seconds")

        benchmark_data.append(
            {"nrows": nrows, "time_arrow": time_arrow, "time_json": time_json, "t_arrow_deser": t_arrow_deser, "t_json_deser": t_json_deser}
        )
    return benchmark_data

In [70]:
data = benchmark()

Time taken to deserialize Arrow stream: 0.000 seconds
Time taken to read 1000 from Arrow stream: 0.047 seconds
Time taken to deserialize JSON response: 0.002 seconds
Time taken to read 1000 with JSON: 0.041 seconds
Time taken to deserialize Arrow stream: 0.000 seconds
Time taken to read 10000 from Arrow stream: 0.037 seconds
Time taken to deserialize JSON response: 0.019 seconds
Time taken to read 10000 with JSON: 0.188 seconds
Time taken to deserialize Arrow stream: 0.000 seconds
Time taken to read 100000 from Arrow stream: 0.145 seconds
Time taken to deserialize JSON response: 0.174 seconds
Time taken to read 100000 with JSON: 1.036 seconds
Time taken to deserialize Arrow stream: 0.000 seconds
Time taken to read 1000000 from Arrow stream: 1.083 seconds
Time taken to deserialize JSON response: 1.720 seconds
Time taken to read 1000000 with JSON: 10.225 seconds
Time taken to deserialize Arrow stream: 0.000 seconds
Time taken to read 10000000 from Arrow stream: 5.569 seconds
Time taken t

In [71]:
import pandas as pd
import plotly.express as px

df = pd.DataFrame(data)
fig = px.line(
    df,
    x="nrows",
    y=["time_arrow", "time_json"],
    log_x=True,
    labels={"nrows": "log(nrows)", "value": "Time (seconds)"},
    title="Arrow vs JSON Performance Benchmark",
    width=800,
)
fig.show()

In [72]:
fig = px.line(
    df,
    x="nrows",
    y=["t_arrow_deser", "t_json_deser"],
    log_x=True,
    labels={"nrows": "log(nrows)", "value": "Time (seconds)"},
    title="Arrow vs JSON Deserialization Benchmark",
    width=800,
)
fig.show()

## Querying data with DuckDB as a client

In [41]:
import duckdb
import json
import os
import pandas as pd

In [43]:
nrows = 10_000_000
tpch_query = """SELECT
    l_returnflag,
    l_linestatus,
    sum(l_quantity) as sum_qty,
    sum(l_extendedprice) as sum_base_price,
    sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
    sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
    avg(l_quantity) as avg_qty,
    avg(l_extendedprice) as avg_price,
    avg(l_discount) as avg_disc,
    count(*) as count_order
FROM
    {table}
WHERE
    l_shipdate <= date '1998-12-01' - interval '90' day
GROUP BY
    l_returnflag,
    l_linestatus
ORDER BY
    l_returnflag,
    l_linestatus;"""

In [13]:
## ARROW

# Retrieve
t1 = time.time()
arrow_table = read_arrow_stream_from_url_batches(
    f"http://localhost:8000/rows/arrow/lineitem?nrows={nrows}"
)
t2 = time.time()
print(f"Time taken to read {nrows} from Arrow stream: {t2 - t1:.2f} seconds")
t1 = time.time()

# Query
with duckdb.connect() as con:
    r = con.execute(tpch_query.format(table="arrow_table"))
    df = r.fetch_df()
t2 = time.time()
print(f"Time taken to execute query: {t2 - t1:.2f} seconds")
df

Time taken to read 10000000 from Arrow stream: 5.63 seconds
Time taken to execute query: 0.35 seconds


Unnamed: 0,l_returnflag,l_linestatus,sum_qty,sum_base_price,sum_disc_price,sum_charge,avg_qty,avg_price,avg_disc,count_order
0,A,F,62903247.0,94295180000.0,89580240000.0,93164540000.0,25.5091,38239.444388,0.050002,2465914
1,N,F,1642769.0,2463897000.0,2340822000.0,2434582000.0,25.535799,38299.715432,0.049946,64332
2,N,O,123887308.0,185766600000.0,176484800000.0,183548000000.0,25.494709,38228.822083,0.049973,4859334
3,R,F,62997464.0,94452440000.0,89732060000.0,93320400000.0,25.509539,38246.588969,0.049981,2469565


**TCPH queries are not going to work in the JSON data without some casting of types**. Somewhere in the serialization process at the server side dates where formatted in a way that cannot be understood by Arrow.

In [44]:
## JSON
t1 = time.time()
json_data = json_request_handler(f"http://localhost:8000/rows/json/lineitem?nrows={nrows}")
t2 = time.time()
print(f"Time taken to process {nrows} with JSON: {t2 - t1:.2f} seconds")

Time taken to process 10000000 with JSON: 101.11 seconds


In [38]:
# Write to disk and query (not very efficient and results need to be exploded)
t1 = time.time()
if os.path.exists('../tmp/data.json'):
    os.remove('../tmp/data.json')
with open('../tmp/data.json', 'w') as f:
    json.dump(json_data, f)
t2 = time.time()
print(f"Time taken to dump JSON to disk: {t2 - t1:.2f} seconds")
t1 = time.time()
with duckdb.connect() as con:
    r = con.execute("SELECT * FROM read_json('../tmp/data.json', maximum_object_size=197108896)")
    df = r.fetch_df() 
t2 = time.time()
print(f"Time taken to execute query: {t2 - t1:.2f} seconds")
df

Time taken to dump JSON to disk: 7.33 seconds
Time taken to execute query: 10.99 seconds


Unnamed: 0,l_orderkey,l_partkey,l_suppkey,l_linenumber,l_quantity,l_extendedprice,l_discount,l_tax,l_returnflag,l_linestatus,l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment
0,"[1, 1, 1, 1, 1, 1, 2, 3, 3, 3, 3, 3, 3, 4, 5, ...","[1551894, 673091, 636998, 21315, 240267, 15634...","[76910, 73092, 36999, 46316, 15274, 6348, 1171...","[1, 2, 3, 4, 5, 6, 1, 1, 2, 3, 4, 5, 6, 1, 1, ...","[17.0, 36.0, 8.0, 28.0, 24.0, 32.0, 38.0, 45.0...","[33078.94, 38306.16, 15479.68, 34616.68, 28974...","[0.04, 0.09, 0.1, 0.09, 0.1, 0.07, 0.0, 0.06, ...","[0.02, 0.06, 0.02, 0.06, 0.04, 0.02, 0.05, 0.0...","[N, N, N, N, N, N, N, R, R, A, A, R, A, N, R, ...","[O, O, O, O, O, O, O, F, F, F, F, F, F, O, F, ...","[1996-03-13T00:00:00, 1996-04-12T00:00:00, 199...","[1996-02-12T00:00:00, 1996-02-28T00:00:00, 199...","[1996-03-22T00:00:00, 1996-04-20T00:00:00, 199...","[DELIVER IN PERSON, TAKE BACK RETURN, TAKE BAC...","[TRUCK, MAIL, REG AIR, AIR, FOB, MAIL, RAIL, A...","[to beans x-ray carefull, according to the fi..."


In [None]:
# Convert dictionary to Arrow and query
# Better than the previous option but somewhere in the serialization process in the client side, dates where formatted in a way that Arrow cannot parse
t1 = time.time()
json_arrow = pa.Table.from_pydict(json_data)
t2 = time.time()
print(f"Time taken to convert JSON to Arrow: {t2 - t1:.2f} seconds")
t1 = time.time()
with duckdb.connect() as con:
    r = con.execute("SELECT * FROM json_arrow LIMIT 10")
    df = r.fetch_df()
t2 = time.time()
print(f"Time taken to execute query: {t2 - t1:.2f} seconds")
df

Time taken to convert JSON to Arrow: 0.57 seconds
Time taken to execute query: 0.01 seconds


Unnamed: 0,l_orderkey,l_partkey,l_suppkey,l_linenumber,l_quantity,l_extendedprice,l_discount,l_tax,l_returnflag,l_linestatus,l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment
0,1,1551894,76910,1,17.0,33078.94,0.04,0.02,N,O,1996-03-13T00:00:00,1996-02-12T00:00:00,1996-03-22T00:00:00,DELIVER IN PERSON,TRUCK,to beans x-ray carefull
1,1,673091,73092,2,36.0,38306.16,0.09,0.06,N,O,1996-04-12T00:00:00,1996-02-28T00:00:00,1996-04-20T00:00:00,TAKE BACK RETURN,MAIL,according to the final foxes. qui
2,1,636998,36999,3,8.0,15479.68,0.1,0.02,N,O,1996-01-29T00:00:00,1996-03-05T00:00:00,1996-01-31T00:00:00,TAKE BACK RETURN,REG AIR,ourts cajole above the furiou
3,1,21315,46316,4,28.0,34616.68,0.09,0.06,N,O,1996-04-21T00:00:00,1996-03-30T00:00:00,1996-05-16T00:00:00,NONE,AIR,s cajole busily above t
4,1,240267,15274,5,24.0,28974.0,0.1,0.04,N,O,1996-03-30T00:00:00,1996-03-14T00:00:00,1996-04-01T00:00:00,NONE,FOB,"the regular, regular pa"
5,1,156345,6348,6,32.0,44842.88,0.07,0.02,N,O,1996-01-30T00:00:00,1996-02-07T00:00:00,1996-02-03T00:00:00,DELIVER IN PERSON,MAIL,rouches. special
6,2,1061698,11719,1,38.0,63066.32,0.0,0.05,N,O,1997-01-28T00:00:00,1997-01-14T00:00:00,1997-02-02T00:00:00,TAKE BACK RETURN,RAIL,re. enticingly regular instruct
7,3,42970,17971,1,45.0,86083.65,0.06,0.0,R,F,1994-02-02T00:00:00,1994-01-04T00:00:00,1994-02-23T00:00:00,NONE,AIR,s cajole above the pinto beans. iro
8,3,190355,65359,2,49.0,70822.15,0.1,0.0,R,F,1993-11-09T00:00:00,1993-12-20T00:00:00,1993-11-24T00:00:00,TAKE BACK RETURN,RAIL,ecial pinto beans. sly
9,3,1284483,34508,3,27.0,39620.34,0.06,0.07,A,F,1994-01-16T00:00:00,1993-11-22T00:00:00,1994-01-23T00:00:00,DELIVER IN PERSON,SHIP,e carefully fina


In [None]:
# Convert dictionary to pandas and query
# For sure less efficient than arrow but also works. Definitely better than writing to disk and then querying the JSON file
t1 = time.time()
json_df = pd.DataFrame(json_data)
t2 = time.time()
print(f"Time taken to convert JSON to pandas: {t2 - t1:.2f} seconds")
t1 = time.time()
with duckdb.connect() as con:
    r = con.execute("SELECT * FROM json_df LIMIT 10")
    df = r.fetch_df()
t2 = time.time()
print(f"Time taken to execute query: {t2 - t1:.2f} seconds")
df

Time taken to convert JSON to pandas: 2.78 seconds
Time taken to execute query: 0.04 seconds


Unnamed: 0,l_orderkey,l_partkey,l_suppkey,l_linenumber,l_quantity,l_extendedprice,l_discount,l_tax,l_returnflag,l_linestatus,l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment
0,1,1551894,76910,1,17.0,33078.94,0.04,0.02,N,O,1996-03-13T00:00:00,1996-02-12T00:00:00,1996-03-22T00:00:00,DELIVER IN PERSON,TRUCK,to beans x-ray carefull
1,1,673091,73092,2,36.0,38306.16,0.09,0.06,N,O,1996-04-12T00:00:00,1996-02-28T00:00:00,1996-04-20T00:00:00,TAKE BACK RETURN,MAIL,according to the final foxes. qui
2,1,636998,36999,3,8.0,15479.68,0.1,0.02,N,O,1996-01-29T00:00:00,1996-03-05T00:00:00,1996-01-31T00:00:00,TAKE BACK RETURN,REG AIR,ourts cajole above the furiou
3,1,21315,46316,4,28.0,34616.68,0.09,0.06,N,O,1996-04-21T00:00:00,1996-03-30T00:00:00,1996-05-16T00:00:00,NONE,AIR,s cajole busily above t
4,1,240267,15274,5,24.0,28974.0,0.1,0.04,N,O,1996-03-30T00:00:00,1996-03-14T00:00:00,1996-04-01T00:00:00,NONE,FOB,"the regular, regular pa"
5,1,156345,6348,6,32.0,44842.88,0.07,0.02,N,O,1996-01-30T00:00:00,1996-02-07T00:00:00,1996-02-03T00:00:00,DELIVER IN PERSON,MAIL,rouches. special
6,2,1061698,11719,1,38.0,63066.32,0.0,0.05,N,O,1997-01-28T00:00:00,1997-01-14T00:00:00,1997-02-02T00:00:00,TAKE BACK RETURN,RAIL,re. enticingly regular instruct
7,3,42970,17971,1,45.0,86083.65,0.06,0.0,R,F,1994-02-02T00:00:00,1994-01-04T00:00:00,1994-02-23T00:00:00,NONE,AIR,s cajole above the pinto beans. iro
8,3,190355,65359,2,49.0,70822.15,0.1,0.0,R,F,1993-11-09T00:00:00,1993-12-20T00:00:00,1993-11-24T00:00:00,TAKE BACK RETURN,RAIL,ecial pinto beans. sly
9,3,1284483,34508,3,27.0,39620.34,0.06,0.07,A,F,1994-01-16T00:00:00,1993-11-22T00:00:00,1994-01-23T00:00:00,DELIVER IN PERSON,SHIP,e carefully fina


In [33]:
from sys import getsizeof
size_json = getsizeof(json_data)
for column in json_data.values():
    size_json += getsizeof(column)
print(f"Size of JSON dict data: {size_json / 1_000_000:.2f} MB")
size_arrow = arrow_table.nbytes
print(f"Size of Arrow data: {size_arrow / 1_000_000:.2f} MB")

Size of JSON dict data: 135.18 MB
Size of Arrow data: 1727.84 MB
