In [77]:
import pandas as pd
import dimod
from neal import SimulatedAnnealingSampler
from collections import defaultdict
import json
import itertools # We'll use this later

print("Libraries imported successfully.")

Libraries imported successfully.


In [78]:
def load_data_from_json(filepath='dataset.json'):
    """Loads data from the JSON file and converts to DataFrames."""
    with open(filepath, 'r') as f:
        data = json.load(f)
    
    # Load flights and calculate duration
    flights_df = pd.DataFrame(data['flights'])
    flights_df['duration'] = flights_df['end_time'] - flights_df['start_time']
    
    # Load tails
    tails_df = pd.DataFrame(data['tails'])
    
    # Load costs
    # pd.DataFrame(data['costs']) creates (Index=Flights, Columns=Tails)
    # We .transpose() it ONCE to get (Index=Tails, Columns=Flights)
    # This matches the expected format for costs.loc[tail_id, flight_id]
    costs_df = pd.DataFrame(data['costs']).transpose()
    
    print("Data loaded and processed:")
    print("\n Tails ")
    print(tails_df.head())
    print("\n Flights ")
    print(flights_df.head())
    print("\n Costs (Index=Tails, Columns=Flights) ")
    print(costs_df)
    
    return flights_df, tails_df, costs_df

# Execute the data loading
flights_df, tails_df, costs_df = load_data_from_json()

Data loaded and processed:

 Tails 
  tail_id  min_turnaround
0      T1              45
1      T2              45
2      T3              50

 Flights 
  flight_id origin dest  start_time  end_time  duration
0        F1      A    B         480       570        90
1        F2      B    C         615       735       120
2        F3      C    A         780       885       105
3        F4      A    C         540       660       120
4        F5      B    A         660       750        90

 Costs (Index=Tails, Columns=Flights) 
     F1   F2   F3   F4   F5
T1  100  120  110  130  115
T2  110  100  120  140  105
T3  130  140  135  150  145


In [79]:
def check_incompatible(f1, f2, tail, min_turnaround):
    """Checks if two flights are incompatible for a single tail."""
    # a) Time overlap
    if max(f1.start_time, f2.start_time) <= min(f1.end_time, f2.end_time):
        return True
    
    # Check for sequential incompatibility
    # f1 -> f2
    if f1.end_time < f2.start_time:
        if f1.dest != f2.origin:
            return True # Mismatched location
        if f2.start_time - f1.end_time < min_turnaround:
            return True # Turnaround violation
    
    # f2 -> f1
    if f2.end_time < f1.start_time:
        if f2.dest != f1.origin:
            return True # Mismatched location
        if f1.start_time - f2.end_time < min_turnaround:
            return True # Turnaround violation
            
    return False

def get_delay_penalty(f1, f2, tail, min_turnaround, buffer=15):
    """Calculates the delay penalty S_i,j,k for tight connections."""
    penalty = 0
    required_time = min_turnaround + buffer
    
    # f1 -> f2
    if f1.end_time < f2.start_time and f1.dest == f2.origin:
        connection_time = f2.start_time - f1.end_time
        if connection_time < required_time:
            penalty = (required_time - connection_time)**2 # Square penalty for emphasis
            
    # f2 -> f1
    if f2.end_time < f1.start_time and f2.dest == f1.origin:
        connection_time = f1.start_time - f2.end_time
        if connection_time < required_time:
            penalty = (required_time - connection_time)**2
            
    return penalty

print("Helper functions defined.")

Helper functions defined.


