# Class


In [None]:
from tensorflow.keras.mixed_precision import set_global_policy
# Verifique se a GPU está ativa
import tensorflow as tf
is_cuda = len(tf.config.list_physical_devices('GPU')) > 0

# Habilitar mixed precision
# Verificar GPU e configurar precisão mista
if len(tf.config.list_physical_devices('GPU')) > 0:
    policy = tf.keras.mixed_precision.Policy('mixed_float16')
else:
    policy = tf.keras.mixed_precision.Policy('float32')
set_global_policy(policy)


if is_cuda:
  print("GPU está ativa")
else:
  print("GPU não está ativa")

GPU está ativa


In [None]:
from zipfile import ZipFile
import os,shutil



def extract_tar_gz_contents(input_dir: str, output_dir: str):
    """Extrai o conteúdo dos arquivos .tar.gz de um diretório para outro."""
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
#os.path.exists("bruto") and shutil.rmtree("./bruto")

#cnn_features
extract_tar_gz_contents('./cnn_features', './cnn_features')



In [None]:

# loading the temp.zip and creating a zip object
with ZipFile("/content/drive/My Drive/Mestrado 2024/Projetos/Datasets/Labeled Images.zip", 'r') as zObject:

    # Extracting all the members of the zip
    # into a specific location.
    zObject.extractall(path="./bruto")

In [None]:
import os
import shutil

#store the path to your root directory
base='./bruto'

# traverse root directory, and list directories as dirs and files as files
for root, dirs, files in os.walk(base):
    path = root.split(os.sep)

    for file in files:
        if not os.path.isdir(file):

            # move file from nested folder into the base folder
            shutil.move(os.path.join(root,file),os.path.join(base,file))

In [None]:
from typing import Union
from pathlib import Path



def clear_directory_folders(directory_path: Union[str, Path]) -> list:
    """Irreversibly removes all folders (and their content) in the specified
    directory. Doesn't remove files of that specified directory. Returns a
    list with folder paths Python lacks permission to delete."""
    erroneous_paths = []
    for path_location in Path(directory_path).iterdir():
        if path_location.is_dir():
            try:
                shutil.rmtree(path_location)
            except PermissionError:
                erroneous_paths.append(path_location)
    return erroneous_paths

clear_directory_folders(r'./bruto')

[]

In [None]:
import os
import sys
import numpy as np
import pandas as pd
import tensorflow as tf
from PIL import Image
from tensorflow.keras import layers, models
from tensorflow.keras.applications import ResNet50, ResNet101, ResNet152,EfficientNetB4
from transformers import TFConvNextModel, ConvNextFeatureExtractor

# Mapeamento de classes
MAP_CATEGORIES = {
    'A1': 0, 'L1': 1, 'P1': 2, 'G1': 3,
    'A2': 4, 'L2': 5, 'P2': 6, 'G2': 7,
    'A3': 8, 'L3': 9, 'P3': 10, 'G3': 11,
    'A4': 12, 'L4': 13, 'P4': 14, 'G4': 15,
    'A5': 16, 'L5': 17, 'P5': 18,
    'A6': 19, 'L6': 20, 'P6': 21,
    'OTHERCLASS': 22
}

def load_and_process_csv(official_split, label_column):
    """Carrega e processa o CSV com os rótulos mapeados."""
    if not os.path.exists(official_split):
        print("Arquivo de split oficial não encontrado.")
        sys.exit(1)

    df = pd.read_csv(official_split, index_col=0)
    df[label_column] = df[label_column].replace(MAP_CATEGORIES).astype('Int64')
    df.dropna(subset=[label_column], inplace=True)
    return df.reset_index(drop=True)

def preprocess_image(image_path, input_size, resample_method):
    """Pré-processamento da imagem com redimensionamento dinâmico."""
    image = tf.io.read_file(image_path)
    image = tf.image.decode_jpeg(image, channels=3) # Ensure image is divisible by patch size (16 for DeiT-base-distilled-patch16-224)

    return tf.image.resize(
        image, [input_size, input_size],
        method=resample_method
    )

