In [65]:
from pyspark.sql import SparkSession, functions as F, types as T
from pyspark import SparkConf
from dotenv import load_dotenv
import os


In [66]:
# Spark configuration: Snowflake connector packages plus query-tuning settings.
conf = SparkConf()
# Maven coordinates that Spark downloads when the session starts: the Snowflake JDBC driver and
# the Spark-Snowflake connector (the `_2.12` suffix is the Scala build). These provide the
# "snowflake" data source used by spark.read / df.write below.
conf.set("spark.jars.packages", "net.snowflake:snowflake-jdbc:3.24.2,net.snowflake:spark-snowflake_2.12:3.1.2")
conf.set("spark.sql.adaptive.enabled", "true")
conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
# spark.sql.broadcastTimeout is expressed in seconds: 10 * 60 = 10 minutes.
conf.set("spark.sql.broadcastTimeout", str(10 * 60))
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

# Reuse the running SparkSession if there is one, otherwise create it with `conf`.
# NOTE: when a session already exists, settings such as spark.jars.packages do not take
# effect — restart the kernel after changing them.
spark = SparkSession.builder.config(conf=conf).getOrCreate()

In [None]:
# Read the Snowflake connection settings from a .env file so no credentials live in the notebook.
# The location can be overridden with SNOWFLAKE_ENV_PATH; it defaults to the original path.
ENV_PATH = os.getenv("SNOWFLAKE_ENV_PATH", "/home/jovyan/work/.env")
# override=True: by default load_dotenv() does not replace variables that already exist in the
# process environment. Generic keys such as USER are commonly pre-set by the OS/shell (the
# login account), so without it sfUser could silently be the OS user instead of the .env value.
load_dotenv(dotenv_path=ENV_PATH, override=True)

# Options for the Spark-Snowflake connector: where to connect (account URL, database, schema,
# warehouse) and as whom (role, user, password).
sfOptions = {
    "sfURL": os.getenv("URL"),
    "sfDatabase": os.getenv("DB"),
    "sfSchema": "RAW",
    "sfWarehouse": os.getenv("WAREHOUSE"),
    "sfRole": os.getenv("ROLE"),
    "sfUser": os.getenv("USER"),
    "sfPassword": os.getenv("PASSWORD"),
}

# Fail fast with a readable error instead of handing None options to the connector.
missing_sf_options = sorted(key for key, value in sfOptions.items() if not value)
if missing_sf_options:
    raise ValueError(
        f"Missing Snowflake connection settings (check {ENV_PATH}): {missing_sf_options}"
    )


In [67]:
# Security: Snowflake credentials must never be hardcoded in the notebook. sfOptions is built
# from the environment / .env file (keys URL, DB, WAREHOUSE, ROLE, USER, PASSWORD) in the
# previous cell; put non-secret values such as the database and warehouse there as well.
# NOTE(review): this cell used to contain a literal account password — rotate that password.
# Sanity check before any Spark read: every connection option must have a value.
missing_sf_options = sorted(key for key, value in sfOptions.items() if not value)
if missing_sf_options:
    raise ValueError(f"Missing Snowflake connection settings in the environment: {missing_sf_options}")

In [68]:
def add_payment_type_label(df):
    """Return `df` with a PAYMENT_TYPE_LABEL column decoding the numeric PAYMENT_TYPE.

    The label is a native Spark CASE WHEN expression (no Python UDF). Codes missing from
    the table below, and a null PAYMENT_TYPE, are labelled "Other".
    """
    # NOTE(review): confirm this code table against the TLC data dictionary for the loaded years.
    payment_labels = {
        1: "Credit card",
        2: "Cash",
        3: "No charge",
        4: "Dispute",
        5: "Unknown",
        6: "Voided trip",
    }
    payment_code = F.col("PAYMENT_TYPE")
    label_expr = None
    for code, label in payment_labels.items():
        condition = payment_code == code
        # The first entry opens the CASE; later entries append WHEN branches in table order.
        label_expr = F.when(condition, label) if label_expr is None else label_expr.when(condition, label)
    return df.withColumn("PAYMENT_TYPE_LABEL", label_expr.otherwise("Other"))

