# Introduce Earth System Data

Earth System data usually uses discrete points in space to describe the physical feature field

![earth_system_field.png](attachment:d8dfa96b-c320-49bb-b6b1-4d0ef7a06ebe.png)

For example, earth system data may use regular grid.

\[Challenge 1\] However, store high-resolution earth system data takes large space.

![earth_system_grid.png](attachment:b7ef1ed5-17a3-4498-b4d1-bad0c454b061.png)

\[Challenge 2\] When we query a point in the physical field, we may need to perform interpolation. Interpolation may be slow due to irregular data. (Example: RBF interpolation in MPAS-Ocean data)

![interpolation.png](attachment:c9684d8b-50c2-4ad5-a002-f2d818a0cea0.png)

# Image Regression

To address the two challenges, we can use neural network to represent the field.

1. Use a compact neural network to compress the data
2. Query the network without interpolation

Neural Network

- Input: coordinates
- Output: RGB in image, temperature in sea surface temperature data, etc.

![image_regression_network.png](attachment:986be8d5-1ca5-48c0-aa00-0bb2afd8f90e.png)

# Import libraries

In [None]:
from models import *
from matplotlib import image
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm

# Training with a Fixed Seed
To ensure reproducibility of neural network training results, it is important to fix the random seed. This guarantees that all stochastic components—such as network weight initialization and data shuffling—produce consistent results across runs.

You can also check if a GPU is available using:

- torch.cuda.is_available()

In [None]:
# Configuration of seeds and PyTorch device
np.random.seed(0)
torch.manual_seed(0)
if torch.cuda.is_available():
    device = torch.device('cuda:0')
    print('Running on the GPU')
else:
    device = torch.device('cpu')
    print('Running on the CPU')

# Data Loading and Visualization

In this tutorial, we focus on an image regression task, where the goal is to learn a function that maps spatial coordinates to scalar values. Although we work with hydrodynamic simulation data here, the approach can be extended to other spatial data types, such as earth system model fields.

We aim to learn a model $\hat{f}$ that approximates the true underlying function $f$:

$$
f(\mathbf{x}) = y
$$

where:
- $ \mathbf{x} $ is input coordinate, which can be 2D (x, y) or 3D (x, y, z)
- $ y $ is the corresponding target value at that coordinate

![smoke_regression_network.png](attachment:88d1a492-7412-49a9-a783-21612d49ff89.png)

### Machine Learning Pipeline Overview

1. Load data
2. Preprocess data
3. Split into training and testing sets
4. Train the model
5. Evaluate the model

To simplify this tutorial, we use a single field for both training and testing. Therefore, we will skip step 3 (train/test split).

# 1. Load Smoke Data and Pre-process Smoke Data

In this tutorial, we use a synthetic smoke simulation dataset. The simulation involves a ball that continuously releases smoke, which rises within a box due to buoyancy forces.

The dataset contains 161 timesteps, and each timestep is a $\mathbf{128 × 128}$ grid representing a 2D smoke density field. That means each field contains $\mathbf{128 × 128}$ spatial points, with each point holding a scalar value indicating the smoke density.

### Preprocessing Steps
- Load the dataset.
- Normalize the data to the range [0,1], which is essential for stable training and consistent visualization.

We finally visualize the last timestep to inspect the smoke pattern and validate the data loading process.

In [None]:
data = np.load('smoke_data.npy')
maxval, minval = np.max(data), np.min(data)
data = (data - minval) / (maxval - minval)
data = data.transpose(0, 2, 1)

print('Data shape:', data.shape)

target_field = data[-1].reshape(-1, 1)

plt.imshow(data[-1], origin='lower', cmap='plasma', vmin=0.0, vmax=1.0)
plt.tight_layout()
plt.axis('off')
plt.show()

# 2. Prepare Coordinate Data

In our model, the input is a spatial coordinate, and the output is the corresponding scalar value. We've already loaded the values (i.e., smoke densities) from the dataset. Now, we need to generate the corresponding coordinate inputs.

Recall the function we aim to approximate:

$$
f(\mathbf{x}) = y
$$

