In [1]:
import numpy as np
import torch
import torch.nn as nn
from config import args
import random
from einops import rearrange
import gymnasium as gym
import torch.nn.functional as F
from matplotlib import pyplot as plt

In [2]:
class Actor(nn.Module):
    """Bias-free feed-forward policy that samples a discrete action.

    The weights live in ``self.matrices``, a plain Python list of tensors
    (not ``nn.Parameter``), so they are not registered with the module and
    can be swapped wholesale through ``set_matrices``.

    Args:
        args: mapping providing ``state_dim``, ``hidden_dim``,
            ``hidden_layer`` (number of hidden layers), ``action_dim`` and
            ``device``.
    """

    def __init__(self, args):
        super().__init__()
        self.args = args
        device = args['device']
        # Layer widths: state_dim -> hidden_dim (x hidden_layer) -> action_dim.
        widths = [args['state_dim']] + [args['hidden_dim']] * args['hidden_layer'] + [args['action_dim']]
        self.matrices = []
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            matrix = torch.empty(fan_in, fan_out, device=device)
            # Zero-mean normal init with std = 1/sqrt(fan_in); a Python float
            # avoids building a device tensor just to take a square root.
            nn.init.normal_(matrix, mean=0.0, std=fan_in ** -0.5)
            self.matrices.append(matrix)
        self.LayerNorms = nn.ModuleList(
            [nn.LayerNorm(args['hidden_dim'], device=device) for _ in range(args['hidden_layer'])]
        )

    def set_matrices(self, matrices):
        """Replace the whole list of weight matrices with ``matrices``."""
        self.matrices = matrices

    def forward(self, x):
        """Sample an action for the state tensor ``x``.

        Each hidden layer is a matrix product followed by LayerNorm; the last
        matrix produces the action scores.

        Returns:
            The sampled action index as a Python int.
        """
        for matrix, norm in zip(self.matrices[:-1], self.LayerNorms):
            x = norm(x @ matrix)
        scores = x @ self.matrices[-1]
        # Explicit dim: normalise over the action axis and avoid PyTorch's
        # implicit-dimension deprecation warning.
        probs = F.softmax(scores, dim=-1)
        action_dist = torch.distributions.Categorical(probs)
        action = action_dist.sample()
        return action.item()

In [3]:
def selection(generation, args):
    """Pick parents by tournament selection.

    Each round draws ``args['tournament_size']`` distinct individuals and
    keeps the one with the highest ``'fitness'``; on ties the first drawn
    wins. Rounds repeat until ``args['parents_size']`` parents are collected,
    so the same individual may be returned more than once.

    Args:
        generation: list of dicts that each carry a ``'fitness'`` entry.
        args: mapping providing ``parents_size`` and ``tournament_size``.

    Returns:
        A list of ``args['parents_size']`` entries taken from ``generation``
        (the same dict objects, not copies).

    Raises:
        ValueError: from ``random.sample`` if ``tournament_size`` exceeds the
            population size.
    """
    # Sample from the actual population rather than the configured size so
    # that no individual is unreachable when the two differ.
    population_size = len(generation)
    parents = []
    while len(parents) < args['parents_size']:
        contenders = random.sample(range(population_size), args['tournament_size'])
        # max() needs no sentinel, so arbitrarily low fitness values still
        # yield a winner.
        winner = max(contenders, key=lambda idx: generation[idx]['fitness'])
        parents.append(generation[winner])
    return parents

In [4]:
def crossover(parents, args):
    """Build the next generation from ``parents``.

    The ``args['champion_save']`` fittest parents are carried over unchanged
    (the same dict objects, fitness included). The remaining slots are filled
    with children: two distinct parent positions are drawn and, for every
    weight matrix, a random 0/1 mask over the columns decides which parent
    each column comes from. The second child uses the complementary mask.

    Args:
        parents: list of dicts with ``'model'`` (exposing ``matrices``) and
            ``'fitness'``.
        args: mapping providing ``champion_save`` and ``generation_size``,
            plus whatever ``Actor`` needs.

    Returns:
        A list of at most ``args['generation_size']`` individuals; children
        start with fitness ``-99999``.
    """
    target_size = args['generation_size']
    ranked = sorted(parents, key=lambda x: x['fitness'], reverse=True)
    next_generation = list(ranked[:args['champion_save']])
    while len(next_generation) < target_size:
        first, second = random.sample(range(len(parents)), 2)
        matrices1 = parents[first]['model'].matrices
        matrices2 = parents[second]['model'].matrices
        child_matrices1 = []
        child_matrices2 = []
        for a, b in zip(matrices1, matrices2):
            # One 0/1 gene per column; it broadcasts across the rows, so a
            # whole column is inherited from a single parent.
            cross = torch.randint(0, 2, (a.shape[-1],), device=a.device)
            child_matrices1.append(a * cross + b * (1 - cross))
            child_matrices2.append(a * (1 - cross) + b * cross)
        for child_matrices in (child_matrices1, child_matrices2):
            # Children come in pairs; stop as soon as the population is full
            # so an odd number of free slots does not overshoot.
            if len(next_generation) >= target_size:
                break
            model = Actor(args)
            model.set_matrices(child_matrices)
            next_generation.append({'model': model, 'fitness': -99999})
    # Also guards the case where champion_save exceeds generation_size.
    return next_generation[:target_size]

