Deep reinforcement learning algorithm development, attempt 01: an actor-critic strategy implemented with neural networks <br>
DRL-AC-01

In [1]:
# importing important libraries
# import pandas as pd
# import numpy as np
# import matplotlib.pyplot as plt
# import seaborn as sns
# import tensorflow as tf
# from tensorflow.keras.layers import Dense
# from tensorflow.keras.optimizers import Adam

In [None]:
# Define the Actor neural network
class ActorNetwork(tf.keras.Model):
    """Policy network of the actor-critic agent.

    Two 64-unit ReLU dense layers feed a softmax layer that has one output
    per action.
    """

    def __init__(self, n_actions):
        """Create the layers.

        Args:
            n_actions: Number of units in the softmax output layer.
        """
        super().__init__()
        self.dense1 = Dense(64, activation='relu')
        self.dense2 = Dense(64, activation='relu')
        self.output_layer = Dense(n_actions, activation='softmax')

    def call(self, state):
        """Pass ``state`` through both hidden layers and return the softmax output."""
        hidden = self.dense2(self.dense1(state))
        return self.output_layer(hidden)

# Define the Critic neural network
class CriticNetwork(tf.keras.Model):
    """Value network of the actor-critic agent.

    Two 64-unit ReLU dense layers feed a single-unit dense layer that is
    created without an explicit activation.
    """

    def __init__(self):
        """Create the layers."""
        super().__init__()
        self.dense1 = Dense(64, activation='relu')
        self.dense2 = Dense(64, activation='relu')
        self.output_layer = Dense(1)

    def call(self, state):
        """Pass ``state`` through both hidden layers and return the single-unit output."""
        hidden = self.dense2(self.dense1(state))
        return self.output_layer(hidden)

# Define the DRL Actor-Critic agent
class DRLActorCritic:
    """Actor-critic agent that owns an actor network, a critic network and
    one Adam optimizer for each of them."""

    def __init__(self, state_dim, n_actions, lr_actor=0.001, lr_critic=0.001):
        """Build the two networks and their optimizers.

        Args:
            state_dim: Stored on the instance as ``state_dim``.
            n_actions: Number of actions; sizes the actor's output layer and
                the range sampled from in ``get_action``.
            lr_actor: Learning rate of the actor's Adam optimizer.
            lr_critic: Learning rate of the critic's Adam optimizer.
        """
        self.state_dim, self.n_actions = state_dim, n_actions

        # Creation order (actor, critic, then the optimizers) is kept as is.
        self.actor, self.critic = ActorNetwork(n_actions), CriticNetwork()

        self.actor_optimizer = Adam(learning_rate=lr_actor)
        self.critic_optimizer = Adam(learning_rate=lr_critic)

    def get_action(self, state):
        """Sample an action for ``state`` from the actor's output.

        Args:
            state: A single state, e.g. the value returned by the
                environment's ``reset``.

        Returns:
            A pair ``(action, action_probs)``: the sampled action index and
            the actor's output for the one-element batch.
        """
        # The actor is called on a batch, so wrap the single state in one.
        batch = np.array([state])
        action_probs = self.actor(batch)

        # Row 0 of the output holds the probabilities for our only state.
        distribution = action_probs.numpy()[0]
        action = np.random.choice(self.n_actions, p=distribution)
        return action, action_probs

    def train(self, states, actions, discounted_rewards):
        '''
        Placeholder for the training step of both networks; it currently
        contains no code and returns nothing.

        Intended inputs: the state produced by the environment's reset
        function, the action predicted by the actor network, and its
        cumulative reward.

        Intended behaviour: take the state and the action taken by the
        actor, use them to calculate the maximum cumulative reward, and use
        the temporal difference to update the weights of the actor network
        and of the critic network.

        The method is meant only to train the two neural networks and is not
        meant to return a value.
        '''
        

In [None]:
import numpy as np

