<a href="https://colab.research.google.com/github/barabonda/SK-AI-FLY/blob/main/4%EC%A3%BC%EC%B0%A8/airplain_game.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Blackjack with Monte-Carlo ES

In [None]:
import numpy as np
import random
import itertools

# blackjack

# observation (= state):
# triple ( integer, integer, integer )
# 1. integer: the player's score (12 ~ 21)
# 2. integer: the value of the dealer's face-up card (1 ~ 10)
# 3. integer: 1 if the player holds an ace still counted as 11, and 0 otherwise

# action
# 0: hit
# 1: stay
# doesn't allow double down, surrender and split

# step types: position of a step inside an episode
STEPTYPE_FIRST, STEPTYPE_MID, STEPTYPE_LAST = 0, 1, 2

# values of the 13 ranks of one suit: ace = 1, 2..10 at face value, J/Q/K = 10
cardset = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 10, 10, 10]
deck = None

def shuffle_deck():
    """Rebuild the global ``deck`` from three card sets and shuffle it.

    Every card is a ``(suit, value)`` tuple with ``suit`` in 0..3 and
    ``value`` taken from ``cardset``. The suit is not used by the game
    logic; it is kept for a possible GUI version.
    """
    global deck
    one_set = list(itertools.product(range(4), cardset))
    deck = one_set * 3  # three identical sets, back to back
    random.shuffle(deck)

shuffle_deck()

In [None]:
# action-value table indexed by
# (player_score - 12, dealer_card - 1, has_ace, action), randomly initialised
Q = np.random.uniform(0.0, 1.0, (10, 10, 2, 2))

In [None]:
# environment state: the dealer's and the player's hands
# (lists of (suit, value) cards, filled in by generate_start_step)
dealer = player = None

# reset the environment
def generate_start_step():
    """Deal a fresh hand and return the first step of an episode.

    Reshuffles the deck, deals two cards to the dealer and two to the
    player, and stores both hands in the globals ``dealer`` / ``player``.

    Returns:
        dict with keys ``'observation'`` = (player_score, dealer's first
        card value, has_ace), ``'reward'`` = 0. and ``'step_type'`` =
        STEPTYPE_FIRST.
    """
    global dealer, player

    shuffle_deck()

    dealer = [deck.pop(), deck.pop()]
    player = [deck.pop(), deck.pop()]

    dealer_score = dealer[0][1]
    first, second = player[0][1], player[1][1]

    if first == 1 or second == 1:
        # one ace counts as 11; with two aces the other one counts as 1 (-> 12)
        has_ace = 1
        player_score = 11 + (second if first == 1 else first)
    else:
        has_ace = 0
        player_score = first + second
        # keep drawing until the score reaches the observable range (>= 12);
        # an ace drawn here is counted as 1
        while player_score < 12:
            player.append(deck.pop())
            player_score += player[-1][1]

    # 1st step
    return {
        'observation': (player_score, dealer_score, has_ace),
        'reward': 0.,
        'step_type': STEPTYPE_FIRST,
    }

In [None]:
import random
epsilon = 0.01
def get_eps_soft_action(step):
  """Pick hit (0) or stay (1) for ``step`` from the global ``Q`` table.

  With probability ``epsilon`` the action with the lower Q-value is
  returned; otherwise the action with the higher Q-value is returned.
  On a tie both branches return 0 (hit).
  """
  assert(step['observation'][0] >= 12 and step['observation'][0] <= 21)
  player_score, dealer_card, has_ace = step['observation'][:3]
  q_hit, q_stay = Q[(player_score - 12, dealer_card - 1, has_ace)]
  explore = random.random() < epsilon
  if explore:
    # exploratory move: take the action that currently looks worse
    return 1 if q_hit > q_stay else 0
  return 1 if q_hit < q_stay else 0

