**Urban Chemical Safety Challenge: PINN Baseline Notebook**

This notebook demonstrates solving the Convection-Diffusion equation using Physics-Informed Neural Networks (PINNs).
The solution includes:
1. Problem Setup
3. PINN Model Definition
4. Training Loop
6. Submission File Creation

In [2]:
# --- IMPORTS ---
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd 
import os
import subprocess

In [3]:
from tensorflow.keras.activations import swish, mish

In [76]:
# --- CONFIGURABLE PARAMETERS ---
# Domain boundaries
x_min, x_max = 0.0, 1.0  # x-coordinate bounds
y_min, y_max = 0.0, 1.0  # y-coordinate bounds
eps = 1e-4  # Diffusion coefficient

# Neural network architecture and training parameters
layers = [2, 30, 1]  # Network architecture: [input_dim, hidden_dim, output_dim]
lr_initial = 0.01    # Initial learning rate for Adam optimizer
epochs = 10000        # Number of training epochs
beta = 1.0          # Weight for boundary condition loss term

# Sampling parameters
N_interior = 1000    # Number of interior training points
N_boundary = 100     # Number of boundary training points

# File paths
test_path = "../input/casml-2024-scientific-machine-learning-challenge/test.csv"
submission_path = "/kaggle/working/submission.csv"

In [77]:
def set_device():
    """
    Configure and select the computational device (GPU/CPU).
    
    Returns:
        str: Device identifier ('/GPU:0' or '/CPU:0')
    """
    gpus = tf.config.list_physical_devices('GPU')
    if gpus:
        print("Using GPU")
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        return '/GPU:0'
    print("Using CPU")
    return '/CPU:0'

In [78]:
device = set_device()  

Using GPU


In [79]:
def generate_boundary_points(xmin, xmax, ymin, ymax, num_points):
    """
    Generate training points along the domain boundaries.
    
    Args:
        xmin, xmax (float): x-coordinate bounds
        ymin, ymax (float): y-coordinate bounds
        num_points (int): Total number of boundary points to generate
    
    Returns:
        tuple: Arrays of x and y coordinates for boundary points
    """
    num_points_per_edge = num_points // 4
    # Generate points for each boundary edge
    x_left = np.full((num_points_per_edge, 1), xmin)
    y_left = np.random.uniform(ymin, ymax, (num_points_per_edge, 1))
    x_right = np.full((num_points_per_edge, 1), xmax)
    y_right = np.random.uniform(ymin, ymax, (num_points_per_edge, 1))
    x_bottom = np.random.uniform(xmin, xmax, (num_points_per_edge, 1))
    y_bottom = np.full((num_points_per_edge, 1), ymin)
    x_top = np.random.uniform(xmin, xmax, (num_points_per_edge, 1))
    y_top = np.full((num_points_per_edge, 1), ymax)
    
    # Stack all boundary points
    x_boundary = np.vstack([x_left, x_right, x_bottom, x_top])
    y_boundary = np.vstack([y_left, y_right, y_bottom, y_top])
    return x_boundary, y_boundary

In [80]:
def u_bc(x, y):
    """
    Define the Dirichlet boundary condition.
    
    Args:
        x (tf.Tensor): x-coordinates of boundary points
        y (tf.Tensor): y-coordinates of boundary points
    
    Returns:
        tf.Tensor: Boundary values (zero in this case)
    """
    return np.zeros_like(x)

In [81]:
def f(x, y):
    """
    Define the forcing function for the PDE.
    
    Args:
        x (tf.Tensor): x-coordinates
        y (tf.Tensor): y-coordinates
    
    Returns:
        tf.Tensor: Values of the forcing function at given points
    """
    return 2*eps*(-x + np.exp(2*(x - 1)/eps)) + x*y**2 + 6*x*y - x*np.exp(3*(y - 1)/eps) - y**2*np.exp(2*(x - 1)/eps) + 2*y**2 - 6*y*np.exp(2*(x - 1)/eps) - 2*np.exp(3*(y - 1)/eps) + np.exp((2*x + 3*y - 5)/eps)


