In [None]:
from sqlalchemy import create_engine, func
from sqlalchemy.orm import sessionmaker
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), "../")))
from DB.models import init_db, Circuit, Season, RacingWeekend, Driver, Session, SessionResult, Lap, TyreRaceData, Team, DriverTeamSession, TeamCircuitStats

# Initialize database session
engine, session = init_db()

# Fetch a specific race (e.g., 2023 Bahrain GP)
race_weekend = session.query(RacingWeekend).filter_by(year=2023, round=1).first()
race_session = session.query(Session).filter_by(weekend_id=race_weekend.racing_weekend_id, session_type='Race').first()

# Get total laps in the race
total_laps = session.query(func.max(Lap.lap_num)).filter_by(session_id=race_session.session_id).scalar()

# Fetch driver's first lap to determine starting tyre
driver_id = 12  # Example driver
first_lap = session.query(Lap).filter_by(session_id=race_session.session_id, driver_id=driver_id).order_by(Lap.lap_num).first()
initial_tyre = first_lap.tyre

# Fetch the last 20 races the driver participated in
last_20_race_ids = (
	session.query(TyreRaceData.race_id)
	.filter_by(driver_id=driver_id)
	.order_by(TyreRaceData.race_id.desc())  # Assuming race_id is incremental
	.limit(20)
	.subquery()
)

# Fetch tyre degradation parameters for the last 20 races and average them
tyre_data = (
	session.query(
		TyreRaceData.tyre_type,
		func.avg(TyreRaceData.a).label("avg_a"),
		func.avg(TyreRaceData.b).label("avg_b"),
		func.avg(TyreRaceData.c).label("avg_c"),
	)
	.filter(TyreRaceData.driver_id == driver_id)
	.filter(TyreRaceData.race_id.in_(last_20_race_ids))
	.group_by(TyreRaceData.tyre_type)
	.all()
)

# Store averaged values in dictionary
tyre_params = {
	td.tyre_type: {'a': td.avg_a, 'b': td.avg_b, 'c': td.avg_c}
	for td in tyre_data
}

print(tyre_params)

# Fetch team's pit time at the circuit
dts = session.query(DriverTeamSession).filter_by(session_id=race_session.session_id, driver_id=driver_id).first()
team_stats = session.query(TeamCircuitStats).filter_by(circuit_id=race_weekend.circut_id, team_id=dts.team_id).first()
pit_time = team_stats.pit_time

{1: {'a': 0.014524335031135222, 'b': -0.259793656959313, 'c': 1.862598802478543}, 2: {'a': 0.003404626754128564, 'b': -0.0015711251891984146, 'c': 0.8881846067091267}, 3: {'a': 0.001548857741135055, 'b': -0.010664659121802356, 'c': 1.1093883954908919}, 4: {'a': 0.003961848445711819, 'b': -0.1909778265198026, 'c': 3.2373330032404954}}


  .filter(TyreRaceData.race_id.in_(last_20_race_ids))


Sim Env

In [39]:
import gym
import numpy as np
from gym import spaces

class F1RaceEnv(gym.Env):
	def __init__(self, total_laps, initial_tyre, tyre_params, pit_time):
		super(F1RaceEnv, self).__init__()
		self.total_laps = total_laps
		self.initial_tyre = initial_tyre
		self.tyre_params = tyre_params
		self.pit_time = pit_time
		self.available_tyres = [1, 2, 3]
		
		# Validate exactly 2 tyre compounds as per F1 rules
		if len(self.available_tyres) < 3:
			raise ValueError("Exactly 2 tyre compounds required, got {}".format(len(self.available_tyres)))
		
		# Dynamic state size calculation: 1 (lap) + 2 (tyres) + 3 (stint, pit, remaining)
		self.state_size = 1 + len(self.available_tyres) + 3
		
		# Action space: 0=Continue, 1=Pit to next tyre, 2=Pit to previous tyre
		self.action_space = spaces.Discrete(3)
		
		# Correct observation space with dynamic size
		self.observation_space = spaces.Box(
			low=0, 
			high=1, 
			shape=(self.state_size,),  # Now dynamic based on tyre count
			dtype=np.float32
		)
		
	def reset(self):
		self.current_lap = 1
		self.current_tyre = self.initial_tyre
		self.stint_laps = 1
		self.pit_done = False
		return self._get_state()
	
	def _get_lap_time(self, tyre, stint_laps):
		params = self.tyre_params[tyre]
		return params['a'] * stint_laps**2 + params['b'] * stint_laps + params['c']
	
	def _encode_tyre(self, tyre):
		return [1 if tyre == t else 0 for t in self.available_tyres]
	
	def _get_state(self):
		encoded_tyre = self._encode_tyre(self.current_tyre)
		state = [
			self.current_lap / self.total_laps,  # Normalized lap
			*encoded_tyre,                       # One-hot encoded tyre
			self.stint_laps / 20,                # Normalized stint laps (max 20 laps/stint)
			float(self.pit_done),                # Pit status
			(self.total_laps - self.current_lap) / self.total_laps  # Remaining race
		]
		return np.array(state, dtype=np.float32)
	
	def step(self, action):
		done = False
		reward = 0
		info = {}
		
		if action == 0:  # Continue
			lap_time = self._get_lap_time(self.current_tyre, self.stint_laps)
			reward = -lap_time
			self.current_lap += 1
			self.stint_laps += 1
			
			reward += max(0, 5 - self.stint_laps * 0.5)

		else:  # Pit action
			if self.pit_done:
				reward = -1000
				done = True
				info['reason'] = 'Already pitted'
				return self._get_state(), reward, done, info
			
			if self.pit_done and self.current_lap - self.last_pit_lap < 5:  # Pitting too soon
				reward -= 20
	
			# Cycle through tyres using modulo arithmetic
			new_tyre_idx = (self.available_tyres.index(self.current_tyre) + action) % len(self.available_tyres)
			new_tyre = self.available_tyres[new_tyre_idx]
			
			if new_tyre == self.current_tyre:
				reward = -1000
				done = True
				info['reason'] = 'Invalid tyre'
				return self._get_state(), reward, done, info
			
			lap_time = self._get_lap_time(self.current_tyre, self.stint_laps)
			reward = -(lap_time + self.pit_time)
			self.current_lap += 1
			self.current_tyre = new_tyre
			self.stint_laps = 1
			self.pit_done = True
		
		# Check race completion
		if self.current_lap > self.total_laps:
			done = True
			if not self.pit_done:
				reward -= 1000  # Penalty for missing pit stop
				info['reason'] = 'No pit stop'
			else:
				info['reason'] = 'Finished'
		
		return self._get_state(), reward, done, info

