# Imports

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime

# -*- coding: utf-8 -*-
from __future__ import print_function

# A) Viajes Nocturnos (00:00-01:00) por Día

In [None]:
def A():
    # Celda A1: Spark SQL
    print("A1) Spark SQL - Viajes nocturnos por día:")
    spark.sql("""
        SELECT 
            DATE(tpep_pickup_datetime) AS dia,
            COUNT(*) AS num_viajes
        FROM taxi_data
        WHERE HOUR(tpep_pickup_datetime) = 0
        GROUP BY dia
        ORDER BY dia
    """).show(30, truncate=False)

    # Celda A2: API DataFrame
    print("\nA2) API DataFrame - Viajes nocturnos por día:")
    (df_clean
     .filter(hour("tpep_pickup_datetime") == 0)
     .groupBy(to_date("tpep_pickup_datetime").alias("dia"))
     .count()
     .orderBy("dia")
     .show(30, truncate=False))

    # Celda A3: Notación estilo RDD
    print("A3) Spark RDD - Viajes nocturnos por día:")
    nocturnos_por_dia = (df_rdd.rdd
                         .filter(lambda x: x.tpep_pickup_datetime.hour == 0)
                         .map(lambda x: (x.tpep_pickup_datetime.date(), 1))
                         .reduceByKey(lambda a, b: a + b).sortBy(lambda x: x[0])
                        )

    for dia, num in nocturnos_por_dia.collect()[:30]:
        print(str(dia) + "\t" + str(num))
        
    print()

# B) Viajes Nocturnos (00:00-01:00) por Mes

In [None]:
def B():
    # Celda B1: Spark SQL
    print("B1) Spark SQL - Viajes nocturnos por mes:")
    spark.sql("""
        SELECT 
            MONTH(tpep_pickup_datetime) AS mes,
            COUNT(*) AS num_viajes
        FROM taxi_data
        WHERE HOUR(tpep_pickup_datetime) = 0
        GROUP BY mes
    """).show()

    # Celda B2: API DataFrame

    print("\nB2) API DataFrame - Viajes nocturnos por mes:")
    (df_clean
     .filter(hour("tpep_pickup_datetime") == 0)
     .groupBy(month("tpep_pickup_datetime").alias("mes"))
     .count()
     .show())

    # Celda B3: Viajes nocturnos por mes (RDD)
    print("B3) Spark RDD - Viajes nocturnos por mes:")

    resultados = (df_rdd.rdd
        .filter(lambda r: r.tpep_pickup_datetime.hour == 0)
        .map(lambda r: (r.tpep_pickup_datetime.month, 1))
        .reduceByKey(lambda a, b: a + b)
        .sortBy(lambda x: x[0])
        .collect()
    )

    print("Mes\tTotal Viajes")
    for mes, total in resultados:
        print("%d\t%d" % (mes, total))
    
    print()

# C) Media de Viajes por Mes por Conductor

In [None]:
def C():
    # Celda C1: Spark SQL
    print("C1) Spark SQL - Media viajes/mes por conductor:")
    spark.sql("""
        SELECT 
            VendorID,
            AVG(viajes_por_mes) AS media_viajes_mes
        FROM (
            SELECT 
                VendorID, 
                MONTH(tpep_pickup_datetime) AS mes,
                COUNT(*) AS viajes_por_mes
            FROM taxi_data
            GROUP BY VendorID, mes
        )
        GROUP BY VendorID
    """).show()

    # Celda C2: API DataFrame
    print("\nC2) API DataFrame - Media viajes/mes por conductor:")
    (df_clean
     .groupBy("VendorID", month("tpep_pickup_datetime").alias("mes"))
     .count()
     .groupBy("VendorID")
     .agg(avg("count").alias("media_viajes_mes"))
     .show())

    # Celda C3: Media viajes/mes por conductor (RDD)
    print("C3) Spark RDD - Media viajes/mes por conductor:")

    viajes_mes_conductor = (df_rdd.rdd
        .map(lambda r: ((r.VendorID, r.tpep_pickup_datetime.month), 1))
        .reduceByKey(lambda a, b: a + b)
    )

    media_viajes = (viajes_mes_conductor
        .map(lambda x: (x[0][0], (x[1], 1)))
        .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
        .mapValues(lambda x: x[0] / float(x[1]))
        .sortBy(lambda x: x[0])
    )

    print("VendorID | Media Viajes/Mes")
    for vendor_id, media in media_viajes.collect():
        print(str(vendor_id) + " |  " + str(media))
    
    print()

# D) Media de Viajes por Día por Conductor

In [None]:
def D():
    # Celda D1: Spark SQL
    print("D1) Spark SQL - Media viajes/día por conductor:")
    spark.sql("""
        SELECT 
            VendorID,
            AVG(viajes_por_dia) AS media_viajes_dia
        FROM (
            SELECT 
                VendorID, 
                DATE(tpep_pickup_datetime) AS dia,
                COUNT(*) AS viajes_por_dia
            FROM taxi_data
            GROUP BY VendorID, dia
        )
        GROUP BY VendorID
    """).show()

    # Celda D2: API DataFrame
    print("\nD2) API DataFrame - Media viajes/día por conductor:")
    (df_clean
     .groupBy("VendorID", to_date("tpep_pickup_datetime").alias("dia"))
     .count()
     .groupBy("VendorID")
     .agg(avg("count").alias("media_viajes_dia"))
     .show())

    # Celda D3: Spark RDD - Media viajes/día por conductor
    print("D3) Spark RDD - Media viajes/día por conductor:")

    resultados = (df_rdd.rdd
        .map(lambda r: ((r.VendorID, r.tpep_pickup_datetime.date()), 1))
        .reduceByKey(lambda a, b: a + b)
        .map(lambda x: (x[0][0], (x[1], 1)))
        .reduceByKey(lambda a, b: (a[0]+b[0], a[1]+b[1]))
        .mapValues(lambda x: x[0]/float(x[1]))
        .sortBy(lambda x: x[0])
        .collect()
    )

    print("VendorID | Media Viajes/Día")
    for vendor_id, media in resultados:
        print(str(vendor_id) + " | " + str(media))
    
    print()

