# Series de tiempo

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import statsmodels.api as sm
import statsmodels.tsa.api as smt


In [None]:
data_sp500 = pd.read_csv("../data/processed/sp500_index.csv")
data_sp500.head()

In [None]:
plt.figure(figsize = (30, 5))
sns.lineplot(x = "Date", y = "S&P500", data = data_sp500)
plt.title("S&P500 Index")
plt.xticks(rotation = 90)
plt.show()

In [11]:
data_sp500.set_index("Date", inplace=True)

In [None]:
from statsmodels.tsa.seasonal import seasonal_decompose

result = seasonal_decompose(data_sp500, model='multiplicative', period=1)
result.plot()


In [None]:
# Determinar si una serie es estacionaria

from typing import Tuple, Optional
from statsmodels.tsa.stattools import adfuller
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
def test_estacionariedad(
    serie: pd.Series,
    ventana: int = 12,
    titulo: str = "Análisis de Estacionariedad",
    figsize: Tuple[int, int] = (12, 8)
) -> Tuple[bool, float, dict]:
    """Realiza el test de Dickey-Fuller aumentado para determinar estacionariedad.

    Parameters
    ----------
    serie : pd.Series
        Serie temporal a analizar.
    ventana : int, optional
        Tamaño de la ventana para el cálculo de la media móvil, por defecto 12.
    titulo : str, optional
        Título para la gráfica, por defecto "Análisis de Estacionariedad".
    figsize : Tuple[int, int], optional
        Tamaño de la figura, por defecto (12, 8).

    Returns
    -------
    Tuple[bool, float, dict]
        - bool: True si la serie es estacionaria (p-valor < 0.05)
        - float: p-valor del test
        - dict: Resultados completos del test ADF

    Examples
    --------
    >>> import pandas as pd
    >>> import numpy as np
    >>> # Crear una serie temporal no estacionaria
    >>> serie = pd.Series(np.random.normal(0, 1, 100).cumsum())
    >>> es_estacionaria, p_valor, resultados = test_estacionariedad(serie)
    >>> print(f"¿Es estacionaria? {es_estacionaria}")
    >>> print(f"p-valor: {p_valor:.4f}")
    """
    # Calcular estadísticas móviles
    media_movil = serie.rolling(window=ventana).mean()
    std_movil = serie.rolling(window=ventana).std()

    # Realizar el test ADF
    resultado_adf = adfuller(serie.dropna())
    
    # Extraer resultados
    estadistico_adf = resultado_adf[0]
    p_valor = resultado_adf[1]
    valores_criticos = resultado_adf[4]
    
    # Crear diccionario con resultados
    resultados = {
        'estadistico_adf': estadistico_adf,
        'p_valor': p_valor,
        'valores_criticos': valores_criticos,
        'n_observaciones': resultado_adf[3],
        'orden_maximo': resultado_adf[2]
    }

    # Visualizar resultados
    plt.figure(figsize=figsize)
    
    # Gráfica de la serie original y media móvil
    plt.subplot(2, 2, 1)
    plt.plot(serie, label='Serie Original')
    plt.plot(media_movil, label=f'Media Móvil ({ventana} períodos)')
    plt.title('Serie Temporal y Media Móvil')
    plt.legend()
    
    # Gráfica de la desviación estándar móvil
    plt.subplot(2, 2, 2)
    plt.plot(std_movil, label=f'Desviación Estándar Móvil ({ventana} períodos)')
    plt.title('Desviación Estándar Móvil')
    plt.legend()
    
    # ACF
    plt.subplot(2, 2, 3)
    plot_acf(serie, ax=plt.gca(), lags=40)
    plt.title('Función de Autocorrelación (ACF)')
    
    # PACF
    plt.subplot(2, 2, 4)
    plot_pacf(serie, ax=plt.gca(), lags=40)
    plt.title('Función de Autocorrelación Parcial (PACF)')
    
    plt.suptitle(titulo)
    plt.tight_layout()
    plt.show()

    # Imprimir resultados del test
    print('Resultados del Test de Dickey-Fuller Aumentado:')
    print(f'Estadístico ADF: {estadistico_adf:.4f}')
    print(f'p-valor: {p_valor:.4f}')
    print('Valores Críticos:')
    for key, value in valores_criticos.items():
        print(f'\t{key}: {value:.4f}')
    
    # Determinar si la serie es estacionaria
    es_estacionaria = p_valor < 0.05
    
    print(f'\nConclusión: La serie es {"estacionaria" if es_estacionaria else "no estacionaria"}')
    print(f'(p-valor {"<" if es_estacionaria else ">"} 0.05)')

    return es_estacionaria, p_valor, resultados


