## Decision Trees for Expert Iteration (Reinforcement learning)
David Albuja 
1900863

In [1]:
#Importing libraries
import pandas as pd
import matplotlib
import numpy as np
import matplotlib.pyplot as plt
import itertools
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier, ExtraTreesClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.dummy import DummyClassifier
from sklearn.model_selection import cross_val_score
from sklearn.metrics import accuracy_score as acc
from sklearn.metrics import make_scorer, confusion_matrix
from sklearn.metrics import classification_report

In [2]:
from math import *
import random

class OXOState:
    """ A state of the game, i.e. the game board.
        Squares in the board are in this arrangement
        012
        345
        678
        where 0 = empty, 1 = player 1 (X), 2 = player 2 (O)
    """
    def __init__(self):
        self.playerJustMoved = 2 # At the root pretend the player just moved is p2 - p1 has the first move
        self.board = [0,0,0,0,0,0,0,0,0] # 0 = empty, 1 = player 1, 2 = player 2
        
    def Clone(self):
        """ Create a deep clone of this game state.
        """
        st = OXOState()
        st.playerJustMoved = self.playerJustMoved
        st.board = self.board[:]
        return st

    def DoMove(self, move):
        """ Update a state by carrying out the given move.
            Must update playerToMove.
        """
        assert move >= 0 and move <= 8 and move == int(move) and self.board[move] == 0
        self.playerJustMoved = 3 - self.playerJustMoved
        self.board[move] = self.playerJustMoved
        
    def GetMoves(self):
        """ Get all possible moves from this state.
        """
        return [i for i in range(9) if self.board[i] == 0]
    
    def GetResult(self, playerjm):
        """ Get the game result from the viewpoint of playerjm. 
        """
        for (x,y,z) in [(0,1,2),(3,4,5),(6,7,8),(0,3,6),(1,4,7),(2,5,8),(0,4,8),(2,4,6)]:
            if self.board[x] == self.board[y] == self.board[z]:
                if self.board[x] == playerjm:
                    return 1.0
                else:
                    return 0.0
        if self.GetMoves() == []: 
            return 0.5 # draw
        return False # Should not be possible to get here

    def __repr__(self):
        s= ""
        for i in range(9): 
            s += ".XO"[self.board[i]]
            if i % 3 == 2: s += "\n"
        return s

class Node:
    """ A node in the game tree. Note wins is always from the viewpoint of playerJustMoved.
        Crashes if state not specified.
    """
    def __init__(self, move = None, parent = None, state = None):
        self.move = move # the move that got us to this node - "None" for the root node
        self.parentNode = parent # "None" for the root node
        self.childNodes = []
        self.wins = 0
        self.visits = 0
        self.untriedMoves = state.GetMoves() # future child nodes
        self.playerJustMoved = state.playerJustMoved # the only part of the state that the Node needs later
        
    def UCTSelectChild(self):
        """ Use the UCB1 formula to select a child node. Often a constant UCTK is applied so we have
            lambda c: c.wins/c.visits + UCTK * sqrt(2*log(self.visits)/c.visits to vary the amount of
            exploration versus exploitation.
        """
        s = sorted(self.childNodes, key = lambda c: c.wins/c.visits + sqrt(2*log(self.visits)/c.visits))[-1]
        return s
    
    def AddChild(self, m, s):
        """ Remove m from untriedMoves and add a new child node for this move.
            Return the added child node
        """
        n = Node(move = m, parent = self, state = s)
        self.untriedMoves.remove(m)
        self.childNodes.append(n)
        return n
    
    def Update(self, result):
        """ Update this node - one additional visit and result additional wins. result must be from the viewpoint of playerJustmoved.
        """
        self.visits += 1
        self.wins += result

    def __repr__(self): #Variable added to cehck flow of the varaibles in small iterations
        return "[M:" + str(self.move) + " W/V:" + str(self.wins) + "/" + str(self.visits) + " U:" + str(self.untriedMoves) + "P:" + str(self.playerJustMoved) + "]"

    def TreeToString(self, indent):
        s = self.IndentString(indent) + str(self)
        for c in self.childNodes:
             s += c.TreeToString(indent+1)
        return s

    def IndentString(self,indent):
        s = "\n"
        for i in range (1,indent+1):
            s += "| "
        return s

    def ChildrenToString(self):
        s = ""
        for c in self.childNodes:
             s += str(c) + "\n"
        return s

