#Q-learner Agent


In [473]:
import logging
from collections import namedtuple
import random
from typing import Callable
from copy import deepcopy
from itertools import accumulate
from operator import xor
import numpy as np

In [474]:
# Number of rows of the Nim board used for training and for the evaluation games below.
NIM_SIZE = 5

In [475]:
# A single move: remove `num_objects` objects from the row at index `row`.
Nimply = namedtuple("Nimply", "row, num_objects")

In [476]:
class Nim:
    """Board for the game of Nim.

    Row ``i`` (0-based) starts with ``2 * i + 1`` objects. The optional ``k``
    caps how many objects may be removed in a single move; ``None`` means no
    cap.
    """

    def __init__(self, num_rows: int, k: int = None) -> None:
        self._rows = [i * 2 + 1 for i in range(num_rows)]
        self._k = k

    def __bool__(self):
        """Return True while at least one object is left on the board."""
        return sum(self._rows) > 0

    def __str__(self):
        """Return the row sizes as ``<a b c ...>``."""
        return "<" + " ".join(str(_) for _ in self._rows) + ">"

    @property
    def rows(self) -> tuple:
        """Current row sizes as an immutable (hashable) tuple."""
        return tuple(self._rows)

    @property
    def k(self) -> int:
        """Maximum number of objects per move, or None when unlimited."""
        return self._k

    def nimming(self, ply: Nimply) -> None:
        """Apply the move ``ply`` (a ``(row, num_objects)`` pair) in place.

        Raises:
            AssertionError: if the move takes fewer than one object, more
                objects than the row holds, or more than ``k`` objects.
        """
        row, num_objects = ply
        # A move must remove at least one object: without this check a zero or
        # negative count would let a player pass or even refill a row.
        assert num_objects >= 1
        assert self._rows[row] >= num_objects
        assert self._k is None or num_objects <= self._k
        self._rows[row] -= num_objects

In [477]:
def pure_random(state: Nim) -> Nimply:
    """Return a random legal move.

    A non-empty row is picked at random, then a random number of objects to
    remove from it. The count never exceeds ``state.k`` when a per-move cap
    is set, so the move is always accepted by ``Nim.nimming``.

    Raises:
        IndexError: if every row is empty (there is no move to choose).
    """
    row = random.choice([r for r, c in enumerate(state.rows) if c > 0])
    limit = state.rows[row]
    if state.k is not None:
        # Honour the per-move cap; the uncapped draw could be rejected later.
        limit = min(limit, state.k)
    num_objects = random.randint(1, limit)
    return Nimply(row, num_objects)

In [478]:
def nim_sum(state: Nim) -> int:
    """Return the nim-sum (bitwise XOR of all row sizes) of ``state``.

    A board without rows has nim-sum 0.
    """
    result = 0
    # Folding from 0 (the XOR identity) also covers a board with no rows,
    # where unpacking the last element of an empty accumulate() would raise.
    for count in state.rows:
        result ^= count
    return result

In [479]:
def active_rows_index(state: Nim) -> list:
    """Return the indices of every row that still holds at least one object."""
    return [position for position, remaining in enumerate(state.rows) if remaining > 0]

