In [None]:
# Imports
from pyspark.sql import SparkSession, functions as F, Window

### Preparación del cluster

In [None]:
# Start (or reuse) a local Spark session that uses every available core
spark = (
    SparkSession.builder
    .appName("ClusterLocal")
    .master("local[*]")
    .getOrCreate()
)

print("SparkContext creado con el master:", spark.sparkContext.master)

### Preparación del DataFrame

In [None]:
# Load the raw CSV; Spark infers the column types and takes the column names from the header row
ruta_fichero = "../../data/input/dataset.csv"
df_spark = spark.read.csv(ruta_fichero, inferSchema=True, header=True)

# Trim stray whitespace from the header names *before* any column is referenced by name,
# so lookups such as "Date" or "Completed Trips" work even if the CSV header is padded
df_spark = df_spark.select([F.col(c).alias(c.strip()) for c in df_spark.columns])

# Re-format Date from the raw "dd-MMM-yy" text to "yyyy-MM-dd"; later cells rely on this exact format
df_spark = df_spark.withColumn(
    "Date", F.date_format(F.to_date(F.col("Date"), "dd-MMM-yy"), "yyyy-MM-dd")
)

# Freeze the original file order into a consecutive index 1..N.
# monotonically_increasing_id follows partition order but is non-deterministic and not
# consecutive, so it is materialised once through row_number
order_window = Window.orderBy(F.monotonically_increasing_id())
df_spark = df_spark.withColumn("aux_index", F.row_number().over(order_window))

# Forward-fill Date: every row takes the last non-null Date at or before it in file order.
# The fill is ordered by the frozen aux_index (not a freshly evaluated id) with an explicit
# ROWS frame, so it walks the rows strictly one by one
fill_window = Window.orderBy("aux_index").rowsBetween(Window.unboundedPreceding, Window.currentRow)
df_spark = (
    df_spark
    .withColumn("Date", F.last("Date", ignorenulls=True).over(fill_window))
    .drop("aux_index")
)

# Validation: the first rows should all carry a yyyy-MM-dd Date
df_spark.show(30)

### 1. Which date had the most completed trips during the two-week period?

In [None]:
# Completed trips summed per calendar day; the first row after sorting is the busiest date
df_trips_by_day = (
    df_spark
    .groupBy("Date")
    .agg(F.sum("Completed Trips").alias("total_trips"))
)
df_trips_by_day.orderBy(F.desc("total_trips")).limit(1).show()

### 2. What was the highest number of completed trips within a 24-hour period?

In [None]:
# Length of the rolling period and of one hourly row, in seconds
HOUR_SECONDS = 3600
WINDOW_HOURS = 24

# Trailing window holding exactly 24 hourly rows (the current hour plus the 23 before it).
# rangeBetween is inclusive at both ends, so an offset of -86400 would span 25 hours
window_spec = Window.orderBy("DateTime_unix").rangeBetween(-(WINDOW_HOURS - 1) * HOUR_SECONDS, 0)

# Date was re-formatted to yyyy-MM-dd while preparing the DataFrame, so the parse pattern must be
# "yyyy-MM-dd H"; with a mismatched pattern to_timestamp returns null for every row.
# The auxiliary column lives only in this derived DataFrame, so df_spark is left untouched.
df_trips_by_24h_period = (
    df_spark
    .withColumn(
        "DateTime_unix",
        F.to_timestamp(
            F.concat(F.col("Date"), F.lit(" "), F.col("Time (Local)")), "yyyy-MM-dd H"
        ).cast("long"),
    )
    # Completed trips in the 24 hours ending at each row's hour
    .withColumn("sum_last_24_hours", F.sum("Completed Trips").over(window_spec))
)

# Highest 24-hour total, kept in a variable so the matching window(s) can be located
max_value = (
    df_trips_by_24h_period
    .agg(F.max("sum_last_24_hours").alias("max_trips_in_24h"))
    .collect()[0]["max_trips_in_24h"]
)
print("Highest number of completed trips in a 24-hour period:", max_value)

