---

# CSCI 3202, Fall 2022
# Homework 2: MDP & Reinforcement Learning
# Due: Friday September 9, 2022 at 6:00 PM

<br> 

### Your name: Mokhtar Salem Ba Wahal

<br> 

In [2]:
import pandas as pd
import numpy as np
import scipy.stats as stats
import matplotlib.pyplot as plt
from collections import defaultdict, OrderedDict
from itertools import product
import random
# added packages
import heapq
from matplotlib import colors
from numpy import argmax



---

Consider a **cube** state space defined by $0 \le x, y, z \le L$. Suppose you are piloting/programming a drone to learn how to land on a platform at the center of the $z=0$ surface (the bottom). Some assumptions:
* In this discrete world, if the drone is at $(x,y,z)$ it means that the box is centered at $(x,y,z)$. There are boxes (states) centered at $(x,y,z)$ for all $0 \le x,y,z \le L$. Each state is a 1 unit cube. 
* In this world, $L$ is always an even value.
* All of the states with $z=0$ are terminal states.
* The state at the center of the bottom of the cubic state space is the landing pad. For example, when $L=4$, the landing pad is at $(x,y,z) = (2,2,0)$.
* All terminal states ***except*** the landing pad have a reward of -1. The landing pad has a reward of +1.
* All non-terminal states have a living reward of -0.01.
* The drone takes up exactly 1 cubic unit, and begins in a random non-terminal state.
* The available actions in non-terminal states include moving exactly 1 unit Up (+z), Down (-z), North (+y), South (-y), East (+x) or West (-x). In a terminal state, the training episode should end.

#### Part A
How many states would be in the discrete state space if $L=2$? Explain your reasoning.

*Number of states = 3 * 3 * 3 = 27*


#### Part B
Write a class `MDPLanding` to represent the Markov decision process for this drone. Include methods for:
1. `actions(state)`, which should return a list of all actions available from the given state
2. `reward(state)`, which should return the reward for the given state
3. `result(state, action)`, which should return the resulting state of doing the given action in the given state

and attributes for:
1. `states`, a list of all the states in the state space, where each state is represented as an $(x,y,z)$ tuple
2. `terminal_states`, a dictionary where keys are the terminal state tuples and the values are the rewards associated with those terminal states
3. `default_reward`, a scalar for the reward associated with non-terminal states
4. `all_actions`, a list of all possible actions (Up, Down, North, South, East, West)
5. `discount`, the discount factor (use $\gamma = 0.999$ for this entire problem)

How you feed arguments/information into the class constructor is up to you.

Note that actions are *deterministic* here.  The drone does not need to include transition probabilities for outcomes of particular actions. What the drone does need to learn, however, is where the landing pad is, and how to get there from any initial state.

Before moving on to Part C, we recommend that you test that your MDPLanding code is set up correctly. Write unit tests that display the actions for a given state, rewards, results, etc. This will help you identify errors in your implementation and save you a lot of debugging time later.

In [5]:

