# Challenge: Iterative Prisoner's Dilemma
Das *Gefangenendilemma* oder *Prisoner's Dilemma* ist eines der bekanntesten Probleme der Spieltheorie (welche für Multi-Agenten Systeme im Bereich der KI relevant ist). Initial von Thomas Hobbes in der politischen Philosophie ([Der Leviathan](https://books.google.de/books/about/Der_Leviathan.html?id=FGAGEAAAQBAJ&redir_esc=y)) eingeführt, handelt das Dilemma von zwei Verbrechern, die von der Polizei gefangen genommen und unabhängig verhöhrt werden. Jeder Gefangene kann nun entweder mit dem anderen Gefangenen kooperieren, also der Polizei nichts verraten (*Cooperate*) oder den anderen Gefangenen im Stich lassen (*Defect*). Halten beide Gefangene dicht, so müssen sie für kurze Zeit (z.B. 2 Jahre) ins Gefängnis, verraten sich beide gegenseitig müssen beide länger (z.B. 5 Jahre) ins Gefängnis, hält aber einer dicht und der andere nicht, so kommt der eine für die längste Zeit (z.B. 8 Jahre), der andere aber gar nicht (also 0 Jahre) ins Gefängnis. Das Gefangenendilemma lässt sich also als eine Pay-Off Matrix der folgenden Form schreiben:

| | Cooperate | Defect |
| :- | :--: | :-: |
| **Cooperate** | -2 / -2 | -8 / 0 |
| **Defect** | 0 / -8 | -5 / -5 |

Dabei sind horizontal die Aktionen von Agent 1 und vertikal die Aktionen von Agent 2 aufgetragen. Links des / ist der Reward von Agent 1, rechts des / der Reward von Agent 2. Es wird davon ausgegangen, dass die Agenten ihre Aktionen gleichzeitig wählen und sofort einen Reward bekommen.

