# Branch and Price Algorithm for Task Scheduling in Nano Satellites

Imports

In [12]:
from skyfield.api import EarthSatellite, load
from datetime import timedelta, datetime
import numpy as np
import pulp
import time
import pandas as pd
from pulp import LpMaximize, LpProblem, LpVariable, lpSum, LpStatus, value, PULP_CBC_CMD
import heapq

# Data Input

Dictionary for Satellite Input Parameters (3 instances)

In [13]:
# Satellite data for 1U Cubesat
satellite_data_1U = {
    'priority': [2, 1, 1],  # Priority for tasks (u_j)
    'power_consumption': [1.0, 1.3, 0.9],  # Power consumption (q_j)
    'ymin': [1, 3, 2],
    'ymax': [4, 3, 5],
    'tmin': [10, 3, 4],
    'tmax': [100, 100, 100],
    # 'tmin': [1, 2, 1],
    # 'tmax': [3, 3, 3],
    'pmin': [2, 2, 3],
    'pmax': [100, 100, 100],
    'wmin': [0, 0, 0],
    'wmax': [100, 100, 100],
    'rho': 0.5,
    'gamma': 2,
    'Vb':3.6,
    'e60': 0.9,
    'Q':10,
    'SoC':0.85 
    # 'available_power': 100  # Available power (r_t)
}
# Instance 2
satellite_data_1U_variant1 = satellite_data_1U.copy()
satellite_data_1U_variant1.update({
    'priority': [1, 3, 2],
    'tmin': [1, 2, 3],
    'tmax': [5, 4, 6],
    'pmin': [1, 1, 2]
})

# Instance 3
satellite_data_1U_variant2 = satellite_data_1U.copy()
satellite_data_1U_variant2.update({
    'priority': [3, 2, 1],
    'power_consumption': [1.4, 1.5, 1.2],
    'Q': 8,
    'SoC': 0.7
})

Reading TLE Data

In [14]:
# Load TLE data
with open("TLE.txt", "r") as file:
    lines = file.readlines()

ts = load.timescale()
satellites = []
i = 0
while i < len(lines):
    if lines[i].strip():
        name = lines[i].strip()
        line1 = lines[i + 1].strip()
        line2 = lines[i + 2].strip()
        satellite = EarthSatellite(line1, line2, name, ts)
        satellites.append(satellite)
        i += 3
    else:
        i += 1

Power Input Model

In [15]:
# Input model - Available power input function

def sunlit_power(satellite: EarthSatellite, start: datetime, t: int):
    ts = load.timescale()
    eph = load('de421.bsp')
    time_step = start + timedelta(minutes=t)
    time_step_skyfield = ts.utc(time_step.year, time_step.month, time_step.day, 
                                time_step.hour, time_step.minute, time_step.second)
    try:
        sunlit = satellite.at(time_step_skyfield).is_sunlit(eph)
    except Exception:
        return 0

    if satellite.name == '1U':
        avg_power = 5
    elif satellite.name == '2U':
        avg_power = 8
    else:
        avg_power = 10
    return 0.8 * avg_power if sunlit else 0

# Optimization Code for B&P Model

In [16]:
# Generating initial feasible profiles for the RMP
def generate_initial_profiles(satellite_data, satellite_info, start_time, num_time_slots):
    initial_profiles = {}
    sunlit_power_values = [sunlit_power(satellite_info, start_time, t) for t in range(num_time_slots)]

    for task_id in range(len(satellite_data['priority'])):
        profiles = []
        priority = satellite_data['priority'][task_id]
        power_consumption = satellite_data['power_consumption'][task_id]
        tmin, tmax = satellite_data['tmin'][task_id], satellite_data['tmax'][task_id]
        ymin, ymax = satellite_data['ymin'][task_id], satellite_data['ymax'][task_id]
        wmin, wmax = satellite_data['wmin'][task_id], satellite_data['wmax'][task_id]
        pmin = satellite_data['pmin'][task_id]

        for start_offset in range(0, wmax - wmin - tmin + 1, max(pmin, tmin)):
            profile = [0] * num_time_slots
            starts = 0
            t = wmin + start_offset
            while starts < ymin and t < wmax:
                if t < num_time_slots and sunlit_power_values[t] > 0:
                    for i in range(t, min(t + tmin, wmax + 1)):
                        if i < num_time_slots and sunlit_power_values[i] > 0:
                            profile[i] = 1
                    starts += 1
                    t += max(tmin, pmin)
                else:
                    t += 1

            profile_priority = priority * sum(profile)
            power_usage = [power_consumption * x for x in profile]

            profiles.append({
                'priority': profile_priority,
                'power_usage': power_usage
            })

        initial_profiles[task_id] = profiles if profiles else [{}]

    return initial_profiles

