In [7]:
import math

class Bridge:
    '''
        Bridges between the snake_environment and DQN agent.

        Converts a maze grid (rows of cell markers) into the three-component
        state fed to the DQN: the normalized x/y offset from the source cell
        to the destination cell plus their normalized straight-line distance.
    '''

    # Normalization bounds (the values that used to be hard-coded inline).
    MAX_DISTANCE = 39.941   # upper bound used to scale the euclidean distance
    MAX_OFFSET = 25         # offsets are scaled from [-25, 25] to [0, 1]

    @staticmethod
    def min_max_normalize(value, min_value, max_value):
        """Linearly rescale ``value`` so min_value maps to 0 and max_value to 1.

        Raises:
            ZeroDivisionError: if ``max_value == min_value``.
        """
        return (value - min_value) / (max_value - min_value)

    @staticmethod
    def euclidean_distance(source, destination, max_distance=MAX_DISTANCE):
        """Return the euclidean distance between two 2-D positions, normalized.

        Args:
            source: indexable position; only elements 0 and 1 are used.
            destination: indexable position; only elements 0 and 1 are used.
            max_distance: distance that maps to 1.0 after normalization.
        """
        distance = math.hypot(source[0] - destination[0],
                              source[1] - destination[1])
        return Bridge.min_max_normalize(distance, 0, max_distance)

    @staticmethod
    def find_position(maze, element):
        """Find the position of a given element in the maze.

        Returns:
            ``(row, column)`` of the first matching cell in row-major order,
            or ``None`` when no cell equals ``element``.
        """
        for i, row in enumerate(maze):
            for j, val in enumerate(row):
                if val == element:
                    return i, j
        return None

    @staticmethod
    def _normalized_offset(source_pos, destination_pos, max_offset):
        """Return the normalized (x, y) offset between two (row, col) positions."""
        vector_x = destination_pos[1] - source_pos[1]  # x-component (column difference)
        vector_y = destination_pos[0] - source_pos[0]  # y-component (row difference)
        return (
            Bridge.min_max_normalize(vector_x, -max_offset, max_offset),
            Bridge.min_max_normalize(vector_y, -max_offset, max_offset),
        )

    @staticmethod
    def vector_representation(maze, source_element='S', destination_element='D',
                              max_offset=MAX_OFFSET):
        """Calculate and normalize the vector representation from source to destination.

        Args:
            maze: grid (sequence of rows) of cell markers.
            source_element: marker of the source cell.
            destination_element: marker of the destination cell.
            max_offset: offsets in [-max_offset, max_offset] map to [0, 1].

        Returns:
            ``(normalized_x, normalized_y)``, or ``None`` when either marker
            is not present in the maze.
        """
        source_pos = Bridge.find_position(maze, source_element)
        destination_pos = Bridge.find_position(maze, destination_element)

        if source_pos is None or destination_pos is None:
            return None
        return Bridge._normalized_offset(source_pos, destination_pos, max_offset)

    @staticmethod
    def _resolve_maze(env):
        """Return the grid to read the state from for ``env``."""
        # NOTE(review): assumes the environment exposes its current grid as
        # ``env.maze`` -- confirm against MazeSnakeGameEnv.
        grid = getattr(env, 'maze', None)
        if grid is None and isinstance(env, (list, tuple)):
            grid = env  # a bare grid was passed instead of an environment
        if grid is None:
            # Previous behaviour: read the module-level ``maze`` variable.
            grid = globals().get('maze')
        if grid is None:
            raise ValueError("Bridge.get_state: no maze grid available for the given env")
        return grid

    @staticmethod
    def get_state(env, source_element='S', destination_element='D'):
        """Build the DQN state tensor for the current environment.

        Args:
            env: environment object (or a bare maze grid) to read the grid from.
            source_element: marker of the source cell.
            destination_element: marker of the destination cell.

        Returns:
            ``torch.Tensor`` holding ``[normalized_x, normalized_y, normalized_distance]``.

        Raises:
            ValueError: if no grid can be resolved or a marker is missing.
        """
        grid = Bridge._resolve_maze(env)

        source_pos = Bridge.find_position(grid, source_element)
        destination_pos = Bridge.find_position(grid, destination_element)
        if source_pos is None or destination_pos is None:
            # Fail with a clear message instead of a TypeError on None indexing.
            raise ValueError(
                f"Bridge.get_state: could not find {source_element!r} and/or "
                f"{destination_element!r} in the maze"
            )

        vector = Bridge._normalized_offset(source_pos, destination_pos, Bridge.MAX_OFFSET)
        distance = Bridge.euclidean_distance(source_pos, destination_pos)

        # Imported here so this block does not depend on another cell's imports.
        import torch

        # state
        state = torch.tensor([vector[0], vector[1], distance])
        return state