def Exploration_Exploitation (r, dt):
    #90% of the choices will be decided by a decsion tree classifier and 10% will be random choices.
    #If decision tree choose a position that has already taken, then the decision will be randomly
    choices = random.uniform(0.0,1.0) #randomness
    
    if choices <= 0.9: #90% of decisions taken by decision tree classifer
        move = dt.predict([r.board])
        move = move[0]
        if move not in r.GetMoves(): #if the decision tree chose a move that is already taken, then the decision will be randomly as well
            move = random.choice(r.GetMoves())
            st_b = list(r.board) 
            st_b.append(move)
            st_tr.append(st_b)
    else: #10% of the decision will be taken radomly
        move = random.choice(r.GetMoves())  
    return move


def UCT(start, decisiontree, rootstate, itermax, verbose = False):
    """ Conduct a UCT search for itermax iterations starting from rootstate.
        Return the best move from the rootstate.
        Assumes 2 alternating players (player 1 starts), with game results in the range [0.0, 1.0]."""

    rootnode = Node(state = rootstate)

    for i in range(itermax):
        node = rootnode
        state = rootstate.Clone()

        # Select
        while node.untriedMoves == [] and node.childNodes != []: # node is fully expanded and non-terminal
            node = node.UCTSelectChild()
            state.DoMove(node.move)

        # Expand
        if node.untriedMoves != []:  # if we can expand (i.e. state/node is non-terminal)
            m = random.choice(node.untriedMoves) 
            state.DoMove(m)
            node = node.AddChild(m, state)  # add child and descend tree

        # Rollout - this can often be made orders of magnitude quicker using a state.GetRandomMove() function
        while state.GetMoves() != []: # while state is non-terminal
            if start == 1:
                r = random.choice(state.GetMoves())
            else:
                r = Exploration_Exploitation (state, decisiontree) #Compute the function made it for 90 -10% of decision
            state.DoMove(r)

        # Backpropagate
        while node != None: # backpropagate from the expanded node and work back to the root node
            w = state.GetResult(node.playerJustMoved) # state is terminal. Update node with result from POV of node.playerJustMoved
            node.Update(w)
            
            node = node.parentNode

    # Output some information about the tree - can be omitted
    #if verbose: print(rootnode.TreeToString(0))
    #else: print(rootnode.ChildrenToString())
    
    UCT_value = sorted(rootnode.childNodes, key = lambda c: c.visits)[-1].move # return the move that was most visited
    #Most returned best move
    #print("UCT value:", UCT_value)
    return UCT_value 

                
def UCTPlayGame(start, decisiontree):
    """ Play a sample game between two UCT players where each player gets a different number 
        of UCT iterations (= simulations = tree nodes).
    """
    state = OXOState()

    df_t=[[0,0,0,0,0,0,0,0,0]] #Creating the default row (first row) (code line added)
    bm_list=[] # creating a best moves empty list (code line added)
    while state.GetMoves() != []:
        #print(str(state))
        if state.playerJustMoved == 1:
            m = UCT(start, decisiontree, rootstate=state, itermax=50, verbose=False)  # play with values for itermax and verbose = True
        else:
            m = UCT(start, decisiontree, rootstate=state, itermax=50, verbose=False) #itermax decides the total value of visits.
        #print("Best Move: " + str(m) + "\n")
        state.DoMove(m)
        bm_list.append(m) # append the best moves to the list
        df_t.append(list(state.board))  #append all the positions list 
        
        if state.GetResult(state.playerJustMoved) != False:
            #print(str(state))            
            break

    if state.GetResult(state.playerJustMoved) == 1.0:
        winner = state.playerJustMoved
    elif state.GetResult(state.playerJustMoved) == 0.0:
        winner = (3-state.playerJustMoved) #Updating player 1
    else: 
        winner=0 #tie

    bm_list.append('NaN')  # Append NaN values ti the last state of each game because there are no more moves to choose. (code line added)
    
    
    #Join the postitions board with the correcponding best move
    for i in range(len(df_t)): # (code line added)
        df_t[i].append(bm_list[i]) # (code line added)
    return df_t, winner # (code line added)

