In [1]:
# Install and upgrade required packages
!pip install --upgrade yfinance ta pyspark xgboost shap



In [2]:
# ----- Import Dependencies -----

# Standard libraries
import os
import json
import warnings
from typing import Dict, Any, Tuple, List, Union, Sequence
from pathlib import Path

# Third-party libraries for data manipulation and visualization
import numpy as np
import pandas as pd
import yfinance as yf
import xgboost as xgb
import matplotlib.pyplot as plt
from matplotlib.figure import Figure

# Scikit-learn modules for model selection, evaluation, and preprocessing
from sklearn.model_selection import TimeSeriesSplit, train_test_split
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
from sklearn.preprocessing import FunctionTransformer, StandardScaler
from sklearn.base import BaseEstimator

# Technical analysis libraries
from ta.momentum import RSIIndicator
from ta.trend import EMAIndicator, MACD

# Suppress warnings that may clutter the output
warnings.simplefilter(action='ignore', category=pd.errors.SettingWithCopyWarning)

In [3]:
# ----- Fetch Stock Data & Calculate Features -----

def flatten_multi_index(data: pd.DataFrame, ticker: str) -> pd.DataFrame:
    """
    Flatten any two‐level columns and standardize key fields for a single ticker.

    We do this so downstream code can assume simple, consistent column names
    (no nested tuples) and rely on 'Date' being the index for time-series ops.

    When data.columns is a MultiIndex of (field, ticker):
      - Converts to "field_ticker".
      - Removes the "_{ticker}" suffix from Date, Open, High, Low, Close.
      - Sets Date as the index and drops the column.

    Returns the modified DataFrame indexed by Date.
    """
    # Check if the dataframe's columns are a MultiIndex
    if isinstance(data.columns, pd.MultiIndex):
        # Flatten the MultiIndex by merging levels with an underscore
        data.columns = ['_'.join(col).strip() for col in data.columns.values]
        
        # Rename the flattened columns for clarity (e.g., 'Open_TICKER' -> 'Open')
        data.rename(columns={
            f'Date_': 'Date',
            f'Open_{ticker}': 'Open',
            f'High_{ticker}': 'High',
            f'Low_{ticker}': 'Low',
            f'Close_{ticker}': 'Close'
        }, inplace=True)
        
        # Make 'Date' the index and remove its column
        data.index = data["Date"]
        data.drop(['Date'], inplace=True, axis=1)
        
    return data

