<a href="https://colab.research.google.com/github/ArtyomShabunin/PINNModels/blob/main/experiment_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Эксперимент №1
Разработка PINN для моделирования газотурбинной установки

In [141]:
import torch
import torch.nn as nn
import torch.autograd.functional as F

### 0. CompressorMapInterp — таблично заданная карта по β и скорости вращения

In [142]:
class CompressorMapInterp(nn.Module):
    def __init__(self, betas, speeds, table_phi, table_pi_c, table_eta):
        super().__init__()

        # Сетка значений beta и speed_rel
        self.register_buffer('betas', torch.tensor(betas, dtype=torch.float32))  # β
        self.register_buffer('speeds', torch.tensor(speeds, dtype=torch.float32))  # относит. скорости

        # Таблицы: (n_beta, n_speed)
        self.register_buffer('table_phi', torch.tensor(table_phi))

        self.register_buffer('table_pi_c', torch.tensor(table_pi_c))

        self.register_buffer('table_eta', torch.tensor(table_eta))

    def forward(self, beta, speed_rel):
        """
        :param beta: (batch,) значение бета — непрерывное
        :param speed_rel: (batch,) относительная приведенная скорость вращения
        :return: pi_c, phi, eta — (batch,)
        """

        # Индексы по β
        beta_low_idx = torch.clamp(torch.searchsorted(self.betas, beta, right=False) - 1, 0, len(self.betas) - 2)
        beta_high_idx = beta_low_idx + 1

        beta0 = self.betas[beta_low_idx]
        beta1 = self.betas[beta_high_idx]
        w_beta = (beta - beta0) / (beta1 - beta0 + 1e-6)

        # Индексы по скорости
        speed_low_idx = torch.clamp(torch.searchsorted(self.speeds, speed_rel, right=False) - 1, 0, len(self.speeds) - 2)
        speed_high_idx = speed_low_idx + 1

        speed0 = self.speeds[speed_low_idx]
        speed1 = self.speeds[speed_high_idx]
        w_speed = (speed_rel - speed0) / (speed1 - speed0 + 1e-6)

        def bilinear_interp(table):
            # Получаем значения по индексам
            val00 = table[beta_low_idx, speed_low_idx]
            val01 = table[beta_low_idx, speed_high_idx]
            val10 = table[beta_high_idx, speed_low_idx]
            val11 = table[beta_high_idx, speed_high_idx]

            # Интерполяция по скорости
            val0 = val00 + w_speed * (val01 - val00)
            val1 = val10 + w_speed * (val11 - val10)

            # Интерполяция по β
            return val0 + w_beta * (val1 - val0)

        phi = bilinear_interp(self.table_phi)
        pi_c = bilinear_interp(self.table_pi_c)
        eta = bilinear_interp(self.table_eta)

        return pi_c, phi, eta

### 1. CompressorMapNetBeta — обучаемая карта по β и скорости вращения

In [143]:
class CompressorMapNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(2, 64),
            nn.Tanh(),
            nn.Linear(64, 64),
            nn.Tanh(),
            nn.Linear(64, 3)  # pi_c, m_dot_rel, eta
        )

    def forward(self, beta, speed_rel):
        x = torch.stack([beta, speed_rel], dim=-1)
        out = self.net(x)
        pi_c = out[..., 0].clamp(min=1.01, max=10.0)
        m_dot_rel = out[..., 1].clamp(min=1e-4)
        eta = out[..., 2].clamp(min=0.1, max=1.0)
        return pi_c, m_dot_rel, eta

### 2. Класс `Compressor`

