# Pipeline de Correção de Malha com CFD + ML + PINN
Estudo de caso Lid-Driven Cavity: [Doc OpenFoam Lid-Driven Cavity](https://doc.cfd.direct/openfoam/user-guide-v8/cavity)

O projeto consiste em um estudo de pesquisa sobre a relevância de se aplicar técnicas de Physics-Informed Neural Networks (PINNs) para otimizar o processo de refinamento de malha de simulações CFD.

In [9]:
# Inicializando o ambiente
import numpy as np
import pandas as pd
from __future__ import annotations
import torch.nn as nn
import torch
from torch.utils.data import DataLoader, TensorDataset
from pathlib import Path
from scipy.spatial import cKDTree

## ETL dos dados obtidos pela Simulação CFD

In [None]:
# -----------------------
# Config
# -----------------------
#CSV_PATH = Path("data/cavity_20/cavity_20_full.csv")  # path para CSV consolidado
CSV_PATH = Path("/home/matheus/Documentos/Mestrado/Pesquisa/Lid_driven_cavity/data/prepared/cavity80_full_norm.parquet")  # path para CSV referência 80x80
OUT_DIR = Path("/home/matheus/Documentos/Mestrado/Pesquisa/Lid_driven_cavity/data/prepared")
OUT_DIR.mkdir(parents=True, exist_ok=True)

L = 0.1       # tamanho do domínio (0.1m no cavity tutorial)
U_lid = 1.0   # velocidade da tampa
EPS = 1e-10   # tolerância para identificar contornos

# -----------------------
# Helpers
# -----------------------
def require_cols(df: pd.DataFrame, cols: list[str]):
    missing = [c for c in cols if c not in df.columns]
    if missing:
        raise ValueError(f"CSV está sem colunas esperadas: {missing}\nColunas encontradas: {list(df.columns)}")

def classify_regions(df: pd.DataFrame) -> pd.DataFrame:
    """
    Classifica pontos em:
    - moving_lid: y=1 (tampa)
    - walls: y=0 ou x=0 ou x=1 (paredes)
    - interior: resto
    """
    x = df["xN"].to_numpy()
    y = df["yN"].to_numpy()
    y_top = y.max()
    y_bot = y.min()
    x_left = x.min()
    x_right = x.max()
    
    eps_edge = 1e-12

    is_top = np.isclose(y, y_top, atol=eps_edge)
    is_bottom = np.isclose(y, y_bot, atol=eps_edge)
    is_left = np.isclose(x, x_left, atol=eps_edge)
    is_right = np.isclose(x, x_right, atol=eps_edge)

    # opcional: remover cantos da tampa (ajuda a média do u ficar ~1)
    is_corner = (is_top & is_left) | (is_top & is_right)

    region = np.full(len(df), "interior", dtype=object)

    # 1) paredes (sem incluir topo)
    region[is_bottom | is_left | is_right] = "walls"

    # 2) tampa por último, sobrescrevendo (exceto cantos)
    region[is_top & (~is_corner)] = "moving_lid"

    df["region"] = region
    return df

# -----------------------
# Main
# -----------------------
df = pd.read_csv(CSV_PATH)

# Colunas esperadas (formato “full” consolidado)
require_cols(df, ["x", "y", "u", "v", "p"])

# Normalização adimensional
df["xN"] = df["x"] / L
df["yN"] = df["y"] / L
df["uN"] = df["u"] / U_lid
df["vN"] = df["v"] / U_lid

# Classificar regiões (BCs vs interior)
df = classify_regions(df)

# Salvar datasets separados
df_full = df[["xN", "yN", "uN", "vN", "p", "region"]].copy()
df_bc = df_full[df_full["region"].isin(["walls", "moving_lid"])].copy()
df_int = df_full[df_full["region"] == "interior"].copy()

# (Opcional) reduzir interior para collocation points (se o dataset for grande)
# df_int = df_int.sample(n=min(20000, len(df_int)), random_state=42)

out_full = OUT_DIR / "cavity80_full_norm.parquet"
out_bc   = OUT_DIR / "cavity80_bc.parquet"
out_int  = OUT_DIR / "cavity80_interior.parquet"

df_full.to_parquet(out_full, index=False)
df_bc.to_parquet(out_bc, index=False)
df_int.to_parquet(out_int, index=False)

print("OK! Arquivos gerados:")
print("-", out_full)
print("-", out_bc, f"(BC: {len(df_bc)})")
print("-", out_int, f"(Interior: {len(df_int)})")

# Checagem rápida das BCs (sanidade)
# Tampa: u=1, v=0 (aprox)
lid = df_bc[df_bc["region"] == "moving_lid"]
if len(lid) > 0:
    print("\nChecagem tampa (média): uN=", lid["uN"].mean(), " vN=", lid["vN"].mean())

## Pré Treinamento com MLP

In [None]:

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# Definição do modelo MLP
class MLP(nn.Module):
    def __init__(self, in_dim=2, out_dim=2, hidden=128, depth=4, act="tanh"):
        super().__init__()
        acts = {
            "tanh": nn.Tanh(),
            "relu": nn.ReLU(),
            "gelu": nn.GELU(),
            "silu": nn.SiLU(),
        }
        a = acts.get(act.lower(), nn.Tanh())

        layers = [nn.Linear(in_dim, hidden), a]
        for _ in range(depth - 1):
            layers += [nn.Linear(hidden, hidden), a]
        layers += [nn.Linear(hidden, out_dim)]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        return self.net(x)

def rmse(a: torch.Tensor, b: torch.Tensor) -> torch.Tensor:
    return torch.sqrt(torch.mean((a - b) ** 2))

def mae(a: torch.Tensor, b: torch.Tensor) -> torch.Tensor:
    return torch.mean(torch.abs(a - b))

In [7]:
# Treinamento do modelo MLP para o problema da cavidade
DATA_20 = Path("/home/matheus/Documentos/Mestrado/Pesquisa/Lid_driven_cavity/data/prepared/cavity20_full_norm.parquet")
OUT_MODEL = Path("/home/matheus/Documentos/Mestrado/Pesquisa/Lid_driven_cavity/notebook/outputs/models/mlp_uv_cavity20.pt")
OUT_MODEL.parent.mkdir(parents=True, exist_ok=True)

df = pd.read_parquet(DATA_20)

# Treino supervisionado: usa todos os pontos (inclui BC e interior)
X = torch.tensor(df[["xN", "yN"]].to_numpy(), dtype=torch.float32)
Y = torch.tensor(df[["uN", "vN"]].to_numpy(), dtype=torch.float32)

ds = TensorDataset(X, Y)
dl = DataLoader(ds, batch_size=256, shuffle=True)

model = MLP(in_dim=2, out_dim=2, hidden=128, depth=4, act="tanh").to(DEVICE)
opt = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=0.0)

