# Evaluar nuestras necesidades de Big Data

###  Dataset Completo puede encontrarse en:
https://www.kaggle.com/datasets/usdot/flight-delays?select=flights.csv

En los archivos del curso tienes la versión reducida: *flights_small.csv*

In [1]:
import pandas as pd
import warnings
warnings.filterwarnings("ignore")
df = pd.read_csv("flights.csv",sep=",")

In [2]:
# Número de filas y columnas
print("Filas: " + str(df.shape[0]) + " - Columnas: " + str(df.shape[1]))

Filas: 5819079 - Columnas: 31


In [3]:
# Tamaño del DataFrame en memoria
print("Tamaño en memoria (MB): " + str(round(df.memory_usage(deep=True).sum() / 1024**2, 2)))

Tamaño en memoria (MB): 2484.59


In [4]:
# Número de valores únicos por columna
df.nunique()

YEAR                      1
MONTH                    12
DAY                      31
DAY_OF_WEEK               7
AIRLINE                  14
FLIGHT_NUMBER          6952
TAIL_NUMBER            4897
ORIGIN_AIRPORT          930
DESTINATION_AIRPORT     930
SCHEDULED_DEPARTURE    1321
DEPARTURE_TIME         1440
DEPARTURE_DELAY        1217
TAXI_OUT                184
WHEELS_OFF             1440
SCHEDULED_TIME          550
ELAPSED_TIME            712
AIR_TIME                675
DISTANCE               1363
WHEELS_ON              1440
TAXI_IN                 185
SCHEDULED_ARRIVAL      1435
ARRIVAL_TIME           1440
ARRIVAL_DELAY          1240
DIVERTED                  2
CANCELLED                 2
CANCELLATION_REASON       4
AIR_SYSTEM_DELAY        570
SECURITY_DELAY          154
AIRLINE_DELAY          1067
LATE_AIRCRAFT_DELAY     695
WEATHER_DELAY           632
dtype: int64

In [5]:
# Porcentaje de nulos por columna
(df.isnull().mean() * 100).round(2)

YEAR                    0.00
MONTH                   0.00
DAY                     0.00
DAY_OF_WEEK             0.00
AIRLINE                 0.00
FLIGHT_NUMBER           0.00
TAIL_NUMBER             0.25
ORIGIN_AIRPORT          0.00
DESTINATION_AIRPORT     0.00
SCHEDULED_DEPARTURE     0.00
DEPARTURE_TIME          1.48
DEPARTURE_DELAY         1.48
TAXI_OUT                1.53
WHEELS_OFF              1.53
SCHEDULED_TIME          0.00
ELAPSED_TIME            1.81
AIR_TIME                1.81
DISTANCE                0.00
WHEELS_ON               1.59
TAXI_IN                 1.59
SCHEDULED_ARRIVAL       0.00
ARRIVAL_TIME            1.59
ARRIVAL_DELAY           1.81
DIVERTED                0.00
CANCELLED               0.00
CANCELLATION_REASON    98.46
AIR_SYSTEM_DELAY       81.72
SECURITY_DELAY         81.72
AIRLINE_DELAY          81.72
LATE_AIRCRAFT_DELAY    81.72
WEATHER_DELAY          81.72
dtype: float64

In [6]:
# Columnas tipo texto
cols_obj = df.select_dtypes(include='object').columns
print("Columnas object: " + str(cols_obj.tolist()))

Columnas object: ['AIRLINE', 'TAIL_NUMBER', 'ORIGIN_AIRPORT', 'DESTINATION_AIRPORT', 'CANCELLATION_REASON']


In [7]:
# Pasamos a categoría si tienen pocos valores distintos
for col in cols_obj:
    if df[col].nunique() < 50:
        df[col] = df[col].astype('category')

In [8]:
# Nuevo tamaño en memoria
print("Nuevo tamaño (MB): " + str(round(df.memory_usage(deep=True).sum() / 1024**2, 2)))

Nuevo tamaño (MB): 2033.54


In [9]:
# Agrupación como ejemplo de carga computacional
import time
start = time.time()

df.groupby(['AIRLINE'])['ARRIVAL_DELAY'].mean()

print("Tiempo: " + str(round(time.time() - start, 5)) + " seg")

