In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms, models
from torchvision.datasets import ImageFolder
import os
from PIL import Image
import matplotlib.pyplot as plt
import numpy as np
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns

# --- Notebook-wide configuration ---------------------------------------------
# Seed the torch and NumPy global generators before anything random happens.
for _seed_fn in (torch.manual_seed, np.random.seed):
    _seed_fn(42)

# Use the GPU when CUDA is usable, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f"Using device: {device}")

# One folder per class is expected directly under data_dir.
class_names = ['Broccoli', 'Mango', 'Nut', 'Pepper']
num_classes = len(class_names)
print(f"Number of classes: {num_classes}")
print(f"Class names: {class_names}")

# Root directory that holds the class folders.
data_dir = r"C:\Users\ashsh\Downloads"
print(f"Data directory: {data_dir}")


def explore_folder_structure(base_path, class_names):
    """Print how many images each class folder under ``base_path`` contains.

    For every name in ``class_names`` the folder ``base_path/<name>`` is
    scanned exactly one level deep: image files lying directly in the class
    folder are counted, and so are image files inside each immediate
    subfolder. Deeper nesting is not visited. A name counts as an image when
    it is a regular file ending in .png, .jpg or .jpeg (case-insensitive).

    Args:
        base_path: Directory that contains one folder per class.
        class_names: Iterable of class folder names to inspect.

    Returns:
        None. The report is written to stdout. A class whose folder is
        missing is reported with a warning line and contributes no images.
    """
    image_exts = ('.png', '.jpg', '.jpeg')

    def split_entries(folder):
        # One listing per folder, sorted so the report does not depend on the
        # order the filesystem happens to return entries in.
        images, subdirs = [], []
        for entry in sorted(os.listdir(folder)):
            entry_path = os.path.join(folder, entry)
            if os.path.isdir(entry_path):
                subdirs.append(entry)
            elif entry.lower().endswith(image_exts) and os.path.isfile(entry_path):
                # Regular files only: a directory that happens to be called
                # "something.jpg" must not inflate the totals.
                images.append(entry)
        return images, subdirs

    total_images = 0
    for class_name in class_names:
        folder_path = os.path.join(base_path, class_name)
        if not os.path.isdir(folder_path):
            print(f"Warning: {class_name} folder not found at {folder_path}")
            continue

        print(f"\n=== {class_name} ===")
        direct_images, subfolders = split_entries(folder_path)
        class_total = len(direct_images)
        if direct_images:
            print(f"  Direct images: {len(direct_images)}")

        if subfolders:
            print(f"  Subfolders found: {subfolders}")
            for subfolder in subfolders:
                images_in_subfolder, _ = split_entries(os.path.join(folder_path, subfolder))
                print(f"    {subfolder}: {len(images_in_subfolder)} images")
                class_total += len(images_in_subfolder)

        print(f"  Total for {class_name}: {class_total} images")
        total_images += class_total

    print("\n=== SUMMARY ===")
    print(f"Total images across all classes: {total_images}")

# Print the per-class image counts found under data_dir before building datasets.
explore_folder_structure(data_dir, class_names)

Using device: cuda
Number of classes: 4
Class names: ['Broccoli', 'Mango', 'Nut', 'Pepper']
Data directory: C:\Users\ashsh\Downloads

=== Broccoli ===
  Subfolders found: ['Florets', 'In-Context(Cooking)', 'Whole Crown']
    Florets: 1000 images
    In-Context(Cooking): 1000 images
    Whole Crown: 1000 images
  Total for Broccoli: 3000 images

=== Mango ===
  Subfolders found: ['Cubed,Hedgehog', 'Sliced,Peeled', 'Whole']
    Cubed,Hedgehog: 1006 images
    Sliced,Peeled: 1509 images
    Whole: 1019 images
  Total for Mango: 3534 images

=== Nut ===
  Subfolders found: ['Chopped,Crushed', 'In-Shell', 'Shelled']
    Chopped,Crushed: 1000 images
    In-Shell: 1000 images
    Shelled: 1000 images
  Total for Nut: 3000 images

