# 1. Genetic Algorithm with DEAP #

Подгружаем необходимые библиотеки

In [1]:
from deap import tools, base
from numpy import random as rnd
import numpy as np
from deap import creator
from deap import benchmarks
from deap.algorithms import eaMuPlusLambda
from functools import partial
#Для отрисовки функции
from mpl_toolkits.mplot3d import Axes3D
from matplotlib import cm
import matplotlib.pyplot as plt
%matplotlib inline

## 1.1 Задача поиска экстремума сложной функции

Creator - метафабрика классов позволяющая создавать классы, которые будут удовлетворять потребности ваших эволюционных алгоритмов. 

Note: weights=(-1.0,) - minimization, weights=(1.0,) - maximization

In [2]:
creator.create("BaseFitness", base.Fitness, weights=(-1.0,)) 
creator.create("Individual", np.ndarray, fitness=creator.BaseFitness)

Инициализируем целевую функцию 

In [3]:
optimization_function = benchmarks.rastrigin

Нарисуем ее

In [4]:
def function_drawing(func, limit):

    def function_arg0(selected_function, sol):
        return selected_function(sol)[0]

    function_for_drawing = partial(function_arg0, func)

    fig = plt.figure()
    ax = Axes3D(fig, azim = -29, elev = 40)
    X = np.arange(limit[0], limit[1], 0.5)
    Y = np.arange(limit[0], limit[1], 0.5)
    X, Y = np.meshgrid(X, Y)
    Z = np.fromiter(map(function_for_drawing, zip(X.flat,Y.flat)), dtype=np.float, count=X.shape[0]*X.shape[1]).reshape(X.shape)

    ax.plot_surface(X, Y, Z, rstride=1, cstride=1, cmap=cm.jet, linewidth=0.2)

    plt.xlabel("x")
    plt.ylabel("y")

    plt.show()

In [5]:
dimension = 3
pop_size = 100
iterations = 100
mut_prob = 0.6
cross_prob = 0.3
limit = [-5, 5]

In [None]:
function_drawing(optimization_function, limit)

Зададим параметры нашего жволюционного алгоритма
* dimension - размерность задачи
* pop_size - размер популяции
* iterations - количество поколений 
* mut_prob - вероятность применения оператора мутации
* cross_prob - вероятность применения оператора скрещивания к двум отобранным индивидам
* limit - область поиска по каждой из переменых функции

Функция для инициализации случайного числа из выбранной области определения:

In [7]:
def factory(limit, dimension):
    return limit[0] + rnd.random(dimension) * (limit[1]-limit[0])

In [8]:
class DrawLog:
    @staticmethod
    def read_log(log):
        avg_list, std_list, min_list, max_list, gen_list = [list() for i in range(5)]
        for g in log:
            avg_list.append(g['avg'])
            std_list.append(g['std'])
            min_list.append(g['min'])
            max_list.append(g['max'])
            gen_list.append(g['gen'])
        return np.array(gen_list), np.array(avg_list), np.array(std_list), np.array(max_list), np.array(min_list)
    
    @staticmethod
    def draw_log(log):
        gen_list, avg_list, std_list, max_list, min_list = DrawLog.read_log(log)
        plt.plot(gen_list, avg_list, label="avg")
        plt.plot(gen_list, min_list, label="min")
        plt.plot(gen_list, max_list, label="max")
        plt.fill_between(gen_list, avg_list-std_list, avg_list+std_list, alpha=0.2)
        plt.ylabel('Fitness')
        plt.xlabel('Generation, #')
        plt.legend()
        plt.tight_layout()
        plt.show()
        
    @staticmethod
    def draw_logs(log1, log2, lab1, lab2):
        gen1, avg1, std1, max1, min1 = DrawLog.read_log(log1)
        gen2, avg2, std2, max2, min2 = DrawLog.read_log(log2)
        plt.plot(gen1, avg1, label=lab1, color="blue")
        plt.plot(gen1, max1, label="{}_max".format(lab1), color="purple", linewidth=2)
        plt.fill_between(gen1, avg1 - std1, avg1 + std1, alpha=0.2, color="blue")
        plt.plot(gen2, avg2, label=lab2, color="orange")
        plt.plot(gen2, max2, label="{}_max".format(lab2), color="red", linewidth=2)
        plt.fill_between(gen2, avg2 - std2, avg2 + std2, alpha=0.2, color="orange")
        plt.ylabel('Fitness')
        plt.xlabel('Generation, #')
        plt.legend()
        plt.tight_layout()
        plt.show()

## Генетический алгоритм реализованный посредствам Deap

Все основные объекты, которые мы будем использовать на нашем пути: индивид, популяция, а также все функции, операторы и аргументы будут храниться в контейнере DEAP с именем Toolbox. 

Он содержит два метода добавления и удаления содержимого register()и unregister().