Tiempo: 0.06845 seg


In [10]:
# Función simple con resumen
def resumen(df):
    return {
        'Retraso medio': float(df['DEPARTURE_DELAY'].mean().round(2)),
        'Máximo retraso': df['DEPARTURE_DELAY'].max(),
        'Cancelados': int(df['CANCELLED'].sum()),
        'Porcentaje cancelados': round(float(df['CANCELLED'].mean()) * 100, 2)
    }

resumen(df)

{'Retraso medio': 9.37,
 'Máximo retraso': 1988.0,
 'Cancelados': 89884,
 'Porcentaje cancelados': 1.54}

# Trabajar con Jupyter Notebook: Posibilidades

In [11]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5819079 entries, 0 to 5819078
Data columns (total 31 columns):
 #   Column               Dtype   
---  ------               -----   
 0   YEAR                 int64   
 1   MONTH                int64   
 2   DAY                  int64   
 3   DAY_OF_WEEK          int64   
 4   AIRLINE              category
 5   FLIGHT_NUMBER        int64   
 6   TAIL_NUMBER          object  
 7   ORIGIN_AIRPORT       object  
 8   DESTINATION_AIRPORT  object  
 9   SCHEDULED_DEPARTURE  int64   
 10  DEPARTURE_TIME       float64 
 11  DEPARTURE_DELAY      float64 
 12  TAXI_OUT             float64 
 13  WHEELS_OFF           float64 
 14  SCHEDULED_TIME       float64 
 15  ELAPSED_TIME         float64 
 16  AIR_TIME             float64 
 17  DISTANCE             int64   
 18  WHEELS_ON            float64 
 19  TAXI_IN              float64 
 20  SCHEDULED_ARRIVAL    int64   
 21  ARRIVAL_TIME         float64 
 22  ARRIVAL_DELAY        float64 
 23  DIVERTE

In [14]:
df.describe()

Unnamed: 0,YEAR,MONTH,DAY,DAY_OF_WEEK,FLIGHT_NUMBER,SCHEDULED_DEPARTURE,DEPARTURE_TIME,DEPARTURE_DELAY,TAXI_OUT,WHEELS_OFF,...,SCHEDULED_ARRIVAL,ARRIVAL_TIME,ARRIVAL_DELAY,DIVERTED,CANCELLED,AIR_SYSTEM_DELAY,SECURITY_DELAY,AIRLINE_DELAY,LATE_AIRCRAFT_DELAY,WEATHER_DELAY
count,5819079.0,5819079.0,5819079.0,5819079.0,5819079.0,5819079.0,5732926.0,5732926.0,5730032.0,5730032.0,...,5819079.0,5726566.0,5714008.0,5819079.0,5819079.0,1063439.0,1063439.0,1063439.0,1063439.0,1063439.0
mean,2015.0,6.524085,15.70459,3.926941,2173.093,1329.602,1335.204,9.370158,16.07166,1357.171,...,1493.808,1476.491,4.407057,0.002609863,0.01544643,13.48057,0.07615387,18.96955,23.47284,2.91529
std,0.0,3.405137,8.783425,1.988845,1757.064,483.7518,496.4233,37.08094,8.895574,498.0094,...,507.1647,526.3197,39.2713,0.05102012,0.1233201,28.00368,2.14346,48.16164,43.19702,20.43334
min,2015.0,1.0,1.0,1.0,1.0,1.0,1.0,-82.0,1.0,1.0,...,1.0,1.0,-87.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
25%,2015.0,4.0,8.0,2.0,730.0,917.0,921.0,-5.0,11.0,935.0,...,1110.0,1059.0,-13.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
50%,2015.0,7.0,16.0,4.0,1690.0,1325.0,1330.0,-2.0,14.0,1343.0,...,1520.0,1512.0,-5.0,0.0,0.0,2.0,0.0,2.0,3.0,0.0
75%,2015.0,9.0,23.0,6.0,3230.0,1730.0,1740.0,7.0,19.0,1754.0,...,1918.0,1917.0,8.0,0.0,0.0,18.0,0.0,19.0,29.0,0.0
max,2015.0,12.0,31.0,7.0,9855.0,2359.0,2400.0,1988.0,225.0,2400.0,...,2400.0,2400.0,1971.0,1.0,1.0,1134.0,573.0,1971.0,1331.0,1211.0


