In [2]:
!pip install platypus-opt

Collecting platypus-opt
  Downloading Platypus_Opt-1.0.4-py3-none-any.whl (70 kB)
[K     |████████████████████████████████| 70 kB 3.6 MB/s 
Installing collected packages: platypus-opt
Successfully installed platypus-opt-1.0.4


In [3]:
import random, copy
from platypus import Type, Mutation, Variator

class Genome(Type):
  """Platypus variable type whose values are fixed-length lists of integer codons.

  Args:
    codon_size: number of codons generated by ``rand``.
    max_codon_value: inclusive upper bound of each codon (the lower bound is 0).
  """

  def __init__(self, codon_size, max_codon_value):
    super().__init__()
    self.max_codon_value = max_codon_value
    self.codon_size = codon_size

  def rand(self):
    """Return a new list of ``codon_size`` random integers in ``[0, max_codon_value]``."""
    upper_bound = self.max_codon_value
    codons = []
    for _ in range(self.codon_size):
      codons.append(random.randint(0, upper_bound))
    return codons

# Smoke check that the type can be instantiated. This prints the Genome object
# itself (its default repr), not a sampled list of codons.
print(Genome(250, 100000))

#problem = Problem(1, 2)
#problem.types[0] = Genome(250, 100000)
#problem.directions[0] = Problem.MAXIMIZE
#problem.directions[1] = Problem.MAXIMIZE
#problem.function = evaluate


<__main__.Genome object at 0x7f5ab95b3bd0>


In [None]:
import random, copy
from platypus import Type, Mutation, Variator

class Genome(Type):
  """Decision-variable type for grammatical evolution: a list of integer codons.

  NOTE(review): the previous cell defines a class with this same name and body;
  when both cells run, this later definition is the one left bound to ``Genome``.
  """

  def __init__(self, codon_size, max_codon_value):
    """Store the genome length and the inclusive maximum value of a codon."""
    super().__init__()
    self.codon_size, self.max_codon_value = codon_size, max_codon_value

  def rand(self):
    """Sample ``codon_size`` integers, each drawn from ``[0, max_codon_value]``."""
    limit = self.max_codon_value
    length = self.codon_size
    return [random.randint(0, limit) for _ in range(length)]





class GenomeUniformMutation(Mutation):
  """Mutation operator that resamples individual codons of ``Genome`` variables.

  Args:
    probability: per-codon chance of replacing the codon with a fresh random
      integer in ``[0, max_codon_value]``.
  """

  def __init__(self, probability = 1.0):
    super().__init__()
    self.probability = probability

  def mutate(self, parent):
    """Return a mutated deep copy of ``parent``; the parent itself is not modified.

    Variables whose problem type is not a ``Genome`` are left untouched. The
    copy is flagged as not evaluated whenever a ``Genome`` variable is visited.
    """
    offspring = copy.deepcopy(parent)
    variable_types = offspring.problem.types
    rate = self.probability

    for index in range(len(offspring.variables)):
      variable_type = variable_types[index]
      if not isinstance(variable_type, Genome):
        continue

      codons = offspring.variables[index]
      for position in range(len(codons)):
        # Each codon gets its own independent draw against the mutation rate.
        if random.random() <= rate:
          codons[position] = random.randint(0, variable_type.max_codon_value)
      offspring.variables[index] = codons
      offspring.evaluated = False

    return offspring

class GenomeSinglePointCrossover(Variator):
  """Single-point crossover for ``Genome`` variables, taking two parents.

  Args:
    probability: chance, per ``Genome`` variable, that the tails of the two
      genomes are swapped at a random cut point.
  """

  def __init__(self, probability = 1.0):
    super(GenomeSinglePointCrossover, self).__init__(2)
    self.probability = probability

  def evolve(self, parents):
    """Return two children built from deep copies of ``parents[0]`` and ``parents[1]``.

    For every ``Genome`` variable, with chance ``probability``, everything from
    a random cut point onwards is exchanged between the two genomes. Both
    children are flagged as not evaluated. The parents are not modified.
    """
    child1 = copy.deepcopy(parents[0])
    child2 = copy.deepcopy(parents[1])

    problem = child1.problem

    for i in range(problem.nvars):
      if not isinstance(problem.types[i], Genome):
        continue

      x1 = list(child1.variables[i])
      x2 = list(child2.variables[i])

      if random.random() <= self.probability:
        size = min(len(x1), len(x2))
        # A cut point must leave at least one codon on each side. Genomes with
        # fewer than two codons have no such point, so they are kept as they
        # are instead of letting random.randint(1, 0) raise ValueError.
        if size > 1:
          cxpoint = random.randint(1, size - 1)
          x1[cxpoint:], x2[cxpoint:] = x2[cxpoint:], x1[cxpoint:]

      child1.variables[i] = x1
      child2.variables[i] = x2
      child1.evaluated = False
      child2.evaluated = False

    return [child1, child2]

In [None]:
import nltk
import random, csv
from nltk import CFG
from platypus import NSGAII, SPEA2, Problem, nondominated, unique, GAOperator


# GRAMMAR = CFG.fromstring("""
#  cnn     -> blocks flatten fcs lr
#  blocks  -> block | block block | block block block
#  block   -> convs pooling
#  convs   -> conv | conv conv | conv conv conv
#  conv    -> '(Conv' bnorm '),'
#  pooling -> '(MaxPool' dropout '),' |
#  flatten -> '(Flatten),'
#  fcs     -> fc | fc fc |
#  fc      -> '(Fc ' units dropout '),'
#  bnorm   -> ' BNorm' | 
#  dropout -> ' Dropout' | 
#  lr      -> '(Lr ' rates ')'
#  rates   -> '0.1' | '0.01' | '0.001' | '0.0001'
#  units   -> '64' | '128' | '256' | '512'
# """)

# Grammar used by genome_to_grammar to turn a genome into a phenotype string.
# The empty alternative after `|` makes `pool`, `bnorm` and `dropout` optional.
# build_model reads the integers of a derived string in order of appearance:
#   z (inside `conv`)  -> number of convolutions per block
#   m (inside `block`) -> number of blocks
#   k (inside `fc`)    -> number of fully connected layers
#   units              -> neurons per fully connected layer
# and parses the text after 'lr-' as the learning rate.
GRAMMAR = CFG.fromstring("""
    cnn     -> '(' block ')' fc '*lr-' lr
    block   -> '(' conv pool ')*' m
    conv    -> '(conv*' z ')' bnorm
    pool    -> 'pool-' dropout |
    fc      -> 'fc*' k '*' units
    bnorm   -> 'bnorm-' |
    dropout -> 'dropout' |
    lr      -> '0.1' | '0.01' | '0.001' | '0.0001'
    units   -> '64' | '128' | '256' | '512'
    k       -> '0' | '1' | '2'
    z       -> '1' | '2' | '3'
    m       -> '1' | '2' | '3'
""")


def genome_to_grammar(array):
  """Decode a genome (sequence of integer codons) into a phenotype string.

  Starting from the grammar's start symbol, symbols are expanded left to right.
  When the nonterminal being expanded has more than one production, the next
  codon modulo the number of productions selects one; nonterminals with a
  single production consume no codon. After the last codon has been used,
  reading wraps around to the first one.

  Args:
    array: indexable sequence of integers with a length.

  Returns:
    The concatenated terminals as a string, or None when the genome cannot be
    decoded: it is empty although a choice is required, or it has wrapped
    around more than 10 times.
  """
  terminals = []
  stack = [GRAMMAR.start()]
  index = 0
  wraps = 0

  while stack:
    symbol = stack.pop()
    if isinstance(symbol, str):
      terminals.append(symbol)
      continue

    # Indexed lookup of the productions for this nonterminal (same productions,
    # in grammar order) instead of rescanning every production on each step.
    rules = GRAMMAR.productions(lhs=symbol)
    rule_index = 0
    if len(rules) > 1:
      # No codon to read at all: report "cannot decode" rather than IndexError.
      if len(array) == 0:
        return None
      rule_index = array[index] % len(rules)
      index += 1
      if index >= len(array):
        index = 0
        wraps += 1
        if wraps > 10:
          return None

    # Push the right-hand side reversed so its first symbol is popped next.
    stack.extend(reversed(rules[rule_index].rhs()))

  return ''.join(terminals)

# from nltk.parse.generate import generate

In [None]:
import re, csv
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelBinarizer
from tensorflow.keras import backend as K 
from tensorflow.keras import datasets, layers, models, callbacks, optimizers

# CSV file that get_metrics reads and save_metrics appends to (one row per phenotype).
# NOTE(review): the path looks like a Google Drive mount inside Colab; this
# presumably requires Drive to be mounted before these functions run - confirm.
filename = '/content/drive/MyDrive/phenotypes_eurosat.csv'

def get_metrics(phenotype, path=None):
    """Look up previously stored metrics for a phenotype in the CSV cache.

    Rows are expected as: phenotype, accuracy, accuracy_sd, f1_score,
    f1_score_sd. The first row whose first column equals ``phenotype`` wins.

    Args:
        phenotype: phenotype string to search for.
        path: CSV file to read; defaults to the module-level ``filename``.

    Returns:
        tuple: (accuracy, accuracy_sd, f1_score, f1_score_sd) as floats, or
        four ``None`` values when the phenotype is not stored or the cache
        file does not exist yet.

    Raises:
        ValueError: if a matching row holds a value that is not a float.
    """
    path = filename if path is None else path
    try:
        # newline='' is how the csv module expects files to be opened.
        with open(path, mode='r', newline='') as file:
            for row in csv.reader(file):
                # Blank or truncated rows cannot hold a result; skip them
                # instead of failing with IndexError.
                if len(row) >= 5 and row[0] == phenotype:
                    return tuple(float(value) for value in row[1:5])
    except FileNotFoundError:
        # Nothing has been saved yet, so nothing can be cached.
        pass
    return None, None, None, None


def save_metrics(phenotype, accuracy, accuracy_sd, f1_score, f1_score_sd, path=None):
    """Append one result row to the CSV cache.

    The row is written as: phenotype, accuracy, accuracy_sd, f1_score,
    f1_score_sd. The file is created if it does not exist.

    Args:
        phenotype: phenotype string identifying the network.
        accuracy: mean accuracy.
        accuracy_sd: standard deviation of the accuracy.
        f1_score: mean F1 score.
        f1_score_sd: standard deviation of the F1 score.
        path: CSV file to append to; defaults to the module-level ``filename``.
    """
    path = filename if path is None else path
    # newline='' lets the csv writer control line endings itself; without it
    # an extra '\r' is written on platforms that translate '\n'.
    with open(path, mode='a', newline='') as file:
        csv.writer(file).writerow([phenotype, accuracy, accuracy_sd, f1_score, f1_score_sd])


# def load_dataset():

#     # (train_images, train_labels), (test_images, test_labels) = datasets.cifar10.load_data()
#     (train_images, train_labels), (test_images, test_labels) = datasets.fashion_mnist.load_data()
    
#     train_images = train_images.reshape((train_images.shape[0], 28, 28, 1))
#     test_images = test_images.reshape((test_images.shape[0], 28, 28, 1))
    
#     validation_images, test_images, validation_labels, test_labels = train_test_split(test_images, test_labels, test_size=0.2, random_state=42)
    
#     train_images = train_images.astype("float") / 255.0
#     test_images = test_images.astype("float") / 255.0
#     validation_images = validation_images.astype("float") / 255.0

#     lb = LabelBinarizer()
#     train_labels = lb.fit_transform(train_labels)
#     validation_labels = lb.transform(validation_labels)
#     test_labels = lb.transform(test_labels)
    
#     return train_images, train_labels, test_images, test_labels, validation_images, validation_labels

def load_dataset():
    """Load EuroSAT and split it into train, validation and test sets.

    Pixel values are scaled by 1/255. 20% of the data is held out from
    training; of that held-out part, 80% becomes the validation set and 20%
    the test set. Labels are encoded with a LabelBinarizer fitted on the
    training labels.

    Returns:
        tuple: (train_images, train_labels, test_images, test_labels,
        validation_images, validation_labels).
    """
    import tensorflow_datasets as tfds

    # shuffle_files=False keeps the order of the loaded records stable, so the
    # fixed random_state below yields the same split on every call.
    ds = tfds.load('eurosat', split=tfds.Split.TRAIN, batch_size=-1, shuffle_files=False)
    dsnp = tfds.as_numpy(ds)

    images = dsnp['image']
    labels = dsnp['label']

    images = images.reshape((images.shape[0], 64, 64, 3))
    images = images.astype("float") / 255.0

    train_images, test_images, train_labels, test_labels = train_test_split(images, labels, test_size=0.2, random_state=42)
    # Second split works on the held-out part only: the larger share becomes
    # validation data, the remaining 20% of it stays as test data.
    validation_images, test_images, validation_labels, test_labels = train_test_split(test_images, test_labels, test_size=0.2, random_state=42)

    lb = LabelBinarizer()

    train_labels = lb.fit_transform(train_labels)
    validation_labels = lb.transform(validation_labels)
    test_labels = lb.transform(test_labels)

    return train_images, train_labels, test_images, test_labels, validation_images, validation_labels


def build_model(phenotype):
    """Build and compile a Keras CNN described by a phenotype string.

    The text before 'lr-' must contain exactly four integers, read in order as:
    convolutions per block, number of blocks, number of fully connected layers
    and neurons per fully connected layer. The text after 'lr-' is the learning
    rate. The substrings 'pool', 'bnorm' and 'dropout' switch on max pooling,
    batch normalization and dropout respectively.

    Args:
        phenotype: phenotype string, e.g. '(((conv*1)pool-dropout)*2)fc*1*256*lr-0.0001'.

    Returns:
        A compiled Sequential model for 64x64x3 inputs and 10 output classes,
        reporting accuracy and a batch-wise F1 score as metrics.
    """

    # Raw string: '\d' inside a normal string literal is an invalid escape
    # sequence that newer Python versions warn about.
    nconv, npool, nfc, nfcneuron = [int(i) for i in re.findall(r'\d+', phenotype.split('lr-')[0])]
    has_dropout = 'dropout' in phenotype
    has_batch_normalization = 'bnorm' in phenotype
    has_pool = 'pool' in phenotype
    learning_rate = float(phenotype.split('lr-')[1])

    # number of filters
    filter_size = 32

    model = models.Sequential()

    # model.add(layers.InputLayer(input_shape=(32, 32, 3)))
    # model.add(layers.InputLayer(input_shape=(28, 28, 1)))
    model.add(layers.InputLayer(input_shape=(64, 64, 3)))

    # One iteration per block (the block count is parsed into npool)
    for i in range(npool):

        # Convolutions of this block
        for j in range(nconv):

            model.add(layers.Conv2D(filter_size, (3, 3), activation='relu', padding='same'))

            # Double the filter count for the following convolutions whenever
            # block index + convolution index is odd
            if (((i + j) % 2) == 1): filter_size = filter_size * 2

            # Add batch normalization
            if has_batch_normalization:
                model.add(layers.BatchNormalization())

        # Add pooling
        if has_pool:
            model.add(layers.MaxPooling2D(pool_size=(2, 2)))
            # Add dropout
            if has_dropout:
                model.add(layers.Dropout(0.25))

    model.add(layers.Flatten())

    # fully connected
    for i in range(nfc):
        model.add(layers.Dense(nfcneuron))
        model.add(layers.Activation('relu'))

    # Applied once before the classifier, also when there are no dense layers
    if has_dropout:
        model.add(layers.Dropout(0.5))

    model.add(layers.Dense(10, activation='softmax'))

    # `learning_rate` is the supported keyword; `lr` is a deprecated alias
    # that newer Keras releases no longer accept.
    opt = optimizers.Adam(learning_rate=learning_rate)

    # F1 Score metric function
    def f1_score(y_true, y_pred):
        true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
        possible_positives = K.sum(K.round(K.clip(y_true, 0, 1)))
        predicted_positives = K.sum(K.round(K.clip(y_pred, 0, 1)))
        # K.epsilon() guards the divisions against a zero denominator
        precision = true_positives / (predicted_positives + K.epsilon())
        recall = true_positives / (possible_positives + K.epsilon())
        f1_val = 2 * (precision * recall) / (precision + recall + K.epsilon())
        return f1_val

    model.compile(loss='categorical_crossentropy', optimizer=opt, metrics=['accuracy', f1_score])

    return model


# def build_model(phenotype):

#     model = models.Sequential()

#     filter_size = 32
#     nconvs = 0
#     optimizer = None

#     model.add(layers.InputLayer(input_shape=(32, 32, 3)))

#     for block in phenotype.split(','):
        
#         if 'Conv' in block:

#             if nconvs == 2:
#                 filter_size *= 2
#                 nconvs = 0

#             model.add(layers.Conv2D(filter_size, (3, 3), activation='relu', padding='same'))

#             if 'BNorm' in block:
#                 model.add(layers.BatchNormalization())

#             nconvs += 1
        
#         if 'MaxPool' in block:
#             model.add(layers.MaxPooling2D(pool_size=(2, 2)))
            
#             if 'Dropout' in block:
#                 model.add(layers.Dropout(0.25))

#         if 'Flatten' in block:
#             model.add(layers.Flatten())

#         if 'Fc' in block:
#             args = re.findall('\d+', block)
#             model.add(layers.Dense(int(args[0])))
#             model.add(layers.Activation('relu'))

#             if 'Dropout' in block:
#                 model.add(layers.Dropout(0.5))

#         if 'Lr' in block:
#             args = re.findall('\d+\.\d+', block)
#             optimizer = optimizers.Adam(lr=float(args[0]))


#     model.add(layers.Dense(10, activation='softmax'))

#     def f1_score(y_true, y_pred):
#         true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
#         possible_positives = K.sum(K.round(K.clip(y_true, 0, 1)))
#         predicted_positives = K.sum(K.round(K.clip(y_pred, 0, 1)))
#         precision = true_positives / (predicted_positives + K.epsilon())
#         recall = true_positives / (possible_positives + K.epsilon())
#         f1_val = 2 * (precision * recall) / (precision + recall + K.epsilon())
#         return f1_val

#     model.compile(loss='categorical_crossentropy', optimizer=optimizer, metrics=['accuracy', f1_score])
#     # model.summary()

#     return model


def train_model(model):
    """Fit and evaluate ``model`` up to three times and summarise the test metrics.

    The dataset is loaded once per call. Each run fits for at most 70 epochs
    with early stopping on validation accuracy, then evaluates on the test
    split. If the first run scores below 0.5 test accuracy, the remaining runs
    are skipped.

    NOTE(review): every run calls ``fit`` on the same model object and nothing
    here re-initialises it, so later runs appear to continue from the weights
    of the previous run - confirm this is intended.

    Args:
        model: compiled Keras model whose ``evaluate`` returns loss, accuracy
            and F1 score, in that order.

    Returns:
        tuple: (mean accuracy, accuracy std, mean F1, F1 std) over the runs
        that were executed.
    """
    accuracies = []
    f1_scores = []

    (train_images, train_labels,
     test_images, test_labels,
     validation_images, validation_labels) = load_dataset()

    for run in range(1, 4):

        # Release backend state between runs (memory relief on Google Colab).
        if K.backend() == 'tensorflow':
            K.clear_session()

        print('Trainning %s of 3' % run)

        # Give up early on networks whose validation accuracy does not get
        # past the 0.5 baseline within the patience window.
        early_stopping = callbacks.EarlyStopping(
            monitor='val_accuracy',
            mode='max',
            verbose=1,
            patience=10,
            baseline=0.5,
        )

        model.fit(
            train_images,
            train_labels,
            epochs=70,
            batch_size=128,
            verbose=1,
            validation_data=(validation_images, validation_labels),
            callbacks=[early_stopping],
        )

        metrics = model.evaluate(test_images, test_labels, verbose=1)
        _, accuracy, f1_score = metrics

        accuracies.append(accuracy)
        f1_scores.append(f1_score)

        # A poor first run is taken as evidence enough; skip the repetitions.
        if run == 1 and accuracy < 0.5:
            break

    accuracy_mean, accuracy_sd = np.mean(accuracies), np.std(accuracies)
    f1_mean, f1_sd = np.mean(f1_scores), np.std(f1_scores)
    return accuracy_mean, accuracy_sd, f1_mean, f1_sd

In [None]:
def evaluate(variables):
    """Objective function: decode a genome and return (accuracy, F1 score).

    The genome is decoded into a phenotype string. Metrics already stored for
    that phenotype are reused; otherwise the network is built, trained and its
    metrics are saved.

    Args:
        variables: decision variables; ``variables[0]`` is the genome.

    Returns:
        tuple: (accuracy, f1_score). A genome that cannot be decoded gets
        (0.0, 0.0).
    """

    genome = variables[0]
    phenotype = genome_to_grammar(genome)

    print('PHENOTYPE:', phenotype)

    # genome_to_grammar returns None when it cannot decode the genome. There is
    # no network to build in that case, so hand back zero for both objectives
    # instead of crashing while parsing the phenotype.
    if phenotype is None:
        return 0.0, 0.0

    accuracy, accuracy_sd, f1_score, f1_score_sd = get_metrics(phenotype)

    if accuracy is None and f1_score is None:
        print('Phenotype not yet trained. Building...')
        model = build_model(phenotype)
        accuracy, accuracy_sd, f1_score, f1_score_sd = train_model(model)
        save_metrics(phenotype, accuracy, accuracy_sd, f1_score, f1_score_sd)

    print(accuracy, accuracy_sd, f1_score, f1_score_sd)

    return accuracy, f1_score


# One decision variable (the genome) and two objectives (accuracy and F1 score),
# both to be maximized.
problem = Problem(1, 2)
problem.types[0] = Genome(250, 100000)
for objective_index in range(2):
  problem.directions[objective_index] = Problem.MAXIMIZE
problem.function = evaluate

# Variation: single-point crossover followed by per-codon uniform mutation.
crossover = GenomeSinglePointCrossover(probability=0.75)
mutation = GenomeUniformMutation(probability=0.01)
operator = GAOperator(crossover, mutation)

algorithm = NSGAII(problem, population_size=50, variator=operator)
# algorithm = SPEA2(problem, population_size=50, variator=operator)

num_generations = 30
# Best value of each objective among the non-dominated solutions, per generation.
max_accuracy = [0] * num_generations
max_fscore = [0] * num_generations

for generation in range(num_generations):
  print('Geração:', generation + 1)
  algorithm.step()
  pareto_front = unique(nondominated(algorithm.result))
  for solution in pareto_front:
    print(genome_to_grammar(solution.variables[0]), solution.objectives)
    max_accuracy[generation] = max(max_accuracy[generation], solution.objectives[0])
    max_fscore[generation] = max(max_fscore[generation], solution.objectives[1])

Geração: 1
PHENOTYPE: (((conv*1))*2)fc*1*256*lr-0.01
Phenotype not yet trained. Building...
Trainning 1 of 3
Epoch 1/70
Epoch 2/70
Epoch 3/70
Epoch 4/70
Epoch 5/70
Epoch 6/70
Epoch 7/70
Epoch 8/70
Epoch 9/70
Epoch 10/70
Epoch 00010: early stopping
0.0962962955236435 0.0 0.0 0.0
PHENOTYPE: (((conv*3)bnorm-)*1)fc*2*256*lr-0.01
Phenotype not yet trained. Building...
Trainning 1 of 3
Epoch 1/70
Epoch 2/70
Epoch 3/70
Epoch 4/70
Epoch 5/70
Epoch 6/70
Epoch 7/70
Epoch 8/70
Epoch 9/70
Epoch 10/70
Epoch 00010: early stopping
0.1259259283542633 0.0 0.0 0.0
PHENOTYPE: (((conv*2)bnorm-)*2)fc*0*512*lr-0.1
Phenotype not yet trained. Building...
Trainning 1 of 3
Epoch 1/70
Epoch 2/70
Epoch 3/70
Epoch 4/70
Epoch 5/70
Epoch 6/70
Epoch 7/70
Epoch 8/70
Epoch 9/70
Epoch 10/70
Epoch 00010: early stopping
0.32407405972480774 0.0 0.30527350306510925 0.0
PHENOTYPE: (((conv*1)pool-dropout)*2)fc*1*256*lr-0.0001
0.8179012338320414 0.021186429532675723 0.8194811145464579 0.02095631380206305
PHENOTYPE: (((conv*2)b

In [None]:
import matplotlib.pyplot as plt

# Per-generation best accuracy and F1 score recorded by the optimisation loop.
fig, ax = plt.subplots()
ax.plot(max_accuracy, label='Acurácia')
ax.plot(max_fscore, label='F1Score')
ax.set_xlabel('Geração')
ax.set_ylabel('Valor')
ax.legend()
plt.show()