In [None]:
# Hard reset of the notebook process: stop Spark if it is running, then exit.
try:
    spark.stop()
except Exception:
    # Best-effort: `spark` may be undefined (NameError) or already stopped.
    # `Exception` (instead of a bare `except:`) lets KeyboardInterrupt and
    # SystemExit propagate instead of being silently swallowed.
    pass
import gc, os
gc.collect()
# os._exit ends the process immediately, without running cleanup handlers;
# nothing after this line executes.
os._exit(0)

In [3]:
from pyspark.sql import SparkSession

# Build (or reuse) a Spark session with 4g for the driver and 4g per executor.
_builder = SparkSession.builder.appName("CleanSpark_NoSnowflake")
_builder = _builder.config("spark.driver.memory", "4g")
_builder = _builder.config("spark.executor.memory", "4g")
spark = _builder.getOrCreate()

print("Spark iniciado SIN conector Snowflake")


Spark iniciado SIN conector Snowflake


In [9]:
# === Environment reload and Spark session (re)initialisation ===
import os
from dotenv import load_dotenv
from pyspark.sql import SparkSession

# override=True: values from .env replace variables already set in the process.
load_dotenv(".env", override=True)

# Fail fast on missing credentials. Without this check a missing variable
# silently becomes the URL "None.snowflakecomputing.com" or a None option
# value, and the failure only surfaces later, at the first Snowflake read.
_required_env = (
    "SNOWFLAKE_ACCOUNT",
    "SNOWFLAKE_USER",
    "SNOWFLAKE_PASSWORD",
    "SNOWFLAKE_DATABASE",
)
_missing_env = [name for name in _required_env if not os.getenv(name)]
if _missing_env:
    raise RuntimeError(
        "Faltan variables de entorno de Snowflake: " + ", ".join(_missing_env)
    )

spark = (
    SparkSession.builder
    .appName("Deber03_Enriquecimiento")
    .config("spark.sql.shuffle.partitions", "4")
    .getOrCreate()
)

# Connection options passed to every spark.read.format("snowflake") call.
# The schema falls back to "RAW" when SNOWFLAKE_SCHEMA_RAW is not set.
SF_OPTIONS = {
    "sfURL": f"{os.getenv('SNOWFLAKE_ACCOUNT')}.snowflakecomputing.com",
    "sfAccount": os.getenv("SNOWFLAKE_ACCOUNT"),
    "sfUser": os.getenv("SNOWFLAKE_USER"),
    "sfPassword": os.getenv("SNOWFLAKE_PASSWORD"),
    "sfDatabase": os.getenv("SNOWFLAKE_DATABASE"),
    "sfWarehouse": os.getenv("SNOWFLAKE_WAREHOUSE"),
    "sfSchema": os.getenv("SNOWFLAKE_SCHEMA_RAW", "RAW"),
}

print("Sesión Spark y conexión Snowflake listas.")


Sesión Spark y conexión Snowflake listas.


In [10]:
from pyspark.sql import DataFrame

# Inventory of the RAW schema: table names and row counts as reported by
# INFORMATION_SCHEMA, fetched through the Snowflake Spark connector.
_tables_query = "SELECT TABLE_NAME, ROW_COUNT FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = 'RAW'"
_sf_reader = spark.read.format("snowflake").options(**SF_OPTIONS)
df_check = _sf_reader.option("query", _tables_query).load()

# Up to 50 rows, without truncating cell contents.
df_check.show(n=50, truncate=False)


+---------------+---------+
|TABLE_NAME     |ROW_COUNT|
+---------------+---------+
|YELLOW_2019_01 |7696617  |
|YELLOW_2019_02 |7049370  |
|CONN_TEST_SPARK|1        |
|GREEN_2019_02  |615594   |
|GREEN_2019_01  |672105   |
|INGEST_AUDIT   |4        |
+---------------+---------+



In [11]:
from pyspark.sql import functions as F

# Partitions to load: one RAW table is read per (service, year, month) combination.
YEARS   = [2019]         # Configurable, 2015–2025
MONTHS  = [1, 2]         # Configurable, 1–12
SERVICES= ["yellow","green"]

