In [1]:
import os
import kagglehub

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
#!pip install kaggle kagglehub

In [None]:

# Download latest version
path = kagglehub.dataset_download("samuelayman/backpack")


In [None]:
import os
import pandas as pd
from pathlib import Path

print(f"Caminho do dataset: {path}")
print("\nConte√∫do do diret√≥rio:")
for item in os.listdir(path):
    item_path = os.path.join(path, item)
    if os.path.isdir(item_path):
        print(f"{item}/")
    else:
        print(f"{item}")


In [None]:
import os

# --- CONFIGURA√á√ÉO ---
# 1. Defina o caminho para a pasta que cont√©m os arquivos .txt de anota√ß√£o
labels_dir = f"{path}/backpack/labels"  # Mude aqui se sua pasta tiver outro nome ou caminho

# 2. Defina o ID da classe que voc√™ quer adicionar. Para "backpack", vamos usar 0.
class_id_to_add = 1 #minha classe 0 vai ser as mochilas
# --- FIM DA CONFIGURA√á√ÉO ---


def add_class_to_annotations():
    """
    Percorre todos os arquivos .txt no diret√≥rio especificado e
    adiciona o ID da classe no in√≠cio de cada linha.
    """
    if not os.path.isdir(labels_dir):
        print(f"Erro: O diret√≥rio '{labels_dir}' n√£o foi encontrado.")
        return

    # Lista todos os arquivos no diret√≥rio
    files = os.listdir(labels_dir)
    txt_files = [f for f in files if f.endswith('.txt')]

    if not txt_files:
        print(f"Nenhum arquivo .txt encontrado em '{labels_dir}'.")
        return

    print(f"Encontrados {len(txt_files)} arquivos de anota√ß√£o. Iniciando o processo...")
    
    processed_count = 0
    for filename in txt_files:
        filepath = os.path.join(labels_dir, filename)
        
        # L√™ todas as linhas do arquivo
        with open(filepath, 'r') as f:
            lines = f.readlines()

        # Verifica se o arquivo j√° foi processado para n√£o adicionar o ID duas vezes
        if lines and lines[0].strip().startswith(str(class_id_to_add)):
            # print(f"Arquivo '{filename}' j√° parece estar no formato correto. Pulando.")
            continue
            
        new_lines = []
        for line in lines:
            # Remove espa√ßos em branco extras e verifica se a linha n√£o est√° vazia
            if line.strip():
                new_line = f"{class_id_to_add} {line.strip()}\n"
                new_lines.append(new_line)

        # Escreve o novo conte√∫do de volta no arquivo
        with open(filepath, 'w') as f:
            f.writelines(new_lines)
        
        processed_count += 1

    print(f"\nProcesso conclu√≠do!")
    print(f"{processed_count} arquivos foram atualizados com o ID de classe '{class_id_to_add}'.")


if __name__ == "__main__":
    add_class_to_annotations()

In [None]:
# Instalar bibliotecas necess√°rias
#! pip install tensorflow opencv-python matplotlib scikit-learn pillow seaborn -q

In [None]:
# Definir caminhos base para o resto do pipeline
BASE_PATH = path
IMAGE_DIR = os.path.join(BASE_PATH, 'backpack')
LABEL_DIR = os.path.join(BASE_PATH, "backpack/labels")

BASE_PATH, IMAGE_DIR, labels_dir

In [None]:
import shutil
from sklearn.model_selection import train_test_split

# Criar a estrutura de diret√≥rios
output_dir = os.path.join(BASE_PATH, "dataset")
for split in ['train', 'val']:
    os.makedirs(os.path.join(output_dir, split, 'images'), exist_ok=True)
    os.makedirs(os.path.join(output_dir, split, 'labels'), exist_ok=True)

# Listar todas as imagens
image_files = [f for f in os.listdir(IMAGE_DIR) if f.endswith('.jpg')]

print(len(image_files))

train_files, val_files = train_test_split(image_files, test_size=0.2, random_state=42)

print(f"Total de imagens: {len(image_files)}")
print(f"Imagens de treino: {len(train_files)}")
print(f"Imagens de valida√ß√£o: {len(val_files)}")

