# DITL with ADCS physics

This notebook simulates multiple slews driven by the ACS and plots reaction-wheel stored momentum over time.


In [None]:
# ruff: noqa: E402
import math
from datetime import datetime, timedelta

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from rust_ephem import TLEEphemeris
from tqdm import tqdm

from conops import MissionConfig, Queue, QueueDITL
from conops.common import ACSMode
from conops.config.visualization import VisualizationConfig
from conops.visualization import plot_ditl_momentum, plot_ditl_telemetry

# Shared ephemeris and targets
begin = datetime(2025, 1, 1)
end = begin + timedelta(hours=12)
ephem = TLEEphemeris(tle="./example.tle", begin=begin, end=end, step_size=10)


number_of_targets = 1000
np.random.seed(41)
target_ra = np.random.uniform(0, 360, number_of_targets)
target_dec = np.random.uniform(-90, 90, number_of_targets)
targids = list(range(10000, 10000 + number_of_targets))


def make_base_config(mtq_dipole: float) -> MissionConfig:
    cfg_local = MissionConfig.from_json_file("./example_config.json")
    acs = cfg_local.spacecraft_bus.attitude_control
    acs.spacecraft_moi = (45.0, 45.0, 45.0)
    acs.wheel_enabled = False
    acs.magnetorquers = [
        {
            "name": "mtq_x",
            "orientation": (1, 0, 0),
            "dipole_strength": mtq_dipole,
            "power_draw": 5.0,
        },
        {
            "name": "mtq_y",
            "orientation": (0, 1, 0),
            "dipole_strength": mtq_dipole,
            "power_draw": 5.0,
        },
        {
            "name": "mtq_z",
            "orientation": (0, 0, 1),
            "dipole_strength": mtq_dipole,
            "power_draw": 5.0,
        },
    ]
    acs.magnetorquer_bfield_T = 3e-5
    acs.cp_offset_body = (0.25, 0.0, 0.0)
    acs.residual_magnetic_moment = (0.05, 0, 0)
    acs.drag_area_m2 = 1.5
    acs.drag_coeff = 2.2
    acs.solar_area_m2 = 1.3
    acs.solar_reflectivity = 1.3
    acs.use_msis_density = True

    ###
    acs.slew_acceleration = 0.01
    acs.max_slew_rate = 0.3

    cfg_local.constraint.ephem = ephem

    if cfg_local.ground_stations is not None:
        for station in cfg_local.ground_stations.stations:
            station.schedule_probability = 1.0

    return cfg_local


def build_queue(cfg_local: MissionConfig) -> Queue:
    q = Queue(ephem=ephem, config=cfg_local)
    q.slew_distance_weight = 0.0
    for i in tqdm(range(number_of_targets), leave=False):
        q.add(
            merit=40,
            ra=target_ra[i],
            dec=target_dec[i],
            obsid=targids[i],
            name=f"pointing_{targids[i]}",
            exptime=1000,
            ss_min=300,
        )
    return q


