<font size = 6>Improving Hangman Puzzle Strategy from the Perspective of Reinforcement Learning</font>
<font size = 4><div style="text-align: right"> Contributor: Haochen Jiang</div></font>
<font size = 4><div style="text-align: right"> June 3, 2023</div></font>

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np

import re

from sklearn.preprocessing import MultiLabelBinarizer
from copy import deepcopy

from IPython import display
from matplotlib import pyplot as plt

from torch.utils.tensorboard import SummaryWriter

In [2]:
# Constants
ALPHABET = "abcdefghijklmnopqrstuvwxyz"         # guessable letters; a letter's position in this string is its action index

# Hyperparameters
BATCH_SIZE = 512                                # number of transitions sampled per gradient step
LR = 0.00001                                    # learning rate passed to the Adam optimizer
EPSILON = 1                                     # probability of taking the greedy action (with 1 the random branch is never taken)
GAMMA = 1                                       # reward discount coefficient
TARGET_REPLACE_ITER = 20000                     # number of learn steps between target-network syncs
MEMORY_CAPACITY = 10000                         # number of transitions held in the replay memory

N_STATES = 28                                   # state vector length: 26-dim guessed-letter multi-hot + tries remaining + completeness
N_ACTIONS = 26                                  # number of actions (one per letter)

In [3]:
# Auxiliary Functions
def word_format(s):
    """Return the characters of ``s`` separated by single spaces (``"c_t"`` -> ``"c _ t"``)."""
    spaced = ' '.join(ch for ch in s)
    return spaced

def find_letters(l, word):
    """Return the start indices of every occurrence of ``l`` in ``word``.

    ``l`` is matched literally: it is escaped before being handed to the regex
    engine, so characters such as ``.`` or ``*`` are not treated as patterns.

    Args:
        l: The letter (or literal substring) to look for.
        word: The string to search.

    Returns:
        A list of 0-based start positions, in ascending order; empty if ``l``
        does not occur in ``word``.
    """
    return [match.start() for match in re.finditer(re.escape(l), word)]

def get_multi_hot_encoding(letter_list, alphabet = None):
    """Encode a collection of letters as a 0/1 vector over the alphabet.

    Position ``i`` of the result is 1 when ``alphabet[i]`` appears in
    ``letter_list`` and 0 otherwise. The output length always equals
    ``len(alphabet)``; entries of ``letter_list`` that are not in the alphabet
    are ignored.

    Args:
        letter_list: Iterable of letters (e.g. the letters guessed so far).
        alphabet: Ordered symbols defining the vector positions. Defaults to
            the module-level ``ALPHABET``.

    Returns:
        A 1-D integer ``numpy`` array of zeros and ones.
    """
    if alphabet is None:
        alphabet = ALPHABET
    present = set(letter_list)
    # Direct membership test; no encoder needs to be fitted on every call.
    return np.array([1 if symbol in present else 0 for symbol in alphabet], dtype = int)

def get_action_from_index(index_list):
    """Translate alphabet positions into the corresponding letters, preserving order."""
    letters = []
    for position in index_list:
        letters.append(ALPHABET[position])
    return letters

def get_best_action(action_list, s):
    """Return the first action in ``action_list`` that is not already in ``s[0]``.

    ``s[0]`` holds the letters guessed so far. ``None`` is returned when every
    candidate has already been guessed.
    """
    unused = (candidate for candidate in action_list if candidate not in s[0])
    return next(unused, None)

def fail_reward_gen(reward_list):
    """Map remaining-tries counts to rewards.

    ``reward_list`` is ordered from 5 tries remaining down to 0, so its first
    element becomes the value for key 5 and its sixth the value for key 0.
    """
    tries_remaining = range(5, -1, -1)
    return dict(zip(tries_remaining, reward_list))

def get_update_plot(update_nn_list, loss_list):
    """Build plot coordinates that mark the learn steps where the update index is 0.

    Args:
        update_nn_list: numpy array of per-step update indices.
        loss_list: numpy array of per-step losses, same length.

    Returns:
        ``(x, y)``: for steps whose update index is 0, ``x`` is the 1-based
        step number and ``y`` the loss at that step; all other positions are 0.
    """
    synced = update_nn_list == 0
    # 1 where the index is 0 and 0 elsewhere, kept in the input's own dtype.
    indicator = np.where(synced, 1, 0).astype(update_nn_list.dtype)
    step_numbers = np.arange(len(indicator)) + 1
    return indicator * step_numbers, indicator * loss_list

