In [None]:
import numpy as np
import tensorflow as tf

from sionna.phy.channel.tr38901 import PanelArray, UMa
from sionna.phy.ofdm import ResourceGrid
from sionna.phy.channel import OFDMChannel


def ypr_from_deg(az_deg, tilt_deg=0.0, roll_deg=0.0):
    """
    Returns [yaw, pitch, roll] in radians for Sionna set_topology().
    - yaw   = azimuth (deg → rad)
    - pitch = -tilt   (downtilt is negative)
    - roll  = roll (usually 0)
    """
    yaw = np.deg2rad(az_deg % 360.0).astype(np.float32)
    pitch = -np.deg2rad(tilt_deg).astype(np.float32)  # negative = downtilt
    roll = np.deg2rad(roll_deg).astype(np.float32)
    return np.array([yaw, pitch, roll], dtype=np.float32)

def trisector_orientations_deg(site_az0_deg=0.0, tilt_deg=8.0, roll_deg=0.0):
    """
    Classic tri-sector azimuths: az0, az0+120, az0+240 (degrees).
    Returns shape [3, 3] (yaw, pitch, roll per sector), radians.
    """
    az_list = (np.array([0.0, 120.0, 240.0]) + site_az0_deg) % 360.0
    return np.stack([ypr_from_deg(a, tilt_deg, roll_deg) for a in az_list], axis=0)



def build_trisector_sites(site_xy_list, bs_height_m=25.0, tilt_deg=8.0, site_az0_deg=0.0):
    """
    site_xy_list: list of (x, y) in meters for each site center
    Returns:
      bs_loc:      [1, num_bs, 3]
      bs_orient:   [1, num_bs, 3]
    where num_bs = 3 * len(site_xy_list)
    """
    locs_all, oris_all = [], []
    for (x, y) in site_xy_list:
        locs = np.tile(np.array([x, y, bs_height_m], np.float32), (3, 1))
        oris = trisector_orientations_deg(site_az0_deg=site_az0_deg, tilt_deg=tilt_deg)
        locs_all.append(locs)
        oris_all.append(oris)
    bs_loc = np.concatenate(locs_all, axis=0)[None, ...].astype(np.float32)
    bs_orient = np.concatenate(oris_all, axis=0)[None, ...].astype(np.float32)
    return bs_loc, bs_orient    

In [None]:
# =========================================================
# CONFIG
# =========================================================
# Carrier & array (beamwidth is mostly controlled by aperture below)
fc = 3.5e9  # Hz

# Base-station panel aperture (bigger = narrower beam & higher gain)
BS_ROWS = 4
BS_COLS = 4
V_SP = 0.5    # element vertical spacing [in wavelengths]
H_SP = 0.5    # element horizontal spacing [in wavelengths]

# Sites (tri-sector each). Start with 1 site; add more to scale.
sites = [(0.0, 0.0)]          # e.g., add (500.0, 0.0), (250.0, 400.0), ...
BS_HEIGHT = 25.0              # m
TILT_DEG = 8.0                # electrical downtilt (applied as negative pitch)
SITE_AZ0_DEG = 0.0            # site’s sector-0 azimuth (deg), others at +120/+240

# UEs
NUM_UE = 4000
UE_HEIGHT = 1.5               # m
COVER_RADIUS = 500.0          # approx radius around site cluster to place UEs (m)

# Reference-signal power per RE (simplified RSRP proxy)
TX_RS_POWER_DBM = 0.0


In [None]:
# =========================================================
# Arrays: BS & UE (38.901 element pattern for BS, omni for UE)
# =========================================================
bs_array = PanelArray(
    num_rows_per_panel=BS_ROWS, num_cols_per_panel=BS_COLS,
    polarization='dual', polarization_type='VH',
    antenna_pattern='38.901', carrier_frequency=fc,
    element_vertical_spacing=V_SP, element_horizontal_spacing=H_SP
)
ue_array = PanelArray(
    num_rows_per_panel=1, num_cols_per_panel=1,
    polarization='single', polarization_type='V',
    antenna_pattern='omni', carrier_frequency=fc
)

In [None]:
# =========================================================
# Channel model: 3GPP UMa (no ray tracing; GPU-compatible)
# =========================================================
ch = UMa(carrier_frequency=fc, o2i_model='low',
         ut_array=ue_array, bs_array=bs_array,
         direction='downlink',
         enable_pathloss=True, enable_shadow_fading=True)

In [None]:
# =========================================================
# Build multi-site tri-sector topology
# =========================================================
bs_loc, bs_orient = build_trisector_sites(sites, bs_height_m=BS_HEIGHT,
                                          tilt_deg=TILT_DEG, site_az0_deg=SITE_AZ0_DEG)
B = bs_loc.shape[1]  # total sectors (3 * num_sites)

# For plotting/inspection if you like:
sector_yaws = bs_orient[0, :, 0].copy()         # radians
sector_az_deg = (np.rad2deg(sector_yaws) % 360) # degrees

