In [1]:
import gym
from collections import defaultdict

In [2]:
import numpy as np
import itertools

In [3]:
env = gym.make('Blackjack-v0', natural=True)

In [4]:
observation = env.reset()
print(f'observation: {observation}')
done = False
while not done:
    if observation[0] < 19:
        observation, reward, done, _ = env.step(1)
        print(f"observation: {observation}, reward: {reward}, done: {done}")
    else:
        observation, reward, done, _ = env.step(0)
        print(f"observation: {observation}, reward: {reward}, done: {done}")
    

observation: (15, 10, False)
observation: (16, 10, False), reward: 0.0, done: False
observation: (18, 10, False), reward: 0.0, done: False
observation: (22, 10, False), reward: -1.0, done: True


In [5]:
def run_episode(env, pi, eps=.01):
    observation = env.reset()
    states, rewards, actions = [], [], []
    done = False
    
    while not done:
        # epsilon policy
        action = pi[observation] if np.random.rand() > eps else env.action_space.sample()
        states.append(observation)
        actions.append(action)
        observation, reward, done, _ = env.step(action)
        rewards.append(reward)

    return states, rewards, actions

In [6]:
# в дискретном пространстве состояний можно записать стратегию 
# как словарь состоягие -> действие

states_list = list(
    itertools.product(
        range(2, 22), # the players current sum,
        range(1, 11), # the dealer's one showing card (1-10 where 1 is ace),
        (True, False) # the player holds a usable ace
    )
)

In [7]:
# если сумма меньше 19, то hit
pi = {
    state: int(state[0] < 19)
    for state in states_list
}

In [8]:
def avg_reward(env, pi, epochs=100000):
    sum_reward = 0
    cnt = 0
    for _ in range(epochs):
        cnt += 1
        states, rewards, actions = run_episode(env, pi, eps=0.)
        sum_reward += rewards[-1]
    return sum_reward / cnt

In [9]:
avg_reward(env, pi)

-0.17862

In [20]:
def get_random_Q(env):
    # инициализируем случайным образом
    Q = {
        state: np.zeros(env.action_space.n) #np.random.random(env.action_space.n)
        for state in states_list
    }
    for state in states_list:
        if state[0] == 21:
            Q[state] = np.zeros(env.action_space.n)   
    return Q

In [21]:
def compute_policy_by_Q(env, Q, gamma=1.0):
    return {key: np.argmax(value) for key, value in Q.items()}

In [22]:
def Q_learning_episode(env, pi, Q, alpha=0.05, epsilon=0.0, gamma=0.9):
    observation = env.reset()
    done = False
    while not done:
        a = pi[observation] if np.random.rand() > epsilon else env.action_space.sample()
        observation_prime, reward, done, _ = env.step(a)
        # если на следующем ходу проигрыш, то выходим
        if done:
            break
        Q[observation][a] = Q[observation][a] + alpha * (reward + gamma * np.max( Q[observation_prime] ) - Q[observation][a])
        observation = observation_prime

In [23]:
import copy

In [24]:
total_episodes = 100000
gamma = 0.9

Q_hist_Qlearn = [ ]
pi_hist = []

Q = get_random_Q(env)
pi = compute_policy_by_Q(env, Q)

for n in range(total_episodes):
    Q_learning_episode(env, pi, Q, alpha=0.01, epsilon= 0.9, gamma=gamma)
    pi = compute_policy_by_Q(env, Q)
    # Q_hist_Qlearn.append(copy.deepcopy(Q))
    # pi_hist.append(copy.deepcopy(pi))

In [25]:
avg_reward(env, pi)

-0.167975

In [None]:
avg_reward_hist = [avg_reward(env, pi) for pi in pi_hist]