=== Pepper ===
  Subfolders found: ['Diced,Sliced', 'Halved,Deseeded', 'Whole']
    Diced,Sliced: 1006 images
    Halved,Deseeded: 1038 images
    Whole: 1018 images
  Total for Pepper: 3062 images

=== SUMMARY ===
Total images across all classes: 12596


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms, models
import os
from PIL import Image
import matplotlib.pyplot as plt
import numpy as np
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns
from sklearn.model_selection import train_test_split
import random

# Same seed for every RNG used below: Python's `random`, NumPy and torch.
for _seed_fn in (random.seed, np.random.seed, torch.manual_seed):
    _seed_fn(42)

# Use the GPU when CUDA is usable, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

# Root directory with one folder per class; list order fixes the label indices.
data_dir = r"C:\Users\ashsh\Downloads"
class_names = ['Broccoli', 'Mango', 'Nut', 'Pepper']
num_classes = len(class_names)


class ProduceDataset(Dataset):
    """Image dataset read from ``data_dir/<class>/[<subfolder>/]<image>``.

    Each class folder is scanned for image files (.png/.jpg/.jpeg,
    case-insensitive) lying directly in it and in its immediate subfolders.
    The files of every class are split independently into a train part and a
    test part with ``train_test_split(..., random_state=42)``, and the
    instance keeps only the part selected by ``train``.

    The file list is sorted before splitting. The train and the test dataset
    are built by two separate constructor calls, so they are only guaranteed
    to be disjoint if both calls hand the splitter the same ordering; raw
    ``os.listdir`` order gives no such guarantee.

    Args:
        data_dir: Directory containing one folder per class.
        class_names: Sequence of class folder names; position is the label.
        transform: Optional callable applied to each PIL image.
        train: If True keep the train part of each class, else the test part.
        train_split: Fraction of each class that goes to the train part.

    Attributes:
        image_paths: Paths of the selected images.
        labels: Integer label for each entry of ``image_paths``.
        class_to_idx: Mapping from class name to integer label.
    """

    # File-name suffixes (compared lower-cased) that are treated as images.
    IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg')

    def __init__(self, data_dir, class_names, transform=None, train=True, train_split=0.8):
        self.data_dir = data_dir
        self.class_names = class_names
        self.transform = transform
        self.class_to_idx = {class_name: idx for idx, class_name in enumerate(class_names)}

        # Parallel lists: image_paths[i] has label labels[i].
        self.image_paths = []
        self.labels = []

        for class_name in class_names:
            class_folder = os.path.join(data_dir, class_name)
            if not os.path.isdir(class_folder):
                print(f"Warning: {class_name} folder not found at {class_folder}")
                continue

            class_images = self._collect_class_images(class_folder)

            if len(class_images) < 2:
                # train_test_split cannot split fewer than two samples; keep
                # whatever exists on the training side instead of raising.
                print(f"Warning: {class_name} has {len(class_images)} image(s); "
                      f"too few to split, using them for training only")
                train_imgs, test_imgs = class_images, []
            else:
                train_imgs, test_imgs = train_test_split(class_images,
                                                         train_size=train_split,
                                                         random_state=42)

            selected_images = train_imgs if train else test_imgs
            self.image_paths.extend(selected_images)
            self.labels.extend([self.class_to_idx[class_name]] * len(selected_images))

        print(f"{'Train' if train else 'Test'} dataset created with {len(self.image_paths)} images")

        for i, class_name in enumerate(class_names):
            count = self.labels.count(i)
            print(f"  {class_name}: {count} images")

    @classmethod
    def _list_images(cls, folder):
        """Return sorted paths of the regular image files directly in ``folder``."""
        paths = []
        for name in sorted(os.listdir(folder)):
            path = os.path.join(folder, name)
            if name.lower().endswith(cls.IMAGE_EXTENSIONS) and os.path.isfile(path):
                paths.append(path)
        return paths

    @classmethod
    def _collect_class_images(cls, class_folder):
        """Return image paths in ``class_folder`` and its immediate subfolders.

        Images placed directly in the class folder are included as well, so
        the dataset covers the same files that the folder report counts.
        """
        class_images = cls._list_images(class_folder)
        for entry in sorted(os.listdir(class_folder)):
            subfolder_path = os.path.join(class_folder, entry)
            if os.path.isdir(subfolder_path):
                class_images.extend(cls._list_images(subfolder_path))
        return class_images

    def __len__(self):
        """Return the number of selected images."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for position ``idx``.

        The image is loaded as RGB and passed through ``transform`` when one
        was given. If the file cannot be read, the error is printed and a
        black 224x224 RGB placeholder is used so that one bad file does not
        abort an epoch; the sample keeps its original label.
        """
        img_path = self.image_paths[idx]
        label = self.labels[idx]

        try:
            # The context manager closes the underlying file right after
            # decoding instead of leaving it to garbage collection.
            with Image.open(img_path) as img:
                image = img.convert('RGB')
        except Exception as e:
            print(f"Error loading image {img_path}: {e}")
            image = Image.new('RGB', (224, 224), (0, 0, 0))

        if self.transform:
            image = self.transform(image)

        return image, label


