In [1]:
import math
import random
import heapq
from dataclasses import dataclass, field
from typing import List, Tuple, Optional, Dict

MU_SUN = 1.32712440018e20
AU = 149597870700
P_WATER = 20000.0
ROI_MIN = 0.15
C_DEV = 5e8
C_LAUNCH = 2e7
DV_BUDGET = 15000.0
TIME_MAX = 1825.0
C_FUEL = 5000.0
M0_WET = 1000.0
Isp = 300.0
g0 = 9.80665

Clases de datos

In [2]:
@dataclass
class Asteroid:
    id: str
    a: float
    e: float
    i_deg: float
    om_deg: float
    w_deg: float
    ma_deg: float
    radius_m: float
    water_fraction: float
    spectral_type: str = "C"

    def mass_kg(self) -> float:
        volume = (4.0 / 3.0) * math.pi * (self.radius_m ** 3)
        density = 2000.0 if self.spectral_type == "C" else 3000.0
        return density * volume

    def available_water_kg(self, mining_time_days: float = 10.0) -> float:
        total_water = self.mass_kg() * self.water_fraction
        mining_rate = 1000.0
        return min(total_water, mining_rate * mining_time_days)

    def orbital_position(self, date_days: float = 0.0) -> Tuple[float, float, float]:
        period_days = 365.25 * (self.a ** 1.5)
        ma_updated = self.ma_deg + (date_days / period_days) * 360.0
        r = self.a * AU
        x = r * math.cos(math.radians(ma_updated))
        y = r * math.sin(math.radians(ma_updated)) * math.cos(math.radians(self.i_deg))
        z = r * math.sin(math.radians(ma_updated)) * math.sin(math.radians(self.i_deg))
        return x, y, z


@dataclass(order=True)
class PrioritizedItem:
    priority: float
    item: object = field(compare=False)


@dataclass
class State:
    seq: List[str]
    current: str
    delta_v_used: float
    m_water: float
    t_current: float
    date_abs: float

    def __hash__(self):
        return hash(
            (
                tuple(self.seq),
                self.current,
                round(self.delta_v_used, 2),
                round(self.m_water, 2),
                round(self.t_current, 2),
            )
        )

    def clone_and_add(
        self, asteroid_id: str, dv_add: float, water_add: float, time_add: float, date_add: float
    ):
        return State(
            seq=self.seq + [asteroid_id],
            current=asteroid_id,
            delta_v_used=self.delta_v_used + dv_add,
            m_water=self.m_water + water_add,
            t_current=self.t_current + time_add,
            date_abs=self.date_abs + date_add,
        )

Dinámica orbital

In [3]:
def lambert_transfer_izzo(ast_from, ast_to, date_departure, date_arrival):
    r1 = ast_from.orbital_position(date_departure)
    r2 = ast_to.orbital_position(date_arrival)
    tof_seconds = (date_arrival - date_departure) * 24 * 3600
    distance = math.sqrt(sum((r2[i] - r1[i]) ** 2 for i in range(3)))
    dv_approx = math.sqrt(MU_SUN / distance) * 0.8
    dv_departure = dv_approx * 0.6
    dv_arrival = dv_approx * 0.4
    return dv_departure, dv_arrival, tof_seconds


def find_gravity_assist_opportunities(ast_from, ast_to, date_departure, asteroids):
    opportunities = []
    planets = [("Venus", 0.723, 3.248e14), ("Earth", 1.0, 3.986e14), ("Mars", 1.524, 4.282e13)]
    for planet_name, a_planet, mu_planet in planets:
        transfer_time = 180.0
        encounter_date = date_departure + transfer_time / 2
        v_inf = 5000.0
        r_p = 7_000_000.0
        try:
            delta_angle = 2 * math.asin(1 / (1 + r_p * v_inf**2 / mu_planet))
            dv_saving = v_inf * 2 * math.sin(delta_angle / 2)
            if dv_saving > 100:
                opportunities.append((planet_name, encounter_date, dv_saving))
        except ValueError:
            continue
    return opportunities


