In [1]:
import pandas as pd
from astropy.time import Time
from datetime import timedelta
from astropy.coordinates import SkyCoord, Angle, get_sun
import astropy.units as u
import numpy as np


def calculate_roll(
    ra: float,
    dec: float,
    obs_time: Time,
) -> float:
    """Calculate the spacecraft roll angle for a given target and time.

    The roll angle is computed based on the position of the target
    relative to the Sun at the time of observation. This ensures
    proper solar panel orientation.

    Parameters
    ----------
    ra : float
        Right ascension of the target in degrees.
    dec : float
        Declination of the target in degrees.
    obs_time : Time
        The time of observation (used to determine Sun position).

    Returns
    -------
    float
        The spacecraft roll angle in degrees, in the range [0, 360).
    """

    # Get target coordinates
    target_coord = SkyCoord(ra=ra * u.deg, dec=dec * u.deg, frame="icrs")

    # Get Sun position at observation time (astropy >=5 returns GCRS)
    sun_coord = get_sun(obs_time)
    # Convert to ICRS if needed
    if not hasattr(sun_coord, "ra") or not hasattr(sun_coord, "dec"):
        sun_coord = sun_coord.transform_to("icrs")

    # Calculate roll angle
    roll, _ = _spacecraft_roll_from_radec(
        target_coord.ra.deg,
        target_coord.dec.deg,
        sun_coord.ra.deg,
        sun_coord.dec.deg,
    )
    roll = roll % 360.0  # Normalize to [0, 360)
    return roll


def _spacecraft_roll_from_radec(target_ra, target_dec, sun_ra, sun_dec):
    """
    Compute spacecraft roll about boresight defined by:
        zB = target direction
        yB = sun × target (or fallback for anti-Solar pointing)
        xB = yB × zB
    All in ECI.

    For when the target is nearly anti-Solar, we use celestial north projected onto the plane
    perpendicular to the boresight as the reference for yB.
    """
    # Convert to vectors
    t = radec_to_vector(target_ra, target_dec)  # boresight
    s = radec_to_vector(sun_ra, sun_dec)  # Sun direction

    # boresight body axes, we define the others later because of edge cases
    zB = normalize(t)

    # Celestial north in ECI (equatorial frame) is +Z_ECI
    north = np.array([0.0, 0.0, 1.0])
    north_proj = north - np.dot(north, zB) * zB

    # catch edge case where boresight is very close to celestial north
    # in practice this should never happen because of Sun constraints
    if np.linalg.norm(north_proj) < 1.0e-8:
        # boresight ≈ celestial north: pick arbitrary east-like vector
        east = np.array([1.0, 0.0, 0.0])
        north_proj = east - np.dot(east, zB) * zB

    # reference axes for roll angle calculation
    x_ref = normalize(north_proj)
    y_ref = normalize(np.cross(zB, x_ref))

    # Define spacecraft xB,yB using the Sun direction
    sun_cross = np.cross(s, zB)
    if np.linalg.norm(sun_cross) > 1e-6:
        yB = normalize(sun_cross)
        xB = normalize(np.cross(yB, zB))
    else:
        # we need to catch edge cases where Sun is very close to anti-boresight
        xB = x_ref
        yB = y_ref

    # Compute roll angle x_ref -> xB around zB
    cos_r = np.dot(x_ref, xB)
    sin_r = np.dot(y_ref, xB)
    roll = np.rad2deg(np.arctan2(sin_r, cos_r))  # degrees

    return roll, (xB, yB, zB)


def radec_to_vector(ra_deg, dec_deg):
    """Convert RA/Dec (deg) to an Earth-Centered Inertial (ECI) coordinate frame unit vector."""
    ra = np.deg2rad(ra_deg)
    dec = np.deg2rad(dec_deg)
    x = np.cos(dec) * np.cos(ra)
    y = np.cos(dec) * np.sin(ra)
    z = np.sin(dec)
    return np.array([x, y, z])


def vector_to_radec(v):
    """Convert an ECI coordinate frame unit vector to RA/Dec (deg)."""
    x, y, z = v
    r = np.sqrt(x**2 + y**2 + z**2)
    dec = np.arcsin(z / r)
    ra = np.arctan2(y, x)
    ra_deg = np.rad2deg(ra) % 360.0
    dec_deg = np.rad2deg(dec)
    return ra_deg, dec_deg


def normalize(v):
    n = np.linalg.norm(v)
    if n == 0:
        raise ValueError("Zero-length vector")
    return v / n