In [None]:
# returns a step, which is a dictionary { 'observation', 'reward', 'step_type' }
def generate_next_step(step, action):
    """Apply ``action`` to the current game and return the resulting step.

    Args:
        step: the current step dict; only ``step['observation']`` =
            (player_score, dealer_open, has_ace) is read.
        action: 0 = hit (draw one card from the global ``deck``),
            anything else = stay.

    Returns:
        dict with keys ``'observation'``, ``'reward'`` and ``'step_type'``:
        * player busts        -> reward -1., STEPTYPE_LAST
        * game stops (stay, or hit reaching exactly 21) -> the dealer plays,
          reward is 1. / 0. / -1., STEPTYPE_LAST; in this case the second
          observation entry is the dealer's FINAL score, not the open card
        * otherwise           -> reward 0., STEPTYPE_MID

    Side effects: appends drawn cards to the global ``player`` / ``dealer``
    hands and removes them from the global ``deck``.
    """
    global player, dealer

    player_score, dealer_open, has_ace = step['observation']
    # has_ace is used to check if the player has
    # the option to count an ace as 1

    game_stop = False
    busted = False

    # with hit, get a card
    if action == 0:
        # hit - retrieve an additional card
        player.append(deck.pop())

        # note that an additional ace should be counted as 1
        player_score += player[-1][1]

        # if blackjack or bust, the game stops
        if player_score == 21:
            game_stop = True
        elif player_score > 21:
            # if busted but has an ace, the ace is counted as 1
            # and has_ace becomes false since already used
            if has_ace == 1:
                player_score -= 10
                has_ace = 0
            else:
                game_stop = True
                busted = True

    # with stay, game_stop
    else:
        game_stop = True

    # if busted, immediately the player loses
    if busted:
        return { 'observation': (player_score, dealer_open, has_ace),
                 'reward': -1., 'step_type': STEPTYPE_LAST }

    # now, if game_stop, it's dealer's turn & game stop
    if game_stop:
        dealer_has_ace = False
        dealer_busted = False

        # examine dealer's hands
        # (the ace branches use float literals, so dealer_score becomes a
        # float whenever one of the dealer's two initial cards is an ace)
        if dealer[0][1] == 1 and dealer[1][1] == 1:
            dealer_score = 12.
            dealer_has_ace = True
        elif dealer[0][1] == 1:
            dealer_score = 11. + dealer[1][1]
            dealer_has_ace = True
        elif dealer[1][1] == 1:
            dealer_score = 11. + dealer[0][1]
            dealer_has_ace = True
        else:
            dealer_score = dealer[0][1] + dealer[1][1]
            dealer_has_ace = False

        # the dealer takes cards until the score is at least 17
        # (an ace drawn here is counted as 1)
        while dealer_score < 17:
            dealer.append(deck.pop())
            dealer_score += dealer[-1][1]

            # if busted but has an ace, the ace is counted as 1
            if dealer_score > 21:
                if dealer_has_ace:
                    dealer_score -= 10
                    dealer_has_ace = False
                else:
                    # score stays > 21, which also ends the while loop
                    dealer_busted = True

        # compute the reward
        if dealer_busted:
            reward = 1.
        else:
            if player_score > dealer_score:
                reward = 1.
            elif player_score < dealer_score:
                reward = -1.
            else:
                reward = 0.

        # terminal observation carries the dealer's final score
        return { 'observation': (player_score, dealer_score, has_ace),
                 'reward': reward, 'step_type': STEPTYPE_LAST }

    # continue
    else:
        return { 'observation': (player_score, dealer_open, has_ace),
                 'reward': 0., 'step_type': STEPTYPE_MID }

In [None]:
def generate_episode(policy_func=get_eps_soft_action):
  """Play one full game, choosing every action with ``policy_func``.

  Returns:
      (episode, actions): ``episode`` is the list of step dicts from the
      first step to the terminal step, ``actions`` the actions taken, so
      ``len(episode) == len(actions) + 1``.
  """
  step = generate_start_step()
  episode = [step]
  actions = []
  while step['step_type'] != STEPTYPE_LAST:
    chosen = policy_func(step)
    step = generate_next_step(step, chosen)
    episode.append(step)
    actions.append(chosen)
  return episode, actions

In [None]:
# smoke test: play one episode with the default policy and display
# the (steps, actions) pair
test = generate_episode()
test