test_estacionariedad(data_sp500["S&P500"])

In [None]:
diff_sp500 = data_sp500.diff(1).dropna()


In [None]:

result = seasonal_decompose(diff_sp500, model='additive', period=1)
result.plot()

In [None]:
test_estacionariedad(diff_sp500["S&P500"])

In [None]:
print("Fecha máxima: ", np.max(diff_sp500.index))
print("Fecha mínima: ", np.min(diff_sp500.index))



test = diff_sp500.loc[diff_sp500.index >= "2024-01-01"]
train = diff_sp500.loc[(diff_sp500.index < "2024-01-01") & (diff_sp500.index >= "2021-01-01")]





In [None]:
# Modelos arima

from statsmodels.tsa.arima.model import ARIMA

mod = sm.tsa.arima.ARIMA(train, order=(0, 0, 0))
res = mod.fit()

In [None]:
preds = res.predict(start=len(train.index)+1, end=len(train.index) + len(test.index))

In [136]:
train_valores_reales = data_sp500.loc[train.index]
test_valores_reales = data_sp500.loc[test.index]

In [137]:
valor_base = train_valores_reales.tail(1).values[0][0]
preds_finales = []
for i in preds:
    valor_predicho = valor_base + i
    valor_base = valor_predicho
    preds_finales.append(valor_predicho)

In [None]:
from sklearn.metrics import mean_absolute_error
mean_absolute_error(test_valores_reales, preds_finales)





In [None]:
sns.lineplot(x = "Date", y = "S&P500", data = train_valores_reales, label = "Train")
sns.lineplot(x = "Date", y = "S&P500", data = test_valores_reales, label = "Test")
sns.lineplot(x = test_valores_reales.index, y = preds_finales, label = "Predictions")


In [None]:
sns.lineplot(x = "Date", y = "S&P500", data = test_valores_reales, label = "Test")
sns.lineplot(x = test_valores_reales.index, y = preds_finales, label = "Predictions")

In [None]:
from typing import Tuple, Dict, Any, Optional, List
import itertools
import warnings
from statsmodels.tsa.statespace.sarimax import SARIMAX
def encontrar_mejor_arima(
    train,
    test,
    max_p: int = 5,
    max_d: int = 2,
    max_q: int = 5,
    max_P: int = 2,
    max_D: int = 1,
    max_Q: int = 2,
    m: int = 12,
    seasonal: bool = False,
    information_criterion: str = 'aic'
) -> Tuple[Tuple[int, int, int], Tuple[int, int, int], float]:
    """Encuentra los mejores parámetros para el modelo ARIMA/SARIMA.

    Parameters
    ----------
    serie : pd.Series
        Serie temporal a modelar.
    max_p : int, optional
        Máximo orden del componente AR, por defecto 5.
    max_d : int, optional
        Máximo orden de diferenciación, por defecto 2.
    max_q : int, optional
        Máximo orden del componente MA, por defecto 5.
    max_P : int, optional
        Máximo orden del componente AR estacional, por defecto 2.
    max_D : int, optional
        Máximo orden de diferenciación estacional, por defecto 1.
    max_Q : int, optional
        Máximo orden del componente MA estacional, por defecto 2.
    m : int, optional
        Período estacional, por defecto 12.
    seasonal : bool, optional
        Si se debe considerar estacionalidad, por defecto True.
    information_criterion : str, optional
        Criterio de información a usar ('aic' o 'bic'), por defecto 'aic'.

    Returns
    -------
    Tuple[Tuple[int, int, int], Tuple[int, int, int], float]
        - Mejores parámetros (p,d,q)
        - Mejores parámetros estacionales (P,D,Q)
        - Valor del criterio de información
    """
    mejor_criterio = float('inf')
    mejores_parametros = None
    mejores_parametros_estacionales = None

    # Determinar el orden de diferenciación
    d = 0

    # Generar combinaciones de parámetros
    p_values = range(max_p + 1)
    q_values = range(max_q + 1)
    P_values = range(max_P + 1) if seasonal else [0]
    Q_values = range(max_Q + 1) if seasonal else [0]
    D_values = range(max_D + 1) if seasonal else [0]

    # Probar todas las combinaciones
    for p, q, P, Q, D in itertools.product(p_values, q_values, P_values, Q_values, Q_values):
        try:
            modelo = SARIMAX(
                train,
                order=(p, d, q),
                seasonal_order=(P, D, Q, m) if seasonal else (0, 0, 0, 0)
            )
            resultados = modelo.fit(disp=False)
            
            # Obtener criterio de información
            criterio = getattr(resultados, information_criterion)
            
            if criterio < mejor_criterio:
                mejor_criterio = criterio
                mejores_parametros = (p, d, q)
                mejores_parametros_estacionales = (P, D, Q)
                
        except:
            continue

    return mejores_parametros, mejores_parametros_estacionales, mejor_criterio

