In [None]:
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
from torch.autograd import Variable

import os
import numpy as np
import scipy
import matplotlib.pyplot as plt
import pandas as pd
import itertools
import progressbar

print("Using Pytorch",torch.__version__)

In [None]:
#@title Define NN architecture and loss function

##################################
# Custom Dataset
##################################
class ColvarDataset(Dataset):
    """COLVAR dataset"""

    def __init__(self, colvar_list):
        self.nstates = len( colvar_list )
        self.colvar = colvar_list

    def __len__(self):
        return len(self.colvar[0])

    def __getitem__(self, idx):
        x = ()
        for i in range(self.nstates):
            x += (self.colvar[i][idx],)
        return x

#useful for cycling over the test dataset if it is smaller than the training set
def cycle(iterable):
    while True:
        for x in iterable:
            yield x

##################################
# Define Networks
##################################

class NN_DeepLDA(nn.Module):

    def __init__(self, l ):
        super(NN_DeepLDA, self).__init__()

        modules=[]
        for i in range( len(l)-1 ):
            print(l[i],' --> ', l[i+1], end=' ')
            if( i<len(l)-2 ):
                modules.append(nn.Linear(l[i], l[i+1]) )
                modules.append( nn.ReLU(True) )
                print("(relu)")
            else:
                modules.append(nn.Linear(l[i], l[i+1]) )
                print("")

        self.nn = nn.Sequential(*modules)

        #norm option
        self.normIn = False

    def set_norm(self, Mean: torch.Tensor, Range: torch.Tensor):
        self.normIn = True
        self.Mean = Mean
        self.Range = Range

    def normalize(self, x: Variable):
        batch_size = x.size(0)
        x_size = x.size(1)

        Mean = self.Mean.unsqueeze(0).expand(batch_size, x_size)
        Range = self.Range.unsqueeze(0).expand(batch_size, x_size)

        return x.sub(Mean).div(Range)

    def get_hidden(self, x: Variable, svd=False, svd_vectors=False, svd_eigen=False, training=False) -> (Variable):
        if(self.normIn):
            x = self.normalize(x)
        z = self.nn(x)
        return z

    def set_lda(self, x: torch.Tensor):
        self.lda = nn.Parameter(x.unsqueeze(0), requires_grad=False)

    def get_lda(self) -> (torch.Tensor):
        return self.lda

    def apply_lda(self, x: Variable) -> (Variable):
        z = torch.nn.functional.linear(x,self.lda)
        return z

    def forward(self, x: Variable) -> (Variable):
        z = self.get_hidden(x,svd=False)
        z = self.apply_lda(z)
        return z

    def get_cv(self, x: Variable) -> (Variable):
        return self.forward(x)

# auxiliary class to export a model which outputs the topmost hidden layer
class NN_Hidden(nn.Module):

    def __init__(self, l ):
        super(NN_Hidden, self).__init__()

        modules=[]
        for i in range( len(l)-1 ):
            if( i<len(l)-2 ):
                modules.append(nn.Linear(l[i], l[i+1]) )
                modules.append( nn.ReLU(True) )
            else:
                modules.append(nn.Linear(l[i], l[i+1]) )

        self.nn = nn.Sequential(*modules)

        #norm option
        self.normIn = False

    def set_norm(self, Mean: torch.Tensor, Range: torch.Tensor):
        self.normIn = True
        self.Mean = Mean
        self.Range = Range

    def normalize(self, x: Variable):
        batch_size = x.size(0)
        x_size = x.size(1)
        Mean = self.Mean.unsqueeze(0).expand(batch_size, x_size)
        Range = self.Range.unsqueeze(0).expand(batch_size, x_size)
        return x.sub(Mean).div(Range)

    def get_hidden(self, x: Variable, svd=False, svd_vectors=False, svd_eigen=False, training=False) -> (Variable):
        if(self.normIn):
            x = self.normalize(x)
        z = self.nn(x)
        return z

    def forward(self, x: Variable) -> (Variable):
        z = self.get_hidden(x,svd=False)
        return z

##################################
# Loss function
##################################

