In [None]:
import os
import torch 
import numpy as np

import warnings

warnings.filterwarnings("ignore")


import scipy.interpolate
import scipy.io
import matplotlib.pyplot as plt

from torch.autograd import Variable
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import sys
from sklearn.metrics import mean_squared_error

import csv
from torch.utils.data import DataLoader, TensorDataset, RandomSampler
from math import exp, sqrt, pi
import time

from google.colab import drive
drive.mount('/content/gdrive')
data = scipy.io.loadmat(r"/content/gdrive/MyDrive/Research/RaissiData/Stenosis2D.mat") 


In [None]:
h_n = 128
input_n = 12

class Swish(nn.Module):
    def __init__(self, inplace=True):
        super(Swish, self).__init__()
        self.inplace = inplace

    def forward(self, x, ):
      x = x.mul_(torch.sigmoid(x))

      return x


class PINet(nn.Module):

    def __init__(self):
        super(PINet, self).__init__()
        self.main = nn.Sequential(
            
                    nn.Linear(input_n,h_n), 
                    Swish(),
                    nn.Linear(h_n,h_n),
                    Swish(),
                    nn.Linear(h_n,h_n),
                    Swish(),
                    nn.Linear(h_n,h_n),
                    Swish(),
                    nn.Linear(h_n,h_n),
                    Swish(),


                    nn.Linear(h_n,1),
                    
                )

    def forward(self,x):	
        output = self.main(x)
        return output



In [None]:
def Loss_data(xd,yd,ud,vd,pd,cd,u_target,v_target,p_target,c_target):

    net_in_u = torch.cat((ud, xd, yd), 1)
    net_in_v = torch.cat((vd, xd, yd), 1)
    net_in_p = torch.cat((pd, xd, yd), 1)
    net_in_c = torch.cat((cd, xd, yd), 1)


    out1_u = net2_u(net_in_u)
    out1_v = net2_v(net_in_v)
    out1_p = net2_p(net_in_p)
    out1_c = net2_c(net_in_c)



    
    out1_u = out1_u.view(len(out1_u), -1)
    out1_v = out1_v.view(len(out1_v), -1)
    out1_p = out1_p.view(len(out1_p), -1)
    out1_c = out1_c.view(len(out1_c), -1)

    loss_fun = nn.MSELoss()

    loss_d =  loss_fun(out1_u, u_target) + loss_fun(out1_v, v_target) + loss_fun(out1_p, p_target) + loss_fun(out1_c, c_target)
            
    return loss_d

