## Chess AI
<hr>
### Reinforcement Learning
<hr>
### Monte Carlo Method | Bellman Equation | Optimal Policy Iteration
<hr>

The following is a simulation that terminates after games.
The objective is to train a black chess agent how to play chess purely based off the `Monte Carlo`, `Bellman's Equation`, and `Optimal Policy Iteration`.

## Environment Setup
<hr>
## Simulation Setup
<hr>

In [None]:
import datetime
import os
os.chdir("/Users/laurensuarez/Documents/blake/chess/data")
def random_action(theBoard, thePolicy, eps=0.1):
    
    # Current state
    s=str(theBoard)
    
    #   ///////////////////
    #  // If Unexplored //
    # ///////////////////
    if s not in list(thePolicy.keys()): 
        return np.random.choice(list(theBoard.legal_moves))
    
    #   ////////////////////
    #  // Epsilon-greedy //
    # ////////////////////
    p=np.random.random() # [0,1]    
    
    #   /////////////
    #  // Exploit //
    # /////////////
    if p < (1-eps):
        selected=None
        a=thePolicy[s]
        for act in list(theBoard.legal_moves):
            #print(str(act))
            if str(act) == a:
                selected=act
                
        #   /////////////
        #  // Explore //
        # //////////////
        if selected is None:
            # The possibility of being in a state that had an option
            # but is now currently preoccupied by a piece, or check conditions, ect..
            # hence, just move randomly, treat it as if we do not have an option.
            return np.random.choice(list(theBoard.legal_moves))
        else:
            return selected
        
    #   /////////////
    #  // Explore //
    # //////////////
    else:
        return np.random.choice(list(theBoard.legal_moves))

# Play game as black
def play_game(theBoard, thePolicy, gamma=0.9):
        
    #   //////////////////////
    #  // White Move First //
    # //////////////////////
    move=np.random.choice(list(theBoard.legal_moves))
    theBoard.push(move)
    
    #   //////////////
    #  // Game on! //
    # //////////////
    state_action_rewards=[]
    while theBoard.is_game_over()==False:
        
        # Make intelligent move from policy (iteratively updated)
        state=str(theBoard)
        move=random_action(theBoard, thePolicy)
        action=str(move)
        state_action_rewards.append([state,action,0])
        theBoard.push(move)
    
    # Allocate winner points
    winner=newBoard.result()
    if winner == '1-0':
        # We lost
        state_action_rewards[-1][2]=-1
    elif winner == '0-1':
        # We won!
        state_action_rewards[-1][2]=1
    else:
        # Tie...
        state_action_rewards[-1][2]=0.001
        pass
    
    #   /////////////////////////////
    #  // Bellman Update Equation //
    # /////////////////////////////
    G = 0
    states_actions_returns = []

    for s,a,r in reversed(state_action_rewards):

        states_actions_returns.append((s, a, G))
        G = r + gamma*G
    states_actions_returns.reverse()
    
    return states_actions_returns
    
policy={}
Q=defaultdict(dict)
returns=defaultdict(list)
def run_simulation(num_games=5000, write_file_mod=200):

    #   //////////////////////
    #  // Agent Containers //
    # //////////////////////
    policy={}
    Q=defaultdict(dict)
    returns=defaultdict(list)

    #   //////////////////////
    #  // Simulation Begin //
    # //////////////////////
    deltas=[]
    seen_state_action_pairs=set()
    for i in range(num_games):
        
        if i % write_file_mod == 0 and i != 0:
            print(i)
            custom_time=str(datetime.datetime.now()).replace(".","_").replace(":","_").replace(" ", "")
            if not os.path.exists(os.path.join(os.getcwd(),custom_time)):
                os.makedirs(os.path.join(os.getcwd(),custom_time))
            pickle_dump(Q, custom_time+"/Q_"+custom_time+".pkl")
            pickle_dump(Q, custom_time+"/policy_"+custom_time+".pkl")
            pickle_dump(Q, custom_time+"/V_"+custom_time+".pkl")
        #Fresh board
        newBoard=chess.Board()
        
        #For visualization
        biggest_change=0
        
        #Collect 1 game sequence 
        state_action_returns_list=play_game(newBoard, policy)
        
        #   ///////////////////////////
        #  // Value function update //
        # ///////////////////////////
        for s, a, G in state_action_returns_list:
            sa = (s, a)
            if sa in seen_state_action_pairs:
                old_q=Q[s][a]
            elif sa not in seen_state_action_pairs:
                old_q=0
                seen_state_action_pairs.add(sa)
            else:
                print("Error: (s,a) is niether been seen in the past nor has it not been seen. Huge problem.")
            returns[sa].append(G)
            Q[s][a]=np.mean(returns[sa]) #rolling mean, Q now `learns`
            biggest_change=max([biggest_change, np.abs(old_q - Q[s][a])])
            deltas.append(biggest_change)
            
            
        #   ///////////////////
        #  // Policy Update //
        # ///////////////////
        for s in Q.keys():
            keys=list(Q[s].keys())
            vals=list(Q[s].values())
            a = keys[np.argmax(vals)]
            policy[s] = a
    V = {}
    for s in policy.keys():
        V[s] = Q[s][1]
    return Q, policy, V, deltas
                                

