In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
from collections import deque
import random

# Define the DQN (Deep Q-Network) model --> PLAY WITH IT
class DQN(tf.keras.Model):
    """Small fully connected Q-network.

    Maps a state vector to one value per action through two ReLU hidden
    layers (4 and 16 units) followed by a linear output layer with
    ``action_size`` units.
    """

    def __init__(self, action_size, state_size):
        super().__init__()
        dense = tf.keras.layers.Dense
        # Layers are created in forward order: fc1 -> fc2 -> fc3.
        self.fc1 = dense(4, activation='relu', input_shape=(state_size,))
        self.fc2 = dense(16, activation='relu')
        self.fc3 = dense(action_size, activation='linear')

    def call(self, state):
        """Return the output of fc3(fc2(fc1(state)))."""
        hidden = self.fc2(self.fc1(state))
        return self.fc3(hidden)

# Setting agent and its learning parameters --> PLAY WITH IT
class DQNAgent:
    """Epsilon-greedy agent backed by a ``DQN`` model and a replay buffer.

    Attributes set in ``__init__``:
        state_size / action_size: sizes passed by the caller.
        memory: bounded deque (2000 entries) of
            ``(state, action, reward, next_state, done)`` tuples.
        gamma: discount rate applied to the best next-state value.
        epsilon / epsilon_min / epsilon_decay: exploration probability, its
            floor, and the factor it is multiplied by after each ``replay``.
        learning_rate: step size handed to the Adam optimizer.
        model: compiled ``DQN`` instance (MSE loss).
        position: current position, initially ``[0, 0]``.
    """

    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=2000)
        self.gamma = 0.95    # discount rate
        self.epsilon = 1.0  # exploration rate
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.999
        self.learning_rate = 0.01
        self.model = DQN(action_size, state_size)  # Use the DQN model class
        self.model.compile(loss='mse', optimizer=Adam(learning_rate=self.learning_rate))
        self.position = [0, 0]

    def remember(self, state, action, reward, next_state, done):
        """Append one transition to the replay buffer."""
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        """Pick an action index for ``state``.

        With probability ``epsilon`` a uniformly random action is returned;
        otherwise the index of the largest model output for the first row of
        ``state`` is returned as a plain ``int``.
        """
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        act_values = self.model(state, training=False)
        return int(np.argmax(act_values[0]))

    def replay(self, batch_size):
        """Train on ``batch_size`` transitions sampled from memory.

        Each sampled transition is fitted individually, exactly one epoch,
        against a target that equals the current prediction except for the
        taken action, which is set to ``reward`` (terminal) or
        ``reward + gamma * max(Q(next_state))``. Afterwards epsilon is
        decayed once, never dropping below ``epsilon_min``.

        Does nothing when the buffer holds fewer than ``batch_size`` samples.
        """
        if len(self.memory) < batch_size:
            # Not enough experience yet; sampling would raise ValueError.
            return

        minibatch = random.sample(self.memory, batch_size)
        for state, action, reward, next_state, done in minibatch:
            target = reward
            if not done:
                # Calling the model directly avoids the per-call overhead and
                # progress-bar output of ``predict`` for single-row inputs.
                next_q = np.asarray(self.model(next_state, training=False))
                target = reward + self.gamma * np.amax(next_q[0])
            # np.array makes a writable copy of the current prediction.
            target_f = np.array(self.model(state, training=False))
            # Only the taken action's value is moved towards the target.
            target_f[0][action] = target
            self.model.fit(state, target_f, epochs=1, verbose=0)

        if self.epsilon > self.epsilon_min:
            # Clamp so the decay can never undershoot the configured floor.
            self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

    def update_position(self, new_position):
        """Store ``new_position`` as the agent's current position."""
        self.position = new_position

def calculate_reward(new_position, nutrient_position, previous_distance):
    """Score a move by how it changed the Manhattan distance to the nutrient.

    Returns ``100 / distance`` when the move brought the agent closer
    (``500`` if it landed on the nutrient), otherwise the negated current
    distance.
    """
    row_gap = abs(new_position[0] - nutrient_position[0])
    col_gap = abs(new_position[1] - nutrient_position[1])
    current_distance = row_gap + col_gap

    # Reward function --> PLAY WITH IT
    moved_closer = current_distance < previous_distance
    if not moved_closer:
        # Moved away or stayed the same: penalise by the remaining distance.
        return -current_distance
    if current_distance > 0:
        return 100 / current_distance
    return 500

