In [40]:
import random
import os

import numpy as np
import pandas as pd

from sklearn.feature_selection import VarianceThreshold
from sklearn.model_selection import train_test_split

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

from rdkit import DataStructs
from rdkit import Chem, DataStructs
from rdkit.Chem import PandasTools, AllChem, Descriptors
from rdkit.ML.Descriptors.MoleculeDescriptors import MolecularDescriptorCalculator

from typing import Callable, List, Union

from autogluon.tabular import TabularDataset, TabularPredictor
import autogluon.eda.auto as auto #EDA
from IPython.display import Image, display
 
# Lift pandas' "display.max_seq_items" limit (set to None) so long sequences,
# such as the wide column index built later in this notebook, are printed in full.
pd.set_option('display.max_seq_items', None)

In [None]:

def seed_everything(seed):
    """Seed every random number source used in this notebook.

    Seeds PyTorch, NumPy's legacy global generator and Python's ``random``
    module, and exports the seed as the ``PYTHONHASHSEED`` environment variable.

    Args:
        seed: Integer seed applied to all generators.
    """
    # NOTE(review): exporting PYTHONHASHSEED from a running interpreter presumably
    # only affects child processes started afterwards - confirm if hash ordering matters.
    os.environ["PYTHONHASHSEED"] = str(seed)

    torch.manual_seed(seed)
    np.random.seed(seed)
    random.seed(seed)

In [None]:
seed_everything(42) # fix the random seeds

# Main train / test tables.
train_d = pd.read_csv("../input/train.csv")
test_d = pd.read_csv("../input/test.csv")

# Companion tables; their "pred_0" column is attached to train_d / test_d
# as the "plas" feature in a later cell.
train_plas= pd.read_csv("../input/train_out.csv")
test_plas = pd.read_csv("../input/test_out.csv")

In [None]:
# torch custom dataset; it is only defined here and not used elsewhere in this notebook
class CustomDataset(Dataset):
    """Torch ``Dataset`` over a fixed set of numeric feature columns.

    Args:
        df: DataFrame holding every column in ``FEATURE_COLUMNS`` and, for
            train/validation data, the ``target`` column.
        target: Name of the label column (``"HLM"`` or ``"MLM"``).
        is_test: When True, items are feature tensors only and the label
            column is optional (test frames carry no labels).

    Items:
        ``(feature, label)`` float tensors when ``is_test`` is False, where
        ``label`` has a trailing dimension of size 1; otherwise ``feature`` only.
    """

    FEATURE_COLUMNS = ['AlogP', 'Molecular_Weight', 'Num_H_Acceptors', 'Num_H_Donors',
                       'Num_RotatableBonds', 'LogD', 'Molecular_PolarSurfaceArea', 'plas']

    def __init__(self, df, target, is_test=False):
        self.df      = df
        self.target  = target # HLM or MLM
        self.is_test = is_test # train,valid / test
        self.feature = self.df[self.FEATURE_COLUMNS].values

        # A test frame has no label column; reading it unconditionally raised
        # KeyError and made is_test=True unusable. Labels are still loaded
        # whenever the column exists, so existing callers see the same attribute.
        if is_test and target not in self.df.columns:
            self.label = None
        else:
            self.label = self.df[self.target].values

    def __getitem__(self, index):
        feature = torch.tensor(self.feature[index], dtype=torch.float)

        if self.is_test: # test case: features only
            return feature

        # train/valid case (label exists): add a trailing dim so the label is shape (1,)
        label = torch.tensor(self.label[index], dtype=torch.float).unsqueeze(dim=-1)
        return feature, label

    def __len__(self):
        return len(self.df)

