In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import scipy.io as sio
import numpy as np
import time 
import matplotlib.pyplot as plt

from python_utils import models

### setting parameters

In [None]:
model_layers   = 2
nonlinearity   = 'tanh'
latent_nums    = [1,2,3,4]
learning_rates = [1e-5,1e-5,1e-5,1e-5]
all_epochs     = [200000,200000,200000,200000]

test_skip_idx = 5   #grab every 5th entry of dictionary for testing

device = torch.device("cuda:1" if torch.cuda.is_available() else "cpu")

### loading dictionary and setting training / testing

In [None]:
#Loading the baseline dictionary
dictionary = np.real(sio.loadmat('data/dictionary.mat')['dictionary'])
t2_range   = sio.loadmat('data/dict_params.mat')['dictionary_params'][0][0][3]
    
    
t2_range_training = np.expand_dims(np.delete(t2_range,slice(None,None,test_skip_idx)),axis=0)
t2_range_testing  = t2_range[:,::test_skip_idx]

dictionary_training = np.delete(dictionary,slice(None,None,test_skip_idx),axis=1)
dictionary_testing  = dictionary[:,::test_skip_idx]
    
[T,Ntraining] = dictionary_training.shape
[_,Ntesting]  = dictionary_testing.shape

### defining auto-encoder

