In [4]:
import os
dir = "/home/daanyal/Documents/GitHub/MyML/"
os.chdir(dir)
!pwd

/home/daanyal/Documents/GitHub/MyML


In [5]:
os.chdir("mlProjects/Connect-4/")
!pwd

/home/daanyal/Documents/GitHub/MyML/mlProjects/Connect-4


In [52]:
import struct
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras import activations
import pydot
import time
from collections import namedtuple


from game import game as ConnectGame

In [7]:
class Agent:
    """
    Interface: Any agents for this game must inherit from this type
    """
    def __init__(self, env:ConnectGame, plr:int):
        self.env = env
        self.plr = plr
        self.learning = True
        self.stateConverter = Agent.StateConverter()
        self.defaultPlr = plr # plr should be trained as the default plr, else it will help the other player win
    
    def switchPlayer(self, plr:int):
        self.plr = plr
    
    def step(self):
        move = self.getMove()
        state, reward, done = self.env.step(move, self.plr)

    def getMove(self, state=None):
        if state:
            state = self.stateConverter.convertState(state)
            
        moves = self.env.getLegalMoves()
        return np.random.choice(moves)
    
    class StateConverter:
        def convertState(self, state):
            # Default state is np array which takes the size of the gameboard:
            #    -1: empty,  0: player 1,   1: player 2
            self.shape = state.shape
            return state
        
        def getStateShape(self) -> int | tuple:
            return self.shape


In [49]:
class QFunction:
    def evaluate_state(self, state):
        return 0
    def evaluate_states(self, states):
        return [0 for s in states]
    def update_state(self, state, qVal):
        return
    def update_states(self, states, qVals):
        return

class TableQFunction(QFunction):
    def __init__(self, stateShape):
        rows, cols = stateShape[0], stateShape[1]
        possibleCols = 2**(rows+1) - 1 # Number of different possible columns that can exist
        self.qTable = np.ones(possibleCols ** cols) * 110
    
    def evaluate_state(self, state):
        return self.qTable[state]
    def evaluate_states(self, states):
        return np.array([self.qTable[state] for state in states])
    def update_state(self, state, qVal):
        assert type(qVal) == float or type(qVal) == np.float64, f"Invalid qVal: {qVal}"
        self.qTable[state] = qVal
    def update_states(self, states, qVals):
        assert len(states) == len(qVals), "Unequal number of states and Q-values" 
        for i in range(len(states)):
            self.update_state(states[i], qVals[i])

In [None]:
class NeuralNetQFunction(QFunction):
    def __init__(self, stateShape:tuple, architecture:list[int]=[200,200,1]):
        self.inputSize = stateShape[0]*stateShape[1]
        self.architecture = architecture
        # use tpu here if available:
        self.initNetwork()
    
    def initNetwork(self): # reminder: implement transfer learning
        self.model = keras.Sequential([
            layers.Dense(self.architecture[0], activation="relu", input_shape=(self.inputSize,))
        ])
        for layer in self.architecture[1:]:
            self.model.add(layer, activation="relu")
        
        
        self.model.compile()

    def evaluate_state(self, state):
        pass
    def evaluate_states(self, states):
        pass
    def update_state(self, state, qVal):
        pass
    def update_states(self, states, qVals):
        pass

In [None]:
class ModelTester:
    pass

In [9]:
Memory = namedtuple("Memory", ("state","q", "next_state", "next_q", "reward"))

class GameMemory:
    def __init__(self):
        self.memories = []
    
    def addMemory(self, mem:Memory):
        self.memories.append(mem)
    
    def popMemory(self) -> Memory:
        return self.memories.pop()
    
    def hasMemory(self) -> bool:
        return len(self.memories) > 0
    
    def countMemories(self) -> int:
        return len(self.memories)

In [10]:
class ScheduledParameter:
    def __init__(self, value:float):
        self.value = value
        self.valueFloor = 0
        self.valueCeil = 1
    
    def get(self) -> float:
        return self.value
    
    def step(self):
        self.value = max(min(self.value, self.valueCeil), self.valueFloor)
    def reset(self):
        return
    def set(self, value:float):
        self.value = value
