## EA1 - Steady State

Group 112: Daimy van Loo, Alexandra Genis, Martino ?, Mikel Blom

**Algorithm Components**

(see Eiben p. 26)

* Initialisation: Draw from  $\mathcal{U}[-1,1]$
* Evaluation: Standard Fitness Function and Exponential Ranking
* Select Parents: Tournament Selection with K=5
* Recombination: Whole Arithmetic with $\alpha=0.5$
* Mutation: Nonuniform Mutation with $p=0.5, \sigma=0.2$
* Survivor Selection: ***Steady State***: Top 50% of Exponential Ranked Population + 50% new children


### Imports

In [235]:
# imports framework
import os
from evoman.environment import Environment
from demo_controller import player_controller

# Maths
import numpy as np
from numpy.random import uniform, normal, choice

# Stat Tracking
import pandas as pd

# Graphics
import matplotlib.pyplot as plt

# Timer for progress tracking
from timeit import default_timer as timer

# Saving results:
import csv

### Parameters

In [236]:
# np.random.seed(138)                 # Seed for reproducibility

# Experiment setup
experiment_name = "group112_EA1"    # For making directories for stat tracking / solution files

# Environment parameters
n_hidden_neurons = 10               # Hidden neurons in player_controller
enemies = [3, 2, 1]                   # enemies[0] to train against. Others for testing
random_ini = False
#n_params = 265                     # Calculated below with (env.get_num_sensors()+1)*n_hidden_neurons + (n_hidden_neurons+1)*5

# Training parameters:
n_train_runs = 10                   # number of training runs
n_generations = 50                # number of generations per training run - usually no improvement after 30 gens, and pandas starts bugging out

# Testing parameters
n_test_runs = 5

## Algorithm components:
# Initialisation
n_pop = 100                         # Population size
init_low = -1                       # Lower bound for init weights
init_high = 1                       # Upper bound for init weights

# Parent Selection
tour_k = 5                          # number of candidates in (tournament) selection

# Recombination
wa_alpha = 0.5                      # If =/= 0.5, can reconfigure function to create 2 children

# Mutation
mut_p = 0.75                         # Probability of child chromosome being mutated
nonunif_lr = 0.1                    # Learning rate for nonuniform mutation

# Survivor Selection
gen_gap = 0.8                       # Generation Gap - percentage of new children per gen

### Experiment and Environment Setup

In [None]:
# we don't want visuals during training
os.environ["SDL_VIDEODRIVER"] = "dummy"

# initializes simulation in individual evolution mode, for single static enemy.
env = Environment(experiment_name=experiment_name,
                  enemies=[enemies[0]],                                          
                  playermode="ai",
                  player_controller=player_controller(n_hidden_neurons),
                  enemymode="static",
                  level=2,
                  speed="fastest",
                  visuals=False)

# Uncomment to show env info.
# env.state_to_log() 

### EA Component Functions

In [238]:
## Initialise

def initialise_population(n_pop: int=100, n_params: int=265, low: float=-1, high: float=1) -> np.array:
    """ 
    Initialise population by drawing (n_pop) vectors of length (n_params).
    Parameters are drawn from a uniform distribution on (-1,1)
    """
    return np.array([uniform(low=low, high=high, size=n_params) for _ in range(n_pop)])

In [239]:
## Evaluate

# population fitness, ep, ee, time scores and exponential ranking
def evaluate_population(population, n_pop, env) -> pd.DataFrame:
    """
    returns dataframe which contains weights and all performance stats for a population.
    also calculates exponential ranking.
    """
    pop_stats = pd.DataFrame(columns=["weights","fitness","ep","ee","time","gain"])

    # Calculating fitness, ep, ee, time
    for x in population:
        ind_score = env.play(pcont=x)
        gain = ind_score[1]-ind_score[2]
        pop_stats.loc[len(pop_stats)] = [x, *ind_score, gain]

    # Ranking population
    pop_stats["rank"] = (1-np.exp(pop_stats["fitness"]))/n_pop

    # Sorting by rank
    pop_stats = pop_stats.sort_values(by="rank").reset_index(drop=True)

    return pop_stats

In [240]:
## Tournament selection
def tournament_selection(population: pd.DataFrame, k: int=5, replace: bool=False) -> list[float]:
    """
    Using a ranked population, we only need to select the lowest index.
    k:          number of tournament candidates
    replace:    if False, draw from population without replacement 
                False increases selection pressure.
    returns weights of 1 individual
    """ 
    
    can_idx = choice(np.arange(0,len(population)), replace=replace, size=k)
    can_idx.sort()

    return population.iloc[can_idx[0]]["weights"]

In [241]:
## Recombination
def wa_recombination(parent_1, parent_2, alpha=0.5, num_child=1):
    """
    Whole Arithmetic Recombination, weighted mean of parents.
    Can return 2 children - will return twins is alpha=0.5
    """
    child = alpha*parent_1 + (1-alpha)*parent_2

    if num_child==2:
        child_2 = alpha*parent_2 + (1-alpha)*parent_1
        return child,child_2
    else:
        return child

