## **Reinforcement Learning-Based Pong Game from Atari with Deep Q-Network Agent: A Detailed Report**

### **1. Introduction**

This RL project is based on Pong, the interactive arcade game created in 1972 by Allan Alcorn, an engineer at Atari, Inc., an American video game developer. Here, the game is played and learned by an AI agent through Reinforcement Learning techniques. Using the Deep Q-Network (DQN) algorithm, the agent is trained to control one of the paddles and compete against a human opponent. The agent learns progressively over time by interacting with the environment and updating its policy from the experience it gains while training and playing.

A feature is also added that lets the agent keep learning from a human opponent while playing against them in real time, which enhances its play through dynamic, continuous learning.


### **2. What is the Pong game?**
Pong is a classic 2D arcade game in which two paddles move vertically on either side of the screen to hit a ball back and forth. The goal is to score points by sending the ball past the paddle controlled by the opponent.
The game mechanics include:
- **Ball Movement**: The ball moves at a constant speed and bounces off the paddles and the top and bottom walls.
- **Paddle Movement**: Each paddle moves vertically at a constant speed to intercept the ball.
- **Scoring Points**: A point is scored when a player fails to hit the ball and it passes that player's paddle.
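
The wall-bounce rule can be illustrated with a minimal sketch. The function name `step_ball` is hypothetical, and the wall position of 290 is borrowed from the environment code later in this notebook; treat it as an assumption rather than a fixed part of the game.

```python
def step_ball(x, y, dx, dy, wall=290):
    """Advance the ball one frame and reflect it off the top/bottom walls."""
    x, y = x + dx, y + dy
    if y > wall:        # crossed the top wall: clamp and flip vertical velocity
        y, dy = wall, -dy
    elif y < -wall:     # crossed the bottom wall: clamp and flip
        y, dy = -wall, -dy
    return x, y, dx, dy

# A ball near the top wall moving up and to the right bounces back down.
print(step_ball(0, 288, 5, 5))  # (5, 290, 5, -5)
```

Only the sign of the vertical velocity changes on a bounce, so the ball's speed stays constant.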

### **3. Reinforcement Learning (RL) in Pong**
Reinforcement Learning (RL) is a type of machine learning where an agent learns to make decisions by performing actions in an environment and receiving rewards or penalties based on the outcomes of these actions. In our Pong game:
- The agent's goal is to maximise the cumulative reward by learning how to move the paddle to hit the ball and prevent the opponent from scoring.
- The only actions available are paddle movements, i.e. up and down.
- Rewards are given based on whether the agent hits the ball (+1), loses a point (-10), or scores (+10).

**Reinforcement Learning General Framework**
  - **Agent**: An entity that learns and makes decisions based on the observed environment and the rewards it receives.
  - **State (S)**: The current state of the environment, represented by the ball's position, its velocity, and the paddle positions.
  - **Action (A)**: The set of actions available to the agent (here, only two).
  - **Reward (R)**: The feedback received from the environment after taking an action in a given state. Positive rewards are for success (e.g., hitting the ball); negative rewards are for losing a point or letting the ball go.
  - **Policy ($\pi$)**: The strategy the agent uses to choose an action in the current state in order to maximise rewards.
  - **Value Function (V)**: The expected cumulative reward the agent will receive, starting from a given state and following the policy.
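
To make "cumulative reward" concrete, the sketch below computes the discounted return of a single episode, which is the quantity the value function estimates in expectation. The reward sequence is a made-up example, and `discounted_return` is a hypothetical helper, not part of the project code.

```python
def discounted_return(rewards, gamma=0.99):
    """Sum of rewards weighted by gamma**t, accumulated from the last step back."""
    g = 0.0
    for r in reversed(rewards):
        g = r + gamma * g
    return g

# Hypothetical episode: two ball hits (+1 each), then the agent scores (+10).
episode = [0, 1, 0, 1, 10]
print(discounted_return(episode, gamma=0.5))  # 1.25
```

With a small discount factor such as 0.5, the final +10 contributes only 0.625 to the return, which shows how strongly the discount factor shapes what the agent values.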

The Pong game environment for this project is modelled as a Markov Decision Process (MDP) where:
 - The next state depends only on the current state and the action taken, which is the Markov property.
 - The agent learns to approximate the **Q-value function** using *Neural Networks*.
  
### **4. Deep Q-Network (DQN) for Pong Game**
This project uses the Deep Q-Network (DQN) algorithm to train the AI agent.

#### **4.1. Q-Learning**

Q-learning is a form of model-free reinforcement learning where the agent learns a Q-function $ Q(s, a) $, which estimates the value (expected future rewards) of taking action $ a $ in state $ s $. The update rule for Q-learning is:

$$
Q(s, a) \leftarrow Q(s, a) + \alpha \left( r + \gamma \max_{a'} Q(s', a') - Q(s, a) \right)
$$

Where:
- $ \alpha $ is the learning rate.
- $ r $ is the reward obtained from the environment.
- $ \gamma $ is the discount factor, which determines the importance of future rewards.
- $ s' $ is the new state after taking action $ a $.
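
As an illustration of the update rule, here is a minimal tabular sketch. The state labels, the `q_update` helper and the `done` flag (which drops the bootstrap term at the end of an episode) are assumptions made for the example; the project itself replaces the table with a neural network.

```python
from collections import defaultdict

ACTIONS = (0, 1)  # 0 = up, 1 = down

def q_update(Q, s, a, r, s_next, alpha=0.1, gamma=0.99, done=False):
    """Apply one Q-learning update to table Q in place and return the TD error."""
    # Terminal transitions have no future value to bootstrap from.
    best_next = 0.0 if done else max(Q[(s_next, b)] for b in ACTIONS)
    td_error = r + gamma * best_next - Q[(s, a)]
    Q[(s, a)] += alpha * td_error
    return td_error

Q = defaultdict(float)          # unseen (state, action) pairs default to 0
Q[("s1", 0)] = 2.0              # pretend the next state already has some value
q_update(Q, "s0", 1, r=1.0, s_next="s1", alpha=0.5, gamma=0.9)
print(Q[("s0", 1)])             # moves halfway towards the target 1 + 0.9 * 2
```

The learning rate decides how far each update moves the old estimate towards the new target; `alpha=0.5` moves it halfway.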

#### **4.2. Deep Q-Network (DQN)**

DQN extends Q-learning by using a neural network to approximate the Q-function $ Q(s, a) $. The agent interacts with the environment, collects experiences, and uses these experiences to train a neural network to predict the Q-values.

##### **Network Architecture for DQN**
The neural network in DQN has the following architecture:
- **Input Layer**: The state of the Pong environment (ball position, velocity, paddle positions).
- **Hidden Layers**: Several fully connected layers that process the input to extract useful features.
- **Output Layer**: Two output values, each representing the Q-value of one possible action (move up or move down).
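
For a sense of scale, the following sketch counts the trainable parameters of such a network. The layer sizes (6 inputs, two hidden layers of 128 units, 2 outputs) are taken from the `DQN` class defined later in this notebook; `linear_params` is a hypothetical helper.

```python
def linear_params(n_in, n_out):
    """Weights plus biases of one fully connected layer."""
    return n_in * n_out + n_out

# Assumed sizes: 6 state features, two hidden layers of 128 units, 2 actions.
layer_sizes = [6, 128, 128, 2]
total = sum(linear_params(a, b) for a, b in zip(layer_sizes, layer_sizes[1:]))
print(total)  # 17666
```

Nearly all of the parameters sit in the 128-by-128 hidden layer, so the network is small enough to train on a CPU.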

#### **4.3. Experience Replay**

One crucial feature of DQN is **experience replay**. Instead of updating the Q-values from each single transition as it happens, the agent stores every experience $ (s, a, r, s') $ in a memory buffer and periodically samples a batch of stored experiences to update the neural network. This helps break the correlation between consecutive experiences and stabilizes training.
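
A minimal sketch of a uniform replay buffer follows, assuming a fixed capacity and transitions that also carry a `done` flag. The class name `ReplayBuffer` and its methods are illustrative and not the exact structure used in this project.

```python
import random
from collections import deque

class ReplayBuffer:
    """Fixed-size FIFO store of (s, a, r, s_next, done) transitions."""

    def __init__(self, capacity=10000):
        self.memory = deque(maxlen=capacity)  # oldest entries drop out first

    def push(self, s, a, r, s_next, done):
        self.memory.append((s, a, r, s_next, done))

    def sample(self, batch_size):
        """Uniformly sample a batch without replacement and regroup it by field."""
        batch = random.sample(list(self.memory), batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        return states, actions, rewards, next_states, dones

    def __len__(self):
        return len(self.memory)

buffer = ReplayBuffer(capacity=3)
for t in range(5):
    buffer.push(t, t % 2, 0.0, t + 1, False)
print(len(buffer))  # 3
```

Because the deque has a maximum length, the two oldest transitions are discarded automatically once the buffer is full.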

#### **4.4. Target Network**

DQN also uses a **target network** to stabilize the Q-value updates. The target network is a copy of the Q-network whose weights are updated less frequently. This keeps the learning targets fixed between updates, which reduces oscillations and divergence during training and keeps the agent's play consistent.
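
The periodic hard update can be sketched without any deep learning library, using a plain dictionary as a stand-in for the network weights. `TargetSync` and its `period` argument are hypothetical names; in PyTorch the copy itself is done with `target.load_state_dict(online.state_dict())`.

```python
import copy

class TargetSync:
    """Keeps a frozen copy of the online weights, refreshed every `period` steps."""

    def __init__(self, online_weights, period=100):
        self.online = online_weights
        self.target = copy.deepcopy(online_weights)  # start in sync
        self.period = period
        self.steps = 0

    def step(self):
        """Call once per training update; hard-copies the weights on schedule."""
        self.steps += 1
        if self.steps % self.period == 0:
            self.target = copy.deepcopy(self.online)

weights = {"w": [0.0]}
sync = TargetSync(weights, period=3)
for _ in range(2):
    weights["w"][0] += 1.0   # stand-in for a gradient step on the online network
    sync.step()
print(sync.target["w"])      # [0.0]  (target still frozen)
weights["w"][0] += 1.0
sync.step()
print(sync.target["w"])      # [3.0]  (copied on the third step)
```

Between copies, the targets used in the loss do not move even though the online weights change at every step.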

#### **4.5. Epsilon-Greedy Policy (Exploration vs Exploitation Balance)**

To balance exploration and exploitation, the DQN agent follows an **epsilon-greedy policy** where it explores by taking random actions with probability $ \epsilon $, and exploits its current knowledge (choosing the action with the highest Q-value) with probability $ 1 - \epsilon $. Over time, $ \epsilon $ decays to reduce exploration as the agent becomes more confident in its policy.
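
The selection rule and the decay schedule can be sketched as follows. The default values (start at 1.0, multiply by 0.995 per episode, floor at 0.01) mirror the agent's hyperparameters later in this notebook; the helper names are assumptions made for the example.

```python
import random

def select_action(q_values, epsilon, rng=random):
    """Random action with probability epsilon, otherwise the greedy action."""
    if rng.random() < epsilon:
        return rng.randrange(len(q_values))                      # explore
    return max(range(len(q_values)), key=lambda a: q_values[a])  # exploit

def episodes_until_min(eps=1.0, decay=0.995, eps_min=0.01):
    """Count multiplicative decay steps until epsilon reaches its floor."""
    n = 0
    while eps > eps_min:
        eps *= decay
        n += 1
    return n

print(select_action([0.2, 0.7], epsilon=0.0))  # 1 (pure exploitation)
print(episodes_until_min())                    # on the order of 900 episodes
```

With these settings the agent keeps exploring for most of a 1000-episode run, so short runs are dominated by random actions.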

### **5. Tech Stack**

- **Turtle Graphics**: Used for rendering the Pong game environment, including paddles, ball movement, and the game screen.
- **PyTorch**: To implement the DQN model and for building and training the neural network that approximates the Q-function.
- **NumPy**: For numerical operations and handling state representations.
- **Collections (Deque)**: For managing experience replay by storing past experiences.

### **6. Implementation Procedure**

#### **6.1. Game Setup**
The game environment is created and set up using Python's Turtle module. The screen is initialized, and the paddles and ball are rendered. The game mechanics are coded, including ball bouncing, scoring, and paddle control.

#### **6.2. Agent Training**
- The DQN agent interacts with the Pong environment, where the state is represented by a vector of six values (ball position, ball velocity, left paddle position, right paddle position).
- The agent receives rewards based on performance of its actions and the impacts it has on the environment (e.g., hitting the ball or scoring points).
- Over successive episodes, the agent's Q-network is updated using the Q-learning update rule, where the expected future reward is approximated through a neural network.

#### **6.3. Playing Against the Agent**
After training the agent, the model is saved and can be loaded for gameplay. The human controls the right paddle using keyboard inputs, while the trained agent controls the left paddle. The agent continues to learn during the game using experience replay, improving its performance as it plays against the human.

### **Primary Equations Used**

#### **Q-Learning Update Rule**:
$$
Q(s, a) \leftarrow Q(s, a) + \alpha \left( r + \gamma \max_{a'} Q(s', a') - Q(s, a) \right)
$$

Where:
- $ Q(s, a) $: Current Q-value for state $ s $ and action $ a $.
- $ r $: Immediate reward from the environment.
- $ \gamma $: Discount factor, controlling the weight of future rewards.
- $ \alpha $: Learning rate.

#### **DQN Loss Function**:
The goal is to minimize the difference between the predicted Q-values (from the neural network) and the target Q-values (which represent the "true" value the agent should have learned for a particular state-action pair). The loss function used to update the neural network is the mean squared error (MSE) between the two:

$$
L = \mathbb{E} \left[ \left( r + \gamma \max_{a'} Q(s', a') - Q(s, a) \right)^2 \right]
$$

- **$ L $**: This is the loss, specifically the mean squared error between the predicted Q-values and the target Q-values.

- **$ \mathbb{E} $**: Represents the expected value, or the average over a batch of experiences sampled from the replay memory.

- **$ r $**: The **immediate reward** received after taking action $ a $ in state $ s $. This is the short-term feedback the agent gets from the environment after its action.

- **$ \gamma $**: The **discount factor**, which determines the importance of future rewards. A value between 0 and 1, where:
  - $ \gamma = 0 $ means the agent only cares about immediate rewards.
  - $ \gamma $ close to 1 means the agent values future rewards more, encouraging it to plan ahead.

- **$ \max_{a'} Q(s', a') $**: The maximum predicted Q-value for the **next state** $ s' $ across all possible next actions $ a' $. This represents the agent's estimate of the best future reward it can obtain from the next state.

- **$ Q(s, a) $**: The **predicted Q-value** for taking action $ a $ in the current state $ s $.

- **$ r + \gamma \max_{a'} Q(s', a') $**: This term represents the **target Q-value**. It's the immediate reward $ r $ plus the discounted maximum future reward $ \gamma \max_{a'} Q(s', a') $. This is the "true" Q-value that the neural network is trying to approximate, although it is itself an estimate built from the network's own predictions.
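
The loss can be worked through on a tiny batch. In this sketch the two Q-functions are toy callables with constant outputs, the `done` flag removes the bootstrap term for terminal transitions, and a separate `q_target` stands in for the target network; all of these are assumptions made for illustration.

```python
def dqn_loss(batch, q_online, q_target, gamma=0.99):
    """Mean squared TD error over a batch of (s, a, r, s_next, done) tuples.

    q_online and q_target are callables mapping a state to a list of Q-values.
    """
    total = 0.0
    for s, a, r, s_next, done in batch:
        bootstrap = 0.0 if done else gamma * max(q_target(s_next))
        target = r + bootstrap                   # r + gamma * max_a' Q(s', a')
        total += (target - q_online(s)[a]) ** 2  # squared TD error
    return total / len(batch)

# Toy Q-functions (hypothetical): constant predictions regardless of state.
q_online = lambda s: [1.0, 2.0]
q_target = lambda s: [0.0, 4.0]
batch = [
    ("s0", 1, 1.0, "s1", False),   # target = 1 + 0.5 * 4 = 3, error = 1
    ("s1", 0, -10.0, "s2", True),  # terminal: target = -10, error = -11
]
print(dqn_loss(batch, q_online, q_target, gamma=0.5))  # 61.0
```

The terminal transition dominates the loss here, which is typical early in training when large end-of-episode rewards are still poorly predicted.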


### **Challenges Encountered and Steps Taken**

1. **Exploration-Exploitation Tradeoff**: A balance between exploring new actions and exploiting known strategies had to be achieved by using an epsilon-greedy policy.
2. **Training Stability**: Using experience replay and a target network was necessary to stabilize the learning process.
3. **Human-AI Interaction**: Ensuring smooth gameplay between the human and the AI agent required balancing the game's speed, the AI's responsiveness, and the agent's continuous learning during gameplay.

In [1]:
import random
import numpy as np
import turtle
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from collections import deque


In [2]:
# --- Pong Environment ---

class PongEnv:
    def __init__(self):
        # Initialize the screen
        self.screen = turtle.Screen()
        self.screen.title("Pong RL")
        self.screen.bgcolor("white")
        self.screen.setup(width=1000, height=600)

        # Left paddle (controlled by the DQN agent)
        self.left_paddle = turtle.Turtle()
        self.left_paddle.speed(0)
        self.left_paddle.shape("square")
        self.left_paddle.color("black")
        self.left_paddle.shapesize(stretch_wid=6, stretch_len=2)
        self.left_paddle.penup()
        self.left_paddle.goto(-400, 0)

        # Right paddle (controlled by simple AI)
        self.right_paddle = turtle.Turtle()
        self.right_paddle.speed(0)
        self.right_paddle.shape("square")
        self.right_paddle.color("black")
        self.right_paddle.shapesize(stretch_wid=6, stretch_len=2)
        self.right_paddle.penup()
        self.right_paddle.goto(400, 0)

        # Ball
        self.ball = turtle.Turtle()
        self.ball.speed(0)  # 0 = fastest; turtle treats values above 10 as 0
        self.ball.shape("circle")
        self.ball.color("blue")
        self.ball.penup()
        self.ball.goto(0, 0)
        self.ball.dx = 5  # Horizontal velocity
        self.ball.dy = -5  # Vertical velocity

        # Score
        self.left_score = 0
        self.right_score = 0

    def reset(self):
        """ Reset the ball position and randomly assign a new direction """
        self.ball.goto(0, 0)
        self.ball.dx *= random.choice([-1, 1])
        self.ball.dy *= random.choice([-1, 1])
        return self.get_state()

    def get_state(self):
        """ Get the current state: [ball_x, ball_y, ball_dx, ball_dy, left_paddle_y, right_paddle_y] """
        return np.array([self.ball.xcor(), self.ball.ycor(),
                         self.ball.dx, self.ball.dy,
                         self.left_paddle.ycor(), self.right_paddle.ycor()])

    def step(self, action):
        """ 
        Perform the selected action and move the paddles and ball accordingly.
        Return the new state, reward, and whether the episode has ended (done).
        """
        # Move left paddle based on the agent's action (0 = move up, 1 = move down)
        if action == 0 and self.left_paddle.ycor() < 250:
            self.left_paddle.sety(self.left_paddle.ycor() + 20)
        elif action == 1 and self.left_paddle.ycor() > -240:
            self.left_paddle.sety(self.left_paddle.ycor() - 20)

        # Simple AI for the right paddle (follows the ball)
        if self.right_paddle.ycor() < self.ball.ycor() and self.right_paddle.ycor() < 250:
            self.right_paddle.sety(self.right_paddle.ycor() + 20)
        elif self.right_paddle.ycor() > self.ball.ycor() and self.right_paddle.ycor() > -240:
            self.right_paddle.sety(self.right_paddle.ycor() - 20)

        # Move the ball
        self.ball.setx(self.ball.xcor() + self.ball.dx)
        self.ball.sety(self.ball.ycor() + self.ball.dy)

        # Ball collision with top and bottom walls
        if self.ball.ycor() > 290:
            self.ball.sety(290)
            self.ball.dy *= -1

        if self.ball.ycor() < -290:
            self.ball.sety(-290)
            self.ball.dy *= -1

        # Ball collision with paddles
        if (self.ball.xcor() > 360 and self.ball.xcor() < 370) and \
        (self.ball.ycor() < self.right_paddle.ycor() + 50 and self.ball.ycor() > self.right_paddle.ycor() - 50):
            self.ball.setx(360)
            self.ball.dx *= -1  # Reverse direction upon hitting the right paddle

        if (self.ball.xcor() < -360 and self.ball.xcor() > -370) and \
        (self.ball.ycor() < self.left_paddle.ycor() + 50 and self.ball.ycor() > self.left_paddle.ycor() - 50):
            self.ball.setx(-360)
            self.ball.dx *= -1  # Reverse direction upon hitting the left paddle

        # Reward for paddle positioning (intermediate reward)
        paddle_position_reward = 1 - abs(self.left_paddle.ycor() - self.ball.ycor()) / 300  # Reward based on distance between paddle and ball

        # Check for scoring
        reward = 0
        done = False

        # Left paddle scores
        if self.ball.xcor() > 500:
            self.left_score += 1
            reward = 10  # Positive reward for scoring
            done = True  # End of episode (for training)

        # Right paddle scores
        if self.ball.xcor() < -500:
            self.right_score += 1
            reward = -10  # Penalty for opponent scoring
            done = True  # End of episode (for training)

        # Combine rewards (shaping reward + intermediate reward)
        reward += paddle_position_reward  # Add positioning reward to the total reward

        # Get the new state after the step
        state = self.get_state()
        return state, reward, done



In [3]:
# --- DQN Agent ---

class DQN(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(DQN, self).__init__()
        # Define the neural network layers
        self.fc1 = nn.Linear(state_dim, 128)  # Input layer (state_dim -> 128 units)
        self.fc2 = nn.Linear(128, 128)        # Hidden layer (128 -> 128 units)
        self.fc3 = nn.Linear(128, action_dim) # Output layer (128 -> action_dim)

    def forward(self, x):
        # Pass input through the network
        x = F.relu(self.fc1(x))  # ReLU activation
        x = F.relu(self.fc2(x))  # ReLU activation
        return self.fc3(x)       # Output action values


### Concept of Prioritized Experience Replay (PER)

Prioritized Experience Replay (PER) is an improvement to the standard experience replay mechanism used in DQN that aims to make the agent learn more efficiently from the same experiences.
In standard experience replay, the agent stores experiences (state, action, reward, next state, done) in a buffer and samples random mini-batches from it to train the neural network. This uniform sampling treats all experiences equally, which is not optimal for our case.
A priority-based sampling mechanism increases learning efficiency by favouring the most "important" experiences, that is, those from which the agent still has the most to learn. Importance is measured using the **Temporal Difference (TD) error**, the difference between the predicted Q-value and the target Q-value. A higher TD error indicates that the agent's prediction was further off, so the agent should revisit that experience more often to reduce the error.

#### Key Concepts:
1. **Temporal Difference (TD) Error**:
   - TD error is the difference between the Q-value predicted by the Q-network and the target Q-value. It reflects how much the agent's prediction deviates from the bootstrapped target.
   - TD error = $ \left| Q(s, a) - \left( r + \gamma \max_{a'} Q(s', a') \right) \right| $
   - Larger TD errors imply that the agent made a larger prediction error, signaling that these experiences are more valuable for learning.

2. **Prioritized Sampling**:
   - Instead of sampling experiences randomly, PER assigns a priority to each experience based on its TD error. Experiences with larger TD errors are more likely to be sampled for training because they offer a higher potential for reducing the agent's prediction error.
   - The probability of sampling an experience $i$ is proportional to its priority $p_i$:
     $$
     P(i) = \frac{p_i^\alpha}{\sum_k p_k^\alpha}
     $$
     
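     where $\alpha$ controls the degree of prioritization (when $ \alpha = 0 $, sampling is uniform). This $ \alpha $ is distinct from the learning rate $ \alpha $ in the Q-learning update rule. In this notebook's agent, the priority is $ p_i = \text{TD error} + 0.01 $ and probabilities are computed with $ \alpha = 1 $.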
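
The sampling probabilities can be sketched in a few lines. The small constant `eps=0.01` and the choice of sampling with replacement are assumptions that match the spirit of the agent code in this notebook; the function names are hypothetical.

```python
import random

def sampling_probabilities(td_errors, alpha=1.0, eps=0.01):
    """P(i) = p_i**alpha / sum_k p_k**alpha with p_i = |td_error_i| + eps."""
    scaled = [(abs(e) + eps) ** alpha for e in td_errors]
    total = sum(scaled)
    return [p / total for p in scaled]

def sample_indices(td_errors, batch_size, alpha=1.0, rng=random):
    """Draw buffer indices with replacement according to their priorities."""
    probs = sampling_probabilities(td_errors, alpha)
    return rng.choices(range(len(td_errors)), weights=probs, k=batch_size)

td_errors = [0.09, 0.99, 3.99]
print(sampling_probabilities(td_errors, alpha=0.0))  # uniform: each 1/3
print(sampling_probabilities(td_errors, alpha=1.0))  # skewed towards index 2
print(sample_indices(td_errors, batch_size=5))
```

Full PER also corrects the bias introduced by non-uniform sampling with importance-sampling weights and refreshes priorities after each update; this sketch omits both.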

In [4]:
class DQNAgent:
    def __init__(self, state_dim, action_dim):
        self.q_network = DQN(state_dim, action_dim)  # Primary Q-network
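        self.target_network = DQN(state_dim, action_dim)  # Target Q-network
        self.target_network.load_state_dict(self.q_network.state_dict())  # Start in sync with the primary network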
        self.optimizer = optim.Adam(self.q_network.parameters(), lr=0.001)  # Optimizer for training

        self.replay_buffer = deque(maxlen=10000)  # Experience replay buffer
        self.priorities = deque(maxlen=10000)     # Store TD-errors as priorities for prioritized replay
        self.batch_size = 64
        self.gamma = 0.99  # Discount factor for future rewards
        self.epsilon = 1.0  # Exploration rate (initially explore)
        self.epsilon_decay = 0.995  # Decay rate for epsilon
        self.epsilon_min = 0.01  # Minimum value for epsilon
        self.update_target_steps = 100  # How often to update the target network
        self.steps_done = 0

    def select_action(self, state):
        """ Select an action using epsilon-greedy exploration """
        if random.random() < self.epsilon:
            return random.randint(0, 1)  # Explore (random action)
        else:
            state = torch.FloatTensor(state).unsqueeze(0)  # Convert state to tensor
            with torch.no_grad():
                q_values = self.q_network(state)  # Get action values from the Q-network
            return q_values.argmax().item()  # Exploit (select best action)

    def update(self):
        """ Sample a batch from replay buffer based on priorities and update the Q-network """
        if len(self.replay_buffer) < self.batch_size:
            return  # Not enough samples to update

        # Normalize priorities to create a probability distribution
        priorities = np.array(self.priorities)
        probabilities = priorities / priorities.sum()

        # Sample experiences based on the calculated probabilities
        indices = np.random.choice(len(self.replay_buffer), self.batch_size, p=probabilities)
        batch = [self.replay_buffer[i] for i in indices]
        
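        # Convert batch into tensors. States are stacked with NumPy first, since
        # building a tensor from a tuple of arrays is slow and triggers a warning.
        states, actions, rewards, next_states, dones = zip(*batch)
        states = torch.FloatTensor(np.array(states))
        actions = torch.LongTensor(actions)
        rewards = torch.FloatTensor(rewards)
        next_states = torch.FloatTensor(np.array(next_states))
        dones = torch.FloatTensor(dones)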

        # Calculate Q-values and targets
        q_values = self.q_network(states).gather(1, actions.unsqueeze(1)).squeeze(1)
        next_q_values = self.target_network(next_states).max(1)[0]
        targets = rewards + self.gamma * next_q_values * (1 - dones)

        # Calculate loss and update the Q-network
        loss = F.mse_loss(q_values, targets.detach())
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Update the target network periodically
        self.steps_done += 1
        if self.steps_done % self.update_target_steps == 0:
            self.target_network.load_state_dict(self.q_network.state_dict())

    def add_experience(self, state, action, reward, next_state, done):
        """ Add experience to the replay buffer with its TD-error (priority) """
        # Calculate the TD-error (temporal difference)
        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        next_state_tensor = torch.FloatTensor(next_state).unsqueeze(0)
        
        with torch.no_grad():
            q_value = self.q_network(state_tensor)[0, action].item()
            next_q_value = self.target_network(next_state_tensor).max(1)[0].item()
        
        target = reward + (1 - done) * self.gamma * next_q_value
        td_error = abs(target - q_value)
        
        # Append experience and its TD-error to the buffers
        self.replay_buffer.append((state, action, reward, next_state, done))
        self.priorities.append(td_error + 0.01)  # Small value added to avoid zero probability


In [5]:
# --- Training ---

def train_dqn_agent():
    # initiating environment and agent
    env = PongEnv()
    state_dim = 6  # State: [ball_x, ball_y, ball_dx, ball_dy, left_paddle_y, right_paddle_y]
    action_dim = 2  # Actions: 0 = move up, 1 = move down
    agent = DQNAgent(state_dim, action_dim)

    num_episodes = 1000
    for episode in range(num_episodes):
        state = env.reset()
        total_reward = 0
        done = False

        while not done:
            # agent selects an action
            action = agent.select_action(state)

            # environment responds to the action (state is changed)
            next_state, reward, done = env.step(action)

            # gained experience added to replay buffer and agent is updated
            agent.add_experience(state, action, reward, next_state, done)
            agent.update()

            state = next_state
            total_reward += reward

        # Decay epsilon (exploration rate) after each episode to decrease the exploration as its policy is refined
        agent.epsilon = max(agent.epsilon_min, agent.epsilon * agent.epsilon_decay)
        print(f"Episode {episode + 1}: Total Reward = {total_reward}")


In [6]:
if __name__ == "__main__":
    train_dqn_agent()




Episode 1: Total Reward = 34.76666666666666
Episode 2: Total Reward = -1.0333333333333332
Episode 3: Total Reward = 90.53333333333333
Episode 4: Total Reward = 103.59999999999997
Episode 5: Total Reward = 154.93333333333325
Episode 6: Total Reward = 100.03333333333335
Episode 7: Total Reward = 78.73333333333335
Episode 8: Total Reward = -5.233333333333333
Episode 9: Total Reward = 41.16666666666666
Episode 10: Total Reward = 75.66666666666667
Episode 11: Total Reward = 71.50000000000004
Episode 12: Total Reward = 63.09999999999999
Episode 13: Total Reward = -40.233333333333334
Episode 14: Total Reward = 119.56666666666672
Episode 15: Total Reward = 159.9999999999999
Episode 16: Total Reward = 41.1
Episode 17: Total Reward = -22.63333333333334
Episode 18: Total Reward = -8.03333333333333
Episode 19: Total Reward = 14.1
Episode 20: Total Reward = 80.86666666666662
Episode 21: Total Reward = 60.66666666666673
Episode 22: Total Reward = 158.46666666666658
Episode 23: Total Reward = 117.633

TclError: invalid command name ".!canvas"