# Sistema de Compressão

$ \frac{d\dot{m}}{dt} = \frac{A_1}{L_C}(\phi (N(t), \dot{m})P_1 - P_P(t)) $

$ \frac{d P_P}{dt} = \frac{C_1^2}{\nu _P}(\dot{m}(t) - \alpha (t) K_\nu \sqrt{P_P - P_{out}}) $

$ \begin{matrix} A_1 & = & 2.6\centerdot 10^{-3} m² \\
\nu _P & = & 2.0 m³ \\
L_C & = & 2.0 m \\
K_\nu & = & \frac{0.38 kg}{(kPa)^{0.5}s} \\
P_1 & = & 4.5 MPa \\
P{out} & = & 5.0 MPa \end{matrix}
$

$ \frac{d\dot{m}}{dt} = \frac{2.6\centerdot 10^{-3}}{2.0}(\phi (N(t), \dot{m})\centerdot 4.5 - P_P) $

$ \frac{d P_P}{dt} = \frac{479.029^2}{2.0}(\dot{m} - \alpha {0.38} \sqrt{P_P - 5.0}) $

#### Importações

In [19]:
import numpy as np
from scipy.optimize import fsolve
import casadi as ca
import plotly.graph_objects as go
import optuna
from plotly.subplots import make_subplots
import time
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import pandas as pd
from scipy.interpolate import griddata


#### Classes

In [20]:
class Interpolation:
    def __init__(self, file_path, decimal = ','):
        self.file_path = file_path
        self.decimal = decimal
        
    def load_data(self):
        self.data = pd.read_csv(self.file_path, decimal=self.decimal)
        self.N_rot = np.arange(2e4,6e4,1e3) # Vai de 20000hz até 60000hz, Shape: (40,)
        self.Mass = np.arange(3,21.1,0.1) # Vai de 3 até 21, Shape: (181,)
        self.Phi = self.data.values # Valores da tabela, Shape: (40,181)
    
    def interpolate(self):
        # Criar uma malha densa para interpolação
        phi_flat = self.Phi.ravel(order='F')
        lut = ca.interpolant('name','bspline',[self.N_rot, self.Mass],phi_flat)

        return lut
    

##### Solução Numérica

