In [1]:
import tensorflow as tf
import matplotlib.pyplot as plt
import numpy as np
import copy
from keras.optimizers import Adam,RMSprop
from keras.models import Sequential
from keras.layers.core import Dense, Dropout
import pandas as pd
import seaborn as sns
import random
import math

Using TensorFlow backend.


In [2]:
# Sum tree backing the Prioritized Experience Replay (PER) buffer
class SumTree:
    """Array-backed binary sum tree used for prioritized experience replay.

    The last ``capacity`` entries of ``self.tree`` are leaves holding one
    priority each; every other entry holds the sum of its two children, so
    ``self.tree[0]`` is the total priority. ``self.data`` is a ring buffer of
    payloads: once ``capacity`` items have been added, new items overwrite the
    oldest ones. Slots that were never written hold priority 0 and data ``0``.
    """

    # Class-level default kept for backward compatibility; __init__ gives
    # every instance its own write cursor.
    write = 0

    def __init__(self, capacity):
        """Create an empty tree able to hold ``capacity`` items (>= 1)."""
        if capacity < 1:
            raise ValueError("capacity must be at least 1")
        self.capacity = capacity
        self.tree = np.zeros(2 * capacity - 1)
        self.data = np.zeros(capacity, dtype=object)
        self.write = 0

    def _propagate(self, idx, change):
        """Add ``change`` to every ancestor of node ``idx`` up to the root.

        Iterative on purpose: when ``idx`` is already the root (capacity 1)
        there is nothing to do, whereas a recursive walk through
        ``(idx - 1) // 2`` would never reach 0 from there.
        """
        while idx != 0:
            idx = (idx - 1) // 2
            self.tree[idx] += change

    def _retrieve(self, idx, s):
        """Return the leaf index reached by descending from ``idx`` with offset ``s``.

        At each node the walk goes left when ``s`` falls inside the left
        subtree's sum, otherwise right with ``s`` reduced by that sum. A
        subtree whose sum is 0 contains no sampled-for priority, so it is
        only entered when its sibling is empty as well; this keeps boundary
        values (``s == 0``, or ``s`` slightly above the total because of
        floating-point drift) from landing on a never-written leaf.
        """
        size = len(self.tree)
        while True:
            left = 2 * idx + 1
            if left >= size:
                # No children: idx is a leaf.
                return idx
            right = left + 1
            left_sum = self.tree[left]
            right_sum = self.tree[right]
            if right_sum <= 0 or (left_sum > 0 and s <= left_sum):
                idx = left
            else:
                s -= left_sum
                idx = right

    def total(self):
        """Return the sum of all stored priorities (the root value)."""
        return self.tree[0]

    def add(self, p, data):
        """Store ``data`` with priority ``p``, overwriting the oldest slot when full."""
        idx = self.write + self.capacity - 1

        self.data[self.write] = data
        self.update(idx, p)

        # Advance the ring-buffer cursor.
        self.write += 1
        if self.write >= self.capacity:
            self.write = 0

    def update(self, idx, p):
        """Set the priority of tree node ``idx`` (a leaf index) to ``p``."""
        change = p - self.tree[idx]

        self.tree[idx] = p
        self._propagate(idx, change)

    def get(self, s):
        """Look up the item whose cumulative-priority interval contains ``s``.

        Returns a tuple ``(tree_index, priority, data)``; ``tree_index`` is
        the value to pass back to :meth:`update`.
        """
        idx = self._retrieve(0, s)
        dataIdx = idx - self.capacity + 1

        return (idx, self.tree[idx], self.data[dataIdx])

