Make sure there's this file, policy.npy and dice_neighbours.npy in the same folder before running.

Load required libraries.

In [1]:
import pickle
import numpy as np
from itertools import combinations_with_replacement, product
from copy import deepcopy
import random

import Yahtzee  # LETS GO :))))

This program creates the model for Yahtzee game.


For simplified Yahtzee game where there are only 7 categories, there are a total of:
- 3 rerolls
- 2^7 ways to fill categories
- 10C5 unique dice combinations
For a total of 96,768 states.


The IDs of each unique dice combination is simply the index of the combinations list.
Fortunately, these are already all sorted in ascending order of die faces.
Mapping between them is done with dictionaries.

In [2]:
# Every sorted multiset of five die faces; the list index doubles as the
# combination id.
dice_combinations = list(combinations_with_replacement(range(1, 7), 5))
num_of_combinations = len(dice_combinations)  # 252

# Two-way lookup between ids and face tuples. Built with enumerate so that
# the builtin `id` is not rebound at notebook scope.
id_to_combination = dict(enumerate(dice_combinations))
combination_to_id = {
    faces: combination_id
    for combination_id, faces in enumerate(dice_combinations)
}

There are total of 2^5 = 32 ways of selecting dice to reroll.
Since (0,0,0,0,0), or selecting no die for reroll is illegal, it is not accounted for.

This will also be used for action space later. Simplest way to generate them is by converting integers to bits.

In [3]:
# Counts all 32 five-bit patterns; the list below holds only the 31 non-zero ones.
num_of_selections = 32

# Each selection is the 5-bit binary expansion of 1..31, most significant bit
# first; a 1 marks a die that gets rerolled.
selections = [
    tuple((mask >> shift) & 1 for shift in range(4, -1, -1))
    for mask in range(1, 32)
]

Now it is time for transition functions.


Each dice rolled may lead to 6 other states for dice, exponentially increasing per dice rerolled.
That means, for every state, there are 31 reroll combinations and each combination leads to
6^number of selected dice combinations.

The resulting combination however is not unique to selection combination, for example:
- Current dice: 1,1,2,2,3
  - Reroll: 1,0,1,0,0, Result: 4,1,4,2,3 -> 1,2,3,4,4                
  - Reroll: 0,1,0,1,0, Result: 1,4,2,4,3 -> 1,2,3,4,4