In [162]:
class Compressor(nn.Module):
    def __init__(self, map):
        super().__init__()
        self.map = map
        self.N_T_design = 9547 # приведенная скорость вращения, об/мин
        self.cp = 1005.0
        self.R = 287.0
        self.A_ref = nn.Parameter(torch.tensor(1.0))  # может быть обучаемой

    def forward(self, p_in, T_in, p_out, omega, n_iter=10):

        # Относительная приведенная скорость вращения
        N_T = 100*omega/torch.sqrt(T_in/290)/self.N_T_design

        # Целевая степень повышения давления
        pi_c_target = (p_out / p_in).clamp(min=1.01)

        # Инициализация β как параметр, подлежащий оптимизации
        beta = torch.full_like(p_in, 4, requires_grad=True)

        # Оптимизация β через градиентный спуск (можно заменить на nn.Parameter + оптимизатор)
        for _ in range(n_iter):
            pi_c_pred, _, _ = self.map(beta, N_T)
            loss = (pi_c_pred - pi_c_target).pow(2).mean()
            grad = torch.autograd.grad(loss, beta, create_graph=True)[0]
            beta = (beta - 1 * grad).detach().clamp(min=0.01).requires_grad_()

        # Финальные значения по оптимизированному β
        pi_c, phic, eta = self.map(beta, N_T)

        # Расход воздуха
        w = phic*p_in/1.013e5/torch.sqrt(T_in/290)

        rho_in = p_in / (self.R * T_in)

        T_out = T_in * (1 + (pi_c ** ((1.4 - 1) / 1.4) - 1) / eta)

        delta_h = self.cp * (T_out - T_in)
        torque = (w * delta_h) / (omega * 2 * torch.pi * 60 + 1e-6)

        return {
            'w': w,
            'T_out': T_out,
            'torque': torque,
            'beta': beta.detach(),
            'pi_c': pi_c,
            'eta': eta
        }

In [145]:
# # Кастомный слой для ограничения параметра в [min_val, max_val]
# class ScaledSigmoid(nn.Module):
#     def __init__(self, min_val, max_val):
#         super().__init__()
#         self.min_val = min_val
#         self.max_val = max_val

#     def forward(self, x):
#         return self.min_val + (self.max_val - self.min_val) * torch.sigmoid(x)

# class CombustionChamber(nn.Module):
#     def __init__(self, LHV=50e6, gamma=1.4, R=287.0):
#         super().__init__()
#         self.LHV = LHV
#         self.gamma = gamma
#         self.R = R

#         # Параметры без ограничений
#         self.raw_eta_comb = nn.Parameter(torch.tensor([0.0]))
#         self.raw_k_loss = nn.Parameter(torch.tensor([0.0]))

#         # Слои масштабирования sigmoid → [a, b]
#         self.scale_eta = ScaledSigmoid(0.8, 1.0)
#         self.scale_k_loss = ScaledSigmoid(0.0, 0.5)

#     def forward(self, p_in, p_out, T_in, fuel_flow):
#         eta_comb = self.scale_eta(self.raw_eta_comb)
#         k_loss = self.scale_k_loss(self.raw_k_loss)

#         # Учёт аэродинамического сопротивления
#         delta_p = k_loss * p_in.clamp(min=1e-3)
#         p_chamber = p_in - delta_p

#         # Расход воздуха по уравнению изохорного баланса
#         air_flow = (fuel_flow * self.LHV * eta_comb) / (self.R * T_in * (self.gamma / (self.gamma - 1)))

#         # Температура газов на выходе
#         Q_total = fuel_flow * self.LHV * eta_comb
#         cp_gas = self.gamma * self.R / (self.gamma - 1)
#         m_total = air_flow + fuel_flow
#         T_out = T_in + Q_total / (m_total * cp_gas)

#         return {
#             'air_flow': air_flow,
#             'gas_flow': m_total,
#             'T_out': T_out,
#             'p_out': p_out,
#             'eta_comb': eta_comb,
#             'k_loss': k_loss
#         }


In [146]:
class PressureTemperatureBC(nn.Module):
    def __init__(self, p_val, T_val):
        super().__init__()
        self.p = nn.Parameter(torch.tensor(p_val))
        self.T = nn.Parameter(torch.tensor(T_val))

    def forward(self):
        return self.p, self.T


