# Reinforcement learning models

# DQN 

In [17]:
pip install torch


Note: you may need to restart the kernel to use updated packages.


In [31]:
import torch
import torch.nn as nn
import torch.optim as optim
import random
import numpy as np

# Hyperparameters
alpha = 0.001    # Learning rate passed to the Adam optimizer
gamma = 0.9      # Discount factor applied to the bootstrapped next-state value
epsilon = 0.1    # Probability of picking a random action instead of the greedy one
episodes = 20    # Number of training episodes
max_steps = 100  # Max steps per episode
batch_size = 64  # Number of transitions sampled from the replay buffer per update
replay_buffer_size = 10000  # Intended cap on the number of stored transitions
update_target_every = 10  # Update target network every N episodes

# Action space (left, right, stay)
actions = ['left', 'right', 'stay']
action_map = {action: idx for idx, action in enumerate(actions)}  # Action name -> network output index

# Screen dimensions (used to bound the random start positions and basket movement)
WIDTH = 800
HEIGHT = 600
basket_width = 100  # Horizontal span within which the apple counts as caught
basket_speed = 10   # Distance the basket moves per 'left'/'right' action
apple_width = 20    # Only used to bound the apple's random x-position

# Neural Network Model for Q-value approximation
class DQN(nn.Module):
    """Fully connected Q-network: state vector in, one Q-value per action out.

    Two hidden ReLU layers of 128 units each sit between the input and the
    linear output head.
    """

    def __init__(self, input_size, output_size):
        super().__init__()
        hidden_units = 128
        self.fc1 = nn.Linear(input_size, hidden_units)
        self.fc2 = nn.Linear(hidden_units, hidden_units)
        self.fc3 = nn.Linear(hidden_units, output_size)

    def forward(self, x):
        """Return the raw (un-normalised) Q-values for the input batch ``x``."""
        hidden = torch.relu(self.fc1(x))
        hidden = torch.relu(self.fc2(hidden))
        return self.fc3(hidden)

# Initialize Q-network and target network.
# Both take a 2-element state (basket_x, apple_x) and output one Q-value per entry in `actions`.
q_network = DQN(2, len(actions))  # Online network: the one whose parameters the optimizer updates
target_network = DQN(2, len(actions))  # Target network: supplies next-state values for the TD target
target_network.load_state_dict(q_network.state_dict())  # Start the target as an exact copy of the online weights
target_network.eval()  # Set to evaluation mode

# Optimizer: Adam over the online network's parameters only, with learning rate `alpha`
optimizer = optim.Adam(q_network.parameters(), lr=alpha)

# Experience replay buffer: a list of (state, action, reward, next_state, done) tuples
replay_buffer = []

def get_state(basket_x, apple_x):
    """Pack the basket and apple x-positions into a length-2 float32 array."""
    positions = (basket_x, apple_x)
    return np.asarray(positions, dtype=np.float32)

def choose_action(state):
    """Return an action name chosen epsilon-greedily from the Q-network.

    With probability ``epsilon`` a uniformly random entry of ``actions`` is
    returned; otherwise the action whose Q-value is largest for ``state``.
    """
    state_batch = torch.tensor(state).unsqueeze(0)  # add a leading batch dimension
    explore = random.uniform(0, 1) < epsilon
    if explore:
        return random.choice(actions)

    # Greedy branch: no gradients are needed for action selection.
    with torch.no_grad():
        best_index = q_network(state_batch).argmax().item()
    return actions[best_index]

