<a href="https://colab.research.google.com/github/GenoKiller777/PythonBasico/blob/main/PacketDifferenceExtraction.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [21]:
# Colab-only: prompt the user to upload local files (e.g. flights.parquet) and
# report the name and byte length of each one received.
from google.colab import files

uploaded = files.upload()

for file_name, payload in uploaded.items():
    print('User uploaded file "{name}" with length {length} bytes'.format(
        name=file_name, length=len(payload)))

Saving flights.parquet to flights.parquet
User uploaded file "flights.parquet" with length 5832620 bytes


In [1]:
!pip install polars

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [3]:
import pandas as pd
import polars as pl
from pyspark.sql import SparkSession

flights_file = 'flights.parquet'  # input uploaded in the first cell; read by every engine below

In [4]:
!ls -GFlash flights.parquet

5.6M -rw-r--r-- 1 root 5.6M Feb 14 17:53 flights.parquet


In [5]:
# Eager in-memory load, used only to inspect the data in the next cells.
df = pd.read_parquet(path=flights_file)

In [6]:
# Column dtypes and non-null counts (shows which delay/time columns have gaps).
df.info(buf=None)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 336776 entries, 0 to 336775
Data columns (total 19 columns):
 #   Column          Non-Null Count   Dtype         
---  ------          --------------   -----         
 0   year            336776 non-null  int16         
 1   month           336776 non-null  int8          
 2   day             336776 non-null  int8          
 3   dep_time        328521 non-null  float64       
 4   sched_dep_time  336776 non-null  int16         
 5   dep_delay       328521 non-null  float64       
 6   arr_time        328063 non-null  float64       
 7   sched_arr_time  336776 non-null  int16         
 8   arr_delay       327346 non-null  float64       
 9   carrier         336776 non-null  object        
 10  flight          336776 non-null  int16         
 11  tailnum         334264 non-null  object        
 12  origin          336776 non-null  object        
 13  dest            336776 non-null  object        
 14  air_time        327346 non-null  flo

In [None]:
# Peek at the first rows of the raw table.
df.head(n=5)

## Pandas

In [9]:
%%timeit
import pandas as od

df = pd.read_parquet(flights_file)
df_agg = df.groupby(["carrier", "year"])[["dep_delay","arr_time"]].agg(
    ["mean","sum","max"]
)

df_agg = df_agg.reset_index()
df_agg.to_parquet('temp_pandas.parquet')

# 0.2 Segundos

169 ms ± 31.9 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [None]:
# Show the file written by the pandas benchmark.
pd.read_parquet(path='temp_pandas.parquet')

## Polars

In [15]:
%%timeit
import polars as pl
flights_file = "flights.parquet"

df_polars = (
    pl.scan_parquet(flights_file)
    .groupby(["carrier", "year"])
    .agg(
        [
            pl.col("dep_delay").mean().alias("avg_dep_delay"),
            pl.col("dep_delay").sum().alias("sum_dep_delay"),
            pl.col("dep_delay").max().alias("max_dep_delay"),
            pl.col("arr_time").mean().alias("avg_arr_delay"),
            pl.col("arr_time").sum().alias("sum_arr_delay"),
            pl.col("arr_time").max().alias("max_arr_delay")
        ]
    )
).collect()

#0.11 Segundos

119 ms ± 32.4 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [14]:
# Variables assigned inside a %%timeit cell are local to IPython's timing
# function, so `df_polars` from the benchmark cell does not exist here on a
# fresh kernel. Rebuild it (untimed) before writing the result file.
df_polars = (
    pl.scan_parquet(flights_file)
    .group_by(["carrier", "year"])
    .agg(
        [
            pl.col("dep_delay").mean().alias("avg_dep_delay"),
            pl.col("dep_delay").sum().alias("sum_dep_delay"),
            pl.col("dep_delay").max().alias("max_dep_delay"),
            pl.col("arr_time").mean().alias("avg_arr_delay"),
            pl.col("arr_time").sum().alias("sum_arr_delay"),
            pl.col("arr_time").max().alias("max_arr_delay"),
        ]
    )
    .collect()
)
df_polars.write_parquet('temp_polars.parquet')

