In [None]:
from leo_utils import arc_point_on_earth, compute_satellite_intersection_point_enu, compute_az_el_dist
import numpy as np


# Generate RX positions

distances_km = [0.001]
azimuths_deg = np.linspace(0, 360, len(distances_km), endpoint=False)
gnd_positions = [np.array([0.0, 0.0, 0.0])]

for d_km, az in zip(distances_km, azimuths_deg):
    pos = arc_point_on_earth(d_km, az)
    gnd_positions.append(pos)
gnd_positions = np.array(gnd_positions)

for i, pos in enumerate(gnd_positions):
    print(f"TX{i}(m): {pos}")


# Compute SAT intersection points

sat_orbit_m = 550e3
angles = [(45, 90), (120, 80), (75,75),(80,88), (65, 85),(130,75)]
# angles = [(45, 90)]
sat_positions = []
delays_ms = []
fspl_db = []
frequency_hz = 10e9 
wavelength = 3e8 / frequency_hz

for az, el in angles:
    pos, delay, dist = compute_satellite_intersection_point_enu(az, el, sat_orbit_m)
    sat_positions.append(pos)
    delays_ms.append(delay)
    fspl = 20 * np.log10(4 * np.pi * dist / wavelength)
    fspl_db.append(fspl)

sat_positions = np.array(sat_positions)
delays_ms = np.array(delays_ms)
fspl_db = np.array(fspl_db)

print("\nSatellite Pos [m]:\n", sat_positions)
# print("\nPropagation delays [ms]:\n", delays_ms)
# print("\nFree-space path loss [dB]:\n", fspl_db)


# Compute az/el/dist per TX-SAT
  
for i, tx in enumerate(gnd_positions): 
    print(f"\nFrom TX{i}:")
    for j, sat in enumerate(sat_positions):
        az, el, dist, n_waves = compute_az_el_dist(sat, tx, frequency_hz)
        print(f"  SAT{j}: az={az:.2f}°, el={el:.2f}°, dist={dist:.2f} m, λ count ≈ {n_waves:.2f}")


In [None]:
"""
Compute Starlink satellites that satisfy user-defined
elevation / azimuth / range constraints and express their
locations in a local ENU Cartesian frame whose origin is the
ground station (x = East, y = North, z = Up).

The user-defined jammer is listed **first** in the output table.
"""

import numpy as np
import pandas as pd
import requests
from skyfield.api import load, EarthSatellite, wgs84
from skyfield.framelib import itrs  # Earth-fixed ITRF frame

# ───────────────────────────────────────────────────────────────
# 0) User-configurable parameters
# ----------------------------------------------------------------
ELEV_MIN_DEG   = 10           # Minimum elevation (deg)
AZ_RANGE_DEG   = (0, 360)     # Azimuth range [start, end)
MAX_SLANT_KM   = 9999         # Maximum slant range (km)

AZ_JAMMER_DEG = 45.0          # 0° = North, 90° = East
R_JAMMER_KM   = 50.0          # Horizontal distance (km)
ALT_JAMMER_M  = 0.0           # Altitude above local horizon (m)

# ───────────────────────────────────────────────────────────────
# 1) Observer definition (Boulder, CO)
# ----------------------------------------------------------------
lat, lon, elev = 40.0822, -105.1092, 1560  # meters
ts = load.timescale()
t = ts.now()
topos = wgs84.latlon(lat, lon, elev)

# Build ECEF → ENU rotation matrix
phi = np.radians(lat)
lam = np.radians(lon)
R_ecef2enu = np.array([
    [-np.sin(lam),              np.cos(lam),               0.0],
    [-np.sin(phi)*np.cos(lam), -np.sin(phi)*np.sin(lam),  np.cos(phi)],
    [ np.cos(phi)*np.cos(lam),  np.cos(phi)*np.sin(lam),  np.sin(phi)]
])

# ───────────────────────────────────────────────────────────────
# 2) Build jammer row FIRST
# ----------------------------------------------------------------
az_rad = np.radians(AZ_JAMMER_DEG)
r_m = R_JAMMER_KM * 1000

jammer_enu = np.array([
    r_m * np.sin(az_rad),  # x_East
    r_m * np.cos(az_rad),  # y_North
    ALT_JAMMER_M           # z_Up
])

rows = [[
    "JAMMER",
    AZ_JAMMER_DEG,
    0.0,                   # Elevation undefined
    R_JAMMER_KM,
    *np.round(jammer_enu, 1)
]]

# ───────────────────────────────────────────────────────────────
# 3) Download Starlink TLEs and append satellites
# ----------------------------------------------------------------
url = "https://celestrak.org/NORAD/elements/gp.php?GROUP=starlink&FORMAT=tle"
HEADERS = {"User-Agent": "sat-tracker/1.0 (+https://github.com/you/repo)"}
tle_lines = requests.get(url, timeout=10).text.strip().splitlines()