def simple_hohmann_delta_v(a1_au, a2_au):
    r1 = a1_au * AU
    r2 = a2_au * AU
    v1 = math.sqrt(MU_SUN / r1)
    v2 = math.sqrt(MU_SUN / r2)
    at = (r1 + r2) / 2
    vt1 = math.sqrt(MU_SUN * (2 / r1 - 1 / at))
    vt2 = math.sqrt(MU_SUN * (2 / r2 - 1 / at))
    return abs(vt1 - v1) + abs(v2 - vt2)


def calculate_return_delta_v(asteroid, date, target_orbit="Gateway"):
    dv_escape = 0.1
    a_earth = 1.0
    dv_heliocentric = simple_hohmann_delta_v(asteroid.a, a_earth)
    insertion_costs = {"LEO": 3200, "Lunar": 900, "Gateway": 700}
    dv_insertion = insertion_costs.get(target_orbit, 700)
    if target_orbit == "LEO":
        dv_insertion *= 0.3
    return dv_escape + dv_heliocentric + dv_insertion

Modelo económico

In [4]:
def compute_propellant_mass(delta_v, m0=M0_WET):
    return m0 * (1.0 - math.exp(-delta_v / (Isp * g0)))


def revenue_from_state(state):
    return state.m_water * P_WATER


def cost_propulsion(delta_v_total):
    return compute_propellant_mass(delta_v_total) * C_FUEL


def cost_mining(state):
    mining_days_per_ast = 10.0
    cost_per_hour = 50.0
    asteroids_mined = len(state.seq) - 1
    hours = mining_days_per_ast * 24 * asteroids_mined
    return hours * cost_per_hour


def cost_ops(state):
    return state.t_current * 1000.0


def cost_return(state, asteroids):
    if not state.seq or state.current == "BASE":
        return 0.0
    last_asteroid = asteroids[state.current]
    dv_return = calculate_return_delta_v(last_asteroid, state.date_abs)
    m_wet_return = M0_WET + state.m_water
    m_prop_return = compute_propellant_mass(dv_return, m_wet_return)
    return m_prop_return * C_FUEL


def total_cost(state, asteroids):
    return (
        cost_propulsion(state.delta_v_used)
        + cost_mining(state)
        + cost_ops(state)
        + cost_return(state, asteroids)
    )


def roi_of_state(state, asteroids):
    ingresos = revenue_from_state(state)
    costos = total_cost(state, asteroids)
    return (ingresos - costos) / (C_DEV + C_LAUNCH)

Heurística

In [5]:
def heuristic_remaining_potential(state, asteroids):
    unvisited = [
        a
        for k, a in asteroids.items()
        if k not in state.seq and a.id != "BASE" and a.water_fraction > 0
    ]
    if not unvisited:
        return 0.0
    sum_term = 0.0
    c_est = 0.0
    for ast in unvisited:
        v_a = ast.available_water_kg()
        dv_required = simple_hohmann_delta_v(asteroids[state.current].a, ast.a)
        remaining_dv = DV_BUDGET - state.delta_v_used
        if dv_required > remaining_dv:
            eta_access = 0.0
        else:
            eta_access = 1.0 - (dv_required / (remaining_dv + 1e-6))
            eta_access = eta_access ** 0.5
        sum_term += v_a * P_WATER * eta_access
        m_prop_est = compute_propellant_mass(dv_required)
        prop_cost = m_prop_est * C_FUEL
        time_cost = 180.0 * 1000.0
        c_est += prop_cost + time_cost
    return -((sum_term - c_est) / (C_DEV + C_LAUNCH))

R*

In [6]:
def actions(state, asteroids):
    available = []
    for ast_id, asteroid in asteroids.items():
        if ast_id in state.seq or ast_id == "BASE" or asteroid.water_fraction <= 0:
            continue
        current_ast = asteroids[state.current]
        dv_required = simple_hohmann_delta_v(current_ast.a, asteroid.a)
        if state.delta_v_used + dv_required <= DV_BUDGET:
            available.append(ast_id)
    return available


