In [2]:
import pygame, sys, random
import numpy as np
from keras.layers import Dense
from keras.models import Sequential
from keras.optimizers import Adam
import math
import matplotlib.pyplot as plt

# Number of training episodes the main loop iterates over.
num_episodes = 10000

In [3]:
# A2C (Advantage Actor-Critic) agent with separate actor and critic networks
class A2CAgent:
    """One-step Advantage Actor-Critic agent built from two small Keras networks.

    The actor maps a state to a softmax distribution over ``action_size``
    discrete actions; the critic maps a state to a single value estimate.
    ``train_model`` updates both networks from one transition at a time.
    """

    def __init__(self, state_size, action_size):
        """Store sizes and hyper-parameters, build both networks, optionally load weights.

        Args:
            state_size: length of the state vector fed to both networks.
            action_size: number of discrete actions the actor chooses between.
        """
        # flag the training script reads to decide whether to draw each step
        self.render = False
        # set to True to restore previously saved weights right after building
        self.load_model = False
        # get size of state and action
        self.state_size = state_size
        self.action_size = action_size
        # the critic emits one scalar value per state
        self.value_size = 1

        # These are hyper parameters for the Policy Gradient
        self.discount_factor = 0.99
        self.actor_lr = 0.001
        self.critic_lr = 0.005

        # create the policy (actor) and value (critic) networks
        self.actor = self.build_actor()
        self.critic = self.build_critic()

        if self.load_model:
            self.actor.load_weights("./Practice002_DataSave/Actor.h5")
            self.critic.load_weights("./Practice002_DataSave/Critic.h5")

    # approximate policy and value using Neural Network
    # actor: state is input and probability of each action is output of model
    def build_actor(self):
        """Build and compile the policy network (state -> action probabilities).

        Returns:
            A compiled Keras ``Sequential`` model with one 24-unit ReLU hidden
            layer and a softmax output of width ``action_size``.
        """
        actor = Sequential()
        actor.add(Dense(24, input_dim=self.state_size, activation='relu', kernel_initializer='he_uniform'))
        actor.add(Dense(self.action_size, activation='softmax', kernel_initializer='he_uniform'))
        actor.summary()
        # train_model feeds a target vector that is zero everywhere except the
        # taken action's slot, which holds the advantage. Cross-entropy against
        # that target is -advantage * log(pi(action | state)).
        # NOTE(review): newer Keras releases spell this argument `learning_rate`;
        # `lr` is kept to match the Keras version this notebook was written for — confirm.
        actor.compile(loss='categorical_crossentropy', optimizer=Adam(lr=self.actor_lr))
        return actor

    # critic: state is input and value of state is output of model
    def build_critic(self):
        """Build and compile the value network (state -> scalar value).

        Returns:
            A compiled Keras ``Sequential`` model with one 24-unit ReLU hidden
            layer and a linear output of width ``value_size``, trained with MSE.
        """
        critic = Sequential()
        critic.add(Dense(24, input_dim=self.state_size, activation='relu', kernel_initializer='he_uniform'))
        critic.add(Dense(self.value_size, activation='linear', kernel_initializer='he_uniform'))
        critic.summary()
        # NOTE(review): same `lr` vs `learning_rate` caveat as in build_actor — confirm.
        critic.compile(loss="mse", optimizer=Adam(lr=self.critic_lr))
        return critic

    # using the output of policy network, pick action stochastically
    def get_action(self, state):
        """Sample an action index from the actor's output distribution.

        Args:
            state: input passed straight to ``actor.predict``; the training
                script supplies an array of shape ``(1, state_size)``.

        Returns:
            An integer in ``range(action_size)`` drawn with the predicted
            probabilities.
        """
        policy = self.actor.predict(state, batch_size=1).flatten()
        return np.random.choice(self.action_size, 1, p=policy)[0]

    # update both networks after every single transition
    def train_model(self, state, action, reward, next_state, done):
        """Run one actor update and one critic update from a single transition.

        Args:
            state: state the action was taken in, passed to predict/fit as-is.
            action: index of the action that was taken.
            reward: scalar reward received for the transition.
            next_state: successor state; only evaluated when ``done`` is false.
            done: truthy when the transition ended the episode.
        """
        target = np.zeros((1, self.value_size))
        advantages = np.zeros((1, self.action_size))

        # The critic returns a 2-D array; pull out a plain float so the
        # assignments below store scalars instead of one-element arrays
        # (NumPy deprecates the implicit array-to-scalar conversion).
        value = float(self.critic.predict(state)[0][0])

        if done:
            # terminal transition: no bootstrapping, so the successor state is
            # never evaluated
            td_target = reward
        else:
            next_value = float(self.critic.predict(next_state)[0][0])
            td_target = reward + self.discount_factor * next_value

        # advantage = TD target minus the critic's current estimate
        advantages[0][action] = td_target - value
        target[0][0] = td_target

        self.actor.fit(state, advantages, epochs=1, verbose=0)
        self.critic.fit(state, target, epochs=1, verbose=0)

