# 訓練データの読み込み

In [2]:
import torch
from smiles_vocab import SmilesVocabulary

device = torch.device("mps") if torch.backends.mps.is_available() else "cpu"
torch.mps.manual_seed(330252033)

smiles_vocab = SmilesVocabulary()
# 訓練データと検証データの整数系列を作成
train_tensor: torch.Tensor = smiles_vocab.batch_update_from_file("train.smi").to(device)
train_tensor.shape

torch.Size([1273104, 102])

In [3]:
from torch.utils.data import DataLoader, TensorDataset

batch_size = 256

# シャッフルありでバッチモードで訓練データの DataLoader を作成
train_dataset = TensorDataset(torch.flip(train_tensor, dims=[1]), train_tensor)
train_data_loader = DataLoader(
    train_dataset, batch_size=batch_size, shuffle=True, drop_last=True
)
train_data_loader_iter = train_data_loader.__iter__()
each_train_batch = train_data_loader_iter.__next__()

# バッチごとに学習
in_seq: torch.Tensor = each_train_batch[0].to(device)
out_seq: torch.Tensor = each_train_batch[1].to(device)
in_seq.shape, out_seq.shape

(torch.Size([256, 102]), torch.Size([256, 102]))

# `SmilesVAE.loss()`

## `SmilesVAE.forward()`

### `SmilesVAE.encode()`

In [4]:
from torch import nn


emb_dim = 256
vocab = smiles_vocab
vocab_size = len(vocab.char_list)
print(f"{vocab_size=}")

# 埋め込みベクトル
embedding = nn.Embedding(vocab_size, emb_dim, padding_idx=vocab.pad_idx, device=device)
embedding.weight.shape

vocab_size=41


torch.Size([41, 256])

In [5]:
# in_seq (batch_size, seq_len) を埋め込みベクトルの行列に変換
in_seq_emb: torch.Tensor = embedding(in_seq)
in_seq_emb.shape  # バッチサイズ * 系列長 * 隠れ状態の次元数

torch.Size([256, 102, 256])

#### `self.encoder`

In [6]:
encoder_params = {
    "hidden_size": 512,
    "num_layers": 1,
    "bidirectional": False,
    "dropout": 0.0,
}

# 埋め込みベクトルの系列をエンコーダに入力
# 隠れ状態の系列 out_seq: サンプルサイズ * 系列長 * 隠れ状態の次元
# 最終隠れ状態 (h, c)
encoder = nn.LSTM(emb_dim, batch_first=True, **encoder_params, device=device)
encoder_out_seq, (h, c) = encoder(in_seq_emb)
encoder_out_seq.shape  # バッチサイズ * 系列長 * 隠れ状態の次元数

torch.Size([256, 102, 512])

#### `self.encoder2out`

In [7]:
in_dim = 512
each_out_dim = 256

# エンコーダの LSTM の出力を変換する多層ニューラルネットワーク
encoder2out: nn.Sequential = nn.Sequential()
encoder2out.append(nn.Linear(in_dim, each_out_dim, device=device))
encoder2out.append(nn.Sigmoid())
in_dim = each_out_dim

# 末尾の隠れ状態は、入力系列すべてを反映した隠れ状態であり、これを使ってエンコーダの出力を作る
last_out: torch.Tensor = encoder_out_seq[:, -1, :]
out: torch.Tensor = encoder2out(last_out)
last_out.shape, out.shape  # バッチサイズ * 隠れ状態の次元数

(torch.Size([256, 512]), torch.Size([256, 256]))

#### `self.encoder_out2mu`, `self.encoder_out2logvar`

In [8]:
latent_dim = 64

# self.encoder2out の出力を潜在空間上の正規分布の平均に変換する線形モデル
encoder_out2mu = nn.Linear(in_dim, latent_dim, device=device)

# self.encoder2out の出力を潜在空間上の正規分布の分散共分散行列の対角成分に変換する線形モデル
encoder_out2logvar = nn.Linear(in_dim, latent_dim, device=device)

# 潜在空間上の正規分布の平均と分散共分散行列をつくり、エンコーダの出力とする
mu: torch.Tensor = encoder_out2mu(out)
logvar: torch.Tensor = encoder_out2logvar(out)
mu.shape, logvar.shape  # バッチサイズ * 潜在空間の次元数

(torch.Size([256, 64]), torch.Size([256, 64]))

### `SmilesVAE.reparam()`

In [9]:
# 潜在空間上の正規分布の分散共分散行列の対角成分の対数から標準偏差を計算
std = torch.exp(0.5 * logvar)
# 標準偏差を何倍にするかを正規分布からランダムサンプリング
eps = torch.randn_like(std)
z = mu + std * eps
z.shape  # バッチサイズ * 潜在空間の次元数