class ExponentialDecayParameter(ScheduledParameter):
    def __init__(self, initialValue:float, decayRate:float, valueFloor:float = 0,valueCeil=1):
        ScheduledParameter.__init__(self,initialValue)
        self.initialValue = initialValue
        self.decayRate = decayRate
        self.valueFloor = valueFloor
    def step(self):
        self.value *= self.decayRate
        return ScheduledParameter.step(self)
    def reset(self):
        self.value = self.initialValue
class LinearDecayParameter(ExponentialDecayParameter):
    def step(self):
        self.value -= self.decayRate
        return ScheduledParameter.step(self)

In [11]:
class ParameterConfiguration:
    def __init__(self, epsilon=ExponentialDecayParameter(1,0.995,0.1),
                 gamma=ScheduledParameter(0.7), stepSize=ScheduledParameter(0.3)):
        self.epsilon = epsilon
        self.gamma = gamma
        self.stepSize = stepSize
    def resetAll(self):
        self.epsilon.reset()
        self.gamma.reset()
        self.stepSize.reset()

In [12]:
class DebugVariableStore:
    def __init__(self):
        self.nextStateVals = None
        self.actions = None

In [41]:


class QLearningAgent(Agent):
    def __init__(self, env:ConnectGame, plr:int, configuration:ParameterConfiguration = ParameterConfiguration()):
        Agent.__init__(self, env, plr)
        self.stateConverter = QLearningAgent.EncodingStateConverter()
        self.qFunction = TableQFunction(env.gameboard.shape)

        self.debugVariables = DebugVariableStore()

        self.switchPlayer(plr)

        self.parameterConfig = configuration
        self.resetHyperParams()

        self.currentGameMemory = GameMemory()

    def switchPlayer(self, plrNum:int):
        Agent.switchPlayer(self, plrNum)
        self.plr = plrNum
        self.stateSelectors = (np.amax, np.argmax)
        if plrNum != self.defaultPlr:
            self.stateSelectors = (np.amin, np.argmin)

    def resetHyperParams(self):
        self.epsilon = self.parameterConfig.epsilon
        self.gamma = self.parameterConfig.gamma
        self.stepSize = self.parameterConfig.stepSize
        self.parameterConfig.resetAll()

    def episodeEnded(self): #learning and stuff
        memories = self.currentGameMemory
        i = 0
        nextStateQ = 0
        while memories.hasMemory():
            lastMem = memories.popMemory()
            state, q, next_state, next_q, reward = lastMem

            for state, qVal, reward in ((next_state, next_q, reward), (state, q, 0)):
                targetQ = reward + self.gamma.get() * nextStateQ
                assert type(nextStateQ) == float or type(nextStateQ) == int, f"nextStateQ val type should be a float, not {nextStateQ}"
                assert type(targetQ) == float or type(targetQ) == int, "invalid target Q value"

                qVal += self.stepSize.get() * (targetQ - qVal)#Q + stepSize(targetQ - Q)
                
                assert type(qVal) == np.float64, f"Invalide Q val: {qVal}"
                #trainData[i] = [state, qVal]
                self.qFunction.update_state(state, qVal)

                nextStateQ = targetQ # Or set it to updated Q? 
                i+=1
        
        self.currentGameMemory = GameMemory()
        self.epsilon.step()
        


    def step(self):
        self.currentMemory = [] #namedTuple

        move = self.getMove()

        nextState, reward, done = self.env.step(move, self.plr)

        encodedNext = self.stateConverter.convertState(nextState)

        if done:
            self.currentMemory[2] = encodedNext
        elif not self.currentMemory[2] == encodedNext:
            print(self.currentMemory[2])
            print(encodedNext)
            print(nextState)
        assert self.currentMemory[2] == encodedNext, "Bug spotted: Bug likely exists in getMove"

        self.currentMemory.append(reward)
        x = self.currentMemory

        self.currentGameMemory.addMemory(Memory(x[0],x[1],x[2],x[3],x[4]))

        if done:
            self.episodeEnded()
        
    def getMove(self): ## need to store:
        ## state, Q, next_state, nextQ, reward, numActions, epsilon
        state = self.env.getState(self.plr)
        state = self.stateConverter.convertState(state)

        self.currentMemory.append(state)

        actions = self.env.getLegalMoves()

        self.debugVariables.actions = self.env.getLegalMoves()

        if actions[0] == -1:
            stateVal = self.qFunction.evaluate_state(state)
            self.currentMemory.append(stateVal)
            self.currentMemory.append(state)
            self.currentMemory.append(stateVal)
            return -1
        
        nextStates = self.env.peekActions(actions, self.plr)
        nextStates = [self.stateConverter.convertState(state) for state in nextStates]
        nextStates.append(state)
        stateVals = self.qFunction.evaluate_states(nextStates)

        self.debugVariables.nextStateVals = stateVals.copy()

        if self.epsilon.get() < np.random.random():
            act = self.stateSelectors[1](stateVals[:-1])
            topStateVal, topAction = self.stateSelectors[0](stateVals[:-1]), actions[act]
        else:
            act = np.random.randint(0, len(actions))
            topAction = actions[act]
            topStateVal=stateVals[act]##dsfsdfdsfcsdvfdds

        self.currentMemory.append(stateVals[-1])
        self.currentMemory.append(nextStates[act])
        self.currentMemory.append(topStateVal)
        return topAction
        

    class Configuration:
        def __init__(self):
            self.epsilon = 1 ## start, stop, schedule
    
    class EncodingStateConverter(Agent.StateConverter):
        def __init__(self):
            self.shape = (1)
        def convertState(self, state):
            state = state.copy()
            state+=1
            rows, cols = state.shape[0], state.shape[1]

            multiplyTable = np.array([ [2**(rows-i-1) ] for i in range(rows) ])
            state = state * multiplyTable
            colVal = 2**(rows+1) - 1
            state = state * np.array([(colVal)**(i) for i in range(cols)])
            state = state.sum()
            return state

