In [1]:
import psycopg2, os

conn = psycopg2.connect(
    host=os.getenv("PG_HOST"),
    port=os.getenv("PG_PORT"),
    database=os.getenv("PG_DB"),
    user=os.getenv("PG_USER"),
    password=os.getenv("PG_PASSWORD")
)
cur = conn.cursor()
cur.execute("CREATE SCHEMA IF NOT EXISTS silver;")
conn.commit()
cur.close()
conn.close()

print("Esquema 'silver' verificado o creado correctamente.")


Esquema 'silver' verificado o creado correctamente.


In [2]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("Transform_SILVER_OBT")
    .config("spark.driver.memory", "2g")
    .config("spark.jars.packages", "org.postgresql:postgresql:42.6.0")
    .getOrCreate()
)

print("Spark activo para la fase SILVER.")


Spark activo para la fase SILVER.


In [4]:
import os

# Configuración de conexión JDBC a Postgres
pg_url = f"jdbc:postgresql://{os.getenv('PG_HOST')}:{os.getenv('PG_PORT')}/{os.getenv('PG_DB')}"

pg_props = {
    "user": os.getenv("PG_USER"),
    "password": os.getenv("PG_PASSWORD"),
    "driver": "org.postgresql.Driver"
}

schema_raw = os.getenv("PG_SCHEMA_RAW")

print(f"Conexión configurada correctamente → esquema RAW: {schema_raw}")


Conexión configurada correctamente → esquema RAW: raw


In [4]:
yellow_df = spark.read.jdbc(url=pg_url, table="raw.yellow_taxi_trip", properties=pg_props)
green_df  = spark.read.jdbc(url=pg_url, table="raw.green_taxi_trip", properties=pg_props)
zones_df  = spark.read.jdbc(url=pg_url, table="raw.taxi_zone_lookup", properties=pg_props)

print(f"Yellow: {yellow_df.count():,} filas | Green: {green_df.count():,} filas | Zones: {zones_df.count():,} filas")


Yellow: 14,745,987 filas | Green: 1,287,699 filas | Zones: 265 filas


In [5]:
from pyspark.sql import functions as F

# Añadir columna que identifique el servicio
yellow_df = yellow_df.withColumn("service_type", F.lit("yellow"))
green_df  = green_df.withColumn("service_type", F.lit("green"))

# Unificar columnas comunes
trips_df = yellow_df.unionByName(green_df, allowMissingColumns=True)

# Limpieza básica
trips_df = (
    trips_df
    .filter(F.col("trip_distance") > 0)
    .filter(F.col("fare_amount") > 0)
    .filter(F.col("passenger_count") > 0)
)

# Selección de campos de interés y creación de columnas derivadas
trips_df = (
    trips_df.select(
        F.col("VendorID").alias("vendor_id"),
        F.col("tpep_pickup_datetime").alias("pickup_datetime"),
        F.col("tpep_dropoff_datetime").alias("dropoff_datetime"),
        "passenger_count", "trip_distance",
        "PULocationID", "DOLocationID",
        "payment_type", "fare_amount", "total_amount",
        "service_type"
    )
    .withColumn("pickup_date", F.to_date("pickup_datetime"))
    .withColumn("trip_duration_min", (F.unix_timestamp("dropoff_datetime") - F.unix_timestamp("pickup_datetime"))/60)
    .withColumn("fare_per_mile", F.round(F.col("fare_amount")/F.col("trip_distance"), 2))
)

print("Estructura OBT unificada lista.")
trips_df.printSchema()


Estructura OBT unificada lista.
root
 |-- vendor_id: long (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- service_type: string (nullable = false)
 |-- pickup_date: date (nullable = true)
 |-- trip_duration_min: double (nullable = true)
 |-- fare_per_mile: double (nullable = true)



In [6]:
# Join con catálogo de zonas (izquierdo para mantener todos los viajes)
obt_df = (
    trips_df.join(
        zones_df.select(
            F.col("LocationID").alias("PULocationID_zone"),
            F.col("Borough").alias("pickup_borough"),
            F.col("Zone").alias("pickup_zone")
        ),
        trips_df["PULocationID"] == F.col("PULocationID_zone"),
        "left"
    ).drop("PULocationID_zone")
)
print("Zonas incorporadas al OBT.")


Zonas incorporadas al OBT.


In [7]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("OBT_export")
    .config("spark.driver.memory", "6g")      # 6 GB de memoria para el driver
    .config("spark.executor.memory", "4g")    # 4 GB para los ejecutores
    .config("spark.sql.shuffle.partitions", "12")  # menos particiones => menos overhead
    .getOrCreate()
)