In [242]:
## Mutation 
def nonunif_mutation(child, lr=0.1, p=0.75):
    """
    Non-Uniform mutation, Eiben p. 57.
    p = probability of given child being mutated
    lr = size of mutation factor; draws are from N(0,lr)
    """
    
    # if no mutation, return child
    if uniform() > p: 
        return child
    
    else:
        # fool me once... 
        mutated_child = child[:]
        
        # draw mutation factors for genes from normal distribution
        mut_factor = normal(loc=0, scale=lr, size=len(mutated_child))

        # mutate selected genes by mutation factor 
        mutated_child += mut_factor

    return mutated_child

### Assembled Algorithm

In [243]:
#  Real-Valued GA (Eiben, p. 26):
def train(env, n_pop=100, n_params=265, n_generations=10, mut_p=.75, mut_lr=.1, gen_gap=.8, init_low=-1, init_high=1, keep_stats=False):
    # Experiment Setup
    ## Begin
    # initialise time
    total_time = 0

    # Initialise population
    population = initialise_population(n_pop, n_params, low=init_low, high=init_high)

    # calculate number of new children and survivors based on hyperparameters
    n_children = int(gen_gap*len(population))
    n_survivors = len(population)-n_children

    # Evaluate initial population:
    population = evaluate_population(population, n_pop, env)

    # Statistics keeping; track population development over generation.
    # Adds 3 columns per generation, can later easily be subsetted with pandas "like" function
    if keep_stats:
        run_df = pd.DataFrame()
        run_df["init_fitness"] = population["fitness"]
        run_df["init_ep"] = population["ep"]
        run_df["init_time"] = population["time"]
        run_df["init_gain"] = population["gain"]


    # Retrieving the relevant values for these individuals
    best_fitness = population.iloc[0]["fitness"]
    best_health = population.iloc[population["ep"].idxmax()]["ep"]
    best_gain = population.iloc[population["ep"].idxmax()]["gain"]
    # If there is no best fast player, time = env.timeexpire
    try: 
        best_time = population.iloc[population[population["ep"] > 0]["time"].idxmin()]["time"]
    except Exception as e:
        best_time = env.timeexpire

    # Copying weights for fittest, healthiest (at end of game) and fastest (and beating enemy)
    w_fittest = population.iloc[0]["weights"]
    w_healthiest = population.iloc[population["ep"].idxmax()]["weights"]
    w_gain = population.iloc[population["gain"].idxmax()]["weights"]

    # If there is no fastest alive player, just save best player:
    try: 
        w_fastest = population.iloc[population[population["ep"] > 0]["time"].idxmin()]["weights"]
    except Exception as e:
        w_fastest = population.iloc[0]["weights"]

    # Starting training loop:
    print(f"Best values at initialisation: ")
    print(f"Fitness: {best_fitness:.4f}, Most Health: {best_health:.4f}, Time Fastest: {best_time:4}")
    print("-----------------------------------------------------------------")
    print(f"Running for {n_generations} generations:\n")

    # Repeat Until (Termination Condition is satisfied):
    for i in range(n_generations):
        
        gen_time = timer()
        new_generation = []

        # Generate (gen_gap)*len(population) new children. 
        # Each set of parents generates 1 child.
        for _ in range(n_children):
            
            # Select Parents
            parent_1 = tournament_selection(population, tour_k)
            parent_2 = tournament_selection(population, tour_k)

            # Recombine Pairs of Parents
            child_1 = wa_recombination(parent_1, parent_2)

            # Mutate offspring
            child_1 = nonunif_mutation(child_1, lr=mut_lr, p=mut_p)
            
            new_generation.append(child_1)


        # The next generation consists of (1-gen_gap)*population survivors
        # the new generation is of size gen_gap*population 
        survivors = population.iloc[:n_survivors][:]
        
        # Calculate fitness, ep, etc for new generation: 
        new_generation = evaluate_population(np.array(new_generation), n_pop, env)[:]

        # reassemble population, then sort by rank:
        population = pd.concat([survivors, new_generation], axis=0)
        population = population.sort_values(by="rank").reset_index(drop=True)
        
        # Update statistics:
        if keep_stats:
            run_df[f"gen_{i+1}_fitness"] = population["fitness"]
            run_df[f"gen_{i+1}_ep"] = population["ep"]
            run_df[f"gen_{i+1}_time"] = population["time"]
            run_df[f"gen_{i+1}_gain"] = population["gain"]

        # Retrieving new best fitness, healthiest, fastest
        # Only printing new fitness though
        can_fitness = population.iloc[0]["fitness"]
        can_health = population.iloc[population["ep"].idxmax()]["ep"]
        can_gain = population.iloc[population["gain"].idxmax()]["gain"]

        # if there is no best time, best time is env.timeexpire
        try: 
            can_time = population.iloc[population[population["ep"] > 0]["time"].idxmin()]["time"]
        except Exception as e:
            can_time = env.timeexpire

        # Doing some time math
        cur_gen_time = timer()-gen_time
        total_time += cur_gen_time
        print(f"Generation {i+1:3}      Generation time: {cur_gen_time:6.2f}    Total time: {total_time:6.2f}")

        # If new best fitness found, show and save:
        if can_fitness > best_fitness:
            best_fitness = can_fitness
            w_fittest = population.iloc[0]["weights"][:]
            print(f"New best solution fitness: {best_fitness:.4f}")

        # if best health found, best fastest found, only save:
        if can_health > best_health:
            best_health = can_health
            w_healthiest = population.iloc[population["ep"].idxmax()]["weights"][:]

        if can_time < best_time:
            best_time = can_time
            try: 
                w_fastest = population.iloc[population[population["ep"] > 0]["time"].idxmin()]["weights"][:]
            except Exception as e:
                w_fastest = population.iloc[0]["weights"][:]

        if can_gain > best_gain:
            best_gain = can_gain
            w_gain = population.iloc[population["gain"].idxmax()]["weights"][:]


    # Assemble results; pandas dataframe consisting of fittest, fastest and healthiest:
    results = w_fittest

    # After n_generations, print fitness score of fittest individual
    # Return parameters for fittest individual
    print("-----------------------------------------------------------------")
    print(f"After {n_generations} generations in {total_time:.2f} seconds:")
    print(f"Best individual fitness: {best_fitness:.4f}")
    print(f"Lowest Time-to-win: {best_time}, Highest Remaining Player Energy: {best_health:.1f}\n\n")
    
    if keep_stats:
        return results, run_df
    else:
        return results 