In [21]:
class Simulation:
    def __init__(self, A1, Lc, kv, P1, P_out, C, alphas, N_RotS, nAlphas, nData, perturb,tempo, dt, interpolation):
        self.A1 = A1
        self.Lc = Lc
        self.kv = kv
        self.P1 = P1
        self.P_out = P_out
        self.C = C
        self.alphas = alphas
        self.N_RotS = N_RotS
        self.nAlphas = nAlphas
        self.nData = nData
        self.perturb = perturb
        self.dt = dt
        self.tempo = tempo
        
        #Interpolação
        self.interpolation = interpolation
        self.data = None
        self.N_rot = None
        self.Mass = None
        self.Phi = None

        self.interval = [np.linspace(i * self.tempo, (i + 1) * self.tempo, self.nData) for i in range(self.nAlphas)]
        self.time = 0
        
        self.alpha_values = []
        self.N_values = []
        self.massFlowrate = []
        self.PlenumPressure = []
        self.Phi_values = []
        self.RNN_train = []
        self.RNN_trainFut = []
        

    def fun(self, variables, alpha, N):
        (x, y) = variables  # x e y são escalares
        phi_value = float(self.interpolation([N, x]))  # Garantir que phi_value é escalar
        eqn_1 = (self.A1 / self.Lc) * ((phi_value * self.P1) - y) * 1e3
        eqn_2 = (self.C**2) / 2 * (x - alpha * self.kv * np.sqrt(y * 1000 - self.P_out * 1000))
        return [eqn_1, eqn_2]


    def run(self):
        lut = self.interpolation
        # Condições iniciais
        result = fsolve(self.fun, (10, 10), args=(self.alphas[0],self.N_RotS[0]))
        print(result)
        init_m, init_p = result

        # Variáveis CasADi
        x = ca.MX.sym('x', 2)
        p = ca.MX.sym('p', 2)  # Parâmetros (alpha e N)
        alpha, N = p[0], p[1]  # Divisão dos parâmetros

        # Solução Numérica
        tm1 = time.time()
        for i in range(self.nAlphas):
            alpha_value = self.alphas[i] + np.random.normal(0, self.perturb, self.nData)
            N_value = self.N_RotS[i] + np.random.normal(0, 50, self.nData)
            self.alpha_values.append(alpha_value)
            self.N_values.append(N_value)

            rhs = ca.vertcat((self.A1 / self.Lc) * ((lut(ca.vertcat(N, x[0])) * self.P1) - x[1]) * 1e3,
                             (self.C**2) / 2 * (x[0] - alpha * self.kv * np.sqrt(x[1] * 1000 - self.P_out * 1000)))
            
            ode = {'x': x, 'ode': rhs, 'p': p}

            F = ca.integrator('F', 'cvodes', ode, self.interval[0][0],self.dt)

            for j in range(self.nData):
                params = [alpha_value[j], N_value[j]]
                sol = F(x0=[init_m, init_p], p=params)
                xf_values = np.array(sol["xf"])
                aux1, aux2 = xf_values
                self.massFlowrate.append(aux1)
                self.PlenumPressure.append(aux2)
                self.Phi_values.append(lut(ca.vertcat(N_value[j], aux1)))
                init_m = aux1[-1]
                init_p = aux2[-1]
                self.RNN_train.append([aux1[0], aux2[0], alpha_value[j], N_value[j]])
                self.RNN_trainFut.append([aux1[0], aux2[0], alpha_value[j], N_value[j]])

        tm2 = time.time()
        self.time = tm2-tm1
        self.massFlowrate = np.reshape(self.massFlowrate, [self.nAlphas, self.nData])
        self.PlenumPressure = np.reshape(self.PlenumPressure, [self.nAlphas, self.nData])
        self.Phi_values = np.reshape(self.Phi_values, [self.nAlphas, self.nData])


##### Modelo de Rede Neural

