In [2]:
import numpy as np
from scipy.optimize import linprog
from typing import List, Tuple, Dict

class Node:
    def __init__(self, fixed: List[int], bounds: List[Tuple[int, int]]):
        self.fixed = fixed
        self.bounds = bounds

def read_cor_file(file_path: str) -> Tuple[int, int, List[float], Dict[Tuple[int, int], float], int]:
    with open(file_path, 'r') as f:
        lines = f.readlines()

    num_servers = 5
    num_clients = 25
    fixed_costs = [0] * num_servers
    revenues = {}
    capacity = 188

    for line in lines:
        parts = line.split()
        if len(parts) >= 3:
            if parts[0].startswith('x_') and parts[1] == 'obj':
                server = int(parts[0].split('_')[1]) - 1
                fixed_costs[server] = float(parts[2])
            elif parts[0].startswith('x_') and parts[1] == 'c':
                capacity = float(parts[2])
            elif parts[0].startswith('y_') and parts[1] == 'obj':
                client, server = map(int, parts[0].split('_')[1:3])
                client -= 1
                server -= 1
                revenue = -float(parts[2])
                revenues[(client, server)] = revenue

    return num_servers, num_clients, fixed_costs, revenues, capacity

def read_sto_file(file_path: str) -> Dict[str, Dict]:
    with open(file_path, 'r') as f:
        lines = f.readlines()

    scenarios = {}
    current_scenario = None
    scenario_prob = None

    for line in lines:
        parts = line.split()
        if len(parts) >= 4 and parts[0] == 'SC':
            current_scenario = parts[1]
            scenario_prob = float(parts[3])
            scenarios[current_scenario] = {'prob': scenario_prob, 'clients': {}}
        elif len(parts) == 4 and parts[0] == 'RHS':
            client = int(parts[1][1:]) - 7
            availability = int(parts[3])
            scenarios[current_scenario]['clients'][client] = availability

    return scenarios

def solve_lp(num_servers: int, num_clients: int, fixed_costs: List[float], revenues: Dict[Tuple[int, int], float], 
             capacity: int, scenarios: Dict[str, Dict], fixed: List[int], bounds: List[Tuple[int, int]]) -> Tuple[float, np.ndarray]:
    num_scenarios = len(scenarios)
    
    # Objective function coefficients
    c = np.array(fixed_costs + [0] * (num_clients * num_servers * num_scenarios))
    
    # Constraints matrix
    A_ub = []
    b_ub = []
    
    # Server capacity constraints
    for s in range(num_scenarios):
        for j in range(num_servers):
            row = [0] * num_servers + [0] * (num_clients * num_servers * s) + [1 if i == j else 0 for i in range(num_servers) for _ in range(num_clients)] + [0] * (num_clients * num_servers * (num_scenarios - s - 1))
            A_ub.append(row)
            b_ub.append(capacity)
    
    # Client assignment constraints
    for s, scenario in enumerate(scenarios.values()):
        for i in range(num_clients):
            row = [0] * num_servers + [0] * (num_clients * num_servers * s) + [1 if k == i else 0 for _ in range(num_servers) for k in range(num_clients)] + [0] * (num_clients * num_servers * (num_scenarios - s - 1))
            A_ub.append(row)
            b_ub.append(scenario['clients'].get(i, 0))
    
    # Maximum number of servers constraint
    A_ub.append([1] * num_servers + [0] * (num_clients * num_servers * num_scenarios))
    b_ub.append(5)
    
    # Convert to numpy arrays
    A_ub = np.array(A_ub)
    b_ub = np.array(b_ub)
    
    # Bounds
    x_bounds = [(0, 1) for _ in range(num_servers)]
    y_bounds = [(0, 1) for _ in range(num_clients * num_servers * num_scenarios)]
    
    # Apply fixed variables and bounds
    for i, val in enumerate(fixed):
        if val is not None:
            x_bounds[i] = (val, val)
    
    for i, (lb, ub) in enumerate(bounds):
        y_bounds[i] = (lb, ub)
    
    # Solve LP
    res = linprog(c, A_ub=A_ub, b_ub=b_ub, bounds=x_bounds + y_bounds, method='highs')
    
    return res.fun, res.x

def branch_and_cut(num_servers: int, num_clients: int, fixed_costs: List[float], revenues: Dict[Tuple[int, int], float], 
                   capacity: int, scenarios: Dict[str, Dict]) -> Tuple[float, np.ndarray]:
    best_obj = float('inf')
    best_sol = None
    nodes = [Node(fixed=[None]*num_servers, bounds=[(0, 1)]*(num_clients*num_servers*len(scenarios)))]
    
    while nodes:
        node = nodes.pop(0)
        obj, sol = solve_lp(num_servers, num_clients, fixed_costs, revenues, capacity, scenarios, node.fixed, node.bounds)
        
        if obj >= best_obj:
            continue
        
        if all(x in [0, 1] for x in sol[:num_servers]):
            best_obj = obj
            best_sol = sol
            continue
        
        # Branch on the first non-integer x variable
        for i in range(num_servers):
            if node.fixed[i] is None and sol[i] not in [0, 1]:
                new_fixed_0 = node.fixed.copy()
                new_fixed_0[i] = 0
                nodes.append(Node(fixed=new_fixed_0, bounds=node.bounds))
                
                new_fixed_1 = node.fixed.copy()
                new_fixed_1[i] = 1
                nodes.append(Node(fixed=new_fixed_1, bounds=node.bounds))
                
                break
    
    return best_obj, best_sol

def main():
    cor_file_path = '/Users/vivekchaudhary/Documents/progressive_hedging/sslp/sslp_5_25_50.cor'
    sto_file_path = '/Users/vivekchaudhary/Documents/progressive_hedging/sslp/sslp_5_25_50.sto'

    num_servers, num_clients, fixed_costs, revenues, capacity = read_cor_file(cor_file_path)
    scenarios = read_sto_file(sto_file_path)

    obj, sol = branch_and_cut(num_servers, num_clients, fixed_costs, revenues, capacity, scenarios)

    print("Optimal objective value:", obj)
    print("Opened servers:")
    for j in range(num_servers):
        if sol[j] > 0.5:
            print(f"Server {j+1}")

if __name__ == "__main__":
    main()

Optimal objective value: 0.0
Opened servers:
