In [None]:
%load_ext tensorboard
%tensorboard --logdir runs

In [None]:
!nvidia-smi
!mkdir saved_models
!mkdir saved_images

In [None]:
!pip install chaospy

Notes:

the loss outputs a row vector --- the NN model outputs a column vector

data: batch x row vector  ->  [raio, comprimento, tempo, peclet number, knudsen number * beta_v, brinkman number]

In [3]:
import os
import math 
import torch
from torch import nn
import torch.autograd.functional as F
import torch.utils.tensorboard as board
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import mpl_toolkits.mplot3d as plt_3d
import plotly.graph_objects as go
import plotly.express as px
import chaospy
import scipy
import sklearn as sk
from scipy.interpolate import interp2d
from sklearn import mixture

Cleaning CUDA memory 

In [4]:
import gc

gc.collect()
torch.cuda.empty_cache()

miscellaneous functions

In [5]:
# transform column vector into a row vector
def column_to_row(x):
    return x.reshape(len(x))

# stack the data
def especial_sorting(data, axis=0, engine="numpy"):   # data shape =>  [batch X domain / EX: [20000x4]]
    if engine == "numpy":
        data_sorted = np.zeros(data.shape)
        for idx, i in enumerate(np.argsort(data.T[axis])):
            data_sorted[idx] = data[i]
        return data_sorted
    if engine == "torch":
        data_sorted = torch.zeros_like(data)
        for idx, i in enumerate(torch.argsort(data.T[axis])):
            data_sorted[idx] = data[i]
        return data_sorted

# collect error
def collect_error(loss, data, percentage=0.1):
    
    num_data_erro = int(len(loss)*percentage)
    data_erro = np.hstack((loss, data))

    data_erro = especial_sorting(data_erro)
    
    return data_erro[:num_data_erro,1:]

# split the data (use for separate the data into train and test)
def split_data(data, percent):
    return torch.split(data,[math.floor(len(data)*percent),len(data)-math.floor(len(data)*percent)])

# reshape_data (use for gradient accumulation)
def reshape_data(data, ac_iter):
    if (len(data)%ac_iter) == 0:
        return torch.reshape(data,(ac_iter,int(len(data)/ac_iter),int(len(data[0]))))

    if (len(data)%ac_iter) != 0:
        Y = (len(data)%ac_iter)
        return torch.reshape(data[:-Y],(ac_iter,int(len(data[:-Y])/ac_iter),int(len(data[0]))))

# load the NN model parameters
def load_params(model,PATH,cpu_only=False):
    if cpu_only is True:
        return model.load_state_dict(torch.load(PATH,map_location=torch.device('cpu')))
    if cpu_only is False:
        return model.load_state_dict(torch.load(PATH))

--------------------------------------------------------------------------------------------------

Código de treinamento PINN para o TCC

Definição do domínio de estudo, os "dados" a serem treinados na rede neural

In [6]:
#-----------------sampling functions-------------------- (data exmplo: [x_menor,x_maior,y_menor,y_maior,t_menor,t_maior])
#data sampling
def data_sampling(data,count, rule="sobol"):
    uniform_cube = chaospy.J(chaospy.Uniform(data[0], data[1]), 
                             chaospy.Uniform(data[2], data[3]), 
                             chaospy.Uniform(data[4], data[5]),
                             chaospy.Uniform(data[6], data[7]))
    sobol_samples = uniform_cube.sample(count, rule)
    return sobol_samples

#mapping to a 2D line (use lambda t: F(t)) - "chebyshev" / "korobov"  (row vector: [x,y])
def smap(func,count,data,v_dependente,metodo):
    if v_dependente == "x":
        uniform_line = chaospy.J(chaospy.Uniform(data[2], data[3]), 
                                 chaospy.Uniform(data[4], data[5]),
                                 chaospy.Uniform(data[6], data[7]))
        samples = uniform_line.sample(count, rule=metodo)
        transform_x = np.vstack((func(samples[0]),samples))
        return transform_x
    
    if v_dependente == "y":
        uniform_line = chaospy.J(chaospy.Uniform(data[0], data[1]), 
                                 chaospy.Uniform(data[4], data[5]),
                                 chaospy.Uniform(data[6], data[7]))
        samples = uniform_line.sample(count, rule=metodo)
        transform_y = np.vstack((samples[0],func(samples[0]),samples[1:]))
        return transform_y

    if v_dependente == "t":
        uniform_line = chaospy.J(chaospy.Uniform(data[0], data[1]), 
                                 chaospy.Uniform(data[2], data[3]),
                                 chaospy.Uniform(data[6], data[7]))
        samples = uniform_line.sample(count, rule=metodo)
        transform_t = np.vstack((samples[0],samples[1],func(samples[1]),samples[2:]))
        return transform_t

#adaptive sampling (data -> [batch,dimension])
def adaptive_sampling(data, num_samples, num_modes=1, tolerance=0.001):
    
    model = mixture.GaussianMixture(num_modes,tol=tolerance)
    model.fit(data)

    D = chaospy.GaussianMixture(model.means_, model.covariances_)

    return D.sample(num_samples,rule="sobol")

#specify the domain (use lambda t: F(t))
def S_domain(data_domain,func_S,func_I,v_menor,v_maior):
    for i in range(len(data_domain[0])):
        if data_domain[0,i]>v_maior or data_domain[0,i]<v_menor:
            continue
        if func_S==func_I:
            data_domain[0,i]=math.nan
            continue
        if data_domain[1,i]>func_S(data_domain[0,i]) or data_domain[1,i]<func_I(data_domain[0,i]):
            data_domain[0,i]=math.nan
    data_domain=data_domain[:, ~np.isnan(data_domain).any(axis=0)]
    return data_domain

