In [None]:
import numpy as np
import pandas as pd

# Agente simples
- O espaço de percepção é 1D.
- O espaço de ação é 1D.

In [None]:
# Classe do agente

def generate_matrix(percept_space, actuator_space):
  """Build a zero-filled percept-by-action matrix.

  Args:
    percept_space: Sized collection of percepts; its length is the row count.
    actuator_space: Sized collection of actions; its length is the column count.

  Returns:
    An ``np.matrix`` of zeros with shape
    ``(len(percept_space), len(actuator_space))``.
  """
  shape = (len(percept_space), len(actuator_space))
  return np.matrix(np.zeros(shape))

def update_matrix(percept, action, percept_space, actuator_space, matrix, factor):
  """Add ``factor`` in place to the matrix entry for ``(percept, action)``.

  The row is located by matching ``percept`` against ``percept_space`` and the
  column by matching ``action`` against ``actuator_space``. Nothing is returned;
  ``matrix`` itself is modified.
  """
  row = np.where(percept_space == percept)
  column = np.where(actuator_space == action)
  matrix[row, column] = matrix[row, column] + factor

def update_weight_matrix(weight_matrix, reward_matrix, forget_factor):
  """Return the weights after one damping-plus-reward step.

  Each weight is pulled towards 1 by ``forget_factor`` times its distance from
  1, and then the round's ``reward_matrix`` is added. The input matrix is not
  modified; a new matrix is returned.
  """
  damping = forget_factor*(weight_matrix - 1)
  return weight_matrix - damping + reward_matrix

def make_decision(weight_matrix, percept, percept_space, actuator_space):
  """Sample an action for ``percept`` with probability proportional to its weights.

  The row of ``weight_matrix`` matching ``percept`` (looked up in
  ``percept_space``) is normalised to sum to 1 and used as the sampling
  distribution over ``actuator_space``.
  """
  row_selector = np.where(percept_space == percept)
  weights = weight_matrix[row_selector]
  # The selected row is two-dimensional; [0] flattens it for np.random.choice.
  probabilities = np.array(weights/weights.sum())[0]
  return np.random.choice(actuator_space, p = probabilities)

In [None]:
# Environment

# Invasion game

# Percepts and actions are both drawn from the two options 0 and 1.
percept_space = np.array([0,1])
actuator_space = np.array([0,1])

# Parameters
# forget_factor is the damping coefficient handed to update_weight_matrix.
forget_factor = 0.1

def generate_reward(percept, action):
  """Score one round: 1 when the action equals the percept, otherwise 0."""
  return 1 if action == percept else 0

In [None]:
# Initialising the matrices
# interaction_matrix counts how often each (percept, action) pair occurred;
# weight_matrix starts with every weight equal to 1.
interaction_matrix = generate_matrix(percept_space, actuator_space)
weight_matrix = generate_matrix(percept_space, actuator_space) + 1

reward_memory = []

# rodadas = number of rounds to play
rodadas = 300
for i in range(rodadas):
  # Reward matrix for this round
  reward_matrix = generate_matrix(percept_space, actuator_space)

  # Generating the percept
  percept = np.random.choice(percept_space)

  # Generating the action
  action = make_decision(weight_matrix, percept, percept_space, actuator_space)

  # Updating the interaction matrix
  update_matrix(percept, action, percept_space, actuator_space, interaction_matrix, 1)

  # Generating the reward
  reward = generate_reward(percept, action)
  reward_memory.append([i+1, percept,action,reward,forget_factor])
  update_matrix(percept, action, percept_space, actuator_space, reward_matrix, reward)

  # Updating the weights
  weight_matrix = update_weight_matrix(weight_matrix, reward_matrix, forget_factor)
# One row per round; blocking_efficiency is the cumulative reward divided by the round number.
_ = pd.DataFrame(reward_memory, columns = ['rodada','percept','action','reward','forget_factor'])
_['blocking_efficiency'] = _['reward'].cumsum()/_['rodada']

