In [1]:
import copy

import numpy as np
#first some imports
import torch
torch.set_default_dtype(torch.float64)  # double precision for numerical stability

import matplotlib.pyplot as plt

import pyro
import pyro.distributions as dist
import pyro.poutine as poutine

from sklearn.neighbors import KernelDensity
from sklearn.model_selection import GridSearchCV

In [2]:
def Infer(sampling_func, num_samples=50):
    """
    Estimate a distribution by importance sampling
    """
    return(pyro.infer.EmpiricalMarginal(
        pyro.infer.Importance(
            sampling_func, num_samples=num_samples).run()))

# 0. Game environment

In [118]:
# colors
GREEN = 0
YELLOW = 1
WHITE = 2
BLUE = 3
RED = 4
ALL_COLORS = [GREEN, YELLOW, WHITE, BLUE, RED]

# ranks
ONE = 0
TWO = 1
THREE = 2
FOUR = 3
FIVE = 4
ALL_RANKS = [ONE, TWO, THREE, FOUR, FIVE]

# actions
HINT_COLOR = 0
HINT_NUMBER = 1
PLAY = 2
DISCARD = 3

# misc.
HINT_MAX = 7
LIFE_MAX = 3
COUNT = [3,2,2,2,1]

def init_context(num_player, num_card):
    return {'life_tokens': LIFE_MAX, # game ends if ==0
            'information_tokens': HINT_MAX,
            # top of played cards 0-index based (-1 means no card for the color played)
            'fireworks': {GREEN: -1, YELLOW: -1, WHITE: -1, BLUE: -1, RED: -1},  
            # trashed/fasely played in forms of e.g. {'color':BLUE, 'rank':ONE}
            'discard_pile': [],  
            # specific realisation allowed by knowledge, used for update step
            'hand': [[{'color':None, 'rank':None} for c in range(num_card)] for p in range(num_player)],
            # num.instance for [player][card][color][rank]
            'knowledge': [[[COUNT for col in ALL_COLORS]
                           for card in range(num_card)] for plyr in range(num_player)],
            # possible actions: HINT_COLOR, HINT_NUMBER, PLAY, DISCARD
            'last_action': {'type':None, 'pnr':None, 'cnr':None, 'color':None, 'rank':None} 
           }


def CardPlayable(card, fireworks):
    # rank should be one high up than the current highest to be played
    if fireworks[card['color']] +1 == card['rank']:
        return True
    else:
        return False

# Card is not needed anymore in the future
def CardUseless(card, fireworks):
    if fireworks[card['color']] > int(card['rank']):
            return True
    else:
        return False

def remaining_copies(card, discard_pile):
    if card['rank'] == ONE:  # rank one
        total_copies = 3
    elif card['rank'] == FIVE:  # rank five
        total_copies = 1
    else:
        total_copies = 2
    
    # count how many of the sort given by `card` is discarded
    count = 0
    for discarded in discard_pile:
        col, rank = discarded['color'], discarded['rank']
        if (col == card['color']) and (rank == card['rank']):
            count += 1

    return total_copies - count

Demonstrate usuage

In [69]:
CardPlayable({'color':BLUE, 'rank':ONE}, 
             {GREEN: -1, YELLOW: -1, WHITE: -1, BLUE: 0, RED: -1})

False

In [119]:
# discard piles
dp = [{'color':BLUE, 'rank':ONE}, {'color':BLUE, 'rank':TWO}, {'color':WHITE, 'rank':ONE}]

remaining_copies({'color':BLUE, 'rank':TWO}, dp)

1

In [193]:
num_player = 3 
num_card = 2
context = init_context(num_player, num_card)

context