## PySpark

In [16]:
# Spark session for the PySpark DataFrame benchmark (master "local[1]": one local worker thread).
from pyspark.sql import SparkSession
# NOTE(review): importing `max` and `sum` here shadows Python's builtins for the
# rest of the notebook; `from pyspark.sql import functions as F` avoids that.
from pyspark.sql.functions import avg, max, sum

spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("airline-example")
    .getOrCreate()
)
flights_file = "flights.parquet"

In [18]:
%%timeit

df_spark = spark.read.parquet(flights_file)
df_spark_agg = df_spark.groupby("carrier", "year").agg(
    avg("dep_delay").alias("avg_dep_delay"),
    sum("dep_delay").alias("sum_dep_delay"),
    max("dep_delay").alias("max_dep_delay"),
    avg("arr_time").alias("avg_arr_delay"),
    sum("arr_time").alias("sum_arr_delay"),
    max("arr_time").alias("max_arr_delay")
)

df_spark_agg.write.mode('overwrite').parquet('temp_spark.parquet')

# 1.15 Segundos

1.11 s ± 242 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [19]:
!ls -GFlash temp_spark.parquet

total 20K
4.0K drwxr-xr-x 2 root 4.0K Feb 14 19:11 ./
4.0K drwxr-xr-x 1 root 4.0K Feb 14 19:11 ../
4.0K -rw-r--r-- 1 root 3.0K Feb 14 19:11 part-00000-b4f3e1fe-40de-4efe-b890-c3c4f1ec92f0-c000.snappy.parquet
4.0K -rw-r--r-- 1 root   32 Feb 14 19:11 .part-00000-b4f3e1fe-40de-4efe-b890-c3c4f1ec92f0-c000.snappy.parquet.crc
   0 -rw-r--r-- 1 root    0 Feb 14 19:11 _SUCCESS
4.0K -rw-r--r-- 1 root    8 Feb 14 19:11 ._SUCCESS.crc


## Spark SQL

In [20]:
# Reuse (or create) the Spark session and expose the parquet file to Spark SQL
# as the temporary view `flights`.
# `createOrReplaceTempView` keeps this cell re-runnable (a plain
# CREATE TEMPORARY VIEW fails once `flights` exists) and avoids splicing the
# file path into SQL text.
spark = SparkSession.builder.master("local[1]").appName("airline-example").getOrCreate()

spark.read.parquet(flights_file).createOrReplaceTempView("flights")

DataFrame[]

In [24]:
%%timeit

query = """
SELECT carrier,
       year,
       avg(dep_delay) AS avg_dep_delay,
       sum(dep_delay) AS sum_dep_delay,
       max(dep_delay) AS max_dep_delay,
       avg(arr_time) AS avg_arr_delay,
       sum(arr_time) AS sum_arr_delay,
       max(arr_time) AS max_arr_delay
FROM flights
GROUP BY carrier, year
"""
spark.sql(query).write.mode('overwrite').parquet('temp_spark_sql.parquet')

# 0.742 Segundos

In [25]:
!ls -GFlash temp_spark_sql.parquet

total 20K
4.0K drwxr-xr-x 2 root 4.0K Feb 14 19:12 ./
4.0K drwxr-xr-x 1 root 4.0K Feb 14 19:12 ../
4.0K -rw-r--r-- 1 root 3.0K Feb 14 19:12 part-00000-b2b059fb-0fa0-4c34-b892-2be853d89b40-c000.snappy.parquet
4.0K -rw-r--r-- 1 root   32 Feb 14 19:12 .part-00000-b2b059fb-0fa0-4c34-b892-2be853d89b40-c000.snappy.parquet.crc
   0 -rw-r--r-- 1 root    0 Feb 14 19:12 _SUCCESS
