# 6 分子生成モデルを用いた分子最適化

## 6.1 分子最適化問題とその難しさ

ここでは分子最適化問題を取り扱う。すなわち、評価関数を$f~*: \mathcal{M} \rightarrow \mathbb{R}$として、
$$
m^* = \argmax_{m \in \mathcal{M}} f^* (m)
$$
となる分子$m^* \in \mathcal{M}$を求める分子最適化問題を取り扱う。例えば、ある分聖地が$\theta \in \mathbb{R}$という値をとる分子を求めたい場合にはその物性値を出力する関数$y: \mathcal{M} \rightarrow \mathbb{R}$を用いて、
$$
f^*(m) = - \frac{1}{2}(y(m) - \theta)^2
$$
のような評価関数とすればよい。

分子最適化問題の主な難しさとして、以下の二点の課題とその解決策が考えられる。

### 1. 分子の空間が離散的である
評価関数の定義息が分子からなる離散的な空間であることがあげられる。定義息が連続的であれば評価関数の勾配を用いることで解の改善をすることができる。しかし、定義息が離散的だと勾配情報のような解の改善に有用な情報が得られないため連続最適化よりも取り組むことが難しい。
この課題に対する解決策として、分子の空間を連続的な潜在空間に変換するオートエンコーダを用いて、連続最適化問題に変換する方法と、強化学習を用いて離散的な空間で最適化問題を解く方法が知られている。6章では前者を、7章では後者を説明する。

### 2. 評価関数に関する情報が限られている
評価関数$f^*$に関して得られる情報は個々の問題設定によって異なる。もっとも情報が限られた問題設定では、有限個の分子に対する評価関数値のみが与えられた状況で分子最適化を行う場合があげられる。このような問題設定では**オフライン強化学習**が知られている。

例：既存の実験データを用いて新規物質を発見したい

もう1つの問題設定では、任意の分子$m \in \mathcal{M}$に対して、その評価関数の値$f^*(m)$は知ることができるが、それ以外の情報が得られていない状況で分子最適化を行う場合。一見して多くの情報が得られるように見えるが、標準的な最適化手法では評価関数の値だけでなく勾配の情報も必要になるため使用できる最適化手法が限定される。また、一般的に評価関数の値を得るコストは小さくないため評価関数の評価回数（試験回数）をなるべく少なくしたいという要請もある。このような問題設定では**ブラックボックス最適化**が知られており、その中で代表的なものとして**ベイズ最適化**が知られている。

例：シミュレータを使って物性値を計算する場合や新たに実験を行って物性値を測定する場合

### 2つの課題への対処方法
- 分子の空間が離散的であること
- 評価関数に関する情報が限られていること

が課題となる。それらの課題について解決策を述べていく


## 6.2 分子最適化問題の連続最適化問題への変換
変分オートエンコーダを用いて離散的な分子グラフと実数値ベクトルを行き来することによって実現する。

### 6.2.1 準備
分子のデータセットを用いて学習した変分オートエンコーダを$q(\bm{z} | m), p_{M|\bm{Z}}(m | \bm{z})$とする。ここで$m \in \mathcal{M}$は分子、$\bm{n} \in \mathbb{R}^H$はそれに対応する潜在ベクトル、$q$はエンコーダ、$p_{M | \bm{Z}}$はデコーダである。また、分子$m$を入力すると、その分子に対する評価関数の値$f^*(m)$を得ることができる。

### 6.2.2 連続最適化問題への帰着
まず、デコーダ$p_{M|\bm{Z}}$と、評価関数$p_{Y|M}$を合成することで洗剤表現$\bm{z} \in \mathbb{R}^H$で条件づけた下での、それに対応する分子の評価関数の値に対応する**確率変数**が得られる。つまり、$\bm{z}$を入力すると$Y$が出力されるという確率的な入出力関係を表している。
この入手鬱力関係を関数$f_Z: \mathbb{R}^H \rightarrow \mathbb{R}$とノイズに相当する確率変数$\varepsilon \sim \mathcal{N}(0, \sigma^2)$を用いて
$$
y = f_z(\bm{z}) + \varepsilon
$$
とモデル化することを考える。この関数$f_Z$を最大化することで元の分子の最適化問題を近似的に解くことができる。最大化により得られた$\bm{z}$にデコーダを用いることで分子に変換できる。