def update_q_network(batch):
    """Run one gradient step on the Q-network from a batch of transitions.

    Args:
        batch: Iterable of ``(state, action, reward, next_state, done)`` tuples,
            where ``action`` is a key of ``action_map``.

    The regression target for each transition is
    ``reward + gamma * max_a Q_target(next_state, a)``, with the bootstrap
    term zeroed for transitions flagged ``done``.
    """
    # Batch-local names are prefixed so the module-level ``actions`` list is
    # not shadowed inside this function.
    batch_states, batch_actions, batch_rewards, batch_next_states, batch_dones = zip(*batch)

    # Stack the per-transition numpy arrays first; building a tensor from a
    # tuple of arrays directly is slow.
    states = torch.tensor(np.array(batch_states))
    next_states = torch.tensor(np.array(batch_next_states))
    action_indices = torch.tensor(
        [action_map[name] for name in batch_actions], dtype=torch.long
    )
    rewards = torch.tensor(batch_rewards, dtype=torch.float32)
    not_done = ~torch.tensor(batch_dones, dtype=torch.bool)

    # Q(s, a) for the actions actually taken. Squeeze only the gathered column
    # so a batch of one keeps shape (1,) instead of collapsing to a scalar.
    q_value = q_network(states).gather(1, action_indices.unsqueeze(1)).squeeze(1)

    # The TD target is treated as a constant: the target network is not being
    # optimised, so there is no reason to build a graph through it.
    with torch.no_grad():
        max_next_q_values = target_network(next_states).max(dim=1)[0]
        target = rewards + (gamma * max_next_q_values) * not_done

    loss = nn.MSELoss()(q_value, target)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

# Main learning loop
for episode in range(episodes):
    # Sync the target network once per qualifying episode, before any step is
    # taken. Doing this inside the step loop would re-copy the weights on every
    # step of those episodes, making the target identical to the online network.
    if episode % update_target_every == 0:
        target_network.load_state_dict(q_network.state_dict())

    basket_x = random.randint(0, WIDTH - basket_width)
    apple_x = random.randint(0, WIDTH - apple_width)
    basket_y = HEIGHT - 50
    lives = 5
    score = 0
    episode_rewards = 0
    episode_actions = []

    for step in range(max_steps):
        state = get_state(basket_x, apple_x)
        action = choose_action(state)

        # Store action for the episode
        episode_actions.append(action)

        # Move the basket, clamping to the screen so a start position that is
        # not a multiple of basket_speed cannot overshoot either edge.
        if action == 'left':
            basket_x = max(0, basket_x - basket_speed)
        elif action == 'right':
            basket_x = min(WIDTH - basket_width, basket_x + basket_speed)

        # The apple counts as caught while its x-position lies within the basket
        if basket_x <= apple_x <= basket_x + basket_width:
            reward = 1
            score += 1
        else:
            reward = -1
            lives -= 1  # Each miss costs a life

        next_state = get_state(basket_x, apple_x)
        done = lives <= 0  # Game over condition

        # Store experience in replay buffer, evicting the oldest transition
        # once the configured capacity is exceeded.
        replay_buffer.append((state, action, reward, next_state, done))
        if len(replay_buffer) > replay_buffer_size:
            del replay_buffer[0]

        # Train on a random minibatch once enough transitions are available
        if len(replay_buffer) > batch_size:
            batch = random.sample(replay_buffer, batch_size)
            update_q_network(batch)

        # Track cumulative reward for this episode
        episode_rewards += reward

        # Check game over condition
        if done:
            break

    print(f"Episode {episode + 1}: Score: {score}, Lives: {lives}, Rewards: {episode_rewards}")
    print(f"Actions taken: {episode_actions}")  # Print the actions taken in the episode


