In [1]:
import os
print(os.getcwd())
def update_working_directory():
    from pathlib import Path
    p = Path(os.getcwd()).parents[0]
    os.chdir(p)
    print(p)
update_working_directory()

/Users/admin/Projects/doggo/notebooks
/Users/admin/Projects/doggo


# Importing the required libraries

In [2]:
import numpy as np
import pandas as pd
import gym
import time
import math
import statistics
from tqdm import tqdm
import random
from collections import defaultdict

# Building the environment

## Reset

In [3]:
decimals_state = 2
def get_state_id(dog_state):
    return '{:01.4f}_{:01.4f}_{:01.4f}_{}'.format(
        dog_state['food'], dog_state['fat'], dog_state['affection'], dog_state['can_action_be_taken'])

In [4]:
def env_reset():
    
    dog_state = {
        'food': 0.5,
        'fat': 0,
        'affection': 0.5,
        'last_action_taken': 0,
        'minutes_since_last_action': 0,
        'can_action_be_taken': True
        }
    
    dog_state['state_id'] = get_state_id(dog_state)
    
    return dog_state

In [5]:
env_reset()

{'food': 0.5,
 'fat': 0,
 'affection': 0.5,
 'last_action_taken': 0,
 'minutes_since_last_action': 0,
 'can_action_be_taken': True,
 'state_id': '0.5000_0.0000_0.5000_True'}

## Observation and action spaces

In [6]:
# n_states = env.observation_space.n
# n_states = 11*11*11
# n_actions = env.action_space.n
n_actions= 4

## Next state

In [7]:
WALKING_TIME = 15
EATING_TIME = 1
PLAYING_TIME = 4

food_consumption_rate = 1.0 / (30 * 3600)
affection_consumption_rate = 1.0 / (50 * 3600)
walking_fat_converge_rate = 0.2
walking_affection_converge_rate = 0.4
playing_fat_converge_rate = 0.1
playing_affection_converge_rate = 0.20
eating_food_increase = 0.6
eating_fat_increase = 0.25

In [8]:
def round_up(n, decimals=0):
    multiplier = 10 ** decimals
    return math.ceil(n * multiplier) / multiplier
def round_down(n, decimals=0):
    multiplier = 10 ** decimals
    return math.floor(n * multiplier) / multiplier

In [9]:
def apply_decreasing_rate(value: float, rate: float) -> float:
    """
    Apply a decreasing rate to a value
    :param value: current value
    :param rate: per second
    :return: updated value
    """
    return value - (60 * rate)

def converge(value: float, target: float, ratio: float) -> float:
    diff: float = (target - value) * ratio
    return value + diff


def update_food(dog_state):
    update_food = apply_decreasing_rate(dog_state['food'], food_consumption_rate)
    return round_down(max(0.0, update_food), decimals=decimals_state)

def update_fat(dog_state):
    update_fat = dog_state['fat']
    return update_fat

def update_affection(dog_state):
    update_affection = apply_decreasing_rate(dog_state['affection'], affection_consumption_rate)
    return round_down(max(0.0, update_affection), decimals=decimals_state)


def update_if_walking(dog_state):
    update_fat = round_down(converge(dog_state['fat'], 0.0, walking_fat_converge_rate), decimals=decimals_state)
    update_affection = round_up(converge(dog_state['affection'], 1.0, walking_affection_converge_rate), decimals=decimals_state)
    return (update_fat, update_affection)

def update_if_feeding(dog_state):
    update_food = round_up(min(dog_state['food'] + eating_food_increase, 1.0), decimals=decimals_state)
    update_fat = round_up(min(dog_state['fat'] + eating_fat_increase, 1.0), decimals=decimals_state)
    return (update_food, update_fat)

def update_if_playing(dog_state):
    update_fat = round_down(converge(dog_state['fat'], 0.0, playing_fat_converge_rate), decimals=decimals_state)
    update_affection = round_up(converge(dog_state['affection'], 1.0, playing_affection_converge_rate), decimals=decimals_state)
    return (update_fat, update_affection)


def get_happiness(dog_state):
    happiness = min(dog_state['food'], 1.0 - dog_state['fat'], dog_state['affection'])
    return happiness


def update_done(dog_state):
    happiness = get_happiness(dog_state)
    return happiness <= 0.0

