In [None]:
from typing import Tuple, List, Any, Optional, Dict
from copy import deepcopy
from tqdm import tqdm

import numpy as np
import pandas as pd

from matplotlib import pyplot as plt
import matplotlib.ticker

from matk.utils.animation import animate_frames
from IPython.display import HTML
%matplotlib inline

# Classes

In [None]:
# Cell types, each given as ((R, G, B) colour used for rendering, integer code stored in the field array).
# Field cells that keep the value 0 are plain walkable cells.
START = ((0, 255, 0), 1)        # green
END = ((0, 0, 100), 2)          # dark blue
VOLCANO = ((255, 0, 0), 3)      # red
ROCK = ((96, 96, 96), 4)        # grey
WALKER = ((255, 51, 153), 5)    # pink
ESCALATOR = ((0, 255, 255), 6)  # cyan

def exclude_list_from_list(main_list, exclude_list):
    """Return the unique elements of ``main_list`` that are not in ``exclude_list``.

    The result keeps the first-occurrence order of ``main_list``. A plain set
    difference yields an arbitrary order (and, for strings, an order that
    changes between interpreter runs), which makes any index-based random
    sampling over the result impossible to reproduce with a seed.

    Args:
        main_list: Iterable of hashable items to filter.
        exclude_list: Iterable of hashable items to drop.

    Returns:
        A new list with duplicates removed and excluded items filtered out.
    """
    excluded = set(exclude_list)
    # dict.fromkeys de-duplicates while preserving insertion order.
    return [item for item in dict.fromkeys(main_list) if item not in excluded]

