In [1]:
import os
import random
import sys
import warnings
from typing import Literal, Optional, Tuple

import optuna

import src.generate_encodings as ge
import src.prediction_models as pm

In [2]:
class HiddenPrints:
    """Context manager that silences ``print`` output inside a ``with`` block.

    On entry ``sys.stdout`` is redirected to ``os.devnull``; on exit the
    stream that was active before entry is put back. Exceptions raised inside
    the block are not suppressed.
    """

    def __enter__(self):
        self._original_stdout = sys.stdout
        # Keep our own reference to the devnull handle so that __exit__ closes
        # exactly the file opened here, even if code inside the block rebinds
        # sys.stdout to something else.
        self._devnull = open(os.devnull, 'w')
        sys.stdout = self._devnull
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Restore first so sys.stdout never points at a closed file.
        sys.stdout = self._original_stdout
        self._devnull.close()


class HiddenWarnings():
    """Context manager that ignores every warning raised inside a ``with`` block.

    The save/restore of the warning filters is delegated to
    ``warnings.catch_warnings()``, which puts back the original filter list
    object and invalidates the warning registries on exit. An "ignore all"
    filter is installed on top for the duration of the block. Exceptions
    raised inside the block are not suppressed.
    """

    def __enter__(self):
        # A catch_warnings instance is single-use, so build a fresh one on
        # every entry instead of creating it once in __init__.
        self._catcher = warnings.catch_warnings()
        self._catcher.__enter__()
        # Ignore all warnings
        warnings.simplefilter("ignore")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Restore the original warning filter settings
        self._catcher.__exit__(exc_type, exc_val, exc_tb)


In [3]:
"""Extract Sequence-Embeddings and Scores """

# Encoding scheme handed to ge.generate_sequence_encodings and dataset to load.
e_type = "blosum80"
dataset = "D7PM05_CLYGR_Somermeyer_2022"
# First underscore-separated token of the dataset name; used in study/file names.
data_set_name_shortened = dataset.split("_")[0]

import_data = f"../Data/Protein_Gym_Datasets/{dataset}.csv"
x_arr = []
y_arr = []
with open(import_data, "r") as infile:
    # Skip the header row; the default keeps an empty file from raising.
    next(infile, None)
    for raw_line in infile:
        # Strip only the line terminator. Slicing off the last character
        # unconditionally would eat real data on a final row that has no
        # trailing newline.
        line = raw_line.rstrip("\n").split(",")
        if line == [""]:
            # Blank line (typically at the end of the file): nothing to parse.
            continue
        # Column 1 holds the sequence and column 2 the score.
        sequence = line[1]
        # NOTE(review): the score is kept as the raw string read from the file;
        # presumably it is converted to a number downstream -- confirm.
        label = line[2]
        x_arr.append(ge.generate_sequence_encodings(e_type, [sequence])[0])
        y_arr.append(label)

In [4]:
# random.shuffle(data)
# x_train = [xy[0] for xy in data[:int(len(data) * 0.80)]]
# y_train = [xy[1] for xy in data[:int(len(data) * 0.80)]]
#
# x_val = [xy[0] for xy in data[int(len(data) * 0.80):int(len(data) * 0.90)]]
# y_val = [xy[1] for xy in data[int(len(data) * 0.80):int(len(data) * 0.90)]]
#
# x_test = [xy[0] for xy in data[int(len(data) * 0.90):]]
# y_test = [xy[1] for xy in data[int(len(data) * 0.90):]]

In [5]:
# """To Do:
# Write a training script for the XGBoost regressor (xgboost_train.py).
# Pass the hyperparameters as command-line arguments, e.g. via sys.argv.
# Return and print R2 and RMSE.
# Example command: python xgboost_train.py <max_depth> <eta> <n_esti> ... --> RMSE = 0.18
# Next step: use Optuna to run the script xgboost_train.py.
#
# """
#
# xgb_hyperCV = {
#     "max_depth": [4, 5, 6, 7, 8, 9, 10, 11, 12, 13],
#     "eta": [0.1, 0.2, 0.3, 0.4, 0.5, 0.6],
#     "n_estimators": [50, 100, 200, 300, 400, 500],
#     "reg_alpha": [0.0, 0.1, 0.5, 1, 1.5],
#     "reg_lambda": [0.0, 0.1, 0.5, 1, 1.5],
#     # "min_samples_split": [2, 3, 4, 5],
#     # "min_samples_leaf": [1, 2, 3, 4, 5],
#     # "bootstrap": [False, True]
# }