- $ \mathbf{x} $ = input coordinates (x, y)
- $ y $ = output value (smoke density).

The smoke dataset is defined on a regular grid—a structured grid where points are evenly spaced along each axis. In our case, each field has a resolution of $\mathbf{128 × 128}$, meaning there are $\mathbf{128}$ evenly spaced points along both the X and Y axes.

### Coordinate Generation
To better understand this, let’s begin with a simple example: a $\mathbf{5 × 5}$ grid.

We use np.linspace to generate evenly spaced coordinates in the range [0,1]

This gives us the X-coordinates for one axis. We will then generate the full 2D coordinate grid from this.

In [None]:
# Simple np.linspace example
x = np.linspace(0, 1, 5)
print(x)

We can also create y coodinates

In [None]:
y = np.linspace(0, 1, 5)

We can generate the full 2D coordinate grid for a $\mathbf{5 × 5}$ resolution using np.meshgrid. This function takes two 1D arrays (for the X and Y axes) and returns coordinate matrices for each point on the grid.

In [None]:
gridx, gridy = np.meshgrid(x, y, indexing='ij')
print(gridx.shape, gridy.shape)
print(gridx)
print(gridy)

Recall input

- $ \mathbf{x} $ = Input coordinates (x, y) -> array shape (2,)

Neural network can parallel compute multiple input and output at the same time. The array shape will become (N, 2) for N input coordinates.

We can change the shape from (5, 5) to (25, 1) for 25 input coordinate x and y.

The coordinates will be combined through stack.

# Coordinate Shape for Neural Network Input

Recall that each input to the model is a 2D coordinate:

- $\mathbf{x}$ = input coordinates (x, y) → represented as an array of shape (2,)

Since neural networks can process multiple inputs in parallel, we batch the inputs. This means the input array will have shape (N, 2), where N is the number of coordinates.

In our 5 x 5 example:

- We have 25 total coordinates,

- The X and Y values are originally 5 x 5 arrays.

To prepare the inputs:

1. Flatten the X and Y arrays from shape (5, 5) to (25,).

2. Stack them together to form a (25, 2) array.

In [None]:
# single grid point
singlexy = np.array([0.5, 0.5])
print('Single input of the function f', singlexy.shape)

# create multiple grid points
gridx = gridx.reshape(-1, 1)
gridy = gridy.reshape(-1, 1)
print(gridx.shape, gridy.shape)
gridxy = np.concatenate((gridx, gridy), axis=1)
print(gridxy)
print('Multiple input of the function f', gridxy.shape)

Now, we can prepare the coordinates for the smoke data using the same process. Note that we use .astype(np.float32) to convert the coordinate array to 32-bit floating-point numbers, which is the preferred data type for neural network training in PyTorch.

In [None]:
# Create the location for points
x = np.linspace(0, 1, 128).astype(np.float32)
y = np.linspace(0, 1, 128).astype(np.float32)
gridx, gridy = np.meshgrid(x, y)
gridx = gridx.reshape(-1, 1)
gridy = gridy.reshape(-1, 1)
gridxy = np.concatenate((gridx, gridy), axis=1)
print(gridxy.shape)

# 3. Train Model

Recall our original goal: we aim to approximate the true function $f$ using a neural network.

$$
f(\mathbf{x}) = y
$$

We define a model $\hat{f}$, parameterized by $\beta$, to approximate $f$:

$$
\underset{\beta}{\hat{f}}(\mathbf{x}) = \hat{y}
$$

where:

- $\mathbf{x}$ = input coordinate (x, y)
- $ y $ = true smoke density
- $ \hat{y} $ = predicted smoke density
- $ \beta $ = model parameters (weights and biases)

### Model Configuration

Before training, we need to configure the model architecture.

![mlp_structure.png](attachment:1c351889-7ea7-458e-84b8-3d76cfe2b03f.png)

- Yellow: input_dim
- Green: hidden_dim
- Red: output_dim
- Green Layers: num_layers

- .to(device): Send the model to GPU, which accelerates the model training!

ReLU: Common neural network non-linear activation function (non-linear activation function makes neuarl network to learn complicate function!)

