In [None]:
import matplotlib.pyplot as plt

from enum import Enum
import imageio
import numpy as np
import gym
import numpy as np
import math as m
import copy

# Confirmation message: reaching this line means every import above succeeded.
print("all imported")

In [None]:
def sigmoid(x):
    """Logistic function 1 / (1 + exp(-x)), applied element-wise.

    The argument is clamped to the interval [-100, 100] before it is
    exponentiated, which keeps ``np.exp`` from overflowing on inputs of large
    magnitude.
    """
    boundedBelow = np.maximum(x, -100)
    clamped = np.minimum(boundedBelow, 100)
    denominator = 1 + np.exp(-clamped)
    return 1 / denominator

def identity(x):
    """Pass-through activation: return the argument unchanged."""
    return x

In [None]:
class baseRecurrentModel():
    """Shared state and bookkeeping for a recurrent network with one hidden layer.

    The model stores the input/hidden/output weight matrices, the hidden and
    output biases, the two activation functions, the layer value arrays and a
    random 2D position for every hidden node. Layer arrays use a
    row-per-batch layout: (batch size, number of nodes).

    Subclasses provide ``step()`` (advance the network by one time step) and
    ``render_pyplot()`` (draw the network on a matplotlib axis); they may also
    override ``Initialise()`` to rebuild anything derived from the weights or
    the hidden positions.
    """
    # running count of constructed models; supplies the ID of the next one
    nBaseRecurrentModels = 0

    def __init__(self, nInputNodes, nHiddenNodes, nOutputNodes, weightInit="zeros", hiddenActivation=sigmoid, outputActivation=identity, name=""):
        """Create the weights, biases and hidden-node positions.

        Args:
            nInputNodes, nHiddenNodes, nOutputNodes: layer sizes.
            weightInit: "zeros", "uniform" (np.random.rand) or "normal"
                (np.random.normal); applies to all weights and biases.
            hiddenActivation, outputActivation: callables applied to the
                pre-activation values of the hidden and output layers.
            name: display name; when empty a name is generated from the model ID.

        Raises:
            ValueError: if ``weightInit`` is not one of the known options.
        """
        self.BaseRecurrentModelID = baseRecurrentModel.nBaseRecurrentModels
        baseRecurrentModel.nBaseRecurrentModels += 1

        # Always define self.name. It used to be set only when no name was
        # given, which made __str__ fail for explicitly named models.
        if name == "":
            name = "baseRecurrentModel_" + str(self.BaseRecurrentModelID)
        self.name = name

        ## set the instance variables
        self.hiddenActivation = hiddenActivation
        self.outputActivation = outputActivation

        self.nInputNodes  = nInputNodes
        self.nHiddenNodes = nHiddenNodes
        self.nOutputNodes = nOutputNodes

        ## Initialise the weight matrices
        initialisers = {
            "zeros":   np.zeros,
            "uniform": lambda shape: np.random.rand(*shape),
            "normal":  lambda shape: np.random.normal(size=shape),
        }
        if weightInit not in initialisers:
            raise ValueError("unknown weight init option: " + str(weightInit))
        makeArray = initialisers[weightInit]

        # Same creation order as before so seeded runs draw the same numbers.
        self.inputWeights  = makeArray((nInputNodes, nHiddenNodes))
        self.hiddenWeights = makeArray((nHiddenNodes, nHiddenNodes))
        self.outputWeights = makeArray((nHiddenNodes, nOutputNodes))

        self.hiddenBias = makeArray((1, nHiddenNodes))
        self.outputBias = makeArray((1, nOutputNodes))

        # random 2D location of each hidden node (used for rendering and by
        # subclasses that care about distances between nodes)
        self.hiddenPositions = np.random.rand(nHiddenNodes, 2)

    def _layerShapeStr(self, attrName):
        """Shape of a layer array as text, or "UNINITIALISED" if it does not exist yet."""
        layer = getattr(self, attrName, None)
        if layer is None:
            return "UNINITIALISED"
        return str(layer.shape)

    def __str__(self):
        """Multi-line summary of the layer, weight and bias shapes and the activations."""
        inputLayerStr = self._layerShapeStr('inputLayer')
        hiddenLayerStr = self._layerShapeStr('hiddenLayer')
        outputLayerStr = self._layerShapeStr('outputLayer')

        return """\n
##################################################
NAME: {0} 
INPUT LAYER:           {1}
    INPUT WEIGHTS:     {4}
HIDDEN LAYER:          {2}
    HIDDEN WEIGHTS:    {5}
    HIDDEN BIAS:       {7}
    HIDDEN ACTIVATION: {9}
OUTPUT LAYER:          {3}
    OUTPUT WEIGHTS:    {6}
    OUTPUT BIAS        {8}
    OUTPUT ACTIVATION: {10}
###################################################
        """.format(self.name, inputLayerStr, hiddenLayerStr, outputLayerStr, 
            str(self.inputWeights.shape), str(self.hiddenWeights.shape), str(self.outputWeights.shape),
            str(self.hiddenBias.shape), str(self.outputBias.shape),
            str(self.hiddenActivation), str(self.outputActivation))

    def getOutputs(self):
        """Return the current output layer array (requires setInputShape() to have been called)."""
        return self.outputLayer

    def setInputShape(self, exampleInput):
        """Allocate zeroed layer arrays sized for the batch in ``exampleInput``.

        ``exampleInput`` must have shape (batch size, nInputNodes). Only its
        shape is used; the values are not copied into the input layer.

        Raises:
            ValueError: if the input is not 2D or its second dimension does
                not equal the model's number of input nodes.
        """
        shape = np.shape(exampleInput)
        # Check the rank first so a 1D input gives the explanatory error below
        # instead of an IndexError on shape[1].
        if len(shape) != 2 or shape[1] != self.nInputNodes:
            raise ValueError(
                "ERROR: in baseRecurrentModel.setInputShape The example input provided has shape "
                + str(shape)
                + " It should have 2 dimensions: (batch size, nInput nodes)")

        batchSize = shape[0]
        # Zero-filled on purpose: the hidden layer feeds back into the first
        # step(), so it must not start from uninitialised memory.
        self.inputLayer = np.zeros((batchSize, self.nInputNodes))
        self.hiddenLayer = np.zeros((batchSize, self.nHiddenNodes))
        self.outputLayer = np.zeros((batchSize, self.nOutputNodes))

    def setInputs(self, inputVals):
        """Copy ``inputVals`` into the input layer.

        setInputShape() must have been called first and ``inputVals`` must be
        assignable to the shape established there.
        """
        self.inputLayer[:,:] = inputVals

    def __call__(self, inputVals):
        """Set the inputs, advance the model one step and return the output layer."""
        self.setInputs(inputVals)
        self.step()
        return self.getOutputs()

    def step(self):
        """Advance the network by one time step; must be provided by subclasses."""
        raise NotImplementedError("subclasses of baseRecurrentModel must implement step()")

    def render_pyplot(self, batchID, pltAxisObj):
        """Draw the network on a matplotlib axis; must be provided by subclasses."""
        raise NotImplementedError("subclasses of baseRecurrentModel must implement render_pyplot()")

    def Initialise(self):
        """Hook called by makeChild() after mutation; subclasses rebuild derived state here."""
        return

    def forget(self):
        """Reset all layer values to zero so earlier inputs no longer influence the model.

        The current batch size is kept when the layers already exist;
        otherwise a batch size of 1 is used. The arrays follow the same
        (batch size, nodes) layout as setInputShape().
        """
        existing = getattr(self, 'hiddenLayer', None)
        batchSize = existing.shape[0] if existing is not None else 1

        self.inputLayer  = np.zeros((batchSize, self.nInputNodes))
        self.hiddenLayer = np.zeros((batchSize, self.nHiddenNodes))
        self.outputLayer = np.zeros((batchSize, self.nOutputNodes))

    def copy(self):
        """Return an independent copy of this model.

        A deep copy is used so the copy does not share weight or layer arrays
        with the original (setInputs() writes into the input layer in place).
        The copy keeps this model's BaseRecurrentModelID and name.
        """
        return copy.deepcopy(self)

    def makeChild(self, variability = 0.01):
        """Return a mutated copy of this model.

        Every weight, bias and hidden-node position of the copy has normally
        distributed noise added, scaled by ``variability``. ``Initialise()``
        is then called on the child so subclasses can refresh derived state.
        """
        child = self.copy()
        child.inputWeights  = child.inputWeights  + np.random.normal(size = child.inputWeights.shape) * variability
        child.hiddenWeights = child.hiddenWeights + np.random.normal(size = child.hiddenWeights.shape) * variability
        child.outputWeights = child.outputWeights + np.random.normal(size = child.outputWeights.shape) * variability

        child.hiddenBias = child.hiddenBias + np.random.normal(size = child.hiddenBias.shape) * variability
        child.outputBias = child.outputBias + np.random.normal(size = child.outputBias.shape) * variability

        child.hiddenPositions = self.hiddenPositions + np.random.normal(size = child.hiddenPositions.shape) * variability

        child.Initialise()

        return child

    def render(self, batchID, pltAxisObj=None, figsize=None):
        """Render the model's hidden layer for one batch entry.

        If ``pltAxisObj`` is given the model is drawn onto that axis and
        nothing is returned. Otherwise it is drawn on figure 0 (160 dpi,
        optional ``figsize``) and returned as a (height, width, 3) uint8
        array of RGB values; the figure is cleared afterwards.
        """
        if pltAxisObj is not None:
            self.render_pyplot(batchID, pltAxisObj)
            return None

        fig = plt.figure(0, dpi=160, figsize=figsize)
        ax_0 = fig.add_subplot(111)

        self.render_pyplot(batchID, ax_0)
        fig.canvas.draw()

        # buffer_rgba() replaces tostring_rgb(), which newer Matplotlib
        # releases no longer provide. Drop the alpha channel and copy, because
        # the buffer is only a view of the canvas that is cleared next.
        data = np.asarray(fig.canvas.buffer_rgba())[..., :3].copy()

        fig.clf()

        return data
        

