In [None]:
# Mario environment imports
from nes_py.wrappers import JoypadSpace
import gym_super_mario_bros

# Plotting imports (currently disabled)
# import matplotlib.pyplot as plt
# from matplotlib import animation, rc

# Numeric imports
import math
import numpy as np
import numpy.random as rnd

# Warning-control import
import warnings

# Thread-pool import (individuals are evaluated concurrently in worker threads)
from concurrent.futures import ThreadPoolExecutor

# Timing import
import time

# Seed NumPy's global random state so the GA's random draws are repeatable
rnd.seed(1704034800)

# Hide UserWarnings coming from the gym.envs.registration module
warnings.filterwarnings("ignore", category=UserWarning, module="gym.envs.registration")

In [None]:
# Action patterns: button combinations handed to JoypadSpace.
# Each gene of a chromosome is an index into this list.
MOVEMENT = [
    ['right', 'B'],
    ['right', 'A', 'B'],
]

# Constants
MAX_WORKERS = 10       # max worker threads in the ThreadPoolExecutor
MAX_GENERATIONS = 100  # number of generations to run
NUM_MARIOS = 50        # population size (individuals per generation)
LEN_CHROMOSOME = 300   # chromosome length (actions per individual)
CROSS_RATE = 0.8       # crossover rate; the remaining share is carried over as elites
MUTATION_RATE = 0.1    # probability that a child is mutated
FRAME_INTERVAL = 10    # number of frames each action is repeated for

In [None]:
def create_generation():
    """Build the initial population.

    Returns:
        An integer array of shape (NUM_MARIOS, LEN_CHROMOSOME) in which
        every gene is a random index into MOVEMENT.
    """
    num_actions = len(MOVEMENT)
    population_shape = (NUM_MARIOS, LEN_CHROMOSOME)
    return rnd.randint(num_actions, size=population_shape)

In [None]:
def cross(parent1, parent2):
    """Perform single-point crossover between two parents.

    The cut point is drawn uniformly from the interior positions
    1 .. len(parent1) - 1, so each child always receives at least one gene
    from each parent. The length is taken from ``parent1`` instead of a
    global constant; both parents are expected to have the same length.

    Args:
        parent1: First parent chromosome (1-D sequence of action indexes).
        parent2: Second parent chromosome, same length as ``parent1``.

    Returns:
        A ``(child1, child2)`` tuple of new arrays. ``child1`` is the head of
        ``parent1`` joined with the tail of ``parent2``; ``child2`` is the
        mirror image. The parents are never modified.
    """
    length = len(parent1)
    if length < 2:
        # No interior cut point exists; return independent copies so that
        # later in-place mutation cannot touch the parents.
        return np.array(parent1), np.array(parent2)

    # randint's upper bound is exclusive, so the point lies in [1, length - 1]
    point = rnd.randint(1, length)
    child1 = np.concatenate([parent1[:point], parent2[point:]])
    child2 = np.concatenate([parent2[:point], parent1[point:]])
    return child1, child2


In [None]:
def mutation(mario):
    """Randomly mutate an individual in place.

    With probability MUTATION_RATE, 5% of the gene positions (rounded up)
    are picked without repetition and each is overwritten with a random
    index into MOVEMENT. Otherwise the individual is left untouched.

    Args:
        mario: Chromosome to mutate; it is modified in place.

    Returns:
        The same ``mario`` object that was passed in.
    """
    # Most individuals pass through unchanged
    if rnd.random() >= MUTATION_RATE:
        return mario

    gene_count = math.ceil(0.05 * LEN_CHROMOSOME)
    targets = rnd.choice(LEN_CHROMOSOME, gene_count, replace=False)
    for position in targets:
        mario[position] = rnd.randint(len(MOVEMENT))
    return mario

In [None]:
def sorts(fitnesses, generation):
    """Order individuals by fitness, best first.

    Args:
        fitnesses: Fitness value of each individual.
        generation: Individuals, in the same order as ``fitnesses``.

    Returns:
        An iterator yielding two tuples: the fitnesses in descending order
        and the individuals rearranged to match. Individuals with equal
        fitness keep their original relative order.
    """
    ranked = list(zip(fitnesses, generation))
    # Compare on the fitness only, never on the individual itself
    ranked.sort(key=lambda pair: pair[0], reverse=True)
    return zip(*ranked)