([{'observation': (14, 10, 0), 'reward': 0.0, 'step_type': 0},
  {'observation': (14, 25, 0), 'reward': 1.0, 'step_type': 2}],
 [1])

In [None]:
# return true if (observ, action) exists in epi
def in_episode(epi, observ, action):
  """Tell whether the pair (observ, action) occurs anywhere in ``epi``.

  Args:
      epi: pair ``(steps, actions)``; steps are dicts with an
          ``'observation'`` key. The two sequences are walked in parallel
          and the longer one is truncated.
      observ: observation to look for.
      action: action to look for.

  Returns:
      True if some step has this observation together with this action,
      False otherwise (including for an empty episode).
  """
  for s, a in zip(*epi):
    if s['observation'] == observ and a == action:
      return True
  # only give up after every (step, action) pair has been examined
  return False

In [None]:
# Monte-Carlo policy learning: episodes come from generate_episode() with its
# default policy; Q is the running average of the returns seen per
# (state, action) pair.
maxiter = 1000000
gamma = 1
epsilon = 0.4
N = np.zeros((10, 10, 2, 2), dtype='float32')    # visit counts
SUM = np.zeros((10, 10, 2, 2), dtype='float32')  # accumulated returns
Q = np.random.uniform(size=(10, 10, 2, 2))
for _ in range(maxiter):
  episode = generate_episode()
  remaining_steps, remaining_actions = episode
  G = 0.
  last_step = remaining_steps.pop()
  # walk the episode backwards, accumulating the discounted return
  while remaining_steps:
    G = last_step['reward'] + gamma * G
    last_step = remaining_steps.pop()
    last_action = remaining_actions.pop()
    observ = last_step['observation']
    idx = (observ[0] - 12, observ[1] - 1, observ[2], last_action)
    # skip the update when in_episode reports the same (state, action)
    # among the earlier, not yet processed part of the episode
    if in_episode(episode, observ, last_action):
      continue
    N[idx] += 1.
    SUM[idx] += G
    Q[idx] = SUM[idx] / N[idx]

In [None]:
import pandas as pd
# greedy policy without a usable ace: rows = player score (21 down to 12),
# columns = dealer's open card; 1 = stay, 0 = hit
wo_ace = pd.DataFrame(
    # Q rows run from score 12 to 21, the table shows them from 21 to 12
    (Q[:, :, 0, 0] < Q[:, :, 0, 1]).astype('int32')[::-1],
    columns=list(range(1, 11)),
    index=list(range(21, 11, -1)),
    dtype='int32')

## MC-on-results

In [None]:
wo_ace

Unnamed: 0,1,2,3,4,5,6,7,8,9,10
21,1,1,1,1,1,1,1,1,1,1
20,1,1,1,1,1,1,1,1,1,1
19,1,1,1,1,1,1,1,1,1,1
18,1,1,1,1,1,1,1,1,1,1
17,1,1,1,1,1,1,1,1,1,1
16,0,1,1,1,1,1,1,1,1,1
15,0,1,1,1,1,1,1,1,1,1
14,0,1,1,1,1,1,1,1,1,1
13,0,1,1,1,1,1,0,0,0,0
12,0,1,1,1,1,1,0,0,0,0


In [None]:
# greedy policy with a usable ace: rows = player score (21 down to 12),
# columns = dealer's open card; 1 = stay, 0 = hit
w_ace = pd.DataFrame(
    # Q rows run from score 12 to 21, the table shows them from 21 to 12
    (Q[:, :, 1, 0] < Q[:, :, 1, 1]).astype('int32')[::-1],
    columns=list(range(1, 11)),
    index=list(range(21, 11, -1)),
    dtype='int32')
w_ace

Unnamed: 0,1,2,3,4,5,6,7,8,9,10
21,1,1,1,1,1,1,1,1,1,1
20,1,1,1,1,1,1,1,1,1,1
19,1,1,1,1,1,1,1,1,1,1
18,1,1,1,1,1,1,1,1,1,1
17,0,1,1,1,0,1,1,0,0,0
16,0,0,0,0,0,0,0,0,0,0
15,0,0,0,0,0,0,0,0,0,0
14,0,0,0,0,0,0,0,0,0,0
13,0,0,0,0,0,0,0,0,0,0
12,0,0,0,0,0,0,0,0,0,0


