In [None]:
### switch to parent folder in order to import functions
import os
os.chdir('..')
from pathlib import Path
assert Path('./dataset').exists(), f'We are in the wrong directory. Restart may be required.'
from matplotlib import pyplot as plt
import json
import numpy as np
from lib.utils import dBm_gray, plot_dict, calc_loss_with_mask, print_metricsTest, gray_dBm, calculate_fspl_with_offset, rbf_interpolate, tps_interpolate, get_rm_adjusted
from lib.data_loading import load_env_inputs, load_tx_inputs
from lib.modulesIPPNet import RadioNetAnySize
import torch
from PIL import Image
from collections import defaultdict

# Load the data

In [None]:
shape_target = (32, 32)
measurement_dir = Path('dataset/data_seminar_room')
data_file_old = measurement_dir / '2501/measured_data_2501.json' # only LoS
data_file_new = measurement_dir / '2511/measured_data_2511.json' # with whiteboard blocking LoS

with open(data_file_old, 'r') as f:
    data_old = json.load(f)
with open(data_file_new, 'r') as f:
    data_new  = json.load(f)

data_new_mit = {t : {m : md for m, md in td.items() if 'mit' in m and not 'Widerholung' in m} for t, td in data_new.items()}    
data_new_ohne = {t : {m : md for m, md in td.items() if 'ohne' in m and not 'Widerholung' in m} for t, td in data_new.items()}    



# Generate tensors from data

In [None]:
tx_file = measurement_dir / '2501/raster_seminar_room_256/projectwi_files_tx.json'
with open(tx_file, 'r') as f:
    tx_coords_dict = json.load(f)

pl_trnc = -71
pl_max = -12

def extract_data(observation_areas: list, test_areas: list, percentage_threshold: float, tx_id: int, seed: int) -> tuple:
    """
    Extract and prepare observation and ground truth data from measurement areas.
    
    Args:
        observation_areas: List of tuples (data_dict, area_names) for observation data
        test_areas: List of tuples (data_dict, area_names) for ground truth data
        percentage_threshold: Minimum percentage of available samples to include a point
        tx_id: Transmitter ID (1-4)
        seed: Random seed for reproducibility
        
    Returns:
        Tuple of (obs_inp, gt, gt_mask, obs_mask, tx_coords) as tensors
    """
    n_obs_points = sum(len(d[1]) for d in observation_areas)
    rng = np.random.default_rng(seed=seed)
    point_ids = rng.integers(low=0, high=20, size=n_obs_points)
    point_idx = 0

    obs_inp = np.ones((32, 32)) * (-500)
    for data_dict, obs_area_list in observation_areas:
        for obs_area in obs_area_list:
            tx_data = data_dict[f"Tx{tx_id}"][obs_area].values()
            points_usable = [point_data for point_data in tx_data if point_data["percentage_samples_available"] > percentage_threshold]
            try:
                point_used = points_usable[point_ids[point_idx]%len(points_usable)]
            except:
                print(f'{point_ids = }\n{point_idx = }\n{points_usable = }\n{len(points_usable)=}\n{point_idx%len(points_usable)=}')
                raise Exception('stop')
            assert obs_inp[*point_used['coordinates']] == -500, f'{point_used=} already appears in obs_inp?'
            obs_inp[*point_used['coordinates']] = point_used['pl']
            point_idx += 1

    obs_inp = torch.tensor(dBm_gray(obs_inp, pl_trnc=pl_trnc, pl_max=pl_max, clip=True))
    obs_mask = obs_inp > 0

    gt = np.ones((32, 32)) * (-500)
    for data_dict, obs_area_list in test_areas:
        for obs_area in obs_area_list:
            tx_data = data_dict[f"Tx{tx_id}"][obs_area].values()
            for point_data in tx_data:
                if point_data["percentage_samples_available"] > percentage_threshold:
                    assert gt[*point_data['coordinates']] == -500, f'{gt[*point_data["coordinates"]]=}'
                    if obs_mask[*point_data['coordinates']] > 0:
                        continue
                    gt[*point_data['coordinates']] = point_data["pl"]
    gt = torch.tensor(dBm_gray(gt, pl_trnc=pl_trnc, pl_max=pl_max, clip=True))
    gt_mask = gt > 0

    return obs_inp.unsqueeze(0).unsqueeze(0), gt.unsqueeze(0).unsqueeze(0), gt_mask.unsqueeze(0).unsqueeze(0), obs_mask.unsqueeze(0).unsqueeze(0), torch.tensor(tx_coords_dict[str(tx_id - 1)]).unsqueeze(0)