def LDAloss_cholesky(H, label, test_routines=False):
    #sizes
    N, d = H.shape

    # Mean centered observations for entire population
    H_bar = H - torch.mean(H, 0, True)
    #Total scatter matrix (cov matrix over all observations)
    S_t = H_bar.t().matmul(H_bar) / (N - 1)
    #Define within scatter matrix and compute it
    S_w = torch.Tensor().new_zeros((d, d), device = device, dtype = dtype)
    S_w_inv = torch.Tensor().new_zeros((d, d), device = device, dtype = dtype)
    buf = torch.Tensor().new_zeros((d, d), device = device, dtype = dtype)
    #Loop over classes to compute means and covs
    for i in range(categ):
        #check which elements belong to class i
        H_i = H[torch.nonzero(label == i).view(-1)]
        # compute mean centered obs of class i
        H_i_bar = H_i - torch.mean(H_i, 0, True)
        # count number of elements
        N_i = H_i.shape[0]
        if N_i == 0:
            continue
        S_w += H_i_bar.t().matmul(H_i_bar) / ((N_i - 1) * categ)

    S_b = S_t - S_w

    S_w = S_w + lambdA * torch.diag(torch.Tensor().new_ones((d), device = device, dtype = dtype))

    ## Generalized eigenvalue problem: S_b * v_i = lambda_i * Sw * v_i

    # (1) use cholesky decomposition for S_w
    L = torch.cholesky(S_w,upper=False)

    # (2) define new matrix using cholesky decomposition and
    L_t = torch.t(L)
    L_ti = torch.inverse(L_t)
    L_i = torch.inverse(L)
    S_new = torch.matmul(torch.matmul(L_i,S_b),L_ti)

    # (3) solve  S_new * w_i = lambda_i * w_i
    eig_values, eig_vectors = torch.symeig(S_new,eigenvectors=True)
    eig_vectors = eig_vectors.t()
    # (4) sort eigenvalues and retrieve old eigenvector
    #eig_values, ind = torch.sort(eig_values, 0, descending=True)
    max_eig_vector = eig_vectors[-1]
    max_eig_vector = torch.matmul(L_ti,max_eig_vector)
    norm=max_eig_vector.pow(2).sum().sqrt()
    max_eig_vector.div_(norm)

    loss = - eig_values[-1]

    return loss, eig_values, max_eig_vector, S_b, S_w

# Evaluate LDA over all the training set
def check_LDA_cholesky(loader, model):
    with torch.no_grad():
        for data in loader:
            X,y = data[0].float().to(device),data[1].long().to(device)
            H  = model.get_hidden(X)
            _, eig_values, eig_vector, _, _ = LDAloss_cholesky(H, y)
    return eig_values, eig_vector


In [None]:
#@title Encoding and plotting functions

##################################
# Encoding functions
##################################

def encode_hidden(loader,model,batch,n_hidden,device):
    """Compute the hidden layer over a dataloader"""
    s=np.empty((len(loader),batch,n_hidden))
    l=np.empty((len(loader),batch))
    for i,data in enumerate(loader):
        x,lab = data[0].float(),data[1].long()
        x = Variable(x).to(device)
        cv = model.get_hidden(x,svd=False)
        #cv = model.apply_pca(cv)
        s[i] = cv.detach().cpu().numpy()
        l[i] = lab

    s=s.reshape(len(loader)*batch,n_hidden)
    s=s[0:len(loader)*batch]

    l=l.reshape(len(loader)*batch)
    l=l[0:len(loader)*batch]

    sA = s[l==0]
    sB = s[l==1]

    return sA,sB

def encode_cv(loader,model,batch,n_cv,device):
    """Compute the CV over a dataloader"""
    s=np.empty((len(loader),batch,n_cv))
    l=np.empty((len(loader),batch))
    for i,data in enumerate(loader):
        x,lab = data[0].float(),data[1].long()
        x = Variable(x).to(device)
        cv = model(x)
        s[i] = cv.detach().cpu().numpy()
        l[i] = lab

    s=s.reshape(len(loader)*batch,n_cv)
    s=s[0:len(loader)*batch]

    l=l.reshape(len(loader)*batch)
    l=l[0:len(loader)*batch]

    sA = s[l==0]
    sB = s[l==1]

    return sA,sB

