In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import gzip
import pickle
import matplotlib.pyplot as plt

# 데이터 전처리 함수
# Ambiguous peptide codes mapped to the two concrete residues whose scores are
# compared; the larger of the two scores is used.
_AMBIGUOUS_PEPTIDE_CODES = {
    'J': ('L', 'I'),
    'Z': ('Q', 'E'),
    'B': ('D', 'N'),
}


def _pair_score(hlain, aain, aadic):
    """Return the score of one (HLA residue, peptide residue) pair."""
    # Placeholder residues on either side contribute nothing.
    if hlain == 'X' or aain == 'X':
        return float(0)
    if hlain in ('*', '.') or aain == 'U':
        return float(0)
    if aain in _AMBIGUOUS_PEPTIDE_CODES:
        first, second = _AMBIGUOUS_PEPTIDE_CODES[aain]
        return float(max(aadic[hlain, first], aadic[hlain, second]))
    return aadic[hlain, aain]


def match_dat(afflst, hladic, aadic):
    """Encode HLA/peptide records as flat score vectors.

    Args:
        afflst: iterable of tab-separated text lines. Field 0 is the HLA name,
            field 1 the peptide sequence and field 2 an integer label.
        hladic: mapping from HLA name to its sequence string. Lines whose HLA
            name is not a key of this mapping are skipped.
        aadic: mapping from ``(hla_residue, peptide_residue)`` to a score.

    Returns:
        ``((features, labels), header)`` where ``features`` is a float32 array
        with one row per kept record (peptide position major, HLA position
        minor), ``labels`` is a float32 vector and ``header`` is a list of
        ``(hla_name, peptide)`` tuples in the same order. When no record is
        kept, ``features`` has shape ``(0, 0)`` instead of raising.

    Raises:
        KeyError: if a residue pair is missing from ``aadic``.
    """
    seqlst = []
    tablst = []
    header = []
    for affin in afflst:
        affstr = affin.strip().split('\t')
        if affstr[0] not in hladic:
            continue
        hlaseq = hladic[affstr[0]]
        aaseq = affstr[1]
        # Built HLA-major, then transposed so each row is one peptide position.
        per_hla = [
            [[_pair_score(hlain, aain, aadic)] for aain in aaseq]
            for hlain in hlaseq
        ]
        seqlst.append(list(zip(*per_hla)))
        tablst.append(int(affstr[2]))
        header.append((affstr[0], affstr[1]))

    if seqlst:
        seqarray0 = np.array(seqlst, dtype=np.float32)
        a_seq2 = seqarray0.reshape(
            seqarray0.shape[0], seqarray0.shape[1] * seqarray0.shape[2]
        )
    else:
        # No matching record: np.array([]) is 1-D, so the reshape above would
        # fail on shape[1]. Return an explicit empty matrix instead.
        a_seq2 = np.empty((0, 0), dtype=np.float32)
    a_lab2 = np.array(tablst, dtype=np.float32)
    return (a_seq2, a_lab2), header

def header_output(lstin, outname):
    """Write every record of ``lstin`` to ``outname`` as a tab-joined line.

    Args:
        lstin: iterable of string sequences; each becomes one output line.
        outname: path of the text file to create or overwrite.
    """
    with open(outname, 'w') as handle:
        handle.writelines('\t'.join(fields) + '\n' for fields in lstin)

def modify_matrix(affydatin_test, seqdatin, outfile, calpha_file='data/Calpha.txt'):
    """Encode a binding data file and store the result as a gzipped pickle.

    Args:
        affydatin_test: path of the tab-separated record file handed to
            ``match_dat``. A companion ``<affydatin_test>.header`` file is
            written next to it.
        seqdatin: path of a tab-separated file whose first column is the HLA
            name and whose second column is its sequence. Blank lines are
            ignored.
        outfile: path of the gzipped pickle that receives the
            ``(features, labels)`` pair returned by ``match_dat``.
        calpha_file: path of the tab-separated score table. Its first line
            lists the column residues; every following line starts with a row
            residue followed by one score per column.
    """
    # HLA name -> sequence. Each line is split once; files are closed promptly.
    hladicin = {}
    with open(seqdatin) as seq_handle:
        for line in seq_handle:
            stripped = line.strip()
            if not stripped:
                continue
            fields = stripped.split('\t')
            hladicin[fields[0]] = fields[1]

    with open(calpha_file) as calpha_handle:
        aalst = calpha_handle.readlines()
    # (column residue, row residue) -> score
    aadicin = {}
    aaseq0 = aalst[0].strip().split('\t')
    for aain in aalst[1:]:
        aastr = aain.strip().split('\t')
        for i in range(1, len(aastr)):
            aadicin[aaseq0[i - 1], aastr[0]] = float(aastr[i])

    with open(affydatin_test) as aff_handle:
        afflst = aff_handle.readlines()
    d, test_header = match_dat(afflst, hladicin, aadicin)
    with gzip.open(outfile, 'wb') as f:
        pickle.dump(d, f, protocol=2)
    header_output(test_header, affydatin_test + '.header')