# ⚡ Native CASE WHEN expression replacing the former rate_code_udf
def add_ratecode_label(df):
    """Return `df` with a RATECODE_LABEL column decoding the numeric RATECODEID.

    Built as a native Spark CASE WHEN expression. Any code not listed below, including a
    null RATECODEID, falls through to "Other".
    """
    ratecode_labels = {
        1: "Standard",
        2: "JFK",
        3: "Newark",
        4: "Nassau or Westchester",
        5: "Negotiated fare",
        6: "Group ride",
    }
    ratecode = F.col("RATECODEID")
    case_expr = None
    for code, label in ratecode_labels.items():
        matches = ratecode == code
        if case_expr is None:
            case_expr = F.when(matches, label)
        else:
            case_expr = case_expr.when(matches, label)
    return df.withColumn("RATECODE_LABEL", case_expr.otherwise("Other"))

# ⚡ Native CASE WHEN expression replacing the former vendor_udf
def add_vendor_label(df):
    """Return `df` with a VENDOR_LABEL column decoding the numeric VENDORID.

    VENDORID 1 -> "CMT", 2 -> "VeriFone"; every other value (including null) -> "Other".
    """
    # NOTE(review): confirm the vendor codes against the TLC data dictionary for the loaded years.
    vendor_id = F.col("VENDORID")
    vendor_label = (
        F.when(vendor_id == 1, "CMT")
        .when(vendor_id == 2, "VeriFone")
        .otherwise("Other")
    )
    return df.withColumn("VENDOR_LABEL", vendor_label)

In [69]:
# 1. Load the Yellow trip data from the RAW schema, tagging each row with its service type.
# Not cached: it is consumed once, by the Yellow/Green union. Caching every intermediate layer
# (raw inputs, union, enriched result) stores the same trip rows several times in memory.
df_yellow = (
    spark.read
    .format("snowflake")
    .options(**sfOptions)
    .option("dbtable", "YELLOW_TRIPS")
    .load()
    .withColumn("SERVICE_TYPE", F.lit("Yellow"))
)

In [70]:
# Load the Green trip data from the RAW schema, tagging each row with its service type.
# Not cached: it is consumed once, by the Yellow/Green union, so caching it here would only
# duplicate the same rows in memory.
df_green = (
    spark.read
    .format("snowflake")
    .options(**sfOptions)
    .option("dbtable", "GREEN_TRIPS")
    .load()
    .withColumn("SERVICE_TYPE", F.lit("Green"))
)

In [71]:
# Stack Yellow and Green trips by column name. allowMissingColumns=True fills columns present in
# only one service with nulls (presumably including the TPEP_*/LPEP_* timestamps coalesced later).
# Not cached: the union is consumed once, by the enrichment step, which caches its own result.
df_trips_unificado = df_yellow.unionByName(df_green, allowMissingColumns=True)

In [72]:
# 2. Load the taxi zone lookup from the RAW schema.
# Left uncached: only its normalized projection (next cell) is reused, and that one is cached.
df_zones = (
    spark.read
    .format("snowflake")
    .options(**sfOptions)
    .option("dbtable", "TAXI_ZONES")
    .load()
)

In [73]:
# Normalize the zone columns to the names the joins use: ZONE_ID, BOROUGH_NAME, ZONE_NAME.
# Spark resolves column names case-insensitively by default (spark.sql.caseSensitive=false),
# so "LocationID" also matches an upper-case LOCATIONID column from Snowflake.
# The guard keeps the cell re-runnable: it rebinds df_zones, so on a second run the source
# columns ("LocationID", ...) no longer exist and the select would fail.
if "ZONE_ID" not in df_zones.columns:
    df_zones = df_zones.select(
        F.col("LocationID").alias("ZONE_ID"),
        F.col("Borough").alias("BOROUGH_NAME"),
        F.col("Zone").alias("ZONE_NAME"),
    ).cache()  # cached: the lookup is joined twice (pickup and dropoff)

In [74]:
# Precondition for the zone enrichment: both join keys must exist in the unified trips.
# Compared upper-case because Spark column resolution is case-insensitive by default.
trip_columns_upper = {column.upper() for column in df_trips_unificado.columns}
missing_join_keys = sorted({"PULOCATIONID", "DOLOCATIONID"} - trip_columns_upper)
if missing_join_keys:
    raise ValueError(f"Unified trips are missing zone join keys: {missing_join_keys}")

In [76]:
# Target options for the STAGE schema; the write step uses these.
# NOTE: the 'STAGE' schema must already exist in Snowflake before the write runs.
sfOptions_stage = sfOptions.copy()
sfOptions_stage["sfSchema"] = "STAGE"