# E) Máximo Pasajeros en Primera Semana

In [None]:
def E():
    # Celda E1: Spark SQL
    print("E1) Spark SQL - Máx pasajeros en primera semana:")
    spark.sql("""
        SELECT MAX(passenger_count) AS max_pasajeros
        FROM taxi_data
        WHERE DAY(tpep_pickup_datetime) <= 7
    """).show()

    # Celda E2: API DataFrame
    print("\nE2) API DataFrame - Máx pasajeros en primera semana:")
    (df_clean
     .filter(dayofmonth("tpep_pickup_datetime") <= 7)
     .agg(max("passenger_count").alias("max_pasajeros"))
     .show())

    # Celda E3: Spark RDD - Máx pasajeros en primera semana
    print("E3) Spark RDD - Máx pasajeros en primera semana:")

    max_pasajeros = (df_rdd.rdd
        .filter(lambda r: r.tpep_pickup_datetime.day <= 7)
        .map(lambda r: r.passenger_count)
        .max()
    )

    print("Máximo de pasajeros: " + str(max_pasajeros))
    print()

# F) Máximo Pasajeros en Todo el Mes

In [None]:
def F():
    # Celda F1: Spark SQL
    print("F1) Spark SQL - Máx pasajeros en todo el mes:")
    spark.sql("""
        SELECT MAX(passenger_count) AS max_pasajeros_mes
        FROM taxi_data
    """).show()

    # Celda F2: API DataFrame
    print("\nF2) API DataFrame - Máx pasajeros en todo el mes:")
    df_clean.agg(max("passenger_count").alias("max_pasajeros_mes")).show()

    # Celda F3: Spark RDD - Máx pasajeros en todo el mes
    print("F3) Spark RDD - Max pasajeros en todo el mes:")

    max_pasajeros = (df_rdd.rdd
        .map(lambda r: r.passenger_count)
        .max()
    )

    print("Maximo de pasajeros: " + str(max_pasajeros))
    print()

# G) Coste del Recorrido más Caro

In [None]:
def G():
    # Celda G1: Spark SQL
    print("G1) Spark SQL - Recorrido más caro:")
    spark.sql("""
        SELECT MAX(total_amount) AS max_coste
        FROM taxi_data
    """).show()

    # Celda G2: API DataFrame
    print("\nG2) API DataFrame - Recorrido más caro:")
    df_clean.agg(max("total_amount").alias("max_coste")).show()

    # Celda G3: Spark RDD - Recorrido más caro
    print("G3) Spark RDD - Recorrido mas caro:")

    max_coste = (df_rdd.rdd
        .map(lambda r: float(r.total_amount))
        .max()
    )

    print("Costo máximo: " + str(max_coste))
    print()

# H) Coste del Recorrido más Barato

In [None]:
def H():
    # Celda H1: Spark SQL
    print("H1) Spark SQL - Recorrido más barato:")
    spark.sql("""
        SELECT MIN(total_amount) AS min_coste
        FROM taxi_data
        WHERE total_amount > 0
    """).show()

    # Celda H2: API DataFrame
    print("\nH2) API DataFrame - Recorrido más barato:")
    df_clean.filter("total_amount > 0").agg(min("total_amount").alias("min_coste")).show()

    # Celda H3: Spark RDD - Recorrido más barato
    print("H3) Spark RDD - Recorrido mas barato:")

    min_coste = (df_rdd.rdd 
        .filter(lambda r: float(r.total_amount) > 0)
        .map(lambda r: float(r.total_amount))
        .min()
    )

    print("Costo minimo: " + str(min_coste))
    print()def H():
    # Celda H1: Spark SQL
    print("H1) Spark SQL - Recorrido más barato:")
    spark.sql("""
        SELECT MIN(total_amount) AS min_coste
        FROM taxi_data
        WHERE total_amount > 0
    """).show()

    # Celda H2: API DataFrame
    print("\nH2) API DataFrame - Recorrido más barato:")
    df_clean.filter("total_amount > 0").agg(min("total_amount").alias("min_coste")).show()

    # Celda H3: Spark RDD - Recorrido más barato
    print("H3) Spark RDD - Recorrido mas barato:")

    min_coste = (df_rdd.rdd 
        .filter(lambda r: float(r.total_amount) > 0)
        .map(lambda r: float(r.total_amount))
        .min()
    )

    print("Costo minimo: " + str(min_coste))
    print()

# Ejecución

In [None]:
sample_fraction = 0.2
format_string = "yyyy-MM-dd HH:mm:ss"

# Inicializar Spark
spark = SparkSession.builder \
    .appName("NYC Taxi Analysis") \
    .getOrCreate()

# Cargar y limpiar datos
file_path = "yellow_tripdata_2018-11.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Limpieza básica
df_clean = df.dropna(subset=["tpep_pickup_datetime", "VendorID", "passenger_count", "total_amount"])
df_clean = df_clean.sample(withReplacement=False, fraction=sample_fraction, seed=42)
df_clean = df_clean.withColumn("tpep_pickup_datetime", 
                             to_timestamp("tpep_pickup_datetime", format_string))

df_clean.createOrReplaceTempView("taxi_data")

# Convertir a Pandas para el enfoque 3
df_rdd = df_clean

# Ejecutamos los métodos
A()
B()
C()
D()
E()
F()
G()
H()

spark.stop()