In [16]:
import pandas as pd
import numpy as np
import itertools
import plotly.express as px
import plotly.graph_objects as go
from statsmodels.tsa.statespace.sarimax import SARIMAX
from sklearn.metrics import mean_absolute_error, mean_squared_error, mean_absolute_percentage_error, r2_score
from rich.console import Console
from rich.progress import Progress

console = Console()
import warnings
# Suppress warnings
warnings.filterwarnings("ignore")

In [17]:
# Load the dataset
df = pd.read_csv("../data/Top_12_German_Companies_Financial_Data.csv")

# Convert Period column to datetime format
df["Period"] = pd.to_datetime(df["Period"], format="%m/%d/%Y")

In [18]:
print("Unique Companies", df["Company"].unique())

Unique Companies ['Volkswagen AG' 'Siemens AG' 'Allianz SE' 'BMW AG' 'BASF SE'
 'Deutsche Telekom AG' 'Daimler AG' 'SAP SE' 'Bayer AG' 'Deutsche Bank AG'
 'Porsche AG' 'Merck KGaA']


In [19]:
def load_data(filepath):
    df = pd.read_csv(filepath)
    df["Period"] = pd.to_datetime(df["Period"], format="%m/%d/%Y")
    return df

def preprocess_data(df, company):
    company_df = df[df["Company"] == company].copy()
    company_df = company_df.sort_values(by="Period")
    company_df.set_index("Period", inplace=True)
    company_df["Revenue"] = pd.to_numeric(company_df["Revenue"], errors="coerce")
    return company_df["Revenue"].dropna()

def split_data(series, train_ratio=0.8):
    split_index = int(len(series) * train_ratio)
    return series[:split_index], series[split_index:]

def find_best_sarima(train, test):
    p_values = range(0, 5)  # AR terms
    d_values = range(0, 3)  # Differencing terms
    q_values = range(0, 3)  # MA terms
    P_values = range(0, 3)  # Seasonal AR terms
    D_values = range(0, 2)  # Seasonal differencing terms
    Q_values = range(0, 2)  # Seasonal MA terms
    S = 4  # Seasonality (quarterly)

    best_score, best_cfg = float("inf"), None

    total_iterations = len(list(itertools.product(p_values, d_values, q_values, P_values, D_values, Q_values)))
    
    console.print(f"🧐 [bold cyan]Starting SARIMA tuning... {total_iterations} combinations to check.[/bold cyan]")

    with Progress() as progress:
        task = progress.add_task("[green]Optimizing SARIMA...", total=total_iterations)

        for p, d, q, P, D, Q in itertools.product(p_values, d_values, q_values, P_values, D_values, Q_values):
            if q == Q:  # Avoid conflicts
                continue  

            try:
                model = SARIMAX(train, order=(p, d, q), seasonal_order=(P, D, Q, S),
                            enforce_stationarity=False, enforce_invertibility=False)
                model_fit = model.fit(disp=False)
                predictions = model_fit.forecast(steps=len(test))
                error = mean_absolute_error(test, predictions)
                if error < best_score:
                    best_score, best_cfg = error, (p, d, q, P, D, Q)

            except:
                continue

            progress.update(task, advance=1)

    console.print(f"✅ [bold green]Best SARIMA order found:[/bold green] {best_cfg} with MAE = {best_score:.2f}")
    
    return best_cfg

def train_sarima(train, best_cfg):
    best_p, best_d, best_q, best_P, best_D, best_Q = best_cfg
    model = SARIMAX(train, order=(best_p, best_d, best_q), seasonal_order=(best_P, best_D, best_Q, 4),
                    enforce_stationarity=False, enforce_invertibility=False)
    return model.fit(disp=False)

def evaluate_model(test, predictions):
    metrics = {
        "MAE": mean_absolute_error(test, predictions),
        "RMSE": np.sqrt(mean_squared_error(test, predictions)),
        "MAPE": mean_absolute_percentage_error(test, predictions),
        "R² Score": r2_score(test, predictions)
    }
    naive_forecast = test.shift(1).dropna()
    mase_denominator = mean_absolute_error(test.loc[naive_forecast.index], naive_forecast) if not naive_forecast.empty else np.nan
    metrics["MASE"] = metrics["MAE"] / mase_denominator if mase_denominator != 0 else np.nan
    return metrics

