In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import asyncio
import nest_asyncio
from astropy.coordinates import SkyCoord
from astropy import units as u
from scipy.optimize import linear_sum_assignment
from math import radians, cos, sin, sqrt
import math

# Constants
NUM_ARMS = 8
FOCAL_PLANE_RADIUS = 6.0  # in arcmin
ARM_RADIUS = 6.5          # max arm extension
PARKED_RADIUS = 0.25      # rest position radius
MIN_SAFE_DISTANCE = 0.3   # min allowed between targets
THETA_STEP = 360 / NUM_ARMS

# Initial arm positions
pickup_arm_centers = [(i * THETA_STEP, ARM_RADIUS) for i in range(NUM_ARMS)]


In [None]:
#Convert YSO Targets to Polar Coordinates
def convert_grouped_targets_to_polar(grouped_yso_path, groups_summary_path):
    grouped_df = pd.read_csv(grouped_yso_path)
    summary_df = pd.read_csv(groups_summary_path)
    grouped_polar_targets = {}

    for _, group_row in summary_df.iterrows():
        group_id = int(group_row['Group'])
        origin = SkyCoord(group_row['RA_center'], group_row['DEC_center'], unit="deg")
        group_targets = grouped_df[grouped_df['Group'] == group_id]
        
        polar_targets = []
        for _, row in group_targets.iterrows():
            target = SkyCoord(row['RA_deg'], row['DEC_deg'], unit="deg")
            sep = origin.separation(target).arcminute
            angle = origin.position_angle(target).deg % 360
            polar_targets.append((angle, sep))

        grouped_polar_targets[group_id] = polar_targets
    return grouped_polar_targets


In [None]:
def polar_to_cartesian(theta_deg, r):
    theta_rad = radians(theta_deg)
    return r * cos(theta_rad), r * sin(theta_rad)

def euclidean_distance(p1, p2):
    return sqrt((p1[0] - p2[0])**2 + (p1[1] - p2[1])**2)


In [None]:
#Safe Distance Adjustment via Jitter
def adjust_distances(targets, jitter_step=0.05, max_iterations=100):
    adjusted = list(targets)
    iteration = 0

    while iteration < max_iterations:
        modified = False
        for i in range(len(adjusted)):
            for j in range(i+1, len(adjusted)):
                t1, r1 = adjusted[i]
                t2, r2 = adjusted[j]
                dist = sqrt(r1**2 + r2**2 - 2 * r1 * r2 * cos(radians(t1 - t2)))

                if dist < MIN_SAFE_DISTANCE:
                    modified = True
                    delta = (t2 - t1 + 360) % 360
                    jitter = 5 if delta <= 180 else -5
                    adjusted[i] = ((t1 - jitter) % 360, min(FOCAL_PLANE_RADIUS, r1 + jitter_step))
                    adjusted[j] = ((t2 + jitter) % 360, min(FOCAL_PLANE_RADIUS, r2 + jitter_step))
        if not modified:
            break
        iteration += 1
    return adjusted


In [None]:
#Assign Targets to Arms (with Parking Option)
def assign_targets_with_parking(grouped_polar_targets):
    results = {}
    arm_cart = [polar_to_cartesian(t, ARM_RADIUS) for t, _ in pickup_arm_centers]

    for group_id, targets in grouped_polar_targets.items():
        targets = adjust_distances(targets)
        tgt_cart = [polar_to_cartesian(t, r) for t, r in targets]
        cost = np.zeros((NUM_ARMS, len(targets)))

        for i in range(NUM_ARMS):
            for j in range(len(targets)):
                d = euclidean_distance(arm_cart[i], tgt_cart[j])
                cost[i][j] = d if d <= FOCAL_PLANE_RADIUS else 1e6

        row_ind, col_ind = linear_sum_assignment(cost)
        assignments = {}
        assigned = []

        for i, j in zip(row_ind, col_ind):
            if cost[i][j] < 1e6:
                assignments[i] = targets[j]
                assigned.append(i)

        # Park unassigned arms
        for i in range(NUM_ARMS):
            if i not in assigned:
                assignments[i] = (pickup_arm_centers[i][0], PARKED_RADIUS)

        results[group_id] = {"assignments": assignments}
    return results


In [None]:
#Simulate Arm Movement (Asynchronously)
async def move_arm(index, start, end, results):
    steps = 100
    theta_vals = np.linspace(start[0], end[0], steps)
    r_vals = np.linspace(start[1], end[1], steps)
    for theta, r in zip(theta_vals, r_vals):
        results[index] = (theta % 360, r)
        await asyncio.sleep(0.01)

async def arrange_arms(assignments):
    results = [(theta, PARKED_RADIUS) for theta, _ in pickup_arm_centers]
    tasks = [move_arm(i, (pickup_arm_centers[i][0], PARKED_RADIUS), assignments[i], results)
             for i in range(NUM_ARMS)]
    await asyncio.gather(*tasks)
    return results


In [None]:
#Plot Final Positions in Polar Format
def plot_positions(initial, final):
    fig, ax = plt.subplots(figsize=(8, 8), subplot_kw={'polar': True})
    ax.set_title("TA-MOONS Arm Simulation", va='bottom')

    for i, ((theta_init, _), (theta_final, r_final)) in enumerate(zip(initial, final)):
        color = 'red' if r_final > PARKED_RADIUS else 'black'
        ax.plot([radians(theta_init), radians(theta_final)], [6.5, r_final], color=color, lw=2)
        ax.text(radians(theta_final), r_final + 0.3, f"A{i+1}", ha='center')

    ax.set_rmax(FOCAL_PLANE_RADIUS + 2)
    ax.set_theta_zero_location("N")
    ax.set_theta_direction(-1)
    ax.grid(True)
    plt.tight_layout()
    plt.show()


In [None]:
#Execute Simulation
async def run_simulation(grouped_yso_path, summary_path, group_id):
    grouped_targets = convert_grouped_targets_to_polar(grouped_yso_path, summary_path)
    assigned = assign_targets_with_parking(grouped_targets)
    
    if group_id not in assigned:
        print(f"❌ Group {group_id} not found.")
        return

    assignments = assigned[group_id]["assignments"]
    final_positions = await arrange_arms(assignments)
    plot_positions(pickup_arm_centers, final_positions)


In [None]:
nest_asyncio.apply()
await run_simulation(
    "grouped_ysos_IC348.csv",
    "group_summary_IC348.csv",
    group_id=int(input("Enter Group ID to simulate: "))
)