sats = []
for i in range(0, len(tle_lines) - 2, 3):  # Process full TLE blocks
    name = tle_lines[i].strip()
    l1 = tle_lines[i+1].strip()
    l2 = tle_lines[i+2].strip()
    if not (l1.startswith("1 ") and l2.startswith("2 ")):
        continue
    try:
        sats.append((name, EarthSatellite(l1, l2, name, ts)))
    except ValueError:
        continue

# ───────────────────────────────────────────────────────────────
# 4) Evaluate satellites
# ----------------------------------------------------------------
for name, sat in sats:
    alt, az, dist = (sat - topos).at(t).altaz()
    slant_km = dist.km

    if alt.degrees < ELEV_MIN_DEG:
        continue
    if not (AZ_RANGE_DEG[0] <= az.degrees < AZ_RANGE_DEG[1]):
        continue
    if slant_km > MAX_SLANT_KM:
        continue

    sat_ecef = sat.at(t).frame_xyz(itrs).m
    obs_ecef = topos.at(t).frame_xyz(itrs).m
    enu = R_ecef2enu @ (sat_ecef - obs_ecef)

    rows.append([
        name,
        round(az.degrees, 2),
        round(alt.degrees, 2),
        round(slant_km, 1),
        *np.round(enu, 1)
    ])


NameError: name 'rows' is not defined

In [None]:

# ───────────────────────────────────────────────────────────────
# 5) Output results
# ----------------------------------------------------------------
columns = ["Name", "Azimuth (°)", "Elevation (°)", "Slant km",
           "x_East (m)", "y_North (m)", "z_Up (m)"]
df = pd.DataFrame(rows, columns=columns)
df = df.sort_values(by="Slant km", ascending=True).reset_index(drop=True)
print(df.to_string(index=False))


In [None]:
from sionna.rt import Scene, Receiver, Transmitter, PlanarArray, PathSolver
import vsat_dish_3gpp
import numpy as np
import math

jam_rows = 32
jam_cols = 32
jam_antennas = jam_cols*jam_rows

sat_rows = 1
sat_cols = 1
sat_antennas = sat_cols*sat_rows

tx_rows = 32
tx_cols = 32
tx_antennas = tx_cols*tx_rows
# Create scene
scene = Scene()     

for tx_name in scene.transmitters:
    scene.remove(tx_name)
for rx_name in scene.receivers:
    scene.remove(rx_name)   
# Antenna pattern
scene.synthetic_array = True
scene.frequency = 10e9

tx_array = PlanarArray(num_rows=tx_rows ,
                        num_cols=tx_cols,
                        vertical_spacing=0.5,
                        horizontal_spacing=0.5,
                        pattern="tr38901",
                        polarization="V")

jam_array = PlanarArray(num_rows=jam_rows ,
                            num_cols=jam_cols,
                            vertical_spacing=0.5,
                            horizontal_spacing=0.5,
                        #  pattern="vsat_dish",
                            pattern="tr38901",
                            polarization="V")

sat_array = PlanarArray(num_rows=sat_rows ,
                             num_cols=sat_cols,
                             vertical_spacing=0.5,
                             horizontal_spacing=0.5,
                             pattern="iso",
                             polarization="V")



# satellite as rx
scene.rx_array = sat_array
for i, pos in enumerate(sat_positions):
    # rx = Receiver(name=f"rx{i}", position=pos, orientation = [0, 180, 0])
    rx = Receiver(name=f"rx{i}", position=pos)
    scene.add(rx)

    rx.look_at(gnd_positions[0])
    
    
    



    
    

scene.tx_array = tx_array
tx = Transmitter(name=f"tx{0}", position=gnd_positions[0], display_radius=200)
scene.add(tx)
tx.look_at(sat_positions[0])
p_solver  = PathSolver()
paths = p_solver(scene=scene,
                 max_depth=0,
                 los=True,
                 synthetic_array=True,
                 seed=41)

# CIR analysis
a_tx, tau_tx = paths.cir(normalize_delays=False, out_type="numpy")

for tx_name in scene.transmitters:
    scene.remove(tx_name)

scene.tx_array = jam_array
tx = Transmitter(name=f"tx{1}", position=gnd_positions[1], display_radius=200)
scene.add(tx)
# tx.look_at(sat_positions[0])
tx.look_at(sat_positions[0])


p_solver  = PathSolver()
paths = p_solver(scene=scene,
                 max_depth=0,
                 los=True,
                 synthetic_array=True,
                 seed=41)


a_jam, tau_jam = paths.cir(normalize_delays=False, out_type="numpy")