class PressureBC(nn.Module):
    def __init__(self, p_val):
        super().__init__()
        self.p = nn.Parameter(torch.tensor(p_val))

    def forward(self):
        return self.p


class SpeedBC(nn.Module):
    def __init__(self, speed_val):
        super().__init__()
        self.speed_rel = nn.Parameter(torch.tensor(speed_val))

    def forward(self):
        return self.speed_rel

In [188]:
class MassFlowTemperatureBC(nn.Module):
    def __init__(self, m_flow, T):
        super().__init__()
        self.m_flow = nn.Parameter(torch.tensor(m_flow))
        self.T = nn.Parameter(torch.tensor(T))

    def forward(self):
      return self.m_flow, self.T

In [219]:
class FlowResistance(nn.Module):
    def __init__(self, resistance_coef=1.0):
        super().__init__()
        self.K = nn.Parameter(torch.tensor(resistance_coef))

    def forward(self, p_in, p_out):
        m_flow = torch.sqrt((p_in-p_out).clamp(min=0)/self.K)
        return m_flow

In [202]:
class VolumeNode(nn.Module):
    def __init__(self,
                 volume=0.1, dt=0.01, R=287.0, cp=1005.0,
                 start_p=1e5, start_T=300):
        super().__init__()
        self.volume = volume  # м³ — объём узла
        self.dt = dt  # временной шаг
        self.R = R
        self.cp = cp

        # Инициализируем давление и температуру в узле (будут обновляться)
        self.register_buffer('p', torch.tensor(start_p))  # Па
        self.register_buffer('T', torch.tensor(start_T))  # К

    def forward(self, m_in, T_in, m_out):
        """
        m_in: подача воздуха в узел (кг/с)
        T_in: температура входящего воздуха (К)
        m_out: расход из узла (кг/с)

        Возвращает: давление и температура в узле на текущем шаге
        """
        # Текущая масса в узле
        m_curr = (self.p * self.volume) / (self.R * self.T)

        # Массовый и энергетический балансы
        m_new = m_curr + self.dt * (m_in - m_out)
        e_curr = m_curr * self.cp * self.T
        e_in = m_in * self.cp * T_in
        e_out = m_out * self.cp * self.T
        e_new = e_curr + self.dt * (e_in - e_out)

        # Обновляем состояние
        self.T = (e_new / (m_new * self.cp)).clamp(min=100.0, max=3000.0)
        self.p = (m_new * self.R * self.T / self.volume).clamp(min=1e3, max=1e7)

        return self.p, self.T

In [310]:
# Кастомный слой для ограничения параметра в [min_val, max_val]
class ScaledSigmoid(nn.Module):
    def __init__(self, min_val, max_val):
        super().__init__()
        self.min_val = min_val
        self.max_val = max_val

    def forward(self, x):
        return self.min_val + (self.max_val - self.min_val) * torch.sigmoid(x)