Veamos un recuento:

In [15]:
df['AIRLINE'].value_counts() # veamos un recuento

AIRLINE
WN    1261855
DL     875881
AA     725984
OO     588353
EV     571977
UA     515723
MQ     294632
B6     267048
US     198715
AS     172521
NK     117379
F9      90836
HA      76272
VX      61903
Name: count, dtype: int64

In [16]:
df[['AIRLINE', 'ORIGIN_AIRPORT', 'DEPARTURE_DELAY']].head(10)

Unnamed: 0,AIRLINE,ORIGIN_AIRPORT,DEPARTURE_DELAY
0,AS,ANC,-11.0
1,AA,LAX,-8.0
2,US,SFO,-2.0
3,AA,LAX,-5.0
4,AS,SEA,-1.0
5,DL,SFO,-5.0
6,NK,LAS,-6.0
7,US,LAX,14.0
8,AA,SFO,-11.0
9,DL,LAS,3.0


In [17]:
df['SALIDA_TARDE'] = df['DEPARTURE_DELAY'] > 15
df[df['SALIDA_TARDE'] == True].head()

Unnamed: 0,YEAR,MONTH,DAY,DAY_OF_WEEK,AIRLINE,FLIGHT_NUMBER,TAIL_NUMBER,ORIGIN_AIRPORT,DESTINATION_AIRPORT,SCHEDULED_DEPARTURE,...,ARRIVAL_DELAY,DIVERTED,CANCELLED,CANCELLATION_REASON,AIR_SYSTEM_DELAY,SECURITY_DELAY,AIRLINE_DELAY,LATE_AIRCRAFT_DELAY,WEATHER_DELAY,SALIDA_TARDE
20,2015,1,1,4,NK,520,N525NK,LAS,MCI,55,...,6.0,0,0,,,,,,,True
29,2015,1,1,4,AA,2392,N3HRAA,DEN,MIA,120,...,2.0,0,0,,,,,,,True
30,2015,1,1,4,NK,168,N629NK,PHX,ORD,125,...,43.0,0,0,,43.0,0.0,0.0,0.0,0.0,True
52,2015,1,1,4,B6,2134,N307JB,SJU,MCO,400,...,85.0,0,0,,0.0,0.0,85.0,0.0,0.0,True
55,2015,1,1,4,B6,2276,N646JB,SJU,BDL,438,...,89.0,0,0,,17.0,0.0,72.0,0.0,0.0,True


In [18]:
df.sort_values('DEPARTURE_DELAY', ascending=False).head()

Unnamed: 0,YEAR,MONTH,DAY,DAY_OF_WEEK,AIRLINE,FLIGHT_NUMBER,TAIL_NUMBER,ORIGIN_AIRPORT,DESTINATION_AIRPORT,SCHEDULED_DEPARTURE,...,ARRIVAL_DELAY,DIVERTED,CANCELLED,CANCELLATION_REASON,AIR_SYSTEM_DELAY,SECURITY_DELAY,AIRLINE_DELAY,LATE_AIRCRAFT_DELAY,WEATHER_DELAY,SALIDA_TARDE
337720,2015,1,23,5,AA,1322,N598AA,BHM,DFW,700,...,1971.0,0,0,,0.0,0.0,1971.0,0.0,0.0,True
3412085,2015,8,1,6,AA,96,N479AA,RIC,DFW,709,...,1898.0,0,0,,20.0,0.0,1878.0,0.0,0.0,True
4103531,2015,9,13,7,AA,1063,N3CAAA,SAN,DFW,700,...,1665.0,0,0,,0.0,0.0,1665.0,0.0,0.0,True
5810811,2015,12,31,4,AA,2214,N4XKAA,ABQ,DFW,1041,...,1636.0,0,0,,0.0,0.0,1636.0,0.0,0.0,True
5279939,2015,11,27,5,AA,2559,N489AA,DTW,ORD,1027,...,1638.0,0,0,,7.0,0.0,1631.0,0.0,0.0,True


In [None]:
df['DEPARTURE_DELAY'].hist(bins=50)

In [None]:
df['AIRLINE'].value_counts().plot(kind='bar')