In [None]:
# Sweep the forgetting factor and gather one learning curve per value.
# The per-value frames are collected in a list and concatenated once at the
# end, instead of growing `db` with pd.concat on every iteration (which
# re-copies all earlier rows and starts from an empty frame).
# NOTE: the loop variable rebinds the module-level forget_factor; it is 1 once
# this cell has finished.
sweep_frames = []
for forget_factor in [0,0.1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9,1]:
  # Initialising the matrices
  interaction_matrix = generate_matrix(percept_space, actuator_space)
  weight_matrix = generate_matrix(percept_space, actuator_space) + 1

  reward_memory = []

  rodadas = 500
  for i in range(rodadas):
    # Reward matrix for this round
    reward_matrix = generate_matrix(percept_space, actuator_space)

    # Generating the percept
    percept = np.random.choice(percept_space)

    # Generating the action
    action = make_decision(weight_matrix, percept, percept_space, actuator_space)

    # Updating the interaction matrix
    update_matrix(percept, action, percept_space, actuator_space, interaction_matrix, 1)

    # Generating the reward
    reward = generate_reward(percept, action)
    reward_memory.append([i+1, percept,action,reward,forget_factor])
    update_matrix(percept, action, percept_space, actuator_space, reward_matrix, reward)

    # Updating the weights
    weight_matrix = update_weight_matrix(weight_matrix, reward_matrix, forget_factor)
  # blocking_efficiency is the cumulative reward divided by the round number.
  _ = pd.DataFrame(reward_memory, columns = ['rodada','percept','action','reward','forget_factor'])
  _['blocking_efficiency'] = _['reward'].cumsum()/_['rodada']
  sweep_frames.append(_)
db = pd.concat(sweep_frames)

In [None]:
import altair as alt
# Lift Altair's limit on the number of data rows per chart; the sweep frames plotted in this notebook hold thousands of rows.
alt.data_transformers.disable_max_rows()

DataTransformerRegistry.enable('default')

In [None]:
# Blocking efficiency per round, coloured by forget_factor (treated as a nominal category via ':N').
alt.Chart(db).mark_line().encode(
    x = 'rodada',
    y = 'blocking_efficiency',
    color = 'forget_factor:N'
)

# Ambiente com mais de um grupo de ações

In [None]:
def percept_preprocess(observation, num_percepts_list):
  """Flatten a multi-feature observation into a single integer percept index.

  Feature ``i`` contributes ``observation[i]`` times the product of the
  cardinalities of all earlier features (a mixed-radix encoding).

  Args:
    observation: Sequence of integers >= 0, one per feature, where
      ``observation[i] < num_percepts_list[i]`` is expected.
    num_percepts_list: Number of possible values of each feature.

  Returns:
    The percept index as a single integer.
  """
  return sum(
      int(value * np.prod(num_percepts_list[:position]))
      for position, value in enumerate(observation)
  )

def update_matrix(percept, action, actions, matrix, factor):
  """Add ``factor`` in place to the ``(percept, action)`` entry of ``matrix``.

  NOTE: this redefinition replaces the earlier six-argument ``update_matrix``
  from the single-feature section of the notebook.

  Args:
    percept: Integer row index of the (already flattened) percept.
    action: The action taken; its column is found by matching it in ``actions``.
    actions: Sequence (list or array) of all available actions.
    matrix: 2-D array indexed as ``[percept, action column]``; modified in place.
    factor: Amount added to the selected entry.

  If ``action`` does not occur in ``actions`` no entry is selected and the
  matrix is left unchanged.
  """
  # np.asarray forces an elementwise comparison. With a plain list and a plain
  # int, `actions == action` is simply False, which selected no column and
  # silently skipped the update.
  actuator_index = np.where(np.asarray(actions) == action)[0]
  matrix[percept, actuator_index] += factor

def update_weight_matrix(matrix, reward_matrix, forget_factor):
  """Return ``matrix`` after one damping step towards 1 plus the round's rewards.

  The input is left untouched; the updated matrix is returned.
  """
  damping = forget_factor*(matrix - 1)
  return matrix - damping + reward_matrix

def make_decision(h_matrix, percept, num_percepts_list, actions):
  """Sample an action with probability proportional to row ``percept`` of ``h_matrix``.

  ``num_percepts_list`` is accepted but not used by this function.
  """
  row = h_matrix[percept]
  probabilities = np.array(row/row.sum())
  return np.random.choice(actions, p = probabilities)

In [None]:
# Possible values of each of the two observation features.
percept_space_1 = [0,1]
percept_space_2 = [0,1]

num_percept_space_1 = len(percept_space_1)
num_percept_space_2 = len(percept_space_2)

# Every (feature 1, feature 2) combination, one observation per row.
percepts = np.array(np.meshgrid(percept_space_1, percept_space_2)).T.reshape(-1,2)
num_percepts_list = [num_percept_space_1, num_percept_space_2]
# Count the joint observations built above. The previous len(percept_space)
# read the single-feature array from an earlier cell and gave 2 instead of 4,
# so the h/g matrices had too few rows for the flattened percept indices.
num_percepts = len(percepts)