In [480]:
def cook_status(state: Nim) -> dict:
    """Collect the derived facts about ``state`` that the agents choose moves from.

    Returns a dict with these keys:
        possible_moves: every legal ``(row, num_objects)`` pair, honouring ``state.k``.
        active_rows_number: how many rows are non-empty.
        active_rows_index: indices of the non-empty rows.
        rows_with_one_element: ``(index, size)`` pairs of rows holding exactly one object.
        rows_multiple_elem: ``(index, size)`` pairs of rows holding more than one object.
        shortest_row: index of the smallest non-empty row, or None if all rows are empty.
        longest_row: index of the largest row (the first one on ties).
        nim_sum: nim-sum of the board.
        pure_random: a move drawn by ``pure_random``, or None if all rows are empty.
        brute_force: ``(move, nim_sum_after_move)`` for every possible move.
    """
    rows = state.rows
    non_empty = [(index, count) for index, count in enumerate(rows) if count > 0]

    cooked = dict()
    cooked["possible_moves"] = [
        (r, o) for r, c in enumerate(rows) for o in range(1, c + 1) if state.k is None or o <= state.k
    ]
    cooked["active_rows_number"] = len(non_empty)

    # list with all active rows
    cooked["active_rows_index"] = active_rows_index(state)

    # list of rows with only 1 elem
    cooked["rows_with_one_element"] = [(index, r) for index, r in enumerate(rows) if r == 1]

    # list of rows with multiple elem
    cooked["rows_multiple_elem"] = [(index, r) for index, r in enumerate(rows) if r > 1]

    # index of the shortest row; a finished board has none, and min() of an
    # empty sequence would raise
    cooked["shortest_row"] = min(non_empty, key=lambda y: y[1])[0] if non_empty else None

    # index of the longest row
    cooked["longest_row"] = max(enumerate(rows), key=lambda y: y[1])[0]

    total = nim_sum(state)
    cooked["nim_sum"] = total

    # no move can be drawn from a finished board
    cooked["pure_random"] = pure_random(state) if non_empty else None

    # Nim-sum after each move. XOR-ing the old row size out and the new one in
    # gives the same value as replaying the move on a deep copy of the board.
    brute_force = list()
    for m in cooked["possible_moves"]:
        r, o = m
        brute_force.append((m, total ^ rows[r] ^ (rows[r] - o)))
    cooked["brute_force"] = brute_force

    return cooked

In [481]:
def random_agent(state: Nim):
    """A strategy returning a random possible move.

    Delegates straight to ``pure_random``. Going through ``cook_status``
    produced the same move but also evaluated every possible move of the
    board on each call, only to discard the result.
    """
    return pure_random(state)

In [482]:
def random_smart_agent(state: Nim):
    """Random strategy that finishes the game when it can.

    When exactly one row is non-empty the whole row is taken; in every other
    situation the random move prepared by ``cook_status`` is returned.
    """
    info = cook_status(state)
    if info["active_rows_number"] != 1:
        return info["pure_random"]

    # Only one row left: empty it.
    last_row = info["active_rows_index"][0]
    return Nimply(last_row, state.rows[last_row])

In [483]:
def hard_coded_agent(state: Nim):
    """Agent using fixed rules based on the number of non-empty rows.

    * One non-empty row: take the whole row.
    * Even number of non-empty rows: if exactly one row holds more than one
      object, take all but one object from it; otherwise take all but one
      object (and at least one) from the longest row.
    * Odd number of non-empty rows: if exactly one row holds more than one
      object, take that whole row; otherwise take the whole longest row.
    """
    data = cook_status(state=state)

    active_rows_number = data["active_rows_number"]
    active_rows_index = data["active_rows_index"]
    rows_with_multiple_elem = data["rows_multiple_elem"]
    longest_row = data["longest_row"]

    if active_rows_number == 1:
        row = active_rows_index[0]
        elem = state.rows[row]

    elif active_rows_number % 2 == 0:
        if len(rows_with_multiple_elem) == 1:
            row = rows_with_multiple_elem[0][0]
            elem = rows_with_multiple_elem[0][1] - 1  # take all elements except one
            logging.debug(f"Even rows one mul, elem: {elem}")
        else:
            row = longest_row
            logging.debug(f"longest row index: {longest_row}, elem: {state.rows[longest_row]}")
            # take all elements except one, but never fewer than one
            elem = max(state.rows[longest_row] - 1, 1)
            logging.debug(f"Even rows, several mul, elem: {elem}")
    else:
        if len(rows_with_multiple_elem) == 1:
            row = rows_with_multiple_elem[0][0]
            elem = rows_with_multiple_elem[0][1]  # take all elements
            logging.debug(f"Odd rows, one mul, elem: {elem}")
        else:
            row = longest_row
            elem = state.rows[longest_row]
            # This is the odd branch; the message used to say "Even rows".
            logging.debug(f"Odd rows, several mul, elem: {elem}")

    ply = Nimply(row, elem)
    return ply

