<a href="https://colab.research.google.com/github/PierreBond/SP_PREDICTOR/blob/main/predictor2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
pip install numpy pandas matplotlib scikit-learn tensorflow yfinance



In [None]:
import numpy as np
import pandas as pd
import yfinance as yf
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
import tensorflow as tf
from tensorflow.keras.layers import Input, LSTM, Dense, Dropout, LayerNormalization
from tensorflow.keras.models import Model
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
import matplotlib.pyplot as plt

In [None]:
class StockPredictor:
    """LSTM close-price forecaster with a simple long/short backtest.

    Typical flow: ``fetch_data()`` -> ``train()`` ->
    ``predict_with_uncertainty()`` -> ``backtest()`` /
    ``evaluate_diagnostics()`` / ``plot_results()``.

    Alignment convention (follows from ``preprocess_data`` and
    ``_create_sequences``): ``predictions[i]`` is the forecast of
    ``test_data['Close'][i]`` and is computed only from the ``seq_length``
    rows that precede that day.
    """

    # Columns fed to the network, in order; column 0 is the regression target.
    FEATURE_COLUMNS = ('Close', 'Volume', 'RSI')

    def __init__(self, ticker, seq_length=60, epochs=50):
        """Store hyper-parameters and create empty state.

        Args:
            ticker: Non-empty ticker symbol passed to ``yf.download``.
            seq_length: Number of past rows in each input window (>= 1).
            epochs: Maximum number of training epochs.

        Raises:
            ValueError: If ``ticker`` is not a non-empty string or
                ``seq_length`` is smaller than 1.
        """
        if not isinstance(ticker, str) or len(ticker) == 0:
            raise ValueError("Ticker must be a non-empty string")
        if seq_length < 1:
            raise ValueError("seq_length must be at least 1")
        self.ticker = ticker
        self.seq_length = seq_length
        self.epochs = epochs
        self.scaler = MinMaxScaler(feature_range=(0, 1))
        self.model = None
        self.data = None
        self.X_test = None
        self.y_test = None
        self.split_idx = None
        self.test_data = None
        self.close_scaler = None

    def fetch_data(self, start='2010-01-01', end='2023-12-31'):
        """Download OHLCV data and add Returns, RSI and MACD columns.

        Args:
            start: First date to request (inclusive), ``YYYY-MM-DD``.
            end: End date handed to ``yf.download``.

        Returns:
            The prepared DataFrame (also stored on ``self.data``) with rows
            containing NaN indicators removed.

        Raises:
            ValueError: If the download returns no rows.
            Exception: Any download/processing error is printed and re-raised.
        """
        try:
            data = yf.download(self.ticker, start=start, end=end)
            if data is None or data.empty:
                raise ValueError(f"No data found for ticker: {self.ticker}")

            # yfinance can return two-level (field, ticker) columns even for a
            # single ticker. Then data['Close'] is a one-column DataFrame and
            # its .values is 2-D, which silently broadcasts into (n, n)
            # matrices in later arithmetic. Keep only the field level.
            if isinstance(data.columns, pd.MultiIndex):
                data = data.copy()
                data.columns = data.columns.get_level_values(0)

            close = data['Close']
            data['Returns'] = close.pct_change()
            data['RSI'] = self._compute_rsi(close, window=14)
            data['MACD'] = close.ewm(span=12).mean() - close.ewm(span=26).mean()
            data.dropna(inplace=True)
            self.data = data
            return self.data
        except Exception as e:
            print(f"Error fetching data: {str(e)}")
            raise

    def _compute_rsi(self, series, window=14):
        """Return the simple-moving-average RSI of ``series``.

        Windows with no losses evaluate to 100; windows with neither gains
        nor losses evaluate to NaN (dropped later by ``fetch_data``).
        """
        delta = series.diff()
        gain = (delta.where(delta > 0, 0)).rolling(window).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window).mean()
        return 100 - (100 / (1 + gain / loss))

    def preprocess_data(self):
        """Split chronologically (80/20), scale on train only, build windows.

        Side effects: sets ``split_idx``, ``X_test``, ``y_test``,
        ``test_data`` and ``close_scaler``.

        Returns:
            ``(X_train, X_test, y_train, y_test)`` as numpy arrays.

        Raises:
            ValueError: If no data is loaded or either split is not longer
                than ``seq_length`` (no window could be formed).
        """
        if self.data is None:
            raise ValueError("No data loaded. Run fetch_data() first.")

        features = self.data[list(self.FEATURE_COLUMNS)].values

        self.split_idx = int(0.8 * len(features))
        train_features = features[:self.split_idx]
        test_features = features[self.split_idx:]
        if min(len(train_features), len(test_features)) <= self.seq_length:
            raise ValueError(
                "Not enough rows to build sequences: each split must be "
                f"longer than seq_length={self.seq_length}"
            )

        train_scaled = self.scaler.fit_transform(train_features)
        test_scaled = self.scaler.transform(test_features)

        X_train, y_train = self._create_sequences(train_scaled)
        X_test, y_test = self._create_sequences(test_scaled)

        self.X_test, self.y_test = X_test, y_test
        # Row i of test_data is the day whose close is the target y_test[i].
        self.test_data = self.data.iloc[self.split_idx + self.seq_length:]

        # The targets were scaled with the train-only statistics of the Close
        # column, so the inverse transform must use exactly those statistics.
        # Fitting on the full history would both leak test information and
        # map predictions back onto the wrong price scale.
        self.close_scaler = MinMaxScaler(feature_range=(0, 1))
        self.close_scaler.fit(train_features[:, [0]])

        return X_train, X_test, y_train, y_test

    def _create_sequences(self, data):
        """Turn a (rows, features) array into sliding windows and targets.

        Returns:
            ``X`` of shape (rows - seq_length, seq_length, features) and ``y``
            holding column 0 of the row that follows each window. Both are
            empty (but correctly ranked) when there are too few rows.
        """
        data = np.asarray(data)
        n_windows = len(data) - self.seq_length
        if n_windows <= 0:
            return np.empty((0, self.seq_length) + data.shape[1:]), np.empty((0,))
        X = np.stack([data[i:i + self.seq_length] for i in range(n_windows)])
        y = data[self.seq_length:, 0].copy()
        return X, y

    def create_simpler_model(self):
        """Build and compile a small two-layer LSTM regressor."""
        inputs = Input(shape=(self.seq_length, len(self.FEATURE_COLUMNS)))

        # Small architecture with dropout regularization
        x = LSTM(32, return_sequences=True, dropout=0.3)(inputs)
        x = LayerNormalization()(x)
        x = LSTM(16, dropout=0.3)(x)
        x = Dropout(0.3)(x)

        outputs = Dense(1)(x)

        model = Model(inputs, outputs)
        model.compile(
            optimizer=tf.keras.optimizers.Adam(learning_rate=0.0001),
            loss='huber',
            metrics=['mae']
        )
        return model

    def train(self):
        """Preprocess the data, build the model and fit it.

        The most recent 10% of the training windows are held out as the
        validation set that drives early stopping and learning-rate decay.
        The test windows are never shown to ``fit`` so that later test
        metrics are not biased by model selection.

        Returns:
            The Keras ``History`` object returned by ``model.fit``.

        Raises:
            ValueError: If there are too few training windows to split.
        """
        X_train, X_test, y_train, y_test = self.preprocess_data()
        self.model = self.create_simpler_model()

        n_val = max(1, int(0.1 * len(X_train)))
        if len(X_train) <= n_val:
            raise ValueError("Not enough training sequences for a validation split")
        X_fit, y_fit = X_train[:-n_val], y_train[:-n_val]
        X_val, y_val = X_train[-n_val:], y_train[-n_val:]

        callbacks = [
            EarlyStopping(patience=15, restore_best_weights=True, monitor='val_loss', min_delta=0.001),
            ReduceLROnPlateau(factor=0.5, patience=7, min_lr=1e-7, monitor='val_loss', verbose=1)
        ]

        history = self.model.fit(
            X_fit, y_fit,
            epochs=self.epochs,
            batch_size=32,
            validation_data=(X_val, y_val),
            callbacks=callbacks,
            verbose=1
        )
        return history

    def predict_with_uncertainty(self, n_samples=10, return_std=False):
        """Monte Carlo dropout forecast of the test-window close prices.

        Args:
            n_samples: Number of stochastic forward passes (>= 1).
            return_std: When true, also return the per-day standard deviation
                of the passes, expressed in price units.

        Returns:
            1-D array of mean predicted prices, or ``(mean, std)`` when
            ``return_std`` is true.

        Raises:
            ValueError: If the model has not been trained or ``n_samples < 1``.
        """
        if self.model is None or self.X_test is None or self.close_scaler is None:
            raise ValueError("Model not trained. Run train() first.")
        if n_samples < 1:
            raise ValueError("n_samples must be at least 1")

        inputs = np.asarray(self.X_test, dtype=np.float32)
        # model.predict() runs in inference mode, where dropout is disabled and
        # every pass is identical. Calling the model with training=True keeps
        # dropout active so the passes actually differ.
        mc_predictions = np.stack([
            np.asarray(self.model(inputs, training=True)).reshape(-1)
            for _ in range(n_samples)
        ])
        mean_scaled = mc_predictions.mean(axis=0)
        std_scaled = mc_predictions.std(axis=0)

        # Inverse transform only the close price (single-column scaler).
        predictions = self.close_scaler.inverse_transform(mean_scaled.reshape(-1, 1))[:, 0]
        if not return_std:
            return predictions

        # MinMaxScaler maps x -> x * scale_ + min_, so a spread converts back
        # to price units by dividing by scale_.
        return predictions, std_scaled / self.close_scaler.scale_[0]

    def _test_close(self):
        """Return the test-window close prices as a flat float array."""
        if self.test_data is None:
            raise ValueError("Test data not available. Run preprocess_data() first.")
        # reshape(-1) also flattens the (n, 1) frame produced by MultiIndex columns.
        return np.asarray(self.test_data['Close'], dtype=float).reshape(-1)

    def backtest(self, predictions, threshold=0.001, transaction_cost=0.001):
        """Backtest an always-invested long/short strategy on the test window.

        For each day t the position is +1 when the forecast of the next close
        exceeds today's close by more than ``threshold`` (relative), else -1.
        ``transaction_cost`` is subtracted from every period's return.

        Args:
            predictions: Forecasts aligned with ``test_data`` rows.
            threshold: Relative margin required to go long.
            transaction_cost: Cost charged per period, as a return fraction.

        Returns:
            ``(sharpe, max_drawdown, total_return, annual_return)``.

        Raises:
            ValueError: If fewer than two aligned observations are available.
        """
        test_prices = self._test_close()
        predictions = np.asarray(predictions, dtype=float).reshape(-1)

        min_len = min(len(predictions), len(test_prices))
        if min_len < 2:
            raise ValueError("Need at least two aligned observations to backtest")
        predictions = predictions[:min_len]
        test_prices = test_prices[:min_len]

        # predictions[t + 1] is the forecast for tomorrow that is available at
        # today's close, so it is the one compared against test_prices[t].
        prev_close = test_prices[:-1]
        signals = np.where(predictions[1:] > prev_close * (1 + threshold), 1, -1)
        returns = test_prices[1:] / prev_close - 1
        strategy_returns = signals * returns - transaction_cost

        volatility = strategy_returns.std()
        if volatility > 0:
            sharpe = np.sqrt(252) * strategy_returns.mean() / volatility
        else:
            sharpe = 0.0  # flat return series: Sharpe is undefined, report 0

        # Drawdown is measured on the compounded equity curve, relative to the
        # running peak (starting capital of 1.0 counts as the first peak).
        equity = np.cumprod(1 + strategy_returns)
        running_peak = np.maximum.accumulate(np.concatenate(([1.0], equity)))[1:]
        max_drawdown = ((running_peak - equity) / running_peak).max()

        total_return = equity[-1] - 1
        annual_return = (1 + total_return) ** (252 / len(strategy_returns)) - 1

        return sharpe, max_drawdown, total_return, annual_return

    def plot_results(self):
        """Plot true vs predicted prices (with a +/- 2 std band) and returns.

        Raises:
            ValueError: If ``preprocess_data()`` has not been run.
        """
        close = self._test_close()
        pred_mean, pred_std = self.predict_with_uncertainty(return_std=True)

        n = min(len(close), len(pred_mean))
        dates = self.test_data.index[:n]
        close, pred_mean, pred_std = close[:n], pred_mean[:n], pred_std[:n]

        plt.figure(figsize=(14, 8))

        # Price plot
        plt.subplot(2, 1, 1)
        plt.plot(dates, close, label='True Price', linewidth=2)
        plt.plot(dates, pred_mean, label='Predicted', alpha=0.8)
        plt.fill_between(dates, pred_mean - 2 * pred_std, pred_mean + 2 * pred_std,
                         alpha=0.2, label='Predicted +/- 2 std')
        plt.title(f"{self.ticker} Stock Price Prediction")
        plt.ylabel("Price ($)")
        plt.legend()
        plt.grid(True, alpha=0.3)

        # Returns distribution
        plt.subplot(2, 1, 2)
        test_returns = np.asarray(self.test_data['Returns'], dtype=float).reshape(-1)
        plt.hist(test_returns, bins=50, alpha=0.7, edgecolor='black', color='skyblue')
        plt.axvline(np.mean(test_returns), color='red', linestyle='--',
                   label=f'Mean: {np.mean(test_returns):.2%}')
        plt.title("Returns Distribution")
        plt.ylabel("Frequency")
        plt.legend()
        plt.grid(True, alpha=0.3)

        plt.tight_layout()
        plt.show()

    def evaluate_diagnostics(self):
        """Print and return directional accuracy and RMSE on the test window.

        Returns:
            ``(direction_correct, rmse)`` where ``direction_correct`` is the
            fraction of days on which the forecast and the realized close
            moved the same way relative to the previous close.

        Raises:
            ValueError: If fewer than two aligned observations are available.
        """
        pred_mean = self.predict_with_uncertainty()
        close = self._test_close()

        # Check alignment
        print(f"Prediction length: {len(pred_mean)}")
        print(f"Test prices length: {len(close)}")

        n = min(len(pred_mean), len(close))
        if n < 2:
            raise ValueError("Need at least two aligned observations to evaluate")
        pred_mean, close = pred_mean[:n], close[:n]

        # Directional accuracy: pred_mean[i] forecasts close[i], so both are
        # compared with the previous day's close.
        prev_close = close[:-1]
        direction_correct = np.mean((close[1:] > prev_close) ==
                                    (pred_mean[1:] > prev_close))
        print(f"Directional accuracy: {direction_correct:.2%}")

        # Basic error metrics on same-day pairs
        rmse = np.sqrt(mean_squared_error(close, pred_mean))
        print(f"RMSE: ${rmse:.2f}")

        return direction_correct, rmse