![relu.png](attachment:230a4d52-5f9b-417d-9ee2-737f2d074853.png)

Sigmoid: Our model output will be in [0, 1]. Use Sigmoid to constrain the output domain.

![sigmoid.png](attachment:1a86fe62-fa92-494f-af42-4a9bfd281129.png)

### Loss Function: Mean Squared Error (MSE)

We use MSE to measure the difference between predicted and true values:

$$
MSE = \frac{1}{n} \sum_{i=1}^{n} (f(\mathbf{x}) - \underset{\beta}{\hat{f}}(\mathbf{x}))^{2}
$$

where:
- $ n $ is the number of training samples

In [None]:
# Configure the model
mlp_model = MLP(input_dim=2, hidden_dim=64, output_dim=1, num_layers=3, activation=nn.ReLU(), last_activation=nn.Sigmoid(), bias=True, activate_final=True)
mlp_model.to(device)
optimizer = torch.optim.Adam(mlp_model.parameters(), lr=1e-4)
criterion = torch.nn.MSELoss()

We set the number of epochs as the stopping criterion for model training. In this tutorial, an epoch refers to one full pass through the image.

To monitor training progress, we set checkpoints at selected epochs. These checkpoints store the model prediction results:

- Inspect intermediate results,

- Compare performance across different stages of training.

In [None]:
epochs = 800
ckpts = [10, 50, 100, 200, 400, 800]
xy = torch.from_numpy(gridxy)
density = torch.from_numpy(target_field)
xy = xy.to(device)
density = density.to(device)

The model training process consists of two main steps: forward and backward.

### Forward Pass

In the forward pass, we input the coordinate $\mathbf{x}$ into the model to obtain a prediction $\hat{y}$

$$
\underset{\beta}{\hat{f}}(\mathbf{x}) = \hat{y}
$$


Backward Pass
In the backward pass, we compute the loss between the predicted value $\hat{y}$ and the ground truth $y$ using the mean squared error (MSE).
The model parameters $\beta$ are then updated by backpropagation, which minimizes the loss:

- Compute gradients of the loss with respect to $\beta$
- Update $\beta$ using an optimizer (e.g., SGD, Adam)

In [None]:
losses1 = []
mlp_preds = []
for epoch in tqdm(range(epochs)):
    # ===================forward=====================
    density_pred = mlp_model(xy)
    loss = criterion(density_pred, density)
    # ===================backward====================
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    losses1.append(loss.data.cpu().numpy())
    if (epoch+1) in ckpts:
        with torch.no_grad():
            density_pred = mlp_model(xy)
            density_pred = density_pred.cpu().numpy()
            density_pred = density_pred.reshape((128, 128))
            mlp_preds.append(density_pred)

# 4. Test Model

In this step, we perform a forward pass (inference) using the trained model on the test data.

Note: In this tutorial, the training and testing data are the same. This allows us to evaluate how well the model fits the training field, but does not measure generalization to unseen data.

In [None]:
with torch.no_grad():
    density_pred = mlp_model(xy)
    density_pred = density_pred.cpu().numpy()
    density_pred = density_pred.reshape((128, 128))

plt.imshow(density_pred, origin='lower', cmap='plasma', vmin=0.0, vmax=1.0)
plt.tight_layout()
plt.axis('off')
plt.show()

During training, we record the loss value at each epoch and plot a line graph to visualize the loss trend over time. This helps us assess whether the model is converging and how effectively it is learning.

In [None]:
plt.plot(losses1)

Here, we present the model predictions based on the checkpoints saved during training.

In [None]:
# Show the results during training
fig = plt.figure(figsize=(18, 3))
rows, columns = 1, 6
for i in range(len(ckpts)):
    fig.add_subplot(rows, columns, i+1) 
    plt.imshow(mlp_preds[i].reshape(128, 128), origin='lower', cmap='plasma', vmin=0.0, vmax=1.0)
    plt.axis('off') 
    plt.title('epochs: {0}'.format(ckpts[i]))

# Spectral Bias