4.0K -rw-r--r-- 1 root    8 Feb 14 19:12 ._SUCCESS.crc


## Lectura de Resultados


In [26]:
import pandas as pd

# Read every engine's output back with pandas for a side-by-side comparison.
# The two Spark outputs are directories of part files; pandas reads each as one table.
agg_pandas, agg_polars, agg_spark, agg_sparksql = (
    pd.read_parquet(result_path)
    for result_path in (
        'temp_pandas.parquet',
        'temp_polars.parquet',
        'temp_spark.parquet',
        'temp_spark_sql.parquet',
    )
)

In [27]:
# (rows, columns) of each engine's result; they are expected to match.
tuple(frame.shape for frame in (agg_pandas, agg_polars, agg_spark, agg_sparksql))

((16, 8), (16, 8), (16, 8), (16, 8))

In [28]:
# The pandas result keeps the two-level (column, statistic) header produced by .agg([...]).
agg_pandas.sort_values(by=['carrier', 'year']).head(n=5)

Unnamed: 0_level_0,carrier,year,dep_delay,dep_delay,dep_delay,arr_time,arr_time,arr_time
Unnamed: 0_level_1,Unnamed: 1_level_1,Unnamed: 2_level_1,mean,sum,max,mean,sum,max
0,9E,2013,16.725769,291296.0,747.0,1639.029692,28428970.0,2400.0
1,AA,2013,8.586016,275551.0,1014.0,1521.436757,48775741.0,2400.0
2,AS,2013,5.804775,4133.0,225.0,1564.894663,1114205.0,2355.0
3,B6,2013,13.022522,705417.0,502.0,1405.725012,76101735.0,2400.0
4,DL,2013,9.264505,442482.0,960.0,1572.877812,75098624.0,2400.0


In [29]:
# Sort by the same keys as the other previews so the rows line up across engines
# (polars group_by output order is not guaranteed).
agg_polars.sort_values(by=['carrier', 'year']).head(n=5)

Unnamed: 0,carrier,year,avg_dep_delay,sum_dep_delay,max_dep_delay,avg_arr_delay,sum_arr_delay,max_arr_delay
12,9E,2013,16.725769,291296,747,1639.029692,28428970,2400
9,AA,2013,8.586016,275551,1014,1521.436757,48775741,2400
10,AS,2013,5.804775,4133,225,1564.894663,1114205,2355
6,B6,2013,13.022522,705417,502,1405.725012,76101735,2400
5,DL,2013,9.264505,442482,960,1572.877812,75098624,2400


In [30]:
# PySpark result, ordered like the other previews.
agg_spark.sort_values(by=['carrier', 'year']).head(n=5)

Unnamed: 0,carrier,year,avg_dep_delay,sum_dep_delay,max_dep_delay,avg_arr_delay,sum_arr_delay,max_arr_delay
6,9E,2013,16.725769,291296,747,1639.029692,28428970,2400
1,AA,2013,8.586016,275551,1014,1521.436757,48775741,2400
13,AS,2013,5.804775,4133,225,1564.894663,1114205,2355
15,B6,2013,13.022522,705417,502,1405.725012,76101735,2400
8,DL,2013,9.264505,442482,960,1572.877812,75098624,2400


In [31]:
# Spark SQL result, ordered like the other previews.
agg_sparksql.sort_values(by=['carrier', 'year']).head(n=5)

Unnamed: 0,carrier,year,avg_dep_delay,sum_dep_delay,max_dep_delay,avg_arr_delay,sum_arr_delay,max_arr_delay
6,9E,2013,16.725769,291296,747,1639.029692,28428970,2400
1,AA,2013,8.586016,275551,1014,1521.436757,48775741,2400
13,AS,2013,5.804775,4133,225,1564.894663,1114205,2355
15,B6,2013,13.022522,705417,502,1405.725012,76101735,2400
8,DL,2013,9.264505,442482,960,1572.877812,75098624,2400
