In [7]:
import math
import os
import shutil
import sys
import pickle
import time
sys.path.extend(['../SamplingAssistedPathlossRadioMapPrediction/'])

import numpy as np
import pandas as pd
from tqdm import tqdm
import torch
import imageio.v3 as iio
from matplotlib import pyplot as plt
from skimage.transform import resize
from skimage.io import imread
from PIL import Image
from torchvision.io import read_image

from src.algorithms import MLSP
from src.networks import UNetModel
from src.datamodules import MLSPDatamodule
from src.datamodules.datasets import PathlossDataset
from src.utils.mlsp.featurizer import featurizer, add_points
from src.utils.mlsp.augmentations import normalize_size, resize_db
from src.utils.mlsp.types import RadarSampleInputs, RadarSample

%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [3]:
main_path = "./ICASSP_TEST_DATA/"
ss_path = "./ICASSP_TEST_DATA/rate0.5/"
rate = 0.005

input_dir = os.path.join(main_path, f"Inputs/Task_1/")
output_dir = os.path.join(main_path, f"Outputs/Task_1/")
positions_dir = os.path.join(main_path, "Test_Data_Positions/")
radiation_patterns_dir = os.path.join(main_path, "Test_Radiation_Patterns/")
sampling_dir = os.path.join(main_path, f"Outputs/Task_1/")
# sampling_dir = os.path.join(ss_path, f"sampledGT")
INITIAL_PIXEL_SIZE = 0.25
IMG_TARGET_SIZE = 640

In [4]:
def get_radar_sample_input(b, ant, f, sp):    
    input_file = f"B{b}_Ant{ant}_f{f}_S{sp}.png"
    output_file = f"B{b}_Ant{ant}_f{f}_S{sp}.png"
    radiation_file = f"Ant{ant}_Pattern.csv"
    position_file = f"Positions_B{b}_Ant{ant}_f{f}.csv"
    
    if os.path.exists(os.path.join(input_dir, input_file)):
        freq_mhz = freqs_mhz[f - 1]
        input_img_path = os.path.join(input_dir, input_file)
        output_img_path = os.path.join(output_dir, output_file)
        positions_path = os.path.join(positions_dir, position_file)
        radiation_pattern_file = os.path.join(radiation_patterns_dir, radiation_file)
        sampling_file = input_img_path
        output_img_path = sampling_file

        radar_sample_inputs = RadarSampleInputs(
            file_name=input_file,
            freq_MHz=freq_mhz,
            input_file=input_img_path,
            output_file=output_img_path,
            position_file=positions_path,
            radiation_pattern_file=radiation_pattern_file,
            sampling_position=sp,
            ids=(b, ant, f, sp),
        )
        return radar_sample_inputs
        
def read_sample(inputs):
    if isinstance(inputs, RadarSampleInputs):
        inputs = inputs.asdict()
    file_name = inputs["file_name"]
    freq_MHz = inputs["freq_MHz"]
    input_file = inputs["input_file"]
    output_file = inputs["output_file"]
    position_file = inputs["position_file"]
    sampling_position = inputs["sampling_position"]
    radiation_pattern_file = inputs["radiation_pattern_file"]

    input_img = read_image(input_file).float()
    C, H, W = input_img.shape

    output_img = read_image(output_file).float()
    if output_img.size(0) == 1:  # If single channel, remove channel dimension
        output_img = output_img.squeeze(0)

    sampling_positions = pd.read_csv(position_file)
    x_ant, y_ant, azimuth = sampling_positions.loc[int(sampling_position), ["Y", "X", "Azimuth"]]
    radiation_pattern_np = np.genfromtxt(radiation_pattern_file, delimiter=',')
    radiation_pattern = torch.from_numpy(radiation_pattern_np).float()

