In [1]:
# Google Colab only: mount Google Drive under /content/gdrive so that the
# project folder stored on Drive can be reached from the notebook.
from google.colab import drive
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [2]:
cd /content/gdrive/My Drive/Colab Notebooks/Practice-ML/Grad_thesis/

/content/gdrive/My Drive/Colab Notebooks/Practice-ML/Grad_thesis


In [3]:
!pip install torch
!pip install torchvision
!pip install numpy
!pip install pandas
!pip install matplotlib
!pip install pillow
!pip install opencv-python



In [1]:
import os
import random
import copy
import argparse
import time
import numpy as np
import pandas as pd

import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
from torchvision import models, transforms
import torch.utils.data as data
import torch.nn.functional as F
import matplotlib.pyplot as plt
import torchvision.utils as vutils
from sklearn.model_selection import train_test_split

from seq_net import weights_init, Generator, Discriminator

In [2]:
# Settings
workers = 0               # DataLoader worker processes
batch_size = 64           # samples per mini-batch (also the size of the fixed noise batch)
nz = 100                  # channels of the noise vector fed to the generator
nch_g = 128               # passed to Generator(nch_g=...)
nch_d = 128               # passed to Discriminator(nch_d=...)
n_epoch = 200             # number of training epochs
lr = 0.0002               # Adam learning rate (both networks)
beta1 = 0.5               # Adam beta1 (both networks)
outf = './Result/lsGAN'   # directory that receives the saved model weights
display_interval = 100    # print the losses every this many iterations

# Create the output directory. `exist_ok=True` already tolerates an existing
# directory, so any OSError raised here is a real problem (permissions, a file
# in the way, ...). Let it propagate now instead of discovering it at the first
# checkpoint save, many epochs into training.
os.makedirs(outf, exist_ok=True)

# Fix the random seeds for reproducibility
random.seed(0)
np.random.seed(0)
torch.manual_seed(0)

<torch._C.Generator at 0x17abae603d0>

In [3]:
# Width of every encoded sequence: seq2vector builds a 4 x SEQ_LENGTH array.
SEQ_LENGTH = 128

def make_dataset(datadir, pos_seq="SRX356455.05_peak_seq_128.txt"):
    '''
    Load the positive sequences and encode each one as a 4 x SEQ_LENGTH array.

    The file ``<datadir>/sequences/<pos_seq>`` is read as a tab-separated table
    (columns: id, chr, start, end, seq). Every value of the ``seq`` column is
    encoded with ``seq2vector``: rows are A, C, G, T; the row of the observed
    base holds +1 and the other three rows hold -1.

    ex. ACCGAT =
         A  C  C  G  A  T
    A   +1 -1 -1 -1 +1 -1
    C   -1 +1 +1 -1 -1 -1
    G   -1 -1 -1 +1 -1 -1
    T   -1 -1 -1 -1 -1 +1

    Rows for which ``seq2vector`` returns an empty array (missing sequence or
    no usable base) are skipped.

    Parameters
    ----------
    datadir : str
        Directory that contains the ``sequences`` sub-directory.
    pos_seq : str, optional
        File name of the sequence table inside ``<datadir>/sequences``.

    Returns
    -------
    sequences : list of numpy.ndarray
        One encoded array per kept row.
    classes : list of numpy.ndarray
        One 0-d array holding the label 1 (positive) per kept row.
    '''
    # id      chr     start   end     seq
    # Local name is `table`, not `data`, to avoid shadowing the
    # `torch.utils.data as data` module alias imported by the notebook.
    table = pd.read_csv(os.path.join(datadir, "sequences", pos_seq), sep="\t")
    sequences = []
    classes = []  # positive or negative
    for seq in table["seq"]:
        # Encode once and reuse the result (the array is both tested and stored).
        seq_vector = seq2vector(seq)
        if len(seq_vector) == 0:
            continue
        sequences.append(seq_vector)
        classes.append(np.array(1))  # 1 = positive
    return sequences, classes

