In [1]:
#1. imports y carga de variables de entorno
import os
import pandas as pd
import numpy as np
import yfinance as yf
from sqlalchemy import create_engine, text
from datetime import datetime, timezone

# Leer variables de entorno
PG_HOST = os.getenv("PG_HOST")
PG_PORT = os.getenv("PG_PORT")
PG_DB = os.getenv("PG_DB")
PG_USER = os.getenv("PG_USER")
PG_PASSWORD = os.getenv("PG_PASSWORD")
PG_SCHEMA_RAW = os.getenv("PG_SCHEMA_RAW")
RUN_ID = os.getenv("RUN_ID")

TICKERS = os.getenv("TICKERS").split(",")
START_DATE = os.getenv("START_DATE")
END_DATE = os.getenv("END_DATE")

print("Tickers:", TICKERS)
print("Rango:", START_DATE, END_DATE)


Tickers: ['AAPL', 'MSFT', 'TSLA']
Rango: 2019-01-01 2025-11-01


In [2]:
#2. Conexion a postgres y creacion del esquema raw
# Crear engine de SQLAlchemy
engine = create_engine(
    f"postgresql://{PG_USER}:{PG_PASSWORD}@{PG_HOST}:{PG_PORT}/{PG_DB}"
)

# Crear esquema raw si no existe
with engine.connect() as conn:
    conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {PG_SCHEMA_RAW};"))
    conn.commit()

print("Conectado a Postgres âœ”")


Conectado a Postgres âœ”


In [3]:
# 3. crear tabla raw.prices_daily (si no existe)
create_table_sql = f"""
CREATE TABLE IF NOT EXISTS {PG_SCHEMA_RAW}.prices_daily (
    date DATE NOT NULL,
    ticker VARCHAR(20) NOT NULL,
    open DOUBLE PRECISION,
    high DOUBLE PRECISION,
    low DOUBLE PRECISION,
    close DOUBLE PRECISION,
    adj_close DOUBLE PRECISION,
    volume BIGINT,
    run_id VARCHAR(50),
    ingested_at_utc TIMESTAMP,
    source_name VARCHAR(50),
    PRIMARY KEY (date, ticker)
);
"""

with engine.connect() as conn:
    conn.execute(text(create_table_sql))
    conn.commit()

print("Tabla raw.prices_daily lista âœ”")


Tabla raw.prices_daily lista âœ”


In [3]:
#4. Funcion para descargar datos de Yahoo Finance
def download_ticker_prices(ticker, start, end):
    print(f"Descargando {ticker} ...")
    
    df = yf.download(ticker, start=start, end=end, auto_adjust=False)

    # ðŸ”¥ Si las columnas vienen como MultiIndex â†’ aplanarlas
    if isinstance(df.columns, pd.MultiIndex):
        df.columns = df.columns.get_level_values(0)

    if df.empty:
        print(f"âš  No se encontraron datos para {ticker}")
        return None

    df = df.reset_index()

    df = df.rename(columns={
        "Date": "date",
        "Open": "open",
        "High": "high",
        "Low": "low",
        "Close": "close",
        "Adj Close": "adj_close",
        "Volume": "volume",
    })

    df["ticker"] = ticker
    df["run_id"] = RUN_ID
    df["ingested_at_utc"] = datetime.now(timezone.utc)
    df["source_name"] = "yfinance"

    df = df[[
        "date", "ticker", "open", "high", "low", "close",
        "adj_close", "volume", "run_id", "ingested_at_utc", "source_name"
    ]]

    return df


In [4]:
#5. Descargar datos para todos los tickers
all_data = []

for t in TICKERS:
    df = download_ticker_prices(t.strip(), START_DATE, END_DATE)
    if df is not None:
        all_data.append(df)

if len(all_data) == 0:
    raise ValueError("No se pudo descargar ningÃºn ticker.")

# ConcatenaciÃ³n CORRECTA (vertical)
prices_df = pd.concat(all_data, axis=0, ignore_index=True)

