# 2D Elastic Wave PINN with FDTD Data Integration

このノートブックは、FDTDシミュレーションデータを用いた2D弾性波方程式PINNの完全なワークフローを示します。

## 概要

**2D Elastic Wave Equations (Dimensionless Form)**:
- Longitudinal: $\frac{\partial^2 \tilde{U}_x}{\partial \tilde{t}^2} = \frac{\partial^2 \tilde{U}_x}{\partial \tilde{x}^2} + \frac{\partial^2 \tilde{U}_x}{\partial \tilde{y}^2}$
- Transverse: $\frac{\partial^2 \tilde{U}_y}{\partial \tilde{t}^2} = (c_t/c_l)^2 \left(\frac{\partial^2 \tilde{U}_y}{\partial \tilde{x}^2} + \frac{\partial^2 \tilde{U}_y}{\partial \tilde{y}^2}\right)$

where $(c_t/c_l)^2 \approx 0.25$ for Aluminum 6061-T6.

## ワークフロー

1. FDTDデータの読み込みと無次元化
2. Train/Validation分割
3. 2D PINNモデルの構築（実装予定）
4. 訓練の実行（実装予定）
5. R²スコアによる検証
6. 2D空間分布の可視化

## セットアップとインポート

In [3]:
# Standard library imports
import sys
from pathlib import Path

# Add project root to path
project_root = Path.cwd().parent
sys.path.insert(0, str(project_root))

# Third-party imports
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import torch

# PINN module imports
from pinn.data.fdtd_loader import FDTDDataLoaderService
from pinn.data.dimensionless_scaler import CharacteristicScales, DimensionlessScalerService
from pinn.validation.r2_score import R2ScoreCalculator
from pinn.utils.config_loader import ConfigLoaderService
from pinn.utils.seed_manager import SeedManager

# Configure plotting
sns.set_style("whitegrid")
plt.rcParams["figure.figsize"] = (14, 10)

print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"CUDA device: {torch.cuda.get_device_name(0)}")

PyTorch version: 2.4.0+cu124
CUDA available: True
CUDA device: NVIDIA GeForce RTX 3060


## Step 1: 設定ファイルの読み込み

2D PINN用のYAML設定を読み込みます。

In [4]:
# Load configuration
config_path = project_root / "configs" / "pinn_2d_example.yaml"
config_loader = ConfigLoaderService()
config = config_loader.load_config(config_path)

# Also load raw YAML for fields not in pydantic model
import yaml

with open(config_path) as f:
    config_dict = yaml.safe_load(f)

print(f"Experiment: {config.experiment_name}")
print(
    f"\nDomain: x ∈ [{config.domain.x_min}, {config.domain.x_max}], t ∈ [{config.domain.t_min:.2e}, {config.domain.t_max:.2e}]"
)
print(f"\nNetwork: {config.network.layer_sizes}")
print(f"Activation: {config.network.activation}")
print(f"\nTraining iterations: {config_dict['training']['iterations']}")
print(f"Learning rate: {config.training.learning_rate}")
print(f"Loss weights: {config.training.loss_weights}")

# Set random seed
SeedManager.set_seed(config.seed)
print(f"\nRandom seed: {config.seed}")

Experiment: pinn_2d_parametric_elastic_wave

Domain: x ∈ [0.0, 0.04], t ∈ [3.50e-06, 1.00e-05]

Network: [5, 64, 64, 64, 4]
Activation: tanh

Training iterations: 20000
Learning rate: 0.001
Loss weights: {'pde': 1.0, 'data': 1.0}

Random seed: 42


## Step 2: FDTDデータの読み込み

FDTDシミュレーションデータを読み込み、無次元化を適用します。

In [None]:
# Data directory (modify if needed)
data_dir = Path("/home/manat/project2/PINN_data")

# Check if real data exists, otherwise use synthetic data
if not data_dir.exists() or not list(data_dir.glob("p*_d*.npz")):
    print("⚠️  Real FDTD data not found in /PINN_data")
    print("    Creating synthetic data for demonstration...\n")

    # Create temporary directory with synthetic data
    import tempfile

    data_dir = Path(tempfile.mkdtemp())

    # Generate synthetic FDTD file
    np.random.seed(42)
    nx, ny, nt = 80, 40, 60
    n_total = nx * ny * nt

    x_grid = np.linspace(0, 0.04, nx)
    y_grid = np.linspace(0, 0.02, ny)
    t_grid = np.linspace(3.5e-6, 6.5e-6, nt)

    X, Y, T = np.meshgrid(x_grid, y_grid, t_grid, indexing="ij")

    synthetic_file = data_dir / "p1250_d100.npz"
    np.savez(
        synthetic_file,
        x=X.flatten(),
        y=Y.flatten(),
        t=T.flatten(),
        T1=np.random.randn(n_total) * 1e9,
        T3=np.random.randn(n_total) * 1e9,
        Ux=np.random.randn(n_total) * 1e-9,
        Uy=np.random.randn(n_total) * 1e-9,
        p=1.25e-3,
        d=0.1e-3,
        w=0.3e-3,
        seed=42,
        nx_sample=nx,
        ny_sample=ny,
        nt_sample=nt,
    )

    print(f"✓ Created synthetic data: {synthetic_file}")
    print(f"  Shape: {nx}×{ny}×{nt} = {n_total} samples\n")

