In [None]:
import os
import shutil
import numpy as np
import matplotlib.pyplot as plt

from efficientnet_pytorch import EfficientNet
from sklearn.model_selection import train_test_split

from PIL import Image
from PIL.ExifTags import TAGS, GPSTAGS

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, random_split
from torchvision import datasets, models, transforms
from torchvision.datasets.folder import default_loader, IMG_EXTENSIONS

from vit_pytorch import ViT

In [None]:
def get_exif_data(image_path):
    """Read an image's EXIF block and return it keyed by readable tag names.

    NOTE(review): a later cell defines ``get_exif_data`` again, so on a
    top-to-bottom run that later definition replaces this one.

    Args:
        image_path: Path of the image file to inspect.

    Returns:
        A dict mapping each EXIF tag name (or the raw tag id when the id is
        not present in ``TAGS``) to its value, or ``None`` when the image has
        no EXIF data or its format exposes no ``_getexif`` reader.

    Raises:
        Whatever ``Image.open`` / ``Image.verify`` raise for an unreadable or
        corrupt file.
    """
    # Pillow requires the file to be reopened after verify(), so the
    # integrity check gets its own handle and the metadata is read from a
    # fresh one. Both handles are closed by the context managers.
    with Image.open(image_path) as image:
        image.verify()

    with Image.open(image_path) as image:
        read_exif = getattr(image, '_getexif', None)
        exif_data = read_exif() if read_exif is not None else None

    if not exif_data:
        return None

    return {TAGS.get(tag, tag): value for tag, value in exif_data.items()}

In [None]:
def get_geotagging(exif):
    """Extract the GPS block from EXIF data, keyed by GPS tag name.

    The GPS tags are not stored at the top level of the EXIF mapping; they
    live in a nested mapping under the ``GPSInfo`` entry. That entry is looked
    up both by its decoded name and by its numeric tag id so that decoded and
    raw EXIF dictionaries are both accepted.

    Args:
        exif: EXIF mapping (decoded names or numeric ids as keys), or a falsy
            value when the image had no metadata.

    Returns:
        ``None`` when ``exif`` is falsy; an empty dict when there is no usable
        GPS block; otherwise a dict mapping GPS tag names (or the raw id when
        it is not present in ``GPSTAGS``) to their values.
    """
    if not exif:
        return None

    # 34853 (0x8825) is the numeric EXIF tag id of GPSInfo.
    gps_info = exif.get('GPSInfo')
    if gps_info is None:
        gps_info = exif.get(34853)

    # Missing block, or a bare offset instead of an already-parsed mapping.
    if not hasattr(gps_info, 'items'):
        return {}

    return {GPSTAGS.get(key, key): value for key, value in gps_info.items()}

In [None]:
def get_coordinates(geotags):
    """Convert GPS tags to signed decimal-degree coordinates.

    Args:
        geotags: Mapping with the keys ``GPSLatitude``, ``GPSLatitudeRef``,
            ``GPSLongitude`` and ``GPSLongitudeRef``. Latitude and longitude
            are (degrees, minutes, seconds) triples whose components are
            either numbers or ``(numerator, denominator)`` pairs.

    Returns:
        ``(lat, lon)`` in decimal degrees. Latitude is negated unless its ref
        is ``'N'``; longitude is negated unless its ref is ``'E'``.

    Raises:
        KeyError: If one of the four required keys is missing.
    """
    def to_number(component):
        # Rational values may arrive as (numerator, denominator) pairs; a zero
        # denominator is read as 0 instead of raising ZeroDivisionError.
        if isinstance(component, tuple):
            numerator, denominator = component
            return float(numerator) / float(denominator) if denominator else 0.0
        return float(component)

    def convert_to_degrees(value):
        d, m, s = (to_number(part) for part in value)
        return d + (m / 60.0) + (s / 3600.0)

    lat = convert_to_degrees(geotags['GPSLatitude'])
    if geotags['GPSLatitudeRef'] != 'N':
        lat = -lat

    lon = convert_to_degrees(geotags['GPSLongitude'])
    if geotags['GPSLongitudeRef'] != 'E':
        lon = -lon

    return (lat, lon)

In [None]:
from PIL import Image

