In [20]:
import os
import sys
from utils import get_settings_dir

In [21]:
get_settings_dir()
from settings import ROWS, COLS

In [22]:
# Setup code for Q-learning agent

# -------------------- Setup Callbacks (callbacks.py) --------------------

# Simulating the Q-learning agent callbacks and training environment
import os
import numpy as np
import pickle
from callbacks import QLearningModel, state_to_features

model = QLearningModel()

In [23]:
# -------------------- Setup Training (train.py) --------------------
from typing import List, Tuple
from objective import Objective
import numpy as np

class AgentTraining:
    def __init__(self):
        self.episodes = 1000
        self.max_steps = 100
        self.rewards = []
        self.eps = 0
        self.epsilon_min = 0.1
        self.epsilon_decay = 0.995
        self.q_table = {}
        self.graph = []
        self.epsilon = 1.0
        self.learning_rate = 0.1
        self.discount_factor = 0.99
        self.model = QLearningModel()
        self.actions = self.model.actions
        self.objective = Objective(task=1, game_state={'self': (None, None, None, (7, 7)),  # Player's current state (self_position)
                                                        'coins': [(5, 5), (2, 8)],            # List of coin positions
                                                        'field': np.zeros((ROWS, COLS))      # Game field representation (grid)
                                                        })
        self.total_reward = 0
    
    def game_events_occurred(self, old_game_state, action, new_game_state, events):
        """ Update Q-table based on events """
        state = state_to_features(old_game_state)
        new_state = state_to_features(new_game_state)

        # Extract necessary parameters for reward calculation
        old_player_pos = old_game_state['self'][3]
        old_objective = old_game_state['coins'][0]  # Assuming coins are the objective
        new_self_pos = new_game_state['self'][3]
        new_objective = new_game_state['coins'][0]
        old_field = old_game_state['field']
        new_field = new_game_state['field']

        reward = self.calculate_reward(
            old_player_pos, old_objective, new_self_pos, new_objective, old_field, new_field, events
        )
        self.update_q_table(state, action, reward, new_state)

    def calculate_reward(
        self, 
        old_player_pos: Tuple[int, int], old_objective: Tuple[int, int],
        new_self_pos: Tuple[int, int], new_objective: Tuple[int, int],
        old_field: np.ndarray, new_field: np.ndarray,
        events: List[str]
        ) -> float:
        """
        Calculate the reward based on the state and events.
        """
        old_distance = self.objective.distance_objective(start_pos=old_player_pos, objective_pos=old_objective, field=old_field)
        new_distance = self.objective.distance_objective(start_pos=new_self_pos, objective_pos=new_objective, field=new_field)

        if old_objective == new_objective:
            reward = 1 if len(new_distance) < len(old_distance) else -3  # discourages moving away or not making progress
        else: 
            reward = 0  # no reward if objective has changed
        
        if "COIN_COLLECTED" in events:
            reward += 2
        
        return reward

    def update_q_table(self, state, action, reward, next_state):
        """ Update Q-table using Q-learning formula """
        state_tuple = tuple(state)
        if state_tuple not in self.q_table:
            self.q_table[state_tuple] = np.zeros(len(self.actions))
        
        next_state_tuple = tuple(next_state)
        if next_state_tuple not in self.q_table:
            self.q_table[next_state_tuple] = np.zeros(len(self.actions))

        action_index = self.actions.index(action)
        
        # Find the max Q-value for the next state (future reward estimation)
        max_next_q_value = np.max(self.q_table[next_state_tuple])

        # Q-learning update rule
        current_q_value = self.q_table[state_tuple][action_index]
        self.q_table[state_tuple][action_index] = current_q_value + self.learning_rate * (
            reward + self.discount_factor * max_next_q_value - current_q_value
        )

    def end_of_round(self, last_game_state, last_action, events):
        """ Handle end of round logic """
        reward = -100 if 'KILLED_SELF' in events else 0
        self.total_reward += reward

        self.graph.append((self.eps, self.total_reward))
        self.total_reward = 0

        if self.eps % 1000 == 0:
            self.save_metrics()
        
        self.eps += 1
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)
        self.log_progress(self.eps)


    def save_metrics(self):
        """ Save training metrics (e.g., Q-table, graph) """
        with open("q_table.pkl", "wb") as f:
            pickle.dump(self.q_table, f)
        with open("graph.txt", "w") as f:
            f.write(str(self.rewards))
            
    
    def log_progress(self, episode):
        """ Log training progress """
        if len(self.rewards) >= 100:
            avg_reward = sum(self.rewards[-100:]) / 100
        elif self.rewards:
            avg_reward = sum(self.rewards) / len(self.rewards)
        else:
            avg_reward = 0  # Handle case where self.rewards is empty

        print(f"Episode {episode}: Average Reward: {avg_reward}")

