In [14]:
import numpy as np
import matplotlib.pyplot as plt
import nashpy as nash
import itertools
import cvxpy as cp

In [15]:
# # Defining MDP - old game

# state_names = ["s1", "s2", "s3", "s4"]
# risk_levels = np.linspace(0,10,100)
# states = []
# for name in state_names:
#     for risk in risk_levels:
#         states.append((name, risk))

# human_actions = [("c",0), ("c",1), ("d",0), ("d",1)]
# robot_actions = [("c",0), ("c",1), ("d",0), ("d",1)]

# def transition(state, uH, uR, dr):
#     s = state[0]
#     r = state[1] + dr
#     if s == "s1":
#         if 0 in uR:
#             return ("s2",r)
#         elif 1 in uR:
#             return ("s1",r)
#     elif s == "s2":
#         if 0 in uH:
#             return ("s3",r)
#         elif 1 in uH:
#             return ("s1",r)
#     elif s == "s3":
#         return ("s3",r)
#     elif s == "s4":
#         if 1 in uH and 1 in uR:
#             return ("s2",r)
#         else:
#             return ("s4",r)
#     else:
#         print("Incorrect state")
        
# def prisoners_dilemma(uH,uR):
#     if uH == "c" and uR == "c":
#         return (3,3)
#     if uH == "d" and uR == "d":
#         return (1,1)
#     if uH == "c" and uR == "d":
#         return (0,5)
#     if uH == "d" and uR == "c":
#         return (5,0)
    
# def pay_or_punish(uH,uR):
#     if uR == "c":
#         return (10,10)
#     if uR == "d":
#         return (-10,-10)
    
# def social(t):
#         return (t[0],t[1],t[0]+t[1])
        
# def reward(state, uH, uR):
#     s = state[0]
#     if s == "s1":
#         r = prisoners_dilemma(uH[0], uR[0])
#     elif s == "s2":
#         r = prisoners_dilemma(uH[0], uR[0])
#     elif s == "s3":
#         r = pay_or_punish(uH[0],uR[0])
#     elif s == "s4":
#         r = prisoners_dilemma(uH[0], uR[0])
#     else:
#         print("Incorrect state")
#         return None
#     return social(r)

In [16]:
# Defining MDP - manufacturing example

state_names = ["s1"]
risk_levels = np.linspace(0,100,100)
states = []
for name in state_names:
    for risk in risk_levels:
        states.append((name, risk))

human_actions = [1,5,10]
robot_actions = ["S","F"]

def project_risk(r):
    return risk_levels[np.argmin([np.abs(r-l) for l in risk_levels])]

def transition(state, uH, uR, dr):
    s = state[0]
    r = risk_levels[np.argmin([np.abs(state[1]+dr-l) for l in risk_levels])]
    return (s,r)
        
def reward(state, uH, uR):
    rR = (2 if uR=="S" else 0)*uH
    rH = (10-uH)
    social = rR + rH
    return (social,social,social)

In [17]:
T = 10
gamma = 1.0
v_funs = {i:{s:None for s in states} for i in range(T+1)}
v_funs_adv = {i:{s:None for s in states} for i in range(T+1)}
v_funs_adv_p = {i:{s:None for s in states} for i in range(T+1)}
uHs = {i:{s:None for s in states} for i in range(T)}
uRs_coop = {i:{s:None for s in states} for i in range(T)}
uRs_adv = {i:{s:None for s in states} for i in range(T)}

In [18]:
# Initialize value functions
for s in states:
    v_funs[T][s] = 0
    v_funs_adv[T][s] = 0
    v_funs_adv_p[T][s] = 0

In [19]:
from itertools import product
from collections import defaultdict

def get_combos(l,t):
    l = [l[:] for _ in range(t)]
    d = defaultdict(list)
    for x in product(*l):
        d[tuple(sorted(x))].append(x)

    return [x[0] for x in d.values()]

In [20]:
# # Compute baseline adversarial value function via brute force
# for t in range(T):
#     plans_h = get_combos(human_actions, T-t)
#     plans_r = get_combos(robot_actions, T-t)
#     for s in states:
#         max_min_value = None
#         for plan_h in plans_h:
#             min_value = None
#             for plan_r in plans_r:
#                 running_reward = 0
#                 s_curr = s
#                 for i in range(T-t):
#                     uH, uR = plan_h[i], plan_r[i]
#                     running_reward += (gamma**i)*(reward(s_curr,uH,uR)[0])
#                     s_curr = transition(s_curr,uH,uR,0)
#                 if min_value is None or running_reward < min_value:
#                     min_value = running_reward
#             if max_min_value is None or max_min_value > min_value:
#                 max_min_value = min_value
#         v_funs_adv[t][s] = max_min_value