In [None]:
def resumen_retraso(origen):
    temp = df[df['ORIGIN_AIRPORT'] == origen]
    return float(temp['DEPARTURE_DELAY'].mean().round(2))

resumen_retraso('ATL')

In [None]:
df.groupby('MONTH')['ARRIVAL_DELAY'].mean().plot()

# Trabajar con PySpark. Similitudes y diferencias

No apto para ejecutar sin una sesión activa de Spark

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, mean, count, when

spark = SparkSession.builder.appName("flights").getOrCreate()
df_spark = spark.createDataFrame(df)

In [None]:
df.head()

df_spark.show(5)

In [None]:
df[['AIRLINE', 'ORIGIN_AIRPORT', 'DEPARTURE_DELAY']].head()

df_spark.select("AIRLINE", "ORIGIN_AIRPORT", "DEPARTURE_DELAY").show(5)

In [None]:
df[df['DEPARTURE_DELAY'] > 15].head()

df_spark.filter(col("DEPARTURE_DELAY") > 15).show(5)

In [None]:
df.sort_values('DEPARTURE_DELAY', ascending=False)[['AIRLINE', 'DEPARTURE_DELAY']].head()

df_spark.orderBy(col("DEPARTURE_DELAY").desc()).select("AIRLINE", "DEPARTURE_DELAY").show(5)

In [None]:
df.groupby('AIRLINE')['ARRIVAL_DELAY'].mean()

df_spark.groupBy("AIRLINE").agg(mean("ARRIVAL_DELAY").alias("media_retraso")).show()

In [None]:
df['SALIDA_TARDE'] = df['DEPARTURE_DELAY'] > 15
df[['DEPARTURE_DELAY', 'SALIDA_TARDE']].head()

df_spark.withColumn("SALIDA_TARDE", col("DEPARTURE_DELAY") > 15).select("DEPARTURE_DELAY", "SALIDA_TARDE").show(5)

In [None]:
df[df['CANCELLED'] == 1].shape[0]

df_spark.filter(col("CANCELLED") == 1).count()

In [None]:
df[['DEPARTURE_DELAY', 'ARRIVAL_DELAY']].mean()

df_spark.select(mean("DEPARTURE_DELAY"), mean("ARRIVAL_DELAY")).show()

In [None]:
df['CATEGORIA_RETRASO'] = df['ARRIVAL_DELAY'].apply(lambda x: "Alto" if x > 60 else "Normal")
df[['ARRIVAL_DELAY', 'CATEGORIA_RETRASO']].head()

df_spark.withColumn("CATEGORIA_RETRASO", when(col("ARRIVAL_DELAY") > 60, "Alto").otherwise("Normal")).select("ARRIVAL_DELAY", "CATEGORIA_RETRASO").show(5)

# Evaluar la eficiencia de nuestro código

In [26]:
import time

start = time.time()

df.groupby(['AIRLINE'])['ARRIVAL_DELAY'].mean()

print("Tiempo con time: " + str(round(time.time() - start, 4)) + " seg")

Tiempo con time: 0.0731 seg


In [27]:
time.time()

1754298227.2060614

In [29]:
import timeit

tiempo = timeit.timeit(
    stmt="df.groupby(['AIRLINE'])['ARRIVAL_DELAY'].mean()",
    globals=globals(),
    number=50
)

print("Tiempo con timeit: " + str(round(tiempo, 4)) + " seg")

Tiempo con timeit: 3.374 seg


In [30]:
def resumen_basico():
    return {
        'retraso medio': df['DEPARTURE_DELAY'].mean(),
        'retraso máximo': df['DEPARTURE_DELAY'].max(),
        'cancelados': int(df['CANCELLED'].sum())
    }

print(resumen_basico())

print("Tiempo función resumen: " + str(round(timeit.timeit("resumen_basico()", globals=globals(), number=100), 4)) + " seg (x100)")

{'retraso medio': np.float64(9.370158275198389), 'retraso máximo': 1988.0, 'cancelados': 89884}
Tiempo función resumen: 5.0464 seg (x100)


In [31]:
%%timeit
df.groupby(['AIRLINE'])['DEPARTURE_DELAY'].mean()

66.9 ms ± 252 μs per loop (mean ± std. dev. of 7 runs, 10 loops each)
