In [42]:
import os
import torch
from torchvision import datasets, transforms, models
from torchvision.utils import save_image
from torch.utils.data import DataLoader, Subset
from sklearn.model_selection import train_test_split
import torch.nn as nn
from tqdm import tqdm
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, classification_report, precision_score, recall_score
from torch.nn.functional import relu

In [3]:
unlabeled_set_size = 0.8
labeled_train_absolute_set_size = 0.1
labeled_test_absolute_set_size = round(1 - (labeled_train_absolute_set_size + unlabeled_set_size), 2)

labeled_train_relative_set_size = round((labeled_train_absolute_set_size / (1 - unlabeled_set_size)), 2)
labeled_test_relative_set_size = 1 - labeled_train_relative_set_size

In [4]:
base_dir = os.path.join('Plant_leave_diseases_dataset', 'original')
os.makedirs('best_models', exist_ok=True)
model_save_path = \
    os.path.join('best_models', f'h1_{int(unlabeled_set_size*100)}-{int(labeled_train_absolute_set_size*100)}-{int(labeled_test_absolute_set_size*100)}_UNet.pth')

## U-Net Module

In [5]:
class Encoder(nn.Module):
  def __init__(self):
    super(Encoder, self).__init__()
    # Encoder
    # In the encoder, convolutional layers with the Conv2d function are used to extract features from the input image. 
    # Each block in the encoder consists of two convolutional layers followed by a max-pooling layer, with the exception of the last block which does not include a max-pooling layer.
    # -------
    # input: 572x572x3
    self.e11 = nn.Conv2d(3, 64, kernel_size=3, padding=1) # output: 570x570x64
    self.e12 = nn.Conv2d(64, 64, kernel_size=3, padding=1) # output: 568x568x64
    self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2) # output: 284x284x64

    # input: 284x284x64
    self.e21 = nn.Conv2d(64, 128, kernel_size=3, padding=1) # output: 282x282x128
    self.e22 = nn.Conv2d(128, 128, kernel_size=3, padding=1) # output: 280x280x128
    self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2) # output: 140x140x128

    # input: 140x140x128
    self.e31 = nn.Conv2d(128, 256, kernel_size=3, padding=1) # output: 138x138x256
    self.e32 = nn.Conv2d(256, 256, kernel_size=3, padding=1) # output: 136x136x256
    self.pool3 = nn.MaxPool2d(kernel_size=2, stride=2) # output: 68x68x256

    # input: 68x68x256
    self.e41 = nn.Conv2d(256, 512, kernel_size=3, padding=1) # output: 66x66x512
    self.e42 = nn.Conv2d(512, 512, kernel_size=3, padding=1) # output: 64x64x512
    self.pool4 = nn.MaxPool2d(kernel_size=2, stride=2) # output: 32x32x512

    # input: 32x32x512
    self.e51 = nn.Conv2d(512, 1024, kernel_size=3, padding=1) # output: 30x30x1024
    self.e52 = nn.Conv2d(1024, 1024, kernel_size=3, padding=1) # output: 28x28x1024


  def forward(self, x):
    # Encoder
    xe11 = relu(self.e11(x))
    xe12 = relu(self.e12(xe11))
    xp1 = self.pool1(xe12)

    xe21 = relu(self.e21(xp1))
    xe22 = relu(self.e22(xe21))
    xp2 = self.pool2(xe22)

    xe31 = relu(self.e31(xp2))
    xe32 = relu(self.e32(xe31))
    xp3 = self.pool3(xe32)

    xe41 = relu(self.e41(xp3))
    xe42 = relu(self.e42(xe41))
    xp4 = self.pool4(xe42)

    xe51 = relu(self.e51(xp4))
    xe52 = relu(self.e52(xe51))

    return xe12, xe22, xe32, xe42, xe52


In [6]:
class Decoder(nn.Module):
  def __init__(self):
    super(Decoder, self).__init__()
    # Decoder
    self.upconv1 = nn.ConvTranspose2d(1024, 512, kernel_size=2, stride=2)
    self.d11 = nn.Conv2d(1024, 512, kernel_size=3, padding=1)
    self.d12 = nn.Conv2d(512, 512, kernel_size=3, padding=1)

    self.upconv2 = nn.ConvTranspose2d(512, 256, kernel_size=2, stride=2)
    self.d21 = nn.Conv2d(512, 256, kernel_size=3, padding=1)
    self.d22 = nn.Conv2d(256, 256, kernel_size=3, padding=1)

    self.upconv3 = nn.ConvTranspose2d(256, 128, kernel_size=2, stride=2)
    self.d31 = nn.Conv2d(256, 128, kernel_size=3, padding=1)
    self.d32 = nn.Conv2d(128, 128, kernel_size=3, padding=1)

    self.upconv4 = nn.ConvTranspose2d(128, 64, kernel_size=2, stride=2)
    self.d41 = nn.Conv2d(128, 64, kernel_size=3, padding=1)
    self.d42 = nn.Conv2d(64, 64, kernel_size=3, padding=1)

    # Output layer
    self.outconv = nn.Conv2d(64, 3, kernel_size=1)  # 3 channels


  def forward(self, xe12, xe22, xe32, xe42, xe52):
    # Decoder
    xu1 = self.upconv1(xe52)
    xu11 = torch.cat([xu1, xe42], dim=1)
    xd11 = relu(self.d11(xu11))
    xd12 = relu(self.d12(xd11))

    xu2 = self.upconv2(xd12)
    xu22 = torch.cat([xu2, xe32], dim=1)
    xd21 = relu(self.d21(xu22))
    xd22 = relu(self.d22(xd21))

    xu3 = self.upconv3(xd22)
    xu33 = torch.cat([xu3, xe22], dim=1)
    xd31 = relu(self.d31(xu33))
    xd32 = relu(self.d32(xd31))

    xu4 = self.upconv4(xd32)
    xu44 = torch.cat([xu4, xe12], dim=1)
    xd41 = relu(self.d41(xu44))
    xd42 = relu(self.d42(xd41))

    # Output layer
    out = self.outconv(xd42)

    return out

