<span style="font-size:30px;">Projeto Final Machine Learning - Tutorial de indentificação de imagens</span>

<span style="font-size:25px;">Introdução</span>

O reconhecimento de imagem é uma área da inteligência artificial que permite que computadores "vejam" e interpretem o conteúdo visual de imagens. Uma aplicação prática disso é o reconhecimento de objetos em imagens, identificação de rostos, entre muitos outros cenários. Neste tutorial, vamos explorar como realizar reconhecimento de imagem em Python, utilizando a biblioteca Pytorch.

<span style="font-size:25px;">Introdução às bibliotecas</span>

Como dito anteriormente, nesse tutorial utilizaremos a biblioteca Pytorch, uma biblioteca de aprendizado de máquina de código aberto para Python, desenvolvida principalmente pelo Facebook's AI Research lab (FAIR). Ele é usado para desenvolver aplicativos de aprendizado de máquina e deep learning.  Para utilizar a biblioteca, é primeiramente necessário importá-la, assim como outras bibliotecas que serão utilizadas nesse tutorial

In [488]:
import torch
from torchvision import transforms
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader

import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

from torchvision import models
import torch.nn as nn
import torch.optim as optim

<span style="font-size:25px;">Problema Exemplo</span>

Para facilitar o entendimento, vamos considerar a seguinte situação como um exemplo. Suponha que um fazendeiro quer descobrir se suas plantas estão doentes. Como a identificação das doenças pode ser difícil se realizada a olho nu, ele decidiu implementar um modelo que recebe imagens das plantas e as classificas como saudáveis ou doentes. O conjunto de imagens utilizado para esse exemplo foi obtido no Kaggle, segundo o link https://www.kaggle.com/datasets/csafrit2/plant-leaves-for-image-classification/

<span style="font-size:25px;">Preparação de Dados</span>

O primeiro passo na montagem do modelo é o carregamento dos dados. Normalmente o conjunto de dados deveria ser dividido em conjuntos de teste e treino. Entretanto, como os dados citados acima já vem separados, usaremos os dados já separados.

In [489]:
#Caminho para o diretório do conjunto de treino
diretorio_treino = 'Plants_2/train'
#Caminho para o diretório do conjunto de teste
diretorio_teste = 'Plants_2/test'

Como o pytorch trabalha com tensores, a estrutura de dados básica usada por esses frameworks para representar dados, é necessário converter as imagens para tensores.

In [490]:

largura = 250
altura = 250

#esse transformador ira alterar o tamanho das imagens e então convertelas para tensores
transformador = transforms.Compose([
    transforms.Resize((largura, altura)), 
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])

# Transformação do dataset e leitura de dados
dataset_treino = ImageFolder(root=diretorio_treino, transform=transformador)
dataset_teste = ImageFolder(root=diretorio_teste, transform=transformador)


Nessa aplicação estamos mudando a largura e a altura das imagens, para economizar memoria (uma vez que as fotos já vêm na mesma dimensão), e normalizando. Foi aplicada uma normalização pois foi utilizado um modelo já treinado para a classificação. 

Para a leitura de dados, foi utilizado o comando ImageFolder, uma função do pytorch que facilita o carregamento de  conjuntos de dados de imagens quando os dados seguem uma estrutura específica de pastas. Como no dataset utilizado cada subpasta contém imagens de uma classe específica, essa função pode ser utilizada na leitura dos dados. Ele também atribui classes automaticamente segundo as pastas. Podemos verificar as classes com o comando abaixo. 

In [491]:
print(dataset_treino.classes)
dataset_treino[0]

['Alstonia Scholaris diseased (P2a)', 'Alstonia Scholaris healthy (P2b)', 'Arjun diseased (P1a)', 'Arjun healthy (P1b)', 'Bael diseased (P4b)', 'Basil healthy (P8)', 'Chinar diseased (P11b)', 'Chinar healthy (P11a)', 'Gauva diseased (P3b)', 'Gauva healthy (P3a)', 'Jamun diseased (P5b)', 'Jamun healthy (P5a)', 'Jatropha diseased (P6b)', 'Jatropha healthy (P6a)', 'Lemon diseased (P10b)', 'Lemon healthy (P10a)', 'Mango diseased (P0b)', 'Mango healthy (P0a)', 'Pomegranate diseased (P9b)', 'Pomegranate healthy (P9a)', 'Pongamia Pinnata diseased (P7b)', 'Pongamia Pinnata healthy (P7a)']


