### Imports

In [1]:
import numpy as np
import pandas as pd
import random
import PW_explorer
from PW_explorer.run_clingo import run_clingo
from PW_explorer.load_worlds import load_worlds
from PW_explorer.pwe_nb_helper import ASPRules

In [2]:
%load_ext PWE_NB_Extension

## Implementation

We show how we can use a combination of Answer Set Programming (ASP) and Python for Reinforcement Learning, specifically the Q-Learning algorithm. We leverage the modelling capabilities of ASP to define, model and drive the environment and its state, and use Python to drive the "learning". This allows us to achieve the best of both worlds and create a truly modular and efficient learning procedure.

We can encode a small GridsWorld instance in ASP as follows:

In [3]:
%%clingo --donot-run --load_combined_input_to grid_worlds_env_rules --donot-display_input

% GRIDS World

%% Grid bounds and time horizon. With these values the grid spans x,y in 0..5
%% and the only time steps are 0 and 1, so the transition rules below can
%% fire for T=0 only (one move per solver call).
#const min_x=0.
#const max_x=5.
#const min_y=0.
#const max_y=5.
#const min_timestep=0.
#const max_timestep=1.

time(min_timestep..max_timestep).
valid_x(min_x..max_x).
valid_y(min_y..max_y).

%% NOTE(review): the "schema"/"temporal" comment lines in this cell look like
%% annotations consumed by PW_explorer (the Python side reads columns named
%% X, Y, TIME and REWARD) -- keep them unchanged; confirm against PW_explorer docs.
% schema state(X, Y, TIME)
% temporal state(_,_,T)

%% state(0,0,0).  %% COMES FROM PYTHON LATER
%% Integrity constraints: two state atoms at the same time step may not differ
%% in X (first constraint) or in Y (second constraint).
:- state(X1,Y1,T), state(X2,Y2,T), valid_x(X1), valid_x(X2), valid_y(Y1), valid_y(Y2), time(T), X1!=X2.
:- state(X1,Y1,T), state(X2,Y2,T), valid_x(X1), valid_x(X2), valid_y(Y1), valid_y(Y2), time(T), Y1!=Y2.

% schema reward(REWARD)
%% reward(100) is derived when a state atom at any time step lies on a reward_state cell.
reward(100) :- state(X,Y,_), reward_state(X,Y).

% schema barrier(X,Y)
%% The first fact below is the reward cell; the remaining three are barrier cells.
reward_state(4, 4).
barrier(3,0).
barrier(1,4).
barrier(4,5).

%% :- not reward(100).


% schema action(ACTION, TIME)
% temporal action(_,T)
%% ACTION COMES FROM PYTHON LATER
%action(up, 0).
%% 1 {action(up, T) ; action(down, T); action(left, T) ; action(right, T)} 1 :- time(T).

%% Transition rules: up => Y+1, down => Y-1, left => X-1, right => X+1.
%% A successor state is derived only if the target cell is inside the grid
%% (valid_x/valid_y), is not a barrier, and T+1 is a valid time step; otherwise
%% no state atom exists for T+1.
%% The repeated valid_x(X) literal in the up/down rules is redundant.
state(X, Y, T+1) :- state(X, Y2, T), Y2=Y-1, action(up, T), valid_x(X), valid_y(Y), valid_x(X), valid_y(Y2), time(T), time(T+1), not barrier(X,Y).
state(X, Y, T+1) :- state(X, Y2, T), Y2=Y+1, action(down, T), valid_x(X), valid_y(Y), valid_x(X), valid_y(Y2), time(T), time(T+1), not barrier(X,Y).
state(X, Y, T+1) :- state(X2, Y, T), X2=X+1, action(left, T), valid_x(X), valid_y(Y), valid_x(X2), valid_y(Y), time(T), time(T+1), not barrier(X,Y).
state(X, Y, T+1) :- state(X2, Y, T), X2=X-1, action(right, T), valid_x(X), valid_y(Y), valid_x(X2), valid_y(Y), time(T), time(T+1), not barrier(X,Y).

%:- not reward(100).

%% Only these three predicates are shown in the answer sets.
#show state/3.
#show action/2.
#show reward/1.

We can now define the learning procedure in Python.

