In [1]:
import os
os.chdir( "/home/lucasfuzato/TCC/CODE" )

import torch as tc
device = "cuda" if tc.cuda.is_available() else "cpu"
print( device )

import torch.utils.data as tdt
import torch.autograd as tgd
import pandas as pd

from typing import *
from itertools import *
from src.util.aux_fun import *
from src.util.loader_data import meu_loader

import matplotlib.pyplot as plt

r1_net = tc.load( "data/r1_net.pt")
r1_net.eval()

tr_df = pd.read_csv( TR_FILE )
cond_df = pd.read_csv( COND_FILE )

cuda


FileNotFoundError: [Errno 2] No such file or directory: 'dados/r1_net.pt'

In [None]:
def time_grad( t : tc.Tensor , state : tc.Tensor ) -> tc.Tensor:

    '''
    computes the time gradient of state in regards to t
    '''

    # t_clone = tc.tensor( t , requires_grad = True )
    return tgd.grad(
        state,
        t,
        grad_outputs = tc.ones_like( state ),
        create_graph = True
    )[ 0 ]


def pinn_fit_fun( net : tnn.Module , opm : top.Optimizer , domain : tc.Tensor , alpha : float = 0.5  ):

    def physics_reg( ):

        '''
        regularization factor to make the net obey the laws of physics
        throughout the entire domain of the problem
        '''

        # network aproximation
        t_clone = tc.clone( domain ).requires_grad_( True )
        d_pos_hat = net( t_clone )

        # error on x
        x_hat  = d_pos_hat.T[ 0 ]
        vx_hat = time_grad( t_clone , x_hat )  # first derivative
        vx = tc.clone( vx_hat ).detach()
        ax = -AIR_DRAG*tc.abs( vx )*vx         # expected ax
        ax_hat = time_grad( t_clone , vx_hat )
        loss_x = tnn.functional.mse_loss( ax , ax_hat )

        # error on y
        y_hat  = d_pos_hat.T[ 1 ]
        vy_hat = time_grad( t_clone , y_hat )  # first derivative
        vy = tc.clone( vy_hat ).detach()
        ay = -AIR_DRAG*tc.abs( vy )*vy - G     # expected ay
        ay_hat = time_grad( t_clone , vy_hat )
        loss_y = tnn.functional.mse_loss( ay , ay_hat )

        return ( loss_x + loss_y )/2


    def update( t : tc.Tensor , pos : tc.Tensor ):

        net.zero_grad()

        # ---------------------------------
        # data loss
        pos_hat = net( t )
        data_loss = tnn.functional.mse_loss( pos , pos_hat )

        # --------------------------------
        # physics loss
        p_loss = physics_reg()

        loss = data_loss + p_loss*alpha
        loss.backward()
        opm.step()
        
    return update

In [None]:
def make_r2( ):
    return make_net( [ 1 , 64 , 64 , 2 ] , tnn.Softplus , omega = 0.1 )

def predict_trajectory( initial_condition , max_r1_time = 1. , max_r2_time = 5. , num_points = 100 ):

    # ----------------------------------------------------------------------------------
    # predicting the first part of the trajectory using
    # the r1 net

    # unpacking the initial conditions and getting the time
    r1_times = tc.linspace( 0 , max_r1_time , 25 , device = DEVICE , dtype = STD_TYPE , requires_grad = True )
    ( theta_0 , v_0 ) = initial_condition

    # making the input
    r1_input = tc.zeros( 25 , 3 , dtype = STD_TYPE , device = DEVICE )
    r1_input[ : , 0 ] = theta_0
    r1_input[ : , 1 ] = v_0
    r1_input[ : , 2 ] = r1_times

    r1_output = r1_net( r1_input ).detach()

    #----------------------------------------------------------------------
    # predicting the rest of the trajectory using a pinn

    # making the r2_net
    r2_net , opm = make_r2()
    r2_collate = tc.linspace( 0 , max_r2_time , num_points , device = DEVICE , dtype = STD_TYPE , requires_grad = True )
    r2_fit = pinn_fit_fun( r2_net , opm , r2_collate )

    # using the r1_output as training set for the r2_net
    r2_x = tc.clone( r1_times )
    r2_y = r1_output

    # fitting the network
    for _ in range( 100 ):
        r2_fit( r2_x , r2_y )
    
    r2_output = r2_net( r2_collate ).detach().cpu().numpy()
    return r1_output , r2_output

def get_real_trajectory( simu_id ):

    where_id = tr_df[ "simu_id" ] == simu_id
    tr_vals = tr_df.loc[ where_id ][ [ "t" , "x" , "y" ] ]

    theta_0 = cond_df.at[ simu_id , "theta_0" ]
    v_0 = cond_df.at[ simu_id , "v_0" ]
    initial_cond = ( theta_0 , v_0 )

    return tr_vals , initial_cond

def plot_trajectory( simu_id ):

    tr_vals , initial_cond = get_real_trajectory( simu_id )
    
    max_time = tr_vals[ "t" ].max()
    r1_tr , r2_tr = predict_trajectory( initial_cond , max_r2_time = max_time )

    fig , ax = plt.subplots( figsize = ( 12 , 8 ))
    ax.set_aspect( "equal" )

    x = tr_vals[ "x" ].to_numpy()
    y = tr_vals[ "y" ].to_numpy()
    ax.plot( x , y , "--k" , lw = .5 , label = "Runge Kutta trajectory" )

    r1_x = r1_tr.T[ 0 ]
    r1_y = r1_tr.T[ 1 ]
    ax.plot( r1_x , r1_y , "*b" , label = "R1 trajectory: t <= 1 sec" )

    r2_x = r2_tr.T[ 0 ]
    r2_y = r2_tr.T[ 1 ]
    ax.plot( r2_x , r2_y , "-g" , label = "R2 trajectory" )

    plt.legend()
    plt.show()




In [None]:
plot_trajectory( 36 )