In [1]:
from abc import ABC, abstractmethod
from collections import defaultdict, deque
from dataclasses import dataclass, field
from typing import List, NamedTuple, Dict

import numpy as np

from ScoreLogger import ScoreLogger, PLOT_REFRESH, ScoreLoggerFinalPlot
from blackjack import BlackjackEnv2
from pyxtension.streams import stream

In [18]:
# Exploration rate. GreedyMCPlayer reads this module-level name every time it
# selects an action, so re-assigning it in a later cell changes how players behave.
epsilon = 0.1
# Single environment instance shared by the training and evaluation code.
env = BlackjackEnv2()
# Number of actions reported by the environment's action space.
nA = env.action_space.n

In [19]:
@dataclass
class V:
    """Running average of the returns observed for one (state, action) pair."""

    returns: float = 0.0  # sum of all returns credited so far
    state_cnt: int = 0  # how many returns have been added to `returns`

    @property
    def val(self) -> float:
        """Mean return, or 0.0 while nothing has been recorded yet."""
        if self.state_cnt > 0:
            return self.returns / self.state_cnt
        return 0.0


class State(NamedTuple):
    """Immutable, hashable view of one environment observation.

    Built by unpacking the observation returned by the environment
    (``State(*observation)``), so it can be used as a dictionary key.
    """

    # Field order must match the order of the environment's observation tuple.
    my_sum: int
    dealers_card: int
    usable_ace: bool


@dataclass
class StateActions:
    """Action-value estimates for a single state: one ``V`` per action index."""

    # A fresh list of `nA` empty estimates is built for every new instance.
    actions: List[V] = field(default_factory=lambda: [V() for _ in range(nA)])

    @property
    def best_action(self):
        """Index of the highest-valued action.

        Works like ``np.argmax`` over the action values, except that when
        several actions share the maximum one of them is picked at random.
        """

        def action_value(idx):
            return self.actions[idx].val

        tied_for_best = stream(range(len(self.actions))).maxes(key=action_value)
        return np.random.choice(tied_for_best)

In [20]:
class GreedyMCPlayer:
    """Monte Carlo control with an epsilon-greedy behaviour policy.

    Episodes are played against the module-level ``env``. Action-value
    estimates live in ``self.P`` (state -> ``StateActions``) and are updated
    after each episode with the undiscounted return that followed every visit
    of a (state, action) pair. The exploration rate is read from the
    module-level ``epsilon`` each time an action is selected.
    """

    def __init__(self, rounds: int = 30) -> None:
        """
        :param rounds: number of training episodes run by
            ``mc_control_epsilon_greedy``.
        """
        self.observation_space = env.observation_space.shape
        self.action_space = env.action_space.n

        self._N = rounds
        self.sl = ScoreLoggerFinalPlot(success_rounds=50)
        self._STOP_THRESHOLD = 1.0  # 0.86- with RP
        self._round = 0
        self._score = 0
        self._total_score = 0
        self._max_avg_score = -100
        # Create the value table up front so that generate_episode() and
        # get_epsilon_greedy_action_policy() also work before training starts.
        self.P: Dict[State, StateActions] = defaultdict(StateActions)

    def get_epsilon_greedy_action_policy(self, observation: State):
        """Return the action probabilities for ``observation``.

        Every action gets ``epsilon / nA``; the currently best action gets an
        additional ``1 - epsilon``.

        :param observation: state to build the distribution for
        :return: float array of length ``nA`` that sums to 1
        """
        A = np.full(nA, epsilon / nA, dtype=float)
        best_action = self.P[observation].best_action
        A[best_action] += (1.0 - epsilon)

        return A

    def generate_episode(self):
        """Play one episode with the current epsilon-greedy policy.

        Side effects: increments the round counter, adds the final reward to
        the running total and reports it to the score logger.

        :return: list of ``(state, action, reward)`` tuples in play order
        """
        episode = []
        current_state = State(*env.reset())

        while True:
            prob_scores = self.get_epsilon_greedy_action_policy(current_state)
            action = np.random.choice(len(prob_scores), p=prob_scores)  # 0 or 1

            next_state, reward, done, _ = env.step(action)
            next_state = State(*next_state)
            episode.append((current_state, action, reward))
            if done:
                self._round += 1
                self._total_score += int(reward)

                self.sl.add_score(int(reward), self._round)

                break
            current_state = next_state

        return episode

    def mc_control_epsilon_greedy(self):
        """Train for ``rounds`` episodes and return the learned value table.

        The table is reset at the start of every call, so each call trains
        from scratch. The score plot is shown once training has finished.

        :return: mapping from ``State`` to ``StateActions``
        """
        self.P = defaultdict(StateActions)

        for _ in stream(range(self._N)).tqdm(total=self._N):
            episode = self.generate_episode()

            # Return following each step, accumulated back-to-front in a single
            # pass instead of re-summing the tail of the episode for every step.
            returns = []
            G = 0
            for _state, _action, reward in reversed(episode):
                G += reward
                returns.append(G)
            returns.reverse()

            # Apply the updates in play order (every visit is counted).
            for (state, action, _reward), G in zip(episode, returns):
                v = self.P[state].actions[action]
                v.returns += G
                v.state_cnt += 1

        self.sl.show()
        return self.P

