In [None]:
!pip install pyspark


Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=fe4160f0c7f7c89ea98c11e7b61d9f59a411a9597aefd496d2e1ab4732cf597a
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


###***Importacion de librerias***

In [None]:
import pandas as pd
import pyarrow
import requests
import matplotlib.pyplot as plt
from matplotlib.colors import LinearSegmentedColormap
import seaborn as sns
import numpy as np
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.functions import when
from pyspark.sql.window import Window
from pyspark.sql.functions import min, max
from pyspark.sql.functions import hour
from pyspark.sql.functions import to_timestamp, round
from pyspark.sql import SparkSession
from functools import reduce


**Lee y carga el Archivo**

In [None]:
# Crea una sesión de Spark
spark = SparkSession.builder.master("local[*]").getOrCreate()

# Ruta de la carpeta que contiene los archivos Parquet
ruta = "/content/drive/MyDrive/Proyecto FInal Henry/datos_tripdata/"

# Lista para almacenar los DataFrames de cada archivo
dfs = []

# Itera sobre los años 2022 y 2023 y sobre todos los meses
for year in range(2022, 2024):
    for month in range(1, 13):
        filename = f"yellow_tripdata_{year}-{month:02d}.parquet"
        try:
            df = spark.read.parquet(ruta + filename)
            dfs.append(df)
        except:
            print(f"No se encontró el archivo {filename}")


df_total = reduce(lambda df1, df2: df1.union(df2), dfs)
df_total.printSchema()
df_total.show()


root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+----

**>>> ETL MODELOS ML EMISIONES**

**Elimina columnas innecesarias**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Crear una sesión de Spark
spark = SparkSession.builder \
    .appName("Eliminar columnas") \
    .getOrCreate()

# Suponiendo que tu DataFrame se llama df_total

# Eliminar las columnas especificadas
columnas_a_eliminar = ["VendorID", "congestion_surcharge" , "improvement_surcharge" , "passenger_count", "payment_type", "tpep_pickup_datetime", "tpep_dropoff_datetime", "RatecodeID", "store_and_fwd_flag",
                       "PULocationID", "DOLocationID", "fare_amount", "extra", "mta_tax", "tip_amount",
                       "tolls_amount", "total_amount", "airport_fee"]

df_total_emis = df_total.drop(*columnas_a_eliminar)


**Eliminación de nulos**

In [None]:
# Eliminar registros con valores nulos en todas las columnas
df_total_sin_nulos = df_total_emis.dropna(how="all")



**Agrega columnas con cálculos necesarios de consumo y costo de combustible y emisiones**

In [None]:
from pyspark.sql.functions import col

#Costo de combustible por milla = Precio del combustible por galón / Millas por galón

#Donde:
#- *Precio del combustible por galón* es el precio del combustible por galón en dólares.
#- *Millas por galón* es la cantidad de millas que el vehículo puede recorrer con un galón de combustible.

#Por ejemplo, si el precio del combustible por galón es de $3.00 y el vehículo tiene un consumo de combustible promedio de 25 millas por galón, el costo de combustible por milla sería:




# Calcular el costo de combustible (suponiendo un costo de $0.12 por milla)
df_total_sin_nulos = df_total_sin_nulos.withColumn("fuel_cost", col("trip_distance") * 0.12)

# Calcular la eficiencia de combustible por milla
df_total_sin_nulos = df_total_sin_nulos.withColumn("fuel_efficiency_per_mile", col("trip_distance") / col("fuel_cost"))

# Calcular el consumo de combustible por viaje
df_total_sin_nulos = df_total_sin_nulos.withColumn("fuel_consumption_per_trip", col("fuel_efficiency_per_mile") * col("trip_distance"))

# Calcular el CO2 por milla (suponiendo una tasa de emisión de 0.5 toneladas de CO2 por milla)
df_total_sin_nulos = df_total_sin_nulos.withColumn("co2_per_mile", col("trip_distance") * 0.5)


**Redondea los cálculos a 2 decimales**

In [None]:
from pyspark.sql.functions import col, round

# Redondear los valores de las columnas especificadas
df_total_redondeado = df_total_sin_nulos \
    .withColumn("trip_distance", round(col("trip_distance"), 2)) \
    .withColumn("fuel_efficiency_per_mile", round(col("fuel_efficiency_per_mile"), 2)) \
    .withColumn("fuel_consumption_per_trip", round(col("fuel_consumption_per_trip"), 2)) \
    .withColumn("fuel_cost", round(col("fuel_cost"), 2))

