# Proyecto Final Primera Parte: Ingesta

## Imports

In [2]:
import os
import pandas as pd
import yfinance as yf
from sqlalchemy import create_engine, text
from datetime import datetime

## Configuración y Variables de Entorno

In [None]:
# Leer variables 
PG_USER = os.getenv('PG_USER')
PG_PASSWORD = os.getenv('PG_PASSWORD')
PG_HOST = os.getenv('PG_HOST')
PG_PORT = os.getenv('PG_PORT')
PG_DB = os.getenv('PG_DB')
PG_SCHEMA = os.getenv('PG_SCHEMA_RAW')

# Obtener Tickers y fechas
TICKERS = os.getenv('TICKERS').split(',')
START_DATE = os.getenv('START_DATE')
END_DATE = os.getenv('END_DATE')

print(f"Procesando Tickers: {TICKERS}")
print(f"Rango: {START_DATE} a {END_DATE}")

# Crear string de conexión
db_url = f"postgresql://{PG_USER}:{PG_PASSWORD}@{PG_HOST}:{PG_PORT}/{PG_DB}"
engine = create_engine(db_url)

Procesando Tickers: ['SPY', 'QQQ', 'GLD']
Rango: 2020-01-01 a 2025-12-01


## Función de Descarga y Transformación

In [None]:
def process_ticker(ticker):
    print(f"Descargando {ticker}...")
    # Descarga desde Yahoo Finance
    df = yf.download(ticker, start=START_DATE, end=END_DATE, progress=False, auto_adjust=False)
    
    if df.empty:
        print(f"No se encontraron datos para {ticker}")
        return None

    # Si yfinance devuelve columnas MultiIndex, 
    # extraemos solo el primer nivel ('Close').
    if isinstance(df.columns, pd.MultiIndex):
        df.columns = df.columns.get_level_values(0)

    # Limpieza básica
    df = df.reset_index()
    
    # Estandarizar nombres de columnas a minúsculas y sin espacios
    df.columns = [str(c).lower().replace(' ', '_') for c in df.columns]
    
    # Renombrar 'date' si viene como 'datetime'
    if 'date' not in df.columns and 'datetime' in df.columns:
        df.rename(columns={'datetime': 'date'}, inplace=True)
        
    # Asegurar columnas de metadatos
    df['ticker'] = ticker
    df['run_id'] = 'batch_manual_01'
    df['ingested_at_utc'] = datetime.utcnow()
    df['source_name'] = 'yfinance'
    
    # Mapeo de columnas para asegurar match con DB
    cols_map = {
        'date': 'date',
        'open': 'open',
        'high': 'high',
        'low': 'low',
        'close': 'close',
        'adj_close': 'adj_close',
        'volume': 'volume',
        'ticker': 'ticker',
        'run_id': 'run_id',
        'ingested_at_utc': 'ingested_at_utc',
        'source_name': 'source_name'
    }
    
    # Filtrar solo columnas existentes en el DF que coincidan con el mapa
    available_cols = [c for c in cols_map.keys() if c in df.columns]
    df = df[available_cols].rename(columns=cols_map)
    
    return df

## Carga a Base de Datos (Esquema RAW)

In [None]:
# Limpiar tabla antes de insertar para evitar errores de llave primaria en pruebas
try:
    with engine.begin() as conn:
        for ticker in TICKERS:
            conn.execute(text(f"DELETE FROM {PG_SCHEMA}.prices_daily WHERE ticker = :ticker"), {'ticker': ticker})
except Exception as e:
    print(f"Advertencia al limpiar tabla (puede estar vacía): {e}")

dfs = []

for ticker in TICKERS:
    try:
        df_result = process_ticker(ticker)
        
        if df_result is not None:
            dfs.append(df_result)

            df_result.to_sql(
                'prices_daily',
                engine,
                schema=PG_SCHEMA,
                if_exists='append',
                index=False
            )
            print(f"Guardadas {len(df_result)} filas para {ticker}")

    except Exception as e:
        print(f"Error procesando {ticker}: {e}")

# Guardar CSV consolidado 
if dfs:
    df_all = pd.concat(dfs, ignore_index=True)
    df_all.to_csv("raw_prices_daily.csv", index=False)
    print("CSV consolidado guardado: raw_prices_daily.csv")



Descargando SPY...


  df['ingested_at_utc'] = datetime.utcnow()


Guardadas 1486 filas para SPY
Descargando QQQ...


  df['ingested_at_utc'] = datetime.utcnow()


Guardadas 1486 filas para QQQ
Descargando GLD...


  df['ingested_at_utc'] = datetime.utcnow()


Guardadas 1486 filas para GLD
CSV consolidado guardado: raw_prices_daily.csv


## Verificación

In [8]:

try:
    with engine.connect() as conn:
        query = text(f"SELECT ticker, COUNT(*) as cnt, MIN(date) as min_d, MAX(date) as max_d FROM {PG_SCHEMA}.prices_daily GROUP BY ticker")
        result = conn.execute(query)
        print("\nResumen de Ingesta:")
        for row in result:
            print(row)
except Exception as e:
    print(f"Error en verificación: {e}")


Resumen de Ingesta:
('GLD', 1486, datetime.date(2020, 1, 2), datetime.date(2025, 11, 28))
('SPY', 1486, datetime.date(2020, 1, 2), datetime.date(2025, 11, 28))
('QQQ', 1486, datetime.date(2020, 1, 2), datetime.date(2025, 11, 28))
