In [2]:
import numpy as np
import gurobipy as gp
from gurobipy import GRB
import sys

In [12]:
def compute_expected_defender_utilities(attacker_distribution, attacker_actions, target_values_matrix, all_targets):
    """
    Computes expected defender utility for each target given attacker distribution over attack targets.

    Parameters:
        attacker_distribution (list[float]): Probabilities for attacking each action (index corresponds to attacker_actions).
        attacker_actions (list[int]): List of target nodes corresponding to attacker strategies.
        target_values_matrix (np.ndarray): 4 x num_targets matrix.
            Row 0: defender utility if target is uncovered.
            Row 1: defender utility if target is covered.
        all_targets (list[int]): Full list of target node IDs.

    Returns:
        expected_utilities (dict[int, float]): Mapping from target index to expected defender utility.
    """
    expected_utilities = {t: 0.0 for t in all_targets}

    for i, target in enumerate(attacker_actions):
        print(target)
        prob = attacker_distribution[i]
        print("prob of attack")
        print(prob)
        u_covered = target_values_matrix[1, i]
        print("u covered")
        print(u_covered)
        u_uncovered = target_values_matrix[0, i]
        expected_utilities[target] = prob * u_covered
        print("expected value")
        print(prob * u_covered)

    return expected_utilities

In [2]:
def get_score(target, schedule_assignment, target_utilities, target_inds):
    """
    Returns the defender utility for a given target and defender schedule assignment.
    
    Args:
        ind (int): Index of target in targets (node may not = ind)
        target (int): Target node to evaluate.
        schedule_assignment (list[set]): One schedule per defender (list of sets of targets).
        target_utilities (np.ndarray): 4 x num_targets utility matrix.
                                       Row 0: Defender uncovered
                                       Row 1: Defender covered
                                       Row 2: Attacker covered
                                       Row 3: Attacker uncovered

    Returns:
        float: Defender utility value for the target, depending on whether it is covered.
    """
    is_covered = any(target in schedule for schedule in schedule_assignment)
    if is_covered:
        return target_utilities[1][target_inds[target]]  # Defender covered utility
    else:
        return target_utilities[0][target_inds[target]]  # Defender uncovered utility

In [3]:
def defender_best_response_schedule_form(resources, schedules_by_resource, expected_target_values):
    """
    Defender best response in schedule-form double oracle using attacker-strategy-dependent expected target values.

    Parameters:
        resources: list of resource IDs (e.g., [0, 1, 2])
        schedules_by_resource: dict {r: list of (set, cost)} where each set is a schedule of target nodes
        expected_target_values: dict {t: float}, representing the attack-weighted expected utility of defending t

    Returns:
        selected_schedules: list of sets, one per resource (empty set if no schedule available)
        covered_targets: list of targets covered
        defender_utility: float, total expected defender utility
    """
    model = gp.Model("DefenderBR_ScheduleForm")
    model.setParam("OutputFlag", 0)

    # Collect all unique targets from all schedules
    all_targets = set()
    for scheds in schedules_by_resource.values():
        for s, _ in scheds:
            all_targets.update(s)

    # Identify usable resources (non-empty schedule lists)
    usable_resources = [r for r in resources if len(schedules_by_resource[r]) > 0]
    skipped_resources = [r for r in resources if r not in usable_resources]

    # Decision variables
    x = {}
    for r in usable_resources:
        for i in range(len(schedules_by_resource[r])):
            x[r, i] = model.addVar(vtype=GRB.BINARY, name=f"x_{r}_{i}")

    g = {}
    for t in all_targets:
        g[t] = model.addVar(vtype=GRB.BINARY, name=f"g_{t}")

    # Constraint 1: Each usable resource picks exactly one schedule
    for r in usable_resources:
        model.addConstr(gp.quicksum(x[r, i] for i in range(len(schedules_by_resource[r]))) == 1)

    # Constraint 2: Coverage logic
    for t in all_targets:
        model.addConstr(
            g[t] <= gp.quicksum(
                x[r, i]
                for r in usable_resources
                for i, (sched, _) in enumerate(schedules_by_resource[r])
                if t in sched
            )
        )

    # Objective: minimize expected utility from uncovered targets (zero-sum game setting)
    model.setObjective(
        gp.quicksum(expected_target_values[t] * g[t] for t in all_targets),
        GRB.MINIMIZE
    )

    model.optimize()

    if model.Status != GRB.OPTIMAL:
        print("Warning: Model did not solve to optimality.")
        return None, None, None

    # Retrieve selected schedules
    selected_schedules = {}
    for r in usable_resources:
        for i in range(len(schedules_by_resource[r])):
            if x[r, i].X > 0.5:
                selected_schedules[r] = schedules_by_resource[r][i][0]
                break

    # Add empty sets for skipped resources
    for r in skipped_resources:
        selected_schedules[r] = set()

    # Return results in original resource order
    selected_schedule_list = [selected_schedules[r] for r in resources]
    covered_targets = [t for t in all_targets if g[t].X > 0.5]
    utility = model.ObjVal

    return selected_schedule_list, covered_targets, utility