In [21]:
# Train for 10,000 episodes using the current module-level `epsilon`.
player = GreedyMCPlayer(10000)
policy = player.mc_control_epsilon_greedy()

100%|██████████| 10000/10000 [00:33<00:00, 125.35it/s]


In [24]:
# Re-train from scratch with less exploration. GreedyMCPlayer picks up the
# re-assigned module-level `epsilon`; `policy` from the previous run is overwritten.
epsilon = 0.05
player = GreedyMCPlayer(10000)
policy = player.mc_control_epsilon_greedy()

100%|██████████| 10000/10000 [00:31<00:00, 316.79it/s]


In [25]:
# Re-train from scratch with more exploration. When the notebook is run top to
# bottom this is the last training run, so its `policy` is the one used later.
epsilon = 0.3
player = GreedyMCPlayer(10000)
policy = player.mc_control_epsilon_greedy()

100%|██████████| 10000/10000 [00:31<00:00, 166.64it/s]


In [9]:
class Agent(ABC):
    """Base class for anything that can choose an action for a given state."""

    @property
    def name(self) -> str:
        """Display name of the agent: the name of its concrete class.

        This is a regular (non-abstract) property, so subclasses inherit it.
        """
        return type(self).__name__

    @abstractmethod
    def action(self, state: State) -> int:
        """Return the index of the action to take in ``state``."""


class RandomAgent(Agent):
    """Baseline agent that ignores the state and picks an action at random."""

    def __init__(self, action_space: int):
        """
        :param action_space: number of available actions; the agent returns
            indices in ``[0, action_space)``.
        """
        self._action_space = action_space

    def action(self, state: State) -> int:
        """Draw a random action index; ``state`` is not used."""
        n_actions = self._action_space
        return np.random.randint(n_actions)


class PredictAgent(Agent):
    """Agent that acts greedily with respect to a learned action-value table."""

    def __init__(self, policy: Dict[State, StateActions]):
        """
        :param policy: mapping from state to its per-action value estimates
        """
        self._P = policy

    def action(self, state: State) -> int:
        """Return the best known action for ``state``.

        The table is looked up with ``get`` so that evaluation never inserts
        new entries into a ``defaultdict`` policy and a plain ``dict`` does
        not raise ``KeyError`` for a state that was never visited in training.
        """
        known = self._P.get(state)
        if known is None:
            # Unseen state: a fresh StateActions has only zero-valued actions,
            # so its best_action is a random pick among all of them.
            known = StateActions()
        return known.best_action


def evaluate_agent(agent: Agent, rounds: int):
    """Play ``rounds`` episodes with ``agent`` and return the summed final rewards.

    Each episode runs on the module-level ``env`` until it reports ``done``.
    The final reward of every episode is added to the total and sent to a
    score logger, whose plot is shown (titled with the agent's name) at the end.

    :param agent: decides the action for every state
    :param rounds: number of episodes to play
    :return: sum of the final rewards over all episodes
    """
    score_log = ScoreLoggerFinalPlot(success_rounds=50)
    cumulative_reward = 0

    for round_idx in range(rounds):
        observation = State(*env.reset())

        # Step until the environment signals the end of the episode.
        while True:
            chosen = agent.action(observation)
            raw_next, reward, finished, _ = env.step(chosen)
            successor = State(*raw_next)

            if finished:
                cumulative_reward += reward
                score_log.add_score(int(reward), round_idx)
                break

            observation = successor

    score_log.show(title=agent.name)
    return cumulative_reward

In [10]:
# Baseline that picks among the `nA` actions at random.
random_agent = RandomAgent(nA)
# Greedy agent backed by `policy`, i.e. the table returned by whichever
# training cell assigned `policy` most recently.
q_agent = PredictAgent(policy)
n_rounds = 1000
# Each call shows the agent's score plot and returns its total reward over n_rounds episodes.
print(evaluate_agent(random_agent, n_rounds))
print(evaluate_agent(q_agent, n_rounds))

-409


-115
