In [None]:
import datetime
import json
import os
import pickle
import warnings

import lightgbm as lgb
import numpy as np
import optuna
import pandas as pd
from rdkit import Chem, RDLogger
from rdkit.Chem import rdMolDescriptors
from sklearn.metrics import r2_score
from sklearn.model_selection import train_test_split
from tqdm import tqdm

# Turn off every RDKit logger whose name matches "rdApp.*".
RDLogger.DisableLog("rdApp.*")
# Install a blanket "ignore" filter so Python warnings are not shown in cell output.
warnings.filterwarnings("ignore")

In [None]:
def mfgen(mol, nBits=2048, radius=2):
    """Return the Morgan fingerprint of ``mol`` as a 1-D array of 0/1 integers.

    Args:
        mol: RDKit molecule passed to ``GetMorganFingerprintAsBitVect``.
        nBits: Length of the fingerprint bit vector (and of the returned array).
        radius: Morgan fingerprint radius.

    Returns:
        ``numpy.ndarray`` of shape ``(nBits,)`` holding 0/1 integer values.
    """
    fp = rdMolDescriptors.GetMorganFingerprintAsBitVect(mol, radius=radius, nBits=nBits)
    # Decode the "0101..." bit string in one vectorised step instead of calling
    # eval() on every character: view the ASCII bytes and subtract ord("0").
    ascii_bits = np.frombuffer(fp.ToBitString().encode("ascii"), dtype=np.uint8)
    return (ascii_bits - ord("0")).astype(int)


def vec_cpd_lst(smi_lst, nBits=2048, radius=2):
    """Vectorise a list of SMILES strings into a 2-D fingerprint matrix.

    Each distinct SMILES is fingerprinted once with ``mfgen`` and the result is
    reused for every occurrence. Entries that are the empty string or are not
    strings at all (e.g. the NaN floats pandas produces for empty CSV cells)
    are encoded as an all-zero vector.

    Args:
        smi_lst: Sequence of SMILES strings; may contain ``""`` / NaN / ``None``
            for missing compounds.
        nBits: Fingerprint length forwarded to ``mfgen``; also the row width.
        radius: Morgan radius forwarded to ``mfgen``.

    Returns:
        ``numpy.ndarray`` of shape ``(len(smi_lst), nBits)``. Rows for missing
        entries come from ``np.zeros`` (float), so the result is float whenever
        at least one entry is missing.

    Raises:
        ValueError: If a non-empty SMILES string cannot be parsed by RDKit.
    """

    def _is_missing(smi):
        # Non-string values are treated the same as the "" sentinel.
        return not isinstance(smi, str) or smi == ""

    # Sort the distinct strings so the progress bar and any error are deterministic.
    unique_smiles = sorted({smi for smi in smi_lst if not _is_missing(smi)})
    smi_vec_map = {}
    for smi in tqdm(unique_smiles):
        mol = Chem.MolFromSmiles(smi)
        if mol is None:
            # Fail with the offending input instead of an opaque error inside mfgen.
            raise ValueError(f"Could not parse SMILES: {smi!r}")
        smi_vec_map[smi] = mfgen(mol, nBits=nBits, radius=radius)

    if len(smi_lst) == 0:
        # Keep the result 2-D so it can still be concatenated along axis=1.
        return np.zeros((0, nBits))

    zero_vec = np.zeros(nBits)
    vec_lst = [zero_vec if _is_missing(smi) else smi_vec_map[smi] for smi in smi_lst]
    return np.array(vec_lst)

In [None]:
# Directory holding the competition CSV files, relative to the notebook.
dataset_dir = "../data"

# Resolve both file paths first, then load the two frames in one step.
train_csv = f"{dataset_dir}/round1_train_data.csv"
test_csv = f"{dataset_dir}/round1_test_data.csv"
train_df, test_df = pd.read_csv(train_csv), pd.read_csv(test_csv)

# Report the row count of each split.
print(f"Training set size: {len(train_df)}, test set size: {len(test_df)}")

In [None]:
# Columns that hold SMILES strings, in the order their fingerprints are concatenated.
SMILES_COLUMNS = ("Reactant1", "Reactant2", "Additive", "Solvent")


def _fingerprint_columns(df, columns=SMILES_COLUMNS):
    """Return one fingerprint matrix per SMILES column of ``df``.

    Missing cells (NaN after ``pd.read_csv``) are replaced by ``""`` first,
    which is the sentinel ``vec_cpd_lst`` maps to an all-zero vector.
    """
    return [vec_cpd_lst(df[col].fillna("").to_list()) for col in columns]


# Training features: per-column fingerprints concatenated side by side.
train_rct1_fp, train_rct2_fp, train_add_fp, train_sol_fp = _fingerprint_columns(train_df)
train_x = np.concatenate(
    [train_rct1_fp, train_rct2_fp, train_add_fp, train_sol_fp], axis=1
)
train_y = train_df["Yield"].to_numpy()