In [17]:
def attacker_best_response(all_targets, defender_distribution, defender_actions, target_values_matrix, negative=True):
    """
    Computes the best target for the attacker to attack against a defender mixed strategy.

    Parameters:
        all_targets (list[int]): All possible targets in the game (not limited to subgame).
        defender_distribution (list[float]): Probabilities over defender schedule assignments.
        defender_actions (list[list[set]]): List of defender strategies, each is a list of sets (schedules for each defender).
        target_values_matrix (np.ndarray): 4 x num_targets matrix.
        negative (bool): If True, returns negative utility for attacker (for zero-sum compatibility).

    Returns:
        best_target (int): Target with highest expected attacker utility.
        best_value (float): Expected value of attacking best_target against defender strategy.
    """
    # Step 1: Compute marginal coverage probability for each target
    target_coverage = {t: 0.0 for t in all_targets}
    for i, defender_action in enumerate(defender_actions):
        prob = defender_distribution[i]
        covered_targets = set().union(*defender_action)
        for t in covered_targets:
            if t in target_coverage:
                target_coverage[t] += prob

    # Step 2: Compute expected value for attacker for each target
    expected_utilities = {}
    for i,t in enumerate(all_targets):
        p = target_coverage.get(t, 0.0)
        u_att_covered = target_values_matrix[2, i]
        u_att_uncovered = target_values_matrix[3, i]
        expected_utilities[t] = p * u_att_covered + (1 - p) * u_att_uncovered

    # Step 3: Return target with highest expected utility
    best_target = max(expected_utilities, key=expected_utilities.get)
    best_value = expected_utilities[best_target]

    if negative:
        best_value = -best_value

    return best_target, best_value

In [18]:
D_a = [0.08910124, 0, 0.1084381, 0.1225142, 0.17454078, 0, 0.2426948, 0.26271087]
A_a = [0, 7, 6, 3, 2, 9, 5, 4]
D_d = [0.0, 0.0, 0.0, 0.0, 0.0, 0.46071461993866675, 0.0, 0.16325256242768255, 0.0, 0.150358340435715, 0.0, 0.0831544848626502, 0.0747189027026951, 0.0, 0.06780108963259046, 0.0]
A_d = [[{0}, {0}], [{1, 2}, {0, 3}], [{0, 7}, {0}], [{1, 6, 9}, {0}], [{1, 6}, {3, 7}], [{0, 3, 7}, {6}], [{1, 2}, {3, 7}], [{1, 2, 9}, {0, 3, 7}], [{9, 2, 6}, {9, 2, 6}], [{0, 7}, {9, 2, 6}], [{1, 5, 9}, {0}], [{9, 2, 6}, {1, 5, 9}], [{5}, {0, 3}], [{8, 4}, {0}], [{9, 2, 6}, {4}], [{8, 4}, {1, 5}]]
target_utilities = np.vstack([[-286, -90, -146, -208, -97, -105, -235, -241, -48, -136],
 [-57.2, -18, -29.2, -41.6, -19.4, -21, -47, -48.2, -9.6, -27.2],
 [57.2, 18, 29.2, 41.6, 19.4, 21, 47, 48.2, 9.6, 27.2],
 [286, 90, 146, 208, 97, 105, 235, 241, 48, 136]])