def get_exif_data(image_path):
    """Return the metadata Pillow exposes for an image file.

    NOTE(review): this redefines the ``get_exif_data`` from an earlier cell
    and, unlike that one, returns the tags without decoding them through
    ``TAGS``.

    Args:
        image_path: Path of the image file to inspect.

    Returns:
        The result of ``image._getexif()`` when the opened image has that
        method, otherwise the image's ``info`` dictionary. ``None`` when an
        AttributeError is raised while reading.
    """
    try:
        # The context manager closes the file handle once the metadata has
        # been read; previously the handle was left open.
        with Image.open(image_path) as image:
            if hasattr(image, '_getexif'):
                exif_data = image._getexif()
            else:
                # No _getexif on this image type (the original comment names
                # TIFF): fall back to the generic info dictionary.
                exif_data = image.info
    except AttributeError as e:
        print(f"Could not retrieve EXIF data: {e}")
        exif_data = None
    return exif_data

In [None]:
# Try the EXIF reader on a single .tif file; the cell output is the returned metadata.
# NOTE(review): absolute, machine-specific path.
get_exif_data("/Users/izzymohamed/Downloads/Cherry/03_11_2021/Aerial_UAV_photos/green.rgb.tif")

In [None]:
def process_images_in_folder(folder_path):
    """Walk a folder tree and print the GPS coordinates of each image found.

    Files are selected by extension (jpg, jpeg, png, tif, tiff,
    case-insensitive). For every selected file one line is printed: its
    coordinates, a note that no geotagging data was found, or a note that the
    geotagging data could not be decoded.

    Args:
        folder_path: Root directory to scan recursively.
    """
    # '.tif' is included alongside '.tiff'; suffixes carry the dot so that a
    # name merely ending in the letters 'png' is not treated as an image.
    image_suffixes = ('.jpg', '.jpeg', '.png', '.tif', '.tiff')

    for root, _dirs, files in os.walk(folder_path):
        for file in files:
            if not file.lower().endswith(image_suffixes):
                continue

            image_path = os.path.join(root, file)
            geotags = get_geotagging(get_exif_data(image_path))
            if not geotags:
                print(f"Image: {file} - No geotagging data found.")
                continue

            try:
                coordinates = get_coordinates(geotags)
            except (KeyError, TypeError, ValueError) as error:
                # A GPS block without a full latitude/longitude set should not
                # abort the scan of the remaining files.
                print(f"Image: {file} - Could not decode geotagging data ({error!r}).")
                continue

            print(f"Image: {file} - Coordinates: {coordinates}")

# Example usage: scan one dataset folder and print a line per image.
# NOTE(review): absolute, machine-specific path — point it at your own copy of the data.
folder_path = '/Users/izzymohamed/Downloads/Cherry/03_11_2021'
process_images_in_folder(folder_path)

In [None]:
# Dataset root; every other path in this cell is derived from it.
base_dir = '/Users/izzymohamed/Downloads/Cherry v2'

# Sub-folder of the dataset root that holds the photos used below.
crop_root = f'{base_dir}/Ground_RGB_Photos'

# Training and test image folders inside crop_root.
train_set_dir = f'{crop_root}/train_set'
test_set_dir = f'{crop_root}/test_set'

In [None]:
# Delete macOS .DS_Store files from a directory tree
def remove_ds_store(directory):
    """Delete every file under ``directory`` whose name contains '.DS_Store'.

    Each deleted path is printed before it is removed.

    Args:
        directory: Root of the tree to clean (walked recursively).
    """
    for current_dir, _subdirs, filenames in os.walk(directory):
        # A name equal to '.DS_Store' also contains it, so the substring test
        # alone selects exactly the same files.
        doomed = [name for name in filenames if '.DS_Store' in name]
        for name in doomed:
            target = os.path.join(current_dir, name)
            print(f"Removing {target}")
            os.remove(target)

# Remove .DS_Store files from the whole tree under base_dir.
# This deletes files on disk.
remove_ds_store(base_dir)

In [None]:
# Display all directories inside the main root
def list_directories(path):
    """Print the directory tree rooted at ``path``, one directory per line.

    Each line is the directory's own name followed by '/', indented by four
    spaces per level below ``path``.

    Args:
        path: Root directory to walk; a trailing path separator is allowed.
    """
    for root, dirs, files in os.walk(path):
        # Depth comes from the path relative to the root. Stripping the root
        # with str.replace miscounted when ``path`` ended with a separator or
        # when its text re-appeared deeper in the tree.
        relative = os.path.relpath(root, path)
        level = 0 if relative == os.curdir else relative.count(os.sep) + 1
        indent = ' ' * 4 * level
        # normpath drops a trailing separator so basename is never empty.
        print('{}{}/'.format(indent, os.path.basename(os.path.normpath(root))))