actions = [0,1]
num_actions = len(actions)

# Parameters
forget_factor = 0.1

# Invasion game in which the invader may bluff
def generate_reward(observation, action):
  """Score one round of the bluffing game.

  ``observation[0]`` is the shown move and ``observation[1]`` flags a bluff.
  The reward is 1 when the action matches the shown move and there is no bluff
  (flag 0), or when the action differs from it and there is a bluff (flag 1);
  otherwise it is 0.
  """
  matched = action == observation[0]
  if observation[1] == 0:
    return 1 if matched else 0
  if observation[1] == 1:
    return 0 if matched else 1
  return 0

In [None]:
reward_memory = []
# h_matrix holds the action weights (all start at 1); g_matrix counts how often
# each (percept, action) pair occurred.
h_matrix = np.ones((num_percepts, num_actions), dtype=np.float64)
g_matrix = np.zeros((num_percepts, num_actions), dtype=np.float64)

rodadas = 300
for i in range(rodadas):
  # Reward matrix for this round
  reward_matrix = np.zeros((num_percepts, num_actions), dtype=np.float64)

  # Generating the observation and its flattened percept index
  observation = percepts[np.random.choice(np.arange(num_percepts))]
  percept = percept_preprocess(observation, num_percepts_list)

  # Generating the action (pass this section's num_percepts_list, not the
  # single-feature percept_space left over from an earlier cell)
  action = make_decision(h_matrix, percept, num_percepts_list, actions)

  # Updating the interaction matrix
  update_matrix(percept, action, actions, g_matrix, 1)

  # Generating the reward
  reward = generate_reward(observation, action)
  reward_memory.append([i+1, observation, action, reward, forget_factor])
  # Use this section's `actions`, not the stale `actuator_space` from the
  # single-feature section.
  update_matrix(percept, action, actions, reward_matrix, reward)

  # Updating the weights
  h_matrix = update_weight_matrix(h_matrix, reward_matrix, forget_factor)
# The 'percept' column stores the full observation; blocking_efficiency is the
# cumulative reward divided by the round number.
_ = pd.DataFrame(reward_memory, columns = ['rodada','percept','action','reward','forget_factor'])
_['blocking_efficiency'] = _['reward'].cumsum()/_['rodada']

In [None]:
# Sweep the forgetting factor for the bluffing game, one learning curve per
# value. Frames are collected in a list and concatenated once at the end
# instead of growing `db` with pd.concat on every iteration.
# NOTE: the loop variable rebinds the module-level forget_factor; it is 1 once
# this cell has finished.
sweep_frames = []
for forget_factor in [0,0.1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9,1]:
  reward_memory = []
  h_matrix = np.ones((num_percepts, num_actions), dtype=np.float64)
  g_matrix = np.zeros((num_percepts, num_actions), dtype=np.float64)

  rodadas = 1000
  for i in range(rodadas):
    # Reward matrix for this round
    reward_matrix = np.zeros((num_percepts, num_actions), dtype=np.float64)

    # Generating the observation and its flattened percept index
    observation = percepts[np.random.choice(np.arange(num_percepts))]
    percept = percept_preprocess(observation, num_percepts_list)

    # Generating the action (pass this section's num_percepts_list, not the
    # single-feature percept_space left over from an earlier cell)
    action = make_decision(h_matrix, percept, num_percepts_list, actions)

    # Updating the interaction matrix
    update_matrix(percept, action, actions, g_matrix, 1)

    # Generating the reward
    reward = generate_reward(observation, action)
    reward_memory.append([i+1, observation, action, reward, forget_factor])
    # Use this section's `actions`, not the stale `actuator_space` from the
    # single-feature section.
    update_matrix(percept, action, actions, reward_matrix, reward)

    # Updating the weights
    h_matrix = update_weight_matrix(h_matrix, reward_matrix, forget_factor)
  # blocking_efficiency is the cumulative reward divided by the round number.
  _ = pd.DataFrame(reward_memory, columns = ['rodada','percept','action','reward','forget_factor'])
  _['blocking_efficiency'] = _['reward'].cumsum()/_['rodada']
  sweep_frames.append(_)
db = pd.concat(sweep_frames)