RL Training

In [None]:
import tensorflow as tf
from tensorflow.keras import layers
import numpy as np
import random

class DQNAgent:
	def __init__(self, state_size, action_size):
		self.state_size = state_size
		self.action_size = action_size
		self.memory = []
		self.gamma = 0.95
		self.epsilon = 1.0
		self.epsilon_min = 0.01
		self.epsilon_decay = 0.995
		self.model = self._build_model()
	
	def _build_model(self):
		model = tf.keras.Sequential([
			layers.Dense(24, activation='relu', input_shape=(self.state_size,)),
			layers.Dense(24, activation='relu'),
			layers.Dense(self.action_size, activation='linear')
		])
		model.compile(loss='mse', optimizer=tf.keras.optimizers.Adam(0.001))
		return model
	
	def act(self, state):
		if np.random.rand() < self.epsilon:
			return random.randint(0, self.action_size - 1)
		return np.argmax(self.model.predict(state[np.newaxis])[0])
	
	def train(self, state, action, reward, next_state, done):
		target = reward
		if not done:
			target += self.gamma * np.max(self.model.predict(next_state[np.newaxis])[0])
		target_vec = self.model.predict(state[np.newaxis])[0]
		target_vec[action] = target
		self.model.fit(state[np.newaxis], target_vec[np.newaxis], verbose=0)
		
		if self.epsilon > self.epsilon_min:
			self.epsilon *= self.epsilon_decay

# Initialize environment and agent
env = F1RaceEnv(total_laps, initial_tyre, tyre_params, pit_time)
agent = DQNAgent(state_size=7, action_size=3)

# Training loop
episodes = 100
for e in range(episodes):
	state = env.reset()
	total_reward = 0
	done = False
	while not done:
		action = agent.act(state)
		next_state, reward, done, _ = env.step(action)
		agent.train(state, action, reward, next_state, done)
		state = next_state
		total_reward += reward
	print(f"Episode: {e+1}, Total Reward: {total_reward:.2f}, Epsilon: {agent.epsilon:.2f}")

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 115ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 56ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 65ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 63ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 63ms/step
Episode: 1, Total Reward: -1049.35, Epsilon: 0.99
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 53ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 56ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 59ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 53ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 52ms/step
Episode: 2, Total Reward: -1049.35, Epsilon: 0.97
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 56ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 51ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━

Exception ignored in: <bound method IPythonKernel._clean_thread_parent_frames of <ipykernel.ipkernel.IPythonKernel object at 0x7f04d66d04c0>>
Traceback (most recent call last):
  File "/home/ben/Individual_Project/env/lib/python3.10/site-packages/ipykernel/ipkernel.py", line 775, in _clean_thread_parent_frames
    def _clean_thread_parent_frames(
KeyboardInterrupt: 


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 64ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 70ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 61ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 60ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 63ms/step
Episode: 68, Total Reward: -1052.05, Epsilon: 0.28
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 108ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 59ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 52ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 68ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 83ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 72ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 71ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 70ms/step
[1m1/1[0m 

KeyboardInterrupt: 

Strat Optimisation

In [31]:
def get_optimal_strategy(env, agent):
	state = env.reset()
	done = False
	strategy = []
	while not done:
		action = agent.act(state)
		next_state, _, done, _ = env.step(action)
		strategy.append({
			'lap': int(state[0] * env.total_laps),
			'action': action
		})
		state = next_state
	return strategy

# Example usage
optimal_strategy = get_optimal_strategy(env, agent)
print("Optimal Strategy:", optimal_strategy)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 72ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 64ms/step
Optimal Strategy: [{'lap': 1, 'action': np.int64(1)}, {'lap': 2, 'action': np.int64(1)}]
