In [None]:
import os
import time
import random
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
from torch.nn.modules.utils import _pair, _quadruple
from sklearn.model_selection import train_test_split

# Run on the first CUDA GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')
print(f'{device} is available')

class CustomDataset(Dataset):
    """In-memory dataset yielding ``(x, y, z)`` float32 tensors per sample.

    All three arrays are converted to tensors once, in ``__init__``, and placed
    on ``device``. When ``device`` is None the first CUDA GPU is used if one is
    available and the CPU otherwise, so the class also works on machines
    without CUDA (the legacy ``torch.cuda.FloatTensor`` constructor does not).

    FC:  (N, H, W, C_in) channels-last input, stored as (N, C_in, H, W)
    DTO: (N, H, W, 1) channels-last target, stored as (N, 1, H, W)
    com: (N,) one scalar per sample (compliance target, per the original notes)
    device: optional torch.device / str overriding the automatic choice

    Raises:
        ValueError: if FC, DTO and com do not hold the same number of samples.
    """
    def __init__(self, FC, DTO, com, device=None):
        if device is None:
            # Same rule as the notebook-level `device`, resolved locally so the
            # class does not depend on a module global.
            device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

        # channels-last (N, H, W, C) -> channels-first (N, C, H, W) for Conv2d
        self.x_data = torch.as_tensor(FC, dtype=torch.float32, device=device).permute(0, 3, 1, 2)
        self.y_data = torch.as_tensor(DTO, dtype=torch.float32, device=device).permute(0, 3, 1, 2)
        self.z_data = torch.as_tensor(com, dtype=torch.float32, device=device)

        self.len = self.y_data.shape[0]
        if self.x_data.shape[0] != self.len or self.z_data.shape[0] != self.len:
            raise ValueError(
                'FC, DTO and com must contain the same number of samples, got '
                f'{self.x_data.shape[0]}, {self.len} and {self.z_data.shape[0]}'
            )

    def __len__(self):
        return self.len

    def __getitem__(self, index):
        return self.x_data[index], self.y_data[index], self.z_data[index]

class CNN(nn.Module):
    """Encoder-decoder CNN with skip connections (U-Net-like).

    Takes a 4-channel image and returns a 1-channel map squashed to [0, 1] by
    a sigmoid. The first convolution uses a 2x2 kernel without padding, so the
    output is one pixel smaller than the input in each spatial dimension; all
    other convolutions keep the size. The encoder halves the resolution three
    times (max-pool) and the decoder doubles it three times (upsample), so the
    size after the first convolution must be divisible by 8 for the skip
    concatenations to line up.
    """

    @staticmethod
    def _conv_bn_relu(c_in, c_out, kernel_size=3, **conv_kwargs):
        """Return the (Conv2d, BatchNorm2d, ReLU) triple used throughout the net."""
        return (
            nn.Conv2d(c_in, c_out, kernel_size, **conv_kwargs),
            nn.BatchNorm2d(c_out),
            nn.ReLU(),
        )

    def __init__(self):
        super().__init__()
        unit = self._conv_bn_relu
        same = {"padding": "same"}

        # ---- encoder ----
        self.layer1 = nn.Sequential(
            *unit(4, 32, 2),            # 2x2 kernel, no padding: H, W -> H-1, W-1
            *unit(32, 32, **same),
        )
        self.layer2 = nn.Sequential(
            nn.MaxPool2d(2),
            *unit(32, 64, **same),
            *unit(64, 64, **same),
        )
        self.layer3 = nn.Sequential(
            nn.MaxPool2d(2),
            *unit(64, 128, **same),
            *unit(128, 128, **same),
        )

        # ---- bottleneck (ends by upsampling back to layer3's resolution) ----
        self.layer4 = nn.Sequential(
            nn.MaxPool2d(2),
            *unit(128, 256, **same),
            *unit(256, 256, **same),
            nn.Upsample(scale_factor=2),
        )

        # ---- decoder: input channels = upsampled features + skip features ----
        self.layer5 = nn.Sequential(
            *unit(384, 128, **same),    # 256 (layer4) + 128 (layer3)
            *unit(128, 128, **same),
            nn.Upsample(scale_factor=2),
        )
        self.layer6 = nn.Sequential(
            *unit(192, 64, **same),     # 128 (layer5) + 64 (layer2)
            *unit(64, 64, **same),
            nn.Upsample(scale_factor=2),
        )
        self.layer7 = nn.Sequential(
            *unit(96, 32, **same),      # 64 (layer6) + 32 (layer1)
            *unit(32, 32, **same),
        )

        # ---- head: single-channel map in [0, 1] ----
        self.layer8 = nn.Sequential(
            nn.Conv2d(32, 1, 3, padding="same"),
            nn.Sigmoid(),
        )

    def forward(self, x):
        """Run the network on a (N, 4, H, W) batch and return a (N, 1, H-1, W-1) map."""
        skip1 = self.layer1(x)
        skip2 = self.layer2(skip1)
        skip3 = self.layer3(skip2)

        features = self.layer4(skip3)
        features = self.layer5(torch.cat([features, skip3], dim=1))
        features = self.layer6(torch.cat([features, skip2], dim=1))
        features = self.layer7(torch.cat([features, skip1], dim=1))
        return self.layer8(features)