# Fun√ß√£o para mover os arquivos
def move_files(file_list, split_name):
    for filename in file_list:
        basename = os.path.splitext(filename)[0]
        
        # Mover imagem
        shutil.copy(
            os.path.join(IMAGE_DIR, f"{basename}.jpg"),
            os.path.join(output_dir, split_name, 'images', f"{basename}.jpg")
        )
        # Mover anota√ß√£o
        shutil.copy(
            os.path.join(LABEL_DIR, f"{basename}.txt"),
            os.path.join(output_dir, split_name, 'labels', f"{basename}.txt")
        )

# Mover os arquivos para suas respectivas pastas
move_files(train_files, 'train')
move_files(val_files, 'val')

print("Dataset dividido e organizado com sucesso!")

In [None]:
class FasterRCNN(tf.keras.Model):
    def __init__(self, num_classes, num_anchors=9, pool_size=7, **kwargs):
        """
        Construtor do modelo. Inicializa todos os componentes.
        """
        super(FasterRCNN, self).__init__(**kwargs)
        self.num_classes = num_classes
        
        # Define o formato de entrada para o backbone
        input_shape = [*IMAGE_SIZE, 3]
        
        # Instanciar todos os componentes
        self.backbone = get_backbone(input_shape=input_shape)
        self.rpn = get_rpn(num_anchors)
        self.roi_align = RoiAlignLayer(pool_size)
        self.detector = get_detector_head(pool_size, num_classes)
        
        # Fun√ß√µes de perda
        self.rpn_cls_loss_fn = tf.keras.losses.BinaryCrossentropy(from_logits=False)
        self.rpn_reg_loss_fn = tf.keras.losses.Huber()
        self.det_cls_loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False)
        self.det_reg_loss_fn = tf.keras.losses.Huber()
   
    def call(self, inputs, training=False):
        """
        Este √© o m√©todo que faltava. Define o forward pass.
        """
        # 1. Passar a imagem pelo backbone para extrair features
        feature_maps = self.backbone(inputs, training=training)
        
        # 2. Obter propostas da Region Proposal Network (RPN)
        rpn_cls_output, rpn_reg_output = self.rpn(feature_maps)
        
        # --- L√ìGICA DE PROPOSTAS (MUITO SIMPLIFICADA) ---
        num_proposals = 300
        dummy_proposals = tf.random.uniform(shape=[tf.shape(inputs)[0], num_proposals, 4], dtype=tf.float32)
        
        # 3. Usar RoI Align para extrair features das propostas
        roi_features = self.roi_align([feature_maps, dummy_proposals])
        
        # --- CORRE√á√ÉO AQUI ---
        # A sa√≠da do roi_align √© (batch_size * num_rois, ...).
        # Precisamos dar um reshape para (batch_size, num_rois, ...)
        # para que seja compat√≠vel com as camadas TimeDistributed do detector.
        batch_size = tf.shape(inputs)[0]
        # O √∫ltimo canal '512' vem do backbone VGG16 (block5_conv3)
        reshaped_roi_features = tf.reshape(
            roi_features, 
            [batch_size, num_proposals, self.roi_align.pool_size, self.roi_align.pool_size, 512]
        )
        
        # 4. Passar as features reshaped pelo detector final
        detector_cls_output, detector_reg_output = self.detector(reshaped_roi_features)
        
        return rpn_cls_output, rpn_reg_output, detector_cls_output, detector_reg_output
    def train_step(self, data):
        """
        Define um passo de treinamento customizado.
        """
        # Desempacota os dados. Nosso pipeline de dados retorna (imagem, caixas, classes)
        images, gt_boxes, gt_classes = data
        
        with tf.GradientTape() as tape:
            # Obter todas as sa√≠das do modelo chamando o m√©todo 'call'
            rpn_cls, rpn_reg, det_cls, det_reg = self(images, training=True)
            
            # --- L√ìGICA DE ATRIBUI√á√ÉO DE ALVOS E C√ÅLCULO DE PERDA (AINDA SIMPLIFICADA) ---
            # Como explicado antes, a l√≥gica para criar os alvos (targets) a partir
            # das caixas de verdade (gt_boxes) √© complexa. Usamos placeholders aqui
            # para garantir que o c√≥digo rode sem erros.
            
            # Alvos falsos para a RPN
            rpn_shape = tf.shape(rpn_cls)
            dummy_rpn_cls_target = tf.random.uniform(rpn_shape, maxval=2, dtype=tf.int32)
            dummy_rpn_reg_target = tf.zeros_like(rpn_reg)
            
            # Alvos falsos para o Detector
            det_shape = tf.shape(det_cls)
            dummy_det_cls_target = tf.random.uniform([det_shape[0], det_shape[1]], maxval=self.num_classes, dtype=tf.int32)
            dummy_det_reg_target = tf.zeros_like(det_reg)
            
            # C√°lculo das perdas
            rpn_cls_loss = self.rpn_cls_loss_fn(dummy_rpn_cls_target, rpn_cls)
            rpn_reg_loss = self.rpn_reg_loss_fn(dummy_rpn_reg_target, rpn_reg)
            det_cls_loss = self.det_cls_loss_fn(dummy_det_cls_target, det_cls)
            det_reg_loss = self.det_reg_loss_fn(dummy_det_reg_target, det_reg)
            
            total_loss = rpn_cls_loss + rpn_reg_loss + det_cls_loss + det_reg_loss

        # Calcular e aplicar os gradientes para atualizar os pesos do modelo
        grads = tape.gradient(total_loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(grads, self.trainable_variables))
        
        return {
            "total_loss": total_loss,
            "rpn_cls_loss": rpn_cls_loss,
            "rpn_reg_loss": rpn_reg_loss,
            "det_cls_loss": det_cls_loss,
            "det_reg_loss": det_reg_loss,
        }

