In [12]:
import random
from deap import base, creator, tools, algorithms
import matplotlib.pyplot as plt
from genetic.common_types import Rule, ConditionType, OperatorType, ActionType
from genetic.agent import GeneticAgent
from genetic.game import Game
import numpy as np
import pickle
from pommerman.agents import PlayerAgent
import multiprocessing
import os

# Remove classes left in `deap.creator` by an earlier execution of this
# notebook, so the `creator.create` calls further down start from a clean
# namespace when the cells are re-run in the same kernel.
if "FitnessMax" in creator.__dict__:
    del creator.FitnessMax
if "Individual" in creator.__dict__:
    del creator.Individual


In [13]:
# Hyper-parameters of the genetic algorithm.
# NOTE(review): POPULATION_SIZE and MAX_GENERATIONS are not referenced in the
# cells shown here; the run cell passes pop_size / n_gen explicitly.
POPULATION_SIZE = 100
MUTATION_RATE = 0.3    # probability that an offspring is passed to toolbox.mutate
CROSSOVER_RATE = 0.7   # probability that a pair of adjacent offspring is mated
MAX_GENERATIONS = 50
TOURNAMENT_SIZE = 7    # `tournsize` handed to tools.selTournament
NUM_ELITES = 10        # best individuals cloned unchanged into the next generation

In [14]:
# Single-objective fitness with a positive weight, i.e. larger values are better.
creator.create("FitnessMax", base.Fitness, weights=(1.0,))
# An individual is a plain list that carries a FitnessMax in its `fitness` attribute.
creator.create("Individual", list, fitness=creator.FitnessMax)

In [15]:
def random_condition():
    """Return one member of ``ConditionType`` chosen at random."""
    candidates = list(ConditionType)
    return random.choice(candidates)

def random_operator():
    """Return one member of ``OperatorType`` chosen at random."""
    candidates = list(OperatorType)
    return random.choice(candidates)

def random_action():
    """Return one member of ``ActionType`` chosen at random."""
    candidates = list(ActionType)
    return random.choice(candidates)

def create_random_rule():
    """Build a random ``Rule`` with one to three conditions.

    The number of conditions is drawn with weights 1:2:7 (three conditions
    being the most likely). ``n`` conditions are joined by ``n - 1`` random
    operators, and the rule gets a single random action.

    Returns:
        A ``Rule`` constructed from the condition list, the operator list and
        the action, in that order.
    """
    (condition_count,) = random.choices([1, 2, 3], weights=[1, 2, 7], k=1)

    conditions = []
    for _ in range(condition_count):
        conditions.append(random_condition())

    # One operator sits between each pair of neighbouring conditions.
    operators = []
    for _ in range(condition_count - 1):
        operators.append(random_operator())

    return Rule(conditions, operators, random_action())

def create_individual(num_rules):
    """Return a list holding ``num_rules`` freshly generated random rules."""
    rules = []
    for _ in range(num_rules):
        rules.append(create_random_rule())
    return rules

In [16]:
# Toolbox holding the factories used to build rules, individuals and populations.
toolbox = base.Toolbox()
# `toolbox.rule()` produces one random rule.
toolbox.register("rule", create_random_rule)

# `toolbox.individual()` fills a creator.Individual with 10 calls to `toolbox.rule`.
toolbox.register("individual", tools.initRepeat, creator.Individual, 
                toolbox.rule, n=10)

# `toolbox.population(n=...)` builds a plain list of `n` individuals.
toolbox.register("population", tools.initRepeat, list, toolbox.individual)

In [17]:
def _mutate_rule_conditions(rule):
    """Add, remove or replace one condition of ``rule`` in place."""
    # Grow: only possible below three conditions, and then taken half the time.
    if len(rule.conditions) < 3 and random.random() < 0.5:
        rule.conditions.append(random_condition())
        if len(rule.conditions) > 1:
            # Keep one operator between each pair of conditions.
            rule.operators.append(random_operator())
        return

    # Shrink: only possible above one condition, and then taken half the time.
    if len(rule.conditions) > 1 and random.random() < 0.5:
        removed = random.randint(0, len(rule.conditions) - 1)
        rule.conditions.pop(removed)
        # Drop the operator at the same position; when the last condition was
        # removed there is none at that position, so drop the final operator.
        operator_pos = removed if removed < len(rule.operators) else -1
        rule.operators.pop(operator_pos)
        return

    # Fallback: overwrite one existing condition with a new random one.
    target = random.randint(0, len(rule.conditions) - 1)
    rule.conditions[target] = random_condition()


