In [1]:
import numpy as np

class Generator:
    """Builds a random square racetrack grid (100x100 by default).

    Cell codes used in the returned array:
        -1  off-track cell (part of a carved-out hole)
         0  ordinary track cell
         1  start cell (track cells of the bottom row)
         2  finish cell (track cells of the top row)
    """

    SIZE = 100       # edge length of the square grid
    MAX_HOLE = 25    # exclusive upper bound for a random hole's height/width

    def __init__(self):
        pass

    def widen_hole_transformation(self, racetrack, start_cell, end_cell):
        """Carve an off-track region that extends a hole to the nearest corner.

        The margin ``δ`` grows from 1 until the hole described by
        ``start_cell``/``end_cell`` is within ``δ`` of the top/left edges or of
        the bottom/right edges. In the first case everything from the top-left
        corner up to ``end_cell`` is blocked; in the second, everything from
        ``start_cell`` to the bottom-right corner. The top/left test has
        priority when both hold for the same ``δ``.

        Args:
            racetrack: 2-D integer array, modified in place.
            start_cell: (row, col) of the hole's upper-left cell.
            end_cell: (row, col) one past the hole's lower-right cell.

        Returns:
            The same ``racetrack`` array.
        """
        n_rows, n_cols = racetrack.shape
        δ = 1
        while True:
            if start_cell[1] < δ or start_cell[0] < δ:
                # -1 (not 1) is the off-track code; 1 is reserved for start cells.
                racetrack[0:end_cell[0], 0:end_cell[1]] = -1
                break

            if end_cell[1] > n_cols - δ or end_cell[0] > n_rows - δ:
                racetrack[start_cell[0]:n_rows, start_cell[1]:n_cols] = -1
                break

            δ += 1

        return racetrack

    def calculate_valid_fraction(self, racetrack):
        """Return the fraction of cells that are still ordinary track (value 0)."""
        return np.count_nonzero(racetrack == 0) / racetrack.size

    def mark_finish_states(self, racetrack):
        """Turn every track cell (0) of the top row into a finish cell (2), in place."""
        top_row = racetrack[0, :]          # a view, so the assignment writes through
        top_row[top_row == 0] = 2
        return racetrack

    def mark_start_states(self, racetrack):
        """Turn every track cell (0) of the bottom row into a start cell (1), in place."""
        bottom_row = racetrack[-1, :]      # a view, so the assignment writes through
        bottom_row[bottom_row == 0] = 1
        return racetrack

    def generate_racetrack(self):
        """Generate a new random racetrack.

        Holes are carved at random positions until at most half of the grid is
        ordinary track, then the start and finish rows are marked.

        Returns:
            A ``(SIZE, SIZE)`` integer array using the cell codes listed in the
            class docstring.
        """
        size = self.SIZE
        racetrack = np.zeros((size, size), dtype='int')

        frac = 1
        while frac > 0.5:
            # Two independent draws per axis: a centre cell and a hole extent.
            random_cell = np.random.randint((size, size))
            random_hole_dims = np.random.randint((self.MAX_HOLE, self.MAX_HOLE))
            start_cell = np.array(
                [max(0, x - y // 2) for x, y in zip(random_cell, random_hole_dims)]
            )
            end_cell = np.array(
                [min(size, x + y) for x, y in zip(random_cell, random_hole_dims)]
            )

            racetrack = self.widen_hole_transformation(racetrack, start_cell, end_cell)
            frac = self.calculate_valid_fraction(racetrack)

        # Start cells are marked before finish cells, as in the original order.
        racetrack = self.mark_start_states(racetrack)
        racetrack = self.mark_finish_states(racetrack)

        return racetrack

In [2]:
class Environment:
    """Racetrack dynamics and episode bookkeeping.

    A state is a length-4 integer array ``[row, col, v_row, v_col]``. Each step
    moves the car ``v_row`` rows towards row 0 and ``v_col`` columns to the
    right, then adds the chosen action to the velocity.

    The ``data`` object is expected to expose ``racetrack`` (2-D array where -1
    means off-track), ``start_line`` and ``finish_line`` (sequences of
    ``(row, col)`` pairs) and a writable ``episode`` attribute.
    # NOTE(review): the Data class is not visible here; confirm these shapes.
    """

    def __init__(self, data, gen):
        self.data = data
        self.gen = gen
        self.step_count = 0

    def get_new_state(self, state, action):
        """Return the successor of ``state`` under ``action`` without mutating ``state``."""
        new_state = state.copy()
        new_state[0] = state[0] - state[2]   # rows decrease while moving
        new_state[1] = state[1] + state[3]   # columns increase while moving
        new_state[2] = state[2] + action[0]
        new_state[3] = state[3] + action[1]
        return new_state

    def select_randomly(self, NUMPY_ARR):
        """Return one uniformly chosen element of ``NUMPY_ARR``."""
        return np.random.choice(NUMPY_ARR)

    @staticmethod
    def set_zero(NUMPY_ARR):
        """Zero ``NUMPY_ARR`` in place and return it.

        Declared static because it never used an instance; without the
        decorator ``env.set_zero(arr)`` raised ``TypeError``.
        """
        NUMPY_ARR[:] = 0
        return NUMPY_ARR

    def is_finish_line_crossed(self, state, action):
        """Return True if the move from ``state`` sweeps over any finish cell.

        Every cell of the rectangle spanned by the old and the new position is
        tested, so a fast car cannot jump over the finish line.
        """
        new_state = self.get_new_state(state, action)
        old_row, old_col = state[0:2]
        new_row, new_col = new_state[0:2]
        finish_cells = {tuple(cell) for cell in self.data.finish_line}

        return any(
            (row, col) in finish_cells
            for row in range(new_row, old_row + 1)
            for col in range(old_col, new_col + 1)
        )

    def is_out_of_track(self, state, action):
        """Return True if the move ends outside the 100x100 grid or on an off-track cell."""
        new_cell = self.get_new_state(state, action)[0:2]

        if new_cell[0] < 0 or new_cell[0] > 99 or new_cell[1] < 0 or new_cell[1] > 99:
            return True
        return bool(self.data.racetrack[tuple(new_cell)] == -1)

    def reset(self):
        """Start a fresh episode record and reset the step counter."""
        # 'R' starts with a None placeholder so that R[t+1] pairs with S[t], A[t].
        self.data.episode = dict({'S': [], 'A': [], 'R': [None], 'probs': []})
        self.step_count = 0

    def start(self):
        """Return a zero-velocity state on row 99 at a random start-line column."""
        state = np.zeros(4, dtype='int')
        state[0] = 99
        state[1] = self.select_randomly(self.data.start_line[:, 1])
        return state

    def step(self, state, action):
        """Apply ``action`` in ``state`` and record the transition in the episode.

        Returns:
            ``(reward, new_state)``. ``reward`` is ``None`` when the finish
            line was crossed (episode over) and ``-1`` otherwise. Leaving the
            track sends the car back to a random start state.
        """
        self.data.episode['A'].append(action)
        reward = -1

        # The finish check has priority over the off-track check.
        finished = self.is_finish_line_crossed(state, action)
        if finished or not self.is_out_of_track(state, action):
            new_state = self.get_new_state(state, action)
        else:
            new_state = self.start()

        self.data.episode['S'].append(new_state)
        self.data.episode['R'].append(reward)
        self.step_count += 1

        return (None if finished else reward), new_state

In [3]:
class Agent:
    def possible_actions(self, velocity):
        α = [(-1,-1),(-1,0),(0,-1),(-1,1),(0,0),(1,-1),(0,1),(1,0),(1,1)]
        α = [np.array(x) for x in α]
        
        β = []
        for i, x in zip(range(len(α)), α):
            new_vel = np.add(velocity, x)
            if (new_vel[0] < 5) and (new_vel[0] >= 0) and (new_vel[1] < 5) and (new_vel[1] >= 0) and ~(new_vel == np.array([0, 0])):
                β.append(i)
        β = np.array(β)

        return β
    
    def map_to_1D(self, action):
        α = [(-1,-1),(-1,0),(0,-1),(-1,1),(0,0),(1,-1),(0,1),(1,0),(1,1)]
        for i, x in zip(range(len(α)), α):
            if action[0]==x[0] and action[1]==x[1]:
                return i
            
    def map_to_2D(self, action):
        α = [(-1,-1),(-1,0),(0,-1),(-1,1),(0,0),(1,-1),(0,1),(1,0),(1,1)]
        return α[action]

    def __init__(self):
        pass

    def get_action(self, state, policy):
        return self.map_to_2D(policy(state, self.possible_actions(state[2:4])))
            
    

In [4]:
import pygame

class Visualizer:
    """Draws the racetrack held in ``data.racetrack`` in a pygame window.

    Each grid cell is rendered as a square outline ``cell_edge`` pixels wide.
    """

    def __init__(self, data):
        """Remember ``data``; the window itself is created lazily by ``setup``."""
        self.data = data
        self.window = False

    def create_window(self):
        """Open the pygame display using the current ``width`` and ``height``."""
        self.display = pygame.display.set_mode((self.width, self.height))
        pygame.display.set_caption('Racetrack')

    def setup(self):
        """Compute the window geometry for a 100x100 grid and open the window."""
        self.cell_edge = 9
        self.width = 100 * self.cell_edge
        self.height = 100 * self.cell_edge
        self.create_window()
        self.window = True

    def close_window(self):
        """Mark the window as closed and shut pygame down."""
        self.window = False
        pygame.quit()

    def draw(self, state = np.array([])):
        """Render one frame and process pending window events.

        Off-track cells (-1) are skipped. Value 0 is outlined in red, 1 in
        yellow and 2 in green. When ``state`` is non-empty, the cell at
        ``(state[0], state[1])`` is outlined in blue.

        Returns:
            ``'stop'`` if the window was closed by the user, otherwise ``None``.
            Pressing the space bar clears ``self.loop``.
        """
        self.display.fill(0)
        for row in range(100):
            for col in range(100):
                if self.data.racetrack[row, col] == -1:
                    continue
                if self.data.racetrack[row, col] == 0:
                    color = (255, 0, 0)
                elif self.data.racetrack[row, col] == 1:
                    color = (255, 255, 0)
                elif self.data.racetrack[row, col] == 2:
                    color = (0, 255, 0)
                top_left = (col * self.cell_edge, row * self.cell_edge)
                extent = (self.cell_edge, self.cell_edge)
                pygame.draw.rect(self.display, color, (top_left, extent), 1)

        if len(state) > 0:
            car_top_left = (state[1] * self.cell_edge, state[0] * self.cell_edge)
            car_extent = (self.cell_edge, self.cell_edge)
            pygame.draw.rect(self.display, (0, 0, 255), (car_top_left, car_extent), 1)

        pygame.display.update()
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                self.loop = False
                self.close_window()
                return 'stop'
            if event.type == pygame.KEYDOWN and event.key == pygame.K_SPACE:
                self.loop = False

        return None

    def visualize_racetrack(self, state = np.array([])):
        """Keep redrawing until the space bar is pressed or the window is closed.

        Opens the window first if it is not open yet. Returns whatever non-None
        value ``draw`` produced, or ``None`` when the loop ended normally.
        """
        if not self.window:
            self.setup()
        self.loop = True
        while self.loop:
            outcome = self.draw(state)
            if outcome is not None:
                return outcome

pygame 2.5.2 (SDL 2.28.3, Python 3.11.6)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [5]:
from matplotlib import pyplot as plt

class Monte_Carlo_Control:
    """Off-policy Monte Carlo control over the shared ``data`` object.

    Episodes are generated with an ε-greedy behaviour policy around the greedy
    target policy ``data.π``; ``control`` then updates ``data.Q_vals``,
    ``data.C_vals`` and ``data.π`` from the recorded episode.
    """

    def __init__(self, data):
        '''
        Initialize, for all s ∈ S, a ∈ A(s):
            data.Q(s, a) ← arbitrary (done in Data)
            data.C(s, a) ← 0 (done in Data)
            π(s) ← argmax_a Q(s,a)
            (ties are broken by np.argmax, i.e. the lowest action index wins)
        '''
        self.data = data
        for i in range(100):
            for j in range(100):
                # Off-track cells (-1) never hold the car, so they need no policy.
                if self.data.racetrack[i, j] != -1:
                    for k in range(5):
                        for l in range(5):
                            self.data.π[i, j, k, l] = np.argmax(self.data.Q_vals[i, j, k, l])

    def _run_episode(self, env, agent, policy):
        """Roll out one full episode with ``policy``; transitions land in ``data.episode``."""
        env.reset()
        state = env.start()
        self.data.episode['S'].append(state)
        reward = -1
        # env.step returns a reward of None once the finish line is crossed.
        while reward is not None:
            action = agent.get_action(state, policy)
            reward, state = env.step(state, action)

    def evaluate_target_policy(self, env=None, agent=None):
        """Run one episode with the target policy and append its return to ``data.rewards``.

        Args:
            env: environment to use; defaults to the module-level ``env``.
            agent: agent to use; defaults to the module-level ``agent``.
        """
        # Backward compatibility: existing callers pass nothing and rely on the
        # notebook-level globals.
        if env is None:
            env = globals()['env']
        if agent is None:
            agent = globals()['agent']

        self._run_episode(env, agent, self.generate_target_policy_action)

        # R[0] is the None placeholder written by env.reset().
        self.data.rewards.append(sum(self.data.episode['R'][1:]))

    def plot_rewards(self):
        """Save a plot of the recorded evaluation returns to ``RewardGraph.png``."""
        fig, ax = plt.subplots(figsize=(30, 15))
        x = np.arange(1, len(self.data.rewards) + 1)
        # NOTE(review): the x10 scaling presumably reflects one evaluation per
        # 10 training episodes; confirm against the training loop.
        ax.plot(x * 10, self.data.rewards, linewidth=0.5, color='#BB8FCE')
        ax.set_xlabel('Episode number', size=20)
        ax.set_ylabel('Reward', size=20)
        ax.set_title('Plot of Reward vs Episode Number', size=20)
        ax.tick_params(labelsize=20)
        fig.savefig('RewardGraph.png')
        plt.close(fig)

    def save_your_work(self):
        """Persist Q values, C values, the policy and the rewards via ``data``."""
        self.data.save_Q_vals()
        self.data.save_C_vals()
        self.data.save_π()
        self.data.save_rewards()

    def determine_probility_behavior(self, state, action, possible_actions):
        """Append b(action | state), the behaviour-policy probability, to ``episode['probs']``.

        When the greedy action is allowed the behaviour policy is ε-greedy;
        otherwise it is uniform over ``possible_actions``.
        """
        best_action = self.data.π[tuple(state)]
        num_actions = len(possible_actions)

        if best_action in possible_actions:
            if action == best_action:
                prob = 1 - self.data.ε + self.data.ε / num_actions
            else:
                prob = self.data.ε / num_actions
        else:
            prob = 1 / num_actions

        self.data.episode['probs'].append(prob)

    def generate_target_policy_action(self, state, possible_actions):
        """Return the greedy action index, or a random allowed one if it is not allowed."""
        greedy = self.data.π[tuple(state)]
        if greedy in possible_actions:
            return greedy
        return np.random.choice(possible_actions)

    def generate_behavior_policy_action(self, state, possible_actions):
        """Return an ε-greedy action index and record its selection probability."""
        if np.random.rand() > self.data.ε and self.data.π[tuple(state)] in possible_actions:
            action = self.data.π[tuple(state)]
        else:
            action = np.random.choice(possible_actions)

        self.determine_probility_behavior(state, action, possible_actions)

        return action

    # Alias for the spelling that ``control`` used to reference.
    generate_behavioural_policy_action = generate_behavior_policy_action

    def control(self, env, agent):
        '''
        Performs MC control using episode list [ S0 , A0 , R1, . . . , ST −1 , AT −1, RT , ST ]
        G ← 0
        W ← 1
        For t = T − 1, T − 2, . . . down to 0:
            G ← γ*G + R_t+1
            C(St, At ) ← C(St,At ) + W
            Q(St, At ) ← Q(St,At) + (W/C(St,At))*[G − Q(St,At )]
            π(St) ← argmax_a Q(St,a) (with ties broken consistently)
            If At != π(St) then exit For loop
            W ← W * (1/b(At|St))
        '''
        self._run_episode(env, agent, self.generate_behavior_policy_action)

        G = 0
        W = 1
        T = env.step_count

        for t in range(T - 1, -1, -1):
            # Use this instance's data, not whatever global `data` happens to exist.
            G = self.data.γ * G + self.data.episode['R'][t + 1]
            S_t = tuple(self.data.episode['S'][t])
            A_t = agent.map_to_1D(self.data.episode['A'][t])
            SA = S_t + (A_t,)

            self.data.C_vals[SA] += W
            self.data.Q_vals[SA] += (W * (G - self.data.Q_vals[SA])) / self.data.C_vals[SA]
            self.data.π[S_t] = np.argmax(self.data.Q_vals[S_t])
            if A_t != self.data.π[S_t]:
                # Earlier steps have zero probability under the target policy.
                break
            W /= self.data.episode['probs'][t]


https://towardsdatascience.com/solving-racetrack-in-reinforcement-learning-using-monte-carlo-control-bdee2aa4f04e

In [None]:
# Wire up the simulation objects. The same `data` instance is handed to the
# environment, the learner and the visualizer.
# NOTE(review): `Data` is not defined in any of the cells shown above; it has to
# be defined or imported before this cell can run on a fresh kernel.
data = Data()
gen = Generator()
env = Environment(data,gen)
mcc = Monte_Carlo_Control(data)
vis = Visualizer(data)
# `env` and `agent` are also read as module-level names by
# Monte_Carlo_Control.evaluate_target_policy, so these names must not change.
agent = Agent()