In [1]:
import statistics
import itertools
import numpy as np
import random

In [2]:
class Nim():

    def __init__(self, n_rows: int = 4):
        self.initial_state = [int((x + 1)) for x in range(0, n_rows * 2, 2)]
        self.possible_values_in_rows = []
        for idx in self.initial_state:
            temp_list = []
            for ldx in range(idx+1):
                temp_list.append(ldx)
            self.possible_values_in_rows.append(temp_list)

        self.states = (set(itertools.product(*self.possible_values_in_rows)))
        self.n_states = len(self.states)
        self.current_state = self.initial_state
        self.transition_probs = dict()

    def fill_transition_probs(self):
        for state in self.states:
            self.transition_probs[state] = dict()
            actions = self.get_possible_actions(state)
            for action in actions:
                self.transition_probs[state][action] = dict()
                new_state = tuple([idx_1 - idx_2 for idx_1, idx_2 in zip(state, action)])
                self.transition_probs[state][action][new_state] = 1

    def reset(self):
        self.current_state = self.initial_state
        return self.current_state

    def get_all_states(self):
        return self.states

    def is_terminal(self, state):
        if not any(state): return True
        return False

    def get_possible_actions(self, state):
        possible_actions = []

        if self.is_terminal(state):
            possible_actions.append(state)
            return tuple(possible_actions)

        for row_idx, number_in_row in enumerate(state):
            for number in range(1, number_in_row + 1):
                single_action = [0 for _ in range(len(state))]
                single_action[row_idx] = number
                single_action = tuple(single_action)
                possible_actions.append(single_action)

        return tuple(possible_actions)

    def get_next_states(self, state, action):
        assert action in self.get_possible_actions(
            state), "cannot do action %s from state %s" % (action, state)
        return self.transition_probs[state][action]

    def get_number_of_states(self):
        return self.n_states

    def get_reward(self, state, action, next_state):
        assert action in self.get_possible_actions(
            state), "cannot do action %s from state %s" % (action, state)

        reward = 1
        if self.is_terminal(next_state):
            reward = -100

        return reward

    def step(self, action):
        prev_state = self.current_state
        self.current_state = [idx_1 - idx_2 for idx_1, idx_2 in zip(self.current_state, action)]
        return self.current_state, self.get_reward(prev_state, action, self.current_state), \
               self.is_terminal(self.current_state), None

In [3]:
# play at random
nim = Nim()
player_1_wins = 0
player_2_wins = 0
games = 0

for _ in range(100000):
    nim.reset()
    turn = 0
    while not nim.is_terminal(nim.current_state):
        random_action = random.choice(nim.get_possible_actions(nim.current_state))
        nim.step(random_action)
        if nim.is_terminal(nim.current_state):
            games += 1
            if turn % 2:
                player_2_wins += 1
            else:
                player_1_wins += 1
        turn += 1

print(player_1_wins, player_2_wins, games)

50319 49681 100000


UCZENIE PASYWNE

In [23]:
def value_iteration(nim, gamma, theta):
    V = dict()
    nim.fill_transition_probs()

    for state in nim.get_all_states():
        V[state] = 0

    policy = dict()
    for current_state in nim.get_all_states():
        try:
            policy[current_state] = nim.get_possible_actions(current_state)[0]
        except IndexError:
            continue

    while True:
        last_mean_value = statistics.fmean(V.values())
        for current_state in nim.get_all_states():
            actions = nim.get_possible_actions(current_state)
            state_action_values = dict()

            for action in actions:

                state_action_values[action] = 0
                next_states = nim.get_next_states(current_state, action)

                for next_state in next_states:
                    state_action_values[action] += next_states[next_state] * (nim.get_reward(current_state, action, next_state) + gamma * V[next_state])

            V[current_state] = max(list(state_action_values.values()))
        if abs(statistics.fmean(V.values()) - last_mean_value) < theta:
            break

    for current_state in nim.get_all_states():

        state_action_values = dict()
        actions = nim.get_possible_actions(current_state)

        for action in actions:
            state_action_values[action] = 0
            next_states = nim.get_next_states(current_state, action)

            for next_state in next_states:
                state_action_values[action] += next_states[next_state] * (nim.get_reward(current_state, action, next_state) + gamma * V[next_state])

        max_value_action = max(state_action_values, key=state_action_values.get)

        if policy[current_state] != max_value_action:
            policy[current_state] = max_value_action

    return policy, V

