# Preparación de datos
## Importar bibliotecas

In [2]:
import os
import json
import math
import pandas as pd
from datetime import date, time
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq

import dask
from dask_sql import Context
import dask.dataframe as dd
import dask.array as da

from dask import delayed
from dask.distributed import Client, LocalCluster
import dask.multiprocessing

In [5]:
# Configurar el clúster de Dask
cluster = LocalCluster(n_workers=2, threads_per_worker=1, memory_limit='6GB')
client = Client(cluster)
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 2
Total threads: 2,Total memory: 11.18 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:50575,Workers: 2
Dashboard: http://127.0.0.1:8787/status,Total threads: 2
Started: Just now,Total memory: 11.18 GiB

0,1
Comm: tcp://127.0.0.1:50586,Total threads: 1
Dashboard: http://127.0.0.1:50587/status,Memory: 5.59 GiB
Nanny: tcp://127.0.0.1:50578,
Local directory: C:\Users\ozi\AppData\Local\Temp\dask-scratch-space\worker-6jpv_pxa,Local directory: C:\Users\ozi\AppData\Local\Temp\dask-scratch-space\worker-6jpv_pxa

0,1
Comm: tcp://127.0.0.1:50589,Total threads: 1
Dashboard: http://127.0.0.1:50590/status,Memory: 5.59 GiB
Nanny: tcp://127.0.0.1:50579,
Local directory: C:\Users\ozi\AppData\Local\Temp\dask-scratch-space\worker-pb4qedo3,Local directory: C:\Users\ozi\AppData\Local\Temp\dask-scratch-space\worker-pb4qedo3


In [4]:
cluster.close()
client.close()

### Importación de Datasets `Yellow Taxi Trip Records`

In [6]:
# Ruta al directorio que contiene los archivos Parquet
directorio = "../datasets/raw/"

# Obtener la lista de archivos en el directorio
archivos_parquet = [os.path.join(directorio, f) for f in os.listdir(directorio) if os.path.isfile(os.path.join(directorio, f)) and f.endswith('.parquet')]

# Inicializar el DataFrame de Dask para la concatenación
df_concatenado = None

# Iterar sobre los archivos Parquet
for archivo in archivos_parquet:
    # Verificar si el archivo contiene "yellow", ".parquet" y "2022" en su nombre
    if 'yellow' in archivo and '.parquet' in archivo and '2022' in archivo:
        # Leer el archivo Parquet en un DataFrame de Dask
        df = dd.read_parquet(archivo, engine='pyarrow')
        
        # Concatenar el DataFrame al DataFrame de Dask principal
        if df_concatenado is None:
            df_concatenado = df
        else:
            df_concatenado = dd.concat([df_concatenado, df])

# Ahora el DataFrame 'df_concatenado' contiene todos los datos de los archivos Parquet
# que cumplen con los criterios especificados.

### Exploración de datos

In [4]:
print("Cantidad de registros:", len(df_concatenado), "Cantidad de las Columnas:", len(df_concatenado.columns))
print("Schema:")
print(df_concatenado.dtypes)  # Para obtener los tipos de datos de las columnas
print("\n")

Cantidad de registros: 39656098 Cantidad de las Columnas: 19
Schema:
VendorID                           int64
tpep_pickup_datetime      datetime64[us]
tpep_dropoff_datetime     datetime64[us]
passenger_count                  float64
trip_distance                    float64
RatecodeID                       float64
store_and_fwd_flag       string[pyarrow]
PULocationID                       int64
DOLocationID                       int64
payment_type                       int64
fare_amount                      float64
extra                            float64
mta_tax                          float64
tip_amount                       float64
tolls_amount                     float64
improvement_surcharge            float64
total_amount                     float64
congestion_surcharge             float64
airport_fee                      float64
dtype: object