# Create FDTD data loader
loader = FDTDDataLoaderService(data_dir=data_dir)

# Get available .npz files
npz_files = sorted(data_dir.glob("p*_d*.npz"))
print(f"Found {len(npz_files)} FDTD data files:")
for f in npz_files:
    print(f"  - {f.name}")

⚠️  Real FDTD data not found in /PINN_data
    Creating synthetic data for demonstration...

✓ Created synthetic data: /tmp/tmpj2aq5gqi/p1250_d100.npz
  Shape: 80×40×60 = 192000 samples

Found 1 FDTD data files:
  - p1250_d100.npz


## Step 3: 無次元化スケーラーの作成

物理定数から特性スケールを計算し、無次元化を適用します。

In [None]:
# Load elastic constants from config
import yaml

with open(config_path) as f:
    config_dict = yaml.safe_load(f)

elastic_lambda = float(config_dict["domain"]["elastic_lambda"])
elastic_mu = float(config_dict["domain"]["elastic_mu"])
density = float(config_dict["domain"]["density"])

print("Elastic constants (Aluminum 6061-T6):")
print(f"  λ = {elastic_lambda:.2e} Pa")
print(f"  μ = {elastic_mu:.2e} Pa")
print(f"  ρ = {density:.1f} kg/m³")

# Estimate U_ref from first file
sample_data = loader.load_file(npz_files[0])
U_ref = np.std(np.concatenate([sample_data.Ux, sample_data.Uy]))

print(f"\nEstimated U_ref: {U_ref:.2e} m")

# Create characteristic scales
scales = CharacteristicScales.from_physics(
    domain_length=config.domain.x_max,
    elastic_lambda=elastic_lambda,
    elastic_mu=elastic_mu,
    density=density,
    displacement_amplitude=U_ref,
)

# Create scaler
scaler = DimensionlessScalerService(scales)

print(f"\nCharacteristic scales:")
print(f"  L_ref = {scales.L_ref:.4f} m")
print(f"  T_ref = {scales.T_ref:.2e} s")
print(f"  U_ref = {scales.U_ref:.2e} m")
print(f"  σ_ref = {scales.sigma_ref:.2e} Pa")
print(f"  c_l   = {scales.velocity_ref:.0f} m/s")

## Step 4: データのロードと正規化

複数のFDTDファイルを読み込み、無次元化を適用します。

In [None]:
# Load and normalize data
dataset = loader.load_multiple_files(npz_files, apply_dimensionless=True, scaler=scaler)

print(f"✓ Loaded {len(dataset.x)} samples from {len(npz_files)} file(s)")
print(f"\nData ranges (dimensionless):")
print(f"  x:  [{np.min(dataset.x):.3f}, {np.max(dataset.x):.3f}]")
print(f"  y:  [{np.min(dataset.y):.3f}, {np.max(dataset.y):.3f}]")
print(f"  t:  [{np.min(dataset.t):.3f}, {np.max(dataset.t):.3f}]")
print(f"  T1: [{np.min(dataset.T1):.3f}, {np.max(dataset.T1):.3f}]")
print(f"  T3: [{np.min(dataset.T3):.3f}, {np.max(dataset.T3):.3f}]")
print(f"  Ux: [{np.min(dataset.Ux):.3f}, {np.max(dataset.Ux):.3f}]")
print(f"  Uy: [{np.min(dataset.Uy):.3f}, {np.max(dataset.Uy):.3f}]")
print(f"\nParameter ranges (normalized [0,1]):")
print(f"  pitch_norm: [{np.min(dataset.pitch_norm):.3f}, {np.max(dataset.pitch_norm):.3f}]")
print(f"  depth_norm: [{np.min(dataset.depth_norm):.3f}, {np.max(dataset.depth_norm):.3f}]")

## Step 5: Train/Validation分割

データを訓練用と検証用に分割します。

In [None]:
# Train/val split
train_ratio = config_dict["training"].get("train_ratio", 0.8)
validation_equals_train = config_dict["training"].get("validation_equals_train", False)

train_data, val_data = loader.train_val_split(
    dataset, train_ratio=train_ratio, seed=config.seed, validation_equals_train=validation_equals_train
)