print("Spark reiniciado con memoria extendida.")



Spark reiniciado con memoria extendida.


In [8]:
spark.conf.get("spark.driver.memory"), spark.conf.get("spark.executor.memory")

('6g', '4g')

In [11]:
import psycopg2, os

conn = psycopg2.connect(
    host=os.getenv("PG_HOST"),
    port=os.getenv("PG_PORT"),
    database=os.getenv("PG_DB"),
    user=os.getenv("PG_USER"),
    password=os.getenv("PG_PASSWORD")
)
cur = conn.cursor()
cur.execute("""
SELECT column_name
FROM information_schema.columns
WHERE table_schema = 'raw' AND table_name = 'yellow_taxi_trip'
ORDER BY ordinal_position;
""")
cols = [r[0] for r in cur.fetchall()]
print("Columnas en raw.yellow_taxi_trip:")
print(cols)

cur.close()
conn.close()


Columnas en raw.yellow_taxi_trip:
['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count', 'trip_distance', 'RatecodeID', 'store_and_fwd_flag', 'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount', 'congestion_surcharge', 'airport_fee', 'ingested_at_utc']


In [12]:
import psycopg2, os

conn = psycopg2.connect(
    host=os.getenv("PG_HOST"),
    port=os.getenv("PG_PORT"),
    database=os.getenv("PG_DB"),
    user=os.getenv("PG_USER"),
    password=os.getenv("PG_PASSWORD")
)
cur = conn.cursor()

cur.execute("CREATE SCHEMA IF NOT EXISTS silver;")
cur.execute("DROP TABLE IF EXISTS silver.obt_trips;")

cur.execute("""
CREATE TABLE silver.obt_trips AS
SELECT
    'yellow' AS service_type,
    "VendorID",
    "tpep_pickup_datetime" AS pickup_datetime,
    "tpep_dropoff_datetime" AS dropoff_datetime,
    "passenger_count",
    "trip_distance",
    "PULocationID",
    "DOLocationID",
    "payment_type",
    "fare_amount",
    "total_amount"
FROM raw.yellow_taxi_trip
UNION ALL
SELECT
    'green' AS service_type,
    "VendorID",
    "lpep_pickup_datetime" AS pickup_datetime,
    "lpep_dropoff_datetime" AS dropoff_datetime,
    "passenger_count",
    "trip_distance",
    "PULocationID",
    "DOLocationID",
    "payment_type",
    "fare_amount",
    "total_amount"
FROM raw.green_taxi_trip;
""")

conn.commit()
cur.close()
conn.close()

print("Tabla 'silver.obt_trips' creada correctamente en Postgres (usando comillas y nombres reales).")



Tabla 'silver.obt_trips' creada correctamente en Postgres (usando comillas y nombres reales).


In [13]:
conn = psycopg2.connect(
    host=os.getenv("PG_HOST"),
    port=os.getenv("PG_PORT"),
    database=os.getenv("PG_DB"),
    user=os.getenv("PG_USER"),
    password=os.getenv("PG_PASSWORD")
)
cur = conn.cursor()
cur.execute("SELECT COUNT(*) FROM silver.obt_trips;")
print(f"Total filas en silver.obt_trips: {cur.fetchone()[0]:,}")
cur.close()
conn.close()


Total filas en silver.obt_trips: 16,033,686


In [11]:
obt_df = spark.read.jdbc(
    url=pg_url,
    table="silver.obt_trips",
    properties=pg_props
)
print("Cargado nuevamente desde Postgres.")



Cargado nuevamente desde Postgres.


In [12]:
import psycopg2, os