In [None]:
# Blocking efficiency per round, one line per forget_factor value.
# ':N' marks forget_factor as nominal, matching the other charts in this
# notebook; without it the numeric column is encoded as a continuous colour
# scale rather than as separate series.
alt.Chart(db).mark_line().encode(
    x = 'rodada',
    y = 'blocking_efficiency',
    color = 'forget_factor:N'
)

# Criando classes para o agente e o ambiente

In [408]:
import numpy as np

In [523]:
class Agent(object):
  """Learning agent driven by an h-matrix of action weights and a g-matrix of glow.

  Both matrices have one row per flattened percept and one column per action.
  """

  def __init__(self, action_set, num_percept_set, gamma_damping, eta_glow_damping):
    """Set up the agent.

    Args:
      action_set: Sequence of the actions the agent can choose from.
      num_percept_set: Number of possible values of each observation feature.
      gamma_damping: Coefficient that pulls every h-value back towards 1 on
        each learning step.
      eta_glow_damping: Fraction of the glow removed on each step.
    """
    self.actions = action_set
    self.num_actions = len(self.actions)
    self.num_percept_set = num_percept_set
    # One flattened percept per combination of feature values.
    self.num_percepts = int(np.prod(np.array(self.num_percept_set).astype(np.float64)))

    self.gamma_damping = gamma_damping
    self.eta_glow_damping = eta_glow_damping

    # All weights start at 1 (uniform choice); no glow yet.
    self.h_matrix = np.ones((self.num_percepts, self.num_actions), dtype=np.float64)
    self.g_matrix = np.zeros((self.num_percepts, self.num_actions), dtype=np.float64)

  def percept_preprocess(self, observation):
    """Flatten a multi-feature observation into a single integer percept index.

    Feature ``i`` contributes ``observation[i]`` times the product of the
    cardinalities of all earlier features.
    """
    return sum(
        int(value * np.prod(self.num_percept_set[:position]))
        for position, value in enumerate(observation)
    )

  def learn_and_deliberate(self, observation, reward):
    """Learn from ``reward``, then choose and return an action for ``observation``.

    ``reward`` is applied through the glow matrix as it stands on entry, i.e.
    before this call's own action is marked in it.
    """
    # Damp the weights towards 1 and reinforce the glowing entries.
    self.h_matrix = self.h_matrix - self.gamma_damping*(self.h_matrix - 1) + self.g_matrix*reward
    percept = self.percept_preprocess(observation)
    action = self.deliberate(percept)
    # Fade the existing glow, then mark the pair that was just used.
    self.g_matrix = (1 - self.eta_glow_damping)*self.g_matrix
    # np.asarray forces an elementwise comparison even when self.actions is a
    # plain list and action a plain int (list == int is just False, which
    # would select no column and leave the glow unset).
    action_index = np.where(np.asarray(self.actions) == action)[0]
    self.g_matrix[percept, action_index] = 1
    return action

  def deliberate(self, percept):
    """Sample an action with probability proportional to the h-values of ``percept``."""
    prob_decision = self.h_matrix[percept]/np.sum(self.h_matrix[percept])
    action = np.random.choice(self.actions, p = np.array(prob_decision))
    return action

In [544]:
# Simple invasion game
invasor_action_set_1 = [0,1]  # the invader's possible moves
invasor_action_list = [invasor_action_set_1]

class TaskEnvironment(object):
  """Invader that picks its next state uniformly at random from all action combinations."""

  def __init__(self, action_set):
    """Enumerate every combination of the given action sets and draw an initial state.

    Args:
      action_set: List of action sets, one per feature of the invader's state.
    """
    self.action_set = action_set
    self.num_action_set = [len(action_space) for action_space in self.action_set]
    # One row per combination, one column per action set.
    grid = np.meshgrid(*self.action_set)
    self.actions = np.array(grid).T.reshape(-1,len(self.action_set))
    self.num_actions = len(self.actions)

    self.next_state = self.actions[np.random.choice(self.num_actions)]

  def move(self):
    """Draw a new random state and store it in ``next_state``."""
    chosen = np.random.choice(self.num_actions)
    self.next_state = self.actions[chosen]

  def generate_reward(self, observation, action):
    """Advance the invader, then return 1 if ``action`` equals ``observation[0]``, else 0."""
    self.move()
    return 1 if action == observation[0] else 0


action_set = [0,1]  # right or left

In [541]:
# Invasion game with bluffing
invasor_action_set_1 = [0,1]  # the invader's possible moves
invasor_action_set_2 = [0,1]  # whether the invader is bluffing (1) or not (0)
invasor_action_list = [
    invasor_action_set_1, invasor_action_set_2
    ]