def plot_loss(trn_loss_list,val_loss_list,title='Total Loss',save='Pytorch Total Loss & test.png'):
    """Plot training and validation loss per epoch and save the figure.

    Args:
        trn_loss_list: sequence of training-loss values, one per epoch.
        val_loss_list: sequence of validation-loss values, one per epoch.
        title: axes title.
        save: output image path handed to ``Figure.savefig``.

    The large font sizes are applied through ``plt.rc_context`` *before* the
    figure is created. Previously ``plt.rc`` was called after plotting, so the
    settings did not style the figure being saved (only later ones) and they
    permanently changed matplotlib's global rcParams.

    The figure is intentionally left open so the notebook still renders it
    inline after the cell finishes.
    """
    style = {
        'font.size': 40,
        'axes.labelsize': 50,
        'xtick.labelsize': 40,
        'ytick.labelsize': 40,
        'legend.fontsize': 30,
        'figure.titlesize': 200,
    }
    # Artists read rcParams when they are created/drawn, so both the creation
    # and the save have to happen inside the context.
    with plt.rc_context(style):
        fig, ax = plt.subplots(figsize=(20,20))
        ax.plot(trn_loss_list)
        ax.plot(val_loss_list)
        ax.set_title(title)
        ax.set_ylabel('Loss')
        ax.set_xlabel('Epoch')
        ax.legend(['Training data', 'Validation data'], loc=0)
        fig.savefig(save)
    
def _run_epoch(model, loader, loss_fun, optimizer=None):
    """Make one pass over ``loader`` and return the mean per-batch loss as a float.

    With an ``optimizer`` the model is put in training mode and updated after
    every batch; without one it is put in eval mode and run under ``no_grad``.
    """
    training = optimizer is not None
    # train()/eval() matters here: the model contains BatchNorm layers.
    model.train(training)

    total_loss = 0.0
    n_batches = 0
    with torch.set_grad_enabled(training):
        for x_batch, y_batch, _ in loader:
            # No-op when the dataset already lives on `device`.
            x_batch = x_batch.to(device)
            y_batch = y_batch.to(device)

            batch_loss = loss_fun(model(x_batch), y_batch)
            if training:
                optimizer.zero_grad()
                batch_loss.backward()
                optimizer.step()

            total_loss += batch_loss.item()
            n_batches += 1
    return total_loss / max(n_batches, 1)