**В конструкторе класса GeneticAlgorithm** мы регистрируем две функции инициализации individual() и population(). 

Здесь, в inidividual:
* tools.initIterate - инидивид наследуется от какого-то итеративного(iterable) объекта (например list).
* creator.Individual - тип объекта который создается
* ind_gener_func - функция при помощи которой инициализируется случайный объект

В population:
* tools.initRepeat - способ инициализации контейнера популяции
* list - тип контейнера
* toolbox.individual - какой функцией инициализируется объект-индивид
* pop_size - сколько индивидов необходимо создать в популяции

Ниже инициализируются эволюционные операторы.

**В функции run_evolution** происходит создание популяции и определение обсервера – данных для мониторинга, далее происходит запуск эволюционного процесса.

In [9]:
class GeneticAlgorithm:
    def __init__(self, factory, function,  pop_size=50, limit=limit, dimension=dimension,crossover=tools.cxOnePoint,\
                 mutation=partial(tools.mutGaussian, mu=0, sigma=0.5, indpb=0.2), \
                 selection = partial(tools.selTournament, tournsize=4)):
        self.toolbox = base.Toolbox()
        self.pop_size = pop_size
        # Structure initializers
        ind_gener_func = partial(factory, limit, dimension)
        self.toolbox.register("individual", tools.initIterate, creator.Individual, ind_gener_func)
        self.toolbox.register("population", tools.initRepeat, list, self.toolbox.individual, pop_size)
        #genetic operators
        self.toolbox.register("mate", crossover) #crossover
        self.toolbox.register("mutate", mutation)
        self.toolbox.register("select", selection)
        self.toolbox.register("evaluate",function)
    
    def run_evolution(self, cross_prob, mut_prob, iterations, verbose = True):
        pop = self.toolbox.population() #инициализация начальной популяции
        hof = tools.HallOfFame(3, np.array_equal) #хранятся лучшие решения, архив
        stats = tools.Statistics(lambda ind: ind.fitness.values[0])
        stats.register("avg", np.mean)
        stats.register("std", np.std)
        stats.register("min", np.min)
        stats.register("max", np.max)
        pop, log = eaMuPlusLambda(pop, self.toolbox, mu=self.pop_size, lambda_=int(pop_size*0.8), cxpb=cross_prob, mutpb=mut_prob,
                              ngen=iterations, stats=stats, halloffame=hof, verbose=True)
        print("Best individual = {}".format(hof[0]))
        print("Best fitness = {}".format(hof[0].fitness.values[0]))
        if verbose:
            DrawLog.draw_log(log)
        return pop, log

Инициализация генетического алгоритма

In [10]:
ga = GeneticAlgorithm(factory, optimization_function, pop_size, limit, dimension)

Запуск генетического алгоритма

In [None]:
population, log = ga.run_evolution(cross_prob, mut_prob, iterations)

Протестируем наш алгоритм на другой функции.

In [None]:
iterations = 200
pop_size = 200
mut_prob = 0.9
cross_prob = 0.1
optimization_function = benchmarks.griewank
function_drawing(optimization_function, limit=[-50, 50])

In [None]:
ga = GeneticAlgorithm(factory, optimization_function, pop_size)
population, log = ga.run_evolution(cross_prob, mut_prob, iterations)

Увеличим область поиска

In [None]:
limit = [-100, 100]
ga = GeneticAlgorithm(factory, optimization_function, pop_size)
population, log = ga.run_evolution(cross_prob, mut_prob, iterations)

При увеличении размерности задачи так же будет наблюдаться ухудшение качества поиска. Это означает то, что в данном случае требуется задать другие более подходящие гиперпараметры (в первую очередь увеличить размер популяции и количество поколений для поиска), что позволит получить хорошее решение в изменившихся условиях.

In [None]:
limit = [-5, 5]
dimension = 10
iterations = 700
pop_size = 700
ga = GeneticAlgorithm(factory, optimization_function, pop_size, limit=limit, dimension=dimension)
population, log = ga.run_evolution(cross_prob, mut_prob, iterations)

Мы можем воспользоваться другими генетическими операторами из Deap. 
**Попробуем задать другую функцию скрещивания.**

In [None]:
#выберем двухточеченое скрещивание
crossover = tools.cxTwoPoint
ga = GeneticAlgorithm(factory, optimization_function, pop_size, crossover = crossover, dimension=dimension)
population, log = ga.run_evolution(cross_prob, mut_prob, iterations)

## Использование собственных генетических операторов.

Реализация собственного оператора мутации

In [17]:
def Mymutation(individual):
    n = len(individual)
    for i in range(n):
        if rnd.random() < n * 0.15:
            individual[i] += rnd.normal(0.0, 0.2)
            individual[i] = np.clip(individual[i], -5, 5)
    return individual,

Передадим его в алгоритм в качестве параметра

