# 1. Setup and Configuration

## 1.1 Libraries

In [None]:
import logging
import os
import time
import warnings

import joblib
import numpy as np
import optuna
import pandas as pd
import psutil
from lightgbm import LGBMClassifier
from sklearn.base import clone
from sklearn.ensemble import RandomForestClassifier
from sklearn.exceptions import ConvergenceWarning
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import (
    accuracy_score,
    classification_report,
    confusion_matrix,
    f1_score,
    precision_score,
    recall_score,
)
from sklearn.naive_bayes import MultinomialNB
from sklearn.svm import LinearSVC

# --- Optuna Logging Configuration ---
# Only surface Optuna warnings and errors; per-trial INFO lines are hidden.
optuna.logging.set_verbosity(optuna.logging.WARNING)

# --- Configure other warnings ---
# Silence these warning categories globally for the rest of the session.
warnings.filterwarnings('ignore', category=FutureWarning)
warnings.filterwarnings('ignore', category=UserWarning)
warnings.filterwarnings('ignore', category=ConvergenceWarning)

# --- Limit CPU Usage ---
# Pin this process to CPUs 1-7. The request is trimmed to the CPUs that
# actually exist and skipped entirely where psutil does not expose
# `cpu_affinity`, so the notebook also runs on smaller machines and on
# platforms without affinity support instead of raising at import time.
p = psutil.Process()
_requested_cpus = [1, 2, 3, 4, 5, 6, 7]
if hasattr(p, "cpu_affinity"):
    _cpu_total = psutil.cpu_count() or 1
    _usable_cpus = [cpu for cpu in _requested_cpus if cpu < _cpu_total]
    if _usable_cpus:
        p.cpu_affinity(_usable_cpus)
    else:
        print("CPU affinity not changed: none of the requested CPUs exist on this machine.")
else:
    print("CPU affinity not changed: not supported by psutil on this platform.")

## 1.2 Configuration

In [None]:
# --- Configuration Constants ---

# Project root relative to the working directory (the notebook is assumed
# to live one level below it, e.g. in a 'notebooks' folder).
BASE_DIR = ".."
DATA_DIR = os.path.join(BASE_DIR, "data", "processed")
MODEL_OUTPUT_DIR = os.path.join(BASE_DIR, "models", "ml")
RESULT_DIR = os.path.join(BASE_DIR, "result")

# --- Book review locations (input data, saved models, result files) ---
BOOK_REVIEW_DATA_DIR = os.path.join(DATA_DIR, "book_reviews")
BOOK_REVIEW_MODEL_DIR = os.path.join(MODEL_OUTPUT_DIR, "book_reviews")
BOOK_REVIEW_RESULT_DIR = os.path.join(RESULT_DIR, "book_reviews")

# --- Financial news locations (input data, saved models, result files) ---
FINANCIAL_NEWS_DATA_DIR = os.path.join(DATA_DIR, "financial_news")
FINANCIAL_NEWS_MODEL_DIR = os.path.join(MODEL_OUTPUT_DIR, "financial_news")
FINANCIAL_NEWS_RESULT_DIR = os.path.join(RESULT_DIR, "financial_news")

# --- Make sure every output directory exists before anything is written ---
for _output_dir in (
    BOOK_REVIEW_MODEL_DIR,
    FINANCIAL_NEWS_MODEL_DIR,
    BOOK_REVIEW_RESULT_DIR,
    FINANCIAL_NEWS_RESULT_DIR,
):
    os.makedirs(_output_dir, exist_ok=True)

# --- File name suffixes of the three data splits ---
TRAIN_FN = "train.csv"
VAL_FN = "val.csv"
TEST_FN = "test.csv"

# --- CSV columns holding the input text and the label ---
TEXT_COLUMN = "text"
TARGET_COLUMN = "score"

# --- TF-IDF Parameters ---
NGRAM_RANGE = (1, 2)
MAX_FEATURES = 20000  # fixed here; could be tuned as well

# --- Model & Tuning Parameters ---
RANDOM_STATE = 42
N_TRIALS_OPTUNA = 25  # Optuna trials per model
CV_FOLDS = 3  # folds for cross-validation within Optuna
MAX_ITER_LOGREG_SVM = 1500  # iteration budget for the linear models

