Test the performance of pre-trained FCNN models

In [None]:
# Loading module

from __future__ import print_function, division

import random
import numpy as np
import torch.nn as nn
import torch
import csv

from copy import deepcopy
from torch.autograd import Variable
from torch.utils.data import DataLoader
from torch.utils.data.dataloader import default_collate
from torch.utils.data.sampler import SubsetRandomSampler
from ase.db import connect

In [None]:
# Data loader

def get_train_val_test_loader(dataset,
                              idx_validation=0,
                              idx_test=None,
                              collate_fn=default_collate,
                              batch_size=64,
                              num_workers=0,
                              pin_memory=False,
                              random_seed=None,
                              n_folds=10,
                              n_always_train=38):
    """Build train/validation/test ``DataLoader`` objects from a k-fold split.

    The trailing ``n_always_train`` samples of ``dataset`` are never held out;
    they are always appended to the training indices (the original code notes
    that the last 38 images are pure metals).  The remaining indices are
    shuffled and cut into ``n_folds`` nearly equal folds with
    ``np.array_split``.  One fold is used for validation, optionally one for
    testing, and every other fold goes to training.

    Parameters
    ----------
    dataset : sequence
        Anything accepted by ``DataLoader`` that also supports ``len()``.
    idx_validation : int
        Fold used for validation.  Negative values count from the end.
    idx_test : int or None
        Fold used for testing.  ``None`` (or an index that does not address
        one of the folds) yields an empty test loader.
    collate_fn, batch_size, num_workers, pin_memory
        Forwarded unchanged to every ``DataLoader``.
    random_seed : int or None
        Seed for the shuffles.  ``None`` uses the global ``random`` state.
    n_folds : int
        Number of folds the shuffled indices are split into.
    n_always_train : int
        Number of trailing samples that always belong to the training set.

    Returns
    -------
    (train_loader, val_loader, test_loader)
        Each loader draws from ``dataset`` through a ``SubsetRandomSampler``
        restricted to the indices of its split.

    Raises
    ------
    IndexError
        If ``idx_validation`` does not address one of the folds.
    """
    def _shuffle_in_place(values):
        # `is not None` (rather than truthiness) so that a seed of 0 is
        # honoured.  A fresh generator per call keeps the permutation sequence
        # the same as before for non-zero seeds.
        rng = random if random_seed is None else random.Random(random_seed)
        rng.shuffle(values)

    def _make_loader(subset):
        return DataLoader(dataset, batch_size=batch_size,
                          sampler=SubsetRandomSampler(subset),
                          num_workers=num_workers,
                          collate_fn=collate_fn,
                          pin_memory=pin_memory)

    # Split by position instead of slicing with a negative bound, so that
    # n_always_train == 0 does not produce an empty fold pool.
    all_indices = np.arange(len(dataset))
    boundary = max(len(all_indices) - n_always_train, 0)
    indices = all_indices[:boundary]
    always_train = all_indices[boundary:]

    _shuffle_in_place(indices)
    kfold = np.array_split(indices, n_folds)

    if not -n_folds <= idx_validation < n_folds:
        raise IndexError('idx_validation {} is out of range for {} folds'
                         .format(idx_validation, n_folds))
    # Normalise negative indices so the training-fold filter below excludes
    # exactly the fold that is held out.
    idx_validation %= n_folds
    kfold_val = kfold[idx_validation].copy()

    # No usable test fold -> empty test split (same outcome as before, but
    # without a bare `except` hiding unrelated errors).
    kfold_test = []
    if idx_test is not None and -n_folds <= idx_test < n_folds:
        idx_test %= n_folds
        kfold_test = kfold[idx_test].copy()

    held_out = {idx_validation, idx_test}
    train_folds = [fold for i, fold in enumerate(kfold) if i not in held_out]
    kfold_train = np.concatenate(train_folds + [always_train])

    _shuffle_in_place(kfold_train)

    return (_make_loader(kfold_train),
            _make_loader(kfold_val),
            _make_loader(kfold_test))

In [None]:
# Network