# Channel-wise normalisation shared by both pipelines (the standard ImageNet
# mean/std values).
_normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])

# Training: resize larger than the target, then random crop / flip / rotate /
# colour jitter as augmentation before converting to a normalised tensor.
_train_steps = [
    transforms.Resize((256, 256)),
    transforms.RandomCrop(224),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(degrees=15),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.ToTensor(),
    _normalize,
]
train_transforms = transforms.Compose(_train_steps)

# Evaluation: deterministic resize straight to the network input size.
_test_steps = [
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    _normalize,
]
test_transforms = transforms.Compose(_test_steps)


# Build the train/test datasets from the same folder tree; each one keeps its
# own side of the per-class split and its own transform pipeline.
print("Creating datasets...")
train_dataset = ProduceDataset(data_dir, class_names, transform=train_transforms, train=True)
test_dataset = ProduceDataset(data_dir, class_names, transform=test_transforms, train=False)

# Loader settings common to both splits; only the training loader shuffles.
batch_size = 32
_loader_kwargs = dict(batch_size=batch_size, num_workers=0)
train_loader = DataLoader(train_dataset, shuffle=True, **_loader_kwargs)
test_loader = DataLoader(test_dataset, shuffle=False, **_loader_kwargs)

print("\nDataLoaders created:")
print(f"Train batches: {len(train_loader)}")
print(f"Test batches: {len(test_loader)}")

# Smoke test: pull one batch to confirm that loading and transforms work.
try:
    sample_batch = next(iter(train_loader))
    _sample_images, _sample_labels = sample_batch[0], sample_batch[1]
    print(f"\nSample batch shape: {_sample_images.shape}")
    print(f"Sample labels shape: {_sample_labels.shape}")
    print("Data loading successful!")
except Exception as e:
    print(f"Error loading batch: {e}")

Creating datasets...
Train dataset created with 10076 images
  Broccoli: 2400 images
  Mango: 2827 images
  Nut: 2400 images
  Pepper: 2449 images
Test dataset created with 2520 images
  Broccoli: 600 images
  Mango: 707 images
  Nut: 600 images
  Pepper: 613 images

DataLoaders created:
Train batches: 315
Test batches: 79

Sample batch shape: torch.Size([32, 3, 224, 224])
Sample labels shape: torch.Size([32])
Data loading successful!


In [None]:
# Create ResNet model with transfer learning
class ProduceClassifier(nn.Module):
    """ResNet-18 whose final fully connected layer is replaced for ``num_classes``.

    Args:
        num_classes: Number of output logits of the new classification layer.
        pretrained: If True, load the ImageNet weights for the backbone.
        freeze_backbone: If True, every parameter except those of the new
            classification layer gets ``requires_grad = False``.

    Note:
        Freezing only switches off gradients. Putting the module in train
        mode still lets BatchNorm layers update their running statistics.
    """

    def __init__(self, num_classes=4, pretrained=True, freeze_backbone=True):
        super().__init__()

        # torchvision >= 0.13 deprecates `pretrained=` in favour of `weights=`.
        # Prefer the new API when present (IMAGENET1K_V1 is what
        # `pretrained=True` maps to) and fall back for older installations.
        weights_enum = getattr(models, "ResNet18_Weights", None)
        if weights_enum is not None:
            weights = weights_enum.IMAGENET1K_V1 if pretrained else None
            self.resnet = models.resnet18(weights=weights)
        else:
            self.resnet = models.resnet18(pretrained=pretrained)

        if freeze_backbone:
            # Freeze everything loaded so far; the head created below is a new
            # module and therefore stays trainable.
            for param in self.resnet.parameters():
                param.requires_grad = False

        # Swap the 1000-way ImageNet head for one sized to our classes.
        num_features = self.resnet.fc.in_features
        self.resnet.fc = nn.Linear(num_features, num_classes)

        if freeze_backbone:
            print("Backbone frozen - only training final classification layer")

    def forward(self, x):
        """Return the class logits produced by the wrapped ResNet for ``x``."""
        return self.resnet(x)