def get_model_inputs(config : dict, tx_id : int, obs_inp : torch.Tensor, dataset_dir : Path) -> torch.Tensor:
    """
    Relevant code from the dataset to generate CNN inputs.
    """
    with open(Path(config['dataset_dir']) / config['env_raster_subdir']  / 'rasterization_parameters.json', 'r') as f:
        raster_params = json.load(f)
    n_classes = len(raster_params['materials']) + 1
    raster_heights = raster_params['heights']
    x_steps = raster_params['x_steps']
    y_steps = raster_params['y_steps']
    x_step_size = raster_params['x_max'] / raster_params['x_steps']
    y_step_size = raster_params['y_max'] / raster_params['y_steps']
    env_id = 'wi_files'


    inputs_cnn = torch.empty((0, x_steps, y_steps))
    if config.get('use_material_properties', False) or config.get('use_material_classes', True):
        inputs_cnn = load_env_inputs(
            inputs=inputs_cnn, 
            env_id_here=env_id, 
            env_raster_subdir=dataset_dir / (f'raster_seminar_room_256' if not 'bin' in config['env_raster_subdir'] else 'raster_seminar_room_256_bin'), 
            raster_heights=raster_heights, 
            n_classes=n_classes, 
            use_material_classes=config.get('use_material_classes', True), 
            use_material_properties=config.get('use_material_properties', False)
        )

    inputs_cnn = torch.concat([inputs_cnn, torch.repeat_interleave(torch.repeat_interleave(obs_inp.squeeze(0), 8, -1), 8, -2)])


    inputs_cnn, _ = load_tx_inputs(
        env_tx_subdir=dataset_dir / f'raster_seminar_room_256',
        env_id_here=env_id,
        tx_id=tx_id - 1,
        use_tx_one_hot=config['use_tx_one_hot'],
        inputs=inputs_cnn,
        x_step_size=x_step_size,
        y_step_size=y_step_size,
        use_tx_distance=config['use_tx_distance'],
        use_Tx_distToRx=config['use_Tx_distToRx'],
        rx_height=1,
        use_log_distance=config['use_log_distance'],
        use_fspl=config['use_fspl'],
        frequency_hz=5.82e9,
        return_coords=True,
        pl_max=pl_max,
        pl_trnc=pl_trnc,
    ) # type: ignore

    return inputs_cnn.to(torch.float32).unsqueeze(0)

In [None]:
tx_id = 3

obs_inp, gt, gt_mask, obs_mask, tx_coords = extract_data(
    observation_areas=[(data_old, [f'MeasurementArea_{k}' for k in [1,2,3,4,7,8]]),
                        (data_new_mit, [f'MeasurementArea_{k}-mit_Abschattung' for k in [5, 6]])
                        ],
    test_areas=[(data_new_mit, [f'MeasurementArea_{k}-mit_Abschattung' for k in [5, 6]])],
    percentage_threshold=50,
    tx_id=tx_id,
    seed=0
)

plot_dict(dict(obs_inp=obs_inp, gt=gt, gt_mask=gt_mask, obs_mask=obs_mask), in_batch_id=0, close_fig=False)

# Test baselines on example sample

## FSPL, interpolation