def mutate_rule(rule, indpb=0.7):
    """Mutate ``rule`` in place and return it.

    Three independent steps, each gated by ``indpb``:

    1. with probability ``indpb`` the condition list is changed (a condition
       is added, removed or replaced);
    2. every operator is re-drawn with probability ``indpb``;
    3. the action is re-drawn with probability ``indpb``.

    Args:
        rule: object exposing mutable ``conditions`` and ``operators`` lists
            and an ``action`` attribute.
        indpb: probability used for each of the steps above.

    Returns:
        The same ``rule`` object that was passed in.
    """
    if random.random() < indpb:
        _mutate_rule_conditions(rule)

    for position in range(len(rule.operators)):
        if random.random() < indpb:
            rule.operators[position] = random_operator()

    if random.random() < indpb:
        rule.action = random_action()

    return rule

def mutate_individual(individual, rule_fraction=0.3, replace_prob=0.2, rule_indpb=0.7):
    """Mutate a subset of an individual's rules in place.

    A fraction of the rules (at least one) is picked without replacement.
    Each picked rule is either replaced by a brand-new random rule or mutated
    with ``mutate_rule``.

    Args:
        individual: mutable sequence of rules; modified in place.
        rule_fraction: share of the rules to touch (default 0.3).
        replace_prob: probability that a picked rule is replaced outright
            instead of being mutated (default 0.2).
        rule_indpb: ``indpb`` forwarded to ``mutate_rule`` (default 0.7).

    Returns:
        A one-element tuple containing ``individual``.
    """
    # Nothing to mutate; sampling one index from an empty range would raise.
    if not individual:
        return individual,

    num_rules_to_mutate = max(1, int(len(individual) * rule_fraction))
    # Guard against rule_fraction > 1 asking for more rules than exist.
    num_rules_to_mutate = min(num_rules_to_mutate, len(individual))
    rule_indices = random.sample(range(len(individual)), num_rules_to_mutate)

    for i in rule_indices:
        if random.random() < replace_prob:
            individual[i] = create_random_rule()
        else:
            individual[i] = mutate_rule(individual[i], rule_indpb)

    return individual,

def crossover_individuals(ind1, ind2):
    """Two-point crossover: swap the same slice between both parents in place.

    Both cut points are drawn from ``0 .. min(len(ind1), len(ind2))`` so the
    swapped slice exists in both parents and each parent keeps its length.
    Equal cut points leave both parents unchanged.

    Args:
        ind1: first parent (mutable sequence), modified in place.
        ind2: second parent (mutable sequence), modified in place.

    Returns:
        The tuple ``(ind1, ind2)``.
    """
    # Bound by the shorter parent; using len(ind1) alone would move genes
    # from the longer parent to the shorter one and change both lengths.
    size = min(len(ind1), len(ind2))
    cxpoint1 = random.randint(0, size)
    cxpoint2 = random.randint(0, size)
    if cxpoint1 > cxpoint2:
        cxpoint1, cxpoint2 = cxpoint2, cxpoint1

    ind1[cxpoint1:cxpoint2], ind2[cxpoint1:cxpoint2] = \
        ind2[cxpoint1:cxpoint2], ind1[cxpoint1:cxpoint2]

    return ind1, ind2

In [18]:
def evaluate_tournament(tournament_data):
    """Play one tournament and return the game's results.

    Args:
        tournament_data: pair ``(population, indices)``. Every index selects
            one individual of ``population``; its rules drive one
            ``GeneticAgent`` tagged with that index.

    Returns:
        Whatever ``Game.play_game`` returns for three unrendered episodes.
    """
    population, indices = tournament_data

    agents = []
    for index in indices:
        agents.append(GeneticAgent(rules=population[index], individual_index=index))

    return Game(agents).play_game(num_episodes=3, render_mode=None)

def _score_agent(agent):
    """Return the fitness one agent earned in a single episode.

    ``agent`` is one entry of an episode's ``'agents'`` list and is read via
    the keys ``'winner'``, ``'visited_tiles'``, ``'bombs_placed'``, ``'kills'``
    and ``'individual_index'``.
    """
    score = 0

    if agent['winner']:
        score += 25

    # Reward exploration and bomb usage.
    visited_tiles = len(agent['visited_tiles'])
    score += visited_tiles * 2
    score += agent['bombs_placed'] * 3

    # A kill entry equal to the agent's own index is penalised; any other
    # entry is rewarded.
    for kill in agent['kills']:
        if kill == agent['individual_index']:
            score -= 10
        else:
            score += 10

    # Penalise agents that barely moved and barely used bombs.
    if visited_tiles < 10 and agent['bombs_placed'] < 2:
        score -= 5

    return score


