In [2]:
%pylab inline
import random
from tqdm import tqdm_notebook
from numba import jit, njit

Populating the interactive namespace from numpy and matplotlib


In [3]:
seterr(all='raise')

{'divide': 'warn', 'over': 'warn', 'under': 'ignore', 'invalid': 'warn'}

In [4]:
def multiply(x, y):
    return x * y

def subtract(x, y):
    return x - y

def addition(x, y):
    return x + y

def safe_divide(x, y):
    try:
        return x / y
    except FloatingPointError:
        return maximum(x, y)
    except ZeroDivisionError:
        return maximum(x, y)
            
def euclidian(x, y):
    if len(x) != len(y):
        raise RuntimeError('Mismatched Shapes')
    s = 0
    for i, j in zip(x, y):
        s += (i-j)**2
    return s**0.5

def less_than(x, y):
    return int(x < y)
    
operators = ['multiply', 'subtract', 'addition', 'safe_divide', 'less_than']

In [5]:
import pandas as pd

In [6]:
import pandas as pd

X = pd.read_csv('../data/reg_inputs.csv')
y = loadtxt('../data/reg_targets.csv', delimiter=',', dtype=float).astype('object')

X = X.values[:, 1:].astype('object')
print(X.shape)

(1911, 29)


In [6]:
# Training set
I = permutation(len(t))
Xtr = X[I[:250]]
ttr = y[I[:250]]

Xte = X[I[-500:]]
tte = y[I[-500:]]

NameError: name 't' is not defined

In [None]:
y.shape, ttr.shape

In [None]:
 def memoize(f):
    f.cache = {}
    def decorated_function(*args):
        if args in f.cache:
            return f.cache[args]
        else:
            f.cache[args] = f(*args)
            return f.cache[args]
    return decorated_function

class Simulation:
    def __init__(self, mutation_rate, input_vars, max_depth, inputs, targets, operators, \
                 pop_size, trunc_ratio=0.5):
        self.mutation_rate = mutation_rate
        self.vars = input_vars
        self.max_depth = max_depth
        self.inputs = inputs
        self.targets = targets
        self.operators = operators
        self.pop_size = pop_size
        self.trunc_ratio = trunc_ratio
    
    def random_expression(self, depth):
        if depth == 1 or random.random() < 1/(2**depth-1):
            return random.choice((*self.vars, *[str(random.random())] * 10))
        else:
            return '(' + random.choice(operators) + '(' + \
                self.random_expression(depth - 1) + ',' + \
                self.random_expression(depth - 1) + '))'
        
    def random_function(self):
        re = self.random_expression(self.max_depth)

        rf = eval('lambda ' + ', '.join(self.vars) + ': ' + re)
        rf = memoize(rf)
        rf.geno = lambda: re
        return rf
    
    def crossover(self, p1, p2):
        k = random.random()
        offspring = lambda *x: k * p1(*x) + (1-k) * p2(*x)
        offspring = memoize(offspring) # add cache
        offspring.geno = lambda: '(('+ str(k) + '*' + p1.geno() + ') + ((1-' + str(k) + ')' +  p2.geno() + '))'
        return offspring
    
    def mutation(self, p):
        rm = self.random_function()
        rn = self.random_function()
        
        offspring = lambda *x: p(*x) + self.mutation\?olllllkkm_rate * (rm(*x) * p(*x) - rn(*x) * p(*x))
        offspring = memoize(offspring) # add cache
        offspring.geno = lambda: '((' + p.geno() + str(self.mutation_rate) + '*(' + rm.geno() + '*' + p.geno() + \
                            '-' + rn.geno() + '*' + p.geno() + ')))'
        return offspring
    
    def _grade_pop(self):
        return [ (self.fitness(ind), ind) for ind in self.pop ]
    
    def _sort_pop(self, graded_pop):
        return [ (ind[0], ind[1]) for ind in sorted(graded_pop, key=self._sorted_key) ]
    
    def _sorted_key(self, x):
        return x[0]
        
    def _get_parent_pop(self, parent_pop):
        return [p[1] for p in parent_pop]

    def _do_crossover_mutation(self, parent_pop):
        for i in range(self.pop_size - len(parent_pop)):
            parent = random.sample(parent_pop, 2)
            self.pop.append(self.mutation(self.crossover(parent[0][1], parent[1][1])))
               
    def _get_initial_pop(self):
        return [ self.random_function() for i in range(self.pop_size) ]
    
        
    def fitness(self, individual, X=None, t=None):
        if (X is None) or (t is None):
            X = self.inputs
            t = self.targets
            
        fit = 0
        for i, elements in enumerate(X):
            fit += abs(int(t[i]) - int(individual(*elements)))
        
        return min(fit, 7*len(X))
        
    def evolve(self, generations=10):
        self.pop = self._get_initial_pop()
    
        desc_template = 'G: {:.0f}. M: {:.0f}, A: {:.0f}, BE: {:.0f}'
        loop = tqdm_notebook(range(generations))
        
        self.best_fitness = inf
        self.best_fitness_func = None
        
        past_avg_fit = -1
        past_avg_fit_convergence_counter = 0
        
        avg_fitnesses = []
        
        for gen in loop:
            graded_pop = self._grade_pop()
            sorted_pop = self._sort_pop(graded_pop)
            
            if (sorted_pop[0][0] < self.best_fitness):
                self.best_fitness = sorted_pop[0][0]
                self.best_fitness_func = sorted_pop[0][1]
            
            avg_fit = mean([i[0] for i in sorted_pop])
            avg_fitnesses.append(avg_fit)
            loop.set_description(desc_template.format(gen, sorted_pop[0][0], avg_fit, self.best_fitness))

            parent_pop = sorted_pop[:int(self.trunc_ratio * self.pop_size)]
            
            if avg_fit == past_avg_fit:
                past_avg_fit_convergence_counter += 1
                if past_avg_fit_convergence_counter >= 10:
                    print('Early convergence detected.')
                    break
            else:
                past_avg_fit = avg_fit
                past_avg_fit_convergence_counter = 0


            parent_pop = self._get_parent_pop(parent_pop)
            for i in range(self.pop_size):
                p1, p2 = random.sample(parent_pop, 2)
                self.pop[i] = self.mutation(self.crossover(p1, p2))