class MDPLanding:

    def __init__(self, L):
        self.L = L

        self.states = res = [ele for ele in product(range(0, L + 1), repeat=3)]
        terminal_states = {}
        for i in range(L + 1):
            for j in range(L + 1):
                if (i == (L / 2)) and (j == (L / 2)):
                    terminal_states[(i, j, 0)] = 1
                else:
                    terminal_states[(i, j, 0)] = -1

        self.terminal_states = terminal_states
        # print(self.terminal_states)
        self.default_reward = -0.01
        self.all_actions = ['Up', 'Down', 'North', 'South', 'West', 'East']
        self.discount = 0.999
        self.pi = {s: random.choice(self.actions(s)) for s in self.states}

    def actions(self, state):

        if state in self.states:
            act = []
            if state[0] in range(1, self.L + 1):
                act.append('East')
                act.append('West')
            elif state[0] > 0:
                act.append('West')
            elif state[0] < self.L:
                act.append("East")

            if state[1] in range(1, self.L + 1):
                act.append('South')
                act.append('North')
            elif state[1] > 0:
                act.append('South')
            elif state[1] < self.L:
                act.append('North')

            if state[2] in range(1, self.L + 1):
                act.append('Up')
                act.append('Down')
            elif state[2] > 0:
                act.append('Down')
            elif state[2] < self.L:
                act.append("Up")

            return act
        else:
            print("Error: state is out of scope")

    def reward(self, state):

        if state in self.terminal_states:
            return self.terminal_states[state]
        else:
            return self.default_reward

    def result(self, state, action):

        if state in self.states:
            if state in self.terminal_states:
                # print("Error: state is terminal")
                return state
            elif action in self.actions(state):
                if action == 'Up':
                    new_state = tuple(map(lambda i, j: i + j, state, (0, 0, 1)))
                    if new_state in self.states:
                        return new_state
                    else:
                        return state

                elif action == 'Down':
                    new_state = tuple(map(lambda i, j: i + j, state, (0, 0, -1)))
                    if new_state in self.states:
                        return new_state
                    else:
                        return state
                elif action == 'South':
                    new_state = tuple(map(lambda i, j: i + j, state, (0, -1, 0)))
                    if new_state in self.states:
                        return new_state
                    else:
                        return state
                elif action == 'North':
                    new_state = tuple(map(lambda i, j: i + j, state, (0, 1, 0)))
                    if new_state in self.states:
                        return new_state
                    else:
                        return state
                elif action == 'East':
                    new_state = tuple(map(lambda i, j: i + j, state, (1, 0, 0)))
                    if new_state in self.states:
                        return new_state
                    else:
                        return state
                elif action == 'West':
                    new_state = tuple(map(lambda i, j: i + j, state, (-1, 0, 0)))
                    if new_state in self.states:
                        return new_state
                    else:
                        return state
            else:
                print("Action is not valid")
        else:
            print("Error: state  is not valid")

    def result1(self, state, action):
        if state in self.terminal_states:
            return []
        ppss = []
        for i in self.actions(state):
            if i == action:
                ppss.append((0.5, self.result(state, i)))
            else:
                ppss.append((0.1, self.result(state, i)))

        return ppss

    def actions1(self, state):
        if state in self.terminal_states:
            return []
        else:
            return self.all_actions



#### Part C
Write a function to implement **policy iteration** for this drone landing MDP. Create an MDP environment to represent the $L=4$ case.

Use your function to find an optimal policy for your new MDP environment. Check (by printing to screen) that the policy for the following states are what you expect, and **comment on the results**:
1. $(2,2,1)$
1. $(0,2,1)$
1. $(2,0,1)$

The policy for each of these states is the action that the agent should take in that state. 

In [6]:


def policy_evaluation(pi, U, MDP, key=25):
    R = MDP.reward
    T = MDP.result
    gamma = MDP.discount
    for i in range(key):
        for s in MDP.states:
            if s in MDP.terminal_states:
                U[s] = R(s)
            else:
                U[s] = R(s) + gamma * U[T(s, pi[s])]

    return U
 

def policy_iteration(MDP: MDPLanding):
    U = {s: 0 for s in MDP.states}
    pi = {s: random.choice(MDP.actions(s)) for s in MDP.states}
    while True:
        U = policy_evaluation(pi, U, MDP)
        unchanged = True
        for s in MDP.states:
            if s not in MDP.terminal_states:
                a = max(MDP.actions(s), key=lambda a: U[MDP.result(s, a)])

                if a != pi[s]:
                    pi[s] = a
                    unchanged = False
        if unchanged:
            return pi


space = MDPLanding(4)

