In [1]:
from collections import namedtuple, defaultdict
import random

In [2]:
# Lightweight record types shared by the track, the environment and the learner.
# Position and Velocity are (row, col) pairs, an Action is the (row, col)
# increment applied to the velocity, and a State pairs a position with a velocity.
Position = namedtuple('Position', 'row col')
Velocity = namedtuple('Velocity', 'row col')
Action = namedtuple('Action', 'row col')
State = namedtuple('State', 'pos vel')

# Every combination of -1 / 0 / +1 for the two velocity components (nine in
# total), ordered with the row increment varying slowest.
ACTIONS = [
    Action(row=d_row, col=d_col)
    for d_row in (-1, 0, 1)
    for d_col in (-1, 0, 1)
]

# Exploration rate passed to the learner (and from there to its soft policy).
EPSILON = 1e-4

In [3]:
class Track:
    '''A race track described row by row as half-open column intervals.'''

    def __init__(self, boundaries):
        '''Boundaries is a list of tuples. The ith tuple contains the first
        (inclusive) and the last (exclusive) element on the track in
        that row. The last row of the list is the start row.
        The last column of the track is the finish line.
        We take the boundaries from top to bottom, but store them in reversed
        order, because the car goes upwards.

        The caller's list is not modified: a reversed copy is stored.'''
        # Copy instead of boundaries.reverse(), which would silently flip the
        # list the caller still holds (and flip it back on a second Track).
        self.boundaries = list(reversed(boundaries))
        # Largest exclusive end column over all rows; also the finish column.
        self.max_col = max(row[1] for row in self.boundaries)

    def __str__(self, positions=None):
        '''Render the track as text, top row first.

        Track cells are drawn as 'O' and everything else as ' '. If
        ``positions`` is given, each position's cell is overdrawn with 'X';
        positions need integer ``row`` and ``col`` attributes that fall
        inside the rendered grid.'''
        rows = []
        for start, end in reversed(self.boundaries):
            row = [' ' for _ in range(self.max_col)]
            for i in range(start, end):
                row[i] = 'O'
            rows.append(row)
        if positions is not None:
            for pos in positions:
                # rows is top-to-bottom while pos.row counts from the start
                # row at the bottom, hence the negative index.
                rows[-pos.row - 1][pos.col] = 'X'
        return '\n'.join(''.join(row) for row in rows)

    def on_track(self, pos):
        '''Return True if ``pos`` is on the track or at/past the finish column.'''
        if pos.col >= self.max_col:
            # Anything at or beyond the finish column counts as on track,
            # regardless of its row.
            return True
        if pos.row not in range(len(self.boundaries)):
            return False
        left, right = self.boundaries[pos.row]
        return left <= pos.col < right

    def stays_on_track(self, pos, v):
        '''Return True if moving from ``pos`` with velocity ``v`` stays on track.

        With no row movement only the destination cell is checked. Otherwise
        the move is walked one row at a time, shifting the column by
        ``v.col / v.row`` per row (so intermediate columns may be fractional),
        and every intermediate point must satisfy on_track().'''
        if v.row == 0:
            return self.on_track(Position(row=pos.row, col=pos.col + v.col))
        row, col = pos.row, pos.col
        inv_slope = v.col / v.row
        for _ in range(v.row):
            row += 1
            # Catch intersection with a vertical border.
            if not self.on_track(Position(row=row, col=col)):
                return False
            col += inv_slope
            # Catch intersection with a horizontal border.
            if not self.on_track(Position(row=row, col=col)):
                return False
        return True

    def cross_finish(self, pos, v):
        '''Return True if the column reached from ``pos`` with ``v`` is at or
        past the finish column.'''
        return pos.col + v.col >= self.max_col

    def get_random_start_pos(self):
        '''Return a uniformly random Position on the start row (row 0).'''
        col = random.randrange(*self.boundaries[0])
        return Position(row=0, col=col)