In [None]:
def morgan_binary_features_generator(mol: Union[str, Chem.Mol], plot_img = False,
                                     radius: int = 2,
                                     num_bits: int = 32) -> np.ndarray:
    """Compute a binary Morgan fingerprint for one molecule.

    Args:
        mol: SMILES string or RDKit molecule.
        plot_img: When True, render the molecule with IPython ``display``.
        radius: Morgan fingerprint radius.
        num_bits: Number of bits in the fingerprint.

    Returns:
        Float NumPy array filled from the fingerprint bit vector.

    Raises:
        ValueError: If ``mol`` is None or is a SMILES string RDKit cannot parse.
    """
    if isinstance(mol, str):
        smiles = mol
        mol = Chem.MolFromSmiles(smiles)
        if mol is None:
            # Fail with a readable message instead of an opaque RDKit argument
            # error from the fingerprint call below.
            raise ValueError(f"Could not parse SMILES string: {smiles!r}")
    elif mol is None:
        raise ValueError("mol must be a SMILES string or an RDKit Mol, got None")

    if plot_img:
        display(mol)

    features_vec = AllChem.GetMorganFingerprintAsBitVect(mol, radius, nBits=num_bits)
    # Placeholder destination; ConvertToNumpyArray is expected to resize it to
    # the fingerprint length (TODO confirm against the RDKit docs).
    features = np.zeros((1,))
    DataStructs.ConvertToNumpyArray(features_vec, features)
    return features

In [None]:
def makeFPsAsLongNumberStr(mol: Union[str, Chem.Mol]):
    """Return the molecule's Morgan fingerprint as one string of digits.

    Each fingerprint entry from ``morgan_binary_features_generator`` (called with
    its defaults) is truncated to an int and the digits are concatenated in order.
    """
    bit_values = morgan_binary_features_generator(mol)
    digits = map(str, map(int, bit_values))
    return ''.join(digits)

In [None]:
# First element of every entry in RDKit's private Descriptors._descList
# (presumably the descriptor name of a (name, function) pair - confirm).
# NOTE(review): this list is not referenced anywhere else in the visible notebook.
allDescriptorList = [i[0] for i in Descriptors._descList]

In [None]:
# Names of the RDKit descriptors computed for every molecule; they are passed to
# MolecularDescriptorCalculator in getDescriptorValues and also used as the
# DataFrame column names for the resulting values.
chosen_descriptors = [  'BalabanJ', 'BertzCT', 'Chi0', 'Chi0n', 'Chi0v', 'Chi1', 'Chi1n', 'Chi1v', 'Chi2n', 'Chi2v',
                        'EState_VSA5', 'EState_VSA6', 'EState_VSA7', 'EState_VSA8', 'EState_VSA9', 'ExactMolWt', 'FpDensityMorgan1', 'FpDensityMorgan2', 'FpDensityMorgan3', 'FractionCSP3',
                        'HallKierAlpha', 'HeavyAtomCount', 'HeavyAtomMolWt', 'Ipc', 'Kappa1', 'Kappa2', 'Kappa3', 'LabuteASA', 'MaxAbsEStateIndex', 'MaxAbsPartialCharge',
                        'MaxEStateIndex', 'MaxPartialCharge', 'MinAbsEStateIndex', 'MinAbsPartialCharge', 'MinEStateIndex', 'MinPartialCharge', 'MolLogP', 'MolMR', 'MolWt', 'NHOHCount',
                        'NOCount', 'NumAliphaticCarbocycles', 'NumAliphaticHeterocycles', 'NumAliphaticRings', 'NumAromaticCarbocycles', 'NumAromaticHeterocycles', 'NumAromaticRings', 'NumHAcceptors', 'NumHDonors', 'NumHeteroatoms',
                        'NumRadicalElectrons', 'NumRotatableBonds', 'NumSaturatedCarbocycles', 'NumSaturatedHeterocycles', 'NumSaturatedRings', 'NumValenceElectrons', 'PEOE_VSA1', 'PEOE_VSA10', 'PEOE_VSA11', 'PEOE_VSA12',
                        'PEOE_VSA13', 'PEOE_VSA14', 'PEOE_VSA2', 'PEOE_VSA3', 'PEOE_VSA4', 'PEOE_VSA5', 'PEOE_VSA6', 'PEOE_VSA7', 'PEOE_VSA8', 'PEOE_VSA9',
                        'RingCount', 'SMR_VSA1', 'SMR_VSA10', 'SMR_VSA2', 'SMR_VSA3', 'SMR_VSA4', 'SMR_VSA5', 'SMR_VSA6', 'SMR_VSA7', 'SMR_VSA8',
                        'SMR_VSA9', 'SlogP_VSA1', 'SlogP_VSA10', 'SlogP_VSA11', 'SlogP_VSA12', 'SlogP_VSA2', 'SlogP_VSA3', 'SlogP_VSA4', 'SlogP_VSA5', 'SlogP_VSA6',
                        'SlogP_VSA7', 'SlogP_VSA8', 'SlogP_VSA9', 'TPSA', 'VSA_EState1', 'VSA_EState10', 'VSA_EState2', 'VSA_EState3', 'VSA_EState4', 'VSA_EState5'
                    ]

