In [None]:
# ===============================
# Install dependencies (Colab)
# ===============================
!pip install dwave-ocean-sdk pandas numpy matplotlib scikit-learn

# ===============================
# Imports
# ===============================
import pandas as pd
import numpy as np
import time
import math
import matplotlib.pyplot as plt
from functools import partial

import dimod
from dimod import BinaryQuadraticModel, SimulatedAnnealingSampler, ExactSolver
# LeapHybridSampler may require D-Wave credentials and internet
try:
    from dwave.system import LeapHybridSampler
    HAS_LEAP = True
except Exception:
    HAS_LEAP = False

# ===============================
# User config
# ===============================
CDM_PATH = "/content/2024_S1_cdm_ccsds.csv"   # <- replace if needed after upload
N_TEST = 10                     # number of highest-risk conjunctions to test
DV_LEVELS_PER_AXIS = [0.0, 0.01, 0.03, 0.06]  # discretized magnitudes per axis (m/s)
AXES = ["r", "t", "n"]
MAX_DV_ALLOW = 0.3              # maximum allowable total Δv (m/s) for feasibility
PC_REDUCTION_SCALE = 5.0        # parameter controlling how Δv reduces Pc (higher -> faster reduction)

# Weights for QUBO objective (linear combination)
W_PC = 1000.0    # penalty weight for remaining collision probability
W_DV = 50.0      # cost weight for Δv
W_PAIR = 5.0     # interaction penalty to avoid conflicting choices (optional)


In [None]:
# ===============================
# Utilities: collision model and helpers
# ===============================
def collision_probability_after(pc0, dv_total, scale=PC_REDUCTION_SCALE):
    """
    Simple exponential model: Pc_after = pc0 * exp(-scale * dv_total)
    (tunable scale; validated in prototype)
    """
    return float(pc0) * math.exp(-scale * dv_total)

def make_variables_grid(axes=AXES, levels=DV_LEVELS_PER_AXIS):
    """
    Create list of variable names and mapping variable -> dv magnitude.
    We encode each axis with one-hot bins (choose exactly one level per axis).
    Include level 0.0 as 'no impulse'.
    """
    var_names = []
    var_to_dv = {}
    for ax in axes:
        for i, lvl in enumerate(levels):
            name = f"dv_{ax}_{i}"
            var_names.append(name)
            var_to_dv[name] = float(lvl)
    return var_names, var_to_dv

# Build one-hot constraint penalty for each axis (want exactly one selection per axis).
def add_onehot_penalty(bqm, axis_vars, gamma=50.0):
    """
    Add quadratic penalty gamma*(sum(x_i) - 1)^2 to BQM for variables in axis_vars.
    Encourages exactly one level selected per axis (including 0).
    """
    # Expand (sum - 1)^2 = sum_i x_i + 2 sum_{i<j} x_i x_j - 2 sum_i x_i + 1
    # Implement via linear and quadratic offsets (constant ignored).
    for v in axis_vars:
        bqm.add_variable(v, gamma * (1 - 2))  # gamma*(1 -2) = -gamma
    for i in range(len(axis_vars)):
        for j in range(i+1, len(axis_vars)):
            bqm.add_quadratic(axis_vars[i], axis_vars[j], 2 * gamma)
    # constant gamma * 1 is irrelevant


In [None]:
# ===============================
# Load CDM and pre-process
# ===============================
cdm = pd.read_csv(CDM_PATH, dtype=str)  # read as str to be robust, convert needed cols later
# ensure numeric conversion for required columns (coerce where necessary)
for col in ["relative_position_r","relative_position_t","relative_position_n",
            "relative_velocity_r","relative_velocity_t","relative_velocity_n",
            "collision_probability"]:
    if col in cdm.columns:
        cdm[col] = pd.to_numeric(cdm[col], errors="coerce")
    else:
        # If some columns are missing, create NaN
        cdm[col] = np.nan

