In [2]:
# UE localization simulation (E1–E6)

import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from dataclasses import dataclass
from typing import List, Tuple, Optional

#Global config
SEED = 7
np.random.seed(SEED)

AREA = 100.0
COARSE_STEP = 20.0
F_GHZ = 3.5
RSS0 = -75.0 # [dBm] at d0
D0 = 1.0
N_PL = 2.0 # path-loss exponent
SIGMA_DB = 2.0
EPS = 1e-9
K_NEAREST = 9
N_TRIALS = 100 # Monte-Carlo repeats
H_ALT = 100.0

#Flight budget (для E4 trade-off
FLIGHT_SPEED_MPS = 3.0
HOVER_SEC_PER_POINT = 4.0

CALIB_BIAS_DB = 0.0
ANCHOR_BIASES_DB: Optional[np.ndarray] = None

OUTDIR = os.path.join(os.getcwd(), "results")
os.makedirs(OUTDIR, exist_ok=True)

#L_pen(f)
def L_pen_db(material: str, f_ghz: float) -> float:
    m = material.lower()
    if m == "glass":      return 2.0 + 0.2*f_ghz
    if m == "irr_glass":  return 23.0 + 0.3*f_ghz
    if m == "concrete":   return 5.0 + 4.0*f_ghz
    if m == "wood":       return 4.85 + 0.12*f_ghz
    return 0.0

# obstacles
@dataclass
class RectObs:
    xmin: float; ymin: float; xmax: float; ymax: float; material: str

def coarse_anchors(step: float = None) -> np.ndarray:
    s = COARSE_STEP if step is None else float(step)
    xs = np.arange(0, AREA + 1e-6, s)
    ys = np.arange(0, AREA + 1e-6, s)
    return np.array([(x, y) for x in xs for y in ys], dtype=float)

def vertical_wall(material: str) -> List[RectObs]:
    # Thin vertical wall around x~58 m spanning scene in y
    return [RectObs(57.5, -5.0, 58.5, 105.0, material)]

def make_obstacles(kind: str) -> List[RectObs]:
    k = kind.lower()
    if k == "los": return []
    if k in ["concrete","glass","irr_glass","wood"]:
        return vertical_wall(k)
    return []

def orient(a, b, c):
    return np.sign((b[0]-a[0])*(c[1]-a[1]) - (b[1]-a[1])*(c[0]-a[0]))

def seg_intersect(p, q, a, b) -> bool:
    o1 = orient(p,q,a); o2 = orient(p,q,b)
    o3 = orient(a,b,p); o4 = orient(a,b,q)
    return (o1*o2 < 0) and (o3*o4 < 0)

def segment_hits_rect(p, q, R: RectObs) -> bool:
    A=(R.xmin,R.ymin); B=(R.xmax,R.ymin); C=(R.xmax,R.ymax); D=(R.xmin,R.ymax)
    return seg_intersect(p,q,A,B) or seg_intersect(p,q,B,C) or \
           seg_intersect(p,q,C,D) or seg_intersect(p,q,D,A)

# RSS model and WLS solver
def ray_penetration_db(a: Tuple[float,float], x: Tuple[float,float],
                       obstacles: List[RectObs]) -> float:
    total = 0.0
    for R in obstacles:
        if segment_hits_rect(a, x, R):
            total += L_pen_db(R.material, F_GHZ)
    return total

def rss_measure(a, x, obstacles: List[RectObs], sigma_db: float = None) -> float:
    sdb = SIGMA_DB if sigma_db is None else float(sigma_db)
    d = max(np.hypot(a[0]-x[0], a[1]-x[1]), 1e-3)
    Ldist = 10*N_PL*np.log10(d/D0)
    Lpen  = ray_penetration_db(a, x, obstacles)
    noise = np.random.normal(0.0, sdb)
    return RSS0 - Ldist - Lpen + noise  # dB

def rss_measure_biased(a, x, obstacles: List[RectObs], sigma_db: float = None, anchor_idx: Optional[int]=None) -> float:
    r = rss_measure(a, x, obstacles, sigma_db)
    bias = CALIB_BIAS_DB
    if ANCHOR_BIASES_DB is not None and anchor_idx is not None:
        if 0 <= anchor_idx < len(ANCHOR_BIASES_DB):
            bias += float(ANCHOR_BIASES_DB[anchor_idx])
    return r + bias

