# Problem 1 - Monty Hall

In [23]:
import numpy as np

from typing import Optional


np.random.seed(0)


GAMES = 2 * 10 ** 5
DOOR = {
    'Left': 0,
    'Mid': 1,
    'Right': 2
}


class MHGame():
    def __init__(self, size_n: np.int32, p: np.ndarray = np.array([1 / 3, 1 / 3, 1 / 3])) -> None:
        self.__size_n = size_n
        self.__prob = p
        self.__prize = np.array([np.random.choice([DOOR['Left'], DOOR['Mid'], DOOR['Right']], p = self.__prob) for _ in range(size_n)])


    def validate(self, model: callable, verbose: bool = False) -> np.float32:  
        results = self.__simulate(self.__prize, model, self.__size_n)
        accuracy = np.mean(results == self.__prize)
        if verbose:
            print(f'{model.get_model_name()} model: Acc -> {100 * accuracy:.3f}%')
        return accuracy
    

    def __get_closed_doors(self, doors: np.ndarray, prize: np.ndarray, guesses: np.ndarray) -> np.ndarray:
        mask_doors = (doors != guesses[:, None])
        unchosen_doors = doors[mask_doors].reshape(-1, 2)
        second_door = np.where(np.isin(unchosen_doors, prize).any(axis = 1), prize, unchosen_doors[:, 0])

        mask_prob = np.zeros_like(unchosen_doors, dtype = np.float32)
        for i in range(2):
            mask_prob[:, i] = np.where(unchosen_doors[:, i] == DOOR['Left'], self.__prob[DOOR['Left']], unchosen_doors[:, i])
            mask_prob[:, i] = np.where(unchosen_doors[:, i] == DOOR['Mid'], self.__prob[DOOR['Mid']], self.__prob[DOOR['Right']])
        mask_prob_sum = mask_prob.sum(axis = 1)
        mask_prob = mask_prob / mask_prob_sum[:, None]

        random_picks = np.random.rand(len(mask_prob))
        unchosen_doors_picks = np.where(random_picks < mask_prob[:, 0], unchosen_doors[:, 0], unchosen_doors[:, 1])

        second_door = np.where(second_door == guesses, unchosen_doors_picks, second_door)
        closed_doors = np.column_stack([guesses, second_door])

        assert np.isin(closed_doors, prize).any(axis = 1).all(), 'No prize in closed doors'
        assert np.isin(closed_doors, guesses).any(axis = 1).all(), 'No guess in closed doors'
        assert np.where(closed_doors[:, 0] != closed_doors[:, 1], True, False).all(), 'Duplicate doors'

        return closed_doors


    def __simulate(self, prize: np.ndarray, model: callable, size_n: np.int32) -> np.ndarray:
        doors = np.array([DOOR['Left'], DOOR['Mid'], DOOR['Right']])
        doors = np.tile(doors, (size_n, 1))
        first_guesses = model(doors)
        closed_doors = self.__get_closed_doors(doors, prize, first_guesses)
        return model(closed_doors, first_guesses)


mhGame = MHGame(GAMES)

In [24]:
class BaselineModel():
    def __init__(self, name):
        self.__run = 0
        self.__name = name
    

    def get_model_name(self) -> str:
        return self.__name


    def __call__(self, doors: np.ndarray, first_guesses: Optional[np.ndarray] = None) -> np.ndarray:
        pred = self.predict(doors, first_guesses).reshape(-1, 1)
        assert pred.shape[0] == doors.shape[0], 'Invalid shape'

        validate = np.where(np.isin(doors, pred).any(axis = 1), True, False)
        assert validate.all(), 'Invalid door'

        self.__run += 1
        return pred.reshape(-1)
        

    def predict(self, doors: np.ndarray, first_guesses: Optional[np.ndarray] = None) -> np.ndarray:
        return doors[:, 0]


baselineModel = BaselineModel('Baseline')

In [25]:
baselineScore = mhGame.validate(baselineModel, verbose = True)

Baseline model: Acc -> 33.253%


# Pure randomness

