In [1]:
import os
from collections import defaultdict
from utils.run_utils import Runner

import gym
import gym_maze

from lcs.agents import Agent
from lcs.agents.acs2 import ACS2, Configuration as CFG_ACS2, ClassifiersList
from lcs.agents.acs2er import ACS2ER, Configuration as CFG_ACS2ER, ReplayMemory, ReplayMemorySample
from lcs.agents.acs2eer import ACS2EER, Configuration as CFG_ACS2EER, TrialReplayMemory
from lcs.metrics import population_metrics

# Logger
import logging
logging.basicConfig(level=logging.INFO)


# EXPERIMENT CONFIGURATION

In [2]:
# Gym environment id of the maze used by every experiment in this notebook
MAZE = "MazeXYZ-v0" 
# Number of trials passed to the runner for the explore and the exploit phase
EXPLORE_TRIALS = 5000
EXPLOIT_TRIALS = 1000

# The size of the ER replay memory buffer
ER_BUFFER_SIZE = 10000
# The minimum number of samples in the ER replay memory buffer required to start replaying samples (warm-up phase)
ER_BUFFER_MIN_SAMPLES = 1000
# The numbers of samples to be replayed during the ER phase; the ER and EER experiments are run once per entry
ER_SAMPLES_NUMBER_LIST = [3]

# Passed to the ACS2EER configuration as er_buffer_size and er_min_samples respectively
EER_BUFFER_SIZE = 1000
EER_BUFFER_MIN_SAMPLES = 25


#######

# Repetitions are numbered REPEAT_START .. REPEAT_START + REPEAT - 1; the number is used as the result sub-path
REPEAT_START = 1
REPEAT = 1

EXPERIMENT_NAME = "MAZE_XYZ_EXP_1" # Please edit when running a new experiment to avoid overwriting saved results.


In [3]:
# Shared runner object; _run_experiment calls runner.run_experiment(...) on it for every agent.
# NOTE(review): Runner comes from utils.run_utils - the meaning of its three arguments is not visible here, confirm.
runner = Runner('MAZE', EXPERIMENT_NAME, MAZE)

## METRICS

In [4]:
# Values compared against maze matrix cells: free path cell and reward cell
MAZE_PATH, MAZE_REWARD = 0, 9

# Environment instance used to derive the optimal actions for the maze
optimal_paths_env = gym.make(MAZE)
matrix = optimal_paths_env.matrix
# Y is the first matrix dimension (rows), X the second (columns)
Y, X = matrix.shape[:2]

def get_reward_pos():
    """Locate the reward cell of the maze.

    Scans `matrix` row by row (Y rows, X columns) and returns the
    `(row, column)` index of the first cell equal to `MAZE_REWARD`.
    Returns `None` when no cell holds that value.
    """
    reward_cells = (
        (row, col)
        for row in range(Y)
        for col in range(X)
        if matrix[row, col] == MAZE_REWARD
    )
    # The generator is lazy, so the scan stops at the first match.
    return next(reward_cells, None)

def get_possible_neighbour_cords(pos_y, pos_x):
    """Return the eight cells surrounding `(pos_y, pos_x)` with their action codes.

    Each item is `((neighbour_y, neighbour_x), action)`. Items are ordered
    N, NE, E, SE, S, SW, W, NW relative to the given position (north being
    a smaller row index). No bounds checking is done on the coordinates.

    The action is the code that `get_optimal_actions_to` records for the
    neighbour cell; presumably it is the move leading from the neighbour back
    to `(pos_y, pos_x)` in the maze's action encoding - confirm against
    gym_maze.
    """
    # (row offset, column offset, action code), clockwise starting from north
    offsets_with_actions = (
        (-1, 0, 4),   # n
        (-1, 1, 5),   # ne
        (0, 1, 6),    # e
        (1, 1, 7),    # se
        (1, 0, 0),    # s
        (1, -1, 1),   # sw
        (0, -1, 2),   # w
        (-1, -1, 3),  # nw
    )

    return [
        ((pos_y + d_row, pos_x + d_col), action)
        for d_row, d_col, action in offsets_with_actions
    ]

    
# Filled by get_optimal_actions_to with ((row, col), action, level) entries
optimal_actions = []

# Position of the reward cell; the search for optimal actions starts from here
root_node = get_reward_pos()

def is_included(cords, level):
    """Tell whether `cords` is already recorded at a level other than `level`.

    Looks through the module-level `optimal_actions` entries
    `((row, col), action, level)`. Entries recorded at the same `level` do
    not count, so a cell can collect several actions within one level.
    """
    for op_cords, _, op_level in optimal_actions:
        if op_cords[0] != cords[0]:
            continue
        if op_cords[1] != cords[1]:
            continue
        if level != op_level:
            return True
    return False


