# CFD with PyFR



## Overview

This notebook walks through preparing, running, and post-processing the *2D viscous shock tube* benchmark from the PyFR test cases using the CUDA backend.

1. Install and fetch the test case assets,
2. Convert the mesh and run a PyFR solution on CUDA,
3. Export PyFR objects into VTU and NumPy for later analysis, and
4. Inspect the most recent solution with a static PyVista plot.

Use the cells below as a guide to reproduce the workflow in a Colab/CUDA environment.

In [None]:
!pip install pyfr
!git clone https://github.com/PyFR/PyFR-Test-Cases.git

## Step 1: Prepare the PyFR test case

The commands above ensure that PyFR is installed in this session and that the `PyFR-Test-Cases` repository is available for the viscous shock tube benchmark.

In [2]:
!ls

PyFR-Test-Cases  sample_data


This listing is a quick sanity check to confirm the cloned repo and working files are present before importing the mesh.

In [None]:
import os

sim_dir = "PyFR-Test-Cases/2d-euler-vortex"

if os.path.exists(sim_dir):
    files = os.listdir(sim_dir)
    mesh_file = next((f for f in files if f.endswith(".msh")), None) or next(
        (f for f in files if f.endswith(".msh.xz")), None
    )
    cfg_file = next((f for f in files if f.endswith(".ini")), None)

    if mesh_file and cfg_file:
        print(f"Importing {mesh_file}...")
        # If mesh is compressed, decompress
        if mesh_file.endswith(".msh.xz"):
            os.system(f'xz -d "{os.path.join(sim_dir, mesh_file)}"')
            mesh_file = mesh_file.replace(".msh.xz", ".msh")

        pyfrm_file = mesh_file.replace(".msh", ".pyfrm")

        mesh_file = os.path.join(sim_dir, mesh_file)
        cfg_file = os.path.join(sim_dir, cfg_file)
        pyfrm_file = os.path.join(sim_dir, pyfrm_file)

        print(f"Creating {pyfrm_file}...")
        os.system(f'pyfr import "{mesh_file}" "{pyfrm_file}"')

        print(f"Running simulation with {cfg_file}...")
        os.system(f'pyfr run -b cuda "{pyfrm_file}" "{cfg_file}"')

    else:
        print(files)
        print("Could not find required .msh and .ini files in the directory.")
else:
    print(f"Directory {sim_dir} not found.")

In [None]:
!pip install pyvista
!apt-get install -y xvfb libgl1-mesa-glx

In [None]:
# Install pyvista if not present (needed for filtering)
import pyvista as pv
from google.colab import drive

drive.mount("/content/drive")
import os
import zipfile
from concurrent.futures import ThreadPoolExecutor, as_completed
import subprocess
from tqdm.notebook import tqdm
from multiprocessing import cpu_count
import numpy as np

# ----------------------------
# Setup directories
# ----------------------------
mesh_candidates = [f for f in os.listdir(sim_dir) if f.endswith(".pyfrm")]
if not mesh_candidates:
    raise FileNotFoundError("No .pyfrm file found in " + sim_dir)
mesh_file = os.path.join(sim_dir, mesh_candidates[0])
print("Mesh file:", mesh_file)

# Output folders
root_out = "results"
dens_out = os.path.join(root_out, "density")
velo_out = os.path.join(root_out, "velocity")

os.makedirs(dens_out, exist_ok=True)
os.makedirs(velo_out, exist_ok=True)

# ZIP destination
drive_dest_dir = "/content/drive/MyDrive/vtu_results"
os.makedirs(drive_dest_dir, exist_ok=True)
results_zip = os.path.join(drive_dest_dir, "results_density_velocity.zip")

# Input simulation state files
pyfrs_files = [f for f in os.listdir(".") if f.endswith(".pyfrs")]


# ----------------------------
# EXPORT / READ / SAVE NPY
# ----------------------------
def export_and_filter(f):
    vtu_name = f.replace(".pyfrs", ".vtu")
    base = os.path.splitext(os.path.basename(vtu_name))[0]

    # 1. Export PyFR â†’ VTU
    cmd = ["pyfr", "export", mesh_file, f, vtu_name]
    try:
        subprocess.run(cmd, check=True, capture_output=True)
    except subprocess.CalledProcessError as e:
        print(f"Export failed for {f}:\n{e.stderr.decode()}")
        return []

    saved_files = []

    # 2. Extract density + velocity
    try:
        mesh = pv.read(vtu_name)

        # Cache mesh node coordinates once inside the velocity directory so
        # downstream trajectory tools can build a KD-tree for spatial queries.
        points = np.asarray(mesh.points, dtype=np.float32)
        points_xy = points[:, :2]
        point_targets = [
            os.path.join(velo_out, "points.npy"),
            os.path.join(velo_out, "mesh_points.npy"),
        ]
        for pt in point_targets:
            if not os.path.exists(pt):
                np.save(pt, points_xy)
                saved_files.append(pt)

        if "Density" not in mesh.point_data or "Velocity" not in mesh.point_data:
            print(f"Missing fields in {vtu_name}")
            if os.path.exists(vtu_name):
                os.remove(vtu_name)
            return []

        density = np.array(mesh.point_data["Density"], dtype=np.float32)
        velocity = np.array(mesh.point_data["Velocity"], dtype=np.float32)[
            :, :2
        ]  # keep vx, vy

        # Save to correct folders
        dens_path = os.path.join(dens_out, f"{base}_density.npy")
        velo_path = os.path.join(velo_out, f"{base}_velocity.npy")

        np.save(dens_path, density)
        np.save(velo_path, velocity)

        saved_files.extend([dens_path, velo_path])

    except Exception as e:
        print(f"Error reading {vtu_name}: {e}")

    # 3. Delete temporary VTU
    if os.path.exists(vtu_name):
        os.remove(vtu_name)

    return saved_files