In [None]:
methods = {
    'fspl_offset' : (calculate_fspl_with_offset, [None]), 
    'rbf' : (rbf_interpolate, [None, 1, 2, 3, 5, 7, 9]), 
    'tps' : (tps_interpolate, [0, 0.01, 0.1, 0.25, 0.5, 0.75, 1])
}
for name, (fun, param_list) in methods.items():
    for param in param_list:
        output = fun(inputs=obs_inp, observation_mask=obs_mask, tx_coords=tx_coords, pl_trnc=pl_trnc, pl_max=pl_max, param=param)
        diff = (gt - output) * gt_mask
        err = calc_loss_with_mask(pred=output, target=gt, mask=gt_mask, observation_mask=obs_mask, alpha=1, metrics=None)
        plot_dict(dict(obs_inp=obs_inp, gt=gt, output=output, diff=diff), in_batch_id=0, close_fig=False, suptitle=f'{name} {param} {torch.sqrt(err) * (71-12):.1f}dB RMSE')

## Ray-Tracing


In [None]:
# LoS
sim_output_dir = measurement_dir / '2501'
rms_rt_np = {tx : np.zeros((32, 32)) for tx in range(1, 5)}
rms_rt = {}
for tx in rms_rt_np.keys():
    img = np.array(Image.open(sim_output_dir / f'sim_{tx=}.png')) / 255
    rms_rt_np[tx][:img.shape[0], :img.shape[1]] = img
    rms_rt[tx] = torch.Tensor(rms_rt_np[tx]).unsqueeze(0)

# nLoS as well
sim_output_dir_whiteboard = measurement_dir / '2511'
rms_rt_np_whiteboard = {tx : np.zeros((32, 32)) for tx in range(1, 5)}
rms_rt_whiteboard = {}
for tx in rms_rt_np.keys():
    img = np.array(Image.open(sim_output_dir_whiteboard / f'sim_{tx=}.png')) / 255
    rms_rt_np_whiteboard[tx][:img.shape[0], :img.shape[1]] = img
    rms_rt_whiteboard[tx] = torch.Tensor(rms_rt_np_whiteboard[tx]).unsqueeze(0)


output = get_rm_adjusted(inputs=obs_inp, observation_mask=obs_mask, rm=rms_rt[tx_id])
diff = (gt - output) * gt_mask
err = calc_loss_with_mask(pred=output, target=gt, mask=gt_mask, observation_mask=obs_mask, alpha=1, metrics=None)
plot_dict(dict(obs_inp=obs_inp, gt=gt, output=output, diff=diff), in_batch_id=0, close_fig=False, suptitle=f'RT {torch.sqrt(err) * (71-12):.1f}dB RMSE')

output_whiteboard = get_rm_adjusted(inputs=obs_inp, observation_mask=obs_mask, rm=rms_rt_whiteboard[tx_id])
diff = (gt - output_whiteboard) * gt_mask
err = calc_loss_with_mask(pred=output_whiteboard, target=gt, mask=gt_mask, observation_mask=obs_mask, alpha=1, metrics=None)
plot_dict(dict(obs_inp=obs_inp, gt=gt, output=output_whiteboard, diff=diff), in_batch_id=0, close_fig=False, suptitle=f'RT whiteboard {torch.sqrt(err) * (71-12):.1f}dB RMSE')



## CNNs

In [None]:
model_dir = Path('logs/RadioNetAnySize/20251210-12:17:45') # bin, pert
# model_dir = Path('logs/RadioNetAnySize/20251110-12:25:12') # 'No Environment'
weights = torch.load(model_dir / 'BestModel.pt', weights_only=True)
for k, v in weights.items():
    if not 'conv' in k:
        continue
    else:
        in_ch = v.shape[1]
        break

with open(model_dir / 'parameters.json', 'r') as f:
    config = json.load(f)

model = RadioNetAnySize(inputs=in_ch, initial_downsampling=int(config['raster_size'] / 32))
model.load_state_dict(weights)

Without showing the whiteboard in the inputs

