# In [None]:
import torch
import numpy as np
from torch import nn, optim
import torch.nn.functional as F
from torchvision import datasets, transforms, models
import json
from collections import OrderedDict
import time
from PIL import Image
import matplotlib.pyplot as plt

# Dataset locations: one sub-directory per split underneath the data root.
data_dir = 'flowers'
train_dir = data_dir + '/train'
valid_dir = data_dir + '/valid'
test_dir = data_dir + '/test'

# Training pipeline: random augmentation (rotation, scaled crop, horizontal
# flip) followed by tensor conversion and per-channel normalisation.
# NOTE(review): the mean/std values are presumably the statistics the
# pretrained network was trained with -- confirm.
train_transforms = transforms.Compose([
    transforms.RandomRotation(30),
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Validation pipeline: deterministic resize + centre crop, same normalisation.
valid_transforms = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Test pipeline: identical steps to the validation pipeline.
test_transforms = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# One ImageFolder dataset per split, each paired with its own pipeline.
train_data = datasets.ImageFolder(root=train_dir, transform=train_transforms)
valid_data = datasets.ImageFolder(root=valid_dir, transform=valid_transforms)
test_data = datasets.ImageFolder(root=test_dir, transform=test_transforms)

# Only the training loader shuffles; the evaluation loaders keep dataset order.
trainloader = torch.utils.data.DataLoader(dataset=train_data, batch_size=64, shuffle=True)
validloader = torch.utils.data.DataLoader(dataset=valid_data, batch_size=32, shuffle=False)
testloader = torch.utils.data.DataLoader(dataset=test_data, batch_size=32, shuffle=False)

# Category mapping read from JSON; its size determines how many outputs the
# classifier gets.
with open('cat_to_name.json', 'r') as mapping_file:
    cat_to_name = json.load(mapping_file)

no_output_categories = len(cat_to_name)

# Network: pretrained VGG16 with a replacement classifier head.
hidden_units = 512
model = models.vgg16(pretrained=True)

# Freeze every pretrained weight. The new head is created afterwards, so its
# parameters keep requires_grad=True and are the only ones that get trained.
for pretrained_param in model.parameters():
    pretrained_param.requires_grad_(False)

# Classifier head: 25088 inputs -> hidden layer -> one log-probability per
# category. Layers are created in fc1, fc2 order, as before.
classifier_layers = OrderedDict()
classifier_layers['fc1'] = nn.Linear(25088, hidden_units)
classifier_layers['relu'] = nn.ReLU()
classifier_layers['dropout'] = nn.Dropout(0.2)
classifier_layers['fc2'] = nn.Linear(hidden_units, no_output_categories)
classifier_layers['output'] = nn.LogSoftmax(dim=1)
classifier = nn.Sequential(classifier_layers)

model.classifier = classifier

# Run on the first CUDA device when one is available, otherwise on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
model.to(device)

# Training hyperparameters. NLLLoss matches the LogSoftmax output layer; the
# optimizer is given the classifier's parameters only.
epochs = 5
criterion = nn.NLLLoss()
optimizer = optim.SGD(model.classifier.parameters(), lr=0.01, momentum=0.9)

# Training process
# Each epoch makes one optimisation pass over the training loader and prints
# the epoch's loss and accuracy. Both metrics are weighted per sample: an
# average of per-batch means would over-weight a smaller final batch.
for e in range(epochs):
    model.train()  # training mode, so the dropout layer is active
    running_loss = 0.0
    running_correct = 0
    samples_seen = 0
    for images, labels in trainloader:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        log_ps = model(images)
        loss = criterion(log_ps, labels)
        loss.backward()
        optimizer.step()

        batch_size = labels.size(0)
        # criterion averages over the batch; scale back to a per-batch sum.
        running_loss += loss.item() * batch_size
        # The largest log-probability marks the predicted class.
        running_correct += (log_ps.argmax(dim=1) == labels).sum().item()
        samples_seen += batch_size

    print(f"Epoch {e+1}/{epochs} - Loss: {running_loss/samples_seen:.3f}, Accuracy: {running_correct/samples_seen*100:.2f}%")

# Testing the network
# Accuracy is computed as correct predictions / total test images. Averaging
# per-batch accuracies would give a smaller final batch as much weight as a
# full one and report a slightly wrong figure.
test_correct = 0
test_total = 0
model.eval()  # evaluation mode, so dropout is disabled
with torch.no_grad():
    for images, labels in testloader:
        images, labels = images.to(device), labels.to(device)
        log_ps = model(images)
        # The largest log-probability marks the predicted class.
        test_correct += (log_ps.argmax(dim=1) == labels).sum().item()
        test_total += labels.size(0)

# Fraction of test images classified correctly (0.0 - 1.0).
test_accuracy = test_correct / test_total
print(f'Test Accuracy: {test_accuracy*100:.2f}%')


# Checkpoint preparation
# Class-to-index mapping taken from the training dataset. It is attached to
# the model and also passed to save_model so it is stored in the checkpoint.
class_to_idx = train_data.class_to_idx
model.class_to_idx = class_to_idx

# None makes save_model write the checkpoint without a directory prefix,
# i.e. relative to the current working directory.
destination_directory = None

# Save the model's state_dict
def save_model(trained_model, hidden_units, output_units, destination_directory, model_arch, class_to_idx):
    """Write a checkpoint describing ``trained_model`` to disk.

    Besides its arguments, this function reads the module-level names
    ``epochs`` and ``optimizer``; both must exist when it is called.

    Args:
        trained_model: Model whose ``state_dict()`` is stored.
        hidden_units: Stored under ``'clf_hidden'``.
        output_units: Stored under ``'clf_output'``.
        destination_directory: Directory prefix for the file; a falsy value
            (e.g. ``None``) writes the file without any directory prefix.
        model_arch: Architecture label; stored in the checkpoint and used as
            the file-name prefix.
        class_to_idx: Stored under ``'model_class_to_index'``.
    """
    file_name = f"{model_arch}_checkpoint.pth"
    if destination_directory:
        save_path = f"{destination_directory}/{file_name}"
    else:
        save_path = file_name

    # Keys are kept exactly as load_checkpoint expects them; the classifier
    # input size is stored as the fixed value 25088.
    model_checkpoint = dict(
        model_arch=model_arch,
        clf_input=25088,
        clf_output=output_units,
        clf_hidden=hidden_units,
        state_dict=trained_model.state_dict(),
        model_class_to_index=class_to_idx,
        epochs=epochs,
        optimizer_state=optimizer.state_dict(),
    )

    torch.save(model_checkpoint, save_path)
    print(f"{model_arch} successfully saved to {save_path}")

# Call save_model
# The architecture label is stored in the checkpoint and used in its file
# name, so it has to name the network that was actually built: `model` comes
# from models.vgg16, not from the batch-norm variant.
saved_arch = 'vgg16'
save_model(model, hidden_units, no_output_categories, destination_directory, saved_arch, class_to_idx)

# Loading the checkpoint
# Build the path with the same rule save_model uses, so the file loaded below
# is always the one written above, even if destination_directory is set.
checkpoint = (f"{destination_directory}/{saved_arch}_checkpoint.pth"
              if destination_directory else f"{saved_arch}_checkpoint.pth")

# Function that accepts two arguments: filepath (location of checkpoint) and device (gpu/cpu)
def load_checkpoint(filepath, device):
    """Load a checkpoint written by ``save_model`` and return its main fields.

    Args:
        filepath: Location of the checkpoint file.
        device: Where the stored tensors should be mapped. Accepted forms:
            a ``torch.device`` (used as-is), the string ``"gpu"`` (mapped to
            ``'cuda'``), a string starting with ``"cuda"`` (used as-is);
            anything else maps to the CPU.

    Returns:
        Tuple ``(model_arch, clf_input, clf_output, clf_hidden, state_dict,
        model_class_to_index)`` read from the checkpoint dictionary.

    Raises:
        KeyError: If one of those keys is missing from the checkpoint.
    """
    # The script passes a torch.device here. Comparing that with the string
    # "gpu" is never true, which used to force every load onto the CPU.
    if isinstance(device, torch.device):
        map_location = device
    elif device == "gpu":
        map_location = 'cuda'
    elif isinstance(device, str) and device.startswith('cuda'):
        map_location = device
    else:
        map_location = 'cpu'

    # NOTE: torch.load unpickles the file; only load checkpoints you trust.
    checkpoint = torch.load(filepath, map_location=map_location)

    return (checkpoint['model_arch'], checkpoint['clf_input'], checkpoint['clf_output'],
            checkpoint['clf_hidden'], checkpoint['state_dict'], checkpoint['model_class_to_index'])

# Image used below to try out inference.
practice_img = './flowers/test/19/image_06186.jpg'

# Read the checkpoint back. This rebinds hidden_units and class_to_idx with
# the stored values and copies the stored weights into the existing model.
(model_arch,
 input_units,
 output_units,
 hidden_units,
 state_dict,
 class_to_idx) = load_checkpoint(checkpoint, device)
model.load_state_dict(state_dict)

# Processes a PIL image for use in a PyTorch model
def process_image(image):
    """Load an image file and preprocess it like the evaluation datasets.

    The steps mirror the validation/test transforms: resize so the shorter
    side is 256 px, centre-crop to 224 x 224, convert to a tensor and
    normalise each channel. The previous ``thumbnail((256, 256))`` call
    bounded the *longer* side instead, so non-square images ended up with
    fewer than 224 px on one side and the crop padded them with black,
    giving the model inputs unlike those it was evaluated on.

    Args:
        image: Path (or file object) accepted by ``PIL.Image.open``.

    Returns:
        The preprocessed image as a NumPy array in channel-first layout
        (3 x 224 x 224).
    """
    preprocess = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ])

    with Image.open(image) as img:
        # Force three channels so greyscale/RGBA files normalise correctly.
        img_tensor = preprocess(img.convert('RGB'))

    return img_tensor.numpy()