In [24]:
# -------------------- Test Execution --------------------

# Create instances of the agent and model
q_model = QLearningModel()
trainer = AgentTraining()

# Simulate a game loop
for episode in range(10):  # Run a limited number of episodes for testing
    old_game_state = {'self': (None, None, None, (7, 7)), 'coins': [(5, 5)], 'field': np.zeros((10, 10))}
    new_game_state = {'self': (None, None, None, (6, 6)), 'coins': [(5, 5)], 'field': np.zeros((10, 10))}
    state = state_to_features(old_game_state)
    
    # Choose an action using the Q-learning model
    action = q_model.choose_action(state)

    # Simulate events for the action
    events = ["COIN_COLLECTED"] if action == 'RIGHT' else []
    
    # Training step
    trainer.game_events_occurred(old_game_state, action, new_game_state, events)
    
    # End of round logic after some steps
    if episode % 10 == 0:
        trainer.end_of_round(new_game_state, action, events)

    # Log progress
    trainer.log_progress(episode)

# Print the Q-table for review
print("Q-Table:")
print(trainer.q_table)

Episode 1: Average Reward: 0
Episode 0: Average Reward: 0
Episode 1: Average Reward: 0
Episode 2: Average Reward: 0
Episode 3: Average Reward: 0
Episode 4: Average Reward: 0
Episode 5: Average Reward: 0
Episode 6: Average Reward: 0
Episode 7: Average Reward: 0
Episode 8: Average Reward: 0
Episode 9: Average Reward: 0
Q-Table:
{(7, 7, 1, 0): array([0.19 , 0.3  , 0.271, 0.271, 0.1  ]), (6, 6, 1, 0): array([0., 0., 0., 0., 0.])}


In [26]:
from collections import namedtuple, deque # self.transitions stores recent transitions (state, action, reward, next_state) using a deque
import numpy as np
import pickle
from typing import List, Dict, Tuple
import events as e

# Define a namedtuple for transitions (state, action, next_state, reward)
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))

# Hyperparameters
TRANSITION_HISTORY_SIZE = 3  # Number of transitions to keep in memory
DISCOUNT_FACTOR = 0.99  # Gamma for future rewards
LEARNING_RATE = 0.1  # Alpha for Q-learning updates

# Example custom event
PLACEHOLDER_EVENT = "PLACEHOLDER"

def setup_training(self):
    """
    Initialize the agent for training.
    This method is called after `setup` in callbacks.py.
    """
    # Setup a deque to store transitions with a limited history size
    self.transitions = deque(maxlen=TRANSITION_HISTORY_SIZE)
    # Initialize training parameters
    self.epsilon = 1.0  # Exploration rate
    self.epsilon_min = 0.1
    self.epsilon_decay = 0.995
    self.q_table = np.zeros((100, 6))  # Example: assuming a 100 state x 6 action Q-table
    self.graph = []  # To store metrics (for example, rewards over episodes)