In [26]:
class RandomModel(BaselineModel):
    def __init__(self, name):
        super().__init__(name)

    
    def first_guess(self, doors: np.ndarray) -> np.ndarray:
        output = np.zeros(doors.shape[0], dtype = np.int32)
        mask = np.random.choice([True, False, False], doors.shape[0])
        output = np.where(mask, doors[:, 0], output)
        
        picked = np.where(mask, True, False)
        mask = np.random.choice([True, False], doors.shape[0])
        output = np.where(mask & picked, doors[:, 1], doors[:, 2])
        return output
        

    def predict(self, doors: np.ndarray, first_guesses: Optional[np.ndarray] = None) -> np.ndarray:
        output = np.zeros(doors.shape[0], dtype = np.int32)
        #################################
        # Place for your code           #
        #################################
        return output
        

randomModel = RandomModel('Random')

In [27]:
randomScore = mhGame.validate(randomModel, verbose = True)

assert randomScore > baselineScore, 'Random model should be better than baseline'
assert randomScore > 0.49, 'Your model is too weak (49% accuracy is required)'
assert randomScore < 0.51, 'Your model is too strong (51% accuracy is required)'

Random model: Acc -> 50.011%


# Change model

In [28]:
class ChangeModel(RandomModel):
    def __init__(self, name):
        super().__init__(name)
        

    def predict(self, doors: np.ndarray, first_guesses: Optional[np.ndarray] = None) -> np.ndarray:
        output = np.zeros(doors.shape[0], dtype = np.int32)
        #################################
        # Place for your code           #
        #################################
        return output
        

changeModel = ChangeModel('Change')

In [29]:
changeScore = mhGame.validate(changeModel, verbose = True)

assert changeScore > randomScore, 'Change model should be better than random'
assert changeScore > 0.65, 'Your model is too weak (65% accuracy is required)'

Change model: Acc -> 66.570%


# Bayes network

In [30]:
from pgmpy.models import BayesianNetwork
from pgmpy.factors.discrete import TabularCPD


bayes_model = BayesianNetwork([('C', 'H'), ('P', 'H')])

In [31]:
cpd_contestant = TabularCPD(
    variable = 'C',
    variable_card = 3, 
    values = [[1 / 3], [1 / 3], [1 / 3]],
)

print(cpd_contestant)

+------+----------+
| C(0) | 0.333333 |
+------+----------+
| C(1) | 0.333333 |
+------+----------+
| C(2) | 0.333333 |
+------+----------+


In [32]:
cpd_prize = TabularCPD(
    #################################
    # Place for your code           #
    #################################
)

print(cpd_prize)

+------+----------+
| P(0) | 0.333333 |
+------+----------+
| P(1) | 0.333333 |
+------+----------+
| P(2) | 0.333333 |
+------+----------+


In [33]:
cpd_host = TabularCPD(
    #################################
    # Place for your code           #
    #################################
)

print(cpd_host)

+------+------+------+------+------+------+------+------+------+------+
| C    | C(0) | C(0) | C(0) | C(1) | C(1) | C(1) | C(2) | C(2) | C(2) |
+------+------+------+------+------+------+------+------+------+------+
| P    | P(0) | P(1) | P(2) | P(0) | P(1) | P(2) | P(0) | P(1) | P(2) |
+------+------+------+------+------+------+------+------+------+------+
| H(0) | 0.0  | 0.0  | 0.0  | 0.0  | 0.5  | 1.0  | 0.0  | 1.0  | 0.5  |
+------+------+------+------+------+------+------+------+------+------+
| H(1) | 0.5  | 0.0  | 1.0  | 0.0  | 0.0  | 0.0  | 1.0  | 0.0  | 0.5  |
+------+------+------+------+------+------+------+------+------+------+
| H(2) | 0.5  | 1.0  | 0.0  | 1.0  | 0.5  | 0.0  | 0.0  | 0.0  | 0.0  |
+------+------+------+------+------+------+------+------+------+------+


In [34]:
bayes_model.add_cpds(cpd_contestant, cpd_prize, cpd_host)

assert bayes_model.check_model()

In [35]:
from tqdm import tqdm
from pgmpy.inference import VariableElimination

