In [0]:
%python
%pip install sgp4

In [0]:
# Read the data from the Delta table
df = spark.read.table("workspace.default.objects_in_LEO_0904").toPandas()

In [0]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from sgp4.api import Satrec, WGS72

# --- Position helper ---
def get_position(tle1, tle2, when="2025-09-10T00:00:00"):
    sat = Satrec.twoline2rv(tle1, tle2, WGS72)
    ts = datetime.fromisoformat(when)
    e, r, v = sat.sgp4(ts.year, ts.timetuple().tm_yday + ts.hour/24 + ts.minute/(24*60) + ts.second/(24*3600))
    if e != 0:
        return None
    return np.array(r)  # (x,y,z) in km

def distance_km(p1, p2):
    return np.linalg.norm(p1 - p2)

# --- Miss distance sweep ---
def miss_distance_sweep(tle1_a, tle2_a, tle1_b, tle2_b,
                        center_dt="2025-09-10T00:00:00", hours=24, step_min=10):
    center = datetime.fromisoformat(center_dt)
    t = center - timedelta(hours=hours)
    t_end = center + timedelta(hours=hours)

    d_min, t_min = float("inf"), None
    while t <= t_end:
        ts = t.strftime("%Y-%m-%dT%H:%M:%S")
        pa = get_position(tle1_a, tle2_a, ts)
        pb = get_position(tle1_b, tle2_b, ts)
        if pa is None or pb is None:
            t += timedelta(minutes=step_min)
            continue
        d = distance_km(pa, pb)
        if d < d_min:
            d_min, t_min = d, t
        t += timedelta(minutes=step_min)
    return d_min, t_min

# --- Build nearest neighbor pairs ---
def build_candidate_pairs(df, top_n=50):
    # quick proxy distance metric: altitude + inclination differences
    df["prox_score"] = (df["ALTITUDE_KM"].rank() + df["INCLINATION_p"].rank())
    # take top_n nearest "neighbors" by sorting
    top_df = df.nsmallest(top_n, "prox_score")
    pairs = []
    for i, row_a in top_df.iterrows():
        for j, row_b in top_df.iterrows():
            if i >= j:  # avoid duplicate/self
                continue
            pairs.append((row_a["OBJECT_ID"], row_b["OBJECT_ID"],
                          row_a["TLE_LINE1"], row_a["TLE_LINE2"],
                          row_b["TLE_LINE1"], row_b["TLE_LINE2"]))
    return pairs

# --- Run miss distance on candidate pairs ---
pairs = build_candidate_pairs(df, top_n=30)   # adjust N
results = []

for obj_a, obj_b, tle1_a, tle2_a, tle1_b, tle2_b in pairs:
    d_min, t_min = miss_distance_sweep(tle1_a, tle2_a, tle1_b, tle2_b,
                                       center_dt="2025-09-10T00:00:00",
                                       hours=12, step_min=15)
    results.append((obj_a, obj_b, d_min, t_min))

conjunctions_df = pd.DataFrame(results, columns=["Object_A", "Object_B", "Miss_Distance_km", "TCA"])

# Display the DataFrame as a table
display(conjunctions_df)

In [0]:
def get_position(tle1, tle2, when="2025-09-10T00:00:00"):
    sat = Satrec.twoline2rv(tle1, tle2, WGS72)
    ts = datetime.fromisoformat(when)
    jd = ts.timetuple().tm_yday + ts.hour/24 + ts.minute/(24*60) + ts.second/(24*3600)
    e, r, v = sat.sgp4(ts.year, jd)
    if e != 0:   # SGP4 error
        return None
    return np.array(r)  # (x,y,z) in km

def miss_distance_sweep(tle1_a, tle2_a, tle1_b, tle2_b,
                        center_dt="2025-09-10T00:00:00", hours=24, step_min=10):
    center = datetime.fromisoformat(center_dt)
    t = center - timedelta(hours=hours)
    t_end = center + timedelta(hours=hours)

    d_min, t_min = float("inf"), None
    while t <= t_end:
        ts = t.strftime("%Y-%m-%dT%H:%M:%S")
        pa = get_position(tle1_a, tle2_a, ts)
        pb = get_position(tle1_b, tle2_b, ts)
        if pa is None or pb is None:   # skip bad propagation
            t += timedelta(minutes=step_min)
            continue
        d = distance_km(pa, pb)
        if d < d_min:
            d_min, t_min = d, t
        t += timedelta(minutes=step_min)

    if d_min == float("inf"):  # never found valid distance
        return None, None
    return d_min, t_min