def game_events_occurred(self, old_game_state: dict, self_action: str, new_game_state: dict, events: List[str]):
    """
    Called once per step to allow intermediate rewards based on game events.
    
    Update the agent's Q-table or policy based on game events that occurred during the step.
    """
    self.logger.debug(f'Encountered game event(s): {", ".join(map(repr, events))} in step {new_game_state["step"]}')
    
    # Convert states to features using the provided helper function
    old_state = state_to_features(old_game_state)
    new_state = state_to_features(new_game_state)
    
    # Calculate reward from events
    reward = reward_from_events(self, events)
    
    # Append the transition (state, action, reward, next_state) to the deque
    self.transitions.append(Transition(old_state, self_action, new_state, reward))
    
    # Update Q-table with the new transition
    update_q_table(self, old_state, self_action, reward, new_state)

def end_of_round(self, last_game_state: dict, last_action: str, events: List[str]):
    """
    Called at the end of each game or when the agent died to finalize rewards and update the model.
    
    This method is also responsible for saving the agent's model periodically.
    """
    self.logger.debug(f'Encountered event(s): {", ".join(map(repr, events))} in the final step')
    
    # Add the final transition with the last state and action
    last_state = state_to_features(last_game_state)
    final_reward = reward_from_events(self, events)
    self.transitions.append(Transition(last_state, last_action, None, final_reward))
    
    # Save the model periodically or after a set number of rounds
    if self.eps % 100 == 0:  # Example: save every 100 episodes
        with open("my-saved-model.pt", "wb") as file:
            pickle.dump(self.q_table, file)
    
    # Increment the episode count and update epsilon for exploration/exploitation
    self.eps += 1
    self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

def reward_from_events(self, events: List[str]) -> int:
    """
    Calculate reward from the game events that occurred during a step.
    
    Customize rewards to encourage or discourage specific agent behaviors.
    """
    game_rewards = {
        e.COIN_COLLECTED: 1,
        e.KILLED_OPPONENT: 5,
        PLACEHOLDER_EVENT: -0.1  # Negative reward for a custom placeholder event
    }
    
    # Sum up the rewards from the events
    reward_sum = 0
    for event in events:
        if event in game_rewards:
            reward_sum += game_rewards[event]
    self.logger.info(f"Awarded {reward_sum} for events {', '.join(events)}")
    return reward_sum

def update_q_table(self, state, action, reward, next_state):
    """
    Update the Q-table using the Q-learning update rule.
    
    :param state: The current state in features.
    :param action: The action taken.
    :param reward: The reward received from the action.
    :param next_state: The next state after the action.
    """
    # Find the max Q-value for the next state (future reward estimation)
    if next_state is not None:
        max_next_q_value = np.max(self.q_table[next_state])
    else:
        max_next_q_value = 0  # No next state at the end of the game

    # Get the current Q-value for the (state, action) pair
    current_q_value = self.q_table[state, action]
    
    # Q-learning update rule
    self.q_table[state, action] = current_q_value + LEARNING_RATE * (reward + DISCOUNT_FACTOR * max_next_q_value - current_q_value)

def log_training_progress(self, episode):
    """
    Log the agent's training progress, e.g., the average reward over recent episodes.
    
    :param episode: The current episode number.
    """
    # Example: Log the average reward over the last 100 episodes
    avg_reward = sum(self.rewards[-100:]) / 100 if len(self.rewards) >= 100 else sum(self.rewards) / len(self.rewards)
    self.logger.info(f"Episode {episode}: Average Reward: {avg_reward}")


In [32]:
from collections import namedtuple, deque
import numpy as np
import pickle
from typing import List, Dict
import random

# Define namedtuple for transitions
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))