# Rows whose trailing 24-hour window reaches the maximum (the window ends at that row's hour)
df_trips_by_24h_period.filter(F.col("sum_last_24_hours") == max_value).show()

### 3. Which hour of the day had the most requests during the two-week period?

In [None]:
# Total requests for each hour of the day across the whole period
df_requests_per_hour = (
    df_spark
    .groupBy("Time (Local)")
    .agg(F.sum("Requests").alias("requests_per_hour"))
)

# Peak value, then every hour that reaches it (ties are all shown)
peak_row = df_requests_per_hour.agg(F.max("requests_per_hour").alias("max_requests_hour")).first()
max_requests_hour = peak_row["max_requests_hour"]

df_requests_per_hour.where(F.col("requests_per_hour") == max_requests_hour).show()

### 4. What percentage of all zeroes during the two-week period occurred on the weekend (Friday at 5 pm to Sunday at 3 am)?

In [None]:
# Tip: The local time value is the start of the hour (e.g. 15 is the hour from 3:00 pm - 4:00 pm)

# Weekend = Friday 17:00 up to Sunday 03:00. Spark's dayofweek returns 1 = Sunday, 6 = Friday,
# 7 = Saturday. Each hour value is the start of its hour, so the weekend covers Friday hours 17-23,
# all of Saturday, and Sunday hours 0-2 (hour 3 already starts at 3 am, after the cut-off)
day_of_week = F.dayofweek(F.col("Date"))
hour_of_day = F.col("Time (Local)")

is_weekend = F.coalesce(
    ((day_of_week == 6) & (hour_of_day >= 17))
    | (day_of_week == 7)
    | ((day_of_week == 1) & (hour_of_day < 3)),
    F.lit(False),  # rows with a missing Date/hour are counted as weekday
)

# Both totals in a single pass; the weekend flag is just a column expression,
# so df_spark is neither modified nor reshuffled
zero_totals = df_spark.agg(
    F.sum("Zeroes").alias("total_zeroes"),
    F.sum(F.when(is_weekend, F.col("Zeroes")).otherwise(F.lit(0))).alias("weekend_zeroes"),
).first()
total_zeroes = zero_totals["total_zeroes"]
weekend_zeroes = zero_totals["weekend_zeroes"]

# The share is undefined when there are no zeroes at all; report NaN instead of dividing by zero
percentage_of_weekend_zeroes = (
    (weekend_zeroes / total_zeroes) * 100 if total_zeroes else float("nan")
)

percentage_of_weekend_zeroes_str = f"{percentage_of_weekend_zeroes:.2f}%"

print(percentage_of_weekend_zeroes_str)

### 5. What is the weighted average ratio of completed trips per driver during the two-week period? 

In [None]:
# Tip: “Weighted average” means your answer should account for the total trip volume in each hour to determine the most accurate number in the whole period.

# Completed trips per driver in every hour, weighted by that hour's completed trips:
#   sum(ratio_h * trips_h) / sum(trips_h)
# Hours without drivers get a null ratio instead of a division by zero (which raises under ANSI mode)
hourly_ratios = (
    df_spark
    .withColumn(
        "completed_trips_per_driver",
        F.when(F.col("Unique Drivers") > 0, F.col("Completed Trips") / F.col("Unique Drivers")),
    )
    .groupBy("Date", "Time (Local)")
    .agg(
        F.avg("completed_trips_per_driver").alias("avg_completed_per_driver"),
        F.sum("Completed Trips").alias("total_completed_trips"),
    )
)

# Keep the DataFrame itself (not the None returned by .show()) so the value can be reused
weighted_avg = hourly_ratios.agg(
    (
        F.sum(F.col("avg_completed_per_driver") * F.col("total_completed_trips"))
        / F.sum("total_completed_trips")
    ).alias("weighted_avg_trips_per_driver")
)
weighted_avg.show()

