## Importar librerias

In [9]:
import os
# Importar las bibliotecas necesarias
import findspark
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from pyspark.sql.functions import col, lit, concat_ws, collect_set, from_unixtime, date_format, udf, array, avg, sum, count
from pyspark.sql.types import TimestampType, StructField, StringType, IntegerType, StructType, DoubleType
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
import holidays
from pyspark.ml.linalg import Vectors, VectorUDT

## Inicializar pyspark

In [4]:
findspark.init()
findspark.find()
# Importar las bibliotecas necesarias
sc = SparkContext.getOrCreate()
# Primera sesión de Spark
spark = SparkSession.builder \
    .appName("Trufi") \
    .config("spark.executor.memoryOverhead", "2g") \
    .config("spark.executor.memory", "3g") \
    .config("spark.driver.host", "localhost") \
    .config("spark.master", "local") \
    .getOrCreate()

# Configurar el número de particiones
spark.conf.set("spark.sql.shuffle.partitions", "4")
spark.sparkContext

## Obtener rutas de los Datasets

In [5]:
# Obtener la ruta de la carpeta del notebook
notebook_folder = os.getcwd()
root_project = os.path.abspath(os.path.join(notebook_folder, '..'))
dataset_logs = os.path.abspath(os.path.join(root_project, 'Datos', 'Logs'))
trufi_datos = os.path.abspath(os.path.join(root_project, 'Datos', 'Registros de Trufi App'))
municipios_datos = os.path.abspath(os.path.join(root_project, 'Datos', 'Poligonos de Cochabamba','region_cochabamba_2018.geojson'))
lagos_datos = os.path.abspath(os.path.join(root_project, 'Datos', 'Poligonos de Cochabamba','region_cochabamba_2018.geojson'))
clima_datos = os.path.abspath(os.path.join(root_project, 'Datos', 'Datos del clima','weather.csv'))


## Obtener rutas de los Datasets

In [None]:
# Leer el archivo CSV en un DataFrame
df = pd.read_csv(csv_file_path, parse_dates=['date'])



In [None]:
# Lee el archivo CSV en un DataFrame
df = spark.read.csv("file:///" + out_agg_archivo_csv, header=True, inferSchema=True)

# Muestra el esquema del DataFrame
df.printSchema()

# Muestra las primeras filas del DataFrame
df.show()

# Lee el archivo CSV en un DataFrame de Spark
df_spark_result = spark.read.csv(weather_csv, header=True, inferSchema=True)

# Selecciona solo las columnas necesarias del DataFrame de Spark resultante
df_spark_selected = df_spark_result.select(
    'datetime',
    'temp',
    'humidity',
    'precip',
    'cloudcover',
    'windspeed'
)

# Renombra la columna 'hourly_timestamp' a 'datetime'
df_spark_selected = df_spark_selected.withColumnRenamed('hourly_timestamp', 'datetime')

# Muestra el esquema y las primeras filas del DataFrame de Spark resultante con las columnas seleccionadas
df_spark_selected.printSchema()
df_spark_selected.count()

In [None]:
# Define la fecha límite
fecha_limite = '2023-12-27'

# Filtra el DataFrame de Spark

df_spark_result_filtered = df_spark_selected.filter(col('datetime') <= fecha_limite)
df_spark_result_filtered.show()

In [None]:
# Crea la tabla temporal para df_spark_result_filtered
df_spark_result_filtered.createOrReplaceTempView("df_spark_result_filtered")

# Clona el DataFrame df
df_clone = df.select(*df.columns)

# Realiza la fusión (merge) utilizando PySpark
df_final = df_clone.join(
    df_spark_result_filtered,
    df_clone["hourly_timestamp"] == df_spark_result_filtered["datetime"],
    how="left"
)

# Muestra el DataFrame resultante
df_final.show(100)
df_final.count()

In [None]:
# Crear un objeto para Bolivia
bo_holidays = holidays.country_holidays('BO', subdiv='C', years=[2022, 2023])

# Obtener los feriados para 2022 y 2023
holidays_list = [(date, name) for date, name in sorted(bo_holidays.items())]