In [None]:
# manual walkthrough: deal a new game and keep its first step
step = generate_start_step()

In [None]:
step

{'observation': (13, 7, 0), 'reward': 0.0, 'step_type': 0}

In [None]:
player

[(0, 3), (2, 3), (2, 7)]

In [None]:
dealer

[(0, 7), (2, 1)]

In [None]:
# manual walkthrough: ask the epsilon-soft policy for an action in this state
action = get_eps_soft_action(step)

In [None]:
action

1

In [None]:
# manual walkthrough: apply the chosen action and keep the resulting step
step = generate_next_step(step, action)

In [None]:
step

{'observation': (13, 18.0, 0), 'reward': -1.0, 'step_type': 2}

In [None]:
dealer

[(0, 7), (2, 1)]

In [None]:
player

[(0, 3), (2, 3), (2, 7)]

In [None]:
behavior_prob_hit = 0.6
def get_random_action(step):
  """Behaviour policy: ignore ``step`` and draw an action at random.

  Returns hit (0) with probability ``behavior_prob_hit`` and stay (1)
  otherwise.
  """
  return 0 if random.random() < behavior_prob_hit else 1

In [None]:
def get_greedy_action(step):
  """Return the action with the larger Q-value for ``step``'s observation.

  Hit (0) is returned only when it is strictly better; a tie yields stay (1).
  """
  player_score, dealer_card, has_ace = step['observation'][:3]
  q_hit, q_stay = Q[(player_score - 12, dealer_card - 1, has_ace)]
  if q_hit > q_stay:
    return 0
  return 1

In [None]:
# Monte-Carlo off-policy learning: episodes are generated by the random
# behaviour policy; Q of the greedy target policy is estimated as a
# W-weighted average of returns (N accumulates the weights, SUM the
# weighted returns).
maxiter = 1000000
gamma = 1
N = np.zeros((10, 10, 2, 2), dtype='float32')
SUM = np.zeros((10, 10, 2, 2), dtype='float32')
Q = np.random.uniform(size=(10, 10, 2, 2))
for _ in range(maxiter):
  steps, actions = generate_episode(policy_func=get_random_action)
  G = 0.
  W = 1.
  last_step = steps.pop()
  # walk the episode backwards
  while steps:
    G = last_step['reward'] + gamma * G
    last_step = steps.pop()
    last_action = actions.pop()
    observ = last_step['observation']
    idx = (observ[0] - 12, observ[1] - 1, observ[2], last_action)
    # update only when in_episode does not find the same (state, action)
    # in the earlier part of the episode
    seen_earlier = in_episode((steps, actions), observ, last_action)
    if not seen_earlier:
      N[idx] += W
      SUM[idx] += W * G
      Q[idx] = SUM[idx] / N[idx]
    # the greedy policy would not have taken this action:
    # stop processing the rest of this episode
    if last_action != get_greedy_action(last_step):
      break
    # divide the weight by the behaviour policy's probability of the action
    if last_action == 0:
      W /= behavior_prob_hit
    else:
      W /= (1. - behavior_prob_hit)

In [None]:
import pandas as pd
# off-policy result without a usable ace: rows = player score (21 down
# to 12), columns = dealer's open card; 1 = stay, 0 = hit
stay_is_better = Q[:, :, 0, 0] < Q[:, :, 0, 1]
wo_ace = pd.DataFrame(
    # flip so that the row for score 21 comes first
    np.flipud(stay_is_better.astype('int32')),
    columns=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
    index=[21, 20, 19, 18, 17, 16, 15, 14, 13, 12],
    dtype='int32')

## MC_off_results

In [None]:
wo_ace