def dataset (games, start, decisiontree):
    """ Play a single game to the end using UCT for both players. 
    """
    #Generate the game for N times (for purpose of the assigment it will be generated 2000 games)
    df =[] # LINE ADDED. List of list, each list is a game
    fdf = [] # LINE ADDED. List of all the games' stages
    
    #Creating the counters that will indicate if player 1 or 2 wins, or if the game is tie.
    winner1=0
    winner2=0
    winner0=0
    winner=0
    
    
    for i in range(games): # Runing the game for as many times the value in range is written (code line added)
        ds, winner = UCTPlayGame(start, decisiontree) # Calling the game to play (code line added)
        df.append(ds) # Append the games into a dataframe (code line added)
        if winner == 1:
            winner1 += 1
        elif winner == 2:
            winner2 += 1
        else:
            winner0 += 1
    #print (df)

    #Change from a list of lists into one list
     # (code line added)
    for i in df:  # (code line added)
        for j in i:  # (code line added)
            fdf.append(j)  # (code line added)
    #print (fdf)

     
    #Formating the data frame
    data_df=pd.DataFrame(fdf, columns=['pos: 0', 'pos: 1', 'pos: 2', 'pos: 3', 'pos: 4',
                                       'pos: 5', 'pos: 6', 'pos: 7', 'pos: 8', 'Best_Move']) #(code line added)
    #Deleting the rows that have NaN values  
    data_df.drop(data_df[data_df.Best_Move == 'NaN'].index, inplace = True) #(code line added)
    #data_df=data_df.ix[data_df.iloc[:,-1]!='NaN'] #(code line added)

    #Generate the correct indexing for the rows (because it change after removing NaN values)
    data_df.reset_index(drop=True, inplace=True) #(code line added)
     
    #Printing the final Data Frame
    #print("wins Player 1:", winner1 ) # (code line added)
    #print("wins Player 2:", winner2 ) # (code line added)
    #print("None wins:", winner0 ) # (code line added)
    
    return data_df # (code line added)

In [3]:
#Training the decision tree (The test of the classifiers is in the file named: Training the classifier_Assignment 2.ipynb)

def train_DT(input_DT):
    features = ["pos: 0", "pos: 1", "pos: 2", "pos: 3", "pos: 4", "pos: 5", "pos: 6","pos: 7","pos: 8",]
    outcome = ["Best_Move"]
    #Knowing the features and outcomes of the dataset
    X = input_DT[features]
    y = input_DT[outcome].astype('int')
    #Divide data into training and testing set
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.20, random_state=42)
    
    # Create Decision Tree classifer object
    clf = DecisionTreeClassifier(max_depth=11)
    
    # Train Decision Tree Classifer
    clf.fit(X_train,y_train)
    
    #Predict the response for test dataset
    y_pred_train = clf.predict(X_train)
    
    #Predict the response for test dataset
    y_pred_test = clf.predict(X_test)
    
    # Model Accuracy/recall/precisio train set
    #print(classification_report(y_train, y_pred_train))
    
    # Model Accuracy/recall/precisio test set
    #print(classification_report(y_test, y_pred_test))
    
    return clf


In [4]:
#Funtion that computed that the agent plays with all its past self 10 games and record the results
def iterate():
    start = 1 #For computing the first iteration (Expert policy)
    dc = {} #Dictionary for storing the records
    global st_tr
    st_tr=[]
    decisiontree = None #Storing the classifier
    #The range is the number of iterations generated
    for i in range (10): 
        datadf = dataset(300, start, decisiontree) #Playing 300 games for every iterations
        game_s = pd.DataFrame(st_tr, columns=['pos: 0', 'pos: 1', 'pos: 2', 'pos: 3', 'pos: 4',
                                       'pos: 5', 'pos: 6', 'pos: 7', 'pos: 8', 'Best_Move'])
        #Create the final dataset which is going to be computed in the classifer
        datadf = pd.concat([datadf,game_s], ignore_index=True, axis=0)
        decisiontree = train_DT(datadf)
        dc.update({i:decisiontree})
        start = 0
        st_tr.clear()
    return dc

In [5]:
#Modification of the main functions of the code provided for unification of the previous funtions.