In [3]:
# Replay memory that samples transitions in proportion to their priority, built on the SumTree (PER)
class Memory:
    """Prioritized replay memory; samples are stored in a ``SumTree``.

    Each sample is stored together with a priority derived from its error,
    and :meth:`sample` draws items with probability proportional to that
    priority.
    """

    # Small positive constant so that no stored transition has zero priority.
    e = 0.01
    # Exponent in [0, 1] controlling how strongly large errors are favoured
    # over small ones (0 would make all priorities equal).
    a = 0.6
    # How many times one draw is retried when it lands on a never-written slot.
    _MAX_RESAMPLE = 8

    def __init__(self, capacity):
        """Create a memory holding at most ``capacity`` samples."""
        self.tree = SumTree(capacity)

    def _getPriority(self, error):
        """Convert an error into a priority: ``(|error| + e) ** a``.

        The absolute value keeps a negative error from producing a complex
        or NaN priority; non-negative errors are unaffected.
        """
        return (abs(error) + self.e) ** self.a

    @staticmethod
    def _is_empty_slot(data):
        """Return True for the placeholder found in slots that were never written.

        The tree's data buffer is created with ``np.zeros(..., dtype=object)``,
        so an unused slot holds the plain integer 0.
        """
        return type(data) is int and data == 0

    def add(self, error, sample):
        """Store ``sample`` with the priority derived from ``error``."""
        p = self._getPriority(error)
        self.tree.add(p, sample)

    def sample(self, n):
        """Draw up to ``n`` samples, one from each of ``n`` equal priority segments.

        Returns a list of ``(tree_index, sample)`` pairs; ``tree_index`` is
        what :meth:`update` expects. The list is empty when ``n <= 0`` or
        when nothing has been stored. A draw that lands on a never-written
        slot is retried over the whole priority range; if every retry fails
        that draw is dropped, so the result may hold fewer than ``n`` items.
        """
        total = self.tree.total()
        if n <= 0 or total <= 0:
            return []

        batch = []
        segment = total / n

        for i in range(n):
            low = segment * i
            high = segment * (i + 1)

            for _ in range(self._MAX_RESAMPLE):
                (idx, _priority, data) = self.tree.get(random.uniform(low, high))
                if not self._is_empty_slot(data):
                    batch.append((idx, data))
                    break
                # Retry anywhere in the tree rather than in the same segment.
                low, high = 0.0, total

        return batch

    def update(self, idx, error):
        """Recompute the priority of the sample at ``idx`` from a new ``error``."""
        p = self._getPriority(error)
        self.tree.update(idx, p)