#             self.pop = self._get_parent_pop(parent_pop)
#             self._do_crossover_mutation(parent_pop)

        return self.best_fitness_func, avg_fitnesses

In [None]:
class GPTree:
    def __init__(self, mutation_rate, input_vars, max_depth, operators, \
                 pop_size, trunc_ratio=0.5):
        self.mutation_rate = mutation_rate
        self.input_vars = input_vars
        self.max_depth = max_depth

        self.operators = operators
        self.pop_size = pop_size
        self.trunc_ratio = trunc_ratio
        
    def run(self, inputs, targets, generations):
        self.inputs = inputs
        self.targets = targets
        
        self.tree = [Simulation(mutation_rate=self.mutation_rate, input_vars=self.input_vars, \
                                targets=self.targets[:, i], inputs= self.inputs, \
                                max_depth=self.max_depth, operators=self.operators, pop_size=self.pop_size,\
                                trunc_ratio=self.trunc_ratio) for i in range(self.targets.shape[1])]
        
        self._evolve(generations)
            
    def _evolve(self, generations):
        for sim in self.tree:
            sim.evolve(generations)
            
    def predict(self, Xte):
        return array([[sim.best_fitness_func(*X) for sim in self.tree] for X in Xtr]).astype(int)

In [None]:
tree = GPTree(mutation_rate=0.2, input_vars=input_vars, max_depth=3, operators=operators, \
              pop_size=500, trunc_ratio=0.2)
tree.run(Xtr, ttr, 10)

In [None]:
array(tree.predict(Xte))[0], tte[0]

# time without jit: 25:57

# Test the best function

### Larger (250) training set

In [None]:
prob = 1
for i in range(4):
    prob *= 1/6
prob * 100

In [None]:
def get_accuracy(individual, Xte, tte):
    acc = zeros(len(Xte)).astype(bool)
    for i, (x, t) in enumerate(zip(Xte, tte.astype(int))):
#         print(individual(*x).astype(int), t)
        acc[i] = count_nonzero(individual(*x).astype(int) - t)
        
    return F'Accuracy: {(1 - count_nonzero(acc)/len(tte)) * 100}%.'
        
get_accuracy(best_func, Xte, tte)

In [None]:
print(f'Testing accuracy: {(1 - (sim.fitness(best_func, X=X, t=t))/len(t)) * 100:.2f}%')

### 100 sized training set

In [None]:
print(f'Testing accuracy: {(1 - (sim.fitness(best_func, X=Xte[:100], t=tte[:100]))/len(tte[:100])) * 100:.2f}%')

In [None]:
print(f'Testing accuracy: {(1 - (sim.fitness(best_func, X=X, t=t))/len(t)) * 100:.2f}%')

### These didn't use less_than

In [None]:
print(f'Testing accuracy: {(1 - (sim.fitness(best_func, X=Xte[:100], t=tte[:100]))/len(tte[:100])) * 100:.2f}%')

In [None]:
print(f'Testing accuracy: {(1 - (sim.fitness(best_func, X=X, t=t))/len(t)) * 100:.2f}%')