H0 = a_tx[:, 0, 0, :, 0, 0]  
H1 = a_jam[:, 0, 0, :, 0, 0]  





In [None]:
H0.shape

In [None]:
import numpy as np
from scipy.linalg import eigh
def best_w0(H0, P):
    A = H0.conj().T @ np.linalg.inv(P) @ H0
    _, eigvecs = eigh(A)                 # ascending order
    w0 = eigvecs[:, -1].reshape(-1, 1)
    return w0

def best_w1(H0, H1, w0, N0, E1):
    a = H0 @ w0
    S = H1.conj().T @ np.outer(a, a.conj()) @ H1
    beta = E1 / N0
    C = np.eye(H1.shape[1]) + beta * (H1.conj().T @ H1)
    # largest gen-eigvec of (S,C)
    eigvals, eigvecs = eigh(S, C)
    w1 = eigvecs[:, -1].reshape(-1, 1)
    # q /= norm(q)
    return w1, E1 * np.outer(w1, w1.conj())

def jam_tx_game(H0, H1, N0, E0, E1, tol=1e-20,its=30):
    N0tx = H0.shape[1]
    N1tx = H1.shape[1]
    w0 = np.ones(N0tx) / np.sqrt(N0tx)
    w1 = np.ones(N1tx) / np.sqrt(N1tx)
    Q1 = E1 * np.outer(w1, w1.conj())
    P = H1 @ Q1 @ H1.conj().T + N0*np.eye(H0.shape[0])
    snr_hist = []
    for _ in range(its):
        if _ == 0:
            print("Q1 shape:", Q1.shape)
            print("P shape:", P.shape)
        w0 = best_w0(H0, P)
        w1, Q1 = best_w1(H0, H1, w0, N0, E1)
        P   = H1 @ Q1 @ H1.conj().T + N0 * np.eye(H0.shape[0])
        snr = E0 * (w0.conj().T @ H0.conj().T @ np.linalg.inv(P) @ H0 @ w0).real
        snr_hist.append(snr)
        # if len(snr_hist) > 1 and abs(snr_hist[-1] - snr_hist[-2]) / snr_hist[-1] < tol:
        #     break
        # print("w0 shape:", w0.shape)
        # print("w1 shape:", w1.shape)
    return w0, w1, snr, snr_hist


In [None]:
H1.shape

In [None]:
c = 3e8  # speed of light (m/s)
fc = 10e9  # Carrier frequency: 10 GHz
wavelength = c / fc
bandwidth = 100e6  # 100 MHz
tx_power_dbm = 30  #  dBm
jam_power_dbm = 50
k = 1.38e-23  # Boltzmann 
GT = 13 # db gain-to-noise-temperature for for 0.33m Equivalent satellite antenna aperturesatellites, or can be 5 dB K^(-1) for 0.13m Equivalent satellite antenna aperture
La = 5 # dB
GT_linear_inv = 10 ** (-GT / 10)
La_linear = 10 ** (La / 10)
noise_power_watt = k * bandwidth * GT_linear_inv * La_linear
Tx_power_watt = 10 ** ((tx_power_dbm  - 30)/ 10)  
jam_power_watt = 10 ** ((jam_power_dbm  - 30)/ 10)
   

In [None]:
w0_opt, w1_opt, snr, snr_curve = jam_tx_game(H0, H1, noise_power_watt, Tx_power_watt, jam_power_watt,its=50)

In [None]:
import matplotlib.pyplot as plt
snr_db = 10 * np.log10(np.array(snr_curve).squeeze())

plt.figure(figsize=(6, 4))
plt.plot(snr_db, marker='o')
plt.xlabel("Iteration")
plt.ylabel("SINR (dB)")
plt.title("SINR Convergence Curve (in dB)")
plt.grid(True)
plt.tight_layout()
plt.show()

In [None]:
import numpy as np
import cvxpy as cp

