<a href="https://colab.research.google.com/github/TaniaHurtado2024/Seminario_BigData/blob/main/1_PPvsSpark_01.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Pandas vs Polars vs Spark vs Dask
[Author: Elias Buitrago Bolivar](https://github.com/ebuitrago?tab=repositories)

Inspired by: https://www.youtube.com/watch?v=mi9f9zOaqM8

Original data: Kaggle

This Jupyter notebook studies and compares different tools for reading and manipulating data during the data understanding phase. The explanations are given in class, so the material is not self-explanatory. Don't forget to ask me for access to the data. Please credit the original author's idea and, if you see fit, me as well.

_Updated: June 20th, 2023_

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## Playing with pandas

In [None]:
import pandas as pd
flights_file1 = "/content/drive/MyDrive/Data_Flights/Combined_Flights_2018.parquet"
flights_file2 = "/content/drive/MyDrive/Data_Flights/Combined_Flights_2019.parquet"
flights_file3 = "/content/drive/MyDrive/Data_Flights/Combined_Flights_2020.parquet"
flights_file4 = "/content/drive/MyDrive/Data_Flights/Combined_Flights_2021.parquet"
flights_file5 = "/content/drive/MyDrive/Data_Flights/Combined_Flights_2022.parquet"
df1 = pd.read_parquet(flights_file1)
df2 = pd.read_parquet(flights_file2)
df3 = pd.read_parquet(flights_file3)
df4 = pd.read_parquet(flights_file4)
df5 = pd.read_parquet(flights_file5)

KeyboardInterrupt: 
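The five paths differ only in the year, and the aggregation below only needs four columns. The following is a minimal sketch, assuming the same Drive folder and file naming as above. It builds the paths in a loop and passes pandas' `columns=` argument to `pd.read_parquet` so that only those columns are loaded, which may make a long or memory-heavy load like the interrupted one above lighter. `flight_paths`, `load_flights` and `NEEDED_COLUMNS` are hypothetical names.

```python
import pandas as pd

# Assumed folder and naming pattern, copied from the paths above
DATA_DIR = "/content/drive/MyDrive/Data_Flights"
# Hypothetical selection: only the columns the groupby/agg cell uses
NEEDED_COLUMNS = ["Airline", "Year", "DepDelayMinutes", "ArrDelayMinutes"]

def flight_paths(years):
    """Build one parquet path per year (hypothetical helper)."""
    return [f"{DATA_DIR}/Combined_Flights_{year}.parquet" for year in years]

def load_flights(years, columns=NEEDED_COLUMNS):
    """Read only the requested columns from each yearly file and stack them."""
    frames = [pd.read_parquet(path, columns=columns) for path in flight_paths(years)]
    return pd.concat(frames, ignore_index=True)

# Usage (requires the mounted Drive): df = load_flights(range(2018, 2023))
```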

In [None]:
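# The aggregation cell below needs a single DataFrame named `df`.
# Same two years as the Dask section; concatenating all five years may exceed Colab's RAM.
df = pd.concat([df3, df5])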

In [None]:
%%timeit

df_agg = df.groupby(['Airline','Year'])[["DepDelayMinutes", "ArrDelayMinutes"]].agg(
    ["mean", "sum", "max"]
)
df_agg = df_agg.reset_index()
df_agg.to_parquet("temp_pandas.parquet")
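The pandas aggregation above produces two-level column labels such as `("DepDelayMinutes", "mean")`, while the Polars and Spark versions below produce flat names such as `avg_dep_delay`. This sketch uses pandas named aggregation, where each keyword passed to `.agg` is a `(column, function)` pair, to produce the same flat names directly. The tiny frame is made-up sample data, not the Kaggle flights. The variables are named `sample`/`sample_agg` so they do not overwrite the notebook's `df`.

```python
import pandas as pd

# Made-up rows standing in for the flights data (None becomes NaN)
sample = pd.DataFrame({
    "Airline": ["A", "A", "B", "B", "B"],
    "Year": [2020, 2020, 2020, 2022, 2022],
    "DepDelayMinutes": [0.0, 10.0, 5.0, None, 20.0],
    "ArrDelayMinutes": [2.0, 8.0, 4.0, None, 30.0],
})

# Named aggregation: new_column_name=(input_column, aggregation)
sample_agg = (
    sample.groupby(["Airline", "Year"])
    .agg(
        avg_dep_delay=("DepDelayMinutes", "mean"),
        sum_dep_delay=("DepDelayMinutes", "sum"),
        max_dep_delay=("DepDelayMinutes", "max"),
        avg_arr_delay=("ArrDelayMinutes", "mean"),
        sum_arr_delay=("ArrDelayMinutes", "sum"),
        max_arr_delay=("ArrDelayMinutes", "max"),
    )
    .reset_index()  # turn the group keys back into ordinary columns
)
print(sample_agg)
```

Missing values are skipped, so the `("B", 2022)` group averages only the non-missing delay.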

In [None]:
!ls -GFlash temp_pandas.parquet

In [None]:
pd.read_parquet('temp_pandas.parquet')

In [None]:
pd.read_parquet('temp_pandas.parquet').info()

## Playing with Polars

In [None]:
import polars as pl

In [None]:
flights_file1 = "/content/drive/MyDrive/Data_Flights/Combined_Flights_2018.parquet"
flights_file2 = "/content/drive/MyDrive/Data_Flights/Combined_Flights_2019.parquet"
flights_file3 = "/content/drive/MyDrive/Data_Flights/Combined_Flights_2020.parquet"
flights_file4 = "/content/drive/MyDrive/Data_Flights/Combined_Flights_2021.parquet"
flights_file5 = "/content/drive/MyDrive/Data_Flights/Combined_Flights_2022.parquet"
df1 = pl.scan_parquet(flights_file1)
df2 = pl.scan_parquet(flights_file2)
df3 = pl.scan_parquet(flights_file3)
df4 = pl.scan_parquet(flights_file4)
df5 = pl.scan_parquet(flights_file5)

In [None]:
%%timeit

df_polars = (
    pl.concat([df1, df2, df3, df4, df5])
    .group_by(['Airline', 'Year'])  # this method is called groupby in Polars releases before 0.19
    .agg([
        pl.col("DepDelayMinutes").mean().alias("avg_dep_delay"),
        pl.col("DepDelayMinutes").sum().alias("sum_dep_delay"),
        pl.col("DepDelayMinutes").max().alias("max_dep_delay"),
        pl.col("ArrDelayMinutes").mean().alias("avg_arr_delay"),
        pl.col("ArrDelayMinutes").sum().alias("sum_arr_delay"),
        pl.col("ArrDelayMinutes").max().alias("max_arr_delay"),
    ])
).collect()  # the scans are lazy: nothing is read until collect()

df_polars.write_parquet('temp_polars.parquet')



11.4 s ± 560 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
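Whatever the engine, the query computes the mean, sum and maximum of each delay column for every (Airline, Year) group. It ignores missing values, because pandas, Polars and Spark all skip nulls in these aggregations by default. Below is a dependency-free sketch of that logic that can serve as a cross-check on a handful of rows. The sample records are invented, `None` stands for a missing delay, and `aggregate_delays` is a hypothetical helper.

```python
from collections import defaultdict

def aggregate_delays(rows, keys=("Airline", "Year"),
                     cols=("DepDelayMinutes", "ArrDelayMinutes")):
    """Per-group mean, sum and max of each column, skipping missing (None) values."""
    # group key -> column name -> list of non-missing values
    values = defaultdict(lambda: defaultdict(list))
    for row in rows:
        group = values[tuple(row[k] for k in keys)]  # registers the group even if every value is None
        for col in cols:
            if row[col] is not None:
                group[col].append(row[col])
    result = {}
    for key, per_col in values.items():
        result[key] = {}
        for col in cols:
            vals = per_col[col]
            result[key][col] = {
                "mean": sum(vals) / len(vals) if vals else None,
                "sum": sum(vals),  # 0 for a group with no values in this sketch
                "max": max(vals) if vals else None,
            }
    return result

sample_rows = [
    {"Airline": "A", "Year": 2020, "DepDelayMinutes": 0.0, "ArrDelayMinutes": 2.0},
    {"Airline": "A", "Year": 2020, "DepDelayMinutes": 10.0, "ArrDelayMinutes": None},
    {"Airline": "B", "Year": 2022, "DepDelayMinutes": None, "ArrDelayMinutes": None},
]
for key, stats in sorted(aggregate_delays(sample_rows).items()):
    print(key, stats)
```

Engines can differ on groups where every value is missing, so such edge cases are worth checking explicitly when comparing outputs.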


In [None]:
!ls -GFlash temp_polars.parquet

12K -rw-r--r-- 1 root 8.1K Jul 11 01:14 temp_polars.parquet


## Playing with PySpark

In [None]:
!pip install pyspark



In [None]:
from pyspark.sql import SparkSession
# avg, max and sum here are Spark column functions; importing max and sum shadows Python's built-ins
from pyspark.sql.functions import avg, max, sum

In [None]:
# local[1] runs Spark with a single worker thread; local[*] would use all available cores
spark = SparkSession.builder.master("local[1]").appName("airline-example").getOrCreate()

In [None]:
flights_file1 = "/content/drive/MyDrive/Data_Flights/Combined_Flights_2018.parquet"
flights_file2 = "/content/drive/MyDrive/Data_Flights/Combined_Flights_2019.parquet"
flights_file3 = "/content/drive/MyDrive/Data_Flights/Combined_Flights_2020.parquet"
flights_file4 = "/content/drive/MyDrive/Data_Flights/Combined_Flights_2021.parquet"
flights_file5 = "/content/drive/MyDrive/Data_Flights/Combined_Flights_2022.parquet"

In [None]:
df_spark1 = spark.read.parquet(flights_file1)
df_spark2 = spark.read.parquet(flights_file2)
df_spark3 = spark.read.parquet(flights_file3)
df_spark4 = spark.read.parquet(flights_file4)
df_spark5 = spark.read.parquet(flights_file5)

In [None]:
df_spark = df_spark1.union(df_spark2)
df_spark = df_spark.union(df_spark3)
df_spark = df_spark.union(df_spark4)
df_spark = df_spark.union(df_spark5)
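PySpark's `DataFrame.union` resolves columns by position, not by name. If two yearly files listed their columns in a different order, their values would be mixed silently. `unionByName` matches columns by name instead. The plain-Python sketch below illustrates the difference. The helper names and the two tiny tables are hypothetical, and each table is a `(column_names, rows)` pair.

```python
from functools import reduce

def union_by_position(a, b):
    """Stack b under a, keeping a's column names (what a positional union does)."""
    cols_a, rows_a = a
    _, rows_b = b
    return cols_a, rows_a + rows_b

def union_by_name(a, b):
    """Stack b under a after reordering b's values into a's column order."""
    cols_a, rows_a = a
    cols_b, rows_b = b
    idx = [cols_b.index(c) for c in cols_a]  # raises ValueError if b lacks a column
    return cols_a, rows_a + [tuple(r[i] for i in idx) for r in rows_b]

t2019 = (["Airline", "DepDelayMinutes", "ArrDelayMinutes"], [("A", 5.0, 7.0)])
t2020 = (["Airline", "ArrDelayMinutes", "DepDelayMinutes"], [("B", 9.0, 1.0)])

print(union_by_position(t2019, t2020)[1])  # B's arrival delay lands under DepDelayMinutes
print(union_by_name(t2019, t2020)[1])

# Folding a list of tables; in PySpark the analogue would be
# reduce(lambda x, y: x.unionByName(y), [df_spark1, df_spark2, df_spark3, df_spark4, df_spark5])
combined = reduce(union_by_name, [t2019, t2020])
```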

In [None]:
%%timeit

df_spark_agg = df_spark.groupby("Airline", "Year").agg(
    avg("ArrDelayMinutes").alias('avg_arr_delay'),
    sum("ArrDelayMinutes").alias('sum_arr_delay'),
    max("ArrDelayMinutes").alias('max_arr_delay'),
    avg("DepDelayMinutes").alias('avg_dep_delay'),
    sum("DepDelayMinutes").alias('sum_dep_delay'),
    max("DepDelayMinutes").alias('max_dep_delay'),
)
df_spark_agg.write.mode('overwrite').parquet('temp_spark.parquet')

8.06 s ± 1.07 s per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [None]:
# Spark writes a directory of part files under the path given to .parquet() above
!ls -GFlash temp_spark.parquet


## Playing with dask

In [2]:
import pandas as pd
import dask.dataframe as dd
flights_file1 = "/content/drive/MyDrive/Data_Flights/Combined_Flights_2018.parquet"
flights_file2 = "/content/drive/MyDrive/Data_Flights/Combined_Flights_2019.parquet"
flights_file3 = "/content/drive/MyDrive/Data_Flights/Combined_Flights_2020.parquet"
flights_file4 = "/content/drive/MyDrive/Data_Flights/Combined_Flights_2021.parquet"
flights_file5 = "/content/drive/MyDrive/Data_Flights/Combined_Flights_2022.parquet"
df1 = dd.read_parquet(flights_file1)
df2 = dd.read_parquet(flights_file2)
df3 = dd.read_parquet(flights_file3)
df4 = dd.read_parquet(flights_file4)
df5 = dd.read_parquet(flights_file5)

In [4]:
# Only 2020 and 2022 are combined here, whereas the Polars and Spark runs use all five years
df = dd.concat([df3, df5])

In [5]:
print(df.compute())

       FlightDate            Airline Origin Dest  Cancelled  Diverted  \
0      2020-09-01        Comair Inc.    PHL  DAY      False     False   
1      2020-09-02        Comair Inc.    PHL  DAY      False     False   
2      2020-09-03        Comair Inc.    PHL  DAY      False     False   
3      2020-09-04        Comair Inc.    PHL  DAY      False     False   
4      2020-09-05        Comair Inc.    PHL  DAY      False     False   
...           ...                ...    ...  ...        ...       ...   
590537 2022-03-31  Republic Airlines    MSY  EWR      False      True   
590538 2022-03-17  Republic Airlines    CLT  EWR       True     False   
590539 2022-03-08  Republic Airlines    ALB  ORD      False     False   
590540 2022-03-25  Republic Airlines    EWR  PIT      False      True   
590541 2022-03-07  Republic Airlines    EWR  RDU      False      True   

        CRSDepTime  DepTime  DepDelayMinutes  DepDelay  ...  WheelsOff  \
0             1905   1858.0              0.0     

In [6]:
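# .compute() materializes the Dask collection as an in-memory pandas DataFrame,
# so the timed cell below measures pandas, not Dask, on the 2020 + 2022 data
df = df.compute()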

In [7]:
%%timeit

df_agg = df.groupby(['Airline','Year'])[["DepDelayMinutes", "ArrDelayMinutes"]].agg(
    ["mean", "sum", "max"]
)
df_agg = df_agg.reset_index()
df_agg.to_parquet("temp_dask.parquet")

1.38 s ± 244 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [None]:
!ls -GFlash temp_dask.parquet

12K -rw-r--r-- 1 root 9.9K Jul 11 02:26 temp_dask.parquet


In [None]:
pd.read_parquet('temp_dask.parquet').info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 46 entries, 0 to 45
Data columns (total 8 columns):
 #   Column                   Non-Null Count  Dtype  
---  ------                   --------------  -----  
 0   (Airline, )              46 non-null     string 
 1   (Year, )                 46 non-null     int64  
 2   (DepDelayMinutes, mean)  46 non-null     float64
 3   (DepDelayMinutes, sum)   46 non-null     float64
 4   (DepDelayMinutes, max)   46 non-null     float64
 5   (ArrDelayMinutes, mean)  46 non-null     float64
 6   (ArrDelayMinutes, sum)   46 non-null     float64
 7   (ArrDelayMinutes, max)   46 non-null     float64
dtypes: float64(6), int64(1), string(1)
memory usage: 3.0 KB


In [None]:
pd.read_parquet('temp_dask.parquet')

| | Airline | Year | DepDelayMinutes (mean) | DepDelayMinutes (sum) | DepDelayMinutes (max) | ArrDelayMinutes (mean) | ArrDelayMinutes (sum) | ArrDelayMinutes (max) |
|---|---|---|---|---|---|---|---|---|
| 0 | Air Wisconsin Airlines Corp | 2020 | 8.583725 | 433315.0 | 1460.0 | 8.982529 | 452450.0 | 1439.0 |
| 1 | Air Wisconsin Airlines Corp | 2022 | 13.124801 | 510581.0 | 1355.0 | 13.340409 | 517261.0 | 1353.0 |
| 2 | Alaska Airlines Inc. | 2020 | 5.818328 | 772930.0 | 823.0 | 6.365082 | 843157.0 | 788.0 |
| 3 | Alaska Airlines Inc. | 2022 | 10.153994 | 1278134.0 | 915.0 | 11.02628 | 1382905.0 | 908.0 |
| 4 | Allegiant Air | 2020 | 12.825575 | 1080016.0 | 1648.0 | 13.331111 | 1115734.0 | 1645.0 |
| 5 | Allegiant Air | 2022 | 22.688601 | 1602632.0 | 1917.0 | 25.350068 | 1785963.0 | 1919.0 |
| 6 | American Airlines Inc. | 2020 | 7.624477 | 4084097.0 | 3890.0 | 7.861155 | 4202644.0 | 3864.0 |
| 7 | American Airlines Inc. | 2022 | 17.718716 | 8464195.0 | 2994.0 | 17.860139 | 8499122.0 | 2977.0 |
| 8 | Capital Cargo International | 2020 | 7.665063 | 512969.0 | 1482.0 | 8.427212 | 561522.0 | 1470.0 |
| 9 | Capital Cargo International | 2022 | 12.052814 | 619599.0 | 1512.0 | 13.050802 | 667418.0 | 1490.0 |


## Read Results

In [None]:
import pandas as pd

In [None]:
#agg_pandas = pd.read_parquet('temp_pandas.parquet')
agg_polars = pd.read_parquet('temp_polars.parquet')
agg_spark = pd.read_parquet('temp_spark.parquet')
agg_dask = pd.read_parquet('temp_dask.parquet')

In [None]:
#agg_pandas.shape, agg_polars.shape, agg_spark.shape, agg_dask.shape

In [None]:
#agg_pandas.sort_values(['Airline','Year']).head()

In [None]:
agg_polars.sort_values(['Airline','Year']).head()

| | Airline | Year | avg_dep_delay | sum_dep_delay | max_dep_delay | avg_arr_delay | sum_arr_delay | max_arr_delay |
|---|---|---|---|---|---|---|---|---|
| 5 | Air Wisconsin Airlines Corp | 2018 | 16.753459 | 1606774.0 | 1296.0 | 17.881934 | 1708887.0 | 1292.0 |
| 97 | Air Wisconsin Airlines Corp | 2019 | 16.868511 | 1742281.0 | 1690.0 | 17.610384 | 1811545.0 | 1707.0 |
| 51 | Air Wisconsin Airlines Corp | 2020 | 8.583725 | 433315.0 | 1460.0 | 8.982529 | 452450.0 | 1439.0 |
| 49 | Air Wisconsin Airlines Corp | 2021 | 16.553045 | 1290194.0 | 1421.0 | 17.32744 | 1346602.0 | 1416.0 |
| 106 | Air Wisconsin Airlines Corp | 2022 | 13.124801 | 510581.0 | 1355.0 | 13.340409 | 517261.0 | 1353.0 |


In [None]:
agg_spark.sort_values(['Airline','Year']).head()

| | Airline | Year | avg_arr_delay | sum_arr_delay | max_arr_delay | avg_dep_delay | sum_dep_delay | max_dep_delay |
|---|---|---|---|---|---|---|---|---|
| 0 | Air Wisconsin Airlines Corp | 2018 | 17.881934 | 1708887.0 | 1292.0 | 16.753459 | 1606774.0 | 1296.0 |
| 48 | Air Wisconsin Airlines Corp | 2019 | 17.610384 | 1811545.0 | 1707.0 | 16.868511 | 1742281.0 | 1690.0 |
| 56 | Air Wisconsin Airlines Corp | 2020 | 8.982529 | 452450.0 | 1439.0 | 8.583725 | 433315.0 | 1460.0 |
| 93 | Air Wisconsin Airlines Corp | 2021 | 17.32744 | 1346602.0 | 1416.0 | 16.553045 | 1290194.0 | 1421.0 |
| 119 | Air Wisconsin Airlines Corp | 2022 | 13.340409 | 517261.0 | 1353.0 | 13.124801 | 510581.0 | 1355.0 |


In [None]:
agg_dask.sort_values(['Airline','Year']).head()

| | Airline | Year | DepDelayMinutes (mean) | DepDelayMinutes (sum) | DepDelayMinutes (max) | ArrDelayMinutes (mean) | ArrDelayMinutes (sum) | ArrDelayMinutes (max) |
|---|---|---|---|---|---|---|---|---|
| 0 | Air Wisconsin Airlines Corp | 2020 | 8.583725 | 433315.0 | 1460.0 | 8.982529 | 452450.0 | 1439.0 |
| 1 | Air Wisconsin Airlines Corp | 2022 | 13.124801 | 510581.0 | 1355.0 | 13.340409 | 517261.0 | 1353.0 |
| 2 | Alaska Airlines Inc. | 2020 | 5.818328 | 772930.0 | 823.0 | 6.365082 | 843157.0 | 788.0 |
| 3 | Alaska Airlines Inc. | 2022 | 10.153994 | 1278134.0 | 915.0 | 11.02628 | 1382905.0 | 908.0 |
| 4 | Allegiant Air | 2020 | 12.825575 | 1080016.0 | 1648.0 | 13.331111 | 1115734.0 | 1645.0 |
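The three result tables use different column layouts. Polars and Spark produce flat names such as `avg_dep_delay`, though in different orders. The pandas/Dask output has two-level labels such as `("DepDelayMinutes", "mean")`. The sketch below checks whether two results agree, assuming the naming schemes seen above. It flattens the two-level labels into the Polars names, aligns the rows on `Airline`/`Year`, and compares the numbers with a tolerance. `to_flat_names` and `results_match` are hypothetical helpers, and the two small frames are stand-ins built from values shown in the tables above.

```python
import pandas as pd

# Assumed mapping from the pandas/Dask labels to the Polars/Spark names
STAT_NAMES = {"mean": "avg", "sum": "sum", "max": "max"}
COL_NAMES = {"DepDelayMinutes": "dep_delay", "ArrDelayMinutes": "arr_delay"}

def to_flat_names(frame):
    """Turn ("DepDelayMinutes", "mean") style labels into "avg_dep_delay"."""
    out = frame.copy()
    if isinstance(out.columns, pd.MultiIndex):
        out.columns = [
            f"{STAT_NAMES[stat]}_{COL_NAMES[col]}" if stat else col  # ("Airline", "") -> "Airline"
            for col, stat in out.columns
        ]
    return out

def results_match(left, right, keys=("Airline", "Year"), rtol=1e-9):
    """True if both frames hold the same groups and (nearly) equal statistics."""
    keys = list(keys)
    left, right = to_flat_names(left), to_flat_names(right)
    value_cols = sorted(c for c in left.columns if c not in keys)
    # Same row order and same column order on both sides before comparing
    left = left.sort_values(keys)[keys + value_cols].reset_index(drop=True)
    right = right.sort_values(keys)[keys + value_cols].reset_index(drop=True)
    try:
        pd.testing.assert_frame_equal(left, right, check_dtype=False, rtol=rtol)
    except AssertionError:
        return False
    return True

polars_like = pd.DataFrame({
    "Airline": ["Air Wisconsin Airlines Corp"] * 2,
    "Year": [2022, 2020],
    "avg_dep_delay": [13.124801, 8.583725], "sum_dep_delay": [510581.0, 433315.0],
    "max_dep_delay": [1355.0, 1460.0], "avg_arr_delay": [13.340409, 8.982529],
    "sum_arr_delay": [517261.0, 452450.0], "max_arr_delay": [1353.0, 1439.0],
})
dask_like = pd.DataFrame(
    [["Air Wisconsin Airlines Corp", 2020, 8.583725, 433315.0, 1460.0, 8.982529, 452450.0, 1439.0],
     ["Air Wisconsin Airlines Corp", 2022, 13.124801, 510581.0, 1355.0, 13.340409, 517261.0, 1353.0]],
    columns=pd.MultiIndex.from_tuples([
        ("Airline", ""), ("Year", ""),
        ("DepDelayMinutes", "mean"), ("DepDelayMinutes", "sum"), ("DepDelayMinutes", "max"),
        ("ArrDelayMinutes", "mean"), ("ArrDelayMinutes", "sum"), ("ArrDelayMinutes", "max"),
    ]),
)
print(results_match(polars_like, dask_like))
```

With the notebook's frames, the Polars result would first need restricting to the years the Dask run covered. For example, `results_match(agg_polars[agg_polars.Year.isin([2020, 2022])], agg_dask)` should work, assuming the column layouts shown above.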
