In [2]:
import gym
import numpy as np
import torch
import matplotlib.pyplot as plt
import time

In [8]:
from gym.wrappers.record_video import RecordVideo

In [9]:
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

In [10]:
import math
import copy

In [11]:
class CartPoleAI(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc = nn.Sequential(nn.Linear(4, 128, bias=True),
                               nn.ReLU(),
                               nn.Linear(128, 2, bias=True),
                               nn.Softmax(dim=1)
                               )
        
    def forward(self, inputs):
        x = self.fc(inputs)
        return x

In [12]:
def init_weights(m):
    if ((type(m)==nn.Linear) | (type(m)==nn.Conv2d)):
        torch.nn.init.xavier_uniform(m.weight)
        m.bias.data.fill_(0.00)

In [13]:
def return_random_agents(num_agents):
    
    agents = []
    for _ in range(num_agents):
        
        agent = CartPoleAI()
        
        for param in agent.parameters():
            param.requires_grad = False
        
        init_weights(agent)
        agents.append(agent)
            
    return agents

In [28]:

def run_agents(agents):
    
    reward_agents=[]
    env = gym.make("CartPole-v0")
    
    for agent in agents:
        agent.eval()
        
        observation = env.reset()
        
        r=0
        s=0
        
        for _ in range(250):
            
            inp = torch.tensor(observation).type('torch.FloatTensor').view(1,-1)
            output_probabilities = agent(inp).detach().numpy()[0]
            action = np.random.choice(range(game_actions), 1, p=output_probabilities).item()
            new_observation, reward, done, info = env.step(action)
            r=r+reward
            
            s=s+1
            observation = new_observation
            
            if(done):
                break
                
        reward_agents.append(r)
        
    return reward_agents

In [37]:
env1 = gym.make('CartPole-v1')
obs = env1.reset()
obs
#inp1 = torch.tensor(obs).view(-1,-1)

(array([-0.03431816, -0.01800865, -0.02355058, -0.04934127], dtype=float32),
 {})

In [49]:
a = (np.array([1,2,3]), {})
b = torch.tensor(list(a))

RuntimeError: Could not infer dtype of dict

In [15]:
def return_avg_score(agent, runs):
    
    score=0
    for i in range(runs):
        score += run_agents([agent])[0]
        
    return score/runs

In [16]:
def run_agents_n_times(agents, runs):
    avg_score=[]
    
    for agent in agents:
        avg_score.append(return_avg_score(agent, runs))
    return avg_score

In [17]:
def mutate(agent):
    
    child_agent = copy.deepcopy(agent)
    
    mutation_power = 0.02
    
    for param in child_agent.parameters():
        if(len(param.shape)==4):
            for i0 in range(param.shape[0]):
                for i1 in range(param.shape[1]):
                    for i2 in range(param.shape[2]):
                        for i3 in range(param.shape[3]):
                            
                            param[i0][i1][i2][i3] += mutation_power * np.random.randn()
                            
        elif(len(param.shape)==2):
            for i0 in range(param.shape[0]):
                for i1 in range(param.shape[1]):
                    
                    param[i0][i1] += mutation_power * np.random.randn()
                    
        elif(len(param.shape)==1):
            for i0 in range(param.shape[0]):
                param[i0] += mutation_power * np.random.randn()
                
    return child_agent

In [18]:
def return_children(agents, sorted_parent_indexes, elite_index):
    
    children_agents = []
    
    
    for i in range(len(agents)-1):
        
        selected_agent_index = sorted_parent_indexes[np.random.randint(len(sorted_parent_indexes))]
        children_agents.append(mutate(agents[selected_agent_index]))

    
    elite_child = add_elite(agents, sorted_parent_indexes, elite_index)
    children_agents.append(elite_child)
    elite_index=len(children_agents)-1 
    
    return children_agents, elite_index

In [19]:
def add_elite(agents, sorted_parents_indexes, elite_index=None, only_consider_top_n=10):
    
    candidate_elite_index = sorted_parents_indexes[:only_consider_top_n]
    
    if(elite_index is not None):
        candidate_elite_index = np.append(candidate_elite_index, [elite_index])
        
    top_score = None
    top_elite_index = None
    
    for i in candidate_elite_index:
        score = return_avg_score(agents[i], runs=5)
        print("Score for elite ", i, "is ", score)
        
        if(top_score is None):
            top_score = score
            top_elite_index = i
            
        elif(score>top_score):
            top_score = score
            top_elite_index = i
            
            
    print("Elite selected with index ", top_elite_index, "and score ", top_score)
    
    child_agent = copy.deepcopy(agents[top_elite_index])
    
    return child_agent

In [20]:
def softmax(x):
    
    return np.exp(x)/np.sum(np.exp(x), axis=0)

In [21]:
game_actions = 2

torch.set_grad_enabled(False)

num_agents=500
agents = return_random_agents(num_agents)

top_limit = 20

generations = 1000

elite_index = None

for generation in range(generations):
    
    rewards = run_agents_n_times(agents, 3)
    
    sorted_parents_indexes = np.argsort(rewards)[::-1][:top_limit]
    
    print("")
    print("")
    
    top_rewards = []
    
    for best_parent in sorted_parents_indexes:
        top_rewards.append(rewards[best_parent])
        
    print("Generation ",generation, "| Mean rewards: ", np.mean(rewards), "| Mean of top 5: ", np.mean(top_rewards[:5]))
        
    print("Top", top_limit, "scores: ",sorted_parents_indexes)
    print("Rewards for top: ", top_rewards)
        
    children_agents, elite_index= return_children(agents, sorted_parents_indexes, elite_index)
    
    agents = children_agents

  logger.warn(


RuntimeError: Could not infer dtype of dict

In [22]:
type(observation)

NameError: name 'observation' is not defined

In [18]:
def play_agent(agent):
    try:
        
        env = gym.make('CartPole-v1')
        
        env_record = RecordVideo(env, './video', force=True)
        observation = env_record.reset()
        last_observation = observation
        r=0
        for _ in range(250):
            env_record.render()
            inp = torch.tensor(observation).type('torch.FloatTensor').view(1,-1)
            output_probabilities = agent(inp).detach().numpy()[0]
            action = np.random.choice(range(game_actions), 1, p=output_probabilities).item()
            new_observation, reward, done, info = env_record.step(action)
            r=r+reward
            observation = new_observation

            if(done):
                break

        env_record.close()
        print("Rewards: ",r)

    except Exception as e:
        env_record.close()
        print(e.__doc__)
        print(str(e))
        

In [None]:
play_agent(agents[96])