if __name__ == "__main__":
    
    # Demo grid of single-character cell markers (25 rows). It contains one
    # 'S' cell at row 10, column 10 and one 'D' cell at row 24, column 0.
    # NOTE: ``maze`` is a module-level name; Bridge.get_state reads a global
    # called ``maze``, so do not rename it.
    maze = [
            ['0', '0', '0', '1', '0', '1', '0', '0', '0', '1', '0', '0', '0', '0', '0', '1', '0', '0', '0', '0', '1', '1', '0', '0', '0'],
            ['1', '1', '0', '0', '0', '0', '1', '1', '0', '0', '1', '0', '1', '1', '0', '1', '1', '0', '1', '1', '0', '0', '0', '1', '0'],
            ['0', '1', '1', '0', '1', '0', '0', '0', '0', '1', '0', '1', '0', '0', '0', '0', '1', '0', '0', '0', '1', '0', '1', '1', '0'],
            ['0', '0', '0', '1', '1', '1', '0', '1', '1', '1', '0', '0', '1', '0', '1', '0', '0', '1', '1', '0', '0', '0', '1', '0', '1'],
            ['0', '1', '1', '0', '1', '0', '0', '0', '0', '0', '0', '1', '0', '1', '0', '0', '1', '0', '1', '0', '1', '0', '0', '0', '0'],
            ['0', '1', '0', '0', '0', '1', '0', '1', '0', '1', '1', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '1', '1', '1', '0'],
            ['0', '0', '1', '1', '0', '0', '0', '0', '1', '0', '0', '1', '0', '1', '0', '0', '1', '1', '1', '1', '0', '0', '0', '0', '0'],
            ['0', '1', '0', '0', '1', '0', '1', '0', '1', '1', '0', '0', '0', '1', '1', '0', '0', '0', '0', '0', '1', '0', '1', '1', '0'],
            ['0', '0', '0', '1', '0', '1', '0', '0', '0', '0', '1', '0', '1', '0', '1', '0', '1', '0', '1', '0', '1', '1', '0', '0', '0'],
            ['1', '0', '1', '0', '0', '0', '1', '0', '0', '0', '0', '0', '1', '0', '0', '0', '1', '0', '1', '1', '0', '0', '0', '1', '0'],
            ['0', '0', '0', '0', '1', '0', '0', '1', '0', '1', 'S', '1', '0', '0', '1', '0', '0', '1', '0', '0', '1', '1', '0', '0', '0'],
            ['0', '1', '0', '1', '0', '1', '0', '0', '0', '0', '0', '0', '1', '1', '0', '0', '1', '0', '1', '0', '0', '0', '0', '1', '0'],
            ['1', '0', '0', '1', '0', '1', '0', '1', '0', '1', '1', '0', '0', '1', '1', '0', '0', '0', '0', '1', '0', '1', '0', '0', '0'],
            ['1', '0', '1', '0', '0', '0', '1', '0', '1', '0', '0', '0', '1', '0', '0', '1', '0', '1', '1', '0', '1', '0', '0', '1', '0'],
            ['0', '1', '0', '1', '1', '0', '0', '0', '0', '1', '1', '0', '1', '0', '1', '0', '0', '0', '1', '0', '0', '1', '0', '0', '0'],
            ['0', '0', '0', '1', '0', '0', '1', '1', '0', '1', '0', '0', '0', '0', '1', '0', '1', '1', '0', '0', '1', '0', '0', '1', '0'],
            ['1', '0', '1', '0', '1', '0', '0', '0', '1', '0', '0', '1', '1', '1', '0', '0', '0', '1', '1', '0', '0', '1', '0', '0', '0'],
            ['0', '0', '0', '0', '0', '0', '1', '0', '0', '1', '0', '0', '0', '1', '1', '0', '1', '0', '1', '1', '0', '1', '1', '0', '0'],
            ['0', '1', '0', '1', '0', '1', '1', '1', '0', '0', '1', '1', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0'],
            ['1', '0', '1', '0', '0', '1', '1', '0', '1', '0', '1', '0', '0', '1', '0', '1', '0', '1', '1', '0', '1', '1', '0', '1', '0'],
            ['0', '0', '0', '0', '1', '0', '0', '0', '0', '0', '0', '0', '1', '0', '1', '0', '1', '0', '0', '1', '0', '0', '0', '0', '0'],
            ['1', '1', '1', '1', '1', '1', '0', '1', '0', '1', '0', '1', '0', '0', '0', '0', '0', '0', '1', '0', '0', '1', '1', '0', '0'],
            ['0', '0', '0', '0', '1', '0', '0', '0', '1', '0', '0', '0', '0', '1', '0', '1', '0', '1', '0', '0', '1', '1', '0', '0', '0'],
            ['0', '1', '1', '0', '0', '1', '1', '1', '0', '1', '0', '1', '0', '0', '1', '0', '0', '0', '1', '0', '1', '0', '0', '1', '0'],
            ['D', '0', '0', '1', '0', '0', '0', '0', '0', '0', '0', '1', '0', '1', '0', '0', '1', '0', '0', '1', '0', '0', '1', '1', '0']
       ]
    
    # Normalized (x, y) offset from the 'S' cell to the 'D' cell
    # (column difference first, then row difference).
    vector = Bridge.vector_representation(maze)
    print("Vector representation from 'S' to 'D':", vector)

    # Calculate the euclidean distance between source and destination
    # (Bridge.euclidean_distance returns the distance already normalized).
    source_pos = Bridge.find_position(maze, 'S')
    destination_pos = Bridge.find_position(maze, 'D')
    distance = Bridge.euclidean_distance(source_pos, destination_pos)
    print("Euclidean distance between 'S' and 'D':", distance)

Vector representation from 'S' to 'D': (0.3, 0.78)
Euclidean distance between 'S' and 'D': 0.4307516219945733


In [2]:
import torch
import torch.nn as nn
import torch.optim as optim

class DQN(nn.Module):
    """Fully connected Q-network mapping a state vector to one Q-value per action.

    Architecture: ``state_size -> 24 -> 24 -> action_size`` with ReLU after
    the first two layers and a linear output layer.
    """

    def __init__(self, state_size, action_size):
        """Create the three linear layers.

        Args:
            state_size: number of input features.
            action_size: number of output Q-values (one per action).
        """
        super().__init__()
        self.fc1 = nn.Linear(state_size, 24)  # Input Layer
        self.fc2 = nn.Linear(24, 24)          # Hidden Layer
        self.fc3 = nn.Linear(24, action_size) # Output Layer

    def forward(self, x):
        """Return the Q-values for input ``x``."""
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)  # Linear output for action values

    @staticmethod
    def build_model(state_size, action_size):
        """Create a DQN together with its optimizer and loss function.

        Returns:
            ``(model, optimizer, loss_function)`` where the optimizer is Adam
            over the model's parameters (default settings) and the loss is
            mean squared error.
        """
        # Creating the model instance
        dqn_model = DQN(state_size, action_size)

        # Defining optimizer and loss function
        optimizer = optim.Adam(dqn_model.parameters())  # Adam optimizer
        loss_function = nn.MSELoss()                    # Mean Squared Error Loss

        return dqn_model, optimizer, loss_function

    @staticmethod
    def predict_action(model, state):
        """Return the index of the action with the highest predicted Q-value.

        Args:
            model: the network to evaluate.
            state: state vector (tensor, list or tuple of numbers). Its length
                must match the model's input size; it is no longer limited to
                exactly three components.

        Returns:
            The best action index as a Python ``int``.
        """
        # Convert the whole state instead of hard-coding its first three
        # entries, so models built with any state_size can be used.
        model_input = torch.as_tensor(state, dtype=torch.float32)

        # Pass the input through the model
        with torch.no_grad():  # We don't need to track gradients here
            model_output = model(model_input)

        # Get the action with the highest Q-value
        best_action = torch.argmax(model_output).item()

        return best_action

