This notebook demonstrates exploratory data analysis, feature engineering, and a comparison of multiple classification and regression models on historical stock data. It walks through the main pipeline and includes plots to illustrate the results.


In [None]:
import yaml
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import os

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import (
    accuracy_score, classification_report, confusion_matrix, roc_curve, auc,
    mean_squared_error, mean_absolute_error, r2_score
)
from sklearn.linear_model import LogisticRegression, LinearRegression
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor, GradientBoostingClassifier, GradientBoostingRegressor
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor
from sklearn.svm import SVC, SVR

# Load config
# NOTE: the path is absolute and machine-specific; adjust it when running elsewhere.
# The encoding is given explicitly so the YAML is decoded the same way on every
# platform instead of with the OS default (e.g. cp1252 on Windows).
with open(r"D:/Stock_Market/configs/config.yaml", "r", encoding="utf-8") as f:  # Use full path
    config = yaml.safe_load(f)


# Settings read from the config and used by the cells below.
data_path = config["data_path"]
target = config["target_column"]  # For binary/classification
test_size = config["test_size"]
random_seed = config["random_seed"]
model_dir = config["model_dir"]
results_dir = config["results_dir"]
model_cfg = config["models"]  # per-model keyword arguments

# Load the dataset from the Excel file named in the config and preview it.
df = pd.read_excel(data_path)
print("Loaded shape:", df.shape)
display(df.head())



In [None]:
# Correlation heatmap of the raw data.
# Restrict to numeric columns first: pandas >= 2.0 no longer drops non-numeric
# columns silently in DataFrame.corr() and raises instead.
numeric_df = df.select_dtypes(include="number")
plt.figure(figsize=(10,8))
sns.heatmap(numeric_df.corr(), annot=True, fmt=".2f", cmap="coolwarm")
plt.title("Correlation Matrix - Raw")
plt.show()

In [None]:
# Standardize every non-target column; the target column is copied over unscaled.
# NOTE(review): the scaler is fit on the full dataset, i.e. before the
# train/test split performed later, so test-set statistics influence the scaling.
feat_cols = [col for col in df.columns if col != target]
scaler = StandardScaler()
# Carry df's index over to the scaled frame so the target assignment below
# aligns row-for-row even when df does not use the default 0..n-1 index.
X_scaled = pd.DataFrame(
    scaler.fit_transform(df[feat_cols]), columns=feat_cols, index=df.index
)
df_scaled = X_scaled.copy()
df_scaled[target] = df[target]




In [None]:
# Standardize every non-target column; the target column is copied over unscaled.
# This cell recomputes feat_cols, scaler, X_scaled and df_scaled from df.
# NOTE(review): the scaler is fit on the full dataset, i.e. before the
# train/test split performed later, so test-set statistics influence the scaling.
feat_cols = [col for col in df.columns if col != target]
scaler = StandardScaler()
# Carry df's index over to the scaled frame so the target assignment below
# aligns row-for-row even when df does not use the default 0..n-1 index.
X_scaled = pd.DataFrame(
    scaler.fit_transform(df[feat_cols]), columns=feat_cols, index=df.index
)
df_scaled = X_scaled.copy()
df_scaled[target] = df[target]




In [None]:
# Correlation heatmap of the standardized frame (scaled features plus target).
scaled_corr = df_scaled.corr()
plt.figure(figsize=(10, 8))
heatmap_ax = sns.heatmap(scaled_corr, annot=True, fmt=".2f", cmap="coolwarm")
heatmap_ax.set_title("Correlation Matrix - Standardized")
plt.show()



In [None]:
from sklearn.linear_model import LinearRegression

def vif_sklearn(df):
    """Compute the variance inflation factor (VIF) of every column of *df*.

    Each column is regressed on all the other columns with an ordinary
    least-squares ``LinearRegression``; its VIF is ``1 / (1 - R^2)``.

    Parameters
    ----------
    df : pandas.DataFrame
        Feature columns only. Values are passed straight to
        ``LinearRegression``, so they are expected to be numeric.

    Returns
    -------
    pandas.DataFrame
        One row per input column with the columns ``feature`` and ``VIF``.
        ``VIF`` is ``np.inf`` when a column is fitted perfectly by the others.
    """
    X = df.values
    feature_names = df.columns
    n_features = X.shape[1]

    # With fewer than two columns there is nothing to regress against:
    # R^2 is 0, so the VIF is exactly 1. Returning early also avoids fitting
    # a regression on a matrix with zero feature columns, which fails.
    if n_features < 2:
        return pd.DataFrame({'feature': feature_names, 'VIF': [1.0] * n_features})

    vif_data = []
    for i in range(n_features):
        y = X[:, i]
        X_not_i = X[:, np.arange(n_features) != i]
        r_sq = LinearRegression().fit(X_not_i, y).score(X_not_i, y)
        # A perfect fit would divide by zero; report infinite inflation instead.
        vif = 1.0 / (1.0 - r_sq) if r_sq < 1.0 else np.inf
        vif_data.append(vif)
    return pd.DataFrame({'feature': feature_names, 'VIF': vif_data})

