In [None]:
import os
import subprocess
from subprocess import Popen, PIPE

In [None]:
def post_early_stopping_checking(metrics, patience, min_delta = 0):

    if not metrics or patience < 1:
        return -1  # Retorna -1 se a lista for vazia ou a paciência inválida

    consecutive_drops = 0  # Contador de quedas consecutivas

    for i in range(1, len(metrics)):
        # Verifica se houve um decrescimento maior que min_delta
        #print(metrics[i],metrics[i-1])
        if metrics[i] < (metrics[i - 1] + min_delta):
            consecutive_drops += 1
            #print(consecutive_drops)
            if consecutive_drops > patience:  # Verifica se excedeu a paciência
                return i  # Retorna a posição onde a paciência foi excedida
        else:
            consecutive_drops = 0  # Reseta o contador se não houver decrescimento

    return -1


In [None]:
import matplotlib.pyplot as plt

# Gráfico Épocas x Losses x Ap50
def imprime_grafico_epocas_losses_ap50(epocas,losses,valores_ap50,modelo,lr,epochs):
    # Convertendo para tipos numéricos
    epocas = [int(e) for e in epocas]
    loss = [float(l) for l in losses]
    ap50 = [float(ap) for ap in valores_ap50]

    # Encontrando os valores de destaque
    min_loss = min(loss)
    min_loss_epoca = epocas[loss.index(min_loss)]

    max_ap50 = max(ap50)
    max_ap50_epoca = epocas[ap50.index(max_ap50)]

    # Criando a figura e os eixos
    fig, ax1 = plt.subplots(figsize=(10, 6))

    # Plotando loss (eixo esquerdo)
    ax1.plot(epocas, loss, marker='o', linestyle='-', color='b', label='Loss')
    ax1.set_xlabel('Epoch', fontsize=12)
    ax1.set_ylabel('Loss', fontsize=12, color='b')
    ax1.tick_params(axis='y', labelcolor='b')
    ax1.grid(True, linestyle='--', alpha=0.7)
    # Destacando o menor loss
    ax1.scatter(min_loss_epoca, min_loss, color='red', label=f'Min Loss: {min_loss:.4f} (Epoch {min_loss_epoca})', zorder=5)

    # Criando um segundo eixo y para ap50
    ax2 = ax1.twinx()
    ax2.plot(epocas, ap50, marker='s', linestyle='--', color='g', label='AP50')
    ax2.set_ylabel('AP50', fontsize=12, color='g')
    ax2.tick_params(axis='y', labelcolor='g')
    # Destacando o maior ap50
    ax2.scatter(max_ap50_epoca, max_ap50, color='purple', label=f'Max ap50: {max_ap50:.4f} (Epoch {max_ap50_epoca})', zorder=5)
    # Verfica early stopping
    early_stopping_epoch = post_early_stopping_checking(ap50, patience=2, min_delta = 0.0001)
    if early_stopping_epoch >= 0:
        ax2.scatter(early_stopping_epoch, ap50[early_stopping_epoch], color='purple', s=200, label='Early Stopping', edgecolor='black')

#        # Adicionar uma anotação (label)
#        plt.annotate(
#            "Ponto importante",  # Texto da anotação
#            (highlight_x, highlight_y),  # Coordenadas do ponto
#            textcoords="offset points",  # Coordenadas relativas
#            xytext=(10, 10),  # Offset para o texto
#            ha='center',  # Alinhamento horizontal
#            fontsize=10,  # Tamanho da fonte
#            arrowprops=dict(facecolor='black', arrowstyle='->')  # Adiciona uma seta
#)


    # Título e legendas
    plt.title(f'Epoch x Loss e ap50 modelo:{modelo} lr:{lr} epochs:{epochs}', fontsize=16)

    # Adicionando as legendas
    fig.legend(loc='upper center', bbox_to_anchor=(0.5, 1.15), fontsize=12, ncol=2)

    fig.tight_layout()
    plt.show()


In [None]:
import matplotlib.pyplot as plt
import numpy as np

