<h1 align=center> Proceso de ETL </h1>

<h2>Instalacion de librerias </h2>


In [None]:
%pip install -r ../requirements.txt

<h2> Importacion de libreria </h2>

In [18]:
import pandas as pd

import pyarrow
import gzip
import warnings
warnings.filterwarnings("ignore")
import numpy as np

<h2>Rutas </h2>

In [19]:
path = '../path.txt'

<h3 align=left> Funciones</h3>


In [20]:
def positivos(valor:float|int) -> float|int:
    if type(valor) == float:
        return float(abs(valor))
    else:
        return int(abs(valor))

In [21]:
def verificacion_nulos(data:pd.DataFrame) -> dict:
    porcentaje_nulos_dict = {}
    for columna in data.columns:
        nulos = data[columna].isnull().sum()
        total = len(data[columna])
        porcentaje = round(nulos/total*100,2)
        porcentaje_nulos_dict[columna] = porcentaje
        print(f'\nColumna: {columna:<20}      Cantidad de nulos: {nulos:<7}      Porcentaje de nulos: {str(porcentaje) + " %":<8}\n')
    return porcentaje_nulos_dict


In [22]:
def corregir_NaN_float(valor:float) -> float:
    if pd.isnull(valor):
        return float(-1)
    else:
        return float(valor)

In [23]:
def corregir_NaN_int(valor:float) -> int:
    if pd.isnull(valor):
        return int(-1)
    else:
        return int(valor)

In [24]:
def calcular_mediana_float(data:pd.Series) -> float:
    return float(round(data.median(), 2))


In [25]:
def calcular_mediana_int(data:pd.Series) -> int:
    return int(data.median())

In [26]:
def corregir_NaN_mediana_int(columna:str, data:pd.DataFrame) -> None:
    mediana = mediana_columna_int(data[columna][~data[columna].isnull()])
    def mediana_f(valor:float) -> int:
        if pd.isnull(valor):
            return int(mediana)
        else:
            return int(valor)
    data[columna] = data[columna].apply(mediana_f)


In [27]:
def corregir_NaN_mediana_float(columna:str, data:pd.DataFrame) -> None:
    mediana = mediana_columna_float(data[columna][~data[columna].isnull()])
    def mediana_f(valor:float) -> float:
        if pd.isnull(valor):
            return float(mediana)
        else:
            return float(valor)
    data[columna] = data[columna].apply(mediana_f)


In [28]:
def chequear_duplicados(data:pd.DataFrame) -> None:
    duplicados = data.duplicated().sum()
    total = len(data)
    porcentaje = round(duplicados/total*100,2)
    print(f'\nCantidad de duplicados: {duplicados:<7}      Porcentaje de duplicados: {str(porcentaje) + " %":<8}\n')
    if duplicados > 0:
         data = data.drop_duplicates()
         print(f'Se eliminaron {duplicados} registros duplicados.')


In [29]:
def testear_outliers(data: pd.DataFrame) -> None:
    columnas_outliers = []
    for columna in data.columns[2:]:
        omitir_columnas = ['zona_inicio', 'zona_fin', 'modalidad_pago', 'hora_inicio', 'hora_fin']
        if columna in omitir_columnas:
            continue
        q1 = data[columna][data[columna] > -1].quantile(0.25)
        q3 = data[columna][data[columna] > -1].quantile(0.75)
        iqr = q3 - q1
        modal = ''
        for valor in data[columna][data[columna] > -1].mode().to_list():
            modal += str(valor) + ', '
        modal = modal[:-2]
        promedio = round(data[columna][data[columna] > -1].mean(), 2)
        mediana = round(data[columna][data[columna] > -1].median(), 2)
        limite_superior = round(q3 + 1.5 * iqr, 2)
        limite_inferior = round(q1 - 1.5 * iqr, 2)
        if limite_inferior < 0:
            limite_inferior = 0
        outliers = data[columna][(data[columna] > limite_superior) | (data[columna] < limite_inferior)].count()
        if outliers > 0:
            columnas_outliers.append({'Columna': columna, 'Cantidad de outliers': outliers,
                                    'Límite inferior': limite_inferior, 'Límite superior': limite_superior,
                                    'Índices': data[(data[columna] < limite_inferior) | (data[columna] > limite_superior)].index.to_list()})
        print(f'\nColumna: {columna:<20}\n\nModal: {modal:<24} Promedio: {promedio:<24} Mediana: {mediana:<24}\n\nLímite inferior: {limite_inferior:<14} Límite superior: {limite_superior:<17} Outliers: {outliers:<16}\n\n')
        bins = (int(data[columna].max() +1) if data[columna].max() <= 40 else 40)
        plt.figure(figsize=(3,3))
        plt.hist(data[columna][data[columna] > -1], bins=bins, edgecolor='black')
        plt.title(f'Histograma de {columna}')
        plt.xlabel('Valor')
        plt.ylabel('Frecuencia')
        plt.show()
        print('\n\n')
        plt.figure(figsize=(3,3))
        plt.boxplot(data[columna][data[columna] > -1])
        plt.title(f'Boxplot de {columna}')
        plt.xlabel('Valor')
        plt.ylabel('Frecuencia')
        plt.show()
        print('\n\n\n\n')