In [None]:

class selfConnectedModel(baseRecurrentModel):   
    """Recurrent model in which every hidden node is connected to every other.

    Layers use the (batch size, nodes) layout of baseRecurrentModel, so
    layer values are multiplied on the left of the weight matrices.
    """

    def step(self):
        """Advance one time step and return the new output layer.

        hidden = hiddenActivation(hidden @ hiddenWeights + input @ inputWeights + hiddenBias)
        output = outputActivation(hidden @ outputWeights + outputBias)
        """
        # (batch, nodes) @ (nodes, nodes): the operand order must match the
        # weight shapes created in baseRecurrentModel.__init__.
        recurrentTerm = np.matmul(self.hiddenLayer, self.hiddenWeights)
        inputTerm = np.matmul(self.inputLayer, self.inputWeights)
        self.hiddenLayer = self.hiddenActivation(recurrentTerm + inputTerm + self.hiddenBias)
        self.outputLayer = self.outputActivation(np.matmul(self.hiddenLayer, self.outputWeights) + self.outputBias)

        return self.getOutputs()
    
    def render_pyplot(self, batchID, pltAxisObj):
        """Draw the hidden nodes and all pairwise connections on ``pltAxisObj``.

        Nodes are coloured by their current hidden-layer value for batch
        entry ``batchID``. The inherited render() calls this method, so
        ``model.render(batchID)`` still returns an RGB array.
        """
        # one line per unordered pair of distinct nodes
        for i in range(self.nHiddenNodes):
            for j in range(i):
                pltAxisObj.plot(self.hiddenPositions[[i,j], 0], self.hiddenPositions[[i, j], 1], c ="k")

        pltAxisObj.scatter(self.hiddenPositions[:, 0], self.hiddenPositions[:, 1], c = self.hiddenLayer[batchID, :])
    
    def Initialise(self):
        """Nothing derived to rebuild; present because makeChild() calls it on every child."""
        return
    
        