Unnamed: 0,1,2,3,4,5,6,7,8,9,10
21,1,1,1,1,1,1,1,1,1,1
20,1,1,1,1,1,1,1,1,1,1
19,1,1,1,1,1,1,1,1,1,1
18,1,1,1,1,1,1,1,1,1,1
17,1,1,1,1,1,1,1,1,1,1
16,0,1,1,1,1,1,0,0,0,0
15,0,1,1,1,1,1,0,0,0,0
14,0,1,1,1,1,1,0,0,0,0
13,0,1,1,1,1,1,0,0,0,0
12,0,0,0,0,1,1,0,0,0,0


In [None]:
# with ace (third Q index = 1): rows = player score (21 down to 12),
# columns = dealer's open card; 1 = stay, 0 = hit
stay_is_better = Q[:, :, 1, 0] < Q[:, :, 1, 1]
w_ace = pd.DataFrame(
    # flip so that the row for score 21 comes first
    np.flipud(stay_is_better.astype('int32')),
    columns=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
    index=[21, 20, 19, 18, 17, 16, 15, 14, 13, 12],
    dtype='int32')

In [None]:
w_ace

Unnamed: 0,1,2,3,4,5,6,7,8,9,10
21,1,1,1,1,1,1,1,1,1,1
20,1,1,1,1,1,1,1,1,1,1
19,1,1,1,1,1,1,1,1,1,1
18,0,0,1,1,1,1,1,1,1,0
17,0,0,1,0,0,0,0,0,0,0
16,0,0,0,0,0,0,0,0,0,0
15,0,0,0,0,0,0,0,0,0,0
14,0,0,0,0,0,0,0,0,0,0
13,0,0,0,0,0,0,0,0,0,0
12,0,0,0,0,0,0,0,0,0,0


In [None]:
import random
import itertools
import numpy as np
# blackjack
# observation (=state):
# triple ( integer, integer, integer )
# 1. integer: the player's score (12 ~ 21)
# 2. integer: the dealer's card score of upside (1 ~ 10)
# 3. integer: 1 if the player has at least an ace, and 0 otherwise
# action
# 0: hit
# 1: stay
# doesn't allow double down, surrender and split
# step types
# step types
STEPTYPE_FIRST = 0
STEPTYPE_MID = 1
STEPTYPE_LAST = 2
# values of the 13 ranks of one suit: ace = 1, 2..10 at face value, J/Q/K = 10
cardset = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 10, 10, 10 ]
deck = None
def shuffle_deck():
  """Rebuild the global ``deck`` from three card sets and shuffle it.

  Every card is a ``(suit, value)`` tuple; the suit is not used by the
  game logic but kept for a possible GUI version.
  """
  global deck
  # the deck construction must live INSIDE the function, otherwise
  # calling shuffle_deck() never restores cards that were already dealt
  deck = \
    list(itertools.product(range(4), cardset)) \
    + list(itertools.product(range(4), cardset)) \
    + list(itertools.product(range(4), cardset))
  random.shuffle(deck)
shuffle_deck()
# action-value table indexed by
# (player_score - 12, dealer_card - 1, has_ace, action), randomly initialised
Q = np.random.uniform(0.0, 1.0, (10, 10, 2, 2))
# environment state: the dealer's and the player's hands
dealer = player = None
# reset the environment
def generate_start_step():
  """Deal a fresh hand and return the first step of an episode.

  Reshuffles the deck, deals two cards each to the dealer and the player
  (stored in the globals ``dealer`` / ``player``) and, when the player
  holds no ace, keeps drawing until the player's score is at least 12.

  Returns:
      dict with keys ``'observation'`` = (player_score, dealer's first
      card value, has_ace), ``'reward'`` = 0. and ``'step_type'`` =
      STEPTYPE_FIRST.
  """
  global dealer, player
  shuffle_deck()
  dealer = [ deck.pop(), deck.pop() ]
  player = [ deck.pop(), deck.pop() ]
  dealer_score = dealer[0][1]
  if player[0][1] == 1 and player[1][1] == 1:
    # if player gets double ace, the second one is counted as 1
    player_score = 12
    has_ace = 1
  elif player[0][1] == 1:
    player_score = 11 + player[1][1]
    has_ace = 1
  elif player[1][1] == 1:
    player_score = 11 + player[0][1]
    has_ace = 1
  else:
    player_score = player[0][1] + player[1][1]
    # top up until the score reaches the observable range (>= 12);
    # an ace drawn here is counted as 1
    while player_score < 12:
      player.append(deck.pop())
      player_score += player[-1][1]
    has_ace = 0
  # 1st step -- returned on every path, only after the top-up loop is done
  return { 'observation': (player_score, dealer_score, has_ace),'reward': 0., 'step_type': STEPTYPE_FIRST }