def result(state, action_ast_id, asteroids):
    ast_from = asteroids[state.current]
    ast_to = asteroids[action_ast_id]
    date_departure = state.date_abs
    transfer_days = 180.0
    date_arrival = date_departure + transfer_days
    dv_dep, dv_arr, tof_seconds = lambert_transfer_izzo(ast_from, ast_to, date_departure, date_arrival)
    gravity_assists = find_gravity_assist_opportunities(ast_from, ast_to, date_departure, asteroids)
    if gravity_assists:
        best_assist = max(gravity_assists, key=lambda x: x[2])
        dv_total = max(0, dv_dep + dv_arr - best_assist[2])
    else:
        dv_total = dv_dep + dv_arr
    tof_days = tof_seconds / (24 * 3600)
    t_mining = 10.0
    water_collected = ast_to.available_water_kg(t_mining)
    time_total = tof_days + t_mining
    return state.clone_and_add(action_ast_id, dv_total, water_collected, time_total, time_total)


def goal_test(state, asteroids):
    return (
        roi_of_state(state, asteroids) >= ROI_MIN
        and state.delta_v_used <= DV_BUDGET
        and state.t_current <= TIME_MAX
        and len(state.seq) >= 2
    )


def evaluation_function(state, asteroids):
    return -roi_of_state(state, asteroids) + heuristic_remaining_potential(state, asteroids)


In [7]:
def a_star_to_random_goal(initial_state, random_goal, asteroids, max_expansions=2000):
    frontier = []
    heapq.heappush(frontier, PrioritizedItem(priority=evaluation_function(initial_state, asteroids), item=initial_state))
    explored = set()
    expansions = 0
    while frontier and expansions < max_expansions:
        current_item = heapq.heappop(frontier).item
        expansions += 1
        if hash(current_item) in explored:
            continue
        if goal_test(current_item, asteroids) or current_item.current == random_goal:
            return current_item
        explored.add(hash(current_item))
        for action in actions(current_item, asteroids):
            child = result(current_item, action, asteroids)
            if hash(child) not in explored:
                f_val = evaluation_function(child, asteroids)
                heapq.heappush(frontier, PrioritizedItem(priority=f_val, item=child))
    return None


def select_random_intermediate_goal(asteroids, k, K, visited):
    unvisited = [a for a in asteroids.keys() if a not in visited and a != "BASE" and asteroids[a].water_fraction > 0]
    if not unvisited:
        return None
    if k < K * 0.3:
        return random.choice(unvisited)
    weights = [asteroids[u].available_water_kg() for u in unvisited]
    return random.choices(unvisited, weights=weights, k=1)[0]


def r_star_asteroid_optimization(initial_state, asteroids, K=50):
    best_solution = None
    best_roi = -1e9
    for k in range(K):
        random_goal = select_random_intermediate_goal(asteroids, k, K, initial_state.seq)
        if random_goal is None:
            break
        print(f"[R*] Sub-búsqueda {k + 1}/{K} hacia: {random_goal}")
        partial = a_star_to_random_goal(initial_state, random_goal, asteroids)
        if partial:
            current_roi = roi_of_state(partial, asteroids)
            print(
                f"  -> ROI: {current_roi:.4f}, ΔV: {partial.delta_v_used:.0f} m/s, "
                f"Agua: {partial.m_water:.0f} kg, Asteroides: {len(partial.seq) - 1}"
            )
            if current_roi > best_roi:
                best_roi = current_roi
                best_solution = partial
    return best_solution, best_roi

Datos y ejecución

In [9]:
def generate_realistic_asteroids(n=15):
    asteroids = {}
    for i in range(n):
        aid = f"A{i + 1}"
        a = random.uniform(1.2, 3.0)
        e = random.uniform(0.0, 0.3)
        inc = random.uniform(0.0, 20.0)
        radius = random.uniform(100.0, 2000.0)
        water_frac = random.uniform(0.02, 0.1)
        asteroids[aid] = Asteroid(
            id=aid,
            a=a,
            e=e,
            i_deg=inc,
            om_deg=random.uniform(0, 360),
            w_deg=random.uniform(0, 360),
            ma_deg=random.uniform(0, 360),
            radius_m=radius,
            water_fraction=water_frac,
            spectral_type="C",
        )
    return asteroids


