In [1]:
import random
import operator
import itertools
import numpy
import time
import sys
import math
from pyspark import SparkContext, SparkConf
from deap import algorithms
from deap import base
from deap import creator
from deap import tools
from deap import gp

# <p style="color:brown"> Spark configuration</p>

In [2]:
conf = SparkConf().setAppName("GP over Spark")
if not('sc' in globals()):
    sc = SparkContext(conf=conf)

In [3]:
print(sc.version)
print(sc.pythonVer)
print(sc.master)
print(str(sc.sparkHome))
print(str(sc.sparkUser()))
print(sc.appName)
print(sc.applicationId)
print(sc.defaultParallelism)
print(sc.defaultMinPartitions)
print(sys.version)

2.4.4
2.7
local[*]
None
hadoop
GP over Spark
local-1573940720471
8
2
2.7.13 (default, Sep 26 2018, 18:42:22) 
[GCC 6.3.0 20170516]


# Preparing RDDs

In [4]:
TrainingRDD = sc.textFile('hdfs://nodemaster:9000/user/hadoop/HIGGS_Training_Scaled_10500.csv').map(lambda line : [float(x) for x in line.split(',')]).cache()
TestRDD = sc.textFile('hdfs://nodemaster:9000/user/hadoop/HIGGS_Test_Scaled_500.csv').map(lambda line : [float(x) for x in line.split(',')]).cache()

#### Testing RDDs

In [5]:
print("Training dataset size:{}".format(TrainingRDD.count()))
print("Test dataset size:{}".format(TestRDD.count()))

Training dataset size:10500
Test dataset size:500


# <p style="color:green;">Defining GP parameters</p>

In [6]:
toolbox = base.Toolbox()
def sigmoid(x):
    return 1/(1+math.exp(-x))

def evalHiggsBase(individual):
    
    # Transform the tree expression in a callable function
    func = toolbox.compile(expr=individual)
    
    # Evaluate the sum of correctly identified as signal
    result = sum(TrainingRDD.map(lambda line: bool(sigmoid(func(*line[1:]))>0.5) is bool(line[0])).collect())
    
    return result,

# Define a new if-then-else function
def if_then_else(input, output1, output2):
    if input:
        return output1
    else:
        return output2

# Define a protected division function
def protectedDiv(left, right):
    try: 
        return left / right
    except ZeroDivisionError:
        return 1

# defined a new primitive set for strongly typed GP (28 float input attributes from Higgs dataset)
pset = gp.PrimitiveSetTyped("MAIN", itertools.repeat(float, 28), bool, "IN")

# Functions set

# boolean operators
pset.addPrimitive(operator.and_, [bool, bool], bool)
pset.addPrimitive(operator.or_, [bool, bool], bool)
pset.addPrimitive(operator.not_, [bool], bool)

# floating point operators
pset.addPrimitive(operator.add, [float, float], float)
pset.addPrimitive(operator.sub, [float, float], float)
pset.addPrimitive(operator.mul, [float, float], float)
pset.addPrimitive(protectedDiv, [float, float], float)

# logic operators
pset.addPrimitive(operator.lt, [float, float], bool)
pset.addPrimitive(operator.eq, [float, float], bool)
pset.addPrimitive(if_then_else, [bool, float, float], float)

# constants float in [0,1] and boolean constants False, True
pset.addEphemeralConstant("rand1", lambda: random.random(), float)
pset.addTerminal(False, bool)
pset.addTerminal(True, bool)

#Set the best fitness as the max fitness
creator.create("FitnessMax", base.Fitness, weights=(1.0,))
creator.create("Individual", gp.PrimitiveTree, fitness=creator.FitnessMax)