In [484]:
def optimal_startegy(state: Nim) -> Nimply:
    """A strategy using nim sum to return the optimal move.

    Returns the first possible move that leaves a nim-sum of 0. When no such
    move exists, a random possible move is returned instead.
    """
    brute_force = cook_status(state)["brute_force"]
    for move, resulting_nim_sum in brute_force:
        if resulting_nim_sum == 0:
            return Nimply(*move)
    # The random fallback is drawn only when it is needed; as a default
    # argument of next() it was evaluated on every call.
    return Nimply(*random.choice(brute_force)[0])

In [485]:
def dumb(state: Nim):
    """Naive strategy: always remove a single object from the longest row."""
    return Nimply(cook_status(state=state)["longest_row"], 1)

In [486]:
# Strategies the Q-learner is trained against.
OPPONENTS = [dumb, hard_coded_agent, random_agent, random_smart_agent, optimal_startegy]
# Improvement: play against more agents with a wider variety of skill levels

In [487]:
class QLearner:
    """Tabular Q-learning agent for Nim.

    The table ``q`` maps ``(rows, move)`` to an estimated value, where ``rows``
    is the board as a tuple of row sizes and ``move`` is a
    ``(row, num_objects)`` tuple. ``updateQ`` is called once per learner turn:
    it updates the table from what happened since the learner's previous move
    and returns the next move to play.
    """

    REWARD = 1    # target value of a move that takes the last object (a win)
    PENALTY = -1  # target value of the learner's last move in a lost game

    # Class-level defaults kept for backward compatibility; __init__ gives
    # every instance its own attributes.
    previous_state = None
    previous_move = None

    def __init__(self, learning_rate, discount_rate, exploration_rate):
        """Create a learner with an empty Q-table.

        Args:
            learning_rate: step size of every Q-value update.
            discount_rate: weight of the best next-state value in an update.
            exploration_rate: probability threshold for playing a random move
                instead of the best known one.
        """
        self.q = {}  # (state_rows: tuple, move: tuple (row, elem)) -> value: float
        self.learning_rate = learning_rate
        self.discount_rate = discount_rate
        # try to reduce the exploration rate while training the q-learner
        self.exploration_rate = exploration_rate
        self.previous_state = None
        self.previous_move = None

    def clear_previous_vars(self):
        """Forget the last (state, move) pair; call this between games."""
        self.previous_state = None
        self.previous_move = None

    def change_exploration_rate(self, new_exploration_rate):
        """Replace the exploration rate."""
        self.exploration_rate = new_exploration_rate

    def change_discount_rate(self, new_discount_rate):
        """Replace the discount rate."""
        self.discount_rate = new_discount_rate

    def change_learning_rate(self, new_learning_rate):
        """Replace the learning rate."""
        self.learning_rate = new_learning_rate

    @staticmethod
    def _possible_moves(state):
        """Return every legal (row, num_objects) move of ``state``, honouring ``state.k``."""
        return [
            (r, o)
            for r, c in enumerate(state.rows)
            for o in range(1, c + 1)
            if state.k is None or o <= state.k
        ]

    def add_state_moves(self, current_state):
        """Give every unseen (state, move) pair of ``current_state`` a small random value."""
        rows = current_state.rows
        for move in self._possible_moves(current_state):
            if (rows, move) not in self.q:
                self.q[(rows, move)] = np.random.uniform(0.0, 0.01)

    def policy(self, current_state):
        """Choose a move for ``current_state`` (epsilon-greedy).

        Returns the move with the highest Q-value when a uniform draw exceeds
        the exploration rate, and a random possible move otherwise.

        Raises:
            ValueError or IndexError: if ``current_state`` has no possible move.
        """
        # Register the state first so the greedy lookup cannot hit a missing key.
        self.add_state_moves(current_state)
        possible_moves = self._possible_moves(current_state)

        if np.random.random() > self.exploration_rate:
            rows = current_state.rows
            # max() keeps the first best move, like argmax over the value list
            return max(possible_moves, key=lambda move: self.q[(rows, move)])

        return random.choice(possible_moves)  # we explore

    def updateQ(self, current_state):
        """Learn from the position reached and return the learner's next move.

        Args:
            current_state: the board the learner has to move on. A finished
                (falsy) board means the opponent took the last object.

        Returns:
            The chosen ``(row, num_objects)`` move, or None when the game is
            already finished (the learner lost).
        """
        if not current_state:  # the game is finished: the learner lost
            if self.previous_move is not None:
                last_key = (self.previous_state, self.previous_move)
                self.q[last_key] += self.learning_rate * (self.PENALTY - self.q[last_key])
            self.clear_previous_vars()  # prepare for the next game
            return None

        current_move = self.policy(current_state)
        rows = current_state.rows

        if self.previous_move is not None:  # not the first move of the game
            # The previous move led to a non-terminal position, so its target
            # is the discounted value of the best move available now.
            max_q = max(self.q[(rows, move)] for move in self._possible_moves(current_state))
            prev_key = (self.previous_state, self.previous_move)
            self.q[prev_key] += self.learning_rate * (
                self.discount_rate * max_q - self.q[prev_key]
            )

        # A move that takes every remaining object wins the game. Its own
        # entry is updated here because updateQ is not called again after a win.
        wins = current_move[1] == sum(rows)
        reward = self.REWARD if wins else 0
        logging.debug(f" REWARD: {reward}")
        if wins:
            win_key = (rows, current_move)
            self.q[win_key] += self.learning_rate * (reward - self.q[win_key])

        self.previous_state, self.previous_move = rows, current_move
        logging.debug(f"current_move - game not finished: {current_move}")
        return current_move
        