def run_case(
    wheel_name: str, max_mom: float, max_torque: float, mtq_dipole: float
) -> tuple[QueueDITL, dict]:
    cfg_case = make_base_config(mtq_dipole)
    acs_case = cfg_case.spacecraft_bus.attitude_control

    s = math.sqrt(2 / 3)
    c = math.sqrt(1 / 3)
    acs_case.wheels = [
        {
            "name": "rw1",
            "orientation": (+s, 0.0, +c),
            "max_torque": max_torque,
            "max_momentum": max_mom,
        },
        {
            "name": "rw2",
            "orientation": (-s, 0.0, +c),
            "max_torque": max_torque,
            "max_momentum": max_mom,
        },
        {
            "name": "rw3",
            "orientation": (0.0, +s, -c),
            "max_torque": max_torque,
            "max_momentum": max_mom,
        },
        {
            "name": "rw4",
            "orientation": (0.0, -s, -c),
            "max_torque": max_torque,
            "max_momentum": max_mom,
        },
    ]

    queue_case = build_queue(cfg_case)
    # Validation warnings are printed automatically when QueueDITL creates the ACS
    ditl_case = QueueDITL(
        config=cfg_case, ephem=ephem, begin=begin, end=end, queue=queue_case
    )
    ditl_case.acs._wheel_mom_margin = 1.0  # disable margin for what-if runs
    ditl_case.step_size = 10
    ditl_case.calc()

    mode_arr = np.array(ditl_case.mode)
    science_frac = float(np.mean(mode_arr == ACSMode.SCIENCE) * 100)
    slew_frac = float(np.mean(mode_arr == ACSMode.SLEWING) * 100)
    desat_events = getattr(ditl_case.acs, "desat_events", 0)
    wm = np.array(getattr(ditl_case, "wheel_momentum_fraction", []))
    wm_raw = np.array(getattr(ditl_case, "wheel_momentum_fraction_raw", []))
    wt = np.array(getattr(ditl_case, "wheel_torque_fraction", []))
    per_wheel = getattr(ditl_case, "wheel_per_wheel_max_raw", {}) or {
        getattr(w, "name", f"rw{i}"): abs(w.current_momentum) / w.max_momentum
        for i, w in enumerate(ditl_case.acs.reaction_wheels)
    }
    raw_peak = max(per_wheel.values()) if per_wheel else 0.0

    summary = {
        "name": wheel_name,
        "mtq_dipole": mtq_dipole,
        "max_mom": max_mom,
        "max_torque": max_torque,
        "science_%": science_frac,
        "slew_%": slew_frac,
        "desat_events": desat_events,
        "max_mom_frac": float(wm.max()) if wm.size else 0.0,
        "max_mom_frac_raw": float(wm_raw.max()) if wm_raw.size else 0.0,
        "p95_mom_frac": float(np.quantile(wm, 0.95)) if wm.size else 0.0,
        "p95_mom_frac_raw": float(np.quantile(wm_raw, 0.95)) if wm_raw.size else 0.0,
        "raw_peak_per_wheel": raw_peak,
        "per_wheel_peaks": per_wheel,
        "max_torque_frac": float(wt.max()) if wt.size else 0.0,
    }
    return ditl_case, summary

In [None]:
# Single MTQ + wheel configuration
single_wheel = ("1.0Nms_0.06Nm", 1.0, 0.06)
single_mtq = 32  # AÂ·m^2

ditl_case, summary_single = run_case(*single_wheel, single_mtq)
print(summary_single)

fig, axes = plot_ditl_telemetry(
    ditl_case,
    figsize=(12, 8),
    config=VisualizationConfig(font_family="DejaVu Sans"),
)

fig, axes = plot_ditl_momentum(
    ditl_case,
    figsize=(12, 12),
    config=VisualizationConfig(font_family="DejaVu Sans"),
)

fig.suptitle(f"Case: {summary_single['name']} mtq={single_mtq}")
plt.show()

In [None]:
wheel_grid = [
    ("1.0Nms_0.06Nm", 1.0, 0.06),
    ("2.0Nms_0.12Nm", 2.0, 0.12),
    ("1.2Nms_0.25Nm", 1.2, 0.25),
    ("4.0Nms_0.25Nm", 4.0, 0.25),
    ("2.5Nms_0.50Nm", 2.5, 0.50),
    ("8.0Nms_0.25Nm", 8.0, 0.25),
    ("5.0Nms_0.50Nm", 5.0, 0.50),
]
mtq_grid = [32, 50, 100]

results = []
ditls = {}
for mtq in mtq_grid:
    for name, mom, torq in wheel_grid:
        key = f"{name}_mtq{mtq}"
        ditl_case, summary = run_case(name, mom, torq, mtq)
        ditls[key] = ditl_case
        summary["key"] = key
        results.append(summary)

df = pd.DataFrame(results).set_index("key")
df_sorted = df.sort_values(
    by=["science_%", "slew_%", "desat_events", "max_mom_frac", "max_torque_frac"],
    ascending=[False, True, True, True, True],
)
display(df_sorted.head())

# Plot the top-ranked case
case_to_plot = df_sorted.index[0]
fig, axes = plot_ditl_telemetry(
    ditls[case_to_plot],
    figsize=(12, 8),
    config=VisualizationConfig(font_family="DejaVu Sans"),
)
fig.suptitle(f"Case: {case_to_plot}")
plt.show()

