In [1]:
from sklearn.model_selection import KFold
import pickle
import pandas as pd
import os.path
file_name = 'resampled_2x5_cross_validation'
if os.path.isfile(file_name+'.pickle'):
    with open(file_name+'.pickle', 'rb') as handle:
        train_folds,test_folds = pickle.load(handle)
                                            

In [2]:
import transformer_lstr as tr
import torch.nn as nn
import torch
class feature_encoder_1d(nn.Module):
    def __init__(self, input_len, d_model):
        super(feature_encoder_1d, self).__init__()
        self.input_len = input_len
        self.d_model = d_model
        self.feature_encoder = nn.Linear(self.input_len, self.input_len * self.d_model)
        
    def forward(self, encoded_features):
        _b, _l, _c = encoded_features.shape
        # assert _l == self.input_len and _c == 1
        encoded_features = encoded_features.flatten(1,2) # (Bx64x1) -> (Bx64)
        encoded_features = self.feature_encoder(encoded_features) # (B x 64) -> (B x 64*d_model)
        encoded_features = encoded_features.view(_b, _l, self.d_model) # (B x 64*d_model32) -> (Bx64xd_model32)
        return encoded_features
        
        
class SpectrumTransformer(nn.Module):
    def __init__(self):
        super(SpectrumTransformer, self).__init__()
        
        self.training = False

        # Decoder configs
        self.input_len = 64
        self.d_model_init = 1
        
        self.d_model = 16
        self.d_val = 250
        self.num_heads = 1
        self.dim_feedforward = 256
        self.dropout = 0.
        self.activation = 'relu'
        
        self.embedding_len = [
            [64, 4, True],[64, 2, True], [250, 1, True]
        ]
        self.num_classes = 250

        # Build position encoding
        self.pos_encoding = tr.PositionalEncoding(self.d_model, self.dropout)

        # decoder modules
        # self.keys = nn.Parameter(torch.ones(self.input_len, self.d_model), requires_grad=True).to(device)
        self.keys = nn.Embedding(self.input_len, self.d_model)
        
        # self.values = nn.Parameter(rand(self.input_len, self.d_val), requires_grad=True).to(device)
        # enc_layer = tr.TransformerDecoderLayer(
        #                 self.d_model, self.num_heads, self.dim_feedforward,
        #                 self.dropout, self.activation
        #             )
        # self.enc_modules.append(
        #     tr.TransformerDecoder(
        #         enc_layer, param[1], tr.layer_norm(self.d_model, param[2])
        #     )
        # )
        
        self.transformer = tr.Transformer(d_model=self.d_model, nhead=self.num_heads, num_encoder_layers=1,
                 num_decoder_layers=1, dim_feedforward=self.dim_feedforward, dropout=self.dropout,
                 activation='relu', custom_encoder=None, custom_decoder=None)


        # Build classifier
        self.feature_encoder = feature_encoder_1d(self.input_len, self.d_model)
        
        self.feature_decoder = nn.Sequential(
            nn.Linear(self.d_model, 64),
            # nn.Sigmoid(),
        )
        
        self.feature_decoder2 = nn.Sequential(
            nn.Linear(self.d_model*self.input_len, 250),
            nn.Sigmoid(),
        )
    
    def create_keys(self, keys, encoded_features):
        _b, _l, _c = encoded_features.shape
        keys = keys.expand(_b, -1, -1)
        return keys

    def forward(self, encoded_features, memory_key_padding_mask=None):
        encoded_features = self.feature_encoder(encoded_features) # (B x 64) -> (B x 64*d_model32)
        encoded_features = encoded_features.transpose(0, 1) # (Bx64xd_model32) -> (64xBxd_model32)
        q = self.pos_encoding(encoded_features)
        # print(self.create_keys(self.keys, encoded_features).shape, q.shape)
        spectrum = self.transformer(self.create_keys(self.keys.weight.unsqueeze(1).repeat(1, encoded_features.shape[1], 1), encoded_features), q)
        # spectrum = self.transformer(self.create_keys(self.keys, encoded_features), q)
        # kkk = self.keys.weight.unsqueeze(1).repeat(1, encoded_features.shape[1], 1)
        # print(torch.max(kkk), torch.min(kkk))
        # spectrum = self.feature_decoder(spectrum)
        # print(spectrum.shape)
        spectrum = spectrum.transpose(0, 1).flatten(1,2)
        # print(spectrum.shape)
        spectrum = self.feature_decoder2(spectrum)
        # print(spectrum.shape)
        return spectrum[:,:,None]