def evaluate_population_in_tournament(population):
    """Score every individual by letting it play one four-player tournament.

    The population is shuffled and split into groups of four, so each
    individual is scored in exactly one tournament. If the size is not a
    multiple of four, the leftover individuals play one extra tournament that
    is filled up with randomly chosen, already scheduled individuals; those
    fill-ins act as opponents only and are not scored a second time.

    Tournaments are run in parallel with ``multiprocessing.Pool`` through
    ``evaluate_tournament``.

    Args:
        population: sequence of individuals.

    Returns:
        A list of one-element tuples ``(score,)`` aligned with ``population``.
        With fewer than four individuals no tournament can be formed, so all
        scores are 0 (an empty population yields ``[]``).
    """
    players_per_game = 4
    pop_size = len(population)
    fitness_scores = [0] * pop_size

    # Too small for a single game: return early instead of asking for a pool
    # with zero worker processes, which raises ValueError.
    if pop_size < players_per_game:
        return [(score,) for score in fitness_scores]

    # Partition a random permutation so nobody is skipped or plays twice.
    order = np.random.permutation(pop_size)
    full = pop_size - pop_size % players_per_game
    groups = [order[start:start + players_per_game]
              for start in range(0, full, players_per_game)]
    scored_per_group = [{int(i) for i in group} for group in groups]

    leftovers = order[full:]
    if len(leftovers) > 0:
        sparring = np.random.choice(
            order[:full], players_per_game - len(leftovers), replace=False)
        groups.append(np.concatenate([leftovers, sparring]))
        # Only the leftovers are scored in the extra tournament.
        scored_per_group.append({int(i) for i in leftovers})

    tournament_data = [(population, group) for group in groups]

    processor_count = min(multiprocessing.cpu_count(), len(tournament_data))
    with multiprocessing.Pool(processes=processor_count) as pool:
        # Pool.map returns results in input order, matching scored_per_group.
        all_results = pool.map(evaluate_tournament, tournament_data)

    for result, scored_indices in zip(all_results, scored_per_group):
        for episode_result in result:
            for agent in episode_result['agents']:
                agent_index = agent['individual_index']
                if int(agent_index) in scored_indices:
                    fitness_scores[agent_index] += _score_agent(agent)

    return [(score,) for score in fitness_scores]
        
        
def evaluate_individual(individual):
    """Placeholder evaluator: return a fitness of zero for any individual.

    The argument is ignored and the result is a one-element tuple.
    """
    placeholder_fitness = (0,)
    return placeholder_fitness

In [19]:
# Genetic operators used by the evolution loop.
toolbox.register("mate", crossover_individuals)
toolbox.register("mutate", mutate_individual)
# Parent selection: tournament selection with TOURNAMENT_SIZE contestants.
toolbox.register("select", tools.selTournament, tournsize=TOURNAMENT_SIZE)
# NOTE(review): the registered evaluator always returns (0,); the cells shown
# here score individuals with evaluate_population_in_tournament instead.
toolbox.register("evaluate", evaluate_individual)

