In [7]:
import functools

import gymnasium
import numpy as np
from gymnasium.spaces import Discrete, Tuple
from gymnasium.utils import seeding
from pettingzoo import AECEnv
from pettingzoo.utils import agent_selector, wrappers
import random
import matplotlib.pyplot as plt

In [8]:
class RockPaperScissorsEnv(AECEnv):
    metadata = {"name":"RockPaperScissorsEnv_v0"}
    def __init__(self):
        super().__init__()
        self.possible_agents = ["player_0","player_1"]
        self.agent_name_mapping = dict(zip(self.possible_agents,list(range(len(self.possible_agents)))))
        self._action_spaces = {agent: Discrete(3) for agent in self.possible_agents} #0 for Rock, 1 for Paper, 2 for Scissors
        self.observation_spaces = {agent: Discrete(4) for agent in self.possible_agents} #0-2 for Opponent's last move and 3 if he made the first move to mask his move
        self.agents_moves = {}
    
    def observe(self,agent): #Observe opponents last move but return 3 instead if it is the first move
        opponent = self.possible_agents[1-self.agent_name_mapping[agent]]
        return self.agents_moves.get(opponent,3)

    def reset(self,seed=None,options=None):
        self.agents = self.possible_agents[:]
        self.rewards = {agent:0 for agent in self.agents}
        self._cumulative_rewards = {agent:0 for agent in self.agents}
        self.terminations = {agent: False for agent in self.agents}
        self.truncations = {agent: False for agent in self.agents}
        self.infos = {agent:{} for agent in self.agents}

        self.agents_moves.clear()

        self._agent_selector = agent_selector(self.agents)
        self.agent_selection = self._agent_selector.next()

        observation = self.observe(self.agent_selection)
        return observation, self.infos[self.agent_selection]

    def step(self,action):
        current_agent = self.agent_selection
        self.agents_moves[current_agent] = action

        if self._agent_selector.is_last(): #Resolve round if last agent has moved
            if self.agents_moves["player_0"] == self.agents_moves["player_1"]: #Tie 
                self.rewards["player_0"] = 0
                self.rewards["player_1"] = 0
            elif self.agents_moves["player_0"] == 0 and self.agents_moves["player_1"] == 1:#If player_0 picks rock and player_1 picks paper
                self.rewards["player_0"] = -1
                self.rewards["player_1"] = 1
            elif self.agents_moves["player_0"] == 1 and self.agents_moves["player_1"] == 2:#If player_0 picks paper and player_1 picks scissors
                self.rewards["player_0"] = -1
                self.rewards["player_1"] = 1
            elif self.agents_moves["player_0"] == 2 and self.agents_moves["player_1"] == 0:#If player_0 picks scissors and player_1 picks rock
                self.rewards["player_0"] = -1
                self.rewards["player_1"] = 1
            else: #If no win conditions for player_0 and not tie, player_1 wins
                self.rewards["player_0"] = 1
                self.rewards["player_1"] = -1
                
            self.terminations = {a: True for a in self.agents}
            
        self.agent_selection = self._agent_selector.next() #Switch to next agent
        self._accumulate_rewards()

    def action_space(self,agent):
        return self._action_spaces[agent]

    def observation_space(self,agent):
        return self._observation_spaces[agent]

In [12]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random
from collections import deque
import torch.nn.functional as F

LEARNING_RATE = 0.001
GAMMA = 0.99

NUM_EPISODES = 20000
EPSILON_START = 1
EPSILON_END = 0.01
EPSILON_DECAY = 0.9995

In [23]:
class QNetwork(nn.Module):
    def __init__(self,input_size,output_size):
        super(QNetwork,self).__init__()
        self.net = nn.Sequential(
            nn.Linear(input_size,128),
            nn.ReLU(),
            nn.Linear(128,128),
            nn.ReLU(),
            nn.Linear(128,output_size)
        )
    def forward(self,x):
        return self.net(x)

class DQNAgent:
    def __init__(self,input_size,output_size):
        self.action_size = output_size
        self.observation_size = input_size
        self.q_network = QNetwork(input_size=input_size,output_size=output_size) 
        self.optimizer = optim.Adam(self.q_network.parameters(),lr=LEARNING_RATE)
        self.loss_fn = nn.MSELoss()
        self.epsilon = EPSILON_START
    def choose_action(self,state):
        if random.random() < self.epsilon:
            return random.randint(0,self.action_size-1) #Choose random action
        else: 
            with torch.no_grad():
                state_tensor = torch.LongTensor([state])
                state_one_hot = F.one_hot(state_tensor,num_classes = self.observation_size).float()
                q_values = self.q_network(state_one_hot)
                return torch.argmax(q_values).item()
                
    def learn(self,state,action,reward,next_state,done):
        state_tensor = torch.LongTensor([state])
        state_one_hot = F.one_hot(state_tensor,num_classes = self.observation_size).float()
        next_state_tensor = torch.LongTensor([next_state])
        next_state_one_hot = F.one_hot(next_state_tensor,num_classes = self.observation_size).float()
        action_tensor = torch.LongTensor([action])
        reward_tensor = torch.FloatTensor([reward])
        done_tensor = torch.BoolTensor([done])

        current_q_values = self.q_network(state_one_hot)
        current_q_for_action = current_q_values.gather(1,action_tensor.unsqueeze(1)).squeeze(1)

        with torch.no_grad():
            next_q_values = self.q_network(next_state_one_hot)
            max_next_q = next_q_values.max(1)[0]
            target_q = reward_tensor + (~done_tensor) * GAMMA * max_next_q

        loss = self.loss_fn(current_q_for_action,target_q)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def decay_epsilon(self):
        self.epsilon = max(EPSILON_END,self.epsilon*EPSILON_DECAY)

In [24]:
if __name__ == "__main__":
    env = RockPaperScissorsEnv()
    agents = {agent_id: DQNAgent(input_size=4,output_size=3) for agent_id in env.possible_agents} #0-2 for Rock,paper,scissors opponent last move,3 for opponent current move
    total_rewards = {agent_id: 0 for agent_id in env.possible_agents}
    print("Start training")
    for episode in range(NUM_EPISODES):
        observation,info = env.reset()
        experience_buffer = {agent_id:{} for agent_id in env.possible_agents}#Store experiences for agents
        
        for agent_id in env.agent_iter():
            observation,reward,termination,truncation,info = env.last()
            if termination or truncation:# If agent is done, learn from its final experience
                exp = experience_buffer[agent_id]
                agents[agent_id].learn(exp['state'],exp['action'],reward,observation,True)
                continue
            current_agent = agents[agent_id]
            action = current_agent.choose_action(observation)
            experience_buffer[agent_id] = {"state":observation,"action":action}
            env.step(action)

        for agent in agents.values():
            agent.decay_epsilon()

        for agent_id in env.possible_agents:
            total_rewards[agent_id] += env.rewards[agent_id]
                
        if (episode+1)%1000 == 0:
            avg_reward_p0 = total_rewards['player_0']/1000
            avg_reward_p1 = total_rewards['player_1']/1000
            
            print(f"Episode: {episode+1}/{NUM_EPISODES} - Average Reward P0 (last 1k): {avg_reward_p0} - Average Reward P1 (last 1k): {avg_reward_p1}")
            print(f"Episode: {episode+1}/{NUM_EPISODES} - Epsilon P0: {agents['player_0'].epsilon} - Epsilon P1: {agents['player_1'].epsilon}")
            print(f"Last Observation:{observation}, info:{info}")
            total_rewards = {agent_id:0 for agent_id in env.possible_agents}

Start training


KeyboardInterrupt: 