def ST_domain(data_domain, dim, menor, maior):
    for i in range(len(data_domain[0])):
        if data_domain[dim,i]>maior or data_domain[dim,i]<menor:
            data_domain[dim,i]=math.nan
    data_domain=data_domain[:, ~np.isnan(data_domain).any(axis=0)]
    return data_domain

#-----------------sampling the domain-------------------- 
total_num_samples = 120000
dy_num_samples = 25000
problem_domain = [0,1,0,12,0,10,0.0002,1]

uniform_samples = data_sampling(problem_domain, total_num_samples).T
dynamic_S_CPU, constant_S_CPU = uniform_samples[:dy_num_samples],uniform_samples[dy_num_samples:]

#-----------------sampling the boundary-------------------- 
CC_1_CPU=smap(lambda t: t*0,20000,problem_domain,"x","sobol").T
CC_2_CPU=smap(lambda t: t*0+1,20000,problem_domain,"x","sobol").T
CC_3_CPU=smap(lambda t: t*0,20000,problem_domain,"y","sobol").T
CC_4_CPU=smap(lambda t: t*0+12,20000,problem_domain,"y","sobol").T
CC_5_CPU=smap(lambda x: x*0,20000,problem_domain,"t","sobol").T

#-----------------converting to tensor--------------------
dynamic_S = torch.from_numpy(dynamic_S_CPU)
constant_S = torch.from_numpy(constant_S_CPU)
CC_1 = torch.from_numpy(CC_1_CPU)
CC_2 = torch.from_numpy(CC_2_CPU)
CC_3 = torch.from_numpy(CC_3_CPU)
CC_4 = torch.from_numpy(CC_4_CPU)
CC_5 = torch.from_numpy(CC_5_CPU)


TMCA data

In [7]:
# TMAC_data = pd.read_excel('/content/Tangential_Momentum_Accommodation_Coefficient_Data.xlsx').values.T[0]
# TMAC_data += np.random.uniform(-0.0005,0.0005,len(TMAC_data))
# TMAC_data = chaospy.GaussianKDE(TMAC_data, h_mat=0.008).sample(3000,rule="latin_hypercube")
# for i in range(len(TMAC_data)):
#     if TMAC_data[i]<0.1:
#         TMAC_data[i]=np.mean(TMAC_data)

# beta_v = (2-TMAC_data)/TMAC_data

Definição das equações a serem estudadas (problem definition) e condições de contorno / iniciais

In [8]:
b = 2.03704

#---------------------PDE------------------------

def PDE_loss(NN,data):
    
    grad = torch.autograd.grad(NN.sum(),data,create_graph=True,retain_graph=True)[0]

    T_r = grad[:,0]
    T_z = grad[:,1]
    T_t = grad[:,2]

    T_rr = (torch.autograd.grad(T_r.sum(),data,create_graph=False,retain_graph=True)[0])[:,0]
    T_zz = (torch.autograd.grad(T_z.sum(),data,create_graph=False,retain_graph=True)[0])[:,1]
    
    pe = 1

    return ((data[:,0]*(pe)*(0.5+(4.0*data[:,3]))*(T_t-T_rr))+
            (data[:,0]*(pe)*(1+(4.0*data[:,3])-(data[:,0]**2))*T_z)-
            ((0.5+(4.0*data[:,3]))*(((pe)*T_r)+(data[:,0]*T_zz))))

#---------------------boundary and initial conditions------------------------
def BC_1(NN,bc):
    
    T = torch.autograd.grad(NN.sum(),bc,create_graph=False,retain_graph=True)[0]
    
    return T[:,0]


def BC_2(NN,bc):
    
    T = torch.autograd.grad(NN.sum(),bc,create_graph=False,retain_graph=True)[0]
    
    return column_to_row(NN)+(b*bc[:,3]*T[:,0])-1
    
def BC_3(NN):
    
    return column_to_row(NN)
    
def BC_4(NN,bc):
    
    T = torch.autograd.grad(NN.sum(),bc,create_graph=False,retain_graph=True)[0]

    return T[:,1]

def BC_5(NN):
    
    return column_to_row(NN)

Definição da estrutura da rede neural 

In [9]:
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using {device} device")

writer = board.SummaryWriter('runs/Código_PINN_microtubulações_6')

# Define the Stan activation function
class Stan(nn.Module):
    def __init__(self,dim_act):
        super(Stan, self).__init__()
        
        self.tan_h = nn.Tanh()

        shape = (dim_act,)
        self.Learnable_P = nn.Parameter(torch.Tensor(*shape))
        self.init_parameters()

    def forward(self, x):
        return self.tan_h(x) + (self.Learnable_P*x*self.tan_h(x))

    def init_parameters(self):
        nn.init.uniform_(self.Learnable_P)

# Define the block
class Block(nn.Module):
    def __init__(self,dim):
        super(Block, self).__init__()

        self.FF_layer_1 = nn.Linear(dim, dim)
        self.FF_layer_2 = nn.Linear(dim, dim)
        
        # self.actv = nn.SiLU()
        # self.actv_2 = nn.SiLU()
        self.actv = Stan(dim)
        self.actv_2 = Stan(dim)

    def forward(self, X):
        identity = X.clone()
        
        X = self.FF_layer_1(X)
        X = self.actv(X)
        X = self.FF_layer_2(X)

        X += identity
        X = self.actv_2(X)
        return X