# Verificar el DataFrame resultante
df_total_redondeado.show()


+-------------+---------+------------------------+-------------------------+------------+
|trip_distance|fuel_cost|fuel_efficiency_per_mile|fuel_consumption_per_trip|co2_per_mile|
+-------------+---------+------------------------+-------------------------+------------+
|          3.8|     0.46|                    8.33|                    31.67|         1.9|
|          2.1|     0.25|                    8.33|                     17.5|        1.05|
|         0.97|     0.12|                    8.33|                     8.08|       0.485|
|         1.09|     0.13|                    8.33|                     9.08|       0.545|
|          4.3|     0.52|                    8.33|                    35.83|        2.15|
|         10.3|     1.24|                    8.33|                    85.83|        5.15|
|         5.07|     0.61|                    8.33|                    42.25|       2.535|
|         2.02|     0.24|                    8.33|                    16.83|        1.01|
|         

**Exporta archivo en formato parquet**

In [None]:
'''# Ruta para guardar el archivo Parquet con el nombre "df_ml_emis"
output_path_parquet = "/content/drive/MyDrive/Proyecto FInal Henry/DATA/df_ml_emis.parquet"

# Guarda el DataFrame como un solo archivo Parquet
merged_df_sin_nulos.coalesce(1).write.parquet(output_path_parquet, mode="overwrite")'''

'# Ruta para guardar el archivo Parquet con el nombre "df_ml_emis"\noutput_path_parquet = "/content/drive/MyDrive/Proyecto FInal Henry/DATA/df_ml_emis.parquet"\n\n# Guarda el DataFrame como un solo archivo Parquet\nmerged_df_sin_nulos.coalesce(1).write.parquet(output_path_parquet, mode="overwrite")'

**Exporta archivo en formato csv**

In [None]:
'''
# Ruta para guardar el archivo CSV con el nombre "df_ml_emis"
output_path_csv = "/content/drive/MyDrive/Proyecto FInal Henry/DATA*/df_ml_emis.csv"

# Guarda el DataFrame como un solo archivo CSV
merged_df_sin_nulos.coalesce(1).write.csv(output_path_csv, mode="overwrite", header=True)'''

'\n# Ruta para guardar el archivo CSV con el nombre "df_ml_emis"\noutput_path_csv = "/content/drive/MyDrive/Proyecto FInal Henry/DATA*/df_ml_emis.csv"\n\n# Guarda el DataFrame como un solo archivo CSV\nmerged_df_sin_nulos.coalesce(1).write.csv(output_path_csv, mode="overwrite", header=True)'

**>>> ETL MODELOS ML VIAJES Y FRANJA HORARIA**

**Filtra solo las columnas necesarias para ML**

In [None]:
df_total_fh = df_total [['tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count', 'trip_distance', 'total_amount', 'congestion_surcharge', 'airport_fee']]

**Crea la columna 'date_only' solo con el valor de la fecha para hacer un merge**

In [None]:
# Convierte la columna 'tpep_pickup_datetime' a tipo timestamp
df_total_fh = df_total_fh.withColumn('tpep_pickup_datetime', F.to_timestamp(df_total_fh['tpep_pickup_datetime']))

# Extrae la parte de la fecha y asigna a una nueva columna 'date_only'
df_total_fh = df_total_fh.withColumn('date_only', F.to_date(df_total_fh['tpep_pickup_datetime']))

# Muestra el DataFrame con la nueva columna 'date_only'
#df_total.show()

**DATOS DEL TIEMPO**

**Lee y carga el archivo**

In [None]:
# Especifica la ruta del archivo CSV en Google Drive
ruta_archivo_csv = '/content/drive/MyDrive/Proyecto FInal Henry/DATA*/daily_weather_data.csv'

# Leer el archivo CSV en un DataFrame de PySpark
  daily_weather_data = spark.read.csv(ruta_archivo_csv, header=True, inferSchema=True)

# Mostrar el DataFrame
daily_weather_data.show()