print(f"Train samples: {len(train_data.x)}")
print(f"Val samples: {len(val_data.x)}")
if validation_equals_train:
    print("Note: Validation = Training (for overfitting monitoring)")

## Step 6: R²スコアの計算（合成予測で検証）

R²スコア計算機能をテストします。実際のPINN訓練後は、ここでモデル予測と比較します。

In [None]:
# Create R² score calculator
calculator = R2ScoreCalculator()

# For demonstration, create synthetic predictions
# (In actual workflow, these would be PINN model predictions)
n_samples_test = min(1000, len(val_data.x))

y_true = {
    "T1": val_data.T1[:n_samples_test],
    "T3": val_data.T3[:n_samples_test],
    "Ux": val_data.Ux[:n_samples_test],
    "Uy": val_data.Uy[:n_samples_test],
}

# Synthetic predictions: add small noise to true values
np.random.seed(config.seed)
y_pred = {k: v + np.random.randn(len(v)) * 0.05 * np.std(v) for k, v in y_true.items()}

# Compute R² scores
r2_scores = calculator.compute_r2_multi_output(y_true, y_pred)

print("R² Scores (Synthetic Predictions):")
print("=" * 40)
for field, r2 in r2_scores.items():
    print(f"  {field:4s}: {r2:.4f}")
print("=" * 40)

# Format report
report = calculator.format_report(r2_scores, threshold=0.9)
print("\n" + report)

print("\nNote: After PINN training, replace synthetic predictions with actual model outputs.")

## Step 7: 2D空間分布の可視化

特定の時刻における2D波動場を可視化します（FDTDデータ）。