# Print the directory tree found under crop_root
# (change the argument to explore a different path).
list_directories(crop_root)

In [None]:
# Function to split training data into training and validation sets
def split_train_val(base_train_dir, train_dir, val_dir, val_split=0.2, random_state=42):
    """Copy a class-per-folder image set into separate train and val trees.

    Every sub-directory of ``base_train_dir`` is treated as one class. Its
    files are split with ``train_test_split`` and copied (the originals stay
    in place) to ``train_dir/<class>`` and ``val_dir/<class>``.

    Args:
        base_train_dir: Directory containing one sub-directory per class.
        train_dir: Destination root for the training portion.
        val_dir: Destination root for the validation portion.
        val_split: Passed to ``train_test_split`` as ``test_size``.
        random_state: Seed for the split. With a fixed seed and sorted file
            lists, re-running produces the same partition, so a repeated run
            cannot put an image in both trees.
    """
    for cls in sorted(os.listdir(base_train_dir)):
        class_train_dir = os.path.join(base_train_dir, cls)

        # Only real class folders take part: this skips .DS_Store, other
        # hidden entries and stray files sitting next to the class folders.
        if cls.startswith('.') or not os.path.isdir(class_train_dir):
            continue

        print('     Processing class: {}'.format(cls))

        os.makedirs(os.path.join(train_dir, cls), exist_ok=True)
        os.makedirs(os.path.join(val_dir, cls), exist_ok=True)

        # Hidden files and nested folders are not samples.
        images = sorted(
            img for img in os.listdir(class_train_dir)
            if not img.startswith('.') and os.path.isfile(os.path.join(class_train_dir, img))
        )
        train, val = train_test_split(images, test_size=val_split, random_state=random_state)

        for img in train:
            shutil.copy(os.path.join(class_train_dir, img), os.path.join(train_dir, cls, img))
        for img in val:
            shutil.copy(os.path.join(class_train_dir, img), os.path.join(val_dir, cls, img))

In [None]:
def evaluate_model(model, test_loader, criterion, device):
    """Compute the mean batch loss and the accuracy of a model on a data set.

    The model is switched to eval mode and left in it.

    Args:
        model: Module to evaluate; expected to already be on ``device``.
        test_loader: Iterable of ``(images, labels)`` batches. It does not
            need to support ``len()``.
        criterion: Loss callable taking ``(outputs, labels)``.
        device: Device the batches are moved to.

    Returns:
        ``(test_loss, test_accuracy)``: the loss averaged over batches and the
        accuracy as a percentage.

    Raises:
        ValueError: If ``test_loader`` yields no samples.
    """
    model.eval()
    test_loss = 0.0
    correct = 0
    total = 0
    num_batches = 0
    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)

            # Check if the output is a tuple (for InceptionV3)
            if isinstance(outputs, tuple):
                outputs = outputs[0]  # Use only the main output

            test_loss += criterion(outputs, labels).item()
            predicted = outputs.argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            num_batches += 1

    # An empty loader previously surfaced as a bare ZeroDivisionError.
    if total == 0:
        raise ValueError('evaluate_model: test_loader yielded no samples')

    test_loss /= num_batches
    test_accuracy = 100 * correct / total
    return test_loss, test_accuracy

In [None]:
def _run_training_epochs(model, train_loader, criterion, optimizer, device, num_epochs, label):
    """Run ``num_epochs`` passes over ``train_loader`` and print the mean batch loss of each."""
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        num_batches = 0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)

            # Check if the output is a tuple (for InceptionV3)
            if isinstance(outputs, tuple):
                outputs = outputs[0]  # Use only the main output

            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            num_batches += 1

        # An empty loader reports NaN instead of dividing by zero.
        mean_loss = running_loss / num_batches if num_batches else float('nan')
        print(f'{label} {epoch+1}/{num_epochs}, Loss: {mean_loss}')