In [80]:
def build_tap_bqm(flights, tails, costs, weights, penalties):
    """Builds the BQM for the multi-objective TAP."""
    
    bqm = dimod.BinaryQuadraticModel('BINARY')
    
    # Pre-calculate mean duration for utilization objective
    mean_duration = flights['duration'].sum() / len(tails)
    
    # A. Add Objectives 
    
    for i, tail in tails.iterrows():
        for j, flight in flights.iterrows():
            var_name = f"x_{tail.tail_id}_{flight.flight_id}"
            
            # 1. H_cost (Linear)
            cost = costs.loc[tail.tail_id, flight.flight_id]
            bqm.add_linear(var_name, weights['cost'] * cost)
            
            
            # 2. H_util (Utilization)
            # This objective is: sum_i ( (sum_j D_j * x_ij) - D_bar )^2
            # Expands to: sum_i [ (sum_j D_j * x_ij)^2 - 2*D_bar*sum_j(D_j*x_ij) + D_bar^2 ]
            # We add all terms associated with x_ij in this loop.
            
            # Add the simple linear part: -2 * D_bar * D_j * x_ij
            bqm.add_linear(var_name, weights['util'] * (-2 * mean_duration * flight.duration))

            
            # Now add the (sum_j D_j * x_ij)^2 part.
            # This expands to: sum_j(D_j^2 * x_ij) + sum_j<k(2 * D_j*D_k * x_ij*x_ik)
            
            # 1. Add the linear part (j==k case): D_j^2 * x_ij
            linear_strength = weights['util'] * (flight.duration ** 2)
            bqm.add_linear(var_name, linear_strength)

            # 2. Add the quadratic part (j < k case)
            for k, flight_k in flights.iterrows():
                # Only process pairs where j < k to avoid double counting
                # and to avoid the j==k case.
                if j >= k:
                    continue
                    
                var_k_name = f"x_{tail.tail_id}_{flight_k.flight_id}"
                
                # The coupling strength is 2 * D_j * D_k
                coupling_strength = weights['util'] * 2 * flight.duration * flight_k.duration
                bqm.add_quadratic(var_name, var_k_name, coupling_strength)
            

            # 3. H_delay (Delay Penalty)
            for k, flight_k in flights.iterrows():
                if j >= k:
                    continue # Avoid double counting
                
                var_k_name = f"x_{tail.tail_id}_{flight_k.flight_id}"
                delay_pen = get_delay_penalty(flight, flight_k, tail, tail.min_turnaround)
                if delay_pen > 0:
                    bqm.add_quadratic(var_name, var_k_name, weights['delay'] * delay_pen)

    # B. Add Constraints 
    
    # 1. H_C1: Flight Coverage (sum(x_ij) - 1)^2 for each flight j
    for j, flight in flights.iterrows():
        flight_vars = [f"x_{t.tail_id}_{flight.flight_id}" for _, t in tails.iterrows()]
        
        # (sum(vars) - 1)^2 = 2*sum(x_i*x_k) - sum(x_i) + 1 (ignoring constant)
        for var_name in flight_vars:
            bqm.add_linear(var_name, -1 * penalties['C1'])
        
        for i_idx in range(len(flight_vars)):
            for k_idx in range(i_idx + 1, len(flight_vars)):
                bqm.add_quadratic(flight_vars[i_idx], flight_vars[k_idx], 2 * penalties['C1'])

    # 2. H_C2: Aircraft Incompatibility
    for i, tail in tails.iterrows():
        for j, f1 in flights.iterrows():
            for k, f2 in flights.iterrows():
                if j >= k:
                    continue # Avoid double counting
                
                if check_incompatible(f1, f2, tail, tail.min_turnaround):
                    var1 = f"x_{tail.tail_id}_{f1.flight_id}"
                    var2 = f"x_{tail.tail_id}_{f2.flight_id}"
                    bqm.add_quadratic(var1, var2, penalties['C2'])
                    
    return bqm

print("BQM building function.")

BQM building function.