class Labirint(object):
    """Square grid maze with rocks, volcanoes and one-way escalators.

    The maze is stored in ``self.field`` as a ``size x size`` array of integer
    cell codes (the second element of START / END / VOLCANO / ROCK /
    ESCALATOR; 0 for a plain cell). ``self.states`` lists every coordinate
    that is not a rock, and ``self.agent`` is the ``Walker`` placed at
    ``start``.
    """

    def __init__(
        self,
        size: int,
        n_rocks: int,
        n_volcanos: int,
        n_escalators: int,
        escalator_len: int,
        main_step_prob: float = 1.0,
        start: Tuple[int,int] = (0,0),
        end: Optional[Tuple[int,int]] = None,
    ):
        """Build a random maze.

        Args:
            size: Side length of the square field.
            n_rocks: Number of rock cells to place.
            n_volcanos: Number of volcano cells to place.
            n_escalators: Number of escalators to place.
            escalator_len: Number of cells each escalator covers (vertically).
            main_step_prob: Forwarded to ``Walker`` as its step probability.
            start: (row, col) tuple of the start cell.
            end: (row, col) tuple of the goal cell; defaults to the
                bottom-right corner.
        """
        self.size = size
        self.n_rocks = n_rocks
        self.n_volcanos = n_volcanos
        self.n_escalators = n_escalators
        self.escalator_len = escalator_len
        self.start = start
        self.end = (size-1, size-1) if end is None else end

        field, states = self.create_field()
        self.field = field
        self.states = states
        self.agent = Walker(start, main_step_prob=main_step_prob)

    @staticmethod
    def _sample_cells(candidates, count):
        """Pick up to ``count`` distinct entries of ``candidates`` at random."""
        count = min(count, len(candidates))
        if count <= 0:
            return []
        # replace=False: sampling with replacement could pick the same cell
        # twice and silently place fewer objects than requested.
        picked = np.random.choice(len(candidates), size=count, replace=False)
        return [candidates[i] for i in picked]

    def create_field(self):
        """Generate the field array and the list of non-rock states.

        Returns:
            Tuple ``(field, states)``: the ``size x size`` array of cell codes
            and the list of all coordinates that are not rocks.
        """
        field = np.zeros((self.size, self.size))
        free_cells = [(i, j) for i in range(self.size) for j in range(self.size)]

        # Place start/end
        field[self.start] = START[1]
        field[self.end] = END[1]
        reserved = (self.start, self.end)
        free_cells = exclude_list_from_list(free_cells, list(reserved))

        # Place escalators: vertical runs of `escalator_len` cells that must
        # leave room for a landing cell below, stay out of the last column,
        # and never overwrite the start or end cell.
        def escalator_fits(cell):
            row, col = cell
            if col == self.size - 1 or row + self.escalator_len + 1 >= self.size:
                return False
            return all((row + k, col) not in reserved for k in range(self.escalator_len))

        escalator_candidates = [cell for cell in free_cells if escalator_fits(cell)]
        escalator_cells = []
        for row, col in self._sample_cells(escalator_candidates, self.n_escalators):
            field[row:row + self.escalator_len, col] = ESCALATOR[1]
            # The landing cell (one past the escalator) is also kept free of
            # rocks and volcanoes.
            escalator_cells += [(row + k, col) for k in range(self.escalator_len + 1)]
        free_cells = exclude_list_from_list(free_cells, escalator_cells)

        # Place rocks
        rock_places = self._sample_cells(free_cells, self.n_rocks)
        for cell in rock_places:
            field[cell] = ROCK[1]
        free_cells = exclude_list_from_list(free_cells, rock_places)

        # Place volcanoes
        for cell in self._sample_cells(free_cells, self.n_volcanos):
            field[cell] = VOLCANO[1]

        # Every cell except rocks is a state the agent can occupy.
        states = [(i, j) for i in range(self.size) for j in range(self.size)]
        states = exclude_list_from_list(states, rock_places)

        return field, states

    def v_print(self, input_field=None, with_agent=False, return_field=False):
        """Render the maze as an RGB image.

        Args:
            input_field: Optional array of cell codes to draw instead of
                ``self.field``.
            with_agent: If true, paint the agent's current cell.
            return_field: If true, return the ``size x size x 3`` uint8 image
                instead of showing it with matplotlib.

        Returns:
            The RGB array when ``return_field`` is true, otherwise ``None``.
        """
        field = np.asarray(self.field if input_field is None else input_field)

        visualise_field = np.zeros((self.size, self.size, 3), dtype=np.uint8)

        # Start/end are painted from the configured coordinates; later layers
        # may paint over them.
        visualise_field[self.start] = START[0]
        visualise_field[self.end] = END[0]

        # Paint each cell type through a boolean mask of its code.
        for colour, code in (ESCALATOR, ROCK, VOLCANO):
            visualise_field[field == code] = colour

        if with_agent:
            visualise_field[self.agent.coord[0], self.agent.coord[1]] = WALKER[0]

        if return_field:
            return visualise_field

        plt.imshow(visualise_field)
        plt.show()

    def get_possible_actions(self, coords):
        """List the moves allowed from ``coords``.

        On an escalator the only move is ``"down"``. Elsewhere a move is
        allowed when the target cell is inside the field and is not a rock.

        Returns:
            List of action names from ``"left"``, ``"right"``, ``"up"``,
            ``"down"`` (in that order).
        """
        row, col = coords[0], coords[1]
        if self.field[row, col] == ESCALATOR[1]:
            return ["down"]

        moves = (("left", 0, -1), ("right", 0, 1), ("up", -1, 0), ("down", 1, 0))
        possible_actions = []
        for name, d_row, d_col in moves:
            new_row, new_col = row + d_row, col + d_col
            inside = 0 <= new_row < self.size and 0 <= new_col < self.size
            if inside and self.field[new_row, new_col] != ROCK[1]:
                possible_actions.append(name)

        return possible_actions

    def get_reward(self, coords):
        """Reward for standing on ``coords``.

        Returns:
            -1 for plain, start and escalator cells, -50 for a volcano,
            100 for the end cell, and ``None`` for any other cell (rocks).
        """
        cell = self.field[coords[0], coords[1]]
        if cell in [0, START[1], ESCALATOR[1]]:
            return -1
        if cell == VOLCANO[1]:
            return -50
        if cell == END[1]:
            return 100
        return None

    def get_endgame(self, coords):
        """Return True when ``coords`` is a terminal cell (volcano or end)."""
        return bool(self.field[coords[0], coords[1]] in [VOLCANO[1], END[1]])

    def get_success(self, coords):
        """Return True when ``coords`` is the end cell."""
        return bool(self.field[coords[0], coords[1]] == END[1])

    def get_escalator(self, coords):
        """Return True when ``coords`` is an escalator cell."""
        return bool(self.field[coords[0], coords[1]] == ESCALATOR[1])
        
