In [1]:
from dimod import ConstrainedQuadraticModel, BinaryQuadraticModel, QuadraticModel
from dimod import BinaryQuadraticModel

In [None]:
# GPT
def build_knapsack_bqm(PNR, paths, reward, alpha, src, dest, penalty=1000):
    """
    Construct a BQM for the knapsack re-accommodation problem by embedding bqm constraints
    as quadratic penalty terms. This preserves original bqm functionality via soft constraints.

    Args:
        PNR (list): list of passenger objects
        paths (list): list of paths; each path is a list of flight objects
        reward (dict of dict): reward[passenger_id][class] -> score
        alpha (list): path-level score weights
        src (set): allowed source airports
        dest (set): allowed destination airports
        penalty (float): penalty multiplier for constraint violations

    Returns:
        BinaryQuadraticModel: unconstrained BQM equivalent
    """
    # Initialize BQM
    bqm = BinaryQuadraticModel('BINARY')

    # Helper to index variables
    def var(c, flight_id, pax_id, cls):
        return f"x_{c}_{flight_id}_{pax_id}_{cls}"

    # Objective: minimize -alpha[c] * reward
    for c in range(len(paths)):
        for flight in paths[c]:
            for cls in flight.classes:
                for pax in PNR:
                    if reward[pax.ID].get(cls, 0) != 0:
                        bqm.add_variable(var(c, flight.ID, pax.ID, cls),
                                         alpha[c] * (-reward[pax.ID][cls]))

    # Build auxiliary data structures
    flights = {}
    for c, path in enumerate(paths):
        for flt in path:
            flights.setdefault(flt.ID, {'flight': flt, 'paths': []})['paths'].append(c)

    airports = {}
    for flt_id, entry in flights.items():
        flt = entry['flight']
        airports.setdefault(flt.src, {'in': [], 'out': []})['out'].append(flt_id)
        airports.setdefault(flt.dest, {'in': [], 'out': []})['in'].append(flt_id)

    # Constraint penalties
    # 1. Capacity constraints: sum_{pax, c, cls on flight} pax.PAX * x <= capacity
    for flt_id, entry in flights.items():
        cap = entry['flight'].classes
        for cls, cap_val in cap.items():
            # Build linear expr
            # penalty * (max(0, lhs - cap_val))^2 => penalty*(lhs - cap_val)^2 when violated
            # Expand square: penalty*(lhs^2 -2*cap_val*lhs + cap_val^2)
            # Add to BQM: 2-body and linear terms
            # Collect variables
            l_vars = []
            for pax in PNR:
                if reward[pax.ID].get(cls, 0) != 0:
                    for c in entry['paths']:
                        l_vars.append((var(c, flt_id, pax.ID, cls), pax.PAX))
            # Quadratic term
            for i, (v_i, w_i) in enumerate(l_vars):
                # linear self-term from v_i^2 = v_i
                bqm.add_linear(v_i, penalty * (w_i*w_i - 2*cap_val*w_i))
                for v_j, w_j in l_vars[i+1:]:
                    bqm.add_quadratic(v_i, v_j, penalty * (2*w_i*w_j))
            # constant: penalty * cap_val^2 (drops out)

    # 2. Source/dest constraints: <=1
    for pax in PNR:
        # outgoing from src
        src_vars = []
        dest_vars = []
        for flt_id, entry in flights.items():
            flt = entry['flight']
            for cls in flt.classes:
                if reward[pax.ID].get(cls, 0) != 0:
                    for c in entry['paths']:
                        if flt.src in src:
                            src_vars.append(var(c, flt_id, pax.ID, cls))
                        if flt.dest in dest:
                            dest_vars.append(var(c, flt_id, pax.ID, cls))
        # penalty*(sum(src_vars)-1)^2
        def apply_penalty(var_list):
            for i, vi in enumerate(var_list):
                bqm.add_linear(vi, penalty * (-2))
                for vj in var_list[i+1:]:
                    bqm.add_quadratic(vi, vj, penalty * 2)
            # constant penalty contributes cap_val^2
        apply_penalty(src_vars)
        apply_penalty(dest_vars)

    # 3. Path preservation: for each path and passenger, ensure full selection or none
    for pax in PNR:
        for c, path in enumerate(paths):
            # sum_{flt in path} x == len(path)*b_c
            # Use penalty on (sum - len(path)*b_flag)^2, but b_flag can be absorbed
            # Instead enforce sum_{i>0} x_i - (len(path)-1) x_0 == 0
            # penalty*(lhs)^2
            vars_list = []
            first = path[0]
            for flt in path[1:]:
                for cls in flt.classes:
                    if reward[pax.ID].get(cls, 0) != 0:
                        vars_list.append((var(c, flt.ID, pax.ID, cls), 1))
            for cls in first.classes:
                if reward[pax.ID].get(cls, 0) != 0:
                    vars_list.append((var(c, first.ID, pax.ID, cls), -(len(path)-1)))
            # quadratic expand
            for i, (vi, wi) in enumerate(vars_list):
                bqm.add_linear(vi, penalty * (wi*wi))
                for vj, wj in vars_list[i+1:]:
                    bqm.add_quadratic(vi, vj, penalty * (2*wi*wj))

    # 4. Flow conservation at intermediate airports
    for pax in PNR:
        for station, io in airports.items():
            if station in src or station in dest:
                continue
            lhs = []
            # incoming positive
            for flt_id in io['in']:
                flt = flights[flt_id]['flight']
                for cls in flt.classes:
                    if reward[pax.ID].get(cls, 0) != 0:
                        for c in flights[flt_id]['paths']:
                            lhs.append((var(c, flt_id, pax.ID, cls), 1))
            # outgoing negative
            for flt_id in io['out']:
                flt = flights[flt_id]['flight']
                for cls in flt.classes:
                    if reward[pax.ID].get(cls, 0) != 0:
                        for c in flights[flt_id]['paths']:
                            lhs.append((var(c, flt_id, pax.ID, cls), -1))
            # penalty*(sum w_i x_i)^2
            for i, (vi, wi) in enumerate(lhs):
                bqm.add_linear(vi, penalty * (wi*wi))
                for vj, wj in lhs[i+1:]:
                    bqm.add_quadratic(vi, vj, penalty * (2*wi*wj))

    return bqm


