In [95]:
import numpy as np
import pandas as pd
import pygame 
import math
import matplotlib.pyplot as plt
import json
import random

import torch
import torch.nn as nn
import torch.nn.functional as F

from collections import namedtuple, deque
import torch.optim as optim

import tensorflow as tf



In [96]:
class Car:
    def __init__(self,initial_x,initial_y,initial_angle,time_elapsed):
        self.x = initial_x
        self.y = initial_y
        self.time_elapsed = time_elapsed
        
        self.speed = 1
        self.vector_angle = initial_angle
        
        self.car_angle = initial_angle
        
        self.steering_angle = 0
        self.accel = 0

        self.grip = 3
        self.steering_grip = 1.5 
        
        self.CAR_WIDTH = 20
        self.CAR_HEIGHT = 10
        self.CAR_COLOR = (0, 255, 0)
        
        self.max_angle = 80
        self.max_speed = 8
        
        self.friction_coef = 0.01
        
        self.accel_const = 0.1 
        self.brake_const = 0.1
        self.steering_const = 10
        
    def calculate_all(self):
        self.car_angle = self.car_angle * (1-self.steering_grip*self.time_elapsed) + (self.steering_angle+self.car_angle) * self.steering_grip*self.time_elapsed
        self.vector_angle = self.vector_angle * (1-self.grip*self.time_elapsed) + self.car_angle * self.grip*self.time_elapsed
        
        potential_speed = self.speed+self.accel*self.time_elapsed
        
        if((potential_speed < self.max_speed)&(potential_speed > 0)):
            self.speed = self.speed+self.accel*self.time_elapsed
        
    def update_position(self):
        self.x = self.x + self.speed*np.cos(np.radians(self.vector_angle))
        self.y = self.y + self.speed*np.sin(np.radians(self.vector_angle))
        
    def speed_up(self):
        self.accel += self.accel_const
        
    def brake(self):
        self.accel -= self.speed*self.brake_const
        
    def nothing(self):
        self.accel = 0

    def friction(self):
        self.speed -= self.speed*self.friction_coef
        
    def turn_right(self):
        if(self.steering_angle < self.max_angle):
            self.steering_angle += self.steering_const
        else:
            self.steering_angle = self.max_angle
            
    def reset_turn(self):
        if(np.abs(self.steering_angle)  < 1):
            self.steering_angle =0
        elif(self.steering_angle<0):
            self.steering_angle +=self.steering_const
        elif(self.steering_angle>0):
            self.steering_angle -=self.steering_const

    def turn_left(self):
        if(self.steering_angle > -self.max_angle):
            self.steering_angle -= self.steering_const
        else:
            self.steering_angle = -self.max_angle

    def draw(self, screen):
        car_surf = pygame.Surface((self.CAR_WIDTH, self.CAR_HEIGHT), pygame.SRCALPHA)
        car_surf.fill(self.CAR_COLOR)
        rotated_car = pygame.transform.rotate(car_surf, -self.car_angle)
        screen.blit(rotated_car, (self.x - rotated_car.get_width() / 2, self.y - rotated_car.get_height() / 2))
        
    def reset_all(self):
        self.x = 0
        self.y = 0
        
        self.speed = 0
        self.vector_angle = 0
        
        self.car_angle = 0
        
        self.steering_angle = 0
        self.accel = 0
        
    def get_vals(self):
        return self.x,self.y,self.car_angle
    
    def get_pos(self):
        return (self.x,self.y)

    def get_speed(self):
        return self.speed
        