def calculate_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Generate momentum and trend indicators for forecasting.

    We compute short-term (1/2/3-day) and medium-term (EMA vs. SMA) signals:
      - Momentum_1/2/3 captures recent price swings.
      - Trend_2/3/5 highlights whether price is accelerating or decelerating.
    We then form a composite momentum to blend volatility and direction.

    Returns the original DataFrame with the new feature columns added.
    """
    # Momentum: daily price differences over 1, 2, and 3 days
    # These differences capture the change in closing prices over the specified periods.
    df["Momentum_1"] = df["Close"].diff(periods=1)
    df["Momentum_2"] = df["Close"].diff(periods=2)
    df["Momentum_3"] = df["Close"].diff(periods=3)

    # Trend: Difference between the Exponential Moving Average (EMA) and the Simple Moving Average (SMA)
    # A positive difference indicates that EMA is above SMA, suggesting an upward trend.
    # Using higher spans helps capture longer-term trends.
    df['Trend_2'] = df["Close"].ewm(span=2, adjust=False).mean() - df["Close"].rolling(window=2).mean()
    df['Trend_3'] = df["Close"].ewm(span=3, adjust=False).mean() - df["Close"].rolling(window=3).mean()
    # Note: Trend_4 duplicates the logic of Trend_2, which might be intentional for experimentation
    df['Trend_4'] = df["Close"].ewm(span=2, adjust=False).mean() - df["Close"].rolling(window=2).mean()
    df['Trend_5'] = df["Close"].ewm(span=5, adjust=False).mean() - df["Close"].rolling(window=5).mean()

    # Momentum is a critical feature.
    # To aggregate momentum features from different time scales into a single composite indicator, we combine them as a weighted sum.
    df['Momentum_Combo'] = 0.5 * df['Momentum_1'] + 0.3 * df['Momentum_2'] + 0.2 * df['Momentum_3']

    return df

def fetch_and_process_stock_data(ticker: str) -> pd.DataFrame:
    """
    Fetch and preprocess closing‐price data for forecasting.

    Downloads daily OHLC data from Yahoo Finance, flattens any nested columns,
    and retains only the 'Close' series. It then computes:
      – Daily percent change to capture returns.
      – Momentum features to reflect recent price swings.
      – Trend features to highlight acceleration vs. deceleration.
    Finally, it removes any rows with missing values (e.g., from rolling calculations).

    Returns the cleaned, feature‐engineered DataFrame indexed by Date, ready for modeling.
    """
    # 1. Download data from Yahoo Finance and reset index
    df_yf = yf.download(ticker, start="2023-01-01", end="2024-12-11")
    df_yf.reset_index(inplace=True)
    
    # 2. Select relevant columns
    df = df_yf[['Date', "Open", "High", "Low", "Close"]]
    
    # 3. Flatten MultiIndex columns if present
    df = flatten_multi_index(df, ticker)
    
    # 4. Drop unused columns
    df.drop(['Open', 'High', 'Low'], inplace=True, axis=1)

    # 5. Calculate daily percentage change
    df["Change"] = df["Close"].pct_change() * 100

    # 6. Compute additional features and remove missing values
    df = calculate_features(df)
    df.dropna(inplace=True)

    return df

["Prev_Change_1","Prev_Change_2","Prev_Change_3","Prev_Change_4","Prev_Change_5",
                   "Prev_Change_6","Prev_Change_7","Prev_Change_8","Prev_Change_9","Prev_Change_10",
                   "EMA_50", "EMA_200", "RSI","MACD","Day_of_Week", "Month"]

In [5]:
# ----- Forcast Stock Changes for X Weeks -----

def scale_data(
    df: pd.DataFrame
) -> Tuple[pd.DataFrame, List[str], StandardScaler]:    
    """
    Standardize momentum and trend features for modeling.

    Scales the momentum and trend indicators to zero mean and unit variance so that
    downstream models treat these features on comparable scales. 

    Returns the scaled DataFrame, the list of feature names that were transformed,
    and the fitted scaler instance for any inverse transformations.
    """
    # Define features and label
    feature_columns = ['Momentum_1', "Momentum_2", "Momentum_3",
                         'Trend_2', 'Trend_3', 'Trend_4', 'Trend_5',
                         'Momentum_Combo']
    
    # Standardize the features
    scaler = StandardScaler()
    scaled_array = scaler.fit_transform(df[feature_columns])
    df[feature_columns] = scaled_array

    return df, feature_columns, scaler

def update_with_predicted_close(
    df: pd.DataFrame,
    pred: Union[np.ndarray, pd.Series, Sequence[float]]
) -> pd.DataFrame:    
    """
    Append the next trading day's predicted close and refresh features.

    This extends the series one step at a time using only the forecasted percent change,
    ensuring that all feature computations rely solely on available or predicted values.
    By recalculating features after each prediction and never peeking at actual future
    prices, we prevent data leakage into the model inputs.

    Steps:
      - Compute the new Close using the last known price and pred[0].
      - Advance to the next trading day (skipping the weekend if needed).
      - Insert a new row with the predicted Close.
      - Recalculate daily returns and momentum/trend features over the extended series.

    Returns the DataFrame with the added prediction and updated feature columns.
    """
    # 1. Compute the new predicted 'Close' price based on the last known close
    new_close = df['Close'].iloc[-1] * (1 + (pred[0] / 100))

    # 2. Determine the next trading day: if it's Friday, jump to Monday; else next day
    if df.index[-1].dayofweek == 4:
        next_date = df.index[-1] + pd.Timedelta(days=3)
    else:
        next_date = df.index[-1] + pd.Timedelta(days=1)

    # 3. Duplicate the last row, replace 'Close' with the new value, and assign to next_date
    new_row = df.iloc[-1].copy()
    new_row['Close'] = new_close
    df.loc[next_date] = new_row

    # 4. Recompute daily percentage changes and recalculate features
    df["Change"] = df["Close"].pct_change() * 100
    df = calculate_features(df)

    return df
    
def forecast_stock_changes(
    df: pd.DataFrame,
    scaler: StandardScaler,
    ts: int,
    model: Any,
    features: List[str]
) -> np.ndarray:
    """
    Iteratively simulate live forecasts of percentage changes over `ts` steps.

    This rolling‐window approach mimics real‐time forecasting and prevents data leakage:
      - Predict the next period’s change using only the most recent scaled features.
      - Inverse‐transform features back to their original scale before inserting the prediction.
      - Append the predicted 'Close' and recompute features via `update_with_predicted_close`.
      - Reapply scaling to the extended dataset before the next prediction.

    Returns the 1D array of predicted 'Change' values for the forecast horizon.
    """
    df_forecast = df.copy()
    for i in range(ts):
        # Use the last row of features to generate a prediction.
        X = df_forecast[features].iloc[-1:]
        pred = model.predict(X)

        # Convert features back to original scale before updating.
        df_forecast[features] = scaler.inverse_transform(df_forecast[features])
        
        # Insert the new predicted 'Close' and update features.
        df_forecast = update_with_predicted_close(df_forecast, pred)
        
        # Re-scale the updated features for the next prediction.
        df_forecast[features] = scaler.transform(df_forecast[features])

    # Return the predicted 'Change' values for the forecasted steps.
    return df_forecast['Change'].iloc[-ts:].values

In [6]:
# ----- Train N XGBoost Models for N Stock Tickers -----

def train_models(
    tickers: List[str],
    weeks: int
) -> Tuple[Dict[str, BaseEstimator], Dict[str, Dict[str, float]]]: 
    """
    Train and evaluate XGBoost forecasting models for multiple tickers.

    For each ticker, this function:
      - Fetches and preprocesses historical price data.
      - Scales features and fits an XGBoost model.
      - Performs rolling forecasts over `weeks` successive windows.
      - Records actual vs. predicted percent changes.

    It returns two mappings:
      1. model_metadata: ticker → {'model': estimator, 'scaler': scaler, 'features': [...]}
      2. evaluation:     ticker → week_index → {'Dates', 'actual_change', 'predicted_change'}
    """
    # Initialize dictionaries to store metadata (model, scaler, features) and evaluation results.
    model_metadata = {}
    evaluation = {}

    # Create an evaluation sub-dictionary for each ticker if not present.
    for ticker in tickers:
        if ticker not in evaluation:
            evaluation[ticker] = {}

    # Process each ticker individually
    for ticker in tickers:
        # Load and prepare the data
        df = fetch_and_process_stock_data(ticker)
        df, feature_columns, scaler = scale_data(df)

        # Train the model on this ticker's data
        xgboost_model = train_and_evaluate(df, feature_columns, weeks)
        
        # Store the model, scaler, and feature columns
        model_metadata[ticker] = {
            "model": xgboost_model,
            "scaler": scaler,
            "features": feature_columns
        }

        # Generate and record evaluation metrics for each week in the specified range
        for week in range(weeks):
            # Calculate how many rows to go back (x) to retrieve actual data and make predictions
            x = (weeks + 2 - week) * 5
            
            # Subset df to exclude the last x rows, then keep the relevant portion for predictions
            df_forecast = df.iloc[:-x+5]
            
            # Get actual changes and forecast them with the model
            actual_diffs = df.iloc[-x+5:-x+10]['Change'].values
            pred = forecast_stock_changes(df_forecast, scaler, 5, xgboost_model, feature_columns)

            # Convert arrays to lists for easier serialization or storage
            actual_diffs = actual_diffs.tolist()
            pred = pred.tolist()

            # Retrieve corresponding dates for the actual changes
            actual_dates = df.reset_index()
            actual_dates = actual_dates['Date'].iloc[-x+5:-x+10]

            # Record results (dates, actual changes, predicted changes) for this week
            evaluation[ticker][week + 1] = {
                'Dates': [str(date)[:10] for date in actual_dates],
                "actual_change": actual_diffs,
                "predicted_change": pred
            }

    return model_metadata, evaluation

In [7]:
# ----- Train Individual XGBoost Model for Each Stock Ticker Over X Weeks -----

def train_and_evaluate(
    df: pd.DataFrame,
    features: List[str],
    weeks: int
) -> BaseEstimator:    
    """
    Train an XGBoost regressor on historical features, excluding a hold-out block.

    To prevent lookahead bias, the final (weeks + 1) * 5 rows are dropped and never seen
    during training. The model is then fitted on all prior data.

    Steps:
      - Compute hold-out size = (weeks + 1) * 5 rows.
      - Drop those rows from the DataFrame.
      - Train an XGBRegressor on the remaining data.

    Returns the trained XGBoost regressor.
    """
    # Calculate how many rows to exclude from the end for hold-out
    past = (weeks + 1) * 5
    
    # Truncate the DataFrame to exclude future rows
    df = df.iloc[:-past]

    # Separate features (X) and target (y)
    X = df[features]
    y = df["Change"]

    # Initialize XGBoost regressor with chosen hyperparameters
    xgboost = xgb.XGBRegressor(
        max_depth=4,
        learning_rate=0.01,
        n_estimators=1000,
        subsample=0.8,
        colsample_bytree=0.8,
        random_state=42,
        reg_alpha=0.1,
        reg_lambda=1.0,
    )

    # Train the model on the available data
    xgboost.fit(X, y)

    return xgboost

In [8]:
# ----- Plot Feature Importance for Each Model -----

def plot_feature_importance(
    model: Any,
    feature_names: List[str],
    figsize: Tuple[int, int] = (10, 6)
) -> pd.DataFrame:
    """
    Plot feature importances and return them as a sorted DataFrame.

    Displays a horizontal bar chart (largest importance on top) and returns
    a DataFrame with columns ['Feature', 'Importance'] sorted descending.
    """
    feature_importances = model.feature_importances_

    importance_df = pd.DataFrame({
        'Feature': feature_names,
        'Importance': feature_importances
    }).sort_values(by='Importance', ascending=False)

    plt.figure(figsize=figsize)
    plt.barh(importance_df['Feature'], importance_df['Importance'], align='center')
    plt.xlabel('Feature Importance')
    plt.ylabel('Features')
    plt.title('Feature Importance')
    plt.gca().invert_yaxis()
    plt.show()

    return importance_df

def plot_importances_for_tickers(
    ticker_sample: List[str],
    metadata: Dict[str, Any]
) -> Figure:    
    """
    Plot feature importances for each ticker in the sample.

    For each symbol in `ticker_sample`, this invokes the base
    `plot_importances_for_tickers(model, features)` function to display
    its feature‐importance chart.
    """
    for stock in ticker_sample:
        m = metadata[stock]['model']
        f = metadata[stock]['features']
        plot_feature_importance(m, f)

:Beste Parameter: {'colsample_bytree': 0.8, 'learning_rate': 0.01, 'max_depth': 4, 'n_estimators': 1000, 'reg_alpha': 0.1, 'reg_lambda': 1.0, 'subsample': 0.8}

In [10]:
# Print the current working directory to get filepath
print(os.getcwd())

/Users/benjamingreif/Documents/DocumentsLocal/dataScience/GitHub_Profil/AbschlussprojektNew


In [11]:
# ----- Plot Performance Metrics of all Models -----

def calculate_metrics(
    json_file: Union[str, Path, Dict[str, Any]],
    ticker: str
) -> Tuple[List[Tuple[int, float, float]], float, float]:
    """
    Compute weekly and overall accuracy and error for a ticker’s forecasts.

    Reads prediction data and, for each week, calculates:
      – ADA (directional accuracy)  
      – MAE (mean absolute error)  

    Returns a tuple of (weekly_metrics, total_ada, overall_mae).
    """
    with open(json_file, 'r') as file:
        data = json.load(file)

    stock = ticker
    total_correct_directions = 0
    total_predictions = 0
    total_absolute_error = 0
    weekly_metrics = []

    for week, values in data[stock].items():
        actual_changes = np.array(values["actual_change"])
        predicted_changes = np.array(values["predicted_change"])

        # Calculate Directional Accuracy (Hit Rate)
        correct_directions = np.sum(
            (actual_changes > 0) == (predicted_changes > 0)
        )
        ada = correct_directions / len(actual_changes)
        total_correct_directions += correct_directions
        total_predictions += len(actual_changes)

        # Calculate MAE
        mae = np.mean(np.abs(actual_changes - predicted_changes))
        total_absolute_error += np.sum(np.abs(actual_changes - predicted_changes))
        weekly_metrics.append((week, ada, mae))

    total_ada = total_correct_directions / total_predictions
    overall_mae = total_absolute_error / total_predictions

    return weekly_metrics, total_ada, overall_mae

def plot_forecast_performance(
    json_file: Union[str, Path],
    tickers: List[str]
) -> None:    
    """
    Display weekly and overall ADA and MAE for each ticker from a JSON file.

    Reads `json_file` (a path to a JSON file mapping tickers → weekly predictions)
    and, for each symbol in `tickers`, prints:
      - Directional accuracy (ADA) and MAE for each week
      - Overall ADA and MAE across all weeks

    Finally, prints the average ADA/MAE aggregated across all tickers.
    """
    total_overall_ada = []
    total_overall_mae = []

    for stock in tickers:

        weekly_results, overall_ada, overall_mae = calculate_metrics(json_file, stock)
        total_overall_ada.append(overall_ada)
        total_overall_mae.append(overall_mae)

        print(f"\n{stock} Weekly Metrics (Week, ADA/MAE):")
        for week, ada, mae in weekly_results:
            print(f"Week {week}: ADA = {ada:.2f}, MAE = {mae:.2f}%")

        print(f"\n{stock} Overall Average Directional Accuracy: {overall_ada:.2f}")
        print(f"{stock} Overall MAE: {overall_mae:.2f}")

    print(f"\nTotal Average Directional Accuracy across all {len(tickers)} Stocks: {np.mean(total_overall_ada):.2f}")
    print(f"Total Average MAE across all {len(tickers)} Stocks: {np.mean(total_overall_mae):.2f}%")

def compute_average_directional_accuracy(
    file_path: Union[str, Path]
) -> Dict[int, float]:   
    """
    Compute average directional accuracy for the first five forecast days.

    Reads a JSON of weekly actual vs. predicted changes and returns a mapping
    from day index (0–4) to the fraction of times the predicted and actual 
    change directions matched across all stocks and weeks.
    """
    with open(file_path, 'r') as file:
        data = json.load(file)

    ada_per_day = {day: [] for day in range(5)}

    for stock, weeks in data.items():
        for week, week_data in weeks.items():
            actual_changes = week_data['actual_change']
            predicted_changes = week_data['predicted_change']

            for day in range(len(actual_changes)):
                if day < 5:  # Ensure we stay within the first 5 prediction days
                    actual = actual_changes[day]
                    predicted = predicted_changes[day]

                    # Check if the directions match and record the result
                    ada_per_day[day].append((actual >= 0 and predicted >= 0) or (actual < 0 and predicted < 0))

    # Calculate the average directional accuracy for each day
    average_ada = {}
    for day, results in ada_per_day.items():
        average_ada[day] = sum(results) / len(results) if results else 0

    return average_ada

In [12]:
# ----- Define Balanced Stock Sample by Industry Segments -----

ticker_sample = [
    # E-commerce/Consumer Staples
    "KO",         # The Coca-Cola Company (Beverages, Consumer Staples)
    "AMZN",       # Amazon.com Inc. (E-Commerce, Cloud Computing)

    # Technology
    "IBM",        # International Business Machines Corporation (Cloud Computing, AI, Hardware)
    "SIE.DE",     # Siemens AG (Industrial Manufacturing, Automation)

    # Automotive/Electric Vehicles
    "TSLA",       # Tesla Inc. (Electric Vehicles, Renewable Energy)
    "VOW3.DE",    # Volkswagen AG (Automotive Manufacturing)

    # Financial Services
    "V",          # Visa Inc. (Payment Processing, Financial Services)
    "MUV2.DE",    # Munich Re (Münchener Rückversicherungs-Gesellschaft) (Reinsurance)

    # Chemicals
    "BAS.DE",     # BASF SE (Chemical Manufacturing)
    "BAYN.DE"     # Bayer AG (Pharmaceuticals, Chemicals)
]

In [13]:
# ----- Setup Stock Forecasting Pipeline -----

def run_forecasting(
    weeks: int,
    tickers: List[str],
) -> Tuple[Dict[str, BaseEstimator], Dict[str, Dict[str, float]]]:
    """
    Train models and gather raw forecast results for each ticker.

    Calls `train_models` and returns two mappings:
      - metadata: ticker → {'model': estimator, 'scaler': scaler, 'features': [...]}
      - evaluation: ticker → week → {
            'Dates': [...], 
            'actual_change': [...], 
            'predicted_change': [...]
        }
    """
    metadata, evaluation = train_models(tickers, weeks)
    return metadata, evaluation

def plot_forecasting_results(
    weeks: int,
    tickers: List[str],
    metadata: Dict[str, BaseEstimator],
    evaluation: Dict[str, Dict[str, float]]
) -> None:    
    """
    Save evaluation results, display forecasts, and print average directional accuracy.

    - Writes `evaluation` to "stock_sample_predictions_{weeks}_weeks_new.json".
    - Calls `plot_forecast_performance` to show actual vs. predicted changes.
    - Computes and prints the average directional accuracy for each forecast day.
    """
    file_path = f"stock_sample_predictions_{weeks}_weeks.json"
    print(f"Forecasting {len(tickers)} stocks for {weeks} weeks...")

    # If interested in metadata
    #print("Model metadata:", metadata)
    #print("Evaluation results:", evaluation)

    # If interested plot feature importances
    #plot_importances_for_tickers(ticker_sample, metadata)

    # Write the evaluation data to the JSON file first.
    with open(file_path, "w") as json_file:
        json.dump(evaluation, json_file, indent=4)

    # If interested plot feature importance
    plot_forecast_performance(file_path, tickers)

    # print avverage directional accuracy rate
    ada_results = compute_average_directional_accuracy(file_path)
    print("\nAverage Directional Accuracy per Day:")
    for day, ada in ada_results.items():
        print(f"Day {day+1}: {ada:.2f}")

In [14]:
# ----- Run Stock Forecasting Pipeline -----

weeks = 6
metadata, evaluation = run_forecasting(weeks, ticker_sample)

YF.download() has changed argument auto_adjust default to True


[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed


In [15]:
# ----- Plot Stock Forecasting Results -----
plot_forecasting_results(weeks, ticker_sample, metadata, evaluation)

Forecasting 10 stocks for 6 weeks...

KO Weekly Metrics (Week, ADA/MAE):
Week 1: ADA = 1.00, MAE = 2.03%
Week 2: ADA = 0.60, MAE = 0.69%
Week 3: ADA = 0.40, MAE = 0.91%
Week 4: ADA = 0.80, MAE = 0.70%
Week 5: ADA = 1.00, MAE = 0.62%
Week 6: ADA = 0.40, MAE = 0.92%

KO Overall Average Directional Accuracy: 0.70
KO Overall MAE: 0.98

AMZN Weekly Metrics (Week, ADA/MAE):
Week 1: ADA = 0.80, MAE = 0.94%
Week 2: ADA = 0.60, MAE = 2.53%
Week 3: ADA = 0.40, MAE = 3.07%
Week 4: ADA = 0.60, MAE = 2.23%
Week 5: ADA = 0.60, MAE = 1.80%
Week 6: ADA = 0.80, MAE = 5.02%

AMZN Overall Average Directional Accuracy: 0.63
AMZN Overall MAE: 2.60

IBM Weekly Metrics (Week, ADA/MAE):
Week 1: ADA = 0.60, MAE = 1.66%
Week 2: ADA = 0.60, MAE = 1.58%
Week 3: ADA = 0.20, MAE = 3.03%
Week 4: ADA = 0.60, MAE = 0.96%
Week 5: ADA = 1.00, MAE = 1.72%
Week 6: ADA = 0.60, MAE = 1.07%

IBM Overall Average Directional Accuracy: 0.60
IBM Overall MAE: 1.67

SIE.DE Weekly Metrics (Week, ADA/MAE):
Week 1: ADA = 0.60, MAE = 