def seq2vector(seq, seq_length=None):
    '''
    Encode a nucleotide string as a 4 x seq_length float32 array.

    Rows correspond to A, C, G, T (case-insensitive). For every recognised
    base the matching row is set to +1 and the other three rows to -1.
    Positions holding any other character (e.g. N), and positions past the end
    of a sequence shorter than ``seq_length``, stay all-zero. Characters beyond
    ``seq_length`` are ignored.

    Parameters
    ----------
    seq : str
        Sequence to encode. Any non-string value (e.g. NaN for a missing
        entry) is treated as "no sequence".
    seq_length : int, optional
        Number of columns of the result. Defaults to the module-level
        ``SEQ_LENGTH``.

    Returns
    -------
    numpy.ndarray
        The (4, seq_length) float32 encoding, or an empty (0, 0) array when
        ``seq`` is not a string or contains no recognised base within the
        first ``seq_length`` characters.
    '''
    if seq_length is None:
        seq_length = SEQ_LENGTH
    if not isinstance(seq, str):  # Case on Null sequence
        return np.zeros((0, 0))

    base_rows = {"A": 0, "C": 1, "G": 2, "T": 3}
    seq_array = np.zeros((4, seq_length), dtype=np.float32)
    n_known = 0
    # Slicing (instead of indexing 0..seq_length-1) keeps short sequences from
    # raising IndexError; the missing tail simply stays zero.
    for i, base in enumerate(seq[:seq_length]):
        row = base_rows.get(base.upper())
        if row is None:
            continue  # unknown symbol: leave the column all-zero
        seq_array[:, i] = -1
        seq_array[row, i] = 1
        n_known += 1

    # Decide on the bases actually encoded, not on len(seq): this also rejects
    # empty strings and sequences whose encoded window contains only N.
    if n_known == 0:  # Case on N sequence
        return np.zeros((0, 0))
    return seq_array

In [4]:
# Root data directory; make_dataset reads its table from <datadir>/sequences/.
datadir = "data"

class DatasetFolder(data.Dataset):
    """Dataset that pairs each sample with its target and serves both as tensors.

    X and y are indexable collections of NumPy arrays of equal length; item
    ``i`` is the pair ``(X[i], y[i])`` after conversion with ``self.transforms``.
    """

    def __init__(self, X, y):
        # Single-step pipeline: NumPy array -> torch tensor.
        self.transforms = transforms.Compose([ToTensorOfTarget()])
        self.samples = X
        self.targets = y

    def __len__(self):
        """Number of samples held by the dataset."""
        return len(self.samples)

    def __getitem__(self, index):
        """Return the (sample, target) pair at ``index`` as tensors."""
        convert = self.transforms
        return convert(self.samples[index]), convert(self.targets[index])


class ToTensorOfTarget:
    """Callable transform turning a NumPy array into a torch tensor."""

    def __call__(self, target):
        # from_numpy keeps the array's dtype and shape.
        tensor = torch.from_numpy(target)
        return tensor

# Encode the whole data set, then hold out 1% of it as the test split;
# the remaining 99% is used for training.
X, y = make_dataset(datadir)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.01)

# One Dataset object per split.
sequence_datasets = {
    'train': DatasetFolder(X_train, y_train),
    'test': DatasetFolder(X_test, y_test),
}
dataset_sizes = {split: len(dataset) for split, dataset in sequence_datasets.items()}

# Use the first GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print('device:', device)

device: cuda:0


In [5]:
# Wrap every split in a DataLoader that yields mini-batches of `batch_size`.
# Only the training split is shuffled; the test split keeps its order.
# `batch_size` and `workers` come from the settings cell and `dataset_sizes`
# from the cell that builds `sequence_datasets`; they are deliberately not
# re-declared here so there is a single source of truth for each of them.
dataloaders = {
    split: torch.utils.data.DataLoader(
        sequence_datasets[split],
        batch_size=batch_size,
        shuffle=(split == 'train'),
        num_workers=workers)
    for split in ('train', 'test')
}

