In [1]:
import random
import numpy as np
from collections import deque
from copy import deepcopy
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque, namedtuple
import random

# Define the neural network architecture
class DQN(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(input_dim, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, output_dim)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)

class DQNAgent:
    def __init__(self, state_dim, action_dim, hidden_dim=128, buffer_size=10000, epsilon_decay = 0.99):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.hidden_dim = hidden_dim
        self.batch_size = 32
        self.buffer = deque(maxlen=buffer_size)
        self.gamma = 0.9
        #self.epsilon = 0.1
        self.dqn = DQN(state_dim, action_dim)
        self.target_dqn = deepcopy(self.dqn)
        self.optimizer = optim.Adam(self.dqn.parameters(), lr=0.001)

        self.epsilon = 0.5  # Start with higher epsilon
        self.epsilon_min = 0.1  # Minimum exploration rate
        self.epsilon_decay = epsilon_decay  # Decay factor

        # Additional attributes for prioritized experience replay
        self.alpha = 0.6  # Priority exponent
        self.beta = 0.4  # Importance sampling weight
        self.priorities = deque(maxlen=buffer_size)


    def choose_action(self, state, epsilon):
        if random.random() < epsilon:
            return random.randint(0, self.action_dim - 1)
        else:
            state = torch.FloatTensor(state)
            with torch.no_grad():
                q_values = self.dqn(state)
            return torch.argmax(q_values).item()

    def store(self, state, action, reward, next_state):
        self.buffer.append((state, action, reward, next_state))

    def train(self, batch_size):
        if len(self.buffer) < batch_size:
            return

        minibatch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states = zip(*minibatch)

        states = torch.stack([torch.FloatTensor(s) for s in states])
        actions = torch.LongTensor(actions)
        rewards = torch.FloatTensor(rewards)
        next_states = torch.stack([torch.FloatTensor(s) for s in next_states])

        current_q = self.dqn(states)
        current_q = current_q.gather(1, actions.unsqueeze(1)).squeeze(1)

        next_q = self.target_dqn(next_states).max(1)[0]
        targets = rewards + self.gamma * next_q

        loss = nn.MSELoss()(current_q, targets)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        return loss.item()

    def update_target_net(self):
        self.target_dqn.load_state_dict(self.dqn.state_dict())

    def update_epsilon(self):
      self.epsilon = max(self.epsilon * self.epsilon_decay, self.epsilon_min)

    # Fine-Tuning Epsilon
    def fine_tune_epsilon(self, epsilon_value):
        self.epsilon = epsilon_value


In [2]:
class Car:
  def __init__(self, tyre="Intermediate"):
    self.default_tyre = tyre
    self.possible_tyres = ["Ultrasoft", "Soft", "Intermediate", "Fullwet"]
    self.pitstop_time = 23
    self.reset()
    # missing self.condition?


  def reset(self):
    self.change_tyre(self.default_tyre)


  def degrade(self, w, r):
    if self.tyre == "Ultrasoft":
        self.condition *= (1 - 0.0050*w - (2500-r)/90000)
    elif self.tyre == "Soft":
        self.condition *= (1 - 0.0051*w - (2500-r)/93000)
    elif self.tyre == "Intermediate":
        self.condition *= (1 - 0.0052*abs(0.5-w) - (2500-r)/95000)
    elif self.tyre == "Fullwet":
        self.condition *= (1 - 0.0053*(1-w) - (2500-r)/97000)


  def change_tyre(self, new_tyre):
    assert new_tyre in self.possible_tyres
    self.tyre = new_tyre
    self.condition = 1.00


  def get_velocity(self):
    if self.tyre == "Ultrasoft":
        vel = 80.7*(0.2 + 0.8*self.condition**1.5)
    elif self.tyre == "Soft":
        vel = 80.1*(0.2 + 0.8*self.condition**1.5)
    elif self.tyre == "Intermediate":
        vel = 79.5*(0.2 + 0.8*self.condition**1.5)
    elif self.tyre == "Fullwet":
        vel = 79.0*(0.2 + 0.8*self.condition**1.5)
    return vel

  def tyre_degrade(self, tyre, tyre_cond, w, r):
    if tyre == "Ultrasoft":
        tyre_cond *= (1 - 0.0050*w - (2500-r)/90000)
    elif tyre == "Soft":
        tyre_cond *= (1 - 0.0051*w - (2500-r)/93000)
    elif tyre == "Intermediate":
        tyre_cond *= (1 - 0.0052*abs(0.5-w) - (2500-r)/95000)
    elif tyre == "Fullwet":
        tyre_cond *= (1 - 0.0053*(1-w) - (2500-r)/97000)
    return tyre_cond

  def retrieve_velocity(self, tyre, tyre_cond):
    if tyre == "Ultrasoft":
        vel = 80.7*(0.2 + 0.8*tyre_cond**1.5)
    elif tyre == "Soft":
        vel = 80.1*(0.2 + 0.8*tyre_cond**1.5)
    elif tyre == "Intermediate":
        vel = 79.5*(0.2 + 0.8*tyre_cond**1.5)
    elif tyre == "Fullwet":
        vel = 79.0*(0.2 + 0.8*tyre_cond**1.5)
    return vel