def encode_cv_all(loader,model,batch,n_cv,device):
    """Compute the CV over a dataloader with labels"""
    s=np.empty((len(loader),batch,n_cv))
    l=np.empty((len(loader),batch))
    for i,data in enumerate(loader):
        x,lab = data[0].float(),data[1].long()
        x = Variable(x).to(device)
        cv = model.get_cv(x)
        s[i] = cv.detach().cpu().numpy()
        l[i] = lab

    s=s.reshape(len(loader)*batch,n_cv)
    s=s[0:len(loader)*batch]

    l=l.reshape(len(loader)*batch)
    l=l[0:len(loader)*batch]

    return s,l

##################################
# Plotting functions
##################################

def plot_results(save=False,testing=False):
    fig, axes = plt.subplots(1, 3, figsize=(13,4))
    plot_training(axes[0],save)
    plot_H(axes[1],save,testing)
    plot_CV(axes[2],save,testing)
    if save:
        fig.savefig("{}/{}.png".format(tr_folder, "training"),dpi=150)
        plt.close()
    else:
        plt.show()

def plot_training(ax,save=False,training=False):
    ax.set_title("Deep-LDA optimization")
    ax.plot(np.asarray(ep),np.asarray(eig),'.-', c='tab:green', label='train-batch')
    ax.plot(np.asarray(ep),np.asarray(eig_t),'.-', c='tab:grey', label='train')
    ax.plot(np.asarray(ep),np.asarray(eig_val),'.-', c='tab:orange', label='valid')
    ax.set_xlabel("Epoch")
    ax.set_ylabel("1st Eigenvalue")
    ax.legend()

def plot_H(ax,save=False,testing=False):
    ax.set_title("LDA on Hidden-space H")
    # -- Testing and Validation histograms --
    trA,trB = encode_hidden(train_all_loader,model,train_data,n_hidden,device)
    eigen=max_eig_vector.detach().numpy()

    ax.scatter(trA[:,0],trA[:,1], c='tab:red', label='train A',alpha=0.3)
    ax.scatter(trB[:,0],trB[:,1], c='tab:blue', label='train B',alpha=0.3)

    if testing:
        ttA,ttB = encode_hidden(valid_loader,model,valid_data,n_hidden,device)
        ax.scatter(ttA[:,0],ttA[:,1], c='tab:orange', label='valid A',s=0.2, alpha=0.5)
        ax.scatter(ttB[:,0],ttB[:,1], c='tab:cyan', label='valid B',s=0.2, alpha=0.5)
        mIN=np.min([np.min(trA[:,0]),np.min(trB[:,0]),np.min(ttA[:,0]),np.min(ttB[:,0])])
        mAX=np.max([np.max(trA[:,0]),np.max(trB[:,0]),np.max(ttA[:,0]),np.max(ttB[:,0])])
    else:
        mIN=np.min([np.min(trA[:,0]),np.min(trB[:,0])])
        mAX=np.max([np.max(trA[:,0]),np.max(trB[:,0])])

    ax.set_xlabel(r"$h_0$")
    ax.set_ylabel(r"$h_1$")

    #x=np.linspace(mIN,mAX,100)
    #y=-eigen[0]/eigen[1]*x+0
    #plt.plot(x,y, linewidth=2, label='DeepLDA')
    ax.legend()

def plot_CV(ax,save=False,testing=False):
    sA,sB = encode_cv(train_all_loader,model,train_data,n_cv,device)
    sA,sB = sA[:,0], sB[:,0]
    if testing:
        stA,stB = encode_cv(valid_loader,model,valid_data,n_cv,device)
        stA,stB = stA[:,0], stB[:,0]
        min_s=np.min([np.min(sA),np.min(sB),np.min(stA),np.min(stB)])
        max_s=np.max([np.max(sA),np.max(sB),np.max(stA),np.max(stB)])
    else:
        min_s=np.min([np.min(sA),np.min(sB)])
        max_s=np.max([np.max(sA),np.max(sB)])

    b=np.linspace(min_s,max_s,100)

    ax.set_title("Deep-LDA CV Histogram")
    ax.hist(sA, bins=b, ls='dashed', alpha = 0.7, lw=2, color='tab:red', label='train A',density=True)
    ax.hist(sB, bins=b, ls='dashed', alpha = 0.7, lw=2, color='tab:blue', label='train B',density=True)

    if testing:
        plt.hist(stA, bins=b, ls='dashed', alpha = 0.5, lw=2, color='tab:orange', label='valid A',density=True)
        plt.hist(stB, bins=b, ls='dashed', alpha = 0.5, lw=2, color='tab:cyan', label='valid B',density=True)

    ax.legend()