In [None]:
def print_fitness(fitnesses, current_generation):
    """Print the best, worst and average fitness of one generation.

    Args:
        fitnesses: Non-empty sequence of fitness values sorted in descending
            order, so the first item is the best and the last the worst.
        current_generation: Generation number shown at the start of the line.

    Raises:
        IndexError: If ``fitnesses`` is empty.
    """
    # Local names deliberately avoid shadowing the builtins max/min
    best = fitnesses[0]
    worst = fitnesses[-1]
    # Use the actual number of values rather than a global population size
    average = int(sum(fitnesses) / len(fitnesses))
    print("{:<3}   max: {:<4}   min: {:<4}   avg: {:<4}".format(current_generation, best, worst, average))

In [None]:
def roulette_selection(fitnesses, generation):
    """Pick two distinct parents by fitness-proportional (roulette) selection.

    Each individual's chance of being drawn is its fitness divided by the sum
    of all fitnesses. When that distribution cannot yield two distinct
    parents (every fitness is zero, or fewer than two are non-zero), the
    parents are drawn uniformly instead of raising.

    Args:
        fitnesses: Fitness of each individual; values are expected to be
            non-negative.
        generation: Individuals, in the same order as ``fitnesses``.

    Returns:
        A ``(parent1, parent2)`` tuple of two different individuals.

    Raises:
        ValueError: If the population holds fewer than two individuals.
    """
    weights = np.asarray(fitnesses, dtype=float)
    total = weights.sum()

    if total > 0 and np.count_nonzero(weights) >= 2:
        selection_rates = weights / total
    else:
        # p=None makes rnd.choice sample uniformly
        selection_rates = None

    first, second = rnd.choice(len(weights), 2, p=selection_rates, replace=False)
    return generation[first], generation[second]

In [None]:

def _play_chromosome(env, mario):
    """Step ``env`` through every action of ``mario`` and return the last info dict.

    Each action is repeated for FRAME_INTERVAL frames. Play stops as soon as
    the environment reports ``done``. Returns None when no frame was stepped.
    """
    info = None
    for action in mario:
        for _ in range(FRAME_INTERVAL):
            _, _, done, info = env.step(action)
            if done:
                return info
    return info


def evaluate(mario):
    """Fitness function: play one individual and report how far it got.

    A fresh 'SuperMarioBros-1-1-v0' environment is created, restricted to the
    MOVEMENT action set, and driven by the chromosome. The environment is
    always closed, even if stepping raises.

    Args:
        mario: Chromosome, a sequence of indexes into MOVEMENT.

    Returns:
        The ``x_pos`` entry of the info dict from the last executed frame, or
        0 if no frame was executed (for example, an empty chromosome).
    """
    # Environment setup
    env = gym_super_mario_bros.make('SuperMarioBros-1-1-v0')
    try:
        env = JoypadSpace(env, MOVEMENT)
        env.reset()

        # Play the game
        info = _play_chromosome(env, mario)
    finally:
        # Release the emulator whether or not the run succeeded
        env.close()

    # Fitness is the horizontal position reached
    return 0 if info is None else info["x_pos"]

In [None]:
# Initial generation
generation = create_generation()

# The number of elites carried over is the same for every generation,
# so compute it once instead of on every pass through the loop.
num_elite = math.ceil(NUM_MARIOS * (1 - CROSS_RATE))

# Thread pool: the individuals of a generation are evaluated concurrently
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    start_time = time.time()

    for current_generation in range(1, MAX_GENERATIONS + 1):
        # Fitness: evaluate everyone (map keeps input order), then rank best-first
        evaluations = list(executor.map(evaluate, generation))
        fitnesses, generation = sorts(evaluations, generation)
        print_fitness(fitnesses, current_generation)

        # Next generation: keep the elites, fill the rest with mutated children
        next_generation = list(generation[:num_elite])
        while len(next_generation) < NUM_MARIOS:
            parent1, parent2 = roulette_selection(fitnesses, generation)
            child1, child2 = cross(parent1, parent2)
            next_generation.extend([mutation(child1), mutation(child2)])

        # Children are added in pairs and can overshoot by one; trim back
        generation = next_generation[:NUM_MARIOS]

    # Elapsed time
    end_time = time.time()
    print("Total time(s) : {}".format(end_time - start_time))