## Configuração

In [None]:
import datetime
import json
import os
import pathlib

import cv2
import keras_tuner as kt
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.cluster import KMeans
from sklearn.manifold import TSNE
from sklearn.metrics import silhouette_score
from tensorflow import keras
from tensorflow.keras import Input, Model, layers, models
from tensorflow.keras.callbacks import CSVLogger, EarlyStopping
from tensorflow.keras.utils import image_dataset_from_directory
from tqdm import tqdm

In [2]:
# Path setup - Adjust the root directory accordingly
cwd = os.getcwd()
project_path = pathlib.Path(os.path.join(cwd, "..", "..")).resolve()
splits = ['train', 'valid', 'test']

date_str = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')

In [3]:
class Config:
    max_trials = 10
    experiments_path = os.path.join(cwd, "..", "..", "experiments", "autoencoder", date_str)
    dataset_path = os.path.join(project_path, "data", "ships_v10i", "cropped")
    patience = 5
    epochs = 1000
    batch_size = 8
    encoding_dim = 128
    imgsz = 128
    use_kmeans = True

In [4]:
os.makedirs(Config.experiments_path, exist_ok=True)

## Modelos

In [5]:
def build_autoencoder(input_shape, encoding_dim):
    # --- Build the full autoencoder in one continuous pass --- #
    inputs = tf.keras.Input(shape=input_shape, name="encoder_input")

    # Encoder layers
    x = layers.Conv2D(32, (3, 3), activation="relu",
                      padding="same", name="enc_conv1")(inputs)
    x = layers.MaxPooling2D((2, 2), padding="same", name="enc_pool1")(x)
    x = layers.Conv2D(64, (3, 3), activation="relu",
                      padding="same", name="enc_conv2")(x)
    x = layers.MaxPooling2D((2, 2), padding="same", name="enc_pool2")(x)
    x = layers.Conv2D(128, (3, 3), activation="relu",
                      padding="same", name="enc_conv3")(x)
    x = layers.MaxPooling2D((2, 2), padding="same", name="enc_pool3")(x)

    shape_before_flattening = x.shape[1:]  # (H, W, C)
    flattened_dim = np.prod(shape_before_flattening)
    x = layers.Flatten(name="enc_flatten")(x)
    encoded = layers.Dense(encoding_dim, activation="relu", name="latent")(x)

    # Decoder layers
    x_dec = layers.Dense(flattened_dim, activation="relu",
                         name="dec_dense")(encoded)
    x_dec = layers.Reshape((shape_before_flattening[0],
                            shape_before_flattening[1],
                            shape_before_flattening[2]),
                           name="dec_reshape")(x_dec)
    x_dec = layers.Conv2DTranspose(
        128, (3, 3), strides=2, padding="same", activation="relu", name="dec_convT1")(x_dec)
    x_dec = layers.Conv2DTranspose(
        64, (3, 3), strides=2, padding="same", activation="relu", name="dec_convT2")(x_dec)
    x_dec = layers.Conv2DTranspose(
        32, (3, 3), strides=2, padding="same", activation="relu", name="dec_convT3")(x_dec)
    decoded = layers.Conv2D(3, (3, 3), activation="sigmoid",
                            padding="same", name="decoder_output")(x_dec)

    # Full autoencoder model
    autoencoder = Model(inputs, decoded, name="autoencoder_model")

    # --- Extract encoder model ---
    # The encoder goes from the original input to the latent representation.
    encoder = Model(inputs, encoded, name="encoder_model")

    # --- Extract decoder model ---
    # The decoder takes latent vectors and reconstructs images.
    # We must replicate the decoder path using the same shapes.
    latent_inputs = tf.keras.Input(shape=(encoding_dim,), name="decoder_input")
    # Rebuild decoder layers separately, in the same order as above:
    x_d = autoencoder.get_layer("dec_dense")(latent_inputs)
    x_d = autoencoder.get_layer("dec_reshape")(x_d)
    x_d = autoencoder.get_layer("dec_convT1")(x_d)
    x_d = autoencoder.get_layer("dec_convT2")(x_d)
    x_d = autoencoder.get_layer("dec_convT3")(x_d)
    decoder_outputs = autoencoder.get_layer("decoder_output")(x_d)
    decoder = Model(latent_inputs, decoder_outputs, name="decoder_model")

    return autoencoder, encoder, decoder

In [None]:
autoencoder, encoder, decoder  = build_autoencoder(input_shape=(Config.imgsz, Config.imgsz, 3), encoding_dim=Config.encoding_dim)

In [None]:
model_components = [ (autoencoder, 'autoencoder'), (encoder, 'encoder'), (decoder, 'decoder') ]