In [81]:
def analyze_solution(sample, flights, tails, costs):
    """Parses a valid sample and calculates the true objective scores."""
    
    assignment = {}
    is_feasible = True
    
    # Check feasibility: Flight Coverage
    for j, flight in flights.iterrows():
        flight_id = flight.flight_id
        tail_assigned = None
        count = 0
        for i, tail in tails.iterrows():
            tail_id = tail.tail_id
            if sample.get(f"x_{tail_id}_{flight_id}", 0) == 1:
                count += 1
                tail_assigned = tail_id
        
        if count != 1:
            # print(f"Feasibility Fail: Flight {flight_id} has {count} assignments.")
            is_feasible = False
            break
        assignment[flight_id] = tail_assigned
    
    if not is_feasible:
        return {'feasible': False}

    # Check feasibility: Incompatibility
    for i, tail in tails.iterrows():
        tail_id = tail.tail_id
        assigned_flights_data = []
        for flight_id, assigned_tail in assignment.items():
            if assigned_tail == tail_id:
                assigned_flights_data.append(flights[flights['flight_id'] == flight_id].iloc[0])
        
        for j in range(len(assigned_flights_data)):
            for k in range(j + 1, len(assigned_flights_data)):
                f1 = assigned_flights_data[j]
                f2 = assigned_flights_data[k]
                if check_incompatible(f1, f2, tail, tail.min_turnaround):
                    # print(f"Feasibility Fail: Tail {tail_id} has incompatible flights {f1.flight_id} and {f2.flight_id}.")
                    is_feasible = False
                    break
            if not is_feasible:
                break
        if not is_feasible:
            break

    if not is_feasible:
        return {'feasible': False}

    # If feasible, calculate scores
    total_cost = 0
    total_delay_penalty = 0
    tail_durations = defaultdict(int)

    for flight_id, tail_id in assignment.items():
        flight = flights[flights['flight_id'] == flight_id].iloc[0]
        total_cost += costs.loc[tail_id, flight_id]
        tail_durations[tail_id] += flight.duration

    # Calculate delay penalty
    for i, tail in tails.iterrows():
        tail_id = tail.tail_id
        assigned_flights_data = []
        for flight_id, assigned_tail in assignment.items():
            if assigned_tail == tail_id:
                assigned_flights_data.append(flights[flights['flight_id'] == flight_id].iloc[0])
        
        for j in range(len(assigned_flights_data)):
            for k in range(j + 1, len(assigned_flights_data)):
                f1 = assigned_flights_data[j]
                f2 = assigned_flights_data[k]
                total_delay_penalty += get_delay_penalty(f1, f2, tail, tail.min_turnaround)
    
    # Calculate utilization score (variance)
    mean_duration = flights['duration'].sum() / len(tails)
    # Ensure all tails are in the dict for variance calculation
    for t in tails['tail_id']:
        _ = tail_durations[t] 
        
    util_variance = sum([(d - mean_duration)**2 for d in tail_durations.values()])
    
    return {
        'feasible': True,
        'assignment': assignment,
        'score_cost': total_cost,
        'score_util_variance': util_variance,
        'score_delay_penalty': total_delay_penalty,
        'tail_durations': dict(tail_durations)
    }

print("Solution analysis function defined.")

Solution analysis function defined.


In [82]:
# Define penalty multipliers (must be > max objective scores)
# A good heuristic is to set them ~1.5-2x the max possible objective score.
# Max cost is ~5*150 = 750. Max util/delay is harder to guess, but 1000 is a safe start.
penalties = {
    'C1': 1000, # Flight coverage
    'C2': 1000  # Incompatibility
}

# Define objective weights to explore the Pareto front
weight_scenarios = {
    "Cost_Focus":    {'cost': 1.0, 'util': 0.0, 'delay': 0.0},
    "Util_Focus":    {'cost': 0.0, 'util': 1.0, 'delay': 0.0},
    "Delay_Focus":   {'cost': 0.0, 'util': 0.0, 'delay': 1.0},
    "Balanced":      {'cost': 0.33, 'util': 0.33, 'delay': 0.33},
}

# Initialize Sampler
sampler = SimulatedAnnealingSampler()

print("Penalties, scenarios, and sampler initialized.")

Penalties, scenarios, and sampler initialized.


In [83]:
print(" Running Multi-Objective Tail Assignment on Simulated Annealer ")

# Store results for comparison
results_log = []