def process_time(time_or_seconds):
    if isinstance(time_or_seconds, Time):
        processed = f""" "TimeType": "ABSOLUTE",\n      "TimeStamp": "{Time(time_or_seconds).utc.strftime("%Y-%m-%dT%H:%M:%SZ")}" """
    elif isinstance(time_or_seconds, u.Quantity):
        processed = f""" "TimeType": "RELATIVE",\n      "TimeStamp": "{np.round(time_or_seconds.to(u.second).value, 1)}" """
    return processed.strip()


def get_halt(time):
    return f"""\n   {{\n      {process_time(time)},\n      "Command": "PANDORA PAYLOAD_HALT_IMAGING_OR_COMMAND_SEQUENCE"\n   }}"""


def get_goto(time, coord, rolltime=None):
    x, y, z = coord.cartesian.xyz.value
    if rolltime is None:
        if isinstance(time, Time):
            rolltime = time
        else:
            rolltime = Time.now()
    roll = (
        Angle(
            calculate_roll(
                coord.ra.deg,
                coord.dec.deg,
                rolltime,
            )
            * u.deg
        )
        .wrap_at(180 * u.deg)
        .value
    )
    theta = Angle(roll * u.deg).wrap_at(180 * u.deg).value
    return f"""\n   {{\n      {process_time(time)},\n      "Command": "PANDORA GOTO_TARGET with VEL_ABER 1, PRI_REF_DIR 8, SEC_REF_DIR 2, TARG_X {x}, TARG_Y {y}, TARG_Z {z}, ANGLE 0.000000000000, PRI_CMD_DIR 3, SEC_CMD_DIR 1, ATT_INTERP 0, Q_TARGE_TWRT_REF1 0.000000000000, Q_TARGE_TWRT_REF2 0.000000000000, Q_TARGE_TWRT_REF3 {theta}, Q_TARGE_TWRT_REF4 0.000000000000, RATE_INTERP 0, CMD_RATE_X 0.000000000000, CMD_RATE_Y 0.000000000000, CMD_RATE_Z 0.000000000000, TIME 0, END_CYCLE 0"\n   }}"""

def get_idle(time):
    return f"""\n   {{\n      {process_time(time)},\n      "Command": "PANDORA GOTO_TARGET WITH CCSDS_AP_ID FAU, VEL_ABER 1, PRI_REF_DIR SUN, SEC_REF_DIR GCNADIR, TARG_X 0, TARG_Y 0, TARG_Z 0, ANGLE 0, PRI_CMD_DIR 6, SEC_CMD_DIR 4, ATT_INTERP 0, Q_TARGE_TWRT_REF1 0, Q_TARGE_TWRT_REF2 0, Q_TARGE_TWRT_REF3 0, Q_TARGE_TWRT_REF4 0, RATE_INTERP 1, CMD_RATE_X 0, CMD_RATE_Y 0, CMD_RATE_Z 0, TIME 0, END_CYCLE 0"\n   }}"""


def get_openf(time, short_name, count=1, filetime=None):
    if filetime is None:
        if isinstance(time, Time):
            filetime = time
        else:
            filetime = Time.now()
    filename = (
        f"""{filetime.utc.strftime("%Y%m%dT%H%M%S")}_{short_name}_{count:02}.bin"""
    )

    return f"""\n   {{\n      {process_time(time)},\n      "Command": "PANDORA PAYLOAD_READ with CCSDS_AP_ID HSDR, PL_APID 0, PATH \\u0027/mnt/data/sci/{filename}\\u0027, PL_PATH \\u0027\\u0027"\n   }}"""


def get_turn_on_VITL(time):
    return f"""\n   {{\n      {process_time(time)},\n      "Command": "PANDORA PAYLOAD_CONTROL with CCSDS_AP_ID FAU, EN_DS ENABLE"\n   }}"""


def get_turn_off_VITL(time):
    return f"""\n   {{\n      {process_time(time)},\n      "Command": "PANDORA PAYLOAD_CONTROL with CCSDS_AP_ID FAU, EN_DS DISABLE"\n   }}"""


