In [29]:
## From https://github.com/egeromin/mastermind/

import config
import numpy as np
import itertools
import random
from collections import Counter


class Episode:
    '''generate episode for Agent to learn using Q-learning algorithm'''
    

    def __init__(self, secret):
        if isinstance(secret, int):
            secret = self._number_from_index(secret)
        self.secret = secret

    @staticmethod
    def _index_from_number(number):
        """
        Convert a 4-digit guess to an index between 0 and 6**4 
        """
        assert(len(number) <= 4)
        assert(set(number) <= set(map(str, range(6))))
        return int(number, base=6)

    @staticmethod
    def _number_from_index(index):
        assert(0 <= index < config.max_guesses)
        digits = []
        while index > 0:
            digits.append(str(index % 6))
            index = index // 6
        return "".join(reversed(digits)).zfill(4)
    
    @staticmethod
    def score(p, q):
        hits = sum(p_i == q_i for p_i, q_i in zip(p, q))
        misses = sum((Counter(p) & Counter(q)).values()) - hits
        return hits, misses
        
    

#     @staticmethod
#     def _select_next_action(action_distribution):
#         """Sample the next action based on the action distribution"""
#         action_dist = action_distribution.detach().numpy().reshape(-1)
#         # `action_distribution` is a tensor
#         return np.random.choice(action_dist.shape[0],
#                                 p=action_dist)
    
    def generate_best_patten(self):
        l = []
        possibles = list(itertools.product('012345', repeat=4))
        possibles = [''.join(lst) for lst in possibles]
        
        while possibles:
            guess, = random.sample(possibles,1)
            result = score(self.secret, guess)
            l.append((guess, result))
            possibles = {p for p in possibles if score(p, guess) == result} - {guess}
        return l
    
    
    def generate_random_episodes(self):
        
        lst = []
        
        for idx in range(config.max_episode_length):
            guess = random.randint(0, 6**4 - 1)
            guess = self._number_from_index(guess)
            lst.append((guess, self.score(self.secret, guess)))
            if guess == self.secret:
                return lst
                       
        return lst
    
    def reward(self, guess):
        if guess == self.secret:
            return 1
        else:
            return -1
         
    

Testing Episode class

In [90]:
for idx in np.random.randint(0, 6**4, 50):
    code = Episode._number_from_index(idx)
    print(code)
    print(len(Episode(code).generate_best_patten()))
    print('-'*30)

0521
4
------------------------------
1203
4
------------------------------
0532
5
------------------------------
0254
4
------------------------------
1245
4
------------------------------
2245
6
------------------------------
2130
6
------------------------------
2232
5
------------------------------
3532
3
------------------------------
3031
5
------------------------------
4415
6
------------------------------
2331
5
------------------------------
1251
2
------------------------------
5001
4
------------------------------
4004
4
------------------------------
4145
4
------------------------------
5215
6
------------------------------
2213
3
------------------------------
4405
6
------------------------------
0333
4
------------------------------
1550
5
------------------------------
5251
5
------------------------------
0142
5
------------------------------
0330
4
------------------------------
0433
5
------------------------------
1040
4
------------------------------
4323
4
-----

In [91]:
for idx in np.random.randint(0, 6**4, 25):
    code = Episode._number_from_index(idx)
    print(code)
    print(len(Episode(code).generate_random_episodes()))
    print('-'*30)

4424
30
------------------------------
1213
30
------------------------------
4354
30
------------------------------
3531
30
------------------------------
3051
30
------------------------------
1301
30
------------------------------
2441
30
------------------------------
2450
30
------------------------------
3114
30
------------------------------
2114
30
------------------------------
2514
30
------------------------------
3551
30
------------------------------
2051
30
------------------------------
1213
30
------------------------------
4524
30
------------------------------
4024
30
------------------------------
1255
30
------------------------------
4521
30
------------------------------
5245
30
------------------------------
4321
30
------------------------------
2010
30
------------------------------
0353
30
------------------------------
0454
30
------------------------------
3513
30
------------------------------
4502
30
------------------------------


# Q-Learning

What does Agent have to do?

1. Take action based on the current environment.
2. Store a Value function for all the states.
3. Know the best action given a state.
4. Explore with epsilon probability.