# Display the original and preprocessed image
def imshow(image, ax=None):
    """Draw a normalised channel-first image array on a Matplotlib axis.

    Args:
        image: Array in (channels, height, width) order, normalised with the
            mean/std constants used below.
        ax: Axis to draw on; a new figure and axis are created when omitted.

    Returns:
        The axis the image was drawn on.
    """
    if ax is None:
        _, ax = plt.subplots()

    channel_mean = np.array([0.485, 0.456, 0.406])
    channel_std = np.array([0.229, 0.224, 0.225])

    # Move channels last, undo the normalisation, then clamp to the
    # displayable [0, 1] range.
    pixels = image.transpose(1, 2, 0) * channel_std + channel_mean
    ax.imshow(np.clip(pixels, 0, 1))
    return ax

# Keep the original image available for inspection. Image.open is lazy and
# leaves the file open, so read the pixel data right away; for single-frame
# images this also releases the underlying file handle.
original_image = Image.open(practice_img)
original_image.load()

# Display the preprocessed version of the image, i.e. what the model sees.
imshow(process_image(practice_img))

# Class Prediction
def class_to_label(file, classes):
    """Translate class keys into names using a JSON mapping file.

    Args:
        file: Path of a JSON file holding an object that maps class keys to
            names.
        classes: Iterable of class keys to look up.

    Returns:
        List with the name for each entry of ``classes``, in the same order.

    Raises:
        KeyError: If a class key is missing from the mapping.
    """
    with open(file, 'r') as handle:
        names_by_class = json.load(handle)

    labels = []
    for class_key in classes:
        labels.append(names_by_class[class_key])
    return labels