In [488]:
def play_q_learning(nim_size, q_learner, external_agent):
    """Play one game of Nim between the Q-learner and ``external_agent``.

    The Q-learner always moves first and the two players then alternate.
    Whoever removes the last object wins.

    Args:
        nim_size: number of rows of the board.
        q_learner: learner whose ``updateQ`` both learns and picks its move.
        external_agent: callable mapping the board to the opponent's move.

    Returns:
        "q_learner won" or "q_learner lost".
    """
    board = Nim(nim_size)
    learner_to_move = True  # the Q-learner opens the game

    while True:
        if not learner_to_move:
            # Opponent's turn.
            opponent_ply = external_agent(board)
            logging.debug(f" Agent move to apply: {opponent_ply}")
            board.nimming(opponent_ply)
            learner_to_move = True
            continue

        # Q-learner's turn.
        chosen = q_learner.updateQ(board)
        logging.debug(f" Wanted move after player = q-learner: {chosen}, State before move: {board}")

        if chosen is None:
            # updateQ yields no move when the opponent already emptied the board
            logging.debug(" Q-learner lost")
            return "q_learner lost"

        learner_ply = Nimply(chosen[0], chosen[1])
        logging.debug(f"move to apply: {learner_ply}")
        board.nimming(learner_ply)
        logging.debug(f" <<NIM>> after q-learner move: {board}")

        if not sum(board.rows):
            # the learner took the last object
            logging.debug("Q-learner won")
            q_learner.clear_previous_vars()
            return "q_learner won"

        learner_to_move = False


In [489]:

# Root logger at INFO: the logging.debug traces emitted during play are not shown.
logging.getLogger().setLevel(logging.INFO)

