In [None]:
from collections import deque
import numpy as np
import pandas as pd
import seaborn as sns
import random
import pickle
import glob

from matplotlib import pyplot as plt
from tqdm.notebook import tqdm

import wandb

from tetris.Environment import TetrisEnv
from util.decaying import DecayingLinear

In [None]:
# number of genomes that make up one generation
POPULATION_SIZE = 50
# elite selection: the ELITE_SIZE fittest individuals are copied into the next generation
# directly, without any recombination/mutation
ELITE_SIZE = 3

# number of distinct individuals that compete in one tournament
TOURNAMENT_SIZE = 3
# index range of the population; in the visible code this is only referenced by the
# commented-out numpy variant of the tournament selection
TOURNAMENT_RANGE = np.arange(POPULATION_SIZE)

# number of genes (heuristic weights) per genome
GENOM_SIZE = 25

# a fitness evaluation game is stopped once the environment reports this many moves
MOVE_AMOUNT_CAP = 400

# parameters handed to DecayingLinear; the current value is used as the standard deviation
# of the multiplicative mutation noise
# presumably the rate moves linearly from START to END — confirm in util.decaying
MUTATION_RATE_START = 2
MUTATION_RATE_END = 0.2
MUTATION_RATE_DURATION = 10  # in generations

In [None]:
# single shared game environment; determine_fitness() resets and reuses it for every evaluation
env = TetrisEnv()

In [None]:
# sadly slower:
# def tournament_selection_1(fitnesses):
#     tournament = np.random.choice(TOURNAMENT_RANGE, size=TOURNAMENT_SIZE, replace=False)
    
#     return tournament[np.argmax(np.take(fitnesses, tournament))]

def tournament_selection(fitnesses, tournament_size=None):
    """
        Selects an individual by using tournament selection and returns the individual's index.

        A number of distinct individuals is drawn uniformly at random; the one with the highest
        fitness wins. On ties the participant that was drawn first wins.

        fitnesses: sequence of fitness values, one entry per individual of the population.
        tournament_size: number of participants; defaults to the global TOURNAMENT_SIZE. It is
            capped at the population size, since participants must be distinct.

        Raises ValueError if the population is empty or the tournament size is smaller than 1.
    """
    population_size = len(fitnesses)

    if population_size == 0:
        raise ValueError('cannot run a tournament on an empty population')

    if tournament_size is None:
        tournament_size = TOURNAMENT_SIZE

    if tournament_size < 1:
        raise ValueError('the tournament needs at least one participant')

    # drawing without replacement replaces the manual "redraw until unseen" loop, which would
    # never terminate for a tournament larger than the population
    participants = random.sample(range(population_size), min(tournament_size, population_size))

    # no sentinel start value is needed, so arbitrarily low fitness values are handled correctly
    return max(participants, key=lambda participant: fitnesses[participant])

def tournament_selection_parents(fitnesses):
    """
        Picks two different parents (as population indices) via tournament selection.
        The second tournament is repeated until its winner differs from the first parent.
    """
    first = tournament_selection(fitnesses)

    while True:
        second = tournament_selection(fitnesses)

        if second != first:
            return first, second

In [None]:
def get_best_state(heuristics, genome):
    """
        Scores every row of the heuristics batch by the sum of its values weighted element-wise
        with the genome, and returns the index of the row with the highest score.
    """
    row_scores = np.multiply(heuristics, genome).sum(axis=1)

    return row_scores.argmax()

In [None]:
def crossover(parent1, parent2):
    """
        Apply single-point crossover of the two given chromosomes.

        parent1, parent2: 1-d genome arrays; both are expected to have the same length.

        Returns two children: the first one takes the genes of parent1 before the cutting point
        and the genes of parent2 from the cutting point on, the second one vice versa.

        Raises ValueError (from random.randrange) if parent1 is empty.
    """
    # select a random cutting point; derived from the chromosome itself instead of the global
    # GENOM_SIZE, so chromosomes of any length can be recombined
    # a cutting point of 0 simply swaps the two parents
    cut_point = random.randrange(len(parent1))

    # combine the genome of the first parent up to the cutting point with the genome of the second parent
    # after the cutting point, and vice versa
    child1 = np.concatenate([parent1[:cut_point], parent2[cut_point:]])
    child2 = np.concatenate([parent2[:cut_point], parent1[cut_point:]])

    return child1, child2

