In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import xarray as xr
import matplotlib.pyplot as plt

In [6]:
# Define the PINN Model for 2D Advection-Diffusion
class PINN(nn.Module):
    """Fully connected tanh network used as the PINN surrogate.

    The stack consists of an input projection (``input_dim -> hidden_dim``),
    ``num_layers - 1`` square hidden layers (``hidden_dim -> hidden_dim``) and
    a linear output head (``hidden_dim -> output_dim``). ``Tanh`` is applied
    after every layer except the head.
    """

    def __init__(self, input_dim, hidden_dim, output_dim, num_layers):
        super().__init__()
        # Layers are instantiated in network order (entry, hidden stack, head).
        entry = nn.Linear(input_dim, hidden_dim)
        hidden_stack = [nn.Linear(hidden_dim, hidden_dim) for _ in range(num_layers - 1)]
        head = nn.Linear(hidden_dim, output_dim)
        self.layers = nn.ModuleList([entry, *hidden_stack, head])
        self.activation = nn.Tanh()

    def forward(self, inputs):
        """Run ``inputs`` (last dimension ``input_dim``) through the network."""
        *body, head = self.layers
        hidden = inputs
        for linear in body:
            hidden = self.activation(linear(hidden))
        # The head is purely linear: no activation on the output.
        return head(hidden)

def _grad_wrt(outputs, wrt):
    """Return d(sum(outputs))/d(wrt), keeping the graph for higher derivatives.

    A zero tensor is returned when ``outputs`` does not depend on ``wrt``
    (the derivative of a constant), instead of letting autograd raise.
    """
    if not outputs.requires_grad:
        return torch.zeros_like(wrt)
    (grad,) = torch.autograd.grad(
        outputs,
        wrt,
        grad_outputs=torch.ones_like(outputs),
        create_graph=True,
        allow_unused=True,
    )
    return torch.zeros_like(wrt) if grad is None else grad


def loss_function(pinn, inputs, c_true, u, v, D, S):
    """Combined PDE-residual + data loss for the 2D advection-diffusion PINN.

    The residual enforced is ``c_t - D*(c_xx + c_yy) + u*c_x + v*c_y - S(x, y, t)``.

    Args:
        pinn: callable mapping an ``(N, 3)`` tensor of ``[x, y, t]`` rows to
            predictions that broadcast against ``c_true``.
        inputs: ``(N, 3)`` tensor with columns ``x, y, t``. It is not modified.
        c_true: reference concentration compared against the prediction.
        u, v: velocity components, broadcastable against the ``(N, 1)`` derivatives.
        D: diffusion coefficient.
        S: callable ``S(x, y, t)`` returning the source term (tensor or scalar).

    Returns:
        Scalar tensor: ``mean(residual**2) + mean((c_pred - c_true)**2)``.
    """
    # Differentiate w.r.t. the very tensor that is fed to the network. Slices
    # taken from `inputs` after the forward pass are new tensors that are not
    # part of c_pred's graph, so autograd cannot differentiate through them.
    # A detached leaf is used so the caller's tensor is not switched to
    # requires_grad as a side effect; a tensor that already requires grad is
    # used as-is so any upstream graph stays connected.
    coords = inputs if inputs.requires_grad else inputs.detach().requires_grad_(True)

    # Network prediction
    c_pred = pinn(coords)

    # First derivatives: one backward pass yields all three columns.
    grad_c = _grad_wrt(c_pred, coords)
    c_x = grad_c[:, 0:1]
    c_y = grad_c[:, 1:2]
    c_t = grad_c[:, 2:3]

    # Second derivatives: only the matching (diagonal) column is needed.
    c_xx = _grad_wrt(c_x, coords)[:, 0:1]
    c_yy = _grad_wrt(c_y, coords)[:, 1:2]

    # Coordinates are only needed as arguments of the source term.
    x = coords[:, 0:1]
    y = coords[:, 1:2]
    t = coords[:, 2:3]

    # Move the auxiliary fields onto the same device as the inputs.
    u = u.to(coords.device)
    v = v.to(coords.device)
    c_true = c_true.to(coords.device)

    # PDE Residual (Advection-Diffusion Equation)
    pde_residual = c_t - D * (c_xx + c_yy) + u * c_x + v * c_y - S(x, y, t)

    # Data loss (compare with simulation data)
    data_loss = torch.mean((c_pred - c_true) ** 2)

    # Total loss
    return torch.mean(pde_residual ** 2) + data_loss