# --- Evaluation Metrics ---
METRICS_TO_CALCULATE = [
    "Accuracy",
    "F1 (Macro)",
    "Precision (Macro)",
    "Recall (Macro)",
    "F1 (Weighted)",
    "Precision (Weighted)",
    "Recall (Weighted)",
    "Train Time (s)",
    "Eval Time (s)",
    "Best Params",
]
OPTIMIZATION_METRIC = 'F1 (Macro)'  # metric name reported for the Optuna search


def _dataset_entry(file_prefix, data_dir, model_dir, result_dir):
    """Build the path configuration of one dataset.

    The split files are expected at ``<data_dir>/<file_prefix>_<split file name>``.
    """
    entry = {
        f"{split}_path": os.path.join(data_dir, f"{file_prefix}_{file_name}")
        for split, file_name in (("train", TRAIN_FN), ("val", VAL_FN), ("test", TEST_FN))
    }
    entry["model_dir"] = model_dir
    entry["result_dir"] = result_dir
    return entry


# --- Datasets Configuration ---
# Display name -> input file paths and output directories.
DATASETS_TO_PROCESS = {
    "Book Review": _dataset_entry(
        "book_reviews", BOOK_REVIEW_DATA_DIR, BOOK_REVIEW_MODEL_DIR, BOOK_REVIEW_RESULT_DIR
    ),
    "Financial News": _dataset_entry(
        "financial_news", FINANCIAL_NEWS_DATA_DIR, FINANCIAL_NEWS_MODEL_DIR, FINANCIAL_NEWS_RESULT_DIR
    ),
}


# 2. Utility Functions

In [None]:
def load_dataset(train_path, val_path, test_path):
    """Load the train, validation and test splits from CSV files.

    Every split must contain the ``TEXT_COLUMN`` and ``TARGET_COLUMN``
    columns. Missing text values are replaced by empty strings.

    Args:
        train_path: Path of the training CSV.
        val_path: Path of the validation CSV.
        test_path: Path of the test CSV.

    Returns:
        A ``(train_df, val_df, test_df)`` tuple of DataFrames. On any
        failure (missing file, unreadable CSV, missing columns) the error
        is printed and ``(None, None, None)`` is returned, so callers can
        always unpack three values.
    """
    failure = (None, None, None)
    try:
        train_df = pd.read_csv(train_path)
        val_df = pd.read_csv(val_path)
        test_df = pd.read_csv(test_path)
        print(f"Loaded Train: {train_path}, Shape: {train_df.shape}")
        print(f"Loaded Val:   {val_path}, Shape: {val_df.shape}")
        print(f"Loaded Test:  {test_path}, Shape: {test_df.shape}")

        splits = {"train": train_df, "val": val_df, "test": test_df}

        # Basic validation: all three splits are indexed by these columns
        # later on, so check each of them, not only the training data.
        for split_name, df in splits.items():
            if TEXT_COLUMN not in df.columns or TARGET_COLUMN not in df.columns:
                raise ValueError(
                    f"Required columns '{TEXT_COLUMN}' or '{TARGET_COLUMN}' not found in {split_name} data."
                )

        # Handle potential NaN values in text - replace with empty string
        for df in splits.values():
            df[TEXT_COLUMN] = df[TEXT_COLUMN].fillna('')
            # Labels are used as loaded; cast here (e.g. .astype(str)) if the
            # splits ever disagree on the label dtype.

        print(f"Train labels: {train_df[TARGET_COLUMN].unique()}, Val labels: {val_df[TARGET_COLUMN].unique()}, Test labels: {test_df[TARGET_COLUMN].unique()}")
        return train_df, val_df, test_df
    except FileNotFoundError as e:
        print(f"Error loading data: {e}. Please check file paths.")
        return failure
    except Exception as e:
        # Best-effort loader: report the problem and let the caller skip
        # this dataset instead of aborting the whole run.
        print(f"An error occurred during data loading: {e}")
        return failure

