<a href="https://colab.research.google.com/github/Ssurf777/U-Net_pipe_flow/blob/main/U_net_pipe_flow.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# CFD datasets import

In [None]:
import numpy as np
import requests

url = 'https://github.com/Ssurf777/U-Net_pipe_flow/raw/main/data/cfd_data.npy'
r = requests.get(url)

with open('cfd_data.npy', 'wb') as f:
  f.write(r.content)

data = np.load('cfd_data.npy')
data.shape


# Visualization datasets

In [None]:
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

# Extract x, y, and z columns
x = data[:, 0]
y = data[:, 1]
z = data[:, 2]

# Create a 3D scatter plot
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')

ax.scatter(x, y, z, s=1)  # s controls the size of the points

# Set labels and title
ax.set_xlabel('X')
ax.set_ylabel('Y')
ax.set_zlabel('Z')
ax.set_title('3D Point Cloud Visualization')

# Set the view angle to look from the positive y-axis
ax.view_init(elev=0, azim=90) # elev is elevation, azim is azimuthal angle

plt.show()


In [None]:
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import numpy as np

# x, y, z, psta を想定通り取得している前提
x = data[:, 0]
y = data[:, 1]
z = data[:, 2]
psta = data[:, 6]

# カラーマップ付きの3D散布図
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')

sc = ax.scatter(x, y, z, c=psta, cmap='viridis', s=1)  # pstaを色として指定

# カラーバー追加
cbar = plt.colorbar(sc, ax=ax, shrink=0.5)
cbar.set_label('Psta')

# ラベルやビュー設定
ax.set_xlabel('X')
ax.set_ylabel('Y')
ax.set_zlabel('Z')
ax.set_title('3D Pressure Distribution (Pressure)')
ax.view_init(elev=0, azim=90)

plt.show()


In [None]:
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import numpy as np

# x, y, z, psta を想定通り取得している前提
x = data[:, 0]
y = data[:, 1]
z = data[:, 2]
psta = data[:, 3]

# カラーマップ付きの3D散布図
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')

sc = ax.scatter(x, y, z, c=psta, cmap='viridis', s=1)  # pstaを色として指定

# カラーバー追加
cbar = plt.colorbar(sc, ax=ax, shrink=0.5)
cbar.set_label('velocity of u')

# ラベルやビュー設定
ax.set_xlabel('X')
ax.set_ylabel('Y')
ax.set_zlabel('Z')
ax.set_title('3D Pressure Distribution (velo. of u)')
ax.view_init(elev=0, azim=90)

plt.show()


In [None]:
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import numpy as np

# x, y, z, psta を想定通り取得している前提
x = data[:, 0]
y = data[:, 1]
z = data[:, 2]
psta = data[:, 4]

# カラーマップ付きの3D散布図
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')

sc = ax.scatter(x, y, z, c=psta, cmap='viridis', s=1)  # pstaを色として指定

# カラーバー追加
cbar = plt.colorbar(sc, ax=ax, shrink=0.5)
cbar.set_label('velocity of v')

# ラベルやビュー設定
ax.set_xlabel('X')
ax.set_ylabel('Y')
ax.set_zlabel('Z')
ax.set_title('3D Pressure Distribution (velo. of v)')
ax.view_init(elev=0, azim=90)

plt.show()


In [None]:
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import numpy as np

# x, y, z, psta を想定通り取得している前提
x = data[:, 0]
y = data[:, 1]
z = data[:, 2]
psta = data[:, 5]

# カラーマップ付きの3D散布図
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')

sc = ax.scatter(x, y, z, c=psta, cmap='viridis', s=1)  # pstaを色として指定

# カラーバー追加
cbar = plt.colorbar(sc, ax=ax, shrink=0.5)
cbar.set_label('velocity of w')

# ラベルやビュー設定
ax.set_xlabel('X')
ax.set_ylabel('Y')
ax.set_zlabel('Z')
ax.set_title('3D Pressure Distribution (velo. of w)')
ax.view_init(elev=0, azim=90)

plt.show()


# U-Net define

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

def mish(x):
    return x * torch.tanh(F.softplus(x))

def init_weights_kaiming(module):
    if isinstance(module, nn.Conv1d):
        nn.init.kaiming_normal_(module.weight, mode='fan_in', nonlinearity='relu')  # Mish用のHeもrelu指定で問題なし
        if module.bias is not None:
            nn.init.zeros_(module.bias)