#other GP parameters : selection, mutation, ...
toolbox.register("expr", gp.genHalfAndHalf, pset=pset, min_=1, max_=3)
toolbox.register("individual", tools.initIterate, creator.Individual, toolbox.expr)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)
toolbox.register("compile", gp.compile, pset=pset)
toolbox.register("evaluate", evalHiggsBase)
toolbox.register("select", tools.selTournament, tournsize=4)
toolbox.register("mate", gp.cxOnePoint)
toolbox.register("expr_mut", gp.genHalfAndHalf, min_=1, max_=3)
toolbox.register("mutate", gp.mutUniform, expr=toolbox.expr_mut, pset=pset)
toolbox.decorate("mate", gp.staticLimit(key=operator.attrgetter("height"), max_value=17))
toolbox.decorate("mutate", gp.staticLimit(key=operator.attrgetter("height"), max_value=17))


# <p style="color:cyan;">Distribution over Spark</p>

In [7]:
# Evaluating a population on a single exemplar
def evalPop(individuals, line):
    return [bool(sigmoid(i(*[float(v) for v in line[1:]]))>0.5) is bool(float(line[0])) for i in individuals]

#redefinig the GP loop from DEAP to use Spark
def eaSimple(population, toolbox, cxpb, mutpb, ngen, stats=None, halloffame=None, verbose=__debug__):
    
    logbook = tools.Logbook()
    logbook.header = ['gen', 'nevals'] + (stats.fields if stats else [])

    #higgs_samp = TrainingRDD.sample(False,sampling_rate).cache()

    # Evaluate the individuals with an invalid fitness
    invalid_ind = [ind for ind in population if not ind.fitness.valid]
    #fitnesses = toolbox.map(toolbox.evaluate, invalid_ind)
    ii = [toolbox.compile(f) for f in invalid_ind]
    
    #results = higgs_samp.map(lambda line: evalPop(ii, line))
    results = TrainingRDD.map(lambda line: evalPop(ii, line))
    fitnesses = results.reduce(lambda v1,v2:list(map(operator.add,v1,v2)))
    fitnesses = [tuple([vf]) for vf in fitnesses]
    for ind, fit in zip(invalid_ind, fitnesses):
        ind.fitness.values = fit

    if halloffame is not None:
        halloffame.update(population)

    record = stats.compile(population) if stats else {}
    logbook.record(gen=0, nevals=len(invalid_ind), **record)
    if verbose:
        print logbook.stream
    # Begin the generational process
    for gen in range(1, ngen + 1):
        # Generate new sample
        #higgs_samp = TrainingRDD.sample(False,sampling_rate).cache()
        
        # Select the next generation individuals
        offspring = toolbox.select(population, len(population))

        # Vary the pool of individuals
        offspring = algorithms.varAnd(offspring, toolbox, cxpb, mutpb)

        # Evaluate the current generation individuals
        invalid_ind = [ind for ind in offspring if not ind.fitness.valid]
        ii = [toolbox.compile(f) for f in invalid_ind]
        
        results = TrainingRDD.map(lambda line: evalPop(ii, line))
        fitnesses = results.reduce(lambda v1,v2:list(map(operator.add,v1,v2)))
        fitnesses = [tuple([vf]) for vf in fitnesses]
        for ind, fit in zip(invalid_ind, fitnesses):
            ind.fitness.values = fit

        # Update the hall of fame with the generated individuals
        if halloffame is not None:
            halloffame.update(offspring)

        # Replace the current population by the offspring
        population[:] = offspring

        # Append the current generation statistics to the logbook
        record = stats.compile(population) if stats else {}
        logbook.record(gen=gen, nevals=len(invalid_ind), **record)
        if verbose:
            print logbook.stream

    return population, logbook

# Redefinition
algorithms.eaSimple=eaSimple

# Confusion Matrix 