### 6.2.3 $f_Z$の推定

ここでは分子とその評価関数の値の対から成る既存のデータセット
$$
\mathcal{D} = \{(m_n, y_n) \in \mathcal{M} \times \mathbb{R} \}_{n=1}^N
$$
を用いて目的関数$f_Z$を推定する方法を説明する。

$f_Z$を推定するためには入力$\bm{z}$と出力$f_Z{\bm{z}}$の対からなる$\mathcal{D}_Z$があればよい。適切な機械学習の手法を用いることで$\mathcal{D}_Z$から$f_Z$を推定できるためである。このようなデータセット$\mathcal{D}_Z$はエンコーダを使って作ることができる。

## 6.3 ベイズ最適化を用いて分子最適化
ここでは任意の入力$bm{z}$に対して目的関数$f_Z$の値を得ることができる場合の連続最適化問題を解く方法について説明する。このような最適化問題を**ブラックボックス最適化**と呼ぶ。様々な方法があるが今回はベイズ最適化を取り上げる。

### 6.3.1 問題設定
目的関数$f_Z: \mathbb{R}^H \rightarrow \mathbb{R}$は入力が与えられた下で対応する出力の値を計算することができるがそれ以外の情報（勾配など）は得られないとする。このような関数をブラックボックス関数と呼ぶ。

### 6.3.2 アルゴリズムの概要



In [1]:
from rdkit import Chem
from rdkit.Chem import Crippen
import torch
# from torchdrug.data.molecule import PackedMolecule
# from torchdrug.metrics import penalized_logP


def filter_valid(smiles_list):
    """SMILES系列のリストを受け取り、正しく分子に変換できるものを抽出

    Parameters
    ----------
    smiles_list : list[str]
        _description_

    Returns
    -------
    _type_
        _description_
    """
    success_list = []
    fail_idx_list = []
    for each_idx, each_smiles in enumerate(smiles_list):
        try:
            smiles = Chem.MolToSmiles(
                Chem.MolFromSmiles(each_smiles))
            success_list.append(smiles)
        except:
            fail_idx_list.append(each_idx)
    return success_list, fail_idx_list


def compute_plogp(smiles_list):
    filtered_smiles_list, fail_idx_list = filter_valid(smiles_list)
    if not filtered_smiles_list:
        return -30.0 * torch.ones(len(smiles_list))
    # packed_dataset = PackedMolecule.from_smiles(
    #     filtered_smiles_list)
    # _plogp_tensor = penalized_logP(packed_dataset)
    packed_dataset = [Chem.MolFromSmiles(smi) for smi in smiles_list]
    _plogp_tensor = torch.tensor([Crippen.MolLogP(mol) for mol in packed_dataset])
    plogp_tensor = torch.zeros(len(smiles_list),
                               dtype=torch.float)
    each_other_idx = 0
    for each_idx in range(len(plogp_tensor)):
        if each_idx in fail_idx_list:
            plogp_tensor[each_idx] = -30.0
        else:
            plogp_tensor[each_idx] = _plogp_tensor[each_other_idx]
            each_other_idx += 1
    return plogp_tensor

In [2]:
import gzip
import pickle
import pandas as pd
import torch
from torch.utils.data import DataLoader, TensorDataset

from botorch.optim import optimize_acqf
from botorch.acquisition import UpperConfidenceBound
from botorch.models import SingleTaskGP
from botorch.fit import fit_gpytorch_mll
from botorch.utils.transforms import standardize, normalize, unnormalize
from gpytorch.mlls import ExactMarginalLogLikelihood


from smiles_vocab import SmilesVocabulary
from smiles_vae import SmilesVAE

from rdkit import RDLogger

lg = RDLogger.logger()
lg.setLevel(RDLogger.CRITICAL)


