In [8]:
import numpy as np
import math
from fractions import Fraction
from tqdm import tqdm
import multiprocessing as mp
import os
from haversine import haversine, Unit

In [2]:
# Astronomy-related constants

TE = 24 * 3600  # One Earth sidereal day in seconds

RE = 6371e3  # Mean radius of Earth in meters

u = 3.986e14  # Standard gravitational parameter (μ = GM) in m^3/s^2

K = RE / pow(u, 1/3) * pow(2 * np.pi, 2/3)  

eps = 25 * np.pi / 180  # Maximum elevation angle for ground access, in radians (25°)

# Compute satellite orbital period (h in meters)
def satellite_period(h):
    a = RE + h
    T = float(2 * np.pi * pow(a**3 / u, 0.5))
    return T

# Compute coverage angle eta (in radians)
def coverage_eta(T):
    eta = math.acos(K * math.cos(eps) / pow(T, 2/3)) - eps
    return eta

# Approximate an integer ratio of a and b
def approximate_ratio(a, b, precision=1e-3):
    if b == 0:
        raise ValueError("Denominator cannot be zero")
    ratio = Fraction(a, b).limit_denominator(int(1 / precision))
    return ratio.numerator, ratio.denominator
def is_cover(l_sat,p_sat,l_cell,p_cell,eta):
    if (p_sat-2*eta)<-np.pi/2 or (p_sat+2*eta)>np.pi/2 or (l_sat-2*eta)<-np.pi or (l_sat+2*eta)>np.pi: ## 如果纬度范围超界，直接用球面距离判断
        d=haversine((np.degrees(p_sat),np.degrees(l_sat)),(np.degrees(p_cell),np.degrees(l_cell)),unit=Unit.METERS)
        if d <= eta*RE:
            return True
    else:
        if (p_sat-2*eta)<=p_cell<=(p_sat+2*eta) and (l_sat-2*eta)<=l_cell<=(l_sat+2*eta):  # 预筛选
            d=haversine((np.degrees(p_sat),np.degrees(l_sat)),(np.degrees(p_cell),np.degrees(l_cell)),unit=Unit.METERS)
            if d <= eta*RE:
                return True
    return False
def calculate_cell_area():
    """
    Compute the surface area of each ground cell based on latitude.

    Returns:
        list: S_set, area (in m²) of each demand cell.
    """
    S_set = []
    for demand in demand_list:
        # Convert latitude to radians
        p_cell = np.radians(demand['lat_lon'][0])  # latitude in radians

        # Grid height (north-south span) in meters
        h = RE * np.radians(cell_size)

        # Width of top and bottom edges (east-west spans) in meters
        L1 = RE * np.sin(np.pi / 2 - abs(p_cell)) * np.radians(cell_size)
        tmp = abs(p_cell - np.radians(cell_size))
        L2 = RE * np.sin(np.pi / 2 - min(tmp, np.pi / 2)) * np.radians(cell_size)

        # Adjust for cells near the South Pole
        if tmp > np.pi / 2:
            h = RE * abs(p_cell + np.pi / 2)

        # Approximate area using trapezoid formula
        S = 0.5 * h * (L1 + L2)
        S_set.append(S)
    
    return S_set

def cal_supply_all_cell(sat_pos):
    """
    Compute per-cell supply coverage of a satellite at a given position.

    Args:
        sat_pos (list): Satellite position [lat (rad), lon (rad), height (km)]

    Returns:
        list: [supply_list (length = num_cells), sat_pos]
              Each supply_list entry is the normalized per-user supply for a ground cell.
    """
    p_sat, l_sat, h = sat_pos  # latitude, longitude (in radians), height (in km)
    r_set = []  # stores [cell_id, area, density] for covered cells

    for idx, demand in enumerate(demand_list):
        # Convert ground cell coordinates to radians
        p_cell = np.radians(demand['lat_lon'][0])  # latitude
        l_cell = np.radians(demand['lat_lon'][1])  # longitude

        # Compute satellite coverage cone
        T = satellite_period(h * 1e3)
        eta = coverage_eta(T)

        # Check if satellite covers this ground cell
        if is_cover(l_sat, p_sat, l_cell, p_cell, eta):
            S = S_set[idx]
            r_set.append([idx, S, demand['density']])
        else:
            r_set.append([idx, 0, 0])

    # Normalize supply to ensure total allocation matches user_per_sat
    S_sum = np.sum([item[1] for item in r_set])
    supply_list = [item[1] / S_sum * user_per_sat if S_sum > 0 else 0 for item in r_set]

    return [supply_list, sat_pos]


