Optimization of azimuth and tilt angles for given Tx positions to maximize coverage in a given area.

You can just run all cells, or run all until "Run Optimization - Run Optimization with Gradient Descent and Random Search" and select parameters there.

# Imports

In [1]:
import os
os.environ['CUDA_LAUNCH_BLOCKING'] = '1' 
import time
import numpy as np
from matplotlib import pyplot as  plt
import torch
import json
from pathlib import Path
from skimage import io
from typing import Sequence
from math import ceil

from lib.pl_datamodule import LitRM_directional
from lib.pl_lightningmodule import LitModelDict

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


## Helper functions

In [2]:
def show_samplesv2(
        tensor_dict :   dict[str, tuple[torch.Tensor, torch.Tensor] | torch.Tensor],
        batch_ids : Sequence | None = None,
        filename : str | Path | None  = None,
        titles : Sequence[str] | str | None  = None,
        vmin : float | None = None,
        vmax : float | None = None,
    ) -> list:
    '''
    Generates and potentially saves a figure displaying several tensors.
    Args:
        tensor_dict     -   dict, keys are names and values are either tensors to be displayed or tuple of these plus coordinates in another tensor 
                            tensors to be displayed must have dim B x C x H x W (display each channel separately) or B x H x W
        batch_ids       -   ids of samples to be displayed, if None, we display all
        filename        -   give filename to save the fig
        title           -   title of the fig
        cut             -   cut all tensors to range [0,1] in imshow
    '''
    ### generate a list of figures, one for each batch_id
    fig_list = []
    if batch_ids is None:
        first = list(tensor_dict.values())[0]
        if isinstance(first, tuple):
            batch_ids = range(first[0].shape[0])
        else:
            batch_ids = range(first.shape[0])
    if isinstance(titles, str):
        titles = [titles for b in batch_ids]
    for batch_id in batch_ids:
        ### generate a list of tensors to be displayed together with title and maybe coordinates to be scattered, this is needed to plan the size of the figure
        display_list = []
        for k, v in tensor_dict.items():
            if isinstance(v, torch.Tensor):
                t_curr = v[batch_id,:].detach().cpu()
                if t_curr.ndim == 2:
                    display_list.append((k, t_curr, None))
                elif t_curr.ndim == 3:
                    display_list.extend([(f'{k} {c}', t_curr[c,:], None) for c in range(t_curr.shape[0])])
                else:
                    raise ValueError(f'got shape {t_curr.shape} for tensor {k}')
            elif isinstance(v, (tuple, list)):
                assert len(v)==2, f'got tensor/coords {v}'
                t_curr, coords_curr = v[0][batch_id,:].detach().cpu(), v[1][batch_id,:].detach().cpu()
                if t_curr.ndim == 2:
                    display_list.append((k, t_curr, coords_curr))
                elif t_curr.ndim == 3:
                    if t_curr.shape[0] == coords_curr.shape[0]:
                        display_list.extend([(f'{k} {c}', t_curr[c,:], coords_curr[c,:]) for c in range(t_curr.shape[0])])
                    else:
                        ### plot all coordinates on all maps
                        if coords_curr.ndim == 1:
                            display_list.extend([(f'{k} {c}', t_curr[c,:], coords_curr) for c in range(t_curr.shape[0])])
                        else:
                            display_list.extend([(f'{k} {c}', t_curr[c,:], torch.transpose(coords_curr, -1,-2)) for c in range(t_curr.shape[0])])
                else:
                    raise ValueError(f'got shape {t_curr.shape} for tensor {k}')
        cols = 5
        rows = int(ceil(len(display_list) / 5))
        fig = plt.figure(figsize=(4*cols, 3*rows))
        if titles is not None:
            fig.suptitle(titles[batch_id])
        for idf, t in  enumerate(display_list):
            fig.add_subplot(rows, cols, idf + 1)
            plt.imshow(t[1], vmax=vmax, vmin=vmin)
            plt.colorbar()
            plt.title(t[0])
            plt.tick_params(
                axis='both',      
                which='both',     
                bottom=False,     
                top=False,        
                labelbottom=False,
                labelleft=False,
                left=False, 
                right=False
                ) 
            if t[2] is not None:
                plt.scatter(t[2][1], t[2][0], c='r', s=10)
        if filename is not None:
            plt.savefig(f'{filename}_{batch_id}.png')
        fig_list.append(fig)
    return fig_list

def load_model_from_dir(previous_model_dir : Path | str):
    '''
    Load a trained model from a directory by automatically combining the checkpoint and config. Note that when using the CLI, we need to use the config
    '''
    import yaml
    sub_dir = list(Path(previous_model_dir).iterdir())
    if not len(sub_dir)==1:
        sub_dir = Path(previous_model_dir)
    else:
        sub_dir = sub_dir[0]
    ckpt1 = list((sub_dir / 'checkpoints').glob('*loss*.ckpt'))
    assert len(ckpt1) == 1, f'checkpoint for model1 ({previous_model_dir}) not (uniquely) determined, found {ckpt1}'
    ckpt1 = ckpt1[0]
    config = sub_dir / 'config.yaml'
    assert config.is_file(), f'config for model1 ({previous_model_dir}) not existing/not a file'
    with open(config, 'r') as f:
        params = yaml.safe_load(f)
    model1_class = LitModelDict[params['model']['class_path'].split('.')[-1]]
    previous = model1_class.load_from_checkpoint(ckpt1)
    print(f'loaded {model1_class} from {ckpt1} using {config}')
    return previous, params