# Example usage: size the network for the bridge state
vector_size = 2    # the vector contributes an x and a y component
distance_size = 1  # plus a single normalized distance value

state_size = vector_size + distance_size
action_size = 4  # one output per movement direction

model, optimizer, loss_fn = DQN.build_model(state_size, action_size)

# ---------------------------------
# Fit the model to random data (smoke test of the training plumbing)
# ---------------------------------

# Random inputs in [0, 1) and random one-hot targets
num_samples = 1000
dummy_inputs = torch.rand(num_samples, state_size)
dummy_outputs = nn.functional.one_hot(
    torch.randint(0, action_size, (num_samples,)),
    num_classes=action_size,
).float()

# Expected shapes: inputs torch.Size([1000, 3]), targets torch.Size([1000, 4])
print(f'Input shape: {dummy_inputs.shape} \n input sample: {dummy_inputs[0]}')
print(f'Output shape: {dummy_outputs.shape} \n output sample: {dummy_outputs[0]}')

# One optimizer step per sample, repeated for a few epochs
epochs = 10
for epoch in range(epochs):
    for features, target in zip(dummy_inputs, dummy_outputs):
        # Forward pass and loss for this single sample
        loss = loss_fn(model(features), target)

        # Gradients accumulate by default, so clear them before backprop
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # Reports the loss of the last sample seen in the epoch
    print(f'Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}')

