In [1]:
## Pipeline de Ingesta de Datos Financieros desde Yahoo Finance

In [2]:
!pip install yfinance sqlalchemy psycopg2-binary pandas

Collecting yfinance
  Downloading yfinance-0.2.66-py2.py3-none-any.whl.metadata (6.0 kB)
Collecting psycopg2-binary
  Downloading psycopg2_binary-2.9.11-cp311-cp311-manylinux2014_aarch64.manylinux_2_17_aarch64.whl.metadata (4.9 kB)
Collecting multitasking>=0.0.7 (from yfinance)
  Downloading multitasking-0.0.12.tar.gz (19 kB)
  Preparing metadata (setup.py) ... [?25ldone
Collecting frozendict>=2.3.4 (from yfinance)
  Downloading frozendict-2.4.7-py3-none-any.whl.metadata (23 kB)
Collecting peewee>=3.16.2 (from yfinance)
  Downloading peewee-3.18.3.tar.gz (3.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.0/3.0 MB[0m [31m1.9 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h  Installing build dependencies ... [?25ldone
[?25h  Getting requirements to build wheel ... [?25ldone
[?25h  Preparing metadata (pyproject.toml) ... [?25ldone
Collecting curl_cffi>=0.7 (from yfinance)
  Downloading curl_cffi-0.13.0-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aa

In [None]:
# Importaciones

In [3]:
import os
import time
from datetime import datetime, timezone

import pandas as pd
import yfinance as yf
from sqlalchemy import create_engine, text
from sqlalchemy.engine import URL

In [6]:
#Descarga de datos

In [5]:
# Download configuration comes from the environment.
TICKER = os.getenv('TICKER')          # one symbol or a comma-separated list, e.g. "IBM,MSFT"
START_DATE = os.getenv('START_DATE')  # YYYY-MM-DD
END_DATE = os.getenv('END_DATE')      # YYYY-MM-DD, treated as inclusive (see `end=` below)

# Fail fast with a clear message instead of a TypeError from strptime/yfinance.
_missing_env = [
    name
    for name, value in (('TICKER', TICKER), ('START_DATE', START_DATE), ('END_DATE', END_DATE))
    if not value
]
if _missing_env:
    raise ValueError(f"Variables de entorno no definidas: {', '.join(_missing_env)}")

end_dt = datetime.strptime(END_DATE, '%Y-%m-%d').date()

# Notebook-wide retry budget, also used by the database cells below.
max_retries = 4


def _download_ticker(symbol: str) -> pd.DataFrame:
    """Download daily OHLCV data for a single ticker, retrying with linear backoff.

    Returns a flat frame (``Date`` column from the index plus Yahoo's price columns)
    with an extra ``ticker`` column. Raises RuntimeError when every attempt fails,
    so later cells never run on a missing or stale ``df_raw``.
    """
    for attempt in range(max_retries):
        try:
            print(f"Descargando datos para {symbol} (Intento {attempt + 1}/{max_retries})...")
            prices = yf.download(
                tickers=symbol,
                start=START_DATE,
                # yfinance's `end` is exclusive; adding one day makes END_DATE inclusive.
                end=end_dt + pd.Timedelta(days=1),
                interval='1d',
                progress=False,
                auto_adjust=False
            )
            # Download failures often surface as an empty frame rather than an exception.
            if prices is None or prices.empty:
                raise ValueError("yfinance devolvió un DataFrame vacío")

            # One ticker per call, so level 0 of a MultiIndex holds the price field names.
            if isinstance(prices.columns, pd.MultiIndex):
                prices.columns = prices.columns.get_level_values(0)

            prices = prices.reset_index()
            prices['ticker'] = symbol
            print(f"Filas {symbol}: {len(prices)}")
            return prices

        except Exception as e:
            print(f"Error al descargar {symbol} (intento {attempt + 1}): {e}")
            if attempt == max_retries - 1:
                print(f"No se pudo descargar {symbol} después de {max_retries} intentos.")
                raise RuntimeError(f"Descarga fallida para {symbol}") from e
            time.sleep(5 * (attempt + 1))


# Download tickers one by one. A multi-ticker yf.download call returns (field, ticker) columns,
# which flattening level 0 would collapse into duplicate column names.
tickers = [symbol.strip() for symbol in TICKER.split(',') if symbol.strip()]
df_raw = pd.concat([_download_ticker(symbol) for symbol in tickers], ignore_index=True)


Descargando datos para IBM (Intento 1/4)...
Filas IBM: 1496


In [9]:
# Add ingestion metadata to every row.
# A single clock read feeds both ingested_at_utc and run_id, so they always describe the same
# instant: two separate reads can straddle a minute boundary, and run_id was previously derived
# from local time while ingested_at_utc was UTC.
# datetime.utcnow() is deprecated since Python 3.12. tzinfo is dropped afterwards so the value
# stays naive UTC, matching the TIMESTAMP (without time zone) column of the raw table.
_ingestion_ts_utc = datetime.now(timezone.utc).replace(tzinfo=None)
df_raw['ingested_at_utc'] = _ingestion_ts_utc
df_raw['run_id'] = _ingestion_ts_utc.strftime('run_%Y%m%d_%H%M')
df_raw['source_name'] = 'yahoo_finance'

In [None]:
# Estandarización de nombres

In [11]:
# Yahoo Finance labels -> snake_case names used by the raw table.
# Labels not listed here (ticker and the metadata columns) are just lower-cased.
mapping = {
    'Date': 'date',  # comes from reset_index() on the downloaded frame
    'Open': 'open',
    'High': 'high',
    'Low': 'low',
    'Close': 'close',
    'Adj Close': 'adj_close',
    'Volume': 'volume',
}
df_raw.columns = df_raw.columns.map(lambda label: mapping.get(label, str(label).lower()))

print(f"Columnas renombradas: {df_raw.columns.tolist()}")
df_raw.head()

Columnas renombradas: ['date', 'adj_close', 'close', 'high', 'low', 'open', 'volume', 'ticker', 'ingested_at_utc', 'run_id', 'source_name']


Unnamed: 0,date,adj_close,close,high,low,open,volume,ticker,ingested_at_utc,run_id,source_name
0,2020-01-02,100.013763,129.46463,129.942642,128.843216,129.063095,3293436,IBM,2025-12-14 00:42:43.575547,run_20251214_0042,yahoo_finance
1,2020-01-03,99.216125,128.432129,128.92926,127.686424,127.695984,2482890,IBM,2025-12-14 00:42:43.575547,run_20251214_0042,yahoo_finance
2,2020-01-06,99.038879,128.202682,128.336517,127.342255,127.552582,2537073,IBM,2025-12-14 00:42:43.575547,run_20251214_0042,yahoo_finance
3,2020-01-07,99.105324,128.288712,129.024857,127.533463,127.810707,3232977,IBM,2025-12-14 00:42:43.575547,run_20251214_0042,yahoo_finance
4,2020-01-08,99.932503,129.359467,129.885284,128.030594,128.59465,4545916,IBM,2025-12-14 00:42:43.575547,run_20251214_0042,yahoo_finance


In [None]:
#Motor de SQLAlchemy

In [13]:
# PostgreSQL connection settings come from the environment.
PG_USER = os.getenv('PG_USER')
PG_PASSWORD = os.getenv('PG_PASSWORD')
PG_HOST = os.getenv('PG_HOST')
PG_PORT = os.getenv('PG_PORT')
PG_DB = os.getenv('PG_DB')

# URL.create takes the raw (unescaped) credentials and handles characters such as @ : / %
# that break an f-string URL. An unset PG_PORT is omitted rather than rendered as "None".
pg_url = URL.create(
    drivername='postgresql',
    username=PG_USER,
    password=PG_PASSWORD,
    host=PG_HOST,
    port=int(PG_PORT) if PG_PORT else None,
    database=PG_DB,
)
# pool_pre_ping checks pooled connections before use, so an engine left idle between
# notebook cells does not hand out a dropped connection.
engine = create_engine(pg_url, pool_pre_ping=True)

In [None]:
# Días bursátiles sin datos (deben ser pocos y explicados).

In [15]:
# Compare the downloaded dates against every weekday in the same window (freq 'B' = Mon-Fri).
# 'B' does not know exchange holidays, so those are expected to show up in this list.
min_date, max_date = df_raw['date'].min(), df_raw['date'].max()
full_days = pd.date_range(min_date, max_date, freq='B')

# Normalize to midnight so the observed timestamps compare equal to the date_range entries.
df_dates = set(df_raw['date'].dt.normalize())
is_present = full_days.isin(df_dates)
missing_days = full_days[~is_present]

for gap_day in missing_days:
    print(f"  - Falta el día: {gap_day:%Y-%m-%d}")

  - Falta el día: 2020-01-20
  - Falta el día: 2020-02-17
  - Falta el día: 2020-04-10
  - Falta el día: 2020-05-25
  - Falta el día: 2020-07-03
  - Falta el día: 2020-09-07
  - Falta el día: 2020-11-26
  - Falta el día: 2020-12-25
  - Falta el día: 2021-01-01
  - Falta el día: 2021-01-18
  - Falta el día: 2021-02-15
  - Falta el día: 2021-04-02
  - Falta el día: 2021-05-31
  - Falta el día: 2021-07-05
  - Falta el día: 2021-09-06
  - Falta el día: 2021-11-25
  - Falta el día: 2021-12-24
  - Falta el día: 2022-01-17
  - Falta el día: 2022-02-21
  - Falta el día: 2022-04-15
  - Falta el día: 2022-05-30
  - Falta el día: 2022-06-20
  - Falta el día: 2022-07-04
  - Falta el día: 2022-09-05
  - Falta el día: 2022-11-24
  - Falta el día: 2022-12-26
  - Falta el día: 2023-01-02
  - Falta el día: 2023-01-16
  - Falta el día: 2023-02-20
  - Falta el día: 2023-04-07
  - Falta el día: 2023-05-29
  - Falta el día: 2023-06-19
  - Falta el día: 2023-07-04
  - Falta el día: 2023-09-04
  - Falta el d

In [None]:
# Crear esquema y tabla (estandarización de tipos)

In [16]:
RAW_TABLE_NAME = os.getenv('RAW_TABLE_NAME')
RAW_SCHEMA = os.getenv('RAW_SCHEMA')

# Identifiers cannot be bound as query parameters, so they are interpolated into the DDL.
# Accept only plain identifiers. This rejects unset variables (which would otherwise create
# a schema literally named "None") and values that could inject extra SQL.
for _env_name, _identifier in (('RAW_SCHEMA', RAW_SCHEMA), ('RAW_TABLE_NAME', RAW_TABLE_NAME)):
    if not _identifier or not _identifier.isidentifier():
        raise ValueError(f"{_env_name} no definido o no es un identificador válido: {_identifier!r}")

# NOTE(review): the table has no PRIMARY KEY/UNIQUE (ticker, date). Duplicates are avoided
# only by the delete-then-insert in the load cell.
table_create_query = text(f"""
    CREATE TABLE IF NOT EXISTS {RAW_SCHEMA}.{RAW_TABLE_NAME} (
        date DATE,
        ticker VARCHAR(20) NOT NULL,
        open DOUBLE PRECISION,
        high DOUBLE PRECISION,
        low DOUBLE PRECISION,
        close DOUBLE PRECISION,
        adj_close DOUBLE PRECISION,
        volume BIGINT,
        source_name VARCHAR(50),
        ingested_at_utc TIMESTAMP,
        run_id VARCHAR(50)
    );
""")

print(f"Iniciando carga para ({min_date} a {max_date})")

# Retry schema/table creation with the notebook-wide retry budget and linear backoff.
for attempt in range(max_retries):
    try:
        with engine.begin() as conn:
            conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {RAW_SCHEMA};"))
            conn.execute(table_create_query)
        break
    except Exception as e:
        print(f"Error al asegurar tabla (intento {attempt + 1}): {e}")
        # Re-raise on the last attempt without a pointless final sleep.
        if attempt == max_retries - 1:
            raise
        time.sleep(5 * (attempt + 1))


Iniciando carga para (2020-01-02 00:00:00 a 2025-12-12 00:00:00)


In [None]:
# Carga en PostgreSQL

In [18]:
print(f"Cargando datos en {RAW_SCHEMA}.{RAW_TABLE_NAME}")

# Remove rows for the same ticker/date window before re-inserting, so reruns do not duplicate data.
delete_query = text(f"""
    DELETE FROM {RAW_SCHEMA}.{RAW_TABLE_NAME}
    WHERE ticker = :ticker
      AND date >= :min_date
      AND date <= :max_date
""")

# Delete exactly the tickers present in the frame being inserted. TICKER.split(',') can
# disagree with the stored 'ticker' values (whitespace, multi-ticker strings).
tickers_to_load = [str(symbol) for symbol in df_raw['ticker'].unique()]
# Derive the window from the frame itself, not from min_date/max_date left over from another cell.
load_min_date = df_raw['date'].min()
load_max_date = df_raw['date'].max()

for attempt in range(max_retries):
    try:
        # Delete and insert share one transaction. If the insert fails, the delete is rolled
        # back, so a failed run never leaves the window empty and each retry starts clean.
        with engine.begin() as conn:
            for current_ticker in tickers_to_load:
                result = conn.execute(delete_query, {
                    'ticker': current_ticker,
                    'min_date': load_min_date,
                    'max_date': load_max_date,
                })
                print(f"Filas eliminadas previamente de {current_ticker}: {result.rowcount}")

            df_raw.to_sql(
                name=RAW_TABLE_NAME,
                con=conn,
                schema=RAW_SCHEMA,
                if_exists='append',  # 'replace' would DROP the table created above and let pandas infer types
                index=False,
                chunksize=1000  # rows per INSERT batch
            )
        print(f"Filas insertadas: {len(df_raw)}")
        break
    except Exception as e:
        print(f"Error durante la insercion (intento {attempt + 1}): {e}")
        # Re-raise on the last attempt without a pointless final sleep.
        if attempt == max_retries - 1:
            raise
        time.sleep(5 * (attempt + 1))



Cargando datos en raw.prices_daily
Filas eliminadas previamente de IBM: 1006
Filas insertadas: 1496


In [14]:
# Earnings calendar per ticker -> raw.earnings_dates (full refresh on every run).
EARNINGS_LIMIT = 20  # most recent earnings events requested per ticker
EARNINGS_COLUMNS = ['earnings_date', 'ticker', 'eps_estimate', 'reported_eps']


def _fetch_earnings(ticker_symbol: str) -> pd.DataFrame:
    """Download and tidy the earnings calendar for one ticker.

    Returns a frame with EARNINGS_COLUMNS, or an empty frame with those columns when
    Yahoo returns nothing. Needs lxml installed: this cell previously failed with
    "ImportError: lxml not found".
    """
    earnings_raw = yf.Ticker(ticker_symbol).get_earnings_dates(limit=EARNINGS_LIMIT)
    # Guard against tickers without data (None or empty) instead of crashing on reset_index.
    if earnings_raw is None or earnings_raw.empty:
        print(f"Sin fechas de resultados para {ticker_symbol}")
        return pd.DataFrame(columns=EARNINGS_COLUMNS)

    earnings = earnings_raw.reset_index().rename(columns={
        'Earnings Date': 'earnings_date',
        'EPS Estimate': 'eps_estimate',
        'Reported EPS': 'reported_eps',
    })
    # The date arrives in the index as a timezone-aware timestamp. Keep only the calendar
    # date so it can be matched against daily prices.
    earnings['earnings_date'] = pd.to_datetime(earnings['earnings_date']).dt.date
    earnings['ticker'] = ticker_symbol
    return earnings[EARNINGS_COLUMNS].copy()


# Read the ticker list from the environment rather than from a kernel global. The loop variable
# is deliberately not `TICKER`, so the global used by the other cells is not overwritten.
_tickers_env = os.getenv('TICKERS') or os.getenv('TICKER')
if not _tickers_env:
    raise ValueError("Define TICKERS o TICKER (lista de símbolos separada por comas)")
earnings_symbols = [symbol.strip() for symbol in _tickers_env.split(',') if symbol.strip()]

# Build all per-ticker frames first and concatenate once (no quadratic concat in a loop).
df_dates_all = pd.concat([_fetch_earnings(symbol) for symbol in earnings_symbols], ignore_index=True)
display(df_dates_all.head(10))

# Upload to postgres raw.earnings_dates
table_name = 'earnings_dates'
schema = 'raw'
print(f"Cargando datos en {schema}.{table_name}")
# Local retry budget: do not rebind the notebook-wide `max_retries` used by the other cells.
earnings_max_retries = 3
for attempt in range(earnings_max_retries):
    try:
        df_dates_all.to_sql(
            name=table_name,
            con=engine,
            schema=schema,
            if_exists='replace',  # full refresh: drops and recreates the table with pandas-inferred types
            index=False,
            chunksize=1000  # rows per INSERT batch
        )
        print(f"Filas insertadas: {len(df_dates_all)}")
        break
    except Exception as e:
        print(f"Error durante la insercion (intento {attempt + 1}): {e}")
        # Re-raise on the last attempt without a pointless final sleep.
        if attempt == earnings_max_retries - 1:
            raise
        time.sleep(5 * (attempt + 1))

ImportError: lxml not found, please install it