def get_optimal_actions_to(node, level):
    """Record the actions leading to `node` from its neighbouring path cells.

    For every neighbour of `node` that is not yet recorded at a different
    level (see `is_included`) and whose matrix value is `MAZE_PATH`, appends
    `((row, col), action, level)` to the module-level `optimal_actions`.

    Returns the list of neighbour coordinates that were recorded, in the
    order produced by `get_possible_neighbour_cords`.
    """
    accepted_cords = []

    for (row, col), action in get_possible_neighbour_cords(node[0], node[1]):
        cell = (row, col)
        # Check the bookkeeping first; the matrix is only read for new cells.
        if is_included(cell, level):
            continue
        if matrix[row, col] != MAZE_PATH:
            continue

        optimal_actions.append((cell, action, level))
        accepted_cords.append(cell)

    return accepted_cords

# Level-by-level walk outwards from the reward cell. The neighbours of the
# reward cell are recorded at level 0, their neighbours at level 1, and so on
# until a level records no new cell.
LEVEL = 0
next_level_cords = get_optimal_actions_to(root_node, LEVEL)

while len(next_level_cords) > 0:
    LEVEL += 1
    new_next_level_cords = []
    # A cell that borders several cells of the previous level is listed once
    # per such neighbour. Expanding it repeatedly would only re-append
    # identical (cell, action, level) entries and multiply the frontier size
    # at every level, so each cell is expanded once (first-seen order kept).
    for nlc in dict.fromkeys(next_level_cords):
        new_next_level_cords += get_optimal_actions_to(nlc, LEVEL)

    next_level_cords = new_next_level_cords

# Group the recorded actions by cell; the set drops repeated entries
positions_actions = defaultdict(set)
for cords, action, _level in optimal_actions:
    positions_actions[cords].add(action)

positions_actions = positions_actions.items()

# One (perception of the cell, list of its recorded actions) pair per cell
POSITIONS_OPTIMAL_ACTIONS = [
    (optimal_paths_env.env.maze.perception(cell), list(cell_actions))
    for cell, cell_actions in positions_actions
]
POSITIONS_OPTIMAL_ACTIONS_LENGTH = len(POSITIONS_OPTIMAL_ACTIONS)



def _maze_optimal(classifiers) -> float:
    """Percentage of maze positions where the best classifier acts optimally.

    For every `(perception, actions)` pair in `POSITIONS_OPTIMAL_ACTIONS` the
    match set for the perception is formed and its best classifier is taken.
    The position counts as correct when such a classifier exists and its
    action is among the listed actions.

    Returns the share of correct positions in the range 0-100.
    """
    correct_positions = 0

    for perception, allowed_actions in POSITIONS_OPTIMAL_ACTIONS:
        best = classifiers.form_match_set(perception).get_best_classifier()

        # No matching classifier means no decision for this position.
        if best is None:
            continue
        if best.action in allowed_actions:
            correct_positions += 1

    return correct_positions / POSITIONS_OPTIMAL_ACTIONS_LENGTH * 100.0


def _maze_optimal_reliable(classifiers) -> float:
    """Same as `_maze_optimal`, restricted to classifiers reporting `is_reliable()`."""
    reliable_only = [cl for cl in classifiers if cl.is_reliable()]
    return _maze_optimal(ClassifiersList(*reliable_only))


In [5]:
def _get_transitions():
    """Build the list of maze transitions expressed as perceptions.

    Creates a fresh `MAZE` environment, asks it for its transitions and
    returns one `[perception, action, perception]` list per transition: the
    first and third element of each transition are converted with the maze's
    `perception`, the second element is kept as is.
    """
    knowledge_env = gym.make(MAZE)
    maze = knowledge_env.env.maze

    return [
        [maze.perception(transition[0]), transition[1], maze.perception(transition[2])]
        for transition in knowledge_env.env.get_transitions()
    ]

# Computed once at cell execution and reused by _maze_knowledge on every metrics call
TRANSITIONS = _get_transitions()
TRANSITIONS_LENGTH = len(TRANSITIONS)

def _maze_knowledge(population) -> float:
    # Take into consideration only reliable classifiers
    reliable_classifiers = [c for c in population if c.is_reliable()]

    # Count how many transitions are anticipated correctly
    nr_correct = 0

    # For all possible destinations from each path cell
    for p0, action, p1 in TRANSITIONS:
        if any([True for cl in reliable_classifiers
                if cl.predicts_successfully(p0, action, p1)]):
            nr_correct += 1

    return nr_correct / TRANSITIONS_LENGTH * 100.0