def topNN(DTO_data_path, FC_data_path, com_data_path, save_path, num_epochs = 200, data_num = 2000,
          lamda1 = 5*10**(-15),lamda2 = 10**(-5),mini_batch_size=128):
    """Train the CNN to predict DTO from FC, evaluate it and save all artefacts.

    Args:
        DTO_data_path: ``.npy`` file with the targets, indexed as (N, H, W, C).
        FC_data_path: ``.npy`` file with the inputs, indexed as (N, H, W, C).
        com_data_path: ``.npy`` file with one value per sample.
        save_path: output directory; created if missing. The process working
            directory is changed to it (kept from the original behaviour).
        num_epochs: maximum number of training epochs.
        data_num: only the first ``data_num`` samples of each file are used.
        lamda1, lamda2: written to the results file only; no loss term in this
            function uses them.
        mini_batch_size: batch size of the training and validation loaders.

    Returns:
        ``(test_l1, test_comploss, acc)`` where ``test_l1`` is the mean L1 loss
        over the test samples (0-dim tensor), ``test_comploss`` is always 0.0
        (nothing accumulates into it), and ``acc`` is the percentage of output
        pixels whose absolute error is below 0.5.

    Fixes relative to the previous version:
        * validation runs in ``eval()`` mode, so BatchNorm neither normalises
          with batch statistics nor updates its running statistics on
          validation data;
        * logged / early-stopping losses are epoch means instead of the loss
          of the last mini-batch;
        * the target no longer has ``requires_grad`` set;
        * the output directory may already exist and the results file is
          closed even when an exception is raised;
        * batches are moved to ``device`` explicitly;
        * test tensors are concatenated once instead of inside the loop.
    """
    # data load
    DTO = np.load(DTO_data_path)[0:data_num,:,:,:]
    FC = np.load(FC_data_path)[0:data_num,:,:,:]
    com = np.load(com_data_path)[0:data_num]

    # data split: 80/20 into (train+val)/test, then 80/20 into train/val
    DTO_train, DTO_test, com_train, com_test, FC_train, FC_test = \
                train_test_split(DTO,com,FC, test_size=0.2, shuffle=True)
    DTO_train, DTO_val, com_train, com_val, FC_train, FC_val = \
                train_test_split(DTO_train,com_train,FC_train, test_size=0.2, shuffle=True)

    tr_dataset = CustomDataset(FC_train,DTO_train,com_train)
    val_dataset = CustomDataset(FC_val,DTO_val,com_val)
    test_dataset = CustomDataset(FC_test,DTO_test,com_test)

    tr_dataloader = DataLoader(tr_dataset, batch_size = mini_batch_size, shuffle=True)
    val_dataloader = DataLoader(val_dataset, batch_size = mini_batch_size, shuffle=True)
    test_dataloader = DataLoader(test_dataset, batch_size = 1, shuffle=True)

    cnn = CNN().to(device) # model

    loss_fun = nn.BCELoss()
    optimizer = torch.optim.Adam(cnn.parameters(),
                                 lr=0.001, betas=(0.9, 0.999))

    # Resolve the output directory once so every write below is independent of
    # the working directory.
    save_dir = os.path.abspath(save_path)
    os.makedirs(save_dir, exist_ok=True)
    # NOTE(review): changing the process cwd inside a function is hidden state;
    # it is kept only because later notebook cells may rely on it.
    os.chdir(save_dir)

    def out_path(filename):
        return os.path.join(save_dir, filename)

    with open(out_path('torch_train_results.txt'), 'w') as f:
        print('\nlambda1: ',lamda1, file=f)
        print('\nlambda2: ',lamda2, file=f)
        print('\nbatch_size: ', mini_batch_size, file=f)

        trn_loss_list = []
        val_loss_list = []
        # These per-term histories are never filled in this version; they are
        # kept so the same set of (empty) plots is still produced.
        train_loss_list = []
        train_comploss_list = []
        train_conloss_list = []
        vali_loss_list = []
        vali_comploss_list = []
        vali_conloss_list = []

        trigger_times = 0
        patience = 10
        epochs_run = 0
        start_time = time.time()
        print("Start")

        for epoch in range(num_epochs):
            trn_loss = _run_epoch(cnn, tr_dataloader, loss_fun, optimizer)
            val_loss = _run_epoch(cnn, val_dataloader, loss_fun)
            epochs_run = epoch + 1

            print(("epoch: {}/{} \ntrain: | loss: {:.4f} | \nvalid: | loss: {:.4f} ") .format(epoch+1,num_epochs,trn_loss,val_loss))
            trn_loss_list.append(trn_loss)
            val_loss_list.append(val_loss)

            # Early stopping: stop after `patience` consecutive epochs in which
            # the validation loss is higher than in the epoch before.
            if epoch>1:
                if val_loss_list[epoch]>val_loss_list[epoch-1]:
                    trigger_times +=1
                    if trigger_times >= patience:
                        print('Early stopping! {}'.format(epoch+1))
                        break
                else:
                    trigger_times = 0
            torch.save(cnn, out_path('model'+str(epoch)+'.pt'))
        print("End")
        print("Time: {:.4f}sec".format((time.time() - start_time)))

        print('\nTotal iteration: {}/{}'.format(epochs_run, num_epochs), file=f)
        print('\nTraining time: {:.4f}sec'.format((time.time() - start_time)), file=f)

        plot_loss(trn_loss_list,val_loss_list,'Total Loss',out_path('Pytorch Total Loss & test1.png'))
        plot_loss(train_loss_list,vali_loss_list,'BCE Loss',out_path('Pytorch BCE Loss & test.png'))
        plot_loss(train_comploss_list,vali_comploss_list,'comp Loss',out_path('Pytorch comp Loss & test.png'))
        plot_loss(train_conloss_list,vali_conloss_list,'con Loss',out_path('Pytorch con Loss & test.png'))
        plot_loss(trn_loss_list,val_loss_list,'Total Loss',out_path('Pytorch Total Loss & test.png'))

        test_loss_fun = nn.L1Loss()

        cnn.eval()
        start_time = time.time()
        print("Start")

        test_loss = 0.0
        test_comploss = 0.0  # placeholder: nothing accumulates into it
        outputs, inputs, targets, scalars = [], [], [], []
        with torch.no_grad():
            for x_test, y_test, z_test in test_dataloader:
                x_test = x_test.to(device)
                y_test = y_test.to(device)
                test_output = cnn(x_test)
                test_loss += test_loss_fun(test_output,y_test)

                outputs.append(test_output)
                inputs.append(x_test)
                targets.append(y_test)
                scalars.append(z_test)

        # batch_size is 1, so this is also the number of test samples
        n_test_batches = len(test_dataloader)
        test_out = torch.cat(outputs, dim=0)
        x_test_out = torch.cat(inputs, dim=0)
        y_test_out = torch.cat(targets, dim=0)
        z_test_out = torch.cat(scalars, dim=0)

        print("End")
        print("Time: {:.4f}sec".format((time.time() - start_time)))
        print("test loss: {:.4f} ".format(test_loss/n_test_batches))
        print("\noriginal test loss: {:.4f} ".format(test_loss/n_test_batches), file=f)

        # Percentage of output pixels whose absolute error is below 0.5.
        abs_error = (test_out - y_test_out).abs().cpu().numpy()
        acc = np.count_nonzero(abs_error < 0.5)/abs_error.size*100
        print(acc, file=f)

        # Back to channels-last (N, H, W, C) before writing to disk.
        save_test = test_out.permute(0,2,3,1).detach().cpu().numpy()
        save_x_test = x_test_out.permute(0,2,3,1).detach().cpu().numpy()
        save_y_test = y_test_out.permute(0,2,3,1).detach().cpu().numpy()
        save_z_test = z_test_out.detach().cpu().numpy()

        np.save(out_path('save_new_test.npy'),save_test)
        np.save(out_path('save_x_new_test.npy'),save_x_test)
        np.save(out_path('save_y_new_test.npy'),save_y_test)
        np.save(out_path('save_z_new_test.npy'),save_z_test)

        torch.save(cnn, out_path('model.pt'))  # save the model
        torch.save(cnn.state_dict(), out_path('model_state_dict.pt'))  # save the state_dict of the model
        torch.save({
            'model': cnn.state_dict(),
            'optimizer': optimizer.state_dict()
        }, out_path('all.tar'))

    return test_loss/n_test_batches, test_comploss/n_test_batches, acc

In [None]:
# Accumulators for the metrics returned by each topNN run.
test_loss, test_comploss, test_acc = [], [], []

# Run configuration.
epoch = 200
data_num = 2000
batch_size = 32

# Input arrays (relative to the notebook's working directory).
DTO_data_path = 'newDTO_3232_cantilver_beam_20k.npy'
com_data_path = 'newcomp_3232_cantilver_beam_20k.npy'
FC_data_path = 'newFCv_3232_cantilver_beam_20k.npy'

# Results are written to <save_path><name>.
save_path = 'C:\\Users\\user\\'
name = "2000data_32x32_original_additional_test"

a = topNN(
    DTO_data_path,
    FC_data_path,
    com_data_path,
    save_path+name,
    num_epochs=epoch,
    data_num=data_num,
    lamda1=10**(-5),
    lamda2=10**(-4),
    mini_batch_size=batch_size,
)

# topNN returns (test loss, compliance loss, accuracy), in that order.
for history, value in zip((test_loss, test_comploss, test_acc), a):
    history.append(value)