+-------------------+------------------+------------------+-------------------+-----------------+
|               date|temperature_2m_max|temperature_2m_min|temperature_2m_mean|precipitation_sum|
+-------------------+------------------+------------------+-------------------+-----------------+
|2010-01-01 04:00:00|            5.3305|           -4.0695|         0.29091665|        1.8000001|
|2010-01-02 04:00:00|       -0.86950004|           -9.3695|         -3.5465834|              0.7|
|2010-01-03 04:00:00|        -4.8694997|          -10.1195|         -7.6820006|              0.0|
|2010-01-04 04:00:00|           -0.7195|           -7.3195|          -4.873667|              0.0|
|2010-01-05 04:00:00|      -0.119500004|        -7.3694997|         -4.6486664|              0.0|
|2010-01-06 04:00:00|            1.1805|        -5.3694997|         -2.8653333|              0.0|
|2010-01-07 04:00:00|            3.6805|           -3.8195|         -1.2924166|              0.0|
|2010-01-08 04:00:00

**Filtra el dataframe con la columnasnecesarias para el ML**

In [None]:
daily_weather_data = daily_weather_data [['date', 'temperature_2m_mean', 'precipitation_sum']]
daily_weather_data.show()

+-------------------+-------------------+-----------------+
|               date|temperature_2m_mean|precipitation_sum|
+-------------------+-------------------+-----------------+
|2010-01-01 04:00:00|         0.29091665|        1.8000001|
|2010-01-02 04:00:00|         -3.5465834|              0.7|
|2010-01-03 04:00:00|         -7.6820006|              0.0|
|2010-01-04 04:00:00|          -4.873667|              0.0|
|2010-01-05 04:00:00|         -4.6486664|              0.0|
|2010-01-06 04:00:00|         -2.8653333|              0.0|
|2010-01-07 04:00:00|         -1.2924166|              0.0|
|2010-01-08 04:00:00|         -1.5611666|       0.70000005|
|2010-01-09 04:00:00|          -5.654917|              0.0|
|2010-01-10 04:00:00|         -6.7007504|              0.0|
|2010-01-11 04:00:00|         -4.1486664|              0.0|
|2010-01-12 04:00:00|          -2.254917|              0.0|
|2010-01-13 04:00:00|         -2.9236662|              0.0|
|2010-01-14 04:00:00|         -0.8174166

**Crea la columna 'date_only' solo con el valor de la fecha para hacer un merge**

In [None]:
# Convierte la columna 'date' a tipo timestamp
daily_weather_data = daily_weather_data.withColumn('date', F.to_timestamp(daily_weather_data['date']))

# Extrae la parte de la fecha y asigna a una nueva columna 'date_only'
daily_weather_data = daily_weather_data.withColumn('date_only', F.to_date(daily_weather_data['date']))

# Muestra el DataFrame con la nueva columna 'date_only'
#daily_weather_data.show()

**Realiza la union de ambos dataframes**

In [None]:
# Realizar la unión de DataFrames en PySpark
merged_df = df_total_fh.join(daily_weather_data, on='date_only', how='left')

# Mostrar el DataFrame resultante
#merged_df.show()


**Imputacion de nulos**

In [None]:
# Reemplazar los valores nulos en la columna 'passenger_count' con 1
merged_df = merged_df.withColumn('passenger_count', when(merged_df['passenger_count'].isNull(), 1).otherwise(merged_df['passenger_count']))

# Reemplazar los valores nulos en la columna 'congestion_surcharge' con 0
merged_df = merged_df.withColumn('congestion_surcharge', when(merged_df['congestion_surcharge'].isNull(), 0).otherwise(merged_df['congestion_surcharge']))

# Reemplazar los valores nulos en la columna 'airport_fee' con 0
merged_df = merged_df.withColumn('airport_fee', when(merged_df['airport_fee'].isNull(), 0).otherwise(merged_df['airport_fee']))

# Mostrar el DataFrame resultante
#merged_df.show()


**Elimina filas con valores nulos**

In [None]:
# Eliminar filas con valores nulos
merged_df_sin_nulos = merged_df.na.drop()

**Convierte las columnas de precipitaciones, congestion y aeropuerto en 0 si no ocurrió el evento y 1 si ocurrió**

In [None]:
# Reemplazar los valores distintos de cero por 1 en la columna 'temperature_2m_mean'
merged_df_sin_nulos = merged_df_sin_nulos.withColumn('precipitation_sum', when(merged_df_sin_nulos['precipitation_sum'] != 0, 1).otherwise(0))

