![JohnSnowLabs](https://nlp.johnsnowlabs.com/assets/images/logo.png)


[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/JohnSnowLabs/spark-nlp-workshop/blob/master/tutorials/PySpark/9.PySpark_Estimate_Size.ipynb)


# Overview

A common question is how much memory a DataFrame uses, and PySpark offers no direct answer. You can collect a sample of the data and run a local memory profiler on it, or you can estimate the size of the data at its source (for example, a Parquet file). From the PySpark API itself, however, only a string representation of the plan statistics is available, so that is what we will work with. Please review [this page](https://semyonsinchenko.github.io/ssinchenko/post/estimation-spark-df-size/) for more information.
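As a rough illustration of the "collect a sample and measure it locally" idea, here is a minimal sketch that scales the average size of a few sampled rows up to a full row count. The helper names `approx_row_bytes` and `extrapolate_size` and the sample rows are made up for this example. It measures CPython objects, not Spark's internal representation, so treat the result as an order-of-magnitude guess at best.

```python
import sys


def approx_row_bytes(row: tuple) -> int:
    """Shallow size of a tuple plus the sizes of its items (CPython-specific)."""
    return sys.getsizeof(row) + sum(sys.getsizeof(value) for value in row)


def extrapolate_size(sample_rows: list, total_rows: int) -> float:
    """Scale the average sampled row size up to the full row count."""
    if not sample_rows:
        return 0.0
    avg = sum(approx_row_bytes(r) for r in sample_rows) / len(sample_rows)
    return avg * total_rows


# Made-up rows standing in for a sample collected from a DataFrame
sample = [(1, "good product"), (2, "arrived late"), (3, "would buy again")]
print(f"{extrapolate_size(sample, total_rows=1_000_000):.0f} Byte (rough estimate)")
```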


### Install PySpark


In [None]:
# install PySpark
%pip install pyspark==3.5.6

Note: you may need to restart the kernel to use updated packages.


### Initializing Spark


In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

spark

In [None]:
#  DO NOT FORGET WHEN YOU'RE DONE => spark.stop()

# PySpark DataFrame


In [2]:
import pandas as pd
import io
import re
import contextlib
from pyspark.sql import DataFrame

The functions below estimate the size of a DataFrame.


In [3]:
def _bytes2unit(bb: float, unit: str) -> float:
    """Convert a size expressed in `unit` into bytes."""
    units = {
        "B": 1,
        "KiB": 1024,
        "MiB": 1024 * 1024,
        "GiB": 1024 * 1024 * 1024,
        "TiB": 1024 * 1024 * 1024 * 1024,
    }
    return bb * units[unit]


def convert_unit_to_bytes(size: float, unit: str) -> str:
    """Format a size that is already in bytes, e.g. "23697818 Byte".

    `unit` is the unit Spark originally reported. It is accepted so existing
    calls keep working, but the value is always labelled in bytes.
    """
    return f"{size:.0f} Byte"


def estimate_size_of_df(df: DataFrame) -> tuple:
    """Estimate the size of the given DataFrame in different units.
    If the size cannot be estimated return (-1.0, -1.0, '').
    Sizes are returned in original format, size in bytes, and original unit.

    This function works only in PySpark 3.0.0 or higher!

    :param df: DataFrame
    :returns: Tuple containing original size, size in bytes, and original unit
    """
    with contextlib.redirect_stdout(io.StringIO()) as stdout:
        # mode argument was added in 3.0.0
        df.explain(mode="cost")

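    # Line 0 is the "== Optimized Logical Plan ==" header; line 1 is the top
    # plan node, which carries the Statistics(sizeInBytes=...) annotation
    top_line = stdout.getvalue().split("\n")[1]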

    # We need a pattern to parse the real size and units
    pattern = r"^.*sizeInBytes=([0-9]+\.[0-9]+)\s(B|KiB|MiB|GiB|TiB).*$"

    _match = re.search(pattern, top_line)

    if _match:
        size = float(_match.groups()[0])
        unit = _match.groups()[1]
    else:
        return -1.0, -1.0, ""

    return (
        size,
        _bytes2unit(size, unit),
        unit,
    )  # original size, size in bytes, original unit
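To see what the parsing step does without starting Spark, the sketch below applies the same regular expression to a hand-written string. The `sample_output` text is an assumption that only mimics the shape of `df.explain(mode="cost")` output; a real plan will list your own columns and paths.

```python
import re

# Hypothetical text shaped like the output of df.explain(mode="cost")
sample_output = (
    "== Optimized Logical Plan ==\n"
    "Relation [Id#17,Text#18] csv, Statistics(sizeInBytes=22.6 MiB)\n"
    "\n"
    "== Physical Plan ==\n"
    "FileScan csv [Id#17,Text#18]\n"
)

# Same pattern as in estimate_size_of_df
pattern = r"^.*sizeInBytes=([0-9]+\.[0-9]+)\s(B|KiB|MiB|GiB|TiB).*$"

# The second line holds the top plan node and its statistics
top_line = sample_output.split("\n")[1]
match = re.search(pattern, top_line)

size = float(match.group(1))
unit = match.group(2)

# Binary multipliers, as in _bytes2unit
multipliers = {"B": 1, "KiB": 1024, "MiB": 1024**2, "GiB": 1024**3, "TiB": 1024**4}
size_in_bytes = size * multipliers[unit]

print(size, unit, round(size_in_bytes))
```

If the pattern does not match (for example, because the statistics line is missing), `re.search` returns `None`, which is why `estimate_size_of_df` falls back to `(-1.0, -1.0, "")`.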

Let's now estimate the size of DataFrames read from different file formats.


## CSV


In [4]:
# Run these lines to fetch the sample dataset if you are on Colab
!mkdir -p ./data
!wget -q -P ./data https://raw.githubusercontent.com/JohnSnowLabs/spark-nlp-workshop/master/tutorials/PySpark/data/amazonFood.csv

In [5]:
amazon_csv = spark.read.csv("./data/amazonFood.csv", header=True)

In [6]:
# Calculate the DataFrame size using the function

original_size, size_in_bytes, original_unit = estimate_size_of_df(amazon_csv)

if original_size == -1 or size_in_bytes == -1:
    print("Unable to calculate DataFrame size.")

else:
    formatted_original_size = (
        "{:.2f}".format(original_size)
        if original_unit != "B"
        else "{:.0f}".format(original_size)
    )
    formatted_size_in_bytes = convert_unit_to_bytes(size_in_bytes, original_unit)

    if original_unit != "B":
        print(
            f"CSV DataFrame size: {formatted_original_size} {original_unit} or {size_in_bytes:.0f} Byte"
        )
    else:
        # The size is already in bytes, so print the unit only once
        print(f"CSV DataFrame size: {formatted_original_size} Byte")

CSV DataFrame size: 22.60 MiB or 23697818 Byte
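Since the same reporting logic is needed for every DataFrame, it could be wrapped in a small helper. The sketch below is one possible shape; `format_size_report` is a hypothetical name, not part of the original notebook, and it prints the byte count only once when Spark already reports the size in bytes.

```python
def format_size_report(
    label: str, original_size: float, size_in_bytes: float, unit: str
) -> str:
    """Build a one-line size report (hypothetical helper)."""
    if original_size == -1 or size_in_bytes == -1:
        return "Unable to calculate DataFrame size."
    if unit == "B":
        # Already in bytes: no second figure needed
        return f"{label} DataFrame size: {original_size:.0f} Byte"
    return (
        f"{label} DataFrame size: {original_size:.2f} {unit} "
        f"or {size_in_bytes:.0f} Byte"
    )


# Values taken from the CSV result above
print(format_size_report("CSV", 22.6, 22.6 * 1024 * 1024, "MiB"))
```

With this in place, a report cell could shrink to `print(format_size_report("CSV", *estimate_size_of_df(amazon_csv)))`, because the tuple returned by `estimate_size_of_df` has the same order as the helper's last three parameters.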


## Parquet


In [7]:
%%time

# Overwrite so the cell can be re-run without failing on an existing path
amazon_csv.write.mode("overwrite").parquet('./savedData/amazonFood.parquet')

amazon_parquet = spark.read.parquet('./savedData/amazonFood.parquet')

CPU times: user 1.66 ms, sys: 928 µs, total: 2.59 ms
Wall time: 3.56 s
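For comparison with the estimate from the query plan, you can also measure the data at its source by summing the file sizes in the Parquet output directory. This is a minimal sketch using only the standard library; `dir_size_bytes` is a hypothetical helper name, and the total includes every file in the directory, not just the data files.

```python
import os


def dir_size_bytes(path: str) -> int:
    """Sum the sizes of all regular files under `path` (hypothetical helper)."""
    total = 0
    for root, _dirs, files in os.walk(path):
        for name in files:
            total += os.path.getsize(os.path.join(root, name))
    return total


# Path written by the cell above; skipped quietly if it does not exist yet
parquet_dir = "./savedData/amazonFood.parquet"
if os.path.isdir(parquet_dir):
    print(f"On-disk size: {dir_size_bytes(parquet_dir)} Byte")
```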


In [8]:
# Calculate the DataFrame size using the function

original_size, size_in_bytes, original_unit = estimate_size_of_df(amazon_parquet)

if original_size == -1 or size_in_bytes == -1:
    print("Unable to calculate DataFrame size.")

else:
    formatted_original_size = (
        "{:.2f}".format(original_size)
        if original_unit != "B"
        else "{:.0f}".format(original_size)
    )
    formatted_size_in_bytes = convert_unit_to_bytes(size_in_bytes, original_unit)

    if original_unit != "B":
        print(
            f"Parquet DataFrame size: {formatted_original_size} {original_unit} or {size_in_bytes:.0f} Byte"
        )
    else:
        # The size is already in bytes, so print the unit only once
        print(f"Parquet DataFrame size: {formatted_original_size} Byte")

Parquet DataFrame size: 10.60 MiB or 11114906 Byte


As the examples above show, the estimated size of the CSV-backed DataFrame (22.60 MiB) is roughly twice that of the Parquet-backed one (10.60 MiB).
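The ratio can be checked directly from the two byte counts printed above. The numbers in this short sketch are copied from those outputs and will differ for other datasets.

```python
csv_bytes = 23697818      # from the CSV cell output above
parquet_bytes = 11114906  # from the Parquet cell output above

ratio = csv_bytes / parquet_bytes
print(f"CSV estimate is {ratio:.2f}x the Parquet estimate")
```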