torch.Size([256, 64])

### `SmilesVAE.decode()`

#### `self.latent2dech`, `self.latent2decc`

In [10]:
decoder_params = {"hidden_size": 512, "num_layers": 1, "dropout": 0.0}
# 潜在ベクトルを、デコーダの LSTM の隠れ状態に変換するモデル
latent2dech: nn.Linear = nn.Linear(
    latent_dim, decoder_params["hidden_size"] * decoder_params["num_layers"]
).to(device)
# 潜在ベクトルを、デコーダの LSTM の細胞状態に変換するモデル
latent2decc: nn.Linear = nn.Linear(
    latent_dim, decoder_params["hidden_size"] * decoder_params["num_layers"]
).to(device)
latent2dech, latent2decc

(Linear(in_features=64, out_features=512, bias=True),
 Linear(in_features=64, out_features=512, bias=True))

#### `self.decoder`

In [11]:
# デコーダ
decoder: nn.LSTM = nn.LSTM(
    emb_dim, batch_first=True, bidirectional=False, **decoder_params, device=device
)
decoder

LSTM(256, 512, batch_first=True)

In [12]:
# batch_size: int = z.shape[0]  # 自明なのでスキップ

# デコードに用いる LSTM の隠れ状態 h と細胞状態 c を潜在ベクトルから作成
h_unstructured: torch.Tensor = latent2dech(z)
c_unstructured: torch.Tensor = latent2decc(z)

print(f"{h_unstructured.shape=}, {c_unstructured.shape=}")

h: torch.Tensor = torch.stack(
    [
        h_unstructured[:, each_idx : each_idx + decoder.hidden_size]
        for each_idx in range(0, h_unstructured.shape[1], decoder.hidden_size)
    ]
)
c: torch.Tensor = torch.stack(
    [
        c_unstructured[:, each_idx : each_idx + decoder.hidden_size]
        for each_idx in range(0, c_unstructured.shape[1], decoder.hidden_size)
    ]
)

h.shape, c.shape

h_unstructured.shape=torch.Size([256, 512]), c_unstructured.shape=torch.Size([256, 512])


(torch.Size([1, 256, 512]), torch.Size([1, 256, 512]))

#### `self.decoder2vocab`

In [13]:
# デコーダの出力を、アルファベット空間上のロジットベクトルに変換するモデル
decoder2vocab: nn.Linear = nn.Linear(
    decoder_params["hidden_size"], vocab_size, device=device
)
decoder2vocab

Linear(in_features=512, out_features=41, bias=True)

In [14]:
# 正解の SMILES 系列がある場合は、正解の SMILES 系列をデコードして対数尤度を返す
# 埋め込みベクトルの系列に変換
out_seq_emb: torch.Tensor = embedding(out_seq)
out_seq_emb_out, _ = decoder(out_seq_emb, (h, c))
# 対数尤度（バッチサイズ * 系列長 * アルファベット数）を計算
out_seq_vocab_logit: torch.Tensor = decoder2vocab(out_seq_emb_out)
# 損失関数の計算に使われるため、系列長を 1 短くしている
out_seq_logit, _ = out_seq_vocab_logit[:, :-1], out_seq[:-1]

out_seq_logit.shape, _.shape

(torch.Size([256, 101, 41]), torch.Size([255, 102]))

# `SmilesVAE.loss()`

## `self.loss_func`

In [15]:
# 損失関数
loss_func: nn.CrossEntropyLoss = nn.CrossEntropyLoss(reduction="none")

In [16]:
# 交差エントロピー損失を計算
neg_likelihood: torch.Tensor = loss_func(out_seq_logit.transpose(1, 2), out_seq[:, 1:])
neg_likelihood.shape

torch.Size([256, 101])

In [17]:
# バッチごとに損失を合計し、その平均を取る
neg_likelihood = neg_likelihood.sum(dim=1).mean()
neg_likelihood

tensor(376.1754, device='mps:0', grad_fn=<MeanBackward0>)

In [18]:
# KL 情報量を計算
kl_div: torch.Tensor = (
    -0.5 * (1.0 + logvar - mu**2 - torch.exp(logvar)).sum(dim=1).mean()
)
kl_div

tensor(3.4408, device='mps:0', grad_fn=<MulBackward0>)

In [19]:
beta = 1.0

# β-VAE のため、KL 情報量に β を乗じている
each_loss = (neg_likelihood + beta * kl_div)
each_loss.item()

379.61627197265625