def rss_to_range(r_db: float) -> float:
    return D0 * 10**((RSS0 - r_db)/(10*N_PL))

def pick_nearest(anchors_all: np.ndarray, x: Tuple[float,float], k: int = None):
    k = K_NEAREST if k is None else int(k)
    d = np.hypot(anchors_all[:,0]-x[0], anchors_all[:,1]-x[1])
    return anchors_all[np.argsort(d)[:k]]

def wls_multilat(anchors: np.ndarray, rss: np.ndarray, x_true: Tuple[float,float],
                 obstacles: List[RectObs], correct: str = "none") -> Tuple[float,float,np.ndarray]:
    r_use = rss.copy()
    for i, a in enumerate(anchors):
        if correct.lower() == "oracle":
            r_use[i] += ray_penetration_db(tuple(a), x_true, obstacles)

    d = np.array([rss_to_range(r) for r in r_use])

    a1 = anchors[0]; d1 = d[0]
    A=[]; b=[]
    for i in range(1, len(anchors)):
        ai = anchors[i]; di = d[i]
        A.append(2*(ai - a1))
        b.append((ai@ai - a1@a1) + d1*d1 - di*di)
    A=np.asarray(A,float); b=np.asarray(b,float)

    w = 1.0/(d[1:]**2 + EPS)
    W = np.diag(w)
    ATA = A.T @ W @ A
    ATb = A.T @ W @ b
    try:
        p = np.linalg.solve(ATA, ATb)
    except np.linalg.LinAlgError:
        p = np.linalg.pinv(ATA) @ ATb
    return float(p[0]), float(p[1]), d

#Altitude offset helpers
def altitude_offset_wls(anchors_xy: np.ndarray, ue_est_xy: Tuple[float,float], h: float) -> float:
    d = np.linalg.norm(anchors_xy - np.array(ue_est_xy, float), axis=1)
    delta_i = np.sqrt(d**2 + h**2) - d
    return float(np.sqrt(np.mean(delta_i**2)))

def corrected_rmse(raw_err: float, delta_eff: float) -> float:
    return np.sqrt(max(raw_err**2 - delta_eff**2, 0.0))

#Plot helpers
def _annotate(ax, bars):
    for b in bars:
        h = b.get_height()
        ax.annotate(f"{h:.1f}", xy=(b.get_x()+b.get_width()/2, h),
                    xytext=(0,3), textcoords="offset points",
                    ha="center", va="bottom", fontsize=9)

def plot_bars_rmse(df_full: pd.DataFrame, title: str, fname: str, altitude_corrected=False):
    labels = df_full["Scenario"].tolist()
    x = np.arange(len(labels))
    width = 0.35
    fig, ax = plt.subplots(figsize=(8.6,4.8))
    if not altitude_corrected:
        bars1 = ax.bar(x - width/2, df_full["RMSE_raw [m]"], width, label="Raw")
        bars2 = ax.bar(x + width/2, df_full["RMSE_corr [m]"], width, label="Oracle corrected")
        _annotate(ax, bars1); _annotate(ax, bars2)
        ax.set_title(title)
    else:
        bars3 = ax.bar(x - width/2, df_full["RMSE_raw_altcorr [m]"], width, label="Raw (alt-corr)")
        bars4 = ax.bar(x + width/2, df_full["RMSE_corr_altcorr [m]"], width, label="Corrected (alt-corr)")
        _annotate(ax, bars3); _annotate(ax, bars4)
        ax.set_title(title + " — Altitude-corrected")
    ax.set_xticks(x); ax.set_xticklabels(labels, rotation=15)
    ax.set_ylabel("Position RMSE [m]")
    ax.legend(loc="upper left", framealpha=0.9)
    plt.tight_layout()
    plt.savefig(os.path.join(OUTDIR, fname), dpi=300)
    plt.close(fig)

