In [1]:
# %%pycodestyle


In [2]:

from deap import base
from deap import creator
from deap import tools

from PIL import Image
from PIL import ImageChops
import numpy as np
import math
import operator

from deap import base, creator, tools, algorithms
from PIL import Image, ImageDraw
from functools import partial
from math import sqrt
import numpy
import random
import pickle
import pandas as pd
import os.path
from functools import reduce
from operator import add, itemgetter






In [3]:

PIC = Image.open('template_big.jpg')
SIZE = 1000
VALUES_PER_GENE = 13

NUMBER_OF_TRIANGLES = 100
POPULATION = 5
NUM_ISLANDS = 1
NGEN = 5
MIG_RATE = 50
CXPB = .5
MUTPB = .2

FREQ = 5

NUM_OF_DRAW_OPTIONS = len(["circle", "triangle", "line", "rectangle"])
MAX_CIR_RADIUS = int(SIZE * .1)
LINE_LENGTH = int(SIZE * .15)
LINE_WIDTH = int(SIZE*.01)


In [4]:


def computeSimilarity(allInds):
    numpyInds = numpy.array(allInds)
    totalPop = len(allInds)
    groupedGeneShape = (NUMBER_OF_TRIANGLES*VALUES_PER_GENE, totalPop)
    groupedGenes = np.reshape(numpyInds.ravel(order='F'), groupedGeneShape)
    numpy.ndarray.sort(groupedGenes, axis=1)

    diffs = numpy.apply_along_axis(np.diff, 1, groupedGenes)

    multipliers = [i * (totalPop-i) for i in range(totalPop)]
    multipliers = multipliers[1:]
    
    scaledDiffs = numpy.apply_along_axis(lambda x: x*multipliers, 1, diffs)
    numOfCombos = (totalPop*(totalPop-1)/2)
    averageDiffInGene = numpy.apply_along_axis(lambda x: sum(x)/numOfCombos, 1, scaledDiffs)

    averageDiffAllGenes = np.mean(averageDiffInGene)
    
    return 1-numpy.mean(averageDiffAllGenes)

In [5]:


def gen_one_triangle():
    return [random.random() for i in range(VALUES_PER_GENE)]

In [6]:


def drawImage(triangles):
    a = random.randint(0,100)
    print("starting a: " + str(a))
    im = Image.new('RGB', (SIZE, SIZE), (255, 255, 255))
    for tri in triangles[1:]:
        mask = Image.new('RGBA', (SIZE, SIZE))
        draw = ImageDraw.Draw(mask)

        drawShape = int(tri[0] * NUM_OF_DRAW_OPTIONS)
        x1 = int(tri[1] * SIZE)
        y1 = int(tri[2] * SIZE)
        x2 = int(tri[3] * SIZE)
        y2 = int(tri[4] * SIZE)
        x3 = int(tri[5] * SIZE)
        y3 = int(tri[6] * SIZE)
        r = int(tri[7] * 255)
        g = int(tri[8] * 255)
        b = int(tri[9] * 255)
        alpha = int(tri[10] * 40)
        radius = int(tri[11] * MAX_CIR_RADIUS)
        direction = math.radians(int(tri[12]*360))

        fillColor = (r, g, b, alpha)

        # triangle
        if drawShape == 0:
            triangle = ((x1, y1), (x2, y2), (x3, y3))
            fillColor = (r, g, b, alpha)
            draw.polygon(triangle, fill=fillColor)

        # circle
        if drawShape == 1:
            lowerX, upperX = x1-radius, x1+radius
            lowerY, upperY = y1-radius, y1+radius
            fillColor = (r, g, b, alpha)
            draw.ellipse((lowerX, lowerY, upperX, upperY), fill=fillColor)

        # line
        if drawShape == 2:
            x2 = x1 + math.cos(direction) * LINE_LENGTH
            y2 = y1 + math.sin(direction) * LINE_LENGTH
            draw.line([x1, y1, x2, y2], width=LINE_WIDTH, fill=fillColor)

        # rectangle
        if drawShape == 3:
            draw.rectangle([x1, y1, x2, y2], fill=fillColor)

        im.paste(mask, mask=mask)
        del mask, draw
        
    print("ending a: " + str(a))

    return im


In [7]:


def evaluate(im1, t2):
    im2 = drawImage(t2)

    h = ImageChops.difference(im1, im2).histogram()
    length = int(len(h)/3)

    r = h[length*0:length*1]
    g = h[length*1:length*2]
    b = h[length*2:length*3]

    h = [sum(x) for x in zip(r, g, b)]
    errSqrd = sum(h*(i**2) for i, h in enumerate(h)) / float(SIZE * SIZE * 3)
    err = math.sqrt(errSqrd) / 256
    return 1 - err,