In [7]:
df_concatenado.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,1,2022-01-01 00:35:40,2022-01-01 00:53:29,2.0,3.8,1.0,N,142,236,1,14.5,3.0,0.5,3.65,0.0,0.3,21.95,2.5,0.0
1,1,2022-01-01 00:33:43,2022-01-01 00:42:07,1.0,2.1,1.0,N,236,42,1,8.0,0.5,0.5,4.0,0.0,0.3,13.3,0.0,0.0
2,2,2022-01-01 00:53:21,2022-01-01 01:02:19,1.0,0.97,1.0,N,166,166,1,7.5,0.5,0.5,1.76,0.0,0.3,10.56,0.0,0.0
3,2,2022-01-01 00:25:21,2022-01-01 00:35:23,1.0,1.09,1.0,N,114,68,2,8.0,0.5,0.5,0.0,0.0,0.3,11.8,2.5,0.0
4,2,2022-01-01 00:36:48,2022-01-01 01:14:20,1.0,4.3,1.0,N,68,163,1,23.5,0.5,0.5,3.0,0.0,0.3,30.3,2.5,0.0


In [12]:
# Obtener el recuento de valores nulos como una operación lazy en Dask
lazy_result = df_concatenado.isnull().sum()

# Ejecutar la operación lazy y obtener el resultado
result = lazy_result.compute()

# Mostrar el resultado
result

VendorID                       0
tpep_pickup_datetime           0
tpep_dropoff_datetime          0
passenger_count          1368303
trip_distance                  0
RatecodeID               1368303
store_and_fwd_flag       1368303
PULocationID                   0
DOLocationID                   0
payment_type                   0
fare_amount                    0
extra                          0
mta_tax                        0
tip_amount                     0
tolls_amount                   0
improvement_surcharge          0
total_amount                   0
congestion_surcharge     1368303
airport_fee              1368303
dtype: int64

In [2]:
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster

# Configurar el clúster de Dask
cluster = LocalCluster(n_workers=2, threads_per_worker=1, memory_limit='6GB')
client = Client(cluster)

def optimize_and_process_dataframe(df):
    # Definir las columnas a eliminar
    columnas_a_eliminar = ['store_and_fwd_flag', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'congestion_surcharge', 'airport_fee']
    
    # Eliminar las columnas del DataFrame de Dask
    df = df.drop(columns=columnas_a_eliminar)
    
    # Eliminar filas con valores nulos del DataFrame de Dask
    df = df.dropna()
    
    # Convertir las columnas de fechas a tipo DateTime en Dask
    for col in ['tpep_pickup_datetime', 'tpep_dropoff_datetime']:
        df[col] = dd.to_datetime(df[col])
    
    # Agregar columnas para fechas y hora_minutos 
    for col in ['tpep_pickup_datetime', 'tpep_dropoff_datetime']:
        df[col + '_fecha'] = df[col].dt.strftime('%Y-%m-%d')
        df[col + '_hora_minuto'] = df[col].dt.strftime('%H:%M:%S')  # Formatear como HH:MM:SS

    # Calcular la duración del viaje en segundos
    df['DuracionViaje'] = (df['tpep_dropoff_datetime'] - df['tpep_pickup_datetime']).dt.total_seconds()

    # Filtrar registros que no cumplen con las condiciones
    df = df[(df['DuracionViaje'] > 310) & (df['DuracionViaje'] <= 2700)]
    df = df[(df['passenger_count'] > 0) & (df['passenger_count'] <= 4)]
    df = df[(df['trip_distance'] > 0.5) & (df['trip_distance'] <= 13)]
    df = df[(df['total_amount'] > 11.3) & (df['total_amount'] <= 50.25)]
    
    # Eliminar las columnas originales de fecha y hora
    df = df.drop(['tpep_pickup_datetime', 'tpep_dropoff_datetime'], axis=1)
    
    # Convertir passenger_count y DuracionViaje a enteros
    df['passenger_count'] = df['passenger_count'].astype(int)
    df['DuracionViaje'] = df['DuracionViaje'].astype(int)
    
    # Definir mapeo de nombres de columnas a nuevos nombres
    new_column_names = {
        "VendorID": "IdProveedor",
        "passenger_count": "TotalPasajeros",
        "trip_distance": "DistanciaViaje",
        "RatecodeID": "IdTipoTarifa",
        "PULocationID": "IdZonaOrigen",
        "DOLocationID": "IdZonaDestino",
        "payment_type": "IdTipoPago",
        "total_amount": "CostoTotal",
        "tpep_pickup_datetime_fecha": "FechaRecogida",
        "tpep_pickup_datetime_hora_minuto": "HoraRecogida",
        "tpep_dropoff_datetime_fecha": "FechaLlegada",
        "tpep_dropoff_datetime_hora_minuto": "HoraLlegada"
    }

    # Renombrar las columnas del DataFrame de Dask
    df = df.rename(columns=new_column_names)

    return df

