In [1]:
import gym
import numpy as np
import torch
import matplotlib.pyplot as plt
import time

In [2]:
from gym.wrappers import Monitor

In [3]:
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

In [4]:
import math
import copy

In [5]:
# Single shared CarRacing environment; run_agents and play_agent read it as a global.
env = gym.make('CarRacing-v0')



In [6]:
# get useful informations from an observation
def capteur(observation):
    '''
    Measure four car-to-grass distances in a preprocessed frame.

    The position of the car is hard-coded for now: its nose sits on row 67
    and its body spans columns 46 (left edge) to 49 (right edge).

    Parameters
    ----------
    observation : 2-D numpy array indexed as [row, column]
        Grass pixels are expected to equal 180 and road pixels 102.

    Returns
    -------
    numpy array of 4 values, all divided by 20:
        [left distance, right distance, ahead-left distance, ahead-right distance]
    The two horizontal distances become negative when grass is directly
    adjacent to the car on that side: the (negated) distance to the nearest
    road pixel on the opposite side is reported instead.
    The two vertical distances are shifted and scaled as (d - 33) / 33.
    '''
    i_nose = 67
    j_left = 46
    j_right = 49

    grass_color = 180
    road_color = 102

    # Values reported when no grass (or no road) is found in the scanned direction.
    no_hit_horizontal = 46
    no_hit_vertical = 67
    vertical_scale = 33

    # Horizontal information, read on the row of the car's nose.
    nose_row = observation[i_nose]
    grass_cols = np.where(nose_row == grass_color)[0]
    road_cols = np.where(nose_row == road_color)[0]

    # Only IndexError is expected here (indexing [-1] / [0] on an empty
    # selection); any other exception is a real error and must propagate.
    try:
        hori_gauche = j_left - grass_cols[grass_cols < j_left][-1]
        if hori_gauche == 1:
            # Grass touches the car on the left: report the negative
            # distance to the first road pixel on the right instead.
            hori_gauche = j_right - road_cols[road_cols > j_right][0]
    except IndexError:
        hori_gauche = no_hit_horizontal

    try:
        hori_droite = grass_cols[grass_cols > j_right][0] - j_right
        if hori_droite == 1:
            # Grass touches the car on the right: mirror of the case above.
            hori_droite = road_cols[road_cols < j_left][-1] - j_left
    except IndexError:
        hori_droite = no_hit_horizontal

    def _grass_ahead(column):
        # Normalised distance from the nose row up to the closest grass pixel in `column`.
        grass_rows = np.where(observation[:, column] == grass_color)[0]
        try:
            distance = i_nose - grass_rows[grass_rows < i_nose][-1]
        except IndexError:
            distance = no_hit_vertical
        return (distance - vertical_scale) / vertical_scale

    # Vertical information, read on the columns of the car's two edges.
    verti_gauche = _grass_ahead(j_left)
    verti_droite = _grass_ahead(j_right)

    res = np.array([hori_gauche, hori_droite, verti_gauche, verti_droite])

    return res / 20

def preprocess(rgb):
    '''
    Simplify a raw frame and turn it into the four sensor values.

    The RGB image is reduced to a single channel (0*R + 0.5*G + 0.5*B),
    every value above 150 is set to 180 (the grass value that capteur looks
    for), and the bottom of the image is cropped away before the frame is
    handed to capteur.

    Parameters
    ----------
    rgb : array whose last axis holds at least the R, G, B channels.

    Returns
    -------
    Whatever capteur returns for the cropped single-channel frame.
    '''
    end_img = 84  # rows from this index on are dropped (bottom of the frame)

    gray = np.dot(rgb[..., :3], [0.0, 0.5, 0.5])
    gray[gray > 150] = 180
    return capteur(gray[:end_img])

def processAction(output):
    """
    Convert the two network outputs into a 3-element action list.

    output[0] is rescaled from [0, 1] to [-1, 1] and stored in slot 0.
    output[1] is split around 0.5: values at or above 0.5 fill slot 1
    (rescaled to [0, 1]) and leave slot 2 at 0; values below 0.5 fill
    slot 2 (rescaled to (0, 1]) and leave slot 1 at 0.
    """
    first_slot = (output[0] - 0.5) * 2
    second_signal = output[1]

    if second_signal >= 0.5:
        return [first_slot, (second_signal - 0.5) * 2, 0]
    return [first_slot, 0, (0.5 - second_signal) * 2]