In [6]:
# Generator G: produces fake samples from random noise vectors.
netG = Generator(nz=nz, nch_g=nch_g)
netG = netG.to(device)
netG.apply(weights_init)    # initialise the weights with weights_init
print(netG)

Generator(
  (layers): ModuleDict(
    (layer0): Sequential(
      (0): ConvTranspose1d(100, 2048, kernel_size=(4,), stride=(1,))
      (1): BatchNorm1d(2048, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU()
    )
    (layer1): Sequential(
      (0): ConvTranspose1d(2048, 1024, kernel_size=(4,), stride=(2,), padding=(1,))
      (1): BatchNorm1d(1024, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU()
    )
    (layer2): Sequential(
      (0): ConvTranspose1d(1024, 512, kernel_size=(4,), stride=(2,), padding=(1,))
      (1): BatchNorm1d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU()
    )
    (layer3): Sequential(
      (0): ConvTranspose1d(512, 256, kernel_size=(4,), stride=(2,), padding=(1,))
      (1): BatchNorm1d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU()
    )
    (layer4): Sequential(
      (0): ConvTranspose1d(256, 128, kernel_size=(4

In [7]:
# Discriminator D: scores whether a sample is real or generated.
netD = Discriminator(nch_d=nch_d)
netD = netD.to(device)
netD.apply(weights_init)    # initialise the weights with weights_init
print(netD)

Discriminator(
  (layers): ModuleDict(
    (layer0): Sequential(
      (0): Conv1d(4, 128, kernel_size=(4,), stride=(2,), padding=(1,))
      (1): LeakyReLU(negative_slope=0.2)
    )
    (layer1): Sequential(
      (0): Conv1d(128, 256, kernel_size=(4,), stride=(2,), padding=(1,))
      (1): BatchNorm1d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): LeakyReLU(negative_slope=0.2)
    )
    (layer2): Sequential(
      (0): Conv1d(256, 512, kernel_size=(4,), stride=(2,), padding=(1,))
      (1): BatchNorm1d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): LeakyReLU(negative_slope=0.2)
    )
    (layer3): Sequential(
      (0): Conv1d(512, 1024, kernel_size=(4,), stride=(2,), padding=(1,))
      (1): BatchNorm1d(1024, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): LeakyReLU(negative_slope=0.2)
    )
    (layer4): Sequential(
      (0): Conv1d(1024, 2048, kernel_size=(4,), stride=(2,), padding=(1,))


In [8]:
# Loss function: mean squared error between D's output and the target value.
criterion = nn.MSELoss()

# Optimizer setup: one Adam instance per network, same hyper-parameters.
optimizerD = optim.Adam(    # for discriminator D
    netD.parameters(),
    lr=lr,
    betas=(beta1, 0.999),
    weight_decay=1e-5,
)
optimizerG = optim.Adam(    # for generator G
    netG.parameters(),
    lr=lr,
    betas=(beta1, 0.999),
    weight_decay=1e-5,
)

# Fixed noise batch, reused after every epoch to inspect the generator's output.
fixed_noise = torch.randn((batch_size, nz, 1), device=device)

In [10]:
# Training loop
save_interval = 50  # save the model weights every this many epochs

for epoch in range(n_epoch):
    # Per-iteration losses of the current epoch (reset at the start of each epoch).
    D_loss = []
    G_loss = []
    # The loop variable is named `batch`, not `data`: at notebook top level a
    # variable called `data` would overwrite the `torch.utils.data as data`
    # alias that DatasetFolder subclasses, breaking any re-run of that cell.
    for itr, batch in enumerate(dataloaders['train']):
        real_image = batch[0].to(device)    # real samples (batch[1] holds the labels)
        sample_size = real_image.size(0)    # number of samples in this batch
        noise = torch.randn(sample_size, nz, 1, device=device)   # noise drawn from a normal distribution

        # Regression targets for the discriminator output.
        # NOTE(review): assumes netD returns one value per sample with shape
        # (sample_size,); otherwise MSELoss would broadcast - confirm in seq_net.
        real_target = torch.full((sample_size,), 1., device=device)     # target "1" for real samples
        fake_target = torch.full((sample_size,), 0., device=device)     # target "0" for generated samples

        ############################
        # Update discriminator D
        ###########################
        netD.zero_grad()    # reset gradients

        output = netD(real_image)   # D's output for real samples
        errD_real = criterion(output, real_target)  # loss on real samples
        D_x = output.mean().item()

        fake_image = netG(noise)    # G generates fake samples from noise
        # detach(): the discriminator step must not back-propagate into G
        output = netD(fake_image.detach())  # D's output for generated samples
        errD_fake = criterion(output, fake_target)  # loss on generated samples
        D_G_z1 = output.mean().item()

        errD = errD_real + errD_fake    # total discriminator loss
        errD.backward()    # back-propagation
        optimizerD.step()   # update D's parameters

        ############################
        # Update generator G
        ###########################
        netG.zero_grad()    # reset gradients

        output = netD(fake_image)   # score the same fakes again with the updated D
        errG = criterion(output, real_target)   # G wants D to output "1" for its fakes
        errG.backward()     # back-propagation
        D_G_z2 = output.mean().item()

        optimizerG.step()   # update G's parameters

        D_loss.append(errD.item())
        G_loss.append(errG.item())

        if itr % display_interval == 0:
            print('[{}/{}][{}/{}] Loss_D: {:.3f} Loss_G: {:.3f} D(x): {:.3f} D(G(z)): {:.3f}/{:.3f}'
                  .format(epoch + 1, n_epoch,
                          itr + 1, len(dataloaders['train']),
                          errD.item(), errG.item(), D_x, D_G_z1, D_G_z2))

    ############################
    # Generate samples for inspection
    ############################
    # No gradients are needed for this preview, so skip building the autograd graph.
    with torch.no_grad():
        fake_image = netG(fixed_noise)  # generated from the fixed noise after every epoch
    print(fake_image)

    ############################
    # Save the models
    ############################
    if (epoch + 1) % save_interval == 0:
        torch.save(netG.state_dict(), '{}/netG_epoch_{}.pth'.format(outf, epoch + 1))
        torch.save(netD.state_dict(), '{}/netD_epoch_{}.pth'.format(outf, epoch + 1))

[1/200][1/843] Loss_D: 9.980 Loss_G: 0.796 D(x): -0.381 D(G(z)): -2.222/0.941
[1/200][101/843] Loss_D: 0.862 Loss_G: 1.898 D(x): 0.761 D(G(z)): 0.081/-0.330
[1/200][201/843] Loss_D: 0.864 Loss_G: 1.919 D(x): 0.810 D(G(z)): -0.051/-0.232
[1/200][301/843] Loss_D: 0.673 Loss_G: 2.114 D(x): 0.821 D(G(z)): 0.202/-0.413
[1/200][401/843] Loss_D: 0.770 Loss_G: 3.594 D(x): 1.033 D(G(z)): 0.425/-0.874
[1/200][501/843] Loss_D: 0.589 Loss_G: 2.531 D(x): 0.973 D(G(z)): 0.168/-0.573
[1/200][601/843] Loss_D: 0.975 Loss_G: 2.386 D(x): 0.581 D(G(z)): 0.284/-0.454
[1/200][701/843] Loss_D: 0.668 Loss_G: 1.679 D(x): 1.035 D(G(z)): 0.401/-0.270
[1/200][801/843] Loss_D: 0.367 Loss_G: 0.907 D(x): 0.833 D(G(z)): -0.051/0.071
tensor([[[-0.0433,  0.3061, -0.1479,  ..., -0.1747, -0.3049,  0.0933],
         [-0.1360, -0.1477, -0.3997,  ...,  0.3405, -0.3641, -0.1557],
         [-0.2293, -0.1005, -0.1253,  ..., -0.1841, -0.2964, -0.3869],
         [-0.2273, -0.3892,  0.1563,  ..., -0.1390,  0.1499,  0.0298]],

   

KeyboardInterrupt: 