the_best_policy = policy_iteration(space)
print("When we test for (2,2,1), the agent will think how to reach (2,2,1), so the best move will be down")
print(the_best_policy[(2, 2, 1)])
print("When we test for (0,2,1), the agent will think how to reach (2,2,1), so the best move will be East")
print(the_best_policy[(0, 2, 1)])
print("When we test for (2,0,1), the agent will think how to reach (2,2,1), so the best move will be south, then south, then down ")
print(the_best_policy[(2, 0, 1)])
print(the_best_policy[(2, 1, 1)])
print(the_best_policy[(2, 2, 1)])



When we test for (2,2,1), the agent will think how to reach (2,2,1), so the best move will be down
Down
When we test for (0,2,1), the agent will think how to reach (2,2,1), so the best move will be East
East
When we test for (2,0,1), the agent will think how to reach (2,2,1), so the best move will be south, then south, then down 
North
North
Down


#### Part D
Provide an example of a non-deterministic transition that could be included in your code in Part C. Describe the function. How would you modify your code to handle a non-deterministic transition function?

*Answer*
<br>

It is the situation when the agent must include the probabilities for the outcomes of doing a particular action. Assuming that the probability of doing the desired action is 0.5, then the probability of doing each of the rest of the actions is 0.1. 

<br>

*Example:*  assume the drone in state (2,2,2), the result of doing action 'Up' is that $p = 0.5 \times -0.01 + 0.1 \times -0.01 + 0.1 \times-0.01 + 0.1 \times -0.01 + 0.1 \times -0.01 + 0.1 \times -0.01$ 
with taking $L = 4$. 


*Change in code:* <br>
The change in code will happen in the result method, to return a tuple of the sum of probabilities ans the resulting state. As a result of thr previous chang, we must edit the policy evaulation function so that it sum the probabiltes returned from the result function. 

#### Part E
Describe the main differences between **policy iteration** and **value iteration**? How would your code change in Part C to convert it to **value iteration**?

Answer:

**Value iteration:** it keeps track of utilities to find an optimal action and it terminates when converge. It is slow but easy in implementation. 

 **Policy iteration:** Starts with random policy and it keeps track of policies by repeating two steps; evaluation, and improvment of policies to find an optimal action and it terminates when no policy changed. It is faster but hard in implementation. 


#### Part F

Code up a **Q-learning** agent/algorithm to learn how to land the drone. You can do this however you like, as long as you use the MDP class structure defined above.  

Your code should include some kind of a wrapper to run many trials to train the agent and learn the Q values.  You also do not need to have a separate function for the actual "agent"; your code can just be a "for" loop within which you are refining your estimate of the Q values.

From each training trial, save the cumulative discounted reward (utility) over the course of that episode. That is, add up all of $\gamma^t R(s_t)$ where the drone is in state $s_t$ during time step $t$, for the entire sequence. We refer to this as "cumulative reward" because we usually refer to "utility" as the utility *under an optimal policy*.

Some guidelines:
* The drone should initialize in a random non-terminal state for each new training episode.
* The training episodes should be limited to 50 time steps, even if the drone has not yet landed. If the drone lands (in a terminal state), the training episode is over.
* You may use whatever learning rate $\alpha$ you decide is appropriate, and gives good results.
* There are many forms of Q-learning. You can use whatever you would like, subject to the reliability targets below.
* Your code should return:
  * The learned Q values associated with each state-action pair.
  * The cumulative reward for each training trial. 
  * Anything else that might be useful in the ensuing analysis.

In [7]:

class QLearningAgent:

    def __init__(self, mdp: MDPLanding, Ne, Rplus, alpha=None):
        self.reward = mdp.reward
        self.gamma = mdp.discount
        self.terminals = mdp.terminal_states
        self.all_act = mdp.all_actions
        self.Ne = Ne  # iteration limit in exploration function
        self.Rplus = Rplus  # large value to assign before iteration limit
        self.Q = defaultdict(float)
        self.Nsa = defaultdict(float)
        self.policy = mdp.pi
        self.get_actions = mdp.actions1
        self.s = None
        self.a = None
        self.r = None

        if alpha:
            self.alpha = alpha
        else:
            self.alpha = lambda n: 1. / (1 + n)  # udacity video

    def f(self, u, n):
        # Ne(s,a)
        """Exploration function. Returns fixed Rplus until
        agent has visited state, action a Ne number of times.
        Same as ADP agent in book."""
        if n < self.Ne:
            return self.Rplus
        else:
            return u

    def actions_in_state(self, state):
        """Return actions possible in given state.
        Useful for max and argmax."""

        return self.get_actions(state)

    def __call__(self, percept):
        s1, r1 = percept
        Q, Nsa, s, a, r = self.Q, self.Nsa, self.s, self.a, self.r
        alpha, gamma, terminals = self.alpha, self.gamma, self.terminals,
        actions_in_state = self.actions_in_state

        if s in terminals:
            Q[s, None] = r1
            self.policy[s] = None
        if s is not None:
            Nsa[s, a] += 1
            if s1 not in terminals:
                Q[s, a] += alpha(Nsa[s, a]) * (r + gamma * max(Q[s1, a1] for a1 in actions_in_state(s1)) - Q[s, a])
                self.a = max(actions_in_state(s1), key=lambda a1: self.f(Q[s1, a1], Nsa[s1, a1]))

        if s in terminals:
            self.s = self.a = self.r = None
        else:
            self.s, self.r = s1, r1
            self.policy[s1] = self.a
        return self.a

    def update_state(self, percept):
        """To be overridden in most cases. The default case
        assumes the percept to be of type (state, reward)."""
        return percept, self.reward(percept)


def run_single_trial(agent_program: QLearningAgent, mdp: MDPLanding):
    """Execute trial for given agent_program
    and mdp. mdp should be an instance of subclass
    of mdp.MDP """

    def take_single_action(mdp: MDPLanding, s, a):
        """
        Select outcome of taking action a
        in state s. Weighted Sampling.
        """
        x = random.uniform(0, 1)
        cumulative_probability = 0.0
        state = s
        for probability_state in mdp.result1(s, a):
            probability, state = probability_state
            cumulative_probability += probability
            if x < cumulative_probability:
                break
        return state

    t = [0, 0, 0]
    while t[2] == 0:
        t = np.random.randint(0, 5, size=3)
    current_state = (t[0], t[1], t[2])
    while True:
        current_reward = mdp.reward(current_state)
        percept = (current_state, current_reward)
        next_action = agent_program(percept)
        if next_action is None:
            break
        current_state = take_single_action(mdp, current_state, next_action)

## This code is quoted from the repo provided in class

#### Part G

Initialize the $L=10$ environment (so that the landing pad is at $(5,5,0)$). Run some number of training trials to train the drone.

**How do I know if my drone is learned enough?**  If you take the mean cumulative reward across the last 5000 training trials, it should be around 0.80. This means at least about 10,000 (but probably more) training episodes will be necessary. It will take a few seconds on your computer, so start small to test your code.

**Then:** Compute block means of cumulative reward from all of your training trials. Use blocks of 500 training trials. This means you need to create some kind of array-like structure such that its first element is the mean of the first 500 trials' cumulative rewards; its second element is the mean of the 501-1000th trials' cumulative rewards; and so on. Make a plot of the block mean rewards as the training progresses. It should increase from about -0.5 initially to somewhere around +0.8.

**And:** Print to the screen the mean of the last 5000 trials' cumulative rewards, to verify that it is indeed about 0.80.

In [None]:
# Solution:

space = MDPLanding(10)

q_agent = QLearningAgent(space, Ne=5, Rplus=2, alpha=lambda n: 60. / (59 + n))
for i in range(500):
    run_single_trial(q_agent, space)

#### Part H

**Question 1:** Why does the cumulative reward start off around -0.5 at the beginning of the training?

**Question 2:** Why will it be difficult for us to train the drone to reliably obtain rewards much greater than about 0.8?

**Your answer here:**