# Generate network inputs
- Define network inputs as differentiable functions of Tx azimuth and elevation angle, as far as possible.

## Antenna patters
- The antenna pattern inputs are generated by looking up the gain corresponding to combinations of azimuth and tilt in a file so far, we approximate them as a function of the coordinates and angles.

In [None]:
### Load and visualize an example antenna pattern, of the widest antenna used in the dataset. This one has a HPBW of 90° and a FNBW of 120°
patt = np.load('./dataset/antenna_patterns/pattern_8.npy')

plt.figure(figsize=(10, 5))
plt.imshow(patt)
plt.xlabel('phi [deg]')
plt.ylabel('theta [deg]')
plt.colorbar()
plt.title('Antenna pattern in 2D')

### The pattern is symmetric, so we can flatten it to a 1D pattern 
patt_flat = np.zeros((360))
patt_flat[:180] = patt[90, 180:360]
patt_flat[180:] = patt[90, :180]

### in 2D
plt.figure(figsize=(10, 5))
k = 10
r_angles = torch.arange(-torch.pi, torch.pi, step=2*torch.pi/360)
fnbw = 120 / 360 * 2 * torch.pi

for k in [1, 5, 10]:
        gain_floor = 1 / (1 + torch.exp(-2 * k * (-r_angles**2 + fnbw /2 ))) * 250 - 250
        plt.plot(r_angles, gain_floor, label=f'approximation {k=}')

plt.plot(r_angles, patt_flat , label='original', c='black')
plt.xlabel('phi [rad]')
plt.ylabel('gain [dB]')
plt.legend()
plt.title('Antenna pattern along theta=0')

phis = torch.arange(0, 2*np.pi, step=2*np.pi/360).unsqueeze(0)
thetas = torch.arange(0, np.pi, step=2*np.pi/360).unsqueeze(1)
r_angles = torch.sqrt((thetas - torch.pi/2)**2 + torch.minimum(phis, 2*torch.pi - phis)**2)

plt.figure(figsize=(10, 5))
plt.imshow(1 / (1 + torch.exp(-2 * k * (-r_angles**2 + fnbw /2 ))) * 250 - 250)
plt.colorbar()
plt.xlabel('phi [deg]')
plt.ylabel('theta [deg]')
plt.title(f'Approximated antenna pattern in 2D ({k=})')

## Generate inputs using a nn.Module
- Generate antenna pattern as above and all other inputs, as far as possible as differentiable functions of Tx azimuth and elevation, as input for trained CNN

In [4]:
class translate_coords_angles_img(torch.nn.Module):
    '''translate tx coords and angles, ndsms in a smooth differentiable way into network inputs, as far as possible'''
    def __init__(self, 
        # hpbw = 90,
        fnbw = 120,
        x_shift = 0,
        k_logistic = 10,
        x_dim = 256,
        y_dim = 256,
        tx_height = 2,
        img = True,
        img_rgb = False,
        ndsm_all = False,
        tx_one_hot = True,
        azimuth = False,
        dist2d = False,
        coords_ga = False,
        z_max = 32,
        z_min_tx = 6,
        eps_sqrt = 0.0001,
        gain_approx = 'logistic',
        device = torch.device('cuda'),
        *args,
        **kwargs
    ):
        super().__init__()
        # self.hpbw = hpbw / 360 * 2 * torch.pi
        self.fnbw = fnbw / 360 * 2 * torch.pi
        self.x_shift = x_shift / 360 * 2 * torch.pi
        self.k_logistic = k_logistic

        xy_range = torch.arange(256, dtype=torch.float32, device=device)
        self.xc_base = torch.nn.Parameter(torch.repeat_interleave(xy_range.reshape((-1, 1)), repeats=x_dim, dim=1), requires_grad=False)
        self.yc_base = torch.nn.Parameter(torch.repeat_interleave(xy_range.reshape((1, -1)), repeats=y_dim, dim=0), requires_grad=False)
        self.tx_height = torch.nn.Parameter(torch.tensor([tx_height], device=device), requires_grad=False)
        self.z_max = torch.nn.Parameter(torch.tensor([z_max], device=device), requires_grad=False)
        self.min_height = torch.nn.Parameter(torch.tensor([z_min_tx], device=device), requires_grad=False)

        self.img = img
        self.img_rgb = img_rgb
        self.ndsm_all = ndsm_all
        self.tx_one_hot = tx_one_hot
        self.azimuth = azimuth
        self.dist2d = dist2d
        self.coords_ga = coords_ga        
        self.eps_sqrt = eps_sqrt
        self.gain_approx = gain_approx

        self.device = device
    
    def forward(self, coords : torch.Tensor, angles : torch.Tensor, inputs : torch.Tensor):
        ### assuming we have a batch dimension...
        ### split up inputs, we expect them to always contain ndsm plus RGBIR image, nothing else

        assert inputs.shape[1] == 5, f'inputs shape: {inputs.shape}'
        ndsm_all = inputs[:,:1,:,:] * self.z_max
        img = inputs[:,1:,:,:]

        xt, yt, tx_phi, tx_theta = coords[:,0].view((-1,1,1,1)), coords[:,1].view((-1,1,1,1)), angles[:,0].view((-1,1,1,1)), angles[:,1].view((-1,1,1,1))
        zt = torch.maximum(ndsm_all[torch.arange(ndsm_all.shape[0]),:,xt.type(torch.int32).squeeze(), yt.type(torch.int32).squeeze()].view((-1,1,1,1)) + self.tx_height, self.min_height)

        outputs = []

        if self.tx_one_hot:
            tx_one_hot_tens = torch.zeros_like(ndsm_all)
            tx_one_hot_tens[torch.arange(ndsm_all.shape[0]), 0, xt.type(torch.int32).squeeze(), yt.type(torch.int32).squeeze()] = zt.squeeze() / self.z_max
            outputs.append(tx_one_hot_tens)
        if self.ndsm_all:
            outputs.append(ndsm_all / self.z_max)
        
        ### build cylindrical coordinate system
        xc, yc = self.xc_base - xt, self.yc_base - yt
        phi = torch.arctan2(yc, xc)
        ### rotate
        tx_phi = tx_phi % (2*np.pi)
        phi = phi - tx_phi
        ### correct
        phi = torch.where(phi < -1 * torch.pi, 2 * torch.pi + phi, phi)
        phi = torch.where(phi > torch.pi, phi - 2 * torch.pi, phi)

        r = torch.sqrt(xc**2 + yc**2 + self.eps_sqrt)

        if self.dist2d:
            outputs.append(r / (np.sqrt(2) * 255))
        if self.coords_ga:
            outputs.extend([
                xt * torch.ones((1, 1, 256, 256), device=self.device) / 256,
                yt * torch.ones((1, 1, 256, 256), device=self.device) / 256, 
                zt * torch.ones((1, 1, 256, 256), device=self.device) / self.z_max, 
                torch.ones((xt.shape[0], 1, 1, 1), device=self.device) * self.xc_base.unsqueeze(0) / 256, 
                torch.ones((xt.shape[0], 1, 1, 1), device=self.device) * self.yc_base.unsqueeze(0) / 256
            ])

        ### antenna gain map
        if self.fnbw < 2 * np.pi:
            tx_theta = torch.clip(tx_theta, 0, np.pi)
            theta_floor = torch.arccos(-zt / torch.sqrt(xc**2 + yc**2 + zt**2)) - tx_theta
            r_angles = torch.sqrt(phi**2 + theta_floor**2 + self.eps_sqrt)
            if self.gain_approx == 'logistic':
                gain_floor = torch.min(1 / (1 + torch.exp(-2 * self.k_logistic * (-r_angles + self.fnbw /2 + self.x_shift))), 1 / (1 + torch.exp(-2 * self.k_logistic * (r_angles + self.fnbw /2 + self.x_shift))))
            elif self.gain_approx == 'gauss':
                gain_floor = torch.exp(-1 * self.k_logistic * r_angles**2)
            outputs.append(gain_floor)

        if self.azimuth:
            outputs.append(phi)
        if self.img:
            outputs.append(img)
        if self.img_rgb:
            outputs.append(img[:,:3,:,:])
        
        return torch.cat(outputs, dim=1)
        