class Track:
  def __init__(self, car=Car()):
    # self.radius and self.cur_weather are defined in self.reset()
    self.total_laps = 162
    self.car = car
    self.possible_weather = ["Dry", "20% Wet", "40% Wet", "60% Wet", "80% Wet", "100% Wet"]
    self.wetness = {
        "Dry": 0.00, "20% Wet": 0.20, "40% Wet": 0.40, "60% Wet": 0.60, "80% Wet": 0.80, "100% Wet": 1.00
    }
    self.p_transition = {
        "Dry": {
            "Dry": 0.987, "20% Wet": 0.013, "40% Wet": 0.000, "60% Wet": 0.000, "80% Wet": 0.000, "100% Wet": 0.000
        },
        "20% Wet": {
            "Dry": 0.012, "20% Wet": 0.975, "40% Wet": 0.013, "60% Wet": 0.000, "80% Wet": 0.000, "100% Wet": 0.000
        },
        "40% Wet": {
            "Dry": 0.000, "20% Wet": 0.012, "40% Wet": 0.975, "60% Wet": 0.013, "80% Wet": 0.000, "100% Wet": 0.000
        },
        "60% Wet": {
            "Dry": 0.000, "20% Wet": 0.000, "40% Wet": 0.012, "60% Wet": 0.975, "80% Wet": 0.013, "100% Wet": 0.000
        },
        "80% Wet": {
            "Dry": 0.000, "20% Wet": 0.000, "40% Wet": 0.000, "60% Wet": 0.012, "80% Wet": 0.975, "100% Wet": 0.013
        },
        "100% Wet": {
            "Dry": 0.000, "20% Wet": 0.000, "40% Wet": 0.000, "60% Wet": 0.000, "80% Wet": 0.012, "100% Wet": 0.988
        }
    }
    self.reset()


  def reset(self):
    # self.radius = np.random.randint(600,1201)
    self.radius = np.random.randint(600,601)
    # self.radius = np.random.randint(900,901)
    # self.radius = np.random.randint(1200,1201)
    self.cur_weather = np.random.choice(self.possible_weather)
    self.is_done = False
    self.pitstop = False
    self.laps_cleared = 0
    self.car.reset()
    return self._get_state()


  def _get_state(self):
    tyre_encoding = [0] * len(self.car.possible_tyres)
    tyre_index = self.car.possible_tyres.index(self.car.tyre)
    tyre_encoding[tyre_index] = 1

  # One-hot encoding for weather condition
    weather_encoding = [0] * len(self.possible_weather)
    weather_index = self.possible_weather.index(self.cur_weather)
    weather_encoding[weather_index] = 1

  # Combine the encoded vectors with the remaining state variables
    return tyre_encoding + [self.car.condition] + weather_encoding + [self.radius, self.laps_cleared]


  def transition(self, action=0):
    time_taken = 0
    if self.laps_cleared == int(self.laps_cleared):
        if self.pitstop:
            self.car.change_tyre(self.committed_tyre)
            time_taken += self.car.pitstop_time
            self.pitstop = False

    ## The environment is coded such that only an action taken at
    # the start of the three-quarters mark of each lap matters
    if self.laps_cleared - int(self.laps_cleared) == 0.75:
        if action < 4:
            self.pitstop = True
            self.committed_tyre = self.car.possible_tyres[action]
        else:
            self.pitstop = False

    self.cur_weather = np.random.choice(
        self.possible_weather, p=list(self.p_transition[self.cur_weather].values())
    )
    # we assume that degradation happens only after a car has travelled the one-eighth lap
    velocity = self.car.get_velocity()
    time_taken += (2*np.pi*self.radius/8) / velocity
    reward = 0 - time_taken
    self.car.degrade(
        w=self.wetness[self.cur_weather], r=self.radius
    )
    self.laps_cleared += 0.125

    if self.laps_cleared == self.total_laps:
        self.is_done = True

    next_state = self._get_state()
    return reward, next_state, self.is_done, velocity

