In [1]:
from typing import List, Tuple, Callable
import random
import numpy as np
import itertools


def score(player_a: bool, player_b: bool) -> Tuple[int, int]:
    if player_a and player_b:
        return (3, 3)
    if player_a and not player_b:
        return (0, 5)
    if not player_a and player_b:
        return (5, 0)
    else:
        return (1, 1)
    

def repeated_prisoners_dilemma(n_repetitions: int, strategy_player_a: Callable, strategy_player_b: Callable) -> Tuple[Tuple[int, int], Tuple[List[bool], List[bool]]]:
    score_a = 0
    score_b = 0
    history_player_a = []
    history_player_b = []
    for _ in range(n_repetitions):
        turn_player_a = strategy_player_a(history_this_player=history_player_a, history_other_player=history_player_b)
        turn_player_b = strategy_player_b(history_this_player=history_player_b, history_other_player=history_player_a)
        turn_scores = score(player_a=turn_player_a, player_b=turn_player_b)
        score_a += turn_scores[0]
        score_b += turn_scores[1]
        history_player_a.append(turn_player_a)
        history_player_b.append(turn_player_b)
    return ((score_a, score_b), (history_player_a, history_player_b))



class QModel:
    """
    updates using Bellmann Equation:
    Q(s,a) <- Q(s,a) + learning_rate * (immediate_reward + discount_factor * max_a'Q(s',a') - Q(s,a)) 
    
    Q(s,a) are initialized as 0
    
    True: player cooperates
    False: player defects
    """
    def __init__(self, exploration_rate: float = 0.2, learning_rate: float = 0.05, discount_factor: float = 0.99, max_history: int = 5, reward_function: str = "raw", verbose: bool = False):
        """
        exploration_rate: ranges from 0 to 1
            0: always take the action that lead to best Q(s,a) in history (exploit). risk: get stuck in suboptimal behavior
            ->1: deviate from the historically best action for better exploration 

        learning rate: ranges from 0 to 1
            1: completely replace current Q(s,a)
            ->0: barely any update

        discount_factor: ranges from 0 to 1
            0: mypotic (only immediate reward matters)
            ->1: long term rewards count up to as much as the immediate reward

        max_history:
            history that is considered by the model. 
            if less than max_history turns happened, the state only contains the so far performed turns.

        verbose:
            True: print out information 
        """
        self.verbose = verbose
        self.exploration_rate = exploration_rate
        self.max_history = max_history
        self.q = self.init_q()
        self.training_mode = True
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.reward_function = reward_function


    def reward(self, action_this_player: bool, action_other_player: bool) -> int:
        if self.reward_function == "raw":
            return score(player_a=action_this_player, player_b=action_other_player)[0]
        elif self.reward_function == "win_or_lose":
            """
            like in win-stay lose-shift:
            CC (Reward, good for both) or DC (Temptation, best for this player) -> stay 
            CD (Sucker, worst for this player) or DD (Punishment, bad for both) -> shift
            """
            if action_other_player:  # CC or DC
                return 1
            else:  # CD or DD
                return 0
        else:
            raise NotImplementedError(f"{self.reward_function} is not a valid reward_funciton. Valid options: 'raw', 'win_or_lose'" )


    def init_q(self):
        possible_states = []
        for length in range(1, (self.max_history + 1) * 2):
            if length % 2 == 0:
                for combo in itertools.product([0, 1], repeat=length):
                    possible_states.append(str(np.array(combo, dtype=int)))
        q = {"init": {False: 0., True: 0.}}
        for s in possible_states:
            q[s] = {False: 0., True: 0.}
        return q


    def select_action(self, state: str):
        # explore randomly selecting next state 
        if self.training_mode:
            if random.random() <= self.exploration_rate:
                return random.random() <= 0.5
        # exploit state knowledge
        selected_action = self.q[state][True] >= self.q[state][False] 
        if self.verbose:
            print("select action")
            print(f"Q({state}, False): {self.q[state][False]}")
            print(f"Q({state}, True): {self.q[state][True]}")
            print(f"so: {self.q[state][True] >= self.q[state][False]}\n")
        return selected_action
    

    def step(self, history_this_player: List[bool], history_other_player: List[bool]) -> bool:
        """
        select next action and update Q for last state and last action if self.training_mode = True
        """
        # 1. define state
        if len(history_this_player) == 0:
            state = "init"
            if self.verbose: print(f"state = {state}")
        else:
            last_actions_this_player = [int(i) for i in history_this_player[-self.max_history:][::-1]]
            last_actions_other_player = [int(i) for i in history_other_player[-self.max_history:]]
            state_np = np.concat([last_actions_this_player, last_actions_other_player])
            state = str(state_np)
            if self.verbose: print(f"state = {state}")

            # 2. (optional) update q
            if self.training_mode:
                last_action_this_player = history_this_player[-1]
                last_action_other_player = history_other_player[-1]
                immediate_reward = self.reward(action_this_player=last_action_this_player, action_other_player=last_action_other_player)
                if len(history_this_player) == 1:
                    last_state = "init"
                else:
                    last_state_np = np.concat([[int(i) for i in history_this_player[-(self.max_history+1):][::-1]], [int(i) for i in history_other_player[-(self.max_history+1):]]])
                    last_state = str(last_state_np[1:-1])
                if self.verbose: print(f"last state = {last_state}")
                max_next_q = max(self.q[state][True], self.q[state][False])
                current_q = self.q[last_state][last_action_this_player]
                # update using bellmann equation
                self.q[last_state][last_action_this_player] = current_q + self.learning_rate * (immediate_reward + self.discount_factor * max_next_q - current_q)
                if self.verbose:
                    print(f"update Q(s,a) = Q({last_state},{last_action_this_player})")
                    print(f"Q(s,a): {current_q}")
                    print(f"Q(s',a')_max_a': {max_next_q}")
                    print(f"Q(s,a) <- {current_q} + {self.learning_rate} * ({immediate_reward} + {self.discount_factor} * {max_next_q} - {current_q}) = {current_q + self.learning_rate * (immediate_reward + self.discount_factor * max_next_q - current_q)}")

        # 3. perform next action
        if state in self.q.keys():
            return self.select_action(state=state)
        else:
            return random.random() > 0.2
        
        
    def info(self):
        select_c = 0
        select_d = 0
        select_c_due_to_unseen = 0
        for state in self.q.keys():
            if self.q[state][True] == self.q[state][False] == 0:
                select_c_due_to_unseen += 1
            elif self.q[state][True] >= self.q[state][False]:
                select_c += 1
            else:
                select_d += 1
        print(f"select c: {select_c}, select d: {select_d}, unseen {select_c_due_to_unseen}")