# Reemplazar los valores distintos de cero por 1 en la columna 'congestion_surcharge'
merged_df_sin_nulos = merged_df_sin_nulos.withColumn('congestion_surcharge', when(merged_df_sin_nulos['congestion_surcharge'] != 0, 1).otherwise(0))

# Reemplazar los valores distintos de cero por 1 en la columna 'airport_fee'
merged_df_sin_nulos = merged_df_sin_nulos.withColumn('airport_fee', when(merged_df_sin_nulos['airport_fee'] != 0, 1).otherwise(0))

# Mostrar el DataFrame resultante
#merged_df_sin_nulos.show()

**MODIFICANDO GRANULARIDAD DE LOS DATOS**

**Crea la columna franja horaria a partir de la hora de iniciado el viaje 'tpep_pickup_datetime'**

In [None]:
# Convertir la columna 'tpep_pickup_datetime' a tipo timestamp
merged_df_sin_nulos = merged_df_sin_nulos.withColumn('tpep_pickup_datetime', F.to_timestamp(merged_df_sin_nulos['tpep_pickup_datetime']))

# Extraer la hora de la columna 'tpep_pickup_datetime' y asignarla a la nueva columna 'franja_horaria'
merged_df_sin_nulos = merged_df_sin_nulos.withColumn('franja_horaria', hour(merged_df_sin_nulos['tpep_pickup_datetime']))

# Mostrar el DataFrame resultante
#merged_df_sin_nulos.show()


**Calcula la duración del viaje en minutos restando las columnas de inicio y fin del viaje**

In [None]:
# Convertir las columnas 'tpep_pickup_datetime' y 'tpep_dropoff_datetime' a tipo datetime si no lo están
merged_df_sin_nulos = merged_df_sin_nulos.withColumn('tpep_pickup_datetime', to_timestamp(merged_df_sin_nulos['tpep_pickup_datetime']))
merged_df_sin_nulos = merged_df_sin_nulos.withColumn('tpep_dropoff_datetime', to_timestamp(merged_df_sin_nulos['tpep_dropoff_datetime']))

# Calcular la duración del viaje en minutos y redondear el resultado a 2 decimales
merged_df_sin_nulos = merged_df_sin_nulos.withColumn('duracion_viaje_minutos',
                                                     round(((F.col('tpep_dropoff_datetime').cast('long') - F.col('tpep_pickup_datetime').cast('long')) / 60), 2))

# Mostrar el DataFrame con la nueva columna
#merged_df_sin_nulos.show()


**Agrupa las columnas por fechas y franjas horarias**

In [None]:
# Define la ventana de partición
window_spec = Window.partitionBy('date_only', 'franja_horaria')

# Agrupar por fecha y franja horaria y sumar los viajes en cada grupo
merged_df_sin_nulos = merged_df_sin_nulos.withColumn('suma_viajes', F.count('passenger_count').over(window_spec))

# Agrupar por fecha y franja horaria y sumar los pasajeros en cada grupo
merged_df_sin_nulos = merged_df_sin_nulos.withColumn('suma_pasajeros', F.sum('passenger_count').over(window_spec))

# Aquí agrupamos por 'date_only' y 'franja_horaria' y sumamos 'trip_distance'
merged_df_sin_nulos = merged_df_sin_nulos.withColumn('suma_distancias', F.sum('trip_distance').over(window_spec))

# Aquí agrupamos por 'date_only' y 'franja_horaria' y sumamos 'total_amount'
merged_df_sin_nulos = merged_df_sin_nulos.withColumn('suma_tarifas', F.sum('total_amount').over(window_spec))

# Aquí agrupamos por 'date_only' y 'franja_horaria' y promediamos 'congestion_surcharge'
merged_df_sin_nulos = merged_df_sin_nulos.withColumn('promedio_congestion', F.avg('congestion_surcharge').over(window_spec))

# Aquí agrupamos por 'date_only' y 'franja_horaria' y promediamos 'airport_fee'
merged_df_sin_nulos = merged_df_sin_nulos.withColumn('promedio_aeropuerto', F.avg('airport_fee').over(window_spec))

# Aquí agrupamos por 'date_only' y 'franja_horaria' y promediamos 'temperature_2m_mean'
merged_df_sin_nulos = merged_df_sin_nulos.withColumn('promedio_temperatura', F.avg('temperature_2m_mean').over(window_spec))

# Aquí agrupamos por 'date_only' y 'franja_horaria' y promediamos 'precipitation_sum'
merged_df_sin_nulos = merged_df_sin_nulos.withColumn('promedio_presipitaciones', F.avg('precipitation_sum').over(window_spec))

