# Trenowanie modelu

## Użyte biblioteki

In [1]:
import os
import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras import backend as K
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense, Lambda
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import Callback
from tensorflow.keras.preprocessing.image import load_img, img_to_array
import matplotlib.pyplot as plt
import pickle
import gc

## Zmienne globalne

In [2]:
# Root folders used by the notebook.
DATA_PATH = "Data"
DATASETS = "Datasets"

# Training split: folder plus the files stored inside it.
TRAIN_DATASETS_PATH = os.path.join(DATASETS, "trainDatasets")
TRAIN_DATASET_PATH = os.path.join(DATASETS, "trainDatasets", "trainDataset")
TRAIN_TRIPLETS_PATH = os.path.join(DATASETS, "trainDatasets", "trainTriplets")
TRAIN_ANCHOR_DATASETS_PATH = os.path.join(DATASETS, "trainDatasets", "trainAnchorDataset")
TRAIN_POSITIVE_DATASETS_PATH = os.path.join(DATASETS, "trainDatasets", "trainPositiveDataset")
TRAIN_NEGATIVE_DATASETS_PATH = os.path.join(DATASETS, "trainDatasets", "trainNegativeDataset")

# Test split: same layout as the training split.
TEST_DATASETS_PATH = os.path.join(DATASETS, "testDatasets")
TEST_DATASET_PATH = os.path.join(DATASETS, "testDatasets", "testDataset")
TEST_TRIPLETS_PATH = os.path.join(DATASETS, "testDatasets", "testTriplets")
TEST_ANCHOR_DATASETS_PATH = os.path.join(DATASETS, "testDatasets", "testAnchorDataset")
TEST_POSITIVE_DATASETS_PATH = os.path.join(DATASETS, "testDatasets", "testPositiveDataset")
TEST_NEGATIVE_DATASETS_PATH = os.path.join(DATASETS, "testDatasets", "testNegativeDataset")

# Folder that receives the per-epoch model checkpoints.
MODEL_DIR = "model"

## Definicja modelu sieci neuronowej

In [3]:
# Shape of one network input without the batch dimension; the 250x250 size
# matches the target_size that load_image resizes every picture to.
input_shape = (250, 250, 3)

def create_base_network(input_shape):
    """Build the convolutional embedding network shared by both branches.

    Three Conv2D + MaxPooling2D stages (32 filters with a 5x5 kernel, then 64
    and 128 filters with 3x3 kernels, each followed by 2x2 pooling) feed a
    Flatten layer and a 256-unit ReLU Dense layer.

    Args:
        input_shape: Shape of a single input sample, without the batch axis.

    Returns:
        A Keras ``Model`` mapping an input tensor to the 256-unit Dense output.
    """
    # (filters, kernel_size) of every convolutional stage, in order.
    conv_stages = ((32, (5, 5)), (64, (3, 3)), (128, (3, 3)))

    inputs = Input(shape=input_shape)
    features = inputs
    for filters, kernel_size in conv_stages:
        features = Conv2D(filters, kernel_size, activation='relu')(features)
        features = MaxPooling2D(pool_size=(2, 2))(features)

    features = Flatten()(features)
    embedding = Dense(256, activation='relu')(features)
    return Model(inputs=inputs, outputs=embedding)

# A single embedding network; calling it on both inputs below makes the two
# branches share the same layers (and therefore the same weights).
base_network = create_base_network(input_shape)

# The two images of a pair.
input_a, input_b = Input(shape=input_shape), Input(shape=input_shape)

# Embeddings of both images, computed by the shared network.
processed_a, processed_b = (base_network(image) for image in (input_a, input_b))

def euclidean_distance(vects):
    """Return the Euclidean distance between two batches of vectors.

    Args:
        vects: Pair ``(x, y)`` of tensors that can be subtracted from each
            other; squared differences are summed over axis 1.

    Returns:
        Tensor of distances with axis 1 kept as a size-1 dimension.
    """
    x, y = vects
    sum_square = tf.reduce_sum(tf.square(x - y), axis=1, keepdims=True)
    # The derivative of sqrt is infinite at 0, so two identical embeddings
    # would produce NaN gradients. Clamping to epsilon keeps training stable
    # and changes the result only for distances that are practically zero.
    return tf.sqrt(tf.maximum(sum_square, tf.keras.backend.epsilon()))

# Merge the two branches into one distance value per input pair.
distance_layer = Lambda(euclidean_distance, output_shape=(1,))
distance = distance_layer([processed_a, processed_b])

# Siamese model: two images in, their embedding distance out.
model = Model(inputs=[input_a, input_b], outputs=distance)