# ----------------------------
# MULTITHREAD PROCESSING
# ----------------------------
created_files = []
max_threads = min(4, cpu_count())
print(f"Processing {len(pyfrs_files)} files with {max_threads} threads...")

with ThreadPoolExecutor(max_workers=max_threads) as executor:
    futures = {executor.submit(export_and_filter, f): f for f in pyfrs_files}
    for future in tqdm(as_completed(futures), total=len(futures)):
        result = future.result()
        if result:
            created_files.append(result)


# ----------------------------
# ZIP ALL NPY FILES
# ----------------------------
print(f"Zipping all .npy files into {results_zip}...")

with zipfile.ZipFile(results_zip, "w", compression=zipfile.ZIP_DEFLATED) as z:
    for group in created_files:
        for f in group:
            z.write(f, arcname=os.path.relpath(f, root_out))

print("Done. Saved:", results_zip)

In [None]:
import os
import glob
import re

# Configuration
sim_dir = "PyFR-Test-Cases/2d-viscous-shock-tube"
mesh_path = os.path.join(sim_dir, "viscous-shock-tube.pyfrm")
output_vtu = "solution.vtu"

# 1. Find the latest solution file in the current directory
soln_files = glob.glob("*.pyfrs")
if not soln_files:
    print("No solution files (.pyfrs) found in current directory.")
else:
    # Regex to extract time from filename
    def get_time(f):
        match = re.search(r"viscous-shock-tube-([\d\.]+)\.pyfrs", f)
        return float(match.group(1)) if match else -1.0

    latest_soln = max(soln_files, key=get_time)
    time_val = get_time(latest_soln)
    print(f"Found latest solution: {latest_soln} (Time: {time_val})")

    # 2. Export to VTU
    if os.path.exists(mesh_path):
        print(f"Exporting using mesh: {mesh_path}...")
        !pyfr export "{mesh_path}" "{latest_soln}" "{output_vtu}"

        # 3. Visualize
        if os.path.exists(output_vtu):
            print("Generating plot...")
            pv.start_xvfb()
            pv.set_jupyter_backend("static")

            mesh = pv.read(output_vtu)

            # Select 'Density' if available, otherwise fall back
            scalar_name = (
                "Density" if "Density" in mesh.array_names else mesh.array_names[0]
            )
            print(f"Plotting scalar: {scalar_name}")

            # Use Plotter explicitly to avoid keyword argument issues
            pl = pv.Plotter(off_screen=True)
            pl.add_mesh(mesh, scalars=scalar_name, cmap="viridis", show_scalar_bar=True)
            pl.add_title(f"Viscous Shock Tube: {scalar_name} at t={time_val}")
            pl.camera_position = "xy"
            pl.show()
            img = pl.screenshot("visual.png")
            print("Plot saved as 'visual.png'")
        else:
            print("Export failed: VTU file not created.")
    else:
        print(f"Mesh file not found at {mesh_path}")

In [None]:
# Generate synthetic axial-compressor velocity snapshots
import os
from tqdm import trange

out_dir = "data/axial-compressor/velocity"
os.makedirs(out_dir, exist_ok=True)

# Polar grid settings (annulus between hub and tip)
nr = 64
ntheta = 128
r_hub = 0.05
r_tip = 0.15
r = np.linspace(r_hub, r_tip, nr)
theta = np.linspace(0, 2 * np.pi, ntheta, endpoint=False)
R, Theta = np.meshgrid(r, theta, indexing="xy")  # shapes (ntheta, nr)

# Node coordinates (x, y)
x = (R * np.cos(Theta)).ravel()
y = (R * np.sin(Theta)).ravel()
points = np.vstack([x, y]).T
np.save(os.path.join(out_dir, "points.npy"), points)

# Snapshot generation parameters
snapshots = 64
rpm = 15000
omega = 2 * np.pi * (rpm / 60.0)  # rad/s
blade_count = 18

for t in trange(snapshots, desc="Generating axial-compressor snapshots"):
    phase = 2 * np.pi * t / snapshots

    # Base azimuthal velocity (scaled) and a blade-phase perturbation
    v_theta = 0.2 * omega * R
    perturb = 0.1 * (r_tip - R) / (r_tip - r_hub) * np.sin(blade_count * Theta - phase)

    # Small radial inflow/outflow component to mimic axial/blade forcing
    v_r = (
        0.05 * (1 - (R - r_hub) / (r_tip - r_hub)) * np.cos(blade_count * Theta - phase)
    )

    # Convert polar velocities to cartesian components
    vx = (v_r * np.cos(Theta) - (v_theta + perturb) * np.sin(Theta)).ravel()
    vy = (v_r * np.sin(Theta) + (v_theta + perturb) * np.cos(Theta)).ravel()

    vel = np.vstack([vx, vy]).T.astype(np.float32)
    np.save(os.path.join(out_dir, f"velocity_{t:04d}.npy"), vel)

print(f"Saved {snapshots} velocity snapshots to {out_dir}")
print("Saved node coordinates to", os.path.join(out_dir, "points.npy"))