In [None]:
import os

# Folder inside Google Drive where the generated figures are written.
folder_path = '/content/drive/MyDrive/leaking'

# Create the folder (and any missing parents) on first use; an existing
# directory is left untouched.
if not os.path.isdir(folder_path):
    os.makedirs(folder_path, exist_ok=True)

---
# ***Predicción de Closing Prices implementando una arquitectura LSTM***
---
<p style="text-align:right"><i>Otoniel Ruiz Morales<br>Machine Learning Enero-Junio 2025</i></p>





In [None]:
import numpy as np
from numpy import array
import matplotlib.pyplot as plt
import yfinance as yf
import pandas as pd
from sys import version

import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import RepeatVector, TimeDistributed
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.model_selection import train_test_split
from scipy.stats import pearsonr

def split_sequence(sequence, n_steps_in, n_steps_out):
    """Slice a sequence into supervised (input, target) windows.

    Window ``i`` takes ``sequence[i : i + n_steps_in]`` as the input and the
    ``n_steps_out`` elements that immediately follow it as the target.
    Windows advance one element at a time and stop as soon as the target
    would run past the end of ``sequence``.

    Args:
        sequence: Sliceable, sized sequence of observations.
        n_steps_in: Number of elements in each input window.
        n_steps_out: Number of elements in each target window.

    Returns:
        Tuple ``(X, y)`` of NumPy arrays built from the collected input and
        target windows. Both are empty when no complete window fits.
    """
    total = len(sequence)
    span = n_steps_in + n_steps_out
    # A start position is valid while it lies inside the sequence and the
    # whole input+target span still fits; range() of a negative count is empty.
    n_windows = min(total, total - span + 1)
    starts = range(n_windows)
    inputs = [sequence[start:start + n_steps_in] for start in starts]
    targets = [sequence[start + n_steps_in:start + span] for start in starts]
    return array(inputs), array(targets)

"""
Extraemos y preprocesamos los datos
"""
# --- Download the price history and derive the scaled / smoothed columns ---
tickers = ["AAPL", "AMZN", "NFLX", "MSFT", "NVDA",
           "EBAY", "CSCO", "INTC", "SIRI","ILMN"]

dataFrames = []

# Rolling-window length (in rows) shared by the WMA, SMA and EMA columns.
WMA_window  = 1000
# Linearly increasing weights, one per row of the window. Derived from
# WMA_window so the two cannot drift apart (a length mismatch would make
# `WMA_weights * x` fail inside the rolling apply).
WMA_weights = np.linspace(0.1, 1, WMA_window)
WMA_sum_weigths = np.sum(WMA_weights)

for ticker in tickers:
    data = yf.download(tickers=ticker, start="2005-01-01", end="2025-01-01")
    df = pd.DataFrame(data)
    # Fail early with a readable message instead of an opaque scaler error later.
    if df.empty:
        raise ValueError(f"No price data was downloaded for ticker {ticker!r}")
    dataFrames.append(df)

# One fitted scaler per ticker, so any ticker's values can be mapped back to
# prices with scalers[ticker].inverse_transform(...). `scaler` still ends up
# bound to the scaler of the last ticker, as before.
scalers = {}

for ticker, df in zip(tickers, dataFrames):
    scaler = MinMaxScaler(feature_range=(-1, 1))
    # NOTE(review): the scaler is fitted on the whole series, so the min/max of
    # rows later used as test data influence the training values. Confirm this
    # look-ahead is intended.
    df["Close"] = scaler.fit_transform(df[["Close"]])
    scalers[ticker] = scaler

    # Weighted moving average. raw=True hands each window to the lambda as a
    # NumPy array instead of building a Series per window (same values, faster).
    # NOTE(review): center=True makes each WMA value depend on rows after it,
    # unlike the trailing SMA/EMA below. Confirm this is intended.
    df["WMA"] = (df["Close"].rolling(window=WMA_window, center=True)
                            .apply(lambda x: np.sum(WMA_weights * x) / WMA_sum_weigths,
                                   raw=True))
    df["SMA"] = df["Close"].rolling(window=WMA_window).mean()
    df["EMA"] = df["Close"].ewm(span=WMA_window).mean()

# Start offsets (row positions) at which an experiment window begins:
# the start of each 1000-row block and 900 rows into it.
time_steps = [block_start + offset
              for block_start in (0, 1000, 2000)
              for offset in (0, 900)]

# Input/output window configurations as (n_steps_in, n_steps_out) pairs.
window_configs = list(zip((1000, 100, 10),
                          (100, 10, 2)))