In [43]:
nim = Nim(4)
optimal_policy, optimal_value = value_iteration(nim, 0.9, 0.001)

In [45]:
# play (player 1) value iteration vs random
player_1_wins = 0
player_2_wins = 0
games = 0

for _ in range(10000):
    nim.reset()
    turn = 0
    while not nim.is_terminal(nim.current_state):
        if not turn % 2:
            action = optimal_policy[tuple(nim.current_state)]
        else:
            action = random.choice(nim.get_possible_actions(nim.current_state))
        nim.step(action)
        if nim.is_terminal(nim.current_state):
            games += 1
            if turn % 2:
                player_2_wins += 1
            else:
                player_1_wins += 1
        turn += 1

print(player_1_wins, player_2_wins, games)

7076 2924 10000


UCZENIE AKTYWNE

In [46]:
import random
from collections import defaultdict

class SARSALambdaAgent:
    def __init__(self, alpha, epsilon, discount, get_legal_actions, lambda_value):
        """
        SARSA Lambda Agent
        based on https://inst.eecs.berkeley.edu/~cs188/sp19/projects.html
        Instance variables you have access to
          - self.epsilon (exploration prob)
          - self.alpha (learning rate)
          - self.discount (discount rate aka gamma)

        Functions you should use
          - self.get_legal_actions(state) {state, hashable -> list of actions, each is hashable}
            which returns legal actions for a state
          - self.get_qvalue(state,action)
            which returns Q(state,action)
          - self.set_qvalue(state,action,value)
            which sets Q(state,action) := value
        !!!Important!!!
        Note: please avoid using self._qValues directly.
            There's a special self.get_qvalue/set_qvalue for that.
        """

        self.get_legal_actions = get_legal_actions
        self._qvalues = defaultdict(lambda: defaultdict(lambda: 0))
        self._evalues = defaultdict(lambda: defaultdict(lambda: 0))
        self.alpha = alpha
        self.epsilon = epsilon
        self.discount = discount
        self.lambda_value = lambda_value

    def get_qvalue(self, state, action):
        """ Returns Q(state,action) """
        return self._qvalues[str(state)][str(action)]

    def set_qvalue(self, state, action, value):
        """ Sets the Qvalue for [state,action] to the given value """
        self._qvalues[str(state)][str(action)] = value

    def reset(self):
        self._evalues = defaultdict(lambda: defaultdict(lambda: 0))

    # ---------------------START OF YOUR CODE---------------------#

    def get_value(self, state):
        """
        Compute your agent's estimate of V(s) using current q-values
        V(s) = max_over_action Q(state,action) over possible actions.
        Note: please take into account that q-values can be negative.
        """
        possible_actions = self.get_legal_actions(state)

        if len(possible_actions) == 0:
            return 0.0

        return max([self.get_qvalue(state, action) for action in possible_actions])

    def update(self, state, action, reward, next_state):
        """
        You should do your SARSA-Lambda update here:
        """

        # agent parameters
        gamma = self.discount
        learning_rate = self.alpha

        next_action = self.get_action(next_state)
        delta = reward + gamma * self.get_qvalue(next_state, next_action) - self.get_qvalue(state, action)
        self._evalues[str(state)][str(action)] += (1 - learning_rate) * gamma * self.lambda_value * self._evalues[str(state)][str(action)] + 1

        for every_state in self._qvalues.keys():
            for every_action in self._qvalues[every_state].keys():
                qvalue = self.get_qvalue(every_state, every_action) + learning_rate * delta * self._evalues[every_state][every_action]
                self.set_qvalue(every_state, every_action, qvalue)
                self._evalues[every_state][every_action] = gamma * self.lambda_value * self._evalues[every_state][every_action]

        return next_action

    def get_best_action(self, state):
        """
        Compute the best action to take in a state (using current q-values).
        """
        possible_actions = self.get_legal_actions(state)

        # If there are no legal actions, return None
        if len(possible_actions) == 0:
            return None

        possible_actions_dict = dict()
        for action in possible_actions:
            possible_actions_dict[tuple(action)] = self.get_qvalue(state, action)
        sorted_dict = sorted(possible_actions_dict.items(), key=lambda kv: kv[1])

        return random.choice([k for k, v in possible_actions_dict.items() if v == sorted_dict[-1][-1]])

    def get_action(self, state):
        """
        Compute the action to take in the current state, including exploration.
        With probability self.epsilon, we should take a random action.
            otherwise - the best policy action (self.get_best_action).

        Note: To pick randomly from a list, use random.choice(list).
              To pick True or False with a given probablity, generate uniform number in [0, 1]
              and compare it with your probability
        """

        # Pick Action
        possible_actions = self.get_legal_actions(state)

        # If there are no legal actions, return None
        if len(possible_actions) == 0:
            return None

        # agent parameters:
        epsilon = self.epsilon

        if random.random() < epsilon:
            return random.choice(possible_actions)

        return self.get_best_action(state)

    def turn_off_learning(self):
        self.epsilon = 0
        self.alpha = 0

    def display_qvalues(self):
        for s in self._qvalues:
            print("State: " + str(s) + " " + str(self._qvalues[s]))