Das **Dilemma** ergibt sich dabei folgendermaßen: Für einen einzelnen Agenten ist die Kombination ich wähle *Defect*, der andere wählt *Cooperate* am besten, für beide Agenten zusammengerechnet ist aber die Situation beide wählen *Cooperate* am besten. Zugleich ist die beste Antwort auf jede Aktion des Gegners eigentlich diesen zu verraten, *Defect*/*Defect* ist daher ein [Nash Equillibrium](https://de.wikipedia.org/wiki/Nash-Gleichgewicht), aber suboptimal für beide Agenten zusammen, *Cooperate/Cooperate* ist [Pareto optimal](https://de.wikipedia.org/wiki/Pareto-Optimum). Daher stellt sich im Prisoner's Dilemma die Frage wie sehr ich darauf vertraue, dass der andere zu mir hält und, ob ich das ausnutzen möchte.

Gerade dieses Spannungsfeld wird beim *iterativen Gefangenendilemma* umso wichtiger. Während das klassiche Gefangenendilemma nur für eine "Runde" gespielt wird, wiederholt man bei der iterativen Version die Aktionsauswahl mehrere Male. Nutzt man die Gutmütigkeit des Gegenspielers in dieser Version einmal aus, so wird dieser zukünftig nicht mehr so gutmütig sein. Das *iterative Gefangenendilemma* modelliert also den Aufbau und das Ausnutzen von Vertrauen in einem kooperativenm Multi-Agenten System. Dieses Modell wurde zum Beispiel in der Politikwissenschaft verwendet, um Verhaltensweisen im Kalten Krieg zu diskutieren. Es gibt aber noch viele weitere Arbeiten zum iterativen Gefangenendilemma aus der Spieltheorie, Informatik, Psychologie, Politikwissenschaft, Soziologie und Philosophie. Eine spannende und einfache Einführung in dieses Thema findet sich in [diesem Video](https://www.youtube.com/watch?v=mScpHTIi-kM).

**In dieser Challenge sollt ihr einen Agenten entwickeln, der, einer bestimmten Strategie folgend, im iterativen Gefangenendilemma möglichst viel kumulierten Reward erzielt.** Grob zusammengefasst gibt es vier Klassen solcher Agenten:
- Reflex-Agenten (if-then Regeln)
- Modell-basierte Reflex-Agenten (if-then Regeln basierend auf der Historie)
- lernende Gegenspieler-Modell-basierte Agenten (lernen Aktionen des Gegenspielers vorherzusagen)
- lernende Wert-basierte Agenten (lernen den Wert einer Aktion in einer bestimmten Situation vorherzusagen und wählen wertigste Aktion)

Um das Spiel ein wenig spannender zu gestalten wurden Unsicherheiten in diese Implementierung eingebaut, die es im klassischen iterativen Prisoner's Dilemma nicht gibt:
- Für jedes Spiel werden neue Rewards für jedes Aktions-Paar gezogen (aus einer Normalverteilung um die oben angeführten Werte mit Varianz 1) und mit einer Wahrscheinlichkeit von 1% ändern sich diese Werte bei jeder Reward Berechnung, werden also neu gesamplet
- Die Rewards sind nicht fix, sondern werden aus einer Normalverteilung mit Varianz 1, die über den fixen Rewards zentriert ist gesamplet
- Mit einer Wahrscheinlichkeit von 10% observiert euer Agent die gegenteilige Aktion, die der Gegner tatsächlich gemacht hat

Die Payoff Matrix für die vorliegende Challenge kann also wie folgt geschrieben werden (wobei N(mu, var) für ein Sample aus einer Normalverteilung mit Mittelwert mu und Varianz var steht, beachtet aber, dass aus der inneren Normalverteilung nur mit einer Wahrscheinlichkeit von 1% ein neur Wert gezogen wird und dieser sonst konstant bleibt, während aus der äußeren Normalverteilung in jeder Runde neu gezogen wird):

| | Cooperate | Defect |
| :- | :--: | :-: |
| **Cooperate** | N(N(-2, 1), 1) / N(N(-2, 1), 1) | N(N(-8, 1), 1) / N(N(0, 1), 1) |
| **Defect** | N(N(0, 1), 1) / N(N(-8, 1), 1) | N(N(-5, 1), 1) / N(N(-5, 1), 1) |

Als Eingabe für die Entscheidung erhalten eure Agenten nur die Aktion des Gegners, diese und eure eigenen Aktionen dürfen sich aber gemerkt werden und daraus können wahrscheinliche Rewards abgeleitet werden. Bitte versucht aber nicht auf andere Variablen außerhalb eures Agenten zuzugreifen!


## Da Rulez
Den Code für den Wettkampf (Tournament) findet ihr unten, dieser wird auch nicht abgeändert und in dieser Form für die finale Auswertung verwendet. Im Allgemeinen gilt:

- im Tournament spielt am Ende jeder Agent 100x gegen jeden Agenten inklusive sich selbst und vier Baselines
- ihr dürfte euch zu zweit (aber nicht zu mehreren) zusammen tun, müsst dann den Gewinn aber auch teilen
- wer im Schnitt am meisten Reward bekommt, also die wenigsten Jahre im Gefängnis verbringen muss, gewinnt
- jede Runde geht zwischen 1000 und 2000 Zeitschritte, in jedem Zeitschritt wählt jeder der Spielenden Agenten genau eine Aktion
- Agenten dürfen nur eine der zwei vorgegebenen Aktionen und nichts anderes zurückgeben
- ihr dürft eure Agenten beliebig implementieren, aber die Signaturen des Konstruktors und der `get_next_action` Methode dürfen nicht geändert werden, das heißt weder die Eingabe noch die Ausagebparameter dürfen angepasst werden
- ihr dürft keine "hacky" Tricks verwenden, um auf externe Werte zuzugreifen, auf die euer Agent nicht zugreifen können sollte oder den Agenten des Gegners durch Programmierkniffe zu behindern
- ihr SOLLT spieltheoretische, statistische, mathematische und sonstige Eigenschaften der vorgegebenen Implementierung ausnutzen, um einen möglichst guten Agenten zu finden 
- Bitte versucht eure Implementierung möglichst effizient zu halten und seht z.B. von riesigen Neuronalen Netzen ab, ich möchte das Tournament schnellstmöglich auswerten könnwn
- der Zeitrahmen für die Implementierung ist von Freitag, 10:00 Uhr bis Freitag, 13:00 UHr, die Abgabe [in diesem Google Drive Ordner](https://drive.google.com/drive/folders/1JwScCpgj43v4yqHE6la3KO7XdwKKUGwE?usp=sharing) am Freitag um 13:00 Uhr wird gewertet

## Agenten
Jeder Agent muss in jedem Zeitschritt zwischen zwei Aktionen wählen, `Cooperate` und `Defect` (siehe oben). Um diese Auswahlmöglichkeit programmatisch umzusetzen, wählen wir ein [Enum](https://en.wikipedia.org/wiki/Enumerated_type). Das heißt, zu jedem Zeitpunkt muss ein Agent `Action.COOPERATE` oder `Action.DEFECT` zurückgeben. Andere Rückgaben sind nicht zulässig und führen zur Disqualifikation.

In [None]:
from enum import Enum

class Action(Enum):
    COOPERATE = 1
    DEFECT = 2

Als nächstes folgt die Basis-Klasse für alle Agenten. Alle Agenten im Tournament sollten von dieser Klasse erben. Die Basisklasse stellt sicher, dass die übergebene letzte Aktion des Gegenspielers eine valide Aktion oder None (nur im ersten Durchlauf erlaubt) ist. Bitte beachtet die Methoden Signaturen, jeder Agent braucht einen Konstruktor (`__init__`) ohne Eingabeparameter und eine `get_next_action` Methode, die eine `Action` oder None übergeben bekommt und eine `Action` zurückgibt (`None` als Rückgabe ist, anders als in der Basisklasse, für eure Agenten nicht zulässig!!). Andere Übergabeparameter und Rückgabewerte sind für diese Methoden nicht erlaubt!

In [None]:
class Agent:

    def __init__(self) -> None:
        pass

    def get_next_action(self, last_opponent_action: Action) -> Action:
        # last_opponent_action must be one of Action.COOPERATE, Action.DEFECT, None
        assert last_opponent_action in list(Action) or last_opponent_action is None
        return None  # None return is not allowed and will lead to disqualification, return either Action.COOPERATE or ACTION.DEFECT

Nachfolgenden sind vier Baselines implementiert, die euch einen Hinweis darauf geben können, wie ihr eure Agenten implementieren könnt. Diese Baselines werden auch am Tournament teilnehmen, ihre Schwächen auszunutzen kann also Punkte bringen! Die folgenden Baselines sind implementiert:

- `DefectAgent`: wählt, egal was der Gegner tut, immer `Action.DEFECT`, spielt also das Nash Equilibrium des nicht-iterativen Gefangenendilemmas
- `CooperateAgent`: wählt, egal was der Gegner tut, immer `Action.COOPERATE`, hofft also auf einen anderen Agenten, der ebenfalls zusammenarbeiten will
- `RandomAgent`: wählt immer eine zufällige Aktion, ist daher unberechenbar
- `SwitchingAgent`: wählt `Action.DEFECT`und `Action.COOPERATE` immer im Wechsel, wobei die erste Aktion zufällig gewählt wird

In [None]:
import random

""" Always Defect baseline """
class DefectAgent(Agent):

    def __init__(self) -> None:
        pass

    def get_next_action(self, last_opponent_action: Action) -> Action:
        super().get_next_action(last_opponent_action)
        return Action.DEFECT

""" Always Cooperate baseline """
class CooperateAgent(Agent):

    def __init__(self) -> None:
        pass

    def get_next_action(self, last_opponent_action: Action) -> Action:
        super().get_next_action(last_opponent_action)
        return Action.COOPERATE

""" Random Action Selection baseline """
class RandomAgent(Agent):

    def __init__(self) -> None:
        pass

    def get_next_action(self, last_opponent_action: Action) -> Action:
        super().get_next_action(last_opponent_action)
        return random.choice(list(Action))

""" Constant switching baseline """
class SwitchingAgent(Agent):

    def __init__(self) -> None:
        self.my_last_action = random.choice(list(Action))

    def get_next_action(self, last_opponent_action: Action) -> Action:
        super().get_next_action(last_opponent_action)
        
        if self.my_last_action == Action.COOPERATE:
            self.my_last_action = Action.DEFECT
        else:
            self.my_last_action = Action.COOPERATE
        
        return self.my_last_action

In [None]:
class OneLineAgent(Agent):

    def __init__(self):
        pass
    
    def get_next_action(self, last_opponent_action: Action) -> Action:
        super().get_next_action(last_opponent_action)
        if last_opponent_action is None:
            return Action.COOPERATE
        return last_opponent_action

In [None]:
class GraphAgent(Agent):
    round = 0
    opponent_defects = 0

    def __init__(self) -> None:
        pass

    def get_next_action(self, last_opponent_action: Action) -> Action:
        super().get_next_action(last_opponent_action)
        self.round += 1
        if last_opponent_action is Action.DEFECT: self.opponent_defects += 1
        if self.opponent_defects < (self.round / 10):
            return Action.COOPERATE
        return Action.DEFECT

In [None]:
class der_Morser_Agent(Agent):

    def __init__(self) -> None:
        self.entscheidungen = []
        self.meine_entscheidungen = [Action.DEFECT,Action.DEFECT,Action.COOPERATE,Action.COOPERATE,Action.DEFECT,Action.DEFECT,Action.COOPERATE,Action.COOPERATE,Action.DEFECT,Action.DEFECT]
        # print(self.meine_entscheidungen)
        self.Gegenstrategie = None
        pass

    def get_next_action(self, last_opponent_action: Action) -> Action:
        super().get_next_action(last_opponent_action)
        
        if  last_opponent_action != None:
            self.entscheidungen.append(last_opponent_action)
        
        i = len(self.entscheidungen)
        
        if i < 10:
            return self.meine_entscheidungen[i]
        
        elif i == 10:
            """
            counter_wir = 0
            for i in range(10):
                if self.meine_entscheidungen[i] == self.entscheidungen[i]:
                    counter_wir += 1
                
                
                
            counter_D = 0
            for i in range(10):
                if self.entscheidungen[i] == Action.DEFECT:
                    counter_D += 1
                    
            
            counter_C = 0
            for i in range(10):
                if self.entscheidungen[i] == Action.COOPERATE:
                    counter_C += 1
                    
                
            counter_A = 0
            for i in range(10):
                if i > 0 and self.entscheidungen[i] != self.entscheidungen[i-1]:
                    counter_A += 1
            
                
            liste = [counter_wir,counter_D,counter_C,counter_A]
            
            max_index = liste.index(max(liste))
            if liste(max_index) >= 8:
                self.Gegenstrategie = max_index
            """
            self.Gegenstrategie = self.test_strategie()
            if self.Gegenstrategie == 0:
                return Action.COOPERATE
            elif self.Gegenstrategie == None:
                return Action.DEFECT
            else:
                return Action.DEFECT
    
        else:
            if self.Gegenstrategie == 0:
                return Action.COOPERATE
            elif self.Gegenstrategie == None:
                return Action.DEFECT
            else:
                return Action.DEFECT
            
    def test_strategie(self):
        counter_wir = 0
        for i in range(10):
            if self.meine_entscheidungen[i] == self.entscheidungen[i]:
                counter_wir += 1



        counter_D = 0
        for i in range(10):
            if self.entscheidungen[i] == Action.DEFECT:
                counter_D += 1


        counter_C = 0
        for i in range(10):
            if self.entscheidungen[i] == Action.COOPERATE:
                counter_C += 1


        counter_A = 0
        for i in range(10):
            if i > 0 and self.entscheidungen[i] != self.entscheidungen[i-1]:
                counter_A += 1


        liste = [counter_wir,counter_D,counter_C,counter_A]

        max_index = liste.index(max(liste))
        if liste[max_index] >= 8:
            self.Gegenstrategie = max_index
        
        return self.Gegenstrategie

In [None]:
class YouBetterPlayNice(Agent):

    def __init__(self) -> None:
        self.opponents_last_action = Action.COOPERATE

    def get_next_action(self, last_opponent_action: Action) -> Action:
        super().get_next_action(last_opponent_action)
        
        my_action = Action.COOPERATE
        
        if last_opponent_action == Action.DEFECT and self.opponents_last_action == Action.DEFECT :
            my_action = Action.DEFECT
        
        self.opponents_last_action = last_opponent_action
        return my_action

In [None]:
class RNN_Agent_007(Agent):
    def __init__(self) -> None:
        self.hidden_state = float(51)
        self.lower_limit = float(0)
        self.lower_bound = float(50)
        self.upper_bound = float(60)
        self.upper_limit = float(100)

    def adjust_to_limits(self, h):
        if h >= self.upper_limit:
            h = self.upper_limit
        elif h <= self.lower_limit:
            h = self.lower_limit
        return h

    def update_hidden_state(self, h) -> float:
        self.hidden_state = h
        return self.hidden_state

    def get_next_action(self, last_opponent_action: Action) -> Action:
        super().get_next_action(last_opponent_action)

        h = self.hidden_state
        if last_opponent_action == Action.COOPERATE:
            update = random.gauss(mu=1, sigma=1)
            # update = 1
        else:
            update = random.gauss(mu=-1, sigma=1)
            # update = -1
        h += update

        h = self.adjust_to_limits(h)
        self.hidden_state = self.update_hidden_state(h)
        # print("the current hidden state is: ", self.hidden_state)

        if self.hidden_state <= self.lower_bound or self.hidden_state >= self.upper_bound:
            return Action.DEFECT
        else:
            return Action.COOPERATE

In [None]:
""" Emmas innocent Agent """
class dilEmma(Agent):

    def __init__(self) -> None:
        self.opponent_actions = []
        self.round_count = 0
        self.test_instance = 0

    def get_next_action(self, last_opponent_action: Action) -> Action:
        self.round_count += 1
        if self.round_count % 100 == 0:
            self.test_instance = self.round_count

        # Defect the first 10 rounds
        if self.test_instance <= self.round_count <= self.test_instance + 10:
            return Action.DEFECT
        
        # Cooperate 10 times
        if self.test_instance + 10 < self.round_count <= self.test_instance + 20:
            return Action.COOPERATE
        
        # Evaluate strategy after round 20
        if self.round_count > self.test_instance + 20:
            identical_test = sum(action == Action.COOPERATE for action in self.opponent_actions[self.test_instance:self.test_instance+20])/20
            cooperation_test = sum(action == Action.COOPERATE for action in self.opponent_actions[self.test_instance+10:self.test_instance+20]) 

            if identical_test >= 0.8 or identical_test <= 0.8: 
                return Action.DEFECT
            
            if cooperation_test >= 7 and last_opponent_action == Action.COOPERATE:
                rand_num = random.randint(0, 9)
                if rand_num < 9:  # 90% chance to cooperate
                    return Action.COOPERATE
                else:
                    return Action.DEFECT

Hier sollt ihr euren Agenten implementieren. Bestenfalls erbt euer Agent von der `Agent`. Die Übergabe- und Rückgabeparameter für die `__init__` und die `get_next_action` Methoden sollten gleich zu den obigen Implementierungen sein. Denkt euch gerne einen coolen (und eindeutigen) Namen für eure Klasse/euren Agenten aus.

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random

# Define the RNN model
class Real_RNN_Agent_31(Agent):
    def __init__(self) -> None:
    # def __init__(self, input_size = 2, hidden_size = 16, output_size = 2) -> None:

        self.rounds = 1
        self.print_rounds = 1
        self.print_checker = 1

        self.generate_starting_sequence()

        # self.starting_sequence = []
        # for _ in range(0,10):
        #     if random.random() > 0.5:
        #         self.starting_sequence.append(Action.COOPERATE)
        #     else:
        #         self.starting_sequence.append(Action.DEFECT)
        # print("my starting sequence is: ", self.starting_sequence)

        # Initialize the RNN

        # LookerUp-Depth (on how many iterations is the RNN trained at once?)
        self.LU_Depth = 10
        # Prediction-Depth (how many steps into the future should the RNN predict?)
        self.Pred_Depth = 5
        # batch-size (Default is 1)
        self.batch_size = 1

        self.input_size = 2*2*self.LU_Depth # 20 input layers
        self.hidden_size = 2*self.input_size # 40 hidden layers
        self.output_size = 2*self.Pred_Depth # 20 output layers
        self.rnn = nn.Sequential(
            nn.Linear(self.input_size + self.hidden_size, self.hidden_size),
            nn.ReLU(),
            nn.Linear(self.hidden_size, self.output_size),
            nn.LogSoftmax(dim=1)
        )

        # epoch_length = 10000
        # epoch = 0
        
        self.my_moves = []
        self.their_moves = []

        # initialize hidden state
        self.hidden_state = self.init_hidden_state()
        # self.hidden_size = torch.zeros(self.batch_size, self.hidden_size)

        self.i2h = nn.Linear(self.input_size + self.hidden_size, self.hidden_size)
        self.i2o = nn.Linear(self.input_size + self.hidden_size, self.output_size)
        self.softmax = nn.LogSoftmax(dim=1)

        # initialize backpropagation parameters
        self.total_rewards = 0
        # self.backprop_epoch = 0
        # self.total_loss = 0
        self.current_losses = []

        # Define loss function and optimizer
        # self.criterion = nn.MSELoss()
        self.criterion = nn.HuberLoss()
        self.optimizer = optim.SGD(self.rnn.parameters(), lr=1e-0) # lr=1e-3)
        # weight decay introduces L2 regularization
        # self.optimizer = optim.Adam(self.rnn.parameters(), lr=1e-1, weight_decay=1e-3, amsgrad=True) # lr=0.0001, weight_decay=1e-4)

        print("initialized")
    
    # initialize hidden state
    def init_hidden_state(self):
        # return torch.zeros(self.batch_size, self.hidden_size)
        # return torch.ones(self.batch_size, self.hidden_size)*0.5
        return torch.rand(self.batch_size, self.hidden_size)

    def generate_starting_sequence(self):

        self.starting_sequence = []
        random.seed(12102003)
        
        for _ in range(0,20):
            if random.random() > 0.5:
                self.starting_sequence.append(Action.COOPERATE)
            else:
                self.starting_sequence.append(Action.DEFECT)
        # print("my starting sequence is: ", self.starting_sequence)

        return self.starting_sequence

    def forward(self, input, hidden):
        
        combined = torch.cat((input, hidden), 1)
        hidden = self.i2h(combined)
        output = self.i2o(combined)
        output = self.softmax(output)
        return output, hidden

    # Define the Prisoner's Dilemma game logic
    def get_reward(self, player1, player2):

        if player1 == Action.COOPERATE and player2 == Action.COOPERATE:
            outcome = 2
        elif player1 == Action.COOPERATE and player2 == Action.DEFECT:
            outcome = 8
        elif player1 == Action.DEFECT and player2 == Action.COOPERATE:
            outcome = 0
        elif player1 == Action.DEFECT and player2 == Action.DEFECT:
            outcome = 5

        # payoff_matrix = {
        # (Action.COOPERATE, Action.COOPERATE): 2,
        # (Action.COOPERATE, Action.DEFECT): 0,
        # (Action.DEFECT, Action.COOPERATE): 8,
        # (Action.DEFECT, Action.DEFECT): 5}
        # outcome = payoff_matrix[player1, player2]
            
        return outcome
    
    def calculate_rewards(self, my_move, their_move):
        
        self.total_rewards = self.get_reward(my_move, their_move)

        # to check the current reward via print
        # if self.rounds % 2000 == 0:
        #     the_current_rewards = []
        #     for ii in range(len(my_move)):
        #         current_reward = self.get_reward(my_move[ii], their_move[ii])
        #         self.total_rewards += current_reward
        #         the_current_rewards.append(current_reward)
        #     print("\n current rewards: " + str(the_current_rewards.float()) + "\n")

        # else:
        # for ii in range(len(my_move)):
        #     self.total_rewards += self.get_reward(my_move[ii], their_move[ii])

        return self.total_rewards
    
    # Function to convert moves to one-hot vectors
    def input_to_onehot(self, move):

        if move == Action.COOPERATE:
            return torch.tensor([[1, 0]], dtype=torch.float32)
        elif move == Action.DEFECT:
            return torch.tensor([[0, 1]], dtype=torch.float32)

    # def input_to_target(self, next_move):

    #     if next_move == Action.COOPERATE:
    #         target_move = torch.tensor([[1, 0]], dtype=torch.float32)
    #     else:
    #         target_move = torch.tensor([[0, 1]], dtype=torch.float32)
    #     return target_move
    
    # Backpropagation
    def backward(self, output, my_next_move, their_next_move):

        # calculate classification loss:
        classification_loss = self.criterion(output, self.input_to_onehot(their_next_move))

        current_reward = self.get_reward(my_next_move, their_next_move)
        reward_loss = self.criterion(torch.tensor(current_reward, dtype=torch.float32, requires_grad=True).squeeze(), torch.zeros(1))

        # Weighted loss incorporating rewards
        total_loss1 = classification_loss # only classification loss
        total_loss2 = reward_loss  # only mean rewards as weights
        # total_loss3 = classification_loss + reward_loss  # both mean rewards and class loss as weights
        
        self.total_loss = total_loss2
        
        # Perform backpropagation
        self.optimizer.zero_grad()
        self.total_loss.backward()
        self.optimizer.step()

        # total_loss += loss.item()
        self.current_losses.append(self.total_loss.item())

        # self.backprop_epoch += 1

        pass

    def print_stuff(self):

        the_current_rewards = []
        # sometimes we play as the opponent (first Action is None), sometimes not (first Action is filled).
        # move_diff accounts for the difference in length.
        move_diff = len(self.my_moves) - len(self.their_moves)
        # print(move_diff)
        # print(len(self.my_moves))
        # print(len(self.their_moves))
        # print(self.rounds)
        
        for ii in range(-len(self.starting_sequence),0,1):
            the_current_rewards.append(self.get_reward(self.my_moves[ii-move_diff], self.their_moves[ii]))
                
        print("\n current rewards: " + str(the_current_rewards))

        print(" current losses: " + str(self.current_losses) + "\n")

        # print("game iteration: " + str(self.rounds)  + "\n")

        pass

    def check_morse(self):

        # "Morser part": check if I am playing against myself
        morse = False
        # counts overlaps between the first 10 elements of their_moves and our starting sequence
        checker1 = sum([x == y for x, y in zip(self.their_moves[0:len(self.starting_sequence):1], self.starting_sequence[0:len(self.starting_sequence):1])])

        if checker1 > 17:
            morse = True
        else:
            morse = False

        return morse

    def check_fairplay(self):
        
        # sanity check in case the opponent is still someone else (or in case a new round has begun without notice)
        morse = True
        checker2 = sum([True if var == Action.DEFECT else False for var in self.their_moves[-int(len(self.starting_sequence)/2):]])
        
        if checker2 > 3:
            morse = False
            # something with self.print_checker?
            # print("\n greetings from checker2 \n")
        else:
            morse = True

        return morse

        # if checker1 > 7:
        #     morse = True
        #     
        #     if self.rounds > 2*len(self.starting_sequence):
        #         checker2 = sum([True if _ == Action.DEFECT else False for _ in self.their_moves[-10:]])
        #         if checker2 > 3:
        #             morse = False
        #             self.rounds = 1
        #             print("\n greetings from checker \n")
        #             self.my_moves = []
        #             self.their_moves = []
        #             self.hidden_state = self.init_hidden_state()
        #             self.rounds = 1
        #             self.generate_starting_sequence()
        # else:
        #     morse = False
    
    def new_round(self):
        self.my_moves = []
        self.their_moves = []
        self.hidden_state = self.init_hidden_state()
        self.rounds = 1
        self.print_rounds += 1
        self.generate_starting_sequence()
            

    def get_next_action(self, last_opponent_action: Action) -> Action:
        super().get_next_action(last_opponent_action)

        if last_opponent_action != None:
            self.their_moves.append(last_opponent_action)
        else:
            if self.rounds != 1 and (1 == 1): # self.print_rounds % 10 == 0:
                self.print_stuff()
            
            # a new round has started. Thus we need to reset everything and start the training again. 
            self.new_round()
            # print("\n hello round :) \n")
            

        if self.rounds > 2*len(self.starting_sequence):
            # "Morser part": check if I am playing against myself
            morse = self.check_morse()
            if morse == True:
                morse = self.check_fairplay()
                if morse == True:
                    self.my_moves.append(Action.COOPERATE)
                    self.rounds += 1
                    self.current_losses = []
                    return Action.COOPERATE
                else:
                    # continue playing, but reset hidden state
                    self.hidden_state = self.init_hidden_state()
            pass

        elif self.rounds > len(self.starting_sequence) and self.rounds <= 2*len(self.starting_sequence):
            # "Morser part": check if I am playing against myself
            morse = self.check_morse()
            # sanity check in case the opponent is still someone else (or in case a new round has begun without notice)
            if morse == True:
                self.my_moves.append(Action.COOPERATE)
                self.rounds += 1
                self.current_losses = []
                return Action.COOPERATE
            pass
        
        else:
            # else just follow the standard sequence since we are not past round 10
            # print(self.rounds)
            sequence_move = self.starting_sequence[self.rounds-1]
            self.my_moves.append(sequence_move)
            # print("\n "+ str(self.my_moves) + "\n")
            # print("\n "+ str(self.their_moves) + "\n")
            # self.print_checker += 1
            self.rounds += 1
            return sequence_move

        # analogous to Dropout: re-initialize hidden state every 100 iterations 
        # if self.rounds % 100 == 0:
        # self.hidden_state = self.init_hidden_state()
        
        # document losses
        self.current_losses = []
        
        # perform forward pass (training the RNN)
        for ii in range(-len(self.starting_sequence),-self.LU_Depth-self.Pred_Depth+2,1):
            
            input1 = self.input_to_onehot(self.my_moves[ii])
            input2 = self.input_to_onehot(self.their_moves[ii])
            combined_input = torch.cat((input1, input2), dim=1)
            if self.LU_Depth > 1:
                for LU in range(1,self.LU_Depth):
                    input1 = self.input_to_onehot(self.my_moves[ii+LU])
                    input2 = self.input_to_onehot(self.their_moves[ii+LU])
                    combined_input = torch.cat((combined_input, input1, input2), dim=1)
            
            # # adjust shape from (input_size) to (batch_size, input_size)
            # input1 = input1.unsqueeze(0)
            # input2 = input2.unsqueeze(0)
                    
            # print(input1)
            # print(input1.shape)
            # input1 = self.input_to_onehot(self.my_moves[ii])
            # input2 = self.input_to_onehot(self.their_moves[ii])
            # combined_input = torch.cat((input1, input2), dim=2) # dim=1)

            # Concatenate input1_expanded and input2_expanded along the third dimension
            # combined_input = torch.cat((input1, input2), dim=1)  # shape: (batch_size, input_size)
            # print(combined_input)
            # print(combined_input.shape)

            # self.hidden_state: shape: (batch_size, hidden_size)
            # print(self.hidden_state)
            # print(self.hidden_state.shape)


            # self.forward is called here automatically
            # output, hidden = self.forward(input, self.hidden_state)

            # Concatenate combined_input and hidden_state_expanded along the second dimension
            combined = torch.cat((combined_input, self.hidden_state), dim=1)  # shape: (batch_size, input_size + hidden_size, 1)
            self.hidden = self.rnn[0](combined)
            output = self.rnn[2](self.hidden_state)

            # if Prediction-Depth is >1, multiple future moves are proposed by the RNN and stored in the list my_next_moves 
            single_output = output.split(int(output.size(dim=1)/self.Pred_Depth), dim=1)

            my_next_moves = []
            for Pred in range(0,self.Pred_Depth):
                # Choose the move with the highest probability
                _, predicted_index = torch.max(single_output[Pred], 1)
                next_move_index = predicted_index.item()

                # Convert the predicted move index to 'COOPERATE' or 'DEFECT'
                # Even indices always correspond to COOPERATE, odd indices to DEFECT
                if next_move_index % 2 == 0:
                    my_next_moves.append(Action.COOPERATE)
                else:
                    my_next_moves.append(Action.DEFECT)
            
                # perform backward pass (backpropagation to adjust weights)
                # but not on the last move (since the response is unknown yet!)
                if ii+self.LU_Depth+self.Pred_Depth-1 != 0:
                    
                    # start with the first value that corresponds to a predicted step
                    first_Pred = ii+self.LU_Depth-1
                    # print(type(output))
                    # print(type(my_next_moves))
                    # print(type(self.their_moves[(first_Pred):(first_Pred-self.Pred_Depth)]))
                    self.backward(single_output[Pred], my_next_moves[Pred], self.their_moves[first_Pred+Pred])
        
        # # if self.rounds % 2000 == 0:
        # #     print("\n total loss: " + str(total_loss) + "\n")

        # input1 = self.input_to_onehot(self.my_moves[-1])
        # input2 = self.input_to_onehot(self.their_moves[-1])
        # input = torch.cat((input1, input2), dim=1)
        
        # # self.forward is called here automatically
        # # output, hidden = self.forward(input, self.hidden_state)
        # combined = torch.cat((input, self.hidden_state), 1)
        # self.hidden = self.rnn[0](combined)
        # output = self.rnn[2](self.hidden_state)

        # # Choose the move with the highest probability
        # _, predicted_index = torch.max(output, 1)
        # my_next_move_index = predicted_index.item()

        # # Convert the predicted move index to 'COOPERATE' or 'DEFECT'
        # if my_next_move_index == 0:
        #     my_next_move = Action.COOPERATE
        # elif my_next_move_index == 1:
        #     my_next_move = Action.DEFECT

        # # perform backward pass (backpropagation to adjust weights)
        # # self.backward(output, my_next_move)
        
        
        
        # for ii in range(10,0,-1):
        #     onehot_moves.append(self.input_to_onehot(self.their_moves[-ii]))
        # input = self.input_to_onehot(onehot_moves)
        # # self.forward is called here automatically
        # # output, hidden = self.forward(input, self.hidden_state)
        # combined = torch.cat((input, self.hidden_state), 1)
        # self.hidden = self.rnn[0](combined)
        # output = self.rnn[2](self.hidden_state)

        # # Choose the move with the highest probability
        # _, predicted_index = torch.max(output, 1)
        # my_next_move_index = predicted_index.item()

        # # Convert the predicted move index to 'COOPERATE' or 'DEFECT'
        # if my_next_move_index == 0:
        #     my_next_move = Action.COOPERATE
        # else:
        #     my_next_move = Action.DEFECT

        # # perform backward pass (backpropagation to adjust weights)
        # self.backward(output, my_next_move)
        
        # document the next move for the reward calculations in the next round
        self.my_moves.append(my_next_moves[-1])
        self.rounds += 1

        return my_next_moves[-1]
    

In [None]:
# dummy cell
import torch

some_Pred_Depth = 3

some_output = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

print(some_output[(4+1):(4+some_Pred_Depth+1)])




Nachfolgend ist das Tournament implementiert. Die Aufgabe der einzelnen Funktionen wird hier zum Verständnis kurz erklärt, diese sind für euch aber wahrscheinlich weniger relevant:

- `sample_reward`: gibt ein sample aus einer Gauss-Verteilung zurück, die Varianz 1 hat und über dem Übergabewert zentriert ist, wird genutzt um randomisierte Rewards zu erzeugen
- `sample_new_rewards_means`: mit einer Wahrscheinlichkeit von 1% (oder einem anderen übergebnen Wert) werden neue Reward-Mittelwerte gesamplet, die neuen Mittelwerte werden ebenfalls aus einer Normalverteilung gesamplet, die über vorgegeben Werten zentriert ist
- `get_reward`: stellt sicher, dass die übergebenen Aktionen der Agenten valide sind und gibt dann basierend auf den Aktionen passende Reward samples zurück
- `get_uncertain_observation`: mit einer Wahrscheinlichkeit von 10% werden die Aktionen getauscht, um Unsicherheit über die gewählte Aktion des Gegner zu erzeugen
- `play_rounds`: eine zufällige Anzahl an Zeitschritten/Runden wird gesamplet (bei fester Rundenzahl ist die optimale Strategie immer "always defect", siehe Video oben), dann wählen beide übergebenen Agenten für jeden Zeitschritt je eine Aktion, für jedes Aktionspaar erhält der Agent einen Reward, der kumuliert und zum Schluss (zur Bildung eines Mittelwerts) durch die Anzahl der Zeitschritte geteilt wird
- `tournament`: für eine feste Anzahl an Wiederholungen (100) spielt jeder Agent einmal gegen jeden Agenten inklusive sich selbst und die Baselines, die normalisierten, kumulierten Rewards werden aufsummiert, in einem dict gespeichert und final, zur Bildung eines Mittelwerts, durch die Anzahl der Wiederholungen geteilt

In [None]:
from tqdm import tqdm

# rewards are sampled from a gaussian distribution centered around the provided mean value 
def sample_reward(mean: float) -> float:
    return random.gauss(mu=mean, sigma=1)

# with a probability of 1% we sample new reward means
def sample_new_reward_means(curr: list, means: list=[-2, -8, 0, -5], prob: float=0.01) -> float:
    if random.random() < prob or curr is None:
        curr = []
        for m in means:
            curr.append(random.gauss(mu=m, sigma=1))

    return curr

def get_reward(my_action: Action, opponent_action: Action, reward_means: list) -> float:
    assert my_action in Action
    assert opponent_action in Action
    reward_means = sample_new_reward_means(curr=reward_means)

    # assume negative numbers are years spent in prison e.g. -1 is one year in prison and 0 is no prison time
    # consequently highest value is best
    if my_action == Action.COOPERATE and opponent_action == Action.COOPERATE:
        return sample_reward(reward_means[0]), reward_means
    if my_action == Action.COOPERATE and opponent_action == Action.DEFECT:
        return sample_reward(reward_means[1]), reward_means
    if my_action == Action.DEFECT and opponent_action == Action.COOPERATE:
        return sample_reward(reward_means[2]), reward_means
    if my_action == Action.DEFECT and opponent_action == Action.DEFECT:
        return sample_reward(reward_means[3]), reward_means

# with a probability of 10% the agent receives a wrong observation
def get_uncertain_observation(action: Action) -> Action:
    observation = action
    if random.random() < 0.1:
        if observation == Action.DEFECT:
            observation = Action.COOPERATE
        if observation == Action.COOPERATE:
            observation = Action.DEFECT
    return observation

def play_rounds(agent: Agent, opponent: Agent) -> float:
    num_rounds = random.randint(200, 400)
    action_agent = None
    action_opponent = None
    reward = 0
    reward_means = sample_new_reward_means(curr=None)

    for _ in range(num_rounds):
        action_opponent = get_uncertain_observation(action_opponent)
        action_agent = get_uncertain_observation(action_agent)
        action_agent = agent.get_next_action(action_opponent)
        action_opponent = opponent.get_next_action(action_agent)

        c_reward, reward_means = get_reward(action_agent, action_opponent, reward_means)
        reward += c_reward

    return reward/num_rounds

def tournament(all_agents: list, repeats: int=50) -> dict:
    results = {x.__class__.__name__: 0 for x in all_agents}

    # we repeat the experiment multiple times to eliminate "luck" as far as possible
    for _ in tqdm(range(repeats), desc='repeats'):
        # each agent plays each opponent once per repeat
        for agent in all_agents:
            # print("\n" + str(agent))
            for opponent in all_agents:
                # print(str(opponent) + "\n")
                r = play_rounds(agent, opponent)
                results[agent.__class__.__name__] += r

    # normalization just to get smaller numbers
    for key in results.keys():
        results[key] /= repeats
    
    return results

Mit dem Nachfolgendem Code wird das Tournament ausgeführt und ausgewertet.

In [None]:
all_agents = [Real_RNN_Agent_31(), DefectAgent(), CooperateAgent(), SwitchingAgent(), RandomAgent()]  # all of your agents will be added to this list
results = tournament(all_agents)
results_sorted = dict(sorted(results.items(), key=lambda item: item[1], reverse=True))

print('RESULTS:') # highest value is best (all will be <= 0)
for i, agent_key in enumerate(results_sorted.keys()):
    print(f'\t {i+1}. {agent_key}: {round(results_sorted[agent_key], 4)}')
print(f'Winner is {max(results, key=results.get)} 🥳🥳🥳🥳')