def main():
    a_dict = generate_realistic_asteroids(12)
    gateway = Asteroid("BASE", 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 100.0, 0.0, "S")
    a_dict["BASE"] = gateway
    s0 = State(["BASE"], "BASE", 0.0, 0.0, 0.0, 0.0)
    print("=" * 70)
    print("ALGORITMO R* PARA PROSPECCIÓN MULTI-ASTEROIDE")
    print("=" * 70)
    print("\nAsteroides generados:")
    for k, ast in list(a_dict.items())[:8]:
        water_est = ast.available_water_kg()
        print(f"  {ast.id}: a={ast.a:.2f} AU, R={ast.radius_m:.0f} m, water={ast.water_fraction:.3f}, H2O~{water_est:,.0f} kg")
    print(f"\nPresupuesto: ΔV ≤ {DV_BUDGET / 1000:.1f} km/s, ROI ≥ {ROI_MIN:.0%}")
    print("Iniciando optimización R*...\n")
    best_state, best_roi = r_star_asteroid_optimization(s0, a_dict, K=30)
    print("\n" + "=" * 70)
    if best_state:
        print("MEJOR SOLUCIÓN ENCONTRADA:")
        print(f"  Ruta: {' → '.join(best_state.seq)}")
        print(f"  ΔV total: {best_state.delta_v_used:,.0f} m/s ({best_state.delta_v_used / 1000:.1f} km/s)")
        print(f"  Agua recolectada: {best_state.m_water:,.0f} kg")
        print(f"  Tiempo total: {best_state.t_current:.1f} días")
        print(f"  ROI: {best_roi:.4f} ({best_roi * 100:.1f}%)")
        costos = total_cost(best_state, a_dict)
        ingresos = revenue_from_state(best_state)
        print(f"  Ingresos: ${ingresos:,.2f}")
        print(f"  Costos: ${costos:,.2f}")
        print(f"  Beneficio neto: ${ingresos - costos:,.2f}")
        if best_roi >= ROI_MIN:
            print("SOLUCIÓN VIABLE")
        else:
            print("SOLUCIÓN NO VIABLE")
    else:
        print("No se encontró solución viable.")
    print("=" * 70)


if __name__ == "__main__":
    main()

ALGORITMO R* PARA PROSPECCIÓN MULTI-ASTEROIDE

Asteroides generados:
  A1: a=2.97 AU, R=1148 m, water=0.046, H2O~10,000 kg
  A2: a=2.11 AU, R=1317 m, water=0.066, H2O~10,000 kg
  A3: a=2.61 AU, R=1092 m, water=0.040, H2O~10,000 kg
  A4: a=2.23 AU, R=361 m, water=0.039, H2O~10,000 kg
  A5: a=2.89 AU, R=933 m, water=0.089, H2O~10,000 kg
  A6: a=1.96 AU, R=1890 m, water=0.021, H2O~10,000 kg
  A7: a=1.66 AU, R=1030 m, water=0.033, H2O~10,000 kg
  A8: a=2.65 AU, R=577 m, water=0.025, H2O~10,000 kg

Presupuesto: ΔV ≤ 15.0 km/s, ROI ≥ 15%
Iniciando optimización R*...

[R*] Sub-búsqueda 1/30 hacia: A2
  -> ROI: 0.2730, ΔV: 7181 m/s, Agua: 10000 kg, Asteroides: 1
[R*] Sub-búsqueda 2/30 hacia: A4
  -> ROI: 0.2730, ΔV: 7181 m/s, Agua: 10000 kg, Asteroides: 1
[R*] Sub-búsqueda 3/30 hacia: A4
  -> ROI: 0.2730, ΔV: 7181 m/s, Agua: 10000 kg, Asteroides: 1
[R*] Sub-búsqueda 4/30 hacia: A7
  -> ROI: 0.2730, ΔV: 7181 m/s, Agua: 10000 kg, Asteroides: 1
[R*] Sub-búsqueda 5/30 hacia: A3
  -> ROI: 0.2730, Δ