In [1]:
import numpy as np
import scipy as sp
from  scipy import ndimage, ndarray
import skimage.measure
from sklearn import metrics
from deap import algorithms, base, creator, tools, gp
import random, operator, math
from enum import Enum
import matplotlib.pyplot as plt
import cv2
import os
from glob import glob
import pygraphviz as pgv
from autograd import grad
import datetime
from inspect import isclass
import sys

# Parameter settings
Various custom parameter settings for the program

In [2]:
# Reproducability
seed = 1
random.seed(seed)
np.random.seed(seed)

# Which data to use
data_directory = "data/"
dataset_name = "jaffe"
training_split = 0.8 # 80% of the data for training

# For Convolutions
pooling_size = 2 # Use 2x2 pooling
filter_size = 3 # Use 3x3 filters

# For GP
tourn_size = 7
num_best = 3
pop_size = 1024
crs_rate = 0.75
mut_rate = 0.2
# Reproduction rate is automatically set to 1-crs_rate-mut_rate
generations = 50

# Read in the data for training/testing
1. Read the images from disk
2. Save these in a dict from label -> images
3. Split these based off label into training/testing images

Need to do it in this order to ensure we get an equal split of instances from each class in the data, since classification accuracy is used as fitness this is important.

In [3]:
# Reads in all the data as a dict from label -> [images]
def read_data(directory):
    data = {}

    # Assumes the images are in subfolders, where the folder name is the images label
    for subdir in glob(directory+"/*/"):
        label = subdir.split("/")[-2] # Second to last element is thee class/sub folder name
        images = [ndimage.imread(image) for image in glob(subdir+"/*.*")] # Read in all the images from subdirectories
        # Shuffle the images (seed specified at the top of program so this will be reproducable)
        random.shuffle(images)
        data[label] = images
        
    # Set of all class labels
    labels = list(data.keys())

    # Sanity check
    if len(labels) != 2:
        print("Binary classification only! But labels found were:", labels)
    
    return data, labels

# Splits the data into two arrays (training and testing) of (label, image) pairs
def format_and_split_data(data, labels, split):
    training = []
    testing = []
    
    # For all the classes, split into training/testing (need to do it per class to ensure we get a good split of all classes)
    for label in labels:
        pairs = [(label, image) for image in data[label]]
        length = int(len(pairs) * split)
        training.extend(pairs[:length])
        testing.extend(pairs[length:])
        
    random.shuffle(training)
    random.shuffle(testing)
    
    return training, testing
        
# Read and split data into training and testing    
data, labels = read_data(data_directory+dataset_name)
training_data, testing_data = format_and_split_data(data, labels, training_split)

# We now have two lists of (label, image) pairs, one for training and one for testing

# Setup GP

Below the function and terminal set are defined. Strongly typed
GP is used to enforce a tiered structure.

In [4]:
# Use a custom class to represent an image, really just a wrapper around ndarray for more explicit typing
class Image:
    def __init__(self, pixels):
        self.pixels = pixels
        
    def __str__(self):
        return "Image("+repr(self.pixels)+")"
    
    __repr__ = __str__
        
# Program Takes in a single image as input. Outputs a float which is then used for classification by thresholding
pset = gp.PrimitiveSetTyped("MAIN", [Image], float)

In [5]:
# ================
# Terminal set
# Three tiers - Convolution, Aggregation and Classification
# ================

# Image is a terminal for Convolution and aggregation tier, but this is defined above (when defining pset)

# Add a random float between -1 and 1 (can be used in classification tier)
pset.addEphemeralConstant("Random", lambda: random.uniform(-1, 1), float)

# ------------------
# Convolution Tier
# ------------------
# Only specific terminal here is the filter/kernel to apply. 

# Kernel/filters, begin with a list of random values. This will be treated as a filter_size x filter_size array when applying
pset.addEphemeralConstant("Kernel", lambda: [random.randint(-3, 3) for _ in range(filter_size * filter_size)] , list)


# ------------------
# Aggregation Tier
# ------------------
# Four possible aggregation window shapes
shapes = ["Ellipse", "Rectangle", "Row", "Column"]
pset.addEphemeralConstant("Shape", lambda: random.choice(shapes), str)


# For x, y, width and height of window. Used as a percentage of width/height of image 
pset.addEphemeralConstant("Pos", lambda: random.randint(5, 90), int)

Code below is for performing aggregation over regions of the image for the aggregation functions (i.e. the aggregation tier).