autoencoder.save(os.path.join(Config.experiments_path, "autoencoder_no_train.keras"))
 
for model, name in reversed(model_components):
    keras.utils.plot_model(model, to_file=os.path.join(Config.experiments_path, f"{name}.png"), show_shapes=True)
    keras.utils.plot_model(model, to_file=os.path.join(Config.experiments_path, f"{name}_no_shapes.png"), show_shapes=False)

    # Print Summary
    print(f"{name} Summary:")
    model.summary()

### Hypertuning

In [8]:
class AutoencoderTSNEHyperModel(kt.HyperModel):
    def __init__(self, input_shape, train_dataset, steps_per_epoch, experiments_path, use_kmeans=True):
        """
        input_shape: Formato da imagem (H, W, C)
        train_dataset: tf.data.Dataset de treino (repetido, sem rótulos)
        experiments_path: Caminho base para salvar os resultados do experimento
        use_kmeans: Se True, usa K-Means e silhouette_score; caso contrário, usa variância dos embeddings
        """
        self.steps_per_epoch = steps_per_epoch
        self.input_shape = input_shape
        self.train_dataset = train_dataset
        self.experiments_path = experiments_path
        self.use_kmeans = use_kmeans
        self.trial = 0
        os.makedirs(self.experiments_path, exist_ok=True)

    def generate_autoencoder(self, hp, input_shape):
        # Define a dimensão do encoding
        encoding_dim = hp.Choice('encoding_dim', values=[
                                 32, 64, 128, 256, 512])
        autoencoder, encoder, decoder = build_autoencoder(
            input_shape, encoding_dim)
        autoencoder.compile(optimizer='adam', loss='mse')
        return autoencoder, encoder, decoder

    def build(self, hp):
        autoencoder, encoder, decoder = self.generate_autoencoder(
            hp, self.input_shape)

        # Parâmetros do t-SNE
        hp_tsne_components = hp.Choice('tsne_n_components', values=[2])
        hp_tsne_perplexity = hp.Int(
            'tsne_perplexity', min_value=10, max_value=45, step=5)

        # Parâmetros do K-Means (usado apenas se `use_kmeans` for True)
        if self.use_kmeans:
            hp_n_clusters = hp.Int(
                'n_clusters', min_value=2, max_value=15, step=1)
            autoencoder.hp_n_clusters = hp_n_clusters

        # Armazena os parâmetros no modelo
        autoencoder.hp_tsne_components = hp_tsne_components
        autoencoder.hp_tsne_perplexity = hp_tsne_perplexity
        autoencoder.encoder = encoder
        autoencoder.decoder = decoder
        return autoencoder

    def fit(self, hp, model, *args, **kwargs):
        # Define caminhos para salvar os resultados
        self.trial += 1
        trial_path = os.path.join(self.experiments_path, f"trial_{self.trial}")
        os.makedirs(trial_path, exist_ok=True)

        # Treina o autoencoder
        history = model.fit(
            self.train_dataset,
            steps_per_epoch=self.steps_per_epoch,
            epochs=Config.epochs,
            verbose=1
        )

        # Extrai embeddings com o encoder
        embeddings = []
        for images, _ in self.train_dataset.take(10):  # Processa 10 batches
            embeddings.append(model.encoder.predict(images))
        embeddings = np.concatenate(embeddings, axis=0)  # Une os embeddings

        # Aplica t-SNE nos embeddings
        tsne = TSNE(
            n_components=model.hp_tsne_components,
            perplexity=model.hp_tsne_perplexity,
            learning_rate=200,
            init='pca',
            random_state=42
        )
        tsne_embeddings = tsne.fit_transform(embeddings)

        # Define a métrica
        if self.use_kmeans:
            # Clustering com K-Means e silhouette_score
            n_clusters = model.hp_n_clusters
            kmeans = KMeans(n_clusters=n_clusters, random_state=42)
            cluster_labels = kmeans.fit_predict(tsne_embeddings)
            metric_score = silhouette_score(tsne_embeddings, cluster_labels)
            metric_loss = -metric_score  # Otimiza para maximizar o silhouette_score
        else:
            # Usa a variância dos embeddings como métrica
            cluster_labels = None  # Sem K-Means, não há labels
            metric_loss = np.std(tsne_embeddings)

        # Salva resultados importantes
        self.save_results(
            trial_path, hp, history, model, tsne_embeddings, cluster_labels, metric_score
        )

        # Retorna o score calculado (silhouette_score ou variância)
        return metric_loss

    def save_results(self, trial_path, hp, history, model, tsne_embeddings, cluster_labels, metric_score):
        # Salva os modelos
        model.encoder.save(os.path.join(trial_path, "encoder.keras"))
        model.decoder.save(os.path.join(trial_path, "decoder.keras"))
        model.save(os.path.join(trial_path, "autoencoder.keras"))

        # Salva os embeddings e rótulos do t-SNE
        np.save(os.path.join(trial_path, "tsne_embeddings.npy"), tsne_embeddings)
        if cluster_labels is not None:
            np.save(os.path.join(trial_path, "cluster_labels.npy"), cluster_labels)

        # Salva o histórico de treinamento
        with open(os.path.join(trial_path, "history.json"), "w") as f:
            json.dump(history.history, f)

        # Salva os hiperparâmetros
        with open(os.path.join(trial_path, "hyperparameters.json"), "w") as f:
            json.dump(hp.values, f)

        # Gera visualizações do t-SNE
        plt.figure(figsize=(10, 6))
        if cluster_labels is not None:
            plt.scatter(
                tsne_embeddings[:, 0], tsne_embeddings[:, 1], c=cluster_labels, cmap='viridis', s=5)
            plt.colorbar(label="Cluster Labels")
        else:
            plt.scatter(tsne_embeddings[:, 0],
                        tsne_embeddings[:, 1], cmap='viridis', s=5)
        plt.title(
            't-SNE Clustering' if cluster_labels is not None else 't-SNE Embeddings')
        plt.xlabel('t-SNE 1')
        plt.ylabel('t-SNE 2')
        plt.savefig(os.path.join(trial_path, "tsne_plot.png"))
        plt.close()

        # Salva o resumo em markdown
        self.save_trial_summary(trial_path, hp, metric_score, cluster_labels)

    def save_trial_summary(self, trial_path, hp, metric_score, cluster_labels):
        summary_path = os.path.join(trial_path, "trial_summary.md")
        with open(summary_path, "w") as f:
            f.write(f"# Trial Summary\n")
            f.write(f"**Hiperparâmetros:**\n")
            for param, value in hp.values.items():
                f.write(f"- {param}: {value}\n")
            f.write(
                f"\n**Métrica:** {'Silhouette Score' if cluster_labels is not None else 't-SNE Variance'}: {metric_score:.4f}\n")
            f.write(f"\n## Resultados Salvos\n")
            f.write(f"- t-SNE Embeddings: `tsne_embeddings.npy`\n")
            if cluster_labels is not None:
                f.write(f"- Cluster Labels: `cluster_labels.npy`\n")
            f.write(f"- Encoder Model: `encoder/`\n")
            f.write(f"- Decoder Model: `decoder/`\n")
            f.write(f"- Autoencoder Model: `autoencoder/`\n")
            f.write(f"- Training History: `history.json`\n")
            f.write(f"- Hyperparameters: `hyperparameters.json`\n")
            f.write(f"- t-SNE Plot: `tsne_plot.png`\n")