class CarEnv:
    """Skeleton of a car environment whose state is ``[position, velocity]``.

    Only construction and ``reset`` are functional; ``step`` is not
    implemented yet and raises ``NotImplementedError``.
    """

    def __init__(self, target_position=10.0, obstacle_positions=None):
        """Store the environment configuration and the initial car state.

        Args:
            target_position: Position the car should reach.
            obstacle_positions: List of obstacle positions. When omitted, a
                new list ``[4.0, 6.0]`` is created for this instance.
        """
        self.target_position = target_position
        # A list literal as the default would be created once and shared by
        # every instance built without the argument, so build it per call.
        self.obstacle_positions = (
            [4.0, 6.0] if obstacle_positions is None else obstacle_positions
        )
        self.state_dim = 2  # Position, velocity
        self.action_dim = 1  # Continuous action for acceleration
        self.current_position = 0.0
        self.current_velocity = 0.0
        self.max_velocity = 2.0
        self.min_velocity = -2.0

    def reset(self):
        """Reset the car to its initial state and return that state.

        Position and velocity are set back to the values assigned in
        ``__init__`` (both ``0.0``) so each episode starts from the same
        point.

        Planned steps for the full simulator version (not implemented here):
        1. get the world map and spawn the vehicle at a random point assigned
           by the environment
        2. choose another random or selected point as the target
        3. initialize a camera sensor and attach it to the vehicle
        4. initialize a lidar and attach it to the vehicle
        5. initialize an IMU and attach it to the vehicle
        6. initialize a GNSS and attach it to the vehicle
        7. decide whether lane-invasion and traffic-light sensors need to be
           incorporated

        Returns:
            A NumPy array ``[current_position, current_velocity]``.
        """
        self.current_position = 0.0
        self.current_velocity = 0.0
        return np.array([self.current_position, self.current_velocity])

    def step(self, action):
        """Simulate one timestep of interaction (not implemented yet).

        This method is meant to be called after the actor network's action
        prediction. It should take that action as input, update the
        environment based on it, calculate the reward for the action, and
        provide the new state and reward to the agent.

        Planned behaviour:
        1. The action taken by the actor creates a new state, which is
           returned as ``new_state``.
        2. A reward function rewards or penalizes the model based on the
           action and the new state.
        3. A ``done`` flag reports whether the episode is over; it will be
           used to terminate the episode if a collision occurs.

        Planned return value: ``new_state, reward, done, {}``.

        Open question: how will multiple actions, such as acceleration and
        steering, be sent as a single action?

        Raises:
            NotImplementedError: Always, until the dynamics and the reward
                function are written.
        """
        # Fail loudly here instead of returning None, which the caller would
        # then fail to unpack into (new_state, reward, done, info).
        raise NotImplementedError(
            "CarEnv.step is not implemented yet: it must return "
            "(new_state, reward, done, info)."
        )

In [None]:
# The DRL Actor-Critic agent (DRLActorCritic) defined in the earlier cell is
# reused here. It is deliberately not re-declared: a class statement whose
# body is only a comment is a syntax error, and a new declaration would
# replace the real implementation.

# Define the environment
env = CarEnv()

# Initialize the DRL Actor-Critic agent
state_dim = 2  # will depend on images+sensors dataset
n_actions = 1  # will depend upon how we want the action
agent = DRLActorCritic(state_dim, n_actions)

# Training parameters
num_episodes = 1000

# Episodic training loop
for episode in range(num_episodes):
    state = env.reset()  # Reset the environment to start a new episode
    done = False  # Initialize the episode termination flag
    total_reward = 0  # Initialize the total reward for this episode

    # The episode runs until the environment reports done=True.
    while not done:
        # Choose an action using the current policy (actor network)
        action, action_probs = agent.get_action(state)

        # Take the chosen action and observe the next state and reward
        next_state, reward, done, _ = env.step(action)

        # Train the agent using the observed transition (s, a, r, s');
        # each value is wrapped in a one-element array before being passed.
        agent.train(np.array([state]), np.array([action]), np.array([reward]))

        # Update the current state for the next iteration
        state = next_state

        # Accumulate the reward for this episode
        total_reward += reward

    print(f"Episode {episode + 1}, Total Reward: {total_reward}")

# Close the environment (if needed)
# env.close()


In this episodic training loop:

We reset the environment at the beginning of each episode using env.reset().<br>
Within each episode, we iterate until the episode termination condition (done) is met.<br>
For each timestep within an episode:<br>
a. We choose an action using the current policy (agent.get_action(state)).<br>
b. We take the chosen action and observe the next state and reward (env.step(action)).<br>
c. We train the agent using the observed transition (agent.train(...)).<br>
d. We update the current state for the next iteration.<br>
e. We accumulate the reward for this episode.<br>