class BayesModel(RandomModel):
    def __init__(self, name, model):
        super().__init__(name)
        self.__model = model
        self.__infer = VariableElimination(self.__model)
    

    def second_guess(self, doors: np.ndarray, first_guesses: np.ndarray) -> np.ndarray:
        output = np.zeros(doors.shape[0], dtype = np.int32)
        for i in tqdm(range(doors.shape[0]), desc = 'Second guess'):
            #################################
            # Place for your code           #
            #################################
            pass
        return output
        

    def predict(self, doors: np.ndarray, first_guesses: Optional[np.ndarray] = None) -> np.ndarray:
        output = np.zeros(doors.shape[0], dtype = np.int32)
        if first_guesses is None:
            output = self.first_guess(doors)
        else:
            output = self.second_guess(doors, first_guesses)
        output = output.T
        return output


bayesModel = BayesModel('Bayes', bayes_model)

In [36]:
bayesScore = mhGame.validate(bayesModel, verbose = True)

assert bayesScore > randomScore, 'Bayes model should be better than random'
assert bayesScore > 0.65, 'Your model is too weak (65% accuracy is required)'

Second guess:   1%|          | 1639/200000 [00:00<00:23, 8312.85it/s]

Second guess: 100%|██████████| 200000/200000 [00:22<00:00, 9019.24it/s]

Bayes model: Acc -> 66.609%





# Unfair Monty Hall

In [37]:
unfair_prob = np.array([0.6, 0.2, 0.4])
unfair_prob /= unfair_prob.sum()

mhGameUnfair = MHGame(GAMES, unfair_prob)

print('Unfair game')
baselineScoreUnfair = mhGameUnfair.validate(baselineModel, verbose = True)
randomScoreUnfair = mhGameUnfair.validate(randomModel, verbose = True)
changeScoreUnfair = mhGameUnfair.validate(changeModel, verbose = True)

Unfair game
Baseline model: Acc -> 49.730%
Random model: Acc -> 50.047%
Change model: Acc -> 69.117%


In [38]:
cpd_contestant_unfair = TabularCPD(
    variable = 'C',
    variable_card = 3, 
    values = list(unfair_prob.reshape(-1, 1).tolist()),
)

print(cpd_contestant_unfair)

+------+----------+
| C(0) | 0.5      |
+------+----------+
| C(1) | 0.166667 |
+------+----------+
| C(2) | 0.333333 |
+------+----------+


In [39]:
cpd_prize_unfair = TabularCPD(
    variable= 'P',
    variable_card = 3, 
    values = list(unfair_prob.reshape(-1, 1).tolist())
)

print(cpd_prize_unfair)

+------+----------+
| P(0) | 0.5      |
+------+----------+
| P(1) | 0.166667 |
+------+----------+
| P(2) | 0.333333 |
+------+----------+


In [40]:
cpd_host_unfair = TabularCPD(
    variable = 'H',
    variable_card = 3,
    values = [
        [0, 0, 0, 0, 3, 1, 0, 1, 3],
        [1, 0, 1, 0, 0, 0, 1, 0, 1],
        [2, 1, 0, 1, 2, 0, 0, 0, 0],
    ],
    evidence = ['C', 'P'],
    evidence_card = [3, 3]
)
cpd_host_unfair.normalize()

print(cpd_host_unfair)

+------+--------------------+------+-----+------+------+------+------+------+
| C    | C(0)               | C(0) | ... | C(1) | C(1) | C(2) | C(2) | C(2) |
+------+--------------------+------+-----+------+------+------+------+------+
| P    | P(0)               | P(1) | ... | P(1) | P(2) | P(0) | P(1) | P(2) |
+------+--------------------+------+-----+------+------+------+------+------+
| H(0) | 0.0                | 0.0  | ... | 0.6  | 1.0  | 0.0  | 1.0  | 0.75 |
+------+--------------------+------+-----+------+------+------+------+------+
| H(1) | 0.3333333333333333 | 0.0  | ... | 0.0  | 0.0  | 1.0  | 0.0  | 0.25 |
+------+--------------------+------+-----+------+------+------+------+------+
| H(2) | 0.6666666666666666 | 1.0  | ... | 0.4  | 0.0  | 0.0  | 0.0  | 0.0  |
+------+--------------------+------+-----+------+------+------+------+------+


In [41]:
bayes_model_unfair = BayesianNetwork([('C', 'H'), ('P', 'H')])
bayes_model_unfair.add_cpds(cpd_contestant_unfair, cpd_prize_unfair, cpd_host_unfair)