class Walker(object):
    """Agent that walks a grid and may slip to a different move.

    Attributes are plain values: ``start_coord`` is the initial (row, col),
    ``coord`` is the current position as a mutable two-item list, and
    ``main_step_prob`` is the probability that a requested move is the one
    actually executed.
    """

    def __init__(
        self,
        coord: Tuple[int,int],
        main_step_prob: float = 1.0
    ):
        self.start_coord = coord
        self.main_step_prob = main_step_prob
        self.coord = list(coord)

    def random_step(self, real_step, possible_actions):
        """Return the move that is actually executed.

        With probability ``main_step_prob`` the requested ``real_step`` is
        kept; otherwise one of the other possible actions is drawn uniformly.
        When only one action is possible it is always returned.
        """
        if len(possible_actions) == 1:
            return real_step

        # A single Bernoulli trial (n=1). With n=2 the draw is non-zero with
        # probability 1 - (1 - p)^2, which overstates the configured probability.
        if np.random.binomial(n=1, p=self.main_step_prob):
            return real_step

        # Built in the order of `possible_actions` so seeded runs are repeatable.
        alternatives = [action for action in possible_actions if action != real_step]
        return np.random.choice(alternatives)

    def get_coord_from_action(self, action, cur_coord):
        """Return the (row, col) reached from ``cur_coord`` by ``action``.

        ``"left"``/``"right"`` change the column, ``"up"``/``"down"`` change
        the row; any other action leaves the coordinate unchanged.
        """
        x, y = cur_coord
        if action == "left":
            y -= 1
        elif action == "right":
            y += 1
        elif action == "up":
            x -= 1
        elif action == "down":
            x += 1
        return x, y

    def step(self, direction, possible_actions, do_step=True):
        """Move the walker, possibly slipping to another possible action.

        Args:
            direction: Requested action; must be in ``possible_actions``.
            possible_actions: Actions allowed from the current cell.
            do_step: Currently ignored; kept so existing callers keep working.

        Raises:
            RuntimeError: If ``direction`` is not in ``possible_actions``.
        """
        if direction not in possible_actions:
            raise RuntimeError("direction should be in possible_actions")

        direction = self.random_step(direction, possible_actions)
        new_x, new_y = self.get_coord_from_action(direction, tuple(self.coord))
        self.coord[0] = new_x
        self.coord[1] = new_y

    def reset_coord(self):
        """Put the walker back on its start coordinate."""
        self.coord = list(self.start_coord)
            