def create_and_train_model(model, train_loader, val_loader, num_classes, device, epochs=10, fine_tune_epochs=5):
    """Train a classifier in two Adam phases and return it.

    Phase one runs ``epochs`` epochs at lr=0.001. Phase two marks every
    parameter as trainable and runs ``fine_tune_epochs`` epochs at lr=0.0001
    with a fresh optimizer. This function does not freeze any parameter
    itself, so unless the caller froze layers beforehand, phase two is
    continued training at a lower learning rate.

    Args:
        model: Module to train; it is moved to ``device`` in place.
        train_loader: Iterable of ``(inputs, labels)`` batches.
        val_loader: Accepted for interface compatibility; not used here.
        num_classes: Accepted for interface compatibility; not used here.
        device: Device to train on.
        epochs: Number of phase-one epochs.
        fine_tune_epochs: Number of phase-two epochs.

    Returns:
        The same ``model`` object after training.
    """
    criterion = nn.CrossEntropyLoss()

    # Train the model
    model.to(device)
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    _run_training_epochs(model, train_loader, criterion, optimizer, device, epochs, 'Epoch')

    # Fine-tune the model
    for param in model.parameters():
        param.requires_grad = True

    optimizer = optim.Adam(model.parameters(), lr=0.0001)
    _run_training_epochs(model, train_loader, criterion, optimizer, device, fine_tune_epochs, 'Fine-tune Epoch')

    return model

In [None]:
# Names of the per-crop folders processed by the cells below, plus the
# container that collects each crop's results.
# NOTE(review): the entries read like disease stages rather than crop names — confirm the intended folder layout.
crops = [
    'Armillaria_Stage_1',
    'Armillaria_Stage_2',
    'Armillaria_Stage_3',
    'Healthy',
]
results = dict()

In [None]:
# List the class folders of a dataset directory
def find_classes(dir):
    """Return the class names found in ``dir`` and their index mapping.

    A class is any non-hidden sub-directory of ``dir``. When ``dir`` does not
    exist it is created first (and a message is printed), which yields an
    empty result.

    Args:
        dir: Directory whose immediate sub-directories are the classes.

    Returns:
        ``(classes, class_to_idx)``: the sorted class names and a dict mapping
        each name to its position in that list.
    """
    if not os.path.exists(dir):
        os.makedirs(dir, exist_ok=True)
        print(f"Created directory: {dir}")

    classes = sorted(
        name for name in os.listdir(dir)
        if not name.startswith('.') and os.path.isdir(os.path.join(dir, name))
    )
    class_to_idx = {name: index for index, name in enumerate(classes)}
    return classes, class_to_idx

# Print the class folders found in every crop's train_set and test_set directory.
# Side effect: find_classes creates any of these directories that does not exist yet.
# NOTE(review): these paths are built from base_dir, not from crop_root — confirm which layout is intended.
for crop in crops:
    train_dir = os.path.join(base_dir, crop, 'train_set')
    test_dir = os.path.join(base_dir, crop, 'test_set')
    print(find_classes(train_dir))
    print(find_classes(test_dir))

In [None]:
# Data transformations
# Per-channel statistics applied by every pipeline's Normalize step.
_CHANNEL_MEAN = [0.485, 0.456, 0.406]
_CHANNEL_STD = [0.229, 0.224, 0.225]


def _pipeline(*spatial_steps):
    """Chain the given spatial transforms with the shared ToTensor + Normalize tail."""
    return transforms.Compose([
        *spatial_steps,
        transforms.ToTensor(),
        transforms.Normalize(_CHANNEL_MEAN, _CHANNEL_STD),
    ])


# 224x224 pipelines: random crop + flip for training, resize + centre crop otherwise.
data_transforms1 = {
    'train': _pipeline(
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
    ),
    'val': _pipeline(
        transforms.Resize(256),
        transforms.CenterCrop(224),
    ),
    'test': _pipeline(
        transforms.Resize(256),
        transforms.CenterCrop(224),
    ),
}

# 299x299 pipelines: every split is first resized to the required input size.
data_transforms = {
    'train': _pipeline(
        transforms.Resize((299, 299)),
        transforms.RandomResizedCrop(299),
        transforms.RandomHorizontalFlip(),
    ),
    'val': _pipeline(
        transforms.Resize((299, 299)),
        transforms.CenterCrop(299),
    ),
    'test': _pipeline(
        transforms.Resize((299, 299)),
        transforms.CenterCrop(299),
    ),
}


In [None]:
# Define device: prefer CUDA, then Apple's MPS backend, and fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
elif getattr(torch.backends, "mps", None) is not None and torch.backends.mps.is_available():
    # The getattr guard keeps this cell working on torch builds without an MPS backend.
    device = torch.device("mps")
else:
    device = torch.device("cpu")

In [None]:
results = {}

def is_valid_file(path):
    """Tell ``ImageFolder`` whether ``path`` should be loaded as a sample.

    Args:
        path: Path of a candidate file.

    Returns:
        ``False`` for macOS ``DS_Store`` artefacts and for any file whose
        extension is not one of ``IMG_EXTENSIONS``; ``True`` otherwise.
    """
    name = os.path.basename(path)
    # The former `not endswith(...) or ... not in ...` test only rejected an
    # exact '.DS_Store' suffix and let every other non-image file through.
    if 'DS_Store' in name:
        return False
    return name.lower().endswith(IMG_EXTENSIONS)