In [None]:
class spatialModel(baseRecurrentModel):
    """Recurrent model whose hidden-to-hidden weights are masked by distance.

    Each hidden node has a 2D position; a recurrent weight only takes effect
    when the two nodes it joins are no further apart than ``neuronDistCutoff``.
    """

    def __init__(self, *args, neuronDistCutoff = 0.5, **kwargs):
        """Forward all other arguments to the parent constructor, then build the mask."""
        super().__init__(*args, **kwargs)
        self.neuronDistCutoff = neuronDistCutoff
        self.Initialise()

    def Initialise(self):
        """Rebuild state derived from the hidden-node positions (the distance mask)."""
        self.getHiddenMask()

    def getHiddenMask(self):
        """Compute ``self.hiddenMask``: 1.0 where two hidden nodes are within the cutoff, else 0.0.

        Pairwise distances come from the expansion
        |p_i - p_j|^2 = |p_i|^2 - 2 p_i.p_j + |p_j|^2, clamped at zero before
        the square root so rounding error cannot produce a negative argument.
        """
        positions = self.hiddenPositions
        squaredNorms = np.sum(positions**2, axis=1)
        dotProducts = np.matmul(positions, positions.T)
        # column of |p_i|^2 minus the cross term plus row of |p_j|^2
        squaredDists = squaredNorms.reshape(-1, 1) - 2*dotProducts + squaredNorms
        distMatrix = np.sqrt(np.maximum(squaredDists, 0.0))

        withinCutoff = np.less_equal(distMatrix, self.neuronDistCutoff)
        self.hiddenMask = withinCutoff * 1.0

    def step(self):
        """Advance one time step using the masked recurrent weights; return the outputs."""
        maskedWeights = self.hiddenWeights *self.hiddenMask
        recurrentTerm = np.matmul(self.hiddenLayer, maskedWeights)
        inputTerm = np.matmul(self.inputLayer, self.inputWeights)
        self.hiddenLayer = self.hiddenActivation(recurrentTerm + inputTerm + self.hiddenBias)

        outputPreActivation = np.matmul(self.hiddenLayer, self.outputWeights) + self.outputBias
        self.outputLayer = self.outputActivation(outputPreActivation)

        return self.getOutputs()
    
    def render_pyplot(self, batchID, pltAxisObj):
        """Draw the unmasked connections and the hidden nodes on ``pltAxisObj``.

        Every (i, j) pair allowed by the mask gets a black line whose width
        is the weight hiddenWeights[i, j]; nodes are coloured by their
        hidden-layer value for batch entry ``batchID``.
        """
        xs = self.hiddenPositions[:, 0]
        ys = self.hiddenPositions[:, 1]

        # argwhere walks the mask row by row, i.e. in the same (i, j) order as
        # two nested loops over the hidden nodes
        for i, j in np.argwhere(self.hiddenMask == 1.0):
            pltAxisObj.plot(self.hiddenPositions[[i,j], 0], self.hiddenPositions[[i, j], 1], c ="k", lw=self.hiddenWeights[i,j])

        pltAxisObj.scatter(xs, ys, c = self.hiddenLayer[batchID, :])
    
    