class ValueItteration(object):
    """Value iteration over the states of a grid environment.

    The environment is built from ``environment(**environment_config)`` and is
    expected to expose ``states``, ``agent``, ``get_possible_actions``,
    ``get_reward``, ``get_endgame``, ``get_success``, ``get_escalator`` and
    ``v_print``.

    ``self.policy`` maps state -> action, ``self.v`` maps state -> value, and
    ``self.hashed_policies`` keeps a copy of the policy after initialisation
    and after every sweep.
    """

    def __init__(
        self,
        environment: object,
        environment_config: Dict[str, Any],
        gamma: float,
        epsilon: float
    ):
        """
        Args:
            environment: Callable that builds the environment.
            environment_config: Keyword arguments passed to ``environment``.
            gamma: Discount factor.
            epsilon: Convergence threshold on the largest value change in a sweep.
        """
        self.environment = environment(**environment_config)

        self.gamma = gamma
        self.epsilon = epsilon

        # Random initial policy. States with no legal move get no entry;
        # run_algorithm skips them as well.
        self.policy = {}
        for s in self.environment.states:
            actions = self.environment.get_possible_actions(s)
            if len(actions) == 0:
                continue
            self.policy[s] = np.random.choice(actions)
        self.hashed_policies = [deepcopy(self.policy)]

        # Values start at the immediate reward of each state.
        self.v = {}
        for s in self.environment.states:
            self.v[s] = self.environment.get_reward(s)

    def run_policy(self, max_iter=1_000, input_policy=None, verbose=True):
        """Replay a policy from the start cell.

        Args:
            max_iter: Maximum number of steps before giving up.
            input_policy: Policy to follow; defaults to ``self.policy``.
            verbose: Print ``"End game"`` when a terminal cell is reached.

        Returns:
            Tuple ``(all_fields, success, n_escalators_steps)``: the rendered
            frame before and after every step, whether the run ended on the
            success cell, and how many steps landed on an escalator.
        """
        policy = self.policy if input_policy is None else input_policy
        env = self.environment
        env.agent.reset_coord()
        terminated = False
        it = 0
        n_escalators_steps = 0
        all_fields = [env.v_print(with_agent=True, return_field=True)]
        while not terminated and it < max_iter:
            state = tuple(env.agent.coord)
            if state not in policy:
                # No action is defined here (cell with no legal move): stuck.
                break
            action = policy[state]
            env.agent.step(action, env.get_possible_actions(state))
            all_fields.append(env.v_print(with_agent=True, return_field=True))
            terminated = env.get_endgame(env.agent.coord)
            # The walker lives on the environment, not on this object.
            n_escalators_steps += int(env.get_escalator(env.agent.coord))
            if terminated and verbose:
                print("End game")
            it += 1
        success = env.get_success(env.agent.coord)
        return all_fields, success, n_escalators_steps

    def run_algorithm(self):
        """Run in-place value-iteration sweeps until convergence.

        For every non-terminal state the value of each action is
        ``reward(s) + gamma * (p * v(target) + q * sum(v(other targets)))``,
        where ``p`` is the walker's ``main_step_prob`` and ``q`` spreads the
        remaining probability evenly over the other possible actions. The best
        action is stored in ``self.policy``. Sweeps stop when the largest
        value change is below ``self.epsilon``; the sweep count is printed.
        """
        env = self.environment
        agent = env.agent
        iteration = 0
        while True:
            iteration += 1
            biggest_change = 0
            for s in env.states:
                if env.get_endgame(s):
                    continue

                possible_actions = env.get_possible_actions(s)
                if len(possible_actions) == 0:
                    continue

                if len(possible_actions) == 1:
                    main_prob = 1.0
                    additional_prob = 0
                else:
                    main_prob = agent.main_step_prob
                    additional_prob = (1 - main_prob) / (len(possible_actions) - 1)

                reward = env.get_reward(s)
                next_values = {a: self.v[agent.get_coord_from_action(a, s)] for a in possible_actions}

                old_v = self.v[s]
                # Start from -inf: step rewards are negative, so seeding the
                # maximum with 0 would clamp every negative state value to 0
                # and leave the policy for that state untouched.
                new_v = -np.inf
                for a in possible_actions:
                    additional_value = sum(next_values[a_r] for a_r in possible_actions if a_r != a)
                    v = reward + (self.gamma * (next_values[a]*main_prob + additional_prob*additional_value))

                    if v > new_v:
                        new_v = v
                        self.policy[s] = a

                self.v[s] = new_v
                biggest_change = max(biggest_change, np.abs(old_v - self.v[s]))

            self.hashed_policies.append(deepcopy(self.policy))
            if biggest_change < self.epsilon:
                break

        print(f"Converged in {iteration} iterations")
        