In [20]:
def run_evolution(n_gen=10, pop_size=40):
    """Run the elitist genetic algorithm and return the final state.

    Each generation clones the best individuals as elites, selects and clones
    the remaining slots with ``toolbox.select``, applies crossover and
    mutation, re-evaluates the offspring whose fitness was invalidated, and
    replaces the population with ``offspring + elites``.

    Side effects: prints progress, writes ``best_individual.pkl`` after every
    generation and ``generations/generation_<gen>.pkl`` after every tenth one.

    Args:
        n_gen: number of generations to run.
        pop_size: requested population size; rounded down to a multiple of
            four and raised to at least four.

    Returns:
        ``(pop, stats, hof)``: the final population, the ``tools.Statistics``
        object (registered but not compiled here) and the ``HallOfFame``.
    """
    players_per_game = 4
    pop_size = max(players_per_game, (pop_size // players_per_game) * players_per_game)

    print(f"Starting evolution with population size: {pop_size}")

    pop = toolbox.population(n=pop_size)

    fitnesses = evaluate_population_in_tournament(pop)
    for ind, fit in zip(pop, fitnesses):
        ind.fitness.values = fit

    stats = tools.Statistics(lambda ind: ind.fitness.values)
    stats.register("avg", np.mean)
    stats.register("min", np.min)
    stats.register("max", np.max)

    hof = tools.HallOfFame(2)

    # Never let elites take more than half the population: with
    # pop_size <= NUM_ELITES the selection count would be zero or negative
    # and no offspring would ever be produced.
    num_elites = min(NUM_ELITES, len(pop) // 2)

    for gen in range(n_gen):
        print(f"Generation {gen + 1}/{n_gen}")

        elites = [toolbox.clone(ind) for ind in tools.selBest(pop, num_elites)]
        offspring = [toolbox.clone(ind)
                     for ind in toolbox.select(pop, len(pop) - num_elites)]

        # Mate neighbouring pairs; an unpaired last individual is skipped.
        for first, second in zip(offspring[::2], offspring[1::2]):
            if random.random() < CROSSOVER_RATE:
                toolbox.mate(first, second)
                del first.fitness.values
                del second.fitness.values

        for mutant in offspring:
            if random.random() < MUTATION_RATE:
                toolbox.mutate(mutant)
                del mutant.fitness.values

        invalid_ind = [ind for ind in offspring if not ind.fitness.valid]
        if invalid_ind:
            eval_batch = list(invalid_ind)
            shortfall = players_per_game - len(eval_batch)
            if shortfall > 0:
                # A game needs four players; borrow already-evaluated
                # individuals as opponents. Their fitness is left untouched
                # because only invalid_ind is zipped with the scores below.
                evaluated = [ind for ind in offspring + elites if ind.fitness.valid]
                eval_batch.extend(random.sample(evaluated, shortfall))

            fitnesses = evaluate_population_in_tournament(eval_batch)
            for ind, fit in zip(invalid_ind, fitnesses):
                ind.fitness.values = fit

        pop[:] = offspring + elites

        hof.update(pop)

        fits = [ind.fitness.values[0] for ind in pop]
        length = len(pop)
        mean = sum(fits) / length

        print(f"  Min: {np.min(fits)}")
        print(f"  Max: {np.max(fits)}")
        print(f"  Avg: {mean}")
        print(f"  Best individual fitness: {hof[0].fitness.values[0]:.2f}")

        # Checkpoint every tenth generation. The file name uses the
        # zero-based counter, e.g. generation_9.pkl for the tenth generation.
        if (gen + 1) % 10 == 0:
            os.makedirs("generations", exist_ok=True)

            with open(f"generations/generation_{gen}.pkl", "wb") as f:
                pickle.dump(pop, f)

        # Overwrite the best-so-far individual after every generation.
        with open("best_individual.pkl", "wb") as f:
            pickle.dump(hof[0], f)

    return pop, stats, hof

In [21]:
# Run the evolution for 30 generations with a population of 200.
final_pop, stats, hof = run_evolution(n_gen=30, pop_size=200)

# Report the best individual kept by the hall of fame and list its rules.
print("\nEvolution finished.")
print(f"Best individual has {len(hof[0])} rules with fitness: {hof[0].fitness.values[0]}")
print("Best individual rules:")
for rule in hof[0]:
    print(rule)

Starting evolution with population size: 200
Generation 1/30
  Min: -112.0
  Max: 201.0
  Avg: 13.675
  Best individual fitness: 201.00
Generation 2/30
  Min: -71.0
  Max: 201.0
  Avg: 29.635
  Best individual fitness: 201.00
Generation 3/30
  Min: -92.0
  Max: 201.0
  Avg: 36.435
  Best individual fitness: 201.00
Generation 4/30
  Min: -145.0
  Max: 201.0
  Avg: 36.74
  Best individual fitness: 201.00
Generation 5/30
  Min: -71.0
  Max: 201.0
  Avg: 48.865
  Best individual fitness: 201.00
Generation 6/30
  Min: -106.0
  Max: 201.0
  Avg: 54.18
  Best individual fitness: 201.00
Generation 7/30
  Min: -59.0
  Max: 249.0
  Avg: 65.77
  Best individual fitness: 249.00
Generation 8/30
  Min: -65.0
  Max: 249.0
  Avg: 47.365
  Best individual fitness: 249.00
Generation 9/30
  Min: -97.0
  Max: 249.0
  Avg: 56.0
  Best individual fitness: 249.00
Generation 10/30
  Min: -162.0
  Max: 282.0
  Avg: 68.035
  Best individual fitness: 282.00
Generation 11/30
  Min: -127.0
  Max: 360.0
  Avg: 80.5