# The (modified) Cliff Environment

The Cliff environment is an ideal task to test risk-aware RL algorithms. It consists of a gridworld where the shortest path from the initial position to the goal position runs close to a cliff. 

Assumptions:

- We are on a slope: with probability 0.025 we will slip toward the cliff
- There is a punishment of -50 for trying to get outside the gridworld
- Once the agent falls from the cliff he dies. While dying he collects a reward of -50. Perhaps dying is not a good experience, after all
- If the agent arrives at the shelter (goal), positioned on the other side of the cliff, he gets a reward of 500. Since the motivation of having a beer at the shelter can vary a lot from person to person, this reward can be changed by the user, to experience how it affects the risk.

## The code

In [None]:
import numpy as np
import matplotlib.pyplot as plt

class Cliff:
    """Gridworld whose bottom edge (y < 0) is a cliff.

    The agent starts in the bottom-left cell ``(0, 0)``; the goal is the
    bottom-right cell ``(x_size - 1, 0)``. Grid cells are encoded as integer
    states in row-major order: ``state = y * x_size + x``.

    Args:
        x_size: Width of the grid in cells.
        y_size: Height of the grid in cells.
        prob: Probability that the chosen action is replaced by "down"
            (a slip toward the cliff) on any given step.
        max_reward: Reward collected when the goal cell is reached.
    """

    # Index of the "down" action in ``act_meaning``; used for the random slip.
    DOWN = 3

    def __init__(self, x_size=20, y_size=10, prob=0.025, max_reward=500):
        self.x_size = x_size
        self.y_size = y_size
        self.prob = prob
        self.my_pos = np.array([0, 0])          # bottom-left
        self.act_meaning = [np.array([-1, 0]),  # action 0 = left
                            np.array([1, 0]),   # action 1 = right
                            np.array([0, 1]),   # action 2 = up
                            np.array([0, -1])   # action 3 = down
                            ]
        self.max_reward = max_reward

    def cartesian_to_int(self, x, y):
        """Encode the cell ``(x, y)`` as a single integer state."""
        return y * self.x_size + x

    def int_to_cartesian(self, state):
        """Decode an integer state into ``(x, y)``.

        This is the inverse of :meth:`cartesian_to_int`, so the column (x)
        comes first and the row (y) second.
        """
        return state % self.x_size, state // self.x_size

    def step(self, a):
        """Apply action ``a`` and return ``(state, reward, terminate)``.

        Rewards:
            * ``-1`` for an ordinary move;
            * ``-50`` and termination when the agent drops below the bottom
              row (falls off the cliff);
            * ``-50`` without termination when the move would leave the grid
              through the top, left or right edge (the agent stays put);
            * ``max_reward`` and termination on reaching the goal cell.

        Note:
            After a fall the position is left off-grid (``y == -1``), so the
            returned state index is computed from that off-grid position and
            is not a valid index into a table of ``x_size * y_size`` states.
        """
        # A plain int indexes the action list regardless of the input's type.
        action = int(a)
        previous_pos = np.copy(self.my_pos)

        # Slippery slope: occasionally the chosen action is overridden.
        if np.random.rand() < self.prob:
            action = self.DOWN

        self.my_pos += self.act_meaning[action]

        reward = -1
        terminate = False
        if self.my_pos[1] < 0:  # down the cliff
            reward = -50
            terminate = True

        hit_wall = (self.my_pos[1] >= self.y_size
                    or self.my_pos[0] < 0
                    or self.my_pos[0] >= self.x_size)
        if hit_wall:
            # Undo the move: walls are penalised but not deadly.
            self.my_pos = previous_pos
            reward = -50

        if self.my_pos[0] == self.x_size - 1 and self.my_pos[1] == 0:  # Goal!
            reward = self.max_reward
            terminate = True

        return self.cartesian_to_int(*self.my_pos), reward, terminate

    def check_valid(self, x, y):
        """Return whether ``(x, y)`` lies inside the grid."""
        return y < self.y_size and y >= 0 and x < self.x_size and x >= 0

    def reset(self):
        """Move the agent back to the start cell and return its state."""
        self.my_pos = np.array([0, 0])
        return self.cartesian_to_int(*self.my_pos)
        