## **Load files**

Specify dataset structure (use **from_column=1** to exclude time from COLVAR file)

In [None]:
##@title **Load files**
n_input =  17#no of descriptors {type:"integer"}
from_column =  2#normally ignore first column, which is time {type:"integer"}

#load files:
#A.dat contains the information of state A; distA.shape= no_of_frames_in_A*17 
#B.dat contains the information of state B; distB.shape= no_of_frames_in_B*17 

distA = np.loadtxt('A.dat',usecols=range(from_column,from_column+n_input))
distB = np.loadtxt('B.dat',usecols=range(from_column,from_column+n_input))

print("[Imported data]")
print("- dataA.shape:", distA.shape)
print("- dataB.shape:", distB.shape)

In [None]:
##@title Create datasets
standardize_inputs = True #@param {type:"boolean"}

if standardize_inputs:
    print("[Standardize inputs]")
    print("- Calculating mean and range over the training set")
    Max=np.amax(np.concatenate([distA,distB],axis=0),axis=0)
    Min=np.amin(np.concatenate([distA,distB],axis=0),axis=0)

    Mean=(Max+Min)/2.
    Range=(Max-Min)/2.
    if(np.sum(np.argwhere(Range<1e-6))>0):
        print("- [Warning] Skipping normalization where range of values is < 1e-6. Input(s):", np.argwhere(Range<1e-6).reshape(-1))
        Range[Range<1e-6]=1.

# create labels
lA=np.zeros_like(distA[:,0])
lB=np.ones_like(distB[:,0])

dist=np.concatenate([distA,distB],axis=0)
dist_label=np.concatenate([lA,lB],axis=0)

print(dist.shape)

p = np.random.permutation(len(dist))
dist, dist_label = dist[p], dist_label[p]

#@title Training and validation set size
train_data = 12000#@param {type:"integer"}
batch_tr = 4000#@param {type:"integer"}
train_labels=ColvarDataset([dist[:train_data],dist_label[:train_data]])
train_loader=DataLoader(train_labels, batch_size=batch_tr,shuffle=True)

# create additional dataset which cover all the training set in one batch
train_all_labels=ColvarDataset([dist[:train_data],dist_label[:train_data]])
train_all_loader=DataLoader(train_all_labels, batch_size=train_data)

#print(list(train_all_labels))

# The validation is evaluated in a single batch
valid_data = 4000#@param {type:"integer"}
batch_val=valid_data
valid_labels=ColvarDataset([dist[train_data:train_data+valid_data],dist_label[train_data:train_data+valid_data]])
valid_loader=DataLoader(valid_labels, batch_size=batch_val)

In [None]:
##@title NN and training parameters
#type
dtype = torch.float32
# wheter to use CUDA
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

categ = 2
n_cv=1

hidden_nodes = "20,20,5" #@param {type:"raw"}
nodes = [int(x)for x in hidden_nodes.split(',')]
nodes.insert(0, n_input)
n_hidden=nodes[-1]

print("[NN Architecture]")
print("- hidden layers:", nodes)
print("")
print("========= NN =========")
model = NN_DeepLDA(nodes)
if standardize_inputs:
    model.set_norm(torch.tensor(Mean,dtype=dtype,device=device),torch.tensor(Range,dtype=dtype,device=device))
print("======================")
model.to(device)
if torch.cuda.is_available():
    print("using CUDA acceleration")
    print("========================")

# -- Optimization --
lrate = 0.001 #@param {type:"slider", min:0.0001, max:0.005, step:0.0001}
lambdA = 0.05 #@param {type:"number"}
l2_reg = 1e-5 #@param {type:"number"}
act_reg = 1./lambdA # lorentzian regularization

print("")
print("[Optimization]")
print("- Learning rate \t=",lrate)
print("- l2 regularization \t=",l2_reg)
print("- lambda (S_w reg.) \t=",lambdA)
print("- lorentian (CV reg.) \t=",act_reg)