targets = [0,1,2,3,4,5,6,7,8,9]

BR_a, u_BRa_Dd = attacker_best_response(targets, D_d, A_d, target_utilities)
BR_a, u_BRa_Dd

(0, -91.73863544451106)

In [None]:
expected_target_values = compute_expected_defender_utilities(D_a, A_a, target_utilities, targets)

In [16]:
schedules = {0: [({0}, 0), ({0, 3}, 0), ({0, 3, 7}, 0), ({0, 7}, 0), ({3}, 0), ({3, 7}, 0), ({7}, 0), ({4}, 0), ({8, 4}, 0), ({8}, 0), ({1}, 0), ({1, 2}, 0), ({1, 2, 9}, 0), ({1, 5}, 0), ({1, 5, 9}, 0), ({1, 6}, 0), ({1, 6, 9}, 0), ({1, 9}, 0), ({2}, 0), ({2, 6}, 0), ({9, 2, 6}, 0), ({9, 2}, 0), ({5}, 0), ({9, 5}, 0), ({6}, 0), ({9, 6}, 0), ({9}, 0)], 1: [({0}, 0), ({0, 3}, 0), ({0, 3, 7}, 0), ({0, 7}, 0), ({3}, 0), ({3, 7}, 0), ({7}, 0), ({4}, 0), ({8, 4}, 0), ({8}, 0), ({1}, 0), ({1, 2}, 0), ({1, 2, 9}, 0), ({1, 5}, 0), ({1, 5, 9}, 0), ({1, 6}, 0), ({1, 6, 9}, 0), ({1, 9}, 0), ({2}, 0), ({2, 6}, 0), ({9, 2, 6}, 0), ({9, 2}, 0), ({5}, 0), ({9, 5}, 0), ({6}, 0), ({9, 6}, 0), ({9}, 0)]}
BR_d, _, _ = defender_best_response_schedule_form([0,1], schedules, expected_target_values)
BR_d

Set parameter WLSAccessID
Set parameter WLSSecret
Set parameter LicenseID to value 2455389
Academic license 2455389 - for non-commercial use only - registered to jc___@columbia.edu


[{4, 8}, {1, 5}]

In [20]:
target_inds = {0:0, 7:1, 6:2, 3:3, 2:4, 9:5, 5:6, 4:7}
u_BRd_Da = sum(
            D_a[i] * (target_utilities[1, target_inds[target]] if any(target in s for s in BR_d) else target_utilities[0, target_inds[target]])
            for i, target in enumerate(A_a)
        )

In [21]:
u_BRd_Da

-107.797646034

In [22]:
def compute_defender_utility_against_attacker_distribution(BR_d, A_a, D_a, target_utilities, target_inds, extra_coverage_weight=1.0):
    utility = 0.0
    for prob, target in zip(D_a, A_a):
        num_covers = sum(target in sched for sched in BR_d)m
        if num_covers == 0:
            score = target_utilities[0][target_inds[target]]  # uncovered utility
        else:
            score = target_utilities[1][target_inds[target]]  # covered utility (weight=1 so doesn't matter)
        utility += prob * score
    return utility

In [23]:
compute_defender_utility_against_attacker_distribution(BR_d, A_a, D_a, target_utilities, target_inds, extra_coverage_weight=1.0)

-107.797646034