best = float("inf")
for epoch in range(1, 2001):
    model.train()
    loss_acc = 0.0

    for xb, yb in dl:
        xb = xb.to(DEVICE)
        yb = yb.to(DEVICE)

        pred = model(xb)
        loss = torch.mean((pred - yb) ** 2)

        opt.zero_grad()
        loss.backward()
        opt.step()

        loss_acc += loss.item() * xb.size(0)

    loss_epoch = loss_acc / len(ds)

    # “early-ish” print
    if epoch % 200 == 0 or epoch == 1:
        print(f"epoch={epoch:4d} mse={loss_epoch:.6e}")

    # salva melhor
    if loss_epoch < best:
        best = loss_epoch
        torch.save({"state_dict": model.state_dict()}, OUT_MODEL)

print("\nOK! Modelo salvo em:", OUT_MODEL)

epoch=   1 mse=5.569831e-02
epoch= 200 mse=3.943373e-03
epoch= 400 mse=3.041830e-03
epoch= 600 mse=2.441379e-03
epoch= 800 mse=1.415596e-03
epoch=1000 mse=1.694558e-03
epoch=1200 mse=1.648691e-03
epoch=1400 mse=1.246705e-03
epoch=1600 mse=1.229178e-03
epoch=1800 mse=1.214867e-03
epoch=2000 mse=1.083441e-03