In [25]:
###Encoding state converter unit test
def unit_test_getPossibleColumns(rows):
    possibleCols = [[-1]*rows,[0],[1]]
    if rows == 1:
        return possibleCols
    possibleUppers = unit_test_getPossibleColumns(rows-1)
    for i in range(1,3):
        for upper in possibleUppers:
            res = upper + possibleCols[i]
            res = [-1]*(rows - len(res)) + res
            if not res in possibleCols:
                possibleCols.append(res)
                
    return [possibleCols[0]] + possibleCols[3:]

def unit_test():
    rows = 4
    cols = 4
    converter = QLearningAgent.EncodingStateConverter()
    possibleColumns = unit_test_getPossibleColumns(rows)

    encodedStates = []
    for c in possibleColumns:
        state = np.array([c]).T
        converted = converter.convertState(state)
        assert converted not in encodedStates, f"Unit test failed:\nstate {converted}:\n{state} == state {encodedStates.index(converted)}"
        encodedStates.append(converted)

    print(f"There are {len(possibleColumns)} possible columns")
    
    possibleStates = [[c] for c in possibleColumns]
    i = 1
    while i < rows:
        x = []
        for j in range(len(possibleStates)):
            for c in possibleColumns:
                x.append(possibleStates[j] + [c])
        possibleStates = x
        i+=1
    print("Got all possible states")
    encodedStates = []
    failed = 0
    i = 0
    for state in possibleStates:
        if i % 2000 == 0:
            print(f"{i} / {len(possibleStates)}")
        state = np.array(state).T
        converted = converter.convertState(state)
        x=-1
        if converted in encodedStates:
            failed+=1
            x = encodedStates.index(converted)
        assert converted not in encodedStates, f"Unit test failed:\nstate {converted}:\n{state} == state {x}:\n{np.array(possibleStates[x]).T}"
        encodedStates.append(converted)
        i+=1
    print(failed / len(encodedStates))
    print("Test passed!")

unit_test()

There are 31 possible columns


Got all possible states
0 / 923521
2000 / 923521
4000 / 923521
6000 / 923521
8000 / 923521
10000 / 923521
12000 / 923521
14000 / 923521
16000 / 923521
18000 / 923521
20000 / 923521
22000 / 923521
24000 / 923521
26000 / 923521
28000 / 923521
30000 / 923521
32000 / 923521
34000 / 923521
36000 / 923521
38000 / 923521
40000 / 923521
42000 / 923521
44000 / 923521
46000 / 923521
48000 / 923521
50000 / 923521
52000 / 923521
54000 / 923521
56000 / 923521
58000 / 923521
60000 / 923521
62000 / 923521


