In [1]:
import multiprocessing
import random
from collections import defaultdict
from typing import Dict, List, Tuple

import matplotlib.pyplot as plt
import numpy as np
from deap import algorithms, base, creator, tools

%matplotlib notebook

## Problem and Solution Definitions

### Class that holds the problem definition.

This class must implements:
* A method to represent the problem graphically (`__str__`)
* A method to generate a problem instance from an input file (`from_file`).

In [2]:
class Problem:
    def __init__(self, rows: int, cols: int, fleet: int, rides: List[List[int]], bonus: int, steps: int):
        self.rows = rows
        self.cols = cols
        self.fleet = fleet
        self.rides = rides
        self.bonus = bonus
        self.steps = steps

    def distance(self, x: Tuple[int, int], y: Tuple[int, int]) -> int:
        return abs(x[0] - y[0]) + abs(x[1] - y[1])
    
    @property
    def max_bound(self):
        if not hasattr(self, '_max_bound'):
            self._max_bound = len(self.rides) * self.bonus \
                    + np.sum(np.apply_along_axis(lambda x: self.distance(x=(x[0], x[1]), y=(x[2], x[3])), 1, self.rides[:, 0:4]))
        
        return self._max_bound
    
    @property
    def rides_by_starting(self) -> List[List[int]]:
        return [k for k, ride in sorted(enumerate(self.rides), key=lambda x: x[1][4])]
    
    @classmethod
    def from_file(cls, file_path, *args, **kwargs):
        with open(file_path) as f:
            rows, cols, fleet, num_rides, bonus, steps = [int(i) for i in f.readline().split()]
            rides = np.array([[int(x) for x in line.strip().split()] for i, line in zip(range(num_rides), f.readlines())])

        return cls(rows, cols, fleet, rides, bonus, steps)

    def __str__(self):
        return '\n'.join([f'Ride(x={i[0], i[1]}, y={i[2], i[3]}, starting={i[4]}, finish={i[5]})' for i in self.rides])

    def __repr__(self):
        s = f'Problem{{rows={self.rows}, cols={self.cols}, fleet={self.fleet}, bonus={self.bonus}, steps={self.steps}}}'
        if len(self.rides) < 20:
            s += f'\n{str(self)}'
        return s

### Problem instance

In [3]:
problem = Problem.from_file('input/e_high_bonus.in')
problem

Problem{rows=1500, cols=2000, fleet=350, bonus=1000, steps=150000}

### Class that contains a Solution

The solution to this problem will be represented by a class that can be written and represented, so that following methods must be implemented:
* Serialize the solution into a string (`__str__`).
* Create a string that represents the solution (`__repr__`).
* Write to file the solution (`to_file`).

In [4]:
class Solution:
    """
    A solution for this problem that consists of a structure [[r1, r2, ..., rn]...]
    """
    def __init__(self, data, problem):
        self.data = data
        self.problem = problem

    def to_file(self, file_path):
        with open(file_path, 'w') as f:
            f.write(self.__str__())

    def __str__(self):
        return '\n'.join(' '.join([len(s)] + [str(x) for x in s]) for s in self.data)

    def __repr__(self):
        return self.__str__()

## Genetic Algorithm's Operators

In order to generate a population for the Genetic Algorithm it's necessary to define following constants:
* `INDIVIDUAL_TYPE`: The base type for an individual.
* `INDIVIDUAL_SIZE`: Initial size of the individual.

Running the algorithm consists of generating and evaluating individuals, so it's required to define functions for:
* Generating a new individual based on problem instance (`generate`).
* Evaluating the current fitness value of the individual (`evaluate`).

If it's necessary, following functions can be defined:
* Mutate individuals (`mutate`).
* Crossover (`crossover`).

### Helpers

In [5]:
def distance(a, b, x, y):
    return abs(x - a) + abs(y - b)

def ride_length(ride: List[int]) -> int:
    return distance(*ride[0:4])

def can_reach(x: int, y: int, step: int, ride: List[int]) -> bool:
    return distance(x, y, ride[0], ride[1]) + step <= ride[5] - ride_length(ride)

def find_itinerary(car: int, problem: Problem, rides: List[int]) -> List[int]:
    itinerary = []
    step = 0
    x, y = (0, 0)
    try:
        while step <= problem.steps:
            valid_ride_index = next(i for i in rides if can_reach(x, y, step, problem.rides[i]))
            rides.remove(valid_ride_index)
            valid_ride = problem.rides[valid_ride_index]
            x, y = valid_ride[2:4]
            itinerary.append(valid_ride_index)
            step += ride_length(valid_ride)
    except StopIteration:
        pass
    
    return itinerary

def is_car_valid(rides) -> bool:
    x, y = 0, 0
    step = 0
    for ride in rides:
        distance_origin = distance(x, y, ride[0], ride[1])
        distance_end = ride_length(ride)
        if not can_reach(x, y, step, ride):
            return False
        step += distance_origin + distance_end
        x, y = ride[2:4]
    
    return True

def is_individual_valid(individual, problem) -> bool:
    rides_by_car = defaultdict(list)
    for car, ride in individual:
        rides_by_car[car].append(problem.rides[ride])
        
    for car, rides in rides_by_car.items():
        if not is_car_valid(rides):
            return False
    
    return True

# HEURISTIC = problem.cols * problem.rows // problem.cells

# def learning_rate(x):
#     return int(max((HEURISTIC - x) // HEURISTIC, 0))

### Constants

In [6]:
INDIVIDUAL_SIZE = 500
INDIVIDUAL_TYPE = np.ndarray

### Generation