class NonparametricStrategies:
    """
    True: player cooperates
    False: player defects
    """
    @staticmethod
    def strategy_always_cooperate(history_this_player: List[bool], history_other_player: List[bool]):
        return True

    @staticmethod
    def strategy_always_defect(history_this_player: List[bool], history_other_player: List[bool]):
        return False

    @staticmethod
    def strategy_tit_for_tat(history_this_player: List[bool], history_other_player: List[bool]):
        """
        start with cooperation and afterwards exactly mirrors the last turn of the opponent
        """
        if len(history_this_player) == 0:
            return True
        return history_other_player[-1]
    
    @staticmethod
    def strategy_tit_for_tat_generous(history_this_player: List[bool], history_other_player: List[bool]):
        """
        tit for tat but with 10% chance forgives deflection of other player in last turn
        """
        if len(history_this_player) == 0:
            return True
        regular_turn = history_other_player[-1] 
        if regular_turn == False:
            return random.random() > 0.9
        return regular_turn

    @staticmethod
    def strategy_tit_for_tat_suspicious(history_this_player: List[bool], history_other_player: List[bool]):
        """
        tit for tat but starting with deflection (prefered in hostile environments)
        """
        if len(history_this_player) == 0:
            return False
        return history_other_player[-1]

    @staticmethod
    def strategy_tit_for_tat_noisy(history_this_player: List[bool], history_other_player: List[bool]):
        """
        randomly deviates from regular tit for tat turn with 10% chance 
        """
        if len(history_this_player) == 0:
            regular_turn = True
        else:
            regular_turn = history_other_player[-1]

        if random.random() > 0.9:
            return not regular_turn
        return regular_turn

    @staticmethod
    def strategy_tit_for_tat_exponential_decay(history_this_player: List[bool], history_other_player: List[bool]):
        """
        tit for tat but considering the history of the opponents actions, assigning more weight to recent moves.
        The player cooperates with a probability proportional to the weighted fraction of the opponents cooperation.
        """
        if len(history_this_player) == 0:
            return True
        weights = np.logspace(start=0.1, stop=1, base=10, num=len(history_other_player))
        return bool(random.random() < (weights[history_other_player].sum() / weights.sum()))

    @staticmethod
    def strategy_grim_trigger(history_this_player: List[bool], history_other_player: List[bool]):
        """
        starts with cooperation but always deflects once the other player deflects a single time
        """
        if sum(history_other_player) != len(history_other_player):
            return False
        return True

    @staticmethod
    def strategy_random(history_this_player: List[bool], history_other_player: List[bool]):
        """
        random deflection / cooperation
        """
        return random.random() > 0.5
    
    @staticmethod
    def strategy_naive_probability(history_this_player: List[bool], history_other_player: List[bool]):
        """
        cooperates with same probability as the opponent 
        """
        if len(history_other_player) == 0:
            return random.random() <= 0.5
        return random.random() <= sum(history_other_player) / len(history_other_player)
    
    @staticmethod
    def strategy_pavlov(history_this_player: List[bool], history_other_player: List[bool]):
        """
        win-stay lose-shift
        CC (Reward, good for both) or DC (Temptation, best for this player) -> stay 
        CD (Sucker, worst for this player) or DD (Punishment, bad for both) -> shift
        """
        if len(history_other_player) == 0:
            return True
        if history_other_player[-1]:  # CC or DC
            return history_this_player[-1]
        else:  # CD or DD
            return not history_this_player[-1]
        
    @staticmethod
    def strategy_pavlov_suspicious(history_this_player: List[bool], history_other_player: List[bool]):
        """
        like win-stay lose-shift, but starting with defecting
        """
        if len(history_other_player) == 0:
            return False
        if history_other_player[-1]:  # CC or DC
            return history_this_player[-1]
        else:  # CD or DD
            return not history_this_player[-1]