#Cost helpers for coarse scan
def _grid_counts(step: float, area: float = AREA) -> Tuple[int,int,int]:
    nx = int(np.floor(area/step)) + 1
    ny = nx
    return nx, ny, nx*ny

def _serpentine_path_len(step: float, area: float = AREA) -> float:
    nx, ny, _ = _grid_counts(step, area)
    horiz = (nx-1) * step * ny
    vert  = (ny-1) * step
    return horiz + vert

def _coarse_scan_costs(step: float,
                       speed_mps: float = FLIGHT_SPEED_MPS,
                       dwell_s: float = HOVER_SEC_PER_POINT,
                       area: float = AREA) -> dict:
    nx, ny, npts = _grid_counts(step, area)
    D = _serpentine_path_len(step, area)
    t_move = D / max(speed_mps, 1e-6)
    t_hover = npts * dwell_s
    T = t_move + t_hover
    return {
        "nx": nx, "ny": ny, "points": npts,
        "path_len[m]": D,
        "flight_time[s]": T,
        "flight_time[min]": T/60.0
    }

# Core simulator for a scenario
def simulate_scene(material: str, N_trials=N_TRIALS, use_bias=False, step=None, k_nearest=None,
                   h_alt=None, correct_mode="none", sigma_db=None, n_pl=None, freq_ghz=None) -> dict:
    global N_PL, SIGMA_DB, F_GHZ
    anchors_all = coarse_anchors(step=step)
    rmse_raw = []; rmse_cor = []; Ns = []
    deltas_raw = []; rmse_raw_alt = []; rmse_cor_alt = []
    k_use = K_NEAREST if k_nearest is None else int(k_nearest)

    n_pl_old, sig_old, f_old = N_PL, SIGMA_DB, F_GHZ
    if n_pl is not None: N_PL = float(n_pl)
    if sigma_db is not None: SIGMA_DB = float(sigma_db)
    if freq_ghz is not None: F_GHZ = float(freq_ghz)
    h_use = H_ALT if h_alt is None else float(h_alt)

    for _ in range(N_trials):
        x_true = (np.random.uniform(30, 70), np.random.uniform(30, 70))
        obstacles = make_obstacles(material)
        anchors = pick_nearest(anchors_all, x_true, k_use)

        # RSS
        rss = []
        for idx, a in enumerate(anchors):
            if use_bias:
                rss.append(rss_measure_biased(tuple(a), x_true, obstacles, SIGMA_DB, idx))
            else:
                rss.append(rss_measure(tuple(a), x_true, obstacles, SIGMA_DB))
        rss = np.array(rss)

        # NLoS fraction
        nlos_count = sum([ray_penetration_db(tuple(a), x_true, obstacles) > 0 for a in anchors])
        Ns.append(nlos_count/len(anchors))

        # WLS positions
        p_raw = wls_multilat(anchors, rss, x_true, obstacles, correct="none")[:2]
        p_cor = wls_multilat(anchors, rss, x_true, obstacles, correct=correct_mode)[:2]

        e_raw = np.hypot(p_raw[0]-x_true[0], p_raw[1]-x_true[1])
        e_cor = np.hypot(p_cor[0]-x_true[0], p_cor[1]-x_true[1])
        rmse_raw.append(e_raw); rmse_cor.append(e_cor)

        # altitude-corrected variants
        delta_eff = altitude_offset_wls(anchors, p_raw, h_use)
        deltas_raw.append(delta_eff)
        rmse_raw_alt.append(corrected_rmse(e_raw, delta_eff))
        rmse_cor_alt.append(corrected_rmse(e_cor, delta_eff))

    N_PL, SIGMA_DB, F_GHZ = n_pl_old, sig_old, f_old

    scenario_name = material.capitalize() if material!="irr_glass" else "IRR glass"
    return {
        "Scenario": scenario_name,
        "RMSE_raw [m]": float(np.sqrt(np.mean(np.square(rmse_raw)))),
        "STD_raw [m]":  float(np.std(rmse_raw, ddof=1)),
        "RMSE_corr [m]": float(np.sqrt(np.mean(np.square(rmse_cor)))),
        "STD_corr [m]":  float(np.std(rmse_cor, ddof=1)),
        "RMSE_raw_altcorr [m]": float(np.sqrt(np.mean(np.square(rmse_raw_alt)))),
        "RMSE_corr_altcorr [m]": float(np.sqrt(np.mean(np.square(rmse_cor_alt)))),
        "Mean Delta_eff [m]": float(np.mean(deltas_raw)),
        "Mean N": float(np.mean(Ns)),
        "n_pl": N_PL, "sigma_db": SIGMA_DB, "f_ghz": F_GHZ, "k": k_use,
        "step": COARSE_STEP if step is None else float(step), "h_alt": h_use,
        "correct": correct_mode, "use_bias": bool(use_bias)
    }