In [10]:
# state2, reward1, done, info = env.step(action1)
def env_step(state1, action):
    
    state2 = state1.copy()
    reward_penalty = 0
    
    # Affect of time
    state2['food'] = update_food(state2)
    state2['fat'] = update_fat(state2)
    state2['affection'] = update_affection(state2)
    state2['minutes_since_last_action'] += 1 
    
    # Applying action
    if action != 0:
        if state2['can_action_be_taken']:
            reward_penalty += 0.1
            state2['can_action_be_taken'] = False
            state2['minutes_since_last_action'] = 0
            state2['last_action_taken'] = action
        else:
            reward_penalty += 0.5

    # Affect of actions
    if (state2['last_action_taken'] == 1) & (state2['minutes_since_last_action'] == WALKING_TIME):
        state2['fat'], state2['affection'] = update_if_walking(state2)
        state2['can_action_be_taken'] = True

    if (state2['last_action_taken'] == 2) & (state2['minutes_since_last_action'] == EATING_TIME):
        state2['food'], state2['fat'] = update_if_feeding(state2)
        state2['can_action_be_taken'] = True

    if (state2['last_action_taken'] == 3) & (state2['minutes_since_last_action'] == PLAYING_TIME):
        state2['fat'], state2['affection'] = update_if_playing(state2)
        state2['can_action_be_taken'] = True
                    
    done = update_done(state2)
    if done:
        reward = -10
    else:
        reward = min(state2['food'], 1.0 - state2['fat'], state2['affection']) - reward_penalty
    
    info = None
    
    state2['state_id'] = get_state_id(state2)
    
    return (state2, reward, done, info)

## Render

In [11]:
def env_render(dog_state, action, Q):
    print(dog_state)
    print(action)
    print(Q[dog_state['state_id']])

# Defining utility functions to be used in the learning process

## Initialising Q

In [12]:
def init_Q(n_actions, init_Q_type="ones"):
    """
    @param n_actions the number of actions
    @param type random, ones or zeros for the initialization
    """
    if init_Q_type == "ones":
        default_Q_values = np.ones(n_actions)
    elif init_Q_type == "random":
        default_Q_values = np.random.random(n_actions)
    elif init_Q_type == "zeros":
        default_Q_values = np.zeros(n_actions)
    
    def get_default_Q_values():
        return default_Q_values

    return defaultdict(get_default_Q_values)

## Choose an action

In [13]:
# Numpy generator
rng = np.random.default_rng()  # Create a default Generator.

In [14]:
def select_best_action(Q_state):
    winner = np.argwhere(Q_state == np.amax(Q_state))
    winner_list = winner.flatten().tolist()
    action = random.choice(winner_list)
    return action

In [15]:
def epsilon_greedy(Q, state_id, n_actions, epsilon):
    """
    @param Q Q values {state, action} -> value
    @param epsilon for exploration
    @param n_actions number of actions
    @param state state at time t
    """
    if rng.uniform(0, 1) < epsilon:
        action = np.random.randint(0, n_actions)
    else:
        action = select_best_action(Q[state_id])
    
    return action

## Update Q-matrice (state-action value function)

In [16]:
# Function to learn the Q-value  - Is it temporal-difference?
def update(state1_id, action1, reward1, state2_id, action2, expected=False):
    
    previous_Q_value_state1 = Q[state1_id].copy()
    
    predict = Q[state1_id][action1] 
    
    target = reward1 + gamma * Q[state2_id][action2] 
    if expected:
        expected_value = np.mean(Q[state2_id])
        target = reward1 + gamma * expected_value
    
    new_Q_value = Q[state1_id][action1] + alpha * (target - predict)
    previous_Q_value_state1[action1] = new_Q_value
    
    Q[state1_id] = previous_Q_value_state1
        
    return Q

## Updating parameters

### Epsilon $\epsilon$ - Exploration rate

In [17]:
# Exploration rate

def get_epsilon(episode, init_epsilon, divisor=25):
    
    n_epsilon = init_epsilon/(episode/10000+1)
    # n_epsilon = min(1, 1.0 - math.log10((episode + 1) / divisor))
    
    return n_epsilon

### Alpha $\alpha$ - Learning rate

In [18]:
# Learning rate

def get_alpha(episode, init_alpha, divisor=25):
    
    n_alpha = init_alpha/(episode/10000+1)
    # n_alpha = min(1.0, 1.0 - math.log10((episode + 1) / divisor))
    
    return n_alpha

## Plots Reward / Steps

In [19]:
import numpy as np
def running_mean(x, N):
    cumsum = np.cumsum(np.insert(x, 0, 0)) 
    return (cumsum[N:] - cumsum[:-N]) / float(N)