# Restricted Master Problem (RMP) 

def solve_rmp(satellite_data, satellite_info, current_profiles, num_time_slots, start_time, branching_constraints=None):
    rmp_problem = LpProblem("RMP", LpMaximize)

    if not current_profiles or not any(current_profiles.values()):
        return None, None, None, None, None, None

    b = LpVariable.dicts('b', range(num_time_slots), lowBound=-1e6)
    i = LpVariable.dicts('i', range(num_time_slots), lowBound=-1e6)
    SoC = LpVariable.dicts('SoC', range(num_time_slots + 1), lowBound=satellite_data['rho'], upBound=1)
    alpha = LpVariable.dicts('alpha', range(num_time_slots), lowBound=0, upBound=1)
    
    xi_kj = {}
    for task_id in range(len(satellite_data['priority'])):
        for idx in range(len(current_profiles[task_id])):
            xi_kj[(task_id, idx)] = LpVariable(f"xi_{task_id}_{idx}", 0, 1, cat="Binary")

    rmp_problem += lpSum(current_profiles[task_id][idx]['priority'] * xi_kj[(task_id, idx)] 
                         for task_id in range(len(satellite_data['priority'])) 
                         for idx in range(len(current_profiles[task_id]))), "Objective"

    dual_mu = {}
    for task_id in range(len(satellite_data['priority'])):
        rmp_problem += lpSum(xi_kj[(task_id, idx)] for idx in range(len(current_profiles[task_id]))) == 1, f"Assign_One_Profile_{task_id}"
        dual_mu[task_id] = rmp_problem.constraints[f"Assign_One_Profile_{task_id}"]

    dual_pi = {}
    dual_nu = {}
    sunlit_power_values = [sunlit_power(satellite_info, start_time, t) for t in range(num_time_slots)]
    for t in range(num_time_slots):
        r_t = sunlit_power_values[t]
        rmp_problem += (lpSum(current_profiles[task_id][idx]['power_usage'][t] * xi_kj[(task_id, idx)]
                             for task_id in range(len(satellite_data['priority']))
                             for idx in range(len(current_profiles[task_id])))
                        <= r_t + (satellite_data['gamma'] * satellite_data['Vb'] * alpha[t] if r_t > 0 else 0)), f"Power_Capacity_{t}"
        dual_pi[t] = rmp_problem.constraints[f"Power_Capacity_{t}"]

        rmp_problem += (lpSum(current_profiles[task_id][idx]['power_usage'][t] * xi_kj[(task_id, idx)]
                              for task_id in range(len(satellite_data['priority']))
                              for idx in range(len(current_profiles[task_id])))
                         + b[t] == r_t), f"Battery_Balance_{t}"
        dual_nu[t] = rmp_problem.constraints[f"Battery_Balance_{t}"]

    for t in range(num_time_slots):
        rmp_problem += i[t] * satellite_data['Vb'] == b[t], f"Current_Flow_{t}"

    rmp_problem += SoC[0] == satellite_data['SoC'], "Initial_SoC"
    for t in range(num_time_slots):
        rmp_problem += SoC[t + 1] == SoC[t] + (i[t] * satellite_data['e60'] * 60) / (satellite_data['Q'] * 3600), f"SoC_Update_{t}"
        rmp_problem += SoC[t] >= satellite_data['rho'], f"SoC_Lower_Limit_{t}"
        rmp_problem += SoC[t] <= 1, f"SoC_Upper_Limit_{t}"

    if branching_constraints:
        for (task_id, t), value in branching_constraints.items():
            if value == 0:
                rmp_problem += lpSum(current_profiles[task_id][idx]['power_usage'][t] * xi_kj[(task_id, idx)]
                                     for idx in range(len(current_profiles[task_id]))) == 0, f"Branch_0_{task_id}_{t}"
            elif value == 1:
                rmp_problem += lpSum(current_profiles[task_id][idx]['power_usage'][t] * xi_kj[(task_id, idx)]
                                     for idx in range(len(current_profiles[task_id]))) == satellite_data['power_consumption'][task_id], f"Branch_1_{task_id}_{t}"

    status = rmp_problem.solve(PULP_CBC_CMD(msg=0))

    if LpStatus[status] != "Optimal":
        return None, None, None, None, None, None

    x_jt_values = {}
    for task_id in range(len(satellite_data['priority'])):
        for idx in range(len(current_profiles[task_id])):
            if pulp.value(xi_kj[(task_id, idx)]) > 0.5:
                for t in range(num_time_slots):
                    power = current_profiles[task_id][idx]['power_usage'][t]
                    if power > 0:
                        if (task_id, t) not in x_jt_values:
                            x_jt_values[(task_id, t)] = 0
                        x_jt_values[(task_id, t)] = 1

    optimal_mu = {task_id: pulp.value(dual_mu[task_id].pi) for task_id in dual_mu}
    optimal_pi = {t: pulp.value(dual_pi[t].pi) for t in dual_pi}
    optimal_nu = {t: pulp.value(dual_nu[t].pi) for t in dual_nu}
    obj_value = pulp.value(rmp_problem.objective)

    return None, optimal_mu, optimal_pi, optimal_nu, x_jt_values, obj_value