In [None]:
# Smoke test: build a small spatial model, evolve it through a chain of
# children and write each child's rendered hidden layer to a video.
testModel = spatialModel(3, 10, 3, "normal", hiddenActivation=sigmoid)
print(testModel)

testArray = np.array([[0.1, 0.2, 0.3], [10.0, 10.1, 10.2]])

with imageio.get_writer("brain_test.mp4", fps=24) as video:
    child = testModel.makeChild(0.01)
    child.setInputShape(testArray)
    # Start the recurrent state from zero so the first frame does not depend
    # on whatever values the freshly allocated hidden layer happens to hold.
    child.hiddenLayer[...] = 0.0

    for _ in range(250):
        child = child.makeChild(0.01)
        # Calling the model sets the inputs AND steps it; setting the inputs
        # alone never updates the hidden layer that render() colours by.
        child(testArray)
        video.append_data(child.render(0))

In [None]:
class LR:
   """Namespace of learning-rate schedulers.

   A scheduler is a callable ``(initLR, iteration) -> learning rate``.
   The members are static methods so they work both as ``LR.Identity`` and
   through an instance.
   """

   @staticmethod
   def Identity(initLR, iteration):
      """Constant schedule: always return ``initLR``."""
      return initLR
   
   @staticmethod
   def sigmoidal(center, slope):
      """Build a scheduler that decays from ``initLR`` towards 0 along a sigmoid.

      The returned function computes
      ``initLR * (1 - sigmoid(slope * (iteration - center)))``, so the rate is
      half of ``initLR`` at ``iteration == center``.
      """
      def retFunc(initLR, iteration):
         return (initLR * (1.0 - sigmoid(slope*(iteration - center))))

      return retFunc