In [4]:
def generate_train_set(seed_num = False, all_words_path = "words_250000_train.txt"):
    """Load the training word list, optionally shuffled with a fixed seed.

    Args:
        seed_num: Seed for the shuffle. ``False`` (default) or ``None`` keeps
            the file order; any other value, including 0, is passed to
            ``np.random.default_rng`` and the list is shuffled in place.
        all_words_path: Path of a text file with one word per line.

    Returns:
        A list of the lines of the file (without line terminators).
    """
    # The context manager closes the file even if reading raises.
    with open(all_words_path, "r") as text_file:
        all_words = text_file.read().splitlines()

    # Compare by identity so that a seed of 0 is not mistaken for "no shuffle".
    if seed_num is not False and seed_num is not None:
        rng = np.random.default_rng(seed = seed_num)
        rng.shuffle(all_words)
    return all_words

In [5]:
class env(object):
    """A single game of Hangman on one target word.

    The game starts with 6 tries. Each call to ``step`` guesses one letter and
    updates the reward, the revealed word, the remaining tries and the status
    (``"Start"``, ``"Ongoing"``, ``"Success"`` or ``"Fail"``).
    """

    def __init__(self, word, succ_reward_each, fail_reward_dict, log = False):
        """Start a new game.

        Args:
            word: The word to be guessed.
            succ_reward_each: Reward granted for each position revealed by a
                correct guess.
            fail_reward_dict: Maps the number of tries remaining *after* a
                wrong guess to the reward for that guess.
            log: When true, progress is printed on every step.
        """
        self.log = log
        self.status = "Start"
        self.word = word

        self.reward = 0
        self.episode_reward_sum = 0
        self.guessed_letters = []
        self.tries_remains = 6
        self.guessed_word = "_" * len(word)
        self.completeness = 0

        # Keep the per-letter reward on the instance so that step() uses the
        # value this game was created with rather than a same-named global.
        self.succ_reward_each = succ_reward_each
        self.FAIL_GUESS_REWARD = fail_reward_dict

        if self.log:
            print("Successfully start a new game! # of tries remaining: {0}. 'word': {1}.".format(self.tries_remains, word_format(word)))


    def get_state(self):
        """Return ``(state, reward, episode_reward_sum, status, done)``.

        ``state`` is ``(guessed_letters, tries_remains, completeness)`` and
        ``done`` is 1 once the status is ``"Fail"`` or ``"Success"``, else 0.
        ``guessed_letters`` is the live internal list, not a copy.
        """
        done = 1 if self.status in ("Fail", "Success") else 0
        state = (self.guessed_letters, self.tries_remains, self.completeness)
        return (state, self.reward, self.episode_reward_sum, self.status, done)


    def step(self, guess_letter):
        """Play one guess and update the game in place.

        Args:
            guess_letter: The letter to guess.

        Raises:
            Exception: If the game is already finished or the letter was
                guessed before.
        """
        if self.status in ["Fail", "Success"]:
            raise Exception("This run has already finished !")
        if self.log:
            print("Guessing letter: {0}".format(guess_letter))
        if guess_letter in self.guessed_letters:
            raise Exception("Cannot guess a letter multiple times !")
        self.guessed_letters.append(guess_letter)

        if guess_letter not in set(self.word):
            # Wrong guess: lose a try; the reward depends on the tries left.
            self.tries_remains -= 1
            if self.tries_remains == 0:
                self.status = "Fail"
            else:
                self.status = "Ongoing"
            self.reward = self.FAIL_GUESS_REWARD[self.tries_remains]
        else:
            # Correct guess: reveal every occurrence, rewarding each position.
            self.reward = 0
            revealed = list(self.guessed_word)
            for position in find_letters(guess_letter, self.word):
                revealed[position] = guess_letter
                self.reward += self.succ_reward_each
            self.guessed_word = "".join(revealed)
            self.completeness = 1 - self.guessed_word.count("_") / len(self.guessed_word)
            if self.completeness == 1:
                self.status = "Success"
            else:
                self.status = "Ongoing"

        if self.log:
            print("'status': {}. 'tries_remains': {}. 'word': {}. 'reward': {}. 'completeness': {:.2f}"
                  .format(self.status, self.tries_remains, word_format(self.guessed_word), self.reward, self.completeness))

        self.episode_reward_sum += GAMMA * self.reward