def _maze_specificity(population) -> float:
    pop_len = len(population)
    if(pop_len) == 0:
        return 0
    return sum(map(lambda c: c.specificity, population)) / pop_len

def _maze_metrics(agent, env):
    """Collect the per-trial metrics for `agent`.

    Passed to the agent configurations as `user_metrics_collector_fcn`.
    Returns a dict with the maze specific metrics ('knowledge',
    'specificity', 'optimal', 'optimal_reliable') extended with whatever
    `population_metrics(population, env)` returns.
    """
    pop = agent.population

    metrics = dict(
        knowledge=_maze_knowledge(pop),
        specificity=_maze_specificity(pop),
        optimal=_maze_optimal(pop),
        optimal_reliable=_maze_optimal_reliable(pop),
    )
    # Generic population metrics go last and win on any key clash.
    metrics.update(population_metrics(pop, env))

    return metrics

## EXPERIMENT

In [6]:
def _run_experiment(agent, path):
    """Run `agent` on a fresh `MAZE` environment through the shared `runner`.

    The explore / exploit trial counts come from `EXPLORE_TRIALS` and
    `EXPLOIT_TRIALS`; `path` is forwarded to `runner.run_experiment`.
    """
    environment = gym.make(MAZE)
    runner.run_experiment(
        agent,
        environment,
        EXPLORE_TRIALS,
        EXPLOIT_TRIALS,
        path,
    )

def run_acs2_experiment():
    """Run the plain ACS2 experiment `REPEAT` times.

    Repetitions are numbered from `REPEAT_START`; each one gets a freshly
    configured agent and uses its number as the result path.
    """
    repetitions = range(REPEAT_START, REPEAT_START + REPEAT)

    for repetition in repetitions:
        # A new agent per repetition so that runs do not share a population
        configuration = CFG_ACS2(
            classifier_length=8,
            number_of_possible_actions=8,
            metrics_trial_frequency=1,
            user_metrics_collector_fcn=_maze_metrics,
        )

        _run_experiment(ACS2(configuration), f'{repetition}')

def _run_acs2er_experiment(er_samples_number: int):
    """Run the ACS2ER experiment `REPEAT` times for one replay sample count.

    Args:
        er_samples_number: value passed to the configuration as
            `er_samples_number`; it is also part of the result path.

    Results of repetition `i` use the path `m_<er_samples_number>-ER/<i>`,
    so different entries of `ER_SAMPLES_NUMBER_LIST` do not share a path.
    For the value 3 the path is the same `m_3-ER/<i>` as before.
    """
    for i in range(REPEAT_START, REPEAT_START + REPEAT):
        # Create agent
        cfg = CFG_ACS2ER(
            classifier_length=8,
            number_of_possible_actions=8,
            metrics_trial_frequency=1,
            er_buffer_size=ER_BUFFER_SIZE,
            er_min_samples=ER_BUFFER_MIN_SAMPLES,
            er_samples_number=er_samples_number,
            user_metrics_collector_fcn=_maze_metrics)
        agent = ACS2ER(cfg)

        _run_experiment(agent, os.path.join(f'm_{er_samples_number}-ER', f'{i}'))

def run_acs2er_experiments():
    """Run the ACS2ER experiment for every entry of `ER_SAMPLES_NUMBER_LIST`.

    Prints a START and an END line around each entry's runs.
    """
    for replay_count in ER_SAMPLES_NUMBER_LIST:
        print(f"START - ACS2ER - {replay_count}")
        _run_acs2er_experiment(replay_count)
        print(f"END - ACS2ER - {replay_count}")


def _run_acs2eer_experiment(er_samples_number: int):
    """Run the ACS2EER experiment `REPEAT` times for one replay sample count.

    Args:
        er_samples_number: value passed to the configuration as
            `er_samples_number`; it is also part of the result path.

    Results of repetition `i` use the path `m_<er_samples_number>-EER/<i>`,
    so different entries of `ER_SAMPLES_NUMBER_LIST` do not share a path.
    For the value 3 the path is the same `m_3-EER/<i>` as before.
    """
    for i in range(REPEAT_START, REPEAT_START + REPEAT):
        # Create agent
        cfg = CFG_ACS2EER(
            classifier_length=8,
            number_of_possible_actions=8,
            metrics_trial_frequency=1,
            er_buffer_size=EER_BUFFER_SIZE,
            er_min_samples=int(EER_BUFFER_MIN_SAMPLES),
            er_samples_number=er_samples_number,
            user_metrics_collector_fcn=_maze_metrics)
        agent = ACS2EER(cfg)

        _run_experiment(agent, os.path.join(f'm_{er_samples_number}-EER', f'{i}'))