def calculate_metrics(y_true, y_pred, prefix=""):
    """Compute accuracy plus macro- and weighted-averaged F1/precision/recall.

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted labels.
        prefix: Optional string prepended to every metric name.

    Returns:
        Dict mapping ``"<prefix><metric name>"`` to the score. Averaged
        scores are computed with ``zero_division=0``.
    """
    averaged_scorers = (
        ("F1", f1_score),
        ("Precision", precision_score),
        ("Recall", recall_score),
    )
    scores = {f"{prefix}Accuracy": accuracy_score(y_true, y_pred)}
    for display_name, average in (("Macro", "macro"), ("Weighted", "weighted")):
        for metric_label, scorer in averaged_scorers:
            key = f"{prefix}{metric_label} ({display_name})"
            scores[key] = scorer(y_true, y_pred, average=average, zero_division=0)
    return scores


def objective_logreg(trial, X_train, y_train, X_val, y_val):
    """Optuna objective for Logistic Regression; returns validation macro-F1.

    Samples ``C``, ``solver``, ``penalty`` and (for elasticnet only)
    ``l1_ratio``, fits on the training split and scores on the validation
    split. The sampled parameter names are ``LogisticRegression`` keyword
    names, so ``study.best_params`` can be applied with ``set_params``.

    Args:
        trial: The Optuna trial used to sample hyperparameters.
        X_train, y_train: Training features and labels.
        X_val, y_val: Validation features and labels.

    Returns:
        Macro-averaged F1 on the validation split (to be maximized).

    Raises:
        optuna.TrialPruned: If the sampled solver cannot fit the sampled
            penalty (liblinear with elasticnet).
    """
    logreg_c = trial.suggest_float("C", 1e-4, 1e2, log=True)  # inverse regularization strength
    solver = trial.suggest_categorical("solver", ["liblinear", "saga"])

    # Optuna requires a parameter name to keep the same set of choices in
    # every trial; offering a solver-dependent list under the name "penalty"
    # makes the study fail with "CategoricalDistribution does not support
    # dynamic value space". Sample from one fixed set instead and discard
    # the single combination that is not valid.
    penalty = trial.suggest_categorical("penalty", ["l1", "l2", "elasticnet"])
    if penalty == "elasticnet" and solver != "saga":
        raise optuna.TrialPruned()

    # l1_ratio is only meaningful (and only sampled) for elasticnet.
    l1_ratio = trial.suggest_float("l1_ratio", 0, 1) if penalty == "elasticnet" else None

    model = LogisticRegression(
        C=logreg_c,
        solver=solver,
        penalty=penalty,
        l1_ratio=l1_ratio,
        max_iter=MAX_ITER_LOGREG_SVM,
        class_weight='balanced',
        random_state=RANDOM_STATE,
        n_jobs=-1
    )
    model.fit(X_train, y_train)
    y_pred_val = model.predict(X_val)
    return f1_score(y_val, y_pred_val, average='macro', zero_division=0)

def objective_linearsvc(trial, X_train, y_train, X_val, y_val):
    """Optuna objective for LinearSVC; returns validation macro-F1.

    Samples ``C`` (log-uniform in [1e-4, 1e2]) and ``loss``, fits on the
    training split and scores the predictions for the validation split.
    """
    sampled = {
        "C": trial.suggest_float("C", 1e-4, 1e2, log=True),
        "loss": trial.suggest_categorical("loss", ["hinge", "squared_hinge"]),
    }
    classifier = LinearSVC(
        max_iter=MAX_ITER_LOGREG_SVM * 2,
        class_weight='balanced',
        random_state=RANDOM_STATE,
        dual='auto',  # let scikit-learn choose the primal/dual formulation
        **sampled,
    )
    val_predictions = classifier.fit(X_train, y_train).predict(X_val)
    return f1_score(y_val, val_predictions, average='macro', zero_division=0)