## exploding state space.

In [74]:
class Agent:
    
    def __init__(self, epsilon=0.1, alpha=1.0):
        self.initialize_V()
        self.epsilon = epsilon
        self.alpha = alpha
        self.reset_possible_states()
        
        
    
    def initialize_V(self):
        '''initializes State Value function with zeros'''
        self.V = {}
        for idx in range(0, 6**4):
            self.V[Episode._number_from_index(idx)] = 0

    def reset_possible_states(self):
        self.possible_states = list(self.V.keys())
    
    def restrict_possible_states(self, guess, feedback):
        new_states = [state for state in self.possible_states if Episode.score(guess, state)==feedback]
        self.possible_states = new_states
        
    def learn_select_move(self):
        
        best_move = self.get_best_action()
        
        selected_move = best_move
        if random.random() < self.epsilon:
            selected_move = self.random_action()
        
        return (best_move, selected_move)
    
    def get_best_action(self):
        "For the best possible states, chose randomly amongst them."
        V_values = [self.V[state] for state in self.possible_states]
        max_V = max(V_values)
        chosen_state = random.choice([state for state in self.possible_states if self.V[state] == max_V])
        return chosen_state
    
    def random_action(self):
        return random.choice(self.possible_states)
    
    def make_move(self, action, feedback):
        self.restrict_possible_states(action, feedback)
        
    def learn_from_move(self, action):
        "The heart of Q-learning."
        
        # TODO: Finish each line with code and comments
        current_state = action  # action = state (guess the agent makes)
        r = episode.reward(current_state)  # reward for this state
        if r == 1:
            print(f"guess = secret")
            return 
        
        feedback = episode.score(episode.secret, action)
        self.make_move(action, feedback) ## restrict the states first
        
        best_next_move, selected_next_move = self.learn_select_move()  # Exploration vs exploitation
        
        current_state_value = self.V[current_state] # current value of state
        best_move_value = self.V[best_next_move]  # best possible value of next state.
        td_target = current_state_value + self.alpha * (r + best_move_value - 
                                                        current_state_value)  # Q-algorithm update
        self.V[current_state] = td_target # This is Q-learning. The previous lines setup this line. 
        
        
        
    
        

In [103]:
agent = Agent()
episode = Episode('1000')

In [107]:
for time in range(25):
    secret = Episode._number_from_index(random.randint(0, 6**4 - 1))
    episode = Episode(secret)
    agent.reset_possible_states()
    action = agent.random_action()
    while action != episode.secret:
        agent.learn_from_move(action)
        action = agent.random_action()
        print(secret,action)

5421 4522
5421 5324
5421 5421
4140 2104
4140 3402
4140 4110
4140 4140
0501 2552
0501 4541
0501 0511
0501 1511
0501 0501
0251 4310
0251 5131
0251 0125
0251 0251
3131 3345
3131 4105
3131 3123
3131 3131
4045 4203
4045 4140
4045 4045
5000 0323
5000 5000
2251 3315
2251 1123
2251 2532
2251 2251
1040 1051
1040 3053
1040 0041
1040 1040
3350 2220
3350 3050
3350 3550
3350 3350
0003 0003
2131 5405
2131 3312
2131 2231
2131 2131
2440 4254
2440 5423
2440 2440
1444 0411
1444 0043
1444 1313
1444 1444
0505 4550
0505 5040
0505 4405
0505 0505
4540 1044
4540 4504
4540 4540
0222 4531
0222 0222
5045 1054
5045 4350
5045 1403
5045 5045
2340 4340
2340 0340
2340 2340
3354 3354
1203 5513
1203 1500
1203 1203
5335 3555
5335 5533
5335 5353
5335 5335
3031 3313
3031 1331
3031 3031
3041 4243
3041 5042
3041 3041
4142 2232
4142 3334
4142 5042
4142 4142


In [115]:
secret = '1234'

In [116]:
agent.reset_possible_states()

In [117]:
guess = agent.get_best_action()

In [118]:
feedback = Episode.score(secret, guess)

In [119]:
agent.restrict_possible_states(guess, feedback)

In [120]:
guess = agent.get_best_action()