def imprime_grafico_epocas_losses_kfold(conjuntos_dados,modelo,lr,epochs):
    # Inicializando o gráfico
    plt.figure(figsize=(12, 8))
    menores_losses = []

    # Plotando os dados
    for k_fold, valores in conjuntos_dados.items():
        epocas = [int(e) for e in valores['epocas']]
        losses = [float(l) for l in valores['losses']]

        # Menor loss e época correspondente
        min_loss = min(losses)
        min_loss_epoca = epocas[losses.index(min_loss)]
        menores_losses.append(min_loss)

        # Linha do gráfico
        plt.plot(epocas, losses, label=f'K-Fold {k_fold}', linestyle='-', marker='o')
        # Destaque do menor loss
        plt.scatter(min_loss_epoca, min_loss, color='red', label=f'Min Loss K-Fold {k_fold}: {min_loss:.4f}', zorder=5)

    # Média dos menores losses
    media_menores = np.mean(menores_losses)

    # Adicionando a linha de média
    plt.axhline(media_menores, color='blue', linestyle='--', label=f'Average of Min Losses: {media_menores:.4f}', zorder=4)

    # Configurações do gráfico
    plt.title(f'Epoch x Loss x K-Fold modelo:{modelo} lr:{lr} epochs:{epochs}', fontsize=16)
    plt.xlabel('Époch', fontsize=14)
    plt.ylabel('Loss', fontsize=14)
    plt.grid(alpha=0.3)
    plt.legend(fontsize=10, loc='upper right', bbox_to_anchor=(1.0, 1.0))
    plt.tight_layout()

    # Exibindo o gráfico
    plt.show()




In [None]:
import matplotlib.pyplot as plt
import numpy as np

def imprime_grafico_epocas_ap50_kfold(conjuntos_dados,modelo,lr,epochs):
    # Inicializando o gráfico
    plt.figure(figsize=(12, 8))
    maiores_ap50 = []

    # Plotando os dados
    for k_fold, valores in conjuntos_dados.items():
        epocas = [int(e) for e in valores['epocas']]
        ap50 = [float(l) for l in valores['valores_ap50']]

        # Menor loss e época correspondente
        max_ap50 = max(ap50)
        max_ap50_epoca = epocas[ap50.index(max_ap50)]
        maiores_ap50.append(max_ap50)

        # Linha do gráfico
        plt.plot(epocas, ap50, label=f'K-Fold {k_fold}', linestyle='-', marker='o')
        # Destaque do maior ap50
        plt.scatter(max_ap50_epoca, max_ap50, color='red', label=f'Max ap50 K-Fold {k_fold}: {max_ap50:.4f}', zorder=5)
        # Verfica early stopping
        early_stopping_epoch = post_early_stopping_checking(ap50, patience=2, min_delta = 0.0001)
        if early_stopping_epoch >= 0:
            plt.scatter(early_stopping_epoch, ap50[early_stopping_epoch], color='purple', s=200, label='Early Stopping', edgecolor='black')

    # Média dos menores losses
    media_maiores = np.mean(maiores_ap50)

    # Adicionando a linha de média
    plt.axhline(media_maiores, color='blue', linestyle='--', label=f'Average of Max ap50: {media_maiores:.4f}', zorder=4)

    # Configurações do gráfico
    plt.title(f'Epoch x ap50 x K-Fold modelo:{modelo} lr:{lr} epochs:{epochs}', fontsize=16)
    plt.xlabel('Epoch', fontsize=14)
    plt.ylabel('ap50', fontsize=14)
    plt.grid(alpha=0.3)
    plt.legend(fontsize=10, loc='lower right', bbox_to_anchor=(1.0, 0.1))
    plt.tight_layout()

    # Exibindo o gráfico
    plt.show()

In [None]:
import os
import re

