## Guided Flow Matching Design Optimization Example Notebook

Thank you for your interest in our flow matching based design optimization method for minimizing structural vibrations in plates!

With this notebook and google colab, you can try out the pipeline for optimizing novel plates for your own objective functions with no setup required. Of course, this notebook can also be run on a local machine with gpu access. However, you might need to install some additional packages, as we are only installing those missing on colab.

You need to be signed in with your google account. Please also make sure that you are connected to a gpu runtime by by selecting 'runtime' change runtime to e.g. T4 GPU. The following code snippet will show a table with gpu information if you are connnected to a gpu runtime. To run the code snippet, simply click on the left edge. or press (Ctrl + enter) after selecting it.

In [None]:
!nvidia-smi

The following two code snippets are necessary to set up the environment and download the model weights. Simply run them before continuing. It takes around 2 minutes.


In [None]:
%%capture
# install environment
!pip install \
    munch==4.0 \
    wandb \
    h5py==3.12 \
    hdf5plugin==4.2 \
    flow_matching \
    torchinterp1d==1.1
!pip install git+https://github.com/JanvDelden/template.git

The following downloads depend on the download speed of google colab, which can be quite slow sometimes and take around 2 minutes. In total, around 150 MB are downloaded.

In [None]:
!wget https://data.goettingen-research-online.de/api/access/datafile/125255 -O regression_model.ckpt
!wget https://data.goettingen-research-online.de/api/access/datafile/125254 -O flow_matching_model.ckpt
!wget https://data.goettingen-research-online.de/api/access/datafile/125124 -O moments_dict.pt

!git clone https://github.com/ecker-lab/Optimizing_Vibrating_Plates.git
%cd Optimizing_Vibrating_Plates
%pip install -e .

## Pipeline

We first import the required functions and set up the flow matching as well as the regression model.


In [None]:
import torch
import os
import json
import numpy as np
from plate_optim.flow_matching.models.unet import UNetModel
from plate_optim.regression.regression_model import get_net, get_mean_from_velocity_field
from plate_optim.utils.data import get_moments
from plate_optim.utils.guidance import _callable_constructor, get_loss_fn
from flow_matching.solver import ODESolver


flow_matching_path='/content/flow_matching_model.ckpt'
regression_path='/content/regression_model.ckpt'
moments_dict_path='/content/moments_dict.pt'


# load flow matching model
config = json.load(open('/content/Optimizing_Vibrating_Plates/configs/flow_matching_config.json'))
model = UNetModel(**config).cuda()
model.load_state_dict(torch.load(flow_matching_path)['model_state_dict'])

# load regression model
regression_model = get_net(conditional=True, len_conditional=3, scaling_factor=32).cuda()
regression_model.load_state_dict(torch.load(regression_path)['model_state_dict'])
regression_model.eval(), model.eval()

# load mean and standard deviation of data
out_mean, out_std, field_mean, field_std = get_moments(moments_dict_path=moments_dict_path, device='cuda')
mean_conditional_param = [50, 0.5, 0.5] # Boundary condition, force_position x, force_position y
std_conditional_param = [28.8675, 0.173205,  0.173205]

# specify general variables
device = 'cuda'
resolution = [96, 128]
n_samples = 4

Here we specify the physical parameters of the optimized plates as well as the optimization range for the frequencies. Based on this we construct an objective function and build our velocity model from the objective function and the flow matching model. Feel free to change e.g. the parameters min_freq and max_freq, that specify the optimization range between 1 and 300.

In [None]:
# specify objective function
min_freq, max_freq = 100, 200
phy_para = [0, 0.34444444444444444, 0.35] # 0, 0.31 / 0.9, 0.21 / 0.6

frequencies = torch.linspace(-1.0, 1, 300).cuda()[min_freq:max_freq].unsqueeze(0).repeat(n_samples, 1)
condition_norm = torch.tensor([phy_para], device=device, dtype=torch.float32).repeat(n_samples, 1)
condition = (condition_norm -  torch.tensor(mean_conditional_param).float().cuda().unsqueeze(0))\
        / torch.tensor(std_conditional_param).float().unsqueeze(0).cuda()
