In [1]:
import retro
import torch
from torch import nn
from copy import deepcopy

In [2]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

Moriarty, D., Schultz, A. & Grefenstette, J. (1999). Evolutionary Algorithms for Reinforcement Learning. Journal of Artificial Intelligence Research, 11, 241-276. https://doi.org/10.1613/jair.613 - A Review on the use of Evolutionary Algorithms in Reinforcement Learning, something that, apparently, was relatively popular in the 1990s.

Bai, H., Cheng, R. & Jin, Y. (2023). Evolutionary Reinforcement Learning: A Survey. Intelligent Computing, 2, Article 0025. https://doi.org/10.34133/icomputing.0025  - Yet another review. There is a vast range of uses for Evolutionary Algorithms in RL, not just the model to be used, but even the epsilon hyperparameter, and the reward function.

https://lilianweng.github.io/posts/2018-02-19-rl-overview/#evolution-strategies - Lil's Log, by Lilian Weng

"ES, as a black-box optimization algorithm, is another approach to RL problems (In my original writing, I used the phrase “a nice alternative”; Seita pointed me to this discussion and thus I updated my wording.)" - Lil's Log

Unfortunately, it seems that nowadays Evolutionary Algorithms for Reinforcement Learning tend to make people a bit...nervous.

Evolutionary Algorithms are, afterall, less efficient than gradient optimization.

Even so, I like the idea of Evolutionary Algorithms, so let's play a bit. Let's stick to the simple idea of selecting the most fitting subject ("policy")

In [3]:
class Subject(nn.Module):

    '''
    Simple Neural Network for testing
    '''

    def __init__(self):

        # No epsilon, no mode.
        # Each subject shall prove its worthness

        super(Subject, self).__init__()

        # Input = 3x200x256

        self.neuron1 = nn.Linear(3*200*256, 64)
        self.neuron2 = nn.Linear(64, 64)
        self.neuron3 = nn.Linear(64, 12)

        self.relu = nn.ReLU()

        # Our environment is MultiBinary, so we'll use Sigmoid.
        # MultiBinary Environment: For each button, determine if pressed or not.

        self.sigmoid = nn.Sigmoid()

    def forward(self, obs):

        x = obs.contiguous().view(obs.size(0), -1)

        x = self.neuron1(x)
        x = self.relu(x)
        x = self.neuron2(x)
        x = self.relu(x)

        actions = self.neuron3(x)

        actions = self.sigmoid(actions)

        del x

        return actions

In [4]:
def substitution_gaussian_mutation(gene, n_mutations):

    mean, std = torch.mean(gene), torch.std(gene)

    try:

        distribution = torch.distributions.Normal(mean, std)

    except: # If standard deviation is invalid (0.0), use Normal Distribution instead

        distribution = torch.distributions.Normal(torch.zeros(1,), torch.ones(1,))

    for mutate in range(n_mutations):

        if gene.ndim == 1: # Bias

            element = torch.randperm(gene.size(0))[0].item()

            mutation = distribution.sample((1,))

            with torch.no_grad():

                gene[element] = mutation

        elif gene.ndim == 2: # Linear Layer

            row, column = torch.randperm(gene.size(0))[0].item(), torch.randperm(gene.size(1))[0].item()

            mutation = distribution.sample((1,))

            with torch.no_grad():

                gene[row, column] = mutation

        else: # Convolution Layer

            A, B, C, D = torch.randperm(gene.size(0))[0].item(), torch.randperm(gene.size(1))[0].item(), torch.randperm(gene.size(2))[0].item(), torch.randperm(gene.size(3))[0].item()

            mutation = distribution.sample((1,))

            with torch.no_grad():

                gene[A, B, C, D] = mutation

    return gene

def substitution_normal_mutation(gene, n_mutations):

    distribution = torch.distributions.Normal(torch.zeros(1,), torch.ones(1,))

    for mutate in range(n_mutations):

        if gene.ndim == 1: # Bias

            element = torch.randperm(gene.size(0))[0].item()

            mutation = distribution.sample((1,))

            with torch.no_grad():

                gene[element] = mutation

        elif gene.ndim == 2: # Linear Layer

            row, column = torch.randperm(gene.size(0))[0].item(), torch.randperm(gene.size(1))[0].item()

            mutation = distribution.sample((1,))

            with torch.no_grad():

                gene[row, column] = mutation

        else: # Convolution Layer

            A, B, C, D = torch.randperm(gene.size(0))[0].item(), torch.randperm(gene.size(1))[0].item(), torch.randperm(gene.size(2))[0].item(), torch.randperm(gene.size(3))[0].item()

            mutation = distribution.sample((1,))

            with torch.no_grad():

                gene[A, B, C, D] = mutation

    return gene