# Define model
class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        
        input_dim = 4
        mid_dim = 100
        out_dim = 1
        num_of_blocks = 4
        
        
        self.input = nn.Linear(input_dim, mid_dim)
        # self.input_actv = nn.SiLU()
        self.input_actv = Stan(mid_dim)
        
        self.mid_layers_list = []
        for _ in range(num_of_blocks):
            self.mid_layers_list.append(Block(mid_dim))
        self.mid_sequential = nn.Sequential(*self.mid_layers_list)
        
        self.output = nn.Linear(mid_dim, out_dim)

        self.input.apply(self.init_weights)
        self.mid_sequential.apply(self.init_weights)
        self.output.apply(self.init_weights)

    def forward(self, x):
        
        Z = self.input(x)
        Z = self.input_actv(Z)
        Z = self.mid_sequential(Z)
        Z = self.output(Z)
        
        return Z

    def init_weights(self,m):
        if isinstance(m, nn.Linear):
            nn.init.xavier_uniform_(m.weight)


model = NeuralNetwork().to(device)
print(model)

Using cpu device
NeuralNetwork(
  (input): Linear(in_features=4, out_features=100, bias=True)
  (input_actv): Stan(
    (tan_h): Tanh()
  )
  (mid_sequential): Sequential(
    (0): Block(
      (FF_layer_1): Linear(in_features=100, out_features=100, bias=True)
      (FF_layer_2): Linear(in_features=100, out_features=100, bias=True)
      (actv): Stan(
        (tan_h): Tanh()
      )
      (actv_2): Stan(
        (tan_h): Tanh()
      )
    )
    (1): Block(
      (FF_layer_1): Linear(in_features=100, out_features=100, bias=True)
      (FF_layer_2): Linear(in_features=100, out_features=100, bias=True)
      (actv): Stan(
        (tan_h): Tanh()
      )
      (actv_2): Stan(
        (tan_h): Tanh()
      )
    )
    (2): Block(
      (FF_layer_1): Linear(in_features=100, out_features=100, bias=True)
      (FF_layer_2): Linear(in_features=100, out_features=100, bias=True)
      (actv): Stan(
        (tan_h): Tanh()
      )
      (actv_2): Stan(
        (tan_h): Tanh()
      )
    )
    (3

Definição do solver(otimização)

In [10]:
# --------- Training parameters ------------
def Causal_MSELoss(X,causal_res):
    return torch.mean(causal_res*(torch.pow((X),2)))

def Adap_MSELoss(X):
    return torch.mean(torch.pow(X,2))

loss_fn_adap = Adap_MSELoss
loss_fn_causal = Causal_MSELoss
loss_fnc = nn.MSELoss()

optimizer_adam = torch.optim.Adam(model.parameters(), lr=8.0e-4)

scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer_adam, milestones=[10000, 20000, 40000], gamma=0.5)

# --------- Training function -----------
def train(X_,CC_1_,CC_2_,CC_3_,CC_4_,CC_5_, model, loss_fn, loss_fn_B, optimizer, i, accum_iter=1, causal_p=300):
    loss_board = 0
    model.train()
    
    X_ = reshape_data(X_, accum_iter)
    CC_1_ = reshape_data(CC_1_, accum_iter)
    CC_2_ = reshape_data(CC_2_, accum_iter)
    CC_3_ = reshape_data(CC_3_, accum_iter)
    CC_4_ = reshape_data(CC_4_, accum_iter)
    CC_5_ = reshape_data(CC_5_, accum_iter)
    
    for idx in range(accum_iter):
        
        X = X_[idx].to(device,dtype=torch.float32).requires_grad_()
        CC_1 = CC_1_[idx].to(device,dtype=torch.float32).requires_grad_()
        CC_2 = CC_2_[idx].to(device,dtype=torch.float32).requires_grad_()
        CC_3 = CC_3_[idx].to(device,dtype=torch.float32).requires_grad_()
        CC_4 = CC_4_[idx].to(device,dtype=torch.float32).requires_grad_()
        CC_5 = CC_5_[idx].to(device,dtype=torch.float32).requires_grad_()
        # Y = torch.zeros(len(X),device=device,dtype=torch.float32).requires_grad_()
        # Y_bc = torch.zeros(len(CC_1)+len(CC_2)+len(CC_3)+len(CC_4)+len(CC_5),device=device,dtype=torch.float32).requires_grad_()

        time = X_[idx,:,2].to(device,dtype=torch.float32)
        
        def computing_loss_train():
            pred_domain = PDE_loss(model(X),X)
            pred_boundery_1 = BC_1(model(CC_1),CC_1)
            pred_boundery_2 = BC_2(model(CC_2),CC_2)
            pred_boundery_3 = BC_3(model(CC_3))
            pred_boundery_4 = BC_4(model(CC_4),CC_4)
            pred_boundery_5 = BC_5(model(CC_5))
            # pred_boundery = torch.hstack((pred_boundery_1,pred_boundery_2,pred_boundery_3,pred_boundery_4,pred_boundery_5))
            
            iteracao = 0
            causal_W = torch.exp(-time*(causal_p/(causal_p+i+iteracao)))
            # causal_W = 1

            return (loss_fn(pred_domain,causal_W)+
                    5*loss_fn_B(pred_boundery_1)+
                    5*loss_fn_B(pred_boundery_2)+
                    5*loss_fn_B(pred_boundery_3)+
                    2*loss_fn_B(pred_boundery_4)+
                    2*loss_fn_B(pred_boundery_5))

        # Compute prediction error
        loss = computing_loss_train()/accum_iter
        loss_board += loss
            
        # Backpropagation
        loss.backward()
            
        if (idx+1) % accum_iter == 0: 
            optimizer.step()
            optimizer.zero_grad()
            scheduler.step()
            
    writer.add_scalar('train loss', loss_board.item(), global_step=i)

    # print(f"train loss: \n Avg loss: {loss_board.item():>8f} \n")
            
def test(X_,CC_1_,CC_2_,CC_3_,CC_4_,CC_5_, model, loss_fn, i):

    model.eval()

    X = X_.to(device,dtype=torch.float32).requires_grad_()
    CC_1 = CC_1_.to(device,dtype=torch.float32).requires_grad_()
    CC_2 = CC_2_.to(device,dtype=torch.float32).requires_grad_()
    CC_3 = CC_3_.to(device,dtype=torch.float32).requires_grad_()
    CC_4 = CC_4_.to(device,dtype=torch.float32).requires_grad_()
    CC_5 = CC_5_.to(device,dtype=torch.float32).requires_grad_()
    # Y = torch.zeros(len(X),device=device,dtype=torch.float32).requires_grad_()
    # Y_bc = torch.zeros(len(CC_1)+len(CC_2)+len(CC_3)+len(CC_4)+len(CC_5),device=device,dtype=torch.float32).requires_grad_()
    
    pred_domain = PDE_loss(model(X),X)
    pred_boundery_1 = BC_1(model(CC_1),CC_1)
    pred_boundery_2 = BC_2(model(CC_2),CC_2)
    pred_boundery_3 = BC_3(model(CC_3))
    pred_boundery_4 = BC_4(model(CC_4),CC_4)
    pred_boundery_5 = BC_5(model(CC_5))
    # pred_boundery = torch.hstack((pred_boundery_1,pred_boundery_2,pred_boundery_3,pred_boundery_4,pred_boundery_5))
    
    domain_test = loss_fn(pred_domain)
    boundery_test_1 = loss_fn(pred_boundery_1)
    boundery_test_2 = loss_fn(pred_boundery_2)
    boundery_test_3 = loss_fn(pred_boundery_3)
    boundery_test_4 = loss_fn(pred_boundery_4)
    boundery_test_5 = loss_fn(pred_boundery_5)
    test_loss = domain_test+boundery_test_1+boundery_test_2+boundery_test_3+boundery_test_4+boundery_test_5

    
    # print(f"Test Error: \n Avg loss: {test_loss.item():>8f} \n")
    writer.add_scalar('test loss', test_loss.item(), global_step=i)
    writer.add_scalar('domain loss', domain_test.item(), global_step=i)
    writer.add_scalar('boundery 1 loss', boundery_test_1.item(), global_step=i)
    writer.add_scalar('boundery 2 loss', boundery_test_2.item(), global_step=i)
    writer.add_scalar('boundery 3 loss', boundery_test_3.item(), global_step=i)
    writer.add_scalar('boundery 4 loss', boundery_test_4.item(), global_step=i)
    writer.add_scalar('boundery 5 loss', boundery_test_5.item(), global_step=i)
    
    return test_loss
    

Treinamento e "running time operations"

In [11]:
epochs = 500000
grad_accum = 1
model_pre_comp, continue_comp = True ,False
data_pre_comp = False

#---------------------separete the data--------------------------
Percent_of_train = 0.8

dynamic_train ,dynamic_test = split_data(dynamic_S, Percent_of_train)
constant_train ,constant_test = split_data(constant_S, Percent_of_train)
CC_1_train ,CC_1_test = split_data(CC_1, Percent_of_train)
CC_2_train ,CC_2_test = split_data(CC_2, Percent_of_train)
CC_3_train ,CC_3_test = split_data(CC_3, Percent_of_train)
CC_4_train ,CC_4_test = split_data(CC_4, Percent_of_train)
CC_5_train ,CC_5_test = split_data(CC_5, Percent_of_train)

len_dyn_train = len(dynamic_train)
DS_train ,DS_test = torch.vstack((dynamic_train, constant_train)) ,torch.vstack((dynamic_test, constant_test))

DS_train = especial_sorting(DS_train, axis=2, engine="torch")

if data_pre_comp is True:
    DS_train = pre_comp

if model_pre_comp is True:
    load_params(model,"saved_models/save_microtubulações.pth",cpu_only=True)

test_loss_save=100
for t in range(epochs):

    if continue_comp is not True:
        break
    
    # ---------importling sampling-------------------

    # if (t+1)%500 == 0 and t<8000:

    #     # ---------sampling to avoid memory issue-------------------
    #     randon_permutation = torch.randperm(len(DS_train))
    #     import_sampl_data = torch.zeros(int(len(DS_train)/grad_accum),len(DS_train[0]))
    #     for i in range(len(import_sampl_data)):
    #         import_sampl_data[i] = DS_train[randon_permutation[i].item()]


    #     dy_gpu = import_sampl_data.to(device,dtype=torch.float32).requires_grad_()
    #     error_gpu = PDE_loss(model(dy_gpu),dy_gpu)
    #     dy_cpu = dy_gpu.cpu().detach().numpy()
    #     error_cpu = error_gpu.cpu().detach().numpy()
        
    #     adap_sam = adaptive_sampling(collect_error(error_cpu.reshape(1,len(error_cpu)).T, dy_cpu, percentage=0.2), len_dyn_train, num_modes=5)
        
    #     adap_sam = ST_domain(adap_sam, 0, 0, 1)
    #     adap_sam = ST_domain(adap_sam, 1, 0, 25)
    #     adap_sam = ST_domain(adap_sam, 2, 0, 10)
    #     adap_sam = (ST_domain(adap_sam, 3, 0.0002,1)).T
        
    #     dynamic_train = torch.from_numpy(adap_sam)
        
    #     DS_train = torch.vstack((dynamic_train, constant_train))

    #     DS_train = especial_sorting(DS_train, axis=2, engine="torch")
    
    #--------------------------------------------------------
    
    train(DS_train, CC_1_train, CC_2_train, CC_3_train, CC_4_train, CC_5_train, model, loss_fn_causal, loss_fn_adap, optimizer_adam, t, accum_iter=grad_accum)
    test_loss_epochs = test(DS_test, CC_1_test, CC_2_test, CC_3_test, CC_4_test, CC_5_test, model, loss_fn_adap, t)
        
    #--------------------------------------------------------
    
    if t%30 == 0:
        if test_loss_epochs<test_loss_save:
            test_loss_save=test_loss_epochs
            iter_save_model = t
            torch.save(model.state_dict(), "saved_models/save_microtubulações.pth")

writer.close()
print("Done!")

Done!


funções de Pós-processamento dos dados

In [12]:
# Transforms the NN output data in grid form for ploting
def data_for_ploting(model, dim, Y_dim, pde_error=None, time_domain=None, others_domains=None):
    ploting_dataY, ploting_dataX = np.mgrid[Y_dim[0]:Y_dim[1]:dim*1j,0:1:dim*1j]
    
    # transform the grid data into batches ([number_of_points, 2])
    X = np.hstack((np.swapaxes([ploting_dataX.flatten()],0,1),np.swapaxes([ploting_dataY.flatten()],0,1)))
    X = torch.from_numpy(X)
    if time_domain is not None:
        X = torch.hstack((X,torch.ones(len(X), 1)*time_domain))
    if others_domains is not None:
        for ii in range(len(others_domains)):
            X = torch.hstack((X,torch.ones(len(X), 1)*others_domains[ii]))
            
    model.eval()
    X = X.to(device,dtype=torch.float32).requires_grad_()
    Y = model(X)
    if pde_error is not None:
        Y = pde_error(Y,X)
    Y = Y.cpu()
    Y = Y.detach().numpy()
    
    grid_form = np.reshape(Y,(dim, dim))
    
    return grid_form

# mirroring the data
def symmetric_data(Z):
    Z_sym = Z[:,0].reshape(Z.shape[0],1)
    for i in range(Z.shape[1]-1):
        Z_sym = np.hstack((Z[:,i+1].reshape(Z.shape[0],1),Z_sym))
    return np.hstack((Z_sym,Z))

def symmetric_data_1D(Z):
    Z_sym = np.array([Z[0]])
    for i in range(len(Z)-1):
        Z_sym = np.hstack((-Z[i+1],Z_sym))
    return np.hstack((Z_sym,Z))

Pós-processamento dos dados

In [None]:
pixel_y,pixel_x=1500,1500                   #   <<<<<<------       <<<<<<------         <<<<<<------

#--------------domain----------------

# make data with even sampling in x
plot_dim = 400                                            #   <<<<<<------       <<<<<<------         <<<<<<------

X_grid = np.linspace(-1, 1, num=plot_dim*2)
Y_grid = np.linspace(0, 3, num=plot_dim)

Z = data_for_ploting(model, plot_dim, pde_error=None, time_domain=19, others_domains=[1])
Z = symmetric_data(Z)

#------------- interpolating the data ---------------------
f = interp2d(X_grid, Y_grid, Z, kind='cubic')
X_interp = np.linspace(-1, 1, num=1500)
Y_interp = np.linspace(0, 3, num=1500)
interp_data = f(X_interp,Y_interp)

#--------------Image creation----------------

px = 1/plt.rcParams['figure.dpi']  # pixel in inches
fig, ax = plt.subplots(figsize=(pixel_x*px, pixel_y*px))

#--------------plots----------------

plot_mapping = ax.pcolormesh(X_interp, Y_interp, interp_data,antialiased=True,cmap='hot')

fig.colorbar(plot_mapping)
plt.show()


In [None]:
def plot_3D_K(t, domains, iteracao=None):
    #--------------Image creation----------------

    pixel_y,pixel_x=1500,1500
    plot_dim_3d = 80
    Y_dim = [0.03,1]

    px = 1/plt.rcParams['figure.dpi']  # pixel in inches

    #--------------data for the plots----------------

    Z, RAIO = np.mgrid[Y_dim[0]:Y_dim[1]:plot_dim_3d*1j,-1:1:(plot_dim_3d*2)*1j]
    
    temp = data_for_ploting(model, plot_dim_3d, Y_dim, pde_error=None, time_domain=t, others_domains=domains)
    temp = symmetric_data(temp)
    
    #----------------- Matplotlib -------------------

    fig_3d, ax_3d = plt.subplots(figsize=(15, 8))
    ax_3d = plt.axes(projection='3d')
    
    ax_3d.plot_surface(RAIO, Z, temp, rstride=1, cstride=1, cmap='viridis')
    # ax_3d.plot_wireframe(RAIO, Z, temp, color='black')
    ax_3d.set_xlabel('\n \n \n \n \n raio', rotation=45)
    ax_3d.set_ylabel('\n \n comprimento')
    ax_3d.set_zlabel('\n \n temperatura')
    ax_3d.set_title(f"Distribuição de temperatura \n Kn Bv = {2*domains[0]} e Br = 0 tempo = {t} \n Figura: d) \n")
    ax_3d.set_zlim(0,1.1)
    plt.xticks(rotation = 45)

    for item in ([ax_3d.title, ax_3d.xaxis.label, ax_3d.yaxis.label,ax_3d.zaxis.label] +
             ax_3d.get_xticklabels()+ax_3d.get_yticklabels()+ax_3d.get_zticklabels()):
        item.set_fontsize(20)
    
    # ax_3d.view_init(35, 50);
    if iteracao is None:
        fig_3d.savefig(f"saved_images/kn_bv={2*domains[0]},Br=0,tempo={t}.svg",format="svg", dpi=60)
    else:
        fig_3d.savefig(f"saved_images_3/{iteracao}.svg",format="svg", dpi=60)

    #----------------- plotly ----------------------
    # fig = go.Figure(data=[go.Surface(z=temp,x=RAIO,y=Z)])
    # fig.update_layout(title=f" {t} segundos", autosize=False,
    #               width=400, height=400)
    # fig.show()

# for KNBV in [0.0005,0.05,0.5]:
#     for tempo in [0.1,0.6,1,1.5]:
#         plot_3D_K(tempo, [KNBV])
# for k,i in enumerate(np.linspace(0.05,2,80)):
#     plot_3D_K(i, [0.5], k)
plot_3D_K(1.5, [0.05])

In [None]:
!zip -r fotos_3_pe1_br0.zip saved_images_3

In [None]:
def plot_T_R_Z_Error(Z, t = 1, kn_bv = 0.0002, resolution = 200):

    R = torch.linspace(0, 1, resolution, dtype=torch.float32).reshape(resolution,1)
    time = torch.ones(resolution, 1, dtype=torch.float32)*t
    Kn_Bv = torch.ones(resolution, 1, dtype=torch.float32)*kn_bv
    
    T = torch.tensor(()).to(device)
    Error = torch.tensor(()).to(device)
    
    model.eval()
    for i in range(len(Z)):
        data = torch.hstack((R, torch.ones(resolution, 1, dtype=torch.float32)*Z[i], time, Kn_Bv)).to(device)
        data.requires_grad_()
        T = torch.hstack((T, model(data)))
        Error = torch.hstack((Error, PDE_loss(model(data),data).reshape(resolution,1)))
        

    T = torch.t(T).cpu()
    T = T.detach().numpy()
    T = symmetric_data(T)
    
    Error = torch.t(Error).cpu()
    Error = Error.detach().numpy()
    Error = symmetric_data(Error)
    
    R = column_to_row(R.cpu().detach().numpy())
    R = symmetric_data_1D(R)
    
    conso_data = np.array([T,Error])

    fig, ax = plt.subplots(2, 1,figsize=(5, 5))
    for a in range(2):
        for i in range(len(Z)):
            ax[a].plot(R, conso_data[a,i], label=f'K = {Z[i]}')
        if a == 0:
            # ax[a].set_ylim(None,1.1)
            ax[a].set_xlabel('raio')  # Add an x-label to the axes.
            ax[a].set_ylabel('Temperatura')  # Add a y-label to the axes.
            ax[a].set_title("Gráfico Temperatura")  # Add a title to the axes.
            ax[a].legend();  # Add a legend.
            ax[a].grid(True,alpha = 0.25)
        else:
            # ax[a].set_ylim(None,1.1)
            ax[a].set_xlabel('raio')  # Add an x-label to the axes.
            ax[a].set_ylabel('erro local')  # Add a y-label to the axes.
            ax[a].set_title("Gráfico erro")  # Add a title to the axes.
            ax[a].legend();  # Add a legend.
            ax[a].grid(True,alpha = 0.25)

    # fig.savefig("aaa.svg", format="svg", dpi=1200)
    
plot_T_R_Z_Error([0.11,0.4,0.8,2], t = 0.8, kn_bv = 0.001, resolution = 100)

In [None]:
def plot_T_R_Z(Z, t, kn_bv = 0.0002, resolution = 200):

    R = torch.linspace(0, 1, resolution, dtype=torch.float32).reshape(resolution,1)
    Z_ = torch.ones(resolution, 1, dtype=torch.float32)
    time = torch.ones(resolution, 1, dtype=torch.float32)
    Kn_Bv = torch.ones(resolution, 1, dtype=torch.float32)*kn_bv
    
    conso_data = np.zeros((len(t), len(Z), resolution*2))
    
    model.eval()
    for iii in range(len(t)):
        T = torch.tensor(()).to(device)
        for i in Z:
            data = torch.hstack((R, Z_*i, time*t[iii], Kn_Bv)).to(device)
            data.requires_grad_()
            T = torch.hstack((T, model(data)))

        T = torch.t(T).cpu()
        T = T.detach().numpy()
        T = symmetric_data(T)
        conso_data[iii] = T
    
    R = column_to_row(R.cpu().detach().numpy())
    R = symmetric_data_1D(R)

    fig, ax = plt.subplots(len(t), 1,figsize=(5, 2.1*len(t)), layout='constrained')
    for a in range(len(conso_data)):
        for i in range(len(Z)):
            ax[a].plot(R, conso_data[a,i], label=f'Z = {Z[i]}')
        ax[a].set_ylim(None,1.2)
        ax[a].set_xlabel('raio')  # Add an x-label to the axes.
        ax[a].set_ylabel('Temperatura')  # Add a y-label to the axes.
        if a == 0:
            ax[a].set_title(f"Gráfico Temperatura da seção \n \n Kn Bv = {2*kn_bv} e Br = 0 \n \n tempo = {t[a]}")
        else:
            ax[a].set_title(f"tempo = {t[a]}")
        if a == 0:
            ax[a].legend();  # Add a legend.
        ax[a].grid(True,alpha = 0.25)

    fig.savefig(f"saved_images_2D/plot_T_R_Z Kn_Bv={2*kn_bv},Br=0.svg", format="svg", dpi=300)

for KNBV in [0.0005,0.05,0.5]:
    plot_T_R_Z([0.15,0.4,0.8,4], t = [0.1,0.6,1,1.5], kn_bv = KNBV, resolution = 100)


In [None]:
def plot_Tmean_Z_time_Nu(Z_ = 18, t = [0.1,0.4,0.8,5], kn_bv = 0.01, resolution = 200, sampling = 1000):

    Z = torch.linspace(0, Z_, resolution, dtype=torch.float32).reshape(resolution,1)
    R = torch.from_numpy(chaospy.J(chaospy.Uniform(0, 1)).sample(sampling, rule="latin_hypercube")).reshape(sampling,1)
    R_Nu = torch.ones(resolution, 1, dtype=torch.float32).requires_grad_()
    time = torch.ones(sampling, 1, dtype=torch.float32)
    Kn_Bv = torch.ones(sampling, 1, dtype=torch.float32)*kn_bv
    
    T = torch.ones(len(t), resolution, dtype=torch.float32)
    Nu = torch.zeros(len(t), resolution, dtype=torch.float32).to(device)
    
    model.eval()
    for i in range(len(t)):
        for ii in range(resolution):
            data = torch.hstack((R, torch.ones(sampling, 1)*Z[ii], time*t[i], Kn_Bv)).to(device, dtype=torch.float32)
            T[i,ii] = T[i,ii]*(torch.mean(model(data)).item())
            
    for i in range(len(t)):
        data_Nu = torch.hstack((R_Nu*0.9, Z, time[:resolution]*t[i], Kn_Bv[:resolution])).to(device, dtype=torch.float32).requires_grad_()
        T_r = (torch.autograd.grad(model(data_Nu).sum(),data_Nu,create_graph=False,retain_graph=True)[0])[:,0]
        Nu[i] = T_r
        
    Nu = -Nu/(1-T)
    Nu = Nu.detach().numpy()
    T = T.detach().numpy()
        
    T_Nu = np.array([T,Nu])
        
    Z = column_to_row(Z.numpy())
    
    fig, ax = plt.subplots(1, 2,figsize=(10, 2.5), layout='constrained')
    for a in range(2):
        for i in range(len(t)):
            ax[a].plot(Z, T_Nu[a,i], label=f'tempo = {t[i]}')
        if a == 0:
            # ax[a].set_ylim(None,1.1)
            ax[a].set_xlabel('Comprimento')  # Add an x-label to the axes.
            ax[a].set_ylabel('Temperatura média')  # Add a y-label to the axes.
            ax[a].set_title("Temperatura média X Comprimento")  # Add a title to the axes.
            ax[a].legend();  # Add a legend.
            ax[a].grid(True,alpha = 0.25)
        else:
            ax[a].set_ylim(-5,5)
            ax[a].set_xlabel('Comprimento')  # Add an x-label to the axes.
            ax[a].set_ylabel('Nusselt')  # Add a y-label to the axes.
            ax[a].set_title("Nusselt X Comprimento")  # Add a title to the axes.
            ax[a].legend();  # Add a legend.
            ax[a].grid(True,alpha = 0.25)

    # fig.savefig("aaa.svg", format="svg", dpi=1200)
    
plot_Tmean_Z_time_Nu(Z_ = 1.5, kn_bv = 0.005)

In [None]:
def plot_Tmean_Z_time(Z_ = 18, t = [0.1,0.4,0.8,5], kn_bv = 0.01, resolution = 200, sampling = 1000):

    Z = torch.linspace(0, Z_, resolution, dtype=torch.float32).reshape(resolution,1)
    R = torch.from_numpy(chaospy.J(chaospy.Uniform(0, 1)).sample(sampling, rule="latin_hypercube")).reshape(sampling,1)
    time = torch.ones(sampling, 1, dtype=torch.float32)
    Kn_Bv = torch.ones(sampling, 1, dtype=torch.float32)*kn_bv
    
    T = torch.ones(len(t), resolution, dtype=torch.float32)
    
    model.eval()
    for i in range(len(t)):
        for ii in range(resolution):
            data = torch.hstack((R, torch.ones(sampling, 1)*Z[ii], time*t[i], Kn_Bv)).to(device, dtype=torch.float32)
            T[i,ii] = T[i,ii]*(torch.mean(model(data)).item())
        
    T = T.detach().numpy()
        
    Z = column_to_row(Z.numpy())
    
    fig, ax = plt.subplots(figsize=(5, 2.7))
    for i in range(len(t)):
        ax.plot(Z, T[i])
    # ax.set_ylim(None,1.1)
    ax.set_xlabel('Comprimento')  # Add an x-label to the axes.
    ax.set_ylabel('Temperatura média')  # Add a y-label to the axes.
    ax.set_title(f"Temperatura média X Comprimento \n Kn Bv = {2*kn_bv} e Br = 0 \n Figura: c)")  # Add a title to the axes.
    # ax.legend(loc = 'lower right');  # Add a legend.
    ax.grid(True,alpha = 0.25)

    fig.savefig(f"saved_images/plot_Tmean-Kn Bn = {2*kn_bv} e Br = 0.svg", format="svg", dpi=300)

# for KNBV in [0.0005,0.05,0.5]:
#     plot_Tmean_Z_time(Z_ = 1.5, kn_bv = KNBV)
plot_Tmean_Z_time(Z_ = 4.5, kn_bv = 0.5)

In [None]:
# fill between
def temperatura_media(t_, kn_bv, Z_ = 1.5, resolution = 150, sampling = 800):

    Z = torch.linspace(0, Z_, resolution, dtype=torch.float32).reshape(resolution,1)
    R = torch.from_numpy(chaospy.J(chaospy.Uniform(0, 1)).sample(sampling, rule="latin_hypercube")).reshape(sampling,1)
    time = torch.ones(sampling, 1, dtype=torch.float32)*t_
    Kn_Bv = torch.ones(sampling, 1, dtype=torch.float32)*kn_bv
    
    T = torch.ones(resolution, dtype=torch.float32)
    
    model.eval()
    for i in range(resolution):
        data = torch.hstack((R, torch.ones(sampling, 1)*Z[i], time, Kn_Bv)).to(device, dtype=torch.float32)
        T[i] = T[i]*(torch.mean(model(data)).item())
    
    return T

def plot_confiance_intervals(TMCA, Kn, t, Z_ = 1.5, resolution = 150, sampling = 1000):
    
    Z = torch.linspace(0, Z_, resolution, dtype=torch.float32).reshape(resolution,1)
    Temp_geral = torch.zeros(len(TMCA), resolution, dtype=torch.float32).to(device)
    T_statistics = torch.zeros(2, resolution, dtype=torch.float32)
    
    for et, k in enumerate(TMCA):
        Temp_geral[et] = temperatura_media(t, k*Kn, Z_ = Z_, resolution = resolution, sampling = sampling)

    for i in range(resolution):
        T_statistics[0,i] = torch.mean(Temp_geral[:,i])
        T_statistics[1,i] = torch.std(Temp_geral[:,i])

    x = column_to_row(Z.numpy())
    T_statistics = T_statistics.numpy()

    fig, ax = plt.subplots()
    ax.plot(x, T_statistics[0], '-', label='1')
    ax.fill_between(x, T_statistics[0] - 2*T_statistics[1], T_statistics[0] + 2*T_statistics[1], alpha=0.2)
    ax.set_ylim(None,1.2)
    ax.set_xlabel('comprimento')  # Add an x-label to the axes.
    ax.set_ylabel('temperatura')  # Add a y-label to the axes.
    ax.set_title(f"Quantificação de incerteza para temperatura média \n Kn = {Kn*2} tempo = {t} \n 2 desvios-padrões de confiança")  # Add a title to the axes.
    # ax.legend();  # Add a legend.
    ax.grid(True,alpha = 0.25)

    fig.savefig(f"incerteza - Kn={Kn} tempo={t}.svg", format="svg", dpi=300)

for tempo in [0.2,0.5,1]:
    for KN in [(0.1/2.0),(0.04/2.0),(0.008/2.0),(0.001/2.0)]:    
        plot_confiance_intervals(beta_v, KN, tempo)

In [None]:
# ------------ export the NN to tensorboard ------------------
exemple_data = torch.tensor([0.0,0.1,0.1]).to(device,dtype=torch.float32,).requires_grad_()
writer.add_graph(model,exemple_data)

# ------------ export the result image to tensorboard ------------------

# writer.add_figure("Resultado", fig)
# writer.close()


In [None]:
#---------matplotlib---------------- (2D)
# plt.subplots(figsize=(10,10))
# plt.scatter(*CC_1_CPU.T,facecolor='r',s=5)
# plt.scatter(*CC_2_CPU.T,facecolor='r',s=5)
# plt.scatter(*CC_3_CPU.T,facecolor='r',s=5)
# plt.scatter(*adap_sam.T,s=5)
# plt.show()

#---------plotly----------------
fig = go.Figure(data=[go.Scatter3d(x=adap_sam.T[0], y=adap_sam.T[1], z=adap_sam.T[2], mode='markers',
                                   marker=dict(size=1))])
fig.show()

In [None]:
import timeit

start = timeit.default_timer()

DS_train = especial_sorting(dynamic_S, axis=2, engine="torch")

stop = timeit.default_timer()

print('Time: ', stop - start) 

Time:  0.30932444199999054


In [None]:
def testar_erro():
    # teste dos erros 
    def Adap_MSELoss(X):
        return torch.mean(torch.pow(X,2))
    
    loss_fn = Adap_MSELoss
    
    # ------------------------------------------
    
    problem_domain = [0,1,0,12,0,10,0.0002,1,0.0001,0.1]
    
    uniform_samples = torch.from_numpy(data_sampling(problem_domain, 100000, rule="latin_hypercube" ).T)
    CC_1_test = torch.from_numpy(smap(lambda t: t*0,40000,problem_domain,"x","latin_hypercube").T)
    CC_2_test = torch.from_numpy(smap(lambda t: t*0+1,40000,problem_domain,"x","latin_hypercube").T)
    CC_3_test = torch.from_numpy(smap(lambda t: t*0,40000,problem_domain,"y","latin_hypercube").T)
    CC_4_test = torch.from_numpy(smap(lambda t: t*0+12,40000,problem_domain,"y","latin_hypercube").T)
    CC_5_test = torch.from_numpy(smap(lambda x: x*0,40000,problem_domain,"t","latin_hypercube").T)
    
    model.eval()

    X = uniform_samples.to(device,dtype=torch.float32).requires_grad_()
    CC_1 = CC_1_test.to(device,dtype=torch.float32).requires_grad_()
    CC_2 = CC_2_test.to(device,dtype=torch.float32).requires_grad_()
    CC_3 = CC_3_test.to(device,dtype=torch.float32).requires_grad_()
    CC_4 = CC_4_test.to(device,dtype=torch.float32).requires_grad_()
    CC_5 = CC_5_test.to(device,dtype=torch.float32).requires_grad_()
       
    pred_domain = PDE_loss(model(X),X)
    pred_boundery_1 = BC_1(model(CC_1),CC_1)
    pred_boundery_2 = BC_2(model(CC_2),CC_2)
    pred_boundery_3 = BC_3(model(CC_3))
    pred_boundery_4 = BC_4(model(CC_4),CC_4)
    pred_boundery_5 = BC_5(model(CC_5))
        
    domain_test = loss_fn(pred_domain)
    boundery_test_1 = loss_fn(pred_boundery_1)
    boundery_test_2 = loss_fn(pred_boundery_2)
    boundery_test_3 = loss_fn(pred_boundery_3)
    boundery_test_4 = loss_fn(pred_boundery_4)
    boundery_test_5 = loss_fn(pred_boundery_5)
    
    print(f" testes \n domain_test: {domain_test} \n boundery_test_1: {boundery_test_1} \n boundery_test_2: {boundery_test_2} \n boundery_test_3: {boundery_test_3} \n boundery_test_4: {boundery_test_4} \n boundery_test_5: {boundery_test_5}")

testar_erro()

In [None]:
!zip -r fotos.zip saved_images

In [None]:
!mkdir saved_images_1
!mkdir saved_images_2
!mkdir saved_images_3