## Run the sim
### Plot for convergence visualization
<hr>
<hr>

In [None]:
import matplotlib.pyplot as plt
Q,policy,V,deltas=run_simulation(num_games=5000, write_file_mod=200)
plt.plot(deltas)
plt.show

## Import data
Confirm process working correctly.
<hr><hr>

In [3]:
import os
import pickle
import numpy as np
os.chdir("/Users/laurensuarez/Documents/blake/chess/data")
class MacOSFile(object):

    def __init__(self, f):
        self.f = f

    def __getattr__(self, item):
        return getattr(self.f, item)

    def read(self, n):
        # print("reading total_bytes=%s" % n, flush=True)
        if n >= (1 << 31):
            buffer = bytearray(n)
            idx = 0
            while idx < n:
                batch_size = min(n - idx, 1 << 31 - 1)
                # print("reading bytes [%s,%s)..." % (idx, idx + batch_size), end="", flush=True)
                buffer[idx:idx + batch_size] = self.f.read(batch_size)
                # print("done.", flush=True)
                idx += batch_size
            return buffer
        return self.f.read(n)

    def write(self, buffer):
        n = len(buffer)
        print("writing total_bytes=%s..." % n, flush=True)
        idx = 0
        while idx < n:
            batch_size = min(n - idx, 1 << 31 - 1)
            print("writing bytes [%s, %s)... " % (idx, idx + batch_size), end="", flush=True)
            self.f.write(buffer[idx:idx + batch_size])
            print("done.", flush=True)
            idx += batch_size

def pickle_dump(obj, file_path):
    with open(file_path, "wb") as f:
        return pickle.dump(obj, MacOSFile(f), protocol=pickle.HIGHEST_PROTOCOL)

def pickle_load(file_path):
    with open(file_path, "rb") as f:
        return pickle.load(MacOSFile(f))


In [4]:
Q=pickle_load("Q_2018-11-2421_43_58_744997.pkl")
policy=pickle_load("policy_2018-11-2421_43_58_744997.pkl")
V=pickle_load("V_2018-11-2421_43_58_744997.pkl")

In [18]:
Q[list(Q.keys())[8]]

{'h7h5': 1.9622177898881668e-14,
 'f5f4': 2.776664336428095e-08,
 'a6a5': 7.943796589169338e-19}

## Approximation Functions

Since we know the size of a dictionary in python is limited. The better strategy is to learn the value function using deep learning techniques, so we can obtain high quality predictions `f(state, params)=value`.

We now seek to build a model that can predict the value function, rather than just storing them in a value dictionary with a policy.

This integrates supervised learning into reinforcement learning.

$$ error = (value_{true}-value_{pred})^2$$
$$ error = (E[G(t) | S_t = s]-value_{pred})^2$$
$$ error = (\dfrac{1}{N}\sum\limits_{k=1}^n G_{i,s} - value_{pred})^2$$

So each run in our simulation becomes a training point:

$$ error = \sum\limits_{k=1}^n (y_i -\hat{y_i})^2$$

When using linear methods to find the transformation of a state, you must manually do feature engineering. However, when using non-linear methods such as deep neural networks (which are also differentiable), we can allow them to construct the features that are most important. So we continue with the simple linear case and construct features for our board states.

One way to do this is to `one_hot_encode` by each of our states. A lot like time series data as a categorical variable. This blows up in dimensionality, so this really isn't any improvement. The positive to this is that you can clearly explain what the problem is, your model or your features. 

Another approach is to model your board as the raw data itself; a tensor. Construct a tensor transformation and make the data (8,8,14), representing each piece of the board and which piece currently occupies it (or if none do). This is also a challenge. 

Yet another approach is to apply `Taylor Series Expansion`, which contains the capability to approximate any real valued function using sum of polynomials. This could be done using `w=f(x,y,z)=x**2+y**2+z**2`. The possibilities are endless here if you are using a linear model.

We can manually apply gradient descent to optimize our linear model.



In [36]:
import datetime
import os
os.chdir("/Users/laurensuarez/Documents/blake/chess/data")

class Model(object):
    def __init__(self, alpha=0.001):
        self.theta=np.random.randn(4)/2
        self.alpha=alpha
    
    def s2x(self, s):
        piece_ratio=np.divide(np.sum([i.islower() for i in L]),np.sum(i.isupper() for i in L))
        black_count=np.sum([i.islower() for i in L])
        white_count=np.sum([i.isupper() for i in L])
        bias=1.0
        return np.array([piece_ratio,
                         black_count,
                         white_count,
                         bias])
    def predict(self, s):
        x=self.s2x(s)
        return self.theta.dot(x)
    
    def fit(self, V_hat, G):
        self.theta+=self.alpha*(G - V_hat)*x

    def set_alpha(self, newalpha):
        self.alpha=newalpha
        
