In [None]:
##FinancialDataLoader##

"""
Contiene la clase DataLoader que implementa:

    Descarga de datos financieros con caché.
    Procesamiento en paralelo.
    Funciones de visualización de precios.
"""   


In [None]:
#Importar librerías

import pandas_datareader as pdr
import pandas as pd
from datetime import datetime
import matplotlib.pyplot as plt
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor  # Se importan ambos para escoger según necesidad
import logging
from typing import List, Optional, Dict, Tuple
import yfinance as yf
import os
import pickle
import multiprocessing
from pathlib import Path

In [None]:
# Configuración de logging para ver mensajes informativos en consola

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

In [None]:
#Clase DataLoader

class DataLoader:
    def __init__(self, fecha_inicio: str = "2020-01-01", 
                 fecha_final: str = "2025-01-01",
                 cache_dir: str = "data_cache"):
        """
        Inicializa el cargador de datos financieros.
        
        Se convierten las fechas de entrada a objetos datetime para mayor robustez
        y se mantiene la versión en string para usarla en nombres de archivos de caché.
        
        Args:
            fecha_inicio: Fecha de inicio para los datos (formato "YYYY-MM-DD").
            fecha_final: Fecha final para los datos (formato "YYYY-MM-DD").
            cache_dir: Directorio para almacenar la caché de datos.
        """
        # Guardamos ambas representaciones: string y datetime
        self.fecha_inicio_str = fecha_inicio
        self.fecha_final_str = fecha_final
        self.fecha_inicio = pd.to_datetime(fecha_inicio)
        self.fecha_final = pd.to_datetime(fecha_final)
        
        # Convertimos el directorio de caché a objeto Path y lo creamos (incluyendo subdirectorios si es necesario)
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        
    def _get_cache_path(self, ticker: str, source: str) -> Path:
        """
        Genera la ruta del archivo de caché para un ticker y fuente específicos.
        
        Se usa la representación en string de las fechas para la consistencia del nombre.
        """
        return self.cache_dir / f"{ticker}_{source}_{self.fecha_inicio_str}_{self.fecha_final_str}.pkl"
    
    def _load_from_cache(self, ticker: str, source: str) -> Optional[pd.DataFrame]:
        """
        Intenta cargar datos desde la caché.
        
        Se recomienda cargar solo archivos de fuentes de confianza, ya que el uso de pickle
        puede ser riesgoso si se cargan datos no verificados.
        """
        cache_path = self._get_cache_path(ticker, source)
        if cache_path.exists():
            try:
                with open(cache_path, 'rb') as f:
                    data = pickle.load(f)
                logger.info(f"Datos cargados desde caché para {ticker} de {source}")
                return data
            except Exception as e:
                # Se recomienda capturar excepciones específicas en producción para mayor control
                logger.warning(f"Error al cargar caché para {ticker}: {e}")
        return None
    
    def _save_to_cache(self, ticker: str, source: str, data: pd.DataFrame) -> None:
        """
        Guarda los datos en caché usando pickle.
        """
        cache_path = self._get_cache_path(ticker, source)
        try:
            with open(cache_path, 'wb') as f:
                pickle.dump(data, f)
            logger.info(f"Datos guardados en caché para {ticker} de {source}")
        except Exception as e:
            logger.warning(f"Error al guardar caché para {ticker}: {e}")

    def _get_data_from_source(self, ticker: str, source: str) -> Optional[pd.DataFrame]:
        """
        Obtiene datos de una fuente específica, utilizando caché si está disponible.
        
        Se utilizan dos fuentes:
          - "stooq" a través de pandas_datareader.
          - "yahoo" a través de yfinance.
        
        Las fechas se pasan como objetos datetime.
        """
        # Intentar cargar datos desde la caché
        cached_data = self._load_from_cache(ticker, source)
        if cached_data is not None:
            return cached_data
        
        try:
            if source == "stooq":
                # pdr.get_data_stooq utiliza los parámetros de fecha
                df = pdr.get_data_stooq(symbols=ticker, 
                                        start=self.fecha_inicio, 
                                        end=self.fecha_final)
            elif source == "yahoo":
                # yf.download también acepta objetos datetime
                df = yf.download(ticker, 
                                 start=self.fecha_inicio, 
                                 end=self.fecha_final,
                                 progress=False)
            else:
                logger.error(f"Fuente {source} no soportada")
                return None
            
            # Ordenar el DataFrame cronológicamente usando el índice (fechas)
            df = df.sort_index(ascending=True)
            
            # Guardar en caché para evitar descargas futuras innecesarias
            self._save_to_cache(ticker, source, df)
            
            return df
            
        except Exception as error:
            logger.error(f"Error al obtener datos de {source} para {ticker}: {error}")
            return None

    def _process_ticker(self, args: Tuple[str, str]) -> Optional[Tuple[str, pd.DataFrame]]:
        """
        Función auxiliar para el procesamiento en paralelo.
        
        Args:
            args: Una tupla que contiene el ticker y la fuente.
            
        Returns:
            Una tupla (clave, DataFrame) si la descarga es exitosa, o None en caso de error.
        """
        ticker, source = args
        df = self._get_data_from_source(ticker, source)
        if df is not None:
            return (f"{ticker}_{source}", df)
        return None

    def load_parallel(self, tickers: List[str], 
                      sources: List[str] = ["stooq", "yahoo"],
                      executor_type: str = "thread") -> Dict[str, pd.DataFrame]:
        """
        Carga datos en paralelo usando un pool de hilos o procesos.
        
        Debido a que la descarga de datos es una operación IO-bound, se recomienda por defecto
        usar hilos (ThreadPoolExecutor). Se puede cambiar a procesos pasando executor_type="process".
        
        Args:
            tickers: Lista de símbolos (tickers) a descargar.
            sources: Lista de fuentes desde las cuales se descargan los datos.
            executor_type: "thread" para ThreadPoolExecutor o "process" para ProcessPoolExecutor.
            
        Returns:
            Un diccionario donde la clave es el ticker concatenado con la fuente y el valor es el DataFrame.
        """
        results = {}
        # Se genera la lista de tareas para cada combinación de ticker y fuente
        tasks = [(ticker, source) for ticker in tickers for source in sources]
        
        # Determinar el número óptimo de trabajadores basado en el número de CPUs o tareas
        num_cpus = multiprocessing.cpu_count()
        num_workers = min(num_cpus, len(tasks))
        
        # Seleccionar el tipo de ejecutor según el argumento
        if executor_type.lower() == "process":
            Executor = ProcessPoolExecutor
            logger.info("Usando ProcessPoolExecutor para procesamiento en paralelo")
        else:
            Executor = ThreadPoolExecutor
            logger.info("Usando ThreadPoolExecutor para procesamiento en paralelo")
        
        # Ejecutar las tareas en paralelo
        with Executor(max_workers=num_workers) as executor:
            for result in executor.map(self._process_ticker, tasks):
                if result:
                    key, df = result
                    results[key] = df
                    
        return results

    def plot_prices(self, data: Dict[str, pd.DataFrame], 
                    column: str = "Close") -> None:
        """
        Grafica los precios de cierre de múltiples activos.
        
        Args:
            data: Diccionario de DataFrames con datos financieros.
            column: Columna que se desea graficar (por defecto "Close").
        """
        # Se establece un estilo moderno para el gráfico
        plt.style.use('seaborn')
        plt.figure(figsize=(22, 12))
        
        # Graficar la columna indicada para cada activo
        for key, df in data.items():
            if column in df.columns:
                df[column].plot(label=key, linewidth=2, alpha=0.8)
            else:
                logger.warning(f"La columna '{column}' no se encontró en los datos de {key}")
                
        plt.title("Precios de Cierre", size=25, pad=20)
        plt.xlabel("Fecha", size=20)
        plt.ylabel("Precios", size=20)
        plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.show()