# Test features, built exactly the same way so the column layout matches.
test_rct1_fp, test_rct2_fp, test_add_fp, test_sol_fp = _fingerprint_columns(test_df)
test_x = np.concatenate([test_rct1_fp, test_rct2_fp, test_add_fp, test_sol_fp], axis=1)

# Cheap invariants: one row per reaction and identical feature width.
assert train_x.shape[0] == len(train_df) == len(train_y)
assert test_x.shape == (len(test_df), train_x.shape[1])

In [None]:
# x_train, x_val, y_train, y_val = train_test_split(
#     train_x, train_y, test_size=0.1, random_state=2024
# )

# train_matrix = lgb.Dataset(x_train, label=y_train)
# val_matrix = lgb.Dataset(x_val, label=y_val)


# def objective(trial):
#     param = {
#         "objective": "regression",
#         "boosting_type": "gbdt",
#         "learning_rate": trial.suggest_loguniform("learning_rate", 1e-3, 1e-1),
#         "num_leaves": trial.suggest_int("num_leaves", 2**3, 2**10),
#         "seed": 2024,
#         "min_data_in_leaf": trial.suggest_int("min_data_in_leaf", 10, 50),
#         "min_sum_hessian_in_leaf": trial.suggest_int("min_sum_hessian_in_leaf", 1, 10),
#         "bagging_fraction": trial.suggest_uniform("bagging_fraction", 0.6, 1.0),
#         "bagging_freq": trial.suggest_int("bagging_freq", 1, 10),
#         "feature_fraction": trial.suggest_uniform("feature_fraction", 0.6, 1.0),
#         "lambda_l2": trial.suggest_loguniform("lambda_l2", 1e-2, 1.0),
#         "metric": "mse",
#         "feature_pre_filter": False,
#     }

#     model = lgb.train(
#         param,
#         train_matrix,
#         10000,
#         valid_sets=[train_matrix, val_matrix],
#         callbacks=[lgb.early_stopping(500)],
#     )

#     val_pred = model.predict(x_val)
#     r2 = r2_score(y_val, val_pred)

#     return r2


# study = optuna.create_study(direction="maximize")
# study.optimize(
#     objective,
#     n_trials=10,
# )

# best_params = study.best_trial.params
# print(f"Best params: {best_params}")

# NOTE: json.dump writes str, so the file must be opened in text mode ("w", not "wb").
# with open("best_trial_params.json", "w") as file:
#     json.dump(best_params, file)

In [None]:
# Hold out 10% of the training data for validation; the seed is fixed so the
# split is reproducible.
x_train, x_val, y_train, y_val = train_test_split(
    train_x, train_y, test_size=0.1, random_state=2024
)

train_matrix = lgb.Dataset(x_train, label=y_train)
val_matrix = lgb.Dataset(x_val, label=y_val)

# LightGBM regression settings.
# NOTE(review): the numeric values look like the output of the (commented-out)
# Optuna search in the previous cell — confirm before changing them.
param = {
    "objective": "regression",
    "boosting": "gbdt",
    "learning_rate": 0.002,
    "num_leaves": 916,
    "num_threads": -1,
    "device_type": "cpu",
    "seed": 2024,
    "min_data_in_leaf": 11,
    "min_sum_hessian_in_leaf": 9,
    "bagging_fraction": 0.9,
    "bagging_freq": 6,
    "feature_fraction": 0.7,
    "lambda_l2": 0.06,
    "metric": "mse",
}

# Train for at most 10000 rounds, logging every 100 rounds and stopping early
# with a patience of 1000 rounds.
model = lgb.train(
    param,
    train_matrix,
    10000,
    valid_sets=[train_matrix, val_matrix],
    categorical_feature=[],
    callbacks=[
        lgb.log_evaluation(100),
        lgb.early_stopping(1000),
    ],
)

# Predict the hold-out set using the best iteration recorded during training.
val_pred = model.predict(
    x_val,
    num_iteration=model.best_iteration,
)

# r2_score expects (y_true, y_pred); R^2 is not symmetric, so the ground
# truth must come first.
score = r2_score(y_val, val_pred)
print("Validation Set Score:", score)

In [None]:
# Predict the test features with the model truncated at its best iteration.
best_round = model.best_iteration
test_pred = model.predict(test_x, num_iteration=best_round)

In [None]:
# Build the submission as text lines: a header followed by one
# "test<N>,<yield>" line per prediction (1-based ids, 4 decimal places).
ans_str_lst = ["rxnid,Yield"]
ans_str_lst.extend(f"test{idx+1},{y:.4f}" for idx, y in enumerate(test_pred))

# Create the output directory on first use so a fresh checkout does not fail
# with FileNotFoundError.
submit_dir = "../submit"
os.makedirs(submit_dir, exist_ok=True)

# A timestamped file name keeps earlier submissions from being overwritten.
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
submit_path = submit_dir + "/submit_" + timestamp + ".txt"

with open(submit_path, "w") as fw:
    # The payload is a single string, so write() is the right call;
    # writelines() would iterate over it character by character.
    fw.write("\n".join(ans_str_lst))