In [0]:
results = []
for obj_a, obj_b, tle1_a, tle2_a, tle1_b, tle2_b in pairs:
    d_min, t_min = miss_distance_sweep(tle1_a, tle2_a, tle1_b, tle2_b,
                                       center_dt="2025-09-10T00:00:00",
                                       hours=12, step_min=15)
    if d_min is not None:   # keep only valid cases
        results.append((obj_a, obj_b, d_min, t_min))

conjunctions_df = pd.DataFrame(results, columns=["Object_A", "Object_B", "Miss_Distance_km", "TCA"])


In [0]:
display(conjunctions_df)

In [0]:
# Filter the DataFrame for Object_A = '2022-064F'
filtered_df = conjunctions_df[conjunctions_df["Object_A"] == "2022-064F"]

# Display the filtered DataFrame
display(filtered_df)

In [0]:
# Read the data from the Delta table
leo_df = spark.read.table("workspace.default.objects_in_LEO_0904").toPandas()

# Join filtered_df with leo_df to get additional information for Object_A
joined_df = conjunctions_df.merge(leo_df[["OBJECT_ID", "OBJECT_TYPE"]], 
                              left_on="Object_A", 
                              right_on="OBJECT_ID", 
                              how="left").rename(columns={"OBJECT_TYPE": "Object_Type_A"}).drop(columns=["OBJECT_ID"])

# Join again to get additional information for Object_B
joined_df = joined_df.merge(leo_df[["OBJECT_ID", "OBJECT_TYPE"]], 
                            left_on="Object_B", 
                            right_on="OBJECT_ID", 
                            how="left").rename(columns={"OBJECT_TYPE": "Object_Type_B"}).drop(columns=["OBJECT_ID"])

# Display the joined DataFrame
display(joined_df)

In [0]:
# Define a threshold for high-risk conjunctions (e.g., < 10 km)
HIGH_RISK_THRESHOLD_KM = 10

# Filter for high-risk conjunctions
high_risk_df = joined_df[joined_df["Miss_Distance_km"] < HIGH_RISK_THRESHOLD_KM]

# Display the high-risk conjunctions
display(high_risk_df)

In [0]:
#Collision probability
import numpy as np
import pandas as pd
import scipy.linalg as la
from typing import Optional, Tuple

# ----------------------------
# Helper: Monte-Carlo Pc estimator
# ----------------------------
def estimate_pc_monte_carlo(
    rel_pos_km: np.ndarray,        # mean relative position vector r = r1 - r2 (shape (3,))
    cov_rel_km2: np.ndarray,       # combined covariance matrix (3x3) in km^2
    hbr_km: float,                 # combined hard-body radius in km
    n_samples: int = 200_000,
    rng_seed: Optional[int] = 0
) -> Tuple[float, float]:
    """
    Monte Carlo estimate of collision probability.
    Returns (p_est, stderr).
    """
    rng = np.random.default_rng(rng_seed)

    # numerical jitter for near-singular covariance
    jitter = 1e-12 * np.trace(cov_rel_km2) if np.isfinite(np.trace(cov_rel_km2)) else 1e-12
    cov_use = cov_rel_km2 + np.eye(3) * jitter

    # try cholesky, fallback to eigh
    try:
        L = la.cholesky(cov_use, lower=True)
    except la.LinAlgError:
        evals, evecs = la.eigh(cov_use)
        evals_clipped = np.clip(evals, 0, None)
        L = (evecs * np.sqrt(evals_clipped)) @ evecs.T

    # draw standard normals, transform
    z = rng.standard_normal(size=(n_samples, 3))
    samples = rel_pos_km.reshape(1, 3) + (z @ L.T)  # shape (N,3)
    d = np.linalg.norm(samples, axis=1)
    hits = (d <= hbr_km)
    p = hits.mean()
    stderr = np.sqrt(p * (1 - p) / n_samples) if n_samples > 0 else 0.0
    return float(p), float(stderr)