{'life_tokens': 3,
 'information_tokens': 7,
 'fireworks': {0: -1, 1: -1, 2: -1, 3: -1, 4: -1},
 'discard_pile': [],
 'hand': [[{'color': None, 'rank': None}, {'color': None, 'rank': None}],
  [{'color': None, 'rank': None}, {'color': None, 'rank': None}],
  [{'color': None, 'rank': None}, {'color': None, 'rank': None}]],
 'knowledge': [[[[3, 2, 2, 2, 1],
    [3, 2, 2, 2, 1],
    [3, 2, 2, 2, 1],
    [3, 2, 2, 2, 1],
    [3, 2, 2, 2, 1]],
   [[3, 2, 2, 2, 1],
    [3, 2, 2, 2, 1],
    [3, 2, 2, 2, 1],
    [3, 2, 2, 2, 1],
    [3, 2, 2, 2, 1]]],
  [[[3, 2, 2, 2, 1],
    [3, 2, 2, 2, 1],
    [3, 2, 2, 2, 1],
    [3, 2, 2, 2, 1],
    [3, 2, 2, 2, 1]],
   [[3, 2, 2, 2, 1],
    [3, 2, 2, 2, 1],
    [3, 2, 2, 2, 1],
    [3, 2, 2, 2, 1],
    [3, 2, 2, 2, 1]]],
  [[[3, 2, 2, 2, 1],
    [3, 2, 2, 2, 1],
    [3, 2, 2, 2, 1],
    [3, 2, 2, 2, 1],
    [3, 2, 2, 2, 1]],
   [[3, 2, 2, 2, 1],
    [3, 2, 2, 2, 1],
    [3, 2, 2, 2, 1],
    [3, 2, 2, 2, 1],
    [3, 2, 2, 2, 1]]]],
 'last_action': {'type'

In [192]:
def update_game(context, last_action):
    '''
    Adapt context accordingly based on last_action
    '''
    nc = copy.deepcopy(context)
    # update last_action
    nc['last_action'] = copy.deepcopy(last_action)

    # 1) PLAY
    if last_action['type'] == 'PLAY':
        # i) update other stuff (hint token, error token, discard piles, fireworks)
        card = {'color':last_action['color'],
                         'rank':last_action['rank']}
        # if played successfully
        if CardPlayable(card, context['fireworks']):
            # hint token
            if (last_action['rank']  == FIVE) and ( # +1 hint for completing the deck
                nc['information_tokens'] < HINT_MAX): # if hint token isn't full
                nc['information_tokens'] += 1
            # fireworks
            nc['fireworks'][last_action['color']] = last_action['rank']
        # wrong playing
        else:
            # error token
            nc['life_tokens'] -= 1
            # discard piles
            nc['discard_pile'].append(card)
        
        # ii) update knowledge
        # for a player that played the card, any cards newer than played card shift down by 1
        for i in range(last_action['cnr'], num_card-1):
            # knowledge[player][card]
            nc['knowledge'][last_action['pnr']][i] = context['knowledge'][last_action['pnr']][i+1]
        # draw a new card (highest card index, which is num_card-1)
        nc['knowledge'][nc['last_action']['pnr']][num_card-1] = [[remaining_copies({'color':col, 'rank':rank}, nc['discard_pile'])
                                                for rank in ALL_RANKS] for col in ALL_COLORS]
                
                       
    
#     elif nc['last_action']['type'] == 'DISCARD':
# TODO: ask if Bianca could finish this function?
    
    return nc

demonstrate usuage

In [194]:
# falsely play blue 2
context = copy.deepcopy(update_game(context, {'type':'PLAY', 'pnr':1, 'cnr':0, 'color':BLUE, 'rank':TWO}) )
print('i)', context)
context = copy.deepcopy(update_game(context, {'type':'PLAY', 'pnr':1, 'cnr':0, 'color':BLUE, 'rank':ONE}) )
print('ii)', context)

i) {'life_tokens': 2, 'information_tokens': 7, 'fireworks': {0: -1, 1: -1, 2: -1, 3: -1, 4: -1}, 'discard_pile': [{'color': 3, 'rank': 1}], 'hand': [[{'color': None, 'rank': None}, {'color': None, 'rank': None}], [{'color': None, 'rank': None}, {'color': None, 'rank': None}], [{'color': None, 'rank': None}, {'color': None, 'rank': None}]], 'knowledge': [[[[3, 2, 2, 2, 1], [3, 2, 2, 2, 1], [3, 2, 2, 2, 1], [3, 2, 2, 2, 1], [3, 2, 2, 2, 1]], [[3, 2, 2, 2, 1], [3, 2, 2, 2, 1], [3, 2, 2, 2, 1], [3, 2, 2, 2, 1], [3, 2, 2, 2, 1]]], [[[3, 2, 2, 2, 1], [3, 2, 2, 2, 1], [3, 2, 2, 2, 1], [3, 2, 2, 2, 1], [3, 2, 2, 2, 1]], [[3, 2, 2, 2, 1], [3, 2, 2, 2, 1], [3, 2, 2, 2, 1], [3, 1, 2, 2, 1], [3, 2, 2, 2, 1]]], [[[3, 2, 2, 2, 1], [3, 2, 2, 2, 1], [3, 2, 2, 2, 1], [3, 2, 2, 2, 1], [3, 2, 2, 2, 1]], [[3, 2, 2, 2, 1], [3, 2, 2, 2, 1], [3, 2, 2, 2, 1], [3, 2, 2, 2, 1], [3, 2, 2, 2, 1]]]], 'last_action': {'type': 'PLAY', 'pnr': 1, 'cnr': 0, 'color': 3, 'rank': 1}}
ii) {'life_tokens': 2, 'information_tok

# 1. Marginalise

### Auxilary functions

In [189]:
def get_realisation_indicies(num_player, num_card):
    '''
    Aux. function emul_marg(). 
    Returns indicies which has a dimension of num_player x num_card x 4. For each card of each player,
    a vector of with length 4 is decided. The first two elements represent player and card indicies.
    The following elements are sampled randomly which choose color and rank of the card respectively.
    '''
    mya = [[[pi,i] for i in range(num_card)]  for pi in range(num_player)]  # (num_player, num_card, 2)
    realisation_i = np.random.randint(low=0, high=5, size=(num_player,num_card,2)) 
    return np.concatenate((mya, realisation_i), axis=2)

### emul_marg()

In [191]:
def emul_marg(last_action, context, player_index, card_index):
    '''
    :param last_action: dict, information about type, player, card, color and rank of the last action
    :param context: dict, general game situations and knowledge structure
    :param player_index: int, 0-based index of player for which the intention should be updated
    :param card_index: int, 0-based index of card " "
    :returns : (,3) np.array, which is a simplex with probability for each sort of intention (PLAY, DISCARD, KEEP)
    '''
    
    # Initialize
    threshold = 20  # num. of realisations to iterate over. Ideally, sum over all realisations
    # plyr(4)*card(4)*col(5)*rank(5) = 400 realisations
    # TODO: maybe also implement time limit (Minseok) 
    samples = np.zeros((threshold, 3)) # each sample is a simplex with 3 categories (play, discard, keep)
    
    # Restrict possible realisations by emulating last action
    my_knowledge = copy.deepcopy(update_game(action, context)['knowledge'])
    
    for sample_i in range(threshold):
        # Use rejection sampling
        while True:
            # single instance of a complete realisation indicies (choose reali. for each card for each plyr)          
            c_r_i = get_realisation_indicies(num_player, num_card)
            # numerator: num. instance for a chosen realisation of a chosen card
            # denominator: total num. possible instance for a chosen card
            probs = [my_knowledge[c_r_i[pi,i,0]][c_r_i[pi,i,1]][c_r_i[pi,i,2]][c_r_i[pi,i,3]] /
                          np.sum(my_knowledge[pi][i])
                              for pi in range(num_player) for i in range(num_card)]
            
            # Reject samples that cannot happen
            if not np.all(probs > 0):  # rejected
                pass # so go back into infinite loop again
            else:  # accept since no cards with p=0
                # Proceed with choosing r
                realisation = copy.deepcopy(estimated_board)
                realisation['hand'] = [  # fix color and rank for each card for each plyr
                [{'color': ci[pi,i], 'rank': ri[pi,i]} for pi in range(num_player)] for i in range(num_card)]  
                
                # Collect P(r|a,c) * P(i|r) for single r given 
                samples[sample_i] = np.exp(np.sum(np.log(probs))) * pragmatic_listener( # P(r|a,c)
                    realisation, card_index, player_index)  # P(i|r)
                break # break the infinite loop
    
    # Approximate density when enough samples are there
    grid = GridSearchCV(KernelDensity(), # use CV to choose best bandwidth for smoothing
                        {'bandwidth': np.logspace(-1, 1, 20)})  
    grid.fit(samples)
    kde = grid.best_estimator_
    
    # TODO: compute median (50% quantile) A bit tricky since we have a categorical dist.
#     return kde.score_samples()


# TODO: instead of rejection sampling, consider MCMC. This also solves the problem of mean estimate automatically.
# https://docs.pymc.io/api/inference.html?highlight=sample_ppc
                
                
                
                
                
        
#         # Collect samples which approximate P(r|a,c)  
#         while True: # use rejection sampling (samples is valid only if p>0)
#             log_prob = np.zeros(num_player, num_card)  # collect log_prob for each card
#             # Sample indicies for choosing a realisation (r)
#             ci, ri = np.random.randint( # color, rank indicies for every hand of every plyr
#                 low=0, high=5, size=(2, num_player, num_card)) 
#             # Assuming independence, sum over log_prob of all cards
#             for pi in range(num_player): 
#                 for i in range(num_card):
#                     numer = my_knowledge[pi][i][ci[pi,i]][ri[pi,i]] # num. instance of the specific reali.
#                     denom = np.sum(my_knowledge[pi][i]) # total num. instance for the card
#                     log_prob[pi][i] = np.log(numer/denom)
#             if np.all(np.exp(log_prob) != 0): # sample is valid only if no summand has 0 prob
#                 break  # TODO: prolly lots of samples get rejected here, more efficient sampling needed

#         # Choose a realisation (r)
#         realisation = copy.deepcopy(estimated_board)
#         realisation['hand'] = [  # fix color and rank for each card
#             [{'color': c_r_i[pi,i,2], 'rank': c_r_i[pi,i,3]} 
#              for pi in range(num_player)] for i in range(num_card)]          
#         print('realisation', realisation)

#         # Compute a sample of P(i|a,c) = P
#         samples.append(np.exp(log_prob) * 
#                        pragmatic_listener(realisation, card_index, player_index))  # vector of dim 3 gets appended

#         # return mean_posterior(approx_density(samples))  # TODO: mean of density of samples
#         # https://docs.pymc.io/api/inference.html?highlight=sample_ppc
#     # 7. compute mean_posterior of samples 
#     grid = GridSearchCV(KernelDensity(), {'bandwidth': np.logspace(-1, 1, 20)})  # use CV to choose best bandwidth
#     grid.fit(samples)
#     kde = grid.best_estimator_
    
#     return np.max(KernelDensity(kernel='gaussian', bandwidth=0.1).fit(samples))
#     # use MLE for the moment

snippet demonstrating how density estimation could be done using sklearn

In [None]:
samples = [[0.1, 0.2, 0.7], [0.11, 0.19, 0.7], [0.15, 0.2, 0.65], [0.14, 0.21, 0.65], [0.12, 0.2, 0.68]] # some fake samples, all simplex
grid = GridSearchCV(KernelDensity(), {'bandwidth': np.logspace(-1, 1, 20)})  # use CV to choose best bandwidth
grid.fit(samples)
kde = grid.best_estimator_
kde.score_samples([[0.05,0.25,0.7]])  # log_prob of a given sample
# one could in principle do grid search to find MLE

# 2. Pragmatic listener

In [None]:
def int_prior():
    # TODO: think about a proper prior, uniform at the moment
    sample = pyro.sample("utt", dist.Categorical(probs=torch.ones(3) / 3))
    if sample == 0:
        return 'play'
    elif sample ==1:
        return 'keep'
    else:
        return 'discard'

In [None]:
def pragmatic_listener(realisation, card_index, player_index):
    def sample_pl(context, card_index, player_index):
        intention = int_prior() # P(i)
        pyro.sample("pragmatic_speaker", # P(r|i)
                    pragmatic_speaker(intention, realisation, card_index, player_index), obs=action) 
        return intention
    return Infer(sample_pl(context,card_index))

# 3. Pragmatic speaker

In [185]:
def pragmatic_speaker(intention, realisation, player_index, card_index):
    alpha = 1 # TODO: fit(adjust) rationality parameter according to behaviour of coplyr
    
    # compute numerator
    numerator = utility(intention, realisation, card_index, player_index)
    numerator = np.exp(alpha * numerator)
    
    # compute denominator
    denominator = 0
    for r in possible_realisations:  # TODO: possible_realisations should come from emul_marg
        summand = utility(intention, r, card_index, player_index)
        summand = np.exp(alpha * summand)
        denominator += summand
        
    return numerator / denominator

In [188]:
pragmatic_speaker(intention='play',
                  realisation= reali,
                  player_index=0,
                  card_index=2
)

NameError: name 'possible_realisations' is not defined

# 4. Utility function (hand-crafted)

Utility as suggested by Saskia

In [150]:
def utility(intention, realisation, player_index, card_index):
    '''
    return a utility for a given card (decided by card_index, player_index) of a given realisation from various realisations
    '''
    score = 0
    card = copy.deepcopy(realisation['hand'][player_index][card_index])  # select a card

    if intention == 'play':
        # in intention is play and card is playable, 
        # this results in one more card on the fireworks.
        # reward this.
        if CardPlayable(card, realisation['fireworks']):
            score += 10

        # if intention is play and card is not playable at the time
        else:
            # punish loosing a card from stack
            score -= 1
            # and punish getting a bomb depending on the number of bombs
            if realisation['life_tokens'] == 3:
                score -= 1
            elif realisation['life_tokens'] == 2:
                score -= 3
            elif realisation['life_tokens'] == 1: # game would end directly
                score -= 25

            # if card would still have been relevant in the future,
            # punish loosing it depending on the remaining copies of this card in the deck
            if not CardUseless(card, realisation['fireworks']):
                if remaining_copies(card, realisation['discard_pile']) == 2:
                    score -= 1
                elif remaining_copies(card, realisation['discard_pile']) == 1:
                    score -= 2
                elif remaining_copies(card, realisation['discard_pile']) == 0:
                    score -= 5


    elif intention == 'discard':
        # punish loosing a card from stack
        score -= 1

        # reward gaining a hint token:
        score += 0.5

        # punish discarding a playable card
        if CardPlayable(card, realisation['fireworks']):
            score -= 5

        # if card is not playable right now but would have been relevant in the future, punish
        # discarding it depending on the number of remaining copies in the game
        elif not CardUseless(card, realisation['fireworks']):
            if remaining_copies(card, realisation['discard_pile']) == 2:
                score -= 1
            elif remaining_copies(card, realisation['discard_pile']) == 1:
                score -= 2
            elif remaining_copies(card, realisation['discard_pile']) == 0:
                score -= 5

        # do we want to reward this additionally? I think rewarding gaining a hint token should be
        # enough, so nothing happens here
        elif CardUseless(card, realisation['fireworks']):
            pass

    elif intention == 'keep':
        # keeping a playable card is punished, because it does not help the game
        if CardPlayable(card, realisation['fireworks']):
            score -= 2

        # if card is not playable right now but is relevant in the future of the game reward keeping
        # this card depending on the remaining copies in the game
        elif not CardUseless(card, realisation['fireworks']):
            if remaining_copies(card, realisation['discard_pile']) == 2:
                score += 1
            elif remaining_copies(card, realisation['discard_pile']) == 1:
                score += 2
            elif remaining_copies(card, realisation['discard_pile']) == 0:
                score += 5

        # punish keeping a useless card
        elif CardUseless(card, realisation['fireworks']):
            score -= 1

    return score

example

In [177]:
# knowledge_structure = copy.deepcopy(context['knowledge'])
# knowledge_structure[0] = [  # of player 0
#     [[2,0,2,2,1], [0,0,2,2,1], [2,0,2,2,1], [2,0,2,2,1], [2,0,2,2,1]], # card 0
#     [[0,0,2,2,0], [2,0,2,1,1], [2,0,0,0,0], [2,0,2,2,1], [2,0,0,0,0]]] # card
# # unrealistic but suppose every plyr have same card possibilities for simplicity
# knowledge_structure[1] = copy.deepcopy(knowledge_structure[0])
# knowledge_structure[2] = copy.deepcopy(knowledge_structure[0])
# # plyr 1 played their first card, which was Blue 3
# last_action = {'type':'PLAY', 'pnr':1, 'cnr':0, 'col':'B', 'num':3} 


reali = {'life_tokens': 3, # game ends if ==0
         'information_tokens': 7, # hint token
         'fireworks': {RED: THREE, YELLOW: TWO, GREEN: FOUR, WHITE: -1, BLUE: -1},
         'discard_pile': [{'color':BLUE,'rank':ONE}, {'color':BLUE,'rank':ONE}, {'color':BLUE,'rank':THREE}],
         'hand': [
             [{'color':BLUE,'rank':TWO}, {'color':RED,'rank':ONE}],  # 3 plyr x 2 card
             [{'color':BLUE,'rank':FOUR}, {'color':YELLOW,'rank':ONE}],
             [{'color':BLUE,'rank':ONE}, {'color':WHITE,'rank':THREE}]],
         'knowledge_structure': None  # doesn't matter for utility
        }

In [178]:
reali

{'life_tokens': 3,
 'information_tokens': 7,
 'fireworks': {4: 2, 1: 1, 0: 3, 2: -1, 3: -1},
 'discard_pile': [{'color': 3, 'rank': 0},
  {'color': 3, 'rank': 0},
  {'color': 3, 'rank': 2}],
 'hand': [[{'color': 3, 'rank': 1}, {'color': 4, 'rank': 0}],
  [{'color': 3, 'rank': 3}, {'color': 1, 'rank': 0}],
  [{'color': 3, 'rank': 0}, {'color': 2, 'rank': 2}]],
 'knowledge_structure': None}

In [182]:
utility('play', reali, 2, 0)  # BLUE ONE in this example

10