In [122]:
feedback = Episode.score(secret, guess)

In [123]:
agent.restrict_possible_states(guess, feedback)

In [124]:
guess = agent.get_best_action()

In [130]:
def guess_the_code(secret='1234'):
    agent.reset_possible_states()
    guess = agent.get_best_action()
    print(f"initial guess = {guess}")
    _ = input()
    while guess!= secret:
        feedback = Episode.score(secret, guess)
        agent.restrict_possible_states(guess, feedback)
        guess = agent.get_best_action()
        print(f"next guess = {guess}")
        _ = input()
        

In [128]:
guess_the_code()

initial guess = 1324


 


next guess = 1423


 


next guess = 1234


 


In [132]:
guess_the_code('1223')

initial guess = 4433


 


next guess = 2030


 


next guess = 0421


 


next guess = 1223


 


## Policy Gradient

In [172]:
# define policy

In [173]:
import torch
import torch.nn as nn

In [185]:
import config

In [187]:
import autoreload

In [194]:
%load_ext autoreload
%autoreload 2

In [188]:
autoreload.reload(config)

<module 'config' from '/Users/nithish/workspace/mastermind/config.py'>

In [197]:
config.feedback_embedding_size

8

inputs to lstm `(seq_len, batch, input_size)`

In [347]:
class Policy(nn.Module):

    def __init__(self):
        super(Policy, self).__init__()
        self.guess_embed    = nn.Embedding(config.max_guesses, config.guess_embedding_size)
        self.pos_layer = nn.Embedding(5, config.feedback_embedding_size)
        self.neg_layer = nn.Embedding(5, config.feedback_embedding_size)
        self.lstm      = nn.LSTM(input_size = config.guess_embedding_size + 2 * config.feedback_embedding_size,
                                      hidden_size = config.lstm_hidden_size)
        self.fc             = nn.Linear(config.lstm_hidden_size, config.max_guesses)
        self.optimizer = optim.Adam(self.parameters(), lr=config.reinforce_alpha)
    
    def forward(self, states, feedbacks):
        '''
        states list like [1000, 1100,...] all previous guesses
        list of feedbacks [(2,2), (1,3), (4,0)...] feedback for each guess above.
        '''
        
        states = [Episode._index_from_number(elem) for elem in states]
        states = [self.guess_embed(torch.tensor(elem)) for elem in states]
        
        pos_feedbacks = [elem[0] for elem in feedbacks]
        neg_feedbacks = [elem[1] for elem in feedbacks]
        
        pos_feedbacks = [self.pos_layer(torch.tensor(elem)) for elem in pos_feedbacks]
        neg_feedbacks = [self.neg_layer(torch.tensor(elem)) for elem in neg_feedbacks]
        
        combined = [torch.cat([i,j,k], dim=-1) for i,j,k in zip(states, pos_feedbacks, neg_feedbacks)]
        combined = torch.stack(combined)
        combined = torch.unsqueeze(combined, 1)
#         print(combined.shape)
        
        out,_ = self.lstm(combined)
        out = out[-1,0,:]
        out = self.fc(out)
        
        return out
        
        
        
        

In [348]:
policy = Policy()

In [352]:
import torch as T
import torch.nn.functional as F
import torch.optim as optim

In [353]:
# inspired from this repo https://github.com/simoninithomas/Policy-Gradient-Doom--

class PolicyGradientAgent:
    def __init__(self, policy, GAMMA):
        self.gamma = GAMMA
        self.reward_memory = []
        self.action_memory = []
        self.policy = policy

    def choose_action(self, states, feedbacks):
        probabilities = F.softmax(self.policy.forward(states, feedbacks),-1)
        print(probabilities.shape, 'prob')
        action_probs = T.distributions.Categorical(probabilities)
        action = action_probs.sample()
        log_probs = action_probs.log_prob(action)
        self.action_memory.append(log_probs)

        return action.item()

    def store_rewards(self,reward):
        self.reward_memory.append(reward)

    def learn(self):
        self.policy.optimizer.zero_grad()
        # Assumes only a single episode for reward_memory
        G = np.zeros_like(self.reward_memory, dtype=np.float64)
        for t in range(len(self.reward_memory)):
            G_sum = 0
            discount = 1
            for k in range(t, len(self.reward_memory)):
                G_sum += self.reward_memory[k] * discount
                discount *= self.gamma
            G[t] = G_sum
        mean = np.mean(G)
        std = np.std(G) if np.std(G) > 0 else 1
        G = (G - mean) / std

        G = T.tensor(G, dtype=T.float)

        loss = 0
        for g, logprob in zip(G, self.action_memory):
            loss += -g * logprob

        loss.backward()
        self.policy.optimizer.step()

        self.action_memory = []
        self.reward_memory = []