# Build the classifier (ImageNet weights, frozen backbone) and place it on the
# selected device in one step; `.to()` returns the module itself.
model = ProduceClassifier(
    num_classes=num_classes,
    pretrained=True,
    freeze_backbone=True,
).to(device)


def count_parameters(model):
    """Count the scalar parameters of ``model``.

    Returns:
        Tuple ``(total, trainable, frozen)`` where ``trainable`` covers the
        parameters with ``requires_grad`` set and ``frozen`` is the rest.
    """
    total_params = 0
    trainable_params = 0
    for param in model.parameters():
        size = param.numel()
        total_params += size
        if param.requires_grad:
            trainable_params += size
    return total_params, trainable_params, total_params - trainable_params

# Parameter statistics of the freshly built model.
_param_counts = count_parameters(model)
total_params, trainable_params, frozen_params = _param_counts

# Loss on raw logits, Adam with L2 weight decay, and a step schedule that
# multiplies the learning rate by 0.1 every 7 scheduler steps.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=1e-3, weight_decay=1e-4)
scheduler = optim.lr_scheduler.StepLR(optimizer=optimizer, step_size=7, gamma=0.1)

# Smoke test: push two random images through the model and report the shapes.
print("\nTesting forward pass...")
try:
    model.eval()
    sample_input = torch.randn(2, 3, 224, 224).to(device)
    with torch.no_grad():
        output = model(sample_input)
    print(f"Input shape: {sample_input.shape}")
    print(f"Output shape: {output.shape}")
    print(f"Output example: {output[0]}")
    print("Forward pass successful!")
except Exception as e:
    print(f"Error in forward pass: {e}")


def test_real_batch():
    """Run one batch from ``train_loader`` through ``model`` and print a summary.

    Uses the notebook-level ``model``, ``train_loader`` and ``device``. The
    model is switched to eval mode and gradients are disabled. Any exception
    is caught and reported on stdout instead of being raised.
    """
    try:
        model.eval()
        with torch.no_grad():
            batch_images, batch_labels = next(iter(train_loader))
            batch_images = batch_images.to(device)
            batch_labels = batch_labels.to(device)
            logits = model(batch_images)
            # Index of the largest logit per sample is the predicted class.
            _, predicted = logits.max(dim=1)

        print("\nReal batch test:")
        print(f"Input batch shape: {batch_images.shape}")
        print(f"True labels: {batch_labels[:5]}")
        print(f"Predicted labels: {predicted[:5]}")
        print("Real batch test successful!")
    except Exception as e:
        print(f"Error in real batch test: {e}")

# Sanity-check the untrained model on one real batch from train_loader.
test_real_batch()

Creating ResNet model with frozen backbone...
Backbone frozen - only training final classification layer
Total parameters: 11,178,564
Trainable parameters: 2,052
Frozen parameters: 11,176,512
Percentage trainable: 0.02%

Model created successfully!
Loss function: CrossEntropyLoss
Optimizer: Adam (lr=0.001, weight_decay=1e-4)
Scheduler: StepLR (step_size=7, gamma=0.1)

Testing forward pass...
Input shape: torch.Size([2, 3, 224, 224])
Output shape: torch.Size([2, 4])
Output example: tensor([-0.2891, -0.7783,  0.2104,  0.3842], device='cuda:0')
Forward pass successful!