KeyboardInterrupt: 

In [50]:
numGames = 10000

game = ConnectGame(4,5,4)
#ExponentialDecayParameter(1,0.9995,0.1)
agent0 = QLearningAgent(env=game, plr=0,
                        configuration=ParameterConfiguration(epsilon=ExponentialDecayParameter(1,0.9995,0.1),
                                                             stepSize=ScheduledParameter(0.2),
                                                             gamma=ScheduledParameter(0.2)))
agent1 = QLearningAgent(env=game, plr=1,
                        configuration=ParameterConfiguration(epsilon=ExponentialDecayParameter(1,0.9995,0.1),
                                                                            stepSize=ScheduledParameter(0.2),
                                                             gamma=ScheduledParameter(0.2)))
agents = [agent0,agent1]
plr1Wins = 0
draws = 0
for gameNum in range(numGames):
    if gameNum % 400 == 0:
        print(f"starting game {gameNum}")
        print(f"Epsilon: {agent0.epsilon.get()}")
        print(f"Player 1 won {(plr1Wins/400):.2%} of the last 400 games")
        print(f"{(draws/400):.2%} of the last 400 games were drawn\n")
        
        plr1Wins = 0
        draws = 0
        ## Gonna try making plr 2 qFunction the same as plr 1 every x games.
        ## Could try as either a copy, so it learns independantly, or a reference,
        ## so it is literally playing against itself the same way a person does in chess
        #agent1.qFunction.qTable = agent0.qFunction.qTable.copy()
    
    i=0
    while not game.done:
        agents[i%2].step()
        i+=1
    agents[i%2].step()
    if game.winner == 0:
        plr1Wins += 1
    elif game.winner == -1:
        draws += 1

    game.reset()


starting game 0
Epsilon: 1
Player 1 won 0.00% of the last 400 games
0.00% of the last 400 games were drawn

starting game 400
Epsilon: 0.8186898039137951
Player 1 won 41.50% of the last 400 games
28.00% of the last 400 games were drawn

starting game 800
Epsilon: 0.6702529950324074
Player 1 won 42.25% of the last 400 games
26.50% of the last 400 games were drawn

starting game 1200
Epsilon: 0.548729293075715
Player 1 won 43.25% of the last 400 games
27.00% of the last 400 games were drawn

starting game 1600
Epsilon: 0.44923907734991153
Player 1 won 42.25% of the last 400 games
26.00% of the last 400 games were drawn

starting game 2000
Epsilon: 0.3677874521460121
Player 1 won 37.50% of the last 400 games
31.75% of the last 400 games were drawn

starting game 2400
Epsilon: 0.3011038370793723
Player 1 won 38.75% of the last 400 games
32.00% of the last 400 games were drawn

starting game 2800
Epsilon: 0.24651064133620196
Player 1 won 41.25% of the last 400 games
27.50% of the last 400 g

In [None]:
# testing...
numGames = 1
oldEps = agent0.epsilon.get()
agent0.epsilon.set(0)
for gameNum in range(numGames):
    if gameNum % 20 == 0:
        print(f"starting game {gameNum}")
    game.show()
    time.sleep(1)
    i=0
    while not game.done:
        agents[i%2].step()
        game.show()
        time.sleep(1)
        print()
        i+=1
    agents[i%2].step()

    if game.winner == -1:
        print("Game was a draw")
    else:
        print(f"Winner is player {game.winner+1}")
    game.reset()
agent0.epsilon.set(oldEps)

starting game 0
|---||---||---|
|   ||   ||   |
|---||---||---|
|   ||   ||   |
|---||---||---|
|   ||   ||   |
|---||---||---|
|   ||   ||   |
|---||---||---|
|   ||   ||   |
|---||---||---|
| o ||   ||   |

|---||---||---|
|   ||   ||   |
|---||---||---|
| x ||   ||   |
|---||---||---|
| o ||   ||   |