## Function to load Tx positions and angles from the dataset

In [5]:
tx_ant_dir = Path('./dataset/tx_antennas/')
tx_ant_file_name = '{}_{}_{}_txparams.json'

def get_coords_angles(sample_id, batch_id, tx_ant_dir, tx_ant_file_name, coords_requires_grad=False, angles_requires_grad=True, dtype=torch.float32, tx_ids=None, device=torch.device('cuda')):
    '''same as above, but we also retrieve the angles for each location and return the average to obtain a starting point for the optimization'''
    with open(tx_ant_dir / tx_ant_file_name.format(*[sample_id[i][batch_id] for i in range(3)]), 'r') as f:
        tx_ant_all = json.load(f)
    ca = {}
    for txparams in tx_ant_all.values():
        coords  = tuple(txparams['tx_coords'][:2])
        if coords in ca.keys():
            ca[coords].append([txparams['phi'], txparams['theta']])
        else:
            ca[coords] = [[txparams['phi'], txparams['theta']]]
    try:
        if tx_ids is not None:
            coords_tensor = torch.tensor([k for i, k in enumerate(ca.keys()) if i in tx_ids], device=device, dtype=dtype, requires_grad=coords_requires_grad)
            angles_tensor = torch.tensor([np.mean(np.array(v), axis=0) for i, v in enumerate(ca.values()) if i in tx_ids], device=device, dtype=dtype, requires_grad=angles_requires_grad)
        else:
            coords_tensor = torch.tensor(list(ca.keys()), device=device, dtype=dtype, requires_grad=coords_requires_grad)
            angles_tensor = torch.tensor([np.mean(np.array(v), axis=0) for v in ca.values()], device=device, dtype=dtype, requires_grad=angles_requires_grad)
    except RuntimeError as e:
        print(f'Error: {e}\n {ca.keys()=}, {ca.values()=}')

    return coords_tensor, angles_tensor

# Load a trained model to generate the RM predictions

In [None]:
### load a trained model from a checkpoint, compare its outputs with original inputs and the generated ones
ckpt_dir = './ckpts_paper/img/LitUNetDCN_LitRM_directional'
ckpt_dir = './ckpts_paper/img_ndsm_coords/LitUNetDCN_LitRM_directional'
net, params = load_model_from_dir(ckpt_dir)
net = net.to(device).eval()
print([k for k,v in params['data']['init_args'].items() if v is True])


eps_sqrt = 0.001
k_logistic = 10
tr = translate_coords_angles_img(**params['data']['init_args'], eps_sqrt=eps_sqrt, k_logistic=k_logistic, old_azimuth=True)