# NOTE(review): the model output is an unbounded distance, not a probability,
# while binary cross-entropy expects predictions in [0, 1]. With the labels
# produced by data_generator (1 for anchor/positive pairs) this loss rewards a
# LARGE output for matching pairs - confirm this is intended, or consider a
# contrastive loss instead.
model.compile(
    optimizer=Adam(),
    loss='binary_crossentropy',
    metrics=['accuracy'],
)

## Ładowanie danych w postaci batchy

In [4]:
def load_image(image_path):
    """Read an image file and return it as a scaled array.

    Args:
        image_path: Path of the image to load.

    Returns:
        The ``img_to_array`` result for the image resized to 250x250, with
        every value divided by 255.0.
    """
    resized = load_img(image_path, target_size=(250, 250))
    return img_to_array(resized) / 255.0

def data_generator(anchor_paths, positive_paths, negative_paths, batch_size):
    """Endlessly yield (anchor, positive) and (anchor, negative) image batches.

    The triplets are walked in slices of ``batch_size``. For every slice two
    batches are yielded back to back: ``((anchors, positives), ones)`` followed
    by ``((anchors, negatives), zeros)``. After the last slice the generator
    starts over from the first one, so it never terminates on its own.

    Args:
        anchor_paths: Indexable sequence of anchor image paths.
        positive_paths: Paths of the positive image of each triplet
            (same index as the anchor).
        negative_paths: Paths of the negative image of each triplet
            (same index as the anchor).
        batch_size: Maximum number of triplets per slice; the last slice of a
            pass may be smaller.

    Yields:
        ``((first_images, second_images), labels)`` where both image entries
        are tensors built from ``load_image`` results and ``labels`` has shape
        ``(slice_size, 1)``.

    Raises:
        ValueError: If ``anchor_paths`` is empty, ``batch_size`` is smaller
            than 1, or there are fewer positive/negative paths than anchors.
    """
    num_samples = len(anchor_paths)

    # Without these checks an empty anchor list or a negative batch size would
    # make the `while True` loop below spin forever without yielding anything.
    if num_samples == 0:
        raise ValueError("anchor_paths must not be empty")
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")
    if len(positive_paths) < num_samples or len(negative_paths) < num_samples:
        raise ValueError(
            "positive_paths and negative_paths must contain at least as many "
            "entries as anchor_paths"
        )

    while True:
        for start in range(0, num_samples, batch_size):
            end = min(start + batch_size, num_samples)
            indices = range(start, end)

            # The anchors are shared by both yielded batches, so they are
            # loaded and converted to a tensor only once per slice.
            anchors = tf.convert_to_tensor([load_image(anchor_paths[i]) for i in indices])
            positives = tf.convert_to_tensor([load_image(positive_paths[i]) for i in indices])
            negatives = tf.convert_to_tensor([load_image(negative_paths[i]) for i in indices])

            y_positive = tf.ones((end - start, 1))
            y_negative = tf.zeros((end - start, 1))

            yield (anchors, positives), y_positive
            yield (anchors, negatives), y_negative

def create_tf_dataset(anchor_paths, positive_paths, negative_paths, batch_size):
    """Expose ``data_generator`` as a ``tf.data.Dataset``.

    Args:
        anchor_paths: Anchor image paths, forwarded to ``data_generator``.
        positive_paths: Positive image paths, forwarded to ``data_generator``.
        negative_paths: Negative image paths, forwarded to ``data_generator``.
        batch_size: Batch size, forwarded to ``data_generator``.

    Returns:
        A dataset whose elements are ``((images, images), labels)`` with
        float32 image batches of shape ``(None, 250, 250, 3)`` and float32
        labels of shape ``(None, 1)``.
    """
    image_batch_spec = tf.TensorSpec(shape=(None, 250, 250, 3), dtype=tf.float32)
    label_batch_spec = tf.TensorSpec(shape=(None, 1), dtype=tf.float32)

    def generate_batches():
        # from_generator needs a zero-argument callable that returns a fresh
        # generator every time the dataset is iterated.
        return data_generator(anchor_paths, positive_paths, negative_paths, batch_size)

    return tf.data.Dataset.from_generator(
        generate_batches,
        output_signature=((image_batch_spec, image_batch_spec), label_batch_spec),
    )

## Wykresy przedstawiające wyniki uczenia