def get_infimg(
    time,
    target_id,
    ROI_START_X=1536,
    ROI_START_Y=512,
    ROI_SIZE_X=512,
    ROI_SIZE_Y=1024,
    INTEGRATIONS=1,
    READ_FRAMES=4,
    RESETS2=1,
    GROUPS=1,
):
    return f"""\n   {{\n      {process_time(time)},\n      "Command": "PANDORA PAYLOAD_ACQUIRE_INF_CAM_IMAGES with PLD_TARGET_ID \\u0027{target_id.strip().replace(" ", "_")}\\u0027, PLD_AVERAGE_GROUPS 0, PLD_ROI_START_X {ROI_START_X}, PLD_ROI_START_Y {ROI_START_Y}, PLD_ROI_SIZE_X {ROI_SIZE_X}, PLD_ROI_SIZE_Y {ROI_SIZE_Y}, PLD_RICE_X 5, PLD_RICE_Y 28, PLD_SAVE_IMAGES_TO_DISK 1, PLD_SEND_THUMBNAILS 1, PLD_THUMBNAIL_BIN_SIZE 1, PLD_THUMBNAIL_COMPRESSION_TYPE 1, PLD_SC_RESETS1 1, PLD_SC_RESETS2 {RESETS2}, PLD_SC_DROP_FRAMES1 0, PLD_SC_DROP_FRAMES2 0, PLD_SC_DROP_FRAMES3 0, PLD_SC_READ_FRAMES {READ_FRAMES}, PLD_SC_GROUPS {GROUPS}, PLD_SC_INTEGRATIONS {INTEGRATIONS}, PLD_CHECKSUM 0xFFFF"\n   }}"""


def get_vissci_time(NUM_TOTAL_FRAMES_REQUESTED=100, EXPOSURE_TIME_US=200000):
    return ((NUM_TOTAL_FRAMES_REQUESTED * EXPOSURE_TIME_US) * u.microsecond).to(
        u.second
    )


def get_infimg_time(
    ROI_SIZE_X=512, ROI_SIZE_Y=1024, INTEGRATIONS=1, READ_FRAMES=4, RESETS2=1, GROUPS=1
):
    nreads = (READ_FRAMES + RESETS2) * GROUPS * INTEGRATIONS
    npixels = ROI_SIZE_X * ROI_SIZE_Y
    return (nreads * npixels * 1e-5) * u.second


def get_vissci(
    time,
    target_id,
    coord,
    NUM_TOTAL_FRAMES_REQUESTED=100,
    FRAMES_PER_COADD=1,
    EXPOSURE_TIME_US=200000,
):
    return f"""\n   {{\n      {process_time(time)},\n      "Command": "PANDORA PAYLOAD_ACQUIRE_VIS_CAM_SCIENCE_DATA with PLD_STAR_ROI_DET_METHOD 3, PLD_FRAMES_PER_COADD {FRAMES_PER_COADD}, PLD_NUM_TOTAL_FRAMES_REQUESTED {NUM_TOTAL_FRAMES_REQUESTED}, PLD_TARGET_RA {coord.ra.deg}, PLD_TARGET_DEC {coord.dec.deg}, PLD_INCLUDE_FIELD_SOLNS_IN_RESP 1, PLD_STAR_ROI_DIMENSION 1280, PLD_MAX_NUM_STAR_ROIS 1, PLD_NUM_PREDEFINED_STAR_ROIS 1, PLD_PREDEFINED_STAR_ROI_RA [1024.0], PLD_PREDEFINED_STAR_ROI_DEC [1024.0], PLD_TARGET_ID \\u0027{target_id}\\u0027, PLD_ROI_START_X 384, PLD_ROI_START_Y 384, PLD_ROI_SIZE_X 1280, PLD_ROI_SIZE_Y 1280, PLD_MAX_MAGNITUDE_IN_QUAD_CATALOG 16.500000, PLD_SAVE_IMAGES_TO_DISK 1, PLD_RICE_X 5, PLD_RICE_Y 25, PLD_IMAGE_DIR \\u0027/sssp/data/\\u0027, PLD_SEND_THUMBNAILS 0, PLD_EXPOSURE_TIME_US {EXPOSURE_TIME_US}, PLD_CHECKSUM 0xFFFF"\n   }}"""


def get_sunsafe(time):
    return f"""\n   {{\n      {process_time(time)},\n      "Command": "PANDORA GOTO_TARGET WITH CCSDS_AP_ID FAU, VEL_ABER 1, PRI_REF_DIR SUN, SEC_REF_DIR ECIZ, TARG_X 0, TARG_Y 0, TARG_Z 0, ANGLE 0, PRI_CMD_DIR 6, SEC_CMD_DIR 4, ATT_INTERP 0, Q_TARGE_TWRT_REF1 0, Q_TARGE_TWRT_REF2 0, Q_TARGE_TWRT_REF3 0, Q_TARGE_TWRT_REF4 0, RATE_INTERP 1, CMD_RATE_X 0, CMD_RATE_Y 0, CMD_RATE_Z 0.5, TIME 0, END_CYCLE 0"\n   }}"""