def deletion_mutation(gene, n_mutations):

    for mutate in range(n_mutations):

        if gene.ndim == 1: # Bias

            element = torch.randperm(gene.size(0))[0].item()

            mutation = torch.zeros((1,))

            with torch.no_grad():

                gene[element] = mutation

        elif gene.ndim == 2: # Linear Layer

            row, column = torch.randperm(gene.size(0))[0].item(), torch.randperm(gene.size(1))[0].item()

            mutation = torch.zeros((1,))

            with torch.no_grad():

                gene[row, column] = mutation

        else: # Convolution Layer

            A, B, C, D = torch.randperm(gene.size(0))[0].item(), torch.randperm(gene.size(1))[0].item(), torch.randperm(gene.size(2))[0].item(), torch.randperm(gene.size(3))[0].item()

            mutation = torch.zeros((1,))

            with torch.no_grad():

                gene[A, B, C, D] = mutation

    return gene

def inversion_mutation(gene, n_mutations):

    for mutate in range(n_mutations):

        if gene.ndim == 1: # Bias

            elements = torch.randperm(gene.size(0))

            mutationA = gene[elements[0]].clone()
            mutationB = gene[elements[1]].clone()

            with torch.no_grad():

                gene[elements[0].item()] = mutationB
                gene[elements[1].item()] = mutationA

        elif gene.ndim == 2: # Linear Layer

            rows, columns = torch.randperm(gene.size(0)), torch.randperm(gene.size(1))

            with torch.no_grad():

                mutationA = gene[rows[0].item(), columns[0].item()].clone()
                mutationB = gene[rows[1].item(), columns[1].item()].clone()

                gene[rows[0].item(), columns[0].item()] = mutationB
                gene[rows[1].item(), columns[1].item()] = mutationA

        else: # Convolution Layer

            A, B, C, D = torch.randperm(gene.size(0)), torch.randperm(gene.size(1)), torch.randperm(gene.size(2)), torch.randperm(gene.size(3))

            with torch.no_grad():

                mutationA = gene[A[0].item(), B[0].item(), C[0].item(), D[0].item()].clone()
                mutationB = gene[A[1].item(), B[1].item(), C[1].item(), D[1].item()].clone()

                gene[A[0].item(), B[0].item(), C[0].item(), D[0].item()] = mutationB
                gene[A[1].item(), B[1].item(), C[1].item(), D[1].item()] = mutationA

    return gene

In [5]:
# Hyperparameters

POPULATION_SIZE = 20
MUTAGEN_FACTOR = 100 # The maximum number of mutated genes
GENERATIONS = 5 # During how many generations Selection will be applied
# Note: 1 Generation = 1 Epoch --> 1 complete iteration through entire dataset
GENERATION_SIZE = 1000 # How many iterations configure a single generation --> Timesteps
GAMMA = 0.99 # Discount factor for future rewards (uncertainty)

In [6]:
population = [Subject().to(device) for i in range(POPULATION_SIZE)]
chromossomes = [x.state_dict() for x in population]
genes = [n for n, _ in population[0].named_parameters()]

print(len(chromossomes))
#print(chromossomes[0]) # 50 chromossomes -> Used to map each gene
print(len(genes)) # 6 genes per subject -> The neuron weights + biases
print(genes[0])

20
6
neuron1.weight


In [7]:
# Do the Evolution

# Gym doesn't allow for multiple instances, but let's see if we can fool it
env = retro.make(game="StreetFighterIISpecialChampionEdition-Genesis", state="ChunLiVsBlanka.1star")
environments = [env]*POPULATION_SIZE
states = [x.reset() for x in environments]
obs = env.reset()
obs = torch.from_numpy(obs)
obs = obs/255
obs = obs.permute(2, 1, 0).unsqueeze(0).float().to(device)