class PointNetEncoder(nn.Module):
    def __init__(self, in_channels=3):
        super().__init__()
        self.conv1 = nn.Conv1d(in_channels, 64, 1)
        self.conv2 = nn.Conv1d(64, 128, 1)
        self.conv3 = nn.Conv1d(128, 256, 1)
        self.apply(init_weights_kaiming)

    def forward(self, x):
        x = x.transpose(1, 2)        # [B, 3, N]
        x1 = mish(self.conv1(x))     # [B, 64, N]
        x2 = mish(self.conv2(x1))    # [B, 128, N]
        x3 = mish(self.conv3(x2))    # [B, 256, N]
        return x1, x2, x3

class PointNetDecoder(nn.Module):
    def __init__(self):
        super().__init__()
        self.upconv1 = nn.Conv1d(256 + 128, 128, 1)
        self.upconv2 = nn.Conv1d(128 + 64, 64, 1)
        self.out = nn.Conv1d(64, 4, 1)
        self.apply(init_weights_kaiming)

    def forward(self, x1, x2, x3):
        up1 = mish(self.upconv1(torch.cat([x3, x2], dim=1)))  # [B, 128, N]
        up2 = mish(self.upconv2(torch.cat([up1, x1], dim=1))) # [B, 64, N]
        out = self.out(up2)                                   # [B, 4, N]
        return out.transpose(1, 2)                            # [B, N, 4]

class PointUNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.encoder = PointNetEncoder()
        self.decoder = PointNetDecoder()

    def forward(self, x):  # x: [B, N, 3]
        x1, x2, x3 = self.encoder(x)
        out = self.decoder(x1, x2, x3)
        return out  # [B, N, 4]


# dataloader

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
import numpy as np

class CFDDataset(Dataset):
    def __init__(self, data_path):
        super().__init__()
        data = np.load(data_path)  # shape: (N, 7)
        self.inputs = data[:, 0:3].astype(np.float32)  # x, y, z
        #self.targets = data[:, 3:7].astype(np.float32)  # u, v, w, p
        self.targets = data[:, 6].astype(np.float32)  # p

    def __len__(self):
        return len(self.inputs)

    def __getitem__(self, idx):
        return self.inputs[idx], self.targets[idx]


In [None]:
import torch
from torch.utils.data import Dataset
import numpy as np

class CFDDataset2(Dataset):
    def __init__(self, data_path):
        super().__init__()
        data = np.load(data_path).astype(np.float32)  # shape: (N, 7)
        self.inputs = data[:, 0:3]    # x, y, z
        self.raw_targets = data[:, 6] # p

        # --- Min-Max Scaling ---
        self.p_min = self.raw_targets.min()
        self.p_max = self.raw_targets.max()
        self.targets = (self.raw_targets - self.p_min) / (self.p_max - self.p_min)

    def __len__(self):
        return len(self.inputs)

    def __getitem__(self, idx):
        return self.inputs[idx], self.targets[idx]

    def inverse_transform(self, norm_p):
        """正規化されたpを元のスケールに戻す"""
        return norm_p * (self.p_max - self.p_min) + self.p_min


In [None]:
# データ読み込み
train_dataset = CFDDataset2('cfd_data.npy')
train_loader = DataLoader(train_dataset, batch_size=1024, shuffle=False, drop_last=True)


# Trainer

In [None]:
from torch import nn, optim
from tqdm import tqdm

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = PointUNet().to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-2)

# Epoch loop
num_epochs = 50
for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0.0

    for batch_inputs, batch_targets in tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}"):
        # reshape [B, 3] → [B, N, 3] : ここでN=1として扱う（点ごとに）
        batch_inputs = batch_inputs.unsqueeze(1).to(device)   # shape: [B, 1, 3]
        batch_targets = batch_targets.unsqueeze(1).to(device) # shape: [B, 1, 4]

        optimizer.zero_grad()
        outputs = model(batch_inputs)  # shape: [B, 1, 4]
        loss = criterion(outputs, batch_targets)
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

    print(f"Epoch {epoch+1} - Loss: {epoch_loss/len(train_loader):.6f}")


In [None]:
import matplotlib.pyplot as plt
import numpy as np
from sklearn.metrics import r2_score
from tqdm import tqdm
import torch

# モデルを評価モードに
model.eval()

all_predictions = []
all_targets = []