# Pricing Subproblem

def solve_pricing_subproblem(satellite_data, num_time_slots, mu_j, pi_t, nu_t, task_id, satellite_info, start_time):
    pricing_problem = LpProblem(f"Pricing_Task_{task_id}", LpMaximize)

    x_jt = LpVariable.dicts(f"x_{task_id}", range(num_time_slots), cat="Binary")
    phi_jt = LpVariable.dicts(f"phi_{task_id}", range(num_time_slots), cat="Binary")

    priority = satellite_data['priority'][task_id]
    power_consumption = satellite_data['power_consumption'][task_id]
    tmin, tmax = satellite_data['tmin'][task_id], satellite_data['tmax'][task_id]
    ymin, ymax = satellite_data['ymin'][task_id], satellite_data['ymax'][task_id]
    pmin, pmax = satellite_data['pmin'][task_id], satellite_data['pmax'][task_id]
    wmin, wmax = satellite_data['wmin'][task_id], satellite_data['wmax'][task_id]

    sunlit_power_values = [sunlit_power(satellite_info, start_time, t) for t in range(num_time_slots)]

    reduced_cost = (priority * lpSum(x_jt[t] for t in range(num_time_slots))
                    + 0.01 * lpSum(x_jt[t] for t in range(num_time_slots))
                    - mu_j.get(task_id, 0)
                    - power_consumption * lpSum((pi_t.get(t, 0) + nu_t.get(t, 0)) * x_jt[t] for t in range(num_time_slots)))
    pricing_problem += reduced_cost, "Reduced_Cost"

    pricing_problem += phi_jt[0] >= x_jt[0], "C2j"
    for t in range(1, num_time_slots):
        pricing_problem += phi_jt[t] >= x_jt[t] - x_jt[t-1], f"C2k_{t}"
    for t in range(num_time_slots):
        pricing_problem += phi_jt[t] <= x_jt[t], f"C2l_{t}"
    for t in range(1, num_time_slots):
        pricing_problem += phi_jt[t] <= 2 - x_jt[t] - x_jt[t-1], f"C2m_{t}"

    pricing_problem += lpSum(phi_jt[t] for t in range(num_time_slots)) >= ymin, "C2n"
    pricing_problem += lpSum(phi_jt[t] for t in range(num_time_slots)) <= ymax, "C2o"

    for t in range(num_time_slots - tmin + 1):
        pricing_problem += lpSum(x_jt[l] for l in range(t, min(t + tmin, num_time_slots))) >= tmin * phi_jt[t], f"C2p_{t}"
    for t in range(num_time_slots - tmax):
        pricing_problem += lpSum(x_jt[l] for l in range(t, t + tmax)) <= tmax, f"C2q_{t}"

    for t in range(max(0, num_time_slots - tmin + 1), num_time_slots):
        if t < num_time_slots:
            pricing_problem += lpSum(x_jt[l] for l in range(t, num_time_slots)) >= (num_time_slots - t) * phi_jt[t], f"C2r_{t}"

    for t in range(wmin, wmax - pmin + 1):
        if t + pmin - 1 < num_time_slots:
            pricing_problem += lpSum(phi_jt[l] for l in range(t, min(t + pmin, num_time_slots))) <= 1, f"C2s_{t}"

    for t in range(wmin, wmax - pmax + 1):
        if t + pmax - 1 < num_time_slots:
            pricing_problem += lpSum(phi_jt[l] for l in range(t, min(t + pmax, num_time_slots))) >= phi_jt[t], f"C2t_{t}"

    for t in range(num_time_slots):
        if t < wmin or t >= wmax:
            pricing_problem += x_jt[t] == 0, f"C2u_v_{t}"
        if sunlit_power_values[t] == 0:
            pricing_problem += x_jt[t] == 0, f"No_Power_During_Eclipse_{task_id}_{t}"

    status = pricing_problem.solve(PULP_CBC_CMD(msg=0))

    if LpStatus[status] == "Optimal" and pulp.value(pricing_problem.objective) > 1e-6:
        profile = [int(pulp.value(x_jt[t]) > 0.5) for t in range(num_time_slots)]
        power_usage = [power_consumption * x for x in profile]
        profile_priority = priority * sum(profile)
        return {
            'priority': profile_priority,
            'power_usage': power_usage
        }
    return None