In [22]:
class MyModel(nn.Module):
    def __init__(self, units, A1, Lc, kv, P1, P_out, C, dt, x_min, x_max, interpolation):
        self.A1 = A1
        self.Lc = Lc
        self.kv = kv
        self.P1 = P1
        self.P_out = P_out
        self.C = C
        self.dt = dt
        self.x_min = x_min
        self.x_max = x_max
        self.interpolation = interpolation
        super(MyModel, self).__init__()
        
        # Camada de entrada
        self.input_layer = nn.Linear(3, 4)
        
        # Camadas RNN
        self.rnn_layers = nn.ModuleList([
            nn.LSTM(input_size=4, hidden_size=units, batch_first=True, bidirectional=True, bias= True)
        ])
        self.rnn_layers.append(nn.LSTM(input_size=units*2, hidden_size=units, batch_first=True, bias= True, bidirectional=True))
        self.rnn_layers.append(nn.RNN(input_size=units*2, hidden_size=units, batch_first=True, bias= True))
        
        
        # Camada densa
        self.dense = nn.Linear(units, 2)

        self._initialize_weights()

    def _initialize_weights(self):
        # Inicializador Xavier (Glorot) nos pesos das camadas RNN
        for rnn_layer in self.rnn_layers:
            for name, param in rnn_layer.named_parameters():
                if 'weight' in name:
                    nn.init.xavier_uniform_(param)
                elif 'bias' in name:
                    nn.init.zeros_(param)

        nn.init.xavier_uniform_(self.dense.weight)
        nn.init.zeros_(self.dense.bias)

    def forward(self, inputs):
        # Passagem pelas camadas RNN
        rnn_output = 2 * (inputs - self.x_min) / (self.x_max - self.x_min) - 1
        for rnn_layer in self.rnn_layers:
            rnn_output, _ = rnn_layer(rnn_output)
            rnn_output = torch.tanh(rnn_output)  # Aplicando tanh explicitamente após cada camada RNN
         # Pegando apenas a última saída da sequência e desnormalizando
        dense_output = self.dense(rnn_output[:, -1, :])  # Dimensão [batch_size, hidden_size * num_directions]
        desnormalizado = ((dense_output + 1) / 2) * (self.x_max[:, :, :2] - self.x_min[:, :, :2]) + self.x_min[:, :, :2]
          # Pegando apenas a última saída da sequência
        return desnormalizado
    
    def loss_custom(self, y_true, y_pred, inputs):
        # Implementação da função de perda
        interp = []
        for i in range (0, inputs.shape[0]):
            aux = self.interpolation([(inputs[i, -1, -1]).item(), (y_pred[0, i, 0]).item()])
            interp.append(aux)
        interp = np.array(interp)
        interp = torch.tensor(interp, dtype=torch.float32)

        m_t = (11* y_pred[:, :, 0] - 18 *inputs[:, -2, 0] + 9 * inputs[:, -3, 0] - 2 * inputs[:, 0, 0])/(6*self.dt)
        p_t = (11* y_pred[:, :, 1] - 18 *inputs[:, -2, 1] + 9 * inputs[:, -2, 1] - 2 * inputs[:, -2, 1])/(6*self.dt)
        
        fLoss_mass = torch.mean(torch.square(m_t - (self.A1/self.Lc)*(( interp * self.P1) - y_pred[:, :, 1]) * 1e3))
        fLoss_pres = torch.mean(torch.square(p_t - (self.C**2)/2 * (y_pred[:, :, 0] - inputs[:, -1, -2]* self.kv * torch.sqrt((torch.abs(y_pred[:, :, 1] * 1000 - self.P_out * 1000))))))
        
        phys_loss = fLoss_mass + fLoss_pres
        data_loss =  torch.mean((y_true[:, 0, 0] - y_pred[:, :, 0]) ** 2) + torch.mean((y_true[:, 0, 1] - y_pred[:, :, 1]) ** 2)
        
        return data_loss +  0*phys_loss

    def train_model(self, model, train_loader, lr, epochs, optimizers):
        optimizer = optimizers(model.parameters(), lr=lr)
        model.train()
        
        for epoch in range(epochs):
            total_loss = 0
            for inputs, y_true in train_loader:
                optimizer.zero_grad()
                
                y_pred = model(inputs)
                loss = self.loss_custom(y_true, y_pred, inputs)
                loss.backward()
                optimizer.step()
                
                total_loss += loss.item()
            print(f"Epoch [{epoch+1}/{epochs}], Loss: {total_loss / len(train_loader)}")
            
    def test_model(self, x_test, interval, model):
        model.eval()
        massFlowrate100 = [x_test[0, 0, 0].item(), x_test[0, 1, 0].item(), x_test[0, 2, 0].item()]
        PlenumPressure100 = [x_test[0, 0, 1].item(), x_test[0, 1, 1].item(), x_test[0, 2, 1].item()]

        # Preparar o tensor inicial fora do loop
        input_tensor = torch.zeros((1, 3, 4), dtype=torch.float32)

        # Loop de previsões
        tm1 = time.time()
        for i in range(len(interval)):
            # Atualizar os valores do tensor diretamente
            input_tensor[0, :, 0] = torch.tensor(massFlowrate100[-3:])
            input_tensor[0, :, 1] = torch.tensor(PlenumPressure100[-3:])
            input_tensor[0, :, 2] = x_test[i, :, 2]
            input_tensor[0, :, 3] = x_test[i, :, 3]
            print(input_tensor.shape)
            # Previsão com desativação do gradiente
            with torch.no_grad():
                prediction100 = model(input_tensor)

            # Adicionar previsões diretamente
            massFlowrate100.append(prediction100[0, 0, 0].item())
            PlenumPressure100.append(prediction100[0, 0, 1].item())
        tm2 = time.time()
        timeteste = tm2 - tm1
        model.train()
        return massFlowrate100, PlenumPressure100, timeteste

#### Constantes e Variáveis Auxiliares