# compute range and speed
cdm["range_m"] = np.sqrt((cdm[["relative_position_r","relative_position_t","relative_position_n"]]**2).sum(axis=1))
cdm["speed_mps"] = np.sqrt((cdm[["relative_velocity_r","relative_velocity_t","relative_velocity_n"]]**2).sum(axis=1))
cdm["Pc"] = cdm["collision_probability"].fillna(0.0)

# select top N highest Pc to test
cdm_sorted = cdm.sort_values("Pc", ascending=False).head(N_TEST).reset_index(drop=True)
print(f"Selected {len(cdm_sorted)} conjunctions (top by reported Pc).")
cdm_sorted[["conjunction_id","Pc","range_m","speed_mps"]]


In [None]:
# ===============================
# Build QUBO builder for one conjunction
# ===============================
var_names, var_to_dv = make_variables_grid(axes=AXES, levels=DV_LEVELS_PER_AXIS)

# group per-axis variables for one-hot
axis_vars = {}
for ax in AXES:
    axis_vars[ax] = [v for v in var_names if v.startswith(f"dv_{ax}_")]

def build_bqm_for_conjunction(pc0, w_pc=W_PC, w_dv=W_DV, w_pair=W_PAIR):
    """
    Build a BinaryQuadraticModel for a single conjunction with initial Pc = pc0.
    The decision selects exactly one level per axis (including 0-level), leading to a
    combined Δv vector. Objective: minimize Pc_after + w_dv * dv_total.
    We encode Pc_after approximately as w_pc * Pc_after(dv_total).
    """
    bqm = BinaryQuadraticModel({}, {}, 0.0, vartype='BINARY')

    # Add variables and linear cost for dv magnitude
    for v, dv in var_to_dv.items():
        # linear term: w_dv * dv  (propellant cost) + a proxy for risk change if this variable chosen alone
        # We do not know dv_total in linear alone; we'll approximate Pc contribution via linearized term:
        # Approx linearization: approximate marginal reduction ~ w_pc * pc0 * (1 - exp(-scale * dv)) ~ w_pc * pc0 * scale * dv (for small dv)
        linear_pc_proxy = w_pc * pc0 * (1 - math.exp(-PC_REDUCTION_SCALE * dv))
        linear = w_dv * dv + linear_pc_proxy
        bqm.add_variable(v, linear)

    # Pairwise terms: quadratic terms needed to capture combined dv_total in Pc_after
    # We approximate combined effect: Pc_after = pc0 * exp(-scale * sum(dv_i * x_i))
    # Expand the exponential up to second-order for a quadratic approx:
    # exp(-s * S) ≈ 1 - s S + (s^2 S^2)/2 -> gives quadratic cross-terms proportional to dv_i*dv_j
    s = PC_REDUCTION_SCALE
    for i, vi in enumerate(var_names):
        for j in range(i+1, len(var_names)):
            vj = var_names[j]
            dv_i = var_to_dv[vi]
            dv_j = var_to_dv[vj]
            # coefficient from second-order term: w_pc * pc0 * (s^2 / 2) * dv_i * dv_j
            coef = w_pc * pc0 * ( (s**2) / 2.0 ) * (dv_i * dv_j)
            # add quadratic positive coef (since second-order increases objective reduction, but we are minimizing
            # so include it with negative sign to reward combined dv)
            bqm.add_quadratic(vi, vj, -coef)  # negative -> choosing both reduces the objective
    # One-hot penalties: enforce one level per axis
    # Use gamma roughly comparable to scale of linear terms
    gamma = max(10.0, w_dv * max(var_to_dv.values()) * 10.0)
    for ax, vlist in axis_vars.items():
        # Add penalty gamma*(sum - 1)^2 -> implemented via linear & quadratic contributions
        # Use method defined earlier
        for v in vlist:
            bqm.add_variable(v, bqm.linear.get(v,0.0) + gamma * (1 - 2))  # -gamma added
        for i in range(len(vlist)):
            for j in range(i+1, len(vlist)):
                bqm.add_quadratic(vlist[i], vlist[j], bqm.quadratic.get((vlist[i], vlist[j]),0.0) + 2 * gamma)

    return bqm