In [2]:
from pandoravisibility import Visibility
line1 = "1 99152U 80229J   26028.98563657  .00000000  00000-0  37770-3 0    04"
line2 = "2 99152  97.7989  29.8614 0004295 213.9017 252.3527 14.87783072    03"

vis = Visibility(line1, line2)

# Make short exposures, large image footprint, small files, no VITL

In [3]:
def get_VITL_seq(time, target_id, coord=None, slew_time=timedelta(minutes=3), NUM_TOTAL_FRAMES_REQUESTED=10, FRAMES_PER_COADD=1, INTEGRATIONS=1, count=1):
    if coord is None:
        coord = SkyCoord.from_name(target_id)
    VITL_vis_time = get_vissci_time(NUM_TOTAL_FRAMES_REQUESTED=NUM_TOTAL_FRAMES_REQUESTED)
    VITL_nir_time = get_infimg_time(INTEGRATIONS=INTEGRATIONS)
    data_collect_time = np.max([VITL_vis_time.value, VITL_nir_time.value]) * u.second
    seq = ",".join(
        [
            get_halt(time + timedelta(seconds=0)),
            get_goto(time + timedelta(seconds=2), coord),
            get_openf(
                time + timedelta(seconds=4),
                target_id.lower()
                .replace("-", "")
                .replace("_", "")
                .replace(".", "")[:8],
                count=count,
            ),
#             get_infimg(time + timedelta(seconds=6) + slew_time, target_id, INTEGRATIONS=INTEGRATIONS),
            get_vissci(time + timedelta(seconds=8) + slew_time, target_id, coord, NUM_TOTAL_FRAMES_REQUESTED=NUM_TOTAL_FRAMES_REQUESTED, FRAMES_PER_COADD=FRAMES_PER_COADD),
            get_turn_on_VITL(time + timedelta(seconds=8) + slew_time + timedelta(seconds=14)),
            get_turn_off_VITL(time + timedelta(seconds=8) + slew_time + timedelta(seconds=np.floor((NUM_TOTAL_FRAMES_REQUESTED * 0.2)/2) + 14)),
        ]
    )
    return seq


def get_seqs(ra, dec, target_id, start_time, end_time, check_visibility=True, NUM_TOTAL_FRAMES_REQUESTED=20, FRAMES_PER_COADD=20, INTEGRATIONS=1):
    seqs = []
    slewtime = timedelta(minutes=4)
    obstime=timedelta(minutes=9)
    coord = SkyCoord(ra, dec, unit='deg')
    td = timedelta(minutes=0)
    if check_visibility:
        visible = vis.get_visibility(coord, start_time + td) & vis.get_visibility(coord, start_time + td + timedelta(minutes=3))
        assert visible, f"{target_id} is not visible at {start_time + td} [Start Time + {td.seconds} seconds]."
    seqs.append(get_VITL_seq(start_time + td, target_id,
                             coord,
                             slew_time=slewtime,
                             count=1,
                             NUM_TOTAL_FRAMES_REQUESTED=NUM_TOTAL_FRAMES_REQUESTED,
                             FRAMES_PER_COADD=FRAMES_PER_COADD,
                             INTEGRATIONS=INTEGRATIONS))
 
    td += slewtime + obstime
    jdx = 2
    while (start_time+td) < (end_time - timedelta(minutes=1)):
        seqs.append(get_VITL_seq(start_time + td, target_id, coord, slew_time=timedelta(seconds=20), count=jdx % 99, NUM_TOTAL_FRAMES_REQUESTED=800, FRAMES_PER_COADD=FRAMES_PER_COADD))
        if check_visibility:
            visible = vis.get_visibility(coord, start_time + td) & vis.get_visibility(coord, start_time + td + timedelta(minutes=3))
            assert visible, f"{target_id} is not visible at {start_time + td} [Start Time + {td.seconds} seconds]."

        jdx += 1
        td += obstime
    return seqs

In [5]:
ksat_csv_fname = "20260129_contacts.csv"
ksat_df = pd.read_csv(ksat_csv_fname)
pass_start_times = [Time(time, scale='utc') for time in ksat_df.start_time if Time(time) > Time.now()]
pass_end_times = [Time(time, scale='utc') for start_time, time in zip(ksat_df.start_time, ksat_df.end_time) if Time(start_time) > Time.now()]


assert len(pass_start_times) > 0, ("**Update your KSAT CSV file**")

In [6]:
# seqs = []
# seqs.extend(get_seqs(ra=68.01905555823345,
#                 dec=-49.947737592507174,
#                 target_id="HD_29078",
#                 start_time=pass_end_times[0] + timedelta(minutes=4),
#                 end_time=pass_end_times[0] + timedelta(minutes=5),
#                 NUM_TOTAL_FRAMES_REQUESTED=7000, FRAMES_PER_COADD=50
#                 ),
#            )