class QLearning(object):
    """Tabular Q-learning on a grid environment.

    The environment is built from ``environment(**environment_config)`` and is
    expected to expose ``states``, ``agent``, ``get_possible_actions``,
    ``get_reward``, ``get_endgame``, ``get_success``, ``get_escalator`` and
    ``v_print``.

    ``self.q_table`` maps state -> {action: value}, initialised to 0 for every
    possible action. ``self.hashed_runs`` holds one ``(frames, success)`` tuple
    per training episode.
    """

    def __init__(
        self,
        environment: object,
        environment_config: Dict[str, Any],
        gamma: float,
        epsilon: float,
        lr: float,
        lr_shed: Optional[float] = None,
        epsilon_shed: Optional[float] = None
    ):
        """
        Args:
            environment: Callable that builds the environment.
            environment_config: Keyword arguments passed to ``environment``.
            gamma: Discount factor.
            epsilon: Initial exploration probability.
            lr: Initial learning rate.
            lr_shed: If given, ``lr`` is multiplied by it after every episode.
            epsilon_shed: If given, ``epsilon`` is multiplied by it after
                every episode.
        """
        self.environment = environment(**environment_config)

        self.gamma = gamma
        self.epsilon = epsilon
        self.lr = lr
        self.lr_shed = lr_shed
        self.epsilon_shed = epsilon_shed

        self.q_table = {}
        for s in self.environment.states:
            self.q_table[s] = {k:0 for k in self.environment.get_possible_actions(s)}

        self.hashed_runs = []

    def run_policy(self, max_iter=1_000, verbose=True):
        """Replay the greedy policy of the current Q-table from the start cell.

        Args:
            max_iter: Maximum number of steps before giving up.
            verbose: Print ``"End game"`` when a terminal cell is reached.

        Returns:
            Tuple ``(all_fields, success, n_escalators_steps)``: the rendered
            frame before and after every step, whether the run ended on the
            success cell, and how many steps landed on an escalator.
        """
        env = self.environment
        env.agent.reset_coord()
        terminated = False
        it = 0
        n_escalators_steps = 0
        all_fields = [env.v_print(with_agent=True, return_field=True)]
        while not terminated and it < max_iter:
            state = tuple(env.agent.coord)
            actions = self.q_table[state]
            if not actions:
                # Cell with no legal move: the agent is stuck.
                break
            action = max(actions, key=actions.get)
            env.agent.step(action, env.get_possible_actions(state))
            all_fields.append(env.v_print(with_agent=True, return_field=True))
            terminated = env.get_endgame(env.agent.coord)
            # The walker lives on the environment, not on this object.
            n_escalators_steps += int(env.get_escalator(env.agent.coord))
            if terminated and verbose:
                print("End game")
            it += 1
        success = env.get_success(env.agent.coord)
        return all_fields, success, n_escalators_steps

    def run_algorithm(self, n_steps):
        """Train the Q-table for ``n_steps`` episodes.

        Each episode starts at the start cell and runs until a terminal cell.
        Actions are chosen epsilon-greedily; after every episode the learning
        rate and epsilon are decayed by ``lr_shed`` / ``epsilon_shed`` when
        those are set. ``self.hashed_runs`` is reset and then filled with one
        ``(frames, success)`` tuple per episode.
        """
        env = self.environment
        lr = self.lr
        epsilon = self.epsilon
        self.hashed_runs = []
        for _ in tqdm(range(n_steps)):
            env.agent.reset_coord()
            terminated = False
            all_fields = [env.v_print(with_agent=True, return_field=True)]
            while not terminated:

                state = tuple(env.agent.coord)
                state_actions = self.q_table[state]
                if not state_actions:
                    # Cell with no legal move: nothing to learn, end the episode.
                    break

                # A single Bernoulli trial (n=1). With n=2 the draw is non-zero
                # with probability 1 - (1 - epsilon)^2, i.e. too much exploration.
                if np.random.binomial(n=1, p=epsilon):
                    # Explore
                    action = np.random.choice(list(state_actions.keys()))
                else:
                    # Exploit
                    action = max(state_actions, key=state_actions.get)

                # Move first, then learn from the cell actually reached: the
                # walker may slip, so the intended target is not necessarily
                # the observed next state.
                env.agent.step(action, env.get_possible_actions(state))
                new_state = tuple(env.agent.coord)
                reward = env.get_reward(new_state)

                best_next = max(self.q_table[new_state].values(), default=0)
                self.q_table[state][action] = lr * (reward + self.gamma*best_next) + (1-lr)*self.q_table[state][action]

                all_fields.append(env.v_print(with_agent=True, return_field=True))
                terminated = env.get_endgame(new_state)

            success = env.get_success(env.agent.coord)
            self.hashed_runs.append((all_fields, success))
            if self.lr_shed is not None:
                lr = lr * self.lr_shed
            if self.epsilon_shed is not None:
                epsilon = epsilon * self.epsilon_shed