# Define the Agent class
class MyAgent:
    # Hyperparameters
    TRANSITION_HISTORY_SIZE = 3
    DISCOUNT_FACTOR = 0.99
    LEARNING_RATE = 0.1

    # Initialize the agent
    def __init__(self):
        self.setup_training()
    
    def setup_training(self):
        """Initialize the agent for training."""
        self.transitions = deque(maxlen=self.TRANSITION_HISTORY_SIZE)
        self.epsilon = 1.0
        self.epsilon_min = 0.1
        self.epsilon_decay = 0.995
        self.q_table = np.zeros((100, 6))  # Example Q-table size
        self.graph = []
        self.eps = 0
        self.rewards = []
        self.logger = lambda x: print(x)  # Simple logger function

    def action_to_index(self, action: str) -> int:
        action_map = {"UP": 0, "RIGHT": 1, "DOWN": 2, "LEFT": 3, "BOMB": 4, "WAIT": 5}
        return action_map.get(action, -1)  # Return -1 for unknown actions
    
    def game_events_occurred(self, old_game_state: Dict, self_action: str, new_game_state: Dict, events: List[str]):
        """Handle game events and update Q-table."""
        old_state = state_to_features(old_game_state)
        new_state = state_to_features(new_game_state)
        reward = self.reward_from_events(events)
        
        # Convert action to an index
        print(f'self_action: {self_action}')
        action_index = self.action_to_index(self_action)
        
        # Append the transition (state, action, reward, next_state) to the deque
        self.transitions.append(Transition(old_state, action_index, new_state, reward))
        
        # Update Q-table with the new transition
        self.update_q_table(old_state, action_index, reward, new_state)

    def end_of_round(self, last_game_state: Dict, last_action: str, events: List[str]):
        """Finalize rewards and update model at the end of each round."""
        last_state = state_to_features(last_game_state)
        final_reward = self.reward_from_events(events)
        self.transitions.append(Transition(last_state, last_action, None, final_reward))
        
        if self.eps % 100 == 0:
            with open("my-saved-model.pt", "wb") as file:
                pickle.dump(self.q_table, file)
        
        self.eps += 1
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)
    
    def reward_from_events(self, events: List[str]) -> int:
        """Calculate reward based on events."""
        game_rewards = {
            "COIN_COLLECTED": 1,
            "KILLED_OPPONENT": 5,
            "PLACEHOLDER_EVENT": -0.1
        }
        reward_sum = 0
        for event in events:
            if event in game_rewards:
                reward_sum += game_rewards[event]
        self.logger(f"Awarded {reward_sum} for events {', '.join(events)}")
        return reward_sum

    def update_q_table(self, state, action, reward, next_state):
        """Update the Q-table using the Q-learning rule."""
        if next_state is not None:
            max_next_q_value = np.max(self.q_table[next_state])
        else:
            max_next_q_value = 0

        current_q_value = self.q_table[state, action]
        self.q_table[state, action] = current_q_value + self.LEARNING_RATE * (reward + self.DISCOUNT_FACTOR * max_next_q_value - current_q_value)
    
    def log_training_progress(self, episode):
        """Log training progress."""
        avg_reward = sum(self.rewards[-100:]) / 100 if len(self.rewards) >= 100 else sum(self.rewards) / len(self.rewards)
        self.logger(f"Episode {episode}: Average Reward: {avg_reward}")

# Helper function to simulate state_to_features (Replace with actual implementation)
def state_to_features(game_state):
    """Convert game state to feature indices."""
    # For simplicity, this example assumes a very basic encoding
    # In a real scenario, this should encode the game state appropriately
    return int(game_state.get('self', [0, 0, 0, 0])[3][0] * 10 + game_state.get('self', [0, 0, 0, 0])[3][1])


# Mock data for testing
old_game_state = {'self': [0, 0, 0, (0, 0)], 'coins': [(1, 1)], 'field': np.zeros((10, 10))}
new_game_state = {'self': [0, 0, 0, (1, 1)], 'coins': [(1, 1)], 'field': np.zeros((10, 10))}
events = ["COIN_COLLECTED"]

# Create an instance of MyAgent
agent = MyAgent()

# Test agent methods
agent.game_events_occurred(old_game_state, "RIGHT", new_game_state, events)
agent.end_of_round(last_game_state=new_game_state, last_action="RIGHT", events=events)


Awarded 1 for events COIN_COLLECTED
self_action: RIGHT
Awarded 1 for events COIN_COLLECTED


# Comparison with symmetry version