In [4]:
class Environment:
    '''Racetrack environment: tracks the car's position and velocity on a Track.'''

    # Each velocity component is clamped to [MIN_SPEED, MAX_SPEED].
    MIN_SPEED = 0
    MAX_SPEED = 5
    # Rewards returned by take_action(). MCPlayer.play_episode() keeps playing
    # while the reward equals -1, so STEP_REWARD must stay -1 unless that loop
    # is changed too.
    STEP_REWARD = -1
    CRASH_REWARD = -50
    FINISH_REWARD = 0

    def __init__(self, track):
        self.track = track
        self.reset()

    def reset(self):
        '''Put the car on a random start position with zero velocity.'''
        self.pos = self.track.get_random_start_pos()
        self.v = Velocity(0, 0)

    def take_action(self, a):
        '''Apply action ``a`` to the velocity, move the car and report the outcome.

        Returns a ``(reward, state)`` pair:
        * leaving the track: CRASH_REWARD and the state after a reset;
        * crossing the finish: FINISH_REWARD and the state after a reset;
        * otherwise: STEP_REWARD and the state at the new position.
        '''
        row_vel = min(self.MAX_SPEED, max(self.MIN_SPEED, self.v.row + a.row))
        col_vel = min(self.MAX_SPEED, max(self.MIN_SPEED, self.v.col + a.col))
        self.v = Velocity(row=row_vel, col=col_vel)

        if not self.track.stays_on_track(self.pos, self.v):
            self.reset()
            return self.CRASH_REWARD, self.get_state()

        if self.track.cross_finish(self.pos, self.v):
            # The episode ends here; the car goes straight back to the start,
            # so there is no point in storing the finishing position first.
            self.reset()
            return self.FINISH_REWARD, self.get_state()

        self.pos = Position(row=self.pos.row + self.v.row,
                            col=self.pos.col + self.v.col)
        return self.STEP_REWARD, self.get_state()

    def __str__(self):
        '''Render the track with the car's current position marked.'''
        return self.track.__str__([self.pos])

    def get_state(self):
        '''Return the current (position, velocity) as a State.'''
        return State(pos=self.pos, vel=self.v)

In [5]:
class SoftPolicy:
    '''Epsilon-soft policy storing, per state, one weight for each of the
    nine entries of ACTIONS.'''

    def __init__(self, epsilon, track):
        '''Set up uniform defaults and the greedy/exploratory weights.

        ``track`` is accepted but not used here.'''
        # States that were never updated weigh all nine actions equally.
        self.probs = defaultdict(lambda: [1/9] * 9)
        # Slot of every action inside a state's weight list.
        self.action_dict = {action: slot for slot, action in enumerate(ACTIONS)}
        # Weight of each non-greedy action, and of the greedy one.
        self.explore_prob = epsilon / 9
        self.greedy_prob = 1 - epsilon + self.explore_prob

    def update(self, state, action):
        '''Make ``action`` the greedy choice in ``state``; every other action
        gets the exploratory weight. The state's weight list is changed in place.'''
        greedy_slot = self.action_dict[action]
        weights = self.probs[state]
        weights[:] = [
            self.greedy_prob if slot == greedy_slot else self.explore_prob
            for slot in range(len(weights))
        ]

    def get_prob(self, state, action):
        '''Return the weight currently assigned to ``action`` in ``state``.'''
        slot = self.action_dict[action]
        return self.probs[state][slot]

    def get_action(self, state):
        '''Sample an action for ``state`` according to its weights.'''
        slots = list(range(len(ACTIONS)))
        drawn = random.choices(slots, weights=self.probs[state])
        return ACTIONS[drawn[0]]