# Encode the input records and cache them as a gzipped pickle.
datname = 'data/class1_input.dat'
modify_matrix(
    affydatin_test=datname,
    seqdatin='data/All_prot_alignseq_C_369.dat',
    outfile='temp/class1_input.dat.pkl.gz',
)
print('The running is completed!')

In [None]:
# 데이터 로드 함수
def shared_dataset(data_xy):
    """Convert an ``(x, y)`` pair into a float32 tensor and an int64 tensor.

    Args:
        data_xy: two-element sequence holding the features and the labels.

    Returns:
        Tuple ``(features, labels)`` of torch tensors.
    """
    features, labels = data_xy
    return (
        torch.tensor(features, dtype=torch.float32),
        torch.tensor(labels, dtype=torch.int64),
    )

def load_data(dataset):
    """Load a gzipped pickle that holds train, validation and test sets.

    The pickle must unpack into exactly three ``(x, y)`` pairs. Unpickling
    runs arbitrary code, so only open files from a trusted source.

    Args:
        dataset: path of the gzipped pickle file.

    Returns:
        Three ``(features, labels)`` tensor pairs in train, validation, test
        order.
    """
    print('... loading data')
    with gzip.open(dataset, 'rb') as handle:
        train_set, valid_set, test_set = pickle.load(handle)
    return tuple(
        shared_dataset(part) for part in (train_set, valid_set, test_set)
    )

def load_data_ind(dataset):
    """Load a gzipped pickle that holds a single ``(x, y)`` pair.

    Unpickling runs arbitrary code, so only open files from a trusted source.

    Args:
        dataset: path of the gzipped pickle file.

    Returns:
        Tuple ``(features, labels)`` of torch tensors.
    """
    print('... loading data')
    with gzip.open(dataset, 'rb') as handle:
        features, labels = shared_dataset(pickle.load(handle))
    return features, labels

def load_npdata(dataset):
    """Load the ``test_seq`` / ``test_lab`` entries of a NumPy file as tensors.

    Args:
        dataset: path passed to ``np.load``; the loaded object must support
            lookup by the keys ``'test_seq'`` and ``'test_lab'``.

    Returns:
        Tuple ``(features, labels)`` of torch tensors.
    """
    print('... loading data')
    datasets = np.load(dataset)
    try:
        test_set = (datasets['test_seq'], datasets['test_lab'])
    finally:
        # An .npz archive keeps its file handle open until closed explicitly;
        # other return types of np.load have no close(), hence the guard.
        if hasattr(datasets, 'close'):
            datasets.close()
    test_set_x, test_set_y = shared_dataset(test_set)
    return test_set_x, test_set_y

# 모델 정의
class LeNetConvPoolLayer(nn.Module):
    """Convolution followed by ReLU and max pooling."""

    def __init__(self, in_channels, out_channels, kernel_size, pool_size):
        """Create the layer.

        Args:
            in_channels: number of input channels of the convolution.
            out_channels: number of filters of the convolution.
            kernel_size: kernel size forwarded to ``nn.Conv2d``.
            pool_size: window size forwarded to ``nn.MaxPool2d``.
        """
        super().__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size)
        self.pool = nn.MaxPool2d(pool_size)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Apply convolution, then ReLU, then max pooling to ``x``."""
        activated = self.relu(self.conv(x))
        return self.pool(activated)

class LogisticRegression(nn.Module):
    """Linear layer followed by a sigmoid, returned as a 1-D tensor."""

    def __init__(self, n_in, n_out):
        """Create the layer.

        Args:
            n_in: number of input features.
            n_out: number of output units.
        """
        super().__init__()
        self.linear = nn.Linear(n_in, n_out)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Return the sigmoid activations of ``x`` flattened to one dimension."""
        probabilities = self.sigmoid(self.linear(x))
        # Flattening collapses every dimension, including the batch one.
        return probabilities.flatten()