In [6]:
def generate_defender_actions(schedule_dict):
    """
    Generates all possible joint defender actions from a dictionary mapping
    each defender to their list of (schedule, cost) tuples.

    Defenders with no schedules are assigned an empty set as a placeholder.

    Returns:
        defender_actions: list of lists of sets (each inner list is one full defender action)
    """
    sorted_defenders = sorted(schedule_dict.keys())

    # Use a default empty set if a defender has no available schedules
    schedule_lists = [
        schedule_dict[d] if schedule_dict[d] else [({}, 0)]  # dummy no-op schedule
        for d in sorted_defenders
    ]

    all_combinations = list(itertools.product(*schedule_lists))
    defender_actions = []

    for combo in all_combinations:
        schedules = [item[0] for item in combo]  # Extract schedule (ignore cost)
        defender_actions.append(schedules)

    return defender_actions

In [76]:
import itertools
import numpy as np

def dbr_test(all_defender_actions, current_attacker_actions, D_a, udc, uduc):
    evs = []
    for da in all_defender_actions:
        coverage = list(itertools.chain.from_iterable(da))
        ev = 0
        for i,t in enumerate(current_attacker_actions):
            if t in coverage:
                ev += udc[t]*D_a[i] #add weighted defender covered value
            else:
                ev += uduc[t]*D_a[i] #add weighted defender uncovered value
        evs.append(ev)
    return all_defender_actions[np.argmax(np.array(evs))], max(evs)


def abr_test(all_attacker_actions, current_defender_actions, D_d, uac, uauc):
    evs = []
    for t in all_attacker_actions:
        ev = 0
        # print(current_defender_actions)
        for i,da in enumerate(current_defender_actions):
            coverage = list(itertools.chain.from_iterable(da))
            if t in coverage:
                # print(coverage)
                # print(t)
                # print(i)
                ev += uac[t]*D_d[i] #add weighted attacker covered value
            else:
                ev += uauc[t]*D_d[i] #add weighted attacker uncovered value
        evs.append(ev)
    return all_attacker_actions[np.argmax(np.array(evs))], -max(evs)

In [67]:
schedules = {0: [({0}, 0), ({0, 3}, 0), ({0, 3, 7}, 0), ({0, 7}, 0), ({3}, 0), ({3, 7}, 0), ({7}, 0), ({4}, 0), ({8, 4}, 0), ({8}, 0), ({1}, 0), ({1, 2}, 0), ({1, 2, 9}, 0), ({1, 5}, 0), ({1, 5, 9}, 0), ({1, 6}, 0), ({1, 6, 9}, 0), ({1, 9}, 0), ({2}, 0), ({2, 6}, 0), ({9, 2, 6}, 0), ({9, 2}, 0), ({5}, 0), ({9, 5}, 0), ({6}, 0), ({9, 6}, 0), ({9}, 0)], 1: [({0}, 0), ({0, 3}, 0), ({0, 3, 7}, 0), ({0, 7}, 0), ({3}, 0), ({3, 7}, 0), ({7}, 0), ({4}, 0), ({8, 4}, 0), ({8}, 0), ({1}, 0), ({1, 2}, 0), ({1, 2, 9}, 0), ({1, 5}, 0), ({1, 5, 9}, 0), ({1, 6}, 0), ({1, 6, 9}, 0), ({1, 9}, 0), ({2}, 0), ({2, 6}, 0), ({9, 2, 6}, 0), ({9, 2}, 0), ({5}, 0), ({9, 5}, 0), ({6}, 0), ({9, 6}, 0), ({9}, 0)]}
all_defender_actions = generate_defender_actions(schedules)
all_attacker_actions = [0,1,2,3,4,5,6,7,8,9]

