In [None]:
import argparse
import os
import numpy as np
import math

import torchvision.transforms as transforms
from torchvision.utils import save_image

from torch.utils.data import TensorDataset, DataLoader
from torchvision import datasets
from torch.autograd import Variable

import torch.nn as nn
import torch.nn.functional as F
import torch
from matplotlib import pyplot as plt
import time

import pylab as py
import seaborn as sns
import scipy.io as spio

In [None]:
# set your device
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

In [None]:
#Architecture 1
class PGNN(nn.Module):
    def __init__(self):
        super(PGNN, self).__init__()

        def block(in_feat, out_feat, normalize=True):
            layers = [nn.Linear(in_feat, out_feat)]
            if normalize:
                layers.append(nn.BatchNorm1d(out_feat, 0.8))
            layers.append(nn.LeakyReLU(0.2, inplace=True))
            layers.append(nn.Dropout(0.2))
            return layers
        
        self.model = nn.Sequential(
            *block(data_dim, 40, normalize=False),
            *block(40, 40),
            *block(40, 40),
            *block(40, 40),
            *block(40, 40),
            nn.Linear(40, out_dim)
        )

    def forward(self, data):
        out = self.model(data)
        return out

In [None]:
def physics_loss(x, y, stat_x = [0,1], stat_y = [0,1]):  #stat [0] = mean, stat [1] = std
    stat_x = torch.Tensor(stat_x).to(device)
    stat_y = torch.Tensor(stat_y).to(device)
    x = x * stat_x[1] + stat_x[0]
    y = y * stat_y[1] + stat_y[0]
    energy_loss =   - 0.5*x[:,4:5]*torch.pow(x[:,0:1],2) \
                    - 0.5*x[:,5:6]*torch.pow(x[:,1:2],2) \
                    + 0.5*x[:,4:5]*torch.pow(y[:,0:1],2) \
                    + 0.5*x[:,5:6]*torch.pow(y[:,1:2],2)
                  
    
    momentum_loss =  - x[:,4:5] * x[:,0:1] \
                     - x[:,5:6] * x[:,1:2] \
                     + x[:,4:5] * y[:,0:1] \
                     + x[:,5:6] * y[:,1:2] 
    f = torch.cat([energy_loss, momentum_loss], dim = 1)
    return f

In [None]:
def find_lambda(adaptive_lambda, phy_loss, loss, net, beta = 0.9):
    phyloss_layer = []
    loss_layer = []
    with torch.no_grad():
        for layer in net.model.children():
            if isinstance(layer, nn.Linear):
                phyloss_layer.append(torch.abs(torch.autograd.grad(phy_loss, layer.weight, retain_graph = True)[0]).max())
                loss_layer.append(torch.abs(torch.autograd.grad(loss, layer.weight, retain_graph = True)[0]).mean())
    max_grad_res = torch.stack(phyloss_layer).max()     
    mean_grad_loss = torch.stack(loss_layer).mean()
    lambda_new = max_grad_res / mean_grad_loss
    adaptive_lambda = (1 - beta) * adaptive_lambda + beta * lambda_new
    return adaptive_lambda

In [None]:
def uncertainity_estimate(x, model, num_samples, stat = [0,1]):
    outputs = np.stack([model(x).cpu().detach().numpy()*stat[1]+stat[0] for i in range(num_samples)], axis = 0) # n번 inference, output.shape = [20, N]
    y_mean = outputs.mean(axis=0)
    y_variance = outputs.var(axis=0)
    y_std = np.sqrt(y_variance)
    return y_mean, y_std

In [None]:


tr_frac = 0.8
##load data
data = np.loadtxt( '../../datasets/collision_shuffled.txt' )
labels = data[:,-2:]
x = data[:,:-2]

#training and test splits
n_obs = int(tr_frac * x.shape[0])
train_x , train_y = x[:n_obs,:] , labels[:n_obs,:] 
test_x , test_y = x[n_obs:,:] , labels[n_obs:,:] 