In [5]:
def plot_metrics(epochs, train_metrics, val_metrics, metric_name, title, ylabel):
    """Draw a training curve and a validation curve on one chart.

    Args:
        epochs: X values (epoch numbers).
        train_metrics: Y values of the blue "Training" curve.
        val_metrics: Y values of the red "Validation" curve.
        metric_name: Metric name used in the legend labels.
        title: Metric name used in the chart title.
        ylabel: Label of the Y axis.
    """
    plt.figure(figsize=(12, 4))

    # Only the left cell of the 1x2 grid is used.
    axes = plt.subplot(1, 2, 1)
    axes.plot(epochs, train_metrics, 'b', label=f'Training {metric_name}')
    axes.plot(epochs, val_metrics, 'r', label=f'Validation {metric_name}')
    axes.set_title(f'Training and validation {title} over epochs')
    axes.set_xlabel('Epochs')
    axes.set_ylabel(ylabel)
    axes.legend()

    plt.show()

## Klasa odpowiedzialna za uczenie sieci

In [6]:
class PlotTraining(Callback):
    """Keras callback that records per-epoch metrics and plots them.

    After every epoch the loss/accuracy values found in ``logs`` are appended
    to the history lists and the full history so far is plotted.
    """

    def __init__(self):
        super().__init__()
        # One entry per finished epoch.
        self.train_losses = []
        self.train_accuracies = []
        self.val_losses = []
        self.val_accuracies = []

    def on_epoch_end(self, epoch, logs=None):
        """Store the metrics of the finished epoch and redraw both charts.

        Args:
            epoch: Index of the finished epoch (not used).
            logs: Metric dictionary of the epoch; may be ``None``.
        """
        # `logs` defaults to None, in which case `.get` would raise.
        logs = logs or {}

        self.train_losses.append(logs.get('loss'))
        self.train_accuracies.append(logs.get('accuracy'))
        # These stay None when the logs contain no validation metrics.
        self.val_losses.append(logs.get('val_loss'))
        self.val_accuracies.append(logs.get('val_accuracy'))

        epochs = range(1, len(self.train_losses) + 1)
        plot_metrics(epochs, self.train_losses, self.val_losses, 'loss', 'loss', 'Loss')
        plot_metrics(epochs, self.train_accuracies, self.val_accuracies, 'accuracy', 'accuracy', 'Accuracy')

        # Run a garbage collection pass after plotting.
        gc.collect()

## Klasa odpowiedzialna za walidację sieci