In [None]:
# Train and evaluate every candidate model on every crop.
#
# Input resolution fed to each architecture. The ViT below is built with
# image_size=224, so it must receive 224-pixel batches; only InceptionV3 is
# given the 299-pixel pipelines.
input_size_by_model = {
    'EfficientNetB0': 224,
    'InceptionV3': 299,
    'ResNet50': 224,
    'ViT': 224,
}
transforms_by_size = {224: data_transforms1, 299: data_transforms}

for crop in crops:
    print(f'Processing crop: {crop}')

    crop_train_dir = os.path.join(base_dir, crop, 'train_set')
    crop_test_dir = os.path.join(base_dir, crop, 'test_set')

    train_dir = f'temp_{crop}_train'
    validation_dir = f'temp_{crop}_val'

    # Leftovers from an interrupted run would be merged with the new split and
    # could put the same image in both train and validation, so start clean.
    shutil.rmtree(train_dir, ignore_errors=True)
    shutil.rmtree(validation_dir, ignore_errors=True)

    try:
        split_train_val(crop_train_dir, train_dir, validation_dir)

        # Ensure the train, validation and test directories exist
        for needed_dir in (train_dir, validation_dir, crop_test_dir):
            if not os.path.exists(needed_dir):
                os.makedirs(needed_dir, exist_ok=True)
                print(f"Created directory: {needed_dir}")

        # One (train, val, test) loader triple per input resolution.
        loaders_by_size = {}
        for size, transform_set in transforms_by_size.items():
            train_dataset = datasets.ImageFolder(train_dir, transform=transform_set['train'], is_valid_file=is_valid_file)
            val_dataset = datasets.ImageFolder(validation_dir, transform=transform_set['val'], is_valid_file=is_valid_file)
            test_dataset = datasets.ImageFolder(crop_test_dir, transform=transform_set['test'], is_valid_file=is_valid_file)

            # Labels are assigned when a dataset is built, so overwriting
            # class_to_idx afterwards changes nothing. A mismatch has to be
            # reported instead of silently scoring against shifted labels.
            if test_dataset.classes != train_dataset.classes:
                raise ValueError(
                    f'{crop}: test classes {test_dataset.classes} do not match '
                    f'train classes {train_dataset.classes}'
                )

            loaders_by_size[size] = (
                DataLoader(train_dataset, batch_size=32, shuffle=True),
                DataLoader(val_dataset, batch_size=32, shuffle=False),
                DataLoader(test_dataset, batch_size=32, shuffle=False),
            )

        num_classes = len(train_dataset.classes)

        # Models to compare (the ViT is built from scratch, not pretrained)
        pretrained_models = {
            'EfficientNetB0': EfficientNet.from_pretrained('efficientnet-b0'),
            'InceptionV3': models.inception_v3(pretrained=True),
            'ResNet50': models.resnet50(pretrained=True),
            'ViT': ViT(
                image_size = input_size_by_model['ViT'],
                patch_size = 16,
                num_classes = num_classes,
                dim = 1024,
                depth = 6,
                heads = 16,
                mlp_dim = 2048,
                dropout = 0.1,
                emb_dropout = 0.1
            ),
            # 'AttentionAugmentedResNet50': models.resnet50(pretrained=True)  # Placeholder, implement AttentionAugmentedResNet50
        }

        crop_results = {}

        for model_name, base_model in pretrained_models.items():
            # Replace the classification head so it outputs num_classes logits.
            if model_name == 'InceptionV3':
                base_model.AuxLogits.fc = nn.Linear(base_model.AuxLogits.fc.in_features, num_classes)
                base_model.fc = nn.Linear(base_model.fc.in_features, num_classes)
            elif model_name == 'EfficientNetB0':
                base_model._fc = nn.Linear(base_model._fc.in_features, num_classes)
            elif model_name == 'ViT':
                # Already constructed with num_classes outputs; nothing to replace.
                pass
            elif model_name == 'AttentionAugmentedResNet50':
                # Implement AttentionAugmentedResNet50 here
                pass
            else:
                base_model.fc = nn.Linear(base_model.fc.in_features, num_classes)

            train_loader, val_loader, test_loader = loaders_by_size[input_size_by_model.get(model_name, 224)]

            print(f'--------------- Training model: {model_name} for crop: {crop} ---------------')
            model = create_and_train_model(base_model, train_loader, val_loader, num_classes, device)

            # Evaluate the model
            test_loss, test_accuracy = evaluate_model(model, test_loader, nn.CrossEntropyLoss(), device)

            crop_results[model_name] = {
                'model': model,
                'test_loss': test_loss,
                'test_accuracy': test_accuracy,
                # Kept so later cells can feed the model the input size it was trained on.
                'test_loader': test_loader,
            }
            print(f'{crop} - {model_name} Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.2f}%')

        results[crop] = crop_results
    finally:
        # Clean up temporary directories, also when training fails part-way.
        shutil.rmtree(train_dir, ignore_errors=True)
        shutil.rmtree(validation_dir, ignore_errors=True)

