In [None]:
# @title Load Game and Polyomino { display-mode: "form" }


In [None]:
# @title Imports { display-mode: "form" }
from Game import Game
from Polyomino import constructPolyominos
from multiprocessing import Pool, cpu_count
from random import randint
import numpy as np
import os
from tqdm.notebook import tqdm
import ast



In [2]:
# @title Parameters { display-mode: "form" }
n = 4 # @param {type:"integer"}

# Piece set built once from n; runTetris hands it to every Game via set_allowed.
allowed = constructPolyominos(n)
# Raw per-generation results; run_with_thread_pool appends one list per call.
prev_res = []

def generate_initial_params():
    """Draw a fresh random parameter set for the Tetris AI.

    Returns:
        tuple: five builtin ``float`` values sampled uniformly from [1.0, 200.0).
    """
    # The previous call was ``np.random.uniform(200, size=5)``, i.e. low=200 with
    # the default high=1.0. NumPy documents high < low as undefined behaviour, so
    # the same effective interval is now spelled out in the defined order.
    # NOTE(review): if the intended range was [0, 200), change the lower bound.
    samples = np.random.uniform(1.0, 200.0, size=5)
    # Builtin floats (not np.float64) keep the saved result files parseable by
    # ast.literal_eval, whatever scalar repr the installed NumPy uses.
    return tuple(samples.tolist())


def merge_results(results, n=5):
    """Average the scores of (near-)identical parameter sets and rank them.

    Two parameter tuples are treated as the same individual when every pair of
    corresponding entries differs by less than 1e-5; the first tuple seen is
    kept as the representative.

    Args:
        results: iterable of ``(score, points)`` pairs, ``points`` being a
            hashable sequence of numbers.
        n: maximum number of entries to return.

    Returns:
        list: up to ``n`` ``(mean_score, points)`` pairs, highest mean first.
    """
    tolerance = 1e-5
    scores_by_points = {}

    def _representative(candidate):
        # First already-registered tuple that matches `candidate` within tolerance.
        for known in scores_by_points:
            if all(abs(a - b) < tolerance for a, b in zip(known, candidate)):
                return known
        return None

    for score, points in results:
        match = _representative(points)
        if match is None:
            scores_by_points[points] = [score]
        else:
            scores_by_points[match].append(score)

    averaged = [
        (sum(values) / len(values), points)
        for points, values in scores_by_points.items()
    ]
    ranked = sorted(averaged, key=lambda pair: pair[0], reverse=True)
    return ranked[:n]


def generate_new_params(best):
    """Breed a new parameter set from the best performers.

    For each of the five positions a random entry of ``best`` is chosen, and
    its value at that position is multiplied by a random factor between 0.97
    and 1.03 (in steps of 0.01) and rounded to 3 decimals.

    Args:
        best: non-empty sequence of ``(score, params)`` pairs; ``params`` must
            hold at least five numeric values.

    Returns:
        tuple: five builtin ``float`` values.

    Raises:
        ValueError: if ``best`` is empty.
    """
    if len(best) == 0:
        # randint(0, -1) would raise the same exception type with an opaque message.
        raise ValueError("generate_new_params needs at least one entry in 'best'")

    child = []
    for position in range(5):
        parent_params = best[randint(0, len(best) - 1)][1]
        factor = randint(97, 103) / 100
        # float() strips NumPy scalar types so that results written with str()
        # can be read back by ast.literal_eval.
        child.append(round(float(parent_params[position]) * factor, 3))
    return tuple(child)


In [3]:
# @title Save Loading { display-mode: "form" }
def get_latest_generation_filename(directory_path, starting_word="Generation_"):
    """Return the name of the save file with the highest generation number.

    Only files named ``<starting_word><digits>.txt`` are considered; anything
    else in the directory (notes, backups, ...) is ignored instead of raising.

    Args:
        directory_path: directory to scan.
        starting_word: filename prefix preceding the generation number.

    Returns:
        str | None: the actual filename found in the directory (not a
        re-formatted one, so zero-padded names stay valid), or ``None`` when
        no matching file exists.
    """
    suffix = ".txt"
    latest_number = None
    latest_name = None

    for name in os.listdir(directory_path):
        if not (name.startswith(starting_word) and name.endswith(suffix)):
            continue
        stem = name[len(starting_word):-len(suffix)]
        if not stem.isdecimal():
            continue  # e.g. "Generation_notes.txt" or "Generation_3.bak.txt"
        number = int(stem)
        canonical = f"{starting_word}{number}{suffix}"
        # On equal numbers ("Generation_7.txt" vs "Generation_007.txt") prefer
        # the canonical spelling so the result does not depend on listing order.
        if (
            latest_number is None
            or number > latest_number
            or (number == latest_number and name == canonical)
        ):
            latest_number, latest_name = number, name

    return latest_name


def load_from_save(path):
    """Read a generation save file.

    The file is expected to contain a header line starting with ``G`` whose
    last space-separated token is the generation number, followed by one
    Python literal per line. Blank lines are ignored.

    Args:
        path: path of the save file.

    Returns:
        tuple: ``(next_generation, entries)`` where ``next_generation`` is the
        saved generation number plus one (0 if no header line was found) and
        ``entries`` is the list of parsed literals in file order.

    Raises:
        OSError: if the file cannot be opened.
        ValueError, SyntaxError: if a line is not a valid Python literal.
    """
    next_generation = 0
    entries = []
    # The context manager closes the handle even if parsing fails midway.
    with open(path, 'r') as save_file:
        for raw_line in save_file:
            line = raw_line.strip()
            if not line:
                continue  # ast.literal_eval("") would raise SyntaxError
            if line[0] == 'G':
                next_generation = int(line.split(' ')[-1]) + 1
                continue
            entries.append(ast.literal_eval(line))
    return next_generation, entries