In [20]:
import numpy as np
import matplotlib.pyplot as plt

def plot_evolution_reward(evolution_reward):
    
    n_moving_points = int(np.ceil(len(evolution_reward)/100))
    y = running_mean(evolution_reward,n_moving_points)
    x = range(len(y))

    plt.plot(x, y)
    plt.title('Evolution of Reward over time (smoothed over window size 100)')
    plt.xlabel('Episode') # will add a label “Year” to your x-axis
    plt.ylabel('Episode Reward (Smoothed)') # will add a label “Population” to your y-axis
    plt.xticks() # set the numbers on the x-axis to be 1, 2, 3, 4, 5. We can also pass and labels as a second argument. For, example, if we use this code plt.xticks([1, 2, 3, 4, 5], ["1M", "2M", "3M", "4M", "5M"]), it will set the labels 1M, 2M, 3M, 4M, 5M on the x-axis.
    plt.yticks() # - works the same as plt.xticks(), but for the y-axis.
    plt.grid(True)
    plt.show()

In [21]:
import numpy as np
import matplotlib.pyplot as plt

def plot_evolution_steps(evolution_steps):
    
    n_moving_points = int(np.ceil(len(evolution_steps)/100))
    y = running_mean(evolution_steps,n_moving_points)
    x = range(len(y))

    plt.plot(x, y)
    plt.title('Episode length over time (smoothed over window size 100)')
    plt.xlabel('Episode') # will add a label “Year” to your x-axis
    plt.ylabel('Episode Length (Smoothed)') # will add a label “Population” to your y-axis
    plt.xticks() # set the numbers on the x-axis to be 1, 2, 3, 4, 5. We can also pass and labels as a second argument. For, example, if we use this code plt.xticks([1, 2, 3, 4, 5], ["1M", "2M", "3M", "4M", "5M"]), it will set the labels 1M, 2M, 3M, 4M, 5M on the x-axis.
    plt.yticks() # - works the same as plt.xticks(), but for the y-axis.
    plt.grid(True)
    plt.show()

In [22]:
import numpy as np
import matplotlib.pyplot as plt

def plot_evolution_happiness(evolution_happiness_all):
    
    n_moving_points = int(np.ceil(len(evolution_happiness_all)/100))
    y = running_mean(evolution_happiness_all,n_moving_points)
    x = range(len(y))

    plt.plot(x, y)
    plt.title('Happiness over time (smoothed)')
    plt.xlabel('Episode') # will add a label “Year” to your x-axis
    plt.ylabel('Happiness (Smoothed)') # will add a label “Population” to your y-axis
    plt.xticks() # set the numbers on the x-axis to be 1, 2, 3, 4, 5. We can also pass and labels as a second argument. For, example, if we use this code plt.xticks([1, 2, 3, 4, 5], ["1M", "2M", "3M", "4M", "5M"]), it will set the labels 1M, 2M, 3M, 4M, 5M on the x-axis.
    plt.yticks() # - works the same as plt.xticks(), but for the y-axis.
    plt.grid(True)
    plt.show()

# Initializing different parameters

In [23]:
# Defining the different parameters 
init_epsilon = 1 # trade-off exploration/exploitation - better if decreasing
init_alpha = 0.5 # learning rate, better if decreasing

# Specific to environment
gamma = 0.95 # discount for future rewards (also called decay factor)
# n_states = env.observation_space.n # useless
n_actions = 4

# Episodes
n_episodes = 1000000
nmax_steps = 60*24*30 # maximum steps per episode

# Initializing the Q-matrix 
Q = init_Q(n_actions, init_Q_type="zeros")

# Training the learning agent

In [24]:
# Visualisation
(render_episode, render_training) = (False, False)
n_episodes_plot = int(np.ceil(n_episodes/100))

In [25]:
# Initializing the reward
evolution_reward = []
evolution_steps = []
evolution_happiness_all = []