In [None]:
iterations = 300
pop_size = 300
ga = GeneticAlgorithm(factory, optimization_function, pop_size, mutation = Mymutation, dimension=5)
population, log = ga.run_evolution(cross_prob, mut_prob, iterations)

Отлично! Перейдем к следующему разделу

## 1.2 Задача о рюкзаке

**Условие**: из заданного множества предметов со свойствами «стоимость» и «вес» требуется отобрать подмножество с максимальной полной стоимостью и минимальным весом (так же установлено ограничение на суммарный вес рюкзака).

In [19]:
import random
from deap import algorithms

**Инициализация основных параметров/ограничений задачи**
* ind_init_size - количество предметов с которым инициализируются индивиды начальной популяции
* max_item - максимальное количество предметов в рюкзаке
* max_weight - максимальный вес рюкзака
* num_of_items - количество различных предметов

In [20]:
ind_init_size = 5
max_item = 50
max_weight = 50
num_of_items = 20

In [21]:
creator.create("KnapsackFitness", base.Fitness, weights=(-1.0, 1.0))
creator.create("KnapsackIndividual", set, fitness=creator.KnapsackFitness)

Теперь у вас есть индивиды (которые по сути являются множествами из предметов). 

У каждого из них можно оценить fitness. 

В данном случае цель - это минимизация первой цели (веса мешка) и максимизация второй цели (ценность мешка). 

Мы имеем дело с *многокритериальной задачей комбинаторной оптимизации*.

Теперь мы создадим словарь из 20 случайных предметов, которые и будем складывать в рюкзак:

In [22]:
# Create the item dictionary: item name is an integer, and value is 
# a (weight, value) 2-uple.
items = {}
# Create random items and store them in the items' dictionary.
for i in range(num_of_items):
    items[i] = (random.randint(1, 10), random.uniform(0, 100))

Определим **функцию оценки индивида**.

Здесь рассчитывается суммарный вес и суммарная стоимость (не забываем, что у нас два objectiv-а).

In [23]:
def evalKnapsack(individual):
    weight = 0.0
    value = 0.0
    for item in individual:
        weight += items[item][0]
        value += items[item][1]
    if len(individual) > max_item or weight > max_weight:
        return 10000, 0             # Ensure overweighted bags are dominated
    return weight, value

В Deap нет операторов кроссовера и мутации, которые можно было бы применять непосредственно к наборам. Поэтому придется ввети свои.

**Функция скрещивания**

Определим простой вариант скрещивания порождающий двух детей от двух родителей. И в нашем случае первый ребенок будет получаться пересечением двух множеств, а второй ребенок - их разницей.

Однако для того чтобы не получались пустые индивиды, необходимо прописать некоторые дополнительные условия.

In [24]:
def cxSet(ind1, ind2):
    """Apply a crossover operation on input sets. The first child is the
    intersection of the two sets, the second child is the difference of the
    two sets.
    """
    temp = set(ind1)                # Used in order to keep type
    temp2 = set(ind2)
    ind1 &= ind2                    # Intersection (inplace)
    ind2 ^= temp                    # Symmetric Difference (inplace)x
    if len(ind1) == 0:
        ind1.add(temp.pop())
    if len(ind2) == 0:
        ind2.add(temp2.pop())
    return ind1, ind2

**Оператор мутации** будет случайным образом добавлять или удалять элемент из набора.

In [25]:
def mutSet(individual):
    """Mutation that pops or add an element."""
    if random.random() < 0.5:
        if len(individual) > 1:     # We cannot pop from an empty set
            individual.remove(random.choice(sorted(tuple(individual))))
    else:
        individual.add(random.randrange(num_of_items))
    return individual,

Затем мы регистрируем эти операторы в панели инструментов. 

Поскольку это **многокритериальная задача**, мы выбрали схему селекции **NSGA-II**: selNSGA2()

In [26]:
class NSGA2():
    def __init__(self, evaluation, crossover, mutation):
        self.toolbox = base.Toolbox()
        # Attribute generator
        self.toolbox.register("attr_item", random.randrange, num_of_items)
        # Structure initializers
        self.toolbox.register("individual", tools.initRepeat, creator.KnapsackIndividual, self.toolbox.attr_item, ind_init_size)
        self.toolbox.register("population", tools.initRepeat, list, self.toolbox.individual)
        self.toolbox.register("evaluate", evaluation)
        self.toolbox.register("mate", cxSet)
        self.toolbox.register("mutate", mutSet)
        self.toolbox.register("select", tools.selNSGA2)
    def run(self, generations = 50, mut_prob=0.3, cross_prob = 0.5, lamb=100, mu = 50):
        random.seed(64)

        pop = self.toolbox.population(n=mu)
        hof = tools.ParetoFront()
        stats = tools.Statistics(lambda ind: ind.fitness.values)
        stats.register("avg", np.mean, axis=0)
        stats.register("std", np.std, axis=0)
        stats.register("min", np.min, axis=0)
        stats.register("max", np.max, axis=0)

        algorithms.eaMuPlusLambda(pop, self.toolbox, mu, lamb, cross_prob, mut_prob, generations, stats,
                                  halloffame=hof)

        return pop, stats, hof