In [3]:
from torch.utils.data import DataLoader
from torch import optim
import numpy as np
class CalculateMSE():
    def __init__(self, device,n_epochs,batch_size ):
        super().__init__()
        self.net = SpectrumTransformer().to(device)
        #initialize some constants
        self.batch_size = 32
        self.learning_rate = 1e-4
        self.n_epochs = n_epochs
        self.net.apply(self.weights_init)   
        self.device=device
    def weights_init(self,layer):
        if type(layer) == nn.Linear:
            nn.init.orthogonal_(layer.weight)
    def get_mse(self,train_data, train_label, test_data, test_label):
        train_set = torch.utils.data.TensorDataset(
            torch.Tensor(train_data), 
            torch.Tensor(train_label))
        val_set = torch.utils.data.TensorDataset(
            torch.Tensor(test_data), 
            torch.Tensor(test_label))
        loader_args = dict(batch_size=self.batch_size)
        train_loader = DataLoader(train_set, shuffle=True, drop_last=True, **loader_args)
        val_loader = DataLoader(val_set, shuffle=True, drop_last=True, **loader_args)
        tloss = []
        vloss = []
        criterion = nn.MSELoss()
        optimizer = optim.Adam(self.net.parameters(), lr=self.learning_rate) # weight_decay=0
        for epoch in range(0, self.n_epochs):
            epoch_train_loss=[]
            self.net.train()
            for i, data in enumerate(train_loader, 0):
                inputs, label = data
                y_pred = self.net(inputs.to(self.device).unsqueeze(2))
                loss = criterion(y_pred, label.to(self.device).unsqueeze(2))
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                epoch_train_loss.append(loss.item())
            tloss.append(np.mean(epoch_train_loss))
            epoch_loss=[]
            self.net.eval()
            for i, data in enumerate(val_loader, 0):
                with torch.no_grad():
                    inputs1, label1 = data
                    y_pred1 = self.net(inputs1.to(self.device).unsqueeze(2))
                    loss1 = criterion(y_pred1, label1.to(self.device).unsqueeze(2))
                    epoch_loss.append(loss1.item())
            vloss.append(np.mean(epoch_loss))
        return np.min(vloss), self.net


In [4]:
from pathlib import Path
n_epochs=3000
batch_size=32
PATH = 'saved_model/Transformer_combined_poisson_30percent/'
Path(PATH).mkdir(parents=True, exist_ok=True)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
losses = []
def add_noise(inputs,inputs2, a = 0.02, std = 0.02, sequence_length=64, noise_seed=None):
    if noise_seed is not None:
        np.random.seed(noise_seed)
    noise = np.random.normal(0,std, size = (inputs.shape[0], sequence_length)).astype(np.float32)
    noise2 = np.random.normal(0,std, size = (inputs2.shape[0], sequence_length)).astype(np.float32)
    #noise = poisson.rvs(mu, size=(inputs.shape[0], sequence_length)).astype(np.float32)
    
    nsd = a*np.random.poisson(inputs/a).astype(np.float32)
    nsd2 = a*np.random.poisson(inputs2/a).astype(np.float32)
    
    # Calculate the absolute error between nsd and original inputs
    absolute_error = np.abs(nsd - inputs)
    absolute_error2 = np.abs(nsd2 - inputs2)
    # Sum up the absolute errors
    total_error = np.sum(absolute_error)
    total_error2 = np.sum(absolute_error2)
    # Sum of the original data
    total_original = np.sum(inputs)
    total_original2 = np.sum(inputs2)

    # Calculate the noise ratio
    noise_ratio = (total_error+total_error2) / (total_original+total_original2)
    
    return nsd, nsd2, noise_ratio
noise_ratios = []
for i,(train,test) in enumerate(zip(train_folds,test_folds)):
    train_data, train_label= train[0],train[1]
    test_data, test_label= test[0],test[1]
    # Adding noise to the train and test data
    train_data,test_data,noise_ratio = add_noise(train_data,test_data, a=0.058,std = 0.05, sequence_length=64, noise_seed=i)
    print(noise_ratio)
    noise_ratios.append(noise_ratio)
    mse_calculator = CalculateMSE(device,n_epochs,batch_size)
    loss,model = mse_calculator.get_mse(train_data, train_label, test_data, test_label)
    losses.append(loss)
    print(i,loss)
    torch.save(model.state_dict(), PATH+'model'+str(i))
print(np.mean(losses),np.std(losses))
print(np.mean(noise_ratios))

0.2991638516368172
0 0.02490582801401615
0.30001349371568536
1 0.029095442220568656
0.30083086123556907
2 0.023962047696113587
0.30097298174478687
3 0.030762665346264838
0.29975233389828004
4 0.01938649695366621
0.2992994934645396
5 0.024121922627091407
0.30073801023410157
6 0.025475231185555457
0.3009065363556807
7 0.025385582819581032
0.29975297467000084
8 0.025454199686646462
0.30008254071360696
9 0.021292650327086448
0.024984206687659025 0.003125257941404093
0.3001513077669068


In [5]:
print(np.mean(losses),np.std(losses))

0.024984206687659025 0.003125257941404093


In [6]:
number_figures = 10
import matplotlib.pyplot as plt
model=mse_calculator.net
indices = torch.randint(0,len(response),(number_figures,)).unique()
for i in indices:
    res = response[i].reshape(1,-1,1)
    spec = spectra[i].flatten()
    plt.figure(i)

    plt.plot(model(torch.Tensor(res).to(device)).detach().cpu().flatten())
    plt.plot(spec)
    plt.legend(['reconstruction','ground truth'])


NameError: name 'response' is not defined