In [None]:
# Manual
def build_knapsack_bqm(PNR, paths, reward, alpha, src, dest, **kwargs):
    """Construct a bqm for the knapsack problem.

    Args:
        PNR (array-like):
            Array of passengers as a objects.
        paths :
            Array of paths and a path is a array of flights as objects.
        reward:
            Dict of dict of passengers for a score of each class
        alpha:
            Score for each path

    Returns:
        Constrained quadratic model instance that represents the knapsack problem.
    """
    num_PNR = len(PNR)
    
    print(f"\nBuilding a BQM for {num_PNR} PNRs and {len(paths)} paths")

    bqm = BinaryQuadraticModel(vartype='BINARY')
    
    flights = {}  #Dictionary keeping Track of flights and their corresponding paths
    for c in range(len(paths)):
        for flt in paths[c]:
            if flt.ID not in flights:
                    flights[flt.ID] = [flt,[c]]
            else:
                flights[flt.ID][1].append(c)
    
    for ID in flights:
        for cls in flights[ID][0].classes:
            constraint = QuadraticModel()
            for passengers in PNR:
                if reward[passengers.ID][cls] != 0:
                    for c in flights[ID][1]:
                        constraint.add_variable('BINARY', (c, ID, passengers.ID, cls))
                        bqm.add_linear((c, ID, passengers.ID, cls), alpha[c]*( -reward[passengers.ID][cls]))
                        constraint.set_linear((c, ID, passengers.ID, cls), passengers.PAX)
                    #Capacity Constrainst of a class in a particular flight
            bqm.add_linear_inequality_constraint(constraint, lagrange_multiplier=kwargs.get('lagrange_9'), lb=0, ub=flights[ID][0].classes[cls], label=f'Capacity of class {cls} in {ID}')

    
    for passengers in PNR:
        # Summation of outgoing and incoming flights from src and dest respectively for a particular passenger in all classes is less than equal to 1
        constraint1 = QuadraticModel()
        constraint2 = QuadraticModel()
        for ID in flights:
            if flights[ID][0].src in src:
                for cls in flights[ID][0].classes:
                    if reward[passengers.ID][cls] != 0: 
                        for c in flights[ID][1]: 
                            constraint1.add_variable('BINARY', (c, ID, passengers.ID, cls))
                            constraint1.set_linear((c, ID, passengers.ID, cls), 1)
            if flights[ID][0].dest in dest:
                for cls in flights[ID][0].classes:
                    if reward[passengers.ID][cls] != 0: 
                        for c in flights[ID][1]: 
                            constraint2.add_variable('BINARY', (c, ID, passengers.ID, cls))
                            constraint2.set_linear((c, ID, passengers.ID, cls), 1)
                            
        bqm.add_linear_inequality_constraint(constraint1, lagrange_multiplier=kwargs.get('lagrange_10'), lb=0, ub=1, label = f"Outgoing Flights from src for passenger {passengers.ID}")
        bqm.add_linear_inequality_constraint(constraint2, lagrange_multiplier=kwargs.get('lagrange_11'), lb=0, ub=1, label = f"Incoming Flights to dest for passenger {passengers.ID}")
    
    airports = {} #Dictionary Keeping track of Flights incoming and outgoing at a particular airport for a particular path
    for flight in flights:
        if flights[flight][0].src not in airports:
            incoming = [flight]
            outcoming = []
            airports[flights[flight][0].src] = (incoming, outcoming)
        else:
            airports[flights[flight][0].src][0].append(flight)
            
        if flights[flight][0].dest not in airports:
            incoming = []
            outcoming = [flight]
            airports[flights[flight][0].dest] = (incoming, outcoming)
        else:
            airports[flights[flight][0].dest][1].append(flight)
    
    for passengers in PNR: #Summation of a path is equal to its length if started on
        for c in range(len(paths)):
            constraint = QuadraticModel()
            for flt in range(1,len(paths[c])):
                flight = paths[c][flt]
                for cls in flight.classes:
                    if reward[passengers.ID][cls] != 0:
                        constraint.add_variable('BINARY', (c, flight.ID, passengers.ID, cls))
                        constraint.set_linear((c, flight.ID, passengers.ID, cls), 1)
            
            for cls in paths[c][0].classes:
                flight = paths[c][0]
                if reward[passengers.ID][cls] != 0:
                        constraint.add_variable('BINARY', (c, flight.ID, passengers.ID, cls))
                        constraint.set_linear((c, flight.ID, passengers.ID, cls), -(len(paths[c])-1))
                        
            bqm.add_linear_equality_constraint(constraint, lagrange_multiplier=kwargs.get('lagrange_12'), constant=0, label = f'Path Preservance of {passengers.ID} for {c} path')
                
    for passengers in PNR:
        #Summation of the incoming flights at a airport for a passenger is than equal to the summation of outgoing flights (Path Preservence)
        for station in airports:
            if station not in src and station not in dest:
                constraint = QuadraticModel()
                for flt in airports[station][0]:
                    for cls in flights[flt][0].classes:
                        if reward[passengers.ID][cls] != 0: 
                            for c in flights[flt][1]:
                                constraint.add_variable('BINARY', (c, flt, passengers.ID, cls))
                                constraint.set_linear((c, flt, passengers.ID, cls), 1)
                                
                for flt in airports[station][1]:
                    for cls in flights[flt][0].classes:
                        if reward[passengers.ID][cls] != 0: 
                            for c in flights[flt][1]:
                                constraint.add_variable('BINARY', (c, flt, passengers.ID, cls))
                                constraint.set_linear((c, flt, passengers.ID, cls), -1)
                                
                bqm.add_linear_equality_constraint(constraint, lagrange_multiplier=kwargs.get('lagrange_13'), constant=0, label = f'Path Preservance of {passengers.ID} at airport {station}')
        
    return bqm

