
# XML â†’ Radio Map Dataset Pipeline

This notebook converts a **Mitsuba XML scene** into training-ready tensors with the exact contract:

- **rss**: `(1, H, W)` float32, normalized to ~[-1, 1]
- **cond**: `(C_cond, H, W)` float32, with **elevation as channel 0**
- **elevation**: `(H, W)` float32 (also included in `cond[0]`)

Saved as a dict via `np.save(...).item()` with keys:
`'rss'`, `'cond'`, `'elevation'`.

The notebook is parameterized by the XML path.


## Parameters

In [17]:

# ---- USER PARAMETERS ----
XML_PATH = "/Users/matthewgrech/ECE2T5 1st Sem/ECE496/Diffusion/scene_generation/studio_setup.xml"          # Path to Mitsuba XML
MESH_ROOT = "/Users/matthewgrech/ECE2T5 1st Sem/ECE496/Diffusion/scene_generation"                 # Root directory for mesh files
H, W = 128, 128                  # Output grid resolution
RSS_DB_FLOOR = -100.0            # dB floor for normalization
RSS_DB_SCALE = 50.0              # scale for normalization
OUTPUT_PATH = "sample.npy"      # Output sample file


## Imports

In [4]:

import xml.etree.ElementTree as ET
from pathlib import Path
import numpy as np


## XML Parsing

In [7]:

def parse_scene_xml(xml_path):
    tree = ET.parse(xml_path)
    root = tree.getroot()

    # Materials
    material_map = {}
    for i, bsdf in enumerate(root.findall("bsdf")):
        material_map[bsdf.attrib["id"]] = i

    shapes = []
    for shape in root.findall("shape"):
        filename = shape.find("string[@name='filename']").attrib["value"]
        bsdf_ref = shape.find("ref").attrib["id"]

        shapes.append({
            "mesh": filename,
            "material_id": material_map[bsdf_ref],
            "semantic": 0 if "Plane" in filename else 1  # 0=ground, 1=building
        })
    return shapes, material_map

shapes, material_map = parse_scene_xml(XML_PATH)
print(f"Loaded {len(shapes)} shapes")    


Loaded 15 shapes


## PLY Loader

In [8]:

def load_ply(path):
    with open(path) as f:
        lines = f.readlines()

    i = lines.index("end_header\n") + 1
    header = lines[:i]

    v_count = int([l for l in header if l.startswith("element vertex")][0].split()[-1])
    f_count = int([l for l in header if l.startswith("element face")][0].split()[-1])

    verts = np.array([list(map(float, l.split())) for l in lines[i:i+v_count]], dtype=np.float32)
    faces = np.array([list(map(int, l.split()[1:])) for l in lines[i+v_count:i+v_count+f_count]], dtype=np.int32)

    return verts, faces


## Rasterization Utilities

In [9]:

def rasterize_height(vertices, H, W):
    # Simple orthographic projection to grid
    xy = vertices[:, :2]
    z = vertices[:, 2]

    xy_min, xy_max = xy.min(axis=0), xy.max(axis=0)
    xy_norm = (xy - xy_min) / (xy_max - xy_min + 1e-6)

    xi = np.clip((xy_norm[:,0] * (W-1)).astype(int), 0, W-1)
    yi = np.clip((xy_norm[:,1] * (H-1)).astype(int), 0, H-1)

    elev = np.zeros((H, W), dtype=np.float32)
    count = np.zeros((H, W), dtype=np.float32)

    for x, y, zz in zip(xi, yi, z):
        elev[y, x] += zz
        count[y, x] += 1

    elev /= np.maximum(count, 1.0)
    return elev


## Build Elevation & Material Maps

In [20]:

elevation = np.zeros((H, W), dtype=np.float32)
material_channels = len(material_map)
material_maps = np.zeros((material_channels, H, W), dtype=np.float32)

for s in shapes:
    V, F = load_ply(Path(MESH_ROOT) / s["mesh"])
    elev_map = rasterize_height(V, H, W)
    elevation = np.maximum(elevation, elev_map)
    material_maps[s["material_id"]] = np.maximum(material_maps[s["material_id"]], elev_map > 0)

# Normalize elevation to [0,1]
elev_min, elev_max = elevation.min(), elevation.max()
elevation_norm = (elevation - elev_min) / (elev_max - elev_min + 1e-6)