encontrar_mejor_arima(train, test)

In [None]:
## Exponential Smoothing

from statsmodels.tsa.api import ExponentialSmoothing
fit1 = ExponentialSmoothing(
    train,
    seasonal_periods=10,
    trend="add",
    seasonal="add"
).fit()

preds = fit1.predict(start=len(train.index)+1, end=len(train.index) + len(test.index))

valor_base = train_valores_reales.tail(1).values[0][0]
preds_finales = []
for i in preds:
    valor_predicho = valor_base + i
    valor_base = valor_predicho
    preds_finales.append(valor_predicho)

mean_absolute_error(test_valores_reales, preds_finales)


sns.lineplot(x = "Date", y = "S&P500", data = test_valores_reales, label = "Test")
sns.lineplot(x = test_valores_reales.index, y = preds_finales, label = "Predictions")

In [141]:
train_valores_reales.reset_index(inplace=True)
train_valores_reales.columns = ["ds", "y"]
test_valores_reales.reset_index(inplace=True)
test_valores_reales.columns = ["ds", "y"]


In [None]:
train_valores_reales.head()

In [None]:
from prophet import Prophet

m = Prophet()
m.fit(train_valores_reales)

In [None]:
df_pred = m.predict(test_valores_reales)
df_pred

In [None]:
print(mean_absolute_error(test_valores_reales["y"], df_pred["yhat"]))

sns.lineplot(x = "ds", y = "y", data = test_valores_reales, label = "Test")
sns.lineplot(x = test_valores_reales["ds"], y = df_pred["yhat"], label = "Predictions")

In [243]:
### Probando con LSTM

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from typing import Tuple, Dict, List, Any


def crear_dataset(
    serie: np.ndarray, 
    ventana: int = 1
) -> Tuple[np.ndarray, np.ndarray]:
    """Crea conjuntos de datos X e y para entrenamiento del LSTM.
    
    Parameters
    ----------
    serie : np.ndarray
        Serie temporal univariada.
    ventana : int, optional
        Tamaño de la ventana (lookback), por defecto 1.
        
    Returns
    -------
    Tuple[np.ndarray, np.ndarray]
        X: Datos de entrada de forma (n_muestras, ventana, 1)
        y: Datos objetivo
    """
    X, y = [], []
    for i in range(len(serie) - ventana):
        X.append(serie[i:(i + ventana)])
        y.append(serie[i + ventana])
    return np.array(X), np.array(y)


