# Low Memory Data Parsing

This notebook compares how well several Python data-processing frameworks keep memory usage low while parsing a large CSV file.

**Definition**
Low memory usage is the ability of a data processing framework to keep its peak resident memory footprint minimal, predictable, and well below a reasonable fraction of available system RAM, while maintaining correct execution and performance on large datasets.

Sources:
1. https://www.ece.ucdavis.edu/~hhomayou/files/hosein-ccgrid.pdf
2. https://dl.acm.org/doi/10.1109/CCGRID.2018.00097
3. https://gac.udc.es/~juan/papers/encyclopedia2022.pdf

To find out which approach parses data with the least memory, we compare four ways of loading data in Python:
1. Standard Pandas data processing: Pandas is the most widely used data processing library in Python, which makes it the natural baseline for memory comparisons. It loads data fully into memory, so it clearly shows the traditional bottlenecks in low-memory environments.
2. PySpark data processing: PySpark supports distributed computing, which makes it suitable for datasets that exceed local RAM. Including PySpark shows how much overhead a JVM-based execution engine adds compared with in-process libraries.
3. Polars data processing: Polars is written in Rust and uses the Apache Arrow columnar memory format, which typically gives lower memory usage than Pandas. Including Polars highlights the benefits of modern, memory-efficient DataFrame libraries.
4. DuckDB (Lazy Mode): DuckDB scans the file with a streaming, vectorized execution engine, so a query does not need the whole dataset in memory at once. This mode is meant to show how far memory usage can be reduced when computation is pushed down to the query engine. Note that the loader used below still materializes the query result as a Polars DataFrame.
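
To make "avoiding full in-memory loading" concrete without relying on any of the four frameworks, here is a minimal standard-library sketch. It streams a CSV row by row and keeps only a running count and sum. The function name, the `amount` column, and the sample data are hypothetical and are not part of the benchmark.

```python
import csv
import io


def streaming_row_count_and_sum(lines, column):
    """Stream CSV rows, keeping only a running count and sum in memory."""
    reader = csv.DictReader(lines)
    count, total = 0, 0.0
    for row in reader:  # only the current row is held at any time
        count += 1
        total += float(row[column])
    return count, total


# Hypothetical in-memory CSV standing in for a large file on disk.
sample = io.StringIO("id,amount\n1,10.5\n2,4.5\n3,5.0\n")
print(streaming_row_count_and_sum(sample, "amount"))  # (3, 20.0)
```

Memory use here stays roughly constant regardless of the number of rows, which is the property the streaming engines aim for on a much larger scale.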

We use five metrics to assess memory behavior:
1. **Peak Memory Usage**: Maximum resident memory used during execution
2. **Memory Efficiency per Data Volume**: Memory used per million rows or per GB processed
3. **Memory Stability**: Fluctuations in memory usage over time
4. **Garbage or Cache Reclaim**: Ability to free memory after expensive operations
5. **Scalability**: Behavior as the data grows (50K, 100K, 200K rows)



## Helper Functions

First, check that PySpark is installed and can start a local session.

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.range(5).show()
spark.stop()


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/11/23 11:00:31 WARN Utils: Your hostname, Muhammads-MacBook-Pro.local, resolves to a loopback address: 127.0.0.1; using 192.168.18.24 instead (on interface en0)
25/11/23 11:00:31 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/23 11:00:32 WARN NativeCodeLoader: Unable to load native-h

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



In [2]:
import time
import psutil
import pandas as pd
import polars as pl
from pyspark.sql import SparkSession
import findspark
import gc
import os

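# Initialize Spark.
# Note: the Arrow config key used below is deprecated since Spark 3.0 (see the
# warning in the benchmark output); its replacement is
# spark.sql.execution.arrow.pyspark.enabled.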
findspark.init()
os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11/libexec/openjdk.jdk/Contents/Home"
spark = SparkSession.builder \
    .appName("MemoryBenchmark") \
    .master("local[*]") \
    .config("spark.driver.memory", "6g") \
    .config("spark.sql.execution.arrow.enabled", "true") \
    .getOrCreate()

### 📌 Memory helpers ###
def memory_used_mb():
    return psutil.Process().memory_info().rss / (1024 * 1024)

# JVM memory for Spark
def spark_memory_mb():
    jvm = spark._sc._jvm
    runtime = jvm.java.lang.Runtime.getRuntime()
    return (runtime.totalMemory() - runtime.freeMemory()) / (1024 * 1024)


In [3]:
import pandas as pd
import polars as pl
import duckdb

def load_pandas(f):
    return pd.read_csv(f)

def load_polars(f):
    return pl.read_csv(f)

def load_spark(f):
    df = spark.read.csv(f, header=True, inferSchema=True).cache()
    df.count()  # force caching
    return df

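# DuckDB "lazy": DuckDB scans the CSV itself, but .pl() materializes the
# full query result as a Polars DataFrame, so the data does end up in memory.
def load_duckdb_lazy(f):
    con = duckdb.connect(database=':memory:')
    df = con.execute(f"SELECT * FROM read_csv_auto('{f}')").pl()  # eager: returns a Polars DataFrame
    con.close()
    return df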


## Metrics Evaluations Functions

### Peak Memory Usage

In [14]:
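def measure_peak_memory(name, load_fn, file):
    # Note: this records memory growth across the load (after - before),
    # not the highest value reached while the load is running.
    if name == "PySpark":
        # Spark data lives in the JVM, so read JVM heap usage, not Python RSS
        before = spark_memory_mb()
        df = load_fn(file)
        after = spark_memory_mb()
    else:
        before = memory_used_mb()
        df = load_fn(file)
        after = memory_used_mb()

    return df, round(after - before, 2)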
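
`measure_peak_memory` compares one reading before the load with one after it, so a spike that occurs during the load is not captured. One possible way to approximate a true peak is to poll memory from a background thread while the work runs. The sketch below assumes that approach. `PeakSampler`, `rss_mb`, and the `interval` parameter are hypothetical names introduced for illustration; `rss_mb` uses the same `psutil` formula as `memory_used_mb`.

```python
import threading
import time


def rss_mb():
    """Resident set size of this process in MB (same formula as memory_used_mb)."""
    import psutil  # third-party; imported lazily so the sampler can be tested without it
    return psutil.Process().memory_info().rss / (1024 * 1024)


class PeakSampler:
    """Context manager that polls a memory reader and records the maximum seen."""

    def __init__(self, read_mb=rss_mb, interval=0.01):
        self.read_mb = read_mb
        self.interval = interval
        self.peak = float("-inf")
        self.baseline = None
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _sample(self):
        self.peak = max(self.peak, self.read_mb())

    def _run(self):
        # Poll until the main thread signals that the work is done.
        while not self._stop.is_set():
            self._sample()
            self._stop.wait(self.interval)

    def __enter__(self):
        self._sample()  # baseline reading before the work starts
        self.baseline = self.peak
        self._thread.start()
        return self

    def __exit__(self, *exc):
        self._stop.set()
        self._thread.join()
        self._sample()  # final reading after the work ends
        return False
```

Under these assumptions, a call such as `with PeakSampler() as s: df = load_pandas(FILE)` followed by `s.peak - s.baseline` would estimate the peak increase in process RSS. Spikes shorter than the polling interval can still be missed, and the sampler observes only the Python process, not the separate Spark JVM.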

### Memory Efficiency (MB/Million rows)

In [15]:
def memory_efficiency(peak, rows):
    return round(peak / (rows / 1_000_000), 3)
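
As a quick check of the formula, the snippet below applies it to the Pandas and Polars peak figures from the results table in this notebook. The row count of 2,000,000 is an assumption inferred from the file name `customers-2000000.csv`.

```python
def memory_efficiency(peak, rows):
    # MB of memory growth per one million rows
    return round(peak / (rows / 1_000_000), 3)


ROWS = 2_000_000  # assumed from the file name, not measured here

print(memory_efficiency(1412.22, ROWS))  # 706.11
print(memory_efficiency(642.67, ROWS))   # 321.335
```

Note that the function divides by `rows`, so it raises `ZeroDivisionError` for an empty dataset.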


### Memory Stability

In [16]:
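def memory_stability(df, name):
    # Take three memory readings after repeated small reads and report their
    # sample standard deviation (pandas Series.std uses ddof=1 by default).
    mem = []
    for _ in range(3):
        if name == "PySpark":
            df.limit(1000).count()
            mem.append(spark_memory_mb())
        else:
            df.head(1000)
            mem.append(memory_used_mb())
    return round(pd.Series(mem).std(), 3)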

### Garbage Reclaim Ability

In [17]:
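def garbage_reclaim(df, name):
    if name == "PySpark":
        # Caveat: unpersist() runs before the first reading, so memory it has
        # already released is not counted. gc.collect() only collects Python
        # objects; it does not trigger a JVM garbage collection.
        df.unpersist()
        before = spark_memory_mb()
        spark.catalog.clearCache()
        gc.collect()
        after = spark_memory_mb()
    else:
        before = memory_used_mb()
        # Caveat: `del df` removes only this function's local name. The caller
        # still holds a reference, so the DataFrame is not freed here.
        del df
        gc.collect()
        after = memory_used_mb()

    return round(before - after, 2)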
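
The non-Spark branch of `garbage_reclaim` calls `del df` on its own parameter while the benchmark loop still holds a reference to the same object. The standard-library sketch below illustrates why that cannot free the data. `Payload` and `reclaim_inside` are hypothetical stand-ins for a DataFrame and for `garbage_reclaim`.

```python
import gc
import weakref


class Payload:
    """Stand-in for a large DataFrame (hypothetical)."""

    def __init__(self, n):
        self.data = bytearray(n)


def reclaim_inside(obj):
    del obj  # removes only this function's local name
    gc.collect()


big = Payload(10_000_000)
probe = weakref.ref(big)  # lets us check whether the object is still alive

reclaim_inside(big)
print(probe() is not None)  # True: the caller's `big` keeps the object alive

del big  # drop the last reference
gc.collect()
print(probe() is None)  # True: now the object has been freed
```

This is consistent with the 0.0 MB reclaimed reported for Pandas and DuckDB Lazy. To measure reclaim, the caller would have to drop its own reference before the second reading is taken.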

### Scalability Test

In [18]:
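def scalability(loader, name, file):
    sizes = [50_000, 100_000, 200_000]
    mem = []

    for size in sizes:
        # Caveat: loader(file) reads the whole file on every iteration; the
        # row limit is applied only afterwards.
        df = loader(file)
        if name == "PySpark":
            df.limit(size).count()
            mem.append(spark_memory_mb())
            df.unpersist()
        else:
            df = df.head(size)
            mem.append(memory_used_mb())
        gc.collect()

    # Memory growth between the smallest and largest size, in MB per 1,000 rows
    return round((mem[-1] - mem[0]) / (sizes[-1] - sizes[0]) * 1000, 3)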
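
The value returned by `scalability` is an endpoint slope: the difference between the last and first memory readings divided by the difference in row counts. The sketch below isolates that formula so its behavior is easier to see. The function name and the memory readings are hypothetical and were not measured.

```python
def growth_mb_per_1k_rows(sizes, mem_mb):
    """Endpoint slope, as in scalability(): MB of growth per 1,000 extra rows."""
    return round((mem_mb[-1] - mem_mb[0]) / (sizes[-1] - sizes[0]) * 1000, 3)


sizes = [50_000, 100_000, 200_000]

# Hypothetical readings in MB.
print(growth_mb_per_1k_rows(sizes, [900.0, 950.0, 1050.0]))  # 1.0
print(growth_mb_per_1k_rows(sizes, [900.0, 950.0, 870.0]))   # -0.2
```

Only the first and last readings matter, so the result is negative whenever the final reading is lower than the first, for example because memory from an earlier iteration was released in between. A negative value therefore does not by itself show that memory shrinks as data grows.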

## Full Benchmark Execution

In [None]:
FILE = "xx/customers-2000000.csv"

frameworks = [
    ("Pandas", load_pandas),
    ("Polars", load_polars),
    ("PySpark", load_spark),
    ("DuckDB Lazy", load_duckdb_lazy)
]


results = []
for name, loader in frameworks:
    record = {"Framework": name}

    df, peak = measure_peak_memory(name, loader, FILE)
    record["Peak Memory (MB)"] = peak

    if name == "PySpark":
        rows = df.count()
    else:
        # Pandas and Polars DataFrames expose the row count via .shape;
        # load_duckdb_lazy also returns a Polars DataFrame.
        rows = df.shape[0]

    record["Memory / 1M rows (MB)"] = memory_efficiency(peak, rows)
    record["Memory Stability (std)"] = memory_stability(df, name)
    record["Memory Reclaimed (MB)"] = garbage_reclaim(df, name)
    record["Scalability (MB / 1K rows growth)"] = scalability(loader, name, FILE)

    results.append(record)

spark.stop()

df_results = pd.DataFrame(results)
df_results

25/11/23 11:01:13 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.enabled' instead of it.

| Framework | Peak Memory (MB) | Memory / 1M rows (MB) | Memory Stability (std) | Memory Reclaimed (MB) | Scalability (MB / 1K rows growth) |
|---|---|---|---|---|---|
| Pandas | 1412.22 | 706.11 | 0.0 | 0.0 | 3.469 |
| Polars | 642.67 | 321.335 | 0.0 | 4.0 | 0.891 |
| PySpark | 567.55 | 283.775 | 112.017 | -0.54 | 0.758 |
| DuckDB Lazy | 161.06 | 80.53 | 0.0 | 0.0 | -1.628 |
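
To put the peak figures on a common scale, the short sketch below expresses each framework's peak memory as a multiple of the lowest one. The values are copied from the results above; the variable names are illustrative. The PySpark figure is JVM heap usage rather than process RSS, so its ratio is only a rough comparison.

```python
# Peak memory growth in MB, copied from the benchmark results.
peak_mb = {
    "Pandas": 1412.22,
    "Polars": 642.67,
    "PySpark": 567.55,
    "DuckDB Lazy": 161.06,
}

baseline = min(peak_mb.values())  # lowest peak: DuckDB Lazy
ratios = {name: round(mb / baseline, 2) for name, mb in peak_mb.items()}

for name, ratio in ratios.items():
    print(f"{name}: {ratio}x")
# Pandas: 8.77x
# Polars: 3.99x
# PySpark: 3.52x
# DuckDB Lazy: 1.0x
```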


### Summary
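1. Peak Memory (MB)
   - Highest: Pandas (1412.22 MB), which consumes the most memory when loading the data.
   - Lowest: DuckDB Lazy (161.06 MB), with the smallest memory increase during loading.
   - Polars (642.67 MB) and PySpark (567.55 MB) fall in the middle, far better than Pandas. The PySpark figure is JVM heap usage rather than process RSS, so it is not strictly comparable.

2. Memory / 1M Rows (MB)
   - Pandas: highest memory per row (706.11) → scales poorly.
   - Polars & PySpark: much lower footprint (321.335 and 283.775) → better scalability.
   - DuckDB Lazy: best efficiency (80.53) → well suited to large datasets.

3. Memory Stability (std)
   - Polars, Pandas, DuckDB Lazy: stable memory usage (std of 0.0 across the three readings).
   - PySpark: variable memory usage (std of 112.017), likely due to dynamic JVM heap behavior.

4. Memory Reclaimed (MB)
   - Polars released 4.0 MB after the test.
   - PySpark shows -0.54 MB, meaning JVM heap usage rose slightly instead of dropping.
   - Pandas & DuckDB Lazy show no memory release during this test (0.0 MB).

5. Scalability (Memory Growth Rate)
   - DuckDB Lazy: negative growth (-1.628) → memory did not increase with row count in this test, which may reflect memory reuse or measurement noise.
   - PySpark & Polars: small incremental growth (0.758 and 0.891) → scalable.
   - Pandas: the steepest increase (3.469) → least suitable for growing data sizes.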

### Conclusion
In this benchmark, DuckDB Lazy was the most memory-efficient option for large-scale, low-memory workloads. Polars offered a good balance of low memory use and stability, PySpark is worthwhile mainly when distributed computing is needed, and Pandas was the least scalable option.