In [68]:
D_a = [0.08910124, 0, 0.1084381, 0.1225142, 0.17454078, 0, 0.2426948, 0.26271087]
A_a = [0, 7, 6, 3, 2, 9, 5, 4]
D_d = [0.0, 0.0, 0.0, 0.0, 0.0, 0.46071461993866675, 0.0, 0.16325256242768255, 0.0, 0.150358340435715, 0.0, 0.0831544848626502, 0.0747189027026951, 0.0, 0.06780108963259046, 0.0]
A_d = [[{0}, {0}], [{1, 2}, {0, 3}], [{0, 7}, {0}], [{1, 6, 9}, {0}], [{1, 6}, {3, 7}], [{0, 3, 7}, {6}], [{1, 2}, {3, 7}], [{1, 2, 9}, {0, 3, 7}], [{9, 2, 6}, {9, 2, 6}], [{0, 7}, {9, 2, 6}], [{1, 5, 9}, {0}], [{9, 2, 6}, {1, 5, 9}], [{5}, {0, 3}], [{8, 4}, {0}], [{9, 2, 6}, {4}], [{8, 4}, {1, 5}]]
target_utilities = np.vstack([[-286, -90, -146, -208, -97, -105, -235, -241, -48, -136],
 [-57.2, -18, -29.2, -41.6, -19.4, -21, -47, -48.2, -9.6, -27.2],
 [57.2, 18, 29.2, 41.6, 19.4, 21, 47, 48.2, 9.6, 27.2],
 [286, 90, 146, 208, 97, 105, 235, 241, 48, 136]])
targets = [0,1,2,3,4,5,6,7,8,9]

udc = {i:target_utilities[1][i] for i in range(10)}
uduc = {i:target_utilities[0][i] for i in range(10)}
uac = {i:target_utilities[2][i] for i in range(10)}
uauc = {i:target_utilities[3][i] for i in range(10)}


In [69]:
dbr_test(all_defender_actions, A_a, D_a, udc, uduc)

([{0, 3}, {2, 6}], -71.35227151400001)

In [70]:
abr_test(all_attacker_actions, A_d, D_d, uac, uauc)

(0, -48.00000000000001)

In [72]:
def generate_zero_sum_schedule_game_matrix(attacker_actions, defender_actions, udc, uduc):
    n = len(attacker_actions)
    m = len(defender_actions)
    U = np.zeros((n,m))

    for i, da in enumerate(defender_actions):
        coverage = list(itertools.chain.from_iterable(da))
        for j, t in enumerate(attacker_actions):
            if t in coverage:
                U[i,j] = udc[t]
            else:
                U[i,j] = uduc[t]

    return U

def get_score(target, schedule_assignment, udc, uduc):
    if target in itertools.chain.from_iterable(schedule_assignment):
        return udc[target]
    return uduc[target]

def expand_subgame(U, A_a, A_d, BR_a_in_U, BR_d_in_U, udc,uduc):
    n, m = U.shape  # Get current matrix size
    
    if BR_a_in_U and BR_d_in_U:
        return U  # No expansion needed
        
    # Create new expanded matrix with placeholder values (assuming scores are non-positive)
    if not BR_a_in_U:
        new_m = m + 1
    else:
        new_m = m

    if not BR_d_in_U:
        new_n = n + 1
    else:
        new_n = n
        
    new_U = np.full((new_n, new_m), fill_value=1, dtype=U.dtype)
    
    # Copy over the existing values
    new_U[:n, :m] = U 

    # Compute new **column** (if A_a expanded)
    if not BR_a_in_U:
        for i in range(new_n):  # Iterate over all rows (old + new)
            new_U[i, new_m-1] = get_score(A_a[-1], A_d[i], udc, uduc)
    
    # Compute new **row** (if A_d expanded)
    if not BR_d_in_U:
        for i in range(new_m):  # Iterate over all columns (old + new)
            new_U[new_n-1, i] = get_score(A_a[i], A_d[-1], udc, uduc)

    return new_U

