# In [None]:
# ESG-Driven Stock Value Prediction
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestRegressor
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, accuracy_score
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore', category=FutureWarning)

# ============ 1. Feature Engineering ============
def create_features(df: pd.DataFrame) -> pd.DataFrame:
    """Build the model features from the raw price/ESG dataframe.

    The input must contain the columns ``ticker``, ``date``, ``price``,
    ``env``, ``social`` and ``governance``. The caller's dataframe is left
    untouched; every new column is added to a sorted copy.

    Added columns:
        composite_esg: unweighted mean of ``env``, ``social`` and
            ``governance``.
        momentum_5d: per-ticker percentage change of ``price`` over 5 rows.
        rolling_mean_10: per-ticker mean of ``price`` over a 10-row window.

    Returns:
        A new dataframe sorted by ``ticker`` then ``date``, without the rows
        where any of the three added columns is NaN (this includes the first
        rows of each ticker, which lack enough history for the windows).
    """
    # assign() returns a new frame, so the caller's dataframe is not mutated.
    features = df.assign(
        composite_esg=(df["env"] + df["social"] + df["governance"]) / 3.0
    )

    # The lagged/rolling calculations rely on chronological order per ticker.
    features = features.sort_values(["ticker", "date"])

    # Compute both series before attaching them so the groupby never sees a
    # half-updated frame.
    price_by_ticker = features.groupby("ticker")["price"]
    momentum = price_by_ticker.pct_change(5)
    rolling_mean = price_by_ticker.transform(
        lambda prices: prices.rolling(10).mean()
    )
    features["momentum_5d"] = momentum
    features["rolling_mean_10"] = rolling_mean

    return features.dropna(
        subset=["momentum_5d", "rolling_mean_10", "composite_esg"]
    )

# ============ 2. Walk-Forward Backtesting ============
def _make_class_labels(y_true_block):
    """
    Helper function to create classification labels for a block of price data.
    Prices above the median for the block are labeled 1, others are 0.
    """
    median_price = np.median(y_true_block)
    return (y_true_block > median_price).astype(int)