def build_dataset(df, data_dir, input_size, label_column, batch_size, resample_method):
    """Constrói dataset com parâmetros específicos do modelo."""
    filepaths = df['filename'].apply(lambda x: os.path.join(data_dir, x)).values
    labels = df[label_column].values.astype(np.int32)

    ds = tf.data.Dataset.from_tensor_slices((filepaths, labels))

    def _load_preprocess(path, label):
        image = preprocess_image(path, input_size, resample_method)
        return image, label

    return (ds
           .map(_load_preprocess, num_parallel_calls=tf.data.AUTOTUNE)
           .batch(batch_size)
           .prefetch(tf.data.AUTOTUNE))



class CNNFeatureExtractor(layers.Layer):
    def __init__(self, model_name='resnet50', include_top=False, pooling='avg', **kwargs):
        """
        Constrói um extrator de features baseado em CNNs pré-treinadas.

        Args:
            model_name (str): Nome do modelo CNN pré-treinado.
            include_top (bool): Se True, inclui as camadas fully connected do modelo.
            pooling (str): Tipo de pooling para a extração de features ('avg' ou 'max').
        """
        super().__init__(**kwargs)
        self.model_name = model_name
        self.include_top = include_top
        self.pooling = pooling

        # Carrega o modelo CNN pré-treinado
        if self.model_name == 'resnet50':
            self.cnn_model = ResNet50(include_top=self.include_top, weights='imagenet', pooling=self.pooling)
        elif self.model_name == 'resnet101':
            self.cnn_model = ResNet101(include_top=self.include_top, weights='imagenet', pooling=self.pooling)
        elif self.model_name == 'resnet152':
            self.cnn_model = ResNet152(include_top=self.include_top, weights='imagenet', pooling=self.pooling)
        elif self.model_name == 'efficientnetb4':
            self.cnn_model = EfficientNetB4(include_top=self.include_top, weights='imagenet', pooling=self.pooling)
        elif self.model_name.startswith('facebook/convnext'):
            # Carrega o modelo ConvNeXt usando transformers
            self.feature_extractor = ConvNextFeatureExtractor.from_pretrained(model_name)
            self.cnn_model = TFConvNextModel.from_pretrained(model_name)
            self.cnn_model.trainable = False
            #self.hidden_size = self.cnn_model.config.hidden_size
            self.hidden_size = self.cnn_model.config.hidden_sizes[-1]  # Usa o último valor de hidden_sizes

        else:
            raise ValueError(f"Modelo {model_name} não suportado.")

        # Congela o modelo para evitar treinamento
        if not self.model_name.startswith('facebook/'):
            self.cnn_model.trainable = False

    def call(self, inputs):
        """
        Extrai features das imagens de entrada usando o modelo CNN.

        Args:
            inputs (tf.Tensor): Tensor de imagens com shape (batch_size, height, width, channels).

        Returns:
            tf.Tensor: Features extraídas com shape (batch_size, feature_dim).
        """
        if self.model_name.startswith('facebook/'):
            # Pré-processamento específico para ConvNeXt
            inputs = tf.cast(inputs, tf.float32) / 255.0
            mean = tf.constant(self.feature_extractor.image_mean, shape=[1, 1, 1, 3], dtype=tf.float32)
            std = tf.constant(self.feature_extractor.image_std, shape=[1, 1, 1, 3], dtype=tf.float32)
            inputs = (inputs - mean) / std
            inputs = tf.transpose(inputs, perm=[0, 3, 1, 2])  # channels-first
            outputs = self.cnn_model(pixel_values=inputs)
            features = tf.reduce_mean(outputs.last_hidden_state, axis=1)  # Pooling global médio
        else:
            # Pré-processamento para modelos Keras (ResNet, EfficientNet)
            if self.model_name == 'resnet50':
                inputs = tf.keras.applications.resnet50.preprocess_input(inputs)
            elif self.model_name == 'resnet101':
                inputs = tf.keras.applications.resnet.preprocess_input(inputs)
            elif self.model_name == 'resnet152':
                inputs = tf.keras.applications.resnet.preprocess_input(inputs)
            elif self.model_name == 'efficientnetb4':
                inputs = tf.keras.applications.efficientnet.preprocess_input(inputs)
            features = self.cnn_model(inputs)

        # Ensure features is a Tensor, not a TensorSpec
        features = tf.convert_to_tensor(features)

        return features


    def compute_output_shape(self, input_shape):
        """
        Define a forma de saída da camada.
        """
        if self.model_name.startswith('facebook/'):
            # Para ConvNeXt, a saída é (batch_size, hidden_size)
            return (input_shape[0], self.hidden_size)
        else:
            # Para modelos Keras, a forma de saída depende do pooling
            if self.pooling == 'avg':
                return (input_shape[0], 2048)  # ResNet/EfficientNet
            else:
                return (input_shape[0], 2048)  # Outros casos

    def get_config(self):
        """Retorna a configuração da camada para serialização."""
        config = super().get_config()
        config.update({
            'model_name': self.model_name,
            'include_top': self.include_top,
            'pooling': self.pooling
        })
        return config

    @classmethod
    def from_config(cls, config):
        """Recria a instância da camada a partir da configuração."""
        return cls(**config)