In [None]:
# =========================================================
# Drop UEs in a box that covers all sites ± COVER_RADIUS
# =========================================================
xs = [xy[0] for xy in sites]; ys = [xy[1] for xy in sites]
minx, maxx = min(xs) - COVER_RADIUS, max(xs) + COVER_RADIUS
miny, maxy = min(ys) - COVER_RADIUS, max(ys) + COVER_RADIUS

rng = np.random.default_rng(7)
ue_x = rng.uniform(minx, maxx, size=NUM_UE).astype(np.float32)
ue_y = rng.uniform(miny, maxy, size=NUM_UE).astype(np.float32)
ue_z = np.full(NUM_UE, UE_HEIGHT, np.float32)
ue_loc = np.stack([ue_x, ue_y, ue_z], axis=1)[None, ...]  # [1, U, 3]

# UE orientation/velocity & outdoor flag
ue_orient = np.tile(np.array([np.pi, 0.0, 0.0], np.float32), (NUM_UE, 1))[None, ...]
ue_vel = np.zeros_like(ue_loc)                 # static UEs
in_state = np.zeros((1, NUM_UE), dtype=bool)   # outdoors

# Apply topology
ch.set_topology(ut_loc=tf.constant(ue_loc),
                bs_loc=tf.constant(bs_loc),
                ut_orientations=tf.constant(ue_orient),
                bs_orientations=tf.constant(bs_orient),
                ut_velocities=tf.constant(ue_vel),
                in_state=tf.constant(in_state))

In [None]:
# =========================================================
# Tiny resource grid just to obtain H(f); no payload needed
# =========================================================
rg = ResourceGrid(num_ofdm_symbols=1, fft_size=32, subcarrier_spacing=30e3,
                  num_tx=B, num_streams_per_tx=1)

ofdm_ch = OFDMChannel(channel_model=ch, resource_grid=rg, return_channel=True)

# Dummy input to trigger channel generation
x = tf.zeros([1, B, bs_array.num_ant, rg.num_ofdm_symbols, rg.fft_size], tf.complex64)
_, H = ofdm_ch(x)  # H: [1, U, ue_ant, B, bs_ant, S, N]


In [None]:
# =========================================================
# RSRP approximation from channel gain
# =========================================================
H2 = tf.math.square(tf.abs(H))
# Sum over RX & TX antenna ports; average over symbols/subcarriers
G = tf.reduce_mean(tf.reduce_sum(H2, axis=[2, 4]), axis=[-1, -2])  # [1, U, B]
G = tf.squeeze(G, axis=0).numpy()                                  # [U, B]

def dbm_to_watt(dbm): return 1e-3 * 10.0**(dbm/10.0)
def watt_to_dbm(w):    return 10.0*np.log10(np.maximum(w, 1e-30)) + 30.0

P_rs_W = dbm_to_watt(TX_RS_POWER_DBM)
RSRP_W = P_rs_W * G
RSRP_dBm = watt_to_dbm(RSRP_W)                                     # [U, B]

# Serving sector per UE
best = RSRP_dBm.argmax(axis=1)

# Quick sanity print
print(f"Sectors (B) = {B}, UEs = {NUM_UE}")
print("First 5 UE best-sector results:")
for i in range(5):
    s = best[i]
    print(f"  UE {i}: best sector {s}, RSRP = {RSRP_dBm[i, s]:.1f} dBm")

In [None]:
# ==== PLOT PACK (paste after you have RSRP_dBm, ue_loc, bs_loc, bs_orient) ====
import numpy as np
import matplotlib.pyplot as plt

# Derive what we need from your sim outputs
try:
    best
except NameError:
    best = RSRP_dBm.argmax(axis=1)

ue_xy = ue_loc[0, :, :2]            # [U,2]
bs_xy = bs_loc[0, :, :2]            # [B,2]
sector_yaws = bs_orient[0, :, 0]    # yaw (radians)
B = bs_xy.shape[0]

# ---------- 1) Coverage scatter: UEs colored by serving sector ----------
plt.figure(figsize=(7,7))
sc = plt.scatter(ue_xy[:,0], ue_xy[:,1], c=best, s=12, alpha=0.9)
plt.scatter(bs_xy[:,0], bs_xy[:,1], marker='^', s=180, edgecolors='k', linewidths=1.0, zorder=3)

# draw boresight arrows for each sector
arrow_len = 75.0
for b in range(B):
    yaw = float(sector_yaws[b])
    x, y = bs_xy[b]
    plt.arrow(x, y, arrow_len*np.cos(yaw), arrow_len*np.sin(yaw),
              head_width=18.0, length_includes_head=True, zorder=2)

plt.gca().set_aspect('equal', adjustable='box')
plt.title('Serving sector by color')
cbar = plt.colorbar(sc, ticks=range(B))
cbar.set_label('Sector ID')
plt.xlabel('x [m]'); plt.ylabel('y [m]')
plt.grid(True, linestyle=':')
plt.tight_layout()
plt.show()