In [47]:
import random
from collections import defaultdict


class ExpectedSARSAAgent:
    def __init__(self, alpha, epsilon, discount, get_legal_actions):
        """
        Q-Learning Agent
        based on https://inst.eecs.berkeley.edu/~cs188/sp19/projects.html
        Instance variables you have access to
          - self.epsilon (exploration prob)
          - self.alpha (learning rate)
          - self.discount (discount rate aka gamma)

        Functions you should use
          - self.get_legal_actions(state) {state, hashable -> list of actions, each is hashable}
            which returns legal actions for a state
          - self.get_qvalue(state,action)
            which returns Q(state,action)
          - self.set_qvalue(state,action,value)
            which sets Q(state,action) := value
        !!!Important!!!
        Note: please avoid using self._qValues directly.
            There's a special self.get_qvalue/set_qvalue for that.
        """

        self.get_legal_actions = get_legal_actions
        self._qvalues = defaultdict(lambda: defaultdict(lambda: 0))
        self.alpha = alpha
        self.epsilon = epsilon
        self.discount = discount

    def get_qvalue(self, state, action):
        """ Returns Q(state,action) """
        return self._qvalues[str(state)][str(action)]

    def set_qvalue(self, state, action, value):
        """ Sets the Qvalue for [state,action] to the given value """
        self._qvalues[str(state)][str(action)] = value

    #---------------------START OF YOUR CODE---------------------#

    def get_value(self, state):
        """
        Compute your agent's estimate of V(s) using current q-values
        V(s) = max_over_action Q(state,action) over possible actions.
        Note: please take into account that q-values can be negative.
        """
        possible_actions = self.get_legal_actions(state)

        # If there are no legal actions, return 0.0
        if len(possible_actions) == 0:
            return 0.0

        #
        # INSERT CODE HERE to get maximum possible value for a given state
        #

        return max([self.get_qvalue(state, action) for action in possible_actions])

    def update(self, state, action, reward, next_state):
        """
        You should do your Q-Value update here:
           Q(s,a) := (1 - alpha) * Q(s,a) + alpha * (r + gamma * \sum_a \pi(a|s') Q(s', a))
        """

        # agent parameters
        gamma = self.discount
        learning_rate = self.alpha

        sum_of_strategies = list()

        [sum_of_strategies.append(1 / len(self.get_legal_actions(next_state)) * self.get_qvalue(next_state, action)) for action in self.get_legal_actions(next_state)]

        value = (1 - learning_rate) * self.get_qvalue(state, action) + learning_rate * (reward + gamma * sum(sum_of_strategies))
        self.set_qvalue(state, action, value)

    def get_best_action(self, state):
        """
        Compute the best action to take in a state (using current q-values).
        """
        possible_actions = self.get_legal_actions(state)

        # If there are no legal actions, return None
        if len(possible_actions) == 0:
            return None

        possible_actions_dict = dict()

        for action in possible_actions:
            possible_actions_dict[tuple(action)] = self.get_qvalue(state, action)

        sorted_dict = sorted(possible_actions_dict.items(), key=lambda kv: kv[1])

        return random.choice([k for k, v in possible_actions_dict.items() if v == sorted_dict[-1][-1]])

    def get_action(self, state):
        """
        Compute the action to take in the current state, including exploration.
        With probability self.epsilon, we should take a random action.
            otherwise - the best policy action (self.get_best_action).

        Note: To pick randomly from a list, use random.choice(list).
              To pick True or False with a given probablity, generate uniform number in [0, 1]
              and compare it with your probability
        """

        # Pick Action
        possible_actions = self.get_legal_actions(state)

        # If there are no legal actions, return None
        if len(possible_actions) == 0:
            return None

        # agent parameters:
        epsilon = self.epsilon

        if random.random() < epsilon:
            return random.choice(possible_actions)

        return self.get_best_action(state)

    def turn_off_learning(self):
        """
        Function turns off agent learning.
        """
        self.epsilon = 0
        self.alpha = 0