def preparar_datos(
    serie: pd.Series,
    ventana: int = 6,
    fecha_inicio: str = "2020-01-01",
    escalar: Any = None
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, Any]:
    """Prepara los datos para el modelo LSTM.
    
    Parameters
    ----------
    serie : pd.Series
        Serie temporal a modelar.
    ventana : int, optional
        Tamaño de la ventana, por defecto 6.
    train_size : float, optional
        Proporción de datos para entrenamiento, por defecto 0.8.
    escalar : Any, optional
        Escalador preentrenado. Si es None, se crea uno nuevo.
        
    Returns
    -------
    Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, Any]
        X_train, X_test, y_train, y_test, escalar
    """
    # Convertir a array y reshape para el escalador
    valores = serie.values.reshape(-1, 1)
    
    # Escalar los datos
    if escalar is None:
        escalar = MinMaxScaler(feature_range=(0, 1))
        valores_escalados = pd.DataFrame(escalar.fit_transform(valores), index=serie.index)
    else:
        valores_escalados = pd.DataFrame(escalar.transform(valores), index=serie.index)
    
    # Dividir en train y test

    train = valores_escalados.iloc[valores_escalados.index < fecha_inicio]
    test = valores_escalados[valores_escalados.index >= fecha_inicio]
 
    # Crear datasets para LSTM
    X_train, y_train = crear_dataset(train.values, ventana)
    X_test, y_test = crear_dataset(test.values, ventana)
    
    # Reshape para LSTM [muestras, pasos_tiempo, características]
    X_train = X_train.reshape(X_train.shape[0], X_train.shape[1], 1)
    X_test = X_test.reshape(X_test.shape[0], X_test.shape[1], 1)
    
    return X_train, X_test, y_train, y_test, escalar


def crear_modelo_lstm(
    ventana: int = 6,
    unidades: int = 50,
    capas: int = 1,
    dropout: float = 0.2
) -> Sequential:
    """Crea un modelo LSTM para series temporales.
    
    Parameters
    ----------
    ventana : int, optional
        Tamaño de la ventana, por defecto 6.
    unidades : int, optional
        Número de unidades en cada capa LSTM, por defecto 50.
    capas : int, optional
        Número de capas LSTM, por defecto 1.
    dropout : float, optional
        Tasa de dropout, por defecto 0.2.
        
    Returns
    -------
    Sequential
        Modelo Keras LSTM
    """
    modelo = Sequential()
    
    # Primera capa LSTM
    if capas == 1:
        modelo.add(LSTM(units=unidades, input_shape=(ventana, 1)))
    else:
        modelo.add(LSTM(units=unidades, return_sequences=True, input_shape=(ventana, 1)))
        modelo.add(Dropout(dropout))
        
        # Capas intermedias
        for _ in range(capas - 2):
            modelo.add(LSTM(units=unidades, return_sequences=True))
            modelo.add(Dropout(dropout))
        
        # Última capa LSTM
        modelo.add(LSTM(units=unidades))
    
    modelo.add(Dropout(dropout))
    modelo.add(Dense(units=1))
    
    modelo.compile(optimizer='adam', loss='mean_squared_error')
    
    return modelo


def entrenar_lstm(
    modelo: Sequential,
    X_train: np.ndarray,
    y_train: np.ndarray,
    X_test: np.ndarray,
    y_test: np.ndarray,
    epocas: int = 100,
    batch_size: int = 32,
    paciencia: int = 10,
    verbose: int = 1,
    ruta_modelo: str = 'mejor_modelo.h5'
) -> Tuple[Sequential, Dict[str, List[float]]]:
    """Entrena el modelo LSTM.
    
    Parameters
    ----------
    modelo : Sequential
        Modelo Keras LSTM.
    X_train : np.ndarray
        Datos de entrenamiento X.
    y_train : np.ndarray
        Datos de entrenamiento y.
    X_test : np.ndarray
        Datos de validación X.
    y_test : np.ndarray
        Datos de validación y.
    epocas : int, optional
        Número de épocas de entrenamiento, por defecto 100.
    batch_size : int, optional
        Tamaño del lote, por defecto 32.
    paciencia : int, optional
        Paciencia para early stopping, por defecto 10.
    verbose : int, optional
        Nivel de verbosidad, por defecto 1.
    ruta_modelo : str, optional
        Ruta para guardar el mejor modelo, por defecto 'mejor_modelo.h5'.
        
    Returns
    -------
    Tuple[Sequential, Dict[str, List[float]]]
        Modelo entrenado e historial de entrenamiento
    """
    # Callbacks
    early_stopping = EarlyStopping(
        monitor='val_loss',
        patience=paciencia,
        restore_best_weights=True
    )
    
    model_checkpoint = ModelCheckpoint(
        filepath=ruta_modelo,
        monitor='val_loss',
        save_best_only=True
    )
    
    # Entrenar modelo
    historia = modelo.fit(
        X_train, y_train,
        epochs=epocas,
        batch_size=batch_size,
        validation_data=(X_test, y_test),
        callbacks=[early_stopping, model_checkpoint],
        verbose=verbose
    )
    
    return modelo, historia.history