def placeToNChildren(c, m, min=1):
   """Build a function mapping a finishing place to a number of children.

   The returned function evaluates the line ``c + m*place``, truncates it to
   an int and never goes below ``min``. (Inside this function the parameter
   ``min`` hides the builtin of the same name.)
   """
   floor = min

   def retFunc(place):
      linear = int(c + m*place)
      # equivalent to max(floor, linear): the floor wins ties
      if linear > floor:
         return linear
      return floor

   return retFunc

# Preview of the two schedules used in training: how many children each
# finishing place receives (left column) and the sigmoidal learning rate
# (right column).
nChildrenForPlace = placeToNChildren(10, -1, 2)

xTest = list(range(20))
yTest = [nChildrenForPlace(place) for place in xTest]
# running total of children handed out up to and including each place
yTestCums = list(np.cumsum(yTest))
yTestCum = yTestCums[-1]

# sharex="col": the left column is indexed by place (0-19) and the right one
# by iteration (0-99); sharing a single x axis across all four panels would
# squeeze the place plots into a fraction of their width.
fig, axs = plt.subplots(2, 2, figsize=(10,5), sharex="col")
axs[0,0].plot(xTest, yTest, label = "N Children")
axs[0,0].set(title = "N Children by Place")
axs[0,0].legend()

axs[1,0].plot(xTest, yTestCums, label = "Cumulative")
axs[1,0].set(xlabel = "place")
axs[1,0].legend()

iterRange = np.arange(0.0, 100.0, 1.0)
lrSchedule = LR.sigmoidal(50.0, 0.075)
axs[0,1].plot(iterRange, np.array([lrSchedule(0.015, it) for it in iterRange]))
axs[0,1].set(title = "Learning Rate")


axs[1,1].set(xlabel = "iteration")

plt.show()


In [None]:
# Verbosity levels, numbered 1..7 in the order listed; code compares the
# numeric values, so a higher level also enables the output of lower ones.
VerboseEnum = Enum(
   'verboseEnum',
   ['silent', 'iterationInfo', 'plot', 'progressBar', 'agentInfo', 'stepInfo', 'trace'],
)

def _figureToRGBArray(fig):
   """Draw ``fig`` and return its pixels as a (height, width, 3) uint8 RGB array."""
   fig.canvas.draw()
   # buffer_rgba() replaces tostring_rgb(), which newer Matplotlib releases no
   # longer provide; drop alpha and copy so the data outlives the canvas.
   return np.asarray(fig.canvas.buffer_rgba())[..., :3].copy()


