# CNN-based prediction for the properties at each genomic loci
* ResidualBind https://github.com/p-koo/residualbind

Dataset:
* [BICCN_sub_cluster_and_annotation_file.csv](https://www.dropbox.com/scl/fi/ku2tki1yjhfaq05xkb507/BICCN_sub_cluster_and_annotation_file.csv?rlkey=8ai68ud4dqbbqfljhqw8rv4h9&dl=0) - cluster annotation data
* [targets.bed](https://www.dropbox.com/scl/fi/d3c64poskies4owpvqfyu/targets.bed?rlkey=rsyuy487alh7s9sq7xebpjsxo&dl=0) - tab deliminated bed file for 33 clusters
* [GRCm38.fa](https://www.dropbox.com/scl/fi/n0u93vm7fhos3rfgf9t6h/GRCm38.fa?rlkey=u1spsgpbkfopr396z1qh8f1q0&dl=0) - mouse genome data

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np

In [None]:
import os
from collections import defaultdict
from pathlib import Path
from typing import Any, List, Callable, Union

import numpy as np
import pandas as pd
from tqdm.notebook import tqdm
import pyfaidx

In [6]:
def one_hot_encode(
        sequence: str,
        alphabet: str = "ACGT",
        neutral_alphabet: str = "N",
        neutral_value: Any = 0,
        dtype=np.float32
    ) -> np.ndarray:
    """One-hot encode sequence."""

    def to_uint8(s):
        return np.frombuffer(s.encode("ascii"), dtype=np.uint8)

    lookup = np.zeros([np.iinfo(np.uint8).max, len(alphabet)], dtype=dtype)
    lookup[to_uint8(alphabet)] = np.eye(len(alphabet), dtype=dtype)
    lookup[to_uint8(neutral_alphabet)] = neutral_value
    lookup = lookup.astype(dtype)
    return lookup[to_uint8(sequence)]

class WindowedGenomeDataset:

    def __init__(
            self,
            target_path: str,
            fasta_path: str,
            window_size: int = None,
            chromosomes: List[str] = None,
            dtype: np.float32 = None
        ):
        self.target_path = target_path
        self.fasta_path = fasta_path
        self.chromosomes = chromosomes
        self.dtype = dtype

        df = pd.read_table(target_path, header=None, sep="\t")
        c = df[0].isin(chromosomes) if chromosomes else np.array(len(df)*[True])
        self.windows = df.loc[c, :2].values
        self.targets = df.loc[c, 3:].values.astype(self.dtype)

        _, start, stop = self.windows[0]
        self.window_size = window_size or (stop - start)

        self.input_shape = (self.window_size, 4)
        self.output_shape = (self.targets.shape[1],)

        self.genome = None

    def __len__(self):
        return len(self.targets)

    def __getitem__(self, item: Union[int, slice]):
        if not self.genome:
            from pyfaidx import Fasta
            self.genome = Fasta(self.fasta_path)

        chrom = self.windows[item, 0]
        start = self.windows[item, 1]
        stop = self.windows[item, 2]

        labels = self.targets[item]

        if isinstance(item, slice):
            seq = ""
            for chr, i, j in zip(chrom, start, stop):
                seq += self.genome[chr][i:j].seq.upper()

            inputs = one_hot_encode(seq)
            inputs = inputs.reshape(len(chrom), self.window_size, 4)

            item = range(item.start or 0, item.stop or len(self), item.step or 1)
            item = np.array(list(item), dtype=np.int32)
        else:
            seq = self.genome[chrom][start:stop].seq.upper()
            inputs = one_hot_encode(seq)

            item = item if item >= 0 else len(self) + item

        output = {
            "inputs": inputs,
            "labels": labels,
            "meta": {
                "id": item,
                "chrom": chrom,
                "start": start,
                "stop": stop
            }
        }
        return output

    def to_each_record():
        for i in tqdm(indices or range(len(self))):
            output = self[i]
            yield dict(
                id=output["meta"]["id"],
                inputs=output["inputs"],
                labels=output["labels"],
                chrom=output["meta"]["chrom"],
                start=output["meta"]["start"],
                stop=output["meta"]["stop"]
            )


In [7]:
DATA_DIR = "../data/"
SAVE_DIR = "../models/"

In [8]:
items = [
    ("train", [2, 4, 6, 8, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, "X"]),
    ("valid", [7, 9]),
    ("test", [1, 3, 5]),
    ("debug",[19])
]

datasets = {}

In [11]:
for name, chroms in items:
    datasets[name] = WindowedGenomeDataset(
        target_path=f"{DATA_DIR}/targets.bed",
        fasta_path=f"{DATA_DIR}/GRCm38.fa",
        chromosomes=[f"chr{i}" for i in chroms],
        window_size=1000,
        dtype=np.float32
    )

100%|████████████████████████████████████████████████████████████████████████| 1733242/1733242 [37:00<00:00, 780.70it/s]
100%|██████████████████████████████████████████████████████████████████████████| 250622/250622 [05:22<00:00, 776.02it/s]
100%|██████████████████████████████████████████████████████████████████████████| 482812/482812 [10:19<00:00, 779.61it/s]
100%|████████████████████████████████████████████████████████████████████████████| 56988/56988 [01:13<00:00, 779.09it/s]


In [16]:
train_size = len(datasets['train'])
valid_size = len(datasets['valid'])
test_size = len(datasets['test'])

total = train_size + valid_size + test_size

print("Train: ", train_size, train_size / total)
print("Validation: ", valid_size, valid_size / total)
print("Validation: ", test_size,  test_size / total)

Train:  1733242 0.7026630169507466
Validation:  250622 0.10160312906924136
Validation:  482812 0.19573385398001197


# Data

In [5]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf

In [9]:
AUTO = tf.data.experimental.AUTOTUNE
THRESHOLD = 0.0006867924257022952
BATCH_SIZE = 128

In [14]:
train_size = 1733242
valid_size = 250622
test_size = 482812
total = train_size + valid_size + test_size

print("Train: ", train_size / total)
print("Validation: ", valid_size / total)
print("Validation: ", test_size / total)

Train:  0.7026630169507466
Validation:  0.10160312906924136
Validation:  0.19573385398001197


# Model

In [10]:
class DilatedResidualBlock(nn.Module):
    def __init__(self, in_channels, filter_size, rates):
        super(DilatedResidualBlock, self).__init__()
        self.convs = nn.ModuleList([
            nn.Conv1d(in_channels, in_channels, filter_size, dilation=rate, padding=rate)
            for rate in rates
        ])
        self.batch_norms = nn.ModuleList([nn.BatchNorm1d(in_channels) for _ in rates])
        self.dropout = nn.Dropout(0.1)

    def forward(self, x):
        residual = x
        for conv, bn in zip(self.convs, self.batch_norms):
            x = F.relu(x)
            x = self.dropout(x)
            x = conv(x)
            x = bn(x)
        return F.relu(x + residual)

class ResidualBind(nn.Module):
    def __init__(self, input_shape, output_shape, activation='exponential', num_units=[128, 256, 512, 512]):
        super(ResidualBind, self).__init__()

        self.activation = activation
        self.num_units = num_units
        kernel_size = [19, 9, 7]
        self.conv1 = nn.Conv1d(input_shape[0], num_units[0], kernel_size[0], padding='same')

        self.bn1 = nn.BatchNorm1d(num_units[0])
        self.dropout1 = nn.Dropout(0.1)

        self.res_block1 = DilatedResidualBlock(num_units[0], 3, [1, 2, 4, 8])
        self.max_pool1 = nn.MaxPool1d(10)
        self.dropout2 = nn.Dropout(0.2)

        self.conv2 = nn.Conv1d(num_units[0], num_units[1], kernel_size[1], padding='same')
        self.bn2 = nn.BatchNorm1d(num_units[1])
        self.dropout3 = nn.Dropout(0.1)

        self.res_block2 = DilatedResidualBlock(num_units[1], 3, [1, 2, 4])
        self.max_pool2 = nn.MaxPool1d(10)
        self.dropout4 = nn.Dropout(0.2)

        self.conv3 = nn.Conv1d(num_units[1], num_units[2], kernel_size[2], padding='same')
        self.bn3 = nn.BatchNorm1d(num_units[2])

        self.global_avg_pool = nn.AdaptiveAvgPool1d(1)
        self.dropout5 = nn.Dropout(0.3)

        # Fully-connected NN
        self.fc1 = nn.Linear(num_units[2], num_units[3])
        self.bn4 = nn.BatchNorm1d(num_units[3])
        self.dropout6 = nn.Dropout(0.5)
        # Output layer
        self.fc2 = nn.Linear(num_units[3], output_shape)

    def forward(self, x):
        # Layer 1
        x = self.conv1(x)
        x = self.bn1(x)
        x = F.elu(x) if self.activation == 'exponential' else F.sigmoid(x)
        x = self.dropout1(x)

        # Dilated Residual Block 1
        x = self.res_block1(x)
        x = self.max_pool1(x)
        x = self.dropout2(x)

        # Layer 2
        x = self.conv2(x)
        x = self.bn2(x)
        x = F.relu(x)
        x = self.dropout3(x)

        # Dilated Residual Block 2
        x = self.res_block2(x)
        x = self.max_pool2(x)
        x = self.dropout4(x)

        # Layer 3
        x = self.conv3(x)
        x = self.bn3(x)

        # Global Average Pooling
        x = self.global_avg_pool(x).squeeze(-1)
        x = self.dropout5(x)

        # Fully-connected Layer
        x = self.fc1(x)
        x = self.bn4(x)
        x = F.relu(x)
        x = self.dropout6(x)

        # Output Layer
        logits = self.fc2(x)
        outputs = torch.sigmoid(logits)

        return outputs

In [12]:
class ResidualCNN(ResidualBind):
    def forward(self, x):
        # Layer 1
        x = self.conv1(x)
        x = self.bn1(x)
        x = F.elu(x) if self.activation == 'exponential' else F.sigmoid(x)
        x = self.dropout1(x)

        # Layer 2
        x = self.conv2(x)
        x = self.bn2(x)
        x = F.relu(x)
        x = self.dropout3(x)

        # Layer 3
        x = self.conv3(x)
        x = self.bn3(x)

        # Global Average Pooling
        x = self.global_avg_pool(x).squeeze(-1)
        x = self.dropout5(x)

        # Fully-connected Layer
        x = self.fc1(x)
        x = self.bn4(x)
        x = F.relu(x)
        x = self.dropout6(x)

        # Output Layer
        logits = self.fc2(x)
        outputs = torch.sigmoid(logits)

        return outputs

In [15]:
import torch
import torch.optim as optim

# ハイパーパラメータ
input_shape = (4, 1000)  # 入力データの形状 (チャンネル数, データ長)
output_shape = 1         # 出力ユニット数
batch_size = 16          # バッチサイズ

# モデルの初期化
model = ResidualBind(input_shape=input_shape, output_shape=output_shape)
model.train()

# サンプルデータの作成 (ランダムなデータ)
inputs = torch.randn(batch_size, input_shape[0], input_shape[1])  # (バッチサイズ, チャンネル数, データ長)
labels = torch.randint(0, 2, (batch_size, output_shape)).float()  # (バッチサイズ, 出力ユニット数)

# 損失関数とオプティマイザの設定
# criterion = torch.nn.CrossEntropyLoss()
criterion = torch.nn.BCELoss()  # バイナリクロスエントロピー損失関数
optimizer = optim.Adam(model.parameters(), lr=0.001)

for i in range(100):
    # フォワードパス
    outputs = model(inputs)
    
    # 損失の計算
    loss = criterion(outputs, labels)
    
    # バックプロパゲーションと最適化ステップ
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    
    # 結果の表示
    if i % 50 == 0:
        print(f"Loss: {loss.item()}")
        print(f"Outputs: {outputs}")

Loss: 0.7239207029342651
Outputs: tensor([[0.3542],
        [0.5610],
        [0.3922],
        [0.5928],
        [0.4082],
        [0.5311],
        [0.2784],
        [0.5975],
        [0.5943],
        [0.2659],
        [0.4483],
        [0.5138],
        [0.4478],
        [0.5573],
        [0.4650],
        [0.6773]], grad_fn=<SigmoidBackward0>)
Loss: 0.007446830160915852
Outputs: tensor([[9.8996e-01],
        [9.9431e-01],
        [8.4643e-03],
        [1.7581e-02],
        [1.7528e-03],
        [3.2361e-02],
        [9.9916e-01],
        [9.9776e-01],
        [9.9863e-01],
        [9.9642e-01],
        [1.5486e-03],
        [5.8025e-04],
        [1.0967e-03],
        [9.7920e-01],
        [9.9910e-01],
        [9.9077e-01]], grad_fn=<SigmoidBackward0>)


In [16]:
import torch
import torch.optim as optim

# ハイパーパラメータ
input_shape = (4, 1000)  # 入力データの形状 (チャンネル数, データ長)
output_shape = 1         # 出力ユニット数
batch_size = 16          # バッチサイズ

# モデルの初期化
model = ResidualCNN(input_shape=input_shape, output_shape=output_shape)
model.train()

# サンプルデータの作成 (ランダムなデータ)
inputs = torch.randn(batch_size, input_shape[0], input_shape[1])  # (バッチサイズ, チャンネル数, データ長)
labels = torch.randint(0, 2, (batch_size, output_shape)).float()  # (バッチサイズ, 出力ユニット数)

# 損失関数とオプティマイザの設定
# criterion = torch.nn.CrossEntropyLoss()
criterion = torch.nn.BCELoss()  # バイナリクロスエントロピー損失関数
optimizer = optim.Adam(model.parameters(), lr=0.001)

for i in range(100):
    # フォワードパス
    outputs = model(inputs)
    
    # 損失の計算
    loss = criterion(outputs, labels)
    
    # バックプロパゲーションと最適化ステップ
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    
    # 結果の表示
    if i % 50 == 0:
        print(f"Loss: {loss.item()}")
        print(f"Outputs: {outputs}")

Loss: 0.7579466700553894
Outputs: tensor([[0.4504],
        [0.4385],
        [0.5576],
        [0.2630],
        [0.3705],
        [0.4902],
        [0.4480],
        [0.2338],
        [0.5474],
        [0.6590],
        [0.5761],
        [0.5698],
        [0.4914],
        [0.3898],
        [0.5079],
        [0.7020]], grad_fn=<SigmoidBackward0>)
Loss: 0.0028502282220870256
Outputs: tensor([[9.9836e-01],
        [2.3870e-03],
        [9.9879e-01],
        [1.1728e-02],
        [5.0695e-03],
        [1.0171e-03],
        [9.9935e-01],
        [9.9879e-01],
        [9.9666e-01],
        [1.6517e-03],
        [6.9901e-04],
        [4.6731e-03],
        [2.3526e-03],
        [9.8954e-04],
        [1.8836e-03],
        [4.9718e-03]], grad_fn=<SigmoidBackward0>)