|---||---||---|
| o ||   ||   |
|---||---||---|
| x ||   ||   |
|---||---||---|
| o ||   ||   |

|---||---||---|
| o ||   ||   |
|---||---||---|
| x ||   ||   |
|---||---||---|
| o || x ||   |

NO
|---||---||---|
| o ||   ||   |
|---||---||---|
| x ||   ||   |
|---||---||---|
| o || x ||   |

|---||---||---|
| o ||   ||   |
|---||---||---|
| x ||   ||   |
|---||---||---|
| o || x || x |

NO
|---||---||---|
| o ||   ||   |
|---||---||---|
| x ||   ||   |
|---||---||---|
| o || x || x |

|---||---||---|
| o ||   ||   |
|---||---||---|
| x || x ||   |
|---||---||---|
| o || x || x |

NO
|---||---||---|
| o ||   ||   |
|---||---||---|
| x || x ||   |
|---||---||---|
| o || x || x |

|---||

In [51]:
## Play vs human test
# testing...

class HumanAgent(Agent):
    def step(self):
        move = int(input("Where to place? (1-3)")) - 1
        state, reward, done = self.env.step(move, self.plr)

numGames = 1
oldEps = agent0.epsilon.get()
agent0.epsilon.set(0)
oldAgents = agents

agents = [agent0, HumanAgent(game, 1)]

#agents.reverse()

agents[0].switchPlayer(0)
agents[1].switchPlayer(1)
print(agents)
print(agents[0].plr)
print(agents[1].plr)

agent0.debugVariables = DebugVariableStore()
agent1.debugVariables = DebugVariableStore()
game.reset()
for gameNum in range(numGames):
    if gameNum % 20 == 0:
        print(f"starting game {gameNum}")
    game.show()
    time.sleep(0.5)
    i=0
    while not game.done:
        agents[i%2].step()
        print(f"plr {i%2}'s turn:")
        print(f"State vals:\n{agent0.debugVariables.nextStateVals}")
        game.show()
        print(f"plr 0 value: {agent0.qFunction.evaluate_state(agent0.stateConverter.convertState(game.getState(0)))}")
        #print(f"plr 1 value: {agent0.qFunction.evaluate_state(agent1.stateConverter.convertState(game.getState(0)))}")

        ### TODO: Improve exploration by randomly choosing action from the set of equally good actions
        #print(f"plr 1 State vals:\n{agent1.debugVariables.nextStateVals}")
        time.sleep(0.5)
        #input("Press enter to continue...")
        print()
        i+=1
    agents[i%2].step()

    if game.winner == -1:
        print("Game was a draw")
    else:
        print(f"Winner is player {game.winner+1}")
    game.reset()
agents = oldAgents
agent0.epsilon.set(oldEps)

[<__main__.QLearningAgent object at 0x7f8566a18a10>, <__main__.HumanAgent object at 0x7f85669b5b10>]
0
1
starting game 0
|---||---||---||---||---|
|   ||   ||   ||   ||   |
|---||---||---||---||---|
|   ||   ||   ||   ||   |
|---||---||---||---||---|
|   ||   ||   ||   ||   |
|---||---||---||---||---|
|   ||   ||   ||   ||   |
plr 0's turn:
State vals:
[-2.68670561e-06 -3.29676240e-05  1.17134783e-08 -2.67615343e-06
 -2.32805711e-06  1.29227883e-09]
|---||---||---||---||---|
|   ||   ||   ||   ||   |
|---||---||---||---||---|
|   ||   ||   ||   ||   |
|---||---||---||---||---|
|   ||   ||   ||   ||   |
|---||---||---||---||---|
|   ||   || o ||   ||   |
plr 0 value: 1.171347828593813e-08

plr 1's turn:
State vals:
[-2.68670561e-06 -3.29676240e-05  1.17134783e-08 -2.67615343e-06
 -2.32805711e-06  1.29227883e-09]
|---||---||---||---||---|
|   ||   ||   ||   ||   |
|---||---||---||---||---|
|   ||   ||   ||   ||   |
|---||---||---||---||---|
|   ||   ||   ||   ||   |
|---||---||---||---||