# Build a standalone agent: a state is an (x, y) position and an action
# indexes one of ['up', 'down', 'left', 'right'].
state_size = 2
action_size = 4
dqn_agent = DQNAgent(state_size=state_size, action_size=action_size)

# Smoke test: ask the agent for an action on a random dummy state.
state = np.random.rand(1, state_size)
action = dqn_agent.act(state)
print(f"Action taken by the DQN agent: {action}")


In [None]:
import matplotlib.pyplot as plt

# Environment setup
class GridWorld:
    """2-D grid with a single nutrient source and one embedded agent.

    ``grid`` holds the nutrient concentration per cell; ``agent`` is a
    ``DQNAgent`` whose ``position`` attribute tracks the agent's
    (row, column) location on the grid.
    """

    def __init__(self, size, nutrient_position, nutrient_gradient, gradient_type='diamond'):
        """Create the grid and lay down the nutrient gradient.

        Args:
            size: ``(rows, cols)`` of the grid.
            nutrient_position: ``(row, col)`` of the nutrient source.
            nutrient_gradient: reach of the gradient in cells.
            gradient_type: 'radial', 'rectangular', or 'diamond'.
        """
        self.size = size
        self.nutrient_position = nutrient_position
        self.nutrient_gradient = nutrient_gradient

        # Initialize the DQN agent within the GridWorld
        state_size = 2  # x and y coordinates form the state
        action_size = 4  # Four possible actions: up, down, left, right
        self.agent = DQNAgent(state_size, action_size)

        self.grid = np.zeros(size)
        self._set_nutrient(gradient_type)

    def _set_nutrient(self, gradient_type='diamond'):  # Choose 'radial', 'rectangular', or 'diamond'
        """(Re)build the nutrient field for the given gradient shape.

        The grid is cleared first, so calling this again replaces the
        previous gradient instead of stacking a second one on top of it.
        Any other ``gradient_type`` leaves only the source cell marked.
        """
        self.grid.fill(0)

        x_center, y_center = self.nutrient_position
        self.grid[x_center, y_center] = 1  # Mark the nutrient source

        rows, cols = self.size[0], self.size[1]
        reach = self.nutrient_gradient

        if gradient_type == 'rectangular':
            # Nested squares of growing half-width; each adds 0.1 to its cells.
            for i in range(1, reach):
                gradient_area = [
                    (x_center + dx, y_center + dy)
                    for dx in range(-i, i + 1)
                    for dy in range(-i, i + 1)
                    if 0 <= x_center + dx < rows and 0 <= y_center + dy < cols
                ]
                for pos in gradient_area:
                    self.grid[pos] += 0.1

        elif gradient_type in ('radial', 'diamond'):
            # Both shapes fall off linearly with distance; they differ only in
            # the metric (Euclidean for radial, Manhattan for diamond).
            for dx in range(-reach, reach):
                for dy in range(-reach, reach):
                    if gradient_type == 'radial':
                        dist = np.sqrt(dx**2 + dy**2)
                    else:
                        dist = abs(dx) + abs(dy)
                    row, col = x_center + dx, y_center + dy
                    if dist < reach and 0 <= row < rows and 0 <= col < cols:
                        self.grid[row, col] += (reach - dist) / reach

    def show_grid(self, agent_position=None, step_number=None, ax=None):
        """Draw the nutrient field, optionally with the agent and step label.

        Args:
            agent_position: (row, col)-like position; drawn as a green dot.
            step_number: if given, shown as 'Step: N' at the top of the figure.
            ax: matplotlib axes to draw on. When omitted, a new 8x8 figure is
                created and shown; when supplied, the caller decides when to
                show the figure.
        """
        owns_figure = ax is None
        if owns_figure:
            # plt.subplots returns (figure, axes); only the axes is needed.
            _, ax = plt.subplots(figsize=(8, 8))

        ax.imshow(self.grid, cmap='hot', interpolation='nearest')

        if agent_position is not None:
            # Flatten so both (2,) and (1, 2) shaped positions are accepted.
            agent_position = np.array(agent_position).flatten()
            # Column is the horizontal axis, row the vertical one.
            ax.scatter(agent_position[1], agent_position[0], color='green', s=100)

        if step_number is not None:
            # Figure.text places text in figure coordinates by default.
            ax.figure.text(0.5, 0.95, f'Step: {step_number}', fontsize=14,
                           color='black', ha='center', va='top')

        x_ticks = np.arange(0, self.size[1], 10)
        y_ticks = np.arange(0, self.size[0], 10)
        ax.set_xticks(x_ticks)
        ax.set_xticklabels(x_ticks)
        ax.set_yticks(y_ticks)
        ax.set_yticklabels(y_ticks)
        ax.set_xlabel('X-coordinate')
        ax.set_ylabel('Y-coordinate')
        ax.grid(False)

        if owns_figure:
            plt.show()

    def reset(self):
        """Move the agent to a uniformly random cell and return that position."""
        new_position = (np.random.randint(0, self.size[0]), np.random.randint(0, self.size[1]))
        self.agent.position = new_position

        # Return the initial state of the agent
        return self.agent.position

    def step(self, action_index):
        """Apply one move to the embedded agent.

        Args:
            action_index: 0 = up, 1 = down, 2 = left, 3 = right.

        Returns:
            ``(new_state, reward, done)`` where ``new_state`` is the new
            position as a ``(1, 2)`` array, ``reward`` comes from
            ``calculate_reward`` and ``done`` is True once the agent is on
            the nutrient source.

        Raises:
            KeyError: if ``action_index`` is not one of the four actions.
        """
        action_mapping = {
            0: 'up',
            1: 'down',
            2: 'left',
            3: 'right'
        }
        action = action_mapping[action_index]

        # Get the current position of the agent
        x, y = self.agent.position

        # Moves that would leave the grid keep the agent where it is.
        if action == 'up' and x > 0:
            new_position = (x - 1, y)
        elif action == 'down' and x < self.size[0] - 1:
            new_position = (x + 1, y)
        elif action == 'left' and y > 0:
            new_position = (x, y - 1)
        elif action == 'right' and y < self.size[1] - 1:
            new_position = (x, y + 1)
        else:
            new_position = (x, y)

        self.agent.position = new_position

        # Manhattan distances before and after the move.
        new_distance = abs(new_position[0] - self.nutrient_position[0]) + abs(new_position[1] - self.nutrient_position[1])
        previous_distance = abs(x - self.nutrient_position[0]) + abs(y - self.nutrient_position[1])

        reward = calculate_reward(new_position, self.nutrient_position, previous_distance)
        done = new_distance == 0

        # Reshape the new position for the neural network input
        new_state = np.array(new_position).reshape(1, -1)

        return new_state, reward, done