def criterion(u_in, v_in, p_in, c_in, x,  y):

    Re = 5
    Pec = 15
    
    x = torch.tensor(x).to(device)
    y = torch.tensor(y).to(device) 
            
    x.requires_grad = True
    y.requires_grad = True
      
    net_in_u = torch.cat((u_in, x, y),1)
    net_in_v = torch.cat((v_in, x, y),1)
    net_in_p = torch.cat((p_in, x, y),1)
    net_in_c = torch.cat((c_in, x, y),1)

    u = net2_u(net_in_u)
    u = u.view(len(u),-1)

    v = net2_v(net_in_v)
    v = v.view(len(v),-1)

    P = net2_p(net_in_p)
    P = P.view(len(P),-1)

    c = net2_c(net_in_c)
    c = c.view(len(c),-1)
    
    
    u_x = torch.autograd.grad(u,x,grad_outputs=torch.ones_like(x),create_graph = True,only_inputs=True)[0]
    u_xx = torch.autograd.grad(u_x,x,grad_outputs=torch.ones_like(x),create_graph = True,only_inputs=True)[0]

    u_y = torch.autograd.grad(u,y,grad_outputs=torch.ones_like(y),create_graph = True,only_inputs=True)[0]
    u_yy = torch.autograd.grad(u_y,y,grad_outputs=torch.ones_like(y),create_graph = True,only_inputs=True)[0]

    v_x = torch.autograd.grad(v,x,grad_outputs=torch.ones_like(x),create_graph = True,only_inputs=True)[0]
    v_xx = torch.autograd.grad(v_x,x,grad_outputs=torch.ones_like(x),create_graph = True,only_inputs=True)[0]

    v_y = torch.autograd.grad(v,y,grad_outputs=torch.ones_like(y),create_graph = True,only_inputs=True)[0]
    v_yy = torch.autograd.grad(v_y,y,grad_outputs=torch.ones_like(y),create_graph = True,only_inputs=True)[0]

    c_x = torch.autograd.grad(c,x,grad_outputs=torch.ones_like(x),create_graph = True,only_inputs=True)[0]
    c_xx = torch.autograd.grad(c_x,x,grad_outputs=torch.ones_like(x),create_graph = True,only_inputs=True)[0]

    c_y = torch.autograd.grad(c,y,grad_outputs=torch.ones_like(y),create_graph = True,only_inputs=True)[0]
    c_yy = torch.autograd.grad(c_y,y,grad_outputs=torch.ones_like(y),create_graph = True,only_inputs=True)[0]
    
    
    P_x = torch.autograd.grad(P,x,grad_outputs=torch.ones_like(x),create_graph = True,only_inputs=True)[0]
    P_y = torch.autograd.grad(P,y,grad_outputs=torch.ones_like(y),create_graph = True,only_inputs=True)[0]
                    
    loss_1 = u*u_x + v*u_y - (Re^-1)*(u_xx + u_yy) + P_x
    loss_2 = u*v_x + v*v_y - (Re^-1)*(v_xx+  v_yy) + P_y
    loss_3 = u*c_x + v*c_y - (Pec^-1)*(c_xx+  c_yy) 
    loss_4 = u_x  + v_y 
    
    loss_f1 = nn.MSELoss()
    
    loss = loss_f1(loss_1,torch.zeros_like(loss_1))+  loss_f1(loss_2,torch.zeros_like(loss_2))+  loss_f1(loss_3,torch.zeros_like(loss_3))
    
    
    return loss




In [None]:
def train_epoch(u_in, v_in, p_in, c_in, x_in, y_in,
                u_data, v_data, p_data, c_data, x_data, y_data,
                u_target, v_target, p_target, c_target):
      
    x  = torch.Tensor(x_in).to(device)
    y  = torch.Tensor(y_in).to(device)

    u_in = torch.Tensor(u_in).to(device).squeeze()
    v_in = torch.Tensor(v_in).to(device).squeeze()
    p_in = torch.Tensor(p_in).to(device).squeeze()
    c_in = torch.Tensor(c_in).to(device).squeeze()


    x_data = torch.Tensor(x_data).to(device)
    y_data = torch.Tensor(y_data).to(device)

    u_data = torch.Tensor(u_data).to(device).squeeze()
    v_data = torch.Tensor(v_data).to(device).squeeze()
    p_data = torch.Tensor(p_data).to(device).squeeze()
    c_data = torch.Tensor(c_data).to(device).squeeze()
    
    u_target = torch.Tensor(u_target).to(device)
    v_target = torch.Tensor(v_target).to(device)
    p_target = torch.Tensor(p_target).to(device)
    c_target = torch.Tensor(c_target).to(device)

        
    loss_dict = []
              
    net2_u.zero_grad()
    net2_v.zero_grad()
    net2_p.zero_grad()
    net2_c.zero_grad()

    loss_eqn  = criterion(u_in, v_in, p_in, c_in, x, y)
    loss_data = Loss_data(x_data, y_data, u_data, v_data, p_data, c_data, u_target, v_target, p_target, c_target)

    loss = loss_eqn + loss_data 
    
    loss_dict.append(loss.item())

    loss.backward()

    optimizer_u.step() 
    optimizer_v.step()
    optimizer_p.step()  
    optimizer_c.step() 

    net_in_u = torch.cat((u_in,x.requires_grad_(),y.requires_grad_()),1)
    net_in_v = torch.cat((v_in,x.requires_grad_(),y.requires_grad_()),1)
    net_in_p = torch.cat((p_in,x.requires_grad_(),y.requires_grad_()),1)


    output_u = net2_u(net_in_u) 
    output_v = net2_v(net_in_v)  
    output_p = net2_p(net_in_p)

    
    # output_u = output_u.cpu().data.numpy() 
    # output_v = output_v.cpu().data.numpy()
    # output_p = output_p.cpu().data.numpy()
    
    return output_u.clone().detach().cpu().data.numpy() , output_v.clone().detach().cpu().data.numpy() , output_p.clone().detach().cpu().data.numpy() , loss_dict