In [4]:
def egreedy_policy(q_values, state, epsilon=0.1, actions=None):
    """Choose an action for ``state`` with an epsilon-greedy rule.

    With probability ``epsilon`` a uniformly random action is returned
    (exploration); otherwise the action whose Q-value is largest for
    ``state`` is returned (exploitation). Ties are resolved by
    ``np.argmax``, i.e. the first maximal entry wins.

    Parameters
    ----------
    q_values : indexable as ``q_values[x][y]`` -> sequence of per-action values
        The Q-table.
    state : sequence of two ints
        ``(x, y)`` grid coordinates used to index ``q_values``.
    epsilon : float, default 0.1
        Probability of picking a random action.
    actions : sequence, optional
        Action labels, ordered like the last axis of ``q_values``. When
        omitted, the notebook-level ``ACTIONS`` list is used.

    Returns
    -------
    One element of ``actions``.
    """
    if actions is None:
        # Looked up at call time: ACTIONS is defined in a later cell, so it
        # only has to exist by the time the policy is actually invoked.
        actions = ACTIONS
    if np.random.rand() < epsilon:
        return random.choice(actions)
    return actions[np.argmax(q_values[state[0]][state[1]])]

In [5]:
# Action labels; a label's position in this list is its index on the last
# axis of the Q-table.
ACTIONS = ['left', 'right', 'up', 'down']


class Grids_World_Env:
    """Grid-world environment whose dynamics are evaluated by clingo.

    Every transition is computed by appending the current state and the
    chosen action as facts to the ASP rules and reading the successor
    state and reward back from the loaded result frames.
    """

    # File name kept for the alternative loading path noted on the next line.
    asp_file = 'grids_world.lp4'
    # ASP program captured by the %%clingo cell.
    env_rules = [grid_worlds_env_rules] #ASPRules(asp_file).splitlines()

    @staticmethod
    def _get_next_state(state, dfs):
        """Return the (x, y) recorded at TIME 1 in world 1, or ``state`` if there is none."""
        all_states = dfs['state_3']
        in_first_world = all_states[all_states['pw'] == 1]
        successors = in_first_world[in_first_world['TIME'] == 1]
        if len(successors) == 0:
            # No successor row was produced: the agent stays where it is.
            return state
        first_row = successors.iloc[0]
        return (int(first_row['X']), int(first_row['Y']))

    @staticmethod
    def _get_reward(dfs):
        """Return the reward recorded for world 1, or 0 when no reward row exists."""
        if 'reward_1' not in dfs:
            return 0
        all_rewards = dfs['reward_1']
        in_first_world = all_rewards[all_rewards['pw'] == 1]
        if len(in_first_world) == 0:
            return 0
        return int(in_first_world.iloc[0]['REWARD'])

    @staticmethod
    def get_next_state_and_reward(state, action):
        """Run clingo for one move and return ``(next_state, reward)``."""
        action_fact = 'action({},0).'.format(action)
        state_fact = 'state({},{},0).'.format(state[0], state[1])
        program = Grids_World_Env.env_rules + [action_fact, state_fact]

        asp_output, meta = run_clingo(program)
        dfs, _rels, _pws = load_worlds(asp_output, meta, 'clingo', silent=True)

        successor = Grids_World_Env._get_next_state(state, dfs)
        payoff = Grids_World_Env._get_reward(dfs)
        return successor, payoff

    @staticmethod
    def is_game_over(state, next_state, reward):
        """An episode ends as soon as a positive reward is received."""
        return reward > 0

    @staticmethod
    def step(state, action):
        """Apply ``action`` in ``state``; return ``(next_state, reward, game_over)``."""
        successor, payoff = Grids_World_Env.get_next_state_and_reward(state, action)
        finished = Grids_World_Env.is_game_over(state, successor, payoff)
        return successor, payoff, finished

### Experiment

##### Training

We can initialize the experiment as shown below:

In [6]:
# Reproducibility: the policy draws from both `random` and `np.random`,
# so seed both generators before any episode is run.
random_seed = 42
random.seed(random_seed)
np.random.seed(random_seed)

# Q-table dimensions.
num_states = 6   # cells per axis; the Q-table is num_states x num_states
num_actions = 4  # size of the last Q-table axis (one entry per action)

# Q-learning hyper-parameters.
learning_rate = 0.1
num_episodes_for_exploration = 20  # episodes run with exploration_epsilon
exploration_epsilon = 0.9          # epsilon used during the exploration phase
exploitation_epsilon = 0.2         # epsilon used afterwards
gamma = 0.9                        # discount factor in the TD target
num_episodes_per_iter = 10         # episodes per run of the training cell