def evaluar_predicciones(
    y_real: np.ndarray,
    y_pred: np.ndarray,
    escalar: Any = None
) -> Dict[str, float]:
    """Evalúa las predicciones del modelo.
    
    Parameters
    ----------
    y_real : np.ndarray
        Valores reales.
    y_pred : np.ndarray
        Valores predichos.
    escalar : Any, optional
        Escalador para invertir la transformación, por defecto None.
        
    Returns
    -------
    Dict[str, float]
        Diccionario con métricas de evaluación
    """
    # Invertir la transformación si se proporciona un escalador
    if escalar is not None:
        y_real = escalar.inverse_transform(y_real.reshape(-1, 1)).flatten()
        y_pred = escalar.inverse_transform(y_pred.reshape(-1, 1)).flatten()
    
    # Calcular métricas
    metricas = {
        'mse': mean_squared_error(y_real, y_pred),
        'rmse': np.sqrt(mean_squared_error(y_real, y_pred)),
        'mae': mean_absolute_error(y_real, y_pred),
        'r2': r2_score(y_real, y_pred)
    }
    
    return metricas


def visualizar_resultados(
    serie: pd.Series,
    real: np.ndarray,
    predicciones: np.ndarray,
    titulo: str = "Predicciones LSTM",
    figsize: Tuple[int, int] = (12, 6)
) -> None:
    """Visualiza los resultados del modelo.
    
    Parameters
    ----------
    serie : pd.Series
        Serie temporal original.
    real : np.ndarray
        Valores reales.
    predicciones : np.ndarray
        Valores predichos.
    titulo : str, optional
        Título del gráfico, por defecto "Predicciones LSTM".
    figsize : Tuple[int, int], optional
        Tamaño de la figura, por defecto (12, 6).
    """
    plt.figure(figsize=figsize)
    
    # Graficar serie original
    plt.plot(serie, label='Serie Original', alpha=0.5)
    
    # Calcular el índice correcto para las predicciones
    offset = len(serie) - len(real)
    idx_predicciones = range(offset, offset + len(predicciones))
    
    # Graficar predicciones
    plt.plot(idx_predicciones, predicciones, label='Predicciones', color='red')
    
    plt.title(titulo)
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()


def pronosticar_futuro(
    modelo: Sequential,
    serie: np.ndarray,
    ventana: int,
    n_pasos: int,
    escalar: Any
) -> np.ndarray:
    """Realiza pronósticos futuros con el modelo LSTM.
    
    Parameters
    ----------
    modelo : Sequential
        Modelo LSTM entrenado.
    serie : np.ndarray
        Serie temporal escalada.
    ventana : int
        Tamaño de la ventana.
    n_pasos : int
        Número de pasos a pronosticar.
    escalar : Any
        Escalador para invertir la transformación.
        
    Returns
    -------
    np.ndarray
        Array con pronósticos
    """
    # Obtener los últimos 'ventana' valores para la predicción inicial
    ultimos_valores = serie[-ventana:].reshape(1, ventana, 1)
    
    # Lista para almacenar pronósticos
    pronosticos = []
    
    # Realizar pronósticos paso a paso
    for _ in range(n_pasos):
        # Predecir el siguiente valor
        siguiente_valor = modelo.predict(ultimos_valores, verbose=0)
        pronosticos.append(siguiente_valor[0, 0])
        
        # Actualizar los valores para la siguiente predicción
        ultimos_valores = np.append(ultimos_valores[:, 1:, :], 
                                    siguiente_valor.reshape(1, 1, 1), 
                                    axis=1)
    
    # Convertir a array y deshacer escalado
    pronosticos = np.array(pronosticos).reshape(-1, 1)
    pronosticos = escalar.inverse_transform(pronosticos).flatten()
    
    return pronosticos