Real batch test:
Input batch shape: torch.Size([32, 3, 224, 224])
True labels: tensor([3, 2, 2, 0, 1], device='cuda:0')
Predicted labels: tensor([2, 2, 2, 2, 2], device='cuda:0')
Real batch test successful!


In [None]:
import time
from collections import defaultdict


def train_model(model, train_loader, test_loader, criterion, optimizer, scheduler, num_epochs=10):
    """
    Train the model and return it together with the training history.

    Every epoch runs one pass over ``train_loader`` with gradient updates and
    one gradient-free pass over ``test_loader``. The weights of the epoch with
    the highest test accuracy are snapshotted and restored into ``model``
    before returning.

    Args:
        model: Module to train; updated in place.
        train_loader: Loader yielding ``(inputs, labels)`` training batches.
        test_loader: Loader yielding ``(inputs, labels)`` evaluation batches.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepped once per training batch.
        scheduler: LR scheduler stepped once per epoch.
        num_epochs: Number of epochs to run.

    Returns:
        ``(model, history)`` where ``history`` maps 'train_loss', 'train_acc',
        'test_loss', 'test_acc' and 'learning_rates' to per-epoch lists of
        floats. 'learning_rates' holds the rate that was in effect during
        the epoch.

    Note:
        Batches are moved to the notebook-level ``device``. The best epoch is
        selected on ``test_loader``, so the reported best accuracy is chosen
        on the same data it is measured on.
    """
    # Local import: the snapshot below needs a real copy of the weights.
    import copy

    num_train = len(train_loader.dataset)
    num_test = len(test_loader.dataset)

    print(f"Starting training for {num_epochs} epochs...")
    print(f"Training on {num_train} samples")
    print(f"Testing on {num_test} samples")
    print("-" * 50)

    history = {
        'train_loss': [],
        'train_acc': [],
        'test_loss': [],
        'test_acc': [],
        'learning_rates': []
    }

    best_test_acc = 0.0
    best_model_state = None

    for epoch in range(num_epochs):
        epoch_start_time = time.time()

        # ---- training pass ----
        model.train()
        running_loss = 0.0
        running_corrects = 0

        print(f"Epoch {epoch+1}/{num_epochs}")

        for batch_idx, (inputs, labels) in enumerate(train_loader):
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()

            outputs = model(inputs)
            loss = criterion(outputs, labels)

            loss.backward()
            optimizer.step()

            # The loss is a per-batch mean; weight it by batch size so the
            # epoch average stays correct when the last batch is smaller.
            preds = outputs.argmax(dim=1)
            running_loss += loss.item() * inputs.size(0)
            running_corrects += (preds == labels).sum().item()

            if (batch_idx + 1) % 50 == 0:
                print(f"  Batch {batch_idx+1}/{len(train_loader)}, Loss: {loss.item():.4f}")

        train_loss = running_loss / num_train
        train_acc = running_corrects / num_train

        # ---- evaluation pass ----
        model.eval()
        test_running_loss = 0.0
        test_running_corrects = 0

        with torch.no_grad():
            for inputs, labels in test_loader:
                inputs, labels = inputs.to(device), labels.to(device)

                outputs = model(inputs)
                loss = criterion(outputs, labels)
                preds = outputs.argmax(dim=1)

                test_running_loss += loss.item() * inputs.size(0)
                test_running_corrects += (preds == labels).sum().item()

        test_loss = test_running_loss / num_test
        test_acc = test_running_corrects / num_test

        # Read the learning rate before stepping the scheduler so the value
        # logged for this epoch is the one it was actually trained with.
        current_lr = optimizer.param_groups[0]['lr']
        scheduler.step()

        history['train_loss'].append(train_loss)
        history['train_acc'].append(train_acc)
        history['test_loss'].append(test_loss)
        history['test_acc'].append(test_acc)
        history['learning_rates'].append(current_lr)

        # state_dict() returns tensors that share storage with the live
        # parameters, so a shallow .copy() would keep changing as training
        # continues. deepcopy freezes the weights of this epoch. The first
        # epoch is always recorded so there is a state to restore.
        if best_model_state is None or test_acc > best_test_acc:
            best_test_acc = test_acc
            best_model_state = copy.deepcopy(model.state_dict())

        epoch_time = time.time() - epoch_start_time
        print(f"Results - Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}")
        print(f"Test Loss: {test_loss:.4f}, Test Acc: {test_acc:.4f}")
        print(f"LR: {current_lr:.6f}, Time: {epoch_time:.2f}s")
        print("-" * 50)

    print(f"Best test accuracy: {best_test_acc:.4f}")

    # Nothing to restore when no epoch ran (num_epochs == 0).
    if best_model_state is not None:
        model.load_state_dict(best_model_state)

    return model, history


