In [1]:
import joblib
import random
import numpy as np
import pandas as pd
import multiprocessing

from deap import base, creator, tools, algorithms

# 0) User input

In [2]:
# Pickled parameter object produced by the training step; it is loaded with joblib
# and its 'Positions_removed' entry is passed to the optimizer.
Params_File = '00000000_Example1-Pput_Promoter-Activity_ML_SV3-Params.pkl'
# Pickled, already trained regressor (loaded with joblib).
Regressor_File = '00000000_Example1-Pput_Promoter-Activity_ML_SV3-Regressor.pkl'
# Semicolon-separated CSV with the known sequences (the optimizer reads its
# 'Sequence' and 'Promoter Activity' columns).
Sequences_file = 'Example1-Pput.csv'

# Whether the fitness evaluation should be distributed over a multiprocessing pool.
parallelEvaluation = False

# 1) Load models

In [3]:
# Load the pickled regressor and parameter object, and read the table of known sequences.
# NOTE: joblib.load unpickles arbitrary Python objects - only load files from a trusted source.
myRegr = joblib.load(Regressor_File)
myParams = joblib.load(Params_File)
# The sequence table is semicolon-separated.
mySequences = pd.read_csv(Sequences_file, sep=';')

# Number of one-hot nucleotide features: the regressor's input width minus one,
# because the optimizer appends a single GC-share feature to every regressor input.
nNukleotides = myRegr.support_vectors_.shape[1] - 1

# 2) Define solver class