# ---------- 2) RSRP CDF per sector ----------
plt.figure(figsize=(7,4))
for b in range(B):
    vals = np.sort(RSRP_dBm[:, b])
    p = np.linspace(0, 1, len(vals), endpoint=True)
    plt.plot(vals, p, label=f'Sector {b}')
plt.xlabel('RSRP [dBm]'); plt.ylabel('CDF')
plt.title('RSRP CDF by sector')
plt.grid(True, linestyle=':')
plt.legend()
plt.tight_layout()
plt.show()

# ---------- 3) UE×Sector heatmap ----------
idx = np.argsort(RSRP_dBm.max(axis=1))
plt.figure(figsize=(7,4))
plt.imshow(RSRP_dBm[idx].T, aspect='auto', origin='lower')
plt.yticks(range(B), [f'S{b}' for b in range(B)])
plt.xlabel('UE index (sorted by best RSRP)')
plt.ylabel('Sector')
plt.title('RSRP [dBm] per UE / sector')
plt.colorbar(label='RSRP [dBm]')
plt.tight_layout()
plt.show()

In [None]:
# ---------- 4) OPTIONAL: coverage raster (best-sector RSRP over a grid) ----------
# This samples a grid of points and computes best-sector RSRP in batches to avoid OOM.
# Requires: ch, ofdm_ch, rg, bs_array, bs_loc, bs_orient, TX_RS_POWER_DBM, UE_HEIGHT, and B.

def coverage_raster(xmin, xmax, ymin, ymax, step=40.0, batch=512, device=None):
    xs = np.arange(xmin, xmax+step, step, dtype=np.float32)
    ys = np.arange(ymin, ymax+step, step, dtype=np.float32)
    X, Y = np.meshgrid(xs, ys)
    pts = np.stack([X.ravel(), Y.ravel(), np.full(X.size, UE_HEIGHT, np.float32)], axis=1)
    ut_loc_grid = pts[None, ...]  # [1, Ugrid, 3]

    Z_list = []
    Ugrid = ut_loc_grid.shape[1]
    for s in range(0, Ugrid, batch):
        e = min(s+batch, Ugrid)
        loc_b = ut_loc_grid[:, s:e, :]
        # default handset orientation for grid points
        ut_or_b = np.tile(np.array([[np.pi, 0.0, 0.0]], np.float32), (1, e-s, 1))

        ch.set_topology(ut_loc=tf.constant(loc_b),
                        bs_loc=tf.constant(bs_loc),
                        ut_orientations=tf.constant(ut_or_b),
                        bs_orientations=tf.constant(bs_orient),
                        ut_velocities=tf.zeros_like(tf.constant(loc_b)),
                        in_state=tf.zeros((1, e-s), dtype=tf.bool))

        x = tf.zeros([1, B, bs_array.num_ant, rg.num_ofdm_symbols, rg.fft_size], tf.complex64)
        if device:
            with tf.device(device):
                _, H = ofdm_ch(x)
        else:
            _, H = ofdm_ch(x)

        H2 = tf.math.square(tf.abs(H))
        G  = tf.reduce_mean(tf.reduce_sum(H2, axis=[2,4]), axis=[-1,-2])   # [1, U_b, B]
        Gb = tf.squeeze(G, 0).numpy()
        P_rs_W = 1e-3 * 10**(TX_RS_POWER_DBM/10.0)
        RSRP_W = P_rs_W * Gb
        RSRP_b = 10.0*np.log10(np.maximum(RSRP_W, 1e-30)) + 30.0
        Z_list.append(RSRP_b.max(axis=1))  # best-sector

    Z = np.concatenate(Z_list, axis=0).reshape(Y.shape)
    return xs, ys, Z

# choose a window that covers your sites
xs = [xy[0] for xy in [(float(x), float(y)) for (x, y) in bs_xy]]  # sector coords, for bounds
ys = [xy[1] for xy in [(float(x), float(y)) for (x, y) in bs_xy]]
pad = 400.0
xmin, xmax = min(xs)-pad, max(xs)+pad
ymin, ymax = min(ys)-pad, max(ys)+pad

# If you’re tight on VRAM, you can do device='/CPU:0' for this raster call.
xx, yy, ZZ = coverage_raster(xmin, xmax, ymin, ymax, step=40.0, batch=512, device=None)

plt.figure(figsize=(8,6))
plt.pcolormesh(xx, yy, ZZ, shading='auto')
plt.scatter(bs_xy[:,0], bs_xy[:,1], marker='^', s=160, edgecolors='k', linewidths=1.0)
for b in range(B):
    yaw = float(sector_yaws[b]); x, y = bs_xy[b]
    plt.arrow(x, y, 75*np.cos(yaw), 75*np.sin(yaw),
              head_width=18.0, length_includes_head=True)
plt.gca().set_aspect('equal', adjustable='box')
plt.title('Coverage raster (best-sector RSRP) [dBm]')
plt.xlabel('x [m]'); plt.ylabel('y [m]')
plt.colorbar(label='Best-sector RSRP [dBm]')
plt.tight_layout()
plt.show()
# ==== end plot pack ====