In [20]:
import numpy as np
import h5py
from pathlib import Path
import torch
from einops import rearrange


In [21]:
def _write_open_boundary(bc_group, axis_name, coords):
    """Add an ``<axis_name>_open`` boundary-condition subgroup to *bc_group*.

    The subgroup stores a boolean ``mask`` that is True only at the first and
    last coordinate index, and a ``values`` array of zeros shaped like *coords*.
    """
    group = bc_group.create_group(f"{axis_name}_open")
    group.attrs["associated_dims"] = [axis_name]
    group.attrs["associated_fields"] = []
    group.attrs["bc_type"] = "open"
    group.attrs["sample_varying"] = False
    group.attrs["time_varying"] = False

    # Builtin ``bool`` instead of ``np.bool``: the alias was removed in
    # NumPy 1.24 and raises AttributeError there.
    mask = np.zeros_like(coords, dtype=bool)
    mask[0] = True
    mask[-1] = True
    group.create_dataset("mask", data=mask)
    group.create_dataset("values", data=np.zeros_like(coords))


def _write_varying_field(group, name, values):
    """Store *values* as dataset *name*, flagged as varying in space, sample and time."""
    dset = group.create_dataset(name, data=values)
    dset.attrs["dim_varying"] = [True, True]
    dset.attrs["sample_varying"] = True
    dset.attrs["time_varying"] = True


def create_hdf5_dataset_supersonic_flow(output_path: Path, data: np.ndarray, mach: float):
    """
    Write one simulation array to ``supersonic_flow_Ma_<mach>.hdf5``.

    Parameters
    ----------
    output_path : Path
        Directory the file is written into. It is not created here.
    data : np.ndarray
        Array indexed as (trajectory, time, x, y, channel). Channel 0 is
        stored as density, channel 1 as pressure, and the last two channels
        as the velocity vector.
    mach : float
        Mach number; stored as a root attribute, as the ``Ma`` scalar
        dataset, and used in the file name.

    Notes
    -----
    Coordinates and time are plain index ranges (``np.arange``) derived from
    the shape of *data*. The temperature field is written as an all-zero
    array with the same shape as pressure. An existing file with the same
    name is overwritten (mode ``"w"``).
    """
    density = data[..., 0]
    pressure = data[..., 1]
    vel = data[..., -2:]

    # Zero-filled placeholder for the temperature field.
    temperature = np.zeros_like(pressure)

    filename = f"supersonic_flow_Ma_{mach}.hdf5"

    with h5py.File(output_path / filename, "w") as f:
        # Root attributes
        f.attrs["simulation_parameters"] = ["Ma"]
        f.attrs["Ma"] = mach
        f.attrs["dataset_name"] = "COMSOL_SupersonicFlow"
        f.attrs["grid_type"] = "cartesian"
        f.attrs["n_spatial_dims"] = 2
        f.attrs["n_trajectories"] = data.shape[0]

        # Index-based axes taken from the array shape.
        time_steps = np.arange(data.shape[1])
        x_coords = np.arange(data.shape[2])
        y_coords = np.arange(data.shape[3])

        # Dimensions group
        dims = f.create_group("dimensions")
        dims.attrs["spatial_dims"] = ["x", "y"]

        time_dset = dims.create_dataset("time", data=time_steps)
        time_dset.attrs["sample_varying"] = False

        for axis_name, coords in (("x", x_coords), ("y", y_coords)):
            axis_dset = dims.create_dataset(axis_name, data=coords)
            axis_dset.attrs["sample_varying"] = False
            axis_dset.attrs["time_varying"] = False

        # Boundary conditions group: one "open" entry per spatial axis.
        bc = f.create_group("boundary_conditions")
        _write_open_boundary(bc, "x", x_coords)
        _write_open_boundary(bc, "y", y_coords)

        # Scalars group
        scalars = f.create_group("scalars")
        scalars.attrs["field_names"] = ["Ma"]

        Ma_dset = scalars.create_dataset("Ma", data=mach)
        Ma_dset.attrs["sample_varying"] = False
        Ma_dset.attrs["time_varying"] = False

        # t0_fields group: scalar fields
        t0_fields = f.create_group("t0_fields")
        t0_fields.attrs["field_names"] = ["pressure", "density", "temperature"]
        _write_varying_field(t0_fields, "pressure", pressure)
        _write_varying_field(t0_fields, "density", density)
        _write_varying_field(t0_fields, "temperature", temperature)

        # t1_fields group: vector fields
        t1_fields = f.create_group("t1_fields")
        t1_fields.attrs["field_names"] = ["velocity"]
        _write_varying_field(t1_fields, "velocity", vel)

        # t2_fields group is created but left empty.
        t2_fields = f.create_group("t2_fields")
        t2_fields.attrs["field_names"] = []

In [22]:
# swap x and y
def swap_x_y(data):
    """Return *data* with axes 1 and 2 exchanged."""
    return data.swapaxes(1, 2)