In [23]:
np.random.seed(42)
print(np.random.seed)

# Constantes
A1 = (2.6)*(10**-3)
Lc = 2
kv = 0.38
P1 = 4.5
P_out = 5
C = 479

timestep = 3 # Passos no passado para prever o próximo
nAlphas = 7 # Número de vezes que o Alfa irá mudar, considere o treino e os testes.
alphas = np.random.uniform(0.35,0.65, nAlphas+1) # Abertura da válvula
N_RotS = np.random.uniform(27e3, 5e4, nAlphas+1)
epochs = 1000
nData = 1000
nDataTeste = nData//nAlphas
perturb = 1e-4
tempo = 100
dt = 0.1 # Tempo amostral

# Variáveis auxiliares
interval = [np.linspace(i * tempo, (i + 1) * tempo, nData) for i in range(nAlphas)]
interval_test = [np.linspace(i * tempo, (i + 1) * tempo, nDataTeste) for i in range(nAlphas)]
massFlowrate = []
PlenumPressure = []
alpha_values = []
RNN_train = []
RNN_trainFut = []

<built-in function seed>


### Solução Numérica

##### Interpolador

##### Cálculo da Solução

In [24]:
lut = Interpolation('/home/matheus/Documents/IC/IC_git/UFBA/Sistema_Compressao/tabela_phi.csv')
lut.load_data()
interpolation = lut.interpolate()
# Crie uma instância da classe Simulation
sim = Simulation(A1, Lc, kv, P1, P_out, C, alphas, N_RotS, nAlphas, nData, perturb, tempo, dt, interpolation)
# Execute a simulação
sim.run()

RNN_train = sim.RNN_train
RNN_trainFut = sim.RNN_trainFut
massFlowrate = sim.massFlowrate
PlenumPressure = sim.PlenumPressure
alpha_values = sim.alpha_values
phi_values = sim.Phi_values

print(sim.time)

[7.84122136 6.99175493]
2.4118106365203857


##### Gráfico do Modelo

In [25]:
fig = make_subplots(rows=2, cols=2, subplot_titles=("Vazão vs Tempo", "Pressão vs Tempo", "Alpha vs Tempo", "Phi vs Tempo"))

for i in range(0, nAlphas):
    # Vazão
    fig.add_trace(go.Scatter(x=interval[i], y=np.squeeze(massFlowrate[i]), mode='lines',
                             name='Vazão', legendgroup='massflow', showlegend=i == 0), row = 1, col = 1)
    # Pressão
    fig.add_trace(go.Scatter(x=interval[i], y=np.squeeze(PlenumPressure[i]), mode='lines',
                             name='Pressão', legendgroup='pressure', showlegend=i == 0), row = 1, col = 2)
    # Alphas
    fig.add_trace(go.Scatter(x=interval[i], y=np.squeeze(alpha_values[i]), mode='lines', 
                             name='Alphas', line=dict(dash='dash'), legendgroup='alpha', showlegend=i == 0), row = 2, col = 1)
    # Phi
    fig.add_trace(go.Scatter(x=interval[i], y=np.squeeze(phi_values[i]), mode='lines', 
                             name='Alphas', line=dict(dash='dash'), legendgroup='alpha', showlegend=i == 0), row = 2, col = 2)

# Atualiza layout
fig.update_layout(
    xaxis_title='Tempo',
    grid=dict(rows=1, columns=3),
    template='plotly',
    showlegend=False,
    height = 600
)

# Mostra a figura
fig.show()


### Rede Neural

##### Dados de Treino

In [26]:
RNN_train = np.array(RNN_train)


X_train = []
y_train = []

for i in range(len(RNN_train) - timestep):
    X_train.append(RNN_train[i:i + timestep])  
    if i + timestep < len(RNN_train):           
        y_train.append(RNN_train[i + timestep, :2])  

X_train = torch.tensor(X_train, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.float32)

x_min = X_train.amin(dim=(0, 1), keepdim=True)
x_max = X_train.amax(dim=(0, 1), keepdim=True)
print(x_min)