## Loading dataset

In [None]:
# Load dataset using image_dataset_from_directory
train_dataset = image_dataset_from_directory(
    Config.dataset_path,
    labels=None,  # No labels as this is for autoencoder
    image_size=Config.imgsz,
    batch_size=Config.batch_size,
    shuffle=True
)

tuplify = lambda n: (n, n)

# Repeat the dataset for uninterrupted training
train_dataset = train_dataset.map(lambda x: tuplify(x / 255.0))

### Visualização do dataset

In [None]:
# Display some images from the dataset
plt.figure(figsize=(10, 10))
for images, _ in train_dataset.take(1):  # Obtém o batch e ignora os rótulos (caso existam)
    for i in range(min(4, images.shape[0])):  # Exibe no máximo 9 imagens
        plt.subplot(3, 3, i + 1)  # Define a posição no grid (3x3)
        plt.imshow((images[i].numpy() * 255).astype("uint8"))  # Exibe cada imagem
        plt.axis("off")
plt.show()

## Treinamento

In [11]:
steps_per_epoch = len(train_dataset) // Config.batch_size

In [12]:
hypermodel = AutoencoderTSNEHyperModel(
    input_shape=(Config.imgsz, Config.imgsz, 3),
    train_dataset=train_dataset.repeat(),
    steps_per_epoch=steps_per_epoch,
    use_kmeans=Config.use_kmeans,
    experiments_path=Config.experiments_path
)

In [13]:
tuner = kt.RandomSearch(
    hypermodel,
    objective='loss',
    max_trials=Config.max_trials,
    directory=f'{Config.experiments_path}'
)

In [None]:
tuner.search_space_summary()

In [None]:
tuner.search()

## Resultados