def bo_dataset_construction(
    vae,
    input_tensor,
    smiles_list,
    batch_size=128,
    max_batch=10
):
    """潜在空間上のデータセットへの変換を行う

    Parameters
    ----------
    vae : SmilesVAE
        SMILES-VAEのモデル
    input_tensor : torch.Tensor
        SMILES系列を整数系列として表現したテンソル
    smiles_list : list[str]
        SMILES系列のリスト
    batch_size : int, optional
        エンコーダを用いる際のバッチサイズ, by default 128
    max_batch : int, optional
        データセットとして用いるバッチ数, by default 10

    Returns
    -------
    (torch.Tensor, torch.Tensor, list[str])
        潜在ベクトルのテンソル、logPのテンソル、SMILES系列のリスト
    """
    dataloader = DataLoader(
        TensorDataset(input_tensor),
        batch_size=batch_size,
        shuffle=False
    )
    z_list = []
    plogp_list = []
    out_smiles_list = []
    for each_batch_idx, each_tensor in enumerate(dataloader):
        if each_batch_idx == max_batch:
            break
        smiles_sublist = smiles_list[batch_size * each_batch_idx: batch_size * (each_batch_idx + 1)]
        with torch.no_grad():
            z, _ = vae.encode(each_tensor[0].to(vae.device))
        z_list.append(z.to("cpu").double())
        plogp_tensor = compute_plogp(smiles_sublist)
        plogp_list.append(plogp_tensor.double())
        out_smiles_list.extend(smiles_sublist)
    return torch.cat(z_list), torch.cat(plogp_list), out_smiles_list


def obj_func(z, vae):
    """ベイズ最適化を行う対象の目的関数。潜在ベクトルを受け取ってそれに対応する分子に対する評価関数の値（logP）を返す。

    Parameters
    ----------
    z : torch.Tensor
        潜在ベクトル
    vae : SmilesVAE
        SMILES-VAEのモデル

    Returns
    -------
    (torch.Tensor, list[str])
        (評価関数の値、SMILESのリスト)
    """
    z = z.to(torch.float32)
    # SMILES-VAEは潜在ベクトルを正しいSMILES系列にデコードできる確率が高くないため正しいSMILES系列が得られるまでデコードを繰り返す
    for _ in range(100):
        smiles_list = vae.generate(z, deterministic=False)
        success_list, failed_idx_list = filter_valid(smiles_list)
        if success_list:
            smiles_list = success_list[:1]
            break
    plogp_tensor = compute_plogp(smiles_list).double()
    return plogp_tensor, smiles_list


smiles_vocab = SmilesVocabulary()
train_tensor, train_smiles_list = smiles_vocab.batch_update_from_file("data/train.smi", with_smiles=True, ratio=0.1)
val_tensor, val_smiles_list = smiles_vocab.batch_update_from_file("data/valid.smi", with_smiles=True, ratio=0.1)
max_len = train_tensor.shape[1]
latent_dim = 64

# SMILES-VAEの読み込み
vae = SmilesVAE(
    vocab=smiles_vocab, latent_dim=64, emb_dim=256,
    encoder_params={"hidden_size": 512, "num_layers": 1, "bidirectional": False, "dropout": 0},
    decoder_params={"hidden_size": 512, "num_layers": 1, "dropout": 0},
    encoder2out_params={"out_dim_list": [256]},
    max_len=max_len
).to("cuda")
vae.load_state_dict(torch.load("data/vae.pt"))
vae.eval()

# SMILES-VAEのエンコーダを用いて分子とその評価関数の値の対からなるデータセットを
# 潜在空間上のデータセットDzに変換
z_tensor, plogp_tensor, smiles_list = bo_dataset_construction(
    vae=vae,
    input_tensor=train_tensor,
    smiles_list=train_smiles_list,
    # batch_size=128,
)
n_trial = 500