# Cargar los datos en un DataFrame distribuido de Dask y particionarlo
directorio = "../datasets/raw/yellow_tripdata_2022-02.parquet"
df_dask = dd.read_parquet(directorio, engine='pyarrow')
df_dask_particionado = df_dask.repartition(npartitions=4)

# Aplicar el procesamiento avanzado a cada partición y escribir en CSV
for i, particion in enumerate(df_dask_particionado.to_delayed()):
    df_particion_procesado = optimize_and_process_dataframe(particion.compute())
    
    # Crear un archivo CSV a partir de cada partición
    ruta_salida = f'../datasets/processed/yellow_analytics/yellow_analytics_part_{i+5}.csv'
    df_particion_procesado.to_csv(ruta_salida, index=False)


Perhaps you already have a cluster running?
Hosting the HTTP server on port 50958 instead
This may cause some slowdown.
Consider scattering data ahead of time and using futures.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.


## General Preprocessing `Yellow Taxi Trip Records`
### Preprocesamiento Variables de Tipo Fecha `tpep_pickup_datetime` y  `tpep_dropoff_datetime`

In [None]:
#Si las fechas se guardan como cadenas de texto en formato ISO 8601 en el archivo JSON

In [3]:
cluster.close()
client.close()

## PreProcesamiento `High Volume FHV Trip Records`

In [None]:
# Define the Imports 
try:
    import os
    import json
    import math
    from datetime import date, time
    import dask
    import pyarrow as pa
    import pyarrow.parquet as pq
    from dask.distributed import Client, LocalCluster
    import dask.dataframe as dd
    import numpy as np
    import dask.multiprocessing
except Exception as e:
    print("Some Modules are Missing : {} ".format(e))

In [None]:
size = os.path.getsize("..\\datasets\\raw\\fhvhv_tripdata_2022-01.parquet") / math.pow(1024,3)
print("Size in GB : {} ".format(size))

Size in GB : 0.34889130666852 


In [None]:
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://192.168.100.11:8787/status,

0,1
Dashboard: http://192.168.100.11:8787/status,Workers: 2
Total threads: 2,Total memory: 9.31 GiB
Status: running,Using processes: False

0,1
Comm: inproc://192.168.100.11/8444/1,Workers: 2
Dashboard: http://192.168.100.11:8787/status,Total threads: 2
Started: Just now,Total memory: 9.31 GiB

0,1
Comm: inproc://192.168.100.11/8444/4,Total threads: 1
Dashboard: http://192.168.100.11:57368/status,Memory: 4.66 GiB
Nanny: None,
Local directory: C:\Users\ozi\AppData\Local\Temp\dask-scratch-space\worker-57d4pv5q,Local directory: C:\Users\ozi\AppData\Local\Temp\dask-scratch-space\worker-57d4pv5q

0,1
Comm: inproc://192.168.100.11/8444/5,Total threads: 1
Dashboard: http://192.168.100.11:57369/status,Memory: 4.66 GiB
Nanny: None,
Local directory: C:\Users\ozi\AppData\Local\Temp\dask-scratch-space\worker-u_is3xww,Local directory: C:\Users\ozi\AppData\Local\Temp\dask-scratch-space\worker-u_is3xww


In [None]:
#client.cluster.workers
cluster.dashboard_link

'http://127.0.0.1:8787/status'

In [None]:
df_dask

