In [3]:
import pandas as pd
import time
import numpy as np
import os
import optuna
import logging
import traceback
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.metrics import classification_report, log_loss, accuracy_score
from google.colab import drive

absolute_path = "/content/gdrive/My Drive/Projects/Financial-Sentiment/"

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)

# Function to create and evaluate a Logistic Regression model with given hyperparameters
def create_and_evaluate_model(params, X_train, y_train, X_val, y_val):
    try:
        # Create TF-IDF vectorizer
        tfidf = TfidfVectorizer(
            max_features=params['max_features'],
            ngram_range=params['ngram_range'],
            stop_words=params['stop_words']
        )

        # Create Logistic Regression classifier with parameters that are compatible
        logreg_params = {
            'C': params['C'],
            'penalty': params['penalty'],
            'solver': params['solver'],
            'max_iter': params['max_iter'],
            'class_weight': params.get('class_weight', None),
            'random_state': 42
        }

        # Add l1_ratio only for elasticnet penalty
        if params['penalty'] == 'elasticnet' and 'l1_ratio' in params:
            logreg_params['l1_ratio'] = params['l1_ratio']

        logreg = LogisticRegression(**logreg_params)

        # Create pipeline
        pipeline = Pipeline([
            ("tfidf", tfidf),
            ("logreg", logreg)
        ])

        # Train the model
        pipeline.fit(X_train, y_train)

        # Evaluate on validation set
        y_val_pred = pipeline.predict(X_val)
        accuracy = accuracy_score(y_val, y_val_pred)

        # Compute log loss
        val_proba = pipeline.predict_proba(X_val)
        val_loss = log_loss(y_val, val_proba)

        return pipeline, accuracy, val_loss

    except Exception as e:
        logger.error(f"Error in model creation/evaluation: {str(e)}")
        logger.error(traceback.format_exc())
        return None, 0.0, float('inf')

# Objective function for Optuna
def objective(trial):
    # Define the hyperparameter search space
    params = {
        'max_features': trial.suggest_categorical('max_features', [5000, 10000, 15000, None]),
        'ngram_range': trial.suggest_categorical('ngram_range', [(1, 1), (1, 2), (1, 3)]),
        'stop_words': trial.suggest_categorical('stop_words', [None, 'english']),
        'C': trial.suggest_float('C', 0.01, 100.0, log=True),
        'class_weight': trial.suggest_categorical('class_weight', [None, 'balanced']),
        'max_iter': trial.suggest_int('max_iter', 100, 1000, step=100),
    }

    # Instead of dynamically changing solver options based on penalty,
    # we'll use a fixed combination approach
    combination = trial.suggest_categorical('penalty_solver_combo', [
        ('l1', 'liblinear'),  # L1 with liblinear
        ('l1', 'saga'),       # L1 with saga
        ('l2', 'newton-cg'),  # L2 with newton-cg
        ('l2', 'lbfgs'),      # L2 with lbfgs
        ('l2', 'liblinear'),  # L2 with liblinear
        ('l2', 'sag'),        # L2 with sag
        ('l2', 'saga'),       # L2 with saga
        ('elasticnet', 'saga'),  # elasticnet with saga (only valid combo)
        ('none', 'newton-cg'),   # no penalty with newton-cg
        ('none', 'lbfgs'),       # no penalty with lbfgs
        ('none', 'sag'),         # no penalty with sag
        ('none', 'saga')         # no penalty with saga
    ])

    # Extract penalty and solver from the combination
    params['penalty'], params['solver'] = combination

    # For elasticnet penalty, we need l1_ratio
    if params['penalty'] == 'elasticnet':
        params['l1_ratio'] = trial.suggest_float('l1_ratio', 0.0, 1.0)

    # Log selected hyperparameters
    logger.info(f"Trial {trial.number} hyperparameters: {params}")

    # Create and evaluate model
    model, accuracy, val_loss = create_and_evaluate_model(params, X_train, y_train, X_val, y_val)

    # Save the trial results to the all_trials.txt file
    absolute_path = "/content/gdrive/My Drive/Projects/Financial-Sentiment/"
    trial_results_path = os.path.join(absolute_path, 'OptimizationResults', 'logreg_all_trials.txt')
    os.makedirs(os.path.dirname(trial_results_path), exist_ok=True)

    with open(trial_results_path, 'a') as f:
        f.write(f"\n----- Trial {trial.number} -----\n")
        f.write(f"Validation Accuracy: {accuracy:.4f}\n")
        f.write(f"Validation Loss: {val_loss:.4f}\n")
        for key, value in params.items():
            f.write(f"{key}: {value}\n")
        f.write("--------------------------\n")

    if model is None:
        return 0.0  # Return poor performance for failed trials

    return accuracy  # We want to maximize accuracy