In [None]:
df_sorted

In [None]:
from conops.common import ACSCommandType

# Timebase
t = np.array(ditl_case.utime)
dt = np.diff(t, prepend=t[0])
hrs = dt / 3600

m = np.array(ditl_case.mode)
time_by_mode = {
    "SCIENCE": np.sum(hrs[m == ACSMode.SCIENCE]),
    "SLEWING": np.sum(hrs[m == ACSMode.SLEWING]),
    "PASS": np.sum(hrs[m == ACSMode.PASS]),
    "SAFE": np.sum(hrs[m == ACSMode.SAFE]),
    "CHARGING": np.sum(hrs[m == ACSMode.CHARGING]),
    "SAA": np.sum(hrs[m == ACSMode.SAA]),
}
print("Hours by mode:", time_by_mode)

# Desat
desat_steps = getattr(ditl_case, "desat_time_steps", 0)
print(f"Desat hours: {desat_steps * ditl_case.step_size / 3600:.2f}")

# Wheel margins
wm = np.array(getattr(ditl_case, "wheel_momentum_fraction", []))
wt = np.array(getattr(ditl_case, "wheel_torque_fraction", []))
if wm.size:
    print(f"Wheel momentum: max={wm.max():.2f}, p95={np.quantile(wm, 0.95):.2f}")
if wt.size:
    print(f"Wheel torque: max={wt.max():.2f}, p95={np.quantile(wt, 0.95):.2f}")

# Slew outcomes
slews = [
    c
    for c in ditl_case.acs.executed_commands
    if c.command_type == ACSCommandType.SLEW_TO_TARGET and c.slew
]
dists = [c.slew.slewdist for c in slews]
rejected = getattr(ditl_case.acs, "headroom_rejects", 0)
print(
    f"Slews: {len(slews)} executed, {rejected} rejected; dist mean={np.mean(dists):.1f} deg"
)

# Simple histograms (numpy bins)
hist_mom, bins_m = np.histogram(wm, bins=np.linspace(0, 1, 11)) if wm.size else ([], [])

In [None]:
from datetime import datetime, timezone

import numpy as np

# enable MSIS
ditl_case.config.spacecraft_bus.attitude_control.use_msis_density = False

# after your DITL calc:
acs = ditl_case.acs
ephem_local = ditl_case.ephem

densities = []
alts_km = []
for ut in ditl_case.utime:
    idx = ephem_local.index(datetime.fromtimestamp(ut, tz=timezone.utc))
    r_vec = np.array(ephem_local.gcrs_pv.position[idx])
    # normalize to meters if in km
    if np.linalg.norm(r_vec) < 1e6:
        r_vec = r_vec * 1000.0
    alt_m = max(0.0, np.linalg.norm(r_vec) - 6378e3)
    lat = (
        float(getattr(ephem_local, "lat", [0])[idx])
        if hasattr(ephem_local, "lat")
        else 0.0
    )
    lon = (
        float(getattr(ephem_local, "long", [0])[idx])
        if hasattr(ephem_local, "long")
        else 0.0
    )
    rho = acs._atmospheric_density(ut, alt_m, lat, lon)
    densities.append(rho)
    alts_km.append(alt_m / 1000.0)

# Plot density vs time
import matplotlib.pyplot as plt

time_hours = (np.array(ditl_case.utime) - ditl_case.utime[0]) / 3600
plt.semilogy(time_hours, densities)
plt.xlabel("Time (hours)")
plt.ylabel("Density (kg/m^3)")
plt.tight_layout()
plt.show()

In [None]:
acs = ditl_case.acs
acs._use_msis = True  # ensure MSIS path
ut0 = ditl_case.utime[0]
idx = ditl_case.ephem.index(datetime.fromtimestamp(ut0, tz=timezone.utc))
r = np.array(ditl_case.ephem.gcrs_pv.position[idx])
if np.linalg.norm(r) < 1e6:
    r = r * 1000
alt = np.linalg.norm(r) - 6378e3
lat = float(getattr(ditl_case.ephem, "lat", [0])[idx])
lon = float(getattr(ditl_case.ephem, "long", [0])[idx])
rho = acs._atmospheric_density(ut0, alt, lat, lon)
print(f"alt={alt / 1000:.1f} km rho={rho:.3e} kg/m^3")

