<a href="https://colab.research.google.com/github/Kshitij04Poojary/Iterated-Prisoners-Dilemma/blob/main/DQN_IPD.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import random


class DQN(nn.Module):
    def __init__(self, in_dim, hidden_dim, out_dim, layer_num):
        super().__init__()
        self.lstmLayer = nn.LSTM(in_dim, hidden_dim, layer_num)
        self.relu = nn.ReLU()
        self.fcLayer = nn.Linear(hidden_dim, out_dim)
        self.weightInit = np.sqrt(1.0 / hidden_dim)

    def forward(self, x):
        out, _ = self.lstmLayer(x)
        out = self.relu(out)
        out = self.fcLayer(out)
        out = nn.Softmax(dim=-1)(out)
        return out

# Define the replay buffer
class ReplayBuffer:
    def __init__(self, capacity):
        self.capacity = capacity
        self.buffer = []
        self.position = 0

    def push(self, state, action, reward, next_state):
        if len(self.buffer) < self.capacity:
            self.buffer.append(None)
        self.buffer[self.position] = (state, action, reward, next_state)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        return zip(*random.sample(self.buffer, batch_size))

    def __len__(self):
        return len(self.buffer)

# Define constants
input_size = 5  # State representation size
hidden_size = 64  # Hidden layer size
output_size = 2  # Number of actions
batch_size = 64
gamma = 0.99  # Discount factor
epsilon_start = 1.0
epsilon_end = 0.01
epsilon_decay = 0.995
target_update = 10  # Update target network every 10 steps
num_episodes = 200

# Define the IPD environment
class IteratedPrisonersDilemma:
    def __init__(self):
        self.num_actions = 2  # Cooperate or Defect
        self.payoff_matrix = np.array([[3, 0], [5, 1], [1, 5], [0, 0]])  # Payoff matrix

    def step(self, action1, action2):
        reward1 = self.payoff_matrix[action1][action2]
        reward2 = self.payoff_matrix[action2][action1]
        return reward1, reward2

# Initialize DQN, target DQN, optimizer
policy_net = DQN(input_size, hidden_size, output_size,4)
target_net = DQN(input_size, hidden_size, output_size,4)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

optimizer = optim.Adam(policy_net.parameters(), lr=0.02)

# Initialize replay buffer
replay_buffer = ReplayBuffer(capacity=10000)

# Epsilon-greedy action selection
def select_action(state, epsilon):
    if np.random.rand() < epsilon:
        return np.random.randint(output_size)
    else:
        with torch.no_grad():
            # Add batch dimension to the state
            state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
            q_values = policy_net(state_tensor)
            return q_values.argmax().item()

# Update Q-values using DQN
def update_q_values():
    if len(replay_buffer) > batch_size:
        states, actions, rewards, next_states = replay_buffer.sample(batch_size)
        states = torch.tensor(states, dtype=torch.float32)
        actions = torch.tensor(actions, dtype=torch.long)
        rewards = torch.tensor(rewards, dtype=torch.float32)
        next_states = torch.tensor(next_states, dtype=torch.float32)

        q_values = policy_net(states)
        q_values_next = target_net(next_states).max(1)[0].detach()
        expected_q_values = rewards + gamma * q_values_next

        loss = nn.functional.mse_loss(q_values.gather(1, actions.unsqueeze(1)), expected_q_values.unsqueeze(1))
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

# Training loop
ipd_env = IteratedPrisonersDilemma()
for episode in range(num_episodes):
    state = [0, 0, 0, 0, 0]  # Initial state
    total_reward = 0
    for t in range(100):  # Limiting episode length
        # Select action
        epsilon = epsilon_end + (epsilon_start - epsilon_end) * np.exp(-episode / epsilon_decay)
        action = select_action(state, epsilon)

        # Take action
        opponent_action = np.random.randint(2)  # Random opponent action
        reward, opponent_reward = ipd_env.step(action, opponent_action)

        # Store transition in replay buffer
        next_state = [action, opponent_action, reward, opponent_reward, 0]  # Placeholder for next state
        replay_buffer.push(state, action, reward, next_state)

        # Move to the next state
        state = next_state
        total_reward += reward

        # Perform one step of optimization (on minibatch)
        update_q_values()

        # Update target network
        if t % target_update == 0:
            target_net.load_state_dict(policy_net.state_dict())

    print(f"Episode {episode + 1}, Total Reward: {total_reward}")

# After training, you can use the policy_net to play the game


Episode 1, Total Reward: 199
Episode 2, Total Reward: 282
Episode 3, Total Reward: 266
Episode 4, Total Reward: 299
Episode 5, Total Reward: 264
Episode 6, Total Reward: 304
Episode 7, Total Reward: 290
Episode 8, Total Reward: 275
Episode 9, Total Reward: 312
Episode 10, Total Reward: 268
Episode 11, Total Reward: 340
Episode 12, Total Reward: 326
Episode 13, Total Reward: 300
Episode 14, Total Reward: 300
Episode 15, Total Reward: 304
Episode 16, Total Reward: 276
Episode 17, Total Reward: 284
Episode 18, Total Reward: 292
Episode 19, Total Reward: 277
Episode 20, Total Reward: 314
Episode 21, Total Reward: 311
Episode 22, Total Reward: 304
Episode 23, Total Reward: 299
Episode 24, Total Reward: 322
Episode 25, Total Reward: 308
Episode 26, Total Reward: 300
Episode 27, Total Reward: 360
Episode 28, Total Reward: 267
Episode 29, Total Reward: 284
Episode 30, Total Reward: 264
Episode 31, Total Reward: 267
Episode 32, Total Reward: 264
Episode 33, Total Reward: 293
Episode 34, Total R

In [2]:
# Function to play IPD using the policy network
def play_ipd(policy_net, num_episodes):
    ipd_env = IteratedPrisonersDilemma()
    total_rewards = []
    for episode in range(num_episodes):
        state = [0, 0, 0, 0, 0]  # Initial state
        total_reward = 0
        for t in range(100):  # Limiting episode length
            # Select action using the policy network
            with torch.no_grad():

                q_values = policy_net(torch.tensor(state, dtype=torch.float32).unsqueeze(0))
                action = q_values.argmax().item()

            # Random opponent action
            opponent_action = np.random.randint(2)

            # Get rewards from the environment
            reward, _ = ipd_env.step(action, opponent_action)

            # Update total reward
            total_reward += reward

            # Move to the next state (only for recording, not used)
            next_state = [action, opponent_action, reward, 0, 0]

            # Update state
            state = next_state

        total_rewards.append(total_reward)

    return total_rewards

# Test against random strategy
def test_against_random(policy_net, num_episodes):
    random_rewards = play_ipd(policy_net, num_episodes)
    avg_reward = np.mean(random_rewards)
    print("Average reward against random strategy:", avg_reward)


test_against_random(policy_net, num_episodes=100)

Average reward against random strategy: 298.36