assert bayes_model_unfair.check_model()

print('Unfair game')
bayesModelUnfair = BayesModel('Bayes', bayes_model_unfair)
bayesScore = mhGameUnfair.validate(bayesModelUnfair, verbose = True)

Unfair game


Second guess: 100%|██████████| 200000/200000 [00:21<00:00, 9145.30it/s]

Bayes model: Acc -> 64.770%





# Can we do better?

In [42]:
def concat_output(ys, tag = None):
    ys = [y.rsplit('\n') for y in ys]
    big_y = []
    for i in range(len(ys[0])):
        big_y.append(' '.join([y[i] for y in ys]))
    return (tag + '\n' if tag else '') + '\n'.join(big_y)

In [45]:
infer = VariableElimination(bayes_model_unfair)
variables = ['P']
for contestant_door in range(3):
    score = 0
    total_prob = 0
    ys = []
    for host_door in range(3):
        if contestant_door == host_door:
            continue
        evidence = {
            'H': host_door,
            'C': contestant_door
        }
        y = infer.query(variables = variables, evidence = evidence)
        ys.append(y.__str__())
        max_idx = y.values.argmax()
        score += y.values[max_idx] * unfair_prob[max_idx]
        total_prob += unfair_prob[max_idx]

    score /= total_prob

    print(concat_output(ys, tag=f'Contestant Door: {contestant_door}'))
    print(f'Score: {100 * score:.3f}\n')

Contestant Door: 0
+------+----------+ +------+----------+
| P    |   phi(P) | | P    |   phi(P) |
| P(0) |   0.3333 | | P(0) |   0.6667 |
+------+----------+ +------+----------+
| P(1) |   0.0000 | | P(1) |   0.3333 |
+------+----------+ +------+----------+
| P(2) |   0.6667 | | P(2) |   0.0000 |
+------+----------+ +------+----------+
Score: 66.667

Contestant Door: 1
+------+----------+ +------+----------+
| P    |   phi(P) | | P    |   phi(P) |
| P(0) |   0.0000 | | P(0) |   0.8824 |
+------+----------+ +------+----------+
| P(1) |   0.2308 | | P(1) |   0.1176 |
+------+----------+ +------+----------+
| P(2) |   0.7692 | | P(2) |   0.0000 |
+------+----------+ +------+----------+
Score: 83.710

Contestant Door: 2
+------+----------+ +------+----------+
| P    |   phi(P) | | P    |   phi(P) |
| P(0) |   0.0000 | | P(0) |   0.8571 |
+------+----------+ +------+----------+
| P(1) |   0.4000 | | P(1) |   0.0000 |
+------+----------+ +------+----------+
| P(2) |   0.6000 | | P(2) |   0.

In [44]:
class ImprovedRandomModel(RandomModel):
    def __init__(self, name):
        super().__init__(name)
    
    def first_guess(self, doors: np.ndarray) -> np.ndarray:
        return doors[:, 1]


class ImprovedChangeModel(ChangeModel):
    def __init__(self, name):
        super().__init__(name)
    

    def first_guess(self, doors: np.ndarray) -> np.ndarray:
        return doors[:, 1]


class ImprovedBayesModel(BayesModel):
    def __init__(self, name, model):
        super().__init__(name, model)

   
    def first_guess(self, doors: np.ndarray) -> np.ndarray:
        return doors[:, 1]
    

improvedRandomModel = ImprovedRandomModel('Improved Random')
improvedChangeModel = ImprovedChangeModel('Improved Change')
improvedBayesModel = ImprovedBayesModel('Improved Bayes', bayes_model_unfair)

improvedRandomModelScore = mhGameUnfair.validate(improvedRandomModel, verbose = True)
improvedChangeScore = mhGameUnfair.validate(improvedChangeModel, verbose = True)
improvedBayesScore = mhGameUnfair.validate(improvedBayesModel, verbose = True)

Improved Random model: Acc -> 49.895%
Improved Change model: Acc -> 83.408%


Second guess: 100%|██████████| 200000/200000 [00:22<00:00, 8948.38it/s]

Improved Bayes model: Acc -> 83.408%