def two_point_crossover(parent1, parent2):
    """
        Two-point crossover: the children of a first single-point crossover are recombined
        once more with another single-point crossover.
    """
    first_child, second_child = crossover(parent1, parent2)

    return crossover(first_child, second_child)

In [None]:
def mutate(genome, rate=None):
    """
        Mutates the given chromosome by multiplying each gene with a normally distributed random value.

        genome: array of genes; the result has the same shape.
        rate: standard deviation of the random factors (their mean is 1). Defaults to the current
            value of the global decaying mutation_rate.

        Returns a new array; the given genome is not modified.
    """
    if rate is None:
        rate = mutation_rate.get()

    # one factor per gene; sized after the genome itself rather than the global GENOM_SIZE
    factors = np.random.normal(1, rate, size=np.shape(genome))

    return np.multiply(factors, genome)

In [None]:
def determine_fitness(genome, print_debug=False):
    """
        Determines the fitness of a genome (an individual) by playing one game on the shared
        environment, always taking the candidate state that get_best_state ranks highest.

        The game ends as soon as the chosen candidate is flagged as done or the move cap has been
        reached; that last candidate is not played anymore. The game statistics are logged to
        wandb (and printed if print_debug is set).

        Returns the final score plus the number of moves.
    """
    env.reset()

    while True:
        states, scores, clears, heuristics, dones = env.get_next_states()
        best = get_best_state(heuristics, genome)

        # keep playing as long as the chosen move neither ends the game nor exceeds the move cap
        if not (dones[best] or env.moves >= MOVE_AMOUNT_CAP):
            env.step(states[best], clears[best], scores[best])
            continue

        if print_debug:
            print(f'Score: {env.score}')
            print(f'Moves: {env.moves}, Clears: {env.clears}, t-spins: {env.tspins}, all_clears: {env.all_clears}')

        game_stats = {
            'game/score': env.score,
            'game/moves': env.moves,
            'game/tspins': env.tspins,
            'game/all_clears': env.all_clears,
            'game/singles': env.clears[0],
            'game/doubles': env.clears[1],
            'game/triples': env.clears[2],
            'game/quads': env.clears[3],
        }
        wandb.log(game_stats)

        # the moves are part of the fitness so the learning algorithm has something to go on in the beginning
        return env.score + env.moves

In [None]:
def get_elite(genomes, fitnesses, elite_size=None):
    """
        Returns the fittest individuals of a population.

        genomes: array with one genome per row.
        fitnesses: fitness values, aligned with the rows of genomes.
        elite_size: number of individuals to return; defaults to the global ELITE_SIZE.

        Returns a tuple (elite genomes, elite fitnesses), both ordered by ascending fitness,
        i.e. the fittest individual comes last. Both are copies of the selected entries.
    """
    if elite_size is None:
        elite_size = ELITE_SIZE

    ranking = np.argsort(fitnesses)

    # compute the start offset explicitly: ranking[-0:] would select the whole population
    # instead of nothing when no elite is requested
    indices = ranking[max(len(ranking) - elite_size, 0):]

    return np.take(genomes, indices, axis=0), np.take(fitnesses, indices, axis=0)

In [None]:
def generate_random_population():
    """
        Creates the initial population: POPULATION_SIZE genomes with GENOM_SIZE genes each, drawn
        from a normal distribution around 2.5 and made non-negative by taking the absolute value.
        Every genome is evaluated once to obtain its fitness.

        Returns the genomes and their fitness values.
    """
    raw_genes = np.random.normal(2.5, size=(POPULATION_SIZE, GENOM_SIZE))
    genomes = np.abs(raw_genes)

    fitnesses = np.array(list(map(determine_fitness, genomes)))

    return genomes, fitnesses

### Training loop