In [None]:
def val_epoch(u_in, v_in, p_in, c_in, x_in, y_in, 
              u_data, v_data, p_data, c_data, x_data, y_data,
              u_target, v_target, p_target, c_target): 
    
    u_in = torch.Tensor(u_in).to(device).squeeze()
    v_in = torch.Tensor(v_in).to(device).squeeze()
    p_in = torch.Tensor(p_in).to(device).squeeze()
    c_in = torch.Tensor(c_in).to(device).squeeze()

    x  = torch.Tensor(x_in).to(device)
    y  = torch.Tensor(y_in).to(device)

    u_data = torch.Tensor(u_data).to(device).squeeze()
    v_data = torch.Tensor(v_data).to(device).squeeze()
    p_data = torch.Tensor(p_data).to(device).squeeze()
    c_data = torch.Tensor(c_data).to(device).squeeze()

    
    x_data = torch.Tensor(x_data).to(device)
    y_data = torch.Tensor(y_data).to(device)

    u_target = torch.Tensor(u_target).to(device)
    v_target = torch.Tensor(v_target).to(device)
    p_target = torch.Tensor(p_target).to(device)  
    c_target = torch.Tensor(c_target).to(device)  


    loss_dict = []      
    
    loss_eqn  = criterion(u_in, v_in, p_in, c_in, x, y)
    loss_data = Loss_data(x_data, y_data, u_data, v_data, p_data, c_data, u_target, v_target, p_target, c_target)

    loss = loss_eqn + loss_data
    
    loss_dict.append(loss.item())

    net_in_u = torch.cat((u_in,x.requires_grad_(),y.requires_grad_()),1)
    net_in_v = torch.cat((v_in,x.requires_grad_(),y.requires_grad_()),1)
    net_in_p = torch.cat((p_in,x.requires_grad_(),y.requires_grad_()),1)


    output_u = net2_u(net_in_u) 
    output_v = net2_v(net_in_v)  
    output_p = net2_p(net_in_p)

    
    

    return output_u.clone().detach().cpu().data.numpy() , output_v.clone().detach().cpu().data.numpy() , output_p.clone().detach().cpu().data.numpy() , loss_dict

In [None]:
learning_rate = 5e-4 
step_epoch = 1200
decay_rate = 0.1 
Diff = 0.00125 
rho = 1.
lamda_rate = 1e-6

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

net2_u = PINet().to(device)
net2_v = PINet().to(device)
net2_p = PINet().to(device)
net2_c = PINet().to(device)



def init_normal(m):
    if type(m) == nn.Linear:
        nn.init.kaiming_normal_(m.weight)

net2_u.apply(init_normal)
net2_v.apply(init_normal)
net2_p.apply(init_normal)
net2_c.apply(init_normal)


optimizer_u = optim.Adam(net2_u.parameters(), lr=learning_rate, betas = (0.9,0.99),eps = 10**-15)
optimizer_v = optim.Adam(net2_v.parameters(), lr=learning_rate, betas = (0.9,0.99),eps = 10**-15)
optimizer_p = optim.Adam(net2_p.parameters(), lr=learning_rate, betas = (0.9,0.99),eps = 10**-15)
optimizer_c = optim.Adam(net2_c.parameters(), lr=learning_rate, betas = (0.9,0.99),eps = 10**-15)


