In [48]:
import gym
import time
import numpy as np
from collections import deque
from keras.models import Sequential
from keras.layers import Dense,Activation,Dropout,BatchNormalization,Conv1D,Flatten
from pymongo import MongoClient

In [2]:
env = gym.make("BeamRider-ram-v0");
Number_of_Actions = env.action_space.n
Observation_Shape = env.observation_space.shape
memory = deque(maxlen=1024)
print("Number of Actions: {}\nObservation Space: {}".format(Number_of_Actions,Observation_Shape));

Number of Actions: 9
Observation Space: (128,)


In [49]:
OBS_INDEX = 0
ACT_INDEX = 1
XOBS_INDEX = 2
REWARD_INDEX = 3
DONE_INDEX = 4
INFO_INDEX = 5


DENSE_MODEL = 0
CONV_MODEL = 1

DataBase = MongoClient("localhost")["Machine_Learning"]

class Agent():
    
    def __init__(self,model=None,model_type=None,loss="mse",optimizer="adam",metrics=["mse","mae"],
                 batch_size =1024,number_of_memories_to_replay = 10,gamma=0.99,eps_init=0.9,eps_decay=0.99,
                 eps_min=0.05,render_game=True):
        
        """
            Eposide_Experience = An Array holding the memory of the eposide;
            L          =        Lenght of  Eposide_Experience 
            N          =        batch_size or self.BatchSize

            While playing, After N Timesteps the Agent will train from Eposide_Experience[L: N],
        """
        self.ID = 0
        #Agent's Hyper-Parameters
        self.Loss = loss
        self.Optimizer = optimizer
        self.BatchSize = batch_size;
        self.Number_Of_Memories_To_Replay = number_of_memories_to_replay;
        
        #Agent Bellman Paramenters
        self.Epilison = eps_init;
        self.Epilison_Decay = eps_decay;
        self.Epilison_MinLimit = eps_min;
        self.Gamma = gamma;
        
        
        # Agent's Settings;
        self.Experience = []
        self.HighScore = 0;
        self.RenderGame = render_game
        self.Model_Type = model_type
        self.NumberofActions = Number_of_Actions;
        
        # Agent's History
        self.ActionsHistory = []
        self.RewardsHistory = []
        self.Metrics = list(set(metrics))
        self.Metrics.remove(self.Loss);
        
        self.MetricsHistory = {metric:[] for metric in self.Metrics}
        self.MetricsHistory[self.Loss] = []
        
       
        self.formatObservation = lambda obs: np.expand_dims(np.array(obs,dtype="float").reshape(self.Input_Shape)/255.0,axis=0);
        self.obsToImg = lambda obs:  np.array(obs,dtype="float").reshape(16,8,1)/255.0
        
         # Agent's Model;
        self.Model = model
        if self.Model == None:
            self.createDenseNet();
        elif self.Model == DENSE_MODEL:
            self.createDenseNet();
        elif self.Model == CONV_MODEL:
            self.createConvNet();
            
        self.Memory = DataBase["Memory"]
            
        pass;
    def createDenseNet(self):
        
        """
            Four Dense Layers - Input Shape 
        """
        self.Input_Shape = (1,128);self.Model_Type = DENSE_MODEL;
        
        self.Model = Sequential();
        self.Model.add(Dense(256,use_bias=True,input_shape=self.Input_Shape));
        self.Model.add(Activation("relu"));
        self.Model.add(Dropout(0.25));
        
        self.Model.add(Dense(1024,use_bias=True));
        self.Model.add(BatchNormalization())
        self.Model.add(Activation("relu"))
        self.Model.add(Dropout(0.25));
        
        self.finalizeModel("4 Layers Dense");
    
    def createConvNet(self,):
        """
            A Convoluational Neural Network
        """
        
        self.Input_Shape = (8,16);self.Model_Type = CONV_MODEL;
                
        self.Model = Sequential();
        self.Model.add(Conv1D(filters=12,kernel_size(4,4),use_bias=True,input_shape=(self.Input_Shape)));
        self.Model.add(BatchNormalization())
        self.Model.add(Activation("relu"));
        self.Model.add(Dropout(0.25));
        
        self.Model.add(Conv1D(,4,use_bias=True));
        self.Model.add(BatchNormalization())
        self.Model.add(Activation("relu"));
        self.Model.add(Dropout(0.25));
        
        self.Model.add(Flatten());
        self.finalizeModel("2 Convoluational Layers and 2 Dense Layers");
    
    def finalizeModel(self,structure_info):
        self.Model.add(Dense(128,use_bias=True));
        self.Model.add(BatchNormalization())
        self.Model.add(Activation("relu"))
        self.Model.add(Dropout(0.25));
        
        self.Model.add(Dense(self.NumberofActions,use_bias=True,));
        self.Model.add(Activation("linear"));
        self.Model.compile(loss=self.Loss,optimizer="adam",metrics=self.Metrics);
        print("Succesfully Created a Neural Network with {} .It has  Input Shape of {}".format(structure_info,self.Input_Shape),end="\r")

        
    def predictAction(self,obs):        
        return np.argmax(self.Model.predict(obs))
        
    def getAction(self,obs):
        """ Choose Random Action if random number is smaller than epilision for exporation """
        if np.random.uniform(0,1) > self.Epilison :
            return self.predictAction(obs);
        return np.random.choice(self.NumberofActions); 
    
    def learn(self,experience):
        if len(experience) == 0 :
            return np.nan
        x = np.zeros(shape=((len(experience),)+self.Input_Shape))
        y = np.zeros(shape=((len(experience),self.NumberofActions)));
        for i,(obs,act,_obs,reward,done,info) in enumerate(experience):
            x[i] = obs;
            y[i] = self.Model.predict(obs);
            #Bellman Equation
            y[i:act] = reward;
            if not done:
                 y[i:act] += reward + self.Gamma * np.max(self.Model.predict(_obs));
        
        
        for metric,cost in zip(self.MetricsHistory,self.Model.train_on_batch(x,y)):
            self.MetricsHistory[metric][-1].append(cost)
        return self.MetricsHistory[self.Loss][-1][-1]
    
    def restartGame(self,):
        self.TimeStep = 0;
        for metric in self.MetricsHistory:
            self.MetricsHistory[metric].append([])
        self.RewardsHistory.append([])
        self.ActionsHistory.append({act:0 for act in range(self.NumberofActions)});
        self.Experience = [[self.formatObservation(env.reset())]]
        
    
    def getCurrentScore(self):
        score = sum(self.RewardsHistory[-1]);
        if score > self.HighScore:self.HighScore = score;
        return score
    
    def parseScore(self):
        return "Score: "+str(self.getCurrentScore())+" |  HighScore: "+str(self.HighScore);
    
    def parseActions(self,nth_experience=-1):
        total = sum([act for act in self.ActionsHistory[nth_experience].values()]);
        return "Actions: "+" | ".join(["%.1f%s"%(100*(value/total),"%") for value in self.ActionsHistory[nth_experience].values()]);
    
    def parseAverageCost(self,_index=-1,):
        return "Cost: "+" | ".join(["%s:%.5f"%(metric[0].upper()+metric[1:],sum(self.MetricsHistory[metric][_index])/len(self.MetricsHistory[metric][_index])) for metric in self.MetricsHistory])
    
    def parseStatus(self):
        return "%d# | %s | ~ | %s | ~ | %s | ~ | Eps: %.3f Lives Left: %d \t"%(
            self.TimeStep,self.parseAverageCost(),self.parseActions(),self.parseScore(),self.Epilison,
            self.Experience[-1][INFO_INDEX]
        )
    def learnExperience(self,):
        self.learn(self.getExperienciesFromMemory(self.Number_Of_Memories_To_Replay));
        self.learn(self.Experience[max(0,self.TimeStep-self.BatchSize):])
        print(self.parseStatus(),end="\r");
    
    def saveExperience(self,):
        self.Memory.insert_one({"model_type":model_type,"game_experience":self.Experience,"agent_id":self.ID})
        memory.append(self.Experience);
    def playGame(self,):
        self.restartGame();
        while True:
           
            #Sending Action to the enviroment
            self.Experience[-1].append(self.getAction(self.Experience[-1][OBS_INDEX]));
            for item in env.step(self.Experience[-1][ACT_INDEX]):
                self.Experience[-1].append(item);
            self.Experience[-1][INFO_INDEX] = self.Experience[-1][INFO_INDEX]["ale.lives"]
            self.Experience[-1][XOBS_INDEX] = self.formatObservation(self.Experience[-1][XOBS_INDEX]);
            self.RewardsHistory[-1].append(self.Experience[-1][REWARD_INDEX]);
            self.ActionsHistory[-1][self.Experience[-1][ACT_INDEX]] += 1;
            self.TimeStep +=1;
            
            # When agent is killed
            if self.Experience[-1][DONE_INDEX]:
                self.learnExperience();
                    
                # When all lifes are finished
                if self.Experience[-1][INFO_INDEX] == 0:
                    break;
            
            # Training on Experience
            if self.TimeStep%self.BatchSize == 0: 
                self.learnExperience();
            
            # Rendering Frame(s)
            if self.RenderGame:
                env.render();
                
            # Pushing the Next Observation to Experience for the Next loop
            self.Experience.append([self.Experience[-1][XOBS_INDEX]])
        
        
        # Reducing The Epilison to give agent a bigger probability to choose action and action
        if self.Epilison > self.Epilison_MinLimit:self.Epilison = self.Epilison*self.Epilison_Decay;
                
    
    def getBatchFromExperience(self,experience):
        startIndex = max(0,np.random.choice(len(experience))-self.BatchSize);
        return experience[startIndex:startIndex+self.BatchSize]
    
    def getExperienciesFromMemory(self,n_memories=1):
        if len(memory) == 0: return []
        memories = []
        for _ in range(min(len(memory),n_memories)):
            memories.extend(self.getBatchFromExperience(memory[np.random.choice(len(memory))]))
        return memories
                     
   
        
        
    def pratice(self,trials=1000,test=2):
        np.random.seed(0);
        for n_trial in range(0,trials):
            print("\nPracting... On Trail %d out of %d Trials"%(n_trial+1,trials))
            self.playGame();
            print()
           # if n_trial%test == 0:
            self.play();
                
    
    
    def play(self,):
        eps = self.Epilison;
        self.Epilison = 0
        print("Playing Game Without Any Random Actions (Epilison == 0). No Training Wheels")
        self.playGame();
        self.Epilison = eps;
        print("\n")

