<a href="https://colab.research.google.com/github/Norzuiso/machine-learning/blob/main/FeatureSelection/manual_train.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Imports

import tensorflow as tf
from tensorflow import keras, GradientTape
from keras import layers, Sequential
from keras.datasets import mnist
from keras.layers import Dense
from tensorflow import cast, float32
import numpy as np
from tensorflow import losses


In [None]:
# Imports

import tensorflow as tf
from tensorflow import keras, GradientTape
from keras import layers, Sequential
from keras.datasets import mnist
from keras.layers import Dense
from tensorflow import cast, float32
import numpy as np
from tensorflow import losses

class GA_Weights():
    def __init__(self, loss_fn, eval_model,
                 crossover_std = 0.9,
                 mutation_std = 0.2,
                 alpha=0.1,
                 beta=0.05,
                 pop_size=10,
                 generations=5):
        self.loss_fn = loss_fn
        self.eval_model = eval_model
        self.alpha = alpha
        self.beta = beta
        self.crossover_std = crossover_std
        self.mutation_std = mutation_std
        self.pop_size = pop_size
        self.generations = generations

    def fitness_fn(self, individual, x, y):
        self.eval_model.set_weights(individual)
        y_pred = self.eval_model(x, training=False)
        return self.loss_fn(y, y_pred)


    # Mutacion del indivudo seleccionado desde el crossover
    def mutate(self, individual):
        W, b = individual
        W = W + np.random.randn(W.shape) * self.mutation_std
        b = b + np.random.randn(b.shape) * self.mutation_std
        return [W, b]

    def crossover(self, parent_a, parent_b):
        Wa, ba = parent_a
        Wb, bb = parent_b
        return [(Wa + Wb) * 0.5, (ba + bb) * 0.5]

    # Seleccion de un solo individuo por torneo
    def tournament(self, pop, scores, k=3):
        idx = tf.random.shuffle(tf.range(len(pop)))[:k]
        best = idx[tf.argmin(tf.gather(scores, idx))]
        return pop[int(best)]

    def call_best_weights_GA(self, x, y):
        idx = tf.random.shuffle(tf.range(len(x)))[: self.subset_size]
        x = tf.gather(x, idx)
        y = tf.gather(y, idx)
        base_W = self.eval_model.get_weights()
        population = [
            [
                base_W[0] + tf.random.normal(base_W[0].shape) * 0.1,
                base_W[1] + tf.random.normal(base_W[1].shape) * 0.1,
            ]
            for _ in range(self.pop_size)
        ]

        for _ in range(self.generations):
            fitness_scores = tf.stack([self.fitness_fn(ind, x, y) for ind in population])

            new_population = [population[int(tf.argmin(fitness_scores))]]
            while len(new_population) < self.pop_size:
                parent_a = self.tournament_selecction(population, fitness_scores)
                parent_b = self.tournament_selecction(population, fitness_scores)

                child = self.crossover(parent_a, parent_b) if tf.random.uniform(()) < self.crossover_std else a
                child = self.mutate(child)
                new_population.append(child)

            population = new_population

        return population[int(tf.argmin(fitness_scores))]

class myModel():
    def __init__(self, feature_extractor, classifier, batch_size=32):
        super().__init__()
        self.feature_extractor = feature_extractor
        self.classifier = classifier
        self.batch_size = batch_size

        self.optimizer = keras.optimizers.Adam()
        self.loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)
        self.acc_metric = keras.metrics.SparseCategoricalAccuracy(name="accuracy")

        self.GA_Weights = GA_Weights(
            loss_fn=self.loss_fn,
            eval_model=self.classifier)

    def call(self, x, training=False):
        feats = self.feature_extractor(x, training=training)
        return self.classifier(feats, training=training)

    def create_dataset(self, x, y):
        return tf.data.Dataset.from_tensor_slices((x, y)).batch(self.batch_size)

    def train_epoch(self, x, y):
        dataset = self.create_dataset(x, y)

        for xb, yd in dataset:
            with GradientTape() as tape:
                logits = self.call(xb)
                loss = self.loss_fn(yd, logits)
            gradients = tape.gradient(loss, self.feature_extractor.trainable_variables)
            self.optimizer.apply_gradients(zip(gradients, self.feature_extractor.trainable_variables))

        features = self.feature_extractor(x, training=False)

        best_weights = self.GA_Weights.call_best_weights_GA(features, y)
        self.classifier.set_weights(best_weights)

        preds = self.classifier(features, training=False)
        self.acc_metric.update_state(y, preds)

        acc = float(self.acc_metric.result())
        self.acc_metric.reset_state()

        return float(loss), acc


    def predict(self, x):
        ds = tf.data.Dataset.from_tensor_slices(x).batch(self.batch_size)
        out = []
        for xb in ds:
            logits = self(xb, training=False)
            out.append(tf.argmax(logits, axis=1))
        return tf.concat(out, axis=0)



(x_train, y_train), (x_test, y_test) = mnist.load_data()

x_train = x_train.reshape(-1, 28, 28, 1).astype("float32") / 255.0
x_test = x_test.reshape(-1, 28, 28, 1).astype("float32") / 255.0

cnn = Sequential([
        layers.Input(shape = (28, 28, 1)),
        layers.Conv2D(32, (3,3), activation='relu', padding='same'),
        layers.Conv2D(64, (3,3), activation='relu', padding='same'),
        layers.GlobalAveragePooling2D()
])

mlp = Dense(10, activation='softmax', name='classifier')

model = myModel(cnn, mlp)
for epoch in range(2):
    loss, acc = model.train_epoch(x_train, y_train)
    print(f"Epoch {epoch+1} | loss={loss:.4f} acc={acc:.4f}")



In [None]:
pred = training.predict(x_test)

In [None]:
def train_model(epochs, dataset, loss_fn, optimizer, feature_extractor, classifier):
    for epoch in range(epochs):
        with GradientTape() as tape:
            features = feature_extractor(x, training=True)
            preds = classifier(features, training=False)
            loss = loss_fn(y, preds)
        training_vars = feature_extractor.trainable_variables
        gradients = tape.gradient(loss, training_vars)
    optimizer.apply_gradients(zip(gradients, training_vars))
    w = classifier.get_weights()[0]
    print(w.shape)
    ga_update(features.numpy(), y.numpy(), classifier)
    print(f"Epoch{epoch}")