<a href="https://colab.research.google.com/github/himavamsianumula/Cart-Pole/blob/master/cartpole.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Importing Libraries

In [None]:
import numpy as np
import tensorflow.keras as K
import gym
import random
import math
import tensorflow as tf
import matplotlib.pyplot as plt
from collections import namedtuple

Testing with random actions

In [None]:
# Create the CartPole-v1 environment sampled by the random-action loop below.
env=gym.make("CartPole-v1")

In [None]:
# Smoke test: drive the environment with randomly sampled actions for
# 10 episodes of at most 100 steps each.
for episode in range(10):
    state = env.reset()
    for step in range(100):
        # env.render()
        action = env.action_space.sample()
        state, reward, done, info = env.step(action)
        if done:
            # The episode ended before the 100-step cap; move to the next one.
            break

Neural Network

In [None]:
class DNN:
    """Fully connected network with 512, 256 and 64 ReLU units and 2 linear outputs."""

    def __init__(self, input_shape):
        """Build the (uncompiled) Sequential model.

        input_shape: shape tuple handed to the first Dense layer.
        """
        dense = K.layers.Dense
        # All layers share the same weight initializer.
        init = 'he_uniform'
        self.model = K.models.Sequential([
            dense(512, input_shape=input_shape, activation='relu', kernel_initializer=init),
            dense(256, activation='relu', kernel_initializer=init),
            dense(64, activation='relu', kernel_initializer=init),
            dense(2, activation='linear', kernel_initializer=init),
        ])

    def get_model(self, adam_lr, adam_lr_decay):
        """Compile the model with MSE loss and an Adam optimizer, then return it.

        adam_lr: value passed to Adam as `lr`.
        adam_lr_decay: value passed to Adam as `decay`.
        """
        optimizer = K.optimizers.Adam(lr=adam_lr, decay=adam_lr_decay)
        self.model.compile(loss='mse', optimizer=optimizer)
        return self.model

In [None]:
# Build a throwaway 4-input network and print its layer summary.
dnn = DNN(input_shape=(4,)).get_model(adam_lr=0.1, adam_lr_decay=0.01)
dnn.summary()

In [None]:
# One transition record stored in the replay memory.
Experience = namedtuple(
    'Experience',
    ['state', 'action', 'next_state', 'reward', 'done'],
)

Replay Memory

In [None]:
class ReplayMemory:
    """Bounded experience buffer that overwrites entries in ring order once full."""

    def __init__(self, size_limit):
        """Create an empty buffer holding at most `size_limit` experiences."""
        self.size_limit = size_limit
        self.push_count = 0
        self.memory = []

    def add(self, Experience):
        """Store one experience and count the push."""
        if len(self.memory) >= self.size_limit:
            # Buffer is full: reuse the slot selected by the running push count.
            slot = self.push_count % self.size_limit
            self.memory[slot] = Experience
        else:
            self.memory.append(Experience)
        self.push_count += 1

    def get_sample(self, batch_size):
        """Return a random sample of up to `batch_size` stored experiences."""
        count = min(batch_size, len(self.memory))
        return random.sample(self.memory, count)

    def extract(self, samples):
        """Split experiences into a 5-tuple of lists, one list per field position."""
        columns = ([], [], [], [], [])
        for record in samples:
            for position, column in enumerate(columns):
                column.append(record[position])
        return columns

In [None]:
# Push five experiences into a 4-slot buffer so the fifth overwrites a slot,
# then sample two of them and split the sample into per-field lists.
rm = ReplayMemory(4)
for reward, done in ((1, 0), (2, 1), (3, 0), (4, 1), (5, 0)):
    rm.add(Experience(2, 2, 2, reward, done))
a = rm.get_sample(2)
# print(a)
a = rm.extract(a)
# print(a)

Agent class

