In [19]:
import os, cv2, io
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import torch, torchvision

from torch.utils.data import Dataset
from torch.nn.utils.rnn import pad_sequence
from torchvision.ops import box_convert
from cv2 import imread, imshow

In [20]:
# Create new tensors on the CPU unless a device is given explicitly.
torch.set_default_device('cpu')
# NOTE(review): this rebinds the Python-level attribute `torch.default_generator`
# to a fresh CPU generator; presumably a workaround for a generator/device
# mismatch when shuffling in a DataLoader — confirm it is still needed now that
# the default device is the CPU.
torch.default_generator = torch.Generator(device='cpu')

In [21]:
class TomatoSet(Dataset):
    """In-memory dataset of images with per-image class labels and boxes.

    Every image in ``img_dir`` is paired with the annotation file in
    ``annotation_dir`` that has the same file stem.  Each non-blank annotation
    line is ``<class> <cx> <cy> <w> <h>`` with box values that are multiplied
    by ``img_size`` and converted from ``cxcywh`` to ``xyxy``.

    Args:
        annotation_dir: Directory holding one annotation text file per image.
        img_dir: Directory holding the image files.
        img_size: Factor the box values are multiplied by (default 640, the
            value that used to be hard-coded).

    Attributes:
        all_images: list of ``(C, H, W)`` uint8 tensors in RGB channel order.
        all_labels: float tensor ``(N, K)`` of class ids, padded with -1.
        all_bboxes: float tensor ``(N, K, 4)`` of xyxy boxes, padded with -1.
    """

    def __init__(self, annotation_dir, img_dir, img_size=640):
        self.annotation_dir = annotation_dir
        self.img_dir = img_dir
        self.img_size = img_size
        self.all_images, self.all_labels, self.all_bboxes = self.get_data()

    def __len__(self):
        """Return the number of loaded images."""
        return len(self.all_images)

    def __getitem__(self, i):
        """Return ``(image, labels, boxes)`` for sample ``i``; labels are int64."""
        image, label, bbox = self.all_images[i], self.all_labels[i], self.all_bboxes[i]
        return image, label.long(), bbox

    @staticmethod
    def _parse_annotation_lines(lines, img_size):
        """Parse annotation lines into ``(labels, boxes)``.

        Returns a list of int class ids and a float32 array of shape ``(k, 4)``
        holding the ``cx, cy, w, h`` values multiplied by ``img_size``.
        Blank lines are ignored; any other line must have exactly five fields.
        """
        labels, boxes = [], []
        for line in lines:
            fields = line.split()  # split() also strips the trailing newline
            if not fields:
                continue
            if len(fields) != 5:
                raise ValueError(
                    f"Expected '<class> <cx> <cy> <w> <h>', got: {line!r}"
                )
            labels.append(int(fields[0]))
            boxes.append([float(value) for value in fields[1:]])
        # reshape(-1, 4) keeps a well-formed (0, 4) array for empty files.
        scaled = np.asarray(boxes, dtype=np.float32).reshape(-1, 4) * np.float32(img_size)
        return labels, scaled

    def get_data(self):
        """Load every image together with its annotations.

        Returns:
            ``(all_images, all_labels_padded, all_bboxes_padded)`` as described
            in the class docstring.

        Raises:
            ValueError: if an image cannot be decoded, an annotation line is
                malformed, or ``img_dir`` contains no images.
            FileNotFoundError: if an image has no annotation file with the
                same stem.
        """
        all_images = []
        all_labels = []
        all_bboxes = []

        # Index annotation files by stem so each image is matched to its own
        # labels instead of relying on two independent directory listings
        # happening to come back in the same order.
        annotation_by_stem = {
            os.path.splitext(name)[0]: name
            for name in os.listdir(self.annotation_dir)
        }

        for img_name in sorted(os.listdir(self.img_dir)):
            img_path = os.path.join(self.img_dir, img_name)
            img = imread(img_path)
            if img is None:
                # imread signals failure by returning None instead of raising.
                raise ValueError(f"Could not read image file: {img_path}")

            stem = os.path.splitext(img_name)[0]
            if stem not in annotation_by_stem:
                raise FileNotFoundError(
                    f"No annotation file for image {img_name!r} in {self.annotation_dir!r}"
                )

            # OpenCV decodes to BGR; the rest of the notebook (PIL display,
            # ImageNet-pretrained backbone) treats the channels as RGB.
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            all_images.append(torch.from_numpy(img).permute(2, 0, 1))  # HWC -> CHW

            annotation_path = os.path.join(self.annotation_dir, annotation_by_stem[stem])
            with open(annotation_path) as f:
                labels, boxes = self._parse_annotation_lines(f, self.img_size)

            # Files without objects give empty (0,) / (0, 4) tensors, which
            # pad_sequence below fills entirely with the -1 padding value.
            label_tensor = torch.from_numpy(np.asarray(labels, dtype=np.float32))
            box_tensor = box_convert(torch.from_numpy(boxes), 'cxcywh', 'xyxy')

            all_labels.append(label_tensor)
            all_bboxes.append(box_tensor)

        if not all_images:
            raise ValueError(f"No images found in {self.img_dir!r}")

        all_labels_padded = pad_sequence(all_labels, batch_first=True, padding_value=-1)
        all_bboxes_padded = pad_sequence(all_bboxes, batch_first=True, padding_value=-1)

        return all_images, all_labels_padded, all_bboxes_padded