In [6]:
# Regressor type to tune; objective() passes it to train_with_params() and it
# is embedded in the study names and result file contents.
model = "lightgbm"
# Number of cross-validation folds, forwarded as k_folds to regressor.train()
# inside train_with_params().
cv_folds = 5


In [7]:
def train_with_params(model: Literal["rf", "xgboost", "gxboost_rf", "lightgbm", "svr", "adaboost"], x_arr, y_arr,
                      params: dict,
                      early_stopping: Optional[int] = False, seed: Optional[int] = None,
                      k_folds: Optional[int] = None) -> Tuple[float, float]:
    """Train one ``pm.ActivityPredictor`` with cross-validation and report its scores.

    Output and warnings emitted during training are silenced; afterwards a
    separator line and the rounded R2 / RMSE are printed.

    NOTE(review): the ``"gxboost_rf"`` literal looks like a typo for
    ``"xgboost_rf"`` -- confirm against ``pm.ActivityPredictor`` before changing.

    Args:
        model: Model type forwarded as ``model_type``.
        x_arr: Encoded inputs forwarded to the predictor.
        y_arr: Labels forwarded to the predictor.
        params: Hyperparameters forwarded to the predictor.
        early_stopping: Forwarded unchanged to the predictor (default ``False``).
        seed: Forwarded unchanged to the predictor. The default is ``None``;
            the previous default expression ``random.seed()`` also evaluated to
            ``None`` but reseeded the global RNG once at definition time.
        k_folds: Number of cross-validation folds. ``None`` (default) falls
            back to the notebook-level ``cv_folds`` setting.

    Returns:
        The object returned by ``regressor.get_performance()``; its first two
        items are printed as R2 and RMSE.
    """
    # Resolve the fold count at call time so later edits to cv_folds are honoured.
    n_folds = cv_folds if k_folds is None else k_folds

    regressor = pm.ActivityPredictor(model_type=model, x_arr=x_arr, y_arr=y_arr, params=params,
                                     early_stopping=early_stopping, shuffle_data=False,
                                     seed=seed)

    with HiddenPrints():
        with HiddenWarnings():
            regressor.train(k_folds=n_folds)
        performance = regressor.get_performance()
    print('------------------------------------------------')
    print(f"R2: {round(performance[0], 4)}, RMSE: {round(performance[1], 4)}")
    return performance


In [8]:
def objective(trial: optuna.Trial) -> Tuple[float, float]:
    """Optuna objective: sample hyperparameters, train once, return (R2, RMSE).

    The sampled parameters are passed to ``train_with_params`` together with
    the notebook-level ``model``, ``x_arr`` and ``y_arr`` and a fixed seed of
    42. Both scores are rounded to four decimals before they are returned.

    Args:
        trial: The Optuna trial used to sample the hyperparameters.

    Returns:
        ``(r2, rmse)``, matching the study directions maximize / minimize.
    """
    seed = 42
    params = {
        'max_depth': trial.suggest_int('max_depth', 3, 30),
        'min_child_weight': trial.suggest_float('min_child_weight', 0.01, 10),
        'subsample': trial.suggest_float('subsample', 0.2, 1),
        'colsample_bytree': trial.suggest_float('colsample_bytree', 0.2, 0.9),
        # The lower bound is strictly positive: a learning rate of 0 is not a
        # valid setting (the model cannot learn), so it is excluded from the
        # search range.
        'eta': trial.suggest_float('eta', 1e-3, 0.3),
    }
    # params['num_boost_round'] = trial.suggest_int('num_boost_round', 100, 1000)

    # One tenth of the boosting rounds when that parameter is being tuned,
    # otherwise a fixed patience of 10.
    if 'num_boost_round' in params:
        early_stopping = int(params['num_boost_round'] / 10)
    else:
        early_stopping = 10

    results = train_with_params(model, x_arr, y_arr, params, early_stopping=early_stopping, seed=seed)
    r2 = round(results[0], 4)
    rmse = round(results[1], 4)

    return r2, rmse



