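# Leer parámetros desde variables de entorno: `TICKERS` (lista separada por comas), `START_DATE` y `END_DATE`, además de la conexión `PG_*` y el esquema opcional `RAW_SCHEMA`.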


In [33]:
# Instalar librerías necesarias (si no están en la imagen base)
!pip install pandas yfinance psycopg2-binary lxml 



# Imports y Configuración de DB

In [34]:
import os
import time
from datetime import datetime, timezone

import pandas as pd
import yfinance as yf
from sqlalchemy import create_engine, text
from sqlalchemy.engine import URL

# DB params, read from environment variables.
PG_USER = os.getenv('PG_USER')
PG_PASSWORD = os.getenv('PG_PASSWORD')
PG_HOST = os.getenv('PG_HOST')
PG_PORT = os.getenv('PG_PORT')
PG_DB = os.getenv('PG_DB')

# Business params: TICKERS is a comma-separated list; the dates are passed to yfinance as-is.
TICKERS = os.getenv('TICKERS')
START_DATE = os.getenv('START_DATE')
END_DATE = os.getenv('END_DATE')

# Fail fast with a clear message instead of building "postgresql://None:None@None:None/None"
# or crashing later on TICKERS.split(',').
required_env = {'PG_USER': PG_USER, 'PG_HOST': PG_HOST, 'PG_DB': PG_DB, 'TICKERS': TICKERS}
missing_env = [name for name, value in required_env.items() if not value]
if missing_env:
    raise ValueError(f"Faltan variables de entorno requeridas: {', '.join(missing_env)}")

# URL.create takes the raw credentials and escapes special characters such as '@', ':' or '/',
# which a hand-built f-string URL does not. An unset PG_PORT falls back to the driver default.
db_url = URL.create(
    drivername='postgresql',
    username=PG_USER,
    password=PG_PASSWORD,
    host=PG_HOST,
    port=int(PG_PORT) if PG_PORT else None,
    database=PG_DB,
)
engine = create_engine(db_url)

print(f"Configuración cargada Rango={START_DATE} a {END_DATE}")

Configuración cargada Rango=2020-01-01 a 2025-11-30


# Extracción (Yahoo Finance)

In [35]:
# Download daily OHLCV history for every ticker in TICKERS, retrying failures with a
# linear backoff. Tickers that still fail are reported and skipped (best effort), but at
# least one ticker must succeed.

# Strip whitespace and drop empty entries so values like "AAPL, NVDA," still work.
tickers = [t.strip() for t in TICKERS.split(',') if t.strip()]

max_retries = 3
price_frames = []
failed_tickers = []

for ticker in tickers:
    for attempt in range(max_retries):
        try:
            print(f"Descargando datos para {ticker}...")
            df_raw = yf.download(
                tickers=ticker,
                start=START_DATE,
                end=END_DATE,
                interval='1d',
                progress=False,
                auto_adjust=False
            )

            # yfinance can signal a failed download with an empty frame instead of raising.
            # Treat that as an error so it is retried, and never passed on to the load step
            # (which deletes existing rows before inserting).
            if df_raw is None or df_raw.empty:
                raise ValueError(f"yfinance devolvió 0 filas para {ticker}")

            # Flatten MultiIndex columns (price field, ticker) down to the price field.
            if isinstance(df_raw.columns, pd.MultiIndex):
                df_raw.columns = df_raw.columns.get_level_values(0)

            # Move the Date index into a regular column and tag each row with its ticker.
            df_raw = df_raw.reset_index()
            df_raw['ticker'] = ticker

            price_frames.append(df_raw)
            print(f"Filas descargadas para {ticker}: {len(df_raw)}")
            break  # success: stop retrying this ticker
        except Exception as e:
            print(f"Error al descargar {ticker} (intento {attempt + 1}): {e}")
            if attempt == max_retries - 1:
                print(f"No se pudo descargar {ticker} después de {max_retries} intentos.")
                failed_tickers.append(ticker)
            else:
                # Only wait when another attempt follows.
                time.sleep(5 * (attempt + 1))

if not price_frames:
    raise RuntimeError(f"No se descargaron datos para ningún ticker ({', '.join(tickers)}).")
if failed_tickers:
    print(f"Tickers sin datos: {', '.join(failed_tickers)}")

# Combine all per-ticker frames into a single DataFrame.
df_all = pd.concat(price_frames, ignore_index=True)
print(f"Total filas descargadas: {len(df_all)}")

Descargando datos para AAPL...
Filas descargadas para AAPL: 1486
Descargando datos para NVDA...
Filas descargadas para NVDA: 1486
Descargando datos para AMZN...
Filas descargadas para AMZN: 1486
Descargando datos para GOOGL...
Filas descargadas para GOOGL: 1486
Total filas descargadas: 5944


# Enriquecimiento con metadatos de ingesta (fecha de ingesta, run_id y fuente)

In [36]:
# Stamp every row with lineage metadata. The clock is read once, so ingested_at_utc and
# run_id always describe the same instant, both in UTC.
# datetime.utcnow() is deprecated since Python 3.12. The same naive-UTC value (the target
# column is TIMESTAMP without time zone) is built from an aware UTC timestamp instead.
ingested_at_utc = datetime.now(timezone.utc).replace(tzinfo=None)
df_all['ingested_at_utc'] = ingested_at_utc
df_all['run_id'] = ingested_at_utc.strftime('run_%Y%m%d_%H%M')
df_all['source_name'] = 'yahoo_finance'

# Renombrar columnas a minúsculas (snake_case)

In [37]:
# Map yfinance's title-case column names to the snake_case names used by the target table
# (e.g. 'Adj Close' -> 'adj_close'). Any other columns are left untouched.
column_mapping = {
    source_name: source_name.lower().replace(' ', '_')
    for source_name in ('Date', 'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume')
}
df_all = df_all.rename(columns=column_mapping)
df_all.head()

Price,date,adj_close,close,high,low,open,volume,ticker,ingested_at_utc,run_id,source_name
0,2020-01-02,72.468262,75.087502,75.150002,73.797501,74.059998,135480400,AAPL,2025-12-06 22:44:30.350447,run_20251206_2244,yahoo_finance
1,2020-01-03,71.763725,74.357498,75.144997,74.125,74.287498,146322800,AAPL,2025-12-06 22:44:30.350447,run_20251206_2244,yahoo_finance
2,2020-01-06,72.335556,74.949997,74.989998,73.1875,73.447502,118387200,AAPL,2025-12-06 22:44:30.350447,run_20251206_2244,yahoo_finance
3,2020-01-07,71.995361,74.597504,75.224998,74.370003,74.959999,108872000,AAPL,2025-12-06 22:44:30.350447,run_20251206_2244,yahoo_finance
4,2020-01-08,73.153503,75.797501,76.110001,74.290001,74.290001,132079200,AAPL,2025-12-06 22:44:30.350447,run_20251206_2244,yahoo_finance


# Carga a Postgres

In [38]:
# Load df_all into {schema}.prices_daily. Rows already stored for the same tickers and
# date range are replaced, so re-running the notebook does not duplicate data.
if df_all.empty:
    raise ValueError("df_all está vacío: no hay datos que cargar.")

min_date = df_all['date'].min()
max_date = df_all['date'].max()

table_name = 'prices_daily'
schema = os.getenv('RAW_SCHEMA', 'raw')

# Replace only the tickers actually present in df_all. A ticker whose download failed
# keeps its existing rows instead of being deleted with nothing to re-insert.
loaded_tickers = df_all['ticker'].unique().tolist()

# Ensure the schema and table exist (retried; the last failure is re-raised).
print(f"Iniciando carga para ({min_date} a {max_date})")
max_retries = 3
for attempt in range(max_retries):
    try:
        print(f"Asegurando existencia de tabla {schema}.{table_name}...")
        with engine.begin() as conn:
            conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema};"))
            conn.execute(text(f"""
                CREATE TABLE IF NOT EXISTS {schema}.{table_name} (
                    date DATE,
                    ticker VARCHAR(20),
                    open NUMERIC,
                    high NUMERIC,
                    low NUMERIC,
                    close NUMERIC,
                    adj_close NUMERIC,
                    volume BIGINT,
                    source_name VARCHAR(50),
                    ingested_at_utc TIMESTAMP,
                    run_id VARCHAR(50)
                );
            """))
        break
    except Exception as e:
        print(f"Error al asegurar tabla (intento {attempt + 1}): {e}")
        if attempt == max_retries - 1:
            raise
        time.sleep(5 * (attempt + 1))

print(f"Cargando datos en {schema}.{table_name}")

# Target the same configured schema/table that was created above.
delete_query = text(f"""
    DELETE FROM {schema}.{table_name}
    WHERE ticker = :ticker
      AND date >= :min_date
      AND date <= :max_date
""")

for attempt in range(max_retries):
    try:
        # DELETE and INSERT share one transaction. If the insert fails, the delete is
        # rolled back and the table keeps its previous rows for this range.
        with engine.begin() as conn:
            for current_ticker in loaded_tickers:
                result = conn.execute(delete_query, {
                    'ticker': current_ticker,
                    'min_date': min_date,
                    'max_date': max_date
                })
                print(f"Filas eliminadas previamente de {current_ticker}: {result.rowcount}")

            df_all.to_sql(
                name=table_name,
                con=conn,  # same connection/transaction as the DELETE above
                schema=schema,
                if_exists='append',  # keep rows for other tickers/dates already in the table
                index=False,
                chunksize=1000  # insert in batches of 1000 rows
            )
        print(f"Filas insertadas: {len(df_all)}")
        break
    except Exception as e:
        print(f"Error durante la insercion (intento {attempt + 1}): {e}")
        if attempt == max_retries - 1:
            raise
        # Wait before the next attempt (linear backoff).
        time.sleep(5 * (attempt + 1))



Iniciando carga para (2020-01-02 00:00:00 a 2025-11-28 00:00:00)
Asegurando existencia de tabla raw.prices_daily...
Cargando datos en raw.prices_daily
Filas eliminadas previamente de AAPL: 1486
Filas eliminadas previamente de NVDA: 1486
Filas eliminadas previamente de AMZN: 1486
Filas eliminadas previamente de GOOGL: 1486
Filas insertadas: 5944


# Carga de earnings_dates (fechas de publicación de resultados)

In [39]:
# Fetch recent earnings dates (up to 20 quarters) for each ticker in TICKERS and load them
# into {schema}.earnings_dates as a full snapshot (the table is replaced on every run).
earnings_frames = []
for ticker in [t.strip() for t in TICKERS.split(',') if t.strip()]:
    earnings_raw = yf.Ticker(ticker).get_earnings_dates(limit=20)  # last 20 quarters

    # NOTE(review): presumably None/empty when Yahoo has no earnings data for the symbol.
    # Skip the ticker instead of failing on .reset_index().
    if earnings_raw is None or earnings_raw.empty:
        print(f"Sin fechas de earnings para {ticker}")
        continue

    # yfinance returns the date as a timezone-aware index; move it into a regular column.
    earnings_df = earnings_raw.reset_index().rename(columns={
        'Earnings Date': 'earnings_date',
        'EPS Estimate': 'eps_estimate',
        'Reported EPS': 'reported_eps',
    })

    # Keep only the calendar date (no time) so it can be joined with daily prices.
    earnings_df['earnings_date'] = pd.to_datetime(earnings_df['earnings_date']).dt.date
    earnings_df['ticker'] = ticker

    # Only the key columns are kept.
    earnings_frames.append(earnings_df[['earnings_date', 'ticker', 'eps_estimate', 'reported_eps']])

# Guard: with if_exists='replace' below, an empty frame would swap the table for an empty
# one and lose previously loaded dates.
if not earnings_frames:
    raise RuntimeError("No se obtuvieron fechas de earnings para ningún ticker.")

# A single concat at the end avoids growing a DataFrame inside the loop.
df_dates_all = pd.concat(earnings_frames, ignore_index=True)


# Upload to postgres {schema}.earnings_dates
table_name = 'earnings_dates'
schema = os.getenv('RAW_SCHEMA', 'raw')
print(f"Cargando datos en {schema}.{table_name}")
# Retry the insert; the last failure is re-raised.
max_retries = 3
for attempt in range(max_retries):
    try:
        df_dates_all.to_sql(
            name=table_name,
            con=engine,
            schema=schema,
            if_exists='replace',  # full snapshot: drop and recreate the table each run
            index=False,
            chunksize=1000  # insert in batches of 1000 rows
        )
        print(f"Filas insertadas: {len(df_dates_all)}")
        break
    except Exception as e:
        print(f"Error durante la insercion (intento {attempt + 1}): {e}")
        if attempt == max_retries - 1:
            raise
        # Wait before the next attempt (linear backoff).
        time.sleep(5 * (attempt + 1))

Cargando datos en raw.earnings_dates
Filas insertadas: 100