print('All crops processed.')

In [None]:
# Plot comparison of accuracy for each model for each crop
for crop, crop_results in results.items():
    model_names = list(crop_results.keys())
    # The training loop stores the metric under 'test_accuracy'; reading
    # 'accuracy' raised KeyError.
    accuracies = [crop_results[name]['test_accuracy'] for name in model_names]

    plt.figure(figsize=(12, 6))
    plt.bar(model_names, accuracies)
    plt.title(f'Model test accuracy comparison for {crop}')
    plt.ylabel('Accuracy (%)')
    plt.xlabel('Model')
    plt.show()

In [None]:
# Display the first few test images with their true and predicted labels
def display_classification_results(model, test_loader, num_images=5,
                                   mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)):
    """Show the first images of the first test batch with true/predicted labels.

    Args:
        model: Trained classifier; it is switched to eval mode.
        test_loader: Loader whose ``dataset.classes`` gives the label names.
        num_images: Maximum number of images to show; capped at the size of
            the first batch.
        mean: Per-channel mean used by the loader's Normalize step, needed to
            undo the normalisation for display.
        std: Per-channel std used by the loader's Normalize step.
    """
    model.eval()
    class_labels = test_loader.dataset.classes
    images, labels = next(iter(test_loader))
    num_images = min(num_images, images.size(0))
    images, labels = images[:num_images], labels[:num_images]

    # Run the batch on whatever device the model lives on; feeding CPU
    # tensors to a model on an accelerator fails.
    first_param = next(model.parameters(), None)
    model_inputs = images if first_param is None else images.to(first_param.device)

    with torch.no_grad():
        outputs = model(model_inputs)
        # Check if the output is a tuple (for InceptionV3)
        if isinstance(outputs, tuple):
            outputs = outputs[0]
        predicted = outputs.argmax(dim=1).cpu()

    # squeeze=False keeps `axes` two-dimensional even for a single image.
    fig, axes = plt.subplots(1, num_images, figsize=(20, 8), squeeze=False)
    fig.suptitle('Classification Results', fontsize=16)

    channel_mean = np.asarray(mean, dtype=np.float32)
    channel_std = np.asarray(std, dtype=np.float32)

    for i in range(num_images):
        ax = axes[0, i]
        img = images[i].cpu().numpy().transpose((1, 2, 0))
        # Undo Normalize before clipping; clipping normalised values directly
        # distorts the colours.
        img = np.clip(img * channel_std + channel_mean, 0, 1)
        ax.imshow(img)
        ax.set_title(f'True: {class_labels[int(labels[i])]}, Pred: {class_labels[int(predicted[i])]}')
        ax.axis('off')

    plt.show()

In [None]:
# Display sample predictions of the best-scoring model for each crop.
# No 'MobileNetV2' entry is ever produced by the training loop, so the model
# to show is picked from what is actually in `results`.
for crop, crop_results in results.items():
    if not crop_results:
        continue

    best_name = max(crop_results, key=lambda name: crop_results[name]['test_accuracy'])
    best_entry = crop_results[best_name]
    print(f'Displaying results for {crop} - {best_name}')

    # Use the loader stored with the result when there is one; otherwise build
    # this crop's own test loader rather than reusing whichever `test_loader`
    # the training loop happened to leave behind.
    crop_test_loader = best_entry.get('test_loader')
    if crop_test_loader is None:
        crop_test_dataset = datasets.ImageFolder(
            os.path.join(base_dir, crop, 'test_set'),
            transform=data_transforms['test'],
            is_valid_file=is_valid_file,
        )
        crop_test_loader = DataLoader(crop_test_dataset, batch_size=32, shuffle=False)

    display_classification_results(best_entry['model'], crop_test_loader)