
# Computational Modeling in HCI


$$\newcommand{\vec}[1]{{\bf #1} } 
\newcommand{\real}{\mathbb{R} }
\newcommand{\expect}[1]{\mathbb{E}[#1] }
\DeclareMathOperator*{\argmin}{arg\,min}
$$

----

 **Nikola Banovic**

* **University of Michigan**
* nbanovic@umich.edu
* [www.nikolabanovic.net](www.nikolabanovic.net)
* @nikola_banovic  
* [github.com/nbanovic](https://github.com/nbanovic)

---


----


# Modeling Human Behavior via Inverse Reinforcement Learning

Models of human behavior enable us to explore and describe people’s cognition, behaviors, and environments in which they are situated. We define behaviors as sequences of situatoins people find themselves in and actions they perform in those situations.

<img src="imgs/sequence.png"/>

Human behavior are thus physical actions and emotions that people exhibit. Although there are many kinds of behaviors, we will focus on modeling human routine behavior. We define routine behaviors as "likely, weakly ordered, interruptible sequences of situations and actions that a person will perform in those situations to create or reach opportunities that enable the person to accomplish a goal" ([Banovic et al, 2018](https://global.oup.com/academic/product/computational-interaction-9780198799603?cc=us&lang=en&)).

Computational models, or mathematical representions of such complex systems of behaviors and environments, enable us to apply various computational modeling methods for exploring the systems by simulation and prediction. Such computational models and methods enable next generation of behavior-aware User Interfaces that can automatically reason about and act in response to people’s behaviors. In this module, we teach you how to create such computational models of human behavior. We will illustrate [a recent behavior modeling approach](http://dl.acm.org/authorize?N03134) based on Inverse Reinforcement Learning. We will use a Python implementation of Maximum Causal Entropy Inverse Reinforcement Learning from [Brian Ziebart's PhD thesis (2010)](http://www.cs.cmu.edu/~bziebart/publications/thesis-bziebart.pdf) borrowed from [krasheninnikov/max-causal-ent-irl](https://github.com/krasheninnikov/max-causal-ent-irl).

Inverse Reinforcement Learning is particularly well suited for this problem because it allows us to mathematically  express our definition of human routine behavior and to capture and estimate the preference that people have for different goal situations.

Here, we are going to follow a computaional modeling pipeline. However, our focus in this module will primarely be on building a computational model base on Inverse Reinforcement Learning.

---


In [None]:
# Standard imports that we will need in the rest of the notebook.
import numpy as np
from numpy import inf

# Discrete distributions and sampling
from gym import Env, spaces, utils
from gym.utils import seeding

----

## Model Definition and Feature Engineering

<img src="imgs/feature_engineering.png"/>

Unlike a purely algorithmic approach, we seek to define behaviors based on our existing knowledge and hypotheses. Thus, we carefully pick features that describe the behavior we are interested in modeling. In this module, we will work with a simple example that models people's routine when deciding if they should wear a jacket or not when leaving home.


### Defining Situation and Action Features
Here we define our features. Note that these features are carefuly crafted in a way that will help us capture the economics of this particular behavior. More on economics of behavior later on.

In [None]:
state_feature_names_dict = {'Weather':['Hot','Cold']} #TODO: Add your features here

In [None]:
action_feature_names_dict = {}  #TODO: Add your features here

----


## Collecting and Pre-processing the Data
Here  we load the behavior instances data from an existing file. However, in  a real-world project, this step would require a data collection app, followed by a pre-processing step that would convert raw data into behavior instances.

In [None]:
behavior_instances = np.load("data/behavior_instances.npy")
behavior_instances

In [None]:
#TODO: Explore behaviors instances here. What did you find?

----

##  Parameter  Estimation
Here, we will train an IRL model from demonstrated behavior.

<img src="imgs/irl_modeltraining.png"/>


### MDP Refresher
Inverse Reinforcement Learning uses a Markov Decision Process (MDP) to encode behaviors. Here we briefly summarize details from and how it applies to [modeling human behaviors](http://dl.acm.org/authorize?N03134). A Markov decision process is a tuple. It consists of a set of states representing situations  people find themselves in, and actions that a person can take in those situations. In addition, the model includes an action-dependent probability distribution for each state transition p(s'|s,a), which specifies the probability of the next state s'
 when the person performs action a in state s. This state transition probability distribution models how the environment responds to the actions that people perform in different
states. When modeling human behavior, the transitions are often stochastic (each pair (s, a) can transition to many transition states s' with different probabilities). However, if the person has full control over the environment, they can also be deterministic (i.e., for each pair (s, a) there is exactly one transition state s' with probability 1). Finally, there is a reward function R that the person incurs when performing action a in state s, which represents the utility that people get from performing different actions in different contexts.

In [None]:
class MDP(object):
    '''
    MDP object
    Attributes
    ----------
    self.nS : int
        Number of states in the MDP.
    self.nA : int
        Number of actions in the MDP.
    self.P : two-level dict of lists of tuples
        First key is the state and the second key is the action.
        self.P[state][action] is a list of tuples (prob, nextstate, reward).
    self.T : 3D numpy array
        The transition prob matrix of the MDP. p(s'|s,a) = self.T[s,a,s']
    '''
    def __init__(self, env):
        P, nS, nA, desc = MDP.env2mdp(env)
        self.isd = env.isd # initial state probabilities
        self.P = P # state transition and reward probabilities, explained below
        self.nS = nS # number of states
        self.nA = nA # number of actions
        self.desc = desc # 2D array specifying what each grid cell means
        self.env = env
        self.T = self.get_transition_matrix()
        self.s = self.reset()

    def env2mdp(env):
        return ({s : {a : [tup[:3] for tup in tups]
                for (a, tups) in a2d.items()} for (s, a2d) in env.P.items()},
                env.nS, env.nA, env.desc)

    def get_transition_matrix(self):
        '''Return a matrix with index S,A,S' -> P(S'|S,A)'''
        T = np.zeros([self.nS, self.nA, self.nS])
        for s in range(self.nS):
            for a in range(self.nA):
                transitions = self.P[s][a]
                s_a_s = {t[1]:t[0] for t in transitions}
                for s_prime in range(self.nS):
                    if s_prime in s_a_s:
                        T[s, a, s_prime] = s_a_s[s_prime]
        return T

    def reset(self):
        self.s = np.random.choice(self.nS, p=self.isd)
        return self.s

    def step(self, a, s=None):
        if s == None: s = self.s
        if len(self.P[s][a])==1:
            self.s = self.P[s][a][0][1]
            return self.s
        else:
            p_s_sa = np.asarray(self.P[s][a])[:,0]
            next_state_index = np.random.choice(range(len(p_s_sa)), p=p_s_sa)
            self.s = self.P[s][a][next_state_index][1]
            return self.s

We are using a stochastic MDP (i.e., an MDP where all transitions are probabilistic), so we need  helper functions to sample from the probability  distributions defined by  the MDP.

In [None]:
def categorical_sample(prob_n, np_random):
    """
    Sample from categorical distribution
    Each row specifies class probabilities
    """
    prob_n = np.asarray(prob_n)
    csprob_n = np.cumsum(prob_n)
    return (csprob_n > np_random.rand()).argmax()

class DiscreteEnv(Env):

    """
    Has the following members
    - nS: number of states
    - nA: number of actions
    - P: transitions (*)
    - isd: initial state distribution (**)
    (*) dictionary dict of dicts of lists, where
      P[s][a] == [(probability, nextstate, reward, done), ...]
    (**) list or array of length nS
    """
    def __init__(self, nS, nA, P, isd):
        self.P = P
        self.isd = isd
        self.lastaction=None # for rendering
        self.nS = nS
        self.nA = nA

        self.action_space = spaces.Discrete(self.nA)
        self.observation_space = spaces.Discrete(self.nS)

        self._seed()
        self._reset()

    def _seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def _reset(self):
        self.s = categorical_sample(self.isd, self.np_random)
        self.lastaction=None
        return self.s

    def _step(self, a):
        transitions = self.P[self.s][a]
        i = categorical_sample([t[0] for t in transitions], self.np_random)
        p, s, r, d= transitions[i]
        self.s = s
        self.lastaction=a
        return (s, r, d, {"prob" : p})

### Defining World  Dynamics
Here we define our world dynamics. Note that in this simple case we only have one feature that determines the probability of transition to the next situation. What is that feature?

In [None]:
weather_transition_matrix = None #TODO: What should the transitions be?

We now have all of the components for our environments. Let's define the environment.

In [None]:
class ThinkBeforeYouStepOut(DiscreteEnv):
    """
    Defines the world in which a person decides to bring or not  bring a jacket with them when they leave home.
    
    """

    def __init__(self, state_feature_names_dict, action_feature_names_dict, weather_transition_matrix):
        
        self.desc = 'TBYSO'
        
        feature_matrix = None
        nA = 0
        nS = 0

        # Flatten.
        self.state_feature_names = [key + ':' + value for key, value_list in state_feature_names_dict.items() for value in value_list]

        self.action_feature_names = [key + ':' + value for key, value_list in action_feature_names_dict.items() for value in value_list]

        state_feature_num = len(self.state_feature_names)
        action_feature_num = len(self.action_feature_names)

        # Initial state indicator (1 when initial state, 0 otherwise)
        isi = []

        # Create all possible states (i.e., reject those that are not possible).
        for weather in state_feature_names_dict['Weather']:
            for at_home in state_feature_names_dict['At Home']:
                for has_jacket in state_feature_names_dict['Has Jacket']:

                    # If you are at home you always have a jacket.
                    if at_home == 'True' and has_jacket == 'False':
                        continue
                        
                    for holding_jacket in state_feature_names_dict['Holding Jacket']:
                        
                        # If you are at home you are not holding your jacket.
                        if at_home == 'True' and holding_jacket == 'True':
                            continue
                            
                        # If you are not at home you can hold the jacket only when you have it.
                        if at_home == 'False' and has_jacket == 'False' and holding_jacket == 'True':
                            continue

                        for wearing_jacket in state_feature_names_dict['Wearing Jacket']:

                            # Come on! Nobody wears jackets at home.
                            if at_home == 'True' and wearing_jacket == 'True':
                                continue

                            # You can only wear jacket when you have it with you.
                            if has_jacket == 'False' and wearing_jacket == 'True':
                                continue
                                
                            # You cannot wear and hold the jacket and the same time.
                            if holding_jacket == 'True' and wearing_jacket == 'True':
                                continue
                                
                            # If you are outside and you have a jacket, you must either wearing it or hold it.
                            if at_home == 'False' and has_jacket == 'True' and holding_jacket == 'False' and wearing_jacket == 'False':
                                continue
                                
                            # If you are outside and you don't have a jacket, you can't hold it or wear it.
                            if at_home == 'False' and has_jacket == 'False' and (holding_jacket == 'True' or wearing_jacket == 'True'):
                                continue

                            for feeling in state_feature_names_dict['Feeling']:

                                # When at home you do not wear a jacket and feel just right.

                                if at_home == 'True':
                                    if not(feeling == 'Just Right'):
                                        continue                            
                                elif at_home == 'False':
                                    if weather == 'Cold':
                                        #If the weather is cold, you can feel just right only when wearing a jacket, and cold otherwise.
                                        if wearing_jacket == 'True' and not(feeling == 'Just Right'):
                                            continue
                                        elif wearing_jacket == 'False' and not(feeling == 'Cold'):
                                            continue

                                    elif weather == 'Hot':
                                        #If the weather is hot, you can feel just right only when not wearing a jacket, and hot otherwise.
                                        if wearing_jacket == 'True' and not(feeling == 'Hot'):
                                            continue
                                        elif wearing_jacket == 'False' and not(feeling == 'Just Right'):
                                            continue

                                #  We are here, so it must mean it is a valid state because we haven't rejected it.
                                current_state = np.array([weather == 'Hot', weather == 'Cold', at_home == 'True', at_home == 'False', has_jacket == 'True', has_jacket == 'False', holding_jacket == 'True', holding_jacket == 'False', wearing_jacket == 'True', wearing_jacket == 'False', feeling == 'Cold', feeling == 'Just Right', feeling == 'Hot'], dtype = int)

                                print('State: ', ['Weather:'+weather, 'At Home:' + at_home, 'Has Jacket:' + has_jacket, 'Holding Jacket:' + holding_jacket, 'Wearing Jacket:' + wearing_jacket, 'Feeling:' + feeling])

                                if feature_matrix is None:
                                    feature_matrix = np.matrix(current_state).T
                                else:
                                    feature_matrix = np.concatenate((feature_matrix, np.matrix(current_state).T), axis=1)


                                isi.append(at_home == 'True')

                                nS = nS + 1


         # Create all possible actions (i.e., reject those that are not possible).
        for jacket in action_feature_names_dict['Jacket']:
            #  There is one action for each feature.
            nA = nA + 1

        self.feature_matrix = feature_matrix.T
        
        #  Initial state distribution. Initial states are the one when you are at home before you leave.
        isd = np.array(isi).astype('float64').ravel()
        isd /= isd.sum()

        # What does isd tell us about probability of being hot or cold?
        print("Initial state distribution: ", isd)

        # Create transition matrix.
        P = {s : {a : [] for a in range(nA)} for s in range(nS)}

        for from_state in range(nS):
            for action in range(nA):
                for to_state in range(nS):
                    li = P[from_state][action]

                    from_state_weather_cold = feature_matrix.item(1,from_state)
                    from_state_at_home_true = feature_matrix.item(2,from_state)
                    from_state_has_jacket_true = feature_matrix.item(4,from_state)
                    from_state_holding_jacket_true = feature_matrix.item(6,from_state)
                    from_state_wearing_jacket_true = feature_matrix.item(8,from_state)
                    from_state_feeling_hot = feature_matrix.item(10,from_state)
                    from_state_feeling_justright = feature_matrix.item(11,from_state)
                    from_state_feeling_cold = feature_matrix.item(12,from_state)

                    to_state_weather_cold = feature_matrix.item(1,to_state)
                    to_state_at_home_true = feature_matrix.item(2,to_state)
                    to_state_has_jacket_true = feature_matrix.item(4,to_state)
                    to_state_holding_jacket_true = feature_matrix.item(6,to_state)
                    to_state_wearing_jacket_true = feature_matrix.item(8,to_state)
                    to_state_feeling_hot = feature_matrix.item(10,to_state)
                    to_state_feeling_justright = feature_matrix.item(11,to_state)
                    to_state_feeling_cold = feature_matrix.item(12,to_state)

                    # Initialize transition probability
                    p = 1.0

                    # The weather can change and you have no control over this!
                    p *= weather_transition_matrix.item(from_state_weather_cold, to_state_weather_cold)

                    # Can only transition from home to outside and from outside to outside.
                    if from_state_at_home_true == 1 and to_state_at_home_true == 1:
                        # We cannot stay at home. Have to keep moving.
                        p = 0.0

                    if from_state_at_home_true == 0 and to_state_at_home_true == 1:
                        # We cannot go home. Not in this example.
                        p = 0.0

                    # Where are you?
                    if(from_state_at_home_true == 1):
                        # You are inside.
                        if action == 0:
                            # Bring. That means in the next state you will have it but not wear it or p is 0.
                            if to_state_has_jacket_true == 0 or to_state_wearing_jacket_true == 1:
                                p = 0
                        elif action == 1:
                            # Leave. That means in the next state you will not have it and thus not be wearing it or p is 0.
                            if to_state_has_jacket_true == 1:
                                p = 0
                        elif action == 2:
                            # Put on. That means in the next state you will have it and wear it or p is 0.
                            if to_state_has_jacket_true == 0 or to_state_wearing_jacket_true == 0:
                                p = 0
                        elif action == 3:
                            # Can't take off at home because you are not wearing it.
                            p = 0

                    else:
                        # You are outside.
                        #If you have jacket you must have it with you no matter what you do and if you don't you can't get it.
                        if not(from_state_has_jacket_true == to_state_has_jacket_true):
                            p = 0
                        else:
                            if action == 0:
                                # Bring. You cannot bring the jacket when you are outside.
                                p = 0
                            elif action == 1:
                                # Leave. You cannot leave the jacket when you are outside.
                                p = 0
                            elif action == 2:
                                # Put on  or keep on. That means in the next state you will have it and wear it, but only when you already have it, or p is 0.
                                if from_state_has_jacket_true == 0 or to_state_has_jacket_true == 0 or to_state_wearing_jacket_true == 0:
                                    p = 0
                            elif action == 3:
                                # Take off or keep off. That means that in the next state you will have it, but not wear it, but only  if you already had it with you.
                                if to_state_wearing_jacket_true == 1:
                                    p = 0

                    # Suppose that the "true" reward is when you are feeeling just right. We do not use this reward in IRL, we learn it. In RL you would identify it using preference illicitation and calculate optimal behavior.
                    r = to_state_feeling_justright

                    li.append((p, to_state, r, False))

        super(ThinkBeforeYouStepOut, self).__init__(nS, nA, P, isd)

### Exploring the Environment

Let's explore the environment.

In [None]:
environment = ThinkBeforeYouStepOut(state_feature_names_dict, action_feature_names_dict, weather_transition_matrix)
mdp = MDP(environment)    

print("State features: ", environment.state_feature_names)
print("Action features: ", environment.action_feature_names)

print(environment.feature_matrix)
print("Transition probabilities: ")
print(mdp.T)


In [None]:
#TODO: Explore the environemnt here. What are transitions between different states?

----

###  Reinforcement Learning and Finding Optimal Behavior

Unlike IRL that estimates a person's perference for certain situationsa and actions and computes a policy based on those estimated preferences, Reinforcement Learning (RL) computes an optimal policy (what they should do) given feedback about their preferences. The code below is for reference only, and you can use it to experiment later.

In [None]:
def vi_rational(mdp, gamma, r, horizon=None, threshold=1e-16):
    '''
    Finds the optimal state and state-action value functions via value 
    iteration with the Bellman backup.
    
    Computes the rational policy \pi_{s,a} = \argmax(Q_{s,a}).
    
    Parameters
    ----------
    mdp : object
        Instance of the MDP class.
    gamma : float 
        Discount factor; 0<=gamma<=1.
    r : 1D numpy array
        Initial reward vector with the length equal to the 
        number of states in the MDP.
    horizon : int
        Horizon for the finite horizon version of value iteration.
    threshold : float
        Convergence threshold.
    Returns
    -------
    1D numpy array
        Array of shape (mdp.nS, 1), each V[s] is the value of state s under 
        the reward r and Boltzmann policy.
    2D numpy array
        Array of shape (mdp.nS, mdp.nA), each Q[s,a] is the value of 
        state-action pair [s,a] under the reward r and Boltzmann policy.
    2D numpy array
        Array of shape (mdp.nS, mdp.nA), each value p[s,a] is the probability 
        of taking action a in state s.
    '''
    
    V = np.copy(r)

    t = 0
    diff = float("inf")
    while diff > threshold:
        V_prev = np.copy(V)
        
        # Q[s,a] = (r_s + gamma * \sum_{s'} p(s'|s,a)V_{s'})
        Q = r.reshape((-1,1)) + gamma * np.dot(mdp.T, V_prev)
        # V_s = max_a(Q_sa)
        V = np.amax(Q, axis=1)

        diff = np.amax(abs(V_prev - V))
        
        t+=1
        if horizon is not None:
            if t==horizon: break
    
    V = V.reshape((-1, 1))

    # Compute policy
    # Assigns equal probability to taking actions whose Q_sa == max_a(Q_sa)
    max_Q_index = (Q == np.tile(np.amax(Q,axis=1),(mdp.nA,1)).T)
    policy = max_Q_index / np.sum(max_Q_index, axis=1).reshape((-1,1))

    return V, Q, policy

----

### Inverse Reinforcement Learning

We use the [MaxCausalEnt](http://www.cs.cmu.edu/~bziebart/publications/thesis-bziebart.pdf) IRL algorithm to train our  model. The principle of Maximum Entropy ensures that the probability distribution of actions given states is the one that makes least amount of assumptions about the behaviors than what is present in the data.

#### Softmax

Softmax allows us to compute a probability distribution over actions given situations.


In [None]:
def softmax(x, t=1):
    '''
    Numerically stable computation of t*log(\sum_j^n exp(x_j / t))
    
    If the input is a 1D numpy array, computes it's softmax: 
        output = t*log(\sum_j^n exp(x_j / t)).
    If the input is a 2D numpy array, computes the softmax of each of the rows:
        output_i = t*log(\sum_j^n exp(x_{ij} / t))
    
    Parameters
    ----------
    x : 1D or 2D numpy array
        
    Returns
    -------
    1D numpy array 
        shape = (n,), where: 
            n = 1 if x was 1D, or 
            n is the number of rows (=x.shape[0]) if x was 2D.
    '''
    assert t>=0
    if len(x.shape) == 1: x = x.reshape((1,-1))
    if t == 0: return np.amax(x, axis=1)
    if x.shape[1] == 1: return x
   
    def softmax_2_arg(x1,x2, t):
        ''' 
        Numerically stable computation of t*log(exp(x1/t) + exp(x2/t))
        
        Parameters
        ----------
        x1 : numpy array of shape (n,1)
        x2 : numpy array of shape (n,1)
        
        Returns
        -------
        numpy array of shape (n,1)
            Each output_i = t*log(exp(x1_i / t) + exp(x2_i / t))
        '''
        tlog = lambda x: t * np.log(x)
        expt = lambda x: np.exp(x/t)
                
        max_x = np.amax((x1,x2),axis=0)
        min_x = np.amin((x1,x2),axis=0)    
        return max_x + tlog(1+expt((min_x - max_x)))
    
    sm = softmax_2_arg(x[:,0],x[:,1], t)
    # Use the following property of softmax_2_arg:
    # softmax_2_arg(softmax_2_arg(x1,x2),x3) = log(exp(x1) + exp(x2) + exp(x3))
    # which is true since
    # log(exp(log(exp(x1) + exp(x2))) + exp(x3)) = log(exp(x1) + exp(x2) + exp(x3))
    for (i, x_i) in enumerate(x.T):
        if i>1: sm = softmax_2_arg(sm, x_i, t)
    return sm

#### Value Iteration and Policy



In [None]:
def vi_boltzmann(mdp, gamma, r, horizon=None,  temperature=1, 
                            threshold=1e-16):
    '''
    Finds the optimal state and state-action value functions via value 
    iteration with the "soft" max-ent Bellman backup:
    
    Q_{sa} = r_s + gamma * \sum_{s'} p(s'|s,a)V_{s'}
    V'_s = temperature * log(\sum_a exp(Q_{sa}/temperature))
    Computes the Boltzmann rational policy 
    \pi_{s,a} = exp((Q_{s,a} - V_s)/temperature).
    
    Parameters
    ----------
    mdp : object
        Instance of the MDP class.
    gamma : float 
        Discount factor; 0<=gamma<=1.
    r : 1D numpy array
        Initial reward vector with the length equal to the 
        number of states in the MDP.
    horizon : int
        Horizon for the finite horizon version of value iteration.
    threshold : float
        Convergence threshold.
    Returns
    -------
    1D numpy array
        Array of shape (mdp.nS, 1), each V[s] is the value of state s under 
        the reward r and Boltzmann policy.
    2D numpy array
        Array of shape (mdp.nS, mdp.nA), each Q[s,a] is the value of 
        state-action pair [s,a] under the reward r and Boltzmann policy.
    2D numpy array
        Array of shape (mdp.nS, mdp.nA), each value p[s,a] is the probability 
        of taking action a in state s.
    '''
    
    # No rewards for state-action pairs where there is no transition
    mask = np.sum(mdp.T, axis=2)
        
    #Value iteration    
    V = np.copy(r)
    t = 0
    diff = float("inf")
    while diff > threshold:
        V_prev = np.copy(V)

        # ∀ s,a: Q[s,a] = (r_s + gamma * \sum_{s'} p(s'|s,a)V_{s'})
        Q = np.multiply(r.reshape((-1,1)) + gamma * np.dot(mdp.T, V_prev), mask)
        
        # ∀ s: V_s = temperature * log(\sum_a exp(Q_sa/temperature))
        V = softmax(Q, temperature)
        
        diff = np.amax(abs(V_prev - V))
        
        t+=1
        if t<horizon and gamma==1:
            # When \gamma=1, the backup operator is equivariant under adding 
            # a constant to all entries of V, so we can translate min(V) 
            # to be 0 at each step of the softmax value iteration without 
            # changing the policy it converges to, and this fixes the problem 
            # where log(nA) keep getting added at each iteration.
            V = V - np.amin(V)
        if horizon is not None:
            if t==horizon: break
    
    V = V.reshape((-1, 1))
        
    # Compute policy
    expt = lambda x: np.exp(x/temperature)
    tlog = lambda x: temperature * np.log(x)

    # TODO: How do we calculate the policy?
    policy = None
        
    return V, Q, policy

#### Computing State Counts from Data



In [None]:
def compute_s_a_visitations(mdp, gamma, trajectories):
    '''
    Given a list of trajectories in an mdp, computes the state-action 
    visitation counts and the probability of a trajectory starting in state s.
    
    State-action visitation counts:
    sa_visit_count[s,a] = \sum_{i,t} 1_{traj_s_{i,t} = s AND traj_a_{i,t} = a}
    P_0(s) -- probability that the trajectory will start in state s. 
    P_0[s] = \sum_{i,t} 1_{t = 0 AND traj_s_{i,t} = s}  / i
    P_0 is used in computing the occupancy measure of the MDP.
    Parameters
    ----------
    mdp : object
        Instance of the MDP class.
    gamma : float 
        Discount factor; 0<=gamma<=1.
    trajectories : 3D numpy array
        Expert trajectories. 
        Dimensions: [number of traj, timesteps in the traj, 2: state & action].
    Returns
    -------
    (2D numpy array, 1D numpy array)
        Arrays of shape (mdp.nS, mdp.nA) and (mdp.nS).
    '''

    s_0_count = np.zeros(mdp.nS)
    sa_visit_count = np.zeros((mdp.nS, mdp.nA))
    
    for traj in trajectories:
        # traj[0][0] is the state of the first timestep of the trajectory.
        s_0_count[traj[0][0]] += 1
        for (s, a) in traj:
            sa_visit_count[s, a] += 1
      
    # Count into probability        
    P_0 = s_0_count / trajectories.shape[0]
    
    return sa_visit_count, P_0

#### Computing State Counts



In [None]:
def compute_D(mdp, gamma, policy, P_0=None, t_max=None, threshold=1e-6):
    '''
    Computes occupancy measure of a MDP under a given time-constrained policy 
    -- the expected discounted number of times that policy π visits state s in 
    a given number of timesteps.
    
    The version w/o discount is described in Algorithm 9.3 of Ziebart's thesis: 
    http://www.cs.cmu.edu/~bziebart/publications/thesis-bziebart.pdf.
    
    The discounted version can be found in the supplement to Levine's 
    "Nonlinear Inverse Reinforcement Learning with Gaussian Processes" (GPIRL):
    https://graphics.stanford.edu/projects/gpirl/gpirl_supplement.pdf.
    Parameters
    ----------
    mdp : object
        Instance of the MDP class.
    gamma : float 
        Discount factor; 0<=gamma<=1.
    policy : 2D numpy array
        policy[s,a] is the probability of taking action a in state s.
    P_0 : 1D numpy array of shape (mdp.nS)
        i-th element is the probability that the traj will start in state i.
    t_max : int
        number of timesteps the policy is executed.
    Returns
    -------
    1D numpy array of shape (mdp.nS)
    '''

    if P_0 is None: P_0 = np.ones(mdp.nS) / mdp.nS
    D_prev = np.zeros_like(P_0)     
    
    t = 0
    diff = float("inf")
    while diff > threshold:
        
        # ∀ s: D[s] <- P_0[s]
        D = np.copy(P_0)

        for s in range(mdp.nS):
            for a in range(mdp.nA):
                # for all s_prime reachable from s by taking a do:
                for p_sprime, s_prime, _ in mdp.P[s][a]:
                    D[s_prime] += gamma * D_prev[s] * policy[s, a] * p_sprime

        diff = np.amax(abs(D_prev - D))    
        D_prev = np.copy(D)
        
        if t_max is not None:
            t+=1
            if t==t_max: break
    
    return D

#### Putting it All Together



In [None]:
def max_causal_ent_irl(mdp, feature_matrix, trajectories, gamma=1, h=None, 
                       temperature=1e-2, epochs=300, learning_rate=0.01, theta=None):
    '''
    Finds theta, a reward parametrization vector (r[s] = features[s]'.*theta) 
    that maximizes the log likelihood of the given expert trajectories, 
    modelling the expert as a Boltzmann rational agent with given temperature. 
    
    This is equivalent to finding a reward parametrization vector giving rise 
    to a reward vector giving rise to Boltzmann rational policy whose expected 
    feature count matches the average feature count of the given expert 
    trajectories (Levine et al, supplement to the GPIRL paper).
    Parameters
    ----------
    mdp : object
        Instance of the MDP class.
    feature_matrix : 2D numpy array
        Each of the rows of the feature matrix is a vector of features of the 
        corresponding state of the MDP. 
    trajectories : 3D numpy array
        Expert trajectories. 
        Dimensions: [number of traj, timesteps in the traj, state and action].
    gamma : float 
        Discount factor; 0<=gamma<=1.
    h : int
        Horizon for the finite horizon version of value iteration.
    temperature : float >= 0
        The temperature parameter for computing V, Q and policy of the 
        Boltzmann rational agent: p(a|s) is proportional to exp(Q/temperature);
        the closer temperature is to 0 the more rational the agent is.
    epochs : int
        Number of iterations gradient descent will run.
    learning_rate : float
        Learning rate for gradient descent.
    theta : 1D numpy array
        Initial reward function parameters vector with the length equal to the 
        #features.
    Returns
    -------
    1D numpy array
        Reward function parameters computed with Maximum Causal Entropy 
        algorithm from the expert trajectories.
    '''    
    
    # Compute the state-action visitation counts and the probability 
    # of a trajectory starting in state s from the expert trajectories.
    sa_visit_count, P_0 = compute_s_a_visitations(mdp, gamma, trajectories)
    
    # Mean state visitation count of expert trajectories
    # mean_s_visit_count[s] = ( \sum_{i,t} 1_{traj_s_{i,t} = s}) / num_traj
    mean_s_visit_count = np.sum(sa_visit_count,1) / trajectories.shape[0]
    # Mean feature count of expert trajectories
    mean_f_count = np.dot(feature_matrix.T, mean_s_visit_count)
    
    if theta is None:
        theta = np.random.rand(feature_matrix.shape[1])
        

    for i in range(epochs):
        r = np.squeeze(np.asarray(np.dot(feature_matrix, theta.reshape(-1,1))))
        # Compute the Boltzmann rational policy \pi_{s,a} = \exp(Q_{s,a} - V_s) 
        V, Q, policy = vi_boltzmann(mdp, gamma, r, h, temperature)
        
        # IRL log likelihood term: 
        # L = 0; for all traj: for all (s, a) in traj: L += Q[s,a] - V[s]
        L = np.sum(sa_visit_count * (Q - V))
        
        # The expected #times policy π visits state s in a given #timesteps.
        D = compute_D(mdp, gamma, policy, P_0, t_max=trajectories.shape[1])        

        # IRL log likelihood gradient w.r.t rewardparameters. 
        # Corresponds to line 9 of Algorithm 2 from the MaxCausalEnt IRL paper 
        # www.cs.cmu.edu/~bziebart/publications/maximum-causal-entropy.pdf. 
        # Negate to get the gradient of neg log likelihood, 
        # which is then minimized with GD.
        dL_dtheta = -(mean_f_count - np.dot(feature_matrix.T, D))

        # Gradient descent
        theta = theta - learning_rate * dL_dtheta

        if (i+1)%10==0: 
            print('Epoch: {} log likelihood of all traj: {}'.format(i,L), 
                  ', average per traj step: {}'.format(
                  L/(trajectories.shape[0] * trajectories.shape[1])))
    return theta, policy

### Estimating the probability distribution of actions given situations
Here we run the MaxCausalEnt algorithm to estimate the probability of actions given situations. This completes our model.

In [None]:
# Demonstrates the usage of the implemented MaxCausalEnt IRL algorithm. 
    
#     First a number of expert trajectories is generated using the true reward 
#     giving rise to the Boltzmann rational expert policy with temperature t_exp. 
    
#     Hereafter the max_causal_ent_irl() function is used to find a reward vector
#     that maximizes the log likelihood of the generated expert trajectories, 
#     modelling the expert as a Boltzmann rational agent with temperature t_irl.
    
#     Parameters
#     ----------
#     t_expert : float >= 0
#         The temperature parameter for computing V, Q and policy of the 
#         Boltzmann rational expert: p(a|s) is proportional to exp(Q/t_expert);
#         the closer temperature is to 0 the more rational the expert is.
#     t_irl : float
#         Temperature of the Boltzmann rational policy the IRL algorithm assumes
#         the expert followed when generating the trajectories.
#     gamma : float 
#         Discount factor; 0<=gamma<=1.
#     h : int
#         Horizon for the finite horizon version of value iteration subroutine of
#         MaxCausalEnt IRL algorithm.
#     n_traj : int
#         Number of expert trajectories generated.
#     traj_len : int
#         Number of timesteps in each of the expert trajectories.
#     learning_rate : float
#         Learning rate for gradient descent in the MaxCausalEnt IRL algorithm.
#     epochs : int
#         Number of gradient descent steps in the MaxCausalEnt IRL algorithm.

horizon=4

# Features
feature_matrix = environment.feature_matrix

print('State feature names: ', environment.state_feature_names)
print('Action feature names: ', environment.action_feature_names)

# Find a reward vector that maximizes the log likelihood of the generated 
# expert trajectories.
theta, policy = max_causal_ent_irl(mdp, feature_matrix, behavior_instances, h=horizon)
print('Final reward weights: ', theta)
print('Final policy: ', policy)

In [None]:
#TODO: explore the model here. What can you say about the behaviors?

If you are curious, you can generatet syntetic data by specifying a person's preferences, and then sampling that person's behaviors from the model.


In [None]:
def generate_trajectories(mdp, policy, timesteps=4, num_traj=50):
    '''
    Generates trajectories in the MDP given a policy.
    
    Parameters
    ----------
    mdp : object
        Instance of the MDP class.
    policy : 2D numpy array
        Array of shape (mdp.nS, mdp.nA), each value p[s,a] is the probability 
        of taking action a in state s.
    timesteps : int
        Length of each of the generated trajectories.
    num_traj : 
        Number of trajectories to generate.
    
    Returns
    -------
    3D numpy array
        Expert trajectories. 
        Dimensions: [number of traj, timesteps in the traj, 2: state & action].
    '''
    
    trajectories = np.zeros([num_traj, timesteps, 2]).astype(int)
    
    s = mdp.reset()
    for i in range(num_traj):
        for t in range(timesteps):
            action = np.random.choice(mdp.nA, p=policy[s, :])
            trajectories[i, t, :] = [s, action]
            s = mdp.step(action)
        s = mdp.reset()
    
    return trajectories

In [None]:
t_expert=1e-16
gamma = 1
n_traj=364
traj_len = 4

np.random.seed(0)

print('State feature names: ',  environment.state_feature_names)
print('Action feature names: ',  environment.action_feature_names)
# The "true" reward weights and the reward
theta_expert = np.array([0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1000.0,0.0])

r_expert = np.squeeze(np.asarray(np.dot(feature_matrix, theta_expert)))

# Compute the Boltzmann rational expert policy from the given true reward.
if t_expert>0:
    V, Q, policy_expert = vi_boltzmann(mdp, gamma, r_expert, horizon, t_expert)
if t_expert==0:
    V, Q, policy_expert = vi_rational(mdp, gamma, r_expert, horizon)
    
print("My  theta: ", theta_expert)
print("My policy:")
print(policy_expert)

# Generate expert trajectories using the given expert policy.
trajectories = generate_trajectories(mdp, policy_expert, traj_len, n_traj)

# Compute and print the stats of the generated expert trajectories.
sa_visit_count, _ = compute_s_a_visitations(mdp, gamma, trajectories)

log_likelihood = np.sum(sa_visit_count * (Q - V))
print('Generated {} traj of length {}'.format(n_traj, traj_len))
print('Log likelihood of all traj under the policy generated ', 
      'from the true reward: {}, \n average per traj step: {}'.format(
       log_likelihood, log_likelihood / (n_traj * traj_len)))
print('Average return per expert trajectory: {} \n'.format(
        np.sum(np.sum(sa_visit_count, axis=1)*r_expert) / n_traj))


In [None]:
np.save("data/generated_behavior_instances.npy", trajectories)

# What have we done today?

This is a very simple model for a very simple problem that illustrates using IRL to model people's behaviors from  behavior traces.

* **Defined  behaviors**: used a definition of routine behavior to inform our model.
* **Mathematically expressed those behaviors**: used an MDP to express the situations that people find themselves in and the actions they perform those situations. We estimated people's preference for specific features of both situations and action using IRL.
* **Captured the uncertainity of human behavior**: created a probabilistic model of human behavior.
* **Estimated probability distribution of different behaviors given context from data**: used a principled approach to estimating these probablitis from the data. 