#     pl_clip = float("inf")
    pl_clip = torch.tensor(160, dtype=torch.float32)

    sample = RadarSample(
        file_name=file_name,
        task_idx=1,
        pl_clip=pl_clip,
        H=H,
        W=W,
        x_ant=x_ant,
        y_ant=y_ant,
        azimuth=azimuth,
        freq_MHz=freq_MHz,
        input_img=input_img,
        output_img=output_img,
        radiation_pattern=radiation_pattern,
        pixel_size=INITIAL_PIXEL_SIZE,
        mask=torch.ones_like(input_img[0]),
    )

    # Ensure the antenna is within bounds
    sample = PathlossDataset.pad_sample(sample)

    return sample

In [8]:
Buildings = range(1, 7)
ant_ids = [1]
freq = [1]
freqs_mhz = [868]
solution = pd.DataFrame()

os.makedirs(f"./samplePoints", exist_ok=True)
sampling_coords_dict = {}
runtimes = []
for Antenna_ID in (ant_ids):  
    for f_i in freq:
        for b in (Buildings):
            for sp in tqdm(range(0, 50), total=50):
                start = time.time()
                radar_sample_inputs = get_radar_sample_input(b, Antenna_ID, f_i, sp)
                if radar_sample_inputs is None:
                    continue
                sample = read_sample(radar_sample_inputs)
                total_pixels = sample.H * sample.W
                num_zeros = math.ceil(total_pixels * rate)
                mask = torch.zeros((sample.H, sample.W), dtype=torch.bool)
                if num_zeros > 0:
                    sampling_coords = add_points(
                        mask,
                        sample.x_ant, sample.y_ant, num_zeros
                    )
                end = time.time()
                runtimes.append(end - start)
                y = Image.fromarray(mask.numpy()).convert("RGB")
                image_name = f"B{b}_Ant{Antenna_ID}_f{f_i}_S{sp}.png"
                sampling_coords_dict[image_name] = sampling_coords
                y.save(f"./samplePoints/{image_name}.png")

with open("sampling_coords_dict.pkl", "wb") as f:
    pickle.dump(sampling_coords_dict, f)

print(f"Average runtime: {np.mean(runtimes)} seconds")

100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 50/50 [00:00<00:00, 159.94it/s]
100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 50/50 [00:00<00:00, 79.28it/s]
100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 50/50 [00:00<00:00, 61.15it/s]
100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 50/50 [00:00<00:00, 160.52it/s]
100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 50/50 [00:00<00:0

Average runtime: 0.005670607089996338 seconds





In [6]:
with open("sampling_coords_dict.pkl", "rb") as f:
    sampling_coords_dict = pickle.load(f)
sampling_coords_dict

{'B1_Ant1_f1_S0.png': [(99, 143),
  (52, 62),
  (9, 17),
  (91, 47),
  (73, 142),
  (33, 35),
  (19, 15),
  (97, 111),
  (63, 56),
  (36, 50),
  (82, 17),
  (88, 103),
  (92, 179),
  (102, 155),
  (91, 163),
  (44, 54),
  (72, 116),
  (44, 11),
  (84, 168),
  (75, 41),
  (88, 0),
  (63, 12),
  (66, 120),
  (55, 1),
  (92, 18),
  (36, 23),
  (102, 63),
  (5, 44),
  (94, 36),
  (79, 103),
  (26, 64),
  (57, 151),
  (28, 6),
  (4, 8),
  (55, 10),
  (3, 57),
  (51, 84),
  (103, 38),
  (62, 46),
  (67, 28),
  (38, 15),
  (100, 24),
  (24, 33),
  (1, 50),
  (98, 0),
  (29, 48),
  (64, 183),
  (74, 97),
  (56, 31),
  (18, 166),
  (83, 59),
  (28, 56),
  (12, 37),
  (64, 1),
  (101, 177),
  (60, 70),
  (5, 33),
  (93, 57),
  (75, 22),
  (18, 72),
  (77, 85),
  (83, 138),
  (13, 0),
  (72, 162),
  (91, 87),
  (59, 22),
  (51, 163),
  (101, 127),
  (82, 7),
  (103, 92),
  (24, 178),
  (20, 3),
  (85, 42),
  (29, 18),
  (60, 174),
  (8, 178),
  (91, 29),
  (10, 61),
  (58, 141),
  (46, 3),
  (42,