# Main function to run optimization
def run_logreg_optimization(n_trials=100, timeout=None):
    global X_train, y_train, X_val, y_val, X_test, y_test, absolute_path
    global absolute_path

    start_time = time.time()
    logger.info(f"Starting Logistic Regression optimization with {n_trials} trials, timeout={timeout}")

    # Mount Google Drive
    try:
        drive.mount('/content/gdrive', force_remount=True)
        logger.info("Google Drive mounted successfully")
    except Exception as e:
        logger.error(f"Error mounting Google Drive: {str(e)}")
        raise

    # Set paths
    absolute_path = "/content/gdrive/My Drive/Projects/Financial-Sentiment/"
    dataset_path = absolute_path + "Datasets/"
    os.makedirs(os.path.join(absolute_path, 'OptimizationResults'), exist_ok=True)

    # Initialize the all trials file with a header
    all_trials_path = os.path.join(absolute_path, 'OptimizationResults', 'logreg_all_trials.txt')
    with open(all_trials_path, 'w') as f:
        f.write("========== LOGISTIC REGRESSION HYPERPARAMETER OPTIMIZATION RESULTS ==========\n")
        f.write(f"Started: {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
        f.write(f"Number of trials: {n_trials}\n")
        f.write("=============================================================\n")

    # Load datasets
    logger.info("Loading datasets")
    train_df = pd.read_csv(dataset_path + "train_set.csv")
    val_df = pd.read_csv(dataset_path + "validation_set.csv")
    test_df = pd.read_csv(dataset_path + "test_set.csv")

    # Extract features and labels
    X_train, y_train = train_df["Sentence"], train_df["SentimentNumerical"]
    X_val, y_val = val_df["Sentence"], val_df["SentimentNumerical"]
    X_test, y_test = test_df["Sentence"], test_df["SentimentNumerical"]

    logger.info(f"Train set size: {len(X_train)}")
    logger.info(f"Validation set size: {len(X_val)}")
    logger.info(f"Test set size: {len(X_test)}")

    # Create an Optuna study
    study = optuna.create_study(
        direction='maximize',  # Maximize accuracy
        pruner=optuna.pruners.MedianPruner(n_startup_trials=5, n_warmup_steps=0)
    )

    # Run the optimization
    try:
        study.optimize(objective, n_trials=n_trials, timeout=timeout)
    except KeyboardInterrupt:
        logger.info("Optimization stopped by user")
    except Exception as e:
        logger.error(f"Error during optimization: {str(e)}")
        logger.error(traceback.format_exc())

    # Log summary to the all trials file
    with open(all_trials_path, 'a') as f:
        f.write("\n========== OPTIMIZATION SUMMARY ==========\n")
        f.write(f"Completed: {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
        f.write(f"Total optimization time: {(time.time() - start_time) / 60:.2f} minutes\n")
        f.write(f"Number of trials: {len(study.trials)}\n")
        f.write(f"Number of completed trials: {len([t for t in study.trials if t.state == optuna.trial.TrialState.COMPLETE])}\n")

    # Get best trial if any completed successfully
    completed_trials = [t for t in study.trials if t.state == optuna.trial.TrialState.COMPLETE]
    if completed_trials:
        best_trial = study.best_trial
        logger.info(f"Best trial:")
        logger.info(f"  Value (validation accuracy): {best_trial.value:.4f}")
        logger.info(f"  Params:")
        for key, value in best_trial.params.items():
            logger.info(f"    {key}: {value}")

        # Save the best model results to a detailed file
        best_model_path = os.path.join(absolute_path, 'OptimizationResults', 'logreg_best_model_results.txt')
        with open(best_model_path, 'w') as f:
            f.write("========== LOGISTIC REGRESSION BEST MODEL RESULTS ==========\n")
            f.write(f"Date: {time.strftime('%Y-%m-%d %H:%M:%S')}\n\n")
            f.write(f"Best Trial Number: {best_trial.number}\n")
            f.write(f"Best Validation Accuracy: {best_trial.value:.4f}\n\n")
            f.write("Hyperparameters:\n")
            f.write("--------------\n")
            for key, value in best_trial.params.items():
                f.write(f"{key}: {value}\n")
            f.write("\n")

            # Add some statistics and analysis
            f.write("Performance Analysis:\n")
            f.write("--------------\n")
            f.write(f"Total trials completed: {len(completed_trials)}\n")
            f.write(f"Optimization time: {(time.time() - start_time) / 60:.2f} minutes\n")

            # Compare to worst trial - FIXED to handle zero values
            worst_trial = min(completed_trials, key=lambda t: t.value)
            f.write(f"Worst trial accuracy: {worst_trial.value:.4f} (Trial {worst_trial.number})\n")
            absolute_improvement = best_trial.value - worst_trial.value
            f.write(f"Absolute improvement over worst: {absolute_improvement:.4f}\n")

            # Only calculate percentage improvement if worst_trial.value is not zero
            if worst_trial.value > 0:
                percentage_improvement = (best_trial.value / worst_trial.value - 1) * 100
                f.write(f"Percentage improvement over worst: {percentage_improvement:.2f}%\n")

            # Calculate average performance
            avg_accuracy = sum(t.value for t in completed_trials) / len(completed_trials)
            f.write(f"Average trial accuracy: {avg_accuracy:.4f}\n")
            absolute_improvement_avg = best_trial.value - avg_accuracy
            f.write(f"Absolute improvement over average: {absolute_improvement_avg:.4f}\n")

            # Only calculate percentage improvement if avg_accuracy is not zero
            if avg_accuracy > 0:
                percentage_improvement_avg = (best_trial.value / avg_accuracy - 1) * 100
                f.write(f"Percentage improvement over average: {percentage_improvement_avg:.2f}%\n")
            f.write("\n")

            # Add parameter importance if available
            try:
                importance = optuna.importance.get_param_importances(study)
                f.write("Parameter Importance:\n")
                f.write("--------------\n")
                for param, score in importance.items():
                    f.write(f"{param}: {score:.4f}\n")
            except Exception as e:
                f.write(f"Could not calculate parameter importance: {str(e)}\n")

            f.write("\n========== END OF REPORT ==========\n")

        logger.info(f"Best model results saved to {best_model_path}")

        # Return the best parameters for final model training
        best_params = best_trial.params.copy()

        # Convert penalty_solver_combo back into separate parameters
        if 'penalty_solver_combo' in best_params:
            penalty, solver = best_params['penalty_solver_combo']
            best_params['penalty'] = penalty
            best_params['solver'] = solver
            del best_params['penalty_solver_combo']

        return best_params
    else:
        logger.warning("No trials completed successfully")
        return None

# Function to train and evaluate the final model with the best hyperparameters
def train_final_logreg_model(best_params):
    global absolute_path
    logger.info("Training final Logistic Regression model with best hyperparameters")

    # Create and train the final model
    final_model, accuracy, val_loss = create_and_evaluate_model(
        best_params, X_train, y_train, X_val, y_val
    )

    if final_model is None:
        logger.error("Failed to train final model")
        return

    # Evaluate on validation set
    y_val_pred = final_model.predict(X_val)
    val_report = classification_report(y_val, y_val_pred)

    logger.info(f"Final model validation accuracy: {accuracy:.4f}")
    logger.info(f"Final model validation loss: {val_loss:.4f}")
    logger.info(f"Validation Classification Report:\n{val_report}")

    # Save validation report to the best model results file
    best_model_path = os.path.join(absolute_path, 'OptimizationResults', 'logreg_best_model_results.txt')
    with open(best_model_path, 'a') as f:
        f.write("\n========== FINAL MODEL EVALUATION ==========\n")
        f.write(f"Validation Accuracy: {accuracy:.4f}\n")
        f.write(f"Validation Loss: {val_loss:.4f}\n")
        f.write("Classification Report:\n")
        f.write(val_report)
        f.write("\n")

    # Measure prediction time for each sample in test set
    logger.info("Evaluating prediction time on test set")
    logreg_predictions = []
    prediction_times = []

    for sentence in X_test:
        start_time = time.time()
        prediction = final_model.predict([sentence])[0]
        end_time = time.time()
        elapsed_time = end_time - start_time
        logreg_predictions.append(prediction)
        prediction_times.append(elapsed_time)

    # Calculate average prediction time
    avg_prediction_time = sum(prediction_times) / len(prediction_times)
    logger.info(f"Average prediction time per sample: {avg_prediction_time:.6f} seconds")

    absolute_path = "/content/gdrive/My Drive/Projects/Financial-Sentiment/"
    dataset_path = absolute_path + "Datasets/"

    # Store predictions and time taken in the test DataFrame
    test_df_copy = pd.read_csv(dataset_path + "test_set.csv")
    test_df_copy["logreg_predictions"] = logreg_predictions
    test_df_copy["time_logreg"] = prediction_times

    # Save the updated test set with predictions and time
    output_path = dataset_path + "test_set_with_logreg_predictions.csv"
    test_df_copy.to_csv(output_path, index=False)
    logger.info(f"Test set with predictions saved to {output_path}")

    # Evaluate on test set
    y_test_pred = final_model.predict(X_test)
    test_accuracy = accuracy_score(y_test, y_test_pred)
    test_report = classification_report(y_test, y_test_pred)

    logger.info(f"Test set accuracy: {test_accuracy:.4f}")
    logger.info(f"Test Classification Report:\n{test_report}")

    # Add test evaluation to the results file
    with open(best_model_path, 'a') as f:
        f.write("\n========== TEST SET EVALUATION ==========\n")
        f.write(f"Test Accuracy: {test_accuracy:.4f}\n")
        f.write("Classification Report:\n")
        f.write(test_report)
        f.write(f"\nAverage prediction time: {avg_prediction_time:.6f} seconds\n")
        f.write("========== END OF FINAL EVALUATION ==========\n")

    # Save model using joblib
    try:
        import joblib
        model_save_path = os.path.join(absolute_path, 'TrainedModels', 'logreg_optimized_model.joblib')
        os.makedirs(os.path.dirname(model_save_path), exist_ok=True)
        joblib.dump(final_model, model_save_path)
        logger.info(f"Model saved to {model_save_path}")

        # Add model path to results file
        with open(best_model_path, 'a') as f:
            f.write(f"\nFinal model saved to: {model_save_path}\n")

    except Exception as e:
        logger.error(f"Error saving model: {str(e)}")
        logger.error(traceback.format_exc())

# Visualization function
def visualize_optuna_study(study):
    """Visualize the results of an Optuna study."""
    # Only proceed if we have completed trials
    completed_trials = [t for t in study.trials if t.state == optuna.trial.TrialState.COMPLETE]
    if not completed_trials:
        logger.warning("No completed trials to visualize")
        return

    try:
        logger.info("Generating visualization plots")

        # Plot optimization history
        fig1 = optuna.visualization.plot_optimization_history(study)
        fig1.show()

        # Plot parameter importances
        fig2 = optuna.visualization.plot_param_importances(study)
        fig2.show()

        # Plot parallel coordinate plot
        fig3 = optuna.visualization.plot_parallel_coordinate(study)
        fig3.show()

        # Plot slice plot
        fig4 = optuna.visualization.plot_slice(study)
        fig4.show()

    except Exception as e:
        logger.error(f"Visualization failed: {str(e)}")
        logger.error(traceback.format_exc())

# Main execution
if __name__ == "__main__":
    logger.info("========== STARTING LOGISTIC REGRESSION HYPERPARAMETER OPTIMIZATION ==========")

    # Optional: Install required packages if needed
    try:
        import optuna
    except ImportError:
        logger.info("Installing optuna...")
        !pip install optuna
        import optuna

    try:
        import joblib
    except ImportError:
        logger.info("Installing joblib...")
        !pip install joblib
        import joblib

    # Run optimization with 100 trials (adjust as needed)
    try:
        best_params = run_logreg_optimization(n_trials=100, timeout=None)

        # If optimization was successful, train the final model
        if best_params:
            # Train final model with best parameters
            train_final_logreg_model(best_params)

            # Visualize the study results if possible
            try:
                # Create a study and populate it with a single trial to visualize
                dummy_study = optuna.create_study(direction='maximize')
                dummy_study.optimize(objective, n_trials=1)
                visualize_optuna_study(dummy_study)
            except Exception as viz_error:
                logger.error(f"Error in visualization: {str(viz_error)}")
                logger.error(traceback.format_exc())

    except Exception as e:
        logger.error(f"Error in main execution: {str(e)}")
        logger.error(traceback.format_exc())

    logger.info("========== LOGISTIC REGRESSION HYPERPARAMETER OPTIMIZATION COMPLETED ==========")

[I 2025-04-13 02:52:22,657] A new study created in memory with name: no-name-c48aa362-f843-475c-95fa-ba18c342fdf4


Mounted at /content/gdrive


[I 2025-04-13 02:52:22,895] Trial 0 finished with value: 0.6550387596899225 and parameters: {'max_features': 5000, 'ngram_range': (1, 2), 'stop_words': None, 'C': 8.959369794479093, 'class_weight': 'balanced', 'max_iter': 1000, 'penalty_solver_combo': ('l2', 'newton-cg')}. Best is trial 0 with value: 0.6550387596899225.
[I 2025-04-13 02:52:23,232] Trial 1 finished with value: 0.6569767441860465 and parameters: {'max_features': 10000, 'ngram_range': (1, 1), 'stop_words': None, 'C': 57.225171875259896, 'class_weight': 'balanced', 'max_iter': 500, 'penalty_solver_combo': ('l2', 'saga')}. Best is trial 1 with value: 0.6569767441860465.
ERROR:__main__:Error in model creation/evaluation: The 'penalty' parameter of LogisticRegression must be a str among {'l2', 'l1', 'elasticnet'} or None. Got 'none' instead.
ERROR:__main__:Traceback (most recent call last):
  File "<ipython-input-3-77b0fec66fbc>", line 57, in create_and_evaluate_model
    pipeline.fit(X_train, y_train)
  File "/usr/local/lib/

ERROR:__main__:Visualization failed: Cannot evaluate parameter importances with only a single trial.
ERROR:__main__:Traceback (most recent call last):
  File "<ipython-input-3-77b0fec66fbc>", line 394, in visualize_optuna_study
    fig2 = optuna.visualization.plot_param_importances(study)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/optuna/visualization/_param_importances.py", line 168, in plot_param_importances
    importances_infos = _get_importances_infos(study, evaluator, params, target, target_name)
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/optuna/visualization/_param_importances.py", line 82, in _get_importances_infos
    _get_importances_info(
  File "/usr/local/lib/python3.11/dist-packages/optuna/visualization/_param_importances.py", line 54, in _get_importances_info
    importances = optuna.importance.get_param_imp