In [84]:
from solvers.nash import nash
def double_oracle_sf_test(all_defender_actions, all_attacker_actions, udc, uduc, uac, uauc, eps=1e-12):
    A_d = all_defender_actions[:1]
    A_a = all_attacker_actions[:1]
    print(A_d,A_a)
    U_subgame = generate_zero_sum_schedule_game_matrix(A_a, A_d, udc, uduc)
    gap = np.inf

    while gap > eps:
        print(U_subgame)
        BR_a_in_U = False
        BR_d_in_U = False
        print("A_a, A_d")
        print(A_a,A_d)
        print("Da, Dd, u")
        D_a, D_d, u_s = nash(U_subgame)
        print(D_a, D_d, u_s)
        BR_a, u_BRa_Dd = abr_test(all_attacker_actions, A_d, D_d, uac, uauc)
        BR_d, u_BRd_Da = dbr_test(all_defender_actions, A_a, D_a, udc, uduc)
        print("BR a, BR d")
        print(BR_a,BR_d)
        print("U BRa Dd, U BRd, Da")
        print(u_BRa_Dd,u_BRd_Da)
        gap = abs(u_BRa_Dd - u_BRd_Da)

        if BR_a not in A_a:
            A_a.append(BR_a)
        else:
            BR_a_in_U = True

        for existing in A_d:
            if all(sched in existing for sched in BR_d) and all(sched in BR_d for sched in existing):
                BR_d_in_U = True
                break
        
        if not BR_d_in_U:
            A_d.append(BR_d)
            
        print("BR_a_in_U,BR_d_in_U")
        print(BR_a_in_U,BR_d_in_U)
        U_subgame = expand_subgame(U_subgame, A_a, A_d, BR_a_in_U, BR_d_in_U, udc,uduc)
    return D_a, D_d, u_s, A_a, A_d

In [85]:
D_a, D_d, u_s, A_a, A_d = double_oracle_sf_test(all_defender_actions, all_attacker_actions, udc, uduc, uac, uauc)

[[{0}, {0}]] [0]
[[-57.2]]
A_a, A_d
[0] [[{0}, {0}]]
Da, Dd, u
[1.] [1.0] -57.2
BR a, BR d
7 [{0}, {0}]
U BRa Dd, U BRd, Da
-241.0 -57.2
BR_a_in_U,BR_d_in_U
False True
[[ -57.2 -241. ]]
A_a, A_d
[0, 7] [[{0}, {0}]]
Da, Dd, u
[0. 1.] [1.0] -241.0
BR a, BR d
7 [{0}, {0, 3, 7}]
U BRa Dd, U BRd, Da
-241.0 -48.2
BR_a_in_U,BR_d_in_U
True False
[[ -57.2 -241. ]
 [ -57.2  -48.2]]
A_a, A_d
[0, 7] [[{0}, {0}], [{0}, {0, 3, 7}]]
Da, Dd, u
[1. 0.] [0.0, 1.0] -57.2
BR a, BR d
6 [{0}, {0}]
U BRa Dd, U BRd, Da
-235.0 -57.2
BR_a_in_U,BR_d_in_U
False True
[[ -57.2 -241.  -235. ]
 [ -57.2  -48.2 -235. ]]
A_a, A_d
[0, 7, 6] [[{0}, {0}], [{0}, {0, 3, 7}]]
Da, Dd, u
[0. 0. 1.] [0.0, 1.0] -235.0
BR a, BR d
6 [{0}, {1, 6}]
U BRa Dd, U BRd, Da
-235.0 -47.0
BR_a_in_U,BR_d_in_U
True False
[[ -57.2 -241.  -235. ]
 [ -57.2  -48.2 -235. ]
 [ -57.2 -241.   -47. ]]
A_a, A_d
[0, 7, 6] [[{0}, {0}], [{0}, {0, 3, 7}], [{0}, {1, 6}]]
Da, Dd, u
[0.         0.49369748 0.50630252] [0.0, 0.5094537815126051, 0.490546218487394

In [75]:
uauc

{0: 286.0,
 1: 90.0,
 2: 146.0,
 3: 208.0,
 4: 97.0,
 5: 105.0,
 6: 235.0,
 7: 241.0,
 8: 48.0,
 9: 136.0}

In [83]:
brd = [{0}, {0, 3, 7}]
A_d = [[{0}, {0}]]
for existing in A_d:
    print(all(sched in existing for sched in brd) and all(sched in brd for sched in existing))

False