def imprime_graficos_treinamento(output_file, experimento):

    modelo = experimento['modelo']
    lr = experimento['lr']
    epochs = experimento['epochs']
    if os.path.exists(output_file):
        with open(output_file, 'r') as f:
            lines = f.readlines()

        epoca_ant = ''
        epocas = []
        losses = []
        valores_ap50 = []

        id_conjunto_dados = 0
        conjuntos_dados = {}

        for line in lines:

            pattern_epoca = r"Epoch:\s*\[(\d+)\]"
            epoca = re.search(pattern_epoca, line)
            pattern_ap50 = r"Average Precision\s+\(AP\)\s+@\[\s*IoU=0\.50\s*\|\s*area=\s+all\s+\|\s*maxDets=100\s*\]\s*=\s*([0-9.]+)"
            ap_50 = re.search(pattern_ap50,line)
            if epoca:
                if epoca.group(1) != epoca_ant:
                    if epoca_ant != '' and int(epoca.group(1)) < int(epoca_ant):
                            #str_conjunto_dados = str(id_conjunto_dados)
                            conjunto_dados = {'epocas':epocas, 'losses':losses,'valores_ap50':valores_ap50}
                            conjuntos_dados[id_conjunto_dados] = conjunto_dados
                            id_conjunto_dados += 1
                            epoca_ant = ''
                            epocas = []
                            losses = []
                            valores_ap50 = []
                    #print(epoca.group(1))
                    epocas.append(epoca.group(1))
                    epoca_ant = epoca.group(1)
                else:
                    pattern_loss = r'loss:\s([0-9.]+)'
                    loss = re.search(pattern_loss,epoca.string)
                    if loss:
                        #print(loss.group(1))
                        losses.append(loss.group(1))
            if ap_50:
                valor_ap50 = ap_50.group(1).split(' ')[-1]
                #print(valor_ap50)
                valores_ap50.append(valor_ap50)
        #print('epoch = ',epocas)
        #print('loss = ',losses)
        #print('ap50 = ',valores_ap50)
        if id_conjunto_dados == 0:
            imprime_grafico_epocas_losses_ap50(epocas,losses,valores_ap50,modelo,lr,epochs)
        else:
            epocas = conjuntos_dados[0]['epocas']
            losses = conjuntos_dados[0]['losses']
            valor_ap50 = conjuntos_dados[0]['valores_ap50']
            imprime_grafico_epocas_losses_ap50(epocas,losses,valores_ap50,modelo,lr,epochs)
            conjunto_dados = {'epocas':epocas, 'losses':losses,'valores_ap50':valores_ap50}
            conjuntos_dados[id_conjunto_dados] = conjunto_dados
            imprime_grafico_epocas_losses_kfold(conjuntos_dados,modelo,lr,epochs)
            imprime_grafico_epocas_ap50_kfold(conjuntos_dados,modelo,lr,epochs)


In [None]:
# Load the Drive helper and mount
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

In [None]:
import sys
versao_modelo='rtdetr_pytorch'

experimentos = [ {'id': '1_a' , 'modelo': 'rtdetr_r50vd_6x_coco', 'lr': '0.00125', 'epochs': '24'},
                 {'id': '1_b','modelo': 'rtdetr_r50vd_6x_coco', 'lr': '0.0001', 'epochs': '24'},
                 {'id': '1_c','modelo': 'rtdetr_r50vd_6x_coco', 'lr': '0.0001', 'epochs': '72'},
                 {'id': '1_d','modelo': 'rtdetr_r50vd_6x_coco', 'lr': '0.0001', 'epochs': '100'},
]

#logs_path = os.path.join('.','logs')
logs_path = os.path.join(os.sep,'content','drive','MyDrive','RT_DETR','logs')

for experimento in experimentos:
    print(experimento)
    id = experimento['id']
    output_file = os.path.join(logs_path, id, 'saida_'+id)
    imprime_graficos_treinamento(output_file,experimento)



In [None]:
# verificação da versão do pytorch
!pip list | grep torch

In [None]:
import os
from os.path import exists, join, basename, splitext
git_repo_url = 'https://github.com/lyuwenyu/RT-DETR.git'
project_name = splitext(basename(git_repo_url))[0]