def _join_zone(trips, zones, location_col, prefix):
    """Left-join borough and zone names onto `trips` for one location column.

    Args:
        trips: trips DataFrame containing `location_col`.
        zones: zone lookup with ZONE_ID, BOROUGH_NAME and ZONE_NAME columns.
        location_col: trip column holding the zone id (e.g. "PULOCATIONID").
        prefix: prefix for the new columns, producing `<prefix>_BOROUGH` and `<prefix>_ZONE`.

    Returns:
        `trips` plus the two prefixed columns; trips with no matching zone get nulls (left join).
    """
    key_col = f"{prefix}_ZONE_ID"
    # Rename before joining so the result never carries duplicate ZONE_ID / BOROUGH_NAME /
    # ZONE_NAME columns, and the temporary key can be dropped by its unique, unqualified name.
    # (Dropping with a qualified string such as "pu.ZONE_ID" may not resolve the alias, in
    # which case drop() is a silent no-op.)
    zone_lookup = zones.select(
        F.col("ZONE_ID").alias(key_col),
        F.col("BOROUGH_NAME").alias(f"{prefix}_BOROUGH"),
        F.col("ZONE_NAME").alias(f"{prefix}_ZONE"),
    )
    # Broadcast hint: the zone lookup is assumed small, so it is shipped to every executor
    # instead of shuffling the trip data.
    return (
        trips
        .join(F.broadcast(zone_lookup), F.col(location_col) == F.col(key_col), "left")
        .drop(key_col)
    )


# Enrichment chain: pickup zone, dropoff zone, then the code -> label columns.
df_enriched = (
    df_trips_unificado
    .transform(lambda trips: _join_zone(trips, df_zones, "PULOCATIONID", "PU"))
    .transform(lambda trips: _join_zone(trips, df_zones, "DOLOCATIONID", "DO"))
    .transform(add_payment_type_label)
    .transform(add_ratecode_label)
    .transform(add_vendor_label)
    # Cached because later cells select from it; it is materialized by the first action (the write).
    .cache()
)


In [78]:
# Output columns of the STAGE table, in write order, grouped by purpose.
final_cols = (
    # Trip identity
    ["VENDORID", "VENDOR_LABEL", "SERVICE_TYPE"]
    # Yellow (TPEP_*) and Green (LPEP_*) timestamps merged into a single column each;
    # coalesce keeps the first non-null value.
    + [
        F.coalesce(F.col("TPEP_PICKUP_DATETIME"), F.col("LPEP_PICKUP_DATETIME")).alias("PICKUP_DATETIME"),
        F.coalesce(F.col("TPEP_DROPOFF_DATETIME"), F.col("LPEP_DROPOFF_DATETIME")).alias("DROPOFF_DATETIME"),
    ]
    # Trip details, rate code, pickup/dropoff zones and payment type
    + ["PASSENGER_COUNT", "TRIP_DISTANCE"]
    + ["RATECODEID", "RATECODE_LABEL"]
    + ["PULOCATIONID", "PU_BOROUGH", "PU_ZONE"]
    + ["DOLOCATIONID", "DO_BOROUGH", "DO_ZONE"]
    + ["PAYMENT_TYPE", "PAYMENT_TYPE_LABEL"]
    # Financial fields
    + [
        "FARE_AMOUNT", "EXTRA", "MTA_TAX", "TIP_AMOUNT", "TOLLS_AMOUNT",
        "IMPROVEMENT_SURCHARGE", "TOTAL_AMOUNT", "CONGESTION_SURCHARGE", "AIRPORT_FEE",
        "CBD_CONGESTION_FEE",
    ]
    # Ingestion metadata
    + ["RUN_ID", "SOURCE_YEAR", "SOURCE_MONTH", "INGESTED_AT_UTC", "SOURCE_PATH"]
)

In [79]:
# Project the enriched trips onto the STAGE column layout (select() accepts the list directly).
df_final_stage = df_enriched.select(final_cols)

In [None]:
# Persist the STAGE projection to Snowflake, replacing the table's previous contents.
table_name_stage = "TRIPS_ENRICHED"

(
    df_final_stage.write
    .format("snowflake")
    .options(**sfOptions_stage)
    .option("dbtable", table_name_stage)
    .mode("overwrite")
    .save()
)