OK! Modelo salvo em: /home/matheus/Documentos/Mestrado/Pesquisa/Lid_driven_cavity/notebook/outputs/models/mlp_uv_cavity20.pt


## Avaliação do Modelo ML

In [None]:
# Avaliação do modelo MLP treinado no dataset 20x20, mas avaliado no 80x80
DATA_80 = Path("/home/matheus/Documentos/Mestrado/Pesquisa/Lid_driven_cavity/data/prepared/cavity80_full_norm.parquet")
MODEL_PATH = Path("/home/matheus/Documentos/Mestrado/Pesquisa/Lid_driven_cavity/notebook/outputs/models/mlp_uv_cavity20.pt")

# Ajuste se quiser acelerar:
N_EVAL = None  # None = usa tudo; ou um int tipo 20000/30000


df = pd.read_parquet(DATA_80)

if N_EVAL is not None and len(df) > N_EVAL:
    df = df.sample(n=N_EVAL, random_state=42).reset_index(drop=True)

# --- Tensores para erro (sem grad) ---
X0 = torch.tensor(df[["xN", "yN"]].to_numpy(), dtype=torch.float32, device=DEVICE)
Y = torch.tensor(df[["uN", "vN"]].to_numpy(), dtype=torch.float32, device=DEVICE)

model = MLP(in_dim=2, out_dim=2, hidden=128, depth=4, act="tanh").to(DEVICE)
ckpt = torch.load(MODEL_PATH, map_location=DEVICE)
model.load_state_dict(ckpt["state_dict"])
model.eval()

with torch.no_grad():
    pred = model(X0)
    rmse_uv = rmse(pred, Y).item()
    mae_uv = mae(pred, Y).item()

# --- Divergência via autograd (precisa de grad) ---
X = X0.detach().clone().requires_grad_(True)
pred_g = model(X)
u = pred_g[:, 0:1]
v = pred_g[:, 1:2]

grads_u = torch.autograd.grad(
    u, X, grad_outputs=torch.ones_like(u),
    retain_graph=True, create_graph=False
)[0]
grads_v = torch.autograd.grad(
    v, X, grad_outputs=torch.ones_like(v),
    retain_graph=False, create_graph=False
)[0]

du_dx = grads_u[:, 0:1]
dv_dy = grads_v[:, 1:2]
div = du_dx + dv_dy

div_l2 = torch.sqrt(torch.mean(div**2)).item()

print("=== Avaliação MLP (treinado no 20×20, avaliado no 80×80) ===")
print(f"N pontos avaliados: {len(df)}")
print(f"RMSE(u,v): {rmse_uv:.6e}")
print(f" MAE(u,v): {mae_uv:.6e}")
print(f"L2(div):   {div_l2:.6e}   (quanto menor, mais incompressível)")

=== Avaliação MLP (treinado no 20×20, avaliado no 80×80) ===
N pontos avaliados: 3686
RMSE(u,v): 2.852906e-02
 MAE(u,v): 9.914955e-03
L2(div):   5.531185e-01   (quanto menor, mais incompressível)


## Refinamento de malha Coerse -> Fine

Correção de dataset para utilizar no modelo de refinamento

In [None]:
# Correção via k-d tree (interpolação dos resultados 20x20 para 80x80)
df_c = pd.read_parquet(DATA_20)
df_f = pd.read_parquet(DATA_80)

Pc = df_c[["xN","yN"]].to_numpy()
Pf = df_f[["xN","yN"]].to_numpy()

tree = cKDTree(Pc)
dist, idx = tree.query(Pf, k=4)  # 4 vizinhos

# pesos inverso da distância (IDW)
w = 1.0 / (dist + 1e-12)
w = w / w.sum(axis=1, keepdims=True)