In [7]:
class EvaluateOnTestSet(Callback):
    """Keras callback that evaluates the model on test triplets every epoch.

    Args:
        test_data: Tuple ``(anchor_paths, positive_paths, negative_paths)``.
        batch_size: Number of triplets evaluated per step.
    """

    def __init__(self, test_data, batch_size):
        super().__init__()
        self.anchor_test, self.positive_test, self.negative_test = test_data
        self.batch_size = batch_size
        # One entry per evaluated epoch (plain Python floats).
        self.test_losses = []
        self.test_accuracies = []

    def on_epoch_end(self, epoch, logs=None):
        """Run one pass over the test triplets, then print and plot the result.

        Args:
            epoch: Index of the finished epoch (not used).
            logs: Metric dictionary of the epoch (not used).
        """
        num_samples = len(self.anchor_test)
        if num_samples == 0:
            # Nothing to evaluate; avoids a division by zero below.
            print('\nTest set is empty - skipping test evaluation')
            return

        test_generator = data_generator(self.anchor_test, self.positive_test, self.negative_test, self.batch_size)
        # Ceiling division so the final, smaller batch is evaluated as well.
        steps = -(-num_samples // self.batch_size)

        total_loss = 0.0
        total_accuracy = 0.0
        seen_samples = 0
        for _ in range(steps):
            # The generator yields the positive batch first, then the negative
            # batch built from the same anchors.
            (anchor_batch, positive_batch), y_positive = next(test_generator)
            (anchor_batch, negative_batch), y_negative = next(test_generator)

            # NOTE(review): whether test_on_batch resets metric state between
            # calls depends on the Keras version - confirm the returned values
            # are per-batch and not running averages.
            loss_positive, accuracy_positive = self.model.test_on_batch([anchor_batch, positive_batch], y_positive)
            loss_negative, accuracy_negative = self.model.test_on_batch([anchor_batch, negative_batch], y_negative)

            # Weight every step by its size so a short final batch does not
            # count as much as a full one.
            step_size = int(y_positive.shape[0])
            total_loss += step_size * (float(loss_positive) + float(loss_negative)) / 2
            total_accuracy += step_size * (float(accuracy_positive) + float(accuracy_negative)) / 2
            seen_samples += step_size

        avg_loss = total_loss / seen_samples
        avg_accuracy = total_accuracy / seen_samples

        self.test_losses.append(avg_loss)
        self.test_accuracies.append(avg_accuracy)

        print(f'\nTest Loss: {avg_loss:.4f} - Test Accuracy: {avg_accuracy:.4f}')

        # plot_metrics expects a training and a validation series; the test
        # series is passed for both, so the two curves overlap.
        epochs = range(1, len(self.test_losses) + 1)
        plot_metrics(epochs, self.test_losses, self.test_losses, 'test loss', 'test loss', 'Loss')
        plot_metrics(epochs, self.test_accuracies, self.test_accuracies, 'test accuracy', 'test accuracy', 'Accuracy')

## Zapisywanie modelu po każdej epoce

In [8]:
class CustomModelCheckpoint(tf.keras.callbacks.Callback):
    """Keras callback that saves the model to ``MODEL_DIR`` after every epoch.

    Files are named ``<file_prefix><n><file_extension>`` where ``n`` starts at
    1 and grows by one with every save.

    Args:
        file_prefix: Text placed before the running number.
        file_extension: Text placed after the running number.
    """

    def __init__(self, file_prefix="model_", file_extension=".h5"):
        super().__init__()
        self.file_prefix = file_prefix
        self.file_extension = file_extension
        # Running number used in the next file name.
        self.epoch_counter = 1

    def on_epoch_end(self, epoch, logs=None):
        """Save the current model and advance the running number.

        Args:
            epoch: Index of the finished epoch (not used for naming).
            logs: Metric dictionary of the epoch (not used).
        """
        target_path = os.path.join(
            MODEL_DIR,
            f"{self.file_prefix}{self.epoch_counter}{self.file_extension}",
        )
        self.model.save(target_path)
        print(f"Model saved to {target_path}")
        self.epoch_counter += 1

## Ładowanie datasetów z pliku

In [9]:
def _load_pickle(path):
    """Return the object stored in the pickle file at ``path``."""
    # NOTE: unpickling can execute arbitrary code - only load files that were
    # produced by this project.
    with open(path, 'rb') as dataset_file:
        return pickle.load(dataset_file)


# Training triplets, one pickled object per role.
train_anchor_data = _load_pickle(TRAIN_ANCHOR_DATASETS_PATH)
train_positive_data = _load_pickle(TRAIN_POSITIVE_DATASETS_PATH)
train_negative_data = _load_pickle(TRAIN_NEGATIVE_DATASETS_PATH)

# Test triplets, same layout as the training ones.
test_anchor_data = _load_pickle(TEST_ANCHOR_DATASETS_PATH)
test_positive_data = _load_pickle(TEST_POSITIVE_DATASETS_PATH)
test_negative_data = _load_pickle(TEST_NEGATIVE_DATASETS_PATH)

## Uczenie modelu

In [None]:
batch_size = 64
train_dataset = create_tf_dataset(train_anchor_data, train_positive_data, train_negative_data, batch_size)

# data_generator yields TWO batches for every slice of `batch_size` triplets
# (anchor/positive, then anchor/negative), and its last slice may be partial.
# One full pass over the training triplets is therefore
# 2 * ceil(num_triplets / batch_size) batches; with only
# num_triplets // batch_size steps each epoch would see about half of the data.
slices_per_pass = -(-len(train_anchor_data) // batch_size)  # ceiling division
steps_per_epoch = 2 * slices_per_pass

# Callbacks run in this order at the end of every epoch: plot the training
# curves, evaluate on the test triplets, save the model.
plot_training = PlotTraining()

evaluate_on_test_set = EvaluateOnTestSet(test_data=(test_anchor_data, test_positive_data, test_negative_data), batch_size=batch_size)

custom_checkpoint = CustomModelCheckpoint()

history = model.fit(train_dataset, steps_per_epoch=steps_per_epoch, epochs=10, callbacks=[plot_training, evaluate_on_test_set, custom_checkpoint])

## Wykresy podsumowujące uczenie 

In [None]:
def plot_training_history(history):
    """Plot the training loss and accuracy curves side by side.

    Args:
        history: Object whose ``history`` attribute is a mapping with the
            ``'loss'`` and ``'accuracy'`` series (as returned by ``model.fit``).
    """
    # (history key, legend / Y-axis label, chart title) of each panel.
    panels = (
        ('loss', 'Loss', 'Training Loss'),
        ('accuracy', 'Accuracy', 'Training Accuracy'),
    )

    plt.figure(figsize=(12, 5))
    for position, (key, label, heading) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.plot(history.history[key], label=label)
        plt.title(heading)
        plt.xlabel('Epoch')
        plt.ylabel(label)
        plt.legend()

    plt.show()

# Draw the summary charts for the history object returned by model.fit.
plot_training_history(history)