In [None]:
%cd /content
git_repo_url = 'https://github.com/lyuwenyu/RT-DETR.git'
project_name = splitext(basename(git_repo_url))[0]
if not exists(project_name):
  !pip install torch==2.0.1
  !pip install torchvision==0.15.2
  !pip install torchaudio==2.0.2
  !pip install onnx==1.14.0
  !pip install onnxruntime==1.15.1
  !pip install pycocotools
  !pip install PyYAML
  !pip install scipy

In [None]:
print(project_name)

In [None]:
!pip list | grep torch

In [None]:
# verifica GPU
!nvidia-smi

In [None]:
import os
import subprocess
from subprocess import Popen, PIPE

In [None]:
# training on single-gpu

def avalia_modelo(config_modelo,image_set,checkpoint_path):

    arquivo_saida_2 = '/content/1_d_'+image_set+'_test.txt'
    #arquivo_saida_3 = '/content/1_d_'+image_set+'_test_onnx.txt'
    #comando_1 = f'touch {arquivo_saida_2} && touch {arquivo_saida_3}'
    comando_1 = f'touch {arquivo_saida_2}'
    comando_2 = f"""
    export CUDA_VISIBLE_DEVICES=0 &&
    python /content/drive/MyDrive/RT_DETR/RT-DETR/rtdetr_pytorch/tools/train.py -c {config_modelo} -r {checkpoint_path} --test-only | tee {arquivo_saida_2}
    """
    print(comando_2)
    #comando_3 = f"""
    #python /content/drive/MyDrive/RT_DETR/RT-DETR/rtdetr_pytorch/tools/export_onnx.py -c {config_modelo} -r {checkpoint_path} --check | tee {arquivo_saida_3}
    #"""
    !$comando_1
    !$comando_2
    #!$comando_3


In [None]:
def obtem_lista_modelos(path_configs_modelos) -> list :
    lista_modelos = []
    files = os.listdir(path_configs_modelos)
    for file in files:
        if file.startswith('rtdetr') and file.endswith('.yml'):
            lista_modelos.append(file)
    return lista_modelos

In [None]:
def copia_configs(configs_path,custom_configs_path,modelo,image_set):
    nome_modelo = modelo.split('.')[0]
    #print(nome_modelo)
    arquivo_origem = os.path.join(custom_configs_path,'optimizer.yml')
    arquivo_destino = os.path.join(configs_path,'rtdetr','include','optimizer.yml')
    comando = f'cp {arquivo_origem} {arquivo_destino}'
    #print(comando)
    subprocess.Popen(comando, shell=True)

    arquivo_origem = os.path.join(custom_configs_path,'coco_detection_'+image_set+'_test.yml')
    arquivo_destino = os.path.join(configs_path,'dataset','coco_detection.yml')
    comando = f'cp {arquivo_origem} {arquivo_destino}'
    #print(comando)
    subprocess.Popen(comando, shell=True)

    arquivo_origem = os.path.join(custom_configs_path,nome_modelo+'_'+image_set+'.yml')
    arquivo_destino = os.path.join(configs_path,'rtdetr',modelo)
    comando = f'cp {arquivo_origem} {arquivo_destino}'
    #print(comando)
    subprocess.Popen(comando, shell=True)

    return

In [None]:
from google.colab import files
import shutil