We have blurred image although we trained the MLP after 800 epochs. This issue is called spectral bias.

Reason: Neighbor coordinate may generate totally different RGB value. In simple MLP, it is difficult to fit the sharp boundary

![spectral_bias.png](attachment:9c0d7fc2-7d09-4888-907d-29f09dc1702e.png)

We will introduce two ways to address spectral bias. The key idea is to map the low frequency input coordinates to high frequency latents.

1. Positional Encoding
2. Periodic Activation Function

What is low frequency to high frequency? For example, we have two close points 1 and 2. We can map the points by three different sin function. The result vectors are not similar.

![low_freq_to_high_freq.png](attachment:375fadaa-7488-404f-af27-f6dc66f48d18.png)

# Positional Encoding

We use Fourier feature here.

![fourier_feature.png](attachment:1896f19e-ec26-4322-a5c3-26a1039d8412.png)

where:
- $ x $: input value
- $ B $: frequency values ($2^{n}$)
- $ m $: number of frequencies

For example, we can use 4 frequecies = [1, 2, 4, 8]. For 1 input, we will have new vector size =  1 (input) x 4 (freq) x 2 (sin, cos) = 8. For N inputs and M frequencies, we will have new vector size = N x M x 2.

![fourier_feature_example.png](attachment:24fc84ac-bf56-4cc4-bc80-a704072d824c.png)


We will use the same training configuration as before, except for the model architecture.

This time, the model includes positional encoding to enhance its ability to represent high-frequency details.

![positional_encoding_model.png](attachment:df98d2f0-9b64-46c9-9166-53b9a9e93cb1.png)

In [None]:
# Positional Encoding 
freq = 30
PE = PositionalEncoding(sigma=10, m=freq)
pemlp_model = MLP(input_dim=freq*2*2, hidden_dim=64, output_dim=1, num_layers=3, activation=nn.ReLU(), last_activation=nn.Sigmoid(), bias=True, activate_final=True)
pemlp_model.to(device)
optimizer = torch.optim.Adam(pemlp_model.parameters(), lr=1e-4)
criterion = torch.nn.MSELoss()

In [None]:
losses2 = []
pemlp_preds = []
for epoch in tqdm(range(epochs)):
    # ===================forward=====================
    density_pred = pemlp_model(PE(xy))
    loss = criterion(density_pred, density)
    # ===================backward====================
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    losses2.append(loss.data.cpu().numpy())
    if (epoch+1) in ckpts:
        with torch.no_grad():
            density_pred = pemlp_model(PE(xy))
            density_pred = density_pred.cpu().numpy()
            density_pred = density_pred.reshape((128, 128))
            pemlp_preds.append(density_pred)

In [None]:
with torch.no_grad():
    density_pred = pemlp_model(PE(xy))
    density_pred = density_pred.cpu().numpy()
    density_pred = density_pred.reshape((128, 128))

plt.imshow(density_pred, origin='lower', cmap='plasma', vmin=0.0, vmax=1.0)
plt.tight_layout()
plt.axis('off')
plt.show()

In [None]:
plt.plot(losses1, label='w/o PE')
plt.plot(losses2, label='w/ PE')
plt.legend()
plt.show()

In [None]:
# Show the results during training
fig, axs = plt.subplots(2, 6, facecolor='white')
fig.set_figheight(6)
fig.set_figwidth(18)
axs[0, 0].set(ylabel="MLP w/o PE")
axs[1, 0].set_ylabel('MLP w/ PE')
rows, columns = 2, 6
for i in range(len(ckpts)):
    axs[0, i].set_title('epochs: {0}'.format(ckpts[i]))
    axs[0, i].imshow(mlp_preds[i].reshape(128, 128), origin='lower', cmap='plasma', vmin=0.0, vmax=1.0)
    axs[0, i].set_xticks([])
    axs[0, i].set_yticks([])
    axs[1, i].imshow(pemlp_preds[i].reshape(128, 128), origin='lower', cmap='plasma', vmin=0.0, vmax=1.0)
    axs[1, i].set_xticks([])
    axs[1, i].set_yticks([])