# 潜在空間上のデータセットDzをもとにベイズ最適化を行う
for each_trial in range(n_trial):
    # ガウス過程の入出力に対応するz_tensorおよびplotp_tensorを標準化する
    standardized_y = standardize(plogp_tensor).reshape(-1, 1)
    bounds = torch.stack([z_tensor.min(dim=0)[0],
                          z_tensor.max(dim=0)[0]])
    normalized_X = normalize(z_tensor, bounds)
    # 上記のデータを用いてガウス過程を学習する
    gp = SingleTaskGP(
        train_X=normalized_X,
        train_Y=standardized_y
    )
    # 獲得関数として信頼上限を計算し、獲得関数の最適化を通じて
    # 次に目的関数の値を計算する候補点candidateを求める
    mll = ExactMarginalLogLikelihood(gp.likelihood, gp)
    fit_gpytorch_mll(mll)
    UCB = UpperConfidenceBound(gp, beta=0.1)
    candidate, acq_value = optimize_acqf(
        acq_function=UCB,
        bounds=torch.stack([-0.1 * torch.ones(latent_dim),
                            1.1 * torch.ones(latent_dim)]),
        q=1,
        num_restarts=5,
        raw_samples=10
    )
    # candidateは標準化された空間における点であるためこれをもとの入力空間に戻す
    unnormalize_candidate = unnormalize(X=candidate, bounds=bounds)
    # 目的関数の値を計算してデータセットを更新
    plogp_val, each_smiles_list = obj_func(
        z=unnormalize_candidate, vae=vae
    )
    z_tensor = torch.cat([z_tensor, unnormalize_candidate])
    plogp_tensor = torch.cat([plogp_tensor, plogp_val])
    smiles_list.extend(each_smiles_list)
    print(f" * {each_trial}\t{plogp_val}")

plogp_tensor = plogp_tensor[-n_trial:]
smiles_list = smiles_list[-n_trial:]
_, ascending_idx_tensor = plogp_tensor.sort()

# 見つかった分子のうち、logPの大きい分子上位10個を表示する
print("plogp\t smiles")
out_dict_list = []
for each_idx in ascending_idx_tensor.tolist()[::-1][:10]:
    print(f"{plogp_tensor[each_idx]}\t{smiles_list[each_idx]}")
    out_dict_list.append({"smiles": smiles_list[each_idx],
                          "plogp": plogp_tensor[each_idx]})
res_df = pd.DataFrame(out_dict_list)
with gzip.open("data/smiles_vae_best_mol.pklz", "wb") as f:
    pickle.dump(res_df, f)

with gzip.open("data/smiles_vae_bo_full.pklz", "wb") as f:
    pickle.dump((smiles_list, plogp_tensor), f)



  from .autonotebook import tqdm as notebook_tqdm
100%|██████████| 127310/127310 [00:02<00:00, 58105.67it/s]
100%|██████████| 7956/7956 [00:00<00:00, 76789.34it/s]


 * 0	tensor([2.8966], dtype=torch.float64)
 * 1	tensor([0.6716], dtype=torch.float64)
 * 2	tensor([5.0504], dtype=torch.float64)
 * 3	tensor([1.7106], dtype=torch.float64)
 * 4	tensor([0.4904], dtype=torch.float64)
 * 5	tensor([-2.1848], dtype=torch.float64)
 * 6	tensor([-0.9497], dtype=torch.float64)
 * 7	tensor([2.8980], dtype=torch.float64)
 * 8	tensor([4.1570], dtype=torch.float64)
 * 9	tensor([7.7818], dtype=torch.float64)
 * 10	tensor([0.1610], dtype=torch.float64)
 * 11	tensor([3.3149], dtype=torch.float64)
 * 12	tensor([5.2022], dtype=torch.float64)
 * 13	tensor([4.6439], dtype=torch.float64)
 * 14	tensor([2.2004], dtype=torch.float64)
 * 15	tensor([-0.1827], dtype=torch.float64)
 * 16	tensor([1.3911], dtype=torch.float64)
 * 17	tensor([7.0427], dtype=torch.float64)
 * 18	tensor([4.7053], dtype=torch.float64)
 * 19	tensor([2.9968], dtype=torch.float64)
 * 20	tensor([3.3650], dtype=torch.float64)
 * 21	tensor([1.4717], dtype=torch.float64)
 * 22	tensor([3.2338], dtype=torch.floa