# Ordenar
prices_df = prices_df.sort_values(["ticker", "date"]).reset_index(drop=True)

prices_df.head()


Descargando AAPL ...


[*********************100%***********************]  1 of 1 completed


Descargando MSFT ...


[*********************100%***********************]  1 of 1 completed


Descargando TSLA ...


[*********************100%***********************]  1 of 1 completed


Price,date,ticker,open,high,low,close,adj_close,volume,run_id,ingested_at_utc,source_name
0,2019-01-02,AAPL,38.7225,39.712502,38.557499,39.48,37.538822,148158800,initial_ingest,2025-11-29 18:04:16.128999+00:00,yfinance
1,2019-01-03,AAPL,35.994999,36.43,35.5,35.547501,33.799671,365248800,initial_ingest,2025-11-29 18:04:16.128999+00:00,yfinance
2,2019-01-04,AAPL,36.1325,37.137501,35.950001,37.064999,35.242554,234428400,initial_ingest,2025-11-29 18:04:16.128999+00:00,yfinance
3,2019-01-07,AAPL,37.174999,37.2075,36.474998,36.982498,35.164116,219111200,initial_ingest,2025-11-29 18:04:16.128999+00:00,yfinance
4,2019-01-08,AAPL,37.389999,37.955002,37.130001,37.6875,35.83445,164101200,initial_ingest,2025-11-29 18:04:16.128999+00:00,yfinance


In [5]:
#revisamos la estructura del dataframe
prices_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5157 entries, 0 to 5156
Data columns (total 11 columns):
 #   Column           Non-Null Count  Dtype              
---  ------           --------------  -----              
 0   date             5157 non-null   datetime64[ns]     
 1   ticker           5157 non-null   object             
 2   open             5157 non-null   float64            
 3   high             5157 non-null   float64            
 4   low              5157 non-null   float64            
 5   close            5157 non-null   float64            
 6   adj_close        5157 non-null   float64            
 7   volume           5157 non-null   int64              
 8   run_id           5157 non-null   object             
 9   ingested_at_utc  5157 non-null   datetime64[us, UTC]
 10  source_name      5157 non-null   object             
dtypes: datetime64[ns](1), datetime64[us, UTC](1), float64(5), int64(1), object(3)
memory usage: 443.3+ KB


In [7]:
# 6. insertar datos en raw.prices_daily
rows_before = 0
with engine.connect() as conn:
    result = conn.execute(
        text(f"SELECT COUNT(*) FROM {PG_SCHEMA_RAW}.prices_daily")
    )
    rows_before = result.scalar()

prices_df.to_sql(
    "prices_daily",
    engine,
    schema=PG_SCHEMA_RAW,
    if_exists="append",
    index=False,
    method="multi",
    chunksize=1000
)

rows_after = 0
with engine.connect() as conn:
    result = conn.execute(
        text(f"SELECT COUNT(*) FROM {PG_SCHEMA_RAW}.prices_daily")
    )
    rows_after = result.scalar()

print(f"Filas antes de ingesta: {rows_before}")
print(f"Filas despuÃ©s de ingesta: {rows_after}")
print(f"Nuevas filas insertadas: {rows_after - rows_before}")


Filas antes de ingesta: 0
Filas despuÃ©s de ingesta: 5157
Nuevas filas insertadas: 5157


In [8]:
# verificar fechas mÃ­nimas y mÃ¡ximas
with engine.connect() as conn:
    result = conn.execute(text(f"""
        SELECT ticker, MIN(date), MAX(date), COUNT(*)
        FROM {PG_SCHEMA_RAW}.prices_daily
        GROUP BY ticker
        ORDER BY ticker;
    """))

rows = result.fetchall()

for r in rows:
    print(f"{r[0]} â†’ {r[1]} hasta {r[2]} ({r[3]} filas)")


AAPL â†’ 2019-01-02 hasta 2025-10-31 (1719 filas)
MSFT â†’ 2019-01-02 hasta 2025-10-31 (1719 filas)
TSLA â†’ 2019-01-02 hasta 2025-10-31 (1719 filas)