# Column Generation Algorithm

def column_generation(satellite_data, satellite_info, initial_profiles, num_time_slots, start_time, max_iterations=500):
    start_time_computation = time.time()  # Record start time
    current_profiles = {j: profiles.copy() for j, profiles in initial_profiles.items()}
    best_obj = -float('inf')
    best_x_jt = None
    node_queue = [(0, {}, 0)]  # (node_depth, branching_constraints, upper_bound)

    while node_queue:
        depth, branching_constraints, ub = heapq.heappop(node_queue)

        _, _, _, _, x_jt_values, obj_value = solve_rmp(satellite_data, satellite_info, current_profiles, num_time_slots, start_time, branching_constraints)

        if x_jt_values is None:
            continue

        is_integer = all(val in [0, 1] for val in x_jt_values.values() if val is not None)
        if is_integer and obj_value > best_obj:
            best_obj = obj_value
            best_x_jt = {(task_id, t): val for (task_id, t), val in x_jt_values.items() if val > 0}

        new_profiles = {}
        for task_id in range(len(satellite_data['priority'])):
            new_profile = solve_pricing_subproblem(satellite_data, num_time_slots, {}, {}, {}, task_id, satellite_info, start_time)
            if new_profile:
                if task_id not in new_profiles:
                    new_profiles[task_id] = []
                new_profiles[task_id].append(new_profile)

        for task_id, profiles in new_profiles.items():
            for profile in profiles:
                if profile not in current_profiles[task_id]:
                    current_profiles[task_id].append(profile)

        if not is_integer and obj_value > best_obj:
            most_fractional = max(x_jt_values.items(), key=lambda x: abs(0.5 - x[1]) if 0 < x[1] < 1 else -float('inf'), default=None)
            if most_fractional:
                task_id, t = most_fractional[0]

                for val in [0, 1]:
                    new_constraints = branching_constraints.copy()
                    new_constraints[(task_id, t)] = val
                    heapq.heappush(node_queue, (depth + 1, new_constraints, obj_value))

    end_time_computation = time.time()  # Record end time
    computation_time = end_time_computation - start_time_computation  # Calculate elapsed time in seconds

    return best_obj, best_x_jt, computation_time

# Computing results for 3 instances

Executing for instance 1

In [17]:
satellite_data = satellite_data_1U
satellite_info = satellites[0]
start_time = datetime(2025, 1, 1, 12, 0)
num_time_slots = 100

initial_Kj_star_1U = generate_initial_profiles(satellite_data, satellite_info, start_time, num_time_slots)
obj_value, active_tasks, computation_time = column_generation(satellite_data, satellite_info, initial_Kj_star_1U, num_time_slots, start_time)