#OPTIMIZERS
opt = torch.optim.Adam(model.parameters(), lr=lrate, weight_decay=l2_reg)

#define arrays and values
ep = []
eig = []
eig_t = []
eig_val = []
init_epoch = 0
best_result = 0

In [None]:
##@title Training
num_epochs =  100#@param {type:"number"}
print_loss = 1#@param {type:"slider", min:1, max:100, step:1}
plot_every = 5#@param {type:"slider", min:1, max:100, step:1}
plot_validation = True #@param {type:"boolean"}

#format output
float_formatter = lambda x: "%.6f" % x
np.set_printoptions(formatter={'float_kind':float_formatter})

print('[{:>3}/{:>3}] {:>10} {:>10} {:>10} {:>10}'.format('ep','tot','eig_train','eig_valid','reg loss','eigenvector'))

# -- Training --
for epoch in range(num_epochs):
    for data in train_loader:
        # =================get data===================
        X,y = data[0].float().to(device),data[1].long().to(device)
        # =================forward====================
        #H,S = model.get_hidden(X,svd=True,svd_eigen=True)
        H = model.get_hidden(X)
        # =================lda loss===================
        lossg, eig_values, max_eig_vector, Sb, Sw = LDAloss_cholesky(H, y)
        model.set_lda(max_eig_vector)
        s = model.apply_lda(H)
        # =================reg loss===================
        #reg_loss = H.pow(2).sum().div( H.size(0) )
        #reg_loss_lor = - act_reg / (1+(reg_loss-1).pow(2))
        Ha = H[y==0]
        Hb = H[y==1]
        reg_loss = Ha.pow(2).sum().div( Ha.size(0) ) + Hb.pow(2).sum().div( Hb.size(0) )
        reg_loss_lor = - act_reg / (1+(reg_loss-1.0).pow(2))
        # =================backprop===================
        opt.zero_grad()
        lossg.backward(retain_graph=True)
        reg_loss_lor.backward()
        opt.step()

    #Compute LDA over entire datasets and save LDA eigenvector
    train_eig_values, train_eig_vector = check_LDA_cholesky(train_all_loader, model)
    model.set_lda(train_eig_vector)
    valid_eig_values, valid_eig_vector = check_LDA_cholesky(valid_loader, model)

    #save results
    ep.append(epoch+init_epoch+1)
    eig.append(eig_values.detach().numpy()[-1])
    eig_t.append(train_eig_values[-1])
    eig_val.append(valid_eig_values[-1])

    if (epoch+1)%print_loss == 0:
        print('[{:3d}/{:3d}] {:10.3f} {:10.3f} {:10.3f} '.format
          (init_epoch+epoch+1, init_epoch+num_epochs, train_eig_values.detach().numpy()[-1], valid_eig_values.numpy()[-1], reg_loss), train_eig_vector.numpy() )

    if (epoch+1)%plot_every == 0:
        plot_results(testing=plot_validation)

init_epoch += num_epochs

In [None]:
#@title Plot hidden components
#encode_hidden computes the hidden variables for an entire dataset, using the specified model

trA,trB = encode_hidden(train_all_loader,model,train_data,n_hidden,device)
print(np.shape(trA), np.shape(trB))
fig, axs = plt.subplots(1, n_hidden, figsize=(3.5*n_hidden,3.5))
fig.suptitle('Hidden components')
for i in range(n_hidden):
    ax = axs[i]
    ax.set_xlabel("Training set")
    ax.set_ylabel(r"$h_"+str(i)+"$")
    ax.plot(trA[:,i], c='tab:red', label='trA')
    ax.plot(trB[:,i], c='tab:blue', label='trB')
    ax.legend()

fig.tight_layout(rect=[0, 0.03, 1, 0.95])
plt.show()

In [None]:
#@title Scatter plot w/LDA boundaries
from itertools import combinations

#get LDA eigenvector
eigen = model.get_lda().numpy()[0]

#compute h
trA,trB = encode_hidden(train_all_loader,model,train_data,n_hidden,device)
ttA,ttB = encode_hidden(valid_loader,model,valid_data,n_hidden,device)

#plot layout
n_plots=len(list(combinations(range(n_hidden),2)))
num_plots_per_line = 5 #@param {type:"slider", min:1, max:10, step:1}
num_lines=int(n_plots/num_plots_per_line+0.999)

