# In [1]:
"""
IslandMethod: island-model parallel differential evolution on Apache Spark.
Added by Davide Anghileri, Nathan Consuegra, 2017
"""
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import Row
import numpy as np
import pandas as pd
import random
from datetime import datetime
import re


# TODO: Move to a new class ---------------
import time
from timeit import Timer

def print_time(message, time):
    """Print ``message`` followed by the duration ``time`` and an "s" unit.

    The output is preceded by an empty line. ``time`` is a parameter here and
    shadows the ``time`` module only inside this function.
    """
    line = "\n" + str(message) + str(time) + "s"
    print(line)
    
    
def timer(message, wrapped):
    """Report the duration held in ``wrapped`` and return its payload.

    ``wrapped`` must be a two-element ``(seconds, result)`` pair, such as the
    one returned by ``wrapper``. The seconds are printed through
    ``print_time`` under ``message``, and ``result`` is returned unchanged.
    """
    elapsed, outcome = wrapped
    print_time(message, elapsed)
    return outcome


def wrapper(func, *args, **kwargs):
    """Call ``func(*args, **kwargs)`` once and measure how long it took.

    Returns:
        A ``(seconds, result)`` tuple. ``seconds`` is the wall-clock duration
        measured with ``time.perf_counter`` and ``result`` is whatever
        ``func`` returned. Exceptions raised by ``func`` propagate unchanged.
    """
    # timeit.Timer.timeit() disables the garbage collector while the timed
    # call runs. That is wrong for timing a long Spark driver workload: it
    # skews the measurement and lets memory grow unchecked. A plain
    # perf_counter bracket leaves GC behaviour untouched.
    start = time.perf_counter()
    result = func(*args, **kwargs)
    return time.perf_counter() - start, result
# -----------------------------------------