class CombustionChamber(nn.Module):
    def __init__(self,
                 volume=0.1, dt=0.01,
                 LHV=50e6, R=287.0, cp=1005.0,
                 start_p = 1e5, start_T = 300):
        super().__init__()
        self.volume = volume  # м³ — объём камеры сгорания
        self.dt = dt  # временной шаг
        self.LHV = LHV
        self.R = R
        self.cp = cp

        # Инициализируем давление и температуру в узле (будут обновляться)
        self.register_buffer('p', torch.tensor(start_p))  # Па
        self.register_buffer('T', torch.tensor(start_T))  # К

        # Параметры без ограничений
        self.raw_eta_comb = nn.Parameter(torch.tensor([0.0]))
        # self.raw_k_loss = nn.Parameter(torch.tensor([0.0]))

        # Слои масштабирования sigmoid → [a, b]
        self.scale_eta = ScaledSigmoid(0.8, 1.0)
        # self.scale_k_loss = ScaledSigmoid(0.0, 0.5)

    def forward(self, T_air, air_flow, fuel_flow, flue_gas_flow):

        # Текущая масса в узле
        m_curr = (self.p * self.volume) / (self.R * self.T)

        H_air = air_flow * self.cp * T_air
        H_fuel = fuel_flow * self.LHV
        H_flue_gas = flue_gas_flow * self.cp * self.T

        # Массовый и энергетический балансы
        m_new = m_curr + self.dt * (air_flow + fuel_flow - flue_gas_flow)
        e_curr = m_curr * self.cp * self.T
        e_new = e_curr + self.dt * (H_air + H_fuel - H_flue_gas)

        # Обновляем состояние
        self.T = (e_new / (m_new * self.cp)).clamp(min=100.0, max=3000.0)
        self.p = (m_new * self.R * self.T / self.volume).clamp(min=1e3, max=1e7)

        return self.p, self.T


In [203]:
# import torch
# import torch.nn as nn

# class GTUModel(nn.Module):
#     def __init__(self):
#         super().__init__()
#         # Граничные условия
#         self.bc_inlet = PressureTemperatureBC(1e5, 288.15)  # p, T
#         self.bc_comb_out = PressureBC(2e5)                  # p
#         self.bc_speed = SpeedBC(1.0)                        # относительная скорость

#         # Компоненты
#         self.volume_node = VolumeNode(volume=0.1, dt=0.01)
#         self.compressor = Compressor(CompressorMapNet)                     # с CompressorMapNetBeta
#         self.combustion_chamber = CombustionChamber()      # с тепловым балансом и сопротивлением

#     def forward_iteration(self, fuel_flow):
#         # Граничные условия
#         p_inlet, T_inlet = self.bc_inlet()
#         p_comb_out = self.bc_comb_out()
#         speed_rel = self.bc_speed()

#         # Текущее состояние узла
#         p_node, T_node = self.volume_node.p, self.volume_node.T

#         # Компрессор
#         comp_out = self.compressor(p_inlet, T_inlet, p_node, speed_rel)
#         m_air_in = comp_out['w']
#         T_air_out = comp_out['T_out']

#         # Камера сгорания
#         comb_out = self.combustion_chamber(p_node, p_comb_out, T_node, fuel_flow)
#         m_air_out = comb_out['air_flow']

#         # Обновление состояния узла
#         p_node_new, T_node_new = self.volume_node(m_air_in, T_air_out, m_air_out)

#         return {
#             'compressor': comp_out,
#             'combustion_chamber': comb_out,
#             'volume_node': {'p': p_node_new, 'T': T_node_new}
#         }

#     def forward(self, fuel_flow, max_iters=100, tol=1e-3):
#         # Копии начального состояния (градиенты не нужны)
#         prev_p = self.volume_node.p.detach().clone()
#         prev_T = self.volume_node.T.detach().clone()

#         for i in range(max_iters - 1):
#             # Важно: не используем torch.no_grad()
#             _ = self.forward_iteration(fuel_flow)

#             dp = (self.volume_node.p - prev_p).abs()
#             dT = (self.volume_node.T - prev_T).abs()

#             if dp < tol and dT < tol:
#                 break

#             prev_p = self.volume_node.p.detach().clone()
#             prev_T = self.volume_node.T.detach().clone()

#             # print(f'Невязка по давлению: {self.volume_node.p - prev_p}')
#             # print(f'Невязка по температуре: {self.volume_node.T - prev_T}')

#         # print(f'Невязка по давлению: {self.volume_node.p - prev_p}')
#         # print(f'Невязка по температуре: {self.volume_node.T - prev_T}')

#         # Последняя итерация — с сохранением графа
#         return self.forward_iteration(fuel_flow)


In [204]:
# model = GTUModel()