y_train = y_train.unsqueeze(1)

tensor([[[4.0480e+00, 5.5509e+00, 3.6715e-01, 2.7328e+04]]])


##### Rede

##### Treinamento

In [27]:
model = MyModel(50, A1, Lc, kv, P1, P_out, C, dt, x_min, x_max, interpolation)

train_dataset = TensorDataset(X_train, y_train)
train_loader = DataLoader(train_dataset, batch_size= 32, shuffle=True)
Adam = optim.Adam
model.train_model(model, train_loader, 1e-5,epochs, Adam)

Epoch [1/1000], Loss: 7.355125775620273
Epoch [2/1000], Loss: 4.479731196137868
Epoch [3/1000], Loss: 1.9772710683138948
Epoch [4/1000], Loss: 0.6862575369610634
Epoch [5/1000], Loss: 0.31012041091102444
Epoch [6/1000], Loss: 0.19248400063819537
Epoch [7/1000], Loss: 0.13489362770972185
Epoch [8/1000], Loss: 0.09779682964698909
Epoch [9/1000], Loss: 0.07139345208293499
Epoch [10/1000], Loss: 0.053221925574282535
Epoch [11/1000], Loss: 0.041620093823057605
Epoch [12/1000], Loss: 0.034562162072589137
Epoch [13/1000], Loss: 0.030225958751693163
Epoch [14/1000], Loss: 0.02719238257996704
Epoch [15/1000], Loss: 0.024870379394149943
Epoch [16/1000], Loss: 0.022614746039826848
Epoch [17/1000], Loss: 0.020721709540700666
Epoch [18/1000], Loss: 0.018746870036181657
Epoch [19/1000], Loss: 0.016931540481576093
Epoch [20/1000], Loss: 0.01524738905330499
Epoch [21/1000], Loss: 0.013647845033003383
Epoch [22/1000], Loss: 0.01218082317253192
Epoch [23/1000], Loss: 0.010760352437567289
Epoch [24/1000]

##### Dados de teste

In [32]:
massFlowrateTeste = []
PlenumPressureTeste = []
RNN_test = []
x_test = []
alpha_valuesTeste = []
aux1 = []
aux2 = []
interval3 = np.linspace(0, tempo*nAlphas, len(train_dataset))
alphasTeste = np.random.uniform(0.35,0.65, nAlphas) # Abertura da válvula
N_RotSTeste = np.random.uniform(27e3, 5e4, nAlphas+1)

sim = Simulation(A1, Lc, kv, P1, P_out, C, alphasTeste,N_RotSTeste, nAlphas, nData, perturb, tempo, dt, interpolation)

sim.run()

RNN_test = sim.RNN_train
massFlowrateTeste = sim.massFlowrate
PlenumPressureTeste = sim.PlenumPressure
alpha_valuesTeste = sim.alpha_values
print(sim.time)

RNN_test = np.array(RNN_test)

for i in range(len(RNN_test) - 3):
    x_test.append(RNN_test[i:i + 3])

x_test = torch.tensor(x_test, dtype=torch.float32)

massFlowrateTeste = np.array(massFlowrateTeste)
PlenumPressureTeste = np.array(PlenumPressureTeste)

[4.28211581 5.78610182]
2.176377773284912


##### Gráfico Rede Neural

In [33]:
fig = make_subplots(rows=1, cols=2, subplot_titles=("Mass Flow Rate vs Time", "Plenum Pressure vs Time"))

tm1 = time.time()
# Colocando o modelo em modo de avaliação

# Supondo que x_test já esteja definido e seja um tensor PyTorch
with torch.no_grad():  
    prediction = model(x_test)

mass = prediction[:, :, 0]
pressure = prediction[:, :, 1]

mass = mass.detach().numpy()
pressure = pressure.detach().numpy()

tm2 = time.time()
print(tm2-tm1)

fig.add_trace(go.Scatter(x=interval3,y=np.squeeze(mass),mode='lines',
                                line=dict(dash='solid')),row=1, col=1)
