In [None]:
# Import necessary modules
import torch # PyTorch
from torch import nn # Neural Network 
from torch import Tensor
from torch.nn import Transformer
from torch.utils.data import DataLoader, Dataset # Wrap an iterable around datasets
from torch.nn.utils.rnn import pad_sequence # Pad to make variable length sequence to same length

# Transform input's features and labels to suitable tensors
import numpy as np
import math
import pandas as pd
import torch.nn.functional as F
# Visualize
import matplotlib.pyplot as plt
# Random
import random
from random import seed
from random import randint
# Optimizer and loss function
from torch.optim import SGD
from torch.nn import CrossEntropyLoss

# LabelEncoder
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import OneHotEncoder

In [None]:
# Load the training data from train60k.txt.
# __getitem__ returns a tuple: (sequence string, encoded label, sequence length,
# overlapping sequence string, overlapping sequence length).
class CigarDataset(Dataset):
    """Dataset of DNA sequences paired with a randomly chosen overlapping sequence.

    The input file is read as text with four columns per row:
    id, sequence, label, and a bracketed comma-separated list of ids of
    overlapping rows (for example ``[12,57]``).

    Args:
        path: Location of the data file.
        delimiter: Column separator. Defaults to a real tab character; the
            previous hard-coded value was the two-character string ``"/t"``.
            NOTE(review): if the file really is separated by a literal "/t",
            pass ``delimiter="/t"`` explicitly.

    Attributes:
        ind: Row ids (strings), column 0.
        seq: Sequences (strings), column 1.
        label: Integer-encoded labels (index into ``classes``), from column 2.
        classes: Sorted unique label names; ``classes[label[i]]`` is the
            original label of row ``i``.
        over: Raw overlap-list strings, column 3.
        n_samples: Number of rows.
    """

    def __init__(self, path='./train60k.txt', delimiter='\t'):
        # ndmin=2 keeps a single-row file two-dimensional so the column
        # slicing below still works.
        data = np.loadtxt(path, delimiter=delimiter, dtype=str, ndmin=2)
        self.ind = data[:,0]
        self.seq = data[:,1]
        self.over = data[:,3]
        self.n_samples = data.shape[0]

        # Encode labels once (sorted unique names -> 0..k-1) instead of
        # re-fitting an encoder on the whole column for every item.
        self.classes, self.label = np.unique(data[:,2], return_inverse=True)

        # id -> row lookup built once; setdefault keeps the first occurrence,
        # which is the row a front-to-back scan would have found.
        self._row_of_id = {}
        for row, name in enumerate(self.ind):
            self._row_of_id.setdefault(int(name), row)

    @staticmethod
    def _parse_overlap(raw):
        """Turn a string such as ``"[3, 17,42]"`` into ``[3, 17, 42]``."""
        stripped = raw.replace("[", "").replace("]", "")
        return [int(token) for token in stripped.split(',') if token.strip()]

    def __getitem__(self, index):
        """Return ``(sequence, label, len(sequence), partner, len(partner))``.

        ``partner`` is the sequence of one row drawn at random (via the
        ``random`` module) from this row's overlap list.

        Raises:
            ValueError: if the row's overlap list is empty.
        """
        sequence = self.seq[index]

        candidates = self._parse_overlap(self.over[index])
        if not candidates:
            raise ValueError(
                "row %r has an empty overlap list; cannot pick a partner" % (index,)
            )
        partner_id = random.choice(candidates)

        # An id that is not present in the file falls back to row 0, as before.
        partner_row = self._row_of_id.get(partner_id, 0)
        partner = self.seq[partner_row]

        return sequence, self.label[index], len(sequence), partner, len(partner)

    def __len__(self):
        """Number of rows in the file."""
        return self.n_samples

In [None]:
# Instantiate the dataset; the constructor reads the training file from disk.
data = CigarDataset()

In [None]:
def embed(seq):
    """One-hot encode a DNA sequence over the fixed alphabet A, C, G, T.

    The column order is always A, C, G, T (the order a sorted label encoding
    yields when all four bases are present), so the output width is 4 for
    every sequence, including ones that lack some base. Fitting encoders per
    sequence made the width and the column meaning depend on which bases
    happened to occur.

    Args:
        seq: A string or a list of single-character strings.

    Returns:
        A ``torch.float64`` tensor of shape ``(len(seq), 4)``. Symbols outside
        the alphabet (e.g. ``N``) produce an all-zero row. An empty sequence
        gives a ``(0, 4)`` tensor.
    """
    alphabet = "ACGT"
    column_of = {base: col for col, base in enumerate(alphabet)}

    symbols = list(seq)
    # -1 marks symbols that are not part of the alphabet.
    codes = np.fromiter(
        (column_of.get(symbol, -1) for symbol in symbols),
        dtype=np.int64,
        count=len(symbols),
    )

    onehot = np.zeros((len(symbols), len(alphabet)), dtype=np.float64)
    known_rows = np.nonzero(codes >= 0)[0]
    onehot[known_rows, codes[known_rows]] = 1.0

    return torch.from_numpy(onehot)

