
In [None]:
!pip install gym
!pip install pygame
!pip install tensorflow

Import the libraries. Together they implement and train a neural network for reinforcement learning in a taxi environment: Gym provides the environment interface, TensorFlow and its Keras layers define and train the network, NumPy handles array manipulation, and Pygame renders the environment during training or testing.

In [None]:
#libraries
import numpy as np  # Array manipulation and numerical computation.
import gym  # OpenAI Gym: a toolkit for developing and comparing reinforcement learning algorithms; provides the environment interface.
import pygame  # A set of modules for writing video games; used here to render the taxi environment.
import tensorflow as tf  # Builds and trains the neural networks used for reinforcement learning.
from tensorflow.keras import layers  # Keras layers: building blocks for the network architecture.



This code sets up a simple neural network architecture for a Q-network, which can be used in reinforcement learning algorithms to estimate Q-values and make decisions about actions in an environment.


In [None]:
class QNetwork(tf.keras.Model): # The QNetwork class inherits from tf.keras.Model. It is designed for approximating Q-values for a given state in a reinforcement learning setting
    def __init__(self, num_actions): #The constructor (init) initializes the Q-network with three dense (fully connected) layers.
        super(QNetwork, self).__init__()

        #The first two layers (dense1 and dense2) have 128 units each and use the ReLU (Rectified Linear Unit) activation function, which introduces non-linearity to the model.
        self.dense1 = layers.Dense(128, activation='relu')
        self.dense2 = layers.Dense(128, activation='relu')

        #The output layer has one unit per action (num_actions), one for each possible action in the environment. It uses a linear activation function, as Q-values are unbounded.
        self.output_layer = layers.Dense(num_actions, activation='linear')

    #The call method defines the forward pass of the network.
    #It takes a state as input, passes it through the two ReLU dense layers, and then through the linear output layer.
    #The result is one Q-value for each possible action in the given state.
    def call(self, state):
        x = self.dense1(state)
        x = self.dense2(x)
        return self.output_layer(x)
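
To make the shapes in the forward pass concrete, here is a minimal NumPy sketch of the same three-layer computation. It is an illustration only, not the Keras model itself: the helper names (relu, forward, params), the random weights, and the 4-feature state are assumptions made for the example.

```python
import numpy as np

def relu(x):
    """Rectified Linear Unit: max(0, x), applied element-wise."""
    return np.maximum(0.0, x)

def forward(state, params):
    """Two ReLU hidden layers followed by a linear output layer."""
    h1 = relu(state @ params["W1"] + params["b1"])
    h2 = relu(h1 @ params["W2"] + params["b2"])
    return h2 @ params["W3"] + params["b3"]  # linear output: unbounded Q-values

# Assumed sizes for illustration: 4 state features, 128 hidden units, 6 actions.
rng = np.random.default_rng(0)
state_dim, hidden, num_actions = 4, 128, 6
params = {
    "W1": rng.normal(scale=0.1, size=(state_dim, hidden)),
    "b1": np.zeros(hidden),
    "W2": rng.normal(scale=0.1, size=(hidden, hidden)),
    "b2": np.zeros(hidden),
    "W3": rng.normal(scale=0.1, size=(hidden, num_actions)),
    "b3": np.zeros(num_actions),
}

batch = np.array([[1.0, 1.0, 0.0, 4.0]])  # one state: taxi at (1, 1), passenger at (0, 4)
q_values = forward(batch, params)
print(q_values.shape)  # (1, 6): one Q-value per action
```

The batch dimension comes first, which is why the training code wraps a single state in an extra list before passing it to the network.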

This code defines a custom environment, TaxiDQNEnv, using the OpenAI Gym framework. It models a taxi navigating a maze to pick up a passenger and drop them off at the desired destination. The environment holds a main Q-network for making decisions, a target Q-network for stability, the parameters of the Deep Q-Network (DQN) algorithm, and a Pygame display for visualization.

In [None]:
class TaxiDQNEnv(gym.Env):  #The TaxiDQNEnv class inherits from gym.Env, indicating it is an OpenAI Gym environment.
    def __init__(self, taxi_maze): #The constructor (__init__) initializes the environment with a 2D maze represented by a NumPy array (taxi_maze).
        super(TaxiDQNEnv, self).__init__()
        self.taxi_maze = np.array(taxi_maze)
        self.passenger_pos = np.where(self.taxi_maze == 'P') #The positions of the passenger ('P')
        self.goal_pos = np.where(self.taxi_maze == 'D') # goal ('D')
        self.taxi_pos = np.where(self.taxi_maze == 'T') # taxi ('T')
        self.num_rows, self.num_cols = self.taxi_maze.shape
        self.action_space = gym.spaces.Discrete(6)  # 6 possible actions: 0=up, 1=down, 2=left, 3=right, 4=pick up, 5=drop off
        # The observation space is a tuple of four discrete spaces: the taxi's row and column, then the passenger's row and column.
        self.observation_space = gym.spaces.Tuple((
            gym.spaces.Discrete(self.num_rows),
            gym.spaces.Discrete(self.num_cols),
            gym.spaces.Discrete(self.num_rows),
            gym.spaces.Discrete(self.num_cols)
        ))
        # Two instances of the QNetwork class defined above hold the Q-value estimates:
        # the main Q-network (q_network) and the target Q-network (target_q_network).
        self.q_network = QNetwork(self.action_space.n)
        self.target_q_network = QNetwork(self.action_space.n)
        # A subclassed Keras model has no weights until it is first called, so run a dummy
        # state through both networks before copying the main weights into the target.
        dummy_state = np.zeros((1, 4), dtype=np.float32)
        self.q_network(dummy_state)
        self.target_q_network(dummy_state)
        self.target_q_network.set_weights(self.q_network.get_weights())
        # DQN parameters: the Adam optimizer with a learning rate of 0.001, a discount factor of 0.99
        # for future rewards, and a UCB (Upper Confidence Bound) exploration parameter (ucb_parameter) of 1.0.
        self.optimizer = tf.optimizers.Adam(learning_rate=0.001)
        self.discount_factor = 0.99
        self.ucb_parameter = 1.0  # UCB parameter for exploration
        # Pygame is initialized for visualizing the environment. cell_size is the size of each cell in pixels, and the display is sized from the dimensions of the maze.

        pygame.init()
        self.cell_size = 125
        self.screen = pygame.display.set_mode((self.num_cols * self.cell_size, self.num_rows * self.cell_size))
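
The constructor copies the main network's weights into the target network, and step repeats that copy every 100 steps. The sketch below illustrates this periodic "hard" synchronization with plain NumPy arrays standing in for network weights. The helper sync_target, the toy weight shapes, and the fake update are assumptions for illustration.

```python
import numpy as np

def sync_target(online_weights):
    """Return an independent copy of the online weights for the target network."""
    return [w.copy() for w in online_weights]

online = [np.zeros((4, 2)), np.zeros(2)]  # toy stand-in for the main network's weights
target = sync_target(online)
sync_every = 100  # same period the step method uses

for step in range(1, 251):
    online[0] += 0.01  # stand-in for a gradient update
    if step % sync_every == 0:
        target = sync_target(online)

print(round(float(online[0][0, 0]), 2))  # 2.5
print(round(float(target[0][0, 0]), 2))  # 2.0 (last synchronized at step 200)
```

Between synchronizations the target weights stay frozen, so the value the main network is trained toward does not shift after every update.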

The _get_state method returns a NumPy array representing the current state of the environment. The state holds the row and column of the taxi followed by the row and column of the passenger.

In [None]:
    def _get_state(self):
        # self.taxi_pos[0][0]: row index of the taxi's position in the maze.
        # self.taxi_pos[1][0]: column index of the taxi's position in the maze.
        # self.passenger_pos[0][0]: row index of the passenger's position in the maze.
        # self.passenger_pos[1][0]: column index of the passenger's position in the maze.
        return np.array([self.taxi_pos[0][0], self.taxi_pos[1][0], self.passenger_pos[0][0], self.passenger_pos[1][0]])
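
The double indexing works because np.where returns a tuple of index arrays, one per axis. A small standalone sketch, using a two-row maze invented for the example:

```python
import numpy as np

maze = np.array([
    [' ', ' ', '#', ' ', 'P'],
    [' ', 'T', ' ', ' ', ' '],
])

taxi_pos = np.where(maze == 'T')       # tuple: (array of row indices, array of column indices)
passenger_pos = np.where(maze == 'P')

taxi_row, taxi_col = taxi_pos[0][0], taxi_pos[1][0]  # first (and only) match
state = np.array([taxi_row, taxi_col, passenger_pos[0][0], passenger_pos[1][0]])
print(state)  # [1 1 0 4]
```

If a marker is missing from the maze, the index arrays are empty and the [0] lookup raises an IndexError, so each marker must appear in the maze.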

The step method in the TaxiDQNEnv class defines the logic for a single step in the reinforcement learning environment. It takes an action as input, updates the environment based on the action of the taxi, and performs Deep Q-Network (DQN) training.

In [None]:
    def step(self, action):
        state = self._get_state()  # Current state, read before the action is applied.

        # Take the action and observe the next state and reward.
        next_state, reward, done, _ = self._take_action(action)

        # DQN training. The Bellman target is reward + discount_factor * max over actions of the
        # target network's Q-values in the next state. A terminal transition has no future value.
        state_batch = np.array([state], dtype=np.float32)
        next_state_batch = np.array([next_state], dtype=np.float32)
        next_q_values = self.target_q_network(next_state_batch).numpy()
        target = float(reward + (0.0 if done else self.discount_factor * np.max(next_q_values)))

        with tf.GradientTape() as tape:  # Records the forward pass so the loss can be differentiated.
            q_values = self.q_network(state_batch)
            # Q-value of the action that was actually taken.
            q_taken = tf.reduce_sum(tf.one_hot(action, self.action_space.n) * q_values, axis=1)
            # Squared temporal-difference error between the target and that Q-value.
            loss = tf.reduce_mean(tf.square(target - q_taken))
        gradients = tape.gradient(loss, self.q_network.trainable_variables)  # Gradients of the loss with respect to the Q-network's trainable variables.
        self.optimizer.apply_gradients(zip(gradients, self.q_network.trainable_variables))  # Update the Q-network's weights.

        # The target Q-network is synchronized every 100 steps to improve stability.
        if self.total_steps % 100 == 0:
            self.target_q_network.set_weights(self.q_network.get_weights())

        self.total_steps += 1  # Total number of steps taken in the environment.
        self.total_actions[action] += 1  # Per-action counts used by the Upper Confidence Bound (UCB) calculation.

        return next_state, reward, done, {}  # Next state, reward, done flag, and an empty info dictionary.
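
The Bellman target used for the Q-network update can be checked by hand on a small example. This is a sketch under stated assumptions: td_target is a hypothetical helper, the Q-values are made up, and dropping the future term on terminal transitions is a common DQN convention assumed here.

```python
import numpy as np

def td_target(reward, next_q_values, discount_factor=0.99, done=False):
    """Bellman target: reward + discount_factor * max_a Q_target(next_state, a)."""
    if done:
        # Assumed convention: a terminal state has no future value to bootstrap from.
        return float(reward)
    return float(reward + discount_factor * np.max(next_q_values))

next_q = np.array([0.2, 0.5, -0.1, 0.0, 1.0, 0.3])  # made-up target-network outputs
print(td_target(0.0, next_q))             # 0.99
print(td_target(1.0, next_q, done=True))  # 1.0
```

With a discount factor of 0.99, a reward that lies one step in the future is worth 99% of its face value now, which is what lets the agent trade off short and long routes.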


This method encapsulates the transition dynamics of the environment based on the chosen action and is used within the step method to update the environment during the reinforcement learning process.

In [None]:
    def _take_action(self, action):  # Applies one action and returns the next state, the reward, and the done status.
        taxi_row, taxi_col, passenger_row, passenger_col = self._get_state()  # Current positions of the taxi and the passenger.

        reward = 0  # Initialize reward
        done = False  # Initialize done

        if action == 0:  # Up: decrease the taxi's row index without crossing the top boundary.
            taxi_row = max(0, taxi_row - 1)
        elif action == 1:  # Down: increase the taxi's row index without crossing the bottom boundary.
            taxi_row = min(self.num_rows - 1, taxi_row + 1)
        elif action == 2:  # Left: decrease the taxi's column index without crossing the left boundary.
            taxi_col = max(0, taxi_col - 1)
        elif action == 3:  # Right: increase the taxi's column index without crossing the right boundary.
            taxi_col = min(self.num_cols - 1, taxi_col + 1)
        elif action == 4:  # Pick up: if the taxi is on the passenger's cell and the passenger is still waiting there, clear the cell and set the reward to 1.0.
            if (taxi_row, taxi_col) == (passenger_row, passenger_col) and \
                    self.taxi_maze[passenger_row, passenger_col] == 'P':
                self.taxi_maze[passenger_row, passenger_col] = ' '
                reward = 1.0  # Update reward for successful pick-up
        elif action == 5:  # Drop off: succeeds with a reward of 1.0 if the taxi is at the goal and the passenger has been picked up; otherwise the reward is -1.0.
            if (taxi_row, taxi_col) == (self.goal_pos[0][0], self.goal_pos[1][0]) and \
                    self.taxi_maze[passenger_row, passenger_col] == ' ':
                reward = 1.0  # Update reward for successful drop-off
                done = True
            else:
                reward = -1.0  # Update reward for unsuccessful drop-off
                done = False
        else:
            raise ValueError("Invalid action")

        # Store the new taxi position in the (rows, cols) layout that np.where returns, so that _get_state reflects the move.
        self.taxi_pos = (np.array([taxi_row]), np.array([taxi_col]))
        # Return the next state, the reward obtained, the done status, and an empty dictionary of additional information.
        return np.array([taxi_row, taxi_col, passenger_row, passenger_col]), reward, done, {}
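
The boundary handling of the four movement actions can be exercised on its own. The function move below is a hypothetical standalone helper that mirrors the max/min clamping above, using the same action numbering (0=up, 1=down, 2=left, 3=right).

```python
def move(row, col, action, num_rows, num_cols):
    """Apply a movement action, clamping the position to the grid."""
    if action == 0:    # up
        row = max(0, row - 1)
    elif action == 1:  # down
        row = min(num_rows - 1, row + 1)
    elif action == 2:  # left
        col = max(0, col - 1)
    elif action == 3:  # right
        col = min(num_cols - 1, col + 1)
    else:
        raise ValueError("Invalid movement action")
    return row, col

# A 4 x 5 grid, matching the example maze's dimensions.
print(move(0, 0, 0, 4, 5))  # (0, 0): already at the top edge, so the taxi stays put
print(move(3, 4, 3, 4, 5))  # (3, 4): already at the right edge
print(move(1, 1, 1, 4, 5))  # (2, 1): an ordinary move down
```

Clamping means that a move into a wall leaves the state unchanged rather than raising an error.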


The reset method in the TaxiDQNEnv class is responsible for resetting the environment to its initial state at the beginning of a new episode.

In [None]:
    def reset(self):
        # A successful pick-up clears the passenger's cell, so restore the 'P' marker at the stored position before locating it again.
        self.taxi_maze[self.passenger_pos[0][0], self.passenger_pos[1][0]] = 'P'
        self.taxi_pos = np.where(self.taxi_maze == 'T')  # Resets the taxi to its initial position, the cell marked 'T' in the maze.
        self.passenger_pos = np.where(self.taxi_maze == 'P')  # Resets the passenger to its initial position, the cell marked 'P' in the maze.
        self.total_steps = 0  # Resets the step counter to zero for the new episode.
        self.total_actions = np.zeros(self.action_space.n)  # Per-action counts, used for the Upper Confidence Bound (UCB) calculation.
        return self._get_state()  # Returns the initial state: the positions of the taxi and the passenger.


The render method in the TaxiDQNEnv class is responsible for visually rendering the current state of the environment using the Pygame library. It draws a representation of the maze on the screen, highlighting the positions of the taxi, passenger, and destination.

In [None]:
    def render(self):
        self.screen.fill((255, 255, 255)) #Fills the entire screen with a white color to clear the previous rendering.

        for row in range(self.num_rows):
            for col in range(self.num_cols):
                cell_left = col * self.cell_size
                cell_top = row * self.cell_size #Calculate the pixel coordinates of the top-left corner of the current cell based on its row and column.

                if self.taxi_maze[row, col] == '#':  # Obstacle
                    pygame.draw.rect(self.screen, (0, 0, 0), (cell_left, cell_top, self.cell_size, self.cell_size))
                elif self.taxi_maze[row, col] == 'S':  # Starting position
                    pygame.draw.rect(self.screen, (0, 255, 0), (cell_left, cell_top, self.cell_size, self.cell_size))
                elif self.taxi_maze[row, col] == 'P':  # Passenger position
                    pygame.draw.rect(self.screen, (255, 255, 0), (cell_left, cell_top, self.cell_size, self.cell_size))
                elif self.taxi_maze[row, col] == 'D':  # Destination position
                    pygame.draw.rect(self.screen, (255, 0, 0), (cell_left, cell_top, self.cell_size, self.cell_size))

                if (row, col) == (self.taxi_pos[0][0], self.taxi_pos[1][0]):  # Draw a blue rectangle to highlight the taxi's position.
                    pygame.draw.rect(self.screen, (0, 0, 255), (cell_left, cell_top, self.cell_size, self.cell_size))

        pygame.display.update() #Updates the Pygame display to reflect the changes made in the rendering.

The example below defines the maze layout, creates the environment, and prepares the rewards list that collects the total reward of each episode.

In [None]:
# Example Usage
taxi_maze = [   #The layout of the taxi environment as a 2D maze: '#' = obstacle, 'P' = passenger, 'T' = taxi, 'D' = destination.
    [' ', ' ', '#', ' ', 'P'],
    [' ', 'T', ' ', ' ', ' '],
    [' ', '#', ' ', '#', ' '],
    ['D', ' ', ' ', ' ', ' '],
]

env = TaxiDQNEnv(taxi_maze)

# Training loop with rewards collection. After the training loop, you can analyze or visualize the training results using the rewards list.
num_episodes = 1000 #number of episodes
rewards = []

The agent selects actions using UCB exploration. For each action it adds an exploration bonus to the Q-value predicted by the neural network; the bonus grows with the total step count and shrinks as that action is tried more often. The action with the highest UCB value is chosen.

In [None]:
for episode in range(num_episodes):
    state = env.reset()
    total_reward = 0  # Running total of the rewards in this episode.

    while True:  # The loop continues until the episode is done.
        # Use UCB exploration: Q-value plus a bonus that is larger for rarely tried actions.
        q_values = env.q_network.predict(np.array([state], dtype=np.float32), verbose=0)[0]
        q_values = np.asarray(q_values).astype(np.float32)
        ucb_values = q_values + env.ucb_parameter * np.sqrt(np.log(env.total_steps + 1) / (env.total_actions + 1e-6))
        action = np.argmax(ucb_values)

        # Take a step with the chosen action and obtain the resulting state, reward, and done status.
        next_state, reward, done, _ = env.step(action)
        total_reward += reward

        state = next_state

        if done:
            break

    rewards.append(total_reward)
    print(f"Episode: {episode + 1}, Total Reward: {total_reward}")  # Print the episode number and total reward to monitor training progress.

env.close() #After the training loop completes, the environment is closed.
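
The UCB action selection used in the training loop can be sketched in isolation. The function ucb_action and the numbers below are illustrative assumptions; the formula is the same one the loop applies.

```python
import numpy as np

def ucb_action(q_values, action_counts, total_steps, c=1.0, eps=1e-6):
    """Pick the action maximizing Q + c * sqrt(log(total_steps + 1) / (count + eps))."""
    bonus = c * np.sqrt(np.log(total_steps + 1) / (action_counts + eps))
    return int(np.argmax(q_values + bonus))

q = np.array([0.5, 0.4, 0.1])  # made-up Q-values for three actions

# Action 2 has never been tried, so its bonus dominates despite its low Q-value.
print(ucb_action(q, np.array([10.0, 10.0, 0.0]), total_steps=20))   # 2

# With equal counts every action gets the same bonus, so the highest Q-value wins.
print(ucb_action(q, np.array([10.0, 10.0, 10.0]), total_steps=30))  # 0
```

The small eps term avoids a division by zero for untried actions while still giving them a very large bonus, so each action is tried early in an episode.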

The plot serves as a diagnostic tool for assessing the agent's performance and training progress: total reward per episode should trend upward as the agent learns.

In [None]:

# Plotting the rewards
import matplotlib.pyplot as plt

plt.plot(rewards) #Plots the rewards list on the y-axis against the episode numbers on the x-axis. This line plot represents how the total reward changes over the course of training episodes.
plt.title('Rewards Over Episodes') #Sets the title of the plot.
plt.xlabel('Episode') #Sets the label for the x-axis.
plt.ylabel('Total Reward') #Sets the label for the y-axis.
plt.show() #Displays the plot
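
Per-episode rewards are often noisy, so one optional way to make the trend easier to read is to smooth the curve with a moving average before plotting. This is a sketch only: moving_average, the window size, and the demo values are assumptions, not part of the original notebook.

```python
import numpy as np

def moving_average(values, window):
    """Average each run of `window` consecutive values (output is shorter by window - 1)."""
    values = np.asarray(values, dtype=float)
    kernel = np.ones(window) / window
    return np.convolve(values, kernel, mode="valid")

rewards_demo = [0, 2, 1, 3, 2, 4]  # made-up episode rewards
smoothed = moving_average(rewards_demo, window=3)
print(smoothed)  # [1. 2. 2. 3.]
```

Passing the real rewards list to the same helper and plotting the result alongside the raw curve would show the overall trend with less episode-to-episode noise.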