In [48]:
def play_and_train_exp(env, agent):
    """
    This function should
    - run a full game, actions given by agent's e-greedy policy
    - train agent using agent.update(...) whenever it is possible
    - return total reward
    """
    total_reward = 0.0
    state = env.reset()

    done = False

    while not done:
        # get agent to pick action given state state.
        action = agent.get_action(state)

        next_state, reward, done, _ = env.step(action)

        agent.update(state, action, reward, next_state)

        state = next_state
        total_reward += reward
        if done:
            break

    return total_reward

In [49]:
def play_and_train_lambda(env, agent):
    """
    This function should
    - run a full game, actions given by agent's e-greedy policy
    - train agent using agent.update(...) whenever it is possible
    - return total reward
    """
    total_reward = 0.0
    state = env.reset()

    done = False

    agent.reset()
    action = agent.get_action(state)

    while not done:
        # get agent to pick action given state state.

        next_state, reward, done, _ = env.step(action)

        action = agent.update(state, action, reward, next_state)

        state = next_state
        total_reward += reward
        if done:
            break

    return total_reward

In [50]:
nim = Nim()
agent_lambda = SARSALambdaAgent(alpha=0.1, epsilon=0.1, discount=0.99,
                   get_legal_actions=nim.get_possible_actions, lambda_value = 0.5)

agent_exp = ExpectedSARSAAgent(alpha=0.1, epsilon=0.1, discount=0.99,
                       get_legal_actions=nim.get_possible_actions)

for i in range(10000):
    play_and_train_exp(nim, agent_exp)
    play_and_train_lambda(nim, agent_lambda)

In [100]:
# play sarsa lambda vs random
player_1_wins = 0
player_2_wins = 0
games = 0

for _ in range(100000):
    nim.reset()
    turn = 0
    while not nim.is_terminal(nim.current_state):
        if not turn % 2:
            # action = agent_exp.get_best_action(nim.current_state)
            action = agent_lambda.get_best_action(nim.current_state)
        else:
            action = random.choice(nim.get_possible_actions(nim.current_state))

        nim.step(action)

        if nim.is_terminal(nim.current_state):
            games += 1
            if turn % 2:
                player_2_wins += 1
            else:
                player_1_wins += 1

        turn += 1

print(player_1_wins, player_2_wins, games)

61261 38739 100000


MCTS

In [13]:
import numpy as np