In [8]:
class DQNAgent(object):
    """Double-DQN agent for an 8x8 board with prioritized experience replay.

    The agent keeps two networks with identical architecture: ``model`` (the
    online network, trained in :meth:`replay`) and ``model_`` (the target
    network, refreshed from ``model`` every ``update_target_freq`` calls to
    :meth:`remember`). Transitions are stored as
    ``(state, action, reward, next_state, done)`` tuples in ``memory``.
    """

    def __init__(self, weights="8x8_weights_new.hdf5"):
        """Set hyper-parameters and build both networks.

        Args:
            weights: path of a weights file loaded into both the online and
                the target network. Pass ``None`` to start from freshly
                initialised networks instead.
        """
        self.memory_cap=200000
        self.batch_size=32
        self.gamma = 0.8
        self.max_eps=1
        # Use a higher min_eps at the start of training, a smaller one later.
        self.min_eps=0.001
        self.learning_rate = 0.00025
        self.epsilon=1
        self.steps=0
        # Epsilon decay rate: use a smaller lambda at the start of training,
        # a bigger one later.
        self.lambd=0.0005
        self.update_target_freq= 5000
        self.memory = Memory(self.memory_cap)
        # Online network.
        self.model = self.network(weights)
        # Target network.
        self.model_ = self.network(weights)

    def network(self, weights=None):
        """Build and compile the 64 -> 640 -> 128 -> 64 network.

        Args:
            weights: optional path of a weights file to load after compiling.

        Returns:
            The compiled model (Huber loss, RMSprop at ``self.learning_rate``).
        """
        model = Sequential()
        model.add(Dense(640 , activation="relu", input_shape=(64,)))
        model.add(Dropout(0.20))
        # Only the first layer of a Sequential model needs an input shape.
        model.add(Dense(128 , activation="relu"))
        model.add(Dropout(0.20))
        # NOTE(review): a softmax head restricts the 64 outputs to values in
        # [0, 1] that sum to 1, while the training targets built in
        # replay_new can lie outside that range. Left unchanged because the
        # saved weights were presumably trained with it - confirm before
        # switching to a linear head.
        model.add(Dense(64,activation='softmax'))
        opt = RMSprop(lr=self.learning_rate)
        model.compile(loss=tf.keras.losses.Huber(), optimizer=opt)

        if weights:
            model.load_weights(weights)
        return model

    def get_state(self,board):
        """Encode the board as an array: empty (None) -> 0, 0 -> 1, 1 -> 2."""
        state_old=board.get_board_state_array()
        # Order matters: 1 -> 2 first, so the cells that become 1 in the
        # next step are not remapped again.
        state_old=np.where(state_old==1,2,state_old)
        state_old=np.where(state_old==0,1,state_old)
        # `== None` is intentional: it compares element-wise on the array.
        state_old=np.where(state_old==None,0,state_old)
        return state_old

    def makemove(self,board):
        """Return the greedy ``(row, column)`` action of the online network."""
        print("Predicted move:")
        state_old = self.get_state(board)
        pred=self.predict(state_old.reshape((1,64)))[0]
        # The 64 outputs are laid out row-major over the 8x8 board.
        row, column = divmod(np.argmax(pred), 8)
        return (row, column)

    def set_reward(self, board, action):
        """Return the reward for having just played ``action`` on ``board``.

        While the game is running: 0.5 if the cell at ``action`` reports
        ``is_valid_move``, otherwise -1. Once the game is over: +1 for a
        win, -1 otherwise.
        """
        reward = 0

        d=board.get_game_over()
        # current_player is the opponent, because this agent made the last move.
        pl=board.current_player
        bl=board.get_black_score()
        wh=board.get_white_score()

        # Explicit comparison kept so non-bool return values behave as before.
        if d==False:
            cell = board.get_board_state()[action]
            if cell.is_valid_move is True:
                reward= 0.5
            else:
                reward= -1
        else:
            # Win if the side that is NOT to move has the higher score.
            if(pl==1 and bl>wh):
                reward= reward + 1
            elif(pl==0 and wh>bl):
                reward= reward + 1
            else:
                # NOTE(review): equal scores also end up here, so a draw is
                # rewarded like a loss - confirm this is intended.
                reward= reward -1
        return reward

    #MEMORY

    def remember(self, state, action, reward, next_state, done):
        """Store one transition and advance the step counter and epsilon.

        The transition's current TD error is used as its initial priority.
        The target network is refreshed every ``update_target_freq`` steps
        (including step 0), and ``epsilon`` decays exponentially from
        ``max_eps`` towards ``min_eps`` at rate ``lambd``.
        """
        sample=(state, action, reward, next_state, done)
        x,y,errors = self.replay_new([(0,sample)])
        self.memory.add(errors[0],sample)

        if self.steps % self.update_target_freq == 0:
            self.updateTargetModel()

        self.steps+=1
        self.epsilon = self.min_eps + (self.max_eps - self.min_eps) * math.exp(-self.lambd * self.steps)

    #PREDICTION

    def predict(self,s,target=False):
        """Run ``s`` through the target network if ``target``, else the online one."""
        if target:
            return self.model_.predict(s)
        else:
            return self.model.predict(s)

    def updateTargetModel(self):
        """Copy the online network's weights into the target network."""
        self.model_.set_weights(self.model.get_weights())

    #TRAIN

    def replay_new(self,minibatch):
        """Build training inputs, targets and TD errors for a minibatch.

        Args:
            minibatch: iterable of ``(index, sample)`` pairs where ``sample``
                is ``(state, action, reward, next_state, done)`` and
                ``action`` is ``(row, column)``.

        Returns:
            ``(x, y, errors)``: the states, the online predictions with the
            taken action's entry replaced by its Double-DQN target, and the
            absolute change of that entry.

        All states are predicted in one batched call, and next states are
        only evaluated for non-terminal transitions, so a terminal sample
        may carry ``next_state=None``.
        """
        samples = [sample for _, sample in minibatch]
        if not samples:
            return ([], [], [])

        states = np.array([np.reshape(sample[0], 64) for sample in samples])
        # Copy so the prediction rows can be edited in place as targets.
        q_values = np.array(self.predict(states))

        # Double DQN: the online network picks the next action, the target
        # network evaluates it.
        bootstrap = {}
        live = [i for i, sample in enumerate(samples) if not sample[4]]
        if live:
            next_states = np.array([np.reshape(samples[i][3], 64) for i in live])
            next_online = self.predict(next_states, target=False)
            next_target = self.predict(next_states, target=True)
            for row, i in enumerate(live):
                bootstrap[i] = next_target[row][np.argmax(next_online[row])]

        x = []
        y = []
        errors = []
        for i, (state, action, reward, next_state, done) in enumerate(samples):
            flat_action = action[0]*8+action[1]
            oldVal = q_values[i][flat_action]

            if not done:
                q_values[i][flat_action] = reward + self.gamma * bootstrap[i]
            else:
                q_values[i][flat_action] = reward

            errors.append(abs(oldVal - q_values[i][flat_action]))
            x.append(state)
            y.append(q_values[i])
        return(x,y,errors)

    def replay(self):
        """Sample a minibatch, refresh its priorities and fit the online network.

        Entries that are not stored transitions (for example never-written
        memory slots) are ignored; if nothing usable was sampled the call is
        a no-op.
        """
        minibatch = [
            (idx, sample)
            for idx, sample in self.memory.sample(self.batch_size)
            if isinstance(sample, tuple)
        ]
        if not minibatch:
            return

        x,y,errors = self.replay_new(minibatch)
        X = np.array(x)
        Y = np.array(y)
        # Update the stored priorities with the freshly computed errors.
        for (idx, _), error in zip(minibatch, errors):
            self.memory.update(idx, error)

        self.model.fit(X,Y,batch_size=self.batch_size, epochs=1, verbose=0)