In [8]:


def getAdaptiveMutationStats(mutationStats):
    weightOfTypesOfMutations = mutationStats[0:3]
    sumOfWeightOfTypesOfMutations = sum(weightOfTypesOfMutations)
    scaledWeightOfTypesOfMutations = [x / sumOfWeightOfTypesOfMutations for x in weightOfTypesOfMutations]
    roundsOfMutation = math.ceil(mutationStats[3]/mutationStats[4])
    
    return scaledWeightOfTypesOfMutations + [roundsOfMutation]

In [9]:


def computeMutationRatesStats_Random(allInds):
    firstTriangles = numpy.array(list(allInds))[:][0]
    mutationStats = numpy.apply_along_axis(getAdaptiveMutationStats, 1, firstTriangles)
    return numpy.average(mutationStats[:,0])

def computeMutationRatesStats_Small(allInds):
    firstTriangles = numpy.array(list(allInds))[:][0]
    mutationStats = numpy.apply_along_axis(getAdaptiveMutationStats, 1, firstTriangles)
    return numpy.average(mutationStats[:,1])

def computeMutationRatesStats_Big(allInds):
    firstTriangles = numpy.array(list(allInds))[:][0]
    mutationStats = numpy.apply_along_axis(getAdaptiveMutationStats, 1, firstTriangles)
    return numpy.average(mutationStats[:,2])

def computeMutationRatesStats_Rounds(allInds):
    firstTriangles = numpy.array(list(allInds))[:][0]
    mutationStats = numpy.apply_along_axis(getAdaptiveMutationStats, 1, firstTriangles)
    return numpy.average(mutationStats[:,3])

In [10]:


def mutate(triangles):
    adaptiveMutationStats = triangles[0]
    
    # types of mutation 
    # 1 Random float
    # 2 Update by .5 - 1.5
    # 3 Update by .95 - 1.05
    
    mutationStats = getAdaptiveMutationStats(adaptiveMutationStats[0:5])
    
    scaledWeightOfTypesOfMutations = mutationStats[0:3]
    roundsOfMutation = mutationStats[3]
    
    for mutationRounds in range(roundsOfMutation):
        typeOfMutation = numpy.random.choice(range(3),p=scaledWeightOfTypesOfMutations)
        
        triangleIndexToMutate = random.randint(0, len(triangles)-1)
        triangleToMutate = triangles[triangleIndexToMutate]
        valueIndexToMutate = random.randint(0, len(triangleToMutate)-1)
        newValue = triangleToMutate[valueIndexToMutate]
        
        if typeOfMutation == 0:
            newValue = random.random()
            
        if typeOfMutation == 1:
            mutateMult = random.uniform(.5,1.5)
            newValue = newValue * mutateMult
        
        if typeOfMutation == 2:
            mutateMult = random.uniform(.95,1.05)
            newValue = newValue * mutateMult
            
        # range on possible new value
        newValue = max(0,newValue)
        newValue = min(1,newValue)
            
        triangles[triangleIndexToMutate][valueIndexToMutate] = newValue

    return triangles,

In [11]:


def logbookToDataframe(logbook):
    chapter_keys = logbook.chapters.keys()
    sub_chaper_keys = [c[0].keys() for c in logbook.chapters.values()]

    columns = reduce(add, [["_".join([x, y]) for y in s] 
                           for x, s in zip(chapter_keys, sub_chaper_keys)])

    df = pd.DataFrame(columns=columns)

    for chapter in logbook.chapters:
        for subchapter in logbook.chapters[chapter][0].keys():
            colName = "_".join([chapter,subchapter])
            values = logbook.chapters[chapter].select(subchapter)
            df[colName] = values
        
    return df

In [12]:
def getFitnesses(pop):
    fit = []
    for ind in pop:
        fit.append(ind.fitness.values)
    return fit

In [13]:
def evaluateAllFitnesses(pop):
    # Evaluate the individuals with an invalid fitness
    invalid_ind = [ind for ind in pop if not ind.fitness.valid]
    fitnesses = toolbox.map(toolbox.evaluate, invalid_ind)
    for ind, fit in zip(invalid_ind, fitnesses):
        ind.fitness.values = fit

In [14]:


creator.create("Fitness", base.Fitness, weights=(1.0,))
creator.create("Individual", list, fitness=creator.Fitness)