def drop_high_vif(df, threshold=10.0):
    """Iteratively drop the column with the highest VIF until all are <= *threshold*.

    On every pass the VIFs of the remaining columns are recomputed with
    ``vif_sklearn``; the first column carrying the maximum VIF is removed and
    reported via ``print``. The loop ends when no VIF exceeds *threshold* or
    when at most one column is left.

    Parameters
    ----------
    df : pandas.DataFrame
        Feature columns only.
    threshold : float, default 10.0
        Largest VIF that is still accepted.

    Returns
    -------
    pandas.DataFrame
        *df* restricted to the surviving columns (all rows kept).
    """
    features = df.columns.tolist()
    # A single remaining column has no collinearity partner, so stop there
    # instead of computing a VIF against an empty set of regressors.
    while len(features) > 1:
        vif_df = vif_sklearn(df[features])
        max_vif = vif_df["VIF"].max()
        if not max_vif > threshold:
            break
        drop_feat = vif_df.loc[vif_df["VIF"] == max_vif, "feature"].values[0]
        features.remove(drop_feat)
        print(f"Dropped {drop_feat} (VIF={max_vif:.2f})")
    return df[features]





In [None]:
# --- VIF Feature Selection and Outlier Removal Block ---

from sklearn.linear_model import LinearRegression

def vif_sklearn(df):
    """Compute the variance inflation factor (VIF) of every column of *df*.

    Each column is regressed on all the other columns with an ordinary
    least-squares ``LinearRegression``; its VIF is ``1 / (1 - R^2)``.

    Parameters
    ----------
    df : pandas.DataFrame
        Feature columns only. Values are passed straight to
        ``LinearRegression``, so they are expected to be numeric.

    Returns
    -------
    pandas.DataFrame
        One row per input column with the columns ``feature`` and ``VIF``.
        ``VIF`` is ``np.inf`` when a column is fitted perfectly by the others.
    """
    X = df.values
    feature_names = df.columns
    n_features = X.shape[1]

    # With fewer than two columns there is nothing to regress against:
    # R^2 is 0, so the VIF is exactly 1. Returning early also avoids fitting
    # a regression on a matrix with zero feature columns, which fails.
    if n_features < 2:
        return pd.DataFrame({'feature': feature_names, 'VIF': [1.0] * n_features})

    vif_data = []
    for i in range(n_features):
        y = X[:, i]
        X_not_i = X[:, np.arange(n_features) != i]
        r_sq = LinearRegression().fit(X_not_i, y).score(X_not_i, y)
        # A perfect fit would divide by zero; report infinite inflation instead.
        vif = 1.0 / (1.0 - r_sq) if r_sq < 1.0 else np.inf
        vif_data.append(vif)
    return pd.DataFrame({'feature': feature_names, 'VIF': vif_data})

def drop_high_vif(df, threshold=10.0):
    """Iteratively drop the column with the highest VIF until all are <= *threshold*.

    On every pass the VIFs of the remaining columns are recomputed with
    ``vif_sklearn``; the first column carrying the maximum VIF is removed and
    reported via ``print``. The loop ends when no VIF exceeds *threshold* or
    when at most one column is left.

    Parameters
    ----------
    df : pandas.DataFrame
        Feature columns only.
    threshold : float, default 10.0
        Largest VIF that is still accepted.

    Returns
    -------
    pandas.DataFrame
        *df* restricted to the surviving columns (all rows kept).
    """
    features = df.columns.tolist()
    # A single remaining column has no collinearity partner, so stop there
    # instead of computing a VIF against an empty set of regressors.
    while len(features) > 1:
        vif_df = vif_sklearn(df[features])
        max_vif = vif_df["VIF"].max()
        if not max_vif > threshold:
            break
        drop_feat = vif_df.loc[vif_df["VIF"] == max_vif, "feature"].values[0]
        features.remove(drop_feat)
        print(f"Dropped {drop_feat} (VIF={max_vif:.2f})")
    return df[features]

def remove_outliers(df, cols, t=5.0):
    """Return the rows of *df* whose *cols* values all lie inside the IQR fences.

    For each column in *cols* the fences are ``Q1 - t*IQR`` and ``Q3 + t*IQR``.
    A row is discarded as soon as one of its *cols* values is strictly below
    the lower fence or strictly above the upper fence. Missing values never
    compare as outside the fences, so rows containing NaN are kept.
    """
    subset = df[cols]
    lower_quartile = subset.quantile(0.25)
    upper_quartile = subset.quantile(0.75)
    spread = upper_quartile - lower_quartile

    too_low = subset.lt(lower_quartile - t * spread)
    too_high = subset.gt(upper_quartile + t * spread)
    flagged = (too_low | too_high).any(axis=1)
    return df[~flagged]