class TaskEnvironment(object):
  """Invader that picks its next state uniformly at random from all action combinations."""

  def __init__(self, action_set):
    """Enumerate every combination of the given action sets and draw an initial state.

    Args:
      action_set: List of action sets, one per feature of the invader's state.
    """
    self.action_set = action_set
    self.num_action_set = [len(action_space) for action_space in self.action_set]
    # One row per combination, one column per action set.
    grid = np.meshgrid(*self.action_set)
    self.actions = np.array(grid).T.reshape(-1,len(self.action_set))
    self.num_actions = len(self.actions)

    self.next_state = self.actions[np.random.choice(self.num_actions)]

  def move(self):
    """Draw a new random state and store it in ``next_state``."""
    chosen = np.random.choice(self.num_actions)
    self.next_state = self.actions[chosen]

  # Reward of the invasion game
  def generate_reward(self, observation, action):
    """Advance the invader, then score the defender's action.

    The reward is 1 when the action matches ``observation[0]`` and the bluff
    flag ``observation[1]`` is 0, or when the action differs from
    ``observation[0]`` and the flag is 1; otherwise it is 0.
    """
    self.move()
    matched = action == observation[0]
    if observation[1] == 0:
      return 1 if matched else 0
    if observation[1] == 1:
      return 0 if matched else 1
    return 0

action_set = [0,1]  # right or left

In [None]:

# Parameters handed to Agent below: no damping of the h-matrix, and the glow is
# fully reset on every step.
gamma_damping = 0

eta_glow_damping = 1

# Agent x environment interaction

# Invader
ambiente = TaskEnvironment(invasor_action_list)
# Agent
agente = Agent(action_set, ambiente.num_action_set, gamma_damping, eta_glow_damping)

rodadas = 10
# The first call to learn_and_deliberate has no previous round, so it starts with reward 0.
reward = 0
learning_curve = []
for rodada in range(rodadas):
  # Print the agent's matrices before it acts, then the observation, action and reward.
  print('rodada', rodada)
  print('H', agente.h_matrix)
  print('G', agente.g_matrix)
  observation = ambiente.next_state
  print('observacao', observation)
  action = agente.learn_and_deliberate(observation, reward)
  print('ação', action)
  # generate_reward also moves the invader to its next state.
  reward = ambiente.generate_reward(observation, action)
  print('reward', reward)
  print('-----------------')

  learning_curve.append([rodada, reward, gamma_damping, eta_glow_damping, observation, action])

_ = pd.DataFrame(learning_curve, columns = ['rodada','reward','gamma_damping','eta_glow_damping','observation','action'])

In [545]:
# Agent x environment interaction: sweep gamma_damping, 10 episodes per value.
# Each episode's frame is collected in a list and concatenated once at the end
# instead of growing `db` with pd.concat 110 times, which re-copied every
# earlier row on each call.
episode_frames = []

eta_glow_damping = 1
for gamma_damping in [0,0.1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9,1]:
  for episode in range(0,10):
    # Invader
    ambiente = TaskEnvironment(invasor_action_list)
    # Agent
    agente = Agent(action_set, ambiente.num_action_set, gamma_damping, eta_glow_damping)

    rodadas = 250
    # No previous round before the first step, so learning starts with reward 0.
    reward = 0
    learning_curve = []
    for rodada in range(rodadas):
      observation = ambiente.next_state
      action = agente.learn_and_deliberate(observation, reward)
      # generate_reward also moves the invader to its next state.
      reward = ambiente.generate_reward(observation, action)

      learning_curve.append([rodada+1, episode + 1, reward, gamma_damping, eta_glow_damping, observation, action])

    # blocking_efficiency is the cumulative reward divided by the round number.
    _ = pd.DataFrame(learning_curve, columns = ['rodada','episode','reward','gamma_damping','eta_glow_damping','observation','action'])
    _['blocking_efficiency'] = _['reward'].cumsum()/(_['rodada'])
    episode_frames.append(_)
db = pd.concat(episode_frames)

In [546]:
# Average blocking efficiency over the episodes for each (round, gamma_damping) pair,
# plotted against the round and coloured by gamma_damping (nominal via ':N').
alt.Chart(db.groupby(['rodada','gamma_damping'])['blocking_efficiency'].mean().reset_index()).mark_line().encode(
      x = 'rodada',
      y = 'blocking_efficiency',
      color = 'gamma_damping:N'
)