# Compatibility shim: re-create the `np.float` alias as the builtin float so code
# that still references it does not fail with
# "module 'numpy' has no attribute 'float'". This monkey-patches numpy globally.
np.float = float  # to avoid "module 'numpy' has no attribute 'float'"

def getDescriptorValues(mol: Union[str, Chem.Mol]):
    """Compute the ``chosen_descriptors`` RDKit descriptors for one molecule.

    Args:
        mol: SMILES string or RDKit molecule.

    Returns:
        List of descriptor values, in the same order as ``chosen_descriptors``.

    Raises:
        ValueError: If ``mol`` is None or is a SMILES string RDKit cannot parse.
    """
    if isinstance(mol, str):
        smiles = mol
        mol = Chem.MolFromSmiles(smiles)
        if mol is None:
            # Reject unparsable input explicitly, matching the fingerprint helper,
            # rather than handing None to the descriptor calculator (which is
            # presumably best-effort and would yield placeholder values - confirm).
            raise ValueError(f"Could not parse SMILES string: {smiles!r}")
    elif mol is None:
        raise ValueError("mol must be a SMILES string or an RDKit Mol, got None")

    mol_descriptor_calculator = MolecularDescriptorCalculator(chosen_descriptors)
    # use molecular descriptor calculator on RDKit mol object
    descriptor_vals = list(mol_descriptor_calculator.CalcDescriptors(mol))

    return descriptor_vals

In [None]:
# Attach the "pred_0" column of the companion tables as the "plas" feature.
# NOTE(review): this relies on train_plas / test_plas having the same row order
# and index as train_d / test_d - confirm.
train_d["plas"] = train_plas["pred_0"]
test_d["plas"] = test_plas["pred_0"]

# Add the Morgan fingerprint, encoded as one string of 0/1 digits per molecule.
train_d["FPS"] = train_d["SMILES"].apply(makeFPsAsLongNumberStr)
test_d["FPS"] = test_d["SMILES"].apply(makeFPsAsLongNumberStr)

# Add one column per RDKit descriptor. The descriptor frame is built on the
# parent's index so the column assignment stays row-aligned even when the
# frame does not carry a default RangeIndex.
for frame in (train_d, test_d):
    descriptor_rows = frame["SMILES"].apply(getDescriptorValues).tolist()
    frame[chosen_descriptors] = pd.DataFrame(
        descriptor_rows, columns=chosen_descriptors, index=frame.index
    )

# Drop descriptors that take fewer distinct values on the training set than a
# third of its row count; the same columns are removed from the test set.
min_unique_values = len(train_d) // 3
low_variety_descriptors = [
    key for key in chosen_descriptors
    if len(train_d[key].unique()) < min_unique_values
]
for key in low_variety_descriptors:
    print(f"Drop the Calculated Feautre: {key}")
train_d = train_d.drop(columns=low_variety_descriptors)
test_d = test_d.drop(columns=low_variety_descriptors)

# Impute missing AlogP with the TRAIN mean for both splits. Assign the result
# back instead of calling fillna(inplace=True) on a column selection: that
# chained in-place form is deprecated and does not update the parent frame
# under pandas Copy-on-Write.
alogp_train_mean = train_d["AlogP"].mean()
train_d["AlogP"] = train_d["AlogP"].fillna(alogp_train_mean)
test_d["AlogP"] = test_d["AlogP"].fillna(alogp_train_mean)

# Any remaining missing value becomes 0.
train_d = train_d.fillna(value=0)
test_d = test_d.fillna(value=0)


print(train_d.head(3))
print(test_d.head(3))

In [None]:
# Column counts of both frames after feature engineering.
print(f"train colums: {len(train_d.columns)}")
print(f"test colums: {len(test_d.columns)}")