In [None]:
pop, stats, hof =NSGA2(evaluation = evalKnapsack, crossover = cxSet, mutation = mutSet).run()

In [28]:
weights = []
values = []
for ind in hof:
    w, val = evalKnapsack(ind) 
    weights.append(w)
    values.append(val)

Найденное множество Парето:

In [None]:
fig, ax = plt.subplots()

ax.scatter(weights, values, c = 'red')  

ax.set_title('Парето-фронт', fontsize = 15)  
plt.ylabel('Стоимость', fontsize = 15)
plt.xlabel('Вес', fontsize = 15)

fig.set_figwidth(8)     
fig.set_figheight(8)    

plt.show()

## 2. Метод роя частиц (PSO) реализованный посредствам Deap

In [30]:
creator.create("FitnessMax", base.Fitness, weights=(1.0,))
creator.create("Particle", np.ndarray, fitness=creator.BaseFitness, speed=None, smin=None, smax=None, best=None)

* particle_generation - функция инициализации нового индивида

На начальном этапе сам индивид случайным образом инициализируется со случайной скоростью.

* updateParticle - служит для обновления скорости и позиции индивида

In [31]:
class PSOAlgorithm:
    
    def generate(self):
        return rnd.uniform(self.pmin, self.pmax, self.dimension)

    def particle_generation(self):
        particle = tools.initIterate(creator.Particle, self.generate)
        particle.speed = rnd.uniform(self.smin, self.smax, self.dimension)
        particle.smin = self.smin
        particle.smax = self.smax
        return particle

    def updateParticle(self, part, global_best):
        v1 = (part.best - part) * rnd.uniform(0, self.c1)
        v2 = (global_best - part) * rnd.uniform(0, self.c2)
        part.speed = np.clip(part.speed * self.w + v1 + v2, self.smin, self.smax)
        part[:] = np.clip(part[:] + part.speed, self.pmin, self.pmax)

    def __init__(self, pop_size, iterations, dimension, function):
        self.pop_size = pop_size
        self.iterations = iterations
        self.dimension = dimension
        self.function = function
        self.c1 = 0.8
        self.c2 = 0.7
        self.w = 1.0
        self.pmin = -5.0
        self.pmax = 5.0
        self.smin = -2.0
        self.smax = 2.0

        self.toolbox = base.Toolbox()
        self.toolbox.register("particle", self.particle_generation)
        self.toolbox.register("population", tools.initRepeat, list, self.toolbox.particle)
        self.toolbox.register("update", self.updateParticle)
        self.toolbox.register("evaluate", self.function)

    def run(self, verbose = True):
        pop = self.toolbox.population(n=self.pop_size)
        stats = tools.Statistics(lambda ind: ind.fitness.values[0])
        stats.register("avg", np.mean)
        stats.register("std", np.std)
        stats.register("min", np.min)
        stats.register("max", np.max)

        logbook = tools.Logbook()
        logbook.header = ["gen", "evals"] + stats.fields

        GEN = self.iterations
        best = None

        for g in range(GEN):
            for part in pop:
                part.fitness.values = self.toolbox.evaluate(part)
                if part.best is None or part.best.fitness < part.fitness:
                    part.best = creator.Particle(part)
                    part.best.fitness.values = part.fitness.values
                if best is None or best.fitness < part.fitness:
                    best = creator.Particle(part)
                    best.fitness.values = part.fitness.values
            for part in pop:
                self.toolbox.update(part, best)

            # Gather all the fitnesses in one list and print the stats
            logbook.record(gen=g, evals=len(pop), **stats.compile(pop))
            print(logbook.stream)
        if verbose:
            DrawLog.draw_log(logbook)

        return pop, logbook, best

In [None]:
pso_exp = PSOAlgorithm(pop_size, iterations, dimension, optimization_function)
pop, logbook, best = pso_exp.run()
print("best")
print(best)
print("speed")
print(pop[0].speed)

# 3. Сравнение эффективностей алгоритмов

In [None]:
dimension = 10
pop_size = 100
iterations = 500
ga = GeneticAlgorithm(factory, optimization_function, pop_size, mutation = Mymutation, dimension=dimension)
population, log = ga.run_evolution(cross_prob, mut_prob, iterations, verbose=False)

pso = PSOAlgorithm(pop_size, iterations, dimension, optimization_function)
pop, logbook, best = pso.run(verbose=False)
DrawLog.draw_logs(log, logbook, "ga", "pso")