# Raw Q-values for one random input
sample_input = torch.rand(1, state_size)
sample_output = model(sample_input)
print(f'Input: {sample_input} \n Output: {sample_output}')

# Example values (defined for illustration; not used by the prediction below)
vector_example = (0.5, -0.3)
normalized_distance_example = 0.8

# Greedy action for the first dummy sample versus its one-hot label
predicted_action = DQN.predict_action(model, dummy_inputs[0])
print(f"Predicted action: {predicted_action} \n Expected action: {torch.argmax(dummy_outputs[0]).item()}")
'''
note:
    torch.argmax: index of max value
    .item(): value of the tensor
'''


ModuleNotFoundError: No module named 'torch'

In [None]:
# Grid passed to MazeSnakeGameEnv below (25 rows of single-character markers).
# It contains an 'S' cell at row 0, column 0 and an 'F' cell at row 16,
# column 5, but no 'D' cell -- note that Bridge.vector_representation looks
# for 'D' as the destination by default.
# NOTE(review): 'F' is presumably the food/target marker -- confirm against the environment.
maze = [
    ['S', '0', '0', '1', '0', '1', '0', '0', '0', '1', '0', '0', '0', '0', '0', '1', '0', '0', '0', '0', '1', '1', '0', '0', '0'],
    ['1', '1', '0', '0', '0', '0', '1', '1', '0', '0', '1', '0', '1', '1', '0', '1', '1', '0', '1', '1', '0', '0', '0', '1', '0'],
    ['0', '1', '1', '0', '1', '0', '0', '0', '0', '1', '0', '1', '0', '0', '0', '0', '1', '0', '0', '0', '1', '0', '1', '1', '0'],
    ['0', '0', '0', '1', '1', '1', '0', '1', '1', '1', '0', '0', '1', '0', '1', '0', '0', '1', '1', '0', '0', '0', '1', '0', '1'],
    ['0', '1', '1', '0', '1', '0', '0', '0', '0', '0', '0', '1', '0', '1', '0', '0', '1', '0', '1', '0', '1', '0', '0', '0', '0'],
    ['0', '1', '0', '0', '0', '1', '0', '1', '0', '1', '1', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '1', '1', '1', '0'],
    ['0', '0', '1', '1', '0', '0', '0', '0', '1', '0', '0', '1', '0', '1', '0', '0', '1', '1', '1', '1', '0', '0', '0', '0', '0'],
    ['0', '1', '0', '0', '1', '0', '1', '0', '1', '1', '0', '0', '0', '1', '1', '0', '0', '0', '0', '0', '1', '0', '1', '1', '0'],
    ['0', '0', '0', '1', '0', '1', '0', '0', '0', '0', '1', '0', '1', '0', '1', '0', '1', '0', '1', '0', '1', '1', '0', '0', '0'],
    ['1', '0', '1', '0', '0', '0', '1', '0', '0', '0', '0', '0', '1', '0', '0', '0', '1', '0', '1', '1', '0', '0', '0', '1', '0'],
    ['0', '0', '0', '0', '1', '0', '0', '1', '0', '1', '0', '1', '0', '0', '1', '0', '0', '1', '0', '0', '1', '1', '0', '0', '0'],
    ['0', '1', '0', '1', '0', '1', '0', '0', '0', '0', '0', '0', '1', '1', '0', '0', '1', '0', '1', '0', '0', '0', '0', '1', '0'],
    ['1', '0', '0', '1', '0', '1', '0', '1', '0', '1', '1', '0', '0', '1', '1', '0', '0', '0', '0', '1', '0', '1', '0', '0', '0'],
    ['1', '0', '1', '0', '0', '0', '1', '0', '1', '0', '0', '0', '1', '0', '0', '1', '0', '1', '1', '0', '1', '0', '0', '1', '0'],
    ['0', '1', '0', '1', '1', '0', '0', '0', '0', '1', '1', '0', '1', '0', '1', '0', '0', '0', '1', '0', '0', '1', '0', '0', '0'],
    ['0', '0', '0', '1', '0', '0', '1', '1', '0', '1', '0', '0', '0', '0', '1', '0', '1', '1', '0', '0', '1', '0', '0', '1', '0'],
    ['1', '0', '1', '0', '1', 'F', '0', '0', '1', '0', '0', '1', '1', '1', '0', '0', '0', '1', '1', '0', '0', '1', '0', '0', '0'],
    ['0', '0', '0', '0', '0', '0', '1', '0', '0', '1', '0', '0', '0', '1', '1', '0', '1', '0', '1', '1', '0', '1', '1', '0', '0'],
    ['0', '1', '0', '1', '0', '1', '1', '1', '0', '0', '1', '1', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0'],
    ['1', '0', '1', '0', '0', '1', '1', '0', '1', '0', '1', '0', '0', '1', '0', '1', '0', '1', '1', '0', '1', '1', '0', '1', '0'],
    ['0', '0', '0', '0', '1', '0', '0', '0', '0', '0', '0', '0', '1', '0', '1', '0', '1', '0', '0', '1', '0', '0', '0', '0', '0'],
    ['1', '1', '1', '1', '1', '1', '0', '1', '0', '1', '0', '1', '0', '0', '0', '0', '0', '0', '1', '0', '0', '1', '1', '0', '0'],
    ['0', '0', '0', '0', '1', '0', '0', '0', '1', '0', '0', '0', '0', '1', '0', '1', '0', '1', '0', '0', '1', '1', '0', '0', '0'],
    ['0', '1', '1', '0', '0', '1', '1', '1', '0', '1', '0', '1', '0', '0', '1', '0', '0', '0', '1', '0', '1', '0', '0', '1', '0'],
    ['0', '0', '0', '1', '0', '0', '0', '0', '0', '0', '0', '1', '0', '1', '0', '0', '1', '0', '0', '1', '0', '0', '1', '1', '0']
]

In [None]:
# Build DQNAgent class
import gymnasium as gym
import numpy as np
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import MeanSquaredError
from collections import deque

# Define the DQN agent class
class DQNAgent:
    """Epsilon-greedy DQN agent with a bounded replay memory.

    The agent stores ``(state, action, reward, next_state, done)`` transitions,
    picks actions epsilon-greedily from its PyTorch model, and trains the
    model on random minibatches drawn from the memory.
    """

    def __init__(self, state_size, action_size):
        """Set hyper-parameters and build the model, optimizer and loss.

        Args:
            state_size: number of input features of the network.
            action_size: number of actions (network outputs).
        """
        self.state_size = state_size    # we need it to define the input layer of the network
        self.action_size = action_size  # we need it to define the output layer of the network
        self.memory = deque(maxlen=2000)    # deque is a list-like container with fast appends and pops on either end
        self.gamma = 0.95  # Discount factor
        self.epsilon = 1.0  # Exploration rate
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        # Build the network from the sizes given by the caller rather than
        # from hard-coded 3/4, so the agent honours its own arguments.
        self.model, self.optimizer, self.loss_function = DQN.build_model(
            state_size=state_size, action_size=action_size
        )

    def remember(self, state, action, reward, next_state, done):
        """Append one transition to the replay memory."""
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        """Choose an action epsilon-greedily.

        With probability ``epsilon`` a uniformly random action index is
        returned; otherwise the action with the highest predicted Q-value.
        """
        if np.random.rand() <= self.epsilon:
            return np.random.randint(self.action_size)
        # predict_action already returns the arg-max action index (an int),
        # so it is returned directly instead of being indexed again.
        return DQN.predict_action(self.model, state)

    def replay(self, batch_size):
        """Train the model on a random minibatch and decay epsilon.

        Does nothing while the memory holds fewer than ``batch_size``
        transitions. Each sampled transition triggers one optimizer step.
        """
        # Local import: the file never imports ``random`` at module level.
        import random

        if len(self.memory) < batch_size:
            return
        minibatch = random.sample(self.memory, batch_size)  # get random batch (i.e. batch with random elements) from memory
        for state, action, reward, next_state, done in minibatch:
            # as_tensor avoids a needless copy when states are already tensors
            state = torch.as_tensor(state, dtype=torch.float32)
            next_state = torch.as_tensor(next_state, dtype=torch.float32)

            # Bellman target: reward, plus the discounted best next Q-value
            # unless the episode ended on this transition.
            target = reward
            if not done:
                with torch.no_grad():  # the target is a constant, no graph needed
                    target = reward + self.gamma * torch.max(self.model(next_state)).item()

            # Predicted Q values
            pred_q_values = self.model(state)   # predicted Q values for the current state
            target_q_values = pred_q_values.detach().clone()    # copy that is cut off from the graph
            target_q_values[action] = target                    # update the Q value of the action taken

            # Back-propagate and optimize
            self.optimizer.zero_grad()  # clear gradients accumulated by the previous step
            loss = self.loss_function(pred_q_values, target_q_values)   # calculate the loss
            loss.backward()         # pred_q_values carries the graph back to the model parameters
            self.optimizer.step()   # update the model parameters

        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay  # decrease the exploration rate

# -------------------------------------------------------------
# To Do: Update below code to use PyTorch  + Snake environment
# -------------------------------------------------------------

from snake_maze_env import MazeSnakeGameEnv
# Create the environment
env = MazeSnakeGameEnv(maze, height=25, width=25, snake_growth = False, boundary_loop = False)
state_size = 3
action_size = 4

# Initialize the DQN agent
agent = DQNAgent(state_size, action_size)

# Training loop: up to 500 steps per episode, one replay pass per step once
# the memory holds more than ``batch_size`` transitions.
batch_size = 32
num_episodes = 100
for episode in range(num_episodes):
    # The raw environment state is not fed to the network; the agent only
    # sees the 3-component state derived by Bridge.get_state.
    state = env.reset() # updated maze
    dqn_state = Bridge.get_state(env)
    for t in range(500):
        # Render the environment (optional)
        env.render()

        # Choose an action
        dqn_action = agent.act(dqn_state)

        # Perform the action
        next_state, reward, done, _ = env.step(dqn_action)
        next_dqn_state = Bridge.get_state(env)

        # Remember the experience. Store the DQN state and the action that was
        # actually taken: replay() feeds the stored states to the network, and
        # the previous ``action`` name was never defined.
        agent.remember(dqn_state, dqn_action, reward, next_dqn_state, done)

        # Update the state
        dqn_state = next_dqn_state

        # Check if episode is finished
        if done:
            break

        # Train the agent
        if len(agent.memory) > batch_size:
            agent.replay(batch_size)

# questions
* Should the rewards stored in the replay memory be updated before training, i.e. discounted the farther the agent is from the target?
- Yes: update them using the discount factor.

# References
* [colab code](https://colab.research.google.com/drive/?authuser=2#create=1&folderId=1kvo56FfBy3KNHKiRdLoUsl5KlScGJ_21)