(tensor([[[-0.6392, -0.6314, -0.6235,  ..., -0.7020, -0.7098, -0.7176],
          [-0.6157, -0.6157, -0.6078,  ..., -0.7098, -0.7098, -0.7098],
          [-0.6235, -0.6157, -0.6078,  ..., -0.7020, -0.7020, -0.7020],
          ...,
          [-0.7804, -0.7725, -0.7725,  ..., -0.8118, -0.8118, -0.8118],
          [-0.7804, -0.7725, -0.7647,  ..., -0.8196, -0.8196, -0.8118],
          [-0.7725, -0.7647, -0.7569,  ..., -0.8196, -0.8196, -0.8196]],
 
         [[-0.5608, -0.5529, -0.5451,  ..., -0.6000, -0.6078, -0.6157],
          [-0.5373, -0.5373, -0.5294,  ..., -0.6078, -0.6078, -0.6078],
          [-0.5451, -0.5373, -0.5294,  ..., -0.6000, -0.6078, -0.6078],
          ...,
          [-0.7804, -0.7725, -0.7725,  ..., -0.8118, -0.8118, -0.8196],
          [-0.7804, -0.7725, -0.7647,  ..., -0.8196, -0.8118, -0.8196],
          [-0.7725, -0.7647, -0.7647,  ..., -0.8275, -0.8275, -0.8275]],
 
         [[-0.4824, -0.4745, -0.4667,  ..., -0.4667, -0.4745, -0.4824],
          [-0.4588, -0.4588,

Para simplificar a análise, vamos agrupar as diferentes plantas em grupos de saldáveis ou doentes. Essas são as classes que serão agrupadas.


In [492]:

# Selecionar classes para serem agrupadas
doentes = ['Alstonia Scholaris diseased (P2a)', 'Arjun diseased (P1a)', 'Bael diseased (P4b)',
           'Chinar diseased (P11b)', 'Gauva diseased (P3b)', 'Jamun diseased (P5b)', 'Jatropha diseased (P6b)',
           'Lemon diseased (P10b)', 'Mango diseased (P0b)', 'Pomegranate diseased (P9b)',
           'Pongamia Pinnata diseased (P7b)']

saudaveis = ['Alstonia Scholaris healthy (P2b)', 'Arjun healthy (P1b)', 'Basil healthy (P8)', 'Chinar healthy (P11a)',
              'Gauva healthy (P3a)', 'Jamun healthy (P5a)', 'Jatropha healthy (P6a)', 'Lemon healthy (P10a)', 'Mango healthy (P0a)',
                'Pomegranate healthy (P9a)', 'Pongamia Pinnata healthy (P7a)']

Agrupando as classes

Treino Doentes

In [493]:
#Para evitar acidentes, vou clonar o conjunto de treino e teste
treino = dataset_treino

#nome da nova classe
nova_classe_doentes = 'Doentes'

df = pd.DataFrame(treino.imgs, columns=['caminho', 'rotulo'])

#verifica se classe ja esta no dataset
if nova_classe_doentes not in treino.classes:
    treino.classes.append(nova_classe_doentes)

nova_classe_idx = len(treino.classes)


novas_amostras = []

# Itera pelas classes na lista e adiciona os elementos à lista criada anteriormente
for caminho, rotulo in treino.imgs:
    nome_classe = treino.classes[rotulo]
    novo_rotulo = nova_classe_idx if nome_classe in doentes else rotulo
    novas_amostras.append((caminho, novo_rotulo))

# Atualiza as amostras no conjunto de treino
treino.imgs = novas_amostras

# Atualiza o DataFrame
df['rotulo'] = [rotulo for (_, rotulo) in treino.imgs]


numero_amostras_doentes_treino = df[df['rotulo'] == nova_classe_idx].shape[0]
print(f'O número de amostras na classe "Doentes" é: {numero_amostras_doentes_treino}')


O número de amostras na classe "Doentes" é: 2111


Treino Saudáveis

In [494]:
#nome da nova classe
nova_classe_saudaveis = 'Saudaveis'


#verifica se classe ja esta no dataset
if nova_classe_saudaveis not in treino.classes:
    treino.classes.append(nova_classe_saudaveis)

nova_classe_idx = len(treino.classes)


novas_amostras = []

# Itera pelas classes na lista e adiciona os elementos à lista criada anteriormente
for caminho, rotulo in treino.imgs:
    nome_classe = treino.classes[rotulo]
    novo_rotulo = nova_classe_idx if nome_classe in saudaveis else rotulo
    novas_amostras.append((caminho, novo_rotulo))

# Atualiza as amostras no conjunto de treino
treino.imgs = novas_amostras

# Atualiza o DataFrame
df['rotulo'] = [rotulo for (_, rotulo) in treino.imgs]


numero_amostras_saudaveis_treino = df[df['rotulo'] == nova_classe_idx].shape[0]
print(f'O número de amostras na classe "Saudaveis" é: {numero_amostras_saudaveis_treino}')

O número de amostras na classe "Saudaveis" é: 2163


Para verificar se todos os dados foram unidos, podemos verificar quantos dados haviam em cada classe e quantos hão na nova classe

In [495]:
# Numero de amostras originais
numero_amostras_classes_originais = df.shape[0]

print(f'O número de amostras nas classes originais é: {numero_amostras_classes_originais}' )
print(f'O número de amostras saudaveis é: {numero_amostras_saudaveis_treino}' )
print(f'O número de amostras doentes é: {numero_amostras_doentes_treino}' )
print(f'O número de amostras somadas nas novas classes são: {numero_amostras_doentes_treino+numero_amostras_saudaveis_treino}')

O número de amostras nas classes originais é: 4274
O número de amostras saudaveis é: 2163
O número de amostras doentes é: 2111
O número de amostras somadas nas novas classes são: 4274


Podemos então aplicar as mesmas transformações no conjunto de testes.

Teste Doentes

In [496]:
#Para evitar acidentes, vou clonar o conjunto de treino e teste
teste = dataset_teste

#nome da nova classe
nova_classe_doentes = 'Doentes'

df = pd.DataFrame(teste.imgs, columns=['caminho', 'rotulo'])

#verifica se classe ja esta no dataset
if nova_classe_doentes not in teste.classes:
    teste.classes.append(nova_classe_doentes)

nova_classe_idx = len(teste.classes)


novas_amostras = []

# Itera pelas classes na lista e adiciona os elementos à lista criada anteriormente
for caminho, rotulo in teste.imgs:
    nome_classe = teste.classes[rotulo]
    novo_rotulo = nova_classe_idx if nome_classe in doentes else rotulo
    novas_amostras.append((caminho, novo_rotulo))

# Atualiza as amostras no conjunto de treino
teste.imgs = novas_amostras

# Atualiza o DataFrame
df['rotulo'] = [rotulo for (_, rotulo) in teste.imgs]


numero_amostras_doentes_teste = df[df['rotulo'] == nova_classe_idx].shape[0]
print(f'O número de amostras na classe "Doentes" é: {numero_amostras_doentes_teste}')

O número de amostras na classe "Doentes" é: 55


Teste Saudáveis

In [497]:
#nome da nova classe
nova_classe_saudaveis = 'Saudaveis'


#verifica se classe ja esta no dataset
if nova_classe_saudaveis not in teste.classes:
    teste.classes.append(nova_classe_saudaveis)

nova_classe_idx = len(teste.classes)


novas_amostras = []

# Itera pelas classes na lista e adiciona os elementos à lista criada anteriormente
for caminho, rotulo in teste.imgs:
    nome_classe = teste.classes[rotulo]
    novo_rotulo = nova_classe_idx if nome_classe in saudaveis else rotulo
    novas_amostras.append((caminho, novo_rotulo))

# Atualiza as amostras no conjunto de treino
teste.imgs = novas_amostras

# Atualiza o DataFrame
df['rotulo'] = [rotulo for (_, rotulo) in teste.imgs]


numero_amostras_saudaveis_teste = df[df['rotulo'] == nova_classe_idx].shape[0]
print(f'O número de amostras na classe "Saudaveis" é: {numero_amostras_saudaveis_teste}')

O número de amostras na classe "Saudaveis" é: 55


Novamente verificando se todos os dados foram unidos corretamente

In [498]:
# Numero de amostras originais
numero_amostras_classes_originais = df.shape[0]

print(f'O número de amostras nas classes originais é: {numero_amostras_classes_originais}' )
print(f'O número de amostras saudaveis é: {numero_amostras_saudaveis_teste}' )
print(f'O número de amostras doentes é: {numero_amostras_doentes_teste}' )
print(f'O número de amostras somadas nas novas classes são: {numero_amostras_doentes_teste+numero_amostras_saudaveis_teste}')

O número de amostras nas classes originais é: 110
O número de amostras saudaveis é: 55
O número de amostras doentes é: 55
O número de amostras somadas nas novas classes são: 110


Verificando então como ficou o conjunto de treino:

In [499]:
classes_no_dataframe = df['rotulo'].unique()
print(classes_no_dataframe)
treino.classes

[23 24]


['Alstonia Scholaris diseased (P2a)',
 'Alstonia Scholaris healthy (P2b)',
 'Arjun diseased (P1a)',
 'Arjun healthy (P1b)',
 'Bael diseased (P4b)',
 'Basil healthy (P8)',
 'Chinar diseased (P11b)',
 'Chinar healthy (P11a)',
 'Gauva diseased (P3b)',
 'Gauva healthy (P3a)',
 'Jamun diseased (P5b)',
 'Jamun healthy (P5a)',
 'Jatropha diseased (P6b)',
 'Jatropha healthy (P6a)',
 'Lemon diseased (P10b)',
 'Lemon healthy (P10a)',
 'Mango diseased (P0b)',
 'Mango healthy (P0a)',
 'Pomegranate diseased (P9b)',
 'Pomegranate healthy (P9a)',
 'Pongamia Pinnata diseased (P7b)',
 'Pongamia Pinnata healthy (P7a)',
 'Doentes',
 'Saudaveis']


Ou seja, somente as classes 'Doentes' e 'Saudáveis' possuem dados.

<span style="font-size:25px;">Treinamento do modelo de Classificação</span>

O PyTorch fornece alguns modelos pré-treinados, como ResNet, VGG, entre outros. A vantagem de se utilizar um modelo pré-treinado são diversas, como: a menor capacidade computacional necessária para treinar um modelo; a menor quantidade de dados necessários; a atuação dos modelos pré-treinados como um tipo de regularização; entre outras. Nessa aplicação, usaremos o ResNet.

In [500]:
#modelo escolhido
modelo = models.resnet18(pretrained=True)

Como esse modelo foi pre-treinado, podemos alterar sua última camada para que o modelo tenha o número correto de saídas. Ou seja,  podemos adaptar o modelo para trabalhar com nosso conjunto de dados.

In [501]:
modelo.fc = nn.Linear(modelo.fc.in_features, len(treino.classes)) #camada fully connected

Para definirmos o que queremos que o modelo faça, temos que definir sua função de perda. A função de perda quantifica quão bem o modelo está performando, sendo o objetivo do treinamento minimizar essa função de perda. Dependendo de qual a função de perda escolhida, o objetivo do modelo durante seu treino também muda. Como queremos que o modelo realize uma classificação com somente duas classes, podemos utilizar o HingeLoss.

In [502]:
# Definição da Hinge Loss
criterio = nn.MultiMarginLoss(margin=1.0)

O parâmetro 'margin' do HingeLoss representa a relação entre a pontuação da classe que o modelo acha que é correta com a que ele acha que é incorreta. Caso essa relação seja menor que 1, sua perda não será mais nula. Com a relação já definida, é necessário implementar um otimizador, pois ele é o responsável por ajustar os pesos do modelo com base na função de perda. O otimizador escolhido foi o Adam, mas a escolha depende da realização de testes, e varia para cada modelo.

In [503]:
#definição do otimizador
otimizador = optim.Adam(modelo.parameters(), lr=0.001)

<span style="font-size:25px;">Aplicação e Avaliação do Modelo</span>

Com os parâmetros já definidos, podemos então implementar o modelo. No pytorch, a implementação do modelo se resume a uma iteração do modelo sobre os dados de treino. O modelo classifica cada imagem do conjunto, verifica a perda e com base nisso recalcula os pesos, repetindo o processo pelo número de repetições definidas (epochs). Como meu computador não é muito moderno, decidi colocar uns checkpoints onde o código pode quebrar caso o tempo ultrapasse o limite definido e os valores do modelo estarão salvos. Também é possivel diminuir o número de epochs, o tamanho das imagens, a margin, entre outros. Entretanto, isso irá diminuir a acurácia do modelo ainda mais. Outra solução é o uso da GPU.

In [504]:
import os
import time
import torch
from torch.utils.data import DataLoader

loader_treino = DataLoader(treino, batch_size=64, shuffle=True) #iterador que escolhe 64 fotos aleatoriamente a cada epoch
loader_teste = DataLoader(teste, batch_size=64, shuffle=True) #iterador que escolhe 64 fotos aleatoriamente a cada epoch

repeticoes = 1
save_interval = 1
tempo_max = 7 * 60 * 60  # 7 horas
start_time = time.time()
tempo_limite = 0

for epoch in range(repeticoes):
    for imagens, rotulos in loader_treino:
        if tempo_limite == 0:
            elapsed_time = time.time() - start_time
            otimizador.zero_grad()
            saidas = modelo(imagens)
            perda = criterio(saidas, rotulos)
            perda.backward()
            otimizador.step()

            if epoch % save_interval == 0:
                checkpoint = {
                    'epoch': epoch,
                    'model_state_dict': modelo.state_dict(),
                    'optimizer_state_dict': otimizador.state_dict(),
                }
                torch.save(checkpoint, 'checkpoint.pth')
                print('salvo')
                if os.path.exists('checkpoint.pth'):
                    print(f"Checkpoint salvo com sucesso: {'checkpoint.pth'}")
                else:
                    print(f"Falha ao salvar o checkpoint: {'checkpoint.pth'}")

            if elapsed_time > tempo_max:
                print(f"Treinamento interrompido. Tempo máximo atingido ({tempo_max} segundos).")

                # Salvar último checkpoint antes de sair
                checkpoint = {
                    'epoch': epoch,
                    'model_state_dict': modelo.state_dict(),
                    'optimizer_state_dict': otimizador.state_dict(),
                    # Adicione mais informações relevantes se necessário
                }
                torch.save(checkpoint, 'final_checkpoint.pth')

                break
        # Avaliação no conjunto de validação
        # Adicione o código de avaliação aqui se necessário

        modelo.train()



salvo
Checkpoint salvo com sucesso: checkpoint.pth
salvo
Checkpoint salvo com sucesso: checkpoint.pth
salvo
Checkpoint salvo com sucesso: checkpoint.pth
salvo
Checkpoint salvo com sucesso: checkpoint.pth
salvo
Checkpoint salvo com sucesso: checkpoint.pth
salvo
Checkpoint salvo com sucesso: checkpoint.pth
salvo
Checkpoint salvo com sucesso: checkpoint.pth
salvo
Checkpoint salvo com sucesso: checkpoint.pth
salvo
Checkpoint salvo com sucesso: checkpoint.pth
salvo
Checkpoint salvo com sucesso: checkpoint.pth
salvo
Checkpoint salvo com sucesso: checkpoint.pth
salvo
Checkpoint salvo com sucesso: checkpoint.pth
salvo
Checkpoint salvo com sucesso: checkpoint.pth
salvo
Checkpoint salvo com sucesso: checkpoint.pth
salvo
Checkpoint salvo com sucesso: checkpoint.pth
salvo
Checkpoint salvo com sucesso: checkpoint.pth
salvo
Checkpoint salvo com sucesso: checkpoint.pth
salvo
Checkpoint salvo com sucesso: checkpoint.pth
salvo
Checkpoint salvo com sucesso: checkpoint.pth
salvo
Checkpoint salvo com suce

Caso o modelo teve seu treinamento interrompido antes dele terminar, temos que pegar seus parâmetros do checkpoint para poder avaliar seu rendimento. 

In [507]:

from sklearn.metrics import accuracy_score

checkpoint_path = 'checkpoint.pth'
checkpoint = torch.load(checkpoint_path)
modelo.load_state_dict(checkpoint['model_state_dict'])
modelo.eval()


ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
  

Podemos então media a acurácia do modelo. Caso ele tenha tido seu treinamento interrompido, sua acurácia provavelmente não será ideal.

In [508]:
true_labels = []
predicted_labels = []

with torch.no_grad():
    for imagens_val, rotulos_val in loader_teste:
        # Passar os dados pelo modelo
        saidas_val = modelo(imagens_val)

        # Calcular as previsões
        _, previsoes = torch.max(saidas_val, 1)

        # Armazenar rótulos verdadeiros e previsões
        true_labels.extend(rotulos_val.numpy())
        predicted_labels.extend(previsoes.numpy())

# Converter listas em arrays numpy para uso posterior
true_labels = np.array(true_labels)
predicted_labels = np.array(predicted_labels)

# Calcular e imprimir a acurácia
acuracia = accuracy_score(true_labels, predicted_labels)
print(f'Acurácia: {acuracia}')

Acurácia: 0.8


Como é possivel observar, a acurácia obtida foi baixa, menor do que se você chutasse sempre que a planta é doente. Para melhorar isso, seria possivel aumentar o número de repetições, aumentar o tamanho das imagens, deixar o modelo terminar de treinar, no meu caso, entre outros.

OBS. Quando o codigo terminou de rodar 1 epoch, a acurácia foi de 0.8. Um valor bom e bem maior do que o coneseguido anteriormente,