# ----------------------------
# Defaults (tweak as needed)
# ----------------------------
# Default positional sigma per object (ECI axes) in km [sigma_x, sigma_y, sigma_z]
# We treat these as approximations in ECI. If you have RSW covs, transform them externally.
DEFAULT_SIGMA_OBJ_KM = np.array([0.05, 0.50, 0.05])  # radial, along-track, cross-track approximations
DEFAULT_RADIUS_M = 1.0  # default geometric radius in meters for each object
DEFAULT_SAMPLES = 200_000

# ----------------------------
# Utility to build combined covariance matrix (diagonal) if no covariance provided
# ----------------------------
def build_combined_covariance(sigma_obj1_km, sigma_obj2_km):
    """
    sigma_obj* : array-like length 3 (sigma_x, sigma_y, sigma_z) in km
    returns combined covariance C = diag(sigma1^2 + sigma2^2)
    """
    s1 = np.asarray(sigma_obj1_km).reshape(3)
    s2 = np.asarray(sigma_obj2_km).reshape(3)
    C = np.diag(s1**2 + s2**2)
    return C

# ----------------------------
# Main batch function
# ----------------------------
def compute_collision_probabilities(
    conjunctions_df: pd.DataFrame,
    get_relpos_func=None,
    # get_relpos_func(row) -> np.array([rx,ry,rz]) relative vector in km at TCA
    # if None, will try to use column 'r_rel_km' in df
    sigma_defaults_km: np.ndarray = DEFAULT_SIGMA_OBJ_KM,
    radius_default_m: float = DEFAULT_RADIUS_M,
    n_samples: int = DEFAULT_SAMPLES,
    random_seed: int = 0
) -> pd.DataFrame:
    """
    For each row in conjunctions_df, estimate Pc. Returns an expanded DataFrame with columns:
      rel_x, rel_y, rel_z, cov_trace, HBR_km, Pc, Pc_stderr.
    Required columns (if get_relpos_func is None): 'r_rel_km' (iterable of 3), 'Object_A', 'Object_B', 'TCA'
    """
    rows = []
    for idx, row in conjunctions_df.iterrows():
        # obtain relative vector
        if get_relpos_func is not None:
            rel = get_relpos_func(row)
        else:
            if "r_rel_km" in row and pd.notna(row["r_rel_km"]):
                # assume stored as list/tuple or string like "[x, y, z]"
                val = row["r_rel_km"]
                if isinstance(val, str):
                    import ast
                    rel = np.asarray(ast.literal_eval(val), dtype=float)
                else:
                    rel = np.asarray(val, dtype=float)
            else:
                # fallback: use miss_distance direction-agnostic — place along x axis
                if "Miss_Distance_km" in row and not pd.isna(row["Miss_Distance_km"]):
                    d = float(row["Miss_Distance_km"])
                else:
                    d = float(row.get("miss_distance", np.nan) or np.nan)
                if np.isnan(d):
                    # cannot proceed: skip
                    rows.append({**row.to_dict(), "Pc": np.nan, "Pc_stderr": np.nan, "note": "no_relpos_no_missdist"})
                    continue
                rel = np.array([d, 0.0, 0.0])

        # build covariance (diagonal) for both objects (use defaults or per-row if provided)
        sigma_a = row.get("sigma_objA_km", None) or sigma_defaults_km
        sigma_b = row.get("sigma_objB_km", None) or sigma_defaults_km
        sigma_a = np.asarray(sigma_a, dtype=float).reshape(3)
        sigma_b = np.asarray(sigma_b, dtype=float).reshape(3)
        C = build_combined_covariance(sigma_a, sigma_b)

        # HBR: per-row radii override or default
        rA_m = row.get("radius_A_m", None) or radius_default_m
        rB_m = row.get("radius_B_m", None) or radius_default_m
        hbr_km = (float(rA_m) + float(rB_m)) / 1000.0

        # Estimate Pc
        try:
            p_est, se = estimate_pc_monte_carlo(rel, C, hbr_km, n_samples=n_samples, rng_seed=random_seed)
        except Exception as e:
            p_est, se = np.nan, np.nan

        res = dict(row)
        res.update({
            "rel_x_km": float(rel[0]), "rel_y_km": float(rel[1]), "rel_z_km": float(rel[2]),
            "cov_trace_km2": float(np.trace(C)),
            "HBR_km": hbr_km,
            "Pc": p_est,
            "Pc_stderr": se
        })
        rows.append(res)

    out_df = pd.DataFrame(rows)
    return out_df