def avalia_modelos(project_path,lista_modelos,lista_image_set):
    configs_path = os.path.join(project_path,'configs')
    custom_configs_path = os.path.join(configs_path,'rtdetr','projeto_1_d')
    for modelo in lista_modelos:
        for image_set in lista_image_set:
            print('Início da avaliação do modelo ', modelo, ' para o conjunto de imagens ', image_set)
            copia_configs(configs_path=configs_path,custom_configs_path=custom_configs_path,modelo=modelo,image_set=image_set)
            config_modelo = os.path.join(configs_path,'rtdetr',modelo)
            output_path = os.path.join(os.sep,'content','output','1_d','rtdetr_r50vd_6x_coco_'+image_set)
            checkpoint_path = os.path.join('/content/drive/MyDrive/RT_DETR/RT-DETR/train/1_d',image_set,'best.pth')
            avalia_modelo(config_modelo,image_set,checkpoint_path)

            origem_copia = '/content/1_d_'+image_set+'_test.txt'
            destino_copia = os.path.join('/content/drive/MyDrive/RT_DETR/RT-DETR','eval','1_d',image_set)
            if os.path.isfile(origem_copia):
              if not os.path.exists(destino_copia):
                  os.makedirs(destino_copia)
              #files.download(origem_download)
              shutil.copy(origem_copia,destino_copia)

            origem_copia = '/content/1_d_'+image_set+'_test_onnx.txt'
            if os.path.isfile(origem_copia):
              if not os.path.exists(destino_copia):
                  os.makedirs(destino_copia)
              #files.download(origem_download)
              shutil.copy(origem_copia,destino_copia)



In [None]:
import sys
versao_modelo='rtdetr_pytorch'

project_path=os.path.join('/content/drive/MyDrive/RT_DETR/RT-DETR',versao_modelo)
sys.path.append(project_path)
path_config_modelos = os.path.join(project_path,'configs','rtdetr')
#print(path_config_modelos)
#lista_modelos = obtem_lista_modelos(path_config_modelos)
#lista_modelos = ['rtdetr_r50vd_6x_coco.yml','rtdetr_r34vd_6x_coco.yml']
lista_modelos = ['rtdetr_r50vd_6x_coco.yml']
#print(lista_modelos)
dataset_path = os.path.join(os.sep,'content','drive','MyDrive','RT_DETR','RT-DETR','dataset')
img_list_path = os.path.join(dataset_path,'img_list')
#img_list_path = '/content/drive/MyDrive/RT_DETR/dataset/img_list'
#lista_image_set = os.listdir(img_list_path)
#lista_image_set = ['0','1','2','3','4']
lista_image_set = ['0','1','2','3','4']
lista_image_set.sort()
avalia_modelos(project_path,lista_modelos,lista_image_set)

In [None]:
import matplotlib.pyplot as plt

# Dados de AP50
ap50_values = [0.6370, 0.6470, 0.6820, 0.6410, 0.6460]

# Criando o box-plot
plt.figure(figsize=(8, 6))  # Ajusta o tamanho da figura
plt.boxplot(ap50_values, labels=["RT-DETR"])
#plt.title("Distribuição dos Valores de AP50 para RT-DETR")
plt.ylabel("AP50")
plt.show()


In [None]:
def inferencia(config_modelo,image_path,checkpoint_path):

    #arquivo_saida = '/content/1_d_'+image_set+'_infer.txt'
    #arquivo_saida_3 = '/content/1_d_'+image_set+'_test_onnx.txt'
    #comando_1 = f'touch {arquivo_saida_2} && touch {arquivo_saida_3}'
    #comando_1 = f'touch {arquivo_saida}'
    comando_2 = f"""
    export CUDA_VISIBLE_DEVICES=0 &&
    python /content/drive/MyDrive/RT_DETR/RT-DETR/rtdetr_pytorch/tools/infer.py -c {config_modelo} -r {checkpoint_path} -f {image_path}  -nc 25
    """
    print(comando_2)
    #!$comando_1
    !$comando_2


In [None]:
import os
modelo = 'rtdetr_r50vd_6x_coco.yml'
versao_modelo='rtdetr_pytorch'

image_set = '3'
project_path=os.path.join('/content/drive/MyDrive/RT_DETR/RT-DETR',versao_modelo)
checkpoint_path = os.path.join('/content/drive/MyDrive/RT_DETR/RT-DETR/train/1_c',image_set,'best.pth')
configs_path = os.path.join(project_path,'configs')
config_modelo = os.path.join(configs_path,'rtdetr',modelo)
image_path = '/content/0.png'
inferencia(config_modelo,image_path,checkpoint_path)