In [9]:
n_studies = 5
n_trials = 100
# Optimisation directions, in the order objective() returns its values (R2, RMSE).
direction = ["maximize", "minimize"]

studies_name = f"{data_set_name_shortened}_{model}_{e_type}"

# Create the output directory once, up front; exist_ok avoids the
# check-then-create race of a separate os.path.exists() test.
outpath = "../Models/Hypertuned/"
os.makedirs(outpath, exist_ok=True)

# (best_trial, run_id) pairs collected across all studies.
overall_best_trials = []
for i in range(n_studies):
    study_name = f"{studies_name}_{i}"
    run_id = f"{i}"

    study = optuna.create_study(directions=direction, study_name=study_name)

    with HiddenPrints():
        study.optimize(objective, n_trials=n_trials, show_progress_bar=True)
    print('------------------------------------------------')

    # best_trials is recomputed on every access, so read it once per study.
    best_trials = study.best_trials
    for b_trial in best_trials:
        overall_best_trials.append((b_trial, run_id))
        print(
            f"{study_name}, Best hyperparameters for run {i}:, Best Trial: {b_trial.number}, Best Score: {b_trial.values}, "
            f"Best Params{b_trial.params}")

    # NOTE(review): study_name already ends in the run index, so the file name
    # carries it twice (e.g. "..._00.txt"); kept unchanged so existing result
    # files keep their names.
    outfile = study_name + run_id + ".txt"

    with open(os.path.join(outpath, outfile), "w") as out:
        out.write(f"Study-ID: {i}_{model}_{e_type}_{dataset}\n")
        out.write("\n")
        # NOTE(review): "Study {j}" below is the position within the best
        # trials of this study, not a study index.
        for j, b_trial in enumerate(best_trials):

            out.write(
                f"Study {j}, Best hyperparameters for run {run_id}:\nBest Trial: {b_trial.number}\nBest Score: {b_trial.values}\n"
                f"Best Params: {b_trial.params} \n")
            if j < len(best_trials)-1:
                out.write("---\n")

        out.write("\n")
        # NOTE(review): "Study {trial.number}" below is a trial number.
        for trial in study.trials:
            out.write('------------------------------------------------\n')
            out.write(f"Study {trial.number}\nScores: {trial.values}\n")
            out.write(f"Parameters: {trial.params}\n")



[I 2025-03-31 13:33:12,860] A new study created in memory with name: D7PM05_lightgbm_blosum80_0


  0%|          | 0/100 [00:00<?, ?it/s]

KeyboardInterrupt: 

In [None]:
outpath = "../Models/Hypertuned/"
summary_file = f"Summary_{studies_name}.txt"

# An interrupted or skipped optimisation cell leaves the list empty; fail with
# an actionable message instead of a bare IndexError.
if not overall_best_trials:
    raise RuntimeError(
        "overall_best_trials is empty: run the optimisation cell to completion "
        "before writing the summary.")

# First collected (trial, run_id) pair, taken before sorting. Not used in this
# cell; kept in case a later cell reads it.
values = overall_best_trials[0]
# Order by the first objective value (R2), best first.
overall_best_trials = sorted(overall_best_trials, key=lambda x: x[0].values[0], reverse=True)

with open(os.path.join(outpath, summary_file), "w") as out:
    out.write(f"{studies_name} with {n_studies} Studies with each {n_trials} trials\n")
    out.write(f"all scores: {[trial[0].values for trial in overall_best_trials]}\n")
    for position, (best_trial, run) in enumerate(overall_best_trials):
        # A separator goes between entries, not before the first one.
        if position != 0:
            out.write('------------------------------------------------\n')
        out.write(f"Study {run},Trial {best_trial.number}, Scores: {best_trial.values}\n")
        out.write(f"Parameters: {best_trial.params}\n")