# World configuration: a 100x100 grid with the nutrient source in the centre.
grid_size = (100, 100)
nutrient_source_position = (50, 50)
nutrient_gradient = 100

# Build the environment; the constructor already runs `_set_nutrient` with
# its default gradient type before the radial gradient is applied below.
environment = GridWorld(
    size=grid_size,
    nutrient_position=nutrient_source_position,
    nutrient_gradient=nutrient_gradient,
)
environment._set_nutrient('radial')

# Render the initial nutrient field (no agent marker, no step label).
environment.show_grid()

In [None]:
# Simulate and visualize the agent's movement for 100 steps without prior training

import time
from IPython.display import clear_output

for step in range(100):
    # The network expects a (1, state_size) row, so wrap the stored position.
    current_state = np.array(dqn_agent.position).reshape(1, -1)

    # Let the agent choose, then advance the environment by that move.
    action_index = dqn_agent.act(current_state)
    new_state, reward, done = environment.step(action_index)

    # Copy the position reported by the environment onto the standalone agent.
    dqn_agent.update_position(new_state.flatten())

    # Redraw the grid in place with the agent marker and the step counter.
    clear_output(wait=True)
    environment.show_grid(dqn_agent.position, step_number=step)

    if not done:
        # Short pause so each frame is visible before the next redraw.
        time.sleep(0.1)
        continue

    print("Found the nutrient source!")
    break

In [None]:
### TRAINING 

from IPython.display import Image, display
import io
import imageio
import matplotlib.pyplot as plt
from PIL import Image

# Training Parameters
num_episodes = 500
max_steps_per_episode = 25  # More reasonable limit for each episode
batch_size = 4  # Batch size for training

# Initialize the environment
environment = GridWorld(grid_size, nutrient_source_position, nutrient_gradient)

# Initialize DQN agent
dqn_agent = DQNAgent(state_size=2, action_size=4)

# Keep track of the learning progress
all_epochs = []
all_penalties = []

# To store the rewards for plotting
rewards_per_episode = []

# Store the frames for GIF
frames = []