In [6]:
# Extract the pixel values from an ellipse shape cut in the image
def extract_ellipse(pixels, center_x, center_y, agg_width, agg_height):   
    # Create an empty (black = 0) nparray same shape as the image, then make a ellipse cut in this array (=1)
    mask = np.zeros_like(pixels)
    mask = cv2.ellipse(mask, center=(center_x, center_y), axes=(agg_width, agg_height), angle=0, startAngle=0, endAngle=360, color=(1), thickness=-1)

    # Apply the cutout to original image, only caring about the non zero parts of mask (as these are the pixels in the ellipse)
    return pixels[mask > 0]

def extract_window(image, shape, x, y, w, h):
    pixels = image.pixels
    dimensions = pixels.shape
    
    # Image is in row, col order
    img_width = dimensions[1]
    img_height = dimensions[0]
    
    # Convert to integers so we can use for indexing
    x_start = int(x * img_width)
    y_start = int(y * img_height)
    agg_width = int(w  * img_width)
    agg_height = int(h * img_height)
    
    # Ensure we are within the images bounds
    x_end = min(x_start + agg_width, img_width)
    y_end = min(y_start + agg_height, img_height)
    
    values = None
    
    # Need to extract different regions based off the shape
    if shape == "Rectangle":
        values = pixels[y_start:y_end, x_start:x_end]
    elif shape == "Column":
        values = pixels[y_start:y_end, x_start:min(x_start+1, img_width)]
    elif shape == "Row":
        values = pixels[y_start:min(y_start+1, img_height), x_start:x_end]
    elif shape == "Ellipse":
        values = extract_ellipse(pixels, x_start, y_start, agg_width, agg_height)
    else:
        print("Shape not found!!")
    
    # Convert to a 1d array
    if values is not None:
        values.flatten()
        
    return values

In [7]:
# ================
# Function set
# Three tiers - Convolution, Aggregation and Classification
# ================

# ------------------
# Convolution Tier
# ------------------

# Convolve takes an image and a filter to apply, and returns the convolved image. Uses zero padding for the convolution
def convolve(image, kernel_values):
    # Currently kernel is a list but we want a 2d array
    kernel = np.asarray(kernel_values).reshape((filter_size, filter_size))
    # Use the constant mode, and padding value of zero to deal with edges of image
    return Image(ndimage.filters.convolve(image.pixels, kernel, mode='constant', cval=0.0))
    
pset.addPrimitive(convolve, [Image, list], Image, name="Convolution")

# Pooling takes an image, and returns a subsampled version. Uses max pooling of the specified size
def pooling(image):
    return Image(skimage.measure.block_reduce(image.pixels, (pooling_size, pooling_size), np.max))
    
pset.addPrimitive(pooling, [Image], Image, name="Pooling")

# ------------------
# Aggregation Tier
# ------------------

# Take a shape/region of the image and apply the given function to this region
def agg(fn, image, shape, x, y, width, height):
    # They are integers, treat them as floats instead when passing through
    window = extract_window(image, shape, x/100.0, y/100.0, width/100.0, height/100.0)
    return fn(window) if window.size > 0 else 0

# Pass all the arguments and appropriate statistical operator on to the agg function above
# The inputs correspond to: Image, Shape, X, Y, Width, Height
pset.addPrimitive(lambda *args: agg(np.min, *args), [Image, str, int, int, int, int], float, name="aggmin")
pset.addPrimitive(lambda *args: agg(np.mean, *args), [Image, str, int, int, int, int], float, name="aggmean")
pset.addPrimitive(lambda *args: agg(np.max, *args), [Image, str, int, int, int, int], float, name="aggmax")
pset.addPrimitive(lambda *args: agg(np.std, *args), [Image, str, int, int, int, int], float, name="aggstd")

# ------------------
# Classification Tier
# ------------------
def protectedDiv(num, den):
    try:
        return num / den
    except ZeroDivisionError:
        return 0
    
# Use the basic arithmetic operators
pset.addPrimitive(operator.add, [float, float], float)
pset.addPrimitive(operator.sub, [float, float], float)
pset.addPrimitive(operator.mul, [float, float], float)
pset.addPrimitive(protectedDiv, [float, float], float, name="div")

### Fitness Function
Use classification accuracy