In [22]:
# Load the whole training split (annotations + images) into memory.
data = TomatoSet(annotation_dir='train/labels/', img_dir='train/images/')

In [6]:
import torchvision.transforms.functional as F
from torchvision.utils import draw_bounding_boxes

# Use a tight bounding box for figures written with savefig.
plt.rcParams["savefig.bbox"] = 'tight'

def show(imgs):
    """Plot one image tensor, or a list of them, side by side in a single row.

    Each tensor is detached, converted to a PIL image and drawn on its own
    axis with ticks and tick labels removed.
    """
    images = imgs if isinstance(imgs, list) else [imgs]
    fig, axs = plt.subplots(ncols=len(images), squeeze=False)
    fig.dpi = 150
    # squeeze=False keeps `axs` two-dimensional, so row 0 holds one axis per image.
    for ax, tensor in zip(axs[0], images):
        pil_image = F.to_pil_image(tensor.detach())
        ax.imshow(np.asarray(pil_image))
        ax.set(xticklabels=[], yticklabels=[], xticks=[], yticks=[])

# Class names indexed by label id.  The final empty string is what index -1
# (the padding label) resolves to, so padded entries are drawn without text.
label_str = [
    'Bacterial Spot',
    'Early_Blight',
    'Healthy',
    'Late_blight',
    'Leaf Mold',
    'Target_Spot',
    'black spot',
    '',
]

def show_with_labels(idx_list):
    """Display dataset samples with their bounding boxes and class names.

    Args:
        idx_list: A single dataset index or a list of indices into ``data``.

    Samples are fetched one index at a time because the dataset keeps its
    images in a Python list, which cannot be indexed with a list.  Rows whose
    label is -1 are padding and are dropped before drawing.
    """
    indices = idx_list if isinstance(idx_list, list) else [idx_list]

    annotated = []
    for idx in indices:
        image, labels, boxes = data[idx]
        keep = labels != -1  # -1 marks padded (non-existent) objects
        names = [label_str[int(x)] for x in labels[keep]]
        if not names:
            # Nothing to draw for this image; show it unchanged.
            annotated.append(image)
            continue
        annotated.append(
            draw_bounding_boxes(
                image=image,
                boxes=boxes[keep],
                labels=names,
                font='arial',
                font_size=12,
            )
        )

    show(annotated)

In [7]:
from torch import nn

# ImageNet-pretrained ResNet-50 used as the backbone.
res_net = torchvision.models.resnet50(weights=torchvision.models.ResNet50_Weights.IMAGENET1K_V2)

# Keep the first eight children: in torchvision's ResNet these are the stem
# (conv1, bn1, relu, maxpool) followed by layer1..layer4, so the trailing
# average pool and fully connected head are dropped.
layers = list(res_net.children())[:8]
feature_map_layers = nn.Sequential(*layers)

# Fine-tune the whole backbone (every parameter stays trainable).
for name, param in feature_map_layers.named_parameters():
    param.requires_grad = True

# Run the sanity-check pass on the GPU only when one is available, and keep
# the backbone and its input on the same device.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
feature_map_layers.to(device)

# Smoke test: push the first dataset image through the backbone as a batch of
# one, scaled to [0, 1].  No gradients are needed for this check.
with torch.no_grad():
    imgs = feature_map_layers(data[0][0].unsqueeze(0).float().div(255.).to(device))