In [None]:
inputs_cnn = get_model_inputs(config, tx_id, obs_inp, dataset_dir=Path('dataset/data_seminar_room/2501'))
output_cnn = model(inputs_cnn)
diff = (gt - output_cnn) * gt_mask
err = calc_loss_with_mask(pred=output_cnn, target=gt, mask=gt_mask, observation_mask=obs_mask, alpha=1, metrics=None)
plot_dict(dict(obs_inp=obs_inp, gt=gt, output=output_cnn, diff=diff), in_batch_id=0, close_fig=False, suptitle=f'CNN {torch.sqrt(err) * (71-12):.1f}dB RMSE')

## With whiteboard

In [None]:
inputs_cnn = get_model_inputs(config, tx_id, obs_inp, dataset_dir=Path('dataset/data_seminar_room/2511'))
output_cnn = model(inputs_cnn)
diff = (gt - output_cnn) * gt_mask
err = calc_loss_with_mask(pred=output_cnn, target=gt, mask=gt_mask, observation_mask=obs_mask, alpha=1, metrics=None)
plot_dict(dict(obs_inp=obs_inp, gt=gt, output=output_cnn, diff=diff), in_batch_id=0, close_fig=False, suptitle=f'CNN {torch.sqrt(err) * (71-12):.1f}dB RMSE')

# Now in a Loop to generate some statistics of errors

# Baselines

In [None]:
tx_id = 3

n_test = 500
for name, (fun, param_list) in methods.items():
    for param in param_list:
        print(f'\n{name=}\t{param=}')
        metrics = defaultdict(float)
        for sample_id in range(n_test):
            obs_inp, gt, gt_mask, obs_mask, tx_coords = extract_data(
                observation_areas=[(data_old, [f'MeasurementArea_{k}' for k in [1,2,3,4,7,8]]),
                                    (data_new_mit, [f'MeasurementArea_{k}-mit_Abschattung' for k in [5, 6]])
                                    ],
                test_areas=[(data_new_mit, [f'MeasurementArea_{k}-mit_Abschattung' for k in [5, 6]])],
                percentage_threshold=50,
                tx_id=tx_id,
                seed=sample_id
            )
            output = fun(inputs=obs_inp, observation_mask=obs_mask, tx_coords=tx_coords, pl_trnc=pl_trnc, pl_max=pl_max, param=param)
            diff = (gt - output) * gt_mask
            err = calc_loss_with_mask(pred=output, target=gt, mask=gt_mask, observation_mask=obs_mask, alpha=1, metrics=metrics)
        print_metricsTest(metrics=metrics, epoch_samples=n_test, phase='test', pl_max=pl_max, pl_trnc=pl_trnc, log_name=None)
        plot_dict(dict(obs_inp=obs_inp, gt=gt, output=output, diff=diff), in_batch_id=0, close_fig=False, suptitle=f'{name} {param} {torch.sqrt(err) * (71-12):.1f}dB RMSE')



Ray-Tracing

In [None]:
tensors_plot = []

metrics_rt = defaultdict(float)
metrics_rt_wb = defaultdict(float)
for sample_id in range(n_test):
    obs_inp, gt, gt_mask, obs_mask, tx_coords = extract_data(
        observation_areas=[(data_old, [f'MeasurementArea_{k}' for k in [1,2,3,4,7,8]]),
                            (data_new_mit, [f'MeasurementArea_{k}-mit_Abschattung' for k in [5, 6]])
                            ],
        test_areas=[(data_new_mit, [f'MeasurementArea_{k}-mit_Abschattung' for k in [5, 6]])],
        percentage_threshold=50,
        tx_id=tx_id,
        seed=sample_id
    )

    output = get_rm_adjusted(inputs=obs_inp, observation_mask=obs_mask, rm=rms_rt[tx_id])
    diff = (gt - output) * gt_mask
    err = calc_loss_with_mask(pred=output, target=gt, mask=gt_mask, observation_mask=obs_mask, alpha=1, metrics=metrics_rt)

    output_whiteboard = get_rm_adjusted(inputs=obs_inp, observation_mask=obs_mask, rm=rms_rt_whiteboard[tx_id])
    diffwb = (gt - output_whiteboard) * gt_mask
    errwb = calc_loss_with_mask(pred=output_whiteboard, target=gt, mask=gt_mask, observation_mask=obs_mask, alpha=1, metrics=metrics_rt_wb)