In [97]:
class extention_line:
    
    def __init__(self,angle,car:Car,outside_line,inside_line,screen):
      self.angle_from_car = angle
      self.car = car
      self.outside_line = outside_line
      self.inside_line = inside_line
      self.shortest_point = (0,0)
      self.screen = screen
      self.smallest = (0,(0,0))
    
    def get_distance(self):
        return self.smallest[0]
    
    def update(self):
        self.x,self.y,self.angle = self.car.get_vals()
        self.point1,self.point2= self.generate_long_segment_from_point((self.x,self.y),self.angle+self.angle_from_car)
        self.shortest_point = self.shortest_intersection()
    
    def plot(self):
        pygame.draw.line(self.screen,(255,255,255),self.point1,self.point2)
        pygame.draw.circle(self.screen,(255,0,0),self.shortest_point,10)
        
    def find_dist(self,x,y):

        return np.sqrt((x-self.car.x)**2+(y-self.car.y)**2)
    
    def shortest_intersection(self):
        point_list = []
        for i in range(len(self.outside_line)):
            p1 = self.outside_line[i-1]
            p2 = self.outside_line[i]
            
            intersection = line_segments_intersection(p1,p2,self.point1,self.point2)
            if (intersection is not None):
                point_list.append((self.find_dist(intersection[0],intersection[1]),intersection))
                
        for i in range(len(self.inside_line)):
            p1 = self.inside_line[i-1]
            p2 = self.inside_line[i]
            
            intersection = line_segments_intersection(p1,p2,self.point1,self.point2)
            if (intersection is not None):
                point_list.append((self.find_dist(intersection[0],intersection[1]),intersection))
                
        if len(point_list) > 0:
            self.smallest = min(point_list, key=lambda i: i[0])

            return self.smallest[1]
        else:
            return self.smallest[1]           
    
    def generate_long_segment_from_point(self,point, angle_degrees, length=10000):
        x, y = point
        angle_radians = math.radians(angle_degrees)

        dx = math.cos(angle_radians) * length
        dy = math.sin(angle_radians) * length

        end_point = (x + dx, y + dy)
        return point, end_point

def line_segments_intersection(p1, p2, p3, p4):
        x1, y1 = p1
        x2, y2 = p2
        x3, y3 = p3
        x4, y4 = p4

        denom = (y4 - y3)*(x2 - x1) - (x4 - x3)*(y2 - y1)

        if denom == 0:
            return None

        t = ((x4 - x3)*(y1 - y3) - (y4 - y3)*(x1 - x3)) / denom
        u = ((x2 - x1)*(y1 - y3) - (y2 - y1)*(x1 - x3)) / denom

        if 0 <= t <= 1 and 0 <= u <= 1:
            intersect_x = x1 + t * (x2 - x1)
            intersect_y = y1 + t * (y2 - y1)
            return (intersect_x, intersect_y)

        return None

In [98]:
class RaceTrack:
    def __init__(self,inner_points,outer_points):
        self.inner_points = inner_points
        self.outer_points = outer_points
        
        self.track_color = (50,50,50)
        self.border_color = (255,90,90)
        self.inner_color = (255,255,255)
    
    def plot_track(self,surface):
        pygame.draw.polygon(surface,self.track_color,self.outer_points,0)
        pygame.draw.polygon(surface,self.inner_color,self.inner_points,0)
        pygame.draw.lines(surface,self.border_color,True,self.inner_points,5)
        pygame.draw.lines(surface,self.border_color,True,self.outer_points,5)
        
    
        


In [99]:
def generate_track(centerline, width=100):
    inner = []
    outer = []
    n = len(centerline)
    for i in range(n):
        p1 = centerline[i - 1]
        p2 = centerline[i]
        p3 = centerline[(i + 1) % n]

        # Calculate the direction between p1 and p3 (smoothed)
        dx = p3[0] - p1[0]
        dy = p3[1] - p1[1]
        length = math.hypot(dx, dy)
        if length == 0:
            perp_dx, perp_dy = 0, 0
        else:
            perp_dx = -dy / length
            perp_dy = dx / length

        offset_x = perp_dx * width / 2
        offset_y = perp_dy * width / 2

        inner.append((p2[0] - offset_x, p2[1] - offset_y))
        outer.append((p2[0] + offset_x, p2[1] + offset_y))
    
    return inner, outer


In [100]:
centerline = [(671.6, 115.5),
(541.7, 120.3),
(402.0, 114.6),
(248.2, 124.1),
(193.1, 177.1),
(184.7, 262.3),
(172.0, 351.3),
(119.8, 441.3),
(111.3, 527.5),
(111.3, 602.3),
(124.0, 694.1),
(183.3, 757.6),
(320.2, 784.1),
(442.9, 792.6),
(579.8, 787.9),
(721.0, 785.0),
(846.6, 784.1),
(962.3, 782.2),
(1104.8, 780.3),
(1257.3, 782.2),
(1367.3, 772.7),
(1457.7, 720.6),
(1450.6, 643.0),
(1404.0, 598.5),
(1267.1, 562.5),
(1119.0, 556.8),
(969.4, 563.4),
(859.3, 583.3),
(705.4, 616.5),
(541.7, 647.7),
(410.5, 607.0),
(366.7, 525.6),
(378.0, 449.8),
(465.5, 398.7),
(582.7, 385.4),
(665.9, 402.5),
(737.9, 426.1),
(785.9, 435.6),
(840.9, 425.2),
(874.8, 401.5),
(900.2, 367.4),
(949.6, 344.7),
(1021.6, 348.5),
(1082.3, 370.3),
(1134.5, 404.4),
(1176.8, 430.9),
(1250.2, 455.5),
(1320.8, 435.6),
(1346.2, 378.8),
(1337.7, 310.6),
(1302.4, 269.9),
(1248.8, 224.4),
(1117.5, 181.8),
(996.2, 151.5),
(736.5, 119.3),]