for episode in range(num_episodes):
    # Reset the environment and get the initial state
    initial_state = environment.reset()  # Returns the agent's (x, y) position
    state = np.array(initial_state).reshape(1, -1)  # Shape (1, 2) for the network
    total_reward = 0
    done = False

    for step in range(max_steps_per_episode):
        # Agent selects an action
        action_index = dqn_agent.act(state)

        # Apply the action to the environment
        next_state, reward, done = environment.step(action_index)
        next_state = next_state.reshape(1, -1)

        # Accumulate the reward exactly once per step
        total_reward += reward

        # Remember the experience
        dqn_agent.remember(state, action_index, reward, next_state, done)

        # Update state
        state = next_state

        # Replay and train the neural network
        if len(dqn_agent.memory) > batch_size:
            dqn_agent.replay(batch_size)

        if done:
            break

        print(step)

    # Decay epsilon
    # NOTE: DQNAgent.replay also decays epsilon on every call, so this
    # per-episode decay is applied in addition to the per-step one.
    if dqn_agent.epsilon > dqn_agent.epsilon_min:
        dqn_agent.epsilon *= dqn_agent.epsilon_decay

    all_epochs.append(step)
    all_penalties.append(total_reward < 0)  # Assuming negative rewards are penalties

    # Append the total reward of the episode for plotting
    rewards_per_episode.append(total_reward)

    # Visualize the training progress
    clear_output(wait=True)
    fig = plt.figure(figsize=(12, 5))
    plt.plot(rewards_per_episode)
    plt.title('Training Progress')
    plt.xlabel('Episode')
    plt.ylabel('Total Reward')
    plt.tight_layout()

    # Save the plot to a BytesIO object
    buf = io.BytesIO()
    plt.savefig(buf, format='png')
    plt.close(fig)  # close the figure to prevent it from displaying in the notebook
    buf.seek(0)
    image = Image.open(buf)
    display(image)

    # Add to frames for GIF
    frames.append(image)

    # Print the progress
    print(f"Episode: {episode + 1}/{num_episodes}, Steps: {step}, Total reward: {total_reward}, Epsilon: {dqn_agent.epsilon:.2f}")


# Save the frames as a GIF
imageio.mimsave('training_progress.gif', frames, fps=5)

# After the training is complete
dqn_agent.model.save('DQN_Agent_Diamond_pretrained.tf', save_format="tf")

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import display, clear_output
import imageio
from keras.models import load_model

# Load the trained model
dqn_agent.model = load_model('DQN_Agent_Diamond_pretrained.tf')

# Reset the environment for the demonstration
start_position = environment.reset()
state = np.array(start_position).reshape(1, -1)

frames = []  # List to keep track of the frames for the GIF
rewards = []  # List to keep track of the rewards

# Simulate and visualize the agent's movement for 100 steps
for t in range(100):
    # Greedy action: index of the largest predicted value (no exploration).
    # verbose=0 suppresses the per-call progress bar.
    action_index = np.argmax(dqn_agent.model.predict(state, verbose=0))

    # Apply the action to the environment
    new_state, reward, done = environment.step(action_index)

    # Update rewards list and state
    rewards.append(reward)
    state = new_state

    # Visualization
    clear_output(wait=True)
    fig, ax = plt.subplots(1, 2, figsize=(12, 6))  # Adjust the subplot size ratio

    # Plot for the agent's movement on the grid
    ax[0].imshow(environment.grid, cmap='hot', interpolation='nearest')
    agent_position = np.array(environment.agent.position).flatten()
    ax[0].scatter(agent_position[1], agent_position[0], color='green', s=100)  # Mark the agent's position
    ax[0].set_title('Agent Movement')
    ax[0].set_xlabel('X-coordinate')
    ax[0].set_ylabel('Y-coordinate')

    # Plot for the reward per step
    ax[1].plot(rewards, label='Reward')
    ax[1].set_title('Reward per Step')
    ax[1].set_xlabel('Step')
    ax[1].set_ylabel('Reward')
    ax[1].legend()

    # Capture the frame while the figure is still open. buffer_rgba() is used
    # instead of tostring_rgb(), which is deprecated in recent matplotlib;
    # the alpha channel is dropped and the pixels copied so the frame does
    # not alias the canvas buffer.
    fig.canvas.draw()
    image = np.asarray(fig.canvas.buffer_rgba())[..., :3].copy()
    frames.append(image)

    display(fig)
    plt.close(fig)

    if done:
        print(f"Episode finished after {t + 1} timesteps")
        break

# Save frames as a GIF
imageio.mimsave('agent_movement --better.gif', frames, fps=5)