with torch.no_grad():
    for batch_inputs, batch_targets in tqdm(train_loader, desc="Predicting"):
        # batch_inputs: [B, N, 3]
        # batch_targets: [B, N] または [B, N, 1]
        batch_inputs = batch_inputs.to(device)
        batch_targets = batch_targets.to(device)

        # モデル出力 [B, N, 4]
        outputs = model(batch_inputs)

        # pの予測値を抽出（出力の4番目）
        p_pred = outputs[:, :, 3].cpu().numpy()  # shape: [B, N]
        p_true = batch_targets.cpu().numpy()     # shape: [B, N]

        all_predictions.append(p_pred)
        all_targets.append(p_true)

# shape: [total_samples]
predicted_pressure_normalized = np.concatenate(all_predictions, axis=0).flatten()
target_pressure_normalized = np.concatenate(all_targets, axis=0).flatten()

# 元スケールに戻す
predicted_pressure_original = train_dataset.inverse_transform(predicted_pressure_normalized)
target_pressure_original = train_dataset.inverse_transform(target_pressure_normalized)

# R²スコア
r2 = r2_score(target_pressure_original, predicted_pressure_original)
print(f"\nR² Score (Pressure): {r2:.4f}")

# 散布図の描画
plt.figure(figsize=(8, 6))
plt.scatter(target_pressure_original, predicted_pressure_original, alpha=0.5, s=1)
plt.xlabel("Actual Pressure")
plt.ylabel("Predicted Pressure")
plt.title("Actual vs. Predicted Pressure")
plt.grid(True)

# 完全一致の赤線
min_val = min(target_pressure_original.min(), predicted_pressure_original.min())
max_val = max(target_pressure_original.max(), predicted_pressure_original.max())
plt.plot([min_val, max_val], [min_val, max_val], color='red', linestyle='--', label='Perfect Prediction')
plt.legend()
plt.tight_layout()
plt.show()


# Individual scalre Training using U-Net

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import grad

# --------------------------------------------
# Mish Activation
# --------------------------------------------
def mish(x):
    return x * torch.tanh(F.softplus(x))

# --------------------------------------------
# Kaiming Initialization
# --------------------------------------------
def init_weights_kaiming(module):
    if isinstance(module, nn.Conv1d):
        nn.init.kaiming_normal_(module.weight, mode='fan_in', nonlinearity='relu')
        if module.bias is not None:
            nn.init.zeros_(module.bias)

# --------------------------------------------
# Simple PointNet-like U-Net Block for Scalar Output
# --------------------------------------------
class ScalarUNet(nn.Module):
    def __init__(self, in_channels=3):
        super().__init__()
        self.conv1 = nn.Conv1d(in_channels, 64, 1)
        self.conv2 = nn.Conv1d(64, 128, 1)
        self.conv3 = nn.Conv1d(128, 256, 1)

        self.up1 = nn.Conv1d(256 + 128, 128, 1)
        self.up2 = nn.Conv1d(128 + 64, 64, 1)
        self.out = nn.Conv1d(64, 1, 1)

        self.apply(init_weights_kaiming)

    def forward(self, x):
        x = x.transpose(1, 2)  # [B, 3, N]
        x1 = mish(self.conv1(x))
        x2 = mish(self.conv2(x1))
        x3 = mish(self.conv3(x2))

        up1 = mish(self.up1(torch.cat([x3, x2], dim=1)))
        up2 = mish(self.up2(torch.cat([up1, x1], dim=1)))
        out = self.out(up2)  # [B, 1, N]
        return out.transpose(1, 2)  # [B, N, 1]

# --------------------------------------------
# Combined Model with 4 U-Nets and Physics Loss
# --------------------------------------------
class PhysicsInformedCFDModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.net_u = ScalarUNet()
        self.net_v = ScalarUNet()
        self.net_w = ScalarUNet()
        self.net_p = ScalarUNet()

    def forward(self, coords):
        # coords: [B, N, 3] with requires_grad=True
        u = self.net_u(coords)  # [B, N, 1]
        v = self.net_v(coords)
        w = self.net_w(coords)
        p = self.net_p(coords)
        return u, v, w, p

    def pde_loss(self, coords, u, v, w, p):
        grads_u = grad(u, coords, grad_outputs=torch.ones_like(u), create_graph=True)[0]  # [B, N, 3]
        grads_v = grad(v, coords, grad_outputs=torch.ones_like(v), create_graph=True)[0]
        grads_w = grad(w, coords, grad_outputs=torch.ones_like(w), create_graph=True)[0]

        du_dx = grads_u[:, :, 0]
        dv_dy = grads_v[:, :, 1]
        dw_dz = grads_w[:, :, 2]

        continuity = du_dx + dv_dy + dw_dz  # [B, N]
        loss_cont = torch.mean(continuity ** 2)

        return loss_cont