toolbox = base.Toolbox()
toolbox.register("attr", gen_one_triangle)
toolbox.register("individual", tools.initRepeat,
                 creator.Individual, toolbox.attr, NUMBER_OF_TRIANGLES)
toolbox.register("population", tools.initRepeat,
                 list, toolbox.individual)

toolbox.register("evaluate", partial(evaluate, PIC))
toolbox.register("mate", tools.cxTwoPoint)
toolbox.register("mutate", mutate)
toolbox.register("select", tools.selTournament, tournsize=3)
#toolbox.register("select", tools.selRoulette)


toolbox.register("algorithm", algorithms.varAnd, toolbox=toolbox, cxpb=CXPB, mutpb=MUTPB)


logbook = None

NameError: name 'multiprocessing' is not defined

In [None]:

def main(checkpoint=None):
    if checkpoint and os.path.isfile(checkpoint):
        # A file name has been given, then load the data from the file
        with open(checkpoint, "rb") as cp_file:
            cp = pickle.load(cp_file)
        islands = cp["islands"]
        start_gen = cp["generation"]
        hof = cp["halloffame"]
        logbook = cp["logbook"]
        random.setstate(cp["rndstate"])
    else:
        # Start a new evolution
        islands = [toolbox.population(n=POPULATION) for i in range(NUM_ISLANDS)]
        start_gen = 0
        hof = tools.HallOfFame(maxsize=1)
        logbook = tools.Logbook()

    stats_fit = tools.Statistics(lambda ind: ind.fitness.values)
    stats_pop_trends = tools.Statistics(lambda ind: ind)
    stats_fit.register("std", numpy.std)
    stats_fit.register("max", numpy.max)
    stats_fit.register("avg", numpy.mean)
    stats_fit.register("min", numpy.min)
    stats_pop_trends.register("similarity", computeSimilarity)
    stats_pop_trends.register("mutation_random", computeMutationRatesStats_Random)
    stats_pop_trends.register("mutation_small", computeMutationRatesStats_Small)
    stats_pop_trends.register("mutation_large", computeMutationRatesStats_Big)
    stats_pop_trends.register("mutation_rounds", computeMutationRatesStats_Rounds)

    mstats = tools.MultiStatistics(fitness=stats_fit, stats_pop_trends=stats_pop_trends)

    log = ""
    try:

        for gen in range(start_gen, NGEN):
            totalPop = []
            

            for islandIndex, island in enumerate(islands):
                evaluateAllFitnesses(island)
                islandPop = toolbox.select(island, k=len(island))
                islandPop = algorithms.varAnd(islandPop, toolbox, cxpb=CXPB, mutpb=MUTPB)
                islands[islandIndex] = islandPop
                
                totalPop = totalPop + islandPop
            
            if gen % MIG_RATE == 0:
                migRingOrder = list(range(NUM_ISLANDS))
            
                random.shuffle(migRingOrder)
                tools.migRing(islands, k=1, selection=tools.selRandom, migarray=migRingOrder)
                
            totalPop = list(map(toolbox.clone, totalPop))
                
            # Evaluate the individuals with an invalid fitness
            invalid_ind = [ind for ind in totalPop if not ind.fitness.valid]
            fitnesses = toolbox.map(toolbox.evaluate, invalid_ind)
            for ind, fit in zip(invalid_ind, fitnesses):
                ind.fitness.values = fit
                
            hof.update(totalPop)
            record = mstats.compile(totalPop)
            logbook.record(gen=gen, evals=len(invalid_ind), **record)
            print(logbook.stream)

            if gen % FREQ == 0:
                # Fill the dictionary using the dict(key=value[, ...])
                # constructor
                cp = dict(islands=islands, generation=gen, halloffame=hof,
                          logbook=logbook, rndstate=random.getstate())

                with open("checkpoint.pkl", "wb") as cp_file:
                    pickle.dump(cp, cp_file)
                    
                    df_log = logbookToDataframe(logbook)
                    df_log.to_csv('stats.txt', index=False)
                    open('result.txt', 'w').write(repr(hof[0]))
                    drawImage(hof[0]).save('result.png')

    finally:
        df_log = logbookToDataframe(logbook)
        df_log.to_csv('stats.txt', index=False)
        open('result.txt', 'w').write(repr(hof[0]))
        a = drawImage(hof[0])
        drawImage(hof[0]).save('result.png')
    return logbook

if __name__ == '__main__':
    main("checkpoint.pkl")
    # main()

In [None]:

#main()
#import cProfile
#cProfile.run('main("checkpoint.pkl")')
#cProfile.run('main()')