In [3]:
starlink_pos=np.load("data/2025_01_01_starlink.npy",allow_pickle=True)
print(len(starlink_pos))
print(starlink_pos[0])
demand_list=np.load("../../Dataset/Demand/starlink_supply_2025_01_01_99449.npy",allow_pickle=True)
cell_size=4
user_per_sat = 960
S_set=calculate_cell_area() 

6793
[9.24978122e-02 1.43109492e+00 4.53357621e+02]


In [10]:
with mp.Pool(processes=100,) as pool:
    full_cover = list(tqdm(pool.imap(cal_supply_all_cell, starlink_pos), total=len(starlink_pos),mininterval=5))
    pool.close()  # Prevents any more tasks from being submitted to the pool
    pool.join()   # Wait for the worker processes to exit
total_supply=[]
for item in full_cover:
    if len(total_supply)==0:
        total_supply=np.array(item[0])
    else:
        total_supply+=np.array(item[0])
# np.save("total_supply_starlink.npy",total_supply)

100%|██████████| 6793/6793 [00:08<00:00, 761.72it/s]


In [13]:
supply_demand_ratio=[]
for idx,demand in enumerate(demand_list):
    if demand['density']!=0:
        supply_demand_ratio.append(total_supply[idx]/demand['density'])
print(np.mean(supply_demand_ratio))
np.save("data/supply_demand_ratio_official_starlink.npy",supply_demand_ratio)

1522.0362757975377


In [14]:
# south America
demand_list_south=np.load("data/south_america_starlink_demand_99449.npy",allow_pickle=True)
supply_demand_ratio_south=[]
for idx,demand in enumerate(demand_list_south):
    if demand['density']!=0:
        supply_demand_ratio_south.append(total_supply[idx]/demand['density'])
print(np.mean(supply_demand_ratio_south))
np.save("data/supply_demand_ratio_south_starlink.npy",supply_demand_ratio_south)

943.5332722148289


In [12]:
def get_grid_id(lat_deg: float, lon_deg: float) -> int:
    """
    Return the grid ID based on latitude and longitude.

    Grid layout:
        - 11 rows (latitude bands): from +90°N to -90°S
        - 11 columns (longitude bands): from -180° to +180°
        - Total 121 grids, numbered row-wise from top-left to bottom-right

    Args:
        lat_deg (float): Latitude in degrees
        lon_deg (float): Longitude in degrees

    Returns:
        int: Grid ID (0–120), or -1 if coordinates are invalid
    """
    # Normalize longitude to range [-180, 180)
    lon_deg = ((lon_deg + 180) % 360) - 180

    # Determine row index based on latitude
    lat = 90
    row = -1
    for i in range(11):
        band_height = 16 if i < 10 else 20
        if lat_deg <= lat and lat_deg > (lat - band_height) or (i == 10 and lat_deg >= -90):
            row = i
            break
        lat -= band_height

    # Determine column index based on longitude
    lon = -180
    col = -1
    for j in range(11):
        band_width = 32 if j < 10 else 40
        if lon_deg >= lon and lon_deg < (lon + band_width) or (j == 10 and lon_deg <= 180):
            col = j
            break
        lon += band_width

    # Handle exact edge cases (e.g. -90°, 180°)
    if row == -1 and lat_deg == -90:
        row = 10
    if col == -1 and lon_deg == 180:
        col = 10

    # Compute and return grid ID
    if row != -1 and col != -1:
        return row * 11 + col
    else:
        print(f"Warning: Could not determine grid for lat={lat_deg}, lon={lon_deg}")
        return -1

In [15]:
demand_list_backbone=np.load("data/backbone_satellite_demand.npy",allow_pickle=True)
starlink_data=[0]*len(demand_list_backbone)
for sat in starlink_pos:
    lat=np.degrees(sat[0])
    lon=np.degrees(sat[1])
    cid=get_grid_id(lat, lon)
    starlink_data[cid]+=1
supply_demand_ratio_backbone=[]
for idx,demand in enumerate(demand_list_backbone):
    if demand!=0:
        supply_demand_ratio_backbone.append(starlink_data[idx]/demand)
print(np.mean(supply_demand_ratio_backbone))
np.save("data/supply_demand_ratio_backbone_starlink.npy",supply_demand_ratio_backbone)

16.52536589969745