In [4]:
# @title Main functions { display-mode: "form" }
def runTetris(params):
    """Play one game with the given AI parameters and report the score.

    Args:
        params: iterable of exactly five values, unpacked in order as
            (h, blank_spaces, blank_line, line_cleared, uniformity) and
            forwarded positionally to ``Game.set_ai_params``.

    Returns:
        tuple: ``(score, params_tuple)`` where ``score`` is whatever
        ``Game.play_game()`` returns and ``params_tuple`` holds the five
        input values.
    """
    w_h, w_blank_spaces, w_blank_line, w_line_cleared, w_uniformity = params
    weights = (w_h, w_blank_spaces, w_blank_line, w_line_cleared, w_uniformity)

    game = Game(24, 10)
    game.set_allowed(allowed, 'r')
    game.set_ai_params(*weights)
    return (game.play_game(), weights)


def _as_builtin(value):
    """Convert a NumPy scalar to the matching builtin Python number; pass anything else through."""
    return value.item() if isinstance(value, np.generic) else value


def run_with_thread_pool(params, i=0, best_n=5, new_params=15, new_initial=5, result_path="./results"):
    """Evaluate one generation in parallel, save the best results and breed the next one.

    Despite the name, the work is distributed over a ``multiprocessing.Pool``
    with one worker process per CPU.

    Args:
        params: sequence of parameter tuples; each one is passed to ``runTetris``.
        i: generation index, used for the log output and the result filename.
        best_n: number of top entries kept by ``merge_results``.
        new_params: number of children bred from the best entries.
        new_initial: number of fresh random parameter sets added.
        result_path: directory receiving ``Generation_<i>.txt`` (created if missing).

    Returns:
        list: ``new_params`` bred parameter tuples followed by ``new_initial``
        random ones.

    Side effects:
        Appends the raw results of this generation to the global ``prev_res``.
    """
    global prev_res

    res = []
    with Pool(cpu_count()) as executor:
        print(f"Generation {i}\n")
        with tqdm(total=len(params)) as pbar:
            for r in executor.imap_unordered(runTetris, params):
                pbar.update(1)
                res.append(r)
    # The worker processes are released here; everything below is cheap,
    # single-process post-processing.

    best = merge_results(res, best_n)

    os.makedirs(result_path, exist_ok=True)
    # `with` guarantees the file is flushed and closed even if a write fails.
    with open(f'{result_path}/Generation_{i}.txt', 'w') as f:
        f.write(f"Generation {i}\n")
        print(f"Best {best_n}")
        for score, point in best:
            # NumPy >= 2 renders scalars as "np.float64(...)", which
            # ast.literal_eval (used by load_from_save) cannot parse, so only
            # builtin numbers are written out.
            record = (_as_builtin(score), tuple(_as_builtin(v) for v in point))
            print(record)
            f.write(str(record) + "\n")

    next_params = (
        [generate_new_params(best) for _ in range(new_params)]
        + [generate_initial_params() for _ in range(new_initial)]
    )
    prev_res.append(res)
    return next_params


In [None]:
# @title TRAINING { display-mode: "form" }
NB_TEST = 5 # @param {type:"integer"}
NB_GENERATIONS = 12 # @param {type:"integer"}
OUTPUT_NAME = "test" # @param {type:"string"}
LOAD = False # @param {type:"boolean"}
INITIAL_PARAMS = 50 # @param {type:"integer"}
BEST_N = 5 # @param {type:"integer"}
BEST_N_AFTER_GEN_5 = 3 # @param {type:"integer"}
NEW_PARAMS_FROM_BEST = 30 # @param {type:"integer"}
NEW_INITIAL_PARAMS = 10 # @param {type:"integer"}

main_path = "./results/"
output_dir = f"{main_path}{OUTPUT_NAME}"

# Resolve the save to resume from. A missing directory AND an existing
# directory without any Generation_*.txt both mean "nothing to load";
# previously the latter ended up opening ".../None".
file_path = None
if LOAD:
    if os.path.exists(output_dir):
        file_path = get_latest_generation_filename(f"{output_dir}/")
    if file_path is None:
        LOAD = False
        print("Save file in specified location does not exist. Starting the evolution from random parameters")

j = 0  # index of the first generation to run

if LOAD:
    j, best = load_from_save(f"{output_dir}/{file_path}")
    # Only the parameter tuples matter for breeding; the saved scores are dropped.
    best = [(0, p[1]) for p in best]
    params = [generate_new_params(best) for _ in range(NEW_PARAMS_FROM_BEST)]+[generate_initial_params() for _ in range(NEW_INITIAL_PARAMS)]
else:
    params = [generate_initial_params() for _ in range(INITIAL_PARAMS)]
    os.makedirs(output_dir, exist_ok=True)

for i in range(j, j+NB_GENERATIONS):
    # params*NB_TEST repeats the whole parameter list so that each parameter
    # set is played NB_TEST times; merge_results averages the repeated scores.
    params = run_with_thread_pool(params*NB_TEST, i, result_path=output_dir, best_n=BEST_N, new_params=NEW_PARAMS_FROM_BEST, new_initial=NEW_INITIAL_PARAMS)
    # The switch is made AFTER generation i has run, so the smaller selection
    # size first applies to the generation following the first index > 5.
    # NOTE(review): when resuming at j > 6 the first generation still uses the
    # original BEST_N - confirm whether that is intended.
    if(i>5):
        BEST_N = BEST_N_AFTER_GEN_5