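# Notebook de Ingesta Parquet RAW

Descarga los ficheros Parquet mensuales de viajes de NYC TLC (servicios `green` y `yellow`) y los carga, mediante Spark y el conector Spark-Snowflake, en las tablas `GREEN_TRIPS` / `YELLOW_TRIPS` del esquema configurado de Snowflake (por defecto `BRONZE`).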

## Conexión a Snowflake

## Imports y SparkSession

In [2]:
# --- Cell 0: initialization (run this cell first) ---
import os
import requests
import tempfile
from dotenv import load_dotenv

# Maven packages Spark must pull in when it launches (adjust versions if needed):
# the Snowflake Spark connector plus the Snowflake JDBC driver.
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--packages net.snowflake:spark-snowflake_2.12:3.1.2,'
    'net.snowflake:snowflake-jdbc:3.24.2 pyspark-shell'
)

# pyspark is imported only once the submit arguments above are in place.
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T

# Re-running this cell must not stack sessions: stop whatever `spark` the kernel still holds.
# Stopping is best-effort; any failure (including `spark` being None) is deliberately ignored.
_previous_spark = globals().get('spark')
if _previous_spark is not None:
    try:
        _previous_spark.stop()
    except Exception:
        pass
del _previous_spark

conf = (
    SparkConf()
    .setAppName("NYC_TLC_ingest")
    .setMaster("local[*]")
)
# Optional, package setting via the conf instead:
# conf.set("spark.jars.packages", "net.snowflake:spark-snowflake_2.12:3.1.2,net.snowflake:snowflake-jdbc:3.24.2")

spark = (
    SparkSession.builder
    .config(conf=conf)
    .getOrCreate()
)

# Sanity check: report the running Spark version.
print("Spark inicializado:", spark.version)



Spark inicializado: 3.5.0


In [3]:
from dotenv import load_dotenv
import os

# .env holding the Snowflake credentials; its location can be overridden with DOTENV_PATH.
DOTENV_PATH = os.getenv("DOTENV_PATH", "/home/jovyan/work/.env")
load_dotenv(DOTENV_PATH, override=True)

# Fail fast, before deriving anything from possibly missing variables.
account = os.getenv("SNOWFLAKE_ACCOUNT")
if not all([account, os.getenv("SNOWFLAKE_USER"), os.getenv("SNOWFLAKE_PASSWORD")]):
    raise RuntimeError(f"Faltan variables de Snowflake en {DOTENV_PATH}")

# Normalize the host: strip whitespace, surrounding quotes, the https:// scheme and any
# trailing slash, then make sure the Snowflake domain is present exactly once.
sf_url = (os.getenv("SNOWFLAKE_URL") or account).strip().strip('"').strip("'")
if sf_url.startswith("https://"):
    sf_url = sf_url[len("https://"):]
sf_url = sf_url.rstrip("/")
if not sf_url.endswith("snowflakecomputing.com"):
    sf_url = f"{sf_url}.snowflakecomputing.com"

print("sfURL:", sf_url)




sfURL: LKVTWCT-PPC14557.snowflakecomputing.com


In [None]:
# --- Asume que ya ejecutaste load_dotenv() y creaste 'spark' en otra celda ---
import os
from dotenv import load_dotenv
import requests
import tempfile
import snowflake.connector
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.utils import AnalysisException
import logging

# Set up logging for better debugging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load a .env if one is found; variables already set (e.g. by an earlier cell) are kept,
# since override defaults to False.
load_dotenv()

# --- Load credentials and build a robust sf_url ---
account = os.getenv("SNOWFLAKE_ACCOUNT")
user = os.getenv("SNOWFLAKE_USER")
password = os.getenv("SNOWFLAKE_PASSWORD")
warehouse = os.getenv("SNOWFLAKE_WH")
database = os.getenv("SNOWFLAKE_DATABASE")
schema = os.getenv("SNOWFLAKE_SCHEMA", "BRONZE")  # target schema, defaults to BRONZE
role = os.getenv("SNOWFLAKE_ROLE")  # optional
sf_url = os.getenv("SNOWFLAKE_URL") or account

if not sf_url:
    raise RuntimeError("Falta SNOWFLAKE_URL o SNOWFLAKE_ACCOUNT en el entorno (.env).")

# Normalize the host: whitespace, surrounding quotes, https:// scheme and trailing slash
# are removed before checking for the Snowflake domain (a trailing "/" would otherwise
# make the endswith() check fail and append the domain a second time).
sf_url = sf_url.strip().strip('"').strip("'")
if sf_url.startswith("https://"):
    sf_url = sf_url[len("https://"):]
sf_url = sf_url.rstrip("/")
if not sf_url.endswith("snowflakecomputing.com"):
    sf_url = f"{sf_url}.snowflakecomputing.com"

logger.info("Usando Snowflake host: %s", sf_url)

# Minimal checks
if not all([user, password, warehouse, database]):
    raise RuntimeError("Faltan credenciales Snowflake: revisa SNOWFLAKE_USER/PASSWORD/WH/DATABASE en .env")

if 'spark' not in globals():
    raise RuntimeError("La variable 'spark' no existe. Crea la SparkSession antes de ejecutar este script.")

# Spark-Snowflake connector options. Unset optional settings (e.g. no SNOWFLAKE_ROLE) are
# omitted: PySpark's DataFrameWriter.options() would otherwise forward None to the JVM
# connector as a null option value.
sfOptions = {
    key: value
    for key, value in {
        "sfURL": sf_url,
        "sfUser": user,
        "sfPassword": password,
        "sfDatabase": database,
        "sfSchema": schema,
        "sfWarehouse": warehouse,
        "sfRole": role,
    }.items()
    if value is not None
}

# --- Ingestion parameters ---
years = [str(y) for y in range(2015, 2026)]   # '2015' .. '2025'
months = [f"{m:02d}" for m in range(1, 13)]   # '01' .. '12'
services = ['green', 'yellow']

def download_file(url, temp_path, chunk_size=1024 * 1024, timeout=30):
    """Stream the resource at `url` into the local file `temp_path`.

    Args:
        url: HTTP(S) URL to download.
        temp_path: destination path; it is created or truncated.
        chunk_size: bytes per streamed chunk (1 MiB by default, suited to large Parquet files).
        timeout: requests timeout in seconds for connecting/reading.

    Returns:
        True when the whole body was written, False when the HTTP request failed
        (the error is logged). Errors writing `temp_path` (OSError) propagate to the caller.
    """
    try:
        # Using the response as a context manager releases the streamed connection
        # even if iteration or the file write fails midway.
        with requests.get(url, stream=True, timeout=timeout) as response:
            response.raise_for_status()
            with open(temp_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=chunk_size):
                    if chunk:  # skip empty chunks
                        f.write(chunk)
        return True
    except requests.exceptions.RequestException as e:
        logger.error("Error downloading %s: %s", url, e)
        return False

def delete_existing_records(conn_params, table_name, ingest_run_id):
    """Delete the rows tagged with `ingest_run_id` from `table_name` in Snowflake.

    Lets a month be re-loaded without duplicates: the caller runs this before appending
    the freshly read data under the same run id.

    Args:
        conn_params: keyword arguments for ``snowflake.connector.connect``. Its
            ``database``/``schema`` entries qualify the table; when absent or None the
            module-level ``database``/``schema`` are used instead.
        table_name: target table name, quoted as-is (so it is case-sensitive).
        ingest_run_id: value matched against the table's INGEST_RUN_ID column
            (bound as a parameter, not interpolated).

    Returns:
        True if the DELETE was executed and committed, False on any error (logged).
    """
    def quote_ident(name):
        # Double-quoted identifiers are case-sensitive; embedded quotes are doubled.
        return '"' + str(name).replace('"', '""') + '"'

    db_name = conn_params.get("database") or database
    schema_name = conn_params.get("schema") or schema
    qualified_table = ".".join(quote_ident(part) for part in (db_name, schema_name, table_name))
    delete_sql = f"DELETE FROM {qualified_table} WHERE INGEST_RUN_ID = %s"

    conn = None
    try:
        conn = snowflake.connector.connect(**conn_params)
        cur = conn.cursor()
        try:
            cur.execute(delete_sql, (ingest_run_id,))
        finally:
            cur.close()
        conn.commit()
        logger.info("Registros con RUN_ID=%s eliminados de %s", ingest_run_id, qualified_table)
        return True
    except Exception as e:
        logger.error("Error deleting records from Snowflake: %s", e)
        return False
    finally:
        # Always release the connection, including when execute/commit failed.
        if conn is not None:
            try:
                conn.close()
            except Exception as e:
                logger.warning("Could not close Snowflake connection: %s", e)

# Connection parameters for snowflake-connector (used for the DELETE statements).
# The connector takes the bare account identifier, not the host, so the domain is removed.
# SNOWFLAKE_ACCOUNT may be unset when only SNOWFLAKE_URL is given (the earlier checks allow
# that), so fall back to the normalized sf_url instead of calling .replace() on None.
conn_params = {
    "user": user,
    "password": password,
    "account": (account or sf_url).replace(".snowflakecomputing.com", ""),
    "warehouse": warehouse,
    "database": database,
    "schema": schema,
    "role": role
}

# --- Ingestion loop: one Parquet file per (service, year, month) ---
# The Spark-Snowflake connector maps DataFrame columns to table columns by position unless
# column_mapping="name" is set, so each write projects the DataFrame onto `table_columns`
# in exactly this order. NOTE(review): assumes `table_columns` lists each table's columns in
# their physical order — confirm against the YELLOW_TRIPS / GREEN_TRIPS DDL.
for service in services:
    # Per-service layout (loop-invariant across months): target table, its columns, the
    # pickup/dropoff timestamp columns, and typed NULL defaults for columns that may be
    # absent from a month's file.
    table_name = f"{service.upper()}_TRIPS"
    if service == 'yellow':
        table_columns = [
            "INGEST_RUN_ID", "INGEST_TIMESTAMP", "YEAR", "MONTH", "VENDORID",
            "PASSENGER_COUNT", "TRIP_DISTANCE", "TPEP_PICKUP_DATETIME", "TPEP_DROPOFF_DATETIME",
            "PULOCATIONID", "DOLOCATIONID", "RATECODEID", "STORE_AND_FWD_FLAG",
            "PAYMENT_TYPE", "FARE_AMOUNT", "EXTRA", "MTA_TAX", "TIP_AMOUNT",
            "TOLLS_AMOUNT", "IMPROVEMENT_SURCHARGE", "TOTAL_AMOUNT",
            "CONGESTION_SURCHARGE", "CBD_CONGESTION_FEE", "AIRPORT_FEE", "SERVICE"
        ]
        timestamp_columns = ['TPEP_PICKUP_DATETIME', 'TPEP_DROPOFF_DATETIME']
        optional_columns = {
            "CBD_CONGESTION_FEE": T.DoubleType(),
            "AIRPORT_FEE": T.DoubleType(),
        }
    else:  # green
        table_columns = [
            "INGEST_RUN_ID", "INGEST_TIMESTAMP", "YEAR", "MONTH", "VENDORID",
            "PASSENGER_COUNT", "TRIP_DISTANCE", "LPEP_PICKUP_DATETIME", "LPEP_DROPOFF_DATETIME",
            "PULOCATIONID", "DOLOCATIONID", "RATECODEID", "STORE_AND_FWD_FLAG",
            "PAYMENT_TYPE", "FARE_AMOUNT", "EXTRA", "MTA_TAX", "TIP_AMOUNT",
            "TOLLS_AMOUNT", "IMPROVEMENT_SURCHARGE", "TOTAL_AMOUNT",
            "CONGESTION_SURCHARGE", "CBD_CONGESTION_FEE", "EHAIL_FEE", "TRIP_TYPE", "SERVICE"
        ]
        timestamp_columns = ['LPEP_PICKUP_DATETIME', 'LPEP_DROPOFF_DATETIME']
        optional_columns = {
            "CBD_CONGESTION_FEE": T.DoubleType(),
            "EHAIL_FEE": T.DoubleType(),
            "TRIP_TYPE": T.IntegerType(),
        }

    for year in years:
        # Only January-July are requested for 2025.
        months_iter = months if year != '2025' else ['01','02','03','04','05','06','07']
        for month in months_iter:
            url = f'https://d37ci6vzurychx.cloudfront.net/trip-data/{service}_tripdata_{year}-{month}.parquet'

            # Skip months whose file is not available (non-200) or cannot be checked.
            try:
                head = requests.head(url, timeout=10, allow_redirects=True)
                if head.status_code != 200:
                    logger.info("No existe: %s", url)
                    continue
            except requests.exceptions.RequestException:
                logger.info("No se pudo verificar: %s", url)
                continue

            logger.info("Procesando %s: %s-%s", service, year, month)

            # Spark reads by path, so download into a named temp file (removed in `finally`).
            tmp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".parquet")
            tmp_path = tmp_file.name
            tmp_file.close()

            try:
                if not download_file(url, tmp_path):
                    continue

                try:
                    df = spark.read.parquet(tmp_path)
                    logger.info("Archivo %s cargado en DataFrame", url)
                except Exception as e:
                    logger.error("Error reading parquet file %s: %s", url, e)
                    continue

                # Run id tagging this month's rows; rows with the same id are deleted before reloading.
                INGEST_RUN_ID = f"run_{year}_{month}"

                # Ingestion metadata columns.
                df = (
                    df.withColumn("INGEST_RUN_ID", F.lit(INGEST_RUN_ID))
                      .withColumn("INGEST_TIMESTAMP", F.current_timestamp())
                      .withColumn("YEAR", F.lit(int(year)))
                      .withColumn("MONTH", F.lit(int(month)))
                      .withColumn("SERVICE", F.lit(service))
                )

                # Upper-case all column names to match the Snowflake table column names.
                df = df.toDF(*[c.upper() for c in df.columns])
                logger.info("Metadatos agregados")

                for col_name in timestamp_columns:
                    if col_name in df.columns:
                        df = df.withColumn(col_name, F.col(col_name).cast(T.TimestampType()))

                for col_name, col_type in optional_columns.items():
                    if col_name not in df.columns:
                        df = df.withColumn(col_name, F.lit(None).cast(col_type))

                # Align with the table layout: drop extra columns, NULL-fill any still missing,
                # and emit the columns in table order so the positional mapping lines up
                # (selecting only the present columns would shift later columns left).
                missing_cols = [c for c in table_columns if c not in df.columns]
                extra_cols = set(df.columns) - set(table_columns)
                if missing_cols:
                    logger.warning("Missing columns in DataFrame (will be NULL in DB): %s", missing_cols)
                if extra_cols:
                    logger.warning("Extra columns in DataFrame (will be dropped): %s", extra_cols)
                for col_name in missing_cols:
                    # NOTE(review): type unknown here, so a NULL string is used; assumes
                    # Snowflake loads it as NULL into any column type — confirm.
                    df = df.withColumn(col_name, F.lit(None).cast(T.StringType()))
                df = df.select(*table_columns)

                # Only touch Snowflake once the DataFrame plan is fully built.
                # NOTE(review): delete + append are not atomic; if the write below fails,
                # this run id's rows stay deleted until the month is re-run.
                if not delete_existing_records(conn_params, table_name, INGEST_RUN_ID):
                    logger.warning("Continuing despite delete failure")

                try:
                    (
                        df.write
                        .format("snowflake")
                        .options(**sfOptions)
                        .option("dbtable", table_name)
                        .mode("append")
                        .save()
                    )
                    logger.info("Datos de %s para %s-%s cargados en Snowflake", service, year, month)
                except Exception as e:
                    logger.error("Error writing to Snowflake: %s", e)
                    # Schema dump for debugging
                    logger.info("DataFrame schema:")
                    df.printSchema()
                    logger.info("Expected table columns: %s", table_columns)

            except Exception as e:
                logger.error("Unexpected error processing %s: %s", url, e)
            finally:
                # Always delete the temp file, including after `continue`.
                try:
                    os.remove(tmp_path)
                except Exception as e:
                    logger.warning("Could not delete temp file %s: %s", tmp_path, e)

logger.info("Proceso completado")

INFO:__main__:Usando Snowflake host: LKVTWCT-PPC14557.snowflakecomputing.com
INFO:__main__:Procesando yellow: 2022-01
INFO:__main__:Archivo https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2022-01.parquet cargado en DataFrame
INFO:__main__:Metadatos agregados
INFO:snowflake.connector.connection:Snowflake Connector for Python Version: 3.18.0, Python Version: 3.11.6, Platform: Linux-6.6.87.2-microsoft-standard-WSL2-x86_64-with-glibc2.35
INFO:snowflake.connector.connection:Connecting to GLOBAL Snowflake domain
INFO:__main__:Registros con RUN_ID=run_2022_01 eliminados de "NYC_TLC_SPARK"."BRONZE"."YELLOW_TRIPS"
INFO:__main__:Datos de yellow para 2022-01 cargados en Snowflake
INFO:__main__:Procesando yellow: 2022-02
INFO:__main__:Archivo https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2022-02.parquet cargado en DataFrame
INFO:__main__:Metadatos agregados
INFO:snowflake.connector.connection:Snowflake Connector for Python Version: 3.18.0, Python Version: 3.11.6, 