In [3]:
class Agent():
  def __init__(self, state_dim, action_dim, hidden_dim=128, buffer_size=10000, epsilon_decay = 0.99):
    # ... (The original attributes and methods you provided)
    self.possible_weather = ["Dry", "20% Wet", "40% Wet", "60% Wet", "80% Wet", "100% Wet"]
    self.wetness = {
        "Dry": 0.00, "20% Wet": 0.20, "40% Wet": 0.40, "60% Wet": 0.60, "80% Wet": 0.80, "100% Wet": 1.00
    }
    self.possible_tyres = ["Ultrasoft", "Soft", "Intermediate", "Fullwet"]
    self.pitstop_time = 23


    #self.scheduler = StepLR(self.optimizer, step_size=100, gamma=0.95)
    self.table = {"Ultrasoft": {
                                "Dry": {'condition':0,'time':0},
                                "20% Wet": {'condition':0,'time':0},
                                "40% Wet": {'condition':0,'time':0},
                                "60% Wet": {'condition':0,'time':0},
                                "80% Wet": {'condition':0,'time':0},
                                "100% Wet": {'condition':0,'time':0}
                                },
                      "Soft":  {
                                "Dry": {'condition':0,'time':0},
                                "20% Wet": {'condition':0,'time':0},
                                "40% Wet": {'condition':0,'time':0},
                                "60% Wet": {'condition':0,'time':0},
                                "80% Wet": {'condition':0,'time':0},
                                "100% Wet": {'condition':0,'time':0}
                                },
              "Intermediate":  {
                                "Dry": {'condition':0,'time':0},
                                "20% Wet": {'condition':0,'time':0},
                                "40% Wet": {'condition':0,'time':0},
                                "60% Wet": {'condition':0,'time':0},
                                "80% Wet": {'condition':0,'time':0},
                                "100% Wet": {'condition':0,'time':0}
                                },
                  "Fullwet":   {
                                "Dry": {'condition':0,'time':0},
                                "20% Wet": {'condition':0,'time':0},
                                "40% Wet": {'condition':0,'time':0},
                                "60% Wet": {'condition':0,'time':0},
                                "80% Wet": {'condition':0,'time':0},
                                "100% Wet": {'condition':0,'time':0}
                                },
                  "Unchanged": {
                                "Dry": {'condition':0,'time':0},
                                "20% Wet": {'condition':0,'time':0},
                                "40% Wet": {'condition':0,'time':0},
                                "60% Wet": {'condition':0,'time':0},
                                "80% Wet": {'condition':0,'time':0},
                                "100% Wet": {'condition':0,'time':0}}
                  }
    # self.generate_lap_table(state)


    # Initialize DQN agent
    #self.dqn_agent = DQNAgent(state_dim, action_dim, buffer_size)
    self.dqn_agent = DQNAgent(state_dim, action_dim, buffer_size, hidden_dim)

  #action selection based on current state
  def choose_action(self, state, epsilon):
      # Extract tire condition from state
      tire_condition_index = len(self.possible_tyres)
      tire_condition = state[tire_condition_index]

      # If tire condition is below a threshold and there are more than a certain number of laps remaining
      if tire_condition < 0.3 and state[-1] < 146:  # Assuming 16 laps remaining out of 162
          return 4  # Action to take a pit stop

      return self.dqn_agent.choose_action(state, epsilon)

  #maintain replay buffer to train the agent later
  def store(self, state, action, reward, next_state):
    self.dqn_agent.store(state, action, reward, next_state)

  # samples a batch of experiences from the replay buffer and updates the network's weights based on the difference between predicted and target Q-values.
  def train(self, batch_size):
    tyre_condition_index = len(self.possible_tyres)
    return self.dqn_agent.train(batch_size,tyre_condition_index=tyre_condition_index)

  # stabalize learning process
  def update_target_net(self):
    self.dqn_agent.update_target_net()

  # Gradual Decay of Epsilon
  def update_epsilon(self):
      self.dqn_agent.update_epsilon()

  # Fine-Tuning Epsilon
  def fine_tune_epsilon(self, epsilon_value):
      self.dqn_agent.fine_tune_epsilon()

  # Adjust Learning Rate
  def adjust_learning_rate(self, learning_rate):
      self.dqn_agent.adjust_learning_rate()