In [44]:
"""   

"""
# Define the PINN neural network
class PINN(tf.keras.Model):
    def __init__(self):
        super(PINN, self).__init__()
        self.dense1 = tf.keras.layers.Dense(256, activation='tanh', kernel_initializer='glorot_normal')
        self.dense2 = tf.keras.layers.Dense(128, activation='tanh', kernel_initializer='glorot_normal')
        self.dense3 = tf.keras.layers.Dense(, activation='tanh', kernel_initializer='glorot_normal')
        self.dense4 = tf.keras.layers.Dense(1, kernel_initializer='glorot_normal')

    def call(self, x, y):
        # Ensure x and y are 1D tensors
        x = tf.reshape(x, [-1])  # Shape: (batch_size,)
        y = tf.reshape(y, [-1])  # Shape: (batch_size,)
        # Expand dimensions to make x and y 2D tensors
        x = tf.expand_dims(x, axis=-1)  # Shape: (10000, 1)
        y = tf.expand_dims(y, axis=-1)  # Shape: (10000, 1)

    # Concatenate along axis 1
    #xy = tf.concat([x, y], axis=1)  # Shape: (10000, 2)
        xy = tf.concat([x, y], axis=1)
        h = self.dense1(xy)
        h = self.dense2(h)
        h = self.dense3(h)
        u = self.dense4(h)
        return u

In [10]:
def sine_activation(x):
    return tf.sin(x)


In [59]:
class PINN(tf.keras.Model):
    def __init__(self):
        super(PINN, self).__init__()
        
        # Layers with sine activation
        self.dense1 = tf.keras.layers.Dense(100, activation='swish', kernel_initializer='glorot_normal')
        self.norm1 = tf.keras.layers.LayerNormalization()
        self.dense2 = tf.keras.layers.Dense(100, activation='swish', kernel_initializer='glorot_normal')
        self.norm2 = tf.keras.layers.LayerNormalization()
        self.dense3 = tf.keras.layers.Dense(100, activation='swish', kernel_initializer='glorot_normal')
        self.norm3 = tf.keras.layers.LayerNormalization()
        
        # Residual layer
        self.dense_residual = tf.keras.layers.Dense(100, activation='linear', kernel_initializer='glorot_normal')
        
        # Final output layer
        self.dense4 = tf.keras.layers.Dense(1, kernel_initializer='glorot_normal')

    def call(self, x, y):
        # Ensure x and y are 1D tensors
        x = tf.reshape(x, [-1])  # Shape: (batch_size,)
        y = tf.reshape(y, [-1])  # Shape: (batch_size,)
        x = tf.expand_dims(x, axis=-1)  # Shape: (batch_size, 1)
        y = tf.expand_dims(y, axis=-1)  # Shape: (batch_size, 1)

        # Concatenate inputs
        xy = tf.concat([x, y], axis=1)  # Shape: (batch_size, 2)

        # Forward pass with normalization and residual connection
        h = self.dense1(xy)
        h = self.norm1(h)
        h = self.dense2(h)
        h = self.norm2(h)
        h = self.dense3(h)
        h = self.norm3(h)
        
        # Add residual connection
        h = h + self.dense_residual(xy)
        
        # Final output
        u = self.dense4(h)
        return u