In [50]:
agent = Agent(
    model=CONV_MODEL,
    batch_size =256,
    gamma=0.99,
    eps_init=0.79,
    eps_decay=0.97,
    eps_min=0.05,
    render_game=True,
    metrics=["mse","mae"]
)
agent.pratice(trials=100000)

Succesfully Created a Neural Network with 2 Convoluational Layers and 2 Dense Layers
Practing... On Trail 1 out of 100000 Trials
1359# | Cost: Mae:0.63537 | Mse:0.59172 | ~ | Actions: 8.7% | 8.8% | 8.8% | 10.3% | 11.4% | 10.1% | 16.3% | 17.4% | 8.3% | ~ | Score: 308.0 |  HighScore: 308.0 | ~ | Eps: 0.790 Lives Left: 0 	
Playing Game Without Any Random Actions (Epilison == 0). No Training Wheels
2550# | Cost: Mae:0.36201 | Mse:0.44421 | ~ | Actions: 49.6% | 7.9% | 4.5% | 0.0% | 34.2% | 0.2% | 2.0% | 1.5% | 0.0% | ~ | Score: 176.0 |  HighScore: 308.0 | ~ | Eps: 0.000 Lives Left: 0 	


Practing... On Trail 2 out of 100000 Trials
2026# | Cost: Mae:0.26855 | Mse:0.38287 | ~ | Actions: 16.2% | 11.7% | 17.1% | 8.3% | 9.3% | 9.5% | 8.8% | 8.6% | 10.4% | ~ | Score: 352.0 |  HighScore: 352.0 | ~ | Eps: 0.766 Lives Left: 0 	
Playing Game Without Any Random Actions (Epilison == 0). No Training Wheels
512# | Cost: Mae:1.08408 | Mse:0.37319 | ~ | Actions: 12.1% | 17.8% | 41.4% | 6.6% | 0.0% | 0.0% |

KeyboardInterrupt: 

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline
fig, ax = plt.subplots();
for i in range(100):
    img = (memory[0][i][0][0]*255.0).reshape(16,8)
    
    fig.add_subplot(2,1,2)
    ax = fig.imshow(img)
    plt.pause(0.1)
    


In [None]:
from matplotlib.animation import ArtistAnimation,FFMpegWriter

fig = plt.figure();
images = [[plt.imshow((memory[0][i][0][0]*255.0).reshape(16,8), animated=True)] for i in range(100)]

animation = ArtistAnimation(fig,images,blit=True,interval=50,repeat_delay=1000)
plt.show()

In [None]:
animation.save("ram.html")

In [None]:
writer = FFMpegWriter(fps=15, metadata=dict(artist='Me'), bitrate=1800)
animation.save("ram.mp4", writer=writer,)

In [None]:
from IPython.core.display import display, HTML

html = None
with open("./ram.html") as file:
    html = HTML(file.read())

In [34]:
type(agent.Experience[-1][INFO_INDEX])

int