class IslandMethod:
    """Island-model driver for a differential-evolution (DE) optimiser on Spark.

    The population is split into ``workers`` islands. Each island is evolved
    independently for ``localIt`` local iterations. Individuals are then
    re-keyed at random ("migration") so the next split mixes the islands.

    The ``diffEvo`` object must provide:
      * ``np``: population size (number of chromosomes / keys),
      * ``initialPopulation()``: the initial chromosomes,
      * ``func(chromosome)``: the objective to minimise,
      * ``mutation(population, target)`` and ``crossover(target, donor)``,
      * ``maxEval`` and ``goal``: the stopping thresholds.

    Individuals travel through the RDDs as ``(key, (chromosome, score))``.
    """

    def __init__(self, diffEvo, workers=1, localIt=2):
        """Store the configuration and reset the run statistics.

        Args:
            diffEvo: the differential-evolution object (see class docstring).
            workers: number of islands, passed to ``RDD.partitionBy``.
            localIt: local iterations each island runs per migration epoch.

        Raises:
            ValueError: if ``diffEvo`` is None or ``localIt`` is below 1.
                ``island`` cannot make progress when ``localIt < 1``.
        """
        # Island based input parameters check
        if diffEvo is None:
            raise ValueError("The differential evolution object is needed.")
        if localIt < 1:
            raise ValueError("localIt must be at least 1, got {!r}.".format(localIt))

        # Set internal parameters
        self._minimum = 10**-6  # tolerance added to diffEvo.goal when testing convergence
        self.workers = workers
        self.diffEvo = diffEvo
        self.localIt = localIt
        # One key per individual; shuffled in place by migration().
        self.partitions = np.arange(diffEvo.np)
        self.evalAccum = 0  # objective-function evaluations performed so far
        self.migrationCount = 0
        self.globalTime = 0  # seconds taken by the last execute()
        # List holding at most one (key, (chromosome, score)) tuple.
        self.bestChromosome = None
        # -1: not stopped yet, 0: maxEval reached, 1: goal reached.
        self.stoppingReason = -1

    def globalTimer(self, message, wrapped):
        """Record and print the duration from a ``(seconds, result)`` pair.

        The elapsed seconds are stored in ``self.globalTime``, and the
        wrapped result is returned.
        """
        elapsed, result = wrapped
        self.globalTime = elapsed
        print_time(message, elapsed)
        return result

    def execute(self, sc):
        """Time a full ``run(sc)`` and return its evaluation count."""
        return self.globalTimer("Global execution time: ", wrapper(self.run, sc))

    def run(self, sc):
        """Run migration epochs until a stopping criterion holds.

        Each epoch:
          1. hash-partitions individuals by key into ``self.workers`` islands,
          2. evolves every island for ``self.localIt`` local iterations,
          3. records the best individual and updates ``self.evalAccum``,
          4. re-keys individuals with a shuffled permutation (``migration``).

        Args:
            sc: the SparkContext used to distribute the population.

        Returns:
            The total number of objective-function evaluations counted.
        """
        # Create the NP x D population and pair each chromosome with a key.
        initialPop = self.diffEvo.initialPopulation()
        pop_rdd = sc.parallelize(initialPop)
        keys = sc.parallelize(self.partitions)

        # Score the initial population exactly once. Left as a lazy map, it
        # would be evaluated by a count() and then re-evaluated (uncounted)
        # when the first island step recomputes the lineage.
        scored = keys.zip(pop_rdd).map(
            lambda x: (x[0], (x[1], self.diffEvo.func(x[1])))).collect()
        individuals = sc.parallelize(scored)
        self.evalAccum += len(scored)

        stop = False
        while not stop:
            # Partition by the key previously computed at random
            individuals = individuals.partitionBy(self.workers)
            print("Number of partitions:", individuals.getNumPartitions())

            # Execute the algorithm for each island. The collect() happens
            # inside the timed call. mapPartitions is lazy, so timing it alone
            # would measure only the plan construction, not the island work.
            evolved = timer("Island iteration time: ",
                            wrapper(self._run_islands, individuals))
            individuals = sc.parallelize(evolved)

            # Check best chromosome. The data is already on the driver, so pick
            # the lowest score locally. As with RDD.takeOrdered(1, ...), the
            # result is a list with at most one individual.
            self.bestChromosome = (
                [min(evolved, key=lambda ind: ind[1][1])] if evolved else [])
            print("\nBest chromosome ->:", self.bestChromosome)

            # Each individual was evaluated once per local iteration.
            self.evalAccum += len(evolved) * self.localIt

            # Recompute and assign the keys for the next iteration
            individuals = self.migration(individuals)
            self.migrationCount += 1
            print("\nMigration number:", self.migrationCount)
            print("\nNumber of Evaluations", self.evalAccum)

            # Check stopping criteria
            if self.evalAccum >= self.diffEvo.maxEval:
                stop = True
                self.stoppingReason = 0
                print("\nThe algorithm stopped because maximum number of evaluations (",
                      self.diffEvo.maxEval, ") is reached: ", self.evalAccum)
            elif self.get_best_score() <= self.diffEvo.goal + self._minimum:
                stop = True
                self.stoppingReason = 1
                print("\nThe algorithm stopped because the minimum was reached")

        # Returning the best chromosome
        print("\nBest chromosome:", self.get_best_chromosome(), "Score:", self.get_best_score())

        # Total number of evaluations performed
        return self.evalAccum

    def _run_islands(self, individuals):
        """Evolve every partition with ``island`` and collect the result to the driver."""
        return individuals.mapPartitions(self.island).collect()

    def reset_counts(self):
        """Reset the evaluation and migration counters (other state is kept)."""
        self.evalAccum = 0
        self.migrationCount = 0

    def get_best_chromosome(self):
        """Return the best chromosome formatted as ``"(v1;v2;...)"``.

        Requires ``self.bestChromosome`` to have been set by ``run``.
        """
        return "(" + ";".join(str(e) for e in self.bestChromosome[0][1][0]) + ")"

    def get_best_score(self):
        """Return the score of the best individual found by ``run``."""
        return self.bestChromosome[0][1][1]

    def migration(self, individuals):
        """Re-key all individuals with a fresh random permutation of the keys.

        ``self.partitions`` is shuffled in place on the driver, using the
        ``random`` module's RNG. The RDD is coalesced into a single partition
        so that ``migrate`` sees every individual in one iterator and hands
        out each key of the permutation exactly once.
        """
        random.shuffle(self.partitions)
        return individuals.coalesce(1).mapPartitions(self.migrate)

    def migrate(self, iterator):
        """Give the i-th individual the i-th key of ``self.partitions``.

        Raises IndexError if there are more individuals than keys.
        """
        return iter([
            (self.partitions[i], (value[0], value[1]))
            for i, (_, value) in enumerate(iterator)
        ])

    def island(self, iterator):
        """Evolve one island's individuals for ``self.localIt`` local iterations.

        In every iteration, each target produces a trial via
        ``diffEvo.mutation`` (given the island's current population list)
        followed by ``diffEvo.crossover``. The trial replaces the target only
        if its ``diffEvo.func`` score is strictly lower. Replacements take
        effect for the next iteration, not mid-iteration.

        Args:
            iterator: the partition's ``(key, (chromosome, score))`` tuples.

        Returns:
            An iterator over the evolved tuples, with keys preserved.
        """
        population = list(iterator)
        iteration = 0
        while iteration < self.localIt:
            next_population = []
            for key, value in population:
                target, curr_score = value[0], value[1]

                # Execute mutation and crossover
                donor = self.diffEvo.mutation(population, target)
                trial = self.diffEvo.crossover(target, donor)

                # Evaluate the new chromosome and keep it only if it is better
                new_score = self.diffEvo.func(trial)
                if new_score < curr_score:
                    next_population.append((key, (trial, new_score)))
                else:
                    next_population.append((key, (target, curr_score)))

            # Substitute the population
            population = next_population
            iteration += 1

        return iter(population)