In [7]:
class CarRacingAI(nn.Module):
    """
    Small fully-connected policy: Linear -> ReLU -> Linear -> Sigmoid.

    Parameters
    ----------
    n_inputs : int
        Size of the input vector (4 by default, matching the four sensor values).
    n_hidden : int
        Width of the hidden layer.
    n_actions : int or None
        Number of outputs. When None, the notebook-level global
        ``game_actions`` is used, which keeps ``CarRacingAI()`` working
        exactly as before.

    Every output lies in (0, 1) because of the final sigmoid.
    """

    def __init__(self, n_inputs=4, n_hidden=128, n_actions=None):
        super().__init__()
        if n_actions is None:
            # Backward-compatible fallback on the global defined in the training cell.
            n_actions = game_actions
        self.lin = nn.Sequential(
            nn.Linear(n_inputs, n_hidden, bias=True),
            nn.ReLU(),
            nn.Linear(n_hidden, n_actions, bias=True),
            nn.Sigmoid()
        )

    def forward(self, inputs):
        """Run the input vector through the network and return its outputs."""
        return self.lin(inputs)

In [8]:
def init_weights(m):
    """
    Xavier-initialise the weights and zero the biases of Linear / Conv2d layers.

    `m` may be a single layer or a whole model: every sub-module (including
    `m` itself) is visited, so both `init_weights(model)` and
    `model.apply(init_weights)` initialise all matching layers. Modules of
    any other type are left untouched.
    """
    for layer in m.modules():
        if isinstance(layer, (nn.Linear, nn.Conv2d)):
            # In-place variant; the non-underscore name is deprecated.
            torch.nn.init.xavier_uniform_(layer.weight)
            if layer.bias is not None:  # layers built with bias=False have no bias
                layer.bias.data.fill_(0.00)
                

In [9]:
def return_random_agents(num_agents):
    """
    Build `num_agents` freshly initialised CarRacingAI networks.

    Gradients are disabled on every parameter because the agents are only
    ever modified by mutation, never by back-propagation.

    Returns a list of `num_agents` agents.
    """
    agents = []
    for _ in range(num_agents):

        agent = CarRacingAI()
        agent = agent.float()

        for param in agent.parameters():
            param.requires_grad = False

        # apply() visits every sub-module, so init_weights actually reaches
        # the Linear layers; calling init_weights(agent) directly only tests
        # the type of the top-level module and initialises nothing.
        agent.apply(init_weights)
        agents.append(agent)

    return agents
    

In [10]:
def run_agents(agents):
    """
    Play one episode (capped at 500 steps) with each agent on the global `env`.

    Returns a list with the cumulated reward of every agent, in the same
    order as `agents`.
    """
    episode_returns = []

    for policy in agents:
        policy.eval()

        frame = env.reset()
        total = 0

        for _step in range(500):
            # raw frame -> sensor values -> float tensor -> network -> env action
            features = preprocess(np.ascontiguousarray(frame))
            net_input = torch.tensor(features).type('torch.FloatTensor')
            net_output = policy(net_input).detach().numpy()
            frame, reward, done, info = env.step(processAction(net_output))
            total = total + reward

            if done:
                break

        episode_returns.append(total)

    return episode_returns

In [11]:
def return_average_score(agent, runs):
    """Average the episode reward of `agent` over `runs` independent episodes."""
    total = sum((run_agents([agent])[0] for _ in range(runs)), 0.)
    return total / runs

In [12]:
def run_agents_n_times(agents, runs):
    """Return, for every agent in order, its reward averaged over `runs` episodes."""
    return [return_average_score(candidate, runs) for candidate in agents]

In [13]:
def mutate(agent, mutation_power=0.02):
    """
    Return a mutated copy of `agent`; the original agent is left untouched.

    Gaussian noise scaled by `mutation_power` is added to every parameter
    of the copy, whatever its shape.

    Parameters
    ----------
    agent : torch.nn.Module
        Parent network.
    mutation_power : float
        Standard deviation of the added noise. The default is the
        hyper-parameter from https://arxiv.org/pdf/1712.06567.pdf

    Returns
    -------
    A deep copy of `agent` with perturbed parameters.
    """
    child_agent = copy.deepcopy(agent)

    # Explicit no_grad: the in-place update must not depend on gradients
    # having been disabled globally elsewhere in the notebook.
    with torch.no_grad():
        for param in child_agent.parameters():
            # One vectorised draw per tensor instead of one Python-level
            # draw per element; still uses numpy's global RNG.
            noise = np.random.randn(*param.shape)
            param.add_(
                mutation_power
                * torch.as_tensor(noise, dtype=param.dtype, device=param.device)
            )

    return child_agent

In [14]:
def return_children(agents, sorted_parent_indexes, elite_index):
    """
    Build the next generation from the selected parents.

    N-1 children are mutated copies of parents drawn uniformly at random
    from `sorted_parent_indexes`; the last child is the un-mutated elite
    chosen by add_elite.

    Returns (children, index of the elite inside children).
    """
    parent_pool_size = len(sorted_parent_indexes)

    # N-1 mutated children, each from a randomly picked parent.
    offspring = [
        mutate(agents[sorted_parent_indexes[np.random.randint(parent_pool_size)]])
        for _ in range(len(agents) - 1)
    ]

    # The elite always goes last, so its index is the last position.
    offspring.append(add_elite(agents, sorted_parent_indexes, elite_index))

    return offspring, len(offspring) - 1