Unnamed: 0_level_0,hvfhs_license_num,dispatching_base_num,originating_base_num,request_datetime,on_scene_datetime,pickup_datetime,dropoff_datetime,PULocationID,DOLocationID,trip_miles,trip_time,base_passenger_fare,tolls,bcf,sales_tax,congestion_surcharge,airport_fee,tips,driver_pay,shared_request_flag,shared_match_flag,access_a_ride_flag,wav_request_flag,wav_match_flag
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1,Unnamed: 23_level_1,Unnamed: 24_level_1
,string,string,string,datetime64[us],datetime64[us],datetime64[us],datetime64[us],int64,int64,float64,int64,float64,float64,float64,float64,float64,float64,float64,float64,string,string,string,string,string
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [None]:
# Obtener el número de columnas
columnas = len(df_dask.columns)

# Obtener el número de filas
num_filas = df_dask.shape[0].compute()

# Imprimir el número de filas y columnas
print("Número de filas:", num_filas)
print("Número de columnas:", columnas)

Número de filas: 14751591.0
Número de columnas: 24


## General Preprocessing `High Volume FHV Trip Records`


In [None]:
# Obtener el esquema de tipos de datos (dtype) de cada columna
schema_dask = df_dask.dtypes
print(schema_dask)

hvfhs_license_num       string[pyarrow]
dispatching_base_num    string[pyarrow]
originating_base_num    string[pyarrow]
request_datetime         datetime64[us]
on_scene_datetime        datetime64[us]
pickup_datetime          datetime64[us]
dropoff_datetime         datetime64[us]
PULocationID                      int64
DOLocationID                      int64
trip_miles                      float64
trip_time                         int64
base_passenger_fare             float64
tolls                           float64
bcf                             float64
sales_tax                       float64
congestion_surcharge            float64
airport_fee                     float64
tips                            float64
driver_pay                      float64
shared_request_flag     string[pyarrow]
shared_match_flag       string[pyarrow]
access_a_ride_flag      string[pyarrow]
wav_request_flag        string[pyarrow]
wav_match_flag          string[pyarrow]
dtype: object


In [None]:
# Obtener los primeros 15 registros del DataFrame de Dask
df_dask.head()

Unnamed: 0,hvfhs_license_num,dispatching_base_num,originating_base_num,request_datetime,on_scene_datetime,pickup_datetime,dropoff_datetime,PULocationID,DOLocationID,trip_miles,...,sales_tax,congestion_surcharge,airport_fee,tips,driver_pay,shared_request_flag,shared_match_flag,access_a_ride_flag,wav_request_flag,wav_match_flag
0,HV0003,B03404,B03404,2022-01-01 00:05:31,2022-01-01 00:05:40,2022-01-01 00:07:24,2022-01-01 00:18:28,170,161,1.18,...,2.21,2.75,0.0,0.0,23.03,N,N,,N,N
1,HV0003,B03404,B03404,2022-01-01 00:19:27,2022-01-01 00:22:08,2022-01-01 00:22:32,2022-01-01 00:30:12,237,161,0.82,...,1.06,2.75,0.0,0.0,12.32,N,N,,N,N
2,HV0003,B03404,B03404,2022-01-01 00:43:53,2022-01-01 00:57:37,2022-01-01 00:57:37,2022-01-01 01:07:32,237,161,1.18,...,2.65,2.75,0.0,0.0,23.3,N,N,,N,N
3,HV0003,B03404,B03404,2022-01-01 00:15:36,2022-01-01 00:17:08,2022-01-01 00:18:02,2022-01-01 00:23:05,262,229,1.65,...,0.7,2.75,0.0,0.0,6.3,N,N,,N,N
4,HV0003,B03404,B03404,2022-01-01 00:25:45,2022-01-01 00:26:01,2022-01-01 00:28:01,2022-01-01 00:35:42,229,141,1.65,...,0.84,2.75,0.0,0.0,7.44,N,N,,N,N


In [1]:
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster

# Configurar el clúster de Dask
cluster = LocalCluster(n_workers=2, threads_per_worker=1, memory_limit='12GB')
client = Client(cluster)
def procesamiento_avanzado_particion(df):
    # Filtrar y eliminar columnas no deseadas
    columnas_a_eliminar = ['originating_base_num', 'access_a_ride_flag', 'wav_request_flag', 
                           'wav_match_flag', 'shared_request_flag', 'shared_match_flag',
                           'access_a_ride_flag']
    df = df.drop(columnas_a_eliminar, axis=1)
    
    # Reemplazar valores nulos por 0.00 en columnas relevantes
    relevant_columns = ['base_passenger_fare', 'tolls', 'bcf', 'sales_tax', 'congestion_surcharge',
                        'airport_fee', 'tips', 'driver_pay']
    df = df.fillna({col: 0.00 for col in relevant_columns})

    # Calcular la columna 'total_amount' sumando las columnas relevantes
    df['total_amount'] = df[relevant_columns].sum(axis=1)

    # Eliminar columnas no deseadas
    columnas_a_eliminar = ['tolls', 'bcf', 'sales_tax', 'congestion_surcharge', 
                           'airport_fee', 'tips', 'driver_pay']
    df = df.drop(columnas_a_eliminar, axis=1)
    
    # Convertir columnas de fechas a tipo DateTime en Dask
    for col in ['pickup_datetime', 'dropoff_datetime', 'request_datetime', 'on_scene_datetime']:
        df[col] = dd.to_datetime(df[col])

    # Agregar columnas para fechas, horas, minutos y segundos
    for col in ['pickup_datetime', 'dropoff_datetime', 'request_datetime', 'on_scene_datetime']:
        df[col + '_fecha'] = df[col].dt.strftime('%Y-%m-%d')
        df[col + '_hora_minuto'] = df[col].dt.strftime('%H:%M')  # Formatear como HH:MM
        
    # Calcular la duración del viaje y de espera en segundos
    df['DuracionAtencion'] = (df['on_scene_datetime'] - df['request_datetime']).dt.total_seconds()

    # Eliminar filas con duraciones negativas o nulas
    df = df[df['DuracionAtencion'] >= 0]
    
    # Eliminar columnas originales
    df = df.drop(columns=['pickup_datetime', 'dropoff_datetime', 'request_datetime', 'on_scene_datetime'])

    # Renombrar las columnas del DataFrame de Dask
    new_column_names = {
        "hvfhs_license_num": "IdProveedor",
        "dispatching_base_num": "IdProveedor",
        "PULocationID": "IdZonaOrigen",
        "DOLocationID": "IdZonaDestino",
        "trip_miles": "DistanciaViaje",
        "base_passenger_fare": "TarifaPasajero",
        "pickup_datetime_fecha": "FechaRecogida",
        "pickup_datetime_hora_minuto": "HoraRecogida",
        "dropoff_datetime_fecha": "FechaLlegada",
        "dropoff_datetime_hora_minuto": "HoraLlegada",
        "request_datetime_fecha": "FechaSolicitada",
        "request_datetime_hora_minuto": "HoraSolicitada",
        "on_scene_datetime_fecha": "FechaAtendida",
        "on_scene_datetime_hora_minuto": "HoraAtendida",
        "total_amount": "CostoTotal"
    }
    df = df.rename(columns=new_column_names)

    return df

# Cargar los datos en un DataFrame distribuido de Dask y particionarlo
directorio = "../datasets/raw/fhvhv_tripdata_2023-01.parquet"
df_dask = dd.read_parquet(directorio, engine='pyarrow')
df_dask_particionado = df_dask.repartition(npartitions=4)

# Aplicar el procesamiento avanzado a cada partición y escribir en CSV
for i, particion in enumerate(df_dask_particionado.to_delayed()):
    df_particion_procesado = procesamiento_avanzado_particion(particion.compute())
    
    # Crear un archivo CSV a partir de cada partición
    ruta_salida = f'../datasets/processed/ffvh_analytics/ffvh_analytics_part_{i+1}.csv'
    df_particion_procesado.to_csv(ruta_salida, index=False)

This may cause some slowdown.
Consider scattering data ahead of time and using futures.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.
This may cause some slowdown.
Co

In [2]:
# 5. Cerrar el clúster de Dask
client.close()
cluster.close()