def mo_UCT(DT_player1, DT_player2,rootstate, itermax, verbose = False): 
    """ Conduct a UCT search for itermax iterations starting from rootstate.
        Return the best move from the rootstate.
        Assumes 2 alternating players (player 1 starts), with game results in the range [0.0, 1.0]."""

    rootnode = Node(state = rootstate)

    for i in range(itermax):
        node = rootnode
        state = rootstate.Clone()

         # Select
        while node.untriedMoves == [] and node.childNodes != []: # node is fully expanded and non-terminal
            node = node.UCTSelectChild()
            state.DoMove(node.move)

        # Expand
        if node.untriedMoves != []:  # if we can expand (i.e. state/node is non-terminal)
            m = random.choice(node.untriedMoves) 
            state.DoMove(m)
            node = node.AddChild(m, state)  # add child and descend tree

        # Rollout - this can often be made orders of magnitude quicker using a state.GetRandomMove() function
        while state.GetMoves() != []: # while state is non-terminal
            
            if(state.playerJustMoved==1):
                r=DT_player1.predict([state.board])
                r=r[0]
                if r not in state.GetMoves():
                    r=random.choice(state.GetMoves())
            else:
                r=DT_player2.predict([state.board])
                r=r[0]
                if r not in state.GetMoves():
                    r=random.choice(state.GetMoves())
            state.DoMove(r)

        # Backpropagate
        while node != None: # backpropagate from the expanded node and work back to the root node
            node.Update(state.GetResult(node.playerJustMoved)) # state is terminal. Update node with result from POV of node.playerJustMoved
            node = node.parentNode

    # Output some information about the tree - can be omitted
    #if (verbose): print(rootnode.TreeToString(0))
    #else: print(rootnode.ChildrenToString())

    return sorted(rootnode.childNodes, key = lambda c: c.visits)[-1].move # return the move that was most visited
                
def mo_UCTPlayGame(DT_player1, DT_player2):
    """ Play a sample game between two UCT players where each player gets a different number 
        of UCT iterations (= simulations = tree nodes).
    """
    
    state = OXOState() 
    
    while (state.GetMoves() != []):
        if state.playerJustMoved == 1:
            m = mo_UCT(DT_player1, DT_player2,rootstate = state, itermax = 50, verbose = False) # play with values for itermax and verbose = True
        else:
            m = mo_UCT(DT_player1, DT_player2, rootstate = state, itermax = 50, verbose = False) #Giving more probability of winnign to player 1
        #print("Best Move: " + str(m) + "\n")
        state.DoMove(m)
        if state.GetResult(state.playerJustMoved) != False:
            break
    if state.GetResult(state.playerJustMoved) == 1.0:
        return state.playerJustMoved
    elif state.GetResult(state.playerJustMoved) == 0.0:
        return (3-state.playerJustMoved)
    else:
        return 0

In [6]:
#Function for running the complete system (Imitation learning)
def Imitation_Learning(games):
    for i in range(games):
        call=iterate() #Calling the function that that computed that the agent plays with all its past self 10 games and record the results
        clf_10=0 #Counter of the 10th decision tree of every iteration
        clf_dif=0 # Counter of all decision trees but 10th for every iteration
        tie=0 #Counter of games ending in tie
        #Compute for the 9 postions
        for i in range(len(call)-1):
            w = mo_UCTPlayGame(call[len(call)-1], call[i])
            if w==1:
                clf_10+=1
            elif w==2:
                clf_dif+=1
            else:
                tie+=1
        #Printing the outputs
        print("10th DT Winnings  ", clf_10)
        print("Other DTs Winnings: ", clf_dif)
        print("Ties : ", tie)

In [7]:
#Executing the function of imitation learning
Imitation_Learning(10)

10th DT Winnings   5
Other DTs Winnings:  0
Ties :  4
10th DT Winnings   6
Other DTs Winnings:  0
Ties :  3
10th DT Winnings   6
Other DTs Winnings:  0
Ties :  3
10th DT Winnings   2
Other DTs Winnings:  1
Ties :  6
10th DT Winnings   7
Other DTs Winnings:  0
Ties :  2
10th DT Winnings   6
Other DTs Winnings:  0
Ties :  3
10th DT Winnings   5
Other DTs Winnings:  1
Ties :  3
10th DT Winnings   5
Other DTs Winnings:  1
Ties :  3
10th DT Winnings   6
Other DTs Winnings:  0
Ties :  3
10th DT Winnings   7
Other DTs Winnings:  1
Ties :  1


The learning is improving based on the greater number of winnings for the 10th classifier