In [21]:
# Compute baseline adversarial value function
t = T-1
while t >= 0:
    print(t, end=", ")
    for s in states:
        A = np.zeros(shape=(len(human_actions), len(robot_actions)))
        for i in range(len(human_actions)):
            for j in range(len(robot_actions)):
                uH = human_actions[i]
                uR = robot_actions[j]
                s_next = transition(s,uH,uR,0)
                qH = reward(s, uH, uR)[0] + gamma*v_funs_adv[t+1][s_next]
                A[i,j] = qH
        rps = nash.Game(A)
        (s1,s2) = list(rps.support_enumeration())[0]
        v_funs_adv[t][s] = rps[s1,s2][0]
    t -= 1
        

9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 

In [22]:
v_funs_adv[4][("s1",project_risk(30))]

54.0

In [23]:
# # Compute value function
# t = T-1
# while t >= 0:
#     for s in states:
#         qS = None
#         uH_opt = None
#         for uH in human_actions:
#             for uR in robot_actions:
#                 # Find cooperative action subject to constraints
#                 r = reward(s, uH, uR)
#                 qH_adv_p = r[0] + gamma*v_funs_adv_p[t+1][transition(s,uH,uR,0)]
#                 vH_adv = v_funs_adv[t][s]
                
#                 # Check if action passes constraints
#                 # print(s[1], end=",")
#                 if qH_adv_p >= vH_adv - s[1]:
#                     qS_temp = r[2] + gamma*v_funs[t+1][transition(s,uH,uR,qH_adv_p-vH_adv)] # social value of action
#                     if qS is None or qS_temp >= qS:
#                         qS = qS_temp
#                         uH_opt = uH
#         # Find value function adversarial to policy under human's reward
#         print(uH_opt,end=",")
#         v_funs_adv_p[t][s] = np.min([reward(s, uH_opt, a)[0] + gamma*v_funs_adv_p[t+1][transition(s,uH_opt,a,0)] for a in robot_actions])
        
#         v_funs[t][s] = qS
#         uHs[t][s][uH_opt] = uH_opt
#     t = t - 1

In [24]:
# # Compute value function
# t = T-1
# while t >= 0:
#     for s in states:
#         A_social = np.zeros(shape=(len(human_actions), len(robot_actions)))
#         A_H_adv = np.zeros(shape=(len(human_actions), len(robot_actions)))
#         for i in range(len(human_actions)):
#             for j in range(len(robot_actions)):
#                 uH = human_actions[i]
#                 uR = robot_actions[j]

#                 V_base = v_funs_adv[t][s]

#                 dr = 0
#                 s_next = transition(s,uH,uR,dr)

#                 q_social = reward(s, uH, uR)[0] + gamma*v_funs[t+1][s_next]
#                 q_H_adv = reward(s, uH, uR)[1] + gamma*v_funs_adv_p[t+1][s_next]
                
#                 A_social[i,j] = q_social
#                 A_H_adv[i,j] = q_H_adv

#         v_funs_adv_p[t][s] = 1
        
#         v_funs[t][s] = 1
#         uHs[t][s][uH_opt] = 1
#     t = t - 1

In [25]:
# # Compute value function
# t = T-1
# while t >= 0:
#     for s in states:
#         m,n = len(human_actions), len(robot_actions)
#         A_S = np.zeros(shape=(m,n))
#         A_H = np.zeros(shape=(m,n))

#         p_joint = [[cp.Variable() for _ in range(n)] for _ in range(m)]
#         constraints = []

#         for i in range(m):
#             for j in range(n):
#                 uH = human_actions[i]
#                 uR = robot_actions[j]

#                 dr = v_funs_adv[t+1][transition(s,uH,uR,0)] - v_funs_adv[t][s]
#                 s_next = transition(s,uH,uR,dr)
#                 q_coop = reward(s, uH, uR)[0] + gamma*v_funs[t+1][s_next]
#                 A_S[i,j] = q_coop

#                 q_adv = reward(s, uH, uR)[1] + gamma*v_funs_adv_p[t+1][s_next]
#                 A_H[i,j] = q_adv

#                 constraints.append(p_joint[i][j] >= 0)

#         objective = cp.Minimize(np.sum([[p_joint[i][j]*A_S[i,j] for j in range(n)] for i in range(m)]))

#         constraints.append(np.sum(p_joint) == 1)
#         for j in range(n):
#             constraints.append(np.sum([np.sum([p_joint[i][k] for k in range(n)])*A_H[i,j] for i in range(m)]) >= v_funs_adv[t][s] - s[1])

#         prob = cp.Problem(objective, constraints)
#         v_funs[t][s] = prob.solve()

#         v_funs_adv_p[t][s] = min(A_H.T @ np.array([[np.sum([p_joint[i][k].value for k in range(n)]) for i in range(m)]]).T)
        
#         uHs[t][s] = [[p_joint[j][i].value for i in range(n)] for j in range(m)]
#     t = t - 1

