In [1]:
import numpy as np 
import pandas as pd
import polars as pl
import time

# Pandas

In [2]:
import pandas as pd
from multiprocessing import Pool, cpu_count
from tqdm import tqdm  # import tqdm to progress bar

In [3]:
CONCURRENCY = cpu_count()

total_lines = 1_000_000_000  # Total rows
chunksize = 100_000_000  # Define the chunk size
filename = "measurements.txt"  

In [4]:
def process_chunk(chunk):
    # Aggregates data within the chunk using Pandas
    aggregated = chunk.groupby('station')['measure'].agg(['min', 'max', 'mean']).reset_index()
    return aggregated

def create_pandas_df(filename, total_lines, chunksize=chunksize):
    total_chunks = total_lines // chunksize + (1 if total_lines % chunksize else 0)
    results = []

    with pd.read_csv(filename, header=None, names=['station', 'measure'], chunksize=chunksize) as reader:
        # tqdm to view progress
        with Pool(CONCURRENCY) as pool:
            for chunk in tqdm(reader, total=total_chunks, desc="Processing"):
                # Process each chunk in parallel
                result = pool.apply_async(process_chunk, (chunk,))
                results.append(result)

            results = [result.get() for result in results]

    final_df = pd.concat(results, ignore_index=True)

    final_aggregated_df = final_df.groupby('station').agg({
        'min': 'min',
        'max': 'max',
        'mean': 'mean'
    }).reset_index().sort_values('station')

    return final_df

In [5]:
# start_time = time.time()
# data = create_pandas_df()
# took = time.time() - start_time
# print(data)
# print(f"Pandas Took: {took:.2f} sec")

# Polars

## Scan and query one line

In [6]:
def create_polars_df():
    pl.Config.set_streaming_chunk_size(4000000)
    return (
        pl.scan_csv("measurements.txt", separator=";", has_header=False, new_columns=["station", "measure"], schema={"station": pl.String, "measure": pl.Float64}).group_by(by="station")
        .agg(
            max = pl.col("measure").max(),
            min = pl.col("measure").min(),
            mean = pl.col("measure").mean()
        )
        .sort("station")
        .collect(streaming=True)
    )

In [7]:
start_time = time.time()
data = create_polars_df()
took = time.time() - start_time
print(data)
print(f"Polars Took: {took:.2f} sec")

  pl.scan_csv("measurements.txt", separator=";", has_header=False, new_columns=["station", "measure"], schema={"station": pl.String, "measure": pl.Float64}).group_by(by="station")


shape: (413, 4)
┌───────────────┬──────┬───────┬───────────┐
│ station       ┆ max  ┆ min   ┆ mean      │
│ ---           ┆ ---  ┆ ---   ┆ ---       │
│ str           ┆ f64  ┆ f64   ┆ f64       │
╞═══════════════╪══════╪═══════╪═══════════╡
│ Abha          ┆ 70.1 ┆ -31.0 ┆ 17.997024 │
│ Abidjan       ┆ 75.6 ┆ -22.7 ┆ 25.999419 │
│ Abéché        ┆ 77.9 ┆ -18.5 ┆ 29.389605 │
│ Accra         ┆ 79.5 ┆ -23.4 ┆ 26.4022   │
│ Addis Ababa   ┆ 66.2 ┆ -34.6 ┆ 16.009494 │
│ …             ┆ …    ┆ …     ┆ …         │
│ Zagreb        ┆ 62.7 ┆ -41.6 ┆ 10.701838 │
│ Zanzibar City ┆ 80.4 ┆ -26.9 ┆ 26.001838 │
│ Zürich        ┆ 61.2 ┆ -38.1 ┆ 9.303919  │
│ Ürümqi        ┆ 62.2 ┆ -44.4 ┆ 7.396752  │
│ İzmir         ┆ 67.2 ┆ -28.5 ┆ 17.898003 │
└───────────────┴──────┴───────┴───────────┘
Polars Took: 27.53 sec