def random_action(theBoard, thePolicy, eps=0.1):
    
    # Current state
    s=str(theBoard)
    
    #   ///////////////////
    #  // If Unexplored //
    # ///////////////////
    if s not in list(thePolicy.keys()): 
        return np.random.choice(list(theBoard.legal_moves))
    
    #   ////////////////////
    #  // Epsilon-greedy //
    # ////////////////////
    p=np.random.random() # [0,1]    
    
    #   /////////////
    #  // Exploit //
    # /////////////
    if p < (1-eps):
        selected=None
        a=thePolicy[s]
        for act in list(theBoard.legal_moves):
            #print(str(act))
            if str(act) == a:
                selected=act
                
        #   /////////////
        #  // Explore //
        # //////////////
        if selected is None:
            # The possibility of being in a state that had an option
            # but is now currently preoccupied by a piece, or check conditions, ect..
            # hence, just move randomly, treat it as if we do not have an option.
            return np.random.choice(list(theBoard.legal_moves))
        else:
            return selected
        
    #   /////////////
    #  // Explore //
    # //////////////
    else:
        return np.random.choice(list(theBoard.legal_moves))

# Play game as black
def play_game(theBoard, thePolicy, gamma=0.9):
        
    #   //////////////////////
    #  // White Move First //
    # //////////////////////
    move=np.random.choice(list(theBoard.legal_moves))
    theBoard.push(move)
    
    #   //////////////
    #  // Game on! //
    # //////////////
    state_action_rewards=[]
    while theBoard.is_game_over()==False:
        
        # Make intelligent move from policy (iteratively updated)
        state=str(theBoard)
        move=random_action(theBoard, thePolicy)
        action=str(move)
        state_action_rewards.append([state,action,0])
        theBoard.push(move)
    
    # Allocate winner points
    winner=newBoard.result()
    if winner == '1-0':
        # We lost
        state_action_rewards[-1][2]=-1
    elif winner == '0-1':
        # We won!
        state_action_rewards[-1][2]=1
    else:
        # Tie...
        state_action_rewards[-1][2]=0.001
        pass
    
    #   /////////////////////////////
    #  // Bellman Update Equation //
    # /////////////////////////////
    G = 0
    states_actions_returns = []

    for s,a,r in reversed(state_action_rewards):

        states_actions_returns.append((s, a, G))
        G = r + gamma*G
    states_actions_returns.reverse()
    
    return states_actions_returns
    
policy={}
Q=defaultdict(dict)
returns=defaultdict(list)
def run_simulation_approx(num_games=5000, write_file_mod=200, alpha=0.001):

    #   //////////////////////
    #  // Agent Containers //
    # //////////////////////
    policy={}
    Q=defaultdict(dict)
    returns=defaultdict(list)

    
    #     ////////////////////////////////////////////
    #    // Linear Value Function Approximation    //
    #   // Kernel Function                        //
    #  // Transform state (8,8,13) into vector   //
    # ////////////////////////////////////////////
    model=Model(alpha=alpha)
    
    #   //////////////////////
    #  // Simulation Begin //
    # //////////////////////
    deltas=[]
    seen_states=set()
    t=1.0
    for i in range(num_games):
        
        #Decaying learning rate
        alpha/=t
        model.set_alpha(alpha)
        
        if i % write_file_mod == 0 and i != 0:
            print(i)
            custom_time=str(datetime.datetime.now()).replace(".","_").replace(":","_").replace(" ", "")
            if not os.path.exists(os.path.join(os.getcwd(),custom_time)):
                os.makedirs(os.path.join(os.getcwd(),custom_time))
            pickle_dump(Q, custom_time+"/Q_"+custom_time+".pkl")
            pickle_dump(Q, custom_time+"/policy_"+custom_time+".pkl")
            pickle_dump(Q, custom_time+"/V_"+custom_time+".pkl")
            
            #Update learning decay rate
            t+=0.01
        
        #Fresh board
        newBoard=chess.Board()
        
        #For visualization
        biggest_change=0
        
        #Collect 1 game sequence 
        state_action_returns_list=play_game(newBoard, policy)
        
        #   ///////////////////////////
        #  // Value function update //
        # ///////////////////////////
        for s, a, G in state_action_returns_list:
            
            # For every state, regardless of whether we have seen
            # it or not, we should be able to just use previous
            # knowledge, or simply update from the past if we
            # see the same state in context, so this is where
            # I deviate a bit from the traditional algorithm.
            old_theta=model.theta.copy()
            x=model.s2x(s)
            V_hat=model.predict(s)
            model.fit(V_hat, G) #grad(V_hat w.r.t theta=x)
            
            biggest_change=max([biggest_change, np.abs(old_theta - theta).sum()])
            seen_states.add(s)
            deltas.append(biggest_change)
            
        
    #   //////////////////
    #  // Value Update //
    # //////////////////
    V = {}
    for s in seen_states:
        V[s] = model.theta.dot(s2x(s))
    return V, theta, deltas
                                



1.0