In [8]:
# Custom image-level classifier for tomato diseases
class TomatoDiseaseModel(nn.Module):
    """Backbone feature extractor + global average pooling + linear classifier.

    Args:
        num_classes: Number of output logits.
        feature_extractor: Module mapping ``(B, C, H, W)`` images to
            ``(B, in_features, h, w)`` feature maps.  Defaults to the
            module-level ``feature_map_layers`` backbone.
        in_features: Channel count produced by ``feature_extractor``
            (default 2048, the value that used to be hard-coded).
    """

    def __init__(self, num_classes, feature_extractor=None, in_features=2048):
        super().__init__()
        if feature_extractor is None:
            # Backward-compatible default: the backbone built earlier in the notebook.
            feature_extractor = feature_map_layers
        self.feature_extractor = feature_extractor
        # Collapse each feature map to a single value per channel.
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(in_features, num_classes)

    def forward(self, x):
        """Return class logits of shape ``(B, num_classes)`` for image batch ``x``."""
        x = self.feature_extractor(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)  # (B, C, 1, 1) -> (B, C)
        x = self.fc(x)
        return x

# Number of real classes: one fewer than the entries of label_str, because its
# trailing empty string is only a placeholder for the padding label (-1).
num_classes = len(label_str) - 1
# Use the GPU when available and fall back to the CPU otherwise, matching the
# device selection done inside train_model.
model = TomatoDiseaseModel(num_classes).to('cuda' if torch.cuda.is_available() else 'cpu')

In [17]:
# Optimisation settings
learning_rate = 1e-3
num_epochs = 20

# Cross-entropy objective over the class logits, optimised with Adam over
# every parameter of the model.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=learning_rate)

# Training loop
import torch

def train_model(model, data_loader, optimizer, criterion, num_epochs=20):
    """Train ``model`` as an image-level classifier and print the loss per epoch.

    Args:
        model: Module mapping an image batch to class logits.
        data_loader: Iterable of ``(images, labels, boxes)`` batches.  ``labels``
            is ``(B, K)`` with -1 as padding; only the first label of each
            image is used as its target.  ``boxes`` is ignored.
        optimizer: Optimizer over ``model``'s parameters.
        criterion: Loss taking ``(logits, int64 targets)``.
        num_epochs: Number of passes over ``data_loader``.

    The reported loss is the mean over the batches that were actually trained
    on; batches made up only of padding labels are skipped and not counted.
    ``nan`` is printed for an epoch in which no batch was usable.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)
    model.train()

    for epoch in range(num_epochs):
        running_loss = 0.0
        trained_batches = 0

        for images, labels, _ in data_loader:
            images = images.to(device) / 255.0  # scale pixel values to [0, 1]
            labels = labels[:, 0].to(device)  # first annotated object per image

            # Images whose first label is -1 have no annotation at all.
            valid_indices = labels != -1
            if valid_indices.sum().item() == 0:
                continue

            valid_images = images[valid_indices]
            valid_labels = labels[valid_indices]

            outputs = model(valid_images)
            loss = criterion(outputs, valid_labels.long())

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            trained_batches += 1

        # Average over the batches that contributed to running_loss; dividing
        # by the loader length would dilute the loss with skipped batches and
        # fail on an empty loader.
        epoch_loss = running_loss / trained_batches if trained_batches else float('nan')
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss}")



In [18]:
# Wrap the in-memory dataset in a shuffling loader that yields batches of four
from torch.utils.data import DataLoader

train_loader = DataLoader(dataset=data, batch_size=4, shuffle=True)

# Run the training loop with the objects configured in the cells above
train_model(
    model=model,
    data_loader=train_loader,
    optimizer=optimizer,
    criterion=criterion,
    num_epochs=num_epochs,
)


RuntimeError: Expected a 'cuda' device type for generator but found 'cpu'

In [None]:
# Inference function
def infer_single_image(img_tensor, net=None):
    """Predict the class index of a single image.

    Args:
        img_tensor: ``(C, H, W)`` image tensor with pixel values in [0, 255].
        net: Classifier to use; defaults to the module-level ``model``.

    Returns:
        The index of the highest logit as a Python int.

    The network is switched to eval mode, and the input is moved to the device
    that holds the network's parameters so the call works on CPU and GPU alike.
    """
    if net is None:
        net = model
    first_param = next(net.parameters(), None)
    # A parameter-free network has no device of its own; keep the input where it is.
    target_device = img_tensor.device if first_param is None else first_param.device

    net.eval()
    with torch.no_grad():
        # Add the batch dimension and scale to [0, 1], as done during training.
        batch = img_tensor.float().unsqueeze(0).to(target_device) / 255.0
        output = net(batch)
        _, predicted_class = torch.max(output, 1)
        return predicted_class.item()

# Classify the first image of the dataset and print its class name
sample_img = data[0][0]
predicted_label = infer_single_image(img_tensor=sample_img)
print(f"Predicted label: {label_str[predicted_label]}")