def objective_randomforest(trial, X_train, y_train, X_val, y_val):
    """Optuna objective for RandomForest; returns validation macro-F1.

    Samples the forest size, the tree depth (log scale) and the minimum
    sample counts for splits and leaves; ``max_features`` is left at the
    estimator default and is not part of the search.
    """
    sampled = {
        "n_estimators": trial.suggest_int("n_estimators", 50, 300),
        "max_depth": trial.suggest_int("max_depth", 5, 50, log=True),
        "min_samples_split": trial.suggest_int("min_samples_split", 2, 20),
        "min_samples_leaf": trial.suggest_int("min_samples_leaf", 1, 20),
    }
    forest = RandomForestClassifier(
        class_weight='balanced',
        random_state=RANDOM_STATE,
        n_jobs=-1,
        **sampled,
    )
    val_predictions = forest.fit(X_train, y_train).predict(X_val)
    return f1_score(y_val, val_predictions, average='macro', zero_division=0)

def objective_lgbm(trial, X_train, y_train, X_val, y_val):
    """Optuna objective for LightGBM; returns validation macro-F1.

    The LightGBM objective is 'multiclass' when the training and validation
    labels together contain more than two classes, otherwise 'binary'.

    Args:
        trial: The Optuna trial used to sample hyperparameters.
        X_train, y_train: Training features and labels.
        X_val, y_val: Validation features and labels.

    Returns:
        Macro-averaged F1 on the validation split (to be maximized).
    """
    # Determine objective based on number of unique classes
    num_classes = len(np.unique(np.concatenate((y_train, y_val))))
    is_multiclass = num_classes > 2

    params = {
        'objective': 'multiclass' if is_multiclass else 'binary',
        'metric': 'multi_logloss' if is_multiclass else 'binary_logloss',
        'n_estimators': trial.suggest_int('n_estimators', 100, 1000),
        'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3, log=True),
        'num_leaves': trial.suggest_int('num_leaves', 20, 300),
        'max_depth': trial.suggest_int('max_depth', 3, 12),
        'reg_alpha': trial.suggest_float('reg_alpha', 1e-8, 10.0, log=True),  # L1 regularization
        'reg_lambda': trial.suggest_float('reg_lambda', 1e-8, 10.0, log=True),  # L2 regularization
        'colsample_bytree': trial.suggest_float('colsample_bytree', 0.6, 1.0),  # feature fraction
        # NOTE(review): LightGBM documents that row subsampling only takes
        # effect when subsample_freq > 0, which is not set here - confirm
        # whether 'subsample' influences these trials at all.
        'subsample': trial.suggest_float('subsample', 0.6, 1.0),
        'min_child_samples': trial.suggest_int('min_child_samples', 5, 100),
        'random_state': RANDOM_STATE,
        'n_jobs': -1,
        'class_weight': 'balanced'
    }
    if is_multiclass:
        params['num_class'] = num_classes

    model = LGBMClassifier(**params)
    # No LightGBMPruningCallback here: it would report the per-iteration
    # logloss (lower is better) as intermediate values of a trial whose
    # objective value is macro-F1 (higher is better). Optuna's callback
    # rejects that direction mismatch with a ValueError, so every trial
    # would fail. Each trial is therefore trained fully and scored once.
    model.fit(X_train, y_train)
    y_pred_val = model.predict(X_val)
    return f1_score(y_val, y_pred_val, average='macro', zero_division=0)


# --- Map model names to their objective functions ---
# Models listed here are tuned with Optuna; Naive Bayes has no entry and is
# trained with its default parameters.
objective_map = {
    "Logistic Regression": objective_logreg,
    "Linear SVM": objective_linearsvc,
    "Random Forest": objective_randomforest,
    "LightGBM": objective_lgbm,
}

# --- Define base model instances (needed for Naive Bayes and as template) ---
# The tuned parameters found by Optuna are applied on top of these
# templates, so every setting that the objective functions fix but do not
# tune (iteration budget, dual mode) has to be repeated here. Otherwise the
# final model would be fitted with different settings than the ones that
# were scored during tuning (e.g. scikit-learn's default iteration cap).
base_models = {
    "Naive Bayes": MultinomialNB(),
    "Logistic Regression": LogisticRegression(
        random_state=RANDOM_STATE,
        class_weight='balanced',
        max_iter=MAX_ITER_LOGREG_SVM,
        n_jobs=-1,
    ),
    "Linear SVM": LinearSVC(
        random_state=RANDOM_STATE,
        class_weight='balanced',
        max_iter=MAX_ITER_LOGREG_SVM * 2,
        dual='auto',
    ),
    "Random Forest": RandomForestClassifier(random_state=RANDOM_STATE, class_weight='balanced', n_jobs=-1),
    "LightGBM": LGBMClassifier(random_state=RANDOM_STATE, class_weight='balanced', n_jobs=-1),
}