## Test the loaded model and the generated inputs
- Load a few samples, show the inputs and targets and also replicate the inputs by our method above and generate outputs from the network again.
- The approximated antenna pattern could still be improved to match the original one closer. But this is not really the point for us here, we just want to see that the networks produce reasonable, realistic radio maps.

In [None]:
i_min = 3
i_max = 3
batch_size = 32

### load sample ids, quick test
id_file = './dataset/sample_ids_antennas.json'

with open(id_file, 'r') as f:
    sample_data = json.load(f)

map_ids = sample_data['map_ids']
sample_ids = sample_data['sample_ids']

## test for a few samples from the dataset whether inputs agree (azimuth, dist2d and gain floor)
dm_net = LitRM_directional(**{**params['data']['init_args'], 'augmentation' : False, 'batch_size' : batch_size, 'shuffle' : False})
dm_net.setup('test')
dm_net.test_set.get_ids = True
loader_net = dm_net.test_dataloader()

dm_tr = LitRM_directional(ndsm_all=True, img=True, tx_one_hot=False, ndsms=False, ant_gain_floor=False, ant_gain_top=False, augmentation=False, batch_size=batch_size, shuffle=False)
dm_tr.setup()
dm_tr.setup('test')
dm_tr.test_set.get_ids = True
loader_tr = dm_tr.test_dataloader()


    
for i, ((inp_net, targ_net, _, sample_id_net), (inp_tr, targ_tr, _, sample_id_tr)) in enumerate(zip(loader_net, loader_tr)):
    if i < i_min:
        continue
    if i > i_max:   
        break
    with torch.no_grad():
        assert torch.all(targ_net==targ_tr) and sample_id_net==sample_id_tr
        samples_to_show = []
        ### for each sample in the batch, load the txparams, then create the inputs for the network in one batch together
        coords = torch.zeros((inp_net.shape[0], 2), device=device)
        angles = torch.zeros((inp_net.shape[0], 2), device=device)
        titles = []
        for k in range(inp_net.shape[0]):
            with open(tx_ant_dir / tx_ant_file_name.format(*[sample_id_net[i][k] for i in range(3)]), 'r') as f:
                tx_ant = json.load(f)[sample_id_net[3][k]]
            if tx_ant['antenna']==8:
                samples_to_show.append(k)
            
            coords[k,:] = torch.tensor(tx_ant['tx_coords'][:2]) 
            angles[k,:] = torch.tensor([tx_ant['phi'], tx_ant['theta']])
            titles.append(f'antenna {tx_ant["antenna"]} {"_".join([sample_id_net[i][k] for i in range(3)])}')
            if len(samples_to_show) >= 5:
                break
        inputs_for_tr = inp_tr.to(device, torch.float32)
        inputs_for_rm = tr(coords, angles, inputs=inputs_for_tr)
        rms_orig = net(inp_net.to(device, torch.float32))
        rms_gen = net(inputs_for_rm)

        show_samplesv2({'targ' : targ_net, 'input_orig': inp_net,'rms_orig': rms_orig, 'inputs_for_rm': inputs_for_rm,  'rms_gen' : rms_gen}, titles=titles, batch_ids=samples_to_show)



# Functions for conversion between log and linear scale, coverage scores for optimization

In [9]:
### values from our dataset, for conversion between [0,1] grayscale used by networks and dBm
PL_min = -127
PL_max = -50

def gray_dBm(t, PL_min, PL_max):
        return t * (PL_max - PL_min) + PL_min

def dBm_gray(dBm, PL_min, PL_max):
    return (dBm - PL_min) / (PL_max - PL_min)

### functions used to calculate the coverage score
def boltzman(t, a):
    '''boltzman operator, approximates minimum of t in a smooth way for a < 0'''
    m = np.nanmax((a * t).detach().cpu().numpy())
    num = torch.nansum(t * torch.exp(a * t - m))
    denom = torch.nansum(torch.exp(a * t - m))
    return num / denom

def sum_lin(rms_gray, PL_min, PL_max, tx_power_dBm=0, clip=True):
    '''Sums up all received powers from rms_gray and converts back to gray-scale. Using logsumexp we can avoid numerical problems due to very small numbers in linear scale'''
    ### store the positions where the CNN predicted the minimum possible path loss it has seen during training, set to 0W / 0mW later in summation
    mask = (rms_gray <= 0)
    rms_dBm = gray_dBm(rms_gray, PL_max=PL_max, PL_min=PL_min) + tx_power_dBm
    sum_dBm = 10 / np.log(10) * torch.logsumexp(torch.where(mask, -torch.inf, rms_dBm * np.log(10) / 10), dim=0, keepdim=True)
    if clip:
        return torch.clip(dBm_gray(sum_dBm, PL_max=PL_max, PL_min=PL_min), 0, 1)
    else:
        return dBm_gray(sum_dBm, PL_max=PL_max, PL_min=PL_min)