In [None]:
import tensorflow as tf

IMAGE_SIZE = (600, 600)
BATCH_SIZE = 2 # Comece com um valor baixo devido ao uso de mem√≥ria

def parse_yolo_annotation(label_path):
    """L√™ um arquivo de anota√ß√£o YOLO e a imagem correspondente."""
    label_path = tf.strings.strip(label_path)
    
    # Encontra o caminho da imagem a partir do caminho da anota√ß√£o
    img_path = tf.strings.regex_replace(label_path, "[/\\\\]labels[/\\\\]", "/images/")
    img_path = tf.strings.regex_replace(img_path, ".txt$", ".jpg")
    
    # Carrega e decodifica a imagem
    image = tf.io.read_file(img_path)
    image = tf.image.decode_jpeg(image, channels=3)
    original_shape = tf.cast(tf.shape(image)[:2], dtype=tf.float32)
    
    # Redimensiona a imagem
    image = tf.image.resize(image, IMAGE_SIZE)
    
    # Carrega as anota√ß√µes
    label_content = tf.io.read_file(label_path)
    lines = tf.strings.split(label_content, '\n')
    
    # Usar TensorArray para boxes e classes
    num_lines = tf.shape(lines)[0]
    boxes_ta = tf.TensorArray(tf.float32, size=0, dynamic_size=True)
    classes_ta = tf.TensorArray(tf.float32, size=0, dynamic_size=True)
    
    def cond(i, boxes_ta, classes_ta):
        return i < num_lines

    def body(i, boxes_ta, classes_ta):
        line = lines[i]
        # Ignorar linhas vazias
        def process_line():
            parts = tf.strings.split(line, ' ')
            class_id = tf.strings.to_number(parts[0], out_type=tf.float32)
            x_center = tf.strings.to_number(parts[1], out_type=tf.float32)
            y_center = tf.strings.to_number(parts[2], out_type=tf.float32)
            width = tf.strings.to_number(parts[3], out_type=tf.float32)
            height = tf.strings.to_number(parts[4], out_type=tf.float32)
            y1 = y_center - (height / 2.0)
            x1 = x_center - (width / 2.0)
            y2 = y_center + (height / 2.0)
            x2 = x_center + (width / 2.0)
            boxes_ta_new = boxes_ta.write(boxes_ta.size(), [y1, x1, y2, x2])
            classes_ta_new = classes_ta.write(classes_ta.size(), class_id)
            return boxes_ta_new, classes_ta_new

        def skip_line():
            return boxes_ta, classes_ta

        boxes_ta, classes_ta = tf.cond(
            tf.strings.length(line) > 0,
            process_line,
            skip_line
        )
        return i + 1, boxes_ta, classes_ta

    i = tf.constant(0)
    _, boxes_ta, classes_ta = tf.while_loop(
        cond, body, [i, boxes_ta, classes_ta]
    )

    boxes = boxes_ta.stack()
    classes = classes_ta.stack()
    
    return image, boxes, classes