# Normalization 

# train_x:
mean_x = train_x.mean(axis=0)
std_x = train_x.std(axis=0)

train_x = (train_x-mean_x)/std_x
test_x = (test_x-mean_x)/std_x

# train_y:
mean_y = train_y.mean(axis=0)
std_y = train_y.std(axis=0)

train_y = (train_y-mean_y)/std_y
test_y = (test_y-mean_y)/std_y

#defining noise dimensions
#         noise_dim = 2
data_dim = train_x.shape[-1]
out_dim = labels.shape[-1]

#defining batch size and parameters
batch_size = 64 # mini-batch size
num_workers = 4 # how many parallel workers are we gonna use for reading data
shuffle = True # shuffle the dataset


 #training and testing dataset creation
train_x = torch.FloatTensor(train_x).to(device)
test_x = torch.FloatTensor(test_x).to(device)
train_y = torch.FloatTensor(train_y).to(device)
test_y = torch.FloatTensor(test_y).to(device)

train_loader = DataLoader(list(zip(train_x,train_y)), batch_size=batch_size, shuffle=shuffle)

x_f = torch.cat([train_x, test_x], dim = 0)
#         print(x_f.shape)
net = PGNN().to(device)

net_optimizer = torch.optim.Adam(net.parameters(), lr=1e-3, betas = (0.5, 0.999))

num_epochs = 5000
lambda_mse = 1
#         lambda_phy = 0.001
# Adv_loss = np.zeros(num_epochs)

MSE_loss = np.zeros(num_epochs)
PHY_loss = np.zeros(num_epochs)
TOT_loss = np.zeros(num_epochs)


train_pred = np.zeros((num_epochs,train_y.shape[0]))
test_pred = np.zeros((num_epochs,test_y.shape[0]))

for epoch in range(num_epochs):
    epoch_loss = 0
    for i, (x, y) in enumerate(train_loader):

        net_optimizer.zero_grad()
        y_pred = net.forward(x)

        y_f = net.forward(x_f)

        phy_loss = torch.mean(torch.abs(physics_loss(x_f, y_f, [mean_x, std_x], [mean_y, std_y])))
        mse_loss = torch.nn.functional.mse_loss(y_pred, y)

        lambda_mse = find_lambda(lambda_mse, phy_loss, lambda_mse * mse_loss, net)

        loss = lambda_mse * mse_loss + phy_loss
        loss.backward()
        net_optimizer.step()

        MSE_loss[epoch] += mse_loss.detach().cpu().numpy()
        PHY_loss[epoch] += phy_loss.detach().cpu().numpy()
        TOT_loss[epoch] += loss.detach().cpu().numpy()

    MSE_loss[epoch] = MSE_loss[epoch] / len(train_loader)
    PHY_loss[epoch] = PHY_loss[epoch] / len(train_loader)
    TOT_loss[epoch] = TOT_loss[epoch] / len(train_loader)

    if (epoch % 100 == 0):
        print(
            "[Epoch %d/%d] [MSE loss: %f] [Phy loss: %f] [Total loss: %f] [Lambda mse: %f]"
            % (epoch, num_epochs, MSE_loss[epoch], PHY_loss[epoch], TOT_loss[epoch], lambda_mse )
        )



# ###############################################################################################
# ######################################## LOSS PLOTS ###########################################

plt.figure(figsize=(10,10))
plt.plot(MSE_loss)
plt.plot(PHY_loss)
plt.plot(TOT_loss)
plt.legend(['MSE_loss','PHY_loss','Total_loss'])
plt.show()

# ###############################################################################################


# ###############################################################################################
# ################################## TEST PREDICTIONS ###########################################
n_samples = 10000
test_mean_y, test_std_y = uncertainity_estimate(test_x, net, n_samples, [mean_y, std_y])

test_mean_y_0 = test_mean_y[:,0]
test_mean_y_1 = test_mean_y[:,1]