In [None]:
# Taxi

!pip install gym[atari,toy_text,accept_rom_license]

import numpy as np
import random
import gym
env = gym.make('Taxi-v3', new_step_api=True)
# step types
STEPTYPE_FIRST = 0
STEPTYPE_MID = 1
STEPTYPE_LAST = 2
# action-value table: one row per state, one column per action, sized from
# the environment instead of hard-coding (500, 6)
Q = np.random.uniform(size=(env.observation_space.n, env.action_space.n))

# quick sanity check of the environment API
env.reset()

env.step(0)

# wrappers turning the gym Taxi environment into the step-dict interface
def generate_start_step():
  """Reset ``env`` and wrap whatever ``env.reset()`` returns as the first step."""
  first_observation = env.reset()
  return { 'observation': first_observation, 'reward': 0., 'step_type': STEPTYPE_FIRST }
def generate_next_step(step, action):
  """Advance ``env`` by ``action`` and wrap the result as a step dict.

  ``step`` is not used. The fourth value returned by ``env.step`` is
  discarded -- NOTE(review): with ``new_step_api=True`` this is presumably
  the ``truncated`` flag, so a time-limit cut-off would not end the
  episode here; confirm against the gym version in use.
  """
  obs, reward, done, _, info = env.step(action)
  return {
    'observation': obs,
    'reward': reward,
    'step_type': STEPTYPE_LAST if done else STEPTYPE_MID,
  }


epsilon = 0.1
def get_eps_soft_action(step):
  """Epsilon-greedy action for ``step`` from the global ``Q`` table.

  With probability ``epsilon`` a uniformly random action is drawn,
  otherwise the action with the highest Q-value for the observation.
  """
  # epsilon-soft greedy policy
  if random.random() < epsilon:
    # take the number of actions from the environment rather than a literal 6
    return np.random.choice(env.action_space.n, 1)[0]
  else:
    observ = step['observation']
    return np.argmax(Q[observ])
def get_greedy_action(step):
  """Return the index of the largest Q-value in the row for ``step``'s observation."""
  q_row = Q[step['observation']]
  return np.argmax(q_row)
def get_random_action(step):
  """Ignore ``step`` and return a uniformly random action index."""
  highest_action = env.action_space.n-1
  return random.randint(0, highest_action)
# probability of any single action under the uniform random policy
behavior_prob_hit = 1. / float(env.action_space.n)
# return true if (observ, action) exists in epi
def in_episode(epi, observ, action):
  """Tell whether (observ, action) occurs in ``epi`` = (steps, actions).

  Steps and actions are paired position by position; True is returned as
  soon as one pair matches, False when none does.
  """
  steps, actions = epi
  return any(s['observation'] == observ and a == action
             for s, a in zip(steps, actions))

def generate_episode(policy_func=get_random_action):
  """Run one episode with ``policy_func`` and record a text frame per step.

  Returns:
      (episode, actions, frames): the step dicts, the actions taken, and
      the ``env.render(mode='ansi')`` output captured after the reset and
      after every step.
  """
  step = generate_start_step()
  frames = [env.render(mode='ansi')]
  episode = [step]
  actions = []
  while step['step_type'] != STEPTYPE_LAST:
    chosen = policy_func(step)
    step = generate_next_step(step, chosen)
    frames.append(env.render(mode='ansi'))
    episode.append(step)
    actions.append(chosen)
  return episode, actions, frames

from IPython.display import clear_output
from time import sleep
def print_frames(frames):
  """Show ``frames`` one after another as a simple animation.

  The previous cell output is cleared before each frame is printed, then
  execution pauses for 0.2 seconds.
  """
  for frame in frames:
    clear_output(wait=True)
    print(frame)
    sleep(.2)