In [6]:
class Net(nn.Module):
    """Two-layer fully connected Q-network: N_STATES -> 75 (ReLU) -> N_ACTIONS."""

    def __init__(self):
        super().__init__()
        hidden_units = 75
        # Each layer's weights are re-drawn from N(0, 0.1) right after the
        # layer is created; biases keep PyTorch's default initialisation.
        self.fc1 = nn.Linear(N_STATES, hidden_units)
        self.fc1.weight.data.normal_(0, 0.1)
        self.out = nn.Linear(hidden_units, N_ACTIONS)
        self.out.weight.data.normal_(0, 0.1)

    def forward(self, x):
        """Return one value per action for each state row in ``x``."""
        hidden = F.relu(self.fc1(x))
        return self.out(hidden)

class DQN(object):
    """Deep Q-learning agent with a replay memory and a separate target network.

    Each replay-memory row stores one transition laid out as
    ``[state (N_STATES) | action index, reward, done | next state (N_STATES)]``.
    """

    def __init__(self):
        self.eval_net, self.target_net = Net(), Net()                           # evaluation network (trained) and target network (periodically synced)
        self.learn_step_counter = 0                                             # counts learn() calls; drives target-network syncing
        self.memory_counter = 0                                                 # counts stored transitions; drives the ring-buffer index
        self.memory = np.zeros((MEMORY_CAPACITY, N_STATES * 2 + 3))             # replay memory, one row per transition
        self.optimizer = torch.optim.Adam(self.eval_net.parameters(), lr=LR)    # optimises the evaluation network only
        self.loss_func = nn.MSELoss()


    def off_policy_action_engine(self, s):
        """Pick a uniformly random letter that has not been guessed yet.

        On the first call after the global ``RANDOM_START`` is set, numpy's
        global RNG is seeded with ``RANDOM_SEED`` and the flag is cleared.

        Args:
            s: State tuple whose first element is the guessed letters.

        Returns:
            One letter of ``ALPHABET`` that is not in ``s[0]``.
        """
        global RANDOM_START

        if RANDOM_START:
            np.random.seed(RANDOM_SEED)
            RANDOM_START = False
        shuffle_seed = int(np.random.random() * 10e6)
        # Sort the candidates: set iteration order of strings can differ
        # between interpreter sessions, which would make the seeded shuffle
        # give different letters from run to run.
        cur_alphabet = sorted(set(ALPHABET) - set(s[0]))
        rng = np.random.default_rng(seed = shuffle_seed)
        rng.shuffle(cur_alphabet)
        return cur_alphabet[0]


    def on_policy_action_engine(self, s):
        """Choose the next letter for state ``s``.

        With probability ``EPSILON`` the unguessed letter with the highest
        value under the evaluation network is returned; otherwise a random
        unguessed letter is returned.

        Args:
            s: State tuple ``(guessed_letters, tries_remains, completeness)``.

        Returns:
            The chosen letter.
        """
        s_vector = np.hstack((get_multi_hot_encoding(s[0]), [s[1], s[2]]))
        s_tensor = torch.unsqueeze(torch.FloatTensor(s_vector), 0)                # float32, with a leading batch dimension of size 1
        if np.random.uniform() < EPSILON:                                         # draw from [0, 1); below EPSILON -> greedy action
            # Action selection needs no gradients, so skip building the graph.
            with torch.no_grad():
                actions_value = self.eval_net(s_tensor)
            # Letter indices ordered from highest to lowest value.
            action_sort_index = torch.sort(actions_value, descending=True)[1][0].numpy()
            action_sort_list = get_action_from_index(action_sort_index)
            action = get_best_action(action_sort_list, s)
        else:
            action = self.off_policy_action_engine(s)
        return action


    def store_transition(self, s, a, r, done, new_s):
        """Write one transition into the replay memory, overwriting the oldest when full.

        Args:
            s: State before the action.
            a: The guessed letter.
            r: Reward received.
            done: 1 if the episode ended with this step, else 0.
            new_s: State after the action.
        """
        action_index = ALPHABET.find(a)
        s_letters = get_multi_hot_encoding(s[0])
        new_s_letters = get_multi_hot_encoding(new_s[0])

        # Concatenate the pieces horizontally into one flat row.
        transition = np.hstack((s_letters, [s[1], s[2]], [action_index, r, done], new_s_letters, [new_s[1], new_s[2]]))

        # Overwrite old data if the memory is full (ring buffer).
        index = self.memory_counter % MEMORY_CAPACITY
        self.memory[index, :] = transition
        self.memory_counter += 1


    def learn(self):
        """Run one gradient step on a random batch from the replay memory.

        The target network is synced with the evaluation network whenever the
        learn-step counter is a multiple of ``TARGET_REPLACE_ITER``. The loss
        is also logged to the global ``writer``.

        Returns:
            ``(update_nn_index, loss_num)``: the counter modulo
            ``TARGET_REPLACE_ITER`` before this step (0 means the target
            network was just synced) and the batch loss.

        Raises:
            ValueError: If no transition has been stored yet.
        """
        update_nn_index = self.learn_step_counter % TARGET_REPLACE_ITER

        if update_nn_index == 0:
            self.target_net.load_state_dict(self.eval_net.state_dict())
        self.learn_step_counter += 1

        # Sample only rows that have actually been written, so an early call
        # cannot train on the zero-initialised part of the buffer.
        filled = min(self.memory_counter, MEMORY_CAPACITY)
        if filled == 0:
            raise ValueError("Cannot learn from an empty replay memory")
        sample_index = np.random.choice(filled, BATCH_SIZE)
        b_memory = self.memory[sample_index, :]
        b_s = torch.FloatTensor(b_memory[:, :N_STATES])
        b_a = torch.LongTensor(b_memory[:, N_STATES:N_STATES+1].astype(int))
        b_r = torch.FloatTensor(b_memory[:, N_STATES+1:N_STATES+2])
        b_done = torch.LongTensor(b_memory[:, N_STATES+2:N_STATES+3].astype(int))
        b_s_ = torch.FloatTensor(b_memory[:, -N_STATES:])

        # Value of the action actually taken, under the evaluation network.
        q_eval = self.eval_net(b_s).gather(dim=1, index=b_a)
        # Best next-state value under the target network; no gradient flows through it.
        next_q_eval = self.target_net(b_s_).max(1)[0].view(BATCH_SIZE, 1).detach()
        # Terminal transitions (done == 1) contribute the reward only.
        q_target = b_r + GAMMA * next_q_eval * (1-b_done)

        loss = self.loss_func(q_eval, q_target)
        loss_num = loss.detach().numpy()
        writer.add_scalar('Loss', loss_num, self.learn_step_counter)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        return update_nn_index, loss_num