# Exercise: Try different hyperparameters

Hyperparameters in this neural network

- frequency
- number of hidden nodes
- number of layers

You can use Ctrl + / (Windows) or Cmd + / to remove the first # for the selected lines

In [None]:
# freq = ?
# PE = PositionalEncoding(sigma=10, m=freq)
# custom_pemlp = MLP(input_dim=freq*2*2, hidden_dim=?, output_dim=1, num_layers=?, activation=nn.ReLU(), last_activation=nn.Sigmoid(), bias=True, activate_final=True)

In [None]:
# exercise_losses = []
# custom_pemlp_preds = []
# for epoch in tqdm(range(epochs)):
#     # ===================forward=====================
#     density_pred = custom_pemlp(PE(xy))
#     loss = criterion(density_pred, density)
#     # ===================backward====================
#     optimizer.zero_grad()
#     loss.backward()
#     optimizer.step()
#     exercise_losses.append(loss.data.cpu().numpy())
#     if (epoch+1) in ckpts:
#         with torch.no_grad():
#             density_pred = pemlp_model(PE(xy))
#             density_pred = density_pred.cpu().numpy()
#             density_pred = density_pred.reshape((128, 128))
#             custom_pemlp_preds.append(density_pred)

In [None]:
# with torch.no_grad():
#     density_pred = custom_pemlp(PE(xy))
#     density_pred = density_pred.cpu().numpy()
#     density_pred = density_pred.reshape((128, 128))

# plt.imshow(density_pred, origin='lower', cmap='plasma', vmin=0.0, vmax=1.0)
# plt.tight_layout()
# plt.axis('off')
# plt.show()

In [None]:
# plt.plot(exercise_losses, label='Exercise Model')
# plt.legend()
# plt.show()

# Periodic Activation Function

Use sin function to replace ReLU in MLP

![periodic_activation_function_model.png](attachment:953cd085-47ad-4006-b35e-387ed7d0c9c2.png)

In [None]:
# SIREN
siren_model = Siren(in_features=2, hidden_features=64, hidden_layers=3, out_features=1, outermost_linear=True)
siren_model.to(device)
optimizer = torch.optim.Adam(siren_model.parameters(), lr=1e-4)
criterion = torch.nn.MSELoss()

In [None]:
losses3 = []
siren_preds = []
for epoch in tqdm(range(epochs)):
    # ===================forward=====================
    density_pred = siren_model(xy)
    loss = criterion(density_pred, density)
    # ===================backward====================
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    losses3.append(loss.data.cpu().numpy())
    if (epoch+1) in ckpts:
        with torch.no_grad():
            density_pred = siren_model(xy)
            density_pred = density_pred.cpu().numpy()
            density_pred = density_pred.reshape((128, 128))
            siren_preds.append(density_pred)

In [None]:
with torch.no_grad():
    density_pred = siren_model(xy)
    density_pred = density_pred.cpu().numpy()
    density_pred = density_pred.reshape((128, 128))

plt.imshow(density_pred, origin='lower', cmap='plasma', vmin=0.0, vmax=1.0)
plt.tight_layout()
plt.axis('off')
plt.show()

In [None]:
plt.plot(losses1, label='w/o PE')
plt.plot(losses2, label='w/ PE')
plt.plot(losses3, label='Siren')
plt.legend()
plt.ylim(-0.01, 0.1)
plt.show()

In [None]:
# Show the results during training
fig, axs = plt.subplots(3, 6, facecolor='white')
fig.set_figheight(9)
fig.set_figwidth(18)
axs[0, 0].set(ylabel="MLP w/o PE")
axs[1, 0].set_ylabel('MLP w/ PE')
axs[2, 0].set_ylabel('Siren')
rows, columns = 2, 6
for i in range(len(ckpts)):
    axs[0, i].set_title('epochs: {0}'.format(ckpts[i]))
    axs[0, i].imshow(mlp_preds[i].reshape(128, 128), origin='lower', cmap='plasma', vmin=0.0, vmax=1.0)
    axs[0, i].set_xticks([])
    axs[0, i].set_yticks([])
    axs[1, i].imshow(pemlp_preds[i].reshape(128, 128), origin='lower', cmap='plasma', vmin=0.0, vmax=1.0)
    axs[1, i].set_xticks([])
    axs[1, i].set_yticks([])
    axs[2, i].imshow(siren_preds[i].reshape(128, 128), origin='lower', cmap='plasma', vmin=0.0, vmax=1.0)
    axs[2, i].set_xticks([])
    axs[2, i].set_yticks([])