In [7]:
def generate(problem: Problem) -> Dict[int, List[int]]:
    solution = {}
    rides_completed = []
    for c in range(problem.fleet):
        rides_available = [i for i in problem.rides_by_starting if i not in rides_completed]
        rides = find_itinerary(c, problem, rides_available)
        solution[c] = rides
        rides_completed += rides

    return creator.Individual([[car, ride] for car, rides in solution.items() for ride in rides])

### Evaluation

In [8]:
def evaluate(individual, problem: Problem) -> Tuple[float]:
    if individual.any() and is_individual_valid(individual, problem):
        rides_idx = [i[1] for i in individual]
        rides = problem.rides[rides_idx]
        distances = np.sum(np.apply_along_axis(lambda x: distance(x[0], x[1], x[2], x[3]), 1, rides[:, 0:4]))
        bonus = 0
        return float(distances + bonus),
    else:
        return 0.0,

### Mutation

In [9]:
def mutate(individual, problem) -> Tuple:
    length = len(individual)
    mutation = np.delete(individual, random.sample(range(length), random.randint(0, int(length*0.1))), axis=0)
    # new_slices = max(1, int(learning_rate(length) * length)) * 5
    # mutation = np.append(individual, [generate_slice(problem) for _ in range(new_slices)], axis=0)
    
    return creator.Individual(mutation),

### Crossover

In [10]:
def crossover_two_points(ind1, ind2) -> Tuple:
    size = min(len(ind1), len(ind2))
    cxpoint1 = random.randint(1, size)
    cxpoint2 = random.randint(1, size - 1)
    if cxpoint2 >= cxpoint1:
        cxpoint2 += 1
    else: # Swap the two cx points
        cxpoint1, cxpoint2 = cxpoint2, cxpoint1

    ind1[cxpoint1:cxpoint2], ind2[cxpoint1:cxpoint2] = ind2[cxpoint1:cxpoint2].copy(), ind1[cxpoint1:cxpoint2].copy()
        
    return ind1, ind2

def crossover(ind1, ind2) -> Tuple:
    new_slices = max(1, int(learning_rate(len(ind1)) * len(ind1)))
    cx_point = random.randint(0, len(ind1) - new_slices)
    cx1 = np.append(ind1, ind2[cx_point:cx_point+new_slices], axis=0)
    cx2 = np.append(ind2, ind1[cx_point:cx_point+new_slices], axis=0)
    return creator.Individual(cx1), creator.Individual(cx2)

## DEAP toolbox

### Types

In [11]:
creator.create("Fitness", base.Fitness, weights=(1.0,))
creator.create("Individual", INDIVIDUAL_TYPE, fitness=creator.Fitness)

### Population

In [12]:
toolbox = base.Toolbox()
toolbox.register("individual", generate, problem=problem)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)

### Genetic functions

In [13]:
toolbox.register("evaluate", evaluate, problem=problem)
toolbox.register("mate", crossover_two_points)
toolbox.register("mutate", mutate, problem=problem)
toolbox.register("select", tools.selTournament, tournsize=3)

### Multiprocessing

In [14]:
pool = multiprocessing.Pool()
toolbox.register("map", pool.map)

Process ForkPoolWorker-4:
Process ForkPoolWorker-1:
Traceback (most recent call last):
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.6/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/usr/local/lib/python3.6/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/usr/local/lib/python3.6/multiprocessing/queues.py", line 334, in get
    with self._rlock:
  File "/usr/local/lib/python3.6/multiprocessing/queues.py", line 335, in get
    res = self._reader.recv_bytes()
  File "/usr/local/lib

## Solution

### Hyperparameters

In [15]:
GENERATIONS = 2000
MU = 10
LAMBDA = 20
CROSSOVER_PROBABILITY = 0.6
MUTATION_PROBABILITY = 0.3

### Run the algorithm

In [None]:
population = toolbox.population(n=MU)

hof = tools.HallOfFame(1, similar=np.array_equal)

stats = tools.Statistics(lambda ind: ind.fitness.values)
stats.register("avg", np.mean, axis=0)
stats.register("std", np.std, axis=0)
stats.register("min", np.min, axis=0)
stats.register("max", np.max, axis=0)

# population, log = algorithms.eaSimple(population, toolbox, CROSSOVER_PROBABILITY, MUTATION_PROBABILITY, GENERATIONS, stats, halloffame=hof)
population, log = algorithms.eaMuPlusLambda(population, toolbox, MU, LAMBDA, CROSSOVER_PROBABILITY, MUTATION_PROBABILITY, GENERATIONS, stats, halloffame=hof)
# population, log = algorithms.eaMuCommaLambda(population, toolbox, MU, LAMBDA, CROSSOVER_PROBABILITY, MUTATION_PROBABILITY, GENERATIONS, stats, halloffame=hof)

KeyboardInterrupt: 

### Best individual

In [None]:
solution = Solution(data=np.array(hof[0]), problem=problem)
print(f'Best individual fitness: {int(hof[0].fitness.wvalues[0])}/{problem.rows*problem.cols} ({int(hof[0].fitness.wvalues[0]) / (problem.rows*problem.cols) * 100:.2f}%)')
# print(str(solution))

### Plot evolution

In [None]:
gen, avg, min_, max_ = log.select("gen", "avg", "min", "max")
plt.plot(gen, avg, label="avg")
plt.plot(gen, min_, label="min")
plt.plot(gen, max_, label="max")
plt.xlabel("Generation")
plt.ylabel("Fitness")
plt.legend(loc="lower right")
plt.show()

In [None]:
solution.to_file('output/bonus.txt')