for scenario_name, weights in weight_scenarios.items():
    print(f"\n Scenario: {scenario_name} (Weights: {weights}) ")
    
    # 1. Build BQM
    bqm = build_tap_bqm(flights_df, tails_df, costs_df, weights, penalties)
    
    # 2. Run Simulated Annealer
    response = sampler.sample(bqm, num_reads=100)
    
    # 3. Find and analyze the best feasible solution
    best_feasible_solution = None
    for sample, energy in response.data(['sample', 'energy']):
        result = analyze_solution(sample, flights_df, tails_df, costs_df)
        
        if result['feasible']:
            result['scenario'] = scenario_name
            result['energy'] = energy
            best_feasible_solution = result
            break # Found the ground state (best feasible)
    
    # 4. Print Results
    if best_feasible_solution:
        results_log.append(best_feasible_solution)
        print("  Found Feasible Solution:")
        print(f"    Assignment: {best_feasible_solution['assignment']}")
        print(f"    Tail Durations: {best_feasible_solution['tail_durations']}")
        print(f"    Objective Scores:")
        print(f"      Total Cost:     {best_feasible_solution['score_cost']}")
        print(f"      Util. Variance: {best_feasible_solution['score_util_variance']:.2f}")
        print(f"      Delay Penalty:  {best_feasible_solution['score_delay_penalty']}")
    else:
        print("  No feasible solution found in the samples.")
        print(f"  Lowest energy (infeasible): {response.first.energy}")
        print("  Consider increasing penalty coefficients (gamma) or num_reads.")

print("\n All scenarios complete.")

 Running Multi-Objective Tail Assignment on Simulated Annealer 

 Scenario: Cost_Focus (Weights: {'cost': 1.0, 'util': 0.0, 'delay': 0.0}) 
  Found Feasible Solution:
    Assignment: {'F1': 'T1', 'F2': 'T2', 'F3': 'T2', 'F4': 'T3', 'F5': 'T1'}
    Tail Durations: {'T1': np.int64(180), 'T2': np.int64(225), 'T3': np.int64(120)}
    Objective Scores:
      Total Cost:     585
      Util. Variance: 5550.00
      Delay Penalty:  225

 Scenario: Util_Focus (Weights: {'cost': 0.0, 'util': 1.0, 'delay': 0.0}) 
  Found Feasible Solution:
    Assignment: {'F1': 'T2', 'F2': 'T1', 'F3': 'T1', 'F4': 'T3', 'F5': 'T2'}
    Tail Durations: {'T2': np.int64(180), 'T1': np.int64(225), 'T3': np.int64(120)}
    Objective Scores:
      Total Cost:     595
      Util. Variance: 5550.00
      Delay Penalty:  225

 Scenario: Delay_Focus (Weights: {'cost': 0.0, 'util': 0.0, 'delay': 1.0}) 
  Found Feasible Solution:
    Assignment: {'F1': 'T2', 'F2': 'T1', 'F3': 'T3', 'F4': 'T3', 'F5': 'T2'}
    Tail Durations:

In [84]:
if results_log:
    results_df = pd.DataFrame(results_log)
    results_df = results_df.set_index('scenario')
    
    # Reorder columns for clarity
    columns_of_interest = [
        'score_cost', 
        'score_util_variance', 
        'score_delay_penalty', 
        'assignment', 
        'tail_durations',
        'energy'
    ]
    
    print("\nFinal Results Comparison")
    print(results_df[columns_of_interest])
else:
    print("No results were logged.")


Final Results Comparison
             score_cost  score_util_variance  score_delay_penalty  \
scenario                                                            
Cost_Focus          585               5550.0                  225   
Util_Focus          595               5550.0                  225   
Delay_Focus         620               5550.0                    0   
Balanced            595               5550.0                    0   

                                                    assignment  \
scenario                                                         
Cost_Focus   {'F1': 'T1', 'F2': 'T2', 'F3': 'T2', 'F4': 'T3...   
Util_Focus   {'F1': 'T2', 'F2': 'T1', 'F3': 'T1', 'F4': 'T3...   
Delay_Focus  {'F1': 'T2', 'F2': 'T1', 'F3': 'T3', 'F4': 'T3...   
Balanced     {'F1': 'T2', 'F2': 'T3', 'F3': 'T1', 'F4': 'T1...   

                                tail_durations   energy  
scenario                                                 
Cost_Focus   {'T1': 180, 'T2': 225, 'T3': 120}

In [85]:
import numpy as np
import json

def make_json_serializable(obj):
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    if isinstance(obj, dict):
        return {k: make_json_serializable(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [make_json_serializable(v) for v in obj]
    return obj

json_output_filename = 'annealing_result.json'
clean_log = [make_json_serializable(res) for res in results_log]

with open(json_output_filename, 'w') as f:
    json.dump(clean_log, f, indent=4)

print(f"Full results log saved to {json_output_filename}")

Full results log saved to annealing_result.json