In [4]:
# Training loop
def train_agent(agent, env, episodes):
    for episode in range(episodes):
        state = env.reset()
        done = False
        total_reward = 0

        while not done:
            epsilon = max(agent.epsilon, 0.5)  # Ensure epsilon doesn't drop below 0.1
            action = agent.choose_action(state, epsilon)
            reward, next_state, done, _ = env.transition(action)
            agent.store(state, action, reward, next_state)
            loss = agent.train(agent.batch_size)
            state = next_state
            total_reward += reward

        agent.update_target_net()
        agent.fine_tune_epsilon(0.1)  # Example: Set epsilon to 0.1

        # Adjust learning rate if needed
        #agent.adjust_learning_rate(0.001)
        print(f"Episode {episode + 1}/{episodes}, Total Reward: {total_reward:.2f}, Loss: {loss:.6f}")

# Create the environment and agent
car = Car()
track = Track(car)
state_dim = len(track._get_state())
action_dim = len(car.possible_tyres) + 1  # Actions are changing tires or not

agent = DQNAgent(state_dim, action_dim)

# Train the agent
train_agent(agent, track, episodes=500)


Episode 1/500, Total Reward: -11913.41, Loss: 43.401142
Episode 2/500, Total Reward: -12283.98, Loss: 65.761673
Episode 3/500, Total Reward: -12012.98, Loss: 40.700954
Episode 4/500, Total Reward: -11861.28, Loss: 16.578018
Episode 5/500, Total Reward: -12027.50, Loss: 20.864088
Episode 6/500, Total Reward: -11842.16, Loss: 41.305531
Episode 7/500, Total Reward: -12145.03, Loss: 30.883251
Episode 8/500, Total Reward: -11827.64, Loss: 3.859449
Episode 9/500, Total Reward: -11854.95, Loss: 41.259083
Episode 10/500, Total Reward: -11854.22, Loss: 39.711834
Episode 11/500, Total Reward: -11958.66, Loss: 30.236134
Episode 12/500, Total Reward: -11828.67, Loss: 43.321186
Episode 13/500, Total Reward: -11848.55, Loss: 29.798222
Episode 14/500, Total Reward: -11811.98, Loss: 51.591564
Episode 15/500, Total Reward: -12030.93, Loss: 5.088189
Episode 16/500, Total Reward: -11936.31, Loss: 27.611074
Episode 17/500, Total Reward: -11796.57, Loss: 37.523064
Episode 18/500, Total Reward: -11925.91, L

In [5]:
import numpy as np

def evaluate_agent(agent, num_episodes=100):
    lap_times = []
    laps_completed = 0
    tire_changes = []

    for episode in range(num_episodes):
        # Reset the environment and agent for a new episode
        track = Track()
        state = track.reset()
        done = False
        lap_time = 0
        changes = 0  # Number of tire changes in the current episode

        while not done:
            # Let the agent choose an action
            action = agent.choose_action(state, epsilon=0)  # Use greedy policy (no exploration)
            #print('action', action)
            reward, next_state, done, velocity = track.transition(action)
            lap_time += (2 * np.pi * track.radius / 8) / velocity

            if done:
                laps_completed += 1
                lap_times.append(lap_time)
            else:
                # Check if a tire change was made
                tire_condition_index = len(track.car.possible_tyres)
                current_tire_condition = next_state[tire_condition_index]
                if current_tire_condition < 0.3:
                    changes += 1

            state = next_state

        tire_changes.append(changes)

    # Calculate performance metrics
    average_lap_time = np.mean(lap_times)
    laps_per_episode = np.mean(laps_completed)
    average_tire_changes = np.mean(tire_changes)

    print(f"Average Lap Time: {average_lap_time} seconds")
    print(f"Average Laps Completed: {laps_per_episode} out of 162 laps")
    print(f"Average Tire Changes: {average_tire_changes} per episode")

# Create and initialize the agent
agent = Agent(state_dim = 13, action_dim = 5, hidden_dim=128)
# Load the trained agent's model weights (if available)

# Evaluate the agent's performance
evaluate_agent(agent, num_episodes=100)


Average Lap Time: 36928.869139224546 seconds
Average Laps Completed: 100.0 out of 162 laps
Average Tire Changes: 1239.75 per episode