loss_fn = get_loss_fn(regression_model, condition, frequencies, field_mean, field_std)

# build velocity model
velocity_model = _callable_constructor(model, loss_fn, save_path='/content/optimization')

Now we can solve the constructed ODE starting from random noise x_0. In this case, we use the euler method and in total 20 steps. The alpha parameter, that controls the relative strength of the guidance pushing towards minimizing the objective function, is set to 1.

In [None]:
# run solver starting from random noise x_0
solver = ODESolver(velocity_model=velocity_model)
x_0 = torch.randn([n_samples, 1, *resolution], dtype=torch.float32, device=device)
samples = solver.sample(
    time_grid=torch.linspace(0, 1, int(1 / 0.05) + 1),
    x_init=x_0,
    method='euler',
    step_size=0.05,
    alpha=1,
    norm_grad=True,
    return_intermediates=True,
)
final_samples = samples[-1].detach().cpu()


Now, we can take a look at our generated beading patterns and their dynamic response as evaluated by the regression model. The results differ for different sampled x_0 and changed optimization parameters.

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
fig, ax = plt.subplots(2,  min(n_samples, 8), figsize=(10, 4))

with torch.no_grad():
    frequencies_full = torch.linspace(-1.0, 1, 300).cuda().unsqueeze(0).repeat(n_samples, 1)
    pred = regression_model(final_samples.cuda(), condition, frequencies_full)
    prediction = get_mean_from_velocity_field(pred, field_mean, field_std, frequencies_full)

for i in range(min(n_samples, 8)):
    ax[0, i].imshow(final_samples[i].cpu().numpy().squeeze()[::-1], cmap='gray')
    ax[0, i].plot(condition_norm[0, 1].cpu() * 128, (1-condition_norm[0, 2].cpu()) * 96,'x', color='red', markersize=4.5, markeredgewidth=1.5)
    ax[0, i].axis('off')


    frequencies_ = torch.arange(1, 301)
    ax[1, i].plot(frequencies_, prediction[i].cpu(), lw=0.8, color='grey', ls='--')
    ax[1, i].set_ylim(-25, 65)
    ax[1, i].grid(alpha=0.3, lw=0.5)
    sns.despine(ax=ax[1, i], offset=5)
    ax[1, i].set_title(f'{i}: {prediction[i][min_freq:max_freq].cpu().mean().item():.3f}')
    ax[1, i].set_xlabel('Velocity (dB)')
    ax[1, i].set_ylabel('Frequency (Hz)')
    plt.tight_layout()


We can also visualize the optimization process reproducing one of the plots from our paper.

In [None]:
import re

sample_id = torch.argmin(prediction[:, min_freq:max_freq].mean(dim=1))
files = os.listdir('/content/optimization')
pattern = re.compile(r'^(grad|v_flow)_(\d+\.\d+)\.pt$')

# Collect (value, filename) pairs for each category
groups = {'grad': [], 'v_flow': []}
for fname in files:
    m = pattern.match(fname)
    if not m:
        continue
    prefix, num_str = m.group(1), m.group(2)
    num = float(num_str)
    groups[prefix].append((num, fname))

# Sort by the numeric suffix and load data
for key in groups:
    groups[key].sort(key=lambda pair: pair[0])
grad_suffixes  = [num for num, _ in groups['grad']]
vflow_suffixes = [num for num, _ in groups['v_flow']]
grad_tensors  = [torch.load(os.path.join('/content/optimization', fname)) for _, fname in groups['grad']]
vflow_tensors = [torch.load(os.path.join('/content/optimization', fname)) for _, fname in groups['v_flow']]
grads   = torch.cat(grad_tensors,  dim=1)
v_flows = torch.cat(vflow_tensors, dim=1)

In [None]:
# produce the actual plot
n_plots = 11
intermediate_sample = samples[:,sample_id,0].cpu().numpy()[:, ::-1]
grad = grads[sample_id].cpu().numpy()[:, ::-1]
v_flow = v_flows[sample_id].cpu().numpy()[:, ::-1]