scheduler_u = torch.optim.lr_scheduler.StepLR(optimizer_u, step_size=step_epoch, gamma=decay_rate)
scheduler_v = torch.optim.lr_scheduler.StepLR(optimizer_v, step_size=step_epoch, gamma=decay_rate)
scheduler_p = torch.optim.lr_scheduler.StepLR(optimizer_p, step_size=step_epoch, gamma=decay_rate)
scheduler_c = torch.optim.lr_scheduler.StepLR(optimizer_c, step_size=step_epoch, gamma=decay_rate)



In [None]:
def normalize_channel(ch):
  mean = np.mean(ch[:])
  std  = np.std(ch[:]) 

  ch[:] -= mean
  ch[:] /= std
  return ch


In [None]:
epochs  = 100

x = data['x_star']
y = data['y_star']

u = data['U_star']
v = 400*data['V_star']
p = data['P_star']
c = data['C_star']


Nt = u.shape[1]

idx = np.where(x == x.max())[0][-1]

x = x[:idx]
y = y[:idx]

u = u[:idx,:]
v = v[:idx,:]
p = p[:idx,:]
c = c[:idx,:]

u_min = u.min()
v_min = v.min()
p_min = p.min()

u_max = u.max()
v_max = v.max()
p_max = p.max()

N = x.shape[0]

train_index = 30
val_index = 35
test_index = 70

train_range = np.arange(0, train_index)
val_range  = np.arange(train_index, val_index)
test_range = np.arange(val_index, test_index)

train_losses = []
val_losses = []
min_val_loss = 1e10

loss_fn = nn.MSELoss()