def read_raw(service, year, month):
    """Read one monthly RAW table from Snowflake and tag it with its origin.

    The table name is built as ``RAW.<SERVICE>_<year>_<MM>`` (service
    upper-cased, month zero-padded to two digits) and is printed before the
    read is issued.

    Args:
        service: Service name used in the table name and stored in ``service_type``.
        year: Year used in the table name and stored in ``source_year``.
        month: Integer month used in the table name and stored in ``source_month``.

    Returns:
        The loaded DataFrame with three extra literal columns:
        ``service_type``, ``source_year`` and ``source_month``.
    """
    table_name = "RAW.{}_{}_{:02d}".format(service.upper(), year, month)
    print("Leyendo", table_name)

    reader = spark.read.format("snowflake").options(**SF_OPTIONS)
    loaded = reader.option("dbtable", table_name).load()

    tagged = loaded.withColumn("service_type", F.lit(service))
    tagged = tagged.withColumn("source_year", F.lit(year))
    return tagged.withColumn("source_month", F.lit(month))

# Read every (service, year, month) table. A table that cannot be read is
# reported and skipped so the remaining ones are still loaded.
dfs = []
for svc in SERVICES:
    for yr in YEARS:
        for mo in MONTHS:
            try:
                frame = read_raw(svc, yr, mo)
            except Exception as err:
                print("Saltando", svc, yr, mo, err)
                continue
            dfs.append(frame)

# Nothing loaded at all is a hard error.
if not dfs:
    raise RuntimeError("No se cargó ninguna tabla RAW. Verifica esquema o nombres.")

# Fold all frames into one; columns missing on one side are filled with nulls.
raw_union, *_remaining = dfs
for frame in _remaining:
    raw_union = raw_union.unionByName(frame, allowMissingColumns=True)

print(f"Unión completa: {raw_union.count():,} filas totales")


Leyendo RAW.YELLOW_2019_01
Leyendo RAW.YELLOW_2019_02
Leyendo RAW.GREEN_2019_01
Leyendo RAW.GREEN_2019_02
Unión completa: 16,033,686 filas totales


In [13]:
import requests, tempfile

zones_url = "https://d37ci6vzurychx.cloudfront.net/misc/taxi+_zone_lookup.csv"
tmp_path = os.path.join(tempfile.gettempdir(), "taxi_zone_lookup.csv")

# Download the lookup CSV to a temporary path so Spark can read it as a local file.
print("Descargando lookup de zonas...")
try:
    response = requests.get(zones_url, timeout=30)
    response.raise_for_status()
    # Context manager guarantees the file is flushed and closed before Spark
    # reads it; the original left the handle open until garbage collection.
    with open(tmp_path, "wb") as fh:
        fh.write(response.content)
    print("Descarga completada:", tmp_path)
except Exception as e:
    # `from e` keeps the original traceback attached to the RuntimeError.
    raise RuntimeError(f"No se pudo descargar {zones_url}: {e}") from e

# Load the CSV with Spark, keeping three columns under snake_case names.
zones = (
    spark.read.csv(tmp_path, header=True)
    .select(
        F.col("LocationID").cast("int").alias("location_id"),
        F.col("Borough").alias("borough"),
        F.col("Zone").alias("zone")
    )
)

print("Taxi Zones cargadas:", zones.count(), "registros")
zones.show(5)


Descargando lookup de zonas...
Descarga completada: /tmp/taxi_zone_lookup.csv
Taxi Zones cargadas: 265 registros
+-----------+-------------+--------------------+
|location_id|      borough|                zone|
+-----------+-------------+--------------------+
|          1|          EWR|      Newark Airport|
|          2|       Queens|         Jamaica Bay|
|          3|        Bronx|Allerton/Pelham G...|
|          4|    Manhattan|       Alphabet City|
|          5|Staten Island|       Arden Heights|
+-----------+-------------+--------------------+
only showing top 5 rows



In [14]:
# Attach borough/zone names for both ends of each trip.
# The same lookup is joined twice, keyed once per location column.
_pickup_lookup = zones.withColumnRenamed("location_id", "PULocationID")
_dropoff_lookup = zones.withColumnRenamed("location_id", "DOLocationID")

# Pickup side: left join keeps trips whose location id has no match.
_with_pickup = raw_union.join(_pickup_lookup, on="PULocationID", how="left")
_with_pickup = _with_pickup.withColumnRenamed("borough", "pu_borough")
_with_pickup = _with_pickup.withColumnRenamed("zone", "pu_zone")