Episode 1: Score: 94, Lives: 0, Rewards: 89
Actions taken: ['stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'left', 'stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'left', 'stay', 'stay', 'stay', 'stay', 'right', 'stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'stay', 'left', 'left', 'left', 'left', 'stay', 'stay', 'stay', 'stay', 'stay', 'right', 'right', 'right', 'right', 'left', 'left', 'stay', 'stay', 'stay', 'stay', 'right', 'right', 'right', 'right', 'right', 'right', 'right', 'right', 'left']
Episode 2: Score: 0, Lives: 0, Rewards: -5
Actions taken: ['stay', 'left', 'left', 'right', 'right']
Episode 3: Score: 0, Lives: 0, Rew

# PPO

In [43]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random

# Hyperparameters
episodes = 20          # Number of training episodes
max_steps = 100        # Max steps per episode
learning_rate = 0.001  # Learning rate passed to the Adam optimizer
gamma = 0.99  # Discount factor
clip_epsilon = 0.2     # PPO clipping range: the policy ratio is clamped to [1 - eps, 1 + eps]
update_epochs = 10     # Gradient steps taken on each collected episode
batch_size = 64        # NOTE(review): appears unused in this cell; the update runs on the whole buffer
entropy_beta = 0.01    # Weight of the entropy bonus subtracted from the loss

# Screen dimensions
WIDTH = 800
HEIGHT = 600
basket_width = 100  # Horizontal span within which the apple counts as caught
basket_speed = 10   # Distance the basket moves per left/right action
apple_width = 20    # Only used to bound the apple's random x-position

# Action space (left, right, stay); the policy works with the list indices 0, 1, 2
actions = ['left', 'right', 'stay']
num_actions = len(actions)

# Define the policy network (actor) and value network (critic)
class ActorCritic(nn.Module):
    """Shared-trunk actor-critic network.

    One hidden ReLU layer of 128 units feeds two linear heads: ``fc_pi``
    produces action logits and ``fc_v`` produces a scalar state value.
    """

    def __init__(self, input_size, num_actions):
        super().__init__()
        hidden_units = 128
        self.fc1 = nn.Linear(input_size, hidden_units)
        self.fc_pi = nn.Linear(hidden_units, num_actions)  # policy head
        self.fc_v = nn.Linear(hidden_units, 1)  # value head

    def forward(self, x):
        """Return ``(logits, value)`` for the input batch ``x``."""
        features = torch.relu(self.fc1(x))
        return self.fc_pi(features), self.fc_v(features)

# Initialize the actor-critic network (2-element state in, `num_actions` logits plus one value out)
# and a single Adam optimizer covering all of its parameters.
actor_critic = ActorCritic(2, num_actions)
optimizer = optim.Adam(actor_critic.parameters(), lr=learning_rate)

# Replay buffer for PPO
class PPOBuffer:
    """Rollout storage kept as six parallel lists, one entry per step."""

    def __init__(self):
        # A fresh buffer is simply a cleared one.
        self.clear()

    def add(self, state, action, log_prob, reward, done, value):
        """Append one transition; every list grows by exactly one element."""
        columns = (
            self.states,
            self.actions,
            self.log_probs,
            self.rewards,
            self.dones,
            self.values,
        )
        entries = (state, action, log_prob, reward, done, value)
        for column, entry in zip(columns, entries):
            column.append(entry)

    def clear(self):
        """Rebind every field to a new empty list."""
        self.states, self.actions, self.log_probs = [], [], []
        self.rewards, self.dones, self.values = [], [], []

# Helper functions
def get_state(basket_x, apple_x):
    """Return both x-positions divided by WIDTH as a length-2 float32 array."""
    scaled_basket = basket_x / WIDTH
    scaled_apple = apple_x / WIDTH
    return np.array((scaled_basket, scaled_apple), dtype=np.float32)

def compute_advantages(rewards, values, dones, gamma):
    """Compute discounted advantages and one-step TD targets for a rollout.

    Args:
        rewards: Sequence of per-step rewards.
        values: Sequence of value estimates, aligned with ``rewards``.
        dones: Sequence of flags; a true entry marks the last step of an episode.
        gamma: Discount factor.

    Returns:
        ``(advantages, returns)`` as two lists aligned with ``rewards``.
        ``returns[t]`` is ``rewards[t] + gamma * values[t + 1]`` (the next value
        is taken as 0 after a ``done`` step and after the final step).
        ``advantages[t]`` is the gamma-discounted sum of TD errors from ``t`` to
        the end of that step's episode.
    """
    advantages = []
    returns = []
    adv = 0
    next_value = 0

    # Walk the rollout backwards so each step can reuse its successor's results.
    for r, v, done in zip(reversed(rewards), reversed(values), reversed(dones)):
        if done:
            # Episode boundary: nothing from the following episode may leak
            # into this step, neither the bootstrap value nor the running sum.
            next_value = 0
            adv = 0
        td_target = r + gamma * next_value
        adv = (td_target - v) + gamma * adv
        returns.append(td_target)
        advantages.append(adv)
        next_value = v

    # Results were accumulated last-to-first; flip once instead of paying for
    # a front insertion on every step.
    advantages.reverse()
    returns.reverse()
    return advantages, returns

def ppo_update(buffer):
    """Run ``update_epochs`` clipped-PPO gradient steps on the buffered rollout.

    Args:
        buffer: Object exposing ``states``, ``actions``, ``log_probs``,
            ``rewards``, ``dones`` and ``values`` as parallel lists.

    The loss is the clipped surrogate policy loss, plus half the value MSE,
    minus ``entropy_beta`` times the mean policy entropy. An empty buffer is
    a no-op.
    """
    if not buffer.states:
        return  # nothing was collected, so there is nothing to fit

    states = torch.tensor(np.array(buffer.states), dtype=torch.float32)
    actions = torch.tensor(buffer.actions, dtype=torch.long)
    old_log_probs = torch.tensor(buffer.log_probs, dtype=torch.float32)

    advantages, returns = compute_advantages(buffer.rewards, buffer.values, buffer.dones, gamma)
    advantages = torch.tensor(advantages, dtype=torch.float32)
    returns = torch.tensor(returns, dtype=torch.float32)

    value_loss_fn = nn.MSELoss()

    for _ in range(update_epochs):
        logits, new_values = actor_critic(states)
        dist = torch.distributions.Categorical(logits=logits)
        new_log_probs = dist.log_prob(actions)
        entropy = dist.entropy().mean()

        # Probability ratio between the current policy and the rollout policy
        ratios = torch.exp(new_log_probs - old_log_probs)
        surr1 = ratios * advantages
        surr2 = torch.clamp(ratios, 1 - clip_epsilon, 1 + clip_epsilon) * advantages

        policy_loss = -torch.min(surr1, surr2).mean()
        # Drop only the trailing value dimension so a single-step rollout keeps
        # shape (1,) and matches ``returns``.
        value_loss = value_loss_fn(new_values.squeeze(-1), returns)

        loss = policy_loss + 0.5 * value_loss - entropy_beta * entropy

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

# Main training loop
buffer = PPOBuffer()

for episode in range(episodes):
    basket_x = random.randint(0, WIDTH - basket_width)
    apple_x = random.randint(0, WIDTH - apple_width)
    basket_y = HEIGHT - 50
    lives = 5
    score = 0

    for step in range(max_steps):
        state = get_state(basket_x, apple_x)
        state_tensor = torch.tensor(state).unsqueeze(0)  # Add batch dimension

        # Rollout only needs numbers (stored via .item()), so skip building an
        # autograd graph; ppo_update recomputes everything with gradients.
        with torch.no_grad():
            logits, value = actor_critic(state_tensor)
        dist = torch.distributions.Categorical(logits=logits)
        sampled = dist.sample()
        action = sampled.item()
        log_prob = dist.log_prob(sampled).item()

        # Move the basket (0 = left, 1 = right, anything else = stay), clamping
        # to the screen so the basket cannot overshoot either edge.
        if action == 0:
            basket_x = max(0, basket_x - basket_speed)
        elif action == 1:
            basket_x = min(WIDTH - basket_width, basket_x + basket_speed)

        # The apple counts as caught while its x-position lies within the basket
        caught = basket_x <= apple_x <= basket_x + basket_width
        reward = 1 if caught else -1
        if caught:
            score += 1
        else:
            lives -= 1

        done = lives <= 0

        # Add to buffer
        buffer.add(state, action, log_prob, reward, done, value.item())

        # Train once per episode, on termination or when the step budget runs out
        if done or step == max_steps - 1:
            ppo_update(buffer)
            buffer.clear()
            break

    print(f"Episode {episode + 1}: Score: {score}, Lives: {lives}")


Episode 1: Score: 38, Lives: 0
Episode 2: Score: 0, Lives: 0
Episode 3: Score: 0, Lives: 0
Episode 4: Score: 18, Lives: 0
Episode 5: Score: 0, Lives: 0
Episode 6: Score: 0, Lives: 0
Episode 7: Score: 0, Lives: 0
Episode 8: Score: 0, Lives: 0
Episode 9: Score: 0, Lives: 0
Episode 10: Score: 0, Lives: 0
Episode 11: Score: 0, Lives: 0
Episode 12: Score: 0, Lives: 0
Episode 13: Score: 0, Lives: 0
Episode 14: Score: 0, Lives: 0
Episode 15: Score: 0, Lives: 0
Episode 16: Score: 0, Lives: 0
Episode 17: Score: 0, Lives: 0
Episode 18: Score: 0, Lives: 0
Episode 19: Score: 0, Lives: 0
Episode 20: Score: 0, Lives: 0


In [28]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random

# Hyperparameters
gamma = 0.99  # Discount factor
learning_rate = 0.001  # Learning rate passed to the Adam optimizer
epsilon = 0.1  # Exploration rate. NOTE(review): appears unused in this cell; actions are sampled from the policy
episodes = 20  # Number of training episodes
max_steps = 100  # Max steps per episode
clip_epsilon = 0.2  # PPO clipping parameter
batch_size = 64  # Minibatch size passed to ppo_update
update_epochs = 4  # Number of PPO training epochs

# Screen dimensions
WIDTH = 800
HEIGHT = 600
basket_width = 100  # Horizontal span within which the apple counts as caught
basket_speed = 10   # Distance the basket moves per left/right action
apple_width = 20    # Only used to bound the apple's random x-position

# Action space
actions = ['left', 'right', 'stay']
action_map = {action: idx for idx, action in enumerate(actions)}  # Action name -> policy output index

# Neural Network Model for PPO
class ActorCritic(nn.Module):
    """Actor-critic network with a shared 128-unit ReLU trunk.

    ``actor`` maps the trunk features to action logits; ``critic`` maps them
    to a single state-value estimate.
    """

    def __init__(self, input_size, action_size):
        super().__init__()
        hidden_units = 128
        trunk_layers = [nn.Linear(input_size, hidden_units), nn.ReLU()]
        self.shared = nn.Sequential(*trunk_layers)
        self.actor = nn.Linear(hidden_units, action_size)
        self.critic = nn.Linear(hidden_units, 1)

    def forward(self, x):
        """Return ``(logits, value)`` for the input batch ``x``."""
        features = self.shared(x)
        return self.actor(features), self.critic(features)

# Initialize actor-critic model and optimizer.
# The model takes a 2-element state and outputs one logit per entry in `actions` plus a value.
actor_critic = ActorCritic(2, len(actions))  # Input: state (basket_x, apple_x)
optimizer = optim.Adam(actor_critic.parameters(), lr=learning_rate)  # One optimizer for trunk and both heads

def get_state(basket_x, apple_x):
    """Return the basket and apple x-positions, each divided by WIDTH, as float32."""
    scaled_basket = basket_x / WIDTH
    scaled_apple = apple_x / WIDTH
    return np.array((scaled_basket, scaled_apple), dtype=np.float32)

def choose_action(state, actor_critic):
    """Sample an action index from the actor's softmax policy.

    Returns:
        ``(action, log_prob)`` where ``action`` is a Python int and
        ``log_prob`` is a 0-dim tensor holding the log-probability of that
        action under the current policy.
    """
    state_batch = torch.tensor(state).unsqueeze(0)  # add a leading batch dimension
    policy_logits, _value = actor_critic(state_batch)
    action_probs = torch.softmax(policy_logits, dim=-1)
    sampled = torch.multinomial(action_probs, 1)
    action = sampled.item()
    chosen_prob = action_probs[0, action]
    return action, torch.log(chosen_prob)

def compute_gae(rewards, values, dones, gamma, lambda_gae):
    """Return Generalized Advantage Estimates, one per reward.

    ``values`` must hold one more entry than ``rewards``: index ``t + 1`` is
    read as the value of the state reached after step ``t``. A true
    ``dones[t]`` zeroes both that bootstrap value and the accumulated
    advantage carried back from later steps.
    """
    backwards = []
    gae = 0
    step = len(rewards) - 1
    while step >= 0:
        keep = 1 - dones[step]  # 0 when the step ended the episode, else 1
        delta = rewards[step] + gamma * values[step + 1] * keep - values[step]
        gae = delta + gamma * lambda_gae * keep * gae
        backwards.append(gae)
        step -= 1
    # Estimates were produced last-to-first; hand them back in time order.
    return backwards[::-1]

def ppo_update(actor_critic, optimizer, states, actions, log_probs, returns, advantages, clip_epsilon, batch_size, num_epochs=None):
    """Update the actor-critic model with the clipped PPO objective.

    Args:
        actor_critic: Callable returning ``(logits, values)`` for a batch of states.
        optimizer: Optimizer that steps ``actor_critic``'s parameters.
        states: Sequence of state vectors collected during the rollout.
        actions: Action indices taken, aligned with ``states``.
        log_probs: Log-probabilities of those actions under the rollout policy.
        returns: Regression targets for the critic, aligned with ``states``.
        advantages: Advantage estimates, aligned with ``states``.
        clip_epsilon: The policy ratio is clamped to ``[1 - eps, 1 + eps]``.
        batch_size: Minibatch size used within each epoch.
        num_epochs: Number of shuffled passes over the data. When omitted, the
            module-level ``update_epochs`` is used.
    """
    if num_epochs is None:
        num_epochs = update_epochs

    states = np.array(states)
    actions = np.array(actions)
    log_probs = np.array(log_probs)
    returns = np.array(returns)
    advantages = np.array(advantages)

    value_loss_fn = nn.MSELoss()
    sample_count = len(states)

    for _ in range(num_epochs):
        # Fresh random minibatch order for every epoch
        indices = np.arange(sample_count)
        np.random.shuffle(indices)

        for start in range(0, sample_count, batch_size):
            batch_indices = indices[start:start + batch_size]
            batch_states = torch.tensor(states[batch_indices], dtype=torch.float32)
            batch_actions = torch.tensor(actions[batch_indices], dtype=torch.long)
            batch_log_probs = torch.tensor(log_probs[batch_indices], dtype=torch.float32)
            batch_returns = torch.tensor(returns[batch_indices], dtype=torch.float32)
            batch_advantages = torch.tensor(advantages[batch_indices], dtype=torch.float32)

            logits, values = actor_critic(batch_states)
            # log_softmax avoids log(0) = -inf (and NaN gradients) when a
            # taken action's probability underflows under the updated policy.
            all_log_probs = torch.log_softmax(logits, dim=-1)
            new_log_probs = all_log_probs.gather(1, batch_actions.unsqueeze(1)).squeeze(1)

            # Probability ratio between the current and the rollout policy
            ratio = torch.exp(new_log_probs - batch_log_probs)

            # Clipped surrogate objective
            surrogate1 = ratio * batch_advantages
            surrogate2 = torch.clamp(ratio, 1 - clip_epsilon, 1 + clip_epsilon) * batch_advantages
            policy_loss = -torch.min(surrogate1, surrogate2).mean()

            # Drop only the trailing value dimension so a minibatch of one
            # keeps shape (1,) and matches ``batch_returns``.
            value_loss = value_loss_fn(values.squeeze(-1), batch_returns)

            # Combined loss
            loss = policy_loss + 0.5 * value_loss

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

# Main training loop
for episode in range(episodes):
    basket_x = random.randint(0, WIDTH - basket_width)
    apple_x = random.randint(0, WIDTH - apple_width)
    basket_y = HEIGHT - 50
    lives = 5
    score = 0

    # Per-episode rollout storage, kept as parallel lists
    states = []
    actions_taken = []
    log_probs = []
    rewards = []
    values = []
    dones = []
    episode_rewards = 0
    episode_actions = []

    for step in range(max_steps):
        state = get_state(basket_x, apple_x)
        action, log_prob = choose_action(state, actor_critic)
        # Only the number is stored, so the value pass needs no autograd graph
        with torch.no_grad():
            _, value = actor_critic(torch.tensor(state).unsqueeze(0))
        values.append(value.item())

        # Move the basket, clamping to the screen so a start position that is
        # not a multiple of basket_speed cannot overshoot either edge.
        if action == action_map['left']:
            basket_x = max(0, basket_x - basket_speed)
        elif action == action_map['right']:
            basket_x = min(WIDTH - basket_width, basket_x + basket_speed)

        # The apple counts as caught while its x-position lies within the basket
        if basket_x <= apple_x <= basket_x + basket_width:
            reward = 1
            score += 1
        else:
            reward = -1
            lives -= 1

        next_state = get_state(basket_x, apple_x)
        done = lives <= 0
        episode_rewards += reward
        episode_actions.append(actions[action])

        # Store the transition
        states.append(state)
        actions_taken.append(action)
        log_probs.append(log_prob.item())
        rewards.append(reward)
        dones.append(done)

        if done:
            break

    # compute_gae reads values[t + 1], so append the value of the last state
    # reached (it is masked out inside compute_gae when the final step is done).
    with torch.no_grad():
        _, final_value = actor_critic(torch.tensor(next_state).unsqueeze(0))
    values.append(final_value.item())

    # Compute GAE and returns (return = advantage + value baseline)
    advantages = compute_gae(rewards, values, dones, gamma, lambda_gae=0.95)
    returns = [adv + val for adv, val in zip(advantages, values[:-1])]

    # Update the model using PPO
    ppo_update(actor_critic, optimizer, states, actions_taken, log_probs, returns, advantages, clip_epsilon, batch_size)

    print(f"Episode {episode + 1}: Score: {score}, Rewards: {episode_rewards}")
    print(f"Actions taken: {episode_actions}")


Episode 1: Score: 0, Rewards: -5
Actions taken: ['stay', 'stay', 'stay', 'right', 'right']
Episode 2: Score: 0, Rewards: -5
Actions taken: ['left', 'stay', 'stay', 'stay', 'right']
Episode 3: Score: 0, Rewards: -5
Actions taken: ['stay', 'right', 'stay', 'stay', 'stay']
Episode 4: Score: 0, Rewards: -5
Actions taken: ['right', 'right', 'stay', 'right', 'right']
Episode 5: Score: 0, Rewards: -5
Actions taken: ['left', 'left', 'stay', 'stay', 'stay']
Episode 6: Score: 0, Rewards: -5
Actions taken: ['stay', 'right', 'left', 'right', 'right']
Episode 7: Score: 0, Rewards: -5
Actions taken: ['stay', 'left', 'left', 'left', 'right']
Episode 8: Score: 0, Rewards: -5
Actions taken: ['left', 'right', 'left', 'right', 'left']
Episode 9: Score: 0, Rewards: -5
Actions taken: ['left', 'left', 'right', 'stay', 'stay']
Episode 10: Score: 0, Rewards: -5
Actions taken: ['stay', 'right', 'left', 'right', 'right']
Episode 11: Score: 0, Rewards: -5
Actions taken: ['left', 'left', 'left', 'left', 'stay']
E