In [8]:
def confusionMatrix(func,dataset):
    confusion_matrix = [[0.0,0.0],[0.0,0.0]]
    predictions=dataset.map(lambda line:[bool(sigmoid(func(*line[1:]))>0.5),bool(line[0])]).collect()
    for line in predictions:
        predicted = line[0]
        real = line[1]
        if(predicted == real and real):
            confusion_matrix[0][0]+=1
        elif(predicted == real and not(real)):
            confusion_matrix[1][1]+=1
        elif(predicted != real and real):
            confusion_matrix[1][0]+=1
        elif(predicted != real and not(real)):
            confusion_matrix[0][1]+=1
    return confusion_matrix   

# <p style="color:red">Main program</p>

In [9]:
def main():
    
    #logging to a file
    log=open('higgs.log', 'a')
    print("logs are record in file higgs.log")
    
    try:
        random.seed()
        hof = tools.HallOfFame(1)

        # Statistics
        stats = tools.Statistics(lambda ind: ind.fitness.values)
        stats.register("avg", numpy.mean)
        stats.register("std", numpy.std)
        stats.register("min", numpy.min)
        stats.register("max", numpy.max)

        #setting GP population size, mutation and crossover probabilities
        popSize=32
        crossover_prob = 0.9
        mutation_prob = 0.4
        nbGen = 5
        pop = toolbox.population(n=popSize)

        log.write('\nRun Started\n')
        startTime = time.time()
        p,lbook = algorithms.eaSimple(pop, toolbox, crossover_prob, mutation_prob, nbGen, stats, halloffame=hof, verbose=True)
        endTime = time.time()
        log.write(lbook.stream)
        log.write("\nLearning time : {} seconds".format(endTime-startTime))

        # Best individual
        expr = hof[0]

        # Best individual against full training set
        func = toolbox.compile(expr)
        confusion_matrix = confusionMatrix(func,TrainingRDD)
        resultTraining = confusion_matrix[0][0]+confusion_matrix[1][1]
        log.write("\n"+str(expr))
        log.write("\nfitness of best individual against total training set :{}/{}={}".format(resultTraining, numpy.sum(confusion_matrix), float(resultTraining) / numpy.sum(confusion_matrix)))
        cm = '\n'.join('\t'.join('%0.3f' %x for x in y) for y in confusion_matrix)
        log.write("\n"+cm)
        TP = confusion_matrix[0][0]
        TN = confusion_matrix[1][1]
        FP = confusion_matrix[0][1]
        FN = confusion_matrix[1][0]
        log.write("\naccuracy ={}, TPR={}, FPR={}".format((TP+TN)/(TP+TN+FP+FN),TP/(TP+FN),FP/(FP+TN)))

        # Best individual against test dataset
        confusion_matrix = confusionMatrix(func,TestRDD)
        resultTest = confusion_matrix[0][0]+confusion_matrix[1][1]

        log.write("\nfitness of best individual against test set :{}/{}={}".format(resultTest, numpy.sum(confusion_matrix), float(resultTest) / numpy.sum(confusion_matrix)))
        cm = '\n'.join('\t'.join('%0.3f' %x for x in y) for y in confusion_matrix)
        log.write("\n"+cm)
        TP = confusion_matrix[0][0]
        TN = confusion_matrix[1][1]
        FP = confusion_matrix[0][1]
        FN = confusion_matrix[1][0]
        log.write("\naccuracy ={}, TPR={}, FPR={}".format((TP+TN)/(TP+TN+FP+FN),TP/(TP+FN),FP/(FP+TN)))
        log.close()
        return pop, stats, hof
    except Exception as e:
        log.write("Exception !: {}".format(e))
        log.close()

In [10]:
if __name__ == "__main__":
    main()   

logs are record in file higgs.log
gen	nevals	avg    	std    	min 	max 
0  	32    	5114.97	240.121	4948	5572
1  	31    	5317.69	252.733	4955	5661
2  	31    	5328.59	244.336	4955	5583
3  	32    	5383.38	254.175	4949	5583
4  	30    	5451.75	216.01 	4955	5583
5  	27    	5437.94	230.904	4954	5641