def interp(col):
    vals = df_c[col].to_numpy()[idx]  # (Nfine, k)
    return (w * vals).sum(axis=1)

u_c2f = interp("uN")
v_c2f = interp("vN")
p_c2f = interp("p")

out = pd.DataFrame({
    "xN": df_f["xN"].to_numpy(),
    "yN": df_f["yN"].to_numpy(),
    "u_c": u_c2f, "v_c": v_c2f, "p_c": p_c2f,
    "du": df_f["uN"].to_numpy() - u_c2f,
    "dv": df_f["vN"].to_numpy() - v_c2f,
    "dp": df_f["p"].to_numpy()  - p_c2f,
})

out.to_parquet("/home/matheus/Documentos/Mestrado/Pesquisa/Lid_driven_cavity/data/prepared/cavity20_to_80_delta.parquet", index=False)
print("OK:", out.shape)

OK: (3686, 8)


### Retreino do Modelo MLP para aplicar o refinamento

In [13]:
# Treinamento do modelo MLP para o problema da cavidade
DATA_20 = Path("/home/matheus/Documentos/Mestrado/Pesquisa/Lid_driven_cavity/data/prepared/cavity20_to_80_delta.parquet")
OUT_MODEL = Path("/home/matheus/Documentos/Mestrado/Pesquisa/Lid_driven_cavity/notebook/outputs/models/mlp_uv_refine_cavity20.pt")
OUT_MODEL.parent.mkdir(parents=True, exist_ok=True)

df = pd.read_parquet(DATA_20)

# Treino supervisionado: usa todos os pontos (inclui BC e interior)
X = torch.tensor(df[["xN","yN","u_c","v_c","p_c"]].to_numpy(), dtype=torch.float32)
Y = torch.tensor(df[["du","dv","dp"]].to_numpy(), dtype=torch.float32)

ds = TensorDataset(X, Y)
dl = DataLoader(ds, batch_size=256, shuffle=True)

model = MLP(in_dim=5, out_dim=3, hidden=128, depth=4, act="tanh").to(DEVICE)
opt = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=0.0)

best = float("inf")
for epoch in range(1, 2001):
    model.train()
    loss_acc = 0.0

    for xb, yb in dl:
        xb = xb.to(DEVICE)
        yb = yb.to(DEVICE)

        pred = model(xb)
        loss = torch.mean((pred - yb) ** 2)

        opt.zero_grad()
        loss.backward()
        opt.step()

        loss_acc += loss.item() * xb.size(0)

    loss_epoch = loss_acc / len(ds)

    # “early-ish” print
    if epoch % 200 == 0 or epoch == 1:
        print(f"epoch={epoch:4d} mse={loss_epoch:.6e}")

    # salva melhor
    if loss_epoch < best:
        best = loss_epoch
        torch.save({"state_dict": model.state_dict()}, OUT_MODEL)

print("\nOK! Modelo salvo em:", OUT_MODEL)

epoch=   1 mse=3.823323e-02
epoch= 200 mse=6.954193e-03
epoch= 400 mse=5.833791e-03
epoch= 600 mse=4.287472e-03
epoch= 800 mse=3.776036e-03
epoch=1000 mse=3.535685e-03
epoch=1200 mse=3.418433e-03
epoch=1400 mse=3.370805e-03
epoch=1600 mse=3.234067e-03
epoch=1800 mse=3.221830e-03
epoch=2000 mse=3.353427e-03

OK! Modelo salvo em: /home/matheus/Documentos/Mestrado/Pesquisa/Lid_driven_cavity/notebook/outputs/models/mlp_uv_refine_cavity20.pt


In [14]:

DATA_DELTA = Path("/home/matheus/Documentos/Mestrado/Pesquisa/Lid_driven_cavity/data/prepared/cavity20_to_80_delta.parquet")
MODEL_PATH = Path("/home/matheus/Documentos/Mestrado/Pesquisa/Lid_driven_cavity/notebook/outputs/models/mlp_uv_refine_cavity20.pt")