#E1
def E1_baseline():
    scenarios = ["los", "concrete", "glass", "irr_glass", "wood"]
    rows = [simulate_scene(s, correct_mode="oracle") for s in scenarios]
    df = pd.DataFrame(rows)
    df.to_csv(os.path.join(OUTDIR, "E1_baseline.csv"), index=False)
    plot_bars_rmse(df, f"E1 Baseline RMSE (f={F_GHZ} GHz, N={N_TRIALS})", "E1_baseline_raw.png", altitude_corrected=False)
    plot_bars_rmse(df, "E1 Baseline RMSE", "E1_baseline_altcorr.png", altitude_corrected=True)
    return df

#E2
def E2_sensitivity_n_sigma(material="concrete", n_vals=(1.8,2.0,2.2,2.4), sig_vals=(1,2,3,4)):
    data = []
    for n in n_vals:
        for s in sig_vals:
            row = simulate_scene(material, correct_mode="oracle", n_pl=n, sigma_db=s)
            row["n_pl"] = n; row["sigma_db"] = s; row["Scenario"] = material
            data.append(row)
    df = pd.DataFrame(data)
    df.to_csv(os.path.join(OUTDIR, "E2_sensitivity.csv"), index=False)

    n_ticks = list(n_vals); s_ticks = list(sig_vals)
    Z_raw = np.zeros((len(n_ticks), len(s_ticks)))
    Z_cor = np.zeros_like(Z_raw)
    for i, n in enumerate(n_ticks):
        for j, s in enumerate(s_ticks):
            r = df[(df["n_pl"]==n) & (df["sigma_db"]==s)]
            Z_raw[i,j] = float(r["RMSE_raw [m]"])
            Z_cor[i,j] = float(r["RMSE_corr [m]"])

    for tag, Z in [("raw", Z_raw), ("corrected", Z_cor)]:
        fig, ax = plt.subplots(figsize=(6.8,4.8))
        im = ax.imshow(Z, origin="lower",
                       extent=(min(s_ticks), max(s_ticks), min(n_ticks), max(n_ticks)),
                       aspect="auto")
        plt.colorbar(im, ax=ax, label="RMSE [m]")
        ax.set_xlabel("σ_shadow [dB]"); ax.set_ylabel("n (path-loss exponent)")
        ax.set_title(f"E2: RMSE heatmap ({material}, {tag})")
        plt.tight_layout()
        plt.savefig(os.path.join(OUTDIR, f"E2_heatmap_{material}_{tag}.png"), dpi=300)
        plt.close(fig)
    return df