In [None]:
class Agent:
    """Epsilon-greedy Q-learning agent with a training network and a target network.

    Both networks are built by `DNN((4,))`, so states are 4-dimensional and
    there are 2 action values per state.
    """

    def __init__(self, epsion, epsilon_min, epsilon_decay, gamma, adam_lr, adam_lr_decay, eps_updt_iter):
        """Create both networks and store the exploration/discount settings.

        epsion: initial exploration rate (the misspelled parameter name is kept
            so existing callers are unaffected).
        epsilon_min: decay stops once epsilon is no longer above this value.
        epsilon_decay: multiplicative factor applied to epsilon in `train`.
        gamma: discount applied to the target network's best next-state value.
        adam_lr, adam_lr_decay: forwarded to `DNN.get_model`.
        eps_updt_iter: epsilon only decays once the replay memory holds more
            than this many experiences.
        """
        # Use the argument itself; previously this read a module-level
        # `epsilon`, silently ignoring whatever the caller passed.
        self.epsilon = epsion
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay
        self.gamma = gamma
        self.eps_updt_iter = eps_updt_iter
        self.trainDNN = DNN((4,)).get_model(adam_lr, adam_lr_decay)
        self.targetDNN = DNN((4,)).get_model(adam_lr, adam_lr_decay)
        # Start with identical weights in both networks.
        self.targetDNN.set_weights(self.trainDNN.get_weights())

    def update(self):
        """Copy the training network's weights into the target network."""
        self.targetDNN.set_weights(self.trainDNN.get_weights())

    def select_action(self, state):
        """Return a random action with probability epsilon, else the greedy action.

        NOTE: the random branch samples from the module-level `env`, not from
        an environment owned by the agent.
        """
        if self.epsilon > np.random.random():
            return env.action_space.sample()
        return np.argmax(self.trainDNN.predict(state))

    def preprocess(self, state):
        """Reshape a single 4-element state into a (1, 4) array."""
        return np.reshape(state, [1, 4])

    def train(self, rm, batch_size):
        """Fit the training network on one random batch from replay memory.

        Does nothing until `rm` holds at least `batch_size` experiences.
        Always returns None.
        """
        if batch_size > len(rm.memory):
            return

        sample = rm.get_sample(batch_size)
        state, action, next_state, reward, done = rm.extract(sample)
        state = np.reshape(np.array(state), (batch_size, 4))
        next_state = np.reshape(np.array(next_state), (batch_size, 4))

        # Current predictions are the regression targets; only the entry for
        # the action actually taken is overwritten below.
        target = self.trainDNN.predict(state)
        target_next = self.targetDNN.predict(next_state)

        for i in range(batch_size):
            if done[i]:
                # Terminal transition: no future value to add.
                target[i][action[i]] = reward[i]
            else:
                target[i][action[i]] = reward[i] + self.gamma * np.amax(target_next[i])

        self.trainDNN.fit(state, target, batch_size=batch_size, verbose=0)

        # Decay exploration only after enough experience has been collected.
        if self.epsilon > self.epsilon_min and len(rm.memory) > self.eps_updt_iter:
            self.epsilon *= self.epsilon_decay

        return

Environment