import os
# Checkpoints go into a folder relative to the working directory.
model_save_dir = "saved_models"
os.makedirs(model_save_dir, exist_ok=True)
model_save_path = os.path.join(model_save_dir, "best_produce_classifier.pth")

num_epochs = 15

# Run the training loop; the model returned carries the weights restored by
# train_model, and the history holds the per-epoch metrics.
trained_model, training_history = train_model(
    model=model,
    train_loader=train_loader,
    test_loader=test_loader,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    num_epochs=num_epochs,
)

# Bundle the weights with the metadata needed to rebuild the model later.
_checkpoint = {
    'model_state_dict': trained_model.state_dict(),
    'class_names': class_names,
    'num_classes': num_classes,
    'model_architecture': 'resnet18',
    'best_test_accuracy': max(training_history['test_acc']),
    'training_history': training_history
}
torch.save(_checkpoint, model_save_path)


def load_trained_model(model_path, device):
    """
    Load a saved model for inference.

    Args:
        model_path: Path of a checkpoint holding the keys 'model_state_dict',
            'num_classes', 'class_names', 'model_architecture',
            'best_test_accuracy' and 'training_history'.
        device: Device the checkpoint tensors are mapped to and the model is
            moved to.

    Returns:
        ``(model, class_names, training_history)`` with ``model`` in eval mode.
    """
    # NOTE: torch.load unpickles the file; only load checkpoints you trust.
    checkpoint = torch.load(model_path, map_location=device)

    # Every weight is overwritten by the checkpoint right below, so building
    # the network with ImageNet weights would only cost a pointless download
    # (and fail offline). Start from an uninitialised backbone instead.
    model = ProduceClassifier(num_classes=checkpoint['num_classes'], pretrained=False)
    model.load_state_dict(checkpoint['model_state_dict'])
    model = model.to(device)
    model.eval()

    print(f"Architecture: {checkpoint['model_architecture']}")
    print(f"Classes: {checkpoint['class_names']}")
    print(f"Best accuracy: {checkpoint['best_test_accuracy']:.4f}")

    return model, checkpoint['class_names'], checkpoint['training_history']

# Tell the reader how to restore the checkpoint that was just written.
_load_hint = f"model, class_names, history = load_trained_model('{model_save_path}', device)"
print("\nTo load this model later, use:", _load_hint, sep="\n")

Training Configuration:
Number of epochs: 15
Batch size: 32
Learning rate: 0.001
Device: cuda
Model save directory: saved_models

Starting training for 15 epochs...
Training on 10076 samples
Testing on 2520 samples
--------------------------------------------------
Epoch 1/15
Training...
  Batch 50/315, Loss: 0.9111
  Batch 100/315, Loss: 0.7951
  Batch 150/315, Loss: 0.4582
  Batch 200/315, Loss: 0.5305
  Batch 250/315, Loss: 0.5171
  Batch 300/315, Loss: 0.5292
Testing...
Results - Train Loss: 0.6299, Train Acc: 0.7722
         Test Loss: 0.3086, Test Acc: 0.9036
         LR: 0.001000, Time: 2042.18s
--------------------------------------------------
Epoch 2/15
Training...
  Batch 50/315, Loss: 0.2443
  Batch 100/315, Loss: 0.3699
  Batch 150/315, Loss: 0.3877
  Batch 200/315, Loss: 0.2505
  Batch 250/315, Loss: 0.3311
  Batch 300/315, Loss: 0.3233
Testing...
Results - Train Loss: 0.4041, Train Acc: 0.8580
         Test Loss: 0.3360, Test Acc: 0.8714
         LR: 0.001000, Time: 1983