In [None]:
# agent = PolicyGradientAgent(ALPHA=0.001, input_dims=[8], GAMMA=0.99,
#                             n_actions=4, layer1_size=128, layer2_size=128)
# #agent.load_checkpoint()
# env = gym.make('LunarLander-v2')
# score_history = []
# score = 0
# num_episodes = 2500
# #env = wrappers.Monitor(env, "tmp/lunar-lander",
# #                        video_callable=lambda episode_id: True, force=True)
# for i in range(num_episodes):
#     print('episode: ', i,'score: ', score)
#     done = False
#     score = 0
#     observation = env.reset()
#     while not done:
#         action = agent.choose_action(observation)
#         observation_, reward, done, info = env.step(action)
#         agent.store_rewards(reward)
#         observation = observation_
#         score += reward
#     score_history.append(score)
#     agent.learn()
#     #agent.save_checkpoint()
# filename = 'lunar-lander-alpha001-128x128fc-newG.png'
# plotLearning(score_history, filename=filename, window=25)

In [354]:
agent = PolicyGradientAgent(policy, 0.99)
episode = Episode('1234')
one_episode = episode.generate_best_patten()

In [355]:
code = '1234'

In [356]:
states = [Episode._number_from_index(random.randint(0, 6**4-1))]
feedbacks = [Episode.score(initial_guess[0], code)]

In [357]:
while states[-1]!=code:
    print(states)
    action = agent.choose_action(states, feedbacks)
    print('done')
    action = Episode._number_from_index(action)
    
    if action == episode.secret:
        reward = +1
    else:
        reward = -1
    agent.store_rewards(reward)
    
    fdbck = Episode.score(action, code)
    
    states.append(action)
    feedbacks.append(fdbck)

['4524']
torch.Size([1296]) prob
done
['4524', '1300']
torch.Size([1296]) prob
done
['4524', '1300', '3404']
torch.Size([1296]) prob
done
['4524', '1300', '3404', '5312']
torch.Size([1296]) prob
done
['4524', '1300', '3404', '5312', '1421']
torch.Size([1296]) prob
done
['4524', '1300', '3404', '5312', '1421', '0243']
torch.Size([1296]) prob
done
['4524', '1300', '3404', '5312', '1421', '0243', '3234']
torch.Size([1296]) prob
done
['4524', '1300', '3404', '5312', '1421', '0243', '3234', '1030']
torch.Size([1296]) prob
done
['4524', '1300', '3404', '5312', '1421', '0243', '3234', '1030', '4504']
torch.Size([1296]) prob
done
['4524', '1300', '3404', '5312', '1421', '0243', '3234', '1030', '4504', '2555']
torch.Size([1296]) prob
done
['4524', '1300', '3404', '5312', '1421', '0243', '3234', '1030', '4504', '2555', '2355']
torch.Size([1296]) prob
done
['4524', '1300', '3404', '5312', '1421', '0243', '3234', '1030', '4504', '2555', '2355', '1114']
torch.Size([1296]) prob
done
['4524', '1300',

KeyboardInterrupt: 

In [327]:
for observation in one_episode:
    state, feedback = observation
    state = [state]; feedback = [feedback]
    action = agent.choose_action(state, feedback)
    action = Episode._number_from_index(action)
    
    if action == episode.secret:
        reward = +1
    else:
        reward = -1
    
    agent.store_rewards(reward)
    
    

torch.Size([1, 1, 256])
torch.Size([1, 1, 256])
torch.Size([1, 1, 256])
torch.Size([1, 1, 256])
torch.Size([1, 1, 256])


In [328]:
agent.learn()