def Train_genetic(N_AGENTS, BATCH_SIZE, ITERATIONS, VERBOSE=VerboseEnum.iterationInfo, 
                  PLOT_FREQ=10, PLOT_VIDEO="", VIDEO_PLOT_FREQ=1, initialLR=0.015, 
                  lrScheduler=LR.Identity, placeToNChildren=placeToNChildren(10, -1, 1)):
   """Evolve a population of spatialModel agents on LunarLander-v2.

   Every iteration each agent plays one episode in each of ``BATCH_SIZE``
   vectorised environments; its score is the mean episode return. Agents are
   then ranked and the next generation is built from mutated children of the
   best agents.

   Args:
      N_AGENTS: population size.
      BATCH_SIZE: number of parallel environments per evaluation.
      ITERATIONS: number of generations to run.
      VERBOSE: VerboseEnum level controlling printing and plotting.
      PLOT_FREQ: show the diagnostic figure every this many iterations
         (only when VERBOSE >= plot).
      PLOT_VIDEO: file name of a video of the diagnostic figures; "" disables it.
      VIDEO_PLOT_FREQ: append a video frame every this many iterations.
      initialLR: base mutation scale handed to ``lrScheduler``.
      lrScheduler: callable (initialLR, iteration) -> mutation scale.
      placeToNChildren: callable place -> number of children for the agent
         that finished in that place (0 = best).

   Returns:
      (agentList, mean score, best score). The scores belong to the last
      evaluated generation; ``agentList`` holds the children produced from it.

   Note:
      Uses the Gym API in which ``env.reset()`` returns the observation and
      ``env.step()`` returns ``(observation, reward, done, info)``.
   """
   agentList = [spatialModel(8, 15, 4, "normal", sigmoid, neuronDistCutoff=1000.0)
                for _ in range(N_AGENTS)]

   env = gym.vector.make("LunarLander-v2", num_envs=BATCH_SIZE)
   #env = gym.make("LunarLander-v2")

   ## lists to hold values for plotting
   plotScores = []
   plotIterations = []

   scores = np.zeros((N_AGENTS))
   # defined up front so the return statement also works when ITERATIONS == 0
   ranks = np.arange(N_AGENTS)

   video = None
   try:
      if(PLOT_VIDEO != ""):
         video = imageio.get_writer(PLOT_VIDEO, fps=4)

      ## seed the environments once
      env.reset(seed=42)

      for iteration in range(ITERATIONS):
         learningRate = lrScheduler(initialLR, iteration)
         scores[...] = 0.0

         if(VERBOSE.value >= VerboseEnum.agentInfo.value): print("=========== Iteration: {0: < 5} ===========".format(iteration))

         for idx, agent in enumerate(agentList):
            ## keep track of which environments have finished their episode
            dones = np.zeros(shape=(BATCH_SIZE,), dtype=bool)
            observation = env.reset()

            ## setup the agents input
            agent.setInputShape(observation)

            agentScore = np.zeros(shape=(BATCH_SIZE,))

            if(VERBOSE.value > VerboseEnum.agentInfo.value): print("  :::::::: Agent: {0: < 5} :::::::::".format(idx))
            stepCount = 0
            while(not np.all(dones)):
               action = np.argmax(agent(observation), axis=1)
               observation, reward, done, info = env.step(action)
               # Vector environments restart a finished sub-environment
               # automatically, so rewards arriving after an environment's
               # first episode ended must not be added to the score.
               agentScore = agentScore + np.where(dones, 0.0, reward)
               dones = np.logical_or(dones, done)
               if(VERBOSE.value >= VerboseEnum.stepInfo.value): print("    Step: {0: < 5}; current avg score: {1:<8.2f}; done: {2}".format(stepCount, np.mean(agentScore), np.all(dones)))
               if(VERBOSE.value >= VerboseEnum.trace.value): 
                  print("      {0:<12} | {1:<12} | {2:<12} | {3:<12} | {4:<12}".format("BATCH", "OBSERVATIONS", "ACTION", "REWARD", "SCORE"))
                  print("     -------------------------------------------------------------------------------")
                  for batchId in range(BATCH_SIZE):
                     print("      {0:<12} | {1:<12} | {2:<12} | {3:<12.4f} | {4:<12.4f}".format(str(batchId), str(observation[batchId].shape), str(action[batchId]), reward[batchId], agentScore[batchId]))

                  print("")

               stepCount += 1

            ## update the agents score and wipe its memory
            scores[idx] = np.mean(agentScore)
            agent.forget()
            if(VERBOSE.value >= VerboseEnum.agentInfo.value): print("  Agent: {0: < 5}; Score: {1:<8.2f}".format(idx, scores[idx]))

            ## update arrays for plots
            plotScores.append(scores[idx])
            plotIterations.append(iteration)

         ## agent indices ordered from best to worst score
         ranks = np.flip(np.argsort(scores))

         if(VERBOSE.value >= VerboseEnum.iterationInfo.value):
            print("Iteration: {0:<5} | Learning Rate: {1:<5.3f} | Mean Score: {2:<5.3f} | Best Score: {3:<5.3f}".format(iteration, learningRate, np.mean(scores), scores[ranks[0]]))

         ## make a plot of the scores, if option specified
         if(VERBOSE.value >= VerboseEnum.plot.value):
            showPlot = (iteration%PLOT_FREQ == 0)
            # only record frames when a video writer was actually opened
            plotVideo = (video is not None) and (iteration%VIDEO_PLOT_FREQ == 0)

            if(showPlot or plotVideo):
               fig, axs = plt.subplots(2, 2, figsize=(20, 5))
               ## scatter plot of all agents scores, y range limited to the last 4 generations
               axs[0,0].scatter(plotIterations, plotScores)
               axs[0,0].set(xlabel = "Iteration", ylabel="Agent Scores", ylim = (np.min(plotScores[-4*N_AGENTS:]), np.max(plotScores[-4*N_AGENTS:])))

               agentList[ranks[0]].render(0, pltAxisObj = axs[0,1])
               axs[0,1].set(title = "Best Agent, Iteration " + str(iteration))

               ## children keep their ancestor's ID, so this shows surviving families
               agentIDs = [agent.BaseRecurrentModelID for agent in agentList]
               axs[1,1].hist(agentIDs, bins=N_AGENTS)
               axs[1,1].set(title = "Agent Families")

               # grab the frame before showing: some backends dispose of the
               # figure once it has been shown
               if(plotVideo): video.append_data(_figureToRGBArray(fig))
               if(showPlot): plt.show()

               plt.close(fig)

         ## build the next generation: better places get their children first
         newAgentList = []
         place = 0 #where the agent placed in the competition
         while(len(newAgentList) < N_AGENTS):
            for _ in range(min(placeToNChildren(place), N_AGENTS - len(newAgentList))):
               newAgentList.append(agentList[ranks[place]].makeChild(learningRate))
            place += 1

         agentList = newAgentList

   finally:
      # release the writer and the environments even if training fails
      if(video is not None):
         video.close()

      env.close()

   return agentList, np.mean(scores), scores[ranks[0]]