# seqs.extend(get_seqs(ra=083.8201 ,
#                 dec=-05.3876,
#                 target_id="OrionNeb",
#                 start_time=pass_end_times[0] + timedelta(minutes=15),
#                 end_time=pass_end_times[0] + timedelta(minutes=27)))

# seqs.extend(get_seqs(ra=117.57057608545425,
#                 dec=8.733698238186177,
#                 target_id="HD63632",
#                 start_time=pass_end_times[0] + timedelta(minutes=39),
#                 end_time=pass_end_times[0] + timedelta(minutes=80)))



In [7]:
# seq = f"""{{\n  "Commands": [{",".join(seqs)}\n  ]\n}}"""
# short_name = "HD115367"
# count = 1
# fname = f"""{pass_start_times[0].utc.strftime("%Y%m%dT%H%M%S")[:-1]}_{short_name}_{count:02}.seq"""

# with open(fname, "w") as f:
#     f.write(seq)

In [8]:
seqs = []
seqs.extend(get_seqs(ra=68.0190794753464,
                dec=-49.94780165252183,
                target_id="HD29078",
                start_time=pass_end_times[0] + timedelta(minutes=2),
                end_time=pass_end_times[0] + timedelta(minutes=24),
                NUM_TOTAL_FRAMES_REQUESTED=2000, FRAMES_PER_COADD=20, check_visibility=True))

# seqs.extend(get_seqs(ra=127.38830455233187,
#                 dec=-21.280802741896665,
#                 target_id="HD71961",
#                 start_time=pass_end_times[0] + timedelta(minutes=25),
#                 end_time=pass_end_times[0] + timedelta(minutes=48),
#                 NUM_TOTAL_FRAMES_REQUESTED=800, FRAMES_PER_COADD=20, check_visibility=True))

# seqs.extend(
#     get_halt(pass_start_times[0] + timedelta(minutes=59)),
#     get_idle(pass_start_times[0] + timedelta(minutes=60))])
# seqs.extend(get_seqs(ra=121.21182432107362,
#                 dec=-42.11048595651747,
#                 target_id="HD67125",
#                 start_time=pass_end_times[0] + timedelta(minutes=82),
#                 end_time=pass_start_times[0] + timedelta(minutes=100),
#                 NUM_TOTAL_FRAMES_REQUESTED=800, FRAMES_PER_COADD=20, check_visibility=True))


seq = f"""{{\n  "Commands": [{",".join(seqs)}\n  ]\n}}"""
short_name = "vitl"
count = 10
fname = f"""{pass_start_times[1].utc.strftime("%Y%m%dT%H%M%S")[:-1]}_{short_name}_{count:02}.seq"""

with open(fname, "w") as f:
    f.write(seq)

In [9]:
pass_end_times[0]

<Time object: scale='utc' format='isot' value=2026-01-29T19:55:40.000>

In [None]:
# seqs = []
# seqs.extend(get_seqs(ra=92.38646709204566,
#                 dec=-25.275365484466985,
#                 target_id="HD42416",
#                 start_time=pass_end_times[0] + timedelta(minutes=3),
#                 end_time=pass_end_times[0] + timedelta(minutes=20),
#                 NUM_TOTAL_FRAMES_REQUESTED=400, FRAMES_PER_COADD=20, check_visibility=True))

# seqs.extend(get_seqs(ra=100.23620136611297,
#                 dec=23.17495581646612,
#                 target_id="HD47679",
#                 start_time=pass_end_times[0] + timedelta(minutes=21),
#                 end_time=pass_start_times[1] - timedelta(minutes=38),
#                 NUM_TOTAL_FRAMES_REQUESTED=400, FRAMES_PER_COADD=20, check_visibility=True))

# seqs.extend(get_seqs(ra=180.444452918763,
#                 dec=-32.22965541987291,
#                 target_id="HD104470",
#                 start_time=pass_end_times[0] + timedelta(minutes=49),
#                 end_time=pass_start_times[1] - timedelta(minutes=5),
#                 NUM_TOTAL_FRAMES_REQUESTED=400, FRAMES_PER_COADD=20, check_visibility=True))


# seq = f"""{{\n  "Commands": [{",".join(seqs)}\n  ]\n}}"""
# short_name = "vitl"
# count = 1
# fname = f"""{pass_start_times[0].utc.strftime("%Y%m%dT%H%M%S")[:-1]}_{short_name}_{count:02}.seq"""

# with open(fname, "w") as f:
#     f.write(seq)