In [None]:
def collate_fn(data):
    """Assemble a list of dataset items into one padded batch.

    Args:
        data: list of tuples
            ``(seq, label, len_seq, overlap, len_overlap)`` where ``seq`` and
            ``overlap`` are sequences of arbitrary length.

    Returns:
        ``(features, labels, len_seq, overlaps, len_overlap)``. ``features``
        and ``overlaps`` are batch-first tensors produced by ``embed`` and
        padded with the value 2.0; the other three entries are tuples taken
        unchanged from the items.
    """
    def _encode_and_pad(strings):
        # Encode every string with embed(), then pad to a common length.
        encoded = []
        for text in strings:
            encoded.append(embed(list(text)))
        return pad_sequence(encoded, batch_first=True, padding_value=2.0)

    # Transpose the list of per-item tuples into per-field tuples.
    seq, labels, len_seq, overlap, len_overlap = zip(*data)

    features = _encode_and_pad(seq)
    overlaps = _encode_and_pad(overlap)
    return features, labels, len_seq, overlaps, len_overlap


In [None]:
# Prepare the dataset: wrap it in a shuffling DataLoader that builds each batch with collate_fn.
# drop_last=True discards a final incomplete batch, so every batch holds exactly batch_size items.
batch_size = 64
train_dl = DataLoader(data, batch_size=batch_size, shuffle = True, collate_fn=collate_fn, drop_last = True)

# Sanity check: fetch a single batch, print what collate_fn produced, then stop.
for src, label , leng_src, overlap, leng_over in train_dl:
    print("Shape of feature: ", src.shape,src.dtype)
    print("label = ",len(label))
    print("length = ",leng_src)
    # NOTE(review): this prints the full padded overlap tensor, not just its shape.
    print("Overlap:",overlap)
    print("Length overlap:",leng_over)
    break

In [None]:
# The model with forward process
class OurModel(nn.Module):
    """Transformer encoder whose output vectors are L2-normalized.

    Args:
        d_model: Feature size of every token.
        nhead: Number of attention heads.
        num_layer: Number of stacked encoder layers.
        batch_first: When True (default) inputs and outputs are
            ``(batch, seq, d_model)``. The encoder layer's own default is
            sequence-first, which made attention run across the batch axis
            for batch-first inputs. Pass False to get the sequence-first
            layout.
    """

    def __init__(self, d_model:int, nhead:int, num_layer:int, batch_first:bool = True):
        super(OurModel, self).__init__()
        self.num_layer = num_layer
        self.d_model = d_model
        self.nhead = nhead
        self.batch_first = batch_first
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model, nhead=nhead, batch_first=batch_first
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layer)

    def forward(self, src: Tensor, mask: Tensor = None, src_key_padding_mask: Tensor = None) -> Tensor:
        """Encode ``src`` and normalize each token vector to unit L2 norm.

        Args:
            src: Input tokens, ``(batch, seq, d_model)`` when ``batch_first``
                is True.
            mask: Optional attention mask forwarded to the encoder.
            src_key_padding_mask: Optional padding mask forwarded to the encoder.

        Returns:
            Tensor with the same shape as ``src``, normalized along the
            last (feature) dimension.
        """
        output = self.transformer_encoder(
            src, mask=mask, src_key_padding_mask=src_key_padding_mask
        )
        # dim=2 is the feature axis in both supported layouts.
        return F.normalize(output,dim=2)

In [None]:
# Device selection is pinned to CPU; the CUDA auto-detect line below is left commented out.
#DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
DEVICE = torch.device('cpu')
# Build the encoder (d_model=512, 8 heads, 6 layers) and move it to the selected device.
model = OurModel(d_model = 512, nhead=8, num_layer=6).to(DEVICE)