def plot_forecast(test, predictions, conf_int, company):
    fig = go.Figure()
    fig.add_trace(go.Scatter(x=test.index, y=test, mode='lines+markers', name='Actual Revenue'))
    fig.add_trace(go.Scatter(x=test.index, y=predictions, mode='lines+markers', name='Optimized SARIMA Forecast', line=dict(dash='dash')))
    fig.add_trace(go.Scatter(x=test.index, y=conf_int.iloc[:, 0], fill=None, mode='lines', line=dict(color='lightgray'), name='Lower Bound'))
    fig.add_trace(go.Scatter(x=test.index, y=conf_int.iloc[:, 1], fill='tonexty', mode='lines', line=dict(color='lightgray'), name='Upper Bound'))
    fig.update_layout(title=f"Optimized SARIMA Forecast - {company} Revenue", xaxis_title="Year", yaxis_title="Revenue")
    fig.show()

def plot_residuals(test, predictions, company):
    residuals = test - predictions
    fig = go.Figure()
    fig.add_trace(go.Scatter(x=test.index, y=residuals, mode='lines+markers', name='Residuals', marker=dict(color='green')))
    fig.add_hline(y=0, line=dict(color='black', dash='dash'))
    fig.update_layout(title=f"Residual Plot - SARIMA Model for {company} Revenue", xaxis_title="Year", yaxis_title="Residual (Error)")
    fig.show()


In [20]:
def process_company(company, results):
    print(f"Processing {company}...")
    series = preprocess_data(df, company)
    if len(series) < 10:
        print(f"Skipping {company} due to insufficient data points.")
        return results
    train, test = split_data(series)
    best_cfg = find_best_sarima(train, test)
    print("Best SARIMA configuration found:", best_cfg)
    model_fit = train_sarima(train, best_cfg)
    predictions = model_fit.get_forecast(steps=len(test))
    optimized_predictions, conf_int = predictions.predicted_mean, predictions.conf_int()
    results[company] = evaluate_model(test, optimized_predictions)
    plot_forecast(test, optimized_predictions, conf_int, company)
    plot_residuals(test, optimized_predictions, company)
    return results

In [21]:
list_of_companies = df["Company"].unique()
results = {}

In [27]:
process_company(str(list_of_companies[11]), results)

Processing Merck KGaA...


Output()

Best SARIMA configuration found: (3, 1, 0, 0, 1, 1)


{'Daimler AG': {'MAE': 2803403604.737504,
  'RMSE': np.float64(3229035031.7458644),
  'MAPE': 0.2592787144524177,
  'R² Score': 0.476156175144558,
  'MASE': 0.4088002509751166},
 'SAP SE': {'MAE': 1795966753.3207772,
  'RMSE': np.float64(2276246995.6220593),
  'MAPE': 0.15346896680489733,
  'R² Score': 0.44072214479808214,
  'MASE': 0.4490690656807591},
 'Bayer AG': {'MAE': 1904095941.842971,
  'RMSE': np.float64(2299718936.7021823),
  'MAPE': 0.23220658526728813,
  'R² Score': 0.37888093690197866,
  'MASE': 0.5434079601836702},
 'Deutsche Bank AG': {'MAE': 4650226927.57839,
  'RMSE': np.float64(5801968772.601931),
  'MAPE': 0.31027975021307463,
  'R² Score': -0.4268341425413975,
  'MASE': 0.6351088757138532},
 'Porsche AG': {'MAE': 3060876810.471199,
  'RMSE': np.float64(3823746336.8342195),
  'MAPE': 0.19385020352210683,
  'R² Score': -4.00679160952956,
  'MASE': 1.023251665129631},
 'Merck KGaA': {'MAE': 1785817856.8230712,
  'RMSE': np.float64(2239837431.6204414),
  'MAPE': 0.14346