idx = torch.linspace(0, len(intermediate_sample)- 1, steps=n_plots).round().long()
fig, axes = plt.subplots(4, n_plots, figsize=(6.75*1.5, 2*1.5), gridspec_kw={'wspace': 0, 'hspace': 0})

# turn off axes for images-only rows
for ax in axes[:3].flat:
    ax.axis('off')

for col, id in enumerate(idx):
    t = id/(len(intermediate_sample)-1)
    axes[0, col].imshow(intermediate_sample[id], cmap='gray')
    axes[0, col].plot(condition_norm[sample_id, 1].cpu() * 128, (1-condition_norm[sample_id, 2].cpu()) * 96,'x',
                       color='red', markersize=4.5, markeredgewidth=1.5)
    axes[0, col].set_title(f"t={t:.2f}", pad=2)
    if t < 0.75:
        vmax_scale = max(np.abs(grad[id].min()), np.abs(grad[id].max()))
        axes[1, col].imshow(grad[id], cmap='gray', vmin=-vmax_scale, vmax=vmax_scale)
    if t < 1:
        axes[2, col].imshow(v_flow[id], cmap='gray')

    with torch.no_grad():
        inp = torch.from_numpy(intermediate_sample[id][::-1].copy()).view(1,1,96,128).cuda()
        pred = regression_model(inp, condition[:1], frequencies_full[:1])
        prediction = get_mean_from_velocity_field(pred, field_mean, field_std, frequencies_full[:1])

    ax3 = axes[3, col]
    ax3.plot(frequencies_.cpu(), prediction[0].cpu(), lw=0.8, color='grey', ls='--')
    ax3.set_ylim(-25, 65)
    ax3.set_xticklabels([])
    ax3.grid(alpha=0.3, lw=0.5)
    ax3.set_yticklabels([])
    ax3.tick_params(axis='y', length=0)
    ax3.tick_params(axis='x', length=0)
    sns.despine(ax=ax3, left=True, bottom=True, offset=5)

fig.subplots_adjust(left=0, right=1, top=1, bottom=0)


The first row in the above plot shows x_t, the second the gradient signal from the objective function, and the third row the prediction from the flow matching model. The last row shows the predicted dynamic response given x_t.

Next, lets turn this plot into a video.

In [None]:
from matplotlib.animation import FuncAnimation, FFMpegWriter
from IPython.display import HTML, display

# Prepare the three frame‐stacks
intermediates_best = samples[:, sample_id, 0].cpu().numpy()

frames1 = intermediates_best[:, ::-1]
frames2 = grad
frames3 = v_flow
frames2 = np.pad(frames2, ((0, frames1.shape[0] - frames2.shape[0]), (0, 0), (0, 0)), mode='edge')
frames3 = np.pad(frames3, ((0, frames1.shape[0] - frames3.shape[0]), (0, 0), (0, 0)), mode='edge')

n_frames = frames1.shape[0]

# Set up a wide figure with 3 subplots
fig, axes = plt.subplots(1, 3, figsize=(1.28*4*3, 0.96*4))
ims = []
for ax, frames in zip(axes, (frames1, frames2, frames3)):
    im = ax.imshow(frames[0], cmap="gray", animated=True)
    ax.plot(
    condition_norm[sample_id, 1].cpu() * 128,
    (1-condition_norm[sample_id, 2].cpu()) * 96,
    'x', color='red', markersize=8.5, markeredgewidth=2.5
    )
    ax.axis("off")
    ims.append(im)

def update(i):
    vmin, vmax = frames1[i].min(), frames1[i].max(); ims[0].set_clim(vmin, vmax)
    for im, frames in zip(ims, (frames1, frames2, frames3)):
        im.set_array(frames[i])
    return ims


ani = FuncAnimation(fig, update, frames=n_frames, interval=500, blit=True)
fig.tight_layout(pad=0)
plt.subplots_adjust(left=0, right=1, top=1, bottom=0)
video_path = f"/content/combined_{sample_id}.mp4"
writer = FFMpegWriter(fps=1)
ani.save(video_path, writer=writer)

plt.close(fig)
display(HTML(ani.to_jshtml()))
print(f"Saved combined video to: {video_path}")