In [0]:
from pyspark.sql.functions import (
    col, to_timestamp, when, row_number,
    from_utc_timestamp, expr
)
from pyspark.sql.window import Window

#A continuacion leeremos los datos desde la capa Bronze
df_bronze = spark.read.table("workspace.bronze.hvfhs")
print(f"Registros leídos desde Bronze: {df_bronze.count()}")

#Identificamos columnas de fechas y valores monetarios que se encuentran como string y que deben ser normalizados
df_bronze.printSchema()


In [0]:
#Seguidamente vamos a castear los flags a booleanos
df_typed = (
    df_bronze
    .withColumn("shared_request_flag", col("shared_request_flag") == "Y")
    .withColumn("shared_match_flag", col("shared_match_flag") == "Y")
    .withColumn("access_a_ride_flag", col("access_a_ride_flag") == "Y")
    .withColumn("wav_request_flag", col("wav_request_flag") == "Y")
    .withColumn("wav_match_flag", col("wav_match_flag") == "Y")
)

#Validamos que los cambios se hayan realizado correctamente
df_typed.printSchema()


In [0]:
#Ahora vamos a normalizar los timestamps que en crudo podemos ver que no tienen timezone
from pyspark.sql.functions import to_utc_timestamp

df_time = (
    df_typed
    .withColumn("pickup_datetime", to_utc_timestamp(col("pickup_datetime"), "UTC"))
    .withColumn("dropoff_datetime", to_utc_timestamp(col("dropoff_datetime"), "UTC"))
    .withColumn("request_datetime", to_utc_timestamp(col("request_datetime"), "UTC"))
    .withColumn("on_scene_datetime", to_utc_timestamp(col("on_scene_datetime"), "UTC"))
)

#validamos que la zona horaria ahora se interprete en UTC
df_time.printSchema()
spark.sql("SET spark.sql.session.timeZone").show()

# Los timestamps originales eran timestamp_ntz.
# Se normalizan a UTC para garantizar consistencia de los datos y evitar problemas de zona horaria


In [0]:
#Pondremos el tiempo de viajes en minutos
df_edited = (
    df_time
    .withColumn("trip_duration_minutes", col("trip_time") / 60)
)

#verificamos los valores originales vs los nuevos
df_edited.select(
    "trip_time",
    "trip_duration_minutes"
).limit(5).show(truncate=False)


In [0]:
#Por ultimo haremos algunas validaciones que pueden ser relevantes como tiempo de viaje valido, distacia de viaje mayor a 0 etc

df_valid = (
    df_edited
    .filter(col("pickup_datetime").isNotNull())
    .filter(col("dropoff_datetime") >= col("pickup_datetime"))
    .filter(col("trip_miles") >= 0)
    .filter(col("base_passenger_fare") >= 0)
)

#verificamos la cantidad de registros antes y despues para corroborar que si funcionaron los filtros
total_before = df_edited.count()
total_after = df_valid.count()

print(f"Registros antes de calidad: {total_before}")
print(f"Registros después de calidad: {total_after}")
print(f"Registros filtrados: {total_before - total_after}")



In [0]:
#Ahora le indicamos las columnas que queremos persistir en la capa de silver
df_silver_final = df_valid.select(
    "hvfhs_license_num",
    "dispatching_base_num",
    "originating_base_num",

    "request_datetime",
    "on_scene_datetime",
    "pickup_datetime",
    "dropoff_datetime",

    "PULocationID",
    "DOLocationID",

    "trip_miles",
    "trip_time",
    "trip_duration_minutes",

    "base_passenger_fare",
    "tolls",
    "bcf",
    "sales_tax",
    "congestion_surcharge",
    "airport_fee",
    "cbd_congestion_fee",
    "tips",
    "driver_pay",

    "shared_request_flag",
    "shared_match_flag",
    "access_a_ride_flag",
    "wav_request_flag",
    "wav_match_flag",

    "ingestion_timestamp",
    "source_file"
)


In [0]:
#podemos agregar una columna que facilitara consultas por fecha de viaje
from pyspark.sql.functions import to_date

df_silver_final = df_silver_final.withColumn(
    "pickup_date",
    to_date("pickup_datetime")
)


In [0]:
#Finalmente vamos a persisistir los datos en la capa silver
(
    df_silver_final.write
    .format("delta")
    .mode("append")
    #Aqui se puede poner una columna de particion que sera util para optimizar consultas
    .partitionBy("pickup_date")
    .saveAsTable("workspace.silver.hvfhs")
)

print("✔️ Datos guardados correctamente en la capa Silver")


In [0]:
#verificamos con una consulta que los datos hayan persistido en la tabla asignada
spark.sql("SELECT COUNT(*) FROM workspace.silver.hvfhs").show()