conn = psycopg2.connect(
    host=os.getenv("PG_HOST"),
    port=os.getenv("PG_PORT"),
    database=os.getenv("PG_DB"),
    user=os.getenv("PG_USER"),
    password=os.getenv("PG_PASSWORD")
)
cur = conn.cursor()

cur.execute("CREATE SCHEMA IF NOT EXISTS silver;")
cur.execute("DROP TABLE IF EXISTS silver.obt_trips;")

cur.execute("""
CREATE TABLE silver.obt_trips AS
SELECT DISTINCT
    'yellow' AS service_type,
    "VendorID",
    "tpep_pickup_datetime" AS pickup_datetime,
    "tpep_dropoff_datetime" AS dropoff_datetime,
    "passenger_count",
    "trip_distance",
    "PULocationID",
    "DOLocationID",
    "payment_type",
    "fare_amount",
    "total_amount"
FROM raw.yellow_taxi_trip
WHERE "tpep_pickup_datetime" IS NOT NULL
  AND "tpep_dropoff_datetime" IS NOT NULL
  AND "total_amount" IS NOT NULL

UNION ALL

SELECT DISTINCT
    'green' AS service_type,
    "VendorID",
    "lpep_pickup_datetime" AS pickup_datetime,
    "lpep_dropoff_datetime" AS dropoff_datetime,
    "passenger_count",
    "trip_distance",
    "PULocationID",
    "DOLocationID",
    "payment_type",
    "fare_amount",
    "total_amount"
FROM raw.green_taxi_trip
WHERE "lpep_pickup_datetime" IS NOT NULL
  AND "lpep_dropoff_datetime" IS NOT NULL
  AND "total_amount" IS NOT NULL;
""")

conn.commit()
cur.close()
conn.close()

print("Tabla 'silver.obt_trips' recreada y limpiada directamente desde Postgres (sin Spark).")


Tabla 'silver.obt_trips' recreada y limpiada directamente desde Postgres (sin Spark).


In [13]:
conn = psycopg2.connect(
    host=os.getenv("PG_HOST"),
    port=os.getenv("PG_PORT"),
    database=os.getenv("PG_DB"),
    user=os.getenv("PG_USER"),
    password=os.getenv("PG_PASSWORD")
)
cur = conn.cursor()
cur.execute("SELECT COUNT(*) FROM silver.obt_trips;")
print(f"Filas finales en silver.obt_trips: {cur.fetchone()[0]:,}")
cur.close()
conn.close()


Filas finales en silver.obt_trips: 16,033,615


In [19]:
import psycopg2, os, warnings

# Silenciar warnings de psycopg2/pandas (opcional pero recomendado)
warnings.filterwarnings("ignore")

# --- Conexión a Postgres ---
conn = psycopg2.connect(
    host=os.getenv("PG_HOST"),
    port=os.getenv("PG_PORT"),
    database=os.getenv("PG_DB"),
    user=os.getenv("PG_USER"),
    password=os.getenv("PG_PASSWORD")
)
cur = conn.cursor()

# --- Crear tabla enriquecida ---
print("Integrando catálogo de zonas con OBT...")

cur.execute("CREATE SCHEMA IF NOT EXISTS silver;")
cur.execute("DROP TABLE IF EXISTS silver.obt_trips_enriched;")

cur.execute("""
CREATE TABLE silver.obt_trips_enriched AS
SELECT
    obt.*,
    pickup_z."Borough"  AS pickup_borough,
    pickup_z."Zone"     AS pickup_zone,
    drop_z."Borough"    AS dropoff_borough,
    drop_z."Zone"       AS dropoff_zone
FROM silver.obt_trips obt
LEFT JOIN raw.taxi_zone_lookup pickup_z
    ON obt."PULocationID" = pickup_z."LocationID"
LEFT JOIN raw.taxi_zone_lookup drop_z
    ON obt."DOLocationID" = drop_z."LocationID";
""")

conn.commit()
print("Tabla 'silver.obt_trips_enriched' creada exitosamente con zonas integradas.")

# --- Cierre de conexión ---
cur.close()
conn.close()