In [None]:
class game:
    def __init__(self,inner_points,outer_points,angle_list,start_point,draw_keys=True):
        self.draw_keys = draw_keys
        
        self.my_font = pygame.font.SysFont('Comic Sans MS', 30)

        self.checkpoint_score = 1
        self.speed_score_mult = 0.03
        
        self.start_point = start_point
        
        self.car = Car(self.start_point[0],self.start_point[1],180,1/60)
        self.inner_points = inner_points
        self.outer_points = outer_points
        
        self.action = (0,0)
        
        self.last_pos = (0,0)
        self.current_pos = (0,0)
        
        self.p3 = (0,0)
        self.p4 = (0,0)
        
        self.checkpoint_position = 0
        
        self.log = []
        
        self.current_point = 0
        self.total_score = 0
        
        SCREEN_WIDTH = 1600
        SCREEN_HEIGHT = 900
        
        self.track = RaceTrack(inner_points,outer_points)
        self.screen = pygame.display.set_mode((SCREEN_WIDTH, SCREEN_HEIGHT))
        pygame.display.set_caption('Racing Car Simulation')
        
        self.line_list = []
        for angle in angle_list:
            self.line_list.append(extention_line(angle,self.car,self.outer_points,self.inner_points,self.screen))
            
        self.state = self.get_state()
            
        self.safe_update((0,0))
        
    def get_state(self):
        distances = [i.get_distance() for i in self.line_list]
        MAX_DISTANCE = 1835
        [
            dist / MAX_DISTANCE for dist in distances      # scale distances
        ] + [
            self.car.speed / self.car.max_speed,                             # scale speed
            math.sin(self.car.car_angle), math.cos(self.car.car_angle),      # encode angles
            math.sin(self.car.vector_angle), math.cos(self.car.vector_angle),
            math.sin(self.car.steering_angle), math.cos(self.car.steering_angle),
            self.car.accel                                          # usually small, like -1 to +1
        ]
            
    def get_score(self):
        self.current_point = 0

        # Reward for reaching checkpoints
        self.p3 = self.inner_points[self.checkpoint_position % len(self.inner_points)]
        self.p4 = self.outer_points[self.checkpoint_position % len(self.outer_points)]
        if line_segments_intersection(self.last_pos, self.current_pos, self.p3, self.p4):
            self.current_point += self.checkpoint_score  # +100
            self.checkpoint_position += 1

        # Small reward for moving fast, encourage momentum
        self.current_point += self.speed_score_mult * self.car.get_speed()  # up to ~+3 per step

        # Bonus for staying alive (time alive reward)
        self.current_point += 1  # +1 for every frame survived

        # Penalty for standing still
        if self.car.get_speed() < 0.5:
            self.current_point -= 2  # discourage inactivity or stalling

        self.total_score += self.current_point
        
    def is_collided_with_track(self):
        for i in range(len(self.outer_points)):
            p1 = self.outer_points[i-1]
            p2 = self.outer_points[i]
            
            intersection = line_segments_intersection(p1,p2,self.current_pos,self.last_pos)
            if (intersection is not None):
                return True
                
        for i in range(len(self.inner_points)):
            p1 = self.inner_points[i-1]
            p2 = self.inner_points[i]
            
            intersection = line_segments_intersection(p1,p2,self.current_pos,self.last_pos)
            if (intersection is not None):
                return True
            
        return False
    
    def plot_checkpoint(self):
        pygame.draw.line(self.screen,(0,255,0),self.p3,self.p4)
    
    def log_score(self):
        score = self.current_point
        self.log.append([score,(self.action),self.get_state()])
    
    def take_action(self,action):
        if self.draw_keys:
            up = (1500,700)
            down = (1500,750)
            left = (1450,750)
            right = (1550,750)
            
            self.draw_key(False,up,25)
            self.draw_key(False,down,25)
            self.draw_key(False,left,25)
            self.draw_key(False,right,25)
        
        if action[0] == 1:
            self.car.speed_up()
            if self.draw_keys:
                self.draw_key(True,up,25)
            
        elif action[0] == -1:
            self.car.brake()
            if self.draw_keys:
                self.draw_key(True,down,25)
        else:
            self.car.nothing()
            
        if action[1] == 1:
            self.car.turn_left()
            if self.draw_keys:
                self.draw_key(True,left,25)
        elif action[1] == -1:
            self.car.turn_right()
            if self.draw_keys:
                self.draw_key(True,right,25)
        else:
            self.car.reset_turn()
            
        self.action = action

    
    def draw_key(self,filled,coor,side):
        if filled:
            pygame.draw.rect(self.screen,(255,255,255), pygame.Rect(coor[0],coor[1],side,side),width=0)
        else:
            pygame.draw.rect(self.screen,(255,255,255), pygame.Rect(coor[0],coor[1],side,side),width=1)
    
    def update(self, action_):
        self.last_pos = self.current_pos
        self.current_pos = self.car.get_pos()

        if self.draw_keys:
            self.screen.fill((0, 0, 0))  
            self.track.plot_track(self.screen)
            self.car.draw(self.screen)

        for line in self.line_list:
            line.update()
            if self.draw_keys:
                line.plot()

        self.take_action(action_)

        self.car.friction()
        self.car.calculate_all()
        self.car.update_position()

        self.plot_checkpoint() if self.draw_keys else None

        if self.is_collided_with_track():
            self.current_point = -5
            self.total_score += self.current_point
            return False

        self.get_score()
        self.log_score()

        if self.draw_keys:
            text_surface = self.my_font.render(str(self.total_score), True, (255, 255, 0))
            self.screen.blit(text_surface, (50, 50))
            pygame.display.update()
            pygame.time.Clock().tick(60)

        return True
    
    def safe_update(self, action_):
        self.last_pos = self.current_pos
        self.current_pos = self.car.get_pos()

        for line in self.line_list:
            line.update()

        self.take_action(action_)
        self.car.friction()
        self.car.calculate_all()
        self.car.update_position()
        self.get_score()
        self.log_score()

            
    def return_log(self):
        return self.log
    
    def return_total_score(self):
        return self.total_score
    
    def return_score(self):
        return self.current_point
        
    