print_metricsTest(metrics=metrics_rt, epoch_samples=n_test, phase='test', pl_max=pl_max, pl_trnc=pl_trnc, log_name=None)
print_metricsTest(metrics=metrics_rt_wb, epoch_samples=n_test, phase='test', pl_max=pl_max, pl_trnc=pl_trnc, log_name=None)

plot_dict(dict(obs_inp=obs_inp, gt=gt, output=output, diff=diff), in_batch_id=0, close_fig=False, suptitle=f'RT {torch.sqrt(err) * (71-12):.1f}dB RMSE')
plot_dict(dict(obs_inp=obs_inp, gt=gt, output=output_whiteboard, diff=diffwb), in_batch_id=0, close_fig=False, suptitle=f'RT WB {torch.sqrt(errwb) * (71-12):.1f}dB RMSE')

tensors_plot.extend([obs_inp[0], gt[0], output[0], output_whiteboard[0]])

# CNN

In [None]:
with torch.no_grad():

    for (model_dir, name) in [
            (Path('logs/RadioNetAnySize/20251210-12:17:45'), 'Binary, SNDA'),
            (Path('logs/RadioNetAnySize/20251110-12:25:12'), 'No Environment'),
        ]:

        try:
            weights = torch.load(model_dir / 'BestModel.pt', weights_only=True)
        except FileNotFoundError as e:
            print(str(model_dir), e)
            continue

        for k, v in weights.items():
            if not 'conv' in k:
                continue
            else:
                in_ch = v.shape[1]
                break

        with open(model_dir / 'parameters.json', 'r') as f:
            config = json.load(f)

        model = RadioNetAnySize(inputs=in_ch, initial_downsampling=int(config['raster_size'] / 32))
        model.load_state_dict(weights)
        for experiment, dataset_dir in [('object visible', measurement_dir / '2501'), ('object not visible', measurement_dir / '2511')]:
            print(f'\n\n{name}\t{experiment}')
            metrics = defaultdict(float)
            for sample_id in range(n_test):
                obs_inp, gt, gt_mask, obs_mask, tx_coords = extract_data(
                    observation_areas=[(data_old, [f'MeasurementArea_{k}' for k in [1,2,3,4,7,8]]),
                                        (data_new_mit, [f'MeasurementArea_{k}-mit_Abschattung' for k in [5, 6]])
                                        ],
                    test_areas=[(data_new_mit, [f'MeasurementArea_{k}-mit_Abschattung' for k in [5, 6]])],
                    percentage_threshold=50,
                    tx_id=tx_id,
                    seed=sample_id
                )
                inputs_cnn = get_model_inputs(config, tx_id, obs_inp, dataset_dir=dataset_dir)
                output_cnn = model(inputs_cnn)
                diff = (gt - output_cnn) * gt_mask
                err = calc_loss_with_mask(pred=output_cnn, target=gt, mask=gt_mask, observation_mask=obs_mask, alpha=1, metrics=metrics)
            print_metricsTest(metrics=metrics, epoch_samples=n_test, phase='test', pl_max=pl_max, pl_trnc=pl_trnc, log_name=None)
            plot_dict(dict(obs_inp=obs_inp, gt=gt, output=output_cnn, diff=diff), in_batch_id=0, close_fig=False, suptitle=f'CNN {name} {dataset_dir.stem}{torch.sqrt(err) * (71-12):.1f}dB RMSE')
            tensors_plot.append(output_cnn[0])