Vemos que hay varias cosas para revisar, por lo que debemos hacer un EDA más profundo para cada columna. Después hay que hacer una función para solucionar esos problemas y volver a revisar la función de outliers.

<h2 align=left> Extraccion</h2>

In [30]:
  # Con esta funcion se lee cada linea que se toma como una variables con un string asignado

  with open(path, 'r') as archivo:
      for linea in archivo.readlines():
          exec(linea.strip(), globals())

In [31]:
df_taxis_verdes = pd.read_parquet(verdes_01_21, engine='pyarrow')

<h1>Taxis</h2>

<h2 align=Center> Transformacion de datos</h2>



In [32]:
df_taxis_verdes.columns


Index(['VendorID', 'lpep_pickup_datetime', 'lpep_dropoff_datetime',
       'store_and_fwd_flag', 'RatecodeID', 'PULocationID', 'DOLocationID',
       'passenger_count', 'trip_distance', 'fare_amount', 'extra', 'mta_tax',
       'tip_amount', 'tolls_amount', 'ehail_fee', 'improvement_surcharge',
       'total_amount', 'payment_type', 'trip_type', 'congestion_surcharge'],
      dtype='object')

In [33]:
renombre_columnas = {'tpep_pickup_datetime': 'inicio', 'tpep_dropoff_datetime': 'fin', 'passenger_count': 'pasajeros',
                  'trip_distance': 'distancia', 'PULocationID': 'zona_inicio', 'DOLocationID': 'zona_fin', 'fare_amount': 'tarifa_medida',
                  'total_amount': 'tarifa_total', 'congestion_surcharge': 'recargo_trafico', 'payment_type': 'modalidad_pago',
                  'tip_amount': 'propina', 'lpep_pickup_datetime': 'inicio', 'lpep_dropoff_datetime': 'fin'}

In [34]:
df_taxis_verdes.columns

Index(['VendorID', 'lpep_pickup_datetime', 'lpep_dropoff_datetime',
       'store_and_fwd_flag', 'RatecodeID', 'PULocationID', 'DOLocationID',
       'passenger_count', 'trip_distance', 'fare_amount', 'extra', 'mta_tax',
       'tip_amount', 'tolls_amount', 'ehail_fee', 'improvement_surcharge',
       'total_amount', 'payment_type', 'trip_type', 'congestion_surcharge'],
      dtype='object')

In [35]:
df_taxis_verdes.rename(columns = renombre_columnas, inplace=True)

In [36]:
df_taxis_verdes.columns

Index(['VendorID', 'inicio', 'fin', 'store_and_fwd_flag', 'RatecodeID',
       'zona_inicio', 'zona_fin', 'pasajeros', 'distancia', 'tarifa_medida',
       'extra', 'mta_tax', 'propina', 'tolls_amount', 'ehail_fee',
       'improvement_surcharge', 'tarifa_total', 'modalidad_pago', 'trip_type',
       'recargo_trafico'],
      dtype='object')

In [37]:
columnas_a_borrar = []
for columna in df_taxis_verdes.columns:
    if columna not in renombre_columnas.values():
        columnas_a_borrar.append(columna)

In [38]:
df_taxis_verdes.drop(columnas_a_borrar, axis=1, inplace=True)

In [39]:
df_taxis_verdes.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 76518 entries, 0 to 76517
Data columns (total 11 columns):
 #   Column           Non-Null Count  Dtype         
---  ------           --------------  -----         
 0   inicio           76518 non-null  datetime64[us]
 1   fin              76518 non-null  datetime64[us]
 2   zona_inicio      76518 non-null  int64         
 3   zona_fin         76518 non-null  int64         
 4   pasajeros        40471 non-null  float64       
 5   distancia        76518 non-null  float64       
 6   tarifa_medida    76518 non-null  float64       
 7   propina          76518 non-null  float64       
 8   tarifa_total     76518 non-null  float64       
 9   modalidad_pago   40471 non-null  float64       
 10  recargo_trafico  40471 non-null  float64       
dtypes: datetime64[us](2), float64(7), int64(2)
memory usage: 6.4 MB


## Manejo de datos nulos

Avanzando con el EDA nos econtramos con valores negativos en varias columnas. Antes de convertir los NaN a -1, normalizamos esos valores.

In [40]:
df_taxis_verdes.isnull().sum().all()


False

In [41]:
for columna in df_taxis_verdes.columns[2:]:
    df_taxis_verdes[columna] = df_taxis_verdes[columna].apply(positivos)