def get_min(sum_gray, target_area, PL_min, PL_max, a=None, verbose=False, title=None, smooth=False, q=None):
    '''
    Sums up all received powers from rms_gray and returns the minimum value of the resulting tensor in the target_area. 
    If a is not None, uses boltzman instead of the strict minimum. 
    Returned mininmum is in gray_scale.
    '''
    ### set all values outside the target area to the maximum received power for calculation of the
    # sum_gray_max = torch.where(target_area, sum_gray, 1)
    sum_gray = torch.where(target_area, sum_gray, torch.nan)

    m_real = torch.tensor(np.nanmin(sum_gray.detach().cpu().numpy()), device=sum_gray.device, dtype=sum_gray.dtype) if q is None else torch.nanquantile(sum_gray, q=q)
    m_smooth = boltzman(sum_gray, a) if smooth and a is not None else m_real.clone()
    if a is not None and smooth:
        m = boltzman(sum_gray, a)
    else:
        if q is None:
            m = torch.tensor(np.nanmin(sum_gray.detach().cpu().numpy()), device=sum_gray.device, dtype=sum_gray.dtype)
        else:
            m = torch.nanquantile(sum_gray, q=q)
    if verbose:
        print(f'smooth score={m_smooth.item():.3f}, \t in dBm: {gray_dBm(m, PL_max=PL_max, PL_min=PL_min).item():.3f} ',
            f'\treal score={m_real:.3f}, \t in dBm: {gray_dBm(m_real, PL_max=PL_max, PL_min=PL_min).item():.3f} ')
        show_samplesv2({'power sum in target area': torch.where(target_area, gray_dBm(sum_gray, PL_max=PL_max, PL_min=PL_min), torch.nan)}, titles=[title] if title is not None else None)
        plt.show()

    return m_smooth, m_real

def smooth_threshold(a, thresh, steepness=10):
    '''Smooth thresholding function, returns a value between 0 and 1.'''
    return torch.sigmoid(steepness * (a - thresh))

def count_above_thresh(sum_gray, target_area, thresh, steepness=None, verbose=False, title=None, smooth=False):
    '''
    Counts the number of pixels in sum_gray in the target area that are above thresh.
    If steepness is not None, uses smooth_threshold instead of strict thresholding.
    '''
    
    above_thresh_real = (sum_gray > thresh) 
    above_thresh_smooth = smooth_threshold(sum_gray, thresh, steepness) if steepness is not None and smooth else above_thresh_real.clone()

    sum_above_thresh_real = torch.sum(above_thresh_real * target_area)
    sum_above_thresh_smooth = torch.sum(above_thresh_smooth * target_area)
    
    if verbose:
        print(f'above threshold smooth: {int(sum_above_thresh_smooth.item())} / {torch.sum(target_area).item()} pixels in target area, \
              without smoothing: {sum_above_thresh_real.item()} / {torch.sum(target_area).item()} pixels in target area')
        
        show_samplesv2({
            'power sum/SINR' : sum_gray, 
            'above threshold smooth': above_thresh_smooth * target_area, 
            'above threshold' : above_thresh_real * target_area
            }, titles=[title] if title is not None else None)
        plt.show()
    return sum_above_thresh_smooth, sum_above_thresh_real

def SINR(rms_gray, noise_dBm, tx_power_dBm, PL_max, PL_min):
    '''
    Calculates the signal to interference plus noise ratio (SINR) in the target area of rms_gray, using the threshold thresh.
    '''
    power_dBm = gray_dBm(rms_gray, PL_max=PL_max, PL_min=PL_min) + tx_power_dBm
    power_gray = dBm_gray(power_dBm, PL_max=PL_max, PL_min=PL_min)
    sum_others_dBm = torch.zeros_like(rms_gray)
    for i in range(rms_gray.shape[0]):
        sum_others_dBm[i,:] = gray_dBm(sum_lin(torch.cat([power_gray[torch.arange(power_gray.shape[0]) != i,:], dBm_gray(torch.ones_like(rms_gray[:1,...]) * noise_dBm, PL_max=PL_max, PL_min=PL_min)]), clip=False, PL_max=PL_max, PL_min=PL_min), PL_max=PL_max, PL_min=PL_min)
    diff_dB = power_dBm - sum_others_dBm
    return torch.amax(diff_dB, dim=0, keepdim=True)

# Optimization routines

## Optimize angles using gradient descent