test_std_y_0 = test_std_y[:,0]
test_std_y_1 = test_std_y[:,1]

# print(test_y.shape)
test_y_true_0 = test_y[:, 0].detach().cpu().numpy()*std_y[0] + mean_y[0]
test_y_true_1 = test_y[:, 1].detach().cpu().numpy()*std_y[1] + mean_y[1]

x = np.linspace(0, test_y.shape[0]-1, test_y.shape[0])


plt.figure(figsize=(20,7))
plt.plot(x, test_mean_y_0 , label = 'test predictions', alpha= 0.9, color='b', marker='+')
plt.fill_between(x, test_mean_y_0-2*test_std_y_0, test_mean_y_0+2*test_std_y_0, alpha=0.2, color='b')
# plt.errorbar(x,test_mean_y_0,test_std_y_0)
plt.plot(x, test_y_true_0, label = 'ground truth', alpha=1, color='r', marker='*')
py.legend(loc='upper right')
plt.show()

plt.figure(figsize=(20,7))
plt.plot(x, test_mean_y_1 , label = 'test predictions', alpha= 0.9, color='b', marker='+')
plt.fill_between(x, test_mean_y_1-2*test_std_y_1, test_mean_y_1+2*test_std_y_1, alpha=0.2, color='b')
# plt.errorbar(x,mean_y,std_y)
plt.plot(x, test_y_true_1, label = 'ground truth', alpha=1, color='r', marker='*')
py.legend(loc='upper right')
plt.show()

test_x = test_x.detach().cpu().numpy()
test_x = test_x * std_x + mean_x

test_rmse0 = (((test_mean_y_0.flatten() - test_y_true_0.flatten())**2).mean())**0.5
test_rmse1 = (((test_mean_y_1.flatten() - test_y_true_1.flatten())**2).mean())**0.5
# test_rmse = (((np.stack(test_mean_y_0,test_mean_y_1) - test_y_true_1.flatten())**2).mean())**0.5

energy_loss = np.mean(np.absolute(0.5*test_x[:,4]*np.power(test_x[:,0],2)+0.5*test_x[:,5]*np.power(test_x[:,1],2)
                                  -0.5*test_x[:,4]*np.power(test_mean_y_0,2)-0.5*test_x[:,5]*np.power(test_mean_y_1,2)))

momentum_loss = np.mean(np.absolute(test_x[:,4]*test_x[:,0] + test_x[:,5]*test_x[:,1] 
                                     - test_x[:,4]*test_mean_y_0 - test_x[:,5]*test_mean_y_1))
test_phy_loss = (energy_loss + momentum_loss)

test_true = np.stack((test_y_true_0, test_y_true_1), axis =-1).flatten()
test_mean_y = np.stack((test_mean_y_0, test_mean_y_1), axis =-1).flatten()
test_rmse = ((( test_mean_y - test_true)**2).mean())**0.5

print("test RMSE = %f" %(test_rmse))
print("test RMSE va = %f" %(test_rmse0))
print("test RMSE vb = %f" %(test_rmse1))
print("test Physical Inconsistency = %f" %(test_phy_loss))

energy_loss_true = np.mean(np.absolute(0.5*test_x[:,4]*np.power(test_x[:,0],2)
                                       +0.5*test_x[:,5]*np.power(test_x[:,1],2)-0.5*test_x[:,4]*np.power(test_y_true_0,2)
                                       -0.5*test_x[:,5]*np.power(test_y_true_1,2)))

momentum_loss_true = np.mean(np.absolute(test_x[:,4]*test_x[:,0] + test_x[:,5]*test_x[:,1] 
                                     - test_x[:,4]*test_y_true_0 - test_x[:,5]*test_y_true_1))
test_phy_loss_true = (energy_loss_true + momentum_loss_true)
print("test Physical Inconsistency (TRUE)= %f" %(test_phy_loss_true))