## Verificamos nulos o faltantes en cada columna:

In [42]:
nulos_dict = verificacion_nulos(df_taxis_verdes)


Columna: inicio                    Cantidad de nulos: 0            Porcentaje de nulos: 0.0 %   


Columna: fin                       Cantidad de nulos: 0            Porcentaje de nulos: 0.0 %   


Columna: zona_inicio               Cantidad de nulos: 0            Porcentaje de nulos: 0.0 %   


Columna: zona_fin                  Cantidad de nulos: 0            Porcentaje de nulos: 0.0 %   


Columna: pasajeros                 Cantidad de nulos: 36047        Porcentaje de nulos: 47.11 % 


Columna: distancia                 Cantidad de nulos: 0            Porcentaje de nulos: 0.0 %   


Columna: tarifa_medida             Cantidad de nulos: 0            Porcentaje de nulos: 0.0 %   


Columna: propina                   Cantidad de nulos: 0            Porcentaje de nulos: 0.0 %   


Columna: tarifa_total              Cantidad de nulos: 0            Porcentaje de nulos: 0.0 %   


Columna: modalidad_pago            Cantidad de nulos: 36047        Porcentaje de nulos: 47.11 % 


Columna: 

Dado que todas las columnas son numéricas a excepción de la columna fecha, los nulos de esas columnas los reemplazamos por -1. Las columnas 'pasajeros', 'zona_inicio' y 'zona_fin' tienen que ser enteros, el resto float.

Se tomó la decision de establecer un limite de hasta el 10% en cuanto AL porcentaje de nulos permitido para no realizar cambio. En caso de ser menor los nulos seran reemplazados por la mediana de la columna. En el caso contrario tomarán el valor de -1. Esta medida no se aplicará en las columnas 'zona_inicio' y 'zona_fin', ya que estas representan una numeración que se relaciona con los datos del mapa. Si las columnas mencionadas contienen NaN, se reemplazarán estos valores por -1.

In [43]:
for columna in df_taxis_verdes.columns[2:]:
    if df_taxis_verdes[columna].isnull().sum() == 0:
        continue
    else:
        if columna == 'zona_inicio' or columna == 'zona_fin':
            df_taxis_verdes[columna] = df_taxis_verdes[columna].apply(corregir_NaN_int)
        elif columna == 'pasajeros':
            if nulos_dict[columna] >= 10:
                df_taxis_verdes[columna] = df_taxis_verdes[columna].apply(corregir_NaN_int)
            else:
                corregir_NaN_mediana_int(columna, df_taxis_verdes)
        else:
            if nulos_dict[columna] >= 10:
                df_taxis_verdes[columna] = df_taxis_verdes[columna].apply(corregir_NaN_float)
            else:
                corregir_NaN_mediana_float(columna, df_taxis_verdes)





In [44]:
df_taxis_verdes.shape

(76518, 11)

In [45]:
df_taxis_verdes.to_csv(ETL,index=False)

In [46]:
df_taxis_verdes.head()

Unnamed: 0,inicio,fin,zona_inicio,zona_fin,pasajeros,distancia,tarifa_medida,propina,tarifa_total,modalidad_pago,recargo_trafico
0,2021-01-01 00:15:56,2021-01-01 00:19:52,43,151,1,1.01,5.5,0.0,6.8,2.0,0.0
1,2021-01-01 00:25:59,2021-01-01 00:34:44,166,239,1,2.53,10.0,2.81,16.86,1.0,2.75
2,2021-01-01 00:45:57,2021-01-01 00:51:55,41,42,1,1.12,6.0,1.0,8.3,1.0,0.0
3,2020-12-31 23:57:51,2021-01-01 00:04:56,168,75,1,1.99,8.0,0.0,9.3,2.0,0.0
4,2021-01-01 00:16:36,2021-01-01 00:16:40,265,265,3,0.0,52.0,0.0,52.8,3.0,0.0


In [47]:
df_taxis_verdes.columns

Index(['inicio', 'fin', 'zona_inicio', 'zona_fin', 'pasajeros', 'distancia',
       'tarifa_medida', 'propina', 'tarifa_total', 'modalidad_pago',
       'recargo_trafico'],
      dtype='object')

In [48]:
df_taxis_verdes[(df_taxis_verdes['pasajeros'] == 0) | (df_taxis_verdes['distancia'] == 0)].count()

inicio             2670
fin                2670
zona_inicio        2670
zona_fin           2670
pasajeros          2670
distancia          2670
tarifa_medida      2670
propina            2670
tarifa_total       2670
modalidad_pago     2670
recargo_trafico    2670
dtype: int64

Vemos que hay varias cosas para revisar, por lo que debemos hacer un EDA más profundo para cada columna. Después hay que hacer una función para solucionar esos problemas y volver a revisar la función de outliers.

<h2 align=Center> Carga de datos</h2>