In [None]:
class autoencoder(nn.Module):
    def __init__(self,input_dimension,hidden_dimension,model_layers = 2, nonlinearity = 'leakyrelu'):
        super().__init__()
        
        if(nonlinearity == 'leakyrelu'):
            self.activation = F.leaky_relu
        elif(nonlinearity == 'relu'):
            self.activation = F.relu
        elif(nonlinearity == 'tanh'):
            self.activation = torch.tanh
        elif(nonlinearity == 'nalini'):
            self.activation = self.nalini
        elif(nonlinearity == 'htanh'):
            self.activation = F.hardtanh
        elif(nonlinearity == 'selu'):
            self.activation = torch.nn.SELU()
            
        self.encoding_layers = nn.ModuleList([])
        for ii in range(model_layers):
            if(ii < (model_layers - 1)):#last layer goes from current dimension directly to latent dimension
                self.encoding_layers.append(nn.Linear(input_dimension // 2**ii, input_dimension // 2**(ii+1)))              
            else:
                self.encoding_layers.append(nn.Linear(input_dimension // 2**ii, hidden_dimension))
                
        
        self.decoding_layers = nn.ModuleList([])
        self.decoding_layers.append(nn.Linear(hidden_dimension,input_dimension//2**(model_layers-1)))
        for ii in range(model_layers-1):
            self.decoding_layers.append(nn.Linear(input_dimension//2**(model_layers-(ii+1)),\
                                       input_dimension//2**(model_layers-(ii+1)-1)))

    def nalini(self,x):
        return x + F.relu((x-1)/2) + F.relu((-x-1)/2)
        
    def decode(self,x):
        out = x
        out = self.decoding_layers[0](out)
        for ii in range(len(self.decoding_layers)-1):
            out = self.activation(out)
            out = self.decoding_layers[ii+1](out)
            
        return out
    
    def encode(self,x):
        out = x
        for layer in self.encoding_layers:
            out = layer(out)
            out = self.activation(out)
        return out

    def forward(self,x):
        return self.decode(self.encode(x))

### training models

In [None]:
criterion = nn.MSELoss()

#re-shaping dictionary, casting it to right type, and sending it to GPU
dictionary_for_training = torch.tensor(np.transpose(dictionary_training,(1,0)),dtype = torch.float).to(device)

all_models = []

for latent_num,epochs,learning_rate in zip(latent_nums,all_epochs,learning_rates):
    starting_time = time.time()
    
    print('Latent Variables: %d || epochs: %d || learning rate: %.3f' % (latent_num,epochs,learning_rate))
    
    model = autoencoder(T,latent_num,model_layers,nonlinearity)
        
    model.to(device)
    
    optimizer = optim.Adam(model.parameters(),lr=learning_rate)
    
    for epoch in range(epochs):
        optimizer.zero_grad()
        
        output = model(dictionary_for_training)
        loss   = criterion(output,dictionary_for_training)
        loss.backward()
        optimizer.step()
        
        running_loss = loss.item()
        
        if(epoch % 1000 == 0):
            print('  iteration %d/%d current loss: %.12f' % (epoch,epochs,running_loss))
    
    print('  final loss: %.12f' % running_loss)
    ending_time = time.time()
    print('  elapsed time: %.2f min' % ((ending_time - starting_time)/60))

    all_models.append(model)

### computing training and testing errors

In [None]:
rmse = lambda truth, comp: np.sqrt(np.sum((comp - truth)**2,keepdims=False,axis=0)) / np.sqrt(np.sum(truth**2,keepdims=False,axis=0))

[u,s,v] = np.linalg.svd(dictionary,full_matrices=False)

all_basis = []

linear_errors_training   = []
proposed_errors_training = []

linear_errors_testing   = []
proposed_errors_testing = []

dictionary_linear_training   = []
dictionary_proposed_training = []

dictionary_linear_testing   = []
dictionary_proposed_testing = []

for latent_num, model in zip(latent_nums,all_models):
    #determine basis
    basis = u[:,0:latent_num]
    
    #estimate dictionaries with svd and autoencoder
    dictionary_estimate_linear_training   = basis @ np.conj(basis.T) @ dictionary_training
    dictionary_estimate_proposed_training = model(dictionary_for_training).cpu().detach().numpy().transpose((1,0))
    
    dictionary_estimate_linear_testing    = basis @ np.conj(basis.T) @ dictionary_testing
    dictionary_for_testing                = \
        torch.tensor(np.transpose(dictionary_testing,(1,0)),dtype = torch.float).to(device)
    dictionary_estimate_proposed_testing = \
        model(dictionary_for_testing).cpu().detach().numpy().transpose((1,0))

    error_linear_training   = rmse(dictionary_training,dictionary_estimate_linear_training)
    error_proposed_training = rmse(dictionary_training,dictionary_estimate_proposed_training)
    
    error_linear_testing    = rmse(dictionary_testing,dictionary_estimate_linear_testing)
    error_proposed_testing  = rmse(dictionary_testing,dictionary_estimate_proposed_testing)
    
    linear_errors_training.append(error_linear_training)
    proposed_errors_training.append(error_proposed_training)
    
    linear_errors_testing.append(error_linear_testing)
    proposed_errors_testing.append(error_proposed_testing)
    
    dictionary_linear_training.append(dictionary_estimate_linear_training)
    dictionary_proposed_training.append(dictionary_estimate_proposed_training)
    
    dictionary_linear_testing.append(dictionary_estimate_linear_testing)
    dictionary_proposed_testing.append(dictionary_estimate_proposed_testing)

### entry by entry training error

In [None]:
lw = 3
for latent_num,error_linear,error_proposed in zip(latent_nums,linear_errors_training,proposed_errors_training):
    print('average percent errors:')
    print('  proposed: %.2f' % (np.mean(error_proposed)*100))
    print('  linear:   %.2f' % (np.mean(error_linear)*100))
    
    plt.title('Latent Variables: %d' % latent_num,fontsize=24)
    plt.plot(error_proposed,linewidth=lw)
    plt.plot(error_linear,linewidth=lw)
    plt.legend(['proposed','linear'],prop={'size':20})
    plt.ylim([0,.3])
    plt.show()

### entry by entry testing error

In [None]:
lw = 3
for latent_num,error_linear,error_proposed in zip(latent_nums,linear_errors_testing,proposed_errors_testing):
    print('average percent errors:')
    print('  proposed: %.2f' % (np.mean(error_proposed)*100))
    print('  linear:   %.2f' % (np.mean(error_linear)*100))\
    
    plt.title('Latent Variables: %d' % latent_num,fontsize=24)
    plt.plot(error_proposed,linewidth = lw)
    plt.plot(error_linear,  linewidth = lw)
    plt.legend(['proposed','linear'],prop={'size':20})
    plt.ylim([0,.4])
    plt.show()