# Show Sampled Mol from SMILES data
# (plot_img=True also renders the molecule; the row is selected by index label 0)
print(f"Tester to FPs: {morgan_binary_features_generator(train_d['SMILES'][0], True)}")
# Number of distinct fingerprint strings versus number of training rows.
print(f"Unique len: {train_d['FPS'].unique().shape[0]} / {train_d.shape[0]}")

In [None]:
# Per-column count of missing values in the train and test frames.
train_d.isna().sum(), test_d.isna().sum()

In [None]:
# Keep only the columns to be used: each training set drops "id" and the
# target it does not predict; the test set drops "id".
# NOTE(review): the string columns "SMILES" and "FPS" are still present and are
# therefore passed to AutoGluon as features - confirm this is intended.
train_MLM = TabularDataset(train_d.drop(['id', "HLM"], axis=1))
train_HLM = TabularDataset(train_d.drop(['id', "MLM"], axis=1))
test = TabularDataset(test_d.drop(["id"], axis=1))

In [None]:
# EDA for the MLM target.
# Fit on train_MLM rather than the raw train_d: train_d still contains "id" and
# the other target "HLM", which would be used as features here and distort the
# feature-importance plots (HLM is not available for the test set). This also
# matches the HLM quick_fit cell, which uses train_HLM.
state = auto.quick_fit(
    train_MLM,
    "MLM",
    return_state=True,
    fit_bagging_folds=3,
    show_feature_importance_barplots=True
)

In [None]:
# Explain the prediction for the first row listed in
# state.model_evaluation.highest_error (presumably the row with the largest
# error - confirm), using the model held in `state`.
# NOTE(review): `state` is whatever quick_fit cell ran last; run this right
# after the MLM quick_fit cell so it explains the MLM model.
auto.explain_rows(
    train_data=train_d,
    model=state.model,
    display_rows=True,
    rows=state.model_evaluation.highest_error[:1]
)

In [None]:
# EDA for the HLM target on train_HLM (no "id", no "MLM" column).
# Rebinds `state`, replacing the result of the MLM quick_fit.
state = auto.quick_fit(
    train_HLM, 
    "HLM", 
    return_state=True,
    show_feature_importance_barplots=True
)

In [None]:
# Fit one AutoGluon predictor per target, optimising RMSE.
# NOTE(review): verbosity is given as False; AutoGluon documents it as an
# integer level, so 0 is presumably what is meant - confirm.
predictor_MLM = TabularPredictor(label='MLM', eval_metric='root_mean_squared_error', verbosity=False).fit(train_MLM)
predictor_HLM = TabularPredictor(label='HLM', eval_metric='root_mean_squared_error', verbosity=False).fit(train_HLM)

# NOTE(review): each leaderboard below is scored on the same frame the predictor
# was fitted on, so these scores are training scores, not a held-out estimate.
ld_board_MLM = predictor_MLM.leaderboard(train_MLM, silent=True)
print("="*20, "MLM", "="*20)
print(ld_board_MLM)
print(f"Best: {predictor_MLM.get_model_best()}")

ld_board_HLM = predictor_HLM.leaderboard(train_HLM, silent=True)
print("="*20, "HLM", "="*20)
print(ld_board_HLM)
print(f"Best: {predictor_HLM.get_model_best()}")

In [None]:
# Render each predictor's ensemble graph: plot_ensemble_model() returns the path
# of the image it produced, which is then displayed inline.
path_to_png=predictor_MLM.plot_ensemble_model()
display(Image(filename=path_to_png))
path_to_png=predictor_HLM.plot_ensemble_model()
display(Image(filename=path_to_png))

---

In [None]:
#### TESTING

In [None]:
# Write the result (submission) file
pred_MLM = predictor_MLM.predict(test)
pred_HLM = predictor_HLM.predict(test)

submission = pd.DataFrame()
submission["id"] = test_d["id"]
submission["MLM"] = pred_MLM
submission["HLM"] = pred_HLM

# Create the output directory first so a missing "../output" folder does not
# fail the notebook at the very last step, after all models have been trained.
submission_path = "../output/submission.csv"
os.makedirs(os.path.dirname(submission_path), exist_ok=True)
submission.to_csv(submission_path, index=False)

In [None]:
# Final check before submitting: per-column count of missing values
# in the submission frame.
submission.isna().sum()