In [None]:
def setup_notebook():
    """
    Configura el entorno del notebook para mostrar gráficos inline.
    
    Esto es útil cuando se ejecuta en Jupyter.
    """
    import IPython
    IPython.get_ipython().run_line_magic('matplotlib', 'inline')
    plt.style.use('seaborn')
    

In [None]:
def main():
    """
    Ejemplo de uso del DataLoader.
    
    Se configura el entorno (en caso de estar en un notebook), se descargan datos de varios
    tickers en paralelo y se grafican los precios de cierre.
    """
    # Configurar el entorno del notebook
    try:
        setup_notebook()
    except Exception as e:
        logger.warning(f"No se pudo configurar el notebook: {e}")
    
    # Inicializar el cargador de datos
    loader = DataLoader()
    
    # Definir una lista de tickers a procesar
    tickers = ["AMZN", "AAPL", "MSFT", "GOOGL", "META"]
    
    # Cargar datos en paralelo utilizando ThreadPoolExecutor (por defecto IO-bound)
    logger.info("Iniciando carga de datos...")
    data = loader.load_parallel(tickers, executor_type="thread")
    logger.info(f"Datos cargados exitosamente para {len(data)} combinaciones de ticker y fuente")
    
    # Graficar los precios de cierre
    loader.plot_prices(data)

if __name__ == "__main__":
    main()