class MonteCarloTreeSearchNode():
    def __init__(self, state, parent=None, parent_action=None):
        self.state = state
        self.parent = parent
        self.parent_action = parent_action
        self.children = []
        self._number_of_visits = 0
        self._results = defaultdict(int)
        self._results[1] = 0
        self._results[-1] = 0
        self._untried_actions = None
        return

    def untried_actions(self):
        self._untried_actions = self.state.get_legal_actions()
        return self._untried_actions

    def q(self):
        wins = self._results[1]
        loses = self._results[-1]
        return wins - loses

    def n(self):
        return self._number_of_visits

    def expand(self):

        action = self._untried_actions.pop()
        next_state = self.state.move(action)
        child_node = MonteCarloTreeSearchNode(
            next_state, parent=self, parent_action=action)

        self.children.append(child_node)
        return child_node

    def is_terminal_node(self):
        return self.state.is_game_over()

    def rollout(self):
        current_rollout_state = self.state

        while not current_rollout_state.is_game_over():

            possible_moves = current_rollout_state.get_legal_actions()

            action = self.rollout_policy(possible_moves)
            current_rollout_state = current_rollout_state.move(action)
        return current_rollout_state.game_result()

    def backpropagate(self, result):
        self._number_of_visits += 1.
        self._results[result] += 1.
        if self.parent:
            self.parent.backpropagate(result)

    def is_fully_expanded(self):
        return len(self._untried_actions) == 0

    def best_child(self, c_param=0.1):

        choices_weights = [(c.q() / c.n()) + c_param * np.sqrt((2 * np.log(self.n()) / c.n())) for c in self.children]
        return self.children[np.argmax(choices_weights)]

    def rollout_policy(self, possible_moves):

        return possible_moves[np.random.randint(len(possible_moves))]

    def _tree_policy(self):

        current_node = self
        while not current_node.is_terminal_node():

            if not current_node.is_fully_expanded():
                return current_node.expand()
            else:
                current_node = current_node.best_child()
        return current_node

    def best_action(self):
        simulation_no = 100

        for i in range(simulation_no):

            v = self._tree_policy()
            reward = v.rollout()
            v.backpropagate(reward)

        return self.best_child(c_param=0.)

    def get_legal_actions(self):
        possible_actions = []

        # if self.is_terminal(self.state):
        #     possible_actions.append(state)
        #     return tuple(possible_actions)

        for row_idx, number_in_row in enumerate(self.state):
            for number in range(1, number_in_row + 1):
                single_action = [0 for _ in range(len(self.state))]
                single_action[row_idx] = number
                single_action = tuple(single_action)
                possible_actions.append(single_action)

        return tuple(possible_actions)

    def is_game_over(self):
        if not any(self.state): return True
        return False


In [14]:
class Nim():

    def __init__(self, n_rows: int = 4):
        self.initial_state = [int((x + 1)) for x in range(0, n_rows * 2, 2)]
        self.possible_values_in_rows = []
        for idx in self.initial_state:
            temp_list = []
            for ldx in range(idx+1):
                temp_list.append(ldx)
            self.possible_values_in_rows.append(temp_list)

        self.states = (set(itertools.product(*self.possible_values_in_rows)))
        self.n_states = len(self.states)
        self.current_state = self.initial_state
        self.transition_probs = dict()

    def fill_transition_probs(self):
        for state in self.states:
            self.transition_probs[state] = dict()
            actions = self.get_possible_actions(state)
            for action in actions:
                self.transition_probs[state][action] = dict()
                new_state = tuple([idx_1 - idx_2 for idx_1, idx_2 in zip(state, action)])
                self.transition_probs[state][action][new_state] = 1

    def reset(self):
        self.current_state = self.initial_state
        return self.current_state

    def get_all_states(self):
        return self.states

    def is_terminal(self, state):
        if not any(state): return True
        return False

    def get_possible_actions(self, state):
        possible_actions = []

        if self.is_terminal(state):
            possible_actions.append(state)
            return tuple(possible_actions)

        for row_idx, number_in_row in enumerate(state):
            for number in range(1, number_in_row + 1):
                single_action = [0 for _ in range(len(state))]
                single_action[row_idx] = number
                single_action = tuple(single_action)
                possible_actions.append(single_action)

        return tuple(possible_actions)

    def get_next_states(self, state, action):
        assert action in self.get_possible_actions(
            state), "cannot do action %s from state %s" % (action, state)
        return self.transition_probs[state][action]

    def get_number_of_states(self):
        return self.n_states

    def get_reward(self, state, action, next_state):
        assert action in self.get_possible_actions(
            state), "cannot do action %s from state %s" % (action, state)

        reward = -1
        if self.is_terminal(next_state):
            reward = 100

        return reward

    def step(self, action):
        prev_state = self.current_state
        self.current_state = [idx_1 - idx_2 for idx_1, idx_2 in zip(self.current_state, action)]
        return self.current_state, self.get_reward(prev_state, action, self.current_state), \
               self.is_terminal(self.current_state), None

In [15]:
mcts = MonteCarloTreeSearchNode(state=nim.initial_state)