class Net(nn.Module):
    """Fully connected network with Softplus activations and two outputs.

    Layout: ``Linear(n_feature, h_fea_len)`` + Softplus, followed by
    ``n_h - 1`` blocks of ``Linear(h_fea_len, h_fea_len)`` + Softplus, and a
    final ``Linear(h_fea_len, n_output)``.

    ``forward`` returns only the first two output columns: column 0 is passed
    through unchanged and column 1 goes through softplus.  ``n_output`` must
    therefore be at least 2.

    Parameter names (``fc_in``, ``fcs.<i>``, ``fc_out``) and their creation
    order match the original layout.
    """

    def __init__(self, n_feature, n_h, h_fea_len, n_output):
        super(Net, self).__init__()

        self.fc_in = nn.Linear(n_feature, h_fea_len)
        self.fc_in_softplus = nn.Softplus()
        # Always define the hidden stacks (empty when n_h <= 1) so forward()
        # needs no hasattr() probing.  An empty ModuleList adds no entries to
        # the state dict.
        n_extra = max(n_h - 1, 0)
        self.fcs = nn.ModuleList([nn.Linear(h_fea_len, h_fea_len)
                                  for _ in range(n_extra)])
        self.softpluses = nn.ModuleList([nn.Softplus()
                                         for _ in range(n_extra)])
        self.fc_out = nn.Linear(h_fea_len, n_output)

    def forward(self, x):
        """Map features ``x`` of shape ``(..., n_feature)`` to ``(..., 2)``."""
        crys_fea = self.fc_in_softplus(self.fc_in(x))

        for fc, softplus in zip(self.fcs, self.softpluses):
            crys_fea = softplus(fc(crys_fea))

        out = self.fc_out(crys_fea)
        # Index and stack along the last axis so a single 1-D sample works
        # as well as a 2-D batch.
        return torch.stack((out[..., 0],
                            torch.nn.functional.softplus(out[..., 1])),
                           dim=-1)

In [None]:
# Test the performance of pre-trained FCNN models


def _evaluate_split(net, loader, target, loss_func, device, label, file_stem):
    """Evaluate ``net`` on every batch of ``loader`` and save the results.

    Predictions from all batches are gathered first, so the reported losses
    and the written files cover the whole split rather than only its last
    batch.

    Writes ``<file_stem>.txt`` (``np.savetxt``) and ``<file_stem>.csv`` with
    the columns: sample id, target[0], target[1], prediction[0],
    prediction[1].

    Returns ``(MAE, MSE)`` as Python floats.  Both are the mean over all
    elements multiplied by the number of output columns, as in the original
    script.  Returns ``(nan, nan)`` and writes nothing if the loader yields
    no batches.
    """
    id_chunks = []
    pred_chunks = []
    # Inference only: no autograd graph is needed.
    with torch.no_grad():
        for inputs, batch_cif_ids in loader:
            pred_chunks.append(net(inputs.to(device, non_blocking=True)))
            id_chunks.append(batch_cif_ids)

    if not id_chunks:
        return float('nan'), float('nan')

    sample_ids = torch.cat(id_chunks)
    prediction = torch.cat(pred_chunks)
    truth = target[sample_ids]
    truth_on_device = truth.to(device, non_blocking=True)

    n_out = prediction.shape[-1]
    loss_MAE = (torch.mean(torch.abs(truth_on_device - prediction))*n_out).item()
    loss_MSE = (loss_func(prediction, truth_on_device)*n_out).item()
    print((label + ' loss MAE {loss:.4f}').format(loss=loss_MAE))
    print((label + ' loss MSE {loss:.4f}').format(loss=loss_MSE))

    ids_np = sample_ids.numpy()
    truth_np = truth.numpy()
    pred_np = prediction.cpu().numpy()

    np.savetxt(file_stem + '.txt', np.column_stack((ids_np, truth_np, pred_np)))

    with open(file_stem + '.csv', 'w') as f:
        writer = csv.writer(f)
        for cif_id, truth_row, pred_row in zip(ids_np, truth_np, pred_np):
            writer.writerow((cif_id, truth_row[0], truth_row[1], pred_row[0], pred_row[1]))

    return loss_MAE, loss_MSE


# Summary tables indexed as [idx_test, idx_val]; fold pairs that are not
# evaluated below keep their initial value of zero.
check_ans_train_MAE = np.zeros((10,10))
check_ans_train_MSE = np.zeros((10,10))
check_ans_val_MAE = np.zeros((10,10))
check_ans_val_MSE = np.zeros((10,10))
check_ans_test_MAE = np.zeros((10,10))
check_ans_test_MSE = np.zeros((10,10))

# Network size used to rebuild the model before loading a checkpoint.
# Training-only settings (learning rate, weight decay, optimizer,
# early-stopping counters) are not needed to evaluate a saved checkpoint.
n_h = 8
h_fea_len = 65

random_seed = 1234    # reproducible
batch_size = 12500
num_workers = 0