# E3
def E3_calibration_bias(material="concrete", deltas=(-3,-2,0,2,3), per_anchor_std=1.0):
    global CALIB_BIAS_DB, ANCHOR_BIASES_DB
    rows = []

    for d in deltas:
        CALIB_BIAS_DB = float(d)
        ANCHOR_BIASES_DB = None
        r = simulate_scene(material, correct_mode="oracle", use_bias=True)
        r["Delta_RSS0"] = d; r["PerAnchorStd"] = 0.0
        rows.append(r)

    CALIB_BIAS_DB = 0.0
    ANCHOR_BIASES_DB = np.random.normal(0.0, per_anchor_std, K_NEAREST)
    r2 = simulate_scene(material, correct_mode="oracle", use_bias=True)
    r2["Delta_RSS0"] = 0.0; r2["PerAnchorStd"] = per_anchor_std
    rows.append(r2)

    df = pd.DataFrame(rows)
    df.to_csv(os.path.join(OUTDIR, "E3_calibration_bias.csv"), index=False)

    df_d = df[df["PerAnchorStd"]==0.0].sort_values("Delta_RSS0")
    fig, ax = plt.subplots(figsize=(6.8,4.6))
    ax.plot(df_d["Delta_RSS0"], df_d["RMSE_raw [m]"], marker="o", label="Raw")
    ax.plot(df_d["Delta_RSS0"], df_d["RMSE_corr [m]"], marker="o", label="Oracle corrected")
    ax.set_xlabel("Global RSS0 bias [dB]"); ax.set_ylabel("Position RMSE [m]")
    ax.set_title("E3: RMSE vs RSS0 bias")
    ax.legend(); plt.tight_layout()
    plt.savefig(os.path.join(OUTDIR, "E3_rmse_vs_bias.png"), dpi=300); plt.close(fig)
    return df

# E4
def E4_geometry(material="concrete", steps=(10,15,20,25), Ks=(6,9,12)):

    rows = []
    for s in steps:
        for k in Ks:
            r = simulate_scene(material, correct_mode="oracle", step=s, k_nearest=k)
            r["COARSE_STEP"] = s
            r["K"] = k
            rows.append(r)
    df = pd.DataFrame(rows)
    df.to_csv(os.path.join(OUTDIR, "E4_geometry.csv"), index=False)

    fig, ax = plt.subplots(figsize=(7.0,4.6))
    for k in Ks:
        dd = df[df["K"]==k].sort_values("COARSE_STEP")
        ax.plot(dd["COARSE_STEP"], dd["RMSE_raw [m]"], marker="o", label=f"Raw, K={k}")
        ax.plot(dd["COARSE_STEP"], dd["RMSE_corr [m]"], marker="o", linestyle="--", label=f"Corr, K={k}")
    ax.set_xlabel("COARSE_STEP [m]"); ax.set_ylabel("Position RMSE [m]")
    ax.set_title("E4: RMSE vs grid step and K")
    ax.legend(ncol=2); plt.tight_layout()
    plt.savefig(os.path.join(OUTDIR, "E4_rmse_vs_stepK.png"), dpi=300); plt.close(fig)

    cost_rows = []
    for s in steps:
        c = _coarse_scan_costs(s)
        c["COARSE_STEP"] = s
        cost_rows.append(c)
    df_cost = pd.DataFrame(cost_rows)
    df_cost.to_csv(os.path.join(OUTDIR, "E4_coarse_costs.csv"), index=False)

    df_plot = df.merge(df_cost, on="COARSE_STEP")
    df_plot.to_csv(os.path.join(OUTDIR, "E4_geometry_tradeoff.csv"), index=False)

    fig, ax = plt.subplots(figsize=(8.6,4.8))
    for k in Ks:
        sub = df_plot[df_plot["K"]==k].sort_values("flight_time[min]")
        ax.plot(sub["flight_time[min]"], sub["RMSE_raw [m]"], marker="o", label=f"Raw, K={k}")
        ax.plot(sub["flight_time[min]"], sub["RMSE_corr [m]"], marker="o", linestyle="--", label=f"Corr, K={k}")
    ax.set_xlabel(f"Coarse-scan flight time [min] (v={FLIGHT_SPEED_MPS} m/s, dwell={HOVER_SEC_PER_POINT} s/pt)")
    ax.set_ylabel("Position RMSE [m]")
    ax.set_title("E4: RMSE vs coarse-scan flight time")
    ax.legend(ncol=2, framealpha=0.9)
    plt.tight_layout()
    plt.savefig(os.path.join(OUTDIR, "E4_tradeoff_RMSE_vs_time.png"), dpi=300)
    plt.close(fig)

    fig, ax = plt.subplots(figsize=(8.6,4.8))
    for k in Ks:
        sub = df_plot[df_plot["K"]==k].sort_values("points")
        ax.plot(sub["points"], sub["RMSE_raw [m]"], marker="o", label=f"Raw, K={k}")
        ax.plot(sub["points"], sub["RMSE_corr [m]"], marker="o", linestyle="--", label=f"Corr, K={k}")
    ax.set_xlabel("Number of coarse waypoints")
    ax.set_ylabel("Position RMSE [m]")
    ax.set_title("E4: RMSE vs number of coarse waypoints")
    ax.legend(ncol=2, framealpha=0.9)
    plt.tight_layout()
    plt.savefig(os.path.join(OUTDIR, "E4_tradeoff_RMSE_vs_points.png"), dpi=300)
    plt.close(fig)

    return df