In [None]:
# Agent under training.
dqn = DQN()

# Shuffled training words (seed 353), repeated 10 times; episode n plays word_list[n - 1].
all_words = generate_train_set(353)
word_list = all_words * 10

# Reward scheme: the same penalty for every wrong guess (keys 5..0 = tries remaining),
# and a fixed reward for each position revealed by a correct guess.
fail_reward_dict = fail_reward_gen([-5, -5, -5, -5, -5, -5])
succ_reward_each = 2

# Global TensorBoard writer; DQN.learn() logs the loss through this name.
# NOTE(review): absolute, machine-specific path — the TensorBoard cell below uses the same one.
writer = SummaryWriter(log_dir = "D:/Software_Workspace/Tensorboard_Workspace/")

# Read by DQN.off_policy_action_engine: seeds numpy's global RNG once, then clears RANDOM_START.
RANDOM_START = True
RANDOM_SEED = 343

PLOT_WARM_UP = False            # False during warm-up; later overwritten with the episode number at which warm-up ended
PLOT_INTERVAL = 100             # refresh the plots every this many episodes
PLOT_ACCURATE_RANGE = 200       # number of most recent episodes used for the accuracy estimate

# The following variables are for plotting
r_list = np.array([])           # reward sum per episode
c_list = np.array([])           # final completeness per episode
loss_list = np.array([])        # loss per learn step
update_nn_list = np.array([])   # update index per learn step (0 = target network synced)

# Accuracy series starts with a placeholder point (0 at episode 1).
writer.add_scalar('Accuracy', 0, 1)
accurate_list = np.array([0])
accurate_index = np.array([1])

# Render figures in a separate Qt window and turn interactive mode on for live updates.
%matplotlib qt5
plt.ion()