for epoch in range(epochs):

  train_epoch_loss = []
  start = time.time()
  
  scheduler_u.step()
  scheduler_v.step()
  scheduler_p.step()
  scheduler_c.step()


  for i in train_range:
      # Input data for previous snapshots

      u_in = u[:,i:i+10]
      v_in = v[:,i:i+10]
      p_in = p[:,i:i+10]
      c_in = c[:,i:i+10]


      u_target = u[:,i+15:i+16]
      v_target = v[:,i+15:i+16]
      p_target = p[:,i+15:i+16]
      c_target = c[:,i+15:i+16]

      # Data from final snapshot

      rand_idx = np.random.choice(N, 10000)

      x_data_in = x[rand_idx,:]
      y_data_in = y[rand_idx,:]

      u_data_in = u_in[rand_idx,:]
      v_data_in = v_in[rand_idx,:]
      p_data_in = p_in[rand_idx,:]
      c_data_in = c_in[rand_idx,:]


      u_data_target = u_target[rand_idx,:]
      v_data_target = v_target[rand_idx,:]
      p_data_target = p_target[rand_idx,:]
      c_data_target = c_target[rand_idx,:]



      u_star, v_star, p_star, _  = train_epoch(u_in, v_in, p_in, c_in, x, y,
                                               u_data_in, v_data_in, p_data_in, c_data_in, x_data_in, y_data_in,
                                                u_data_target,v_data_target,p_data_target, c_data_target)
      
      
      mse_loss = mean_squared_error(u_target, u_star) + mean_squared_error(v_target, v_star) + mean_squared_error(p_target, p_star)
      train_epoch_loss.append(np.mean(mse_loss))
  
   
  
  end = time.time()
  print(f'Epoch: {epoch}, Loss: {np.mean(train_epoch_loss)}, Time: {end-start}')
  train_losses.append(np.mean(train_epoch_loss))
    
  if epoch % 5 == 0:
      
      val_epoch_mse_loss = []
      val_epoch_percentage_loss = []
      for i in val_range:

        u_in = u[:,i:i+10]
        v_in = v[:,i:i+10]
        p_in = p[:,i:i+10]
        c_in = c[:,i:i+10]

        u_target = u[:,i+15:i+16]
        v_target = v[:,i+15:i+16]
        p_target = p[:,i+15:i+16]
        c_target = c[:,i+15:i+16]


        rand_idx = np.random.choice(N, 10000)

        x_data_in = x[rand_idx,:]
        y_data_in = y[rand_idx,:]

        u_data_in = u_in[rand_idx,:]
        v_data_in = v_in[rand_idx,:]
        p_data_in = p_in[rand_idx,:]
        c_data_in = c_in[rand_idx,:]


        u_data_target = u_target[rand_idx,:]
        v_data_target = v_target[rand_idx,:]
        p_data_target = p_target[rand_idx,:]
        c_data_target = c_target[rand_idx,:]

        u_star, v_star, p_star, _  = val_epoch(u_in, v_in, p_in, c_in, x, y, 
                                                  u_data_in, v_data_in, p_data_in, c_data_in, x_data_in, y_data_in,
                                                  u_data_target, v_data_target, p_data_target, c_data_target) 
        
        
        mse_loss = mean_squared_error(u_target, u_star) + mean_squared_error(v_target, v_star) + mean_squared_error(p_target, p_star)
        print(np.mean(u_star), np.mean(u_target), np.mean(v_star), np.mean(v_target), np.mean(p_star), np.mean(p_target))        

        val_epoch_mse_loss.append(np.mean(mse_loss))

      plt.figure()
      plt.subplot(2, 1, 1)
      plt.scatter(x, y, c = u_target , vmin=u_min, vmax=u_max, cmap = 'rainbow')
      plt.title('Real results, u')
      plt.colorbar()
      plt.show()

      plt.figure()
      plt.subplot(2, 1, 1)
      plt.scatter(x, y, c = v_target ,vmin=v_min, vmax=v_max, cmap = 'rainbow')
      plt.title('Real results, v')
      plt.colorbar()
      plt.show()

      plt.figure()
      plt.subplot(2, 1, 1)
      plt.scatter(x, y, c = p_target ,vmin=p_min, vmax=p_max, cmap = 'rainbow')
      plt.title('Real results, p')
      plt.colorbar()
      plt.show()

      
      plt.figure()
      plt.subplot(2, 1, 1)
      plt.scatter(x, y, c = u_star ,vmin=u_min, vmax=u_max, cmap = 'rainbow')
      plt.title('NN results, u')
      plt.colorbar()
      plt.show()

      plt.figure()
      plt.subplot(2, 1, 1)
      plt.scatter(x, y, c = v_star , vmin=v_min, vmax=v_max, cmap = 'rainbow')
      plt.title('NN results, v')
      plt.colorbar()
      plt.show()
      
      plt.figure()
      plt.subplot(2, 1, 1)
      plt.scatter(x, y, c = p_star , vmin=p_min, vmax=p_max,cmap = 'rainbow')
      plt.title('NN results, p')
      plt.colorbar()
      plt.show()
      
      plt.figure()
      plt.subplot(2, 1, 1)
      plt.scatter(x, y, c =abs(u_star-u_target),vmin=u_min, vmax=u_max, cmap = 'rainbow')
      plt.title('Absolute error, u')
      plt.colorbar()
      plt.show()

      plt.figure()
      plt.subplot(2, 1, 1)
      plt.scatter(x, y, c = abs(v_star-v_target) , vmin=v_min, vmax=v_max,cmap = 'rainbow')
      plt.title('Absolute error, v')
      plt.colorbar()
      plt.show()

      plt.figure()
      plt.subplot(2, 1, 1)
      plt.scatter(x, y, c = abs(p_star-p_target) , vmin=p_min, vmax=p_max,cmap = 'rainbow')
      plt.title('Absolute error, p')
      plt.colorbar()
      plt.show()

      print('Validation MSE Loss: ', np.mean(val_epoch_mse_loss),'Validation Percentage Loss: ', np.mean(val_epoch_percentage_loss) )
      val_losses.append(np.mean(val_epoch_mse_loss))
      if val_losses[-1] < min_val_loss:
        min_val_loss = val_losses[-1]
        best_model = (net2_u.state_dict(), net2_v.state_dict(), net2_p.state_dict())


# torch.save(best_model[0].state_dict(), "/content/gdrive/MyDrive/Research/BestU")
# torch.save(best_model[1].state_dict(), "/content/gdrive/MyDrive/Research/BestV")
# torch.save(best_model[2].state_dict(), "/content/gdrive/MyDrive/Research/BestP")


      