In [82]:
class PINN(tf.keras.Model):
    def __init__(self, l2_lambda=1e-3, dropout_rate=0.2):
        super(PINN, self).__init__()
        
        # Define the L2 regularizer
        l2_regularizer = tf.keras.regularizers.L2(l2_lambda)
        
        # Layers with L2 regularization
        self.dense1 = tf.keras.layers.Dense(
            128, activation='tanh', 
            kernel_initializer='glorot_normal', 
            kernel_regularizer=l2_regularizer
        )
        self.dropout1 = tf.keras.layers.Dropout(dropout_rate)  # Dropout layer
        self.norm1 = tf.keras.layers.LayerNormalization()
        
        self.dense2 = tf.keras.layers.Dense(
            128, activation='tanh', 
            kernel_initializer='glorot_normal', 
            kernel_regularizer=l2_regularizer
        )
        
        self.dropout2 = tf.keras.layers.Dropout(dropout_rate)  # Dropout layer
        self.norm2 = tf.keras.layers.LayerNormalization()
        
        self.dense3 = tf.keras.layers.Dense(
            128, activation='tanh', 
            kernel_initializer='glorot_normal', 
            kernel_regularizer=l2_regularizer
        )
        self.dropout3 = tf.keras.layers.Dropout(dropout_rate)  # Dropout layer
        self.norm3 = tf.keras.layers.LayerNormalization()
        
        self.dense_residual = tf.keras.layers.Dense(
            50, activation='linear', 
            kernel_initializer='glorot_normal', 
            kernel_regularizer=l2_regularizer
        )
        
        self.dense4 = tf.keras.layers.Dense(
            1, kernel_initializer='glorot_normal', 
            kernel_regularizer=l2_regularizer
        )

    def call(self, x, y, training=False):
        # Ensure x and y are 1D tensors
        x = tf.reshape(x, [-1])  # Shape: (batch_size,)
        y = tf.reshape(y, [-1])  # Shape: (batch_size,)
        x = tf.expand_dims(x, axis=-1)  # Shape: (batch_size, 1)
        y = tf.expand_dims(y, axis=-1)  # Shape: (batch_size, 1)

        # Concatenate inputs
        xy = tf.concat([x, y], axis=1)  # Shape: (batch_size, 2)

        # Forward pass with Dropout
        h = self.dense1(xy)
        h = self.dropout1(h, training=training)  # Apply dropout
        h = self.norm1(h)
        h = self.dense2(h)
        h = self.dropout2(h, training=training)  # Apply dropout
        h = self.norm2(h)
        h = self.dense3(h)
        h = self.dropout3(h, training=training)  # Apply dropout
        h = self.norm3(h)
        
        # Add residual connection
        h = h + self.dense_residual(xy)
        
        # Final output
        u = self.dense4(h)
        return u


In [66]:
model = PINN()

In [84]:
def pde_loss(model, train_interior, eps):
    """
    Calculate the PDE residual loss using automatic differentiation.
    
    Args:
        model (tf.keras.Model): Neural network model
        train_interior (tf.Tensor): Interior training points
        eps (float): Diffusion coefficient
    
    Returns:
        tf.Tensor: Mean squared PDE residual
    """
    x = tf.expand_dims(train_interior[:, 0], axis=1)
    y = tf.expand_dims(train_interior[:, 1], axis=1)
    
    # Calculate derivatives using automatic differentiation
    with tf.GradientTape(persistent=True) as tape1:
        tape1.watch(x)
        tape1.watch(y)
        with tf.GradientTape(persistent=True) as tape2:
            tape2.watch(x)
            tape2.watch(y)
            u = model(tf.concat([x, y], axis=1))
        grad_x = tape2.gradient(u, x)  # First derivatives
        grad_y = tape2.gradient(u, y)
    grad2_x = tape1.gradient(grad_x, x)  # Second derivatives
    grad2_y = tape1.gradient(grad_y, y)
    
    # Calculate PDE residual: -eps*(uxx + uyy) + 2*ux + 3*uy - f(x,y)
    residual = -eps * (grad2_x + grad2_y) + 2 * grad_x + 3 * grad_y - f(x, y)
    return tf.reduce_mean(tf.square(residual))