def extract_features(model, dataset):
    # Separar features e labels
    features_dataset = dataset.map(lambda x, y: x)
    features = model.predict(features_dataset)
    labels = np.concatenate([y.numpy() for _, y in dataset], axis=0)
     # Verifique a forma do array de features
    print(f"Shape of features before processing: {features.shape}")
    # Remova dimensões extras se necessário
    if len(features.shape) > 2:
        # Achatar as dimensões (7, 7) em uma única dimensão de tamanho 49
        features = tf.reshape(features, (features.shape[0], -1))  # Resulta em (3722, 49)

    print(f"Shape of features after processing: {features.shape}")

    return features, labels


def build_cnn_feature_extractor(model_name='resnet50', input_size=(224, 224, 3)):
    """
    Constrói um modelo de extração de features usando uma CNN pré-treinada.

    Args:
        model_name (str): Nome do modelo CNN.
        input_size (tuple): Tamanho da imagem de entrada (height, width, channels).

    Returns:
        tf.keras.Model: Modelo de extração de features.
    """
    inputs = layers.Input(shape=input_size, dtype=tf.float32)
    features = CNNFeatureExtractor(model_name=model_name)(inputs)
    return models.Model(inputs=inputs, outputs=features)

### Uso da classe

In [None]:
# Exemplo de uso
# https://huggingface.co/facebook/convnext-large-224