In [187]:
from sklearn.preprocessing import MinMaxScaler
train_lstm = train_valores_reales.set_index("ds")
test_lstm = test_valores_reales.set_index("ds")
esc = MinMaxScaler()

train_lstm["y"] = esc.fit_transform(train_lstm["y"].values.reshape(-1, 1))
test_lstm["y"] = esc.transform(test_lstm["y"].values.reshape(-1, 1))
x_train, y_train = crear_dataset(train_lstm["y"].values, ventana=1)
x_test, y_test = crear_dataset(test_lstm["y"].values, ventana=1)

x_train = x_train.reshape(x_train.shape[0], x_train.shape[1], 1)
x_test = x_test.reshape(x_test.shape[0], x_test.shape[1], 1)







In [None]:
modelo = Sequential()
    
    
    
modelo.add(LSTM(units=256, return_sequences=True, input_shape=(1, 1)))
modelo.add(Dropout(0.2))


modelo.add(LSTM(units=512, return_sequences=True))
modelo.add(Dropout(0.2))

# Última capa LSTM
modelo.add(LSTM(units=512))

modelo.add(Dropout(0.2))
modelo.add(Dense(units=1))

modelo.compile(optimizer='adam', loss='mean_absolute_error')


modelo.fit(x_train, y_train, epochs=50, batch_size=128)

In [None]:
preds = esc.inverse_transform(modelo.predict(x_test))

In [None]:
mean_absolute_error(preds, test_valores_reales["y"].iloc[:-1])

In [None]:


sns.lineplot(x = "ds", y = "y", data = test_valores_reales.iloc[test_valores_reales["y"].iloc[:-1].index], label = "Test")



sns.lineplot(x = test_valores_reales["ds"].iloc[:-1], y = preds[:,0], label = "Predictions")

In [None]:
# Entrenando el modelo para acciones


data_stocks = pd.read_csv("../data/processed/sp500_stocks.csv")
data_stocks.head()

In [None]:
acciones_disponibles = data_stocks["Symbol"].unique()

amazon = data_stocks[data_stocks["Symbol"] == "NVDA"]
amazon = amazon.set_index("Date")
X_train, X_test, y_train, y_test, escalar = preparar_datos(amazon["Close"],1,"2024-01-01")

modelo = Sequential()
    

modelo.add(LSTM(units=256, return_sequences=True, input_shape=(1, 1)))
modelo.add(Dropout(0.2))


modelo.add(LSTM(units=512, return_sequences=True))
modelo.add(Dropout(0.2))

# Última capa LSTM
modelo.add(LSTM(units=512))

modelo.add(Dropout(0.2))
modelo.add(Dense(units=1))

modelo.compile(optimizer='adam', loss='mean_absolute_error')


modelo.fit(X_train, y_train, epochs=50, batch_size=128)

preds = escalar.inverse_transform(modelo.predict(X_test))


In [None]:
sns.lineplot(data = amazon.iloc[(amazon.index >= "2024-01-01") & (amazon.index < np.max(amazon.index))], x = amazon.iloc[(amazon.index >= "2024-01-01") & (amazon.index < np.max(amazon.index))].index, y = amazon.iloc[(amazon.index >= "2024-01-01") & (amazon.index < np.max(amazon.index))]["Close"], label = "Real")
sns.lineplot(x = amazon.iloc[(amazon.index >= "2024-01-01") & (amazon.index < np.max(amazon.index))].index, y = preds[:,0], label = "Predictions")

In [None]:
x_test

In [None]:
n = 100
last_value_available = np.array([[[1.87781358]]])
predictions = []
for i in range(n):
    last_value_available = modelo.predict(np.array(last_value_available))
    last_value_available = last_value_available.reshape(1,1,1)
    predictions.append(escalar.inverse_transform(last_value_available.reshape(1,1))[0][0])


In [None]:
sns.lineplot(x = range(0, len(predictions)), y = predictions)