In [4]:
def ckWall(xPos, yPos):
    """Clamp a position to a fixed rectangle and report whether it was outside.

    The rectangle is 400 <= x <= 880 and 210 <= y <= 690.

    Args:
        xPos: horizontal coordinate to check.
        yPos: vertical coordinate to check.

    Returns:
        A list ``[x, y, flag]`` where ``x`` and ``y`` are the clamped
        coordinates and ``flag`` is -1 if either input coordinate lay outside
        the rectangle, otherwise 0.
    """
    x_low, x_high = 400, 880
    y_low, y_high = 210, 690

    # Decide on the flag from the untouched inputs before clamping them.
    outside = xPos < x_low or xPos > x_high or yPos > y_high or yPos < y_low

    # Inputs go first in max/min so an in-range value is returned unchanged.
    clamped_x = min(max(xPos, x_low), x_high)
    clamped_y = min(max(yPos, y_low), y_high)

    return [clamped_x, clamped_y, -1 if outside else 0]

In [5]:
def stateGenerator(obsPosition, agtPosition, wallcheck):
    """Build the flat state vector from relative positions.

    Args:
        obsPosition: indexable collection whose first 10 entries are
            indexable ``[x, y]`` pairs.
        agtPosition: indexable ``[x, y]`` pair for the agent.
        wallcheck: accepted but not used by this function.

    Returns:
        A list of 22 numbers: for each of the first 10 obstacles the agent's
        x and y offset from it, followed by the agent's offset from the
        point (640, 450).
    """
    agent_x, agent_y = agtPosition[0], agtPosition[1]

    offsets = []
    for idx in range(10):
        obstacle = obsPosition[idx]
        offsets.extend((agent_x - obstacle[0], agent_y - obstacle[1]))

    # final pair: offset from the fixed reference point (640, 450)
    offsets.extend((agent_x - 640, agent_y - 450))
    return offsets

In [None]:
if __name__ == "__main__":
    # Create the pygame window and fill it with a light-grey background.
    pygame.init()
    screen = pygame.display.set_mode([1280,960])
    screen.fill([200, 200, 200])

    # State layout (see stateGenerator): 10 obstacle offsets (dx, dy) plus one
    # offset to the point (640, 450) -> 22 values. The actor picks among 8 actions.
    state_size = 22
    action_size = 8

    # make A2C agent
    agent = A2CAgent(state_size, action_size)

    scores, episodes = [], []

    # Make Obstacles (10)
    # x is drawn from 401..889 and y from 201..689; a draw is repeated until x
    # lies outside the open band (620, 660) and y lies outside (430, 470).
    obstaclePos = [[0, 0] for _ in range(10)]
    for i in range(0,10):
        while True:
            obstaclePos[i][0] = 400 + random.randrange(1,490)
            obstaclePos[i][1] = 690 - random.randrange(1,490)
            if obstaclePos[i][0] <= 620 or obstaclePos[i][0] >= 660:
                if obstaclePos[i][1] >= 470 or obstaclePos[i][1] <= 430:
                    break

    # FIXME: `env` is not defined in any cell shown in this notebook. Until an
    # environment object exposing `step(action)` (and `render()`) is provided,
    # the loop below raises NameError at `env.step`.
    for e in range(num_episodes):
        # Initialize
        done = False
        score = 0
        x = 400
        y = 690

        state = stateGenerator(obstaclePos, [x,y], -1)
        # The networks are fed a batch of one state, same as next_state below.
        state = np.reshape(state, [1, state_size])

        while not done:
            if agent.render:
                env.render()

            action = agent.get_action(state)
            next_state, reward, done, info = env.step(action)
            next_state = np.reshape(next_state, [1, state_size])
            # if an action ends the episode, give a penalty of -100
            # (unless the running score is exactly 499)
            reward = reward if not done or score == 499 else -100

            agent.train_model(state, action, reward, next_state, done)

            score += reward
            state = next_state

            if done:
                # undo the -100 penalty for reporting unless the score is exactly 500
                score = score if score == 500.0 else score + 100
                scores.append(score)
                episodes.append(e)
                # every episode, plot the score history; `plt` is the
                # matplotlib.pyplot alias imported at the top of the notebook
                plt.plot(episodes, scores, 'b')
                plt.savefig("./Practice002_DataSave/ActorCriticGraph.png")
                print("episode:", e, "  score:", score)

                # if the mean of scores of last 10 episode is bigger than 490
                # stop training
                if np.mean(scores[-min(10, len(scores)):]) > 490:
                    sys.exit()

        # save the model
        if e % 50 == 0:
            agent.actor.save_weights("./Practice002_DataSave/Actor.h5")
            agent.critic.save_weights("./Practice002_DataSave/Critic.h5")


In [None]:
# Report the success figure and draw one bar per entry of rList.
# NOTE(review): `rList` is not defined in any cell visible here — confirm where it comes from.
# NOTE(review): the printed figure is sum(rList) / num_episodes followed by "%";
# it is not multiplied by 100 — confirm whether a ratio or a percentage is intended.
success_ratio = sum(rList) / num_episodes
message = "Percent of successful episodes: " + str(success_ratio) + "%"
print(message)

bar_positions = range(len(rList))
plt.bar(bar_positions, rList, color = "white", width = 0.00001)
plt.show()