### Run Training

In [244]:
def save_best_individuals(best_individuals : dict):
    with open(f"{experiment_name}/EA1_best_individuals.csv", 'w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(best_individuals.keys())
        writer.writerows(zip(*best_individuals.values()))

In [None]:
def run_training(env, enemies, n_train_runs=10, n_pop=100, n_hidden_neurons=10, n_generations=30, mut_p=.75, mut_lr=.1, gen_gap=.8, init_low=-1, init_high=1, random_ini=False):
    """
    This function trains n_train_runs on each enemy in the list enemies. 
    It returns a dictionary with enemies as keys, and the best individual per enemy as values.
    """
    # make paths to save logs:
    if not os.path.exists(experiment_name):
        os.makedirs(experiment_name)

    if random_ini:
        env.update_parameter("randomini", "yes")

    # initialise dict to keep track of best weights per enemy
    best_enemy_individuals = {}

    for enemy in enemies:
        env.update_parameter("enemies", [enemy])
        
        fittest_individuals = []
        n_training_stats = []
        
        # Calculate environment parameters
        n_params = (env.get_num_sensors()+1)*n_hidden_neurons + (n_hidden_neurons+1)*5
        
        for i in range(n_train_runs):
            print(f"Training run {i+1} of {n_train_runs} for enemy {enemy}")
            run_fittest, run_stats = train(env, n_pop, n_params, n_generations, mut_p, mut_lr, gen_gap, init_low, init_high, keep_stats=True)
            fittest_individuals.append(run_fittest)
            n_training_stats.append(run_stats)
            run_stats.to_csv(f"{experiment_name}/EA1_Train_Run_{i+1}_Enemy_{enemy}_stats.csv")

        # weights of fittest individual saved to pickle file
        best_enemy_individuals[enemy] = fittest_individuals

    save_best_individuals(best_enemy_individuals)

    return best_enemy_individuals

best_enemy_individuals = run_training(env, enemies, n_train_runs, n_pop, n_hidden_neurons, n_generations, mut_p, nonunif_lr, gen_gap, init_low, init_high, random_ini)

### Run Tests

In [None]:
def run_tests(env, enemies : list, best_enemy_individuals : dict, n : int = 5, random_ini = False):
	"""
	This function let's the best individual found for each enemy play against 
	that same enemy for n runs. It returns a dictonary with the enemies as values,
	and a list of the final fitness of each run.
	"""
	gain_tests = {}

	# loop over all enemies 
	for en in enemies:
		print(f'Loading Highest Fitness Solution weights for Enemy {en}')

		# get best weights for this enemy
		best_enemy_weights = best_enemy_individuals[en]

		# update environment
		env.update_parameter('enemies',[en])
		if random_ini:
			env.update_parameter("randomini", "yes")

		gains = []
		for individual in best_enemy_weights:
			total_gain = 0

			# do n runs against this enemy 
			for run in range(n):
				f,ep,ee,t = env.play(pcont=individual)
				print(f"Run {run+1} of best Enemy {en} Weights against Enemy {en}")
				print(f"Resulting Fitness score: {f:.4f}")
				print(f"Time: {t}   Remaining player life: {ep:.1f}   Remaining enemy life: {ee:.1f}")
				total_gain += (ep - ee)
			
			gains.append(total_gain / n)
		
		gain_tests[en] = gains

	with open(f"{experiment_name}/EA1_{n}_Tests_stats.csv", 'w', newline='') as file:
		writer = csv.writer(file)
		writer.writerow(gain_tests.keys())
		writer.writerows(zip(*gain_tests.values()))

	return gain_tests
		
tests = run_tests(env, enemies, best_enemy_individuals, n_test_runs, random_ini)