# Dropoff side: same join, renamed with the do_ prefix.
_with_dropoff = _with_pickup.join(_dropoff_lookup, on="DOLocationID", how="left")
_with_dropoff = _with_dropoff.withColumnRenamed("borough", "do_borough")
raw_enriched = _with_dropoff.withColumnRenamed("zone", "do_zone")

print("Dataset enriquecido con zonas NYC Taxi")
print("Filas totales:", raw_enriched.count())

_preview_columns = ["service_type", "pickup_datetime", "pu_borough", "pu_zone", "do_borough", "do_zone"]
raw_enriched.select(*_preview_columns).show()


Dataset enriquecido con zonas NYC Taxi
Filas totales: 16033686
+------------+-------------------+----------+--------------------+----------+--------------------+
|service_type|    pickup_datetime|pu_borough|             pu_zone|do_borough|             do_zone|
+------------+-------------------+----------+--------------------+----------+--------------------+
|      yellow|2019-01-01 11:19:45| Manhattan|Times Sq/Theatre ...| Manhattan|            Gramercy|
|      yellow|2019-01-01 11:41:24| Manhattan|            Union Sq| Manhattan|        East Chelsea|
|      yellow|2019-01-01 11:39:40| Manhattan|Upper East Side S...| Manhattan|Upper East Side N...|
|      yellow|2019-01-01 11:25:47| Manhattan| Morningside Heights| Manhattan| Lincoln Square East|
|      yellow|2019-01-01 11:46:22| Manhattan|Upper East Side S...| Manhattan|        Midtown East|
|      yellow|2019-01-01 11:18:00| Manhattan|Financial Distric...|  Brooklyn|Williamsburg (Sou...|
|      yellow|2019-01-01 11:39:32|  Brooklyn|W

In [15]:
from pyspark.sql import functions as F

# Reference catalogues as Spark map literals; each list alternates key, value.
payment_map = F.create_map(list(map(F.lit, [
    1, "Credit card", 2, "Cash", 3, "No charge",
    4, "Dispute", 5, "Unknown", 6, "Voided trip",
])))

rate_map = F.create_map(list(map(F.lit, [
    1, "Standard rate", 2, "JFK", 3, "Newark",
    4, "Nassau/Westchester", 5, "Negotiated fare", 6, "Group ride",
])))

vendor_map = F.create_map(list(map(F.lit, [
    1, "Creative Mobile Technologies", 2, "VeriFone Inc",
])))

# (new column, lookup map, source code column). The code is cast to int
# before the lookup because the map keys are integer literals.
_catalog_columns = (
    ("payment_type_desc", payment_map, "payment_type"),
    ("rate_code_desc", rate_map, "RatecodeID"),
    ("vendor_name", vendor_map, "VendorID"),
)
for _target, _lookup, _source in _catalog_columns:
    raw_enriched = raw_enriched.withColumn(_target, _lookup[F.col(_source).cast("int")])

print("Catálogos normalizados (payment_type, ratecode, vendor)")
_catalog_preview = raw_enriched.select(
    "VendorID", "vendor_name",
    "RatecodeID", "rate_code_desc",
    "payment_type", "payment_type_desc",
)
_catalog_preview.show(5, False)


Catálogos normalizados (payment_type, ratecode, vendor)
+--------+----------------------------+----------+--------------+------------+-----------------+
|VendorID|vendor_name                 |RatecodeID|rate_code_desc|payment_type|payment_type_desc|
+--------+----------------------------+----------+--------------+------------+-----------------+
|2       |VeriFone Inc                |1.0       |Standard rate |2.0         |Cash             |
|2       |VeriFone Inc                |1.0       |Standard rate |2.0         |Cash             |
|1       |Creative Mobile Technologies|1.0       |Standard rate |1.0         |Credit card      |
|2       |VeriFone Inc                |1.0       |Standard rate |1.0         |Credit card      |
|2       |VeriFone Inc                |1.0       |Standard rate |1.0         |Credit card      |
+--------+----------------------------+----------+--------------+------------+-----------------+
only showing top 5 rows



In [13]:
import snowflake.connector, os
from dotenv import load_dotenv

# Make sure the ANALYTICS schema exists before any table is created in it.
load_dotenv(".env", override=True)

conn = snowflake.connector.connect(
    account=os.getenv("SNOWFLAKE_ACCOUNT"),
    user=os.getenv("SNOWFLAKE_USER"),
    password=os.getenv("SNOWFLAKE_PASSWORD"),
    warehouse=os.getenv("SNOWFLAKE_WAREHOUSE"),
    database=os.getenv("SNOWFLAKE_DATABASE")
)

# try/finally: the cursor and the connection are released even when the
# statement fails; the original leaked both on any exception.
try:
    cur = conn.cursor()
    try:
        cur.execute("CREATE SCHEMA IF NOT EXISTS ANALYTICS;")
    finally:
        cur.close()
finally:
    conn.close()

print("Esquema ANALYTICS verificado/creado.")


Esquema ANALYTICS verificado/creado.


In [6]:
import snowflake.connector, os
from dotenv import load_dotenv

# (Re)create ANALYTICS.TRIPS_ENRICHED, including the descriptive columns
# (vendor_name, rate_code_desc, payment_type_desc, pu_*/do_* zone names).
load_dotenv(".env", override=True)

# CREATE OR REPLACE drops any existing table of the same name and its data.
create_table_sql = """
CREATE OR REPLACE TABLE TRIPS_ENRICHED (
    vendorid INTEGER,
    vendor_name STRING,
    pickup_datetime TIMESTAMP_NTZ,
    dropoff_datetime TIMESTAMP_NTZ,
    passenger_count FLOAT,
    trip_distance FLOAT,
    ratecodeid FLOAT,
    rate_code_desc STRING,
    store_and_fwd_flag STRING,
    pulocationid INTEGER,
    dolocationid INTEGER,
    payment_type FLOAT,
    payment_type_desc STRING,
    fare_amount FLOAT,
    extra FLOAT,
    mta_tax FLOAT,
    tip_amount FLOAT,
    tolls_amount FLOAT,
    improvement_surcharge FLOAT,
    total_amount FLOAT,
    congestion_surcharge FLOAT,
    airport_fee FLOAT,
    service_type STRING,
    source_year INTEGER,
    source_month INTEGER,
    run_id STRING,
    ingested_at_utc TIMESTAMP_NTZ,
    source_path STRING,
    pu_borough STRING,
    pu_zone STRING,
    do_borough STRING,
    do_zone STRING
);
"""

conn = snowflake.connector.connect(
    account=os.getenv("SNOWFLAKE_ACCOUNT"),
    user=os.getenv("SNOWFLAKE_USER"),
    password=os.getenv("SNOWFLAKE_PASSWORD"),
    warehouse=os.getenv("SNOWFLAKE_WAREHOUSE"),
    database=os.getenv("SNOWFLAKE_DATABASE"),
    schema="ANALYTICS"
)

# try/finally: the cursor and the connection are released even when the DDL
# fails; the original leaked both on any exception.
try:
    cur = conn.cursor()
    try:
        cur.execute(create_table_sql)
        print("Tabla ANALYTICS.TRIPS_ENRICHED recreada con columnas descriptivas.")
    finally:
        cur.close()
finally:
    conn.close()



Tabla ANALYTICS.TRIPS_ENRICHED recreada con columnas descriptivas.


In [16]:
import os
import shutil

# Remove leftover monthly Parquet downloads from the temporary directory.
tmp_dir = "/tmp"
delete_targets = [
    "yellow_tripdata_2019-01.parquet",
    "yellow_tripdata_2019-02.parquet",
    "green_tripdata_2019-01.parquet",
    "green_tripdata_2019-02.parquet",
]

print("🧹 Eliminando Parquets temporales antiguos...")
for _name in delete_targets:
    _candidate = os.path.join(tmp_dir, _name)
    if not os.path.exists(_candidate):
        print(f" No encontrado {_name}")
        continue
    os.remove(_candidate)
    print(f" Borrado {_name}")

# Also remove the trips_enriched.parquet directory when present.
# rmtree deletes it together with its contents, whether or not it is empty.
enriched_dir = os.path.join(tmp_dir, "trips_enriched.parquet")
if os.path.exists(enriched_dir):
    shutil.rmtree(enriched_dir)
    print("  Carpeta trips_enriched.parquet eliminada")

print("Limpieza completa del directorio temporal /tmp")


🧹 Eliminando Parquets temporales antiguos...
 Borrado yellow_tripdata_2019-01.parquet
 Borrado yellow_tripdata_2019-02.parquet
 Borrado green_tripdata_2019-01.parquet
 Borrado green_tripdata_2019-02.parquet
  Carpeta trips_enriched.parquet eliminada
Limpieza completa del directorio temporal /tmp


In [8]:
import os
import tempfile

# Resolve the temporary directory and report it.
tmp_dir = tempfile.gettempdir()
print(" Carpeta temporal actual:", tmp_dir)
# IPython shell escape: list the directory and keep only entries containing
# "parquet"; when grep matches nothing, the fallback message is echoed instead.
!ls -lh $tmp_dir | grep parquet || echo " No hay archivos Parquet residuales"


 Carpeta temporal actual: /tmp
 No hay archivos Parquet residuales


In [1]:
# IPython shell escape: print the total disk usage of /tmp in human-readable form.
!du -sh /tmp

452M	/tmp


In [16]:
import os, tempfile, shutil

# Export the enriched dataset to a local Parquet directory under the temp dir.
tmp_dir = os.path.join(tempfile.gettempdir(), "trips_enriched_final")

# Start from a clean directory if a previous export exists.
if os.path.exists(tmp_dir):
    shutil.rmtree(tmp_dir)

print(f"Exportando a {tmp_dir}")
# coalesce(1) collapses the data into one partition before writing.
_single_partition = raw_enriched.coalesce(1)
_single_partition.write.parquet(tmp_dir, mode="overwrite")
print("Exportación local completada")


Exportando a /tmp/trips_enriched_final
Exportación local completada


In [17]:
import os

# Show what the export step left on disk, or a notice when the directory is absent.
tmp_dir = "/tmp/trips_enriched_final"
print("Contenido de:", tmp_dir)

print(os.listdir(tmp_dir) if os.path.exists(tmp_dir) else "La carpeta no existe")


Contenido de: /tmp/trips_enriched_final
['.part-00000-58bada99-d77f-4526-82d1-4a64d1a1a69b-c000.snappy.parquet.crc', 'part-00000-58bada99-d77f-4526-82d1-4a64d1a1a69b-c000.snappy.parquet', '._SUCCESS.crc', '_SUCCESS']


In [None]:
import os
import pandas as pd
import snowflake.connector
import glob
from dotenv import load_dotenv
# write_pandas is a module-level helper that takes the connection as its first
# argument; cursor objects do not provide a write_pandas method.
from snowflake.connector.pandas_tools import write_pandas

# Load environment variables.
load_dotenv(".env", override=True)

# Locate the exported Parquet part file.
tmp_dir = "/tmp/trips_enriched_final"
parquet_files = sorted(glob.glob(os.path.join(tmp_dir, "*.parquet")))
if not parquet_files:
    # Explicit error instead of a bare IndexError from indexing an empty list.
    raise FileNotFoundError(f"No se encontró ningún archivo Parquet en {tmp_dir}")
parquet_file = parquet_files[0]

print(f"Leyendo {parquet_file} ...")
# NOTE(review): this loads the entire file into driver memory as one pandas frame.
pdf = pd.read_parquet(parquet_file)
print(f"DataFrame leído con {len(pdf):,} filas y {len(pdf.columns)} columnas")

# Snowflake connection.
conn = snowflake.connector.connect(
    account=os.getenv("SNOWFLAKE_ACCOUNT"),
    user=os.getenv("SNOWFLAKE_USER"),
    password=os.getenv("SNOWFLAKE_PASSWORD"),
    warehouse=os.getenv("SNOWFLAKE_WAREHOUSE"),
    database=os.getenv("SNOWFLAKE_DATABASE"),
    schema="ANALYTICS"
)

# try/finally: the connection is closed even if an upload fails part-way.
try:
    # Upload in small batches.
    chunk_size = 50000  # rows per batch; tune as needed
    total_rows = len(pdf)
    for i in range(0, total_rows, chunk_size):
        # reset_index gives each batch a plain 0..n-1 index, so no index
        # column is carried into the upload.
        chunk = pdf.iloc[i:i+chunk_size].reset_index(drop=True)
        success, nchunks, nrows, _ = write_pandas(
            conn,
            chunk,
            table_name="TRIPS_ENRICHED",
            # The table was created with unquoted column names. Unquoted
            # identifiers here let the frame's column names resolve
            # case-insensitively against it.
            quote_identifiers=False,
            overwrite=(i == 0)  # only the first batch replaces; the rest append
        )
        print(f"Chunk {i//chunk_size + 1} → {nrows} filas insertadas")

    print(f"Carga completa → {total_rows:,} filas cargadas en ANALYTICS.TRIPS_ENRICHED")
finally:
    conn.close()


Leyendo /tmp/trips_enriched_final/part-00000-58bada99-d77f-4526-82d1-4a64d1a1a69b-c000.snappy.parquet ...


In [1]:
import snowflake.connector, os

# Bulk-load the exported Parquet file(s) into ANALYTICS.TRIPS_ENRICHED through
# a named stage: PUT uploads the local files, COPY INTO loads them.

# Columns are matched by name, ignoring case.
# NOTE(review): ON_ERROR=CONTINUE skips failing rows and the COPY result is
# never inspected, so skipped rows go unreported — confirm this is intended.
copy_sql = """
    COPY INTO TRIPS_ENRICHED
    FROM @stage_trips_enriched
    FILE_FORMAT=(TYPE=PARQUET)
    MATCH_BY_COLUMN_NAME=CASE_INSENSITIVE
    ON_ERROR=CONTINUE;
"""

conn = snowflake.connector.connect(
    account=os.getenv("SNOWFLAKE_ACCOUNT"),
    user=os.getenv("SNOWFLAKE_USER"),
    password=os.getenv("SNOWFLAKE_PASSWORD"),
    warehouse=os.getenv("SNOWFLAKE_WAREHOUSE"),
    database=os.getenv("SNOWFLAKE_DATABASE"),
    schema="ANALYTICS"
)

# try/finally: the cursor and the connection are released even when a PUT or
# the COPY fails; the original leaked both on any exception.
try:
    cur = conn.cursor()
    try:
        # Stage used only for this load; recreated on every run.
        cur.execute("CREATE OR REPLACE STAGE stage_trips_enriched;")

        local_dir = "/tmp/trips_enriched_final"
        for file in os.listdir(local_dir):
            # Only the data files; Spark's _SUCCESS marker and .crc files are skipped.
            if file.endswith(".parquet"):
                path = os.path.join(local_dir, file)
                print("Subiendo:", path)
                cur.execute(f"PUT file://{path} @stage_trips_enriched auto_compress=true")

        cur.execute(copy_sql)

        print("Carga completada en Snowflake.")
    finally:
        cur.close()
finally:
    conn.close()


Subiendo: /tmp/trips_enriched_final/part-00000-58bada99-d77f-4526-82d1-4a64d1a1a69b-c000.snappy.parquet
Carga completada en Snowflake.


In [10]:
from pyspark.sql import SparkSession
from datetime import datetime
import os
from dotenv import load_dotenv

# Final verification of the ANALYTICS.TRIPS_ENRICHED load.

# Load environment variables.
load_dotenv(".env", override=True)

# Obtain a Spark session (getOrCreate reuses an active one if present).
_verify_builder = SparkSession.builder.appName("verificacion_analytics")
spark = _verify_builder.config("spark.sql.shuffle.partitions", "4").getOrCreate()

# Snowflake connection options pointing at the ANALYTICS schema.
SF_OPTIONS_LOAD = dict(
    sfURL=f"{os.getenv('SNOWFLAKE_ACCOUNT')}.snowflakecomputing.com",
    sfDatabase=os.getenv("SNOWFLAKE_DATABASE"),
    sfWarehouse=os.getenv("SNOWFLAKE_WAREHOUSE"),
    sfUser=os.getenv("SNOWFLAKE_USER"),
    sfPassword=os.getenv("SNOWFLAKE_PASSWORD"),
    sfSchema="ANALYTICS",
)

# Count the rows now present in the final table.
_trips_enriched = (
    spark.read.format("snowflake")
    .options(**SF_OPTIONS_LOAD)
    .option("dbtable", "TRIPS_ENRICHED")
    .load()
)
count_snowflake = _trips_enriched.count()

print(f"Carga verificada en ANALYTICS.TRIPS_ENRICHED ({count_snowflake} filas) - {datetime.now()}")


Carga verificada en ANALYTICS.TRIPS_ENRICHED (16033686 filas) - 2025-10-18 23:30:22.835409