# 3. Define Models

In [None]:
# Untuned reference configuration of each model family.
# NOTE(review): the experiment loop visible in this file iterates over
# `base_models`; nothing shown here reads `models_to_run` - confirm whether
# it is still needed.
_shared_settings = {"random_state": RANDOM_STATE, "class_weight": 'balanced'}

models_to_run = {
    "Naive Bayes": MultinomialNB(),
    "Logistic Regression": LogisticRegression(
        max_iter=MAX_ITER_LOGREG_SVM,
        solver='liblinear',
        **_shared_settings,
    ),
    "Linear SVM": LinearSVC(
        max_iter=MAX_ITER_LOGREG_SVM * 2,
        dual=False,
        **_shared_settings,
    ),
    "Random Forest": RandomForestClassifier(
        n_estimators=100,
        n_jobs=-1,
        **_shared_settings,
    ),
    "LightGBM": LGBMClassifier(
        objective='multiclass',
        n_jobs=-1,
        **_shared_settings,
    ),
}

# 4. Run Experiments

In [None]:
# Accumulators filled by the experiment loop.
all_results = list()  # one result dict per (dataset, model) run
vectorizers = dict()  # fitted TF-IDF vectorizer keyed by dataset name

In [None]:
# --- Loop through each dataset defined in the configuration ---
# For every dataset: load the splits, fit TF-IDF on the training split, tune
# each model on train/validation with Optuna (Naive Bayes uses defaults),
# refit the best configuration on train+validation, evaluate on the test
# split and persist the vectorizer, model and confusion matrix.
for dataset_name, config in DATASETS_TO_PROCESS.items():
    print(f"\n{'='*20} Processing Dataset: {dataset_name} {'='*20}")
    dataset_slug = dataset_name.replace(' ', '_')

    # 1. Load Data
    loaded = load_dataset(config['train_path'], config['val_path'], config['test_path'])
    # A failed load is signalled with None values; also accept a bare None
    # so a loading problem can never crash the tuple unpacking below.
    if loaded is None or loaded[0] is None:
        print(f"Skipping dataset {dataset_name} due to loading error.")
        continue
    train_df, val_df, test_df = loaded

    # Prepare data splits
    X_train_raw = train_df[TEXT_COLUMN]
    y_train = train_df[TARGET_COLUMN]
    X_val_raw = val_df[TEXT_COLUMN]
    y_val = val_df[TARGET_COLUMN]
    X_test_raw = test_df[TEXT_COLUMN]
    y_test = test_df[TARGET_COLUMN]

    # Combine Train and Validation for final model training after tuning
    X_train_val_raw = pd.concat([X_train_raw, X_val_raw], ignore_index=True)
    y_train_val = pd.concat([y_train, y_val], ignore_index=True)

    print(f"\nTraining data shape: {X_train_raw.shape}, Validation data shape: {X_val_raw.shape}, Test data shape: {X_test_raw.shape}")
    print(f"Combined Train+Val shape: {X_train_val_raw.shape}")

    # 2. Feature Extraction (TF-IDF)
    print("\nFitting TF-IDF Vectorizer on Training data...")
    tfidf_vectorizer = TfidfVectorizer(
        ngram_range=NGRAM_RANGE,
        max_features=MAX_FEATURES
    )
    # Fit only on original training data and transform on the rest, so the
    # vocabulary and IDF weights never see validation or test text.
    X_train_tfidf = tfidf_vectorizer.fit_transform(X_train_raw)
    X_val_tfidf = tfidf_vectorizer.transform(X_val_raw)
    X_test_tfidf = tfidf_vectorizer.transform(X_test_raw)
    X_train_val_tfidf = tfidf_vectorizer.transform(X_train_val_raw)

    vectorizers[dataset_name] = tfidf_vectorizer # Store vectorizer
    vectorizer_path = os.path.join(config['model_dir'], f"{dataset_slug}_tfidf_vectorizer.joblib")
    joblib.dump(tfidf_vectorizer, vectorizer_path)
    print(f"TF-IDF Vectorizer saved to {vectorizer_path}")
    print(f"TF-IDF Matrix Shape (Train): {X_train_tfidf.shape}")
    print(f"TF-IDF Matrix Shape (Val):   {X_val_tfidf.shape}")
    print(f"TF-IDF Matrix Shape (Test):  {X_test_tfidf.shape}")
    print(f"TF-IDF Matrix Shape (Train+Val): {X_train_val_tfidf.shape}")

    # --- Loop through each model defined ---
    for model_name, base_model_instance in base_models.items():
        print(f"\n--- Processing Model: {model_name} ---")
        model_slug = model_name.replace(' ', '_')
        results = {"Dataset": dataset_name, "Model": model_name}
        best_params = None

        try:
            # Always work on a fresh copy of the template. Calling
            # set_params on the shared instance would carry tuned values
            # (e.g. l1_ratio, LightGBM num_class) over to the next dataset.
            final_model = clone(base_model_instance)

            # --- Hyperparameter Tuning (if applicable) ---
            if model_name in objective_map:
                print(f"Starting Optuna tuning for {model_name} ({N_TRIALS_OPTUNA} trials)...")
                objective_func = objective_map[model_name]

                # Wrap objective to pass fixed data arguments
                wrapped_objective = lambda trial: objective_func(trial, X_train_tfidf, y_train, X_val_tfidf, y_val)

                # Seed the sampler so repeated runs explore the same trials.
                study = optuna.create_study(
                    direction='maximize',
                    sampler=optuna.samplers.TPESampler(seed=RANDOM_STATE),
                    pruner=optuna.pruners.MedianPruner(),
                )
                study.optimize(wrapped_objective, n_trials=N_TRIALS_OPTUNA)

                best_params = study.best_params
                best_value = study.best_value
                results["Best Params"] = str(best_params) # Store as string for CSV
                print(f"Optuna tuning finished. Best Validation {OPTIMIZATION_METRIC}: {best_value:.4f}")
                print(f"Best parameters found: {best_params}")

                # Apply the best parameters to the fresh copy
                final_model.set_params(**best_params)
                # LightGBM: set the objective to match the number of classes
                # in the combined train+validation labels.
                if model_name == 'LightGBM':
                    num_classes = len(y_train_val.unique())
                    if num_classes > 2:
                        final_model.set_params(objective='multiclass', num_class=num_classes)
                    else:
                        final_model.set_params(objective='binary')

            else: # Handle Naive Bayes (no Optuna tuning here)
                print("Using default parameters for Naive Bayes.")
                results["Best Params"] = "Default"

            # --- Train Final Model ---
            print(f"Training final {model_name} model on combined Train+Val data...")
            start_train_time = time.time()
            # Train on combined Train + Validation data
            final_model.fit(X_train_val_tfidf, y_train_val)
            end_train_time = time.time()
            results["Train Time (s)"] = round(end_train_time - start_train_time, 3)
            print(f"Final model training completed in {results['Train Time (s)']:.3f} seconds.")

            # --- Evaluate on Test Set ---
            print(f"Evaluating final {model_name} model on Test data...")
            start_eval_time = time.time()
            y_pred_test = final_model.predict(X_test_tfidf)
            end_eval_time = time.time()
            results["Eval Time (s)"] = round(end_eval_time - start_eval_time, 3)

            # Calculate test metrics
            test_metrics = calculate_metrics(y_test, y_pred_test)
            results.update(test_metrics) # Add test metrics to results dict

            print("\nTest Set Performance:")
            print(classification_report(y_test, y_pred_test, zero_division=0))
            print(f"Test Accuracy: {results['Accuracy']:.4f}")
            print(f"Test F1 (Macro): {results['F1 (Macro)']:.4f}")

            # --- Calculate and Save Confusion Matrix CSV ---
            # Use every label that occurs in the truth OR the predictions, so
            # predictions of a class absent from the test split are still
            # counted in the matrix.
            labels_order = np.union1d(y_test, y_pred_test).tolist()
            cm = confusion_matrix(y_test, y_pred_test, labels=labels_order)
            cm_df = pd.DataFrame(cm, index=labels_order, columns=labels_order)
            cm_df.index.name = 'True Label'
            cm_df.columns.name = 'Predicted Label'

            cm_filename = f"{dataset_slug}_{model_slug}_confusion_matrix.csv"
            cm_save_path = os.path.join(config['result_dir'], cm_filename)
            try:
                cm_df.to_csv(cm_save_path, index=True, mode='w+')
                print(f"\nConfusion matrix saved to {cm_save_path}")
            except Exception as cm_save_e:
                # Saving the matrix is best-effort; the metrics are still recorded.
                print(f"\nError saving confusion matrix CSV for {model_name}: {cm_save_e}")
            # --- End Save Confusion Matrix CSV ---

            # --- Save Final Model ---
            model_filename = f"{dataset_slug}_{model_slug}_best_model.joblib"
            model_save_path = os.path.join(config['model_dir'], model_filename)
            joblib.dump(final_model, model_save_path)
            print(f"Final tuned model saved to {model_save_path}")

        except Exception as e:
            # One failing model must not abort the remaining runs: report it
            # and record a placeholder row instead.
            print(f"!!! An error occurred while processing {model_name} for {dataset_name}: {e}")
            results["Accuracy"] = np.nan
            results["F1 (Macro)"] = np.nan
            results["Best Params"] = f"Error: {e}"
            # Fill the remaining metrics: NaN for scores, 0.0 for timings
            for metric in METRICS_TO_CALCULATE:
                if metric not in results:
                    results[metric] = np.nan if metric not in ["Train Time (s)", "Eval Time (s)", "Best Params"] else 0.0

        all_results.append(results)