### 6. In drafting a driver schedule in terms of 8-hour shifts, when are the busiest 8 consecutive hours over the two-week period in terms of unique requests? A new shift starts every 8 hours. Assume that a driver will work the same shift each day.

In [None]:
# Busiest block of 8 consecutive hours by total Requests over the two weeks.
# NOTE(review): the question says a driver keeps the same shift every day; another reading is to
# sum Requests per hour-of-day block (e.g. hour // 8). This cell keeps the rolling-window reading.
SHIFT_HOURS = 8

# Forward window starting at each hour and covering it plus the next 7 hours. It is ordered by a
# numeric Unix timestamp: a "yyyy-MM-dd H" string key sorts lexicographically ("... 10" before
# "... 2"), which would scramble the hours inside each day
shift_window = Window.orderBy("timestamp").rangeBetween(0, (SHIFT_HOURS - 1) * 3600)

busiest_8_hours = (
    df_spark
    .withColumn(
        "timestamp",
        F.to_timestamp(
            F.concat(F.col("Date"), F.lit(" "), F.col("Time (Local)")), "yyyy-MM-dd H"
        ).cast("long"),
    )
    .withColumn("consecutive_requests", F.sum("Requests").over(shift_window))
    # Human-readable start of the 8-hour block
    .withColumn("shift_start", F.from_unixtime("timestamp"))
    .orderBy(F.desc("consecutive_requests"))
    .limit(1)
)
busiest_8_hours.show()

### 7. True or False: Driver supply always increases when demand increases during the two-week period. Tip: Visualize the data to confirm your answer if needed.

In [None]:
# Chronological order of the hourly rows. The hour is cast to int so it sorts numerically even if it
# was read as text; Date is yyyy-MM-dd, which already sorts correctly as a string
window = Window.orderBy("Date", F.col("Time (Local)").cast("int"))

# Demand = Requests, supply = Unique Drivers. For every hour where requests rose compared with the
# previous hour, check whether drivers rose too; the answer is True only if that always holds.
# The first hour has no predecessor (null difference) and is removed by the filter
driver_supply_increase = (
    df_spark
    .withColumn("difference_requests", F.col("Requests") - F.lag("Requests", 1).over(window))
    .withColumn("difference_drivers", F.col("Unique Drivers") - F.lag("Unique Drivers", 1).over(window))
    .filter(F.col("difference_requests") > 0)
    # A missing driver difference counts as "did not increase"
    .withColumn("condition_met", F.coalesce(F.col("difference_drivers") > 0, F.lit(False)))
    .agg(F.min("condition_met").alias("supply_always_increases"))
)
driver_supply_increase.show()

### 8. In which 72-hour period is the ratio of Zeroes to Eyeballs the highest?

In [None]:
# Length of the period and of one hourly row; timestamps below are in milliseconds
PERIOD_HOURS = 72
HOUR_MS = 60 * 60 * 1000

# Trailing window holding exactly 72 hourly rows (the current hour plus the 71 before it).
# rangeBetween is inclusive at both ends, so an offset of -72h would span 73 hours
window = Window.orderBy("timestamp").rangeBetween(-(PERIOD_HOURS - 1) * HOUR_MS, 0)

ratios_72h = (
    df_spark
    .withColumn(
        "timestamp",
        F.to_timestamp(
            F.concat(F.col("Date"), F.lit(" "), F.col("Time (Local)")), "yyyy-MM-dd H"
        ).cast("long") * 1000,
    )
    .withColumn("zeroes_last72h", F.sum("Zeroes").over(window))
    .withColumn("eyeballs_last72h", F.sum("Eyeballs").over(window))
    # Null instead of a division by zero when a window has no eyeballs
    .withColumn(
        "highest_ratio",
        F.when(F.col("eyeballs_last72h") > 0, F.col("zeroes_last72h") / F.col("eyeballs_last72h")),
    )
)