collate_fn = default_collate
loss_func = nn.MSELoss()

# Fall back to the CPU when no GPU is available.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Everything below up to the fold loops does not depend on the fold indices,
# so it is done once instead of once per (idx_val, idx_test) pair.
db = connect('../Database.db')

# Read all required fields in a single pass over the database.
data_keys = ('d_cen', 'full_width',
             'tabulated_v2ds', 'tabulated_v2dd', 'tabulated_mulliken',
             'tabulated_d_cen_inf', 'tabulated_full_width_inf')
columns = {key: [] for key in data_keys}
for row in db.select():
    row_data = row['data']
    for key in data_keys:
        columns[key].append(row_data[key])

d_cen = np.array(columns['d_cen'])
full_width = np.array(columns['full_width'])
target = np.stack((d_cen,full_width)).T

v2ds = np.array(columns['tabulated_v2ds'])
v2dd = np.array(columns['tabulated_v2dd'])
mulliken = np.array(columns['tabulated_mulliken'])
d_cen_inf = np.array(columns['tabulated_d_cen_inf'])
full_width_inf = np.array(columns['tabulated_full_width_inf'])
fea = np.stack((np.sum((v2ds + v2dd), axis=1), mulliken, d_cen_inf, full_width_inf**2.0/12.0)).T

idx = np.arange(len(target))

idx_1 = idx[:-38]
idx_2 = idx[-38:] # Last 38 images are pure metals

# Sub-sampling hook: the fraction is 1.00, so every image is kept and the
# sorted result is the full index range.
num = int(len(idx_1)*1.00)
np.random.seed(12345)
np.random.shuffle(idx_1)

idx_1 = idx_1[0:num]

idx = np.sort(np.concatenate((idx_1,idx_2)))

np.savetxt('index.txt', idx)

target = torch.Tensor(target[idx])
fea = torch.Tensor(fea[idx])

name_images = np.arange(len(fea))

# Each sample is (feature vector, position in `target`); the position is used
# to look up the matching target row after batching.
dataset = list(zip(fea, name_images))

for idx_val in range(8,9):
    for idx_test in range(9,10):

        train_loader, val_loader, test_loader =\
            get_train_val_test_loader(dataset=dataset,
                                      collate_fn=collate_fn,
                                      batch_size=batch_size,
                                      idx_validation=idx_val,
                                      idx_test=idx_test,
                                      num_workers=num_workers,
                                      pin_memory=torch.cuda.is_available(),
                                      random_seed=random_seed)

        net = Net(n_feature=fea.shape[-1], n_h=n_h, h_fea_len=h_fea_len, n_output=2).to(device)

        fold_tag = 'idx_val_' + str(idx_val) + '_idx_test_' + str(idx_test)

        # map_location lets a checkpoint saved on a GPU be loaded on the CPU.
        best_checkpoint = torch.load('model_best_train_' + fold_tag + '.pth.tar',
                                     map_location=device)
        net.load_state_dict(best_checkpoint['state_dict'])

        # switch to evaluate mode
        net.eval()

        train_loss_MAE, train_loss_MSE = _evaluate_split(
            net, train_loader, target, loss_func, device,
            'Train', 'train_results_' + fold_tag)
        val_loss_MAE, val_loss_MSE = _evaluate_split(
            net, val_loader, target, loss_func, device,
            'Validation', 'val_results_' + fold_tag)
        test_loss_MAE, test_loss_MSE = _evaluate_split(
            net, test_loader, target, loss_func, device,
            'Test', 'test_results_' + fold_tag)

        check_ans_train_MAE[idx_test,idx_val] = train_loss_MAE
        check_ans_train_MSE[idx_test,idx_val] = train_loss_MSE
        check_ans_val_MAE[idx_test,idx_val] = val_loss_MAE
        check_ans_val_MSE[idx_test,idx_val] = val_loss_MSE
        check_ans_test_MAE[idx_test,idx_val] = test_loss_MAE
        check_ans_test_MSE[idx_test,idx_val] = test_loss_MSE

np.savetxt('check_ans_train_MAE.txt', check_ans_train_MAE)
np.savetxt('check_ans_train_MSE.txt', check_ans_train_MSE)
np.savetxt('check_ans_val_MAE.txt', check_ans_val_MAE)
np.savetxt('check_ans_val_MSE.txt', check_ans_val_MSE)
np.savetxt('check_ans_test_MAE.txt', check_ans_test_MAE)
np.savetxt('check_ans_test_MSE.txt', check_ans_test_MSE)