In [None]:
# ===============================
# Solvers to test
# ===============================
def solve_exact(bqm):
    solver = ExactSolver()
    t0 = time.perf_counter()
    sampleset = solver.sample(bqm)
    t1 = time.perf_counter()
    return sampleset.first, t1 - t0

def solve_simulated_annealing(bqm, num_reads=200):
    sampler = SimulatedAnnealingSampler()
    t0 = time.perf_counter()
    sampleset = sampler.sample(bqm, num_reads=num_reads)
    t1 = time.perf_counter()
    return sampleset.first, t1 - t0

def solve_leap_hybrid(bqm):
    if not HAS_LEAP:
        return None, None
    sampler = LeapHybridSampler()
    t0 = time.perf_counter()
    sampleset = sampler.sample(bqm)
    t1 = time.perf_counter()
    return sampleset.first, t1 - t0

def solve_greedy(bqm):
    # simple greedy baseline: pick for each axis the level with lowest linear bias (ignoring quadratic interactions)
    # get axis candidates
    chosen = {}
    for ax, vlist in axis_vars.items():
        best_v, best_score = None, float('inf')
        for v in vlist:
            score = bqm.linear.get(v, 0.0)
            if score < best_score:
                best_score = score
                best_v = v
        chosen[best_v] = 1
    # build a sample dict
    sample = {v: 0 for v in var_names}
    for v in chosen:
        sample[v] = 1
    # estimate energy
    energy = bqm.energy(sample)
    class FakeResult:
        def __init__(self, sample, energy):
            self.sample = sample
            self.energy = energy
    t = 0.0
    return FakeResult(sample, energy), t


In [None]:
# ===============================
# Evaluate solvers on selected conjunctions
# ===============================
results = []

solvers = {
    "Exact": solve_exact,
    "SimAnneal": solve_simulated_annealing,
    "LeapHybrid": solve_leap_hybrid,  # may be None if no credentials
    "Greedy": solve_greedy
}

for idx, row in cdm_sorted.iterrows():
    conj_id = row.get("conjunction_id", f"row{idx}")
    pc0 = float(row.get("Pc", 0.0))
    # Build BQM
    bqm = build_bqm_for_conjunction(pc0)
    # solve with available solvers
    solver_outcomes = {}
    # exact first (if problem size small enough)
    for name, fn in solvers.items():
        if name == "LeapHybrid" and not HAS_LEAP:
            solver_outcomes[name] = {"available": False}
            continue
        try:
            sol, runtime = fn(bqm)
            if sol is None:
                solver_outcomes[name] = {"available": False}
                continue
        except Exception as e:
            solver_outcomes[name] = {"available": False, "error": str(e)}
            continue

        sample = sol.sample
        # sample may be a BinaryMappingType or dict; ensure dict-like
        if hasattr(sample, "items"):
            sample_dict = dict(sample)
        else:
            # some API return attribute sample
            sample_dict = dict(sample)

        # compute chosen dv_total
        dv_total = sum(var_to_dv[v] * int(sample_dict.get(v,0)) for v in var_names)
        # compute Pc after using model
        pc_after = collision_probability_after(pc0, dv_total)
        # feasibility
        feasible = dv_total <= MAX_DV_ALLOW
        # energy / objective reported by solver
        energy = float(sol.energy) if hasattr(sol, "energy") else bqm.energy(sample_dict)
        # store
        solver_outcomes[name] = {
            "available": True,
            "sample": sample_dict,
            "dv_total": dv_total,
            "pc_after": pc_after,
            "runtime_s": runtime,
            "energy": energy,
            "feasible": feasible
        }

    # compute optimal from Exact if available to measure optimality gap
    exact = solver_outcomes.get("Exact", {})
    if exact.get("available"):
        exact_energy = exact["energy"]
    else:
        exact_energy = None

    # collect a line per conjunction
    results.append({
        "conjunction_id": conj_id,
        "Pc0": pc0,
        "exact_energy": exact_energy,
        "solver_outcomes": solver_outcomes
    })