# Windows ending in the first 71 hours are truncated (less than 72 hours of history) and can show
# a spuriously high ratio, so only windows that cover a full 72 hours are ranked
first_ts = ratios_72h.agg(F.min("timestamp")).first()[0]

# Answer the question with the period itself (first and last hour), not only the ratio
zeroes_to_eyeballs = (
    ratios_72h
    .filter(F.col("timestamp") >= first_ts + (PERIOD_HOURS - 1) * HOUR_MS)
    .withColumn(
        "first_hour",
        F.from_unixtime(((F.col("timestamp") - (PERIOD_HOURS - 1) * HOUR_MS) / 1000).cast("long")),
    )
    .withColumn("last_hour", F.from_unixtime((F.col("timestamp") / 1000).cast("long")))
    .orderBy(F.desc("highest_ratio"))
    .select("first_hour", "last_hour", "zeroes_last72h", "eyeballs_last72h", "highest_ratio")
    .limit(1)
)
zeroes_to_eyeballs.show()

### 9. If you could add 5 drivers to any single hour of every day during the two-week period, which hour should you add them to? Hint: Consider both rider eyeballs and driver supply when choosing.

In [None]:
# Tiene sentido añadir drivers (que pueden hacer varios viajes por periodo) a las horas con menor % de servicio (drivers / eyeballs) y, a igualdad, con mayor número de Eyeballs.
# No es lo mismo una hora con 4 eyeballs y 1 driver que otra con 400 eyeballs y 100 drivers: ambas dan 1/4, pero en la segunda hay mucho más margen de ganancia.
# Así que, entre las horas con menor porcentaje de servicio, priorizaría las que tengan mayor número de Eyeballs.

In [None]:
# Weight given to unmet demand (Zeroes) in the priority score below
alpha = 0.5

service_provided = (
    df_spark
    # Drivers per unique logged-in user in each hour; null when an hour had no Eyeballs,
    # instead of a division by zero (which raises under ANSI mode)
    .withColumn(
        "availability_of_service",
        F.when(F.col("Eyeballs") > 0, F.col("Unique Drivers") / F.col("Eyeballs")),
    )
    .groupBy("Time (Local)")
    .agg(
        F.sum("Eyeballs").alias("usuarios_unicos_loggeados"),
        F.sum("Zeroes").alias("usuarios_sin_coche"),
        F.avg("availability_of_service").alias("media_de_servicio"),
    )
    # Impact of extra supply: grows with scarce service (1 / availability), with demand (Eyeballs)
    # and with unmet demand (Zeroes, weighted by alpha). Null when the average availability is 0
    # or missing, which sorts last in the descending order below
    .withColumn(
        "priority_score",
        F.when(
            F.col("media_de_servicio") > 0,
            (1 / F.col("media_de_servicio"))
            * F.col("usuarios_unicos_loggeados")
            * (1 + alpha * F.col("usuarios_sin_coche")),
        ),
    )
    .orderBy("priority_score", ascending=False)
)
# Keep the DataFrame in the variable (not the None returned by .show()) so it can be reused
service_provided.show()

### 10. Looking at the data from all two weeks, which time might make the most sense to consider a true “end day” instead of midnight? (i.e when are supply and demand at both their natural minimums)

In [None]:
# Average supply (Unique Drivers) and demand (Eyeballs) per hour of day over the two weeks;
# the candidate "end of day" is the hour where both are at their minimum
minimum_demand_offer = (
    df_spark
    .groupBy("Time (Local)")
    .agg(
        F.avg("Unique Drivers").alias("drivers"),
        F.avg("Eyeballs").alias("eyeballs"),
    )
    .orderBy(F.col("drivers").asc(), F.col("eyeballs").asc())
)
# Keep the DataFrame in the variable (not the None returned by .show()) so it can be reused
minimum_demand_offer.show()

# Choose the hour with the fewest drivers: with no supply there is simply no business to be done