In [None]:
def to_model_tokens(onehot, d_model=512):
    """Group a padded one-hot batch into float32 tokens of width ``d_model``.

    Args:
        onehot: Tensor of shape ``(batch, length, channels)``.
        d_model: Token width expected by the model; must be a multiple of
            ``channels``.

    Returns:
        float32 tensor of shape ``(batch, n_tokens, d_model)``. Each token
        covers ``d_model // channels`` consecutive positions (128 for 4
        channels and d_model=512); trailing positions that do not fill a
        whole window are dropped by ``unfold``.
    """
    n_batch, _, n_channels = onehot.shape
    window = d_model // n_channels
    tokens = onehot.unfold(1, window, window)
    tokens = tokens.reshape([n_batch, -1, d_model])
    return tokens.type(torch.float32)  # The TransformerEncoder uses float32.


# Smoke test: run one batch and its overlap partners through the model without gradients.
with torch.no_grad():
    for batch, (X,label,leng_X,Overlap,leng_O) in enumerate(train_dl):
        print('Seq after one-hot encoding: ',X.shape)
        X = to_model_tokens(X, model.d_model)
        print('Seq after unfold+reshape encoding: ',X.shape)

        # Random stand-in for a CLS token, sized from the batch instead of a
        # hard-coded 64. NOTE(review): a learnable CLS token would need to be
        # registered on the model; a tensor created here is never trained.
        cls = torch.randn(X.shape[0], 1, model.d_model)
        src1 = torch.cat((cls,X), dim=1)

        print('Overlap after one-hot encoding: ',Overlap.shape)
        Overlap = to_model_tokens(Overlap, model.d_model)
        src2 = torch.cat((cls,Overlap), dim=1)

        pred1 = model(src1, mask = None, src_key_padding_mask = None)
        print(pred1.shape)

        pred2 = model(src2, mask = None, src_key_padding_mask = None)
        break

In [None]:
# Step-by-step contrastive (SimCLR-style) loss on the first token of each output.
# Index 0 along dim 1 is the position where `cls` was concatenated in the forward-pass cell.
t1 = pred1[:,0,:]
print('Prediction 1: ',t1)
t2 = pred2[:,0,:]
print('Prediction 2: ',t2)
# NOTE(review): no layer normalization is applied here; the model output is
# already L2-normalized by F.normalize in OurModel.forward.
temperature = 0.1
# Stack both views so every row can be compared against every other row.
out = torch.cat([t1, t2], dim=0)
n_samples = len(out)

# Pairwise dot products of all rows, scaled by the temperature and exponentiated.
cov = torch.mm(out, out.t().contiguous())
sim = torch.exp(cov / temperature)

# Drop the diagonal (each row compared with itself) and sum what remains per row.
mask = ~torch.eye(n_samples).bool()
neg = sim.masked_select(mask).view(n_samples, -1).sum(dim=-1)

print(torch.sum(t1*t2,dim=-1))
# Similarity of each row with its paired row from the other view; repeated
# so it lines up with the 2N rows of `neg`.
pos = torch.exp(torch.sum(t1 * t2, dim=-1) / temperature)
pos = torch.cat([pos, pos], dim=0)
print(neg)
loss = -torch.log(pos / neg).mean()
print(loss)

In [None]:
def nt_xent_loss(out_1, out_2, temperature):
    """NT-Xent loss used in SimCLR, computed in log-space.

    For every row i of the stacked batch ``[out_1; out_2]`` the loss is
    ``log(sum_{k != i} exp(s_ik / T)) - s_ij / T`` where ``s`` is the dot
    product and j is the paired row from the other view. This equals
    ``-log(pos / neg)`` but uses ``logsumexp`` so that small temperatures do
    not overflow or underflow ``exp``.

    Args:
        out_1: Tensor ``(N, D)``, first view.
        out_2: Tensor ``(N, D)``, second view; row i pairs with ``out_1[i]``.
        temperature: Positive scaling factor applied to the similarities.

    Returns:
        Scalar tensor: the mean loss over all 2N rows.
    """
    out = torch.cat([out_1, out_2], dim=0)
    n_samples = out.shape[0]

    # Full similarity matrix, as logits.
    logits = torch.mm(out, out.t().contiguous()) / temperature

    # Exclude self-similarity; the mask lives on the same device as the inputs.
    self_mask = torch.eye(n_samples, dtype=torch.bool, device=out.device)
    logits = logits.masked_fill(self_mask, float("-inf"))
    log_denominator = torch.logsumexp(logits, dim=-1)

    # Positive-pair logits, repeated to line up with the 2N rows.
    pos_logits = torch.sum(out_1 * out_2, dim=-1) / temperature
    pos_logits = torch.cat([pos_logits, pos_logits], dim=0)

    loss = (log_denominator - pos_logits).mean()
    return loss