class QLearning:
    """Tabular Q-learning agent with a pluggable update rule.

    Args:
        update_rule: Callable
            ``update_rule(Q, s, a, r, s_t, t, alpha, gamma)`` returning the
            new value to store in ``Q[s, a]``.
        n_states: Number of rows of the Q-table.
        n_actions: Number of columns of the Q-table.
        gamma: Discount factor forwarded to ``update_rule``.
        initialization: Initial value of every Q-table entry.
    """

    def __init__(self, update_rule, n_states=200, n_actions=4, gamma=0.99, initialization=500):
        # Optimistic initialization: every entry starts at `initialization`.
        self.Q = np.ones((n_states, n_actions), dtype=np.float32) * initialization
        # Visit counters start at 1, so the first learning rate is 1.
        self.N = np.ones((n_states, n_actions))
        self.update_rule = update_rule
        self.gamma = gamma

    def get_learning_rate(self, state, action):
        """Return the step size ``1 / N[state, action]``."""
        return 1. / (self.N[state, action])

    def update(self, s, a, r, s_t, t):
        """Update ``Q[s, a]`` from the transition ``(s, a, r, s_t)``.

        ``t`` is the termination flag of the transition. The new value is
        delegated to ``update_rule``; afterwards the visit counter of
        ``(s, a)`` is incremented.
        """
        alpha = self.get_learning_rate(s, a)
        self.Q[s, a] = self.update_rule(self.Q, s, a, r, s_t, t, alpha, self.gamma)
        self.N[s, a] = self.N[s, a] + 1

    def act(self, state, exploit=False):
        """Choose an action index for ``state`` as a Python ``int``.

        With ``exploit=True`` the action with the largest Q-value is chosen;
        otherwise the action with the smallest visit counter in this state
        is chosen.
        """
        # np.asscalar was removed from NumPy; int() performs the conversion.
        if exploit:
            return int(np.argmax(self.Q[state, :]))
        return int(np.argmin(self.N[state, :]))

def plot_cliff(cliff, Q,  n_visits, title=""):
    """Draw the greedy policy of ``Q`` on the grid of ``cliff``.

    Args:
        cliff: Environment providing ``x_size``, ``y_size``,
            ``cartesian_to_int`` and ``act_meaning``.
        Q: Table indexed as ``Q[state, action]``; one arrow per cell shows the
            action with the largest value.
        n_visits: Optional array indexed ``[x, y]`` drawn as a blue heat map
            behind the arrows; pass ``None`` to skip it.
        title: Optional figure title; nothing is set when it is empty.

    The figure is drawn on the current pyplot axes; the caller is responsible
    for calling ``plt.show()``.
    """
    plt.grid(True)

    plt.tick_params(
        axis='both',
        which='both',
        bottom=False,
        top=False,
        left=False,
        labelbottom=False,
        labelleft=False)

    if title:
        plt.title(title)

    plt.axis([0, cliff.x_size, 0, cliff.y_size])
    plt.xticks(range(cliff.x_size))
    plt.yticks(range(cliff.y_size))
    plt.text(0.5, -0.5, "Start", fontsize=12,
             horizontalalignment='center',
             verticalalignment='center')
    plt.text(cliff.x_size - 0.5, -0.5, "Goal", fontsize=12,
             horizontalalignment='center',
             verticalalignment='center')
    if n_visits is not None:
        # Transpose to [y, x] and flip vertically before handing to imshow.
        plt.imshow(np.flip(n_visits.T, axis=0),
                   cmap='Blues',
                   extent=(0, cliff.x_size, 0, cliff.y_size))

    for col in range(cliff.x_size):
        for row in range(cliff.y_size):
            action = np.argmax(Q[cliff.cartesian_to_int(col, row), :])
            x_act = cliff.act_meaning[action][0]
            y_act = cliff.act_meaning[action][1]
            # Arrow starts at the cell centre and points along the greedy action.
            plt.arrow(col + 0.5, row + 0.5, 0.3 * x_act, 0.3 * y_act, width=0.04)
                
class Learning:
    """Drive an agent (``algorithm``) through an environment (``mdp``).

    ``algorithm`` must offer ``act(state, exploit=...)`` and
    ``update(s, a, r, s_t, t)``; ``mdp`` must offer ``reset()``, ``step(a)``,
    ``check_valid(x, y)`` and the attributes ``x_size``, ``y_size``, ``my_pos``.
    """

    def __init__(self, algorithm, mdp):
        self.algorithm = algorithm
        self.mdp = mdp

    def evaluate(self, n_steps=2000):
        """Run the greedy policy for ``n_steps`` steps without learning.

        Episodes end on termination or after 500 steps. Returns a tuple
        ``(returns, visits, n_cliff)`` where ``returns`` lists the undiscounted
        return of every finished episode (``[-2000]`` if none finished),
        ``visits`` counts visits per ``[x, y]`` cell and ``n_cliff`` counts
        episodes that terminated with a reward of -50.
        """
        env = self.mdp
        visits = np.zeros((env.x_size, env.y_size))
        state = env.reset()
        episode_returns = []
        running_return = 0.
        falls = 0
        steps_in_episode = 0

        for _ in range(n_steps):
            steps_in_episode += 1
            visits[env.my_pos[0], env.my_pos[1]] += 1

            action = self.algorithm.act(state, exploit=True)
            state, reward, terminate = env.step(action)
            running_return += reward

            episode_over = terminate or steps_in_episode >= 500
            if not episode_over:
                continue

            steps_in_episode = 0
            if terminate and reward == -50:
                falls += 1
            # The final position may be off-grid; count it only when valid.
            if env.check_valid(*env.my_pos):
                visits[env.my_pos[0], env.my_pos[1]] += 1
            state = env.reset()
            episode_returns.append(running_return)
            running_return = 0.

        if len(episode_returns) == 0:
            episode_returns = [-2000]
        return episode_returns, visits, falls

    def learn(self, n_steps=30000):
        """Interact for ``n_steps`` steps in exploration mode, updating the agent."""
        current = self.mdp.reset()
        for _ in range(n_steps):
            chosen = self.algorithm.act(current, exploit=False)
            following, reward, terminate = self.mdp.step(chosen)
            self.algorithm.update(current, chosen, reward, following, terminate)
            # Restart from the initial state whenever an episode ends.
            current = self.mdp.reset() if terminate else following