In [6]:
class MCPlayer:
    '''Off-policy Monte Carlo control with weighted importance sampling.

    Episodes are generated with an epsilon-soft behavior policy; the target
    policy is greedy with respect to the learned action values.'''

    def __init__(self, epsilon, env):
        # Cumulative importance weight seen for each (state, action) pair.
        self.count = defaultdict(int)
        # Estimated return for each (state, action) pair.
        self.action_value = defaultdict(int)
        # Greedy action per state.
        self.target_policy = {}
        self.behavior_policy = SoftPolicy(epsilon, env.track)
        self.env = env

    def play_episode(self):
        '''Play one episode with the behavior policy.

        Returns ``(states, actions, reward)``: the visited states, the action
        taken in each of them, and the final reward. The episode continues
        for as long as the environment returns a reward of -1.'''
        states = []
        actions = []
        reward = -1 # So we go into the loop.
        state = self.env.get_state()
        while reward == -1:
            states.append(state)
            a = self.behavior_policy.get_action(state)
            reward, state = self.env.take_action(a)
            actions.append(a)
        return states, actions, reward

    def play_and_print(self):
        '''Play one episode and print the track with the visited cells marked.'''
        states, _, _ = self.play_episode()
        print(self.env.track.__str__([state.pos for state in states]))

    def learn(self, num_steps):
        '''Play ``num_steps`` episodes, updating the value estimates and both
        policies after each one.'''
        for _ in range(num_steps):
            states, actions, G = self.play_episode()
            imp_weight = 1
            # Greedy actions found during this backward pass. They are pushed
            # into the behavior policy only afterwards, so that get_prob()
            # below still reports the probabilities the episode was actually
            # generated with.
            greedy_updates = {}
            for state, action in zip(reversed(states), reversed(actions)):
                key = (state, action)
                # Weights are accumulated per (state, action) pair, matching
                # the keys of action_value.
                self.count[key] += imp_weight
                av = self.action_value[key]
                self.action_value[key] += (imp_weight
                                           / self.count[key]
                                           * (G - av))
                best = max(ACTIONS, key=lambda ac: self.action_value[(state, ac)])
                self.target_policy[state] = best
                greedy_updates[state] = best
                if best != action:
                    # The greedy target policy would not have taken this
                    # action, so earlier steps carry zero importance weight.
                    break
                imp_weight *= 1 / self.behavior_policy.get_prob(state, action)
                # Every earlier step of the episode earned a reward of -1.
                G -= 1
            for state, best in greedy_updates.items():
                self.behavior_policy.update(state, best)

In [7]:
# Three-row track, listed top to bottom as (first column, last column
# exclusive) pairs. The bottom row (0, 1) is the start row; the widest row
# ends at column 8, which Track uses as the finish column.
# NOTE(review): the commented-out rows look like a larger layout that was
# switched off, presumably to keep training short. Confirm before deleting.
sharp_turn = Track([
#     (3, 18),
#     (2, 18),
#     (2, 18),
#     (1, 18),
#     (0, 18),
    (0, 8),
    (0, 7),
    (0, 1),
#     (0, 9),
#     (0, 9),
#     (0, 9),
#     (0, 9),
#     (0, 9),
#     (0, 9),
#     (1, 9),
#     (1, 9),
#     (1, 9),
#     (1, 9),
#     (1, 9),
#     (1, 9),
#     (1, 9),
#     (1, 9),
#     (2, 9),
#     (2, 9),
#     (2, 9),
#     (2, 9),
#     (2, 9),
#     (2, 9),
#     (2, 9),
#     (3, 9),
#     (3, 9),
#     (3, 9),
])

In [8]:
# Environment for the sharp_turn track; construction already places the car
# on a start position with zero velocity.
e = Environment(sharp_turn)

In [9]:
# Monte Carlo learner acting in environment e with exploration rate EPSILON.
p = MCPlayer(EPSILON, e)

In [12]:
%%time
# Train on 10000 episodes. The run recorded below took roughly 52 minutes of
# wall time, so expect a full re-run of the notebook to be slow.
p.learn(10000)

CPU times: user 52min 19s, sys: 9.57 s, total: 52min 29s
Wall time: 52min 30s


In [11]:
# Play one episode with the behavior policy and print the track with every
# visited cell marked 'X'.
# NOTE(review): this cell's execution count (11) is lower than the training
# cell's (12), so the output shown here may predate the last training run.
# Re-run after Restart & Run All to confirm.
p.play_and_print()

OOOOOOOO
OOOOOOO 
X       