In [None]:
plt.plot(val_losses)

In [None]:
net2_u = PINet().to(device)
net2_v = PINet().to(device)
net2_p = PINet().to(device)

net2_u.load_state_dict(best_model[0])
net2_v.load_state_dict(best_model[1])
net2_p.load_state_dict(best_model[2])



test_loss = []
for i in test_range:

    u_in = u[:,i:i+10]
    v_in = v[:,i:i+10]
    p_in = p[:,i:i+10]

    u_target = u[:,i+15:i+16]
    v_target = v[:,i+15:i+16]
    p_target = p[:,i+15:i+16]

    # Data from final snapshot

    rand_idx = np.random.choice(N, 10000)

    x_data_in = x[rand_idx,:]
    y_data_in = y[rand_idx,:]

    u_data_in = u_in[rand_idx,:]
    v_data_in = v_in[rand_idx,:]
    p_data_in = p_in[rand_idx,:]

    u_data_target = u_target[rand_idx,:]
    v_data_target = v_target[rand_idx,:]
    p_data_target = p_target[rand_idx,:]

    u_star, v_star, p_star, _  = val_epoch(u_in, v_in, p_in, c_in, x, y, 
                                                  u_data_in, v_data_in, p_data_in, c_data_in, x_data_in, y_data_in,
                                                  u_data_target, v_data_target, p_data_target, c_data_target)
    
    
    mse_loss = mean_squared_error(u_target, u_star) + mean_squared_error(v_target, v_star) + mean_squared_error(p_target, p_star)
    test_loss.append(np.mean(mse_loss))

    plt.figure()
    plt.subplot(2, 1, 1)
    plt.scatter(x, y, c = u_target , vmin=u_min, vmax=u_max, cmap = 'rainbow')
    plt.title('Real results, u')
    plt.colorbar()
    plt.show()

    plt.figure()
    plt.subplot(2, 1, 1)
    plt.scatter(x, y, c = v_target ,vmin=v_min, vmax=v_max, cmap = 'rainbow')
    plt.title('Real results, v')
    plt.colorbar()
    plt.show()

    plt.figure()
    plt.subplot(2, 1, 1)
    plt.scatter(x, y, c = p_target ,vmin=p_min, vmax=p_max, cmap = 'rainbow')
    plt.title('Real results, p')
    plt.colorbar()
    plt.show()

    
    plt.figure()
    plt.subplot(2, 1, 1)
    plt.scatter(x, y, c = u_star ,vmin=u_min, vmax=u_max, cmap = 'rainbow')
    plt.title('NN results, u')
    plt.colorbar()
    plt.show()

    plt.figure()
    plt.subplot(2, 1, 1)
    plt.scatter(x, y, c = v_star , vmin=v_min, vmax=v_max, cmap = 'rainbow')
    plt.title('NN results, v')
    plt.colorbar()
    plt.show()
    
    plt.figure()
    plt.subplot(2, 1, 1)
    plt.scatter(x, y, c = p_star , vmin=p_min, vmax=p_max,cmap = 'rainbow')
    plt.title('NN results, p')
    plt.colorbar()
    plt.show()
    
    plt.figure()
    plt.subplot(2, 1, 1)
    plt.scatter(x, y, c =abs(u_star-u_target),vmin=u_min, vmax=u_max, cmap = 'rainbow')
    plt.title('Absolute error, u')
    plt.colorbar()
    plt.show()

    plt.figure()
    plt.subplot(2, 1, 1)
    plt.scatter(x, y, c = abs(v_star-v_target) , vmin=v_min, vmax=v_max,cmap = 'rainbow')
    plt.title('Absolute error, v')
    plt.colorbar()
    plt.show()

    plt.figure()
    plt.subplot(2, 1, 1)
    plt.scatter(x, y, c = abs(p_star-p_target) , vmin=p_min, vmax=p_max,cmap = 'rainbow')
    plt.title('Absolute error, p')
    plt.colorbar()
    plt.show()

    print('Test Loss: ', np.mean(test_loss))