In [26]:
# Compute value function
t = T-1
while t >= 0:
    for s in states:
        m,n = len(human_actions), len(robot_actions)
        A_S = np.zeros(shape=(m,n))
        A_H = np.zeros(shape=(m,n))

        p_H = cp.Variable((m,1))
        constraints = []

        for i in range(m):
            for j in range(n):
                uH = human_actions[i]
                uR = robot_actions[j]

                dr = v_funs_adv[t+1][transition(s,uH,uR,0)] - v_funs_adv[t][s]
                s_next = transition(s,uH,uR,dr)
                q_coop = reward(s, uH, uR)[0] + gamma*v_funs[t+1][s_next]
                A_S[i,j] = q_coop

                q_adv = reward(s, uH, uR)[1] + gamma*v_funs_adv_p[t+1][s_next]
                A_H[i,j] = q_adv

        constraints.extend([p_H>=0, cp.sum(p_H)==1])
        for j in range(n):
            constraints.append(p_H.T @ A_H[:,j] >= v_funs_adv[t][s] - s[1])

        vals = []
        p_Hs = []
        for j in range(n):
            objective = cp.Maximize(p_H.T @ A_S[:,j])
            prob = cp.Problem(objective, constraints)
            val_j = prob.solve()
            vals.append(val_j)
            p_Hs.append(p_H.value)

        opt_val = np.max(vals)
        opt_p_H = p_Hs[np.argmax(vals)]
        v_funs[t][s] = opt_val
        v_funs_adv_p[t][s] = np.min(opt_p_H.T @ A_H)
        
        uR_adv = np.zeros((n,1))
        uR_coop = np.zeros((n,1))

        uR_adv[np.argmin(opt_p_H.T @ A_H),0] = 1
        uR_coop[np.argmax(vals),0] = 1

        uRs_adv[t][s] = uR_adv
        uRs_coop[t][s] = uR_coop
        uHs[t][s] = opt_p_H
    t = t - 1

In [27]:
human_actions

[1, 5, 10]

In [28]:
# Simulate policy
pol_R_adv = lambda t,s: robot_actions[np.argmax(uRs_adv[t][s])]
pol_R_coop = lambda t,s: robot_actions[np.argmax(uRs_coop[t][s])]
pol_H = lambda t,s:  human_actions[np.argmax(uHs[t][s])]


print("Testing with cooperative agent")
s = ("s1",project_risk(1))
print(s)
for t in range(T):
    uH = pol_H(t,s)
    uR = pol_R_coop(t,s)
    dr = reward(s, uH, uR)[0] + gamma*v_funs_adv[t+1][transition(s,uH,uR,0)]-v_funs_adv[t][(s[0],project_risk(s[1]))]
    s = transition(s,uH,uR,dr)
    print("state=",s,", uH=", uH,", dr=", dr)

print()
print("Testing with adversarial agent")
s = ("s1",project_risk(1))
print(s)
for t in range(T):
    uH = pol_H(t,s)
    uR = pol_R_adv(t,s)
    dr = reward(s, uH, uR)[0] + gamma*v_funs_adv[t+1][transition(s,uH,uR,0)]-v_funs_adv[t][(s[0],project_risk(s[1]))]
    s = transition(s,uH,uR,dr)
    print("state=",s,", uH=", uH,", dr=", dr)

Testing with cooperative agent
('s1', 1.0101010101010102)
state= ('s1', 3.0303030303030303) , uH= 1 , dr= 2.0
state= ('s1', 5.050505050505051) , uH= 1 , dr= 2.0
state= ('s1', 16.161616161616163) , uH= 10 , dr= 11.0
state= ('s1', 27.272727272727273) , uH= 10 , dr= 11.0
state= ('s1', 38.38383838383839) , uH= 10 , dr= 11.0
state= ('s1', 49.494949494949495) , uH= 10 , dr= 11.0
state= ('s1', 60.60606060606061) , uH= 10 , dr= 11.0
state= ('s1', 71.71717171717172) , uH= 10 , dr= 11.0
state= ('s1', 82.82828282828284) , uH= 10 , dr= 11.0
state= ('s1', 93.93939393939395) , uH= 10 , dr= 11.0

Testing with adversarial agent
('s1', 1.0101010101010102)
state= ('s1', 1.0101010101010102) , uH= 1 , dr= 0.0
state= ('s1', 1.0101010101010102) , uH= 1 , dr= 0.0
state= ('s1', 1.0101010101010102) , uH= 1 , dr= 0.0
state= ('s1', 1.0101010101010102) , uH= 1 , dr= 0.0
state= ('s1', 1.0101010101010102) , uH= 1 , dr= 0.0
state= ('s1', 1.0101010101010102) , uH= 1 , dr= 0.0
state= ('s1', 1.0101010101010102) , uH= 1