In [16]:
def plot_autoencoder_results(encoder, decoder, dataset, n_images=10):
    """
    Plota resultados do autoencoder: imagens originais e reconstruídas lado a lado.
    """
    plt.figure(figsize=(20, 4))
    for i, (images, _) in enumerate(dataset.take(1)):
        if i >= n_images:
            break
        # Pega uma imagem do dataset
        original_image = images[i].numpy()
        # Reconstrói a imagem com o autoencoder
        latent_space = encoder.predict(original_image[np.newaxis, ...])
        reconstructed_image = decoder.predict(latent_space)[0]

        # Plotar a imagem original
        plt.subplot(2, n_images, i + 1)
        plt.imshow(original_image)
        plt.title("Original")
        plt.axis("off")

        # Plotar a imagem reconstruída
        plt.subplot(2, n_images, i + n_images + 1)
        plt.imshow(reconstructed_image)
        plt.title("Reconstruída")
        plt.axis("off")

    plt.tight_layout()
    plt.show()

In [17]:
def browse_dataset(dataset, n_images=16):
    """
    Exibe imagens do dataset em uma grade.
    """
    plt.figure(figsize=(10, 10))
    for i, (images, _) in enumerate(dataset.take(1)):
        for j in range(min(n_images, len(images))):
            plt.subplot(int(n_images ** 0.5), int(n_images ** 0.5), j + 1)
            plt.imshow(images[j].numpy())
            plt.axis("off")
        break
    plt.tight_layout()
    plt.show()

In [18]:
def plot_cluster_examples(tsne_embeddings, cluster_labels, dataset, n_examples_per_cluster=5):
    """
    Mostra exemplos de imagens para cada cluster gerado pelo K-Means.
    """
    clusters = np.unique(cluster_labels)
    cluster_indices = {cluster: [] for cluster in clusters}
    
    # Mapeia as imagens dos embeddings para os clusters
    for i, label in enumerate(cluster_labels):
        if len(cluster_indices[label]) < n_examples_per_cluster:
            cluster_indices[label].append(i)

    # Carrega todas as imagens em um array para fácil acesso
    all_images = []
    for images, _ in dataset.unbatch().take(len(cluster_labels)):
        all_images.append(images.numpy())
    all_images = np.array(all_images)

    # Plota exemplos de cada cluster
    for cluster, indices in cluster_indices.items():
        plt.figure(figsize=(10, 5))
        for i, index in enumerate(indices):
            plt.subplot(1, n_examples_per_cluster, i + 1)
            plt.imshow(all_images[index])
            plt.title(f"Cluster {cluster}")
            plt.axis("off")
        plt.suptitle(f"Exemplos do Cluster {cluster}", fontsize=16)
        plt.tight_layout()
        plt.show()

In [19]:
# Obtendo os melhores hiperparâmetros
best_hp = tuner.get_best_hyperparameters(num_trials=1)[0]

# Construindo o melhor modelo com os melhores hiperparâmetros
best_model = hypermodel.build(best_hp)

In [None]:
# Exibir imagens originais e reconstruídas
plot_autoencoder_results(best_model.encoder, best_model.decoder, train_dataset, n_images=10)

In [None]:

# Navegar pelo dataset
browse_dataset(train_dataset, n_images=16)

In [22]:
import json
import os


def find_best_trial(experiments_path):
    best_trial = None
    best_loss = float('inf')
    
    for trial_dir in os.listdir(experiments_path):
        print(trial_dir)
        trial_path = os.path.join(experiments_path, trial_dir)
        if os.path.isdir(trial_path) and trial_dir.startswith("trial_"):
            print(f"Checking {trial_path}")
            # Load the metric score for the trial
            metric_file = os.path.join(trial_path, "history.json")  # Or "history.json" if it's stored there
            try:
                with open(metric_file, "r") as f:
                    losses_array = json.load(f)
                    print(losses_array)
                    loss = np.min(losses_array['loss']) 
                    if loss is not None and loss < best_loss:
                        best_loss = loss
                        best_trial = trial_path
            except FileNotFoundError:
                print(f"Metric file not found in {trial_path}")
    return best_trial, best_loss

In [None]:

# Use the function
best_trial_path, best_metric = find_best_trial(os.path.join(Config.experiments_path))

if best_trial_path:
    print(f"Best trial: {best_trial_path}, Best Metric: {best_metric}")
else:
    print("No trials found.")

In [None]:
if best_trial_path and Config.use_kmeans:
    tsne_embeddings = np.load(os.path.join(
        best_trial_path, "tsne_embeddings.npy"))
    cluster_labels = np.load(os.path.join(
        best_trial_path, "cluster_labels.npy"))

    plot_cluster_examples(
        tsne_embeddings=tsne_embeddings,
        cluster_labels=cluster_labels,
        dataset=train_dataset,
        n_examples_per_cluster=5
    )
else:
    print("No best trial found.")