In [None]:
        # Build the AAPL predictor, download its price history, then fit the model
        predictor = StockPredictor(ticker="AAPL", seq_length=60, epochs=50)
        predictor.fetch_data()
        predictor.train()

In [None]:
# Requires the trained `predictor` from the cell above
pred_mean = predictor.predict_with_uncertainty()
sharpe, drawdown, total_return, annual_return = predictor.backtest(pred_mean)
# One print call, one metric per line
print(
    f"Sharpe Ratio: {sharpe:.2f}",
    f"Max Drawdown: {drawdown:.2%}",
    f"Total Return: {total_return:.2%}",
    sep="\n",
)


[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 39ms/step
[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 16ms/step
[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 11ms/step
[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 11ms/step
[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 11ms/step
[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 12ms/step
[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 12ms/step
[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 11ms/step
[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 11ms/step
[1m21/21[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 11ms/step
Sharpe Ratio: -0.08
Max Drawdown: 13.76%
Total Return: -100.00%


In [None]:
def check_prediction_alignment(self):
    """Print the first few predicted and actual closes plus both value ranges.

    ``self`` is expected to provide ``predict_with_uncertainty()`` and a
    ``test_data`` frame with a ``'Close'`` column. Nothing is returned.
    """
    forecasts = self.predict_with_uncertainty()
    closes = self.test_data['Close'].values
    head = slice(None, 5)
    print(
        f"First 5 predictions: {forecasts[head]}",
        f"First 5 actual prices: {closes[head]}",
        f"Prediction range: {forecasts.min():.2f} to {forecasts.max():.2f}",
        f"Actual price range: {closes.min():.2f} to {closes.max():.2f}",
        sep="\n",
    )

In [None]:
def check_directional_accuracy(self):
    """Print directional accuracy and bias statistics for the test forecasts.

    ``self`` must provide ``predict_with_uncertainty()`` and a ``test_data``
    frame with a ``'Close'`` column. As built by
    ``StockPredictor.preprocess_data``, prediction ``i`` is the forecast of
    test close ``i``, so the predicted and the realized direction are both
    measured against the previous day's close.

    Raises:
        ValueError: If fewer than two aligned observations are available.
    """
    # reshape(-1) flattens (n, 1) close values (MultiIndex price columns) so
    # the comparisons below stay element-wise instead of broadcasting to (n, n).
    pred_mean = np.asarray(self.predict_with_uncertainty(), dtype=float).reshape(-1)
    actual = np.asarray(self.test_data['Close'], dtype=float).reshape(-1)

    n = min(len(pred_mean), len(actual))
    if n < 2:
        raise ValueError("Need at least two aligned observations")
    pred_mean, actual = pred_mean[:n], actual[:n]

    prev_close = actual[:-1]
    direction_correct = np.mean((actual[1:] > prev_close) ==
                                (pred_mean[1:] > prev_close))
    print(f"Directional accuracy: {direction_correct:.2%}")

    # Check if predictions are systematically high or low (same-day pairs)
    pred_error = pred_mean - actual
    print(f"Mean prediction error: {np.mean(pred_error):.4f}")
    print(f"Prediction error std: {np.std(pred_error):.4f}")

In [None]:
def backtest_debug(self, predictions):
    """Print signal counts and mean returns for a cost-free long/short test.

    ``self`` must provide a ``test_data`` frame with a ``'Close'`` column.
    As built by ``StockPredictor.preprocess_data``, ``predictions[i]`` is the
    forecast of test close ``i``; the position held from day ``t`` to
    ``t + 1`` therefore uses ``predictions[t + 1]`` against the close of
    day ``t``.

    Raises:
        ValueError: If fewer than two aligned observations are available.
    """
    # Flatten so (n, 1) close values cannot broadcast into an (n, n) matrix.
    test_prices = np.asarray(self.test_data['Close'], dtype=float).reshape(-1)
    predictions = np.asarray(predictions, dtype=float).reshape(-1)

    n = min(len(predictions), len(test_prices))
    if n < 2:
        raise ValueError("Need at least two aligned observations")
    predictions, test_prices = predictions[:n], test_prices[:n]

    # Very simple backtest to isolate issues
    prev_close = test_prices[:-1]
    signals = np.where(predictions[1:] > prev_close, 1, -1)
    returns = test_prices[1:] / prev_close - 1
    strategy_returns = signals * returns

    print(f"Number of long signals: {np.sum(signals == 1)}")
    print(f"Number of short signals: {np.sum(signals == -1)}")
    # Always 0 with the +/-1 rule above; kept as a sanity check.
    print(f"Number of no positions: {np.sum(signals == 0)}")
    print(f"Mean strategy return: {np.mean(strategy_returns):.4f}")
    print(f"Mean actual return: {np.mean(returns):.4f}")

In [None]:
if __name__ == "__main__":
    # End-to-end run: train on AAPL, plot, then print backtest and error metrics.
    try:
        # Initialize and train
        predictor = StockPredictor("AAPL", seq_length=60, epochs=50)
        predictor.fetch_data()
        predictor.train()

        # Run comprehensive evaluation.
        # StockPredictor defines plot_results(); there is no plot_diagnostics(),
        # and calling it raised AttributeError, which the handler below turned
        # into a one-line message so none of the metrics were ever printed.
        predictor.plot_results()

        # Print key metrics
        pred_mean = predictor.predict_with_uncertainty()
        sharpe, drawdown, total_return, annual_return = predictor.backtest(pred_mean)
        direction_acc, rmse = predictor.evaluate_diagnostics()

        print("\n=== Performance Metrics ===")
        print(f"Sharpe Ratio: {sharpe:.2f}")
        print(f"Max Drawdown: {drawdown:.2%}")
        print(f"Total Return: {total_return:.2%}")
        print(f"Annual Return: {annual_return:.2%}")
        print(f"Directional Accuracy: {direction_acc:.2%}")
        print(f"RMSE: ${rmse:.2f}")

    except Exception as e:
        # Top-level boundary of the script: report the failure and stop.
        print(f"Error: {str(e)}")

In [None]:
def create_very_simple_model(self):
    """Build and compile a minimal one-layer LSTM regressor.

    Input windows have shape ``(self.seq_length, 3)``; the network is a
    16-unit LSTM (dropout 0.4) followed by a single linear output, compiled
    with the Adam optimizer and mean-squared-error loss.
    """
    sequence_input = Input(shape=(self.seq_length, 3))
    encoded = LSTM(16, dropout=0.4)(sequence_input)
    net = Model(sequence_input, Dense(1)(encoded))
    net.compile(optimizer='adam', loss='mse')
    return net

In [None]:
# Needs a trained `predictor`: forecast the test window, then score it
pred_mean = predictor.predict_with_uncertainty()
(sharpe, drawdown, total_return, annual_return) = predictor.backtest(pred_mean)
(direction_acc, rmse) = predictor.evaluate_diagnostics()

In [None]:
def create_very_simple_model(self):
    """Return a compiled single-LSTM baseline model.

    Architecture: input of shape ``(self.seq_length, 3)`` -> LSTM with 16
    units and dropout 0.4 -> Dense(1). Compiled with ``'adam'`` and ``'mse'``.
    """
    input_shape = (self.seq_length, 3)
    model_input = Input(shape=input_shape)
    lstm_out = LSTM(units=16, dropout=0.4)(model_input)
    prediction = Dense(units=1)(lstm_out)
    simple_model = Model(inputs=model_input, outputs=prediction)
    simple_model.compile(loss='mse', optimizer='adam')
    return simple_model

In [None]:
def check_prediction_quality(self):
    """Print bias, spread and trend-agreement of the test-window forecasts.

    ``self`` must provide ``predict_with_uncertainty()`` and a ``test_data``
    frame with a ``'Close'`` column. Nothing is returned.

    Raises:
        ValueError: If fewer than two aligned observations are available.
    """
    # Flatten both inputs: when 'Close' comes back as an (n, 1) frame
    # (MultiIndex price columns), the subtraction would otherwise broadcast
    # to (n, n) and np.diff would run along the length-1 axis.
    pred_mean = np.asarray(self.predict_with_uncertainty(), dtype=float).reshape(-1)
    actual = np.asarray(self.test_data['Close'], dtype=float).reshape(-1)

    n = min(len(pred_mean), len(actual))
    if n < 2:
        raise ValueError("Need at least two aligned observations")
    pred_mean, actual = pred_mean[:n], actual[:n]

    # Check if predictions are systematically high or low
    pred_error = pred_mean - actual
    print(f"Mean prediction error: {np.mean(pred_error):.2f}")
    print(f"Prediction error std: {np.std(pred_error):.2f}")

    # Check if predictions follow trends
    trend_correct = np.mean(np.sign(np.diff(pred_mean)) == np.sign(np.diff(actual)))
    print(f"Trend accuracy: {trend_correct:.2%}")

In [None]:
def analyze_predictions(self):
    """Print bias/trend statistics and plot predicted vs actual test closes.

    ``self`` must provide ``predict_with_uncertainty()`` and a ``test_data``
    frame with a ``'Close'`` column. Nothing is returned.

    Raises:
        ValueError: If fewer than two aligned observations are available.
    """
    # Flatten both inputs: when 'Close' comes back as an (n, 1) frame
    # (MultiIndex price columns), the subtraction would otherwise broadcast
    # to (n, n) and np.diff would run along the length-1 axis.
    pred_mean = np.asarray(self.predict_with_uncertainty(), dtype=float).reshape(-1)
    actual = np.asarray(self.test_data['Close'], dtype=float).reshape(-1)

    n = min(len(pred_mean), len(actual))
    if n < 2:
        raise ValueError("Need at least two aligned observations")
    pred_mean, actual = pred_mean[:n], actual[:n]

    # Check systematic bias
    pred_error = pred_mean - actual
    print(f"Mean prediction error: {np.mean(pred_error):.2f}")
    print(f"Error std: {np.std(pred_error):.2f}")

    # Check trend following
    trend_correct = np.mean(np.sign(np.diff(pred_mean)) == np.sign(np.diff(actual)))
    print(f"Trend accuracy: {trend_correct:.2%}")

    # Plot predictions vs actual
    plt.figure(figsize=(12, 6))
    plt.plot(actual, label='Actual')
    plt.plot(pred_mean, label='Predicted', alpha=0.7)
    plt.legend()
    plt.title('Predictions vs Actual')
    plt.show()