In [None]:
## setup ##
n_runs = 10
n_train_episodes = 15_000
n_repetitions = 5
# q model
q_max_history = 4
q_max_exploration_rate = 1.
q_min_exploration_rate = 0.05
q_learning_rate = 0.05
q_discount_factor = 0.95
q_reward_function = "raw"

## competitor pool ##
nonpara = NonparametricStrategies()
competitor_pool = {
    "always_defect": nonpara.strategy_always_defect,
    "random": nonpara.strategy_random, 
    "grim_trigger": nonpara.strategy_grim_trigger, 
    "naive_probability": nonpara.strategy_naive_probability, 
    "pavlov": nonpara.strategy_pavlov, 
    "pavlov_suspicious": nonpara.strategy_pavlov_suspicious, 
    "tit_for_tat": nonpara.strategy_tit_for_tat, 
    "tit_for_tat_suspicious": nonpara.strategy_tit_for_tat_suspicious}
# greedy lookup competitor
model_greedy_lookup = QModel(max_history=q_max_history, discount_factor=0, exploration_rate=1., reward_function=q_reward_function)
for _ in range(n_train_episodes):
    competitor = random.choice([i for i in competitor_pool.values()])
    repeated_prisoners_dilemma(n_repetitions=n_repetitions, strategy_player_a=model_greedy_lookup.step, strategy_player_b=competitor)
print("greedy lookup setting:")
model_greedy_lookup.info()
model_greedy_lookup.exploration_rate = 0.  # so it exploits its experience
model_greedy_lookup.training_mode = False
competitor_pool["greedy_lookup"] = model_greedy_lookup.step

greedy lookup setting:
select c: 3, select d: 82, unseen 256


In [None]:
## experiments ##
experiment_results = {}
for c in competitor_pool.keys():
    experiment_results[c] = {"q_win": 0, "tie": 0, "q_lose": 0, "q_reward": 0, "opponent_reward": 0, "max_possible_reward": n_repetitions * 5 * n_runs, "q_CC": 0, "q_CD": 0, "q_DC": 0, "q_DD": 0}
for _ in range(n_runs):
    for competitor_name_test in [i for i in competitor_pool.keys()]:
        q_model = QModel(max_history=q_max_history, learning_rate=q_learning_rate, discount_factor=q_discount_factor, reward_function=q_reward_function)
        # train
        exploration_rates = np.linspace(start=q_max_exploration_rate, stop=q_min_exploration_rate, num=n_train_episodes)
        for i in range(n_train_episodes):
            if random.random() <= 0.3:
                competitor_name_train = competitor_name_test
            else:
                competitor_name_train = random.choice([k for k in competitor_pool.keys()])
            competitor_train = competitor_pool[competitor_name_train]
            # decay exploration_rate
            q_model.exploration_rate = exploration_rates[i]
            repeated_prisoners_dilemma(n_repetitions=n_repetitions, strategy_player_a=q_model.step, strategy_player_b=competitor_train)
        # test
        competitor_test = competitor_pool[competitor_name_test]
        q_model.training_mode = False
        scores, histories = repeated_prisoners_dilemma(n_repetitions=n_repetitions, strategy_player_a=q_model.step, strategy_player_b=competitor_test)
        
        if scores[0] == scores[1]:
            experiment_results[competitor_name_test]["tie"] += 1
        elif scores[0] > scores[1]:
            experiment_results[competitor_name_test]["q_win"] += 1
        else:
            experiment_results[competitor_name_test]["q_lose"] += 1
        experiment_results[competitor_name_test]["q_reward"] += scores[0]
        experiment_results[competitor_name_test]["opponent_reward"] += scores[1]

        for i in range(len(histories[0])):
            if histories[0][i] and histories[1][i]:
                experiment_results[competitor_name_test]["q_CC"] += 1
            elif histories[0][i] and not histories[1][i]:
                experiment_results[competitor_name_test]["q_CD"] += 1
            elif not histories[0][i] and histories[1][i]:
                experiment_results[competitor_name_test]["q_DC"] += 1
            else:
                experiment_results[competitor_name_test]["q_DD"] += 1