# # Топливо — входной параметр, требующий градиента
# fuel_flow = torch.tensor([0.5], requires_grad=True)

# # Прямой проход
# output = model(fuel_flow)

# # Пример целевой функции: хотим T_out = 1400 K
# target_T = torch.tensor([1400.0])
# loss = (output["combustion_chamber"]["T_out"] - target_T).pow(2).mean()

# # Обратный проход
# loss.backward()

# # Проверим градиенты
# print(fuel_flow.grad)  # dL/dfuel_flow
# # print(model.combustion_chamber.raw_eta_comb.grad)  # dL/deta

In [205]:
# output

### Тестируем модель компрессора

In [220]:
betas = [1,2,3,4,5,6,7,8]
speeds = [87.92, 95.24, 102.57, 109.9]
table_phi = [
            [25.33, 40, 60, 70],
            [35, 48.48, 69, 79],
            [42, 58, 74.64, 83],
            [45.33, 62, 76.5, 84.37],
            [52, 66.33, 77.7, 85],
            [56.5, 68.5, 78.8, 85.36],
            [60, 70, 79.82, 86.06],
            [61.51, 70.88, 80.87, 86.66]
        ]
table_pi_c = [
            [5, 5.3, 5.8, 6],
            [4.6, 5, 5.4, 5.6],
            [4.2, 4.6, 5, 5.3],
            [4, 4.4, 4.7, 5],
            [3.6, 4, 4.3, 4.5],
            [3.2, 3.5, 3.8, 4],
            [2.5, 2.7, 2.95, 3.1],
            [1.7, 1.9, 2, 2.1]
        ]
table_eta = [
            [0.746, 0.73, 0.7, 0.68],
            [0.8, 0.836, 0.87, 0.85],
            [0.835, 0.89, 0.924, 0.88],
            [0.85, 0.91, 0.92, 0.886],
            [0.875, 0.913, 0.9, 0.88],
            [0.872, 0.873, 0.85, 0.8],
            [0.8, 0.77, 0.741, 0.72],
            [0.654, 0.63, 0.605, 0.592]
        ]

In [221]:
cmap = CompressorMapInterp(betas, speeds, table_phi, table_pi_c, table_eta)
cmap.forward(torch.tensor([4]), torch.tensor([95.24]))

(tensor([4.4000]), tensor([62.0000]), tensor([0.9100]))

In [222]:
compressor = Compressor(CompressorMapInterp(betas, speeds, table_phi, table_pi_c, table_eta))

In [209]:
compressor.forward(
    p_in=torch.tensor([1e5]),
    T_in=torch.tensor([20+273.15]),
    p_out=torch.tensor([4e5]),
    omega=torch.tensor([9547]))

{'w': tensor([72.0758], grad_fn=<DivBackward0>),
 'T_out': tensor([453.3205], grad_fn=<MulBackward0>),
 'torque': tensor([3.2236], grad_fn=<DivBackward0>),
 'beta': tensor([5.3413]),
 'pi_c': tensor([4.0021], grad_fn=<AddBackward0>),
 'eta': tensor([0.8899], grad_fn=<AddBackward0>)}

### Тестируем модель узла