## The classical Q-Learning Update

In [None]:
def classical_update(Q, s, a, r, s_t, t, alpha, gamma):
    """Return the standard Q-learning target blend for ``Q[s, a]``.

    Computes ``(1 - alpha) * Q[s, a] + alpha * (r + gamma * max_a' Q[s_t, a'])``.
    When ``t`` (termination flag) is true the bootstrap term is 0 and
    ``Q[s_t]`` is not read at all.
    """
    # np.asscalar was removed from NumPy; float() performs the conversion.
    q_next = 0 if t else float(np.max(Q[s_t, :]))
    return (1 - alpha) * Q[s, a] + alpha * (r + gamma * q_next)

## The first experiment


In [None]:
def experiment(q_update_rule, max_reward=500):
    """Train a Q-learning agent on the cliff and plot how it performs.

    Alternates 50 times between evaluating the greedy policy and a learning
    phase, plots the mean evaluation return per round, then evaluates the
    final policy for 10000 steps and shows the policy/visit map, the
    histogram of episode returns and the number of falls.

    Args:
        q_update_rule: Update rule handed to ``QLearning``.
        max_reward: Goal reward of the environment; also used as the initial
            value of the Q-table.
    """
    agent = QLearning(q_update_rule, initialization=max_reward)
    env = Cliff(max_reward=max_reward)
    trainer = Learning(agent, env)

    mean_returns = []
    for _ in range(50):
        print(".", end='')
        # Evaluate first, so the first point reflects the untrained agent.
        episode_returns = trainer.evaluate()[0]
        mean_returns.append(np.mean(episode_returns))
        trainer.learn()

    plt.plot(mean_returns)
    plt.xlabel("n epochs")
    plt.ylim(-500, 1000)
    y_label = plt.ylabel("E[J]")
    y_label.set_rotation(0)
    plt.show()

    # evaluation of the last policy
    final_returns, final_visits, final_falls = learner_eval = trainer.evaluate(n_steps=10000)
    plot_cliff(env, agent.Q, final_visits)
    plt.show()

    plt.title("Distribution of the Return")
    plt.hist(final_returns, bins=100)
    plt.show()

    print("Number of fallings from a cliff %d/%d" % (final_falls, len(final_returns)))
    
# Baseline run: classical Q-learning update with the default goal reward (500).
experiment(classical_update)

## Exercise: Implement a risk update rule, and execute it! 

In [None]:
def beta_update(Q, s, a, r, s_t, t, alpha, gamma, beta=0.1):
    """Exercise stub: return an (unimplemented) update rule.

    The returned ``beta_update_core`` has the signature expected of an update
    rule, ``(Q, s, a, r, s_t, t, alpha, gamma)``, and currently returns
    ``None``.

    NOTE(review): none of the outer arguments (including ``beta``) is used
    yet; presumably ``beta`` is meant to parameterise the inner rule once it
    is implemented -- confirm with the exercise text.
    """
    def beta_update_core(Q, s, a, r, s_t, t, alpha, gamma):
        """Placeholder update rule; returns ``None`` until implemented."""
        # TODO: Implement!
        return None

    return beta_update_core

def hat_update(Q, s, a, r, s_t, t, alpha, gamma):
    """Exercise stub for a custom update rule; returns ``None`` until implemented.

    Takes the update-rule arguments ``(Q, s, a, r, s_t, t, alpha, gamma)`` and
    is expected to return the new value for ``Q[s, a]`` once written.
    """
    # TODO: Implement!
    return None

# Run the experiment with hat_update and a larger goal reward (1000).
# NOTE: hat_update is an exercise stub that returns None; implement it first.
experiment(hat_update, max_reward=1000)