In [None]:

# Grid search over the reproduction schedule: rows vary the number of
# children given to first place, columns vary the slope with which that
# number changes per place. Each cell stores the final mean score of a short
# training run.
firstParamValues = np.arange(2, 52, 12)
secondParamValues = np.linspace(-10.0, 0.0, 10)

paramGrid = np.zeros((firstParamValues.shape[0], secondParamValues.shape[0]))

# ndindex visits the cells row by row, i.e. first parameter in the outer loop
for firstParId, secondParId in np.ndindex(paramGrid.shape):
    NFirstChildren = firstParamValues[firstParId]
    slope = secondParamValues[secondParId]

    print("###############################################")
    print("N First: ", NFirstChildren, "Slope:   ", slope)
    agents, meanScore, bestScore = Train_genetic(100, 10, 10, placeToNChildren=placeToNChildren(NFirstChildren, slope), VERBOSE=VerboseEnum.silent)
    print("Final Avg Score: ", meanScore, "Final Best Score: ", bestScore)
    paramGrid[firstParId, secondParId] = meanScore
    print("###############################################\n")

# Heat map of the mean scores with the value written in every cell
fig, ax = plt.subplots(1,1, figsize=(20, 20))
ax.imshow(paramGrid)
ax.set(ylabel = "N Children for 1st place")
ax.set_yticks(range(0, firstParamValues.shape[0]), labels = firstParamValues)
ax.set(xlabel = "N Children slope")
ax.set_xticks(range(0, secondParamValues.shape[0]), labels = secondParamValues)