UnicodeDecodeError: 'utf-8' codec can't decode byte 0xc1 in position 206: invalid start byte

## Transmitter Heatmap (Example: Single TX at Center)

In [None]:

yy, xx = np.meshgrid(np.arange(H), np.arange(W), indexing='ij')
tx_y, tx_x = H // 2, W // 2
sigma = 10.0
tx_heatmap = np.exp(-((xx - tx_x)**2 + (yy - tx_y)**2) / (2 * sigma**2)).astype(np.float32)


## Assemble cond Tensor

In [None]:

cond = np.concatenate(
    [
        elevation_norm[None, ...],        # channel 0
        tx_heatmap[None, ...],             # channel 1
        material_maps                      # material one-hot channels
    ],
    axis=0
).astype(np.float32)

print("cond shape:", cond.shape)


## Generate RSS Target (Placeholder / Simulator Hook)

In [None]:

# Placeholder RSS map (replace with Mitsuba/Sionna output)
rss_db = -30.0 * tx_heatmap  # dummy signal decay
rss_db = rss_db.astype(np.float32)

rss_norm = (rss_db + abs(RSS_DB_FLOOR)) / RSS_DB_SCALE
rss_norm = np.clip(rss_norm, -1.0, 1.0)

rss = rss_norm[None, ...].astype(np.float32)
print("rss shape:", rss.shape)


## Save Sample

In [None]:

sample = {
    "rss": rss,
    "cond": cond,
    "elevation": elevation_norm.astype(np.float32)
}

np.save(OUTPUT_PATH, sample)
print(f"Saved sample to {OUTPUT_PATH}")


In [None]:
# ---- Produce multiple samples for training ----
OUTPUT_DIR = 'dataset_samples'
NUM_SAMPLES = 64
os.makedirs(OUTPUT_DIR, exist_ok=True)
for i in range(NUM_SAMPLES):
    # randomize a transmitter location and small elevation perturbation
    tx_y = np.random.randint(0, H)
    tx_x = np.random.randint(0, W)
    sigma = 8.0 + np.random.rand()*6.0
    yy, xx = np.meshgrid(np.arange(H), np.arange(W), indexing='ij')
    tx_heatmap = np.exp(-((xx - tx_x)**2 + (yy - tx_y)**2) / (2 * sigma**2)).astype(np.float32)
    # simple placeholder RSS (replace with simulator output when available)
    rss_db = -30.0 * tx_heatmap + np.random.randn(H, W).astype(np.float32) * 2.0
    rss_norm = (rss_db + abs(RSS_DB_FLOOR)) / RSS_DB_SCALE
    rss_norm = np.clip(rss_norm, -1.0, 1.0)
    rss = rss_norm[None, ...].astype(np.float32)
    # assemble cond: elevation first, then tx heatmap and material maps
    cond = np.concatenate([elevation_norm[None, ...], tx_heatmap[None, ...], material_maps], axis=0).astype(np.float32)
    sample = {'rss': rss, 'cond': cond, 'elevation': elevation_norm.astype(np.float32)}
    fn = os.path.join(OUTPUT_DIR, f'sample_{i:04d}.npy')
    np.save(fn, sample)
print(f'Wrote {NUM_SAMPLES} samples to {OUTPUT_DIR}')


In [None]:
# ---- Quick train run using interm_demo.py ----
import sys
sys.path.insert(0, '.')  # ensure local imports work
from interm_demo import TimeCondUNet, RadioMapDataset, train
# build filenames list
filenames = sorted([f for f in os.listdir(OUTPUT_DIR) if f.endswith('.npy')])
ds = RadioMapDataset(OUTPUT_DIR, filenames)
# infer cond channels from first sample
tmp = np.load(os.path.join(OUTPUT_DIR, filenames[0]), allow_pickle=True).item()
cond_channels = tmp['cond'].shape[0]
print('Detected cond channels =', cond_channels)
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model = TimeCondUNet(in_ch=1, cond_channels=cond_channels, base_ch=32, channel_mults=(1,2,4), num_res_blocks=2, time_emb_dim=128, cond_emb_dim=64)
# small smoke-train (1 epoch) to verify everything wires up
train(model, ds, device=device, epochs=1, batch_size=8, lr=2e-4, timesteps=200, save_every=1, out_dir='./tmp_ckpt')