In [None]:
# start the wandb run and record all hyper-parameters of this experiment, including the
# mutation rate schedule, so that a run can be reproduced from its logged config
wandb.init(project='tetris-genetic', config={
    'POPULATION_SIZE': POPULATION_SIZE,
    'ELITE_SIZE': ELITE_SIZE,
    'TOURNAMENT_SIZE': TOURNAMENT_SIZE,
    'GENOM_SIZE': GENOM_SIZE,
    'MOVE_AMOUNT_CAP': MOVE_AMOUNT_CAP,
    'MUTATION_RATE_START': MUTATION_RATE_START,
    'MUTATION_RATE_END': MUTATION_RATE_END,
    'MUTATION_RATE_DURATION': MUTATION_RATE_DURATION,
})

In [None]:
# initialize the decaying mutation rate
# mutate() reads the current value through mutation_rate.get() and the training loop calls
# mutation_rate.step() once per generation
# presumably the value decays linearly from START to END over DURATION steps — confirm in util.decaying
mutation_rate = DecayingLinear(MUTATION_RATE_START, MUTATION_RATE_END, MUTATION_RATE_DURATION)

In [None]:
# generate the initial population and determine the fitness of every genome
# (this already plays one game per individual)
population, fitnesses = generate_random_population()

In [None]:
# evolve the population; the number of generations (21) is fixed here
for generation in range(21):
    children_genomes = []
    children_fitnesses = []

    with tqdm(total=POPULATION_SIZE - ELITE_SIZE) as pbar:
        # every iteration produces two children, so one surplus child may be generated when the
        # number of open spots is odd; it is dropped when the population is updated below
        while len(children_genomes) < POPULATION_SIZE - ELITE_SIZE:
            # parent selection
            parent1, parent2 = tournament_selection_parents(fitnesses)
            parent1, parent2 = population[parent1], population[parent2]

            # recombination
            child1, child2 = two_point_crossover(parent1, parent2)

            # mutation
            child1, child2 = mutate(child1), mutate(child2)

            # fitness evaluation
            child1_fitness, child2_fitness = determine_fitness(child1), determine_fitness(child2)

            # store children
            children_fitnesses.append(child1_fitness)
            children_fitnesses.append(child2_fitness)

            # log the fitness of each child separately
            wandb.log({ 'genetic/generation': generation, 'genetic/fitness': child1_fitness})
            wandb.log({ 'genetic/generation': generation, 'genetic/fitness': child2_fitness})

            children_genomes.append(child1)
            children_genomes.append(child2)

            pbar.update(2)


    # update population
    elite_genomes, elite_fitnesses = get_elite(population, fitnesses)

    # log every elite fitness, fittest first (get_elite orders by ascending fitness);
    # iterating instead of indexing keeps this working for any ELITE_SIZE
    for elite_fitness in elite_fitnesses[::-1]:
        wandb.log({ 'elite/generation': generation, 'elite/fitness': elite_fitness})

    # a few elite individuals get cloned to the next generation
    # their fitness is determined again by playing a fresh game
    population[:ELITE_SIZE] = elite_genomes
    fitnesses[:ELITE_SIZE] = [determine_fitness(elite_genome) for elite_genome in elite_genomes]

    # the other spots are filled with the offspring
    population[ELITE_SIZE:] = children_genomes[:POPULATION_SIZE - ELITE_SIZE]
    fitnesses[ELITE_SIZE:] = children_fitnesses[:POPULATION_SIZE - ELITE_SIZE]

    print(f'GENERATION {generation} DONE! Max fitness: {np.max(fitnesses)}')

    # advance the mutation rate schedule by one generation
    mutation_rate.step()

In [None]:
# extract the elite of the final population (ordered by ascending fitness, fittest last)
elite_genomes, elite_fitnesses = get_elite(population, fitnesses)

In [None]:
# save runs
# NOTE(review): get_elite returns the elite ordered by ascending fitness, so with ELITE_SIZE = 3
# index 1 is the second-best genome rather than the fittest one (index -1) — confirm this is intended
np.savetxt('models/genetic-1000-singles.txt', elite_genomes[1], fmt='%f')
# the complete final population, one genome per row
np.savetxt('models/genetic-1000-singles-FULL.txt', population, fmt='%f')