In [2]:
import numpy as np
from collections import deque
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
import random

class DQNAgent:
    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=2000)
        self.gamma = 0.95  # discount rate
        self.epsilon = 1.0  # exploration rate
        self.epsilon_min = 0.1
        self.epsilon_decay = 0.995
        self.learning_rate = 0.001
        self.model = self._build_model()

    def _build_model(self):
        model = Sequential()
        model.add(Dense(24, input_dim=self.state_size, activation='relu'))
        model.add(Dense(24, activation='relu'))
        model.add(Dense(self.action_size, activation='linear'))
        model.compile(loss='mse', optimizer=Adam(learning_rate=self.learning_rate))
        return model

    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        if np.random.rand() <= self.epsilon:
            return np.random.choice(self.action_size)
        act_values = self.model.predict(state)
        return np.argmax(act_values[0])

    def replay(self, batch_size):
        minibatch = random.sample(self.memory, batch_size)
        for state, action, reward, next_state, done in minibatch:
            target = reward
            if not done:
                target = (reward + self.gamma * np.amax(self.model.predict(next_state)[0]))
            target_f = self.model.predict(state)
            target_f[0][action] = target
            self.model.fit(state, target_f, epochs=1, verbose=0)
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

class CarEnvironment:
    def __init__(self):
        self.state_size = 12
        self.action_size = 4
        self.max_initial_speed = 10  # Maximum initial speed of vehicles
        self.max_initial_distance = 100  # Maximum initial distance between vehicles

    def reset(self):
        # Randomize initial state
        ego_initial_speed = np.random.uniform(0, self.max_initial_speed)
        leading_vehicle_initial_speed = np.random.uniform(0, self.max_initial_speed)
        following_vehicle_initial_speed = np.random.uniform(0, self.max_initial_speed)
        
        ego_initial_distance = np.random.uniform(0, self.max_initial_distance)
        leading_vehicle_initial_distance = np.random.uniform(ego_initial_distance + 10, ego_initial_distance + 50)
        following_vehicle_initial_distance = np.random.uniform(ego_initial_distance - 50, ego_initial_distance - 10)
        
        
        state = np.array([
            ego_initial_speed, leading_vehicle_initial_speed, following_vehicle_initial_speed,
            ego_initial_distance, leading_vehicle_initial_distance, following_vehicle_initial_distance,
            0, 0, 0, 0, 0, 0  
        ])
        
        self.state = state  
        return state

    def step(self, action):
        state = self.state  
        reward = 0
        done = False
        info = {}

        ego_initial_speed, leading_vehicle_initial_speed, following_vehicle_initial_speed, \
        ego_initial_distance, leading_vehicle_initial_distance, following_vehicle_initial_distance, \
        _, _, _, _, _, _ = state
        
        ego_position = ego_initial_distance
        leading_vehicle_position = leading_vehicle_initial_distance
        following_vehicle_position = following_vehicle_initial_distance

        if (
            abs(ego_position - leading_vehicle_position) <= 2 or
            abs(ego_position - following_vehicle_position) <= 2
            ):
            print("Collision very much possible")
            reward -= 10  # Collision occurred
            done = True

        # Check for not maintaining ideal speed 
        if ego_initial_speed < 5 or ego_initial_speed > 10:
            reward -= 1
        if 5 <= ego_initial_speed <= 10:
            reward += 1  # Positive reward for maintaining ideal speed

        random_distance_change = np.random.uniform(-2, 2)
        random_speed_change = np.random.uniform(-1, 1)

        following_vehicle_initial_distance += random_distance_change
        following_vehicle_initial_speed += random_speed_change

        # Ensure distances and speeds remain within acceptable limits
        following_vehicle_initial_distance = max(0, following_vehicle_initial_distance)
        following_vehicle_initial_speed = max(0, following_vehicle_initial_speed)

        self.state = np.array([
            ego_initial_speed, leading_vehicle_initial_speed, following_vehicle_initial_speed,
            ego_position, leading_vehicle_position, following_vehicle_position,
            0, 0, 0, 0, 0, 0  
        ])
        
        return self.state, reward, done, info


env = CarEnvironment()
state_size = env.state_size
action_size = env.action_size
agent = DQNAgent(state_size, action_size)

# Training loop
num_episodes = 15
batch_size = 2
max_episode_length = 15  

for episode in range(num_episodes):
    state = env.reset()
    state = np.reshape(state, [1, state_size])
    done = False
    total_reward = 0
    time_step = 0  
    while not done and time_step < max_episode_length:  
       
        action = agent.act(state)
        next_state, reward, done, _ = env.step(action)
        next_state = np.reshape(next_state, [1, state_size])
        total_reward += reward
        agent.remember(state, action, reward, next_state, done)
        state = next_state
        time_step += 1
        if len(agent.memory) > batch_size:
            agent.replay(batch_size)

   
    print("Training Episode:", episode + 1, "Total Reward:", total_reward)

# Evaluate the agent after training
total_rewards = []
num_eval_episodes = 3

for episode in range(num_eval_episodes):
    state = env.reset()
    state = np.reshape(state, [1, state_size])
    done = False
    episode_reward = 0

    while not done:
       
        action = agent.act(state)
        next_state, reward, done, _ = env.step(action)
        next_state = np.reshape(next_state, [1, state_size])
        episode_reward += reward
        state = next_state
        if time_step >= max_episode_length:  
            break

    total_rewards.append(episode_reward)
    if (episode + 1) % 10 == 0:
        print("Evaluation Episode:", episode + 1, "Total Reward:", episode_reward)

avg_reward = np.mean(total_rewards)
print("Average Reward:", avg_reward)


Training Episode: 1 Total Reward: 15
Training Episode: 2 Total Reward: -15


Training Episode: 3 Total Reward: -15
Training Episode: 4 Total Reward: 15
Training Episode: 5 Total Reward: -15


Training Episode: 6 Total Reward: 15
Training Episode: 7 Total Reward: -15


Training Episode: 8 Total Reward: 15
Training Episode: 9 Total Reward: 15


Training Episode: 10 Total Reward: 15
Training Episode: 11 Total Reward: 15
Training Episode: 12 Total Reward: 15


Training Episode: 13 Total Reward: -15
Training Episode: 14 Total Reward: -15


Training Episode: 15 Total Reward: 15
Average Reward: -0.3333333333333333