# Exercise: Super Resolution

We use $\textbf{128 x 128}$ grid for training and inference.

How about using higher resolution?

1. Prepare different x coordinates
2. Prepare different y coordinates
3. Combine x, y coordinates
4. Inference the model
5. Visualize the results

In [None]:
# 1. Prepare different x coordinates

In [None]:
# 2. Prepare different y coordinates

In [None]:
# 3. Combine x, y coordinates

In [None]:
# 4. Inference the model (which model to use?)

In [None]:
# 5. Visualize the results

# Extra Topic: From one timestep to multiple timesteps

### We can skip this topic if we don't have time.

We know how to train a model for single timestep field. Can we train a model for multiple timestep?

Let's visualize the animation for multiple timestep.

In [None]:
from matplotlib.animation import FuncAnimation

time_data = data[-16:]
fig, ax = plt.subplots()
im = ax.imshow(time_data[0], origin='lower', cmap='plasma', animated=True)

def update(frame):
    im.set_array(time_data[frame])
    ax.set_title(f"Time step: {frame}")
    return [im]

ani = FuncAnimation(fig, update, frames=16, blit=True, interval=300)

# Showing animation in notebook
from IPython.display import HTML
HTML(ani.to_jshtml())

# What is the challenge for time-varying data?

Can we just extend the input from (x, y) to (x, y, t) for periodic activation function model?

Let's try!

1. We need to prepare the training data for (x, y, t)

In [None]:
# Create the location for points
x = np.linspace(0, 1, 128).astype(np.float32)
y = np.linspace(0, 1, 128).astype(np.float32)
t = np.linspace(0, 1, 16).astype(np.float32)
gridt, gridx, gridy = np.meshgrid(t, x, y, indexing='ij')
gridt = gridt.reshape(-1, 1)
gridx = gridx.reshape(-1, 1)
gridy = gridy.reshape(-1, 1)
gridtxy = np.concatenate((gridt, gridx, gridy), axis=1)
gridtxy = gridtxy.reshape(16, -1, 3)
print(gridtxy.shape)

2. Basic settings for epochs, input data, and output data

In [None]:
epochs = 800
ckpts = [10, 50, 100, 200, 400, 800]
txy = torch.from_numpy(gridtxy)
density = torch.from_numpy(time_data.reshape(16, -1, 1))
txy = txy.to(device)
density = density.to(device)

3. Setup the model. We can just change the in_feature to 3.

In [None]:
# SIREN
time_varying_siren_model = Siren(in_features=3, hidden_features=128, hidden_layers=4, out_features=1, outermost_linear=True)
time_varying_siren_model.to(device)
optimizer = torch.optim.Adam(siren_model.parameters(), lr=1e-4)
criterion = torch.nn.MSELoss()

4. Training the model

In [None]:
losses4 = []
siren_preds = []
for epoch in tqdm(range(epochs)):
    epoch_loss = []
    for t in range(16):
        # ===================forward=====================
        density_pred = time_varying_siren_model(txy[t])
        loss = criterion(density_pred, density[t])
        # ===================backward====================
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        epoch_loss.append(loss.data.cpu().numpy())
    losses4.append(np.mean(epoch_loss))
    if (epoch+1) in ckpts:
        with torch.no_grad():
            current_time_siren_preds = []
            for t in range(16):
                density_pred = time_varying_siren_model(txy[t])
                density_pred = density_pred.cpu().numpy()
                density_pred = density_pred.reshape((128, 128))
                current_time_siren_preds.append(density_pred)
            current_time_siren_preds = np.array(current_time_siren_preds)
            siren_preds.append(current_time_siren_preds)

