<a href="https://colab.research.google.com/github/densmyslov/ais-data-pipeline/blob/main/notebooks/preprocessing_with_polars.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Preprocessing Large Datasets at Scale

Working with multi-gigabyte datasets can be a challenge, not because of complexity but because of size. A CSV file of several gigabytes can expand to many times that size in memory when loaded with pandas (see Step 1 below).

This notebook demonstrates a preprocessing pipeline that transforms bulky CSVs into compact, query-ready Parquet files while keeping memory usage under control. The Dubai real estate dataset serves as an illustrative example.

# Setup

In [1]:
import os
import json
import pandas as pd
import numpy as np
from random import choice, choices
from time import time
import pyarrow as pa
import pyarrow.parquet as pq

import polars as pl
import polars.selectors as cs

# Dubai rent_contracts.csv Dataset Preprocessing

## Prerequisites
* At least the rent_contracts.csv dataset has been downloaded with the "data_ingestion_with_profiling.ipynb" notebook.

In [2]:
# change the working directory to the folder where you downloaded rent_contracts.csv
# (the path below assumes Google Drive is mounted at /content/drive, as in Colab)

project_name = '0_real-estate-agent'
os.chdir(f'/content/drive/MyDrive/0_Projects/{project_name}')

# change the date to the actual download date of the dataset
date = '2025-08-20'

Let's check the size of the downloaded rent_contracts.csv dataset:

In [3]:
fn = f"datasets/rent_contracts_8192_{date}.csv"
file_size = os.path.getsize(fn) / 1_000_000   # bytes -> MB (decimal megabytes, not MiB)
print(f"File size: {file_size: .2f} MB")

File size:  4225.93 MB


## Step 1. Experiment: how to trigger an OOM (out-of-memory) error

The file takes about 4.2 GB on disk (roughly 3.9 GiB). That is already a lot, given that we plan to process it with serverless functions, which have a maximum memory limit: AWS Lambda functions, for example, are capped at 10 GiB.
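Because decimal (MB, GB) and binary (MiB, GiB) units are easy to mix up when comparing file sizes against memory limits, a small helper that reports both can help. This is a minimal sketch; the function name `describe_size` is hypothetical and not part of the original pipeline.

```python
def describe_size(n_bytes: int) -> str:
    """Format a byte count in decimal (MB, GB) and binary (MiB, GiB) units."""
    mb = n_bytes / 1000**2   # megabyte: 10**6 bytes
    mib = n_bytes / 1024**2  # mebibyte: 2**20 bytes
    gb = n_bytes / 1000**3   # gigabyte: 10**9 bytes
    gib = n_bytes / 1024**3  # gibibyte: 2**30 bytes
    return f"{mb:,.2f} MB ({mib:,.2f} MiB), {gb:.2f} GB ({gib:.2f} GiB)"


print(describe_size(4_000_000_000))
# 4,000.00 MB (3,814.70 MiB), 4.00 GB (3.73 GiB)
```

In the notebook it could be called as `describe_size(os.path.getsize(fn))`.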


If we read this file into memory with pandas, its in-memory footprint can exceed 20 GiB.

You can try this yourself; the notebook will most likely crash unless you have selected a high-memory runtime.

In [28]:
df = pd.read_csv(fn, engine='python', on_bad_lines='skip')
print(df.shape)

(8915752, 40)


## Step 2. Create a LazyFrame with Polars

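Let's create a `LazyFrame` with Polars. `scan_csv` only builds a query plan, so nothing is loaded into memory yet: data is read when the plan is executed (apart from the rows Polars reads to infer the schema, controlled by `infer_schema_length`).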

In [106]:
fn = f"datasets/rent_contracts_8192_{date}.csv"

lf = pl.scan_csv(
    fn,
    null_values=["", "null", "NULL", "None"],
    infer_schema_length=10000,
)

Let's get the number of rows in the dataset:

In [7]:
n_rows = lf.select(pl.len()).collect().item()
print(f"Number of rows: {n_rows}")

Number of rows: 8915752


### Step 2.0: Initial memory consumption by column

* In this step we estimate the in-memory size of the dataset once it is converted to pandas.

* We first take a sample of the first 50,000 rows of the Polars LazyFrame (`lf`).

* Next, we convert that sample to pandas, measure its memory footprint, and extrapolate it to the full dataset of `n_rows` rows.
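In formula form, the extrapolation performed by `estimate_size` is, for each column $c$:

$$
\widehat{M}_c = \frac{B_c}{n_{\text{sample}}} \cdot N,
\qquad
\widehat{M}_{\text{GiB}} = \frac{1}{1024^3} \sum_c \widehat{M}_c
$$

where $B_c$ is the deep memory usage in bytes of column $c$ in the pandas sample, $n_{\text{sample}}$ is the number of sampled rows (50,000 by default), and $N$ is `n_rows`. The estimate assumes that the sampled rows are representative of the whole file.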



In [105]:
def estimate_size(lf: pl.LazyFrame, n_rows: int, n_sample_rows: int = 50_000) -> tuple[pd.DataFrame, float]:
    """
    Estimate the memory footprint of a Polars LazyFrame when fully materialized.

    This function samples a subset of rows from a LazyFrame, collects them into
    memory, and uses pandas' `memory_usage` to approximate per-column memory size.
    The per-row cost is then extrapolated to the full dataset row count.

    Parameters
    ----------
    lf : pl.LazyFrame
        The Polars LazyFrame representing the dataset to analyze.
    n_rows : int
        Total number of rows in the dataset. Used to scale per-row estimates
        to the full dataset size.
    n_sample_rows : int, optional (default=50_000)
        Number of rows to sample from the LazyFrame to estimate per-column
        memory usage.

    Returns
    -------
    col_mem_size_df : pd.DataFrame
        DataFrame with columns:
          - "col": column name
          - "est_total_mib": estimated total memory footprint of the column
            in mebibytes (MiB).
        Sorted in descending order of estimated size.
    n_gig_size : float
        Estimated total size of the full dataset in GiB (sum across columns).

    Notes
    -----
    - This is an approximation: actual memory usage may differ depending on
      compression, Arrow backing, and materialization strategy.
    - Assumes that the first `n_sample_rows` are representative of the dataset.
    """
    # Sample rows from LazyFrame
    df_sample = lf.head(n_sample_rows).collect(engine="streaming")

    # Convert sample to pandas
    pdf_sample = df_sample.to_pandas()
    col_bytes_sample = pdf_sample.memory_usage(index=False, deep=True).astype("int64")

    # Estimate per-column and total size
    col_mem_size_df = (
        pd.DataFrame({"col": col_bytes_sample.index, "bytes_sample": col_bytes_sample.values})
        .assign(
            bytes_per_row=lambda d: d["bytes_sample"] / len(pdf_sample),
            est_total_bytes=lambda d: d["bytes_per_row"] * n_rows,
            est_total_mib=lambda d: d["est_total_bytes"] / (1024 ** 2),
        )
        .sort_values("est_total_bytes", ascending=False, ignore_index=True)
    )

    # Keep only relevant columns
    col_mem_size_df = col_mem_size_df[["col", "est_total_mib"]]

    # Compute total GiB
    n_gig_size = col_mem_size_df["est_total_mib"].sum() / 1024

    return col_mem_size_df, n_gig_size



Let's apply the function defined above to our dataset:

In [107]:
col_mem_size_df, n_gig_size = estimate_size(lf, n_rows)
print(f"In-memory pandas df size: {n_gig_size: 2.2f} GiB")

In-memory pandas df size:  15.79 GiB


* The initial estimate for a pandas DataFrame of our dataset is almost 16 GiB.

* The memory actually consumed will probably be higher because of additional pandas overheads.

Let's analyze memory consumption by column:

In [108]:
col_mem_size_df

| | col | est_total_mib |
|---|---|---|
| 0 | nearest_landmark_ar | 810.639814 |
| 1 | nearest_metro_ar | 766.996353 |
| 2 | area_name_ar | 766.572238 |
| 3 | ejari_property_sub_type_ar | 737.587813 |
| 4 | master_project_ar | 664.320863 |
| 5 | nearest_mall_ar | 651.732751 |
| 6 | contract_reg_type_ar | 636.504373 |
| 7 | property_usage_ar | 632.510133 |
| 8 | ejari_bus_property_type_ar | 629.167542 |
| 9 | ejari_property_type_ar | 620.265531 |


Many columns come in pairs with `_ar` and `_en` suffixes: the same string field in Arabic and in English, respectively.

We can drop the Arabic duplicates of the English columns.

That will be our first step towards reducing the memory consumed by the dataset.

## Step 3. Memory-Optimization Transformations

### Step 3.1: Remove Arabic duplicates of English columns

In [109]:
# drop every column whose name ends with "_ar" (the Arabic duplicates);
# Polars treats a name as a regex only when it starts with "^" and ends with "$"
lf = lf.select(pl.exclude(r"^.*_ar$"))

Let's check that we actually dropped the Arabic columns:

In [110]:
schema = lf.collect_schema()

In [111]:
schema

Schema([('contract_id', String),
        ('contract_reg_type_id', Int64),
        ('contract_reg_type_en', String),
        ('contract_start_date', String),
        ('contract_end_date', String),
        ('contract_amount', Int64),
        ('annual_amount', Int64),
        ('no_of_prop', Int64),
        ('line_number', Int64),
        ('is_free_hold', Int64),
        ('ejari_bus_property_type_id', Int64),
        ('ejari_bus_property_type_en', String),
        ('ejari_property_type_id', Int64),
        ('ejari_property_type_en', String),
        ('ejari_property_sub_type_id', Int64),
        ('ejari_property_sub_type_en', String),
        ('property_usage_en', String),
        ('project_number', Int64),
        ('project_name_en', String),
        ('master_project_en', String),
        ('area_id', Int64),
        ('area_name_en', String),
        ('nearest_landmark_en', String),
        ('nearest_metro_en', String),
        ('nearest_mall_en', String),
        ('tenant_type_id', Int64)

In [112]:
col_mem_size_df, n_gig_size = estimate_size(lf, n_rows)
print(f"In-memory pandas df size: {n_gig_size: 2.2f} GiB")

In-memory pandas df size:  7.94 GiB


We reduced the estimated size from about 16 GiB to less than 8 GiB. That is unsurprising: we dropped the 13 Arabic columns, which were the heaviest in the dataset.

In [113]:
col_mem_size_df.head(10)

| | col | est_total_mib |
|---|---|---|
| 0 | nearest_landmark_en | 553.977277 |
| 1 | nearest_metro_en | 544.788893 |
| 2 | area_name_en | 537.726531 |
| 3 | contract_id | 527.164107 |
| 4 | ejari_property_sub_type_en | 519.723204 |
| 5 | property_usage_en | 508.117496 |
| 6 | contract_end_date | 501.660698 |
| 7 | contract_start_date | 501.660698 |
| 8 | nearest_mall_en | 478.28569 |
| 9 | master_project_en | 461.341803 |


### Step 3.2: Date columns

Rows 6 and 7 of `col_mem_size_df` (`contract_end_date` and `contract_start_date`) should obviously have dtype `Date`, but they are stored as `String` and occupy over 500 MiB each.

We can convert them to the compact Polars/Arrow `Date` dtype, which stores each value as a 32-bit integer count of days since the Unix epoch.



In [114]:
date_cols = ["contract_start_date", "contract_end_date"]
# parse the strings into pl.Date; with no explicit format Polars infers it,
# and strict=False turns unparseable values into nulls instead of raising
lf = lf.with_columns(pl.col(date_cols).str.strptime(pl.Date, strict=False))

Let's check how much memory this conversion saves:

In [115]:
col_mem_size_df, n_gig_size = estimate_size(lf, n_rows)
print(f"In-memory pandas df size: {n_gig_size: 2.2f} GiB")

In-memory pandas df size:  7.09 GiB


In [116]:
col_mem_size_df.query("col.str.contains('date')")

| | col | est_total_mib |
|---|---|---|
| 17 | contract_end_date | 68.02179 |
| 18 | contract_start_date | 68.02179 |


We reduced each of the two date columns from about 501 MiB to 68 MiB by converting its dtype from String to Date. In pandas these columns become `datetime64`, which takes 8 bytes per row.

### Step 3.3: String to categorical

Converting string columns to a categorical dtype can save a lot of memory: a categorical column stores each distinct string only once and represents every row by a small integer code.

First, let's separate the string columns from the date columns.

As a rule of thumb, we assume that a column can be converted to categorical if its number of unique values divided by its number of rows is 0.1 or less.

In [117]:
# Columns eligible for categorical (strings only, excluding date cols)
schema = lf.collect_schema()
string_cols = [c for c in schema.names() if schema[c] == pl.Utf8 and c not in date_cols]

# Sample the first 50,000 rows of the current plan; df_sample inside
# estimate_size is a local variable, so it must be created here explicitly
df_sample = lf.head(50_000).collect()

# Compute unique-ratio on the sample
ratios = (
    df_sample.select([(pl.col(c).n_unique() / pl.len()).alias(c) for c in string_cols])
              .row(0, named=True)
)
category_cols = [c for c, r in ratios.items() if r <= 0.1]

# Lazily cast on the FULL dataset (no full collect here)
if category_cols:
    lf = lf.with_columns(pl.col(category_cols).cast(pl.Categorical))


In [118]:
col_mem_size_df, n_gig_size = estimate_size(lf, n_rows)
print(f"In-memory pandas df size: {n_gig_size: 2.2f} GiB")

In-memory pandas df size:  1.59 GiB


That's quite a result! By converting string columns to categorical, we reduced the estimated size of the dataset from about 7 GiB to 1.6 GiB.

In [119]:
col_mem_size_df.query("col==@category_cols")

| | col | est_total_mib |
|---|---|---|
| 15 | project_name_en | 27.450873 |
| 16 | area_name_en | 19.581433 |
| 17 | master_project_en | 19.178744 |
| 18 | nearest_metro_en | 9.532404 |
| 19 | ejari_property_sub_type_en | 9.186853 |
| 20 | ejari_property_type_en | 9.091622 |
| 21 | nearest_landmark_en | 8.692334 |
| 22 | property_usage_en | 8.594383 |
| 23 | nearest_mall_en | 8.586731 |
| 24 | ejari_bus_property_type_en | 8.548979 |


Converting to the categorical dtype shrank these columns from hundreds of MiB each (up to about 550 MiB) to below 30 MiB, and most of them to below 10 MiB.
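The roughly 8.5 MiB floor is consistent with how pandas stores categorical codes: it picks the smallest signed integer type that fits the number of categories, so a column with few categories costs about one byte per row. A sketch that checks this, using hypothetical toy values and the row count reported earlier:

```python
import pandas as pd

n_rows = 8_915_752  # row count of rent_contracts.csv reported above

# Hypothetical toy columns with few and with many categories
few = pd.Series(["Marina", "Deira", "Jumeirah"] * 1_000).astype("category")
many = pd.Series([f"project_{i}" for i in range(1_000)]).astype("category")
print(few.cat.codes.dtype)   # int8: a handful of categories
print(many.cat.codes.dtype)  # int16: too many categories for int8

# Size of the codes alone for the full dataset
print(f"int8 codes:  {n_rows * 1 / 1024**2:.2f} MiB")  # 8.50 MiB
print(f"int16 codes: {n_rows * 2 / 1024**2:.2f} MiB")  # 17.01 MiB
```

The slightly larger columns such as `project_name_en` plausibly use 2-byte codes plus a larger dictionary of categories.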

Let's analyze the largest remaining columns:

In [120]:
col_mem_size_df.head(10)

| | col | est_total_mib |
|---|---|---|
| 0 | contract_id | 527.164107 |
| 1 | contract_reg_type_id | 68.02179 |
| 2 | contract_start_date | 68.02179 |
| 3 | no_of_prop | 68.02179 |
| 4 | contract_end_date | 68.02179 |
| 5 | contract_amount | 68.02179 |
| 6 | annual_amount | 68.02179 |
| 7 | is_free_hold | 68.02179 |
| 8 | line_number | 68.02179 |
| 9 | ejari_bus_property_type_id | 68.02179 |


### Step 3.4: Numerical columns

Looking at the `lf` schema after converting String columns to the Categorical and Date dtypes, we see that all remaining numeric columns are `Int64`, which takes 8 bytes per value and covers the full signed 64-bit integer range (about -9.22e18 to +9.22e18).
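The 68.02 MiB reported for every `Int64` (and date) column follows directly from 8 bytes per value times the row count. A quick check of that arithmetic:

```python
import numpy as np

n_rows = 8_915_752                              # rows in rent_contracts.csv (counted above)
bytes_per_int64 = np.dtype("int64").itemsize    # 8 bytes

total_mib = n_rows * bytes_per_int64 / 1024**2
print(f"{total_mib:.2f} MiB")  # 68.02 MiB, the value listed for each Int64 column

# The range those 8 bytes can represent
info = np.iinfo(np.int64)
print(info.min, info.max)      # -9223372036854775808 9223372036854775807
```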

In [121]:
schema = lf.collect_schema()
schema

Schema([('contract_id', String),
        ('contract_reg_type_id', Int64),
        ('contract_reg_type_en', Categorical(ordering='physical')),
        ('contract_start_date', Date),
        ('contract_end_date', Date),
        ('contract_amount', Int64),
        ('annual_amount', Int64),
        ('no_of_prop', Int64),
        ('line_number', Int64),
        ('is_free_hold', Int64),
        ('ejari_bus_property_type_id', Int64),
        ('ejari_bus_property_type_en', Categorical(ordering='physical')),
        ('ejari_property_type_id', Int64),
        ('ejari_property_type_en', Categorical(ordering='physical')),
        ('ejari_property_sub_type_id', Int64),
        ('ejari_property_sub_type_en', Categorical(ordering='physical')),
        ('property_usage_en', Categorical(ordering='physical')),
        ('project_number', Int64),
        ('project_name_en', Categorical(ordering='physical')),
        ('master_project_en', Categorical(ordering='physical')),
        ('area_id', Int64),
     

In [122]:
numeric_cols = [c for c in schema.names() if schema[c] == pl.Int64]
numeric_cols

['contract_reg_type_id',
 'contract_amount',
 'annual_amount',
 'no_of_prop',
 'line_number',
 'is_free_hold',
 'ejari_bus_property_type_id',
 'ejari_property_type_id',
 'ejari_property_sub_type_id',
 'project_number',
 'area_id',
 'tenant_type_id']

Some numeric columns, such as `is_free_hold`, `area_id`, and `ejari_property_type_id`, are identifiers or flags rather than quantities, which makes them good candidates for a categorical dtype.

Let's apply the same filtering rule as for the string columns, but this time be stricter and use a ratio threshold of 0.01.

In [123]:
# Compute unique-ratio on the sample; we'll use ratio of 0.01 for numeric columns
ratios = (
    df_sample.select([(pl.col(c).n_unique() / pl.len()).alias(c) for c in numeric_cols])
              .row(0, named=True)
)
category_cols = [c for c, r in ratios.items() if r <= 0.01]

In [124]:
category_cols

['contract_reg_type_id',
 'no_of_prop',
 'line_number',
 'is_free_hold',
 'ejari_bus_property_type_id',
 'ejari_property_type_id',
 'ejari_property_sub_type_id',
 'area_id',
 'tenant_type_id']

Let's do a sanity check by sampling a few rows of the `category_cols` columns:

In [130]:
df_sample[category_cols].sample(15)

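| contract_reg_type_id | no_of_prop | line_number | is_free_hold | ejari_bus_property_type_id | ejari_property_type_id | ejari_property_sub_type_id | area_id | tenant_type_id |
|---|---|---|---|---|---|---|---|---|
| i64 | i64 | i64 | i64 | i64 | i64 | i64 | i64 | i64 |
| 1 | 1 | 1 | 1 | 4 | 841 | 3 | 506 | 1 |
| 2 | 1 | 1 | 0 | 2 | 842 | 2 | 362 | 0 |
| 1 | 1 | 1 | 0 | 4 | 841 | 2 | 232 | 0 |
| 1 | 36 | 9 | 1 | 2 | 842 | 1 | 441 | 1 |
| 1 | 1 | 1 | 1 | 2 | 842 | 1 | 441 | 0 |
| … | … | … | … | … | … | … | … | … |
| 1 | 1 | 1 | 1 | 2 | 842 | 3 | 507 | 0 |
| 2 | 1 | 1 | 0 | 4 | 841 | 3 | 313 | 1 |
| 2 | 1 | 1 | 1 | 2 | 842 | 1 | 350 | 0 |
| 2 | 1 | 1 | 1 | 2 | 903 | 11 | 485 | 1 |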


Let's cast the numeric columns in `category_cols` first to String and then to Categorical:

In [127]:
# Step 1: cast to Utf8 (string)
lf = lf.with_columns([pl.col(c).cast(pl.Utf8) for c in category_cols])

# Step 2: cast to Categorical
lf = lf.with_columns([pl.col(c).cast(pl.Categorical) for c in category_cols])

In [128]:
col_mem_size_df, n_gig_size = estimate_size(lf, n_rows)
print(f"In-memory pandas df size: {n_gig_size: 2.2f} GiB")

In-memory pandas df size:  1.09 GiB


By converting some numeric columns from Int64 to String to Categorical, we further reduced the estimated pandas memory footprint to 1.1 GiB.

In [129]:
col_mem_size_df.query("col==@category_cols")

| | col | est_total_mib |
|---|---|---|
| 7 | line_number | 21.139132 |
| 9 | area_id | 19.19983 |
| 13 | ejari_property_sub_type_id | 9.101145 |
| 15 | no_of_prop | 9.014758 |
| 16 | ejari_property_type_id | 9.011357 |
| 21 | ejari_bus_property_type_id | 8.546598 |
| 24 | contract_reg_type_id | 8.538095 |
| 25 | is_free_hold | 8.538095 |
| 26 | tenant_type_id | 8.538095 |


In [131]:
col_mem_size_df.head(10)

| | col | est_total_mib |
|---|---|---|
| 0 | contract_id | 527.164107 |
| 1 | contract_end_date | 68.02179 |
| 2 | contract_start_date | 68.02179 |
| 3 | contract_amount | 68.02179 |
| 4 | annual_amount | 68.02179 |
| 5 | project_number | 68.02179 |
| 6 | project_name_en | 27.450873 |
| 7 | line_number | 21.139132 |
| 8 | area_name_en | 19.581433 |
| 9 | area_id | 19.19983 |


We have optimized most of the columns. Column `contract_id` is a glaring exception: with an estimated 527 MiB, it accounts for roughly half of the dataset's total estimated memory footprint.

There are various ways to optimize this column; here we simply cast it to `string[pyarrow]` after converting the dataset to pandas. It has too many distinct values to pass the 0.1 unique-ratio rule, so a categorical dtype is not an option.


## Step 4. Cast Polars to Pandas

In [135]:
df = lf.collect()

In [138]:
df = df.to_pandas()

In [None]:
df['contract_id'] = df['contract_id'].astype("string[pyarrow]")

In [150]:
df.memory_usage(deep=True)

| column | bytes |
|---|---|
| Index | 132 |
| contract_id | 183359741 |
| contract_reg_type_id | 8915960 |
| contract_reg_type_en | 8915966 |
| contract_start_date | 71326016 |
| contract_end_date | 71326016 |
| contract_amount | 71326016 |
| annual_amount | 71326016 |
| no_of_prop | 17857786 |
| line_number | 17888924 |


In [149]:
df.dtypes

| column | dtype |
|---|---|
| contract_id | string[pyarrow] |
| contract_reg_type_id | category |
| contract_reg_type_en | category |
| contract_start_date | datetime64[ms] |
| contract_end_date | datetime64[ms] |
| contract_amount | int64 |
| annual_amount | int64 |
| no_of_prop | category |
| line_number | category |
| is_free_hold | category |


In [151]:
fn = f"datasets/rent_contracts_8192_{date}.parquet"

df.to_parquet(fn, compression='brotli')

file_size = os.path.getsize(fn)
print(f"File size: {file_size/1000000: .2f} MB")   # decimal megabytes

File size:  127.37 MB


* We saved the preprocessed dataset to `parquet` with `Brotli` compression.

* The Parquet file takes about 127 MB on disk, roughly 33 times smaller than the original CSV of about 4.2 GB.