def compute_beamforming_vector(H, mode="eig1"):
    """
    Compute the transmit beamforming vector using one of:
    - 'eig1': Eigen-decomposition on unweighted Q
    - 'eig2': Eigen-decomposition on weighted Q (equalizes TX energy per direction)
    - 'avg' : Normalized average of effective channels
    - 'fair': Max-min fairness (equalize RX received energy)

    Parameters:
        H: numpy array of shape (N_rx, rx_antennas, N_tx, tx_antennas)
        mode: 'eig1', 'eig2', 'avg', or 'fair'

    Returns:
        w: Normalized beamforming vector of shape (tx_antennas, 1)
        norm_last_h_eff: Norm of last h_eff computed (debug/info)
    """
    N_rx, rx_antennas, N_tx, tx_antennas = H.shape
    assert N_tx == 1, "Only supports single TX (N_tx == 1)"

    w_r = np.ones((rx_antennas, 1), dtype=complex) / np.sqrt(rx_antennas)

    if mode in ("eig1", "eig2"):
        Q = np.zeros((tx_antennas, tx_antennas), dtype=complex)
        for rx in range(N_rx):
            H_rx = H[rx, :, 0, :]           # (rx_antennas, tx_antennas)
            h_eff = H_rx.conj().T @ w_r     # (tx_antennas, 1)
            alpha = 1.0 / np.linalg.norm(h_eff)**2 if mode == "eig2" else 1.0
            Q += alpha * (h_eff @ h_eff.conj().T)
        eigvals, eigvecs = np.linalg.eigh(Q)
        w = eigvecs[:, -1]

    elif mode == "avg":
        w_sum = np.zeros((tx_antennas, 1), dtype=complex)
        for rx in range(N_rx):
            H_rx = H[rx, :, 0, :]
            h_eff = H_rx.conj().T @ w_r
            h_eff_unit = h_eff / np.linalg.norm(h_eff)
            w_sum += h_eff_unit
        w = w_sum[:, 0]
        w /= np.linalg.norm(w)
    elif mode == "fair":
        w_var = cp.Variable((tx_antennas, 1), complex=True)
        t = cp.Variable()
        constraints = [cp.norm(w_var) <= 1]

        for rx in range(N_rx):
            H_rx = H[rx, :, 0, :]              # (M, N)
            h_eff = H_rx.conj().T @ w_r        # (N, 1)
            A = h_eff @ h_eff.conj().T
            A = (A + A.conj().T) / 2           # Force Hermitian
            expr = cp.real(cp.matmul(cp.matmul(w_var.H, A), w_var))[0, 0]
            constraints.append(expr >= t)

        problem = cp.Problem(cp.Maximize(t), constraints)
        problem.solve()
        w = w_var.value[:, 0]


    else:
        raise ValueError("mode must be 'eig1', 'eig2', 'avg', or 'fair'")

    w = w.reshape(-1, 1)
    return w, np.linalg.norm(h_eff)


In [None]:
H = a_jam[..., 0, 0]   
w1,h1 = compute_beamforming_vector(H, mode="eig1")  # unweighted eig
w2,h2= compute_beamforming_vector(H, mode="eig2")  # weighted eig
w3 ,h3= compute_beamforming_vector(H, mode="avg")   # avg sum
# w4 ,_= compute_beamforming_vector(H, mode="fair")   # 

In [None]:
def compute_rx_gains(H, w):
    """
    Compute beamforming gains at each RX.

    Parameters:
        H: shape (N_rx, rx_antennas, 1, tx_antennas)
        w: beamforming vector of shape (tx_antennas, 1)

    Returns:
        gains: numpy array of shape (N_rx,), each entry is a gain (scalar)
    """
    N_rx, rx_antennas, _, tx_antennas = H.shape
    w_r = np.ones((rx_antennas, 1), dtype=complex) / np.sqrt(rx_antennas)
    gains = np.zeros(N_rx)

    for rx in range(N_rx):
        H_rx = H[rx, :, 0, :]                       # (rx_antennas, tx_antennas)
        h_combined = w_r.conj().T @ H_rx @ w        # scalar (1×M) · (M×N) · (N×1)
        gains[rx] = np.abs(h_combined[0, 0])**2     # real-valued scalar

    return 10 * np.log10(gains )

In [None]:
# w0 = np.ones((sat_antennas, 1), dtype=complex) / np.sqrt(sat_antennas)
# gains0 = compute_rx_gains(H, w0)
# gains1 = compute_rx_gains(H, w1)
# gains2 = compute_rx_gains(H, w2)
# gains3 = compute_rx_gains(H, w3)
# print(gains0)
# print(gains1)  # array of shape (N_rx,), each is beamforming gain
# print(gains2)  # array of shape (N_rx,), each is beamforming gain
# print(gains3)  # array of shape (N_rx,), each is beamforming gain

I want to add that if the jammer and the desired source are within the diffraction limits of the receiver aperture, then we are in bigger trouble as you cannot use beamforming to null out the jammer. Then nonlinear subtraction methods may have to be used and that will hit the receiver sensitivity a lot I a sure. But without it there is no hope as both signal and jammer are smack on the beam.

In [None]:
rng = np.random.default_rng(42)
M, n = 4, 3               # 4 satellites, each 3×3 = 9 antennas
N_r = n*n

h0 = (rng.standard_normal((M, N_r)) +
        1j*rng.standard_normal((M, N_r))) * 0.8
h1 = (rng.standard_normal((M, N_r)) +
        1j*rng.standard_normal((M, N_r))) * 0.5

In [None]:
h0.shape