# Main training loop: one Hangman game (episode) per iteration.
for n_iter in range(1, 100001):
    print('<<<<<<<<<< Episode - %s' % n_iter)
    cur_env = env(word_list[n_iter - 1], succ_reward_each, fail_reward_dict, False)
    # Copy the state: the environment returns its live guessed-letters list.
    s = deepcopy(cur_env.get_state()[0])

    while True:                        # begin an episode (one loop iteration per step)
        #print("------------------------------------------")
        #print("States")
        #print(s)
        
        #a = dqn.off_policy_action_engine(s) 
        a = dqn.on_policy_action_engine(s)
        cur_env.step(a)    
        
        # Read back the result of the step just taken
        new_s, r, reward_sum, status, done = deepcopy(cur_env.get_state())


        #print((s, a, r, new_s))
        dqn.store_transition(s, a, r, done, new_s)                 # store a transition sample
                        
        #print("------------------------------------------")

        s = new_s
        #print(s)

        # Learning starts only once more than MEMORY_CAPACITY transitions have been stored.
        if dqn.memory_counter > MEMORY_CAPACITY:       
            
            if not PLOT_WARM_UP:
                print("\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Warming Up End ! <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n")
                print("At Episode : %d ." % n_iter)
                print("\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Warming Up End ! <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<\n")
                # From here on PLOT_WARM_UP holds the episode number at which warm-up ended (truthy).
                PLOT_WARM_UP = n_iter
                
            # One gradient step per environment step; keep the results for the loss plot.
            learn_results = dqn.learn()
            update_nn_list = np.append(update_nn_list, learn_results[0])
            loss_list = np.append(loss_list, learn_results[1])
            
                
        if status == "Fail" or status == "Success":   # Finish
            # Record per-episode reward sum and final completeness (s[2]).
            r_list = np.append(r_list, reward_sum)
            writer.add_scalar('Reward Sum', r_list[-1], n_iter)
            c_list = np.append(c_list, s[2])
            writer.add_scalar('Completeness', c_list[-1], n_iter)
            

            
            # Still warming up: plot reward, loss and completeness only.
            if not PLOT_WARM_UP:
                if n_iter % PLOT_INTERVAL == 0:
                    plt.clf()          # clear previous picture
                    
                    x_index = np.arange(len(r_list)) + 1
                    plt.subplot(2, 2, 1)
                    plt.plot(x_index, r_list)
                    plt.title("Reward Sum")

                    plt.subplot(2, 2, 2)
                    plt.plot(np.arange(len(loss_list)) + 1, loss_list)
                    plt.title("Loss")
                    
                    plt.subplot(2, 2, 3)
                    plt.plot(x_index, c_list)
                    plt.title("Completeness")

                    plt.pause(0.001)   # pause for a while, or it will be very lagged
                    plt.ioff()         # turn interactive mode off
                
            # Warm-up finished: also track accuracy and mark the warm-up boundary on the plots.
            else:
                if n_iter % PLOT_INTERVAL == 0:
                    # Accuracy = share of the last PLOT_ACCURATE_RANGE episodes that ended with completeness 1.
                    accurate_list = np.append(accurate_list, (c_list[-PLOT_ACCURATE_RANGE:] == 1).sum() / PLOT_ACCURATE_RANGE)
                    accurate_index = np.append(accurate_index, n_iter)
                    writer.add_scalar('Accuracy', accurate_list[-1], n_iter)
                    
                    plt.clf()
                    
                    x_index = np.arange(len(r_list)) + 1
                    plt.subplot(2, 2, 1)
                    plt.plot(x_index, r_list)
                    # Dashed red vertical line at the episode where warm-up ended.
                    plt.plot([PLOT_WARM_UP] * 100, np.linspace(min(r_list) - 1, max(r_list) + 1, 100), c = "r", linestyle = "--", zorder =2)
                    plt.title("Reward Sum")
                    
                    # Red dots on the loss curve at the learn steps whose update index is 0.
                    x, y = get_update_plot(update_nn_list, loss_list)
                    plt.subplot(2, 2, 2)
                    plt.plot(np.arange(len(loss_list)) + 1, loss_list)
                    plt.plot(x, y, lw = 0, marker = "o", ms = 2.5, c = "r", zorder = 2)
                    plt.title("Loss")
                    
                    plt.subplot(2, 2, 3)
                    plt.plot(x_index, c_list)
                    plt.plot([PLOT_WARM_UP] * 100, np.linspace(-0.03, 1.03, 100), c = "r", linestyle = "--", zorder =2)
                    plt.title("Completeness")
                    
                    plt.subplot(2, 2, 4)
                    plt.plot(accurate_index, accurate_list)
                    plt.title("Accuracy")
                    
                    plt.pause(0.001)
                    plt.ioff()
            
            
            # Round the reward sum to two decimals for display
            print('>>>>>>>>>> Episode - %s     Reward_sum: %s' % (n_iter, round(reward_sum, 2)))
            print("==================================================================================================")
            # The episode is over: leave the while loop and move on to the next word
            break

# Close the TensorBoard writer once all episodes are done.
writer.close()

In [None]:
# TensorBoard's real-time update is extremely slow,
# so it is better to run this cell after training has finished.
# The --logdir below is the same directory the SummaryWriter was created with.
%reload_ext tensorboard
%tensorboard --logdir "D:/Software_Workspace/Tensorboard_Workspace/"