# Instantiate working features dataframe WITHOUT missing (NaN) values
features = [col for col in df_scaled.columns if col != target]
df_features_clean = df_scaled[features].dropna()  # drop rows with NaN only for VIF

# VIF Feature Selection (drops columns, never rows)
X_vif = drop_high_vif(df_features_clean, threshold=10.0)
print("Columns after VIF:", list(X_vif.columns))

# Merge target back (only for the rows that survived the NaN drop above)
X_all = X_vif.copy()
X_all[target] = df_scaled.loc[X_vif.index, target]

# Outlier removal (IQR filter) on the feature columns only.
# The target is deliberately left out of the filter: for an imbalanced class
# label the IQR is 0, so every minority-class row would be flagged as an
# outlier and removed.
X_clean = remove_outliers(X_all, X_vif.columns, t=5.0).dropna()
print("Shape after outlier removal:", X_clean.shape)

# Now continue with train/test split, modeling, etc., using X_clean









In [None]:
def remove_outliers(df, cols, t=5.0):
    """Drop the rows of *df* that have an IQR outlier in any of *cols*.

    A value counts as an outlier when it is strictly below ``Q1 - t*IQR`` or
    strictly above ``Q3 + t*IQR`` of its own column. NaN values are never
    counted as outliers, so rows containing them are returned unchanged.
    """
    values = df[cols]
    q1 = values.quantile(0.25)
    q3 = values.quantile(0.75)
    iqr = q3 - q1
    low_fence = q1 - t * iqr
    high_fence = q3 + t * iqr

    is_outlier = (values < low_fence) | (values > high_fence)
    row_has_outlier = is_outlier.any(axis=1)
    return df[~row_has_outlier]

# Build the modeling frame: VIF-selected features plus the target
# (the assignment aligns on X_vif's index).
X = X_vif.copy()
X[target] = df_scaled[target]
# Filter outliers on the feature columns only. Including the class label in
# the IQR filter would delete every minority-class row whenever the label's
# IQR is 0 (i.e. one class makes up at least 75% of the rows).
X = remove_outliers(X, X_vif.columns, t=5.0).dropna()
print("Shape after outlier removal:", X.shape)



In [None]:
# Separate the target from the features.
y = X[target]
X = X.drop(columns=[target])

# Stratify only when the target looks categorical (at most 10 distinct values).
stratify_labels = y if y.nunique() <= 10 else None
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=test_size,
    random_state=random_seed,
    stratify=stratify_labels,
)
print("Train shape:", X_train.shape, "| Test shape:", X_test.shape)






In [None]:
# Candidate classifiers, configured from the "models" section of the config.
# The SVM entries are indexed with [0], i.e. the first value of each list is used.
clf_models = {
    "Logistic Regression": LogisticRegression(**model_cfg["logistic_regression"]),
    "Random Forest": RandomForestClassifier(**model_cfg["random_forest"]),
    "Gradient Boosting": GradientBoostingClassifier(**model_cfg["gradient_boosting"]),
    "Decision Tree": DecisionTreeClassifier(**model_cfg["decision_tree"]),
    "SVM": SVC(
        probability=True,
        kernel=model_cfg["svm"]["binary"]["kernel"][0],
        C=model_cfg["svm"]["binary"]["C"][0],
        gamma=model_cfg["svm"]["binary"]["gamma"][0])
}

# Same heuristic as the split cell: at most 10 distinct values => classification.
n_classes = y.nunique()
if n_classes <= 10:
    # A single ROC curve built from predict_proba(...)[:, 1] is only defined
    # for a two-class target; roc_curve raises on multiclass labels. Accuracy
    # and the classification report are still printed for every target.
    plot_roc = n_classes == 2
    if plot_roc:
        plt.figure(figsize=(8,7))
    auc_dict = {}
    for name, model in clf_models.items():
        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)
        can_score = plot_roc and hasattr(model, "predict_proba")
        y_proba = model.predict_proba(X_test)[:,1] if can_score else None
        print(f"\n=== {name} ===")
        print("Accuracy:", accuracy_score(y_test, y_pred))
        print(classification_report(y_test, y_pred))
        if y_proba is not None:
            # Column 1 of predict_proba belongs to model.classes_[1]; name it
            # as the positive label so labels other than {0, 1} / {-1, 1} work.
            fpr, tpr, _ = roc_curve(y_test, y_proba, pos_label=model.classes_[1])
            auc_val = auc(fpr, tpr)
            auc_dict[name] = auc_val
            plt.plot(fpr, tpr, label=f"{name} (AUC={auc_val:.2f})")

    if plot_roc:
        plt.plot([0,1],[0,1], 'k--', label="Random (AUC=0.5)")
        plt.legend()
        plt.title("ROC Curve Comparison")
        plt.xlabel("False Positive Rate")
        plt.ylabel("True Positive Rate")
        plt.tight_layout()
        plt.show()
    else:
        print(f"ROC curves skipped: target has {n_classes} classes (needs exactly 2).")


