In [8]:
import numpy as np
def one_hot(sequence):
    """Return a (len(sequence), 4) float matrix one-hot encoding a DNA sequence.

    Columns are ordered A, T, C, G. Any other symbol (including lowercase
    letters) raises KeyError.
    """
    column_of = {'A': 0, 'T': 1, 'C': 2, 'G': 3}
    # Resolve every base first so an unknown symbol fails before allocation.
    columns = np.array([column_of[base] for base in sequence], dtype=int)
    encoded = np.zeros((len(sequence), 4))
    # Row i gets a single 1 in the column belonging to base i.
    encoded[np.arange(len(columns)), columns] = 1
    return encoded

In [9]:
# Sanity check: with the A, T, C, G column order, "ATCG" should encode to the 4x4 identity.
one_hot("ATCG")

array([[1., 0., 0., 0.],
       [0., 1., 0., 0.],
       [0., 0., 1., 0.],
       [0., 0., 0., 1.]])

In [10]:
from Bio import SeqIO
from Bio.SeqRecord import SeqRecord
import numpy as np

# convert a nucleotide sequence to a one-hot matrix
def one_hot(sequence):
    """Return a (len(sequence), 4) float matrix one-hot encoding a DNA sequence.

    Columns are ordered A, T, C, G. Any other symbol (including lowercase
    letters) raises KeyError.
    """
    column_of = {'A': 0, 'T': 1, 'C': 2, 'G': 3}
    # Resolve every base first so an unknown symbol fails before allocation.
    columns = np.array([column_of[base] for base in sequence], dtype=int)
    encoded = np.zeros((len(sequence), 4))
    # Row i gets a single 1 in the column belonging to base i.
    encoded[np.arange(len(columns)), columns] = 1
    return encoded

# split a sequence into overlapping 2 kb segments and one-hot encode them
def seq2onehot(sequence_file, segment_len=2000, stride=500):
    """One-hot encode overlapping fixed-length windows of every FASTA record.

    Each record has all 'N' characters removed and is then cut into windows of
    ``segment_len`` bases taken every ``stride`` bases. If the regular windows
    do not reach the end of the record, one extra window covering the last
    ``segment_len`` bases is added so the tail is not lost.

    Args:
        sequence_file: FASTA path or handle, passed to ``SeqIO.parse``.
        segment_len: Window length in bases (default 2000).
        stride: Offset between consecutive window starts (default 500).

    Returns:
        Array of shape (n_segments, segment_len, 4) with the windows of all
        records in file order; shape (0, segment_len, 4) if nothing was found.
        A record shorter than ``segment_len`` contributes a single segment of
        its own (shorter) length; such a segment cannot be stacked with
        full-length ones into a regular 3-D array.

    Raises:
        ValueError: If ``segment_len`` or ``stride`` is not positive.
        KeyError: From ``one_hot`` for any symbol other than A, T, C, G.
    """
    if segment_len <= 0 or stride <= 0:
        raise ValueError("segment_len and stride must be positive")

    segments = []
    for seq_record in SeqIO.parse(sequence_file, 'fasta'):
        # remove N
        seq = str(seq_record.seq).replace('N', '')
        if not seq:
            continue

        if len(seq) <= segment_len:
            # Too short to slide a window: keep the record as one segment.
            segments.append(seq)
            continue

        last_start = len(seq) - segment_len
        for start in range(0, last_start + 1, stride):
            segments.append(seq[start:start + segment_len])
        # Add the tail window only when the regular grid did not already end
        # exactly at the last base; otherwise it would duplicate a segment.
        if last_start % stride:
            segments.append(seq[-segment_len:])

    if not segments:
        return np.zeros((0, segment_len, 4))
    # The result is built once, after every record has been processed.
    return np.array([one_hot(segment) for segment in segments])
        