N_EVAL = None  # ex: 30000

df = pd.read_parquet(DATA_DELTA)
if N_EVAL is not None and len(df) > N_EVAL:
    df = df.sample(n=N_EVAL, random_state=42).reset_index(drop=True)

# -----------------------------
# Monta coarse projetado e fine (ground truth)
# -----------------------------
u_c = torch.tensor(df["u_c"].to_numpy(), dtype=torch.float32, device=DEVICE).unsqueeze(1)
v_c = torch.tensor(df["v_c"].to_numpy(), dtype=torch.float32, device=DEVICE).unsqueeze(1)
p_c = torch.tensor(df["p_c"].to_numpy(), dtype=torch.float32, device=DEVICE).unsqueeze(1)

du = torch.tensor(df["du"].to_numpy(), dtype=torch.float32, device=DEVICE).unsqueeze(1)
dv = torch.tensor(df["dv"].to_numpy(), dtype=torch.float32, device=DEVICE).unsqueeze(1)
dp = torch.tensor(df["dp"].to_numpy(), dtype=torch.float32, device=DEVICE).unsqueeze(1)

u_f = u_c + du
v_f = v_c + dv
# p_f = p_c + dp  # se quiser avaliar pressão depois

Y_uv = torch.cat([u_f, v_f], dim=1)

# -----------------------------
# Carrega modelo delta: (x,y,u_c,v_c,p_c) -> (du,dv,dp)
# -----------------------------
X0 = torch.tensor(df[["xN","yN","u_c","v_c","p_c"]].to_numpy(), dtype=torch.float32, device=DEVICE)

model = MLP(in_dim=5, out_dim=3, hidden=128, depth=4, act="tanh").to(DEVICE)
ckpt = torch.load(MODEL_PATH, map_location=DEVICE)
model.load_state_dict(ckpt["state_dict"])
model.eval()

with torch.no_grad():
    d_pred = model(X0)
du_p = d_pred[:, 0:1]
dv_p = d_pred[:, 1:2]
# dp_p = d_pred[:, 2:3]

u_hat = u_c + du_p
v_hat = v_c + dv_p
Yhat_uv = torch.cat([u_hat, v_hat], dim=1)

rmse_uv = rmse(Yhat_uv, Y_uv).item()
mae_uv  = mae(Yhat_uv, Y_uv).item()

# -----------------------------
# Divergência: precisa de grad wrt (x,y)
# -----------------------------
# Atenção: divergência só faz sentido como derivada de (x,y).
# Então criamos uma cópia com requires_grad nos 2 primeiros dims.
Xg = X0.detach().clone()
Xg.requires_grad_(True)

pred_g = model(Xg)
u_hat_g = u_c + pred_g[:, 0:1]
v_hat_g = v_c + pred_g[:, 1:2]

# grads wrt Xg: col 0=x, col 1=y
grads_u = torch.autograd.grad(
    u_hat_g, Xg, grad_outputs=torch.ones_like(u_hat_g),
    retain_graph=True, create_graph=False
)[0]
grads_v = torch.autograd.grad(
    v_hat_g, Xg, grad_outputs=torch.ones_like(v_hat_g),
    retain_graph=False, create_graph=False
)[0]

du_dx = grads_u[:, 0:1]
dv_dy = grads_v[:, 1:2]
div = du_dx + dv_dy
div_l2 = torch.sqrt(torch.mean(div**2)).item()

print("=== Avaliação MLP de refino (20->80) ===")
print(f"N pontos avaliados: {len(df)}")
print(f"RMSE(u,v): {rmse_uv:.6e}")
print(f" MAE(u,v): {mae_uv:.6e}")
print(f"L2(div):   {div_l2:.6e}   (quanto menor, mais incompressível)")


=== Avaliação MLP de refino (20->80) ===
N pontos avaliados: 3686
RMSE(u,v): 2.225302e-02
 MAE(u,v): 6.936904e-03
L2(div):   1.053433e+00   (quanto menor, mais incompressível)


## Modelo PINN