# Return results
if active_tasks is not None:
    result = {
        'objective_value': obj_value,
        'active_tasks': active_tasks,  # Dictionary of (task_id, time_slot) where task is active (value = 1)
        'computation_time_seconds': computation_time  # Time taken in seconds
    }
else:
    result = {
        'objective_value': None,
        'active_tasks': {},
        'computation_time_seconds': computation_time  # Time taken even if no solution is found
    }

result

{'objective_value': 37.0,
 'active_tasks': {(0, 90): 1,
  (0, 91): 1,
  (0, 92): 1,
  (0, 93): 1,
  (0, 94): 1,
  (0, 95): 1,
  (0, 96): 1,
  (0, 97): 1,
  (0, 98): 1,
  (0, 99): 1,
  (1, 90): 1,
  (1, 91): 1,
  (1, 92): 1,
  (1, 93): 1,
  (1, 94): 1,
  (1, 95): 1,
  (1, 96): 1,
  (1, 97): 1,
  (1, 98): 1,
  (2, 92): 1,
  (2, 93): 1,
  (2, 94): 1,
  (2, 95): 1,
  (2, 96): 1,
  (2, 97): 1,
  (2, 98): 1,
  (2, 99): 1},
 'computation_time_seconds': 2.9840211868286133}

Executing for instance 2

In [18]:
satellite_data = satellite_data_1U_variant1
satellite_info = satellites[0]
start_time = datetime(2025, 1, 1, 12, 0)
num_time_slots = 100

initial_Kj_star_1U = generate_initial_profiles(satellite_data, satellite_info, start_time, num_time_slots)
obj_value, active_tasks, computation_time = column_generation(satellite_data, satellite_info, initial_Kj_star_1U, num_time_slots, start_time)

# Return results
if active_tasks is not None:
    result = {
        'objective_value': obj_value,
        'active_tasks': active_tasks,  # Dictionary of (task_id, time_slot) where task is active (value = 1)
        'computation_time_seconds': computation_time  # Time taken in seconds
    }
else:
    result = {
        'objective_value': None,
        'active_tasks': {},
        'computation_time_seconds': computation_time  # Time taken even if no solution is found
    }

result

{'objective_value': 31.0,
 'active_tasks': {(0, 96): 1,
  (1, 14): 1,
  (1, 15): 1,
  (1, 16): 1,
  (1, 17): 1,
  (1, 18): 1,
  (1, 19): 1,
  (2, 93): 1,
  (2, 94): 1,
  (2, 95): 1,
  (2, 96): 1,
  (2, 97): 1,
  (2, 98): 1},
 'computation_time_seconds': 2.918139934539795}

Executing for instance 3

In [19]:
satellite_data = satellite_data_1U_variant2
satellite_info = satellites[0]
start_time = datetime(2025, 1, 1, 12, 0)
num_time_slots = 100

initial_Kj_star_1U = generate_initial_profiles(satellite_data, satellite_info, start_time, num_time_slots)
obj_value, active_tasks, computation_time = column_generation(satellite_data, satellite_info, initial_Kj_star_1U, num_time_slots, start_time)

# Return results
if active_tasks is not None:
    result = {
        'objective_value': obj_value,
        'active_tasks': active_tasks,  # Dictionary of (task_id, time_slot) where task is active (value = 1)
        'computation_time_seconds': computation_time  # Time taken in seconds
    }
else:
    result = {
        'objective_value': None,
        'active_tasks': {},
        'computation_time_seconds': computation_time  # Time taken even if no solution is found
    }

result

{'objective_value': 56.0,
 'active_tasks': {(0, 0): 1,
  (0, 1): 1,
  (0, 2): 1,
  (0, 3): 1,
  (0, 4): 1,
  (0, 5): 1,
  (0, 6): 1,
  (0, 7): 1,
  (0, 8): 1,
  (0, 9): 1,
  (1, 63): 1,
  (1, 64): 1,
  (1, 65): 1,
  (1, 66): 1,
  (1, 67): 1,
  (1, 68): 1,
  (1, 69): 1,
  (1, 70): 1,
  (1, 71): 1,
  (2, 12): 1,
  (2, 13): 1,
  (2, 14): 1,
  (2, 15): 1,
  (2, 16): 1,
  (2, 17): 1,
  (2, 18): 1,
  (2, 19): 1},
 'computation_time_seconds': 2.7467360496520996}