Visualize the loss function.

In [None]:
plt.plot(losses4, label='Time-varying Siren')
plt.legend()
plt.show()

Visualize the results

In [None]:
fig, ax = plt.subplots()
im = ax.imshow(siren_preds[5][0], origin='lower', cmap='plasma', animated=True)

def update(frame):
    im.set_array(siren_preds[5][frame])
    ax.set_title(f"Time step: {frame}")
    return [im]

ani = FuncAnimation(fig, update, frames=16, blit=True, interval=300)

# Showing animation in notebook
from IPython.display import HTML
HTML(ani.to_jshtml())

### Conclusion: Time-varying data is complex. Simply extend the input dimensions is not applicable.

How to train a model for time-varying data? We can use one learnable latent for one timestep.

We have two choices.

1. Use periodic activation function model

![image.png](attachment:29c0c062-0bc0-46ec-9009-fe2874f01aed.png)

2. Use positional encoding

![image.png](attachment:5fe42112-5ef1-4aa3-b1f7-c37111da24e2.png)

Let's try to see which one is better.

In [None]:
x = np.linspace(0, 1, 128).astype(np.float32)
y = np.linspace(0, 1, 128).astype(np.float32)
gridx, gridy = np.meshgrid(x, y)
gridx = gridx.reshape(-1, 1)
gridy = gridy.reshape(-1, 1)
gridxy = np.concatenate((gridx, gridy), axis=1)
gridxy = gridxy.reshape(-1, 2)
print(gridxy.shape)

In [None]:
epochs = 800
ckpts = [10, 50, 100, 200, 400, 800]
xy = torch.from_numpy(gridxy)
density = torch.from_numpy(time_data.reshape(16, -1, 1))
xy = xy.to(device)
density = density.to(device)

# Periodic Activation Function for time-varying data

In [None]:
class Time_Varying_Siren(nn.Module):
    def __init__(self, in_features, timesteps, latent_size, hidden_features, hidden_layers, out_features, outermost_linear=False, 
                 first_omega_0=30, hidden_omega_0=30.):
        super().__init__()

        # Learnable latent for each timestep
        self.latents = nn.Parameter(torch.randn(timesteps, latent_size))  # shape: (T, latent_dim)
        
        self.net = []
        self.net.append(SineLayer(latent_size + in_features, hidden_features, 
                                  is_first=True, omega_0=first_omega_0))

        for i in range(hidden_layers):
            self.net.append(SineLayer(hidden_features, hidden_features, 
                                      is_first=False, omega_0=hidden_omega_0))

        if outermost_linear:
            final_linear = nn.Linear(hidden_features, out_features)
            
            with torch.no_grad():
                final_linear.weight.uniform_(-np.sqrt(6 / hidden_features) / hidden_omega_0, 
                                              np.sqrt(6 / hidden_features) / hidden_omega_0)
                
            self.net.append(final_linear)
        else:
            self.net.append(SineLayer(hidden_features, out_features, 
                                      is_first=False, omega_0=hidden_omega_0))
        
        self.net = nn.Sequential(*self.net)
    
    def forward(self, coords, t):
        latent = self.latents[t]
        latent = latent.expand(coords.size(0), -1)
        output = self.net(torch.cat([coords, latent], dim=-1))
        return output

In [None]:
# Time-Varying SIREN
time_varying_siren_model = Time_Varying_Siren(in_features=2, timesteps=16, latent_size=128, hidden_features=128, hidden_layers=4, out_features=1, outermost_linear=True)
time_varying_siren_model.to(device)
optimizer = torch.optim.Adam(siren_model.parameters(), lr=1e-4)
criterion = torch.nn.MSELoss()