In [5]:
def mutation(generation, args=None):
    """Add sparse Gaussian noise to the weights of every non-champion.

    The first ``args['champion_save']`` individuals are left untouched. For
    every other individual each weight is perturbed with probability
    ``args['mutation_rate']`` by normal noise of standard deviation
    ``args['sigma'] / sqrt(rows)``, where ``rows`` is the matrix's first
    dimension. New tensors are created; existing ones are not modified in
    place.

    Args:
        generation: list of dicts whose ``'model'`` exposes ``matrices`` and
            ``set_matrices``.
        args: mapping providing ``champion_save``, ``mutation_rate`` and
            ``sigma``. When omitted, the module-level ``args`` is used, which
            keeps the original one-argument call working.

    Returns:
        The same ``generation`` list, with mutated models.
    """
    if args is None:
        # Backward compatibility: fall back to the notebook-level config.
        args = globals()['args']
    for individual in generation[args['champion_save']:]:
        mutated_matrices = []
        for matrix in individual['model'].matrices:
            noise_scale = args['sigma'] * matrix.shape[0] ** -0.5
            # *_like already matches the matrix's device and dtype.
            mask = (torch.rand_like(matrix) < args['mutation_rate']).to(matrix.dtype)
            noise = torch.randn_like(matrix) * noise_scale
            mutated_matrices.append(matrix + noise * mask)
        individual['model'].set_matrices(mutated_matrices)
    return generation

In [6]:
def generation_init(args):
    """Create the initial population.

    Returns a list of ``args['generation_size']`` dicts, each holding a
    freshly constructed ``Actor`` under ``'model'`` and a ``'fitness'`` of 0.
    """
    return [
        {'model': Actor(args), 'fitness': 0}
        for _ in range(args['generation_size'])
    ]

In [7]:
def score(generation, env, args):
    """Run one episode per individual and record its return as fitness.

    Args:
        generation: list of dicts; ``'model'`` is called with the state
            tensor and must return an action accepted by ``env.step``.
        env: environment whose ``reset()`` returns a sequence with the
            observation first and whose ``step(action)`` returns
            ``(observation, reward, terminated, truncated, info)``.
        args: mapping providing ``device``.

    Returns:
        ``(generation, avg_fitness)``: the same list with every
        ``'fitness'`` overwritten by that individual's episode return, and
        the mean return over the individuals actually evaluated (``0.0`` for
        an empty population).
    """
    device = args['device']
    # Cast observations to the default float dtype so environments that
    # return float64 observations still match freshly created weights.
    dtype = torch.get_default_dtype()
    total_return = 0.0
    for individual in generation:
        policy = individual['model']
        state = torch.as_tensor(env.reset()[0], dtype=dtype, device=device)
        episode_return = 0
        done = False
        while not done:
            with torch.no_grad():
                action = policy(state)
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            episode_return += reward
            state = torch.as_tensor(next_state, dtype=dtype, device=device)
        individual['fitness'] = episode_return
        total_return += episode_return
    # Divide by the number of individuals scored, not the configured size.
    avg_fitness = total_return / len(generation) if generation else 0.0
    return generation, avg_fitness

In [None]:
# Evolve a population of Actor policies on CartPole and record the average
# fitness of each generation in avg_fitness_list.
env_name='CartPole-v1'
env=gym.make(env_name)
avg_fitness_list=[]
try:
    # Network input/output sizes come from the environment itself.
    args['state_dim']=env.observation_space.shape[0]
    # Plain int so the value is safe to use as a tensor dimension.
    args['action_dim']=int(env.action_space.n)
    generation=generation_init(args)
    for i in range(args['generations']):
        # No gradients are needed anywhere in the evolutionary loop.
        with torch.no_grad():
            generation,avg_fitness=score(generation,env,args)
            avg_fitness_list.append(avg_fitness)
            parents=selection(generation,args)
            next_generation=crossover(parents,args)
            next_generation=mutation(next_generation)
            generation=next_generation
        print(f"generation: {i+1}/{args['generations']} avg_fitness: {avg_fitness}",end='\r')
finally:
    # Release the environment even if the run fails or is interrupted.
    env.close()

In [None]:
# Learning curve: one point per generation (0-based index on the x axis).
generation_indices=list(range(len(avg_fitness_list)))
fig,ax=plt.subplots()
ax.plot(generation_indices,avg_fitness_list)
ax.set_xlabel('generation')
ax.set_ylabel('average fitness')
ax.set_title('Average population fitness per generation')
plt.show()
# Optional when running on a GPU: release cached memory after training.
#torch.cuda.empty_cache()