In [7]:
class GeneOptimizer():
    """Genetic algorithm (DEAP) that searches for a new sequence close to the best known ones.

    An individual is a list of integers in 0..3, one per sequence position
    (0 = A, 1 = C, 2 = G, 3 = T). The fitness of an individual is the total number
    of positions in which it differs from the reference sequences (the known
    sequences with the highest 'Promoter Activity'); this value is minimized.
    Individuals that are already among the known sequences, or whose predicted
    expression differs from the target expression, receive a constant penalty
    fitness instead.
    """

    # Static variables
    creator_used = False

    # Integer code of each nucleotide letter (also its index in the one-hot encoding)
    _NUCLEOTIDE_CODES = {'A': 0, 'C': 1, 'G': 2, 'T': 3}
    # Code given to any other character; it can never equal a gene of an individual
    _UNKNOWN_CODE = -1
    # One-hot vector for each integer code
    _ONE_HOT = {
        0: (1, 0, 0, 0),
        1: (0, 1, 0, 0),
        2: (0, 0, 1, 0),
        3: (0, 0, 0, 1),
    }
    # Maximum number of best known sequences used as references for the distance
    _N_REFERENCES = 5

    def __init__(self, parallelevaluation=False, tournsize=3, mateindpb=0.5, mutindpb=0.05):
        """Set up the problem-independent parts of the genetic algorithm.

        Args:
            parallelevaluation: If True, a multiprocessing pool is created and its
                ``map`` is registered for the fitness evaluation.
            tournsize: Number of individuals that take part in each selection tournament.
            mateindpb: Per-position probability of swapping genes during uniform crossover.
            mutindpb: Per-position probability of redrawing a gene during mutation.
        """
        self._toolbox = base.Toolbox()

        # Enable parallel fitness evaluation if specified. The constructor argument
        # decides this (not a notebook-level global), and the pool is kept on the
        # instance so that close() can release the worker processes.
        # NOTE(review): the evaluation function is a decorated bound method; confirm
        # that it can be pickled for the worker processes before relying on this.
        self._pool = None
        if parallelevaluation:
            self._pool = multiprocessing.Pool()
            self._toolbox.register("map", self._pool.map)

        ###################### Define individuals ##########################

        # Use static variable to prevent recreating FitnessMax and Individual
        if not GeneOptimizer.creator_used:
            # Define type of fitness function (weight=-1 => minimization; the class is
            # called "FitnessMax" for historical reasons although it minimizes)
            creator.create("FitnessMax", base.Fitness, weights=(-1.0,))

            # Define container that represents an individual (a list that carries the fitness above)
            creator.create("Individual", list, fitness=creator.FitnessMax)

            GeneOptimizer.creator_used = True

        # Define how a single gene is created (a random integer 0..3 that represents a nucleotide)
        self._toolbox.register("attr_nukleotide", random.randint, 0, 3)

        ###################### Set functions for GA steps ##########################

        # Set selection function (selTournament: randomly pick tournsize individuals and select the
        # best one as parent). It is repeated n times per generation to obtain n parents.
        self._toolbox.register("select", tools.selTournament, tournsize=tournsize)

        # Set mating function (cxUniform: takes two parents and turns them into two children by
        # swapping the genes of the parents with probability indpb at each position)
        self._toolbox.register("mate", tools.cxUniform, indpb=mateindpb)

        # Set mutation function (mutUniformInt: assigns a new random nucleotide with
        # probability indpb at each position of a child)
        self._toolbox.register("mutate", tools.mutUniformInt, low=0, up=3, indpb=mutindpb)

        ###################### Define statistics to be evaluated at each generation ##########################
        self._stats = tools.Statistics(lambda ind: ind.fitness.values)
        self._stats.register("avg", np.mean)
        self._stats.register("std", np.std)
        self._stats.register("min", np.min)
        self._stats.register("max", np.max)

    def close(self):
        """Shut down the worker pool created for parallel evaluation, if there is one."""
        pool = getattr(self, "_pool", None)
        if pool is not None:
            pool.close()
            pool.join()
            self._pool = None
            # Fall back to the built-in map so the instance stays usable
            self._toolbox.register("map", map)

    def _decode(self, individual):
        """Return the flat one-hot encoding (4 entries per position) of an individual.

        Genes outside 0..3 contribute no entries.
        """
        seq = list()
        for gene in individual:
            seq.extend(self._ONE_HOT.get(gene, ()))

        return seq

    def _evaluation(self, individual):
        """Return the expression predicted by the regressor for an individual."""
        seq = self._decode(individual)

        # GC share: count the C and G entries (offsets 1 and 2 of every one-hot group) ...
        gc_share = sum(seq[i + 1] + seq[i + 2] for i in range(0, self._n_nukleotides, 4))
        # ... and normalize by the number of one-hot features.
        # NOTE(review): this divides by the feature count (4 per position), not by the
        # number of positions - confirm that the regressor was trained with the same definition.
        gc_share /= self._n_nukleotides

        # Calculate expression for the individual (one-hot features followed by the GC share)
        regressor_input = seq + [gc_share]
        expression = self._regr.predict([regressor_input])

        return expression[0]

    def _feasible(self, individual):
        """Return True if the individual is a new sequence with the target expression."""
        # Reject individuals that belong to the known sequences. Both sides are tuples
        # of ints, and the known sequences are kept in a set for constant-time lookup.
        if tuple(individual) in self._known_sequences:
            return False

        # Reject individuals that do not have the desired expression level
        if self._evaluation(individual) != self._target_expr:
            return False

        return True

    def _distance(self, individual):
        """Return, as a 1-tuple, the number of positions in which the individual
        differs from the reference sequences, summed over all reference sequences."""
        references = np.asarray(self._reference_sequences, dtype=int)
        # Broadcasting compares the individual with every reference row, so any
        # number of reference sequences is supported.
        d = np.sum(np.not_equal(np.asarray(individual, dtype=int), references))

        return (d,)

    def _setReferenceSequences(self, sequences, removed_positions):
        """Encode the known sequences and select the reference sequences.

        Works on ``self._sequences`` (set by ``optimize``); ``sequences`` is accepted
        for interface compatibility only. ``removed_positions`` are the column indices
        that are deleted from every sequence before encoding.
        """
        # Ensure that nucleotides are only encoded by upper case letters
        self._sequences['Sequence'] = self._sequences['Sequence'].str.upper()

        # Split sequences into their elements and delete the removed positions
        sequences_split = np.array(list(self._sequences['Sequence'].apply(list)))
        sequences_short = np.delete(sequences_split, removed_positions, axis=1)

        # Encode the nucleotides as integers. A separate integer array is used because
        # assigning numbers into the character array would store them as strings,
        # which never compare equal to the integer genes of an individual.
        encoded = np.full(sequences_short.shape, self._UNKNOWN_CODE, dtype=int)
        for letter, code in self._NUCLEOTIDE_CODES.items():
            encoded[sequences_short == letter] = code

        # Add the encoded and shortened sequences to the dataframe as tuples
        # to make them hashable (required for the comparisons later)
        self._sequences['Sequence_short_encoded'] = pd.Series(
            [tuple(row) for row in encoded.tolist()],
            index=self._sequences.index,
        )

        # By shortening the sequences some may not be distinguishable anymore. Only the shortened
        # sequence with the highest expression is kept (after sorting, the instance with the
        # highest expression always comes first)
        self._sequences = self._sequences.sort_values('Promoter Activity', ascending=False)
        self._sequences = self._sequences.drop_duplicates('Sequence_short_encoded')

        # Set of all known (shortened, encoded) sequences for the feasibility check
        self._known_sequences = set(self._sequences['Sequence_short_encoded'])

        # Store the (up to) _N_REFERENCES sequences with the highest expression
        self._reference_sequences = (
            self._sequences['Sequence_short_encoded'].iloc[0:self._N_REFERENCES].tolist()
        )

    def optimize(self, regr, sequences, removed_positions, n_nukleotides, target_expr=2, cxpb=0.5, mutpb=0.2, ngen=50, hof_size=1, n_pop=300, seed=None):
        """Run the genetic algorithm.

        Args:
            regr: Trained model with a ``predict`` method.
            sequences: DataFrame with the columns 'Sequence' and 'Promoter Activity'.
            removed_positions: Sequence positions that are dropped before encoding.
            n_nukleotides: Number of one-hot features (4 per remaining position).
            target_expr: Predicted expression a feasible individual must have.
            cxpb: Probability that two parents mate.
            mutpb: Probability that a child is mutated.
            ngen: Number of generations.
            hof_size: Number of best individuals to keep and return.
            n_pop: Population size.
            seed: Optional seed for Python's ``random`` module to make the run
                reproducible; None leaves the random state untouched.

        Returns:
            A tuple ``(decoded, expressions)``: the one-hot encoded hall-of-fame
            individuals and their predicted expressions.
        """
        if seed is not None:
            random.seed(seed)

        ###################### Set problem dependent variables and functions ##########################
        self._target_expr = target_expr
        self._regr = regr
        self._n_nukleotides = n_nukleotides
        self._sequences = sequences.copy()
        self._setReferenceSequences(sequences, removed_positions)
        n_positions = int(self._n_nukleotides // 4)

        # Define how an individual is created (a list of n_positions nucleotides)
        self._toolbox.register("individual", tools.initRepeat, creator.Individual,
                               self._toolbox.attr_nukleotide, n_positions)
        # Define how a population is created (a population is a list of individuals)
        self._toolbox.register("population", tools.initRepeat, list, self._toolbox.individual)

        # Set fitness function
        self._toolbox.register("evaluate", self._distance)
        # Add constraint handling (infeasible individuals get the constant fitness 1000)
        self._toolbox.decorate("evaluate", tools.DeltaPenalty(self._feasible, 1000.0))

        ###################### Perform optimization ##########################

        # Create initial population
        pop = self._toolbox.population(n=n_pop)

        # Create hall of fame object that keeps track of the best individuals
        hof = tools.HallOfFame(hof_size)

        # Perform GA
        # cxpb: probability that two parents mate (if they do they are discarded and their
        #       children kept, otherwise they are kept)
        # mutpb: probability that a child is mutated
        # ngen: number of generations (= iterations)
        pop, log = algorithms.eaSimple(pop, self._toolbox, cxpb=cxpb, mutpb=mutpb, ngen=ngen,
                                       stats=self._stats, halloffame=hof, verbose=True)

        return [self._decode(seq) for seq in hof], [self._evaluation(seq) for seq in hof]

In [11]:
# Build the solver, forwarding the parallel-evaluation switch from the user-input cell,
# and run the search. The cell displays the tuple returned by optimize():
# (one-hot encoded best individuals, their predicted expressions).
go = GeneOptimizer(parallelevaluation=parallelEvaluation)
go.optimize(
    regr=myRegr,
    sequences=mySequences,
    removed_positions=myParams['Positions_removed'],
    n_nukleotides=nNukleotides,
)

gen	nevals	avg    	std    	min	max
0  	300   	56.1933	8.13691	29 	74 
1  	177   	49.35  	7.55298	23 	69 
2  	174   	43.4733	7.20805	24 	67 
3  	173   	37.5567	6.52228	20 	65 
4  	205   	42.2267	96.4648	18 	1000
5  	181   	47.0467	136.276	13 	1000
6  	172   	62.51  	191.429	8  	1000
7  	192   	62.76  	199.552	5  	1000
8  	174   	46.06  	167.831	5  	1000
9  	187   	49.55  	185.484	5  	1000
10 	185   	46.6467	186.036	5  	1000
11 	179   	57.73  	216.193	5  	1000
12 	171   	29.8467	149.976	5  	1000
13 	180   	12.4033	80.9426	5  	1000
14 	191   	12.01  	80.9515	5  	1000
15 	183   	12.1667	80.9535	5  	1000
16 	183   	8.69667	57.3507	5  	1000
17 	171   	8.71667	57.3492	5  	1000
18 	186   	15.5833	98.9591	5  	1000
19 	182   	12.2867	80.9505	5  	1000
20 	167   	5.36667	1.64283	5  	20  
21 	191   	22.0767	127.327	5  	1000
22 	188   	12.2833	80.9457	5  	1000
23 	179   	12.0567	80.9509	5  	1000
24 	184   	11.95  	80.9575	5  	1000
25 	176   	8.63333	57.351 	5  	1000
26 	157   	11.9633	80.9577	5  	10

([[0,
   0,
   0,
   1,
   0,
   0,
   0,
   1,
   1,
   0,
   0,
   0,
   0,
   0,
   1,
   0,
   0,
   1,
   0,
   0,
   0,
   1,
   0,
   0,
   0,
   1,
   0,
   0,
   0,
   1,
   0,
   0,
   0,
   0,
   1,
   0,
   0,
   1,
   0,
   0,
   1,
   0,
   0,
   0,
   0,
   0,
   1,
   0,
   1,
   0,
   0,
   0,
   1,
   0,
   0,
   0,
   1,
   0,
   0,
   0]],
 [2])