# E5
def rss_avg(a, x, obstacles, K=1):
    vals = [rss_measure(a, x, obstacles) for _ in range(K)]
    return float(np.mean(vals))

def E5_time_averaging(material="concrete", K_vals=(1,3,5,7)):
    rows = []
    anchors_all = coarse_anchors()
    for K in K_vals:
        rmse_e = []
        for _ in range(N_TRIALS):
            ue_true = (np.random.uniform(30,70), np.random.uniform(30,70))
            obstacles = make_obstacles(material)
            anchors = pick_nearest(anchors_all, ue_true, K_NEAREST)
            rss = np.array([rss_avg(tuple(a), ue_true, obstacles, K) for a in anchors])
            p = wls_multilat(anchors, rss, ue_true, obstacles, correct="none")[:2]
            rmse_e.append(np.hypot(p[0]-ue_true[0], p[1]-ue_true[1]))
        rows.append({"K_samples":K, "RMSE_raw [m]": float(np.sqrt(np.mean(np.square(rmse_e))))})
    df = pd.DataFrame(rows)
    df.to_csv(os.path.join(OUTDIR, "E5_time_averaging.csv"), index=False)

    fig, ax = plt.subplots(figsize=(6.6,4.6))
    ax.plot(df["K_samples"], df["RMSE_raw [m]"], marker="o")
    ax.set_xlabel("K samples per point (@1 Hz)"); ax.set_ylabel("Position RMSE [m]")
    ax.set_title("E5: Benefit of temporal averaging")
    plt.tight_layout()
    plt.savefig(os.path.join(OUTDIR, "E5_time_averaging.png"), dpi=300); plt.close(fig)
    return df

# E6
def E6_frequency(materials=("concrete","glass","irr_glass","wood"), freqs=(2.6,3.5,4.9)):
    rows = []
    for f in freqs:
        for m in materials:
            r = simulate_scene(m, correct_mode="oracle", freq_ghz=f)
            r["f_GHz"] = f
            rows.append(r)
    df = pd.DataFrame(rows)
    df.to_csv(os.path.join(OUTDIR, "E6_frequency.csv"), index=False)

    for f in freqs:
        dff = df[df["f_GHz"]==f]
        plot_bars_rmse(dff, f"E6 RMSE by material (f={f} GHz)", f"E6_bars_{f:.1f}GHz.png", altitude_corrected=False)
    return df

# Runner
def main():
    print("[run_campaign] Output directory:", OUTDIR)
    summaries = {}

    print("E1 ..."); summaries["E1"] = E1_baseline()
    print("E2 ..."); summaries["E2"] = E2_sensitivity_n_sigma()
    print("E3 ..."); summaries["E3"] = E3_calibration_bias()
    print("E4 ..."); summaries["E4"] = E4_geometry()
    print("E5 ..."); summaries["E5"] = E5_time_averaging()
    print("E6 ..."); summaries["E6"] = E6_frequency()

    with open(os.path.join(OUTDIR, "INDEX_README.txt"), "w") as f:
        f.write("Simulation campaign outputs:\n")
        for root, _, files in os.walk(OUTDIR):
            for fn in sorted(files):
                if fn.lower().endswith((".csv",".png",".txt")):
                    f.write(f" - {fn}\n")

    print("Done. See the 'results' folder.")

if __name__ == "__main__":
    main()


[run_campaign] Output directory: /content/results
E1 ...
E2 ...


  Z_raw[i,j] = float(r["RMSE_raw [m]"])
  Z_cor[i,j] = float(r["RMSE_corr [m]"])


E3 ...
E4 ...
E5 ...
E6 ...
Done. See the 'results' folder.