In [7]:
class UNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.encoder = Encoder()
        self.decoder = Decoder()


    def forward(self, x):
        [xe12, xe22, xe32, xe42, xe52] = self.encoder(x)
        decoded = self.decoder(xe12, xe22, xe32, xe42, xe52)
        return decoded

## Datasets and Dataloaders

In [8]:
data_transforms = {
    'all': transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize([0.5], [0.5])
    ]),
}

In [9]:
full_dataset = datasets.ImageFolder(base_dir, transform=data_transforms['all'])

In [10]:
indices = list(range(len(full_dataset)))

# Get the directory paths of images
image_paths = [sample[0] for sample in full_dataset.samples]

labels = [os.path.split(os.path.dirname(path))[-1] for path in image_paths]

In [11]:
#Obtenemos el 80% de los datos 
_, train_val_indices = train_test_split(indices, test_size=unlabeled_set_size, stratify=labels, random_state=42)


train_dataset = Subset(full_dataset, train_val_indices)
val_dataset = Subset(full_dataset, train_val_indices)

In [20]:
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True, num_workers=4)
val_loader = DataLoader(val_dataset, batch_size=4, shuffle=False, num_workers=4)

print(f"Número de imágenes en el conjunto de entrenamiento: {len(train_loader.dataset)}")
print(f"Número de imágenes en el conjunto de validación: {len(val_loader.dataset)}")

Número de imágenes en el conjunto de entrenamiento: 49189
Número de imágenes en el conjunto de validación: 49189


## Function definitions

In [16]:
def plot_confusion_matrix(cm, class_names):
    figure = plt.figure(figsize=(8, 8))
    sns.heatmap(cm, annot=True, cmap=plt.cm.Blues, fmt='g', xticklabels=class_names, yticklabels=class_names)
    plt.xlabel('Predicted label')
    plt.ylabel('True label')
    plt.title('Confusion Matrix')
    return figure

In [23]:
def train_model(model, criterion, optimizer, train_loader, val_loader, device, num_epochs=10, patience=3):
    best_val_loss = float('inf')
    epochs_no_improve = 0
    
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        
        for inputs, _ in tqdm(train_loader):
            inputs = inputs.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, inputs)
            loss.backward()
            optimizer.step()
            
            running_loss += loss.item()
        
        train_loss = running_loss / len(train_loader)
        
        model.eval()
        val_loss = 0.0
        
        with torch.no_grad():
            for inputs, _ in val_loader:
                inputs = inputs.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, inputs)
                val_loss += loss.item()
        
        val_loss = val_loss / len(val_loader)
        
        print(f'Epoch {epoch+1}/{num_epochs}')
        print(f'Train Loss: {train_loss:.4f}')
        print(f'Val Loss: {val_loss:.4f}')
        
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            epochs_no_improve = 0
            torch.save(model.state_dict(), model_save_path)
        else:
            epochs_no_improve += 1
        if epochs_no_improve >= patience:
            print("Early stopping triggered!")
            break
    
    return model

In [51]:
def evaluate_model(model_path, dataloader, device):
    model = UNet()
    model.load_state_dict(torch.load(model_path, map_location=device))
    model = model.to(device)
    
    model.eval()
    
    with torch.no_grad():
        for inputs, _ in dataloader:
            inputs = inputs.to(device)
            outputs = model(inputs)
            inputs[0] = inputs[0] * 0.5 + 0.5
            save_image(inputs[0], 'input.jpg')
            outputs[0] = outputs[0] * 0.5 + 0.5
            save_image(outputs[0], 'output.jpg')
            break

## U-Net Autoencoder

In [24]:
# Set up of the device
if torch.backends.mps.is_available():
    device = torch.device("mps")#tqm
elif torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

print(f"Using device: {device}")

model = UNet()
model.to(device)

criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

Using device: cuda


In [None]:
trained_model = train_model(model, criterion, optimizer, train_loader, val_loader, device, num_epochs=30, patience=5)

In [50]:
image_paths = [full_dataset.samples[index] for index in train_val_indices]

for path in image_paths:
    print(path)
    break

('Plant_leave_diseases_dataset\\original\\Tomato___healthy\\image (106).JPG', 38)


In [52]:
evaluate_model(model_save_path, val_loader, device=device)