# ----------------------------
# Example usage:
# ----------------------------
# Suppose you have 'risky_df' with columns ['Object_A','Object_B','Miss_Distance_km','TCA'].
# If you also computed the relative ECI vector at TCA for each pair, put it in column 'r_rel_km' (list or string).
#
# results_df = compute_collision_probabilities(risky_df, get_relpos_func=None,
#                                              sigma_defaults_km=np.array([0.05,0.5,0.05]),
#                                              radius_default_m=1.0,
#                                              n_samples=200000, random_seed=42)
#
# results_df.to_csv("collision_probabilities_estimates.csv", index=False)


In [0]:
results_df = compute_collision_probabilities(joined_df, get_relpos_func=None,
                                             sigma_defaults_km=np.array([0.05,0.5,0.05]),
                                             radius_default_m=1.0,
                                             n_samples=200000, random_seed=42)
                                             
        

In [0]:
display(results_df)

In [0]:
%pip install skyfield

In [0]:
#another approach
from skyfield.api import EarthSatellite, load
import pandas as pd
import numpy as np

ts = load.timescale()

# Get x,y,z for a TLE at a given date
def get_xyz(tle1, tle2, target_date):
    sat = EarthSatellite(tle1, tle2)
    year, month, day = map(int, target_date.split("-"))
    t = ts.utc(year, month, day)
    geocentric = sat.at(t)
    pos = geocentric.position.km
    return pos[0], pos[1], pos[2]

# Distance function
def distance_km(p1, p2):
    return np.linalg.norm(np.array(p1) - np.array(p2))

# Simplified collision probability
def collision_probability(d, sigma=0.1, area=0.0013):
    if d == np.inf:
        return 0.0
    return (area / (2 * np.pi * sigma**2)) * np.exp(-d**2 / (2 * sigma**2))

# Main function
def compute_conjunctions(df, target_date="2025-09-10", top_n=10, risk_threshold_km=10):
    results = []
    positions = {}

    # Precompute positions
    for idx, row in df.iterrows():
        positions[row.OBJECT_ID] = get_xyz(row.TLE_LINE1, row.TLE_LINE2, target_date)

    object_ids = list(positions.keys())

    # Pairwise distances
    for i in range(len(object_ids)):
        for j in range(i+1, len(object_ids)):
            obj1, obj2 = object_ids[i], object_ids[j]
            pos1, pos2 = positions[obj1], positions[obj2]
            d = distance_km(pos1, pos2)
            Pc = collision_probability(d)
            results.append({
                "Object_1": obj1,
                "Object_2": obj2,
                "x1": pos1[0], "y1": pos1[1], "z1": pos1[2],
                "x2": pos2[0], "y2": pos2[1], "z2": pos2[2],
                "Miss_Distance_km": d,
                "Collision_Probability": Pc,
                "Risk_Flag": d < risk_threshold_km
            })

    result_df = pd.DataFrame(results).sort_values("Miss_Distance_km").head(top_n)
    display(result_df)
    return result_df


In [0]:
result_df = compute_conjunctions(df, target_date="2025-09-10", top_n=10)
display(result_df)

In [0]:
display(result_df[result_df["Risk_Flag"] == True])

In [0]:
# Read the data from the Delta table
leo_df = spark.read.table("workspace.default.objects_in_LEO").toPandas()

# Join result_df with leo_df to get additional information for Object_1
result_df = result_df.merge(leo_df[["OBJECT_ID", "OBJECT_TYPE"]], 
                            left_on="Object_1", 
                            right_on="OBJECT_ID", 
                            how="left").rename(columns={"OBJECT_TYPE": "Object_Type_1"}).drop(columns=["OBJECT_ID"])

# Join again to get additional information for Object_2
result_df = result_df.merge(leo_df[["OBJECT_ID", "OBJECT_TYPE"]], 
                            left_on="Object_2", 
                            right_on="OBJECT_ID", 
                            how="left").rename(columns={"OBJECT_TYPE": "Object_Type_2"}).drop(columns=["OBJECT_ID"])

# Display the updated DataFrame
display(result_df)