In [37]:
def GD(angles, coords, inputs_for_tr, tr, net, target_area, PL_min, PL_max, max_iter=100, learning_rate=0.1, opt_fn=get_min, opt_fn_params={}, sum_fn=sum_lin, sum_fn_params={}, verbose_loop=True, output_dir=None, *args, **kwargs):
    if output_dir is not None:
        output_dir = Path(output_dir)
    angles = angles.clone().detach().requires_grad_(True)


    optimizer = torch.optim.Adam([angles], lr=learning_rate, maximize=True)
    best_angles = angles.clone().detach()
    inputs_for_rm = tr(coords, angles, inputs=inputs_for_tr)
    rms_gen = net(inputs_for_rm)
    sum_gray = sum_fn(rms_gen, **sum_fn_params)
    if output_dir is not None:
        output_dir.mkdir(parents=True, exist_ok=True)
        for i in range(rms_gen.shape[0]):
            io.imsave(output_dir / f'rm_initial_{i}.png', np.clip(rms_gen[i,...].detach().cpu().numpy() * 255, 0, 255).astype(np.uint8))
        io.imsave(output_dir / f'sum_initial.png', np.clip(torch.where(target_area, sum_gray, 0).squeeze().detach().cpu().numpy() * 255, 0, 255).astype(np.uint8))
        io.imsave(output_dir / f'sum_initial_0_50.png', np.clip(torch.where(target_area, sum_gray, 0).squeeze().detach().cpu().numpy() / 50 * 255, 0, 255).astype(np.uint8))
        io.imsave(output_dir / f'target_area.png', np.clip(target_area.squeeze().detach().cpu().numpy() * 255, 0, 255).astype(np.uint8))    
        np.save(output_dir / f'sum_initial.npy', sum_gray.detach().cpu().numpy())
        np.save(output_dir / f'target_area.npy', target_area.detach().cpu().numpy())
    best_score_smooth, best_score_real = opt_fn(sum_gray, target_area, **opt_fn_params, verbose=True, title=f'initial angles')
    best_score_smooth, best_score_real = best_score_smooth.item(), best_score_real.item()

    show_samplesv2({'radio maps' :  torch.clip(gray_dBm(rms_gen, PL_max=PL_max, PL_min=PL_min), PL_min, PL_max).transpose(0,1),
                    'sum' :  torch.where(target_area, gray_dBm(sum_gray, PL_min=PL_min, PL_max=PL_max), torch.nan)}, 
                    titles=['radio maps [dBm] inital angles'], filename=output_dir.with_name(f'{output_dir.name}_initial')if output_dir is not None else None,
    )#vmin=PL_min, vmax=PL_max)
    best_it = -1

    for i in range(max_iter):
        if max_iter>10 and i % (max_iter//10) == 0:            
            print(f'.', end='')
        optimizer.zero_grad()
        inputs_for_rm = tr(coords, angles, inputs=inputs_for_tr)
        rms_gen = net(inputs_for_rm)
        sum_gray = sum_fn(rms_gen, **sum_fn_params)
        score_smooth, score_real = opt_fn(sum_gray, target_area, **opt_fn_params, verbose=(i % (max_iter//10) == 0 or i==max_iter-1) and verbose_loop, title=f'iter {i}')
        score_smooth.backward()
        optimizer.step()
        if score_real > best_score_real:
            best_score_real = score_real.item()
            best_score_smooth = score_smooth.item()
            best_angles = angles.clone().detach()
            best_it = i
            if verbose_loop:
                print(f'iter {i}: score increased to {best_score_smooth:.3f} smooth, {best_score_real:.3f} real angles={best_angles.cpu().numpy().tolist()}')
                if output_dir is not None:
                    for i in range(rms_gen.shape[0]):
                        io.imsave(output_dir / f'rm_final_{i}.png', np.clip(rms_gen[i,...].detach().cpu().numpy() * 255, 0, 255).astype(np.uint8))
                    io.imsave(output_dir / f'sum_final.png', np.clip(torch.where(target_area, sum_gray, 0).squeeze().detach().cpu().numpy() * 255, 0, 255).astype(np.uint8))
                    io.imsave(output_dir / f'sum_final_0_50.png', np.clip(torch.where(target_area, sum_gray, 0).squeeze().detach().cpu().numpy() / 50 * 255, 0, 255).astype(np.uint8))
                    np.save(output_dir / f'sum_final.npy', sum_gray.detach().cpu().numpy())
                show_samplesv2({'radio maps' :  torch.clip(gray_dBm(rms_gen, PL_max=PL_max, PL_min=PL_min), PL_min, PL_max).transpose(0,1), 
                            'sum' : torch.where(target_area, gray_dBm(sum_gray, PL_min=PL_min, PL_max=PL_max), torch.nan)}, titles=['radio maps [dBm] best angles'], 
                            filename=output_dir)#, vmin=PL_min, vmax=PL_max)
    print(f'{best_it=}, \t{best_score_smooth=:.3f} ({gray_dBm(best_score_smooth, PL_max=PL_max, PL_min=PL_min):.3f}dBm), \t{best_score_real=:.3f} ({gray_dBm(best_score_real, PL_max=PL_max, PL_min=PL_min):.3f}dBm), \t{best_angles.tolist()=}')            

    return best_angles, best_score_smooth, best_score_real
   

In [38]:
def random_search(angles_orig, coords, inputs_for_tr, tr, net, target_area, PL_min, PL_max, device=torch.device('cuda'), max_iter=100, opt_fn=get_min, opt_fn_params={}, sum_fn=sum_lin, sum_fn_params={}, verbose_loop=True, output_dir=None, *args, **kwargs):
    if output_dir is not None:
        output_dir = Path(output_dir)
    with torch.no_grad():
        best_angles = angles_orig.clone().detach()
        best_it = -1

        inputs_for_rm = tr(coords, angles_orig, inputs=inputs_for_tr)
        rms_gen = net(inputs_for_rm)
        sum_gray = sum_fn(rms_gen, **sum_fn_params)
        if output_dir is not None:
            output_dir.mkdir(parents=True, exist_ok=True)
            for i in range(rms_gen.shape[0]):
                io.imsave(output_dir / f'rm_initial_{i}.png', np.clip(rms_gen[i,...].detach().cpu().numpy() * 255, 0, 255).astype(np.uint8))
            io.imsave(output_dir / f'sum_initial.png', np.clip(torch.where(target_area, sum_gray, 0).squeeze().detach().cpu().numpy() * 255, 0, 255).astype(np.uint8))
            io.imsave(output_dir / f'sum_initial_0_50.png', np.clip(torch.where(target_area, sum_gray, 0).squeeze().detach().cpu().numpy() / 50 * 255, 0, 255).astype(np.uint8))
            io.imsave(output_dir / f'target_area.png', np.clip(target_area.squeeze().detach().cpu().numpy() * 255, 0, 255).astype(np.uint8))
            np.save(output_dir / f'sum_initial.npy', sum_gray.detach().cpu().numpy())
            np.save(output_dir / f'target_area.npy', target_area.detach().cpu().numpy())
        best_score_smooth, best_score_real = opt_fn(sum_gray, target_area, **opt_fn_params, verbose=True, title=f'initial angles')
        best_score_smooth, best_score_real = best_score_smooth.item(), best_score_real.item()
        show_samplesv2({'radio maps' : torch.clip(gray_dBm(rms_gen, PL_min=PL_min, PL_max=PL_max), PL_min, PL_max).transpose(0,1),
                        'sum' :  torch.where(target_area, gray_dBm(sum_gray, PL_min=PL_min, PL_max=PL_max), torch.nan)}, 
                       titles=['radio maps [dBm] inital angles'], filename=output_dir.with_name(f'{output_dir.name}_initial')if output_dir is not None else None,
        )#vmin=PL_min, vmax=PL_max)

        for i in range(max_iter):
            ### search in given phi+/-3pi/4, theta in [pi/2, pi]
            phis = angles_orig[:,0] + 3/2 * torch.pi * (torch.rand(angles_orig.shape[0], dtype=torch.float32, device=device) - 0.5)
            thetas = torch.pi / 2 + torch.pi / 2 * torch.rand(angles_orig.shape[0], dtype=torch.float32, device=device)
            angles = torch.stack([phis, thetas], dim=-1)
            
            inputs_for_rm = tr(coords, angles, inputs=inputs_for_tr)
            rms_gen = net(inputs_for_rm)
            sum_gray = sum_fn(rms_gen, **sum_fn_params)
            score_smooth, score_real = opt_fn(sum_gray, target_area, **opt_fn_params)


            if score_real > best_score_real:
                best_score_real = score_real.item()
                best_score_smooth = score_smooth.item()
                best_angles = angles.clone().detach()
                best_it = i
                if verbose_loop:
                    print(f'iter {i}: score increased to {best_score_smooth:.3f} smooth, {best_score_real:.3f} real angles={best_angles.cpu().numpy().tolist()}')
                    if output_dir is not None:
                        for i in range(rms_gen.shape[0]):
                            io.imsave(output_dir / f'rm_final_{i}.png', np.clip(rms_gen[i,...].detach().cpu().numpy() * 255, 0, 255).astype(np.uint8))
                        io.imsave(output_dir / f'sum_final.png', np.clip(torch.where(target_area, sum_gray, 0).squeeze().detach().cpu().numpy() * 255, 0, 255).astype(np.uint8))
                        io.imsave(output_dir / f'sum_final_0_50.png', np.clip(torch.where(target_area, sum_gray, 0).squeeze().detach().cpu().numpy() / 50 * 255, 0, 255).astype(np.uint8))
                        np.save(output_dir / f'sum_final.npy', sum_gray.detach().cpu().numpy())
                    show_samplesv2({'radio maps' :  torch.clip(gray_dBm(rms_gen, PL_max=PL_max, PL_min=PL_min), PL_min, PL_max).transpose(0,1), 
                                'sum' : torch.where(target_area, gray_dBm(sum_gray, PL_min=PL_min, PL_max=PL_max), torch.nan)}, titles=['radio maps [dBm] best angles'], 
                                filename=output_dir)#, vmin=PL_min, vmax=PL_max)

                
        # if verbose_loop:
        print(f'{best_it=}, \t{best_score_smooth=:.3f} ({gray_dBm(best_score_smooth, PL_max=PL_max, PL_min=PL_min):.3f}dBm), \t{best_score_real=:.3f} ({gray_dBm(best_score_real, PL_max=PL_max, PL_min=PL_min):.3f}dBm), \t{best_angles.tolist()=}')            
        return best_angles, best_score_smooth, best_score_real
        

# Run Optimization

## Setup, data

In [None]:
### define the dataset and loader to obtain maps and tx coords
batch_size = 1
seed = 1

dm2 = LitRM_directional(tx_one_hot=False, ndsms=True, ant_gain_floor=False, ant_gain_top=False, ndsm_all=True, img=True, augmentation=False, batch_size=batch_size, shuffle=False)
dm2.setup('test')
loader = dm2.test_dataloader(shuffle=True)#, seed=seed if not already_set_seed else None)
iterator = iter(loader)


### Use a Predefined Scenario
- We define the two scenarios used in the paper.
- Alternatively, one can iterate through the dataloader from the previous cell to find an interesting map and define the target_area etc similarly to below.

In [31]:
def get_scenario(scenario : int, tx_power_dBm, PL_max, PL_min, smooth, a=-5):
    if scenario == 1:
        i, j, s = 3815, 58105, 'ur'
        tx_ids = [0, 1, 3]
        rectangles = [
            [20, -20, 20, -20]
        ]

        ### summation of all powers or SINR
        sum_fn = sum_lin
        sum_fn_params = {'tx_power_dBm' : tx_power_dBm, 'clip' : True, 'PL_max' : PL_max, 'PL_min' : PL_min}

        ### choose optimization target, either min over target area or counting locations with a value above some threshold
        opt_fn = get_min
        opt_fn_params = {'a' : a if smooth else None, 'PL_max' : PL_max, 'PL_min' : PL_min, 'smooth' : smooth, 'q' : 0.1}


    elif scenario==2:
        i, j, s = 3860, 58140, 'lr'
        tx_ids = [0, 2, 3]
        # streets
        rectangles = [
            [-80, -50, 0, -40],
            [80, -1, 55, 90],
            [0, -1, 175, 220]
        ]

        sum_fn = SINR
        sum_fn_params = {'noise_dBm' : -104, 'tx_power_dBm' : tx_power_dBm, 'PL_max' : PL_max, 'PL_min' : PL_min}

        opt_fn = count_above_thresh
        thresh = 20
        opt_fn_params = {'thresh': thresh, 'steepness': 0.1 if smooth else None, 'smooth' : smooth}

    else:
        raise ValueError(f'{scenario=} not defined. But you can choose a map and set all parameters yourself.')
    
    return i, j, s, tx_ids, rectangles, sum_fn, sum_fn_params, opt_fn, opt_fn_params


In [32]:
### Choose a scenario and parameters
scenario = 2
tx_power_dBm = 23
a = -5 # steepness for smooth thresholding
output_dir = Path(f'./logs_opt/{scenario=}')

In [None]:
### visualize the scenario
i, j, s, tx_ids, rectangles, sum_fn, sum_fn_params, opt_fn, opt_fn_params = get_scenario(scenario=scenario, tx_power_dBm=tx_power_dBm, PL_min=PL_min, PL_max=PL_max, smooth=True, a=a)
### prepare data
inp, _, _ = dm2.test_set.__getsample__((i,j,s,'0'))
ndsm_build, ndsm_veg = inp[0,:,:], inp[1,:,:]
img_rgb = inp[-4:-1,...]
sample_id = [[i], [j], [s], [0]]
target_area = torch.zeros_like(ndsm_build)
for x_min, x_max, y_min, y_max in rectangles:
    target_area[x_min:x_max, y_min:y_max] = 1
target_area[ndsm_build > 0] = 0

coords, angles_orig = get_coords_angles(sample_id, 0, tx_ant_dir=tx_ant_dir, tx_ant_file_name=tx_ant_file_name, coords_requires_grad=False, angles_requires_grad=True, tx_ids=tx_ids)
inputs_for_tr = inp[2:,:,:].to(device, torch.float32).repeat(coords.shape[0], 1, 1, 1)


fig = plt.figure(figsize=(12,6))
fig.add_subplot(1, 2, 1)
plt.imshow(torch.stack([ndsm_build, target_area, torch.zeros_like(target_area)], dim=-1))
plt.scatter(coords[:,1].cpu(), coords[:,0].cpu(), s=20, c='b')
fig.add_subplot(1, 2, 2)
plt.imshow(img_rgb.permute(1, 2, 0))
plt.scatter(coords[:,1].cpu(), coords[:,0].cpu(), s=20, c='b')

## Run Optimization with Gradient Descent and Random Search

In [34]:
### optimization method and params
opt_method1 = random_search
opt_method2 = GD

learning_rate = 0.1
max_iter = 500

ta = target_area.to(device, torch.bool)

In [None]:
### run method 1
i, j, s, tx_ids, rectangles, sum_fn, sum_fn_params, opt_fn, opt_fn_params = get_scenario(scenario=scenario, tx_power_dBm=tx_power_dBm, PL_min=PL_min, PL_max=PL_max, smooth=opt_method1==GD, a=a)
print(f'optimization method1 {opt_method1.__name__}, {max_iter=}, {learning_rate=}, {opt_fn=}, {opt_fn_params=}, {sum_fn=}')

s = time.time()
angles_calc, score_smooth, score_real = opt_method1(angles_orig, coords, inputs_for_tr, tr, net, ta, PL_min=PL_min, PL_max=PL_max, max_iter=max_iter, learning_rate=learning_rate, opt_fn=opt_fn, opt_fn_params=opt_fn_params, sum_fn=sum_fn, sum_fn_params=sum_fn_params, output_dir=output_dir / opt_method1.__name__)

print(f'{time.time() - s:.2f}s')

In [None]:
### run method2
i, j, s, tx_ids, rectangles, sum_fn, sum_fn_params, opt_fn, opt_fn_params = get_scenario(scenario=scenario, tx_power_dBm=tx_power_dBm, PL_min=PL_min, PL_max=PL_max, smooth=opt_method2==GD, a=a)
print(f'optimization method2 {opt_method2.__name__}, {max_iter=}, {learning_rate=}, {opt_fn=}, {opt_fn_params=}, {sum_fn=}')

s = time.time()
angles_calc, score_smooth, score_real = opt_method2(angles_orig, coords, inputs_for_tr, tr, net, ta, PL_min=PL_min, PL_max=PL_max, max_iter=max_iter, learning_rate=learning_rate, opt_fn=opt_fn, opt_fn_params=opt_fn_params, sum_fn=sum_fn, sum_fn_params=sum_fn_params, output_dir=output_dir / opt_method2.__name__)

print(f'{time.time() - s:.2f}s')

# TO BE REMOVED

In [39]:
def gray_to_color(filename : Path, vmin : float, vmax : float):
    if filename.stem.endswith('_color'):
        return
    fig = plt.figure(frameon=False)
    fig.set_size_inches(1,1)
    ax = plt.Axes(fig, [0., 0., 1., 1.])
    ax.set_axis_off()
    fig.add_axes(ax)
    arr = io.imread(filename)
    plt.imshow(arr, vmin=vmin, vmax=vmax)
    # plt.axis('off')
    # plt.box(False)
    plt.savefig(filename.with_stem(f'{filename.stem}_color'),dpi=256)

In [None]:
for png_file in output_dir.rglob("*.png"):
    gray_to_color(png_file, vmin=0, vmax=255)
    print(png_file)