In [1]:
# Starting the SARSA learning 
for episode in tqdm(range(n_episodes)):
    
    n_episode_steps = 0
    episode_reward = 0
    evolution_happiness = []
    done = False
    
    state1 = env_reset()
    evolution_happiness.append(get_happiness(state1))
    action1 = epsilon_greedy(Q, state1['state_id'], n_actions, init_epsilon)
    
    while (not done) and (n_episode_steps < nmax_steps):
    
        # Update parameters
        epsilon = get_epsilon(episode, init_epsilon)
        alpha = get_alpha(episode, init_alpha)
    
        # Visualizing the training
        if render_training:
            env_render(state1, action1, Q)
    
        # Getting the next state 
        state2, reward1, done, info = env_step(state1, action1)
        episode_reward += reward1
        evolution_happiness.append(get_happiness(state2))
    
        # Choosing the next action
        action2 = epsilon_greedy(Q, state2['state_id'], n_actions, epsilon)

        # Learning the Q-value
        Q = update(state1['state_id'], action1, reward1, state2['state_id'], action2)

        # Updating the respective values 
        state1 = state2 
        action1 = action2
        n_episode_steps += 1
        
    # At the end of learning process 
    if render_episode:
        print('Episode {0}, Score: {1}, Timesteps: {2}, Epsilon: {3}, Alpha: {4}'.format(
            episode+1, episode_reward, n_episode_steps, epsilon, alpha))
    
    evolution_reward.append(episode_reward)
    evolution_steps.append(n_episode_steps)
    evolution_happiness_all.append(np.mean(evolution_happiness))
    
    if ((episode+1) % n_episodes_plot == 0):
        plot_evolution_reward(evolution_reward)
        plot_evolution_steps(evolution_steps)
        plot_evolution_happiness(evolution_happiness_all)

NameError: name 'tqdm' is not defined

In [None]:
pct_state_visited = len(Q)/(101*101*101)*100
print(pct_state_visited)

In [None]:
Q

# Evaluating the performance

## Mean reward

In [None]:
# Evaluating the performance 
print ("Performance : ", sum(evolution_reward)/n_episodes) 

## Evolution of Reward overtime

In [None]:
plot_evolution_reward(evolution_reward)

In [None]:
plot_evolution_steps(evolution_steps)

## Evaluation through episode

### One

In [None]:
# Variables
nmax_steps = 20000

n_episode_steps = 0
evolution_episode_reward = []
done = False

# Start episode and get initial observation
state = env_reset()

while (not done) and (n_episode_steps < nmax_steps):

    # Get an action (0:Left, 1:Down, 2:Right, 3:Up)
    action = select_best_action(Q_state = Q[state['state_id']])

    # Perform a step
    state, reward, done, info = env_step(state, action)

    # Update score
    evolution_episode_reward.append(reward)
    n_episode_steps += 1

print('Test Episode, Score: {0}, Timesteps: {1}'.format(
    sum(evolution_episode_reward)/n_episode_steps, n_episode_steps))

plot_evolution_reward(evolution_episode_reward)

### Multiple

In [None]:
# Variables
episodes = 100
nmax_steps = 20000
total_reward = []

In [None]:
# Loop episodes
for episode in range(episodes):

    n_episode_steps = 0
    evolution_episode_reward = []
    evolution_dog_happines = []
    done = False
    
    # Start episode and get initial observation
    state = env_reset()
    evolution_dog_happines.append(get_happiness(state))
    
    while (not done) and (n_episode_steps < nmax_steps):

        # Get an action (0:Left, 1:Down, 2:Right, 3:Up)
        action = select_best_action(Q_state = Q[state['state_id']])
        
        # Perform a step
        state, reward, done, info = env_step(state, action)
        evolution_dog_happines.append(get_happiness(state))
        
        # Update score
        evolution_episode_reward.append(reward)
        n_episode_steps += 1

    print('Episode {0}, Score: {1}, Timesteps: {2}'.format(
        episode+1, sum(evolution_episode_reward)/n_episode_steps, n_episode_steps))
    
    plot_evolution_reward(evolution_dog_happines)
    total_reward.append(sum(evolution_dog_happines)/n_episode_steps)

# Print the score
print('--- Evaluation ---')
print ('Score: {0} +/- {1}'.format(np.mean(total_reward), statistics.stdev(total_reward)))
print()

In [None]:
state1

In [None]:
Q

In [None]:
state0 = env_reset()

In [None]:
state1, reward, _, _  = env_step(state0, action = 0)

In [None]:
state1, reward, _, _  = env_step(state1, action = 0)

In [None]:
state1

In [None]:
for i in range(100):
    state1, reward, done, _  = env_step(state1, action = 2)
    print(state1)
    print(done)

In [None]:
state1

In [None]:
get_happiness(state1)

In [27]:
import dill

ModuleNotFoundError: No module named 'dill'

In [28]:
import pickle

In [30]:
pickle.dumps(Q)

AttributeError: Can't pickle local object 'init_Q.<locals>.get_default_Q_values'