def walk_forward_backtest(df, features, rf_model, log_model, n_splits=5, target_col="price"):
    """
    Run an expanding-window walk-forward backtest of two models.

    The unique dates in ``df`` are cut into consecutive chunks of
    ``len(dates) // n_splits`` dates. Fold ``i`` trains on the first
    ``i + 1`` chunks and tests on the chunk that follows; folds whose test
    chunk is empty are skipped.

    NOTE(review): the first chunk is only ever used for training, so the
    final fold tests on just the leftover ``len(dates) % n_splits`` dates and
    is skipped when the dates divide evenly. Fewer than ``n_splits`` folds
    may therefore be evaluated - confirm this is intended.

    Args:
        df: Dataframe holding a ``date`` column, the ``features`` columns and
            ``target_col``.
        features: Column names used as model inputs.
        rf_model: Regressor exposing ``fit``/``predict``; refit in every fold.
        log_model: Classifier exposing ``fit``/``predict``/``predict_proba``;
            refit in every fold.
        n_splits: Number of chunks the date range is divided into.
        target_col: Name of the regression target column.

    Returns:
        dict with the keys
        ``rf_rmse`` / ``log_rmse``: mean per-fold RMSE,
        ``rf_acc`` / ``log_acc``: mean per-fold classification accuracy,
        ``lift``: ``(rf_acc - log_acc) / log_acc`` (NaN if ``log_acc`` is 0),
        ``rf_acc_folds`` / ``log_acc_folds``: per-fold accuracies,
        ``fold_labels``: one display label per evaluated fold.
        All averages are NaN when no fold could be evaluated.

    Raises:
        ValueError: If ``n_splits`` is smaller than 1.
    """
    if n_splits < 1:
        raise ValueError(f"n_splits must be a positive integer, got {n_splits!r}")

    def _month(date_value):
        # Year-month text used in fold labels and progress messages.
        return pd.to_datetime(date_value).strftime('%Y-%m')

    def _mean_or_nan(values):
        # np.mean([]) warns and returns NaN; return NaN quietly instead.
        return np.mean(values) if values else np.nan

    df = df.sort_values("date")
    dates = df["date"].unique()
    split_size = len(dates) // n_splits

    # Per-fold metrics
    rf_rmse_list, log_rmse_list = [], []
    rf_acc_list, log_acc_list = [], []
    fold_labels = []

    print(f"Starting walk-forward backtest with {n_splits} splits...")
    for i in range(n_splits):
        # Expanding training window followed by the next chunk as test set
        train_dates = dates[: (i + 1) * split_size]
        test_dates = dates[(i + 1) * split_size : (i + 2) * split_size]

        if len(test_dates) == 0:
            continue

        fold_label = f"{_month(test_dates[0])} to {_month(test_dates[-1])}"
        fold_labels.append(f"Fold {i+1}\n({fold_label})")
        print(f"  - Fold {i+1}/{n_splits}: Training up to {_month(train_dates[-1])}, Testing on {fold_label}")

        train = df[df["date"].isin(train_dates)]
        test = df[df["date"].isin(test_dates)]

        X_train, y_train = train[features], train[target_col].values
        X_test, y_test = test[features], test[target_col].values

        # ---- Random Forest: regress on price, then convert to classes ----
        rf_model.fit(X_train, y_train)
        rf_preds_reg = rf_model.predict(X_test)
        # sqrt(MSE) instead of `squared=False`: that keyword was deprecated
        # and later removed from scikit-learn, while this form works on every
        # version.
        rf_rmse_list.append(np.sqrt(mean_squared_error(y_test, rf_preds_reg)))

        # True labels use the test block's median; predictions are split at
        # their own median.
        y_test_cls = _make_class_labels(y_test)
        rf_preds_cls = (rf_preds_reg > np.median(rf_preds_reg)).astype(int)
        rf_acc_list.append(accuracy_score(y_test_cls, rf_preds_cls))

        # ---- Logistic Regression: baseline classification model ----
        # Training labels are split at the training block's median.
        y_train_cls = _make_class_labels(y_train)
        log_model.fit(X_train, y_train_cls)
        log_preds_cls = log_model.predict(X_test)
        log_acc_list.append(accuracy_score(y_test_cls, log_preds_cls))

        # NOTE(review): class-1 probabilities are rescaled by the mean test
        # price to obtain a price-like value for the RMSE; this uses test
        # targets and is only a rough proxy, not a regression forecast.
        log_probs = log_model.predict_proba(X_test)[:, 1]
        log_rmse_list.append(
            np.sqrt(mean_squared_error(y_test, log_probs * np.mean(y_test)))
        )

    rf_acc = _mean_or_nan(rf_acc_list)
    log_acc = _mean_or_nan(log_acc_list)
    # Relative lift is undefined for a zero baseline; report NaN rather than
    # dividing by zero.
    lift = (rf_acc - log_acc) / log_acc if log_acc != 0 else np.nan

    results = {
        "rf_rmse": _mean_or_nan(rf_rmse_list), "log_rmse": _mean_or_nan(log_rmse_list),
        "rf_acc": rf_acc, "log_acc": log_acc,
        "lift": lift,
        "rf_acc_folds": rf_acc_list, "log_acc_folds": log_acc_list,
        "fold_labels": fold_labels
    }
    return results

# ============ 3. Load Existing Data from CSV ============
print("Loading existing data from CSV file...")
file_path = 'my_esg_stock_data.csv'
try:
    # Only the read itself can raise FileNotFoundError, so keep the try body
    # to this single call.
    df = pd.read_csv(file_path, parse_dates=['date'])
except FileNotFoundError:
    print(f"Error: The file '{file_path}' was not found. Please check the file path and try again.")
    # `exit()` is a helper the `site` module adds for the interactive shell
    # and, called without arguments, reports success. Raise SystemExit with a
    # non-zero status so the failure is visible to whoever runs the script.
    raise SystemExit(1)
else:
    print(f"Data loaded successfully from {file_path}. Shape: {df.shape}")