In [7]:
# Generate Training and Testing Data (Chronological Split)
def generate_train_test_data(data, train_ratio=0.6):
    """Build ``(x, y, t)`` samples from a dataset and split them chronologically.

    Args:
        data: mapping (e.g. an xarray Dataset) whose entries ``'xC'``, ``'yC'``,
            ``'time'``, ``'c'``, ``'u'`` and ``'v'`` expose a ``.values`` array.
        train_ratio: fraction of the time steps (in ``[0, 1]``) assigned to the
            training set; the first ``int(n_steps * train_ratio)`` steps are
            used for training and the rest for testing.

    Returns:
        ``(inputs_train, c_train, u_train, v_train,
        inputs_test, c_test, u_test, v_test)`` as float32 tensors; inputs have
        shape ``(N, 3)`` and the fields ``(N, 1)``.

    Raises:
        ValueError: if ``train_ratio`` is outside ``[0, 1]`` or a field does
            not contain exactly one value per grid sample.
    """
    if not 0.0 <= train_ratio <= 1.0:
        raise ValueError(f"train_ratio must be within [0, 1], got {train_ratio}")

    # Get coordinates
    x_coords = data['xC'].values
    y_coords = data['yC'].values
    # NOTE(review): the division by 1e9 presumably converts nanosecond
    # timestamps to seconds -- confirm. If these are absolute timestamps, the
    # resulting values (~1e9) keep only coarse resolution once cast to float32.
    t_coords = data['time'].values.astype(float) / 1e9  # Convert to seconds

    # Create meshgrid; with indexing='ij' the flattened order is x slowest,
    # t fastest.
    grid_shape = (len(x_coords), len(y_coords), len(t_coords))
    X, Y, T = np.meshgrid(x_coords, y_coords, t_coords, indexing='ij')
    inputs = torch.tensor(
        np.stack([X.ravel(), Y.ravel(), T.ravel()], axis=1), dtype=torch.float32
    )
    num_samples = inputs.shape[0]

    def _as_column(name):
        # NOTE(review): ravel() pairs values with the grid above only if the
        # variable is stored in (xC, yC, time) order -- verify against the file.
        column = torch.tensor(data[name].values.ravel(), dtype=torch.float32).unsqueeze(1)
        if column.shape[0] != num_samples:
            raise ValueError(
                f"'{name}' has {column.shape[0]} values but the (x, y, t) grid "
                f"has {num_samples} samples"
            )
        return column

    # Get target data and velocity fields
    c = _as_column('c')
    u = _as_column('u')
    v = _as_column('v')

    # Split chronologically by time-step index rather than by comparing
    # float32 time values: the index is exact, cannot run past the end of the
    # axis, and gives exactly int(n_steps * train_ratio) training steps.
    # Assumes the time coordinate is stored in chronological order.
    num_time_steps = len(t_coords)
    split_time_idx = int(num_time_steps * train_ratio)
    time_index = np.broadcast_to(np.arange(num_time_steps), grid_shape).ravel()
    train_mask = torch.as_tensor(time_index < split_time_idx)
    test_mask = ~train_mask

    # Training data (first part of the time axis)
    inputs_train = inputs[train_mask]
    c_train = c[train_mask]
    u_train = u[train_mask]
    v_train = v[train_mask]

    # Testing data (remaining time steps)
    inputs_test = inputs[test_mask]
    c_test = c[test_mask]
    u_test = u[test_mask]
    v_test = v[test_mask]

    return inputs_train, c_train, u_train, v_train, inputs_test, c_test, u_test, v_test