In [None]:
class Environment:
    """Runs training and evaluation episodes for an agent on a gym environment."""

    def __init__(self, agent, rm, env, num_episodes, num_steps, update_iter=5):
        """Store the collaborators and loop settings.

        agent: object providing preprocess/select_action/train/update and
            the `trainDNN` model.
        rm: replay memory that receives every transition during training.
        env: environment exposing `reset()` and `step(action)`.
        num_episodes: number of training episodes.
        num_steps: stored, but not consulted by the loops in this class.
        update_iter: the target network is synced every `update_iter` episodes.
        """
        self.num_episodes = num_episodes
        self.num_steps = num_steps
        self.update_iter = update_iter
        self.agent = agent
        self.env = env
        self.rm = rm

    def training(self):
        """Run `self.num_episodes` training episodes, learning after every step."""
        # Use the instance's own settings; previously this loop read the
        # module-level `num_episodes` and `rm`, ignoring the constructor args.
        for episode in range(self.num_episodes):
            s = self.env.reset()
            state = self.agent.preprocess(s)
            done = False
            step = 0
            while not done:
                action = self.agent.select_action(state)
                next_s, reward, done, info = self.env.step(action)
                next_state = self.agent.preprocess(next_s)
                self.rm.add(Experience(state, action, next_state, reward, done))
                state = next_state
                step += 1
                # NOTE: `batch_size` is still the module-level setting; this
                # class has no attribute for it.
                self.agent.train(self.rm, batch_size)

            # Sync the target network (and report progress) on every
            # `update_iter`-th episode only.
            if episode % self.update_iter == 0:
                self.agent.update()
                print("Episode:"+str(episode),",Score:"+str(step))
        print("Saving trained model")
        #self.agent.trainDNN.save("cartpole.h5")

    def test(self):
        """Load weights from "cartpole.h5" and run 200 greedy episodes.

        Returns a list with the number of steps each episode lasted.
        """
        episode_dur = []
        model = K.models.load_model("cartpole.h5")
        self.agent.trainDNN.set_weights(model.get_weights())
        for episode in range(200):
            s = self.env.reset()
            state = self.agent.preprocess(s)
            done = False
            step = 0
            while not done:
                # Always act greedily during evaluation (no exploration).
                action = np.argmax(self.agent.trainDNN.predict(state))
                next_s, reward, done, info = self.env.step(action)
                state = self.agent.preprocess(next_s)
                step += 1
            print("Episode:"+str(episode),",Score:"+str(step))
            episode_dur.append(step)
        return episode_dur

Defining parameters

In [None]:
# Episode settings handed to Environment.
num_episodes = 200
num_steps = 500

# Discount applied to the target network's best next-state value.
gamma = 0.95

# Exploration schedule: epsilon is multiplied by epsilon_decay after a
# training step while it is above epsilon_min and the replay memory holds
# more than eps_update_iter experiences.
epsilon = 1
epsilon_min = 0.001
epsilon_decay = 0.999
eps_update_iter = 1000

# Adam optimizer settings (passed as `lr` and `decay`).
adam_lr = 0.001
adam_lr_decay = 0.001

# Replay memory capacity and the batch size sampled from it.
batch_size = 64
rm_size_limit = 1000000

# The target network is synced every this many episodes.
trgtDNN_updt_iter = 5

# Moving-average score above which 'Solved' is printed.
win_dur = 195

Training and Testing

In [None]:
# Build the environment, replay memory and agent from the parameters defined above.
env=gym.make("CartPole-v1")
rm=ReplayMemory(rm_size_limit)

agent=Agent(epsilon,epsilon_min,epsilon_decay,gamma,adam_lr,adam_lr_decay,eps_update_iter)

# Wire everything into the episode runner.
Env=Environment(agent,rm,env,num_episodes,num_steps,update_iter=trgtDNN_updt_iter)

# Uncomment the line below to train the agent before testing.
#Env.training()

# Testing: Environment.test() loads weights from "cartpole.h5" and returns
# the per-episode step counts.
eps_dur=Env.test()

Calculating moving average

In [None]:
#Env.agent.trainDNN.save("cartpole.h5")
#eps_dur=Env.test()
# One index per recorded test episode. np.linspace(0, len(eps_dur), 1) produced
# a single-element array, so `mean` had length 1 and the loop below raised
# IndexError from the second episode on.
eps=np.arange(len(eps_dur))
mean=np.zeros(eps.shape)
for i in range(len(eps_dur)):
    # Average over the most recent 100 episodes, or over all episodes so far
    # while fewer than 100 are available.
    mean[i]=np.mean(eps_dur[max(0,i-99):i+1])

Moving average plot

In [None]:
# Plot the moving average from index 99 onward.
plt.plot(eps[99:], mean[99:], 'b')
plt.xlabel("Episodes")
plt.ylabel("Average over last 100 epi")

# Report success once if any moving-average value exceeds the win threshold.
if any(avg > win_dur for avg in mean):
    print('Solved')