for generation in range(GENERATIONS):

    rewards = [[0.0]]*POPULATION_SIZE

    MUTAGEN_FACTOR = MUTAGEN_FACTOR//(2 ** generation)

    if MUTAGEN_FACTOR <= 1:

        MUTAGEN_FACTOR = 2

    for _ in range(GENERATION_SIZE):

        for subject in range(POPULATION_SIZE):

            obs = states[subject]
            obs = torch.from_numpy(obs)
            obs = obs/255
            obs = obs.permute(2, 1, 0).unsqueeze(0).float().to(device)

            with torch.no_grad(): # No grads since we're not using Gradient Descent

                action = population[subject](obs)
                action = action.squeeze(0)

            # MultiBinary Environment --> Only 0.0 or 1.0 accepted
            bin = []
            for x in action:
                if x > 0.5:
                    bin.append(1.)
                else:
                    bin.append(0.)
            
            action = bin # This is the actual action

            obs, reward, end, info = environments[subject].step(action)
            states[subject] = obs

            reward = (info['health']**(1+info['matches_won'])) - (info['enemy_health']**(1+info['enemy_matches_won']))
            reward = torch.tensor(reward, device=device)
            reward = -(10.0/(torch.exp(reward) + 1.0)) + 5.0 # Normalizing to -5 to +5 (sigmoid function)
            
            rewards[subject].append(reward)

    for subject_reward_t in range(len(rewards)):

        subject_reward = rewards[subject_reward_t]

        for r_t in range(len(subject_reward)):

            subject_reward[r_t] = subject_reward[r_t] * (GAMMA ** r_t)

        # Getting the total (discounted) reward

        subject_reward = torch.tensor(subject_reward)
        subject_reward = subject_reward.sum()

        rewards[subject_reward_t] = subject_reward.item()

    #rewards = [rewards[i][0] for i in range(len(rewards))]
    rewards = torch.tensor(rewards)

    rewards, idx = torch.sort(rewards, descending=True)

    # Selecting the 10 most fit subjects to reproduce (through mitosis)
    # And applying mutations

    for s in range(10):

        population[idx[POPULATION_SIZE-1-s].item()] = deepcopy(population[idx[s].item()])

        # Selecting genes for mutation ---> Genes = weights and biases

        selected_gene = torch.randint(0, len(genes), (1,)).item()
        gene = genes[selected_gene]
        gene = [chromossomes[idx[POPULATION_SIZE-1-s].item()][gene]]

        chance = abs(torch.rand((1,)))

        if 0.0 < chance < 0.3:

            n_mutations = torch.randint(1, MUTAGEN_FACTOR, (1,)).item()

            for codon in gene:

                substitution_gaussian_mutation(codon, n_mutations)

        if 0.3 < chance < 0.4:

            n_mutations = torch.randint(1, MUTAGEN_FACTOR, (1,)).item()

            for codon in gene:

                substitution_normal_mutation(codon, n_mutations)

        if 0.4 < chance < 0.5:

            n_mutations = torch.randint(1, MUTAGEN_FACTOR, (1,)).item()

            for codon in gene:

                deletion_mutation(codon, n_mutations)

        if 0.5 < chance < 0.8:

            n_mutations = torch.randint(1, MUTAGEN_FACTOR, (1,)).item()

            for codon in gene:

                inversion_mutation(codon, n_mutations)

    print(f"{generation+1}/{GENERATIONS}")
    print(f"Best model so far: {idx[0].item()}")
    print(f"Best Model Total Reward: {rewards[0]}")

for e in environments:

    e.close()

env.close()

1/5
Best model so far: 1
Best Model Total Reward: 1.5085072845977265e-05
2/5
Best model so far: 0
Best Model Total Reward: 494.9991455078125
3/5
Best model so far: 19
Best Model Total Reward: -22.458486557006836
4/5
Best model so far: 19
Best Model Total Reward: -22.458486557006836
5/5
Best model so far: 0
Best Model Total Reward: 13.07724380493164


In [9]:
# Gameplay Mode

env = retro.make(game="StreetFighterIISpecialChampionEdition-Genesis", state="ChunLiVsBlanka.1star")
obs = env.reset()
obs = torch.from_numpy(obs)
obs = obs/255
obs = obs.permute(2, 1, 0).unsqueeze(0).float().to(device)
steps = 0

best_model = population[idx[0].item()]

# If you'd like to save and train even more

'''states = []
actions = []
rewards = []
deltas = []'''

while steps < 1000:
    env.render()

    with torch.no_grad():

        action = best_model(obs)

    action = action.squeeze(0)

    # MultiBinary Environment --> Only 0.0 or 1.0 accepted
    bin = []
    for x in action:
        if x > 0.5:
            bin.append(1.)
        else:
            bin.append(0.)

    action = bin

    obs, reward, end, info = env.step(action)
    obs = torch.from_numpy(obs)
    obs = obs/255
    obs = obs.permute(2, 1, 0).unsqueeze(0).float().to(device)
    #reward = torch.tensor(reward, device=device)

    '''reward = (info['health']**(1+info['matches_won'])) - (info['enemy_health']**(1+info['enemy_matches_won']))
    reward = torch.tensor(reward, device=device)
    reward = -(10.0/(torch.exp(reward) + 1.0)) + 5.0 # Normalizing to -5 to +5 (sigmoid function)'''

    '''states.append(obs.cpu())
    actions.append(action.cpu())
    rewards.append(reward.cpu())
    deltas.append(delta.cpu())'''

    steps += 1

env.render(close=True)
env.close()