def q_learner_strategy(
    nim_size,
    *,
    num_games=200,
    opponents=None,
    learning_rate=0.9,
    discount_rate=0.4,
    exploration_rate=0.6,
) -> QLearner:
    """Train a Q-learner by playing it against a sequence of opponents.

    After each opponent the exploration rate drops by 0.10 (never below 0.1),
    the number of games for the next opponent is tripled, and the new game
    budget is printed.

    Args:
        nim_size: number of rows of the board.
        num_games: games played against the first opponent.
        opponents: strategies to train against, in order; defaults to OPPONENTS.
        learning_rate: learning rate handed to the learner.
        discount_rate: discount rate handed to the learner.
        exploration_rate: exploration rate used against the first opponent.

    Returns:
        The trained QLearner.
    """
    if opponents is None:
        opponents = OPPONENTS

    q_learner_agent = QLearner(
        learning_rate=learning_rate,
        discount_rate=discount_rate,
        exploration_rate=exploration_rate,
    )

    for opponent in opponents:
        for _ in range(num_games):
            play_q_learning(nim_size, q_learner_agent, opponent)
            logging.debug(" GAME FINISHED")

        # explore less against each successive opponent, with a floor of 0.1
        exploration_rate = max(exploration_rate - 0.10, 0.1)
        q_learner_agent.change_exploration_rate(exploration_rate)

        # Triple the budget for the next opponent. This used to be computed
        # from the inner loop variable after the loop had ended.
        num_games *= 3
        print("NUM_GAMES", num_games)
    return q_learner_agent

# Improvement: practice against each opponent until reaching a 100% win rate, then move on to the next stronger opponent, until reaching the optimal one.

In [490]:
# Train the Q-learner on a NIM_SIZE-row board; a NUM_GAMES line is printed after each opponent.
strat = q_learner_strategy(NIM_SIZE)


NUM_GAMES 600
NUM_GAMES 1800
NUM_GAMES 5400
NUM_GAMES 16200
NUM_GAMES 48600


In [491]:
def play_nim(q_learner: QLearner, opponent: Callable):
    """Play one game on a NIM_SIZE-row board against ``opponent`` and print the outcome."""
    print(play_q_learning(NIM_SIZE, q_learner, opponent))

# Create evolution agent


In [492]:
# One game against the nim-sum strategy; the outcome is printed.
# NOTE: play_nim goes through updateQ, so the learner keeps updating its Q-table here.
play_nim(strat, optimal_startegy)

#test = Nim(4)
#test.nimming(Nimply(2,1))
#test.nimming(Nimply(3,4))

#print(optimal_startegy(test))

q_learner won


In [493]:
# Play 100 games against random_smart_agent, printing the outcome of each one.
for _ in range(100):
    play_nim(strat, random_smart_agent)



q_learner won
q_learner won
q_learner won
q_learner won
q_learner won
q_learner won
q_learner won
q_learner won
q_learner won
q_learner lost
q_learner lost
q_learner won
q_learner won
q_learner won
q_learner won
q_learner won
q_learner won
q_learner won
q_learner won
q_learner won
q_learner won
q_learner won
q_learner won
q_learner won
q_learner won
q_learner won
q_learner won
q_learner won
q_learner won
q_learner won
q_learner won
q_learner won
q_learner won
q_learner won
q_learner won
q_learner won
q_learner won
q_learner lost
q_learner won
q_learner won
q_learner won
q_learner won
q_learner won
q_learner won
q_learner lost
q_learner won
q_learner won
q_learner won
q_learner won
q_learner won
q_learner won
q_learner won
q_learner won
q_learner won
q_learner won
q_learner won
q_learner won
q_learner won
q_learner won
q_learner lost
q_learner won
q_learner won
q_learner won
q_learner won
q_learner won
q_learner won
q_learner lost
q_learner won
q_learner won
q_learner won
q_learner won