# Crear un DataFrame con la lista de feriados
holidays_df = pd.DataFrame(holidays_list, columns=["date", "holiday"])
# Creating new column contain 1 for the holiday
holidays_df['Holiday_n']=1
# Muestra los primeros registros del DataFrame
print(holidays_df.head())


In [None]:
# Crear DataFrames de Spark desde los DataFrames de Pandas
holidays_df_spark = spark.createDataFrame(holidays_df)

# Realizar la unión utilizando Spark SQL
df_final.createOrReplaceTempView("df_final")
holidays_df_spark.createOrReplaceTempView("holidays_df_spark")

query = """
    SELECT *
    FROM df_final t
    LEFT JOIN holidays_df_spark h
    ON t.datetime = h.date
"""
df_final_merged_spark = spark.sql(query)
df_final_merged_spark = df_final_merged_spark.drop("date").drop("holiday")

# Reemplazar los valores nulos en la columna 'Holiday_n' con 0
df_final_merged_spark = df_final_merged_spark.fillna(0, subset=["Holiday_n"])

# Muestra el DataFrame resultante
df_final_merged_spark.count()
df_final_merged_spark.describe()

In [None]:
# Seleccionar las columnas relevantes para el análisis de correlación
selected_columns = ["hourly_timestamp","origin_request_count", "OriginLocationID", "destination_request_count", "Holiday_n", "windspeed", "cloudcover", "precip", "humidity", "temp", "isWeekend","DayOfWeek","Hour","DayOfMonth","Month","Year"]  # Agrega tus columnas aquí

# Crear un DataFrame de Spark solo con las columnas seleccionadas
selected_data = df_final_merged_spark.select(selected_columns)

# Definir una función UDF para convertir las columnas en un vector
vector_udf = udf(lambda arr: Vectors.dense(arr), VectorUDT())

# Aplicar la conversión manualmente a las columnas seleccionadas
for col_name in selected_columns:
    selected_data = selected_data.withColumn(col_name, col(col_name).cast(DoubleType()))

selected_data.show()

In [None]:

selected_data_p = selected_data.toPandas()
# Filtra las filas que contienen al menos un valor NaN en cualquier columna
condition = selected_data_p["temp"].isna() | selected_data_p["precip"].isna() | selected_data_p["humidity"].isna() | selected_data_p["cloudcover"].isna()
df_final_with_nan = selected_data_p[condition]

# Muestra el DataFrame resultante
df_final_with_nan
len(df_final_with_nan)

In [None]:
selected_data_p.corr()['origin_request_count'].sort_values().drop('origin_request_count').plot(kind='bar',figsize=(10,6))

In [None]:

plt.figure(figsize=(6, 6))

sns.heatmap(selected_data_p.corr(), linewidth=0.3, linecolor='black', cmap="YlGnBu")

In [None]:
# Lee el archivo de H3 en un DataFrame de PySpark
h3_df = spark.read.csv("D:\Trufiapp\GANS\id_index_h3.csv", header=True, inferSchema=True)
# Convierte el DataFrame de Pandas a un DataFrame de PySpark
selected_data_spark = spark.createDataFrame(selected_data_p)
# Convierte la columna 'hourly_timestamp' a tipo fecha
selected_data_spark = selected_data_spark.withColumn('hourly_timestamp', from_unixtime('hourly_timestamp').cast(TimestampType()))

# Realiza la unión utilizando la condición de igualdad en las columnas
df_final = selected_data_spark.join(h3_df, col("OriginLocationID") == col("id"), "left")
df_final.show(20)

In [None]:
df_final.count()
# Muestra los últimos elementos (por ejemplo, los últimos 10)
df_final.orderBy(col("hourly_timestamp").desc()).show(10)

In [None]:

# Especifica la ruta del archivo CSV en el sistema de archivos local
csv_local_path = "file:///" + datasetmerged
# selected_columns = ["UniqueColumn_G", "hourly_timestamp", "OriginLocationID", "destination_request_count", "origin_request_count", "Year", "Month", "DayOfMonth", "Hour", "DayOfWeek", "isWeekend", "id"]
# df_selected = df_final.select(selected_columns)
# Guardar el DataFrame en formato CSV en el sistema de archivos local
df_final.write.mode("overwrite").option("header", "true").csv(csv_local_path)