# ============ 4. Full Pipeline ============
print("Handling missing values...")
# Fill gaps in numeric columns with that column's median; non-numeric columns
# are left unchanged.
df = df.fillna(df.median(numeric_only=True))

print("Creating features...")
df = create_features(df)
print(f"Shape after feature engineering: {df.shape}")

# Feature Scaling
# Every numeric/boolean column except the identifiers and the target is a
# feature. Non-numeric columns are skipped (matching the numeric-only
# imputation above) because the scaler and both models need numeric input.
non_feature_cols = {"date", "ticker", "price"}
feature_cols = [
    c for c in df.select_dtypes(include=["number", "bool"]).columns
    if c not in non_feature_cols
]
# NOTE(review): the scaler is fit on the full dataset before the walk-forward
# split, so statistics from later (test) periods influence the training
# features - consider fitting it per fold.
scaler = StandardScaler()
df[feature_cols] = scaler.fit_transform(df[feature_cols])

# Model Definition
rf_model  = RandomForestRegressor(n_estimators=100, random_state=42, n_jobs=-1, min_samples_leaf=10)
log_model = LogisticRegression(max_iter=1000, random_state=42)

# Run Walk-Forward Backtest
results = walk_forward_backtest(df, feature_cols, rf_model, log_model, n_splits=5)

# Print Final Results
print("\n" + "="*25)
print("=== Backtest Results ===")
print("="*25)
print(f"Random Forest Avg. RMSE: {results['rf_rmse']:.4f}")
print(f"Logistic Reg. Avg. RMSE: {results['log_rmse']:.4f}")
print("-" * 25)
print(f"Random Forest Avg. Accuracy: {results['rf_acc']:.4f}")
print(f"Logistic Reg.  Avg. Accuracy: {results['log_acc']:.4f}")
print("-" * 25)
print(f"Relative Lift in Classification Accuracy: {results['lift']*100:.2f}%")
print("="*25 + "\n")


# ============ 5. Visualization ============
# Grouped bar chart of per-fold accuracy for both models; skipped when the
# backtest produced no folds.
if results and results['fold_labels']:
    # The 'seaborn-v0_8-*' style names were introduced in matplotlib 3.6;
    # fall back to the older name on earlier releases and keep the default
    # style if neither is available.
    for style_name in ('seaborn-v0_8-whitegrid', 'seaborn-whitegrid'):
        if style_name in plt.style.available:
            plt.style.use(style_name)
            break
    plt.figure(figsize=(12, 7))

    bar_width = 0.35
    index = np.arange(len(results['fold_labels']))

    bar1 = plt.bar(index - bar_width/2, results['rf_acc_folds'], bar_width, label='Random Forest', color='royalblue', alpha=0.9)
    bar2 = plt.bar(index + bar_width/2, results['log_acc_folds'], bar_width, label='Logistic Regression', color='darkorange', alpha=0.9)

    # Print each bar's value just above it.
    for container in (bar1, bar2):
        for bar in container:
            yval = bar.get_height()
            plt.text(bar.get_x() + bar.get_width()/2.0, yval, f'{yval:.3f}', va='bottom', ha='center', fontsize=9)

    plt.xlabel("Backtest Fold", fontsize=12)
    plt.ylabel("Classification Accuracy", fontsize=12)
    plt.title("Model Accuracy Comparison per Fold (Walk-Forward Backtest)", fontsize=16, fontweight='bold')
    plt.xticks(index, results['fold_labels'], rotation=0, ha="center")

    all_acc = [*results['rf_acc_folds'], *results['log_acc_folds']]
    if all_acc:
        # Start the axis at 0.45 to zoom in on the usual range, but lower it
        # when a fold scored below that so no bar is clipped and the axis is
        # never inverted.
        y_min = min(0.45, min(all_acc) * 0.9)
        y_max = max(max(all_acc) * 1.1, y_min + 0.1)
    else:
        y_min, y_max = 0.45, 1.0
    plt.ylim(y_min, y_max)
    plt.legend()
    plt.tight_layout()
    plt.show()
else:
    print("No results to visualize. This might happen if the dataset is too small for the backtest splits.")