In [291]:
class VolumeSystem(nn.Module):
    def __init__(self):
        super().__init__()
        # Граничные условия
        self.bc_inlet = MassFlowTemperatureBC(100.,300.)
        self.bc_outlet = PressureBC(1.0e5)

        # Компоненты
        self.volume_node = VolumeNode(volume=0.1, dt=0.0001, start_p=2.0e5, start_T=500.)
        self.pipe = FlowResistance(resistance_coef=1.0)

    def forward_iteration(self):
        # Граничные условия
        m_inlet, T_inlet = self.bc_inlet()
        p_outlet = self.bc_outlet()

        # Текущее состояние узла
        p_node, T_node = self.volume_node.p, self.volume_node.T

        # Аэродинамическое сопротивление
        m_outlet = self.pipe(p_node, p_outlet)


        # Обновление состояния узла
        p_node_new, T_node_new = self.volume_node(m_inlet, T_inlet, m_outlet)

        return {
            'm_inlet': m_inlet,
            'm_outlet': m_outlet,
            'volume_node': {'p': p_node_new, 'T': T_node_new}
        }


    def forward(self, max_iters=1000, tol=1e-3):
        # Копии начального состояния (градиенты не нужны)
        prev_p = self.volume_node.p.detach().clone()
        prev_T = self.volume_node.T.detach().clone()

        for i in range(max_iters - 1):
            # Важно: не используем torch.no_grad()
            _ = self.forward_iteration()

            dp = (self.volume_node.p - prev_p).abs()
            dT = (self.volume_node.T - prev_T).abs()

            if dp < tol and dT < tol:
                print(f'Итераций выполнено: {i}')
                break

            prev_p = self.volume_node.p.detach().clone()
            prev_T = self.volume_node.T.detach().clone()

        # Последняя итерация — с сохранением графа
        return self.forward_iteration()

In [292]:
model = VolumeSystem()

In [293]:
model()

Итераций выполнено: 132


{'m_inlet': Parameter containing:
 tensor(100., requires_grad=True),
 'm_outlet': tensor(99.9989, grad_fn=<SqrtBackward0>),
 'volume_node': {'p': tensor(109999.8047, grad_fn=<ClampBackward1>),
  'T': tensor(300.0021, grad_fn=<ClampBackward1>)}}

### Тестируем модель камеры сгорания

In [313]:
class CombustionSystem(nn.Module):
    def __init__(self):
        super().__init__()
        # Граничные условия
        self.air_inlet = MassFlowTemperatureBC(100.,300.)
        self.fuel_inlet = MassFlowTemperatureBC(2.,300.)
        self.bc_outlet = PressureBC(1.0e5)

        # Компоненты
        self.combustion_node = CombustionChamber(
            volume=0.1, dt=0.0001, start_p=2.0e5, start_T=500.)
        self.pipe = FlowResistance(resistance_coef=1.0)

    def forward_iteration(self):
        # Граничные условия
        m_air, T_air = self.air_inlet()
        m_fuel, _ = self.fuel_inlet()
        p_outlet = self.bc_outlet()

        # Текущее состояние узла
        p_comb, T_comb = self.combustion_node.p, self.combustion_node.T

        # Аэродинамическое сопротивление
        m_fuel_gas = self.pipe(p_comb, p_outlet)

        # Обновление состояния узла
        p_node_new, T_node_new = self.combustion_node(T_air, m_air, m_fuel, m_fuel_gas)

        return {
            'm_fuel_gas': m_fuel_gas,
            'combustion_node': {'p': p_node_new, 'T': T_node_new}
        }


    def forward(self, max_iters=1000, tol=1e-3):
        # Копии начального состояния (градиенты не нужны)
        prev_p = self.combustion_node.p.detach().clone()
        prev_T = self.combustion_node.T.detach().clone()

        for i in range(max_iters - 1):
            # Важно: не используем torch.no_grad()
            _ = self.forward_iteration()

            dp = (self.combustion_node.p - prev_p).abs()
            dT = (self.combustion_node.T - prev_T).abs()

            if dp < tol and dT < tol:
                print(f'Итераций выполнено: {i}')
                break

            prev_p = self.combustion_node.p.detach().clone()
            prev_T = self.combustion_node.T.detach().clone()

        # Последняя итерация — с сохранением графа
        return self.forward_iteration()

In [314]:
model = CombustionSystem()
model()

Итераций выполнено: 36


{'m_fuel_gas': tensor(102.0002, grad_fn=<SqrtBackward0>),
 'combustion_node': {'p': tensor(110404., grad_fn=<ClampBackward1>),
  'T': tensor(1269.6313, grad_fn=<ClampBackward1>)}}