# imshow puts columns on x and rows on y, hence text(column, row, ...)
for (row, column), cellScore in np.ndenumerate(paramGrid):
    cellText = "{0:<4.1f}".format(float(cellScore))
    ax.text(column, row, cellText, ha='center', va='center')

plt.show()


In [None]:
# Settings for the main training run.
N_AGENTS = 100
BATCH_SIZE = 1
ITERATIONS = 100
PLOT_FREQ = 10
VIDEO_PLOT_FREQ = 1
PLOT_VIDEO = "Training.mp4"
VERBOSE = VerboseEnum.plot

# Run the training with the settings above. The evaluation cells further down
# read `agentList`, which would otherwise not exist on a fresh kernel.
agentList, finalMeanScore, finalBestScore = Train_genetic(
    N_AGENTS, BATCH_SIZE, ITERATIONS, VERBOSE=VERBOSE,
    PLOT_FREQ=PLOT_FREQ, PLOT_VIDEO=PLOT_VIDEO, VIDEO_PLOT_FREQ=VIDEO_PLOT_FREQ)



In [None]:
## test the trained set of agents
TEST_BATCH_SIZE = 1
env = gym.vector.make("LunarLander-v2", num_envs=TEST_BATCH_SIZE)

# one score per agent actually present in agentList
testScores = np.zeros(len(agentList))
try:
   for idx, agent in enumerate(agentList):
      dones = np.zeros(shape=(TEST_BATCH_SIZE,), dtype=bool)
      observation = env.reset()
      agent.setInputShape(observation)
      agentScore = np.zeros(shape=(TEST_BATCH_SIZE,))
      while(not np.all(dones)):
         action = np.argmax(agent(observation), axis=1)
         # this Gym API returns (observation, reward, done, info)
         observation, reward, done, info = env.step(action)
         # Vector environments restart finished sub-environments on their
         # own; only count rewards from episodes that are still running.
         agentScore += np.where(dones, 0.0, reward)
         dones = np.logical_or(dones, done)

      testScores[idx] = np.mean(agentScore)
finally:
   env.close()


# agent indices ordered from best to worst test score
testRanks = np.flip(np.argsort(testScores))
print(testScores[testRanks])

In [None]:
# Record one episode of each of the ten best agents (by test score) to
# winner<place>.mp4.
fps = 24
env = gym.make("LunarLander-v2")

try:
  for finalPlace in range(min(10, len(testRanks))):
    bestAgent = agentList[testRanks[finalPlace]]
    bestAgent.forget()
    filename = "winner" + str(finalPlace) + ".mp4"

    with imageio.get_writer(filename, fps=fps) as video:
      observation = env.reset()
      # the model expects a batch dimension, so wrap the single observation
      bestAgent.setInputShape(np.expand_dims(observation, 0))

      video.append_data(env.render("rgb_array"))

      score = 0
      done = False
      while(not done):
        action = np.argmax(bestAgent(np.expand_dims(observation, 0)), axis=1)[0]
        # this Gym API returns (observation, reward, done, info); the fourth
        # value is an info dict, not a truncation flag
        observation, reward, done, info = env.step(action)
        video.append_data(env.render("rgb_array"))
        score += reward

    # report the episode return once instead of on every step
    print(filename, "score:", score)
finally:
  env.close()