class CNN(nn.Module):
    """Two convolution/pooling stages followed by a sigmoid output layer."""

    def __init__(self, in_dim, nkerns, filtsize, poolsize, hidden):
        """Build the network.

        Args:
            in_dim: spatial size of one single-channel input, e.g.
                ``(height, width)``.
            nkerns: number of filters of the first and second stage.
            filtsize: kernel sizes of the first and second stage.
            poolsize: pooling sizes of the first and second stage.
            hidden: number of output units of the final layer.
        """
        super(CNN, self).__init__()
        self.layer0 = LeNetConvPoolLayer(1, nkerns[0], filtsize[0], poolsize[0])
        self.layer1 = LeNetConvPoolLayer(nkerns[0], nkerns[1], filtsize[1], poolsize[1])
        # The final layer's input width is channels * height * width of the
        # last convolution stage, measured with a dummy forward pass.
        conv_output_size = self._get_conv_output(in_dim, nkerns, filtsize, poolsize)
        self.fc = LogisticRegression(conv_output_size, hidden)

    def forward(self, x):
        """Run ``x`` through both stages, flatten per sample, and classify."""
        x = self.layer0(x)
        x = self.layer1(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x

    def _get_conv_output(self, shape, nkerns, filtsize, poolsize):
        """Return the number of features the convolution stages produce.

        ``nkerns``, ``filtsize`` and ``poolsize`` are unused; they are kept so
        the signature stays unchanged.
        """
        probe = torch.zeros(1, *shape)
        if probe.dim() == 3:
            # Add an explicit batch axis so the probe is (1, 1, H, W) and does
            # not depend on the convolution accepting batch-less input.
            probe = probe.unsqueeze(0)
        # The probe only measures a size; no autograd graph is needed.
        with torch.no_grad():
            probe = self.layer1(self.layer0(probe))
        return int(probe.numel())

# Load data
# The preprocessing cell pickles a single (features, labels) pair, whereas
# load_data unpacks three datasets from the file. Read the pair with
# load_data_ind and derive the train/validation/test partitions here.
SPLIT_SEED = 0
TRAIN_FRACTION = 0.8
VALID_FRACTION = 0.1

all_x, all_y = load_data_ind('temp/class1_input.dat.pkl.gz')
n_samples = all_x.size(0)
n_train = int(n_samples * TRAIN_FRACTION)
n_valid = int(n_samples * VALID_FRACTION)
# A seeded generator keeps the partition identical across kernel restarts.
split_order = torch.randperm(
    n_samples, generator=torch.Generator().manual_seed(SPLIT_SEED)
)
train_idx = split_order[:n_train]
valid_idx = split_order[n_train:n_train + n_valid]
test_idx = split_order[n_train + n_valid:]

train_set = (all_x[train_idx], all_y[train_idx])
valid_set = (all_x[valid_idx], all_y[valid_idx])
test_set = (all_x[test_idx], all_y[test_idx])
train_loader = DataLoader(TensorDataset(*train_set), batch_size=10, shuffle=True)
valid_loader = DataLoader(TensorDataset(*valid_set), batch_size=10, shuffle=False)
test_loader = DataLoader(TensorDataset(*test_set), batch_size=10, shuffle=False)

# Initialise the model
# NOTE(review): in_dim must agree with the width of the encoded feature rows
# (height * width) — confirm that 28 x 28 matches the preprocessed data.
model = CNN(in_dim=(28, 28), nkerns=[20, 50], filtsize=[(5, 5), (5, 5)], poolsize=[(2, 2), (2, 2)], hidden=1)
criterion = nn.BCELoss()
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)

In [None]:
# 모델 학습 및 평가 함수
def train_model(model, train_loader, criterion, optimizer, num_epochs, input_shape=(1, 28, 28)):
    """Train ``model`` and print the mean batch loss after every epoch.

    Args:
        model: module called on each reshaped batch.
        train_loader: iterable yielding ``(inputs, labels)`` batches; must
            support ``len()``.
        criterion: loss called as ``criterion(outputs, labels.float())``.
        optimizer: optimizer stepped once per batch.
        num_epochs: number of passes over ``train_loader``.
        input_shape: per-sample shape each batch is viewed as before it is fed
            to the model. Defaults to ``(1, 28, 28)``.
    """
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        for inputs, labels in train_loader:
            # Restore the per-sample layout expected by the model.
            inputs = inputs.view(inputs.size(0), *input_shape)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels.float())
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader)}')

def evaluate_model(model, test_loader, input_shape=(1, 28, 28)):
    """Collect labels and thresholded predictions over ``test_loader``.

    Args:
        model: module called on each reshaped batch; it is switched to
            evaluation mode.
        test_loader: iterable yielding ``(inputs, labels)`` batches.
        input_shape: per-sample shape each batch is viewed as before it is fed
            to the model. Defaults to ``(1, 28, 28)``.

    Returns:
        ``(y_true, y_pred)``: two lists with the labels and with the results
        of ``output > 0.5``, in loader order.
    """
    model.eval()
    y_true = []
    y_pred = []
    with torch.no_grad():
        for inputs, labels in test_loader:
            # Restore the per-sample layout expected by the model.
            inputs = inputs.view(inputs.size(0), *input_shape)
            outputs = model(inputs)
            preds = outputs > 0.5
            y_true.extend(labels.numpy())
            y_pred.extend(preds.numpy())
    return y_true, y_pred

# Train the model
train_model(
    model=model,
    train_loader=train_loader,
    criterion=criterion,
    optimizer=optimizer,
    num_epochs=10,
)

In [None]:
# Evaluate the model
y_true, y_pred = evaluate_model(model, test_loader)

# Save the predictions: one "label<TAB>prediction" line per test sample
with open('temp/class1_mhcbinding_result.txt', 'w') as f:
    f.writelines(f'{true}\t{pred}\n' for true, pred in zip(y_true, y_pred))