In [8]:
# How should fitness of an individual be determined? In this case use classification accuracy
def fitness_function(individual, data, label_set):            
    # Transform the tree expression in a callable function
    output = toolbox.compile(expr=individual)
    
    # Any arbitrary threshold can be used, zero is nice as it means negatives = class1, non negatives = class2
    threshold = 0
    
    real_labels = []
    predicted_labels = []
    
    for (real_label, image) in data:
        real_labels.append(real_label)
            
        out = output(Image(image))
        if out > threshold:
            predicted_labels.append(label_set[0])
        else:
            predicted_labels.append(label_set[1])
    
    real_labels = np.asarray(real_labels)
    predicted_labels = np.asarray(predicted_labels)

    # Count of elementwise matches
    classification_accuracy = metrics.accuracy_score(real_labels, predicted_labels)

    # Deap requires multiple values be returned, so the comma is important!
    return classification_accuracy, 

In [9]:
# DEAP has some issues with strongly typed GP and tree generation (see here: https://groups.google.com/forum/#!searchin/deap-users/stgp/deap-users/YOeb65eRNG4/AYUMcNldhdwJ and here: https://groups.google.com/forum/#!msg/deap-users/adq50--lzJ4/hefHPJKpBQAJ )
# The following code replaces some of the code in DEAP, to make this work. 

# The block off code below be ignored, essentially its a workaround to stop the need for "identity nodes" which bloat the trees, to fix
# the issue of strongly typed trees not being able to be generated in particular circumstances (i.e. full method but cant happen with current types)

# genHalfAndHalf, genFull and genGrow copied directly from deap (https://github.com/DEAP/deap/blob/master/deap/gp.py)
def genFull(pset, min_, max_, type_=None):
    def condition(height, depth):
        """Expression generation stops when the depth is equal to height."""
        return depth == height
    return generate(pset, min_, max_, condition, type_)

def genGrow(pset, min_, max_, type_=None):
    def condition(height, depth):
        """Expression generation stops when the depth is equal to height
        or when it is randomly determined that a a node should be a terminal.
        """
        return depth == height or \
            (depth >= min_ and random.random() < pset.terminalRatio)
    return generate(pset, min_, max_, condition, type_)


def genHalfAndHalf(pset, min_, max_, type_=None):
    method = random.choice((genGrow, genFull))
    return method(pset, min_, max_, type_)

# Small change made to method below from DEAP version. If you try and add a primtiive, but none of the appropriate type
# is available, then try add a terminal instead.
def generate(pset, min_, max_, condition, type_=None):
    """Generate a Tree as a list of list. The tree is build
    from the root to the leaves, and it stop growing when the
    condition is fulfilled.
    :param pset: Primitive set from which primitives are selected.
    :param min_: Minimum height of the produced trees.
    :param max_: Maximum Height of the produced trees.
    :param condition: The condition is a function that takes two arguments,
                      the height of the tree to build and the current
                      depth in the tree.
    :param type_: The type that should return the tree when called, when
                  :obj:`None` (default) the type of :pset: (pset.ret)
                  is assumed.
    :returns: A grown tree with leaves at possibly different depths
              dependending on the condition function.
    """
    if type_ is None:
        type_ = pset.ret
    expr = []
    height = random.randint(min_, max_)
    stack = [(0, type_)]
    while len(stack) != 0:
        depth, type_ = stack.pop()
        
        # If we are at the end of a branch, add a terminal
        if condition(height, depth):
            term = add_terminal(pset, type_)
            expr.append(term)
            
        # Otherwise add a terminal    
        else:
            try:
                prim = random.choice(pset.primitives[type_])
                expr.append(prim)
                for arg in reversed(prim.args):
                    stack.append((depth + 1, arg))
            except IndexError: 
                # This is where the change occurs, if no primitive is available, try and add a terminal instead
                term = add_terminal(pset, type_)
                expr.append(term)
            
    return expr

def add_terminal(pset, type_):
    try:
        term = random.choice(pset.terminals[type_])
    except IndexError:
        _, _, traceback = sys.exc_info()
        raise IndexError("The custom generate function tried to add "\
                          "a terminal of type '%s', but there is "\
                          "none available." % (type_,), traceback)
    if isclass(term):
        term = term()
    
    return term

In [10]:
# This a maximization problem, as fitness is classification accuracy (higher the better)
creator.create("FitnessMax", base.Fitness, weights=(1.0,))

# Individuals in the population should be represented as tree structures (standard GP)
creator.create("Individual", gp.PrimitiveTree, fitness=creator.FitnessMax)