In [8]:
# Training Loop
def train_pinn(pinn, inputs_train, c_train, u_train, v_train, D, S, num_epochs, lr):
    """Fit ``pinn`` with Adam on the full training set.

    Each epoch performs a single optimisation step on the loss returned by
    ``loss_function`` for the complete training tensors. The loss is printed
    on every 100th epoch (starting with epoch 0). Nothing is returned; the
    model is updated in place.
    """
    optimizer = optim.Adam(pinn.parameters(), lr=lr)

    def _optimisation_step():
        # One full-batch update; returns the loss evaluated before the step.
        optimizer.zero_grad()
        step_loss = loss_function(pinn, inputs_train, c_train, u_train, v_train, D, S)
        step_loss.backward()
        optimizer.step()
        return step_loss

    for epoch in range(num_epochs):
        loss = _optimisation_step()
        if epoch % 100:
            continue
        print(f"Epoch {epoch}, Loss: {loss.item()}")

In [None]:
# Main Function
def _scatter_latest_snapshot(ax, inputs, c_pred, title):
    """Scatter the predictions at the latest time value present in ``inputs``.

    Every (x, y) location occurs once per time step, so scattering all rows
    would stack the time steps on top of each other at the same positions.
    Only the rows of the latest time value are drawn.
    """
    # detach() so tensors flagged requires_grad can be converted to NumPy.
    coords = inputs.detach().cpu().numpy()
    values = c_pred.detach().cpu().numpy().ravel()
    ax.set(title=title, xlabel='x', ylabel='y')
    if coords.shape[0] == 0:
        return  # nothing to draw for an empty split
    latest = coords[:, 2] == coords[:, 2].max()
    points = ax.scatter(coords[latest, 0], coords[latest, 1], c=values[latest],
                        cmap='viridis', s=2)
    ax.figure.colorbar(points, ax=ax, label='Concentration')


def main():
    """Train the PINN on the tracer-release dataset, report MSE and plot predictions.

    Reads ``output-tracer-release_2025-02-16.nc`` from the working directory,
    trains on the chronological training split, prints train/test MSE and
    shows the predicted field for both splits.
    """
    # Parameters
    input_dim = 3  # x, y, t
    hidden_dim = 64
    output_dim = 1
    num_layers = 5
    D = 0.1  # Diffusion coefficient
    S = lambda x, y, t: 0.0  # Source term
    epochs = 1000
    lr = 0.001

    # Weight initialisation is random; fix the seed so reruns are reproducible.
    torch.manual_seed(0)

    # Load and prepare data; the context manager closes the file once the
    # values have been copied into tensors.
    with xr.open_dataset("output-tracer-release_2025-02-16.nc") as data:
        (inputs_train, c_train, u_train, v_train,
         inputs_test, c_test, u_test, v_test) = generate_train_test_data(data)

    # Initialize PINN
    pinn = PINN(input_dim, hidden_dim, output_dim, num_layers)

    # Train PINN
    train_pinn(pinn, inputs_train, c_train, u_train, v_train, D, S, epochs, lr)

    # Test PINN (no gradients needed for evaluation)
    with torch.no_grad():
        c_pred_train = pinn(inputs_train)
        c_pred_test = pinn(inputs_test)

        # Calculate MSE
        train_mse = torch.mean((c_pred_train - c_train)**2)
        test_mse = torch.mean((c_pred_test - c_test)**2)
    print(f"Train MSE: {train_mse.item()}")
    print(f"Test MSE: {test_mse.item()}")

    # Plot results
    fig, (ax_train, ax_test) = plt.subplots(1, 2, figsize=(12, 5))
    _scatter_latest_snapshot(ax_train, inputs_train, c_pred_train, 'Training Data Predictions')
    _scatter_latest_snapshot(ax_test, inputs_test, c_pred_test, 'Test Data Predictions')

    fig.tight_layout()
    plt.show()

# Entry point: run the full pipeline only when this code is executed as the
# top-level program, not when it is imported as a module.
if __name__ == "__main__":
    main()