env = Grids_World_Env()
q_values = np.zeros((num_states, num_states, num_actions))
total_episodes = 0  # running count, incremented by the training cell

Below we have the Q-Learning procedure:

In [7]:
# Run one batch of Q-learning episodes, each starting at (0, 0).
for ep in range(num_episodes_per_iter):
    total_episodes += 1

    # The exploration rate depends only on the episode count, so pick it once
    # per episode: explore heavily at first, exploit more afterwards.
    if total_episodes > num_episodes_for_exploration:
        epsilon = exploitation_epsilon
    else:
        epsilon = exploration_epsilon

    state = (0, 0)
    num_steps = 0
    done = False
    while not done:
        num_steps += 1

        # Choose an action and apply it in the environment.
        action = egreedy_policy(q_values, state, epsilon=epsilon)
        next_state, reward, done = Grids_World_Env.step(state, action)

        # One-step Q-learning update for the (state, action) pair just taken.
        x, y = state[0], state[1]
        action_idx = ACTIONS.index(action)
        best_next_q = np.max(q_values[next_state[0], next_state[1]])
        td_target = reward + gamma * best_next_q
        td_error = td_target - q_values[x, y, action_idx]
        q_values[x, y, action_idx] += learning_rate * td_error

        state = next_state

    # The while loop only exits once the episode is done.
    print("Episode {} Done".format(total_episodes))
    print("reward:", reward)
    print("num_steps:", num_steps)

Episode 1 Done
reward: 100
num_steps: 42
Episode 2 Done
reward: 100
num_steps: 362
Episode 3 Done
reward: 100
num_steps: 41
Episode 4 Done
reward: 100
num_steps: 92
Episode 5 Done
reward: 100
num_steps: 81
Episode 6 Done
reward: 100
num_steps: 118
Episode 7 Done
reward: 100
num_steps: 40
Episode 8 Done
reward: 100
num_steps: 10
Episode 9 Done
reward: 100
num_steps: 101
Episode 10 Done
reward: 100
num_steps: 39


##### Testing

Once we have learned a Q-value table, we can freeze it, and then try it out as follows:

In [8]:
# Simulating based on Q-Value table
# The rollout is purely greedy (no exploration), so the same state always
# yields the same action. If a state is ever visited twice without the game
# ending, the walk would repeat forever; track visited states and stop instead
# of looping indefinitely on a Q-table that has not converged.
state = (0,0)
is_game_over = False
visited_states = set()
while not is_game_over:
    if state in visited_states:
        print("Greedy policy revisited state {} without reaching the reward; stopping.".format(state))
        break
    visited_states.add(state)
    print("State:", state)
    action = ACTIONS[np.argmax(q_values[state[0]][state[1]])]
    print("Action:", action)
    next_state, reward = Grids_World_Env.get_next_state_and_reward(state, action)
    print("Next State:", next_state)
    print("Reward:", reward)
    is_game_over = Grids_World_Env.is_game_over(state, next_state, reward)
    state = next_state
    print("")

State: (0, 0)
Action: right
Next State: (1, 0)
Reward: 0

State: (1, 0)
Action: right
Next State: (2, 0)
Reward: 0

State: (2, 0)
Action: up
Next State: (2, 1)
Reward: 0

State: (2, 1)
Action: up
Next State: (2, 2)
Reward: 0

State: (2, 2)
Action: up
Next State: (2, 3)
Reward: 0

State: (2, 3)
Action: up
Next State: (2, 4)
Reward: 0

State: (2, 4)
Action: right
Next State: (3, 4)
Reward: 0

State: (3, 4)
Action: right
Next State: (4, 4)
Reward: 100



We can easily display the "optimal" learned policy as shown below:

In [9]:
# Greedy action index per cell, indexed [x, y]; transpose so that each table
# row corresponds to one y value and each column to one x value.
greedy_action_idx = np.argmax(q_values[:num_states, :num_states], axis=2)
policy = [[ACTIONS[idx] for idx in row] for row in greedy_action_idx.T]
pd.DataFrame(policy)

Unnamed: 0,0,1,2,3,4,5
0,right,right,up,left,up,up
1,right,right,up,up,left,up
2,right,right,up,up,up,left
3,right,right,up,up,left,left
4,up,left,right,right,left,left
5,right,right,down,down,left,left