In [4]:
import pandas as pd

df = pd.DataFrame.from_dict(experiment_results, orient="index")
df.index.name = "competitor"
df

Unnamed: 0_level_0,q_win,tie,q_lose,q_reward,opponent_reward,max_possible_reward,q_CC,q_CD,q_DC,q_DD
competitor,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
always_defect,0,0,10,35,110,250,0,15,0,35
random,6,1,3,132,92,250,13,8,16,13
grim_trigger,0,10,0,140,140,250,30,10,10,0
naive_probability,5,4,1,144,129,250,26,10,13,1
pavlov,3,7,0,140,125,250,24,10,13,3
pavlov_suspicious,3,5,2,114,109,250,12,12,13,13
tit_for_tat,0,10,0,140,140,250,30,10,10,0
tit_for_tat_suspicious,0,0,10,100,150,250,3,28,18,1
greedy_lookup,0,3,7,63,98,250,10,7,0,33


In [5]:
# verbose train for demonstration
q_model.verbose = True
q_model.training_mode = True
repeated_prisoners_dilemma(n_repetitions=3, strategy_player_a=q_model.step, strategy_player_b=competitor)

state = init
select action
Q(init, False): 5.393043761049864
Q(init, True): 6.805050335244032
so: True

state = [1 1]
last state = init
update Q(s,a) = Q(init,True)
Q(s,a): 6.805050335244032
Q(s',a')_max_a': 10.249926339198746
Q(s,a) <- 6.805050335244032 + 0.05 * (3 + 0.95 * 10.249926339198746 - 6.805050335244032) = 7.101669319593771
select action
Q([1 1], False): 7.763359816089103
Q([1 1], True): 10.249926339198746
so: True

state = [1 1 1 1]
last state = [1 1]
update Q(s,a) = Q([1 1],True)
Q(s,a): 10.249926339198746
Q(s',a')_max_a': 7.737779529748915
Q(s,a) <- 10.249926339198746 + 0.05 * (3 + 0.95 * 7.737779529748915 - 10.249926339198746) = 10.254974549901881
select action
Q([1 1 1 1], False): 6.453837375035388
Q([1 1 1 1], True): 7.737779529748915
so: True



((9, 9), ([True, True, True], [True, True, True]))

# QModel vs QModel

In [6]:
experiment_results = {"tie": 0, "a_wins": 0, "b_wins": 0, "a_reward": 0, "b_reward": 0, "CC": 0, "CD": 0, "DC": 0, "DD": 0}
for i in range(n_runs):
    q_model_a = QModel(max_history=q_max_history, learning_rate=q_learning_rate, discount_factor=q_discount_factor, reward_function=q_reward_function)
    q_model_b = QModel(max_history=q_max_history, learning_rate=q_learning_rate, discount_factor=q_discount_factor, reward_function=q_reward_function)
    # train
    exploration_rates = np.linspace(start=q_max_exploration_rate, stop=q_min_exploration_rate, num=n_train_episodes)
    for i in range(n_train_episodes):
        # decay exploration_rate
        q_model.exploration_rate = exploration_rates[i]
        repeated_prisoners_dilemma(n_repetitions=n_repetitions, strategy_player_a=q_model_a.step, strategy_player_b=q_model_b.step)
    # test
    competitor_test = competitor_pool[competitor_name_test]
    q_model_a.training_mode = False
    q_model_b.training_mode = False
    scores, histories = repeated_prisoners_dilemma(n_repetitions=n_repetitions, strategy_player_a=q_model_a.step, strategy_player_b=q_model_b.step)

    if scores[0] == scores[1]:
        experiment_results["tie"] += 1
    elif scores[0] > scores[1]:
        experiment_results["a_wins"] += 1
    else:
        experiment_results["b_wins"] += 1
    experiment_results["a_reward"] += scores[0]
    experiment_results["b_reward"] += scores[1]

    for i in range(len(histories[0])):
        if histories[0][i] and histories[1][i]:
            experiment_results["CC"] += 1
        elif histories[0][i] and not histories[1][i]:
            experiment_results["CD"] += 1
        elif not histories[0][i] and histories[1][i]:
            experiment_results["DC"] += 1
        else:
            experiment_results["DD"] += 1
experiment_results

{'tie': 9,
 'a_wins': 0,
 'b_wins': 1,
 'a_reward': 72,
 'b_reward': 77,
 'CC': 10,
 'CD': 2,
 'DC': 1,
 'DD': 37}