def interpolate_data(data, size=(256, 128)):
    """Resample every frame of *data* to a fixed spatial resolution.

    Parameters
    ----------
    data : np.ndarray
        Array with five axes ordered (trajectory, time, height, width, channel).
    size : tuple of int, optional
        Target (height, width) of each frame, by default (256, 128).

    Returns
    -------
    np.ndarray
        Array ordered (trajectory, time, size[0], size[1], channel).

    Raises
    ------
    ValueError
        If *data* does not have exactly five axes.
    """
    if data.ndim != 5:
        raise ValueError(
            f"expected a 5-D array (traj, t, h, w, c), got {data.ndim} dimension(s)"
        )

    frames = torch.from_numpy(data)
    n_traj, n_steps, height, width, n_channels = frames.shape

    # torch's interpolate wants (batch, channel, height, width): move the
    # channel axis forward and fold trajectory and time into one batch axis.
    batched = frames.permute(0, 1, 4, 2, 3).reshape(
        n_traj * n_steps, n_channels, height, width
    )
    resized = torch.nn.functional.interpolate(
        batched, size=size, mode="bicubic", align_corners=False
    )

    # Undo the folding and put the channel axis back at the end.
    new_height, new_width = resized.shape[-2:]
    restored = resized.reshape(
        n_traj, n_steps, n_channels, new_height, new_width
    ).permute(0, 1, 3, 4, 2)
    return restored.numpy()

In [23]:
# Convert every .npy file under <base_path>/np into an HDF5 file under <base_path>/data.

base_path = Path("/scratch/zsa8rk/datasets/supersonic_flow")
data_path = base_path / "data"
data_path.mkdir(exist_ok=True)
for npy_file in base_path.glob("np/*.npy"):
    # The Mach number is parsed from characters 5..7 of the file stem.
    # NOTE(review): assumes the value always occupies exactly those three
    # characters — confirm against the actual file naming scheme.
    mach_number = float(npy_file.stem[5:8])
    # Swap axes 1 and 2, then prepend a length-1 trajectory axis.
    field_stack = swap_x_y(np.load(npy_file))[np.newaxis, ...]
    create_hdf5_dataset_supersonic_flow(
        data_path, interpolate_data(field_stack), mach_number
    )

In [24]:
import shutil
import random
from pathlib import Path

# data_path = Path("C:/Users/zsa8rk/Coding/Large-Physics-Foundation-Model/data/datasets/cooled_object_pipe_flow_air/data")
def split_datasets(
    data_path: Path,
    train_ratio: float = 0.8,
    val_ratio: float = 0.1,
    test_ratio: float = 0.1,
    *,
    seed: "int | None" = None,
):
    """Split hdf5 files into train/valid/test subdirectories.

    The ``*.hdf5`` files directly inside *data_path* are shuffled and moved
    into ``train``, ``valid`` and ``test`` subdirectories. The train and
    validation counts are ``int(n_files * ratio)``; the test split receives
    every remaining file, so it also absorbs the rounding remainder.

    Parameters
    ----------
    data_path : Path
        Path to directory containing hdf5 files
    train_ratio : float, optional
        Ratio of files to use for training, by default 0.8
    val_ratio : float, optional
        Ratio of files to use for validation, by default 0.1
    test_ratio : float, optional
        Ratio of files to use for testing, by default 0.1
    seed : int or None, optional
        Seed for a dedicated random generator so the split is reproducible.
        With the default ``None`` the global ``random`` state is used.

    Raises
    ------
    ValueError
        If any ratio is negative or the three ratios do not sum to 1.
    """
    ratios = (train_ratio, val_ratio, test_ratio)
    if any(ratio < 0 for ratio in ratios):
        raise ValueError(f"split ratios must be non-negative, got {ratios}")
    # ``not (... <= tol)`` also rejects NaN, which a plain ``>`` would let through.
    if not abs(sum(ratios) - 1.0) <= 1e-6:
        raise ValueError(f"split ratios must sum to 1, got {ratios}")

    # Create subdirectories
    split_dirs = {name: data_path / name for name in ("train", "valid", "test")}
    for split_dir in split_dirs.values():
        split_dir.mkdir(exist_ok=True)

    # Sort first: glob order is filesystem-dependent, and a seeded shuffle is
    # only reproducible when it starts from the same ordering.
    hdf5_files = sorted(data_path.glob("*.hdf5"))
    if seed is None:
        random.shuffle(hdf5_files)
    else:
        random.Random(seed).shuffle(hdf5_files)

    # Calculate split indices
    n_files = len(hdf5_files)
    n_train = int(n_files * train_ratio)
    n_val = int(n_files * val_ratio)

    assignments = {
        "train": hdf5_files[:n_train],
        "valid": hdf5_files[n_train:n_train + n_val],
        "test": hdf5_files[n_train + n_val:],
    }

    # Move files to respective directories
    for split_name, files in assignments.items():
        for file in files:
            shutil.move(str(file), str(split_dirs[split_name] / file.name))

    print(f"Split {n_files} files into:")
    print(f"Train: {len(assignments['train'])} files")
    print(f"Validation: {len(assignments['valid'])} files")
    print(f"Test: {len(assignments['test'])} files")

# Partition the HDF5 files under <base_path>/data into train/valid/test subfolders.
split_datasets(data_path=base_path / "data")

Split 39 files into:
Train: 31 files
Validation: 3 files
Test: 5 files