In [None]:
# Select 3 time snapshots for visualization
t_unique = np.unique(dataset.t)
t_indices = [len(t_unique) // 4, len(t_unique) // 2, 3 * len(t_unique) // 4]
t_snapshots = [t_unique[i] for i in t_indices]

print(f"Time snapshots (dimensionless): {[f'{t:.3f}' for t in t_snapshots]}")

# Get unique x, y grids
x_unique = np.unique(dataset.x)
y_unique = np.unique(dataset.y)

print(f"Spatial grid: {len(x_unique)}×{len(y_unique)}")

# Visualize Ux field at 3 time snapshots
fig, axes = plt.subplots(1, 3, figsize=(16, 5))

for i, (ax, t_val) in enumerate(zip(axes, t_snapshots)):
    # Find data at this time
    mask = np.abs(dataset.t - t_val) < 1e-6

    if np.sum(mask) > 0:
        x_plot = dataset.x[mask]
        y_plot = dataset.y[mask]
        ux_plot = dataset.Ux[mask]

        # Reshape to 2D grid if possible
        if len(x_plot) == len(x_unique) * len(y_unique):
            X = x_plot.reshape(len(x_unique), len(y_unique))
            Y = y_plot.reshape(len(x_unique), len(y_unique))
            Ux_grid = ux_plot.reshape(len(x_unique), len(y_unique))

            # Plot using pcolormesh
            im = ax.pcolormesh(X, Y, Ux_grid, cmap="RdBu_r", shading="auto")
            ax.set_xlabel("x (dimensionless)", fontsize=10)
            ax.set_ylabel("y (dimensionless)", fontsize=10)
            ax.set_title(f"t = {t_val:.3f}", fontsize=12)
            ax.set_aspect("equal")
            plt.colorbar(im, ax=ax, label="Ux")
        else:
            # Scatter plot if grid structure is unclear
            scatter = ax.scatter(x_plot, y_plot, c=ux_plot, cmap="RdBu_r", s=1)
            ax.set_xlabel("x", fontsize=10)
            ax.set_ylabel("y", fontsize=10)
            ax.set_title(f"t = {t_val:.3f}", fontsize=12)
            plt.colorbar(scatter, ax=ax, label="Ux")

plt.suptitle("FDTD Data: Ux Field at Different Times", fontsize=14, y=1.02)
plt.tight_layout()
plt.show()

print("\n✓ Visualization complete")

## まとめ

このノートブックでは以下を実装しました：

1. ✓ FDTDデータの読み込みと無次元化
2. ✓ Train/Validation分割
3. ✓ R²スコア計算機能のテスト
4. ✓ 2D波動場の可視化（基本プロット）
5. ✓ plot_time_snapshots可視化（Task 4.3）
6. ✓ plot_spatial_heatmap誤差分布可視化（Task 4.4）

## 今後の拡張（実装予定）

- 2D PINNモデルの構築と訓練（PINNModelBuilder2DService使用）
- 実際のPINN訓練ループとR²モニタリングコールバック
- パラメトリック予測（異なるpitch/depthでの評価）
- 損失重み調整と収束性の改善

**Note**: 本ノートブックは現在、合成予測データを使用してR²計算および可視化機能を実証しています。  
完全なPINN訓練パイプライン実装後は、実際のPINN予測との比較が可能になります。

## 参考

- Design: `.kiro/specs/pinn-2d-fdtd-integration/design.md`
- Requirements: `.kiro/specs/pinn-2d-fdtd-integration/requirements.md`
- Loss Weight Validation: `docs/loss_weight_validation.md`
- Implementation: Tasks 4.2-4.4 (R² monitoring, visualization)

In [None]:
# Import PlotGeneratorService
from pinn.validation.plot_generator import PlotGeneratorService

# Create plot generator
plot_gen = PlotGeneratorService()

# Prepare data for plot_time_snapshots (requires physical units in meters)
# First, denormalize data back to physical units
x_physical = dataset.x * scales.L_ref
y_physical = dataset.y * scales.L_ref
t_physical = dataset.t * scales.T_ref

# Get unique grids
x_unique_phys = np.unique(x_physical)
y_unique_phys = np.unique(y_physical)
t_unique_phys = np.unique(t_physical)

# Select 3 time snapshots
t_snap_indices = [len(t_unique_phys) // 4, len(t_unique_phys) // 2, 3 * len(t_unique_phys) // 4]
t_snap_list = [t_unique_phys[i] for i in t_snap_indices]

print(f"Selected time snapshots (physical): {[f'{t * 1e6:.2f} µs' for t in t_snap_list]}")

# Prepare FDTD and "PINN" data dictionaries
fdtd_data = {}
pinn_pred = {}

for t_val in t_snap_list:
    # Find data at this time
    mask = np.abs(t_physical - t_val) < 1e-9

    if np.sum(mask) > 0:
        x_at_t = x_physical[mask]
        y_at_t = y_physical[mask]
        ux_at_t = dataset.Ux[mask] * scales.U_ref  # Denormalize to physical units

        # Reshape to 2D grid
        if len(x_at_t) == len(x_unique_phys) * len(y_unique_phys):
            Ux_grid = ux_at_t.reshape(len(y_unique_phys), len(x_unique_phys))
            fdtd_data[t_val] = Ux_grid

            # Synthetic PINN prediction (add small noise for demo)
            pinn_pred[t_val] = Ux_grid + np.random.randn(*Ux_grid.shape) * 0.05 * np.std(Ux_grid)

# Generate plot
if len(fdtd_data) >= 3:
    fig, axes = plot_gen.plot_time_snapshots(
        x=x_unique_phys, y=y_unique_phys, t_list=t_snap_list, fdtd_data=fdtd_data, pinn_pred=pinn_pred, output_field="Ux"
    )
    plt.show()
    print("\n✓ plot_time_snapshots visualization complete")
else:
    print("⚠️  Not enough time snapshots for visualization")

## Step 9: plot_spatial_heatmap可視化（Task 4.4）

誤差分布ヒートマップを生成し、空間領域全体でのPINN予測精度を評価します。

In [None]:
# Calculate spatial error distribution for middle time snapshot
t_mid = t_snap_list[1]

if t_mid in fdtd_data and t_mid in pinn_pred:
    # Compute absolute error
    error_map = np.abs(pinn_pred[t_mid] - fdtd_data[t_mid])

    # Generate heatmap
    fig, ax = plot_gen.plot_spatial_heatmap(x=x_unique_phys, y=y_unique_phys, error=error_map, output_field="Ux")
    plt.show()

    print(f"\n✓ Spatial heatmap visualization complete")
    print(f"Error statistics:")
    print(f"  Mean error: {np.mean(error_map):.2e} m")
    print(f"  Max error: {np.max(error_map):.2e} m")
    print(f"  Relative error: {np.mean(error_map) / np.std(fdtd_data[t_mid]) * 100:.2f}%")
else:
    print("⚠️  Data not available for heatmap")

## まとめ

このノートブックでは以下を実装しました：

1. ✓ FDTDデータの読み込みと無次元化
2. ✓ Train/Validation分割
3. ✓ R²スコア計算機能のテスト
4. ✓ 2D波動場の可視化

## 今後の拡張

- 2D PINNモデルの構築と訓練（PINNModelBuilder2DService使用）
- PINN予測とFDTDデータの比較プロット
- パラメトリック予測（異なるpitch/depthでの評価）
- 損失重み調整と収束性の改善

## 参考

- Design: `.kiro/specs/pinn-2d-fdtd-integration/design.md`
- Requirements: `.kiro/specs/pinn-2d-fdtd-integration/requirements.md`
- Loss Weight Validation: `docs/loss_weight_validation.md`