In [None]:
alt

In [None]:
ditl_case.print_statistics()

In [None]:
from conops.common import ACSCommandType, ACSMode

time_h = (np.array(ditl_case.utime) - ditl_case.utime[0]) / 3600
slew_cmds = [
    c
    for c in ditl_case.acs.executed_commands
    if c.command_type == ACSCommandType.SLEW_TO_TARGET
]
desat_cmds = [
    c for c in ditl_case.acs.executed_commands if c.command_type == ACSCommandType.DESAT
]
print(
    "DESAT windows:",
    [(c.execution_time, getattr(c, "duration", None)) for c in desat_cmds],
)

# Overlay where mode==SLEWING to see it spans settle/desat
import matplotlib.pyplot as plt

m = np.array(ditl_case.mode)
plt.plot(time_h, ditl_case.ra)
plt.scatter(
    time_h[m == ACSMode.SLEWING],
    np.array(ditl_case.ra)[m == ACSMode.SLEWING],
    s=8,
    color="r",
)

In [None]:
from conops.common import ACSCommandType

cmds = [
    c
    for c in ditl_case.acs.executed_commands
    if c.command_type == ACSCommandType.SLEW_TO_TARGET
]
for c in cmds:
    dist = c.slew.slewdist  # deg along great circle
    t = c.slew.slewtime  # s (includes settle)
    avg_rate = dist / t if t else 0
    a = getattr(c.slew, "_accel_override", None)
    vmax = getattr(c.slew, "_vmax_override", None)
    print(
        f"obsid={c.slew.obsid} dist={dist:.2f}deg time={t:.1f}s avg={avg_rate:.3f} deg/s accel={a} vmax={vmax}"
    )

In [None]:
import numpy as np

from conops.common import separation

# great-circle step-to-step rate
rates = []
for i in range(1, len(ditl_case.ra)):
    r = separation(
        [np.deg2rad(ditl_case.ra[i - 1]), np.deg2rad(ditl_case.dec[i - 1])],
        [np.deg2rad(ditl_case.ra[i]), np.deg2rad(ditl_case.dec[i])],
    )  # deg
    d = np.rad2deg(r)
    rates.append(d / ditl_case.step_size)  # deg/s
print("Max step rate:", max(rates), "deg/s")

In [None]:
import numpy as np

from conops.common import ACSCommandType

dists = [
    c.slew.slewdist
    for c in ditl_case.acs.executed_commands
    if c.command_type == ACSCommandType.SLEW_TO_TARGET and c.slew
]
print("mean dist:", np.mean(dists), "deg", "max:", np.max(dists), "deg")

In [None]:
import numpy as np

from conops.common import ACSCommandType, separation

# Slew distances actually flown
dists = [
    c.slew.slewdist
    for c in ditl_case.acs.executed_commands
    if c.command_type == ACSCommandType.SLEW_TO_TARGET and c.slew
]
print(
    f"Count={len(dists)}, mean={np.mean(dists):.1f} deg, median={np.median(dists):.1f}, max={np.max(dists):.1f}"
)

# Time in SLEWING
mode_array = np.array(ditl_case.mode)
frac_slewing = np.mean(mode_array == 1) * 100
print(f"SLEWING fraction: {frac_slewing:.1f}%")

# Effective accel/rate per slew
for c in ditl_case.acs.executed_commands:
    if c.command_type == ACSCommandType.SLEW_TO_TARGET and c.slew:
        print(
            f"obsid={c.slew.obsid} dist={c.slew.slewdist:.1f} deg, "
            f"accel={getattr(c.slew, '_accel_override', None)}, "
            f"vmax={getattr(c.slew, '_vmax_override', None)}"
        )

In [None]:
import numpy as np

from conops.common import ACSMode

mode_arr = np.array(ditl_case.mode)
science_frac = np.mean(mode_arr == ACSMode.SCIENCE) * 100
slew_frac = np.mean(mode_arr == ACSMode.SLEWING) * 100
print(f"SCIENCE: {science_frac:.1f}%")
print(f"SLEWING: {slew_frac:.1f}%")