def run_acs2eer_experiments():
    """Run the ACS2EER experiment for every entry of `ER_SAMPLES_NUMBER_LIST`.

    Prints a START and an END line around each entry's runs.
    """
    for replay_count in ER_SAMPLES_NUMBER_LIST:
        print(f"START - ACS2EER - {replay_count}")
        _run_acs2eer_experiment(replay_count)
        print(f"END - ACS2EER - {replay_count}")

### RUN ACS2 Experiments

In [7]:
# Baseline: plain ACS2, repeated REPEAT times
run_acs2_experiment()

INFO:lcs.agents.Agent:{'trial': 500, 'steps_in_trial': 11, 'reward': 1000, 'perf_time': 0.037226700000019264, 'knowledge': 51.008645533141205, 'specificity': 0.7758620689655172, 'optimal': 60.67415730337079, 'optimal_reliable': 43.82022471910113, 'population': 986, 'numerosity': 986, 'reliable': 384}
INFO:lcs.agents.Agent:{'trial': 1000, 'steps_in_trial': 22, 'reward': 1000, 'perf_time': 0.07720150000000103, 'knowledge': 68.87608069164266, 'specificity': 0.7843620331950207, 'optimal': 64.04494382022472, 'optimal_reliable': 68.53932584269663, 'population': 964, 'numerosity': 964, 'reliable': 522}


KeyboardInterrupt: 

### RUN ACS2ER Experiments

In [None]:
# ACS2ER, once per entry of ER_SAMPLES_NUMBER_LIST
run_acs2er_experiments()

START - ACS2ER - 3


INFO:lcs.agents.Agent:{'trial': 1000, 'steps_in_trial': 23, 'reward': 1000, 'perf_time': 0.3538936999998441, 'knowledge': 78.6046511627907, 'specificity': 0.7293595679012346, 'optimal': 79.3103448275862, 'optimal_reliable': 81.03448275862068, 'population': 648, 'numerosity': 648, 'reliable': 472}
INFO:lcs.agents.Agent:{'trial': 2000, 'steps_in_trial': 4, 'reward': 1000, 'perf_time': 0.05758299999979499, 'knowledge': 93.48837209302326, 'specificity': 0.7366144975288303, 'optimal': 79.3103448275862, 'optimal_reliable': 79.3103448275862, 'population': 607, 'numerosity': 607, 'reliable': 535}
INFO:lcs.agents.Agent:{'trial': 3000, 'steps_in_trial': 17, 'reward': 1000, 'perf_time': 0.30541649999941, 'knowledge': 95.34883720930233, 'specificity': 0.7389643463497453, 'optimal': 84.48275862068965, 'optimal_reliable': 84.48275862068965, 'population': 589, 'numerosity': 589, 'reliable': 547}
INFO:lcs.agents.Agent:{'trial': 4000, 'steps_in_trial': 26, 'reward': 1000, 'perf_time': 0.361430199999631

END - ACS2ER - 3


In [None]:
# ACS2EER, once per entry of ER_SAMPLES_NUMBER_LIST
run_acs2eer_experiments()

START - ACS2EER - 3


INFO:lcs.agents.Agent:{'trial': 1000, 'steps_in_trial': 14, 'reward': 1000, 'perf_time': 1.4907734999997047, 'knowledge': 85.11627906976744, 'specificity': 0.7476679104477612, 'optimal': 79.3103448275862, 'optimal_reliable': 77.58620689655173, 'population': 536, 'numerosity': 536, 'reliable': 439}
INFO:lcs.agents.Agent:{'trial': 2000, 'steps_in_trial': 16, 'reward': 1000, 'perf_time': 0.3009994000003644, 'knowledge': 92.09302325581396, 'specificity': 0.75, 'optimal': 84.48275862068965, 'optimal_reliable': 84.48275862068965, 'population': 521, 'numerosity': 521, 'reliable': 468}
INFO:lcs.agents.Agent:{'trial': 3000, 'steps_in_trial': 3, 'reward': 1000, 'perf_time': 0.17036390000066604, 'knowledge': 96.27906976744185, 'specificity': 0.7531800391389433, 'optimal': 86.20689655172413, 'optimal_reliable': 86.20689655172413, 'population': 511, 'numerosity': 511, 'reliable': 483}
INFO:lcs.agents.Agent:{'trial': 4000, 'steps_in_trial': 12, 'reward': 1000, 'perf_time': 0.1730215000006865, 'knowl

END - ACS2EER - 3