# Summarize table for easy viewing
rows_summary = []
for r in results:
    cid = r["conjunction_id"]
    pc0 = r["Pc0"]
    for solver_name, out in r["solver_outcomes"].items():
        if not out.get("available"):
            rows_summary.append({
                "conjunction_id": cid,
                "solver": solver_name,
                "available": False
            })
        else:
            rows_summary.append({
                "conjunction_id": cid,
                "solver": solver_name,
                "available": True,
                "Pc0": pc0,
                "Pc_after": out["pc_after"],
                "dv_total": out["dv_total"],
                "energy": out["energy"],
                "runtime_s": out["runtime_s"],
                "feasible": out["feasible"]
            })

summary_df = pd.DataFrame(rows_summary)
summary_df


In [None]:
# ===============================
# Analysis & Metrics
# ===============================
# Compute improvement metrics per solver: absolute and relative Pc reduction, feasibility rate, avg dv, avg runtime
metrics = []
for solver_name in summary_df["solver"].unique():
    sub = summary_df[ (summary_df["solver"] == solver_name) & (summary_df["available"]==True) ]
    if len(sub)==0:
        metrics.append({
            "solver": solver_name,
            "tested": 0,
            "feasible_rate": None,
            "mean_dv": None,
            "mean_runtime_s": None,
            "mean_pc_reduction": None,
            "mean_pc_rel_reduction": None
        })
        continue
    mean_dv = sub["dv_total"].mean()
    mean_runtime = sub["runtime_s"].mean()
    # absolute reduction
    abs_red = (sub["Pc0"] - sub["Pc_after"]).mean()
    rel_red = ((sub["Pc0"] - sub["Pc_after"]) / (sub["Pc0"].replace(0, np.nan))).replace([np.inf, -np.inf], np.nan).mean()
    feasible_rate = sub["feasible"].mean()
    metrics.append({
        "solver": solver_name,
        "tested": len(sub),
        "feasible_rate": float(feasible_rate),
        "mean_dv": float(mean_dv),
        "mean_runtime_s": float(mean_runtime),
        "mean_pc_reduction": float(abs_red),
        "mean_pc_rel_reduction": float(rel_red if not np.isnan(rel_red) else 0.0)
    })
metrics_df = pd.DataFrame(metrics).sort_values("solver")
metrics_df


In [None]:
# ===============================
# Plots: Pc before vs after (example for SimAnneal)
# ===============================
def plot_solver_comparison(solverA, solverB):
    A = summary_df[ (summary_df["solver"]==solverA) & (summary_df["available"]==True) ].set_index("conjunction_id")
    B = summary_df[ (summary_df["solver"]==solverB) & (summary_df["available"]==True) ].set_index("conjunction_id")
    # inner join on conj id
    common = A.join(B, lsuffix=f"_{solverA}", rsuffix=f"_{solverB}", how="inner")
    if common.empty:
        print("No common conjunctions solved by both solvers.")
        return
    plt.figure(figsize=(6,6))
    plt.scatter(common[f"Pc_after_{solverA}"], common[f"Pc_after_{solverB}"])
    plt.plot([0, common[[f"Pc_after_{solverA}", f"Pc_after_{solverB}"]].max().max()],
             [0, common[[f"Pc_after_{solverA}", f"Pc_after_{solverB}"]].max().max()], '--', alpha=0.5)
    plt.xlabel(f"Pc after ({solverA})")
    plt.ylabel(f"Pc after ({solverB})")
    plt.title(f"Pc after: {solverA} vs {solverB}")
    plt.grid(True)
    plt.show()

# Example compare SimAnneal vs Greedy
plot_solver_comparison("SimAnneal", "Greedy")


In [None]:
summary_df.to_csv("/content/cdm_maneuver_solver_summary.csv", index=False)
metrics_df.to_csv("/content/cdm_maneuver_solver_metrics.csv", index=False)
print("Saved summary and metrics to /content/")