In [85]:
def bc_loss(model, x_bd):
    """
    Calculate the boundary condition loss.
    
    Args:
        model (tf.keras.Model): Neural network model
        x_bd (tf.Tensor): Boundary points
    
    Returns:
        tf.Tensor: Mean squared error at boundary points
    """
    # Boundary Loss
    u_pred = model(x_bd)
    x = tf.expand_dims(x_bd[:, 0], axis=1)
    y = tf.expand_dims(x_bd[:, 1], axis=1)
    u_exact = u_bc(x, y)
    
    return tf.reduce_mean(tf.square(u_pred - u_exact))

In [86]:
# Early stopping class
class EarlyStopping:
    def __init__(self, patience=5):
        self.patience = patience
        self.best_loss = float('inf')
        self.wait = 0

    def should_stop(self, current_loss):
        if current_loss < self.best_loss:
            self.best_loss = current_loss
            self.wait = 0
        else:
            self.wait += 1
        return self.wait >= self.patience

In [91]:
def train(model, epochs, train_interior, train_boundary, eps, beta, lr_initial):
    """
    Train the PINN model.
    
    Args:
        model (tf.keras.Model): Neural network model
        epochs (int): Number of training epochs
        train_interior (tf.Tensor): Interior training points
        train_boundary (tf.Tensor): Boundary training points
        eps (float): Diffusion coefficient
        beta (float): Weight for boundary condition loss
        lr_initial (float): Initial learning rate
    """
    lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate=lr_initial,
    decay_steps=1000,  # Number of steps before the learning rate decays
    decay_rate=0.9,    # The rate at which the learning rate will decay
    staircase=True     # Whether to apply the decay in discrete steps
)
    optimizer = tf.optimizers.Adam(learning_rate=lr_schedule)
    early_stop = EarlyStopping(patience=5)
    
    for epoch in range(epochs):
        # Compute gradients and update model parameters
        with tf.GradientTape() as tape:
            loss_pde = pde_loss(model, train_interior, eps)
            loss_bc = bc_loss(model, train_boundary)
            total_loss = loss_pde + beta * loss_bc
            
        grads = tape.gradient(total_loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))
        
        # Print training progress
        if epoch % 1000 == 0 or epoch == epochs-1:
            print(f"Epoch {epoch}, Total Loss: {total_loss.numpy()}, PDE Loss: {loss_pde.numpy()}, BC Loss: {loss_bc.numpy()}")

In [92]:
def create_submission(model, test_path, output_path):
    """
    Create submission file with model predictions.
    
    Args:
        model (tf.keras.Model): Trained neural network model
        test_path (str): Path to test data CSV file
        output_path (str): Path for output submission file
    """
    test_data = pd.read_csv(test_path)
    xy = test_data[['x', 'y']].values
    predictions = model.predict(xy)
    test_data['u'] = predictions
    test_data.to_csv(output_path, index=False)

In [93]:
model.summary()

In [94]:
# --- MAIN EXECUTION ---
# Generate interior training points
x_train_interior = np.random.uniform(x_min, x_max, (N_interior, 1))
y_train_interior = np.random.uniform(y_min, y_max, (N_interior, 1))
train_interior = tf.convert_to_tensor(np.hstack([x_train_interior, y_train_interior]), dtype=tf.float32)

# Generate boundary training points
x_boundary, y_boundary = generate_boundary_points(x_min, x_max, y_min, y_max, N_boundary)
train_boundary = tf.convert_to_tensor(np.hstack([x_boundary, y_boundary]), dtype=tf.float32)

# Create and train model on specified device
with tf.device(device):
    # Define neural network architecture
    model = tf.keras.Sequential([
        tf.keras.layers.InputLayer(input_shape=(2,)),
        *[tf.keras.layers.Dense(units, activation='tanh') for units in layers[1:-1]],
        tf.keras.layers.Dense(1)
    ])
    
    # Train the model
    train(model, epochs, train_interior, train_boundary, eps, beta, lr_initial)