# Train one LSTM per (ticker, window configuration, start offset), plot its fit
# on the training target and its forecast for the following window, and save
# each figure under `folder_path`.
#
# Row layout of one experiment (positions into the ticker's Close series):
#   [time_step,  start_test)  training input   (n_steps_in rows)
#   [start_test, train_end)   training target  (n_steps_out rows)
#   [start_test, end_test)    test input       (n_steps_in rows)
#   [end_test,   test_end)    test target      (n_steps_out rows)
for df, ticker in zip(dataFrames, tickers):
    # df['Close'] may be a Series or a one-column DataFrame (e.g. when the
    # download has MultiIndex columns); flatten so every slice below is 1-D.
    close = np.asarray(df['Close'], dtype=float).reshape(-1)

    for n_steps_in, n_steps_out in window_configs:
        for time_step in time_steps:
            start_test = time_step + n_steps_in
            train_end = start_test + n_steps_out
            end_test = start_test + n_steps_in
            test_end = end_test + n_steps_out

            # Skip offsets that do not leave room for the complete test window.
            # Checked before any indexing, training or plotting so that no
            # IndexError is raised and no half-built figure is left open.
            if test_end > len(close):
                print(f"Skipping {ticker} In:{n_steps_in} Out:{n_steps_out} "
                      f"t:{time_step}: not enough data")
                continue

            # Dates of the first and last row of each plotted target window.
            fecha_i_train = df.index[start_test].strftime('%d-%m-%Y')
            fecha_f_train = df.index[train_end - 1].strftime('%d-%m-%Y')

            fecha_i_test = df.index[end_test].strftime('%d-%m-%Y')
            fecha_f_test = df.index[test_end - 1].strftime('%d-%m-%Y')

            # Build the supervised training data.
            # NOTE(review): the slice is exactly n_steps_in + n_steps_out rows
            # long, so split_sequence yields a single training sample. Confirm
            # that fitting on one window is intended.
            seq = close[time_step:train_end]
            X_train, y_train = split_sequence(seq, n_steps_in, n_steps_out)
            X_train = X_train.reshape((X_train.shape[0], X_train.shape[1], 1))

            # Fresh model per configuration; drop the previous model's state
            # first so memory does not grow across the many iterations.
            tf.keras.backend.clear_session()
            model = Sequential()
            model.add(LSTM(100, activation='tanh', return_sequences=True, input_shape=(n_steps_in, 1)))
            model.add(LSTM(100, activation='tanh'))
            model.add(Dense(n_steps_out, activation='tanh'))
            model.compile(optimizer='adam', loss='mse')

            # Train
            history = model.fit(X_train, y_train, epochs=30, verbose=1, validation_split=0)

            # Predict on the training INPUT (not the targets) to plot the fit.
            y_train_pred = model.predict(X_train, verbose=1)

            # Training metrics, computed over the whole flattened set.
            r2_train = r2_score(y_train.flatten(), y_train_pred.flatten())
            pearson_train, _ = pearsonr(y_train.flatten(), y_train_pred.flatten())
            rmse_train = np.sqrt(mean_squared_error(y_train.flatten(), y_train_pred.flatten()))
            var = np.var(y_train.flatten())

            fig, (ax_train, ax_test) = plt.subplots(1, 2, figsize=(20, 6))
            fig.suptitle(f'Closing Price {ticker} - In:{n_steps_in}, Out:{n_steps_out}')

            # Left panel: fit on the training target. Only the first pair of
            # curves gets a label so the legend has no duplicate entries.
            for i in range(min(5, len(y_train))):
                ax_train.plot(y_train[i], label='Real' if i == 0 else None,
                              linestyle='-', color='orange')
                ax_train.plot(y_train_pred[i], label='Predicted' if i == 0 else None,
                              linestyle='--', color='blue')
            ax_train.set_title(f"Train - Ventana {fecha_i_train} -> {fecha_f_train}")
            ax_train.set_xlabel('Tiempo (Dias)')
            ax_train.set_ylabel('Precio normalizado (USD)')
            # Empty artists carry the metrics into the legend without adding a
            # data point that would stretch the axis limits.
            ax_train.plot([], [], label=f'RMSE: {rmse_train:.4f}', linestyle="")
            ax_train.plot([], [], label=f'R²: {r2_train:.4f}', linestyle="")
            ax_train.plot([], [], label=f'Pearson: {pearson_train:.4f}', linestyle="")
            ax_train.plot([], [], label=f'Varianza: {var:.4f}', linestyle="")
            ax_train.legend()

            # Forecast the window that follows the test input.
            X_test = close[start_test:end_test].reshape((1, n_steps_in, 1))

            y_pred = model.predict(X_test, verbose=1)[0]
            y_real = close[end_test:test_end]

            # Test metrics; the guard above guarantees both arrays hold
            # n_steps_out values.
            r2_test = r2_score(y_real, y_pred)
            pearson_test, _ = pearsonr(y_real, y_pred)
            rmse_test = np.sqrt(mean_squared_error(y_real, y_pred))

            # Right panel: forecast vs. real values, annotated with the TEST metrics.
            ax_test.plot(y_real, label='Real')
            ax_test.plot(y_pred, label='Predicted')
            ax_test.set_title(f'Test - Ventana {fecha_i_test} -> {fecha_f_test}')
            ax_test.set_xlabel('Tiempo (Dias)')
            ax_test.set_ylabel('Precio normalizado (USD)')
            ax_test.plot([], [], label=f'RMSE: {rmse_test:.4f}', linestyle="")
            ax_test.plot([], [], label=f'R²: {r2_test:.4f}', linestyle="")
            ax_test.plot([], [], label=f'Pearson: {pearson_test:.4f}', linestyle="")
            ax_test.legend()

            fig.tight_layout()
            file_name = f"{ticker}_In{n_steps_in}_Out{n_steps_out}_t{time_step}.png"
            file_path = os.path.join(folder_path, file_name)

            # Save the figure, display it, then release it so figures do not
            # pile up in memory over the loop.
            fig.savefig(file_path)
            plt.show()
            plt.close(fig)