# --- Combine results into a DataFrame ---
results_df = pd.DataFrame(all_results)


# 5. Results

In [None]:
# Print the collected results and write them to CSV: one file per dataset
# in that dataset's result directory, plus one combined file in RESULT_DIR.
print("\n\n===== Overall Tuned ML Results Summary =====")
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)
pd.set_option('display.width', 1200) # Wider display
pd.set_option('display.max_colwidth', 200) # Show more of Best Params column
pd.set_option('display.float_format', '{:.4f}'.format)

# Reorder columns for clarity. reindex (instead of plain column selection)
# keeps this working when no results were collected at all, e.g. because
# every dataset was skipped: the frame is then empty but has the expected
# columns, rather than raising a KeyError.
column_order = ["Dataset", "Model"] + [m for m in METRICS_TO_CALCULATE if m != "Best Params"] + ["Best Params"]
results_df = results_df.reindex(columns=column_order)

print(results_df)

# --- Save results to CSV for each dataset ---
for dataset_name, config in DATASETS_TO_PROCESS.items():
    dataset_results_df = results_df[results_df['Dataset'] == dataset_name]
    if dataset_results_df.empty:
        # Nothing was recorded for this dataset (e.g. it was skipped).
        continue
    results_filename = f"{dataset_name.replace(' ', '_')}_ml_tfidf_tuned_results.csv"
    results_save_path = os.path.join(config['result_dir'], results_filename)
    try:
        dataset_results_df.to_csv(results_save_path, index=False, mode='w+')
        print(f"\nResults for {dataset_name} saved to {results_save_path}")
    except Exception as e:
        # Saving is best-effort: report the failure and continue.
        print(f"\nError saving results for {dataset_name} to {results_save_path}: {e}")

# --- Save combined results ---
combined_results_path = os.path.join(RESULT_DIR, "combined_ml_tfidf_tuned_results.csv")
try:
    results_df.to_csv(combined_results_path, index=False, mode='w+')
    print(f"\nCombined results saved to {combined_results_path}")
except Exception as e:
    print(f"\nError saving combined results to {combined_results_path}: {e}")