In [31]:
import torch
import torch.nn as nn
import torch.nn.functional as F
class MyModel(nn.Module):
    """Three-layer 1-D CNN classifier over 4-channel (one-hot) sequences.

    Args:
        hidden_dim: Number of channels produced by each convolution.
        num_classes: Number of output classes.
    """

    def __init__(self, hidden_dim, num_classes):
        super().__init__()
        # kernel_size=3 with padding=1 and stride=1 keeps the sequence length unchanged.
        self.conv1 = nn.Conv1d(4, hidden_dim, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv1d(hidden_dim, hidden_dim, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv1d(hidden_dim, hidden_dim, kernel_size=3, stride=1, padding=1)
        # Global max pooling collapses any sequence length to a single position.
        self.adaptive_pool = nn.AdaptiveMaxPool1d(1)
        self.fc = nn.Linear(hidden_dim, num_classes)

    def forward(self, x):
        """Classify a batch of sequences.

        Args:
            x: Tensor of shape (batch_size, 4, seq_len). The channel axis must
                come first because conv1 is built with 4 input channels, so
                (batch, seq_len, 4) arrays need their last two axes swapped.

        Returns:
            Tensor of shape (batch_size, num_classes) holding softmax
            probabilities (each row sums to 1), not raw logits.
        """
        # three convolutional layers
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        # (batch_size, hidden_dim, 1)
        x = self.adaptive_pool(x)
        # drop the pooled length axis: (batch_size, hidden_dim)
        x = x.flatten(start_dim=1)
        # The former torch.cat(x, dim=0) call was removed: torch.cat needs a
        # sequence of tensors and raised TypeError on this single tensor.
        x = self.fc(x)
        return F.softmax(x, dim=1)

In [33]:
# Instantiate the classifier with hidden_dim=256 and num_classes=4.
CNN = MyModel(256, 4)
# Bare expression so the notebook displays the module.
CNN

In [11]:
# FASTA input file.
# NOTE(review): absolute, machine-specific path; consider deriving it from a configurable data directory.
data_path = '/home/ouconstand/data/Virus_Host/seq_data/sequence_0.fasta'

['x', 'x', 'x', 'x']

In [12]:
# Build the one-hot encoded segment array from the FASTA file at data_path.
train = seq2onehot(data_path)

In [13]:
# Layout is (n_segments, segment_length, 4).
train.shape

(56, 2000, 4)

In [46]:
# Number of segments (first axis of train).
# NOTE(review): the saved output (1) disagrees with the (56, 2000, 4) shape shown
# earlier; it looks stale from out-of-order execution, so re-run on a fresh kernel.
len(train)

1

In [14]:
# Inspect the first encoded segment.
train[0]

array([[1., 0., 0., 0.],
       [0., 0., 1., 0.],
       [0., 1., 0., 0.],
       ...,
       [0., 0., 0., 1.],
       [0., 0., 1., 0.],
       [0., 1., 0., 0.]])

In [30]:
# 1000-base toy sequence ("ATCG" repeated 250 times) for a quick shape check.
x = 'ATCG'*250
# One row per base, one column per nucleotide.
print(one_hot(x).shape)

(1000, 4)


In [25]:
import torch
# num_embeddings is the vocabulary size, not the sequence length: the lookup
# indices are the nucleotide codes 0..3 (A, T, C, G), so 4 rows are enough.
# Each nucleotide is mapped to a learnable 4-dimensional vector.
embedding = torch.nn.Embedding(4, 4)

In [23]:
# Nucleotide alphabet; a base's position in this list is its integer code.
nucl_list = ['A', 'T', 'C', 'G']
nucl_dic = {base: code for code, base in enumerate(nucl_list)}
# Replace the sequence x by its list of integer codes. This rebinds x, so the
# cell is not idempotent: x must hold the original sequence before each run.
x = list(map(nucl_dic.__getitem__, x))

In [26]:
# Convert x to a tensor so it can index the embedding table.
# Presumably x is the list of integer base codes built in the previous step; verify on a fresh run.
x = torch.tensor(x)

In [27]:
# Look up one embedding vector for every entry of x.
embedding_seq = embedding(x)

In [28]:
# Display the embedded sequence.
embedding_seq

tensor([[ 0.1532, -0.5192, -1.5923, -1.6631],
        [-1.8701, -0.2104, -1.2200, -2.4347],
        [-1.6501,  0.2935,  0.1855,  0.8239],
        ...,
        [-1.8701, -0.2104, -1.2200, -2.4347],
        [-1.6501,  0.2935,  0.1855,  0.8239],
        [ 0.1877,  1.0988,  0.6499, -0.0389]], grad_fn=<EmbeddingBackward0>)