# Q-learning on Taxi: behave epsilon-greedily, bootstrap from the greedy
# action of the next state.
maxiter = 100000
gamma = 1
epsilon = 0.3
lr_rate = 0.8

for _ in range(maxiter):
  # starting step
  step = generate_start_step()
  action = get_random_action(step)
  done = False
  while not done:
    next_step = generate_next_step(step, action)
    state = step['observation']
    idx1 = (state, action)
    if next_step['step_type'] == STEPTYPE_LAST:
      # terminal transition: nothing to bootstrap from
      target = next_step['reward']
    else:
      best_action = get_greedy_action(next_step)
      next_state = next_step['observation']
      idx2 = (next_state, best_action)
      target = next_step['reward'] + gamma * Q[idx2]
    Q[idx1] = Q[idx1] + lr_rate * (target - Q[idx1])
    if next_step['step_type'] == STEPTYPE_LAST:
      done = True
    else:
      # the action executed next must be chosen for the state we have just
      # moved into (next_step), not for the state we are leaving
      action = get_eps_soft_action(next_step)
      step = next_step

# replay one episode that follows the learned greedy policy
epi, actions, frames = generate_episode(policy_func=get_greedy_action)
print_frames(frames)

## Cliff Walking

!pip install gym[atari,toy_text,accept_rom_license]

import numpy as np
import random
import gym
from IPython.display import clear_output
from time import sleep

env = gym.make('CliffWalking-v0', new_step_api=True)

# step types: position of a step inside an episode
STEPTYPE_FIRST, STEPTYPE_MID, STEPTYPE_LAST = 0, 1, 2

# Q-table, zero-initialised: one row per state, one column per action
Q = np.zeros(shape=(env.observation_space.n, env.action_space.n))

# wrappers turning the gym cliff walking environment into step dicts
def generate_start_step():
    """Reset ``env`` and wrap whatever ``env.reset()`` returns as the first step."""
    start = {'reward': 0., 'step_type': STEPTYPE_FIRST}
    start['observation'] = env.reset()
    return start

def generate_next_step(step, action):
    """Advance ``env`` by ``action`` and wrap the result as a step dict.

    ``step`` is not used, and the fourth value returned by ``env.step``
    is discarded.
    """
    obs, reward, done, _, info = env.step(action)
    if done:
        step_type = STEPTYPE_LAST
    else:
        step_type = STEPTYPE_MID
    return {'observation': obs, 'reward': reward, 'step_type': step_type}

epsilon = 0.1   # exploration probability
lr_rate = 0.8   # learning rate
gamma = 1       # discount factor

def get_eps_soft_action(step):
    """Epsilon-greedy action for ``step`` from the global ``Q`` table.

    With probability ``epsilon`` a uniformly random action is drawn,
    otherwise the action with the highest Q-value for the observation.
    """
    explore = random.random() < epsilon
    if not explore:
        return np.argmax(Q[step['observation']])
    return np.random.choice(env.action_space.n, 1)[0]

def get_greedy_action(step):
    """Return the index of the largest Q-value in the row for ``step``'s observation."""
    return np.argmax(Q[step['observation']])

def get_random_action(step):
    """Ignore ``step`` and return a uniformly random action index."""
    highest_action = env.action_space.n - 1
    return random.randint(0, highest_action)

def print_frames(frames):
    """Show ``frames`` one after another as a simple animation.

    The previous cell output is cleared before each frame is printed,
    then execution pauses for 0.2 seconds.
    """
    for frame in frames:
        clear_output(wait=True)
        print(frame)
        sleep(.2)