fig.add_trace(go.Scatter(x=interval3,y=np.squeeze(pressure),mode='lines',
                                line=dict(dash='solid')),row=1, col=2)


for i in range(nAlphas):
    # Modelo
    fig.add_trace(go.Scatter(x=np.squeeze(interval_test[i]), y=np.squeeze(massFlowrateTeste[i]), mode='lines',name='Model Mass Flow Rate', line=dict(dash='dash', color='red')),
                  row=1, col=1)
    fig.add_trace(go.Scatter(x=np.squeeze(interval_test[i]), y=np.squeeze(PlenumPressureTeste[i]), mode='lines', name= 'Model Plenum Pressure', line=dict(dash='dash', color='red')),
                  row=1, col=2)

fig.update_layout(
    title='Resultados Rede Neural',
    xaxis_title='Time',
    yaxis_title='Value',
    template='plotly',
    showlegend=False
)
fig.show()


0.1270432472229004


In [34]:
massFlowrate100, PlenumPressure100,time2 = model.test_model(x_test, interval3, model)
print(time2)

torch.Size([1, 3, 4])
torch.Size([1, 3, 4])
torch.Size([1, 3, 4])
torch.Size([1, 3, 4])
torch.Size([1, 3, 4])
torch.Size([1, 3, 4])
torch.Size([1, 3, 4])
torch.Size([1, 3, 4])
torch.Size([1, 3, 4])
torch.Size([1, 3, 4])
torch.Size([1, 3, 4])
torch.Size([1, 3, 4])
torch.Size([1, 3, 4])
torch.Size([1, 3, 4])
torch.Size([1, 3, 4])
torch.Size([1, 3, 4])
torch.Size([1, 3, 4])
torch.Size([1, 3, 4])
torch.Size([1, 3, 4])
torch.Size([1, 3, 4])
torch.Size([1, 3, 4])
torch.Size([1, 3, 4])
torch.Size([1, 3, 4])
torch.Size([1, 3, 4])
torch.Size([1, 3, 4])
torch.Size([1, 3, 4])
torch.Size([1, 3, 4])
torch.Size([1, 3, 4])
torch.Size([1, 3, 4])
torch.Size([1, 3, 4])
torch.Size([1, 3, 4])
torch.Size([1, 3, 4])
torch.Size([1, 3, 4])
torch.Size([1, 3, 4])
torch.Size([1, 3, 4])
torch.Size([1, 3, 4])
torch.Size([1, 3, 4])
torch.Size([1, 3, 4])
torch.Size([1, 3, 4])
torch.Size([1, 3, 4])
torch.Size([1, 3, 4])
torch.Size([1, 3, 4])
torch.Size([1, 3, 4])
torch.Size([1, 3, 4])
torch.Size([1, 3, 4])
torch.Size

In [35]:
fig3 = make_subplots(rows=1, cols=2, subplot_titles=("Mass Flow Rate vs Time", "Plenum Pressure vs Time"))

fig3.add_trace(go.Scatter(x=interval3,y=np.squeeze(massFlowrate100),mode='lines',
                                line=dict(dash='solid')),row=1, col=1)
fig3.add_trace(go.Scatter(x=interval3,y=np.squeeze(PlenumPressure100),mode='lines',
                                line=dict(dash='solid')),row=1, col=2)

for i in range(nAlphas):
    # Modelo
    fig3.add_trace(go.Scatter(x=np.squeeze(interval_test[i]), y=np.squeeze(massFlowrateTeste[i]), mode='lines',name='Model Mass Flow Rate', line=dict(dash='dash', color='red')),
                  row=1, col=1)
    fig3.add_trace(go.Scatter(x=np.squeeze(interval_test[i]), y=np.squeeze(PlenumPressureTeste[i]), mode='lines', name= 'Model Plenum Pressure', line=dict(dash='dash', color='red')),
                  row=1, col=2)

fig3.update_layout(
    title='Resultados Rede Neural',
    xaxis_title='Time',
    yaxis_title='Value',
    template='plotly',
    showlegend=False
)
fig3.show()