Epoch 0, Total Loss: 12.44084358215332, PDE Loss: 12.417240142822266, BC Loss: 0.023603124544024467
Epoch 1000, Total Loss: 0.11585079878568649, PDE Loss: 0.006468627136200666, BC Loss: 0.10938217490911484
Epoch 2000, Total Loss: 0.09852267801761627, PDE Loss: 0.0029851424042135477, BC Loss: 0.09553753584623337
Epoch 3000, Total Loss: 0.09567185491323471, PDE Loss: 0.002423297381028533, BC Loss: 0.09324856102466583
Epoch 4000, Total Loss: 0.09447090327739716, PDE Loss: 0.001969624776393175, BC Loss: 0.09250127524137497
Epoch 5000, Total Loss: 0.09634186327457428, PDE Loss: 0.0010609515011310577, BC Loss: 0.09528090804815292
Epoch 6000, Total Loss: 0.09235922247171402, PDE Loss: 0.0015017822152003646, BC Loss: 0.09085743874311447
Epoch 7000, Total Loss: 0.09100686013698578, PDE Loss: 0.0018544330960139632, BC Loss: 0.08915242552757263
Epoch 8000, Total Loss: 0.08952370285987854, PDE Loss: 0.001773103722371161, BC Loss: 0.08775059878826141
Epoch 9000, Total Loss: 0.08813782781362534, PDE

In [95]:
create_submission(model, test_path, submission_path)  # Generating csv file for submission

[1m5000/5000[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 988us/step


In [96]:
import numpy as np
import pandas as pd

def compute_l2_error(test_file):
    # Load test data
    data = pd.read_csv(test_file)
    # Extract true and predicted values
    u_pred = data['u'].values  # Replace with the column name for predicted values
    u_true = np.array([f(x, y) for x, y in zip(data['x'], data['y'])])  # Compute true solution for each (x, y)
    # Compute l2 error
    l2_error = np.sqrt(np.mean((u_pred - u_true) ** 2))
    return l2_error

# Example usage
test_file = "/kaggle/working/submission.csv"  # Replace with your test file path
l2_error = compute_l2_error(test_file)
print(f"L2 Error: {l2_error}")


L2 Error: 3.0745425679990412


In [None]:
import numpy as np
import tensorflow as tf
from fastvpinns import VPINN

# Define the PDE: Convection-Diffusion Equation
def convection_diffusion_residual(u, x, y, u_x, u_y, u_xx, u_yy, eps, beta):
    """
    Residual for the convection-diffusion equation:
    -eps * (u_xx + u_yy) + beta[0] * u_x + beta[1] * u_y
    """
    return -eps * (u_xx + u_yy) + beta[0] * u_x + beta[1] * u_y

# Generate training data
x_min, x_max, y_min, y_max = 0, 1, 0, 1
N_train = 1000  # Number of training points
x_train = np.random.uniform(x_min, x_max, (N_train, 1))
y_train = np.random.uniform(y_min, y_max, (N_train, 1))
train_points = np.hstack([x_train, y_train])

# PDE parameters
eps = 0.01
beta = [1.0, 1.0]

# Define the VPINN model
model = VPINN(
    input_dim=2,           # (x, y)
    output_dim=1,          # u(x, y)
    layers=[64, 64, 64],   # Hidden layers
    activation='tanh',     # Activation function
    num_test_funcs=10      # Number of test functions
)

# Compile the model with the variational residual
model.compile(
    residual_fn=lambda u, x, y, u_x, u_y, u_xx, u_yy: convection_diffusion_residual(
        u, x, y, u_x, u_y, u_xx, u_yy, eps, beta
    ),
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
    metric='mse'
)

# Train the model
model.fit(train_points, epochs=5000, batch_size=64)

# Predict on test data
test_points = np.random.uniform([x_min, y_min], [x_max, y_max], (500, 2))
predictions = model.predict(test_points)

# Save predictions for submission
import pandas as pd
submission = pd.DataFrame(test_points, columns=['x', 'y'])
submission['u_pred'] = predictions
submission.to_csv("submission.csv", index=False)