In [15]:
def add_elite(agents, sorted_parent_indexes, elite_index=None, only_consider_top_n=10):
    """
    Pick the best agent among the top parents (and the previous elite, if any).

    Each candidate is re-evaluated over 5 episodes; the one with the highest
    average score is deep-copied and returned unchanged.
    """
    contenders = sorted_parent_indexes[:only_consider_top_n]
    if elite_index is not None:
        contenders = np.append(contenders, [elite_index])

    best_index, best_score = None, None

    for idx in contenders:
        score = return_average_score(agents[idx], runs=5)
        print("Score for elite i ", idx, " is ", score)

        # The first candidate always becomes the reference; later ones must beat it strictly.
        if best_score is None or score > best_score:
            best_score, best_index = score, idx

    print("Elite selected with index ", best_index, " and score", best_score)

    return copy.deepcopy(agents[best_index])
    

In [16]:
def softmax(x):
    """
    Compute softmax values for each set of scores in x (along axis 0).

    The maximum is subtracted before exponentiating. This leaves the result
    mathematically unchanged but prevents overflow (inf / inf = nan) for
    large scores. Empty input yields an empty array.
    """
    x = np.asarray(x)
    if x.size == 0:
        return np.exp(x)
    exps = np.exp(x - np.max(x, axis=0))
    return exps / np.sum(exps, axis=0)

In [None]:
# Number of network outputs consumed by processAction (output[0] and output[1]);
# presumably steering and a combined gas/brake signal.
game_actions = 2

# Disable gradients globally: agents evolve by mutation only.
torch.set_grad_enabled(False)

# Initialize N agents.
num_agents = 30
agents = return_random_agents(num_agents)

# How many top agents to consider as parents.
top_limit = 5

# Run evolution for this many generations.
generations = 100

# Index of the previous generation's elite inside `agents` (none yet).
elite_index = None

for generation in range(generations):

    # Average reward of every agent over 3 episodes.
    rewards = run_agents_n_times(agents, 3)

    # argsort is ascending, so reverse it and keep the `top_limit` best indexes.
    # https://stackoverflow.com/questions/16486252/is-it-possible-to-use-argsort-in-descending-order
    sorted_parent_indexes = np.argsort(rewards)[::-1][:top_limit]
    print("")
    print("")

    top_rewards = [rewards[best_parent] for best_parent in sorted_parent_indexes]

    # The label and the mean both follow top_limit instead of a hard-coded 5.
    print("Generation ", generation, " | Mean reward: ", np.mean(rewards), " | Mean of top %d: " % top_limit, np.mean(top_rewards))
    print("Top ", top_limit, " scores", sorted_parent_indexes)
    print("Rewards for top: ", top_rewards)

    # Breed the next generation; the elite ends up at `elite_index`.
    children_agents, elite_index = return_children(agents, sorted_parent_indexes, elite_index)

    # Kill all agents, and replace them with their children.
    agents = children_agents

Track generation: 1183..1483 -> 300-tiles track
Track generation: 1069..1342 -> 273-tiles track
retry to generate track (normal if there are not many of this messages)
Track generation: 1240..1554 -> 314-tiles track
Track generation: 1197..1501 -> 304-tiles track
Track generation: 1204..1512 -> 308-tiles track
retry to generate track (normal if there are not many of this messages)
Track generation: 1232..1548 -> 316-tiles track
Track generation: 1132..1427 -> 295-tiles track
Track generation: 1105..1395 -> 290-tiles track
Track generation: 1185..1485 -> 300-tiles track
Track generation: 1048..1314 -> 266-tiles track
Track generation: 1124..1409 -> 285-tiles track


In [None]:
def play_agent(agent, max_steps=1000):
    """
    Render one episode played by `agent` on the global `env` and print its reward.

    Parameters
    ----------
    agent : network mapping the 4 sensor values to the 2 action signals.
    max_steps : int
        Upper bound on the number of environment steps (1000 by default).

    The environment is always closed afterwards, even when an error occurs,
    because rendering hangs otherwise and env.close() is needed to keep
    working. Exceptions are reported on stdout instead of being raised.
    """
    total_reward = 0
    try:
        observation = env.reset()

        for _ in range(max_steps):
            env.render()
            features = preprocess(np.ascontiguousarray(observation))
            inp = torch.tensor(features).type('torch.FloatTensor')
            output = agent(inp).detach().numpy()
            action = processAction(output)
            observation, reward, done, info = env.step(action)
            total_reward = total_reward + reward

            if done:
                break

        print("Rewards: ", total_reward)

    except Exception as e:
        print(e.__doc__)
        # Python 3 exceptions have no `.message`; printing the exception
        # itself shows its message without raising inside the handler.
        print(e)

    finally:
        env.close()

In [None]:
# return_children always appends the elite last, so the last agent is the
# current elite whatever the population size (index 29 only fits num_agents == 30).
play_agent(agents[-1])

In [None]:
# Final cleanup: close the shared environment (play_agent also closes it after each playback).
env.close()