In [102]:
inner_points,outer_points = generate_track(centerline,100)

In [103]:
class DQN(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(state_dim, 128)
        self.fc2 = nn.Linear(128, 128)
        self.out = nn.Linear(128, action_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.out(x)
    
    
Transition = namedtuple('Transition',('state', 'action', 'next_state', 'reward'))


class ReplayMemory(object):

    def __init__(self, capacity):
        self.memory = deque([], maxlen=capacity)

    def push(self, *args):
        self.memory.append(Transition(*args))

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)    

In [111]:
angle_list = [angle*30 for angle in range(0,12)]

In [168]:
count = 19
version = 46

In [169]:
import pygame
import random
import torch
import torch.nn as nn
import torch.optim as optim
from collections import namedtuple, deque

# Device setup
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

GAMMA = 0.999
LR = 0.0001
BATCH_SIZE = 256
MEMORY_SIZE = 500_000

EPS_START = 1.0
EPS_END = 0.05
EPS_DECAY = 0.995   # if per step (else 0.99 if per episode)

TARGET_UPDATE = 500  # or 1000 if per step
EPISODES = 25000
TIME = 1000

# Actions
actions = [(-1, -1), (-1, 0), (-1, 1),
           (0, -1), (0, 0), (0, 1),
           (1, -1), (1, 0), (1, 1)]

# Transition tuple
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward', 'done'))

# Replay memory class
class ReplayMemory:
    def __init__(self, capacity):
        self.memory = deque(maxlen=capacity)

    def push(self, *args):
        self.memory.append(Transition(*args))

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

# DQN model
class DQN(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(state_dim, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, 64)
        self.out = nn.Linear(64, action_dim)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = torch.relu(self.fc3(x))
        return self.out(x)

# Initialize

path_name = "/Users/ard/Desktop/Coding_2/F1_laptime_project/policies/policy_no_"

loss_list = []
while True:
    TIME += 100
    memory = ReplayMemory(MEMORY_SIZE)
    policy_dqn = DQN(17, 9).to(device)
    target_dqn = DQN(17, 9).to(device)
    
    try:
        policy_dqn.load_state_dict(torch.load(path_name+str(count)+"-"+str(version)+".pth"))
    except:
        pass
    count+=1
    target_dqn.load_state_dict(policy_dqn.state_dict())
    target_dqn.eval()

    optimizer = optim.Adam(policy_dqn.parameters(), lr=LR)
    criterion = nn.SmoothL1Loss()

    epsilon = EPS_START
    EPS_START = max([EPS_START*0.9,EPS_END])

    def select_action(state, epsilon):
        if random.random() < epsilon:
            return random.randint(0, 8)
        with torch.no_grad():
            state = torch.tensor(state, dtype=torch.float32).unsqueeze(0).to(device)
            return policy_dqn(state).argmax(dim=1).item()
        
    scorelist = []
    version = 0
    for episode in range(EPISODES):
        test = (episode % 100 == 0)
        newgame = game(inner_points, outer_points, angle_list,(random.randint(780,820), random.randint(100, 150)), test)
        state = newgame.get_state()
        
        episode_loss = []

        for t in range(TIME):
            action = select_action(state, 0.0 if test else epsilon)
            done = not newgame.update(actions[action])
            reward = newgame.return_score()
            next_state = newgame.get_state()

            memory.push(torch.tensor(state, dtype=torch.float32),
                        action,
                        torch.tensor(next_state, dtype=torch.float32),
                        reward,
                        done)

            state = next_state

            if len(memory) >= BATCH_SIZE:
                transitions = memory.sample(BATCH_SIZE)
                batch = Transition(*zip(*transitions))

                states = torch.stack(batch.state).to(device)
                actions_batch = torch.tensor(batch.action, dtype=torch.long).to(device)
                rewards = torch.tensor(batch.reward, dtype=torch.float32).to(device)
                next_states = torch.stack(batch.next_state).to(device)
                dones = torch.tensor(batch.done, dtype=torch.float32).to(device)

                q_values = policy_dqn(states).gather(1, actions_batch.unsqueeze(1)).squeeze(1)
                next_q_values = target_dqn(next_states).max(1)[0]
                expected_q_values = rewards + GAMMA * next_q_values * (1 - dones)

                loss = criterion(q_values, expected_q_values.detach())
                episode_loss.append(loss.detach().numpy())
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            if done:
                break

        epsilon = max(EPS_END, epsilon * EPS_DECAY)

        if episode % TARGET_UPDATE == 0:
            target_dqn.load_state_dict(policy_dqn.state_dict())
            torch.save(policy_dqn.state_dict(), path_name+str(count)+"-"+str(version)+".pth")
            version += 1
            scorelist.append(newgame.return_total_score())
        loss_list.append(episode_loss)

        print(f"Episode {episode + 1}, Score: {newgame.return_total_score()}, Epsilon: {epsilon:.3f}, loss: {np.mean(episode_loss)} count: {count} version: {version}")

Episode 1, Score: -309.1693373513215, Epsilon: 0.995, loss: 1.2757450342178345 count: 20 version: 1
Episode 2, Score: -152.09184676145173, Epsilon: 0.990, loss: 3.810534715652466 count: 20 version: 1
Episode 3, Score: -301.55765699784644, Epsilon: 0.985, loss: 4.276900768280029 count: 20 version: 1
Episode 4, Score: -193.13603134906973, Epsilon: 0.980, loss: 3.7686941623687744 count: 20 version: 1
Episode 5, Score: -190.19830620047983, Epsilon: 0.975, loss: 3.0303661823272705 count: 20 version: 1
Episode 6, Score: -249.1838207919445, Epsilon: 0.970, loss: 3.0650341510772705 count: 20 version: 1
Episode 7, Score: -573.8561457087809, Epsilon: 0.966, loss: 3.091226816177368 count: 20 version: 1
Episode 8, Score: -253.31038417400083, Epsilon: 0.961, loss: 3.2130260467529297 count: 20 version: 1
Episode 9, Score: -382.6615941456765, Epsilon: 0.956, loss: 3.382078170776367 count: 20 version: 1
Episode 10, Score: -315.43778221083744, Epsilon: 0.951, loss: 3.4420108795166016 count: 20 version:

KeyboardInterrupt: 

In [154]:
pygame.init()
pygame.font.init()

my_font = pygame.font.SysFont('Comic Sans MS', 30)

policy_dqn = DQN(17, 9).to(device)
policy_dqn.load_state_dict(torch.load("/Users/ard/Desktop/Coding_2/F1_laptime_project/policies/policy_no_20-47.pth"))

angle_list = [angle*30 for angle in range(12)]

newgame = game(inner_points,outer_points,angle_list,(800,150),True)

done = False

while not done:
    state =  newgame.get_state()
    action = select_action(state, 0)
    done = not newgame.update(actions[action])