In [None]:
losses5 = []
siren_preds = []
for epoch in tqdm(range(epochs)):
    epoch_loss = []
    for t in range(16):
        # ===================forward=====================
        density_pred = time_varying_siren_model(xy, t)
        loss = criterion(density_pred, density[t])
        # ===================backward====================
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        epoch_loss.append(loss.data.cpu().numpy())
    losses5.append(np.mean(epoch_loss))
    if (epoch+1) in ckpts:
        with torch.no_grad():
            current_time_siren_preds = []
            for t in range(16):
                density_pred = time_varying_siren_model(xy, t)
                density_pred = density_pred.cpu().numpy()
                density_pred = density_pred.reshape((128, 128))
                current_time_siren_preds.append(density_pred)
            current_time_siren_preds = np.array(current_time_siren_preds)
            siren_preds.append(current_time_siren_preds)

In [None]:
plt.plot(losses5, label='Time-varying Siren')
plt.legend()
plt.show()

In [None]:
fig, ax = plt.subplots()
im = ax.imshow(siren_preds[5][0], origin='lower', cmap='plasma', animated=True)

def update(frame):
    im.set_array(siren_preds[5][frame])
    ax.set_title(f"Time step: {frame}")
    return [im]

ani = FuncAnimation(fig, update, frames=16, blit=True, interval=100)

# Showing animation in notebook
from IPython.display import HTML
HTML(ani.to_jshtml())

# Positional Encoding for time-varying data

In [None]:
class Time_Varying_MLP(nn.Module):
    def __init__(self, input_dim: int, timesteps: int, latent_size: int, hidden_dim: int, output_dim:int, num_layers: int, activation=nn.ReLU(), last_activation=nn.Sigmoid(), bias=True, activate_final=False):
        super().__init__()

        # Learnable latent for each timestep
        self.latents = nn.Parameter(torch.randn(timesteps, latent_size))  # shape: (T, latent_dim)
        
        self.net = []
        self.net.append(MLPLayer(input_dim+latent_size, hidden_dim, activation, bias=bias, activate=True))
        for i in range(num_layers-1):
            self.net.append(MLPLayer(hidden_dim, hidden_dim, activation, bias=bias, activate=True))
        self.net.append(MLPLayer(hidden_dim, output_dim, last_activation, bias=bias, activate=activate_final))
        
        self.net = nn.Sequential(*self.net)
        
    def forward(self, coords, t) -> torch.Tensor:
        latent = self.latents[t]
        latent = latent.expand(coords.size(0), -1)
        output = self.net(torch.cat([coords, latent], dim=-1))
        return output

In [None]:
# Positional Encoding 
freq = 30
PE = PositionalEncoding(sigma=10, m=freq)
pemlp_model = Time_Varying_MLP(input_dim=freq*2*2, timesteps=16, latent_size=128, hidden_dim=128, output_dim=1, num_layers=4, activation=nn.ReLU(), last_activation=nn.Sigmoid(), bias=True, activate_final=True)
pemlp_model.to(device)
optimizer = torch.optim.Adam(pemlp_model.parameters(), lr=1e-4)
criterion = torch.nn.MSELoss()

In [None]:
losses6 = []
preds = []
for epoch in tqdm(range(epochs)):
    epoch_loss = []
    for t in range(16):
        # ===================forward=====================
        density_pred = pemlp_model(PE(xy), t)
        loss = criterion(density_pred, density[t])
        # ===================backward====================
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        epoch_loss.append(loss.data.cpu().numpy())
    losses6.append(np.mean(epoch_loss))
    if (epoch+1) in ckpts:
        with torch.no_grad():
            current_preds = []
            for t in range(16):
                density_pred = pemlp_model(PE(xy), t)
                density_pred = density_pred.cpu().numpy()
                density_pred = density_pred.reshape((128, 128))
                current_preds.append(density_pred)
            current_time_siren_preds = np.array(current_preds)
            preds.append(current_time_siren_preds)

In [None]:
plt.plot(losses6, label='Time-varying PE')
plt.legend()
plt.show()

In [None]:
fig, ax = plt.subplots()
im = ax.imshow(preds[5][0], origin='lower', cmap='plasma', animated=True)

def update(frame):
    im.set_array(preds[5][frame])
    ax.set_title(f"Time step: {frame}")
    return [im]

ani = FuncAnimation(fig, update, frames=16, blit=True, interval=300)

# Showing animation in notebook
from IPython.display import HTML
HTML(ani.to_jshtml())