# Polars, Arrow and Spark

## Accelerating analytics with BlocPower's massive dataset

---

Dataset - 

This notebook explores methods to speed up and optimize work with large dataframes in Python. Because of compute and storage limitations, we need ways to:
- make operations faster

- consume less memory and/or handle data larger than memory

Three methods are tried here:

- Pandas: the regular workflow

- Dask: a flexible library for parallel computing in Python

- Polars: a Pandas alternative written in Rust that uses the Apache Arrow columnar memory format

The first step is common to all three methods: **define the query in SQL inside the Python notebook** to select our chosen data. This is a distinct change from our usual approach, where we first add a dataset to the project (or through the visual GUI), transform and filter it there, and then import it into the notebook.


## Results:

For this limited use case, we loop through a list of state codes (here NY and RI), read in the data, count the number of missing values in each variable, append the dataframes together, and perform a simple groupby to calculate the mean Energy Use Intensity for each county.

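The time and memory taken by each library are given below.

| Library | Time (s) | Memory |
|---------|----------|--------|
| Pandas  | 36.06    | 3000 MB |
| Dask    | 75.33    | 64 bytes |
| Polars  | 38.66    | 48 bytes |

The byte-sized Dask and Polars figures are `sys.getsizeof` of lazy, unevaluated objects, so they reflect only the Python wrapper object, not the data it will produce.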


As datasets get larger, the difference grows and Pandas performs much worse.


In [3]:
%%capture

# Setup - run only once per Kernel App
# Spark needs a Java runtime
%conda install openjdk -y

# Install PySpark (pinned to 3.2.0), the Redivis client and Polars
%pip install pyspark==3.2.0 redivis polars 


In [7]:
# Full documentation available at:
# https://apidocs.redivis.com/client-libraries/redivis-python

import os
import sys
import time

import numpy as np
import pandas as pd
import polars as pl
import redivis

# Only needed for the Dask comparison:
# import dask.dataframe as dd


### Insert your Redivis API Key

In [8]:

# Set your Redivis API token (replace 'token' with your own and never commit a real token)
os.environ['REDIVIS_API_TOKEN'] = 'token'

# Optional: reference the organization and dataset directly
# organization = redivis.organization("EIDC")
# dataset = organization.dataset("spatial01")

## Pandas (+ PyArrow Backend)

In [17]:
st2=time.time()

# list of states
states = ['NY', 'RI', 'AZ']

dfs = []
for state in states:
    query = redivis.query(f"""
        SELECT * 
        FROM EIDC.blocpower_active.blocpower_core
        WHERE state = '{state}'
        """)
    # df = query.to_dataframe()   # 40 seconds
    df = query.to_pandas_dataframe(dtype_backend="numpy")
    size_in_mb = sys.getsizeof(df)/ (1024**2)
    print("######################################")

    print(f"Memory consumed by Pandas DF for {state}: {size_in_mb:.2f} MB")
    
    missing_values_count = df.address.isna().sum()
    
    print(f"Missing addresses for {state}:", missing_values_count )
    print("-------------------------------------")
    dfs.append(df)
    
pandas_df = pd.concat(dfs, ignore_index=True)

dfcounty = pandas_df.groupby('county').total_source_energy_GJ.mean()

et2=time.time()

dur2 = et2-st2

print("time pandas -", dur2)

  0%|          | 0/4875116 [00:00<?, ?it/s]

Memory consumed by Pandas DF for NY: 2966.39 MB
######################################
Missing addresses for NY
-------------------------------------
851884


  0%|          | 0/356365 [00:00<?, ?it/s]

Memory consumed by Pandas DF for RI: 230.18 MB
######################################
Missing addresses for RI
-------------------------------------
18105


  0%|          | 0/546816 [00:00<?, ?it/s]

Memory consumed by Pandas DF for AZ: 299.14 MB
######################################
Missing addresses for AZ
-------------------------------------
165129
time pandas - 44.24361228942871


## Polars

In [22]:
#https://towardsdatascience.com/understanding-groupby-in-polars-dataframe-by-examples-1e910e4095b3

st2 = time.time()

states = ['NY', 'RI', 'AZ']
dfs = []
for state in states:
    query = redivis.query(f"""
        SELECT * 
        FROM EIDC.blocpower_active.blocpower_core
        WHERE state = '{state}'
        """)
    # Convert to a Polars LazyFrame (no data is materialized yet)
    df = query.to_polars_lazyframe(progress=True)
    
    # Size of the LazyFrame wrapper object only, not of the underlying data
    print(f"Memory consumed by Polars DF for {state}: {sys.getsizeof(df):.2f} BYTES ")
    
    # Lazy: this null count is only computed if it is collected
    null_count_df = df.null_count()
    
    dfs.append(df)
    
# Concatenating LazyFrames returns another LazyFrame
polars_df = pl.concat(dfs)

q = (
    polars_df
    .group_by('county')   # `groupby` was renamed `group_by` in Polars 0.19
    .agg(
        pl.col('total_source_energy_GJ').mean().alias('mean_energy'),
    )
)

# Execute the whole lazy plan
county_means = q.collect()

et2 = time.time()

dur2 = et2 - st2
print("time polars -", dur2)


  0%|          | 0/4875116 [00:00<?, ?it/s]

Memory consumed by Polars DF for NY: 40.00 BYTES 


  0%|          | 0/356365 [00:00<?, ?it/s]

Memory consumed by Polars DF for RI: 40.00 BYTES 


  0%|          | 0/546816 [00:00<?, ?it/s]

Memory consumed by Polars DF for AZ: 40.00 BYTES 




time polars - 25.754555702209473


- Polars: 25.7 seconds

- Pandas: 44.2 seconds


---

In [None]:

states = ["DC", "AL", "AK", "AZ", "AR", "CA", "CO", "CT", "DE", "FL", "GA", 
    "HI", "ID", "IL", "IN", "IA", "KS", "KY", "LA", "ME", "MD", 
    "MA", "MI", "MN", "MS", "MO", "MT", "NE", "NV", "NH", "NJ", 
    "NM", "NY", "NC", "ND", "OH", "OK", "OR", "PA", "RI", "SC", 
    "SD", "TN", "TX", "UT", "VT", "VA", "WA", "WV", "WI", "WY"]

## Download entire data to disk


### PyArrow and Redivis

`to_arrow_table()` returns the query results as a PyArrow Table. Since Arrow is the underlying transport format for Redivis data, loading data directly into an Arrow table is always the most performant in-memory option. The cell below uses the related `to_arrow_dataset()` method and then writes the result to Parquet on disk.

In [24]:
import pyarrow.dataset as ds

st2 = time.time()

# Query every building with a known address
query = redivis.query("""
    SELECT * 
    FROM EIDC.blocpower_active.blocpower_core
    WHERE address IS NOT NULL
    """)
# Convert to an Arrow dataset
arrow_ds = query.to_arrow_dataset(progress=True)

# Size of the Python wrapper object only, not of the data
print(f"Python object size of the Arrow dataset: {sys.getsizeof(arrow_ds)} BYTES")

# Write the dataset to Parquet (creates a directory of Parquet files named bp_core.parquet)
ds.write_dataset(arrow_ds, 'bp_core.parquet', format="parquet")

print("time arrow -", time.time() - st2)



  0%|          | 0/68496879 [00:00<?, ?it/s]

`ds.write_dataset` accepts a `FileSystemDataset` directly and scans it batch by batch, so the full dataset does not have to fit in memory at once; its `max_rows_per_file` and `max_rows_per_group` arguments bound the size of each output file and row group. For finer control, iterate over the dataset's record batches and write each one to Parquet incrementally.

---

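#### Add `*.parquet` to .gitignore

In [39]:
!touch .gitignore

# "*.parquet" matches any file or directory ending in .parquet; ".parquet" would only match that exact name
!echo "*.parquet" >> .gitignore

!cat .gitignore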
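The same step can be written in Python so that re-running the cell does not append a duplicate line. To ignore Parquet output such as `bp_core.parquet`, the pattern needs a wildcard (`*.parquet`). A minimal sketch; the helper name `ensure_gitignore_pattern` is hypothetical:

```python
from pathlib import Path


def ensure_gitignore_pattern(pattern: str, path: str = ".gitignore") -> bool:
    """Append `pattern` to the gitignore file unless it is already listed.

    Hypothetical helper, not part of any library.
    Returns True if the pattern was added, False if it was already present.
    """
    gitignore = Path(path)
    text = gitignore.read_text() if gitignore.exists() else ""
    if pattern in (line.strip() for line in text.splitlines()):
        return False
    # Start the new pattern on its own line
    if text and not text.endswith("\n"):
        text += "\n"
    gitignore.write_text(text + pattern + "\n")
    return True
```

Usage: `ensure_gitignore_pattern("*.parquet")` in the project root.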

## Spark

### Initialize PySpark Session + Enable PyArrow

In [28]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("PyArrowIntegration") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()


Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/28 18:20:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Read Parquet into a Spark DataFrame


Two fundamental concepts in Apache Spark:

- **Resilient Distributed Dataset (RDD)**:

The fundamental, low-level data structure of Apache Spark.
An RDD is an immutable, distributed collection of objects that can be processed in parallel. RDDs can hold objects of any type; transformations on them are lazy and are executed in parallel by the cluster's executors.


- **DataFrame**:

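A distributed collection of data organized into named columns, similar to a table in a relational database. It is built on top of the RDD API and provides a higher-level abstraction; because Spark knows the schema, DataFrame queries can be optimized by the Catalyst query optimizer.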

In [41]:
%%time

# Parquet files carry their own schema, so no header option is needed (unlike CSV)
sdf = spark.read.parquet("bp_core.parquet")
sdf.show()


+-------------+-----+-----------------+---------------+-----+--------------------+----------+----------+---------------+-------------------+-------------------+-----------------+--------------------+---------------------------+--------------------+----------------------+
|  building_id|state|           county|           city|  zip|             address|area_sq_ft|year_built|  building_type|cooling_system_type|heating_system_type|heating_fuel_type|energy_use_intensity|energy_efficiency_potential|total_site_energy_GJ|total_source_energy_GJ|
+-------------+-----+-----------------+---------------+-----+--------------------+----------+----------+---------------+-------------------+-------------------+-----------------+--------------------+---------------------------+--------------------+----------------------+
|1017004124081|   RI|       Washington|SOUTH KINGSTOWN|02879|11 ELWOOD CT SOUT...|      1596|      null|   SingleFamily|               NONE|          HOT WATER|              Gas|     3

                                                                                