In [None]:
# Agrupar por 'h3_index' y calcular el promedio de 'origin_request_count' para cada índice
average_trips_per_zone = df_final.groupBy('h3_index').agg(avg('origin_request_count').alias('average_trips'))

# Ordenar por el promedio de viajes y tomar los 10 principales
top_10_zones = average_trips_per_zone.orderBy('average_trips', ascending=False).limit(60)

# Recopilar los datos de PySpark DataFrame y convertirlos a listas de Python
h3_index_data = top_10_zones.select('h3_index').collect()
average_trips_data = top_10_zones.select('average_trips').collect()

# Extraer los valores de las listas
h3_index_values = [row['h3_index'] for row in h3_index_data]
average_trips_values = [row['average_trips'] for row in average_trips_data]

# Crear el gráfico de barras horizontal
fig, ax = plt.subplots(figsize=(20, 9))
ax.barh(h3_index_values, average_trips_values)

# Personalizar la visualización
ax.set_xlabel('Número promedio de viajes')
ax.set_ylabel('Índice H3')
ax.set_title('Top 10 zonas por número promedio de viajes')

# Mostrar el gráfico
plt.show()


In [None]:
# Agrupar por 'h3_index' y calcular el promedio de 'origin_request_count' para cada índice
average_trips_per_zone = df_final.groupBy('h3_index').agg(avg('destination_request_count').alias('average_trips'))

# Ordenar por el promedio de viajes y tomar los 10 principales
top_10_zones = average_trips_per_zone.orderBy('average_trips', ascending=False).limit(60)

# Recopilar los datos de PySpark DataFrame y convertirlos a listas de Python
h3_index_data = top_10_zones.select('h3_index').collect()
average_trips_data = top_10_zones.select('average_trips').collect()

# Extraer los valores de las listas
h3_index_values = [row['h3_index'] for row in h3_index_data]
average_trips_values = [row['average_trips'] for row in average_trips_data]

# Crear el gráfico de barras horizontal
fig, ax = plt.subplots(figsize=(20, 9))
ax.barh(h3_index_values, average_trips_values)

# Personalizar la visualización
ax.set_xlabel('Número promedio de viajes')
ax.set_ylabel('Índice H3')
ax.set_title('Top 10 zonas por número promedio de viajes')

# Mostrar el gráfico
plt.show()

In [None]:
# Agrupar por 'h3_index' y calcular la suma total y el recuento de solicitudes de origen y destino
total_trips_per_zone = df_final.groupBy('h3_index', 'id').agg(sum('origin_request_count').alias('total_origin_trips'),
                                                                    sum('destination_request_count').alias('total_destination_trips'),
                                                                    count('*').alias('total_trips_count'))

# Calcular la suma total de viajes sumando las solicitudes de origen y destino
total_trips_per_zone = total_trips_per_zone.withColumn('total_trips', col('total_origin_trips') + col('total_destination_trips'))

# Calcular el promedio dividiendo la suma total entre el recuento total
total_trips_per_zone = total_trips_per_zone.withColumn('average_trips', col('total_trips') / col('total_trips_count'))

# Ordenar por el promedio de viajes y tomar los 10 principales
top_10_zones = total_trips_per_zone.orderBy('average_trips', ascending=False).limit(15)

# Recopilar los datos de PySpark DataFrame y convertirlos a listas de Python
h3_index_data = top_10_zones.select('h3_index', 'id').collect()
average_trips_data = top_10_zones.select('average_trips').collect()

# Extraer los valores de las listas
h3_index_values = [f"{row['h3_index']} (ID {row['id']})" for row in h3_index_data]
average_trips_values = [row['average_trips'] for row in average_trips_data]

# Crear el gráfico de barras horizontal
fig, ax = plt.subplots(figsize=(20, 9))
ax.barh(h3_index_values, average_trips_values)

# Personalizar la visualización
ax.set_xlabel('Número promedio de viajes')
ax.set_ylabel('Índice H3 (ID de Zona)')
ax.set_title('Top 10 zonas por número promedio de viajes')

# Mostrar el gráfico
plt.show()


In [None]:
spark.stop()