def main():
    # Configurações
    MODEL_NAMES = [
    #'resnet152',
    #'resnet50',
    #'resnet101',
    #'facebook/convnext-base-384',
    #'efficientnetb4',
    #'facebook/convnext-large-224-22k-1k'
    'facebook/convnext-large-224'
]
    BATCH_SIZE = 32
    INPUT_SIZE = (224, 224, 3)  # Tamanho da imagem de entrada
    DATA_DIR = "./bruto"
    OFFICIAL_SPLIT = "/content/drive/My Drive/Mestrado 2024/Projetos/Datasets/official_splits/image_classification.csv"
    BASE_OUTPUT_DIR = "./cnn_features"
    os.makedirs(BASE_OUTPUT_DIR, exist_ok=True)

    # Carregar dados
    df = load_and_process_csv(OFFICIAL_SPLIT, "Complete agreement")
    train_df = df[df['set_type'] == 'Train']
    validation_df = df[df['set_type'] == 'Validation']
    test_df = df[df['set_type'] == 'Test']

    for model_name in MODEL_NAMES:
        print(f"\nProcessando modelo: {model_name}")

        # Construir datasets
        train_ds = build_dataset(train_df, DATA_DIR, INPUT_SIZE[0], "Complete agreement", BATCH_SIZE, tf.image.ResizeMethod.BILINEAR)
        validation_ds = build_dataset(validation_df, DATA_DIR, INPUT_SIZE[0], "Complete agreement", BATCH_SIZE, tf.image.ResizeMethod.BILINEAR)
        test_ds = build_dataset(test_df, DATA_DIR, INPUT_SIZE[0], "Complete agreement", BATCH_SIZE, tf.image.ResizeMethod.BILINEAR)

        # Define o tamanho de entrada com base no modelo
        if '384' in model_name:
            input_size = (384, 384, 3)
        else:
            input_size = (224, 224, 3)

        # Construir extrator de features
        model = build_cnn_feature_extractor(model_name=model_name, input_size=input_size)
        model.build(input_shape=(None, *input_size))

        # Extrair features
        train_features, train_labels = extract_features(model, train_ds)
        validation_features, validation_labels = extract_features(model, validation_ds)
        test_features, test_labels = extract_features(model, test_ds)

        # Salvar resultados
        output_dir = os.path.join(BASE_OUTPUT_DIR, model_name)
        os.makedirs(output_dir, exist_ok=True)

        np.save(os.path.join(output_dir, "train_features.npy"), train_features)
        np.save(os.path.join(output_dir, "train_labels.npy"), train_labels)
        np.save(os.path.join(output_dir, "validation_features.npy"), validation_features)
        np.save(os.path.join(output_dir, "validation_labels.npy"), validation_labels)
        np.save(os.path.join(output_dir, "test_features.npy"), test_features)
        np.save(os.path.join(output_dir, "test_labels.npy"), test_labels)

In [None]:
if __name__ == '__main__':
      print("Iniciando...")
      main()

Iniciando...

Processando modelo: facebook/convnext-large-224


  df[label_column] = df[label_column].replace(MAP_CATEGORIES).astype('Int64')
Some layers from the model checkpoint at facebook/convnext-large-224 were not used when initializing TFConvNextModel: ['classifier']
- This IS expected if you are initializing TFConvNextModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing TFConvNextModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
All the layers of TFConvNextModel were initialized from the model checkpoint at facebook/convnext-large-224.
If your task is similar to the task the model of the checkpoint was trained on, you can already use TFConvNextModel for predictions without further training.


[1m117/117[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m47s[0m 215ms/step
Shape of features before processing: (3722, 7, 7)
Shape of features after processing: (3722, 49)
[1m25/25[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m16s[0m 680ms/step
Shape of features before processing: (793, 7, 7)
Shape of features after processing: (793, 49)
[1m26/26[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m14s[0m 564ms/step
Shape of features before processing: (803, 7, 7)
Shape of features after processing: (803, 49)


In [None]:
from google.colab import files

extrator_name = "cnn_features"
!zip -r {extrator_name}.zip ./cnn_features  # Changed the directory to './cnn_features'
files.download(f'./{extrator_name}.zip')

  adding: cnn_features/ (stored 0%)
  adding: cnn_features/facebook/ (stored 0%)
  adding: cnn_features/facebook/convnext-large-224/ (stored 0%)
  adding: cnn_features/facebook/convnext-large-224/test_features.npy (deflated 10%)
  adding: cnn_features/facebook/convnext-large-224/train_features.npy (deflated 10%)
  adding: cnn_features/facebook/convnext-large-224/validation_features.npy (deflated 10%)
  adding: cnn_features/facebook/convnext-large-224/validation_labels.npy (deflated 81%)
  adding: cnn_features/facebook/convnext-large-224/train_labels.npy (deflated 85%)
  adding: cnn_features/facebook/convnext-large-224/test_labels.npy (deflated 81%)


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>