Integrando catálogo de zonas con OBT...
Tabla 'silver.obt_trips_enriched' creada exitosamente con zonas integradas.


In [20]:
from sqlalchemy import create_engine
import pandas as pd, os, warnings

# Silenciar warnings molestos
warnings.filterwarnings("ignore", category=UserWarning, module="pandas")

# Crear conexión SQLAlchemy (sin warnings)
pg_url = f"postgresql+psycopg2://{os.getenv('PG_USER')}:{os.getenv('PG_PASSWORD')}@{os.getenv('PG_HOST')}:{os.getenv('PG_PORT')}/{os.getenv('PG_DB')}"
engine = create_engine(pg_url)

# --- Mostrar estructura completa ---
print("Vista de columnas disponibles en silver.obt_trips_enriched:")
query_cols = """
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_schema = 'silver' AND table_name = 'obt_trips_enriched';
"""
cols_df = pd.read_sql(query_cols, engine)
display(cols_df)

# --- Mostrar una muestra de registros (todas las columnas) ---
print("\nMuestra de registros (primeras 10 filas):")
query_sample = """
SELECT *
FROM silver.obt_trips_enriched
LIMIT 10;
"""
sample_df = pd.read_sql(query_sample, engine)
display(sample_df)


Vista de columnas disponibles en silver.obt_trips_enriched:


Unnamed: 0,column_name,data_type
0,service_type,text
1,VendorID,bigint
2,pickup_datetime,timestamp without time zone
3,dropoff_datetime,timestamp without time zone
4,passenger_count,double precision
5,trip_distance,double precision
6,PULocationID,bigint
7,DOLocationID,bigint
8,payment_type,double precision
9,fare_amount,double precision



Muestra de registros (primeras 10 filas):


Unnamed: 0,service_type,VendorID,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,PULocationID,DOLocationID,payment_type,fare_amount,total_amount,pickup_borough,pickup_zone,dropoff_borough,dropoff_zone
0,yellow,1,2019-01-16 09:11:31,2019-01-16 09:20:47,1.0,1.5,43,238,1.0,8.5,11.15,Manhattan,Central Park,Manhattan,Upper West Side North
1,yellow,2,2019-01-16 09:01:05,2019-01-16 09:12:51,1.0,0.69,170,100,1.0,8.5,11.16,Manhattan,Murray Hill,Manhattan,Garment District
2,yellow,1,2019-01-16 09:22:27,2019-01-16 09:33:44,2.0,1.5,48,43,1.0,9.0,11.8,Manhattan,Clinton East,Manhattan,Central Park
3,yellow,1,2019-01-16 09:29:53,2019-01-16 10:17:58,1.0,12.4,138,162,2.0,42.0,48.56,Queens,LaGuardia Airport,Manhattan,Midtown East
4,yellow,2,2019-01-16 09:17:18,2019-01-16 09:23:53,1.0,0.86,236,262,2.0,6.0,6.8,Manhattan,Upper East Side North,Manhattan,Yorkville East
5,yellow,1,2019-01-16 09:04:50,2019-01-16 09:11:22,1.0,0.5,161,161,1.0,5.5,7.25,Manhattan,Midtown Center,Manhattan,Midtown Center
6,yellow,2,2019-01-16 08:11:37,2019-01-16 08:24:02,1.0,1.88,238,237,1.0,10.0,12.96,Manhattan,Upper West Side North,Manhattan,Upper East Side South
7,yellow,2,2019-01-16 08:25:41,2019-01-16 08:34:05,3.0,3.91,121,93,2.0,52.0,52.8,Queens,Hillcrest/Pomonok,Queens,Flushing Meadows-Corona Park
8,yellow,2,2019-01-16 08:04:56,2019-01-16 08:41:05,1.0,3.73,113,140,1.0,23.5,26.73,Manhattan,Greenwich Village North,Manhattan,Lenox Hill East
9,yellow,1,2019-01-16 07:06:43,2019-01-16 07:15:56,1.0,1.2,100,230,1.0,8.0,9.8,Manhattan,Garment District,Manhattan,Times Sq/Theatre District
