# Model Playground
Based on Bellotti et al.'s "Adaptive Experience Engine for Serious Games."


They proposed 2 variations of their model for serious games: a genetic algorithm model and a **reinforcement learning model** (implementation details are in p. 274-6)
* I am interested in implementing a simple prototype based on the proposed RL model.
* By simple, I mean I will include all of the core implementation details specified in the paper but will also exclude some other details.
* I have started implementing some details of the genetic algorithm model, but the implementation is still _incomplete_

## Imports

In [1]:
%matplotlib inline
import matplotlib
import matplotlib.pyplot as plt
matplotlib.style.use('ggplot')

import numpy as np
import itertools
import sys
from sklearn.linear_model import SGDRegressor

## Reinforcement Learning Model
Bellotti et al. specified that they used a "Q-learning algorithm with linear function approximation." The following implementation of Q-learning with value function approximation is adapted from [this implementation](https://github.com/dennybritz/reinforcement-learning/blob/master/FA/Q-Learning%20with%20Value%20Function%20Approximation%20Solution.ipynb).

In the following implementation,
* a **state** refers to a selected sequence of tasks (these tasks are presented to the user in this order)
* an **action** refers to appending a new task to the selected sequence of tasks
* the **reward** is calculated as the "edutainment" value proposed by the paper

### TODO
* add names to tasks so that I can print and identify the sequenced tasks
* prettier printing

### Status/State Features (from table VI in Bellotti et al.'s paper)
This implementation will only consider a subset of these features:
* Average Score (AS): average score obtained by the player in the tasks in the selected sequence
* Average Length (AL): average time-length spent by the player for tasks in the selected sequence
* Interrupted Tasks Frequency (ITF): ratio between number of tasks the player has quit before the end and the total number of tasks in the selected sequence
* Variety of the task types (VT): variance of the distribution of the task types in the selected sequence

Features like AD, AL and ITF are supposed to be values collected from the user's gameplay, but since I am not implementing a game, I will simple simulate gameplay data collection by manually input the values of these features. 

### Tasks

In [2]:
# for example, the set of task types could include puzzles, quizzes, and conversations with virtual characters

TYPES = set(["puzzle", "quiz", "conversation"])

In [3]:
class Task():
    def __init__(self, *args):
        self.types = {t:0 for t in TYPES}
        for t in args:
            if t in TYPES:
                self.types[t] = 1  # indicates that the task is of this type
        
        self.score = 0  # how well the user scored on this task, normalized to [0,1]
        self.length = 0  # how long the user spent on this task, in minutes
        self.interrupted = 0  # iwhether or not the user quit before the end of this task, int 1 or 0

In [4]:
# 4 example tasks for testing the behavior of this model

puzzle = Task("puzzle")  # a puzzle-type task
convo = Task("conversation")  # a conversation-type task
quiz = Task("quiz")  # a quiz-type task
convo_quiz = Task("conversation", "quiz")  # a task that e.g. incorporates quizzes within conversation

TASKS = set([puzzle, convo, quiz, convo_quiz])  

### State Features

In [5]:
def calc_features(state):
    # given the state, or the sequence (list) of selected tasks,
    # this function calculates the state features as an array of AS, AL, ITF and VT values (returned in this order)
    
    num_tasks = len(state)
    
    if num_tasks == 0:
        return [0,0,0,0]
    
    AS = 0  # tracks the average score in the selected sequence
    AL = 0  # tracks the average time-length in the selected sequence
    
    # ITF tracks the ratio between number of tasks the player has quit before the end and 
    # the total number of tasks in the selected sequence
    ITF = 0
    
    # dictionary that tracks the distribution of the task types in the selected sequence
    type_dist = {t:0 for t in TYPES}
    
    for t in state:
        AS += t.score
        AL += t.length
        ITF += t.interrupted
        
        for type in t.types:
            type_dist[type] += t.types[type]
    
    AS = AS/num_tasks
    AL = AL/num_tasks
    ITF = ITF/num_tasks
    VT = np.var(list(type_dist.values()))  # the variance in type_dist
    
    return [AS, AL, ITF, VT]
        

### Reward Value: "Edutainment"

According to the paper, the **edutainment value** is calculated as follows:

$$\alpha * \frac{TTV}{ITF} + (1-\alpha)*AS$$

"The entertainment is proportional to the ratio between the variety of the task types in the sequence (TTV) and the frequency of interrupted tasks (ITF). The learning is evaluated as the average score (AS) obtained by the player in the sequence, assuming that the game score rewards the player’s learning [84]. Finally, the two goals are weighted through a tradeoff factor within the 0–1 range, which indicates the relative importance of the two goals."
* I think the TTV value is the same as the VT feature value (variance of the distribution of the task types in the selected sequence)

In [6]:
def calc_reward(state, a=0.5):
    # given the state, or the sequence (list) of selected tasks,
    # this function calculates the edutainment value
    
    if len(state) == 0:
        return 0
    
    AS, _, ITF, VT = calc_features(state)
    if ITF == 0:
        ITF = 1  # to avoid a infinitely large reward when we divide by 0
    return a*VT/ITF + (1-a)*AS

### Estimator for value function approximation

In [7]:
class Estimator():
    """
    Value Function approximator. 
    """
    
    def __init__(self):
        self.model = SGDRegressor(learning_rate="constant")
        self.model.partial_fit(self.featurize_state([]), [0])
        
    def featurize_state(self, state):
        """
        Returns the featurized representation for a state.
        """

        return [calc_features(state)]
    
    def predict(self, s, a=None):
        """
        Makes value function predictions.
        
        Args:
            s: state to make a prediction for
            
        Returns single value prediction
            
        
        eturns
            If an action a is given this returns a single number as the prediction.
            If no action is given this returns a vector or predictions for all actions
            in the environment where pred[i] is the prediction for action i.
            
        """
        
        if not a:
            unselected_tasks = TASKS-set(s)
            q_val_dict = dict()

            for t in unselected_tasks:  # calculate q value for all possible actions
                s.append(t)
                features = self.featurize_state(s)
                
                # this new action/state only contributes a nonzero value for its type
                # (the user has not yet performed the task, so score, length and interrupted values are all zero)
                # thus, the following prediction is only dependent on the task type variation (VT) feature
                q_val = self.model.predict(features)[0]
                
                q_val_dict[t] = q_val
                s.pop()
            
            return q_val_dict
        else:
            s.append(a)
            features = self.featurize_state(s)
            q_val = estimator.predict(features)[0]
            s.pop()
            
            return q_val
    
    def update(self, s, y):
        """
        Updates the estimator parameters for a given state the target y.
        """
        features = self.featurize_state(s)
        self.model.partial_fit(features, [y])

### Epsilon Greedy Policy

In [8]:
def make_epsilon_greedy_policy(estimator, epsilon):
    """
    Creates an epsilon-greedy policy based on a given Q-function approximator and epsilon.
    
    Args:
        estimator: An estimator that returns q values for a given state
        epsilon: The probability to select a random action . float between 0 and 1.
    
    Returns:
        A function that takes the observation as an argument and returns
        the probabilities for each action in the form of a numpy array of length nA.
    
    """
    def policy_fn(state):
        # the state is represented as the list of selected tasks
        unselected_tasks = TASKS-set(state)
        action_probs = {t:epsilon/len(unselected_tasks) for t in unselected_tasks}
        
        highest_q_val = -1
        best_action = None  # aka best task to append to the current state (selected sequence of tasks)
        
        q_val_dict = estimator.predict(state)
        for task in q_val_dict:
            if q_val_dict[task] > highest_q_val:
                best_action = task
        
        action_probs[best_action] += (1.0 - epsilon)
        return action_probs
    
    return policy_fn

### Q-Learning

In [9]:
def q_learning(estimator, num_episodes, discount_factor=1.0, epsilon=0.1, epsilon_decay=1.0):
    """
    Q-Learning algorithm for fff-policy TD control using Function Approximation.
    Finds the optimal greedy policy while following an epsilon-greedy policy.
    
    Args:
        estimator: Action-Value function estimator
        num_episodes: Number of episodes to run for.
        discount_factor: Gamma discount factor.
        epsilon: Chance the sample a random action. Float betwen 0 and 1.
        epsilon_decay: Each episode, epsilon is decayed by this factor
    
    Returns:
        An ordered list of tasks.
    """ 
    
    for i_episode in range(num_episodes):
        # state is the list/sequence of selected tasks
        state = []
        
        # indicator of when we are done with sequencing tasks: 
        # when the user wants to stop or when we have sequenced all tasks
        done = False
        
        # The policy we're following
        policy = make_epsilon_greedy_policy(
            estimator, epsilon * epsilon_decay**i_episode)
        
        # Print out which episode we're on, useful for debugging.
        print(i_episode)
        
        # One step in the environment
        for t in itertools.count():
                        
            # Choose an action (unselected task) to take
            action_probs = policy(state)  # dictionary
            
            action_choices = []
            action_choices_probs = []
            
            for t in action_probs:
                action_choices.append(t)
                action_choices_probs.append(action_probs[t])
                
            action = np.random.choice(action_choices, p=action_choices_probs)
            
            # Take a step
            next_state = state.copy()
            next_state.append(action)  # add the best next task to the selected sequence of tasks
            # and perform the task
            action.score = float(input("Score between 0 and 1"))
            action.length = float(input("Number of minutes spent"))
            action.interrupted = int(input("Interrupted?: 1 for yes, 0 for no"))
            
            done = bool(int(input("Want to stop doing tasks?: 1 for yes, 0 for no")))
            
            reward = calc_reward(next_state)
            
            if len(next_state) == len(TASKS):
                done = True
            
            # TD Update
            q_values_next_dict = estimator.predict(next_state)
            
            # Use this code for Q-Learning
            # Q-Value TD Target
            td_target = reward + discount_factor * np.max(list(q_values_next_dict.values()))
            print(reward)
            
            # Update the function approximator using our target
            estimator.update(next_state, td_target)
            
            print("\rStep {} @ Episode {}/{} ({})".format(t, i_episode + 1, num_episodes, calc_reward(state)), end="")
                
            if done:
                break
                
            state = next_state

In [10]:
estimator = Estimator()



In [11]:
q_learning(estimator, 3, epsilon=0.0)

0




0.3611111111111111
Step <__main__.Task object at 0x1077916d8> @ Episode 1/3 (0)1
0.5111111111111112
Step <__main__.Task object at 0x1077916d8> @ Episode 2/3 (0)0.4861111111111111
Step <__main__.Task object at 0x107791748> @ Episode 2/3 (0.5111111111111112)2
0.2611111111111111
Step <__main__.Task object at 0x1077916d8> @ Episode 3/3 (0)

## Genetic Model (Incomplete)

Note: In the models below, there is a difference between calling "make" and "set" for an instance.
* calling the "make" function will create an instance attribute from scratch everytime
* calling the "set" function will _modify_ an existing attribute everytime

### Helper Functions

In [None]:
def sigmoid(x):
    return 1/(1+np.exp(-x))

In [None]:
# plot of the original sigmoid function
x = np.arange(-10,10,0.1)
y = sigmoid(x)
plt.plot(x,y)
plt.show()

In [None]:
# plot of the transformed sigmoid function
# that will be used for the Task model's skill benefit curve
x = np.arange(0,10,0.1)
y = sigmoid(-(x-5))
plt.plot(x,y)
plt.show()

### Predefined Constants

In [None]:
NUM_LEVELS = 10
TYPES = set()
SKILLS = set()
LEARNING_STYLES = set()

### Task Model

In [None]:
# TODO: print messages
# TODO: implement the difficulty adaptation range
# Note: I have chosen to omit the "Place" parameter because I will not be using it

class Task:
    def make_types(self, **kwargs):
        # key: string (task type)
        # value: int/float (numeric extent to which this task covers the associated type)
        # all values are normalized to add up to 1
        total = sum(kwargs.values())
        self.types = dict()
        for t in kwargs:
            if t in TYPES:
                self.types[t] = kwargs[t]/total
            else:
                print("Type {} has not been predefined and therefore has not been added to the task."\
                      .format(t))
            
    def make_val(self, symbol, val, range=5):
        # setting values such that they are normalized to 0..1
        if symbol == 'E':
            self.entertainment_val = val/range
        elif symbol == 'Q':
            self.quality_val = val/range
        elif symbol == 'L':
            self.length = val/range
        elif symbol == 'IL':
            self.interactivity_level = val/range
        
    def make_skill_relevance(self, **kwargs):
        # key: string (skill name)
        # value: int/float (numeric extent to which this task covers the associated skill)
        # all values are normalized to add up to 1
        total = sum(kwargs.values())
        self.skill_relevance = dict()
        for skill in kwargs:
            if skill in SKILLS:
                self.skill_relevance[skill] = kwargs[skill]/total
            else:
                print("Skill {} has not been predefined and therefore has not been added to the task."\
                      .format(skill))
        
    def calc_skill_benefits(self, arg):
        # for each skill, we calculate skill benefit = sigmoid(-(x-5))
        # x is the player's ability level in the skill
        # the transformed sigmoid is a 0-1 normalized s-curve in the range of x=0..10
        # for now, I will use the same skill benefit function for all skills
        
        if isinstance(arg, User):  # input is a user
            if hasattr(self, 'skill_relevance'):
                skill_benefits = dict()
                for skill in self.skill_relevance:
                    x = arg.skill_levels[skill]  # user's skill level is in range 0..1
                    x = x*10  # scale the skill level to 0..10
                    skill_benefits[skill] = sigmoid(-(x-5))
                return skill_benefits
            else:
                print("Could not calculate skill benefits because make_skill_relevance has not been called.")
            
        elif isinstance(arg, (float,int)):
            # input is a numeric skill level for one skill
            x = arg*(10/NUM_LEVELS)  # scale to range 0..10
            return sigmoid(-(x-5))
    
    def calc_difficulty(self):
        # estimate the difficulty of this task instance
        # as a function of skill benefits, which are weighted by the corresponding skill relevance
        if hasattr(self, 'skill_relevance'):
            sum_outer = 0
            for skill in self.skill_relevance:
                sum_inner = 0
                for j in range(NUM_LEVELS):
                    sum_inner += j*self.calc_skill_benefits(j)
                sum_outer += self.skill_relevance[skill]*sum_inner
            return sum_outer
        else:
            print("Could not calculate difficulty because make_skill_relevance has not been called.")
            
    # TODO: difficulty adaptation range
        
    def make_learning_styles(self, **kwargs):
        # key: string (learning style)
        # value: int/float (numeric extent to which this task covers the associated style)
        # all values are normalized to add up to 1
        total = sum(kwargs.values())
        self.learning_styles = dict()
        for style in kwargs:
            if style in LEARNING_STYLES:
                self.learning_styles[style] = kwargs[style]/total
            else:
                print("Learning style {} has not been predefined and therefore has not been added to the task."\
                      .format(style))
        
    def make_dependencies(self, **args):
        self.dependencies = set()
        for task in args:  # the current Task instance depends on these other Task instances
            self.dependencies.add(task)

### User Model

In [None]:
# Note: I have chosen to omit the "Navigation Ability Level" parameter because I will not be using it
# "Navigation Ability Level" is associated with the Task model's "Place" parameter,
# which I will also not be using

class User:
    def __init__(self):
        self.skill_levels = {skill:0 for skill in SKILLS}
        
        self.type_pref_unscaled = {t:0 for t in TASK_TYPES}
        self.type_pref_scaled = {t:0 for t in TASK_TYPES}
        
        self.skill_pref_unscaled = {s:0 for s in SKILLS}
        self.skill_pref_scaled = {s:0 for s in SKILLS}
        
        self.learning_style_pref_unscaled = {ls:0 for ls in LEARNING_STYLES}
        self.learning_style_pref_scaled = {ls:0 for ls in LEARNING_STYLES}
        
    def set_skill_level(self, skill, level):
        if skill in SKILLS:
            self.skill_levels[skill] = level/NUM_LEVELS  # normalize to range 0..1
        else:
            print("Skill {} has not been predefined and therefore will not be included in the user's skill levels."\
                      .format(skill))
            
    def set_type_pref(self, **kwargs):
        # key: string (task type)
        # value: int/float (numeric preference for the associated type)
        for t in kwargs:
            if t in TYPES:
                self.type_pref_unscaled[t] = kwargs[t]
            else:
                print("Type {} has not been predefined and therefore will not be included in the user's preferences."\
                      .format(t))
        
        # all values in type_pref_unscaled are normalized/scaled to add up to 1
        total = sum(self.types_pref_unscaled.values())
        for t in self.types_pref_unscaled:
            self.type_pref_scaled[t] = self.type_pref_unscaled[t]/total
                
    def set_skill_pref(self, **kwargs):
        # key: string (skill)
        # value: int/float (numeric preference for the associated skill)
        for skill in kwargs:
            if t in SKILLS:
                self.skill_pref_unscaled[skill] = kwargs[skill]
            else:
                print("Skill {} has not been predefined and therefore will not be included in the user's preferences."\
                      .format(skill))
        
        # all values in skill_pref_unscaled are normalized/scaled to add up to 1
        total = sum(self.skill_pref_unscaled.values())
        for t in self.skill_pref_unscaled:
            self.skill_pref_scaled[skill] = self.skill_pref_unscaled[skill]/total
            
    def set_learning_style_pref(self, **kwargs):
        # key: string (skill)
        # value: int/float (numeric preference for the associated learning style)
        for style in kwargs:
            if style in LEARNING_STYLES:
                self.learning_style_pref_unscaled[style] = kwargs[style]
            else:
                print("Learning style {} has not been predefined and therefore will not be included in the user's preferences."\
                      .format(style))
        
        # all values in learning_style_pref_unscaled are normalized/scaled to add up to 1
        total = sum(self.learning_style_pref_unscaled.values())
        for style in self.learning_style_pref_unscaled:
            self.learning_style_pref_scaled[style] = self.learning_style_pref_unscaled[style]/total
                
                

In [None]:
# TODO: user-task parameters