# Reverse lookup (index -> class key), used to turn the indices predicted by
# the model back into class keys.
idx_mapping = dict(zip(class_to_idx.values(), class_to_idx.keys()))

# Predict the class of an image using the deep learning model
def predict(image_path, model, idx_mapping, topk, device):
    """Return the ``topk`` most probable classes for one image.

    Args:
        image_path: Path of the image; it is preprocessed with
            ``process_image``.
        model: Network to evaluate; it is switched to eval mode and is
            expected to output log-probabilities (they are exponentiated).
        idx_mapping: Mapping from output index to class key.
        topk: Number of predictions to return.
        device: Device the input tensor is moved to before the forward pass.

    Returns:
        ``(probabilities, classes)``: two lists of length ``topk``, ordered
        from most to least probable.
    """
    model.eval()
    with torch.no_grad():
        pre_processed_image = torch.from_numpy(process_image(image_path)).unsqueeze(0).to(device)
        log_ps = model(pre_processed_image)
        ps = torch.exp(log_ps)
        top_ps, top_idx = ps.topk(topk, dim=1)

    # Select the single batch row explicitly. A bare squeeze() also removes
    # the topk dimension when topk == 1, which turned `probabilities` into a
    # float and made iterating over the indices fail.
    probabilities = top_ps[0].tolist()
    classes = [idx_mapping[idx] for idx in top_idx[0].tolist()]

    return probabilities, classes

# Print predictions
def print_predictions(probabilities, classes, image_name, category_names=None):
    """Print a ranked list of predictions for one image.

    Args:
        probabilities: Probabilities in the range 0-1, best first.
        classes: Class keys aligned with ``probabilities``.
        image_name: Name shown in the header line.
        category_names: Optional path of a JSON file mapping class keys to
            names; when given, the title-cased name is printed as well.
    """
    print(f"Predictions for image: {image_name}")

    if not category_names:
        # No name mapping available: report the class keys only.
        for rank, (p, class_key) in enumerate(zip(probabilities, classes), start=1):
            print(f'{rank}) {p * 100:.2f}% Class No. {class_key}')
    else:
        names = class_to_label(category_names, classes)
        for rank, (p, name, class_key) in enumerate(zip(probabilities, names, classes), start=1):
            print(f'{rank}) {p * 100:.2f}% {name.title()} | Class No. {class_key}')

    # Blank line separating this report from whatever is printed next.
    print()

# Five most probable classes for the practice image.
probabilities, classes = predict(practice_img, model, idx_mapping, topk=5, device=device)

# Report them under the image's file name (the last path component), with
# category names looked up from cat_to_name.json.
print_predictions(
    probabilities,
    classes,
    practice_img.rsplit('/', 1)[-1],
    category_names='cat_to_name.json',
)