def create_dataset(data_dir):
    """Cria um tf.data.Dataset a partir de um diret√≥rio de anota√ß√µes."""
    label_files = tf.data.Dataset.list_files(os.path.join(data_dir, "labels", "*.txt"))
    
    dataset = label_files.map(parse_yolo_annotation, num_parallel_calls=tf.data.AUTOTUNE)
    
    # O collate_fn √© necess√°rio porque as imagens t√™m n√∫meros diferentes de caixas.
    # O `padded_batch` garante que todos os tensores em um lote tenham o mesmo tamanho.
    dataset = dataset.padded_batch(
        BATCH_SIZE,
        padding_values=(tf.constant(0.0), tf.constant(0.0), tf.constant(-1.0)),
        padded_shapes=([*IMAGE_SIZE, 3], [None, 4], [None]),
        drop_remainder=True
    )
    dataset = dataset.prefetch(buffer_size=tf.data.AUTOTUNE)
    return dataset

# Criar os datasets de treino e valida√ß√£o
train_dir = os.path.join(output_dir, "train")
val_dir = os.path.join(output_dir, "val")

train_dataset = create_dataset(train_dir)
val_dataset = create_dataset(val_dir)

print("‚úÖ Datasets criados com sucesso!")

In [None]:
NUM_CLASSES = 1 # Apenas 'backpack'

# Instanciar e compilar o modelo
model = FasterRCNN(num_classes=NUM_CLASSES)
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)
model.compile(optimizer=optimizer)

# Iniciar o treinamento
print("üöÄ Iniciando o treinamento...")
history = model.fit(
    train_dataset,
    epochs=10, # Comece com poucas √©pocas
    validation_data=val_dataset,
    verbose=1
)
print("‚úÖ Treinamento conclu√≠do!")

In [None]:
import cv2
import numpy as np

def predict(model, image_path, confidence_threshold=0.5):
    # Carregar e pr√©-processar a imagem
    image = tf.io.read_file(image_path)
    image = tf.image.decode_jpeg(image, channels=3)
    image_resized = tf.image.resize(image, IMAGE_SIZE)
    image_tensor = tf.expand_dims(image_resized, axis=0)
    
    # Fazer a predi√ß√£o
    _, _, det_cls, det_reg = model(image_tensor, training=False)

    # --- L√ìGICA DE P√ìS-PROCESSAMENTO (SIMPLIFICADA) ---
    # Em uma implementa√ß√£o real, voc√™ precisa:
    # 1. Decodificar as propostas da RPN e aplicar NMS.
    # 2. Usar as propostas filtradas no detector.
    # 3. Decodificar as caixas finais do detector.
    # 4. Aplicar NMS novamente nas caixas finais.
    # -------------------------------------------------
    
    # Para este exemplo, vamos apenas olhar a sa√≠da do detector
    scores = tf.reduce_max(det_cls[0], axis=1)
    
    # Filtra as detec√ß√µes com base na confian√ßa
    selected_indices = tf.where(scores > confidence_threshold)
    
    # Pega as melhores caixas (ainda n√£o foram refinadas pela regress√£o, isso √© outra etapa)
    # A l√≥gica de decodifica√ß√£o das propostas dummy n√£o foi implementada, ent√£o as caixas n√£o ser√£o precisas
    
    print(f"Encontradas {len(selected_indices)} detec√ß√µes acima do limiar de confian√ßa.")
    
    # Desenhar as caixas (aqui seria necess√°rio decodificar as caixas)
    # Por simplicidade, vamos pular o desenho, pois as coordenadas n√£o s√£o significativas sem a l√≥gica completa.
    
    final_image = np.array(image)
    # ... c√≥digo para desenhar as caixas finais em `final_image` usando OpenCV ...
    
    return final_image


# Exemplo de uso
# Supondo que o modelo foi treinado de verdade
# result_image = predict(model, 'caminho/para/imagem_teste.jpg')
# cv2.imwrite('resultado.jpg', result_image)