# Aquí agrupamos por 'date_only' y 'franja_horaria' y sumamos 'duracion_viaje_minutos'
merged_df_sin_nulos = merged_df_sin_nulos.withColumn('suma_duracion_viajes_minutos', F.sum('duracion_viaje_minutos').over(window_spec))


# Redondear todas las salidas a 2 decimales
merged_df_sin_nulos = merged_df_sin_nulos.withColumn('suma_viajes', F.round('suma_viajes', 2))
merged_df_sin_nulos = merged_df_sin_nulos.withColumn('suma_pasajeros', F.round('suma_pasajeros', 2))
merged_df_sin_nulos = merged_df_sin_nulos.withColumn('suma_distancias', F.round('suma_distancias', 2))
merged_df_sin_nulos = merged_df_sin_nulos.withColumn('suma_tarifas', F.round('suma_tarifas', 2))
merged_df_sin_nulos = merged_df_sin_nulos.withColumn('promedio_congestion', F.round('promedio_congestion', 2))
merged_df_sin_nulos = merged_df_sin_nulos.withColumn('promedio_aeropuerto', F.round('promedio_aeropuerto', 2))
merged_df_sin_nulos = merged_df_sin_nulos.withColumn('promedio_temperatura', F.round('promedio_temperatura', 2))
merged_df_sin_nulos = merged_df_sin_nulos.withColumn('promedio_presipitaciones', F.round('promedio_presipitaciones', 2))
merged_df_sin_nulos = merged_df_sin_nulos.withColumn('suma_duracion_viajes_minutos', F.round('suma_duracion_viajes_minutos', 2))

# Mostrar el DataFrame resultante
#merged_df_sin_nulos.show()



**Elimina columnas innecesarias**

In [None]:
# Lista de columnas a eliminar
columnas_a_eliminar = ['tpep_pickup_datetime', 'tpep_dropoff_datetime', 'duracion_viaje_minutos','passenger_count', 'trip_distance', 'total_amount', 'congestion_surcharge', 'airport_fee', 'pickup_date', 'date', 'temperature_2m_mean', 'precipitation_sum']

# Eliminar las columnas
merged_df_sin_nulos = merged_df_sin_nulos.drop(*columnas_a_eliminar)

# Mostrar el DataFrame resultante
#merged_df_sin_nulos.show()

**Elimina filas duplicadas**

In [None]:
# Eliminar filas completamente duplicadas
merged_df_sin_nulos = merged_df_sin_nulos.dropDuplicates()


In [None]:
# Mostrar el DataFrame resultante
merged_df_sin_nulos.show()

+----------+--------------+-----------+--------------+---------------+------------+-------------------+-------------------+--------------------+------------------------+----------------------------+
| date_only|franja_horaria|suma_viajes|suma_pasajeros|suma_distancias|suma_tarifas|promedio_congestion|promedio_aeropuerto|promedio_temperatura|promedio_presipitaciones|suma_duracion_viajes_minutos|
+----------+--------------+-----------+--------------+---------------+------------+-------------------+-------------------+--------------------+------------------------+----------------------------+
|2022-01-01|            14|       3324|        5220.0|       37575.27|    73869.51|               0.87|                0.1|                9.66|                     1.0|                     56106.3|
|2022-01-01|            21|       2703|        4176.0|       10739.83|    57309.62|               0.87|               0.13|                9.66|                     1.0|                    44048.79|
|2022

**Exporta archivo en formato parquet**

In [None]:
'''# Ruta para guardar el archivo Parquet con el nombre "df_ml_fh"
output_path_parquet = "/content/drive/MyDrive/Proyecto FInal Henry/DATA/df_ml_fh.parquet"

# Guarda el DataFrame como un solo archivo Parquet
merged_df_sin_nulos.coalesce(1).write.parquet(output_path_parquet, mode="overwrite")'''




**Exporta archivo en formato csv**

In [None]:
'''
# Ruta para guardar el archivo CSV con el nombre "df_ml_fh"
output_path_csv = "/content/drive/MyDrive/Proyecto FInal Henry/DATA*/df_ml_fh.csv"

# Guarda el DataFrame como un solo archivo CSV
merged_df_sin_nulos.coalesce(1).write.csv(output_path_csv, mode="overwrite", header=True)

'''