def q_learning(maxiter):
    """Run ``maxiter`` episodes of tabular Q-learning, updating ``Q`` in place.

    Each episode starts with a random action; later actions come from the
    epsilon-greedy policy evaluated on the state just reached. The update
    target is the reward alone on the terminal transition and
    ``reward + gamma * max_a Q[next_state, a]`` otherwise.

    Args:
        maxiter: number of episodes to run.
    """
    for _ in range(maxiter):
        # starting step
        step = generate_start_step()
        action = get_random_action(step)
        done = False
        while not done:
            next_step = generate_next_step(step, action)
            idx1 = (step['observation'], action)
            terminal = next_step['step_type'] == STEPTYPE_LAST
            if terminal:
                # nothing to bootstrap from after the last transition
                target = next_step['reward']
            else:
                best_action = get_greedy_action(next_step)
                idx2 = (next_step['observation'], best_action)
                target = next_step['reward'] + gamma * Q[idx2]
            Q[idx1] = Q[idx1] + lr_rate * (target - Q[idx1])
            if terminal:
                done = True
            else:
                # choose the next action for the state we have just moved
                # into (next_step), not for the state we are leaving
                action = get_eps_soft_action(next_step)
                step = next_step

# Run Q-learning algorithm for a fixed number of episodes;
# q_learning updates the global Q table in place
max_iterations = 100000
q_learning(max_iterations)

# Generate an episode (steps, actions and rendered text frames)
def generate_episode(policy_func=get_greedy_action):
    """Run one episode with ``policy_func`` and record a text frame per step.

    Returns:
        (episode, actions, frames): the step dicts, the actions taken, and
        the ``env.render(mode='ansi')`` output captured after the reset
        and after every step.
    """
    step = generate_start_step()
    frames = [env.render(mode='ansi')]
    episode = [step]
    actions = []
    while step['step_type'] != STEPTYPE_LAST:
        chosen = policy_func(step)
        step = generate_next_step(step, chosen)
        frames.append(env.render(mode='ansi'))
        episode.append(step)
        actions.append(chosen)
    return episode, actions, frames

# Display an episode using the learned Q-values
epi, actions, frames = generate_episode(policy_func=get_greedy_action)
print_frames(frames)


# same greedy rollout run a second time
epi, actions, frames = generate_episode(policy_func=get_greedy_action)
print_frames(frames)


Collecting ale-py~=0.7.5 (from gym[accept_rom_license,atari,toy_text])
  Downloading ale_py-0.7.5-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.6/1.6 MB[0m [31m7.0 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting pygame==2.1.0 (from gym[accept_rom_license,atari,toy_text])
  Downloading pygame-2.1.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (18.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m18.3/18.3 MB[0m [31m24.3 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: pygame, ale-py
  Attempting uninstall: pygame
    Found existing installation: pygame 2.5.2
    Uninstalling pygame-2.5.2:
      Successfully uninstalled pygame-2.5.2
Successfully installed ale-py-0.7.5 pygame-2.1.0


See here for more information: https://www.gymlibrary.ml/content/api/[0m
  deprecation(


+---------+
|[35mR[0m: | : :G|
| : | : : |
| : : : : |
| | : | : |
|Y| : |[34;1mB[0m:[43m [0m|
+---------+


+---------+
|[35mR[0m: | : :G|
| : | : : |
| : : : : |
| | : | : |
|Y| : |[34;1m[43mB[0m[0m: |
+---------+
  (West)

+---------+
|[35mR[0m: | : :G|
| : | : : |
| : : : : |
| | : | : |
|Y| : |[42mB[0m: |
+---------+
  (Pickup)

+---------+
|[35mR[0m: | : :G|
| : | : : |
| : : : : |
| | : |[42m_[0m: |
|Y| : |B: |
+---------+
  (North)

+---------+
|[35mR[0m: | : :G|
| : | : : |
| : : :[42m_[0m: |
| | : | : |
|Y| : |B: |
+---------+
  (North)

+---------+
|[35mR[0m: | : :G|
| : | : : |
| : :[42m_[0m: : |
| | : | : |
|Y| : |B: |
+---------+
  (West)

+---------+
|[35mR[0m: | : :G|
| : | : : |
| :[42m_[0m: : : |
| | : | : |
|Y| : |B: |
+---------+
  (West)

+---------+
|[35mR[0m: | : :G|
| :[42m_[0m| : : |
| : : : : |
| | : | : |
|Y| : |B: |
+---------+
  (North)

+---------+
|[35mR[0m:[42m_[0m| : :G|
| : | : : |
| : : : : |
| | : | : |
|Y| : |