#define subplots
fig, axs = plt.subplots(num_lines, num_plots_per_line, figsize=(3*num_plots_per_line,3*num_lines),dpi=100)
axs = axs.reshape(-1)
fig.suptitle('Scatter plots of hidden components')

#iterate and plot
idx=0
point_size=2 #@param {type:"slider", min:0.5, max:20, step:0.5}
for i,j in combinations(range(n_hidden),2):
    ax = axs[idx]
    idx+=1
    ax.scatter(trA[:,i],trA[:,j], c='tab:red', s=point_size, label='trA')
    ax.scatter(ttA[:,i],ttA[:,j], c='tab:orange', s=point_size, label='testA',alpha=0.3,marker='o')
    ax.scatter(trB[:,i],trB[:,j], c='tab:blue', s=point_size, label='trB')
    ax.scatter(ttB[:,i],ttB[:,j], c='tab:cyan', s=point_size, label='testB',alpha=0.3,marker='o')
    #plot LDA line
    mIN=np.min([np.min(trA[:,i]),np.min(trB[:,i]),np.min(ttA[:,i]),np.min(ttB[:,i])])
    mAX=np.max([np.max(trA[:,i]),np.max(trB[:,i]),np.max(ttA[:,i]),np.max(ttB[:,i])])
    x=np.linspace(mIN,mAX,100)
    y=-eigen[i]/eigen[j]*x+0
    ax.plot(x,y, linewidth=2,  label='LDA boundary', color='darkgrey',alpha=0.7,linestyle='dashed')
    #labels
    ax.set_xlabel(r"$h_"+str(i)+"$")
    ax.set_ylabel(r"$h_"+str(j)+"$")
    if idx == 1:
        leg=ax.legend(loc='lower left', bbox_to_anchor= (0.0, 1.01), ncol=5,
            borderaxespad=0, frameon=False)
        leg.set_in_layout(False)

fig.tight_layout(rect=[0, 0.03, 1, 0.93])
plt.show()

In [None]:
torch_distA = torch.tensor(distA, dtype=torch.float)
torch_distB = torch.tensor(distB, dtype=torch.float)

resultA = model(torch_distA)
resultB = model(torch_distB)
np.savetxt("A_from_model.dat", resultA.detach().numpy())
np.savetxt("B_from_model.dat", resultB.detach().numpy())

## **Download model**

In [None]:
##@title Download model

model_name = "model_walp" #@param {type:"string"}
save_hidden_layer_model = True #@param {type:"boolean"}
save_lda_coeffs = True #@param {type:"boolean"}
save_pictures = True #@param {type:"boolean"}
save_checkpoint = True #@param {type:"boolean"}

# == Set output folder
tr_folder=model_name+"/"
!mkdir -p "{tr_folder}"

# == Create fake dataloader ==
fake_loader = DataLoader(train_labels, batch_size=1,shuffle=False)
fake_input = next(iter(fake_loader ))[0].float()

# == Export model ==
mod = torch.jit.trace(model, fake_input)
mod.save(tr_folder+model_name+".pt")

if save_hidden_layer_model:
    model2 = NN_Hidden(nodes)
    if standardize_inputs:
        model2.set_norm(torch.tensor(Mean,dtype=dtype,device=device),torch.tensor(Range,dtype=dtype,device=device))
    #transfer parameters
    params = model.named_parameters()
    params2 = model2.named_parameters()
    dict_params2 = dict(params2)
    for name, param in params:
        if name in dict_params2:
            dict_params2[name].data.copy_(param.data)
    # save model
    mod2 = torch.jit.trace(model2, fake_input)
    mod2.save(tr_folder+model_name+"_hidden.pt")

if save_lda_coeffs:
    f = open(tr_folder+"lda.dat", "w")
    f.write(str(model.get_lda().numpy()))
    f.close()
    
if save_pictures:
    plot_results(save=True,testing=True)

if save_checkpoint:
    torch.save(model.state_dict(), tr_folder+model_name+'_state_dict.pt')
    np.savetxt(tr_folder+model_name+'_mean.dat', Mean)
    np.savetxt(tr_folder+model_name+'_range.dat', Range)
    