This is why the probability of reaching a certain combination is not simply just (1/6)^dice rolled.
There may be some clever math to work it out, but we have computer, memory and time, (most importantly, I'm too stupid)
so why not just count the whole thing and then compute the probability?

In [4]:
from collections import Counter

# dice_neighbours[combination_id] is a list with one entry per reroll
# selection: (selection, [(resulting_combination, probability), ...]).
dice_neighbours = np.empty(len(dice_combinations), dtype=object)

for combination in dice_combinations:  # total of 252 combinations

    transitions = []

    for selection in selections:  # total of 31 ways to select

        # Dice whose selection bit is 0 keep their current face.
        kept_faces = [
            face for face, rerolled in zip(combination, selection)
            if rerolled == 0
        ]

        # Enumerate every ordered outcome of the rerolled dice and count how
        # many of them collapse onto each sorted combination.
        num_of_selected_dice = sum(selection)
        total = 6 ** num_of_selected_dice
        outcome_counts = Counter(
            tuple(sorted(kept_faces + list(new_face)))
            for new_face in product(range(1, 7), repeat=num_of_selected_dice)
        )

        # Divide once per outcome instead of summing 1/total repeatedly, so
        # the probabilities carry a single rounding error each.
        # Converting to list for easier loop operations in future.
        transition = [
            (outcome, count / total)
            for outcome, count in outcome_counts.items()
        ]

        transitions.append((selection, transition))

    dice_neighbours[combination_to_id[combination]] = transitions

np.save("dice_neighbours.npy", dice_neighbours)

In [4]:
# Reload the cached reroll transition table. allow_pickle=True is required for
# the object array and unpickles arbitrary objects, so only load a file that
# this notebook produced.
dice_neighbours = np.load('dice_neighbours.npy', allow_pickle=True)

In [5]:
# Selection index 30 is (1, 1, 1, 1, 1), i.e. every die is rerolled, so the
# transition list for it should cover every dice combination.
# print(dice_neighbours[0][30])
len(dice_neighbours[0][30][1])

252

Now define the action space.

The possible actions in each space are:
- Select dices to roll (31 ways)
- Write in to one of the category (7 options)

For a total of 38 actions.

They are indexed from 0 ~ 37, where 0 ~ 30 are the dice reroll selections and 31 ~ 37 select each of the 7 categories in ascending order. For reference, the categories are:
- 31: Three-of-a-Kind
- 32: Four-of-a-Kind
- 33: Full House
- 34: Small Straight
- 35: Large Straight
- 36: Yahtzee
- 37: Chance

In [6]:
num_of_actions = 38
category_names = ["Three-of-a-Kind", "Four-of-a-Kind", "Full House", "Small Straight", "Large Straight", "Yahtzee", "Chance"]

# Action ids 0..30 are the reroll selections (in `selections` order) and
# 31..37 are the category names. Built with enumerate so that the builtin
# `id` is not rebound at notebook scope.
id_to_action = dict(enumerate(list(selections) + category_names))
action_to_id = {
    action: action_id for action_id, action in id_to_action.items()
}

In [7]:
# Sanity check: look up one reroll action and one category action in each direction.
print(*(id_to_action[action_id] for action_id in (20, 37)))
print(*(action_to_id[action] for action in ((1, 1, 0, 0, 1), "Three-of-a-Kind")))

(1, 0, 1, 0, 1) Chance
24 31


State is partitioned as such:
- 3 rerolls
    - 128 categories
        - 252 dice combinations

Write a parser for index and encoder for states.

In [8]:
# Fields are read from left to right, most significant first.
def parse_state(index):
    """Split a flat state index into (rerolls_left, categories, dice_combination).

    The index is laid out as rerolls_left * 32256 + categories * 252 +
    dice_combination, where 32256 = 128 * 252.
    """
    dice_span = 252
    reroll_span = 128 * dice_span  # 32256 states per rerolls_left value
    rerolls_left = index // reroll_span  # [0 ~ 2]
    within_reroll = index % reroll_span
    categories = within_reroll // dice_span
    dice_combination = within_reroll % dice_span
    return rerolls_left, categories, dice_combination


def get_state_id(state):
    """Encode a (rerolls_left, categories, dice_combination) triple as a flat index.

    Inverse of the index layout: rerolls_left * 32256 + categories * 252 +
    dice_combination.
    """
    rerolls_left, categories, dice_combination = state
    # Horner form of the same mixed-radix sum (128 category masks, 252 dice ids).
    return (rerolls_left * 128 + categories) * 252 + dice_combination

In [9]:
# Round-trip check: decode a state index, then encode the matching triple.
sample_index = 4246
sample_triple = (0, 16, 214)
print(parse_state(sample_index))
print(get_state_id(sample_triple))

(0, 16, 214)
4246


Total of 2^7 possible combination of categories, index them similar to the dice selections.

In [10]:
# Entry i is the 7-bit binary expansion of i, most significant bit first;
# a 1 marks a category that has already been filled.
category_combinations = [
    [(mask >> bit) & 1 for bit in range(6, -1, -1)]
    for mask in range(128)
]

# Two-way lookup between category ids and their flag lists (tuples are used
# as keys because lists are unhashable). Built with enumerate so that the
# builtin `id` is not rebound at notebook scope.
id_to_category = dict(enumerate(category_combinations))
category_to_id = {
    tuple(flags): category_id
    for category_id, flags in enumerate(category_combinations)
}

In [11]:
# Sanity check: map one category mask to its id and back again.
sample_mask = (1, 0, 0, 0, 1, 1, 1)
print(category_to_id[sample_mask])
print(id_to_category[71])

71
[1, 0, 0, 0, 1, 1, 1]


Next we define the reward function. Reward will be simply the score obtained from writing into the category. Since this is invariant of successor state, it is in the form of R(s, a)

May also give negative reward for giving up on current dice combination.

In [12]:
# Reward scaling knobs; calc_reward below does not read them.
neg_multiplier = 0
pos_multiplier = 1


def calc_reward(state, action):
    """Return the immediate reward R(s, a) for taking `action` in `state`.

    Reroll actions (ids 0..30) give 0. Category actions (ids 31 and up) give
    the score that Yahtzee.CATEGORIES_SCORING assigns to the state's dice for
    that category.
    """
    _, _, dice_id = parse_state(state)
    if action > 30:  # writing in to category
        scorer = Yahtzee.CATEGORIES_SCORING[action - 31]
        faces = np.array(id_to_combination[dice_id])
        # Added to a zero base so the result type matches a running total.
        return 0 + scorer(faces)
    return 0  # rolling dice

Now we finally model the game. For the total of 96,768 states, we create a transition function for each action: 
- For actions that roll the dice, it should transition to state where reroll count is reduced, categories are kept with new dice combination.
- For actions that write the category, it should transition to state with that category written, reroll reset with fresh dice combination.
- For illegal actions (rolling when no rerolls, writing to filled category), it will not be available in the state.


In [13]:
# Action check to see if it is legal.
def is_legal_action(state, action):
    """Return True if `action` may be taken in `state`.

    Reroll actions (ids 0..30) need at least one reroll left. Category
    actions (ids 31..37) need the chosen category to be still unfilled.
    """
    rerolls_left, categories, _ = parse_state(state)
    if action <= 30:  # rolling dice
        return rerolls_left > 0
    # parse_state returns the category mask as an integer id, so it must be
    # decoded into per-category flags before it can be indexed.
    filled_flags = id_to_category[categories]
    return filled_flags[action - 31] == 0

In [14]:
# THE MODEL
# yahtzee_model[state] is a list of (action, [(next_state, prob, reward), ...])
# holding only the actions that are legal in that state.
num_of_states = 96768
yahtzee_model = np.empty(num_of_states, dtype=object)

for state in range(num_of_states):

    rerolls_left, categories, dice_combination = parse_state(state)
    filled_flags = id_to_category[categories]
    state_model = []

    for action in range(num_of_actions):

        if action <= 30:  # rerolling dice
            if rerolls_left <= 0:  # no rerolls left: action unavailable
                continue
            # One reroll is spent and the categories stay as they are.
            successor_prefix = (rerolls_left - 1, categories)
            outcomes = dice_neighbours[dice_combination][action][1]
        else:  # writing in to category
            category_index = action - 31
            if filled_flags[category_index] != 0:  # category already used
                continue
            # Mark the category as filled and look up the id of the new mask.
            updated_flags = list(filled_flags)
            updated_flags[category_index] = 1
            # Rerolls reset to 2 and all five dice are rolled afresh.
            successor_prefix = (2, category_to_id[tuple(updated_flags)])
            outcomes = dice_neighbours[0][30][1]  # reroll everything

        # Reward is independent of the successor, so compute it once per action.
        reward = calc_reward(state, action)
        state_transition = [
            (
                get_state_id((*successor_prefix, combination_to_id[faces])),
                prob,
                reward,
            )
            for faces, prob in outcomes
        ]
        state_model.append((action, state_transition))

    yahtzee_model[state] = state_model

np.save("yahtzee_model.npy", yahtzee_model)

In [13]:
# Reload the cached model. allow_pickle=True is required for the object array
# and unpickles arbitrary objects, so only load a file that this notebook
# produced.
yahtzee_model = np.load('yahtzee_model.npy', allow_pickle=True)

Exception ignored in: <bound method IPythonKernel._clean_thread_parent_frames of <ipykernel.ipkernel.IPythonKernel object at 0x000002A822526F70>>
Traceback (most recent call last):
  File "c:\Users\opand\miniconda3\lib\site-packages\ipykernel\ipkernel.py", line 770, in _clean_thread_parent_frames
    def _clean_thread_parent_frames(
KeyboardInterrupt: 


In [85]:
# Inspect state 32255: show its available actions, its decoded fields and
# the flags of category mask 127.
inspected_state = 32255
print(yahtzee_model[inspected_state])
print(parse_state(inspected_state))
print(id_to_category[127])

[]
(0, 127, 251)
[1, 1, 1, 1, 1, 1, 1]


In [14]:
# Hyperparameters
gamma = 0.95  # discount factor applied to successor state values
theta = 0.0001  # relative margin a Q-value must beat the state value by to count as an improvement
max_iter = 10  # upper bound on the number of policy iteration rounds

In [84]:
# Setting up for policy iteration: one value estimate and one action index
# per state, both starting at zero.
num_of_states = 3 * 128 * 252  # 96768
values = np.zeros((num_of_states,), dtype=np.float64)
policy_function = np.zeros_like(values, dtype=int)
print(values.shape, policy_function.shape)

(96768,) (96768,)


In [86]:
# Random initialization of policies: each state gets a random index into its
# own list of available actions; states with no actions get the sentinel 39.
for state in range(num_of_states):
    num_available = len(yahtzee_model[state])  # [(action, [(state, prob, reward)])]
    policy_function[state] = (
        random.choice(range(num_available)) if num_available != 0 else 39
    )

In [None]:
# Spot-check the action index currently stored for two states.
print(policy_function[4246], policy_function[82426])

In [None]:
def _expected_return(transitions, state_values, discount):
    """Return the sum of prob * (reward + discount * V(next_state)) over transitions.

    `transitions` is a list of (next_state, prob, reward) tuples.
    """
    return sum(
        (
            prob * (reward + discount * state_values[next_state])
            for next_state, prob, reward in transitions
        ),
        0.0,
    )


# Policy iteration. policy_function[state] holds an index into
# yahtzee_model[state] (not an action id); 39 marks a state with no actions.
for iteration in range(max_iter):

    unchanged = True

    # Policy evaluation: a single in-place sweep that backs up the value of
    # the action the current policy picks in each state.
    for state in range(num_of_states):
        action_index = policy_function[state]
        if action_index == 39:  # terminal state
            continue
        policy_action = yahtzee_model[state][action_index]
        values[state] = _expected_return(policy_action[1], values, gamma)
    print("Policy evaluated for iteration: ", iteration)

    # Policy improvement: move each state to the action with the highest
    # Q-value, provided it beats the current value by the relative margin theta.
    for state in range(num_of_states):
        state_model = yahtzee_model[state]  # [(action, [(state, prob, reward)])]
        if not state_model:  # terminal state, nothing to choose from
            continue

        current_index = policy_function[state]
        best_index = current_index
        best_q = values[state]
        for index, action_transition in enumerate(state_model):
            q_val = _expected_return(action_transition[1], values, gamma)
            # Compare against the best Q seen so far so that the argmax wins
            # rather than the last action that clears the threshold.
            if q_val > best_q * (1 + theta):
                best_q = q_val
                best_index = index

        # Store the improved choice; without this the policy never changes.
        if best_index != current_index:
            policy_function[state] = best_index
            print("Changed state ", state, " action to ", best_index)
            unchanged = False

    if unchanged:
        print("BREAK")
        break

    print("Iteration done: ", iteration)

Policy evaluated for iteration:  0
Iteration done:  0
Policy evaluated for iteration:  1
Iteration done:  1
Policy evaluated for iteration:  2
Iteration done:  2
Policy evaluated for iteration:  3


In [102]:
# Convert index to proper actions: policy_function stores an index into each
# state's list of available actions, while policy stores the action id itself.
policy = np.zeros(num_of_states, dtype=int)
for state in range(num_of_states):
    chosen_index = policy_function[state]
    # 39 marks a terminal state and is carried over unchanged.
    policy[state] = (
        39 if chosen_index == 39 else yahtzee_model[state][chosen_index][0]
    )
np.save("policy.npy", policy)
np.save("policy_function.npy", policy_function)

In [103]:
# View the policy as a table: 384 rows (3 rerolls-left values x 128 category
# masks) by 252 columns (dice combination ids), following the state id layout.
print(np.reshape(policy, (384, 252)))

[[31 37 35 ... 35 31 32]
 [34 34 34 ... 32 32 31]
 [37 34 32 ... 34 35 34]
 ...
 [ 4  1  1 ...  1  6  5]
 [ 1 29 11 ...  4 10 26]
 [ 0  1 26 ... 21 27  5]]