# Value Iteration

In [None]:
# Build a 10x10 maze and the value-iteration solver that plans on it.
value_iter = ValueItteration(
    environment=Labirint,
    environment_config={
        "size": 10,
        "n_rocks": 10,
        "n_volcanos": 10,
        "n_escalators": 2,
        "escalator_len": 5,
        "main_step_prob": 0.85,
    },
    gamma=0.9,
    epsilon=0.005,
)

# Show the generated maze.
value_iter.environment.v_print()

In [None]:
# Sweep the states until the largest value change drops below epsilon; prints the iteration count.
value_iter.run_algorithm()

In [None]:
# run_policy returns three values (frames, success flag, escalator step count);
# unpacking only two raises "ValueError: too many values to unpack".
all_actions, success, n_escalators_steps = value_iter.run_policy()
print("Finished" if success else "Not Finished")
ani = animate_frames(all_actions, figsize=(10,10), interval=200)
HTML(ani.to_jshtml())

In [None]:
# Replay every stored policy snapshot and keep only the success flag of each run.
success_convergance = [
    value_iter.run_policy(input_policy=policy, verbose=False)[1]
    for policy in value_iter.hashed_policies
]

In [None]:
# Labelled figure so the plot can be read without the surrounding cells.
fig, ax = plt.subplots()
ax.plot(success_convergance)
ax.set(
    title="Value iteration: success of each policy snapshot",
    xlabel="Policy snapshot",
    ylabel="Reached END (1 = yes, 0 = no)",
)
plt.show()

# Q-Learning

In [None]:
# Build a 10x10 maze and the Q-learning agent; lr and epsilon decay by 1% per episode.
q_learn = QLearning(
    environment=Labirint,
    environment_config={
        "size": 10,
        "n_rocks": 10,
        "n_volcanos": 10,
        "n_escalators": 2,
        "escalator_len": 5,
        "main_step_prob": 0.9,
    },
    gamma=0.9,
    epsilon=1.0,
    lr=1.0,
    lr_shed=0.99,
    epsilon_shed=0.99,
)

# Show the generated maze.
q_learn.environment.v_print()

In [None]:
# Train for 1000 episodes; each episode's frames and success flag are stored in q_learn.hashed_runs.
q_learn.run_algorithm(n_steps=1000)

In [None]:
# run_policy returns three values (frames, success flag, escalator step count);
# unpacking only two raises "ValueError: too many values to unpack".
all_actions, success, n_escalators_steps = q_learn.run_policy()
print("Finished" if success else "Not Finished")
ani = animate_frames(all_actions, figsize=(10,10), interval=200)
HTML(ani.to_jshtml())

In [None]:
# Success flag of every training episode, in training order.
success_convergance = [episode_success for _, episode_success in q_learn.hashed_runs]

In [None]:
# Labelled figure so the plot can be read without the surrounding cells.
fig, ax = plt.subplots()
ax.plot(success_convergance)
ax.set(
    title="Q-learning: success of each training episode",
    xlabel="Training episode",
    ylabel="Reached END (1 = yes, 0 = no)",
)
plt.show()