In [1]:
import duckdb
import polars as pl
from pathlib import Path

In [2]:
file_path = Path('data/messages.csv')
size_in_bytes = file_path.stat().st_size
size_in_gb = size_in_bytes / (1024 ** 3)

print(f"File size: {size_in_gb:.2f} GB")

File size: 147.03 GB


In [3]:
# This doesn't load any data
lazy_df = pl.scan_csv('data/messages.csv')

# Get column names and types
schema = lazy_df.schema

print("Column names:")
for col_name, col_type in schema.items():
    print(f"  {col_name}: {col_type}")

Column names:
  id: Int64
  message_id: String
  campaign_id: Int64
  message_type: String
  client_id: Int64
  channel: String
  category: String
  platform: String
  email_provider: String
  stream: String
  date: String
  sent_at: String
  is_opened: String
  opened_first_time_at: String
  opened_last_time_at: String
  is_clicked: String
  clicked_first_time_at: String
  clicked_last_time_at: String
  is_unsubscribed: String
  unsubscribed_at: String
  is_hard_bounced: String
  hard_bounced_at: String
  is_soft_bounced: String
  soft_bounced_at: String
  is_complained: String
  complained_at: String
  is_blocked: String
  blocked_at: String
  is_purchased: String
  purchased_at: String
  created_at: String
  updated_at: String


  schema = lazy_df.schema


Since our dataset is very large and involves complex operations such as counting unique values and performing joins, DuckDB is a safer choice for processing on a single machine. DuckDB supports disk spilling, which allows intermediate results to be offloaded to disk when memory limits are exceeded, helping to prevent out-of-memory (OOM) errors.

Because we need more than just random sampling — specifically, obtaining unique client IDs, randomly sampling them, and then merging them back to the full dataset via an inner join — other approaches such as Polars, even with lazy evaluation, are prone to OOM errors on our dataset.

Our primary goal is to reduce the dataset size while retaining the most relevant information for our research, and DuckDB provides the necessary scalability and reliability to achieve this efficiently.

In [4]:
# SQL interface, also optimized
result = duckdb.sql("""
    SELECT COUNT(DISTINCT client_id) 
    FROM 'data/messages.csv'
""").fetchone()[0]

# ran in 50.9 s

In [5]:
# number of unique clients in messages raw data
result

16860044

Since we have data on 16.8 million clients, analyzing such a large population can be computationally challenging and is not strictly necessary for our research objectives. Therefore, we will keep a subset of 0.5 million clients for our analysis.

In [4]:
# SQL interface, also optimized
result2 = duckdb.sql("""
    SELECT COUNT(DISTINCT client_id) 
    FROM 'data/client_first_purchase_date.csv'
""").fetchone()[0]

In [None]:
# number of unique clients with first_purchase_date
result2

1854736

# Reducing File Size

In [3]:
RANDOM_SEED = 255
SAMPLE_PERCENTAGE = (500_000 / 16_860_044) * 100
INPUT_CSV = 'data/messages.csv'
OUTPUT_PARQUET = 'data/messages_subset.parquet'


In [4]:
SAMPLE_PERCENTAGE

2.965591311624098

## DuckDB

In [None]:
con = duckdb.connect()

# Step 1: Randomly select exactly 1 million distinct client_ids (reproducible)
con.execute(f"""
    CREATE OR REPLACE TEMP VIEW selected_clients AS
SELECT client_id
FROM (
    SELECT DISTINCT client_id
    FROM read_csv_auto('data/messages.csv')
)
USING SAMPLE {SAMPLE_PERCENTAGE} PERCENT (bernoulli, {RANDOM_SEED});
    
""")
# REPEATABLE ({RANDOM_SEED});
# Step 2: Keep only those clients and save to Parquet
con.execute("""
    COPY (
        SELECT *
        FROM read_csv_auto('data/messages.csv')
        WHERE client_id IN (SELECT client_id FROM selected_clients)
    ) TO 'data/messages_subset.parquet' (FORMAT 'parquet');
""")

print("Saved reproducible random subset to data/messages_subset.parquet")


# 2m 17.0s

Saved reproducible random subset to data/messages_subset.parquet


### Polars

In [None]:
# Parameters
SAMPLE_PERCENTAGE = 0.05  # 5%
RANDOM_SEED = 42

lazy_df = pl.scan_csv("data/messages.csv")

sampled_clients = (
    lazy_df.select("client_id")
    .unique()
    .with_columns(
        (pl.lit(RANDOM_SEED).hash() % 10000).alias("rand_col")  # optional deterministic hash
    )
    .filter(pl.col("rand_col") < SAMPLE_PERCENTAGE * 10000)
    .select("client_id")
)

# Filter original CSV using sampled clients (lazy)
filtered_df = lazy_df.join(sampled_clients, on="client_id", how="inner")

# Write to parquet using sink_parquet for writing by chunks
filtered_df.sink_parquet("data/messages_subset_p.parquet")


: 

This can lead to out-of-memory (OOM) errors because Polars does not support disk spilling. When performing operations such as finding unique values or joining DataFrames, Polars must load the entire relevant data into RAM, as it does not create temporary tables like DuckDB.

While the lazy evaluation approach in Polars works well for simple or narrow operations — which can be processed in chunks and written to Parquet incrementally, avoiding OOM — more complex or wide operations require loading whole columns into memory at once. This can become problematic when the dataset is much larger than the available RAM.