In [None]:
from sklearn.preprocessing import StandardScaler

# Regression pipeline: VIF feature selection -> IQR outlier filter -> scaling
# -> train/test split -> fit and evaluate several regressors with diagnostic plots.
# The whole cell is skipped silently when regression_target is not a column of df_scaled.
regression_target = "EPS"   # Change as needed, or from YAML!
if regression_target in df_scaled.columns:
    # Apply VIF feature selection for regression (same logic as classification)
    # NOTE(review): every other df_scaled column is a candidate feature here,
    # which includes the classification `target` column — confirm this is intended.
    features_reg = [col for col in df_scaled.columns if col != regression_target]
    df_features_reg_clean = df_scaled[features_reg].dropna()  # drop rows with NaN for VIF
    
    # VIF Feature Selection for regression
    X_reg_vif = drop_high_vif(df_features_reg_clean, threshold=10.0)
    print("Columns after VIF filtering for regression:", list(X_reg_vif.columns))
    
    # Merge target back (only for the rows that survived the NaN drop above)
    X_reg_all = X_reg_vif.copy()
    X_reg_all[regression_target] = df_scaled.loc[X_reg_vif.index, regression_target]
    
    # Outlier removal (IQR filter) for regression
    # The filter runs over all columns, including the regression target itself.
    X_reg_clean = remove_outliers(X_reg_all, X_reg_all.columns, t=5.0).dropna()
    print("Shape after outlier removal for regression:", X_reg_clean.shape)
    
    # Separate features and target after all cleaning
    y_reg_clean = X_reg_clean[regression_target]
    X_reg_final = X_reg_clean.drop(columns=[regression_target])
    
    # Scale features for SVR (critical for good performance)
    # NOTE(review): this rebinds the notebook-level name `scaler`, and the
    # scaler is fit before the train/test split below, so test rows contribute
    # to the scaling statistics.
    scaler = StandardScaler()
    X_reg_scaled = pd.DataFrame(scaler.fit_transform(X_reg_final), 
                               columns=X_reg_final.columns, 
                               index=X_reg_final.index)

    # Train/test split on final cleaned data
    Xr_train, Xr_test, yr_train, yr_test = train_test_split(
        X_reg_scaled, y_reg_clean, test_size=test_size, random_state=random_seed
    )

    # Define regression models.
    # The tree/ensemble regressors receive the same config kwargs as their
    # classifier counterparts; only the SVR settings fall back to defaults
    # (via .get) when a key is missing, and the first list entry is used.
    reg_models = {
        "Random Forest": RandomForestRegressor(**model_cfg["random_forest"]),
        "Gradient Boosting": GradientBoostingRegressor(**model_cfg["gradient_boosting"]),
        "Decision Tree": DecisionTreeRegressor(**model_cfg["decision_tree"]),
        "Linear Regression": LinearRegression(),
        "SVR": SVR(
            kernel=model_cfg["svm"]["regression"].get("kernel", ["rbf"])[0],
            C=model_cfg["svm"]["regression"].get("C", [1.0])[0],
            gamma=model_cfg["svm"]["regression"].get("gamma", ["scale"])[0],
            epsilon=model_cfg["svm"]["regression"].get("epsilon", [0.1])[0]
        )
    }

    # Train and evaluate all regressors
    for name, model in reg_models.items():
        model.fit(Xr_train, yr_train)
        y_pred_r = model.predict(Xr_test)
        print(f"\n=== {name} ===")
        print("R2 score:", r2_score(yr_test, y_pred_r))
        print("MAE:", mean_absolute_error(yr_test, y_pred_r))
        print("MSE:", mean_squared_error(yr_test, y_pred_r))

        # Actual vs Predicted
        plt.scatter(yr_test, y_pred_r, alpha=0.7)
        plt.title(f"{name} - Actual vs Predicted")
        plt.xlabel("Actual")
        plt.ylabel("Predicted")
        plt.show()

        # Residuals (actual minus predicted) against the predictions
        residuals = yr_test - y_pred_r
        plt.scatter(y_pred_r, residuals, alpha=0.7)
        plt.title(f"{name} - Residuals Plot")
        plt.xlabel("Predicted")
        plt.ylabel("Residuals")
        plt.axhline(0, color='red', linestyle='--')
        plt.show()
        # Histogram of the residuals
        plt.hist(residuals, bins=30, color='orange')
        plt.xlabel("Residuals")
        plt.title(f"{name} - Residuals Distribution")
        plt.show()