toolbox = base.Toolbox()

# Ramped Half and half generation (full and grow
toolbox.register("expr", genHalfAndHalf, pset=pset, min_=2, max_=5)
toolbox.register("individual", tools.initIterate, creator.Individual, toolbox.expr)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)
toolbox.register("compile", gp.compile, pset=pset)

# Tournament size
toolbox.register("select", tools.selTournament, tournsize=tourn_size)
toolbox.register("mate", gp.cxOnePoint)
toolbox.register("expr_mut", genFull, min_=0, max_=2)
toolbox.register("mutate", gp.mutUniform, expr=toolbox.expr_mut, pset=pset)

# Max tree heights for crossover and mutation
toolbox.decorate("mate", gp.staticLimit(key=operator.attrgetter("height"), max_value=10))
toolbox.decorate("mutate", gp.staticLimit(key=operator.attrgetter("height"), max_value=10))

# Need to say how evaluation should be performed, in this case its passing the training data to the defined fitness function
toolbox.register("evaluate", fitness_function, data=training_data, label_set=labels)

In [11]:
# Data to track per generation. Track the min, mean, std, max of the fitness and tree sizes 
stats_fit = tools.Statistics(lambda ind: ind.fitness.values)
stats_size = tools.Statistics(len)

mstats = tools.MultiStatistics(fitness=stats_fit, size=stats_size)
mstats.register("avg", np.mean)
mstats.register("std", np.std)
mstats.register("min", np.min)
mstats.register("max", np.max)

# Run the training procedure

In [12]:
#  To Plot/draw the resulting trees
def plot(tree):
    nodes, edges, labels = gp.graph(tree)

    g = pgv.AGraph()
    g.add_nodes_from(nodes)
    g.add_edges_from(edges)
    g.layout(prog="dot")

    for i in nodes:
        n = g.get_node(i)
        n.attr["label"] = labels[i]

    g.draw("trees/"+dataset_name+".png")

In [13]:
pop = toolbox.population(n=pop_size)
hof = tools.HallOfFame(num_best)

pop, log = algorithms.eaSimple(pop, toolbox, crs_rate, mut_rate, generations, stats=mstats, halloffame=hof, verbose=True)



   	      	                 fitness                 	              size             
   	      	-----------------------------------------	-------------------------------
gen	nevals	avg     	max   	min 	std      	avg    	max	min	std    
0  	1024  	0.500854	0.6875	0.25	0.0183944	16.5547	86 	7  	11.2454
1  	818   	0.509135	0.729167	0.3125	0.0320411	17.5664	87 	1  	11.9591
2  	828   	0.534709	0.729167	0.3125	0.0512373	20.1855	94 	1  	14.231 




3  	821   	0.580526	0.791667	0.3125	0.074576 	20.7871	69 	1  	14.4078
4  	835   	0.633931	0.8125  	0.270833	0.0841099	27.3418	66 	1  	11.8299
5  	841   	0.641907	0.833333	0.208333	0.0977791	28.334 	61 	1  	10.3763
6  	823   	0.662292	0.895833	0.3125  	0.114553 	24.1348	61 	1  	9.82003
7  	813   	0.685201	0.895833	0.333333	0.130307 	20.6758	45 	1  	8.30888
8  	812   	0.706034	0.9375  	0.291667	0.141808 	21.6973	53 	1  	8.59018
9  	826   	0.736328	0.958333	0.3125  	0.155996 	19.9209	45 	1  	6.7841 
10 	845   	0.773885	0.958333	0.125   	0.161647 	17.3555	41 	1  	2.85568
11 	809   	0.795797	0.979167	0.208333	0.165332 	16.9277	35 	1  	2.04745
12 	801   	0.812622	1       	0.166667	0.166997 	16.9326	26 	7  	1.74339
13 	826   	0.81547 	1       	0.166667	0.178111 	16.7734	24 	1  	2.19851
14 	788   	0.849955	1       	0.25    	0.180808 	16.7061	26 	1  	2.12374
15 	836   	0.864766	1       	0.3125  	0.194741 	15.7959	22 	1  	2.10523
16 	798   	0.882629	1       	0.291667	0.196586 	15.0371	23 	1  	1.

# Test the evolved solutions

In [14]:
# Extract the fittest individual, and plot to a file
best = hof[0]
plot(best)

In [15]:
testing_accuracy = fitness_function(best, testing_data, labels)[0]
print(testing_accuracy)

0.916666666667
