### Paper parameters 
- 60% train, 40% split into val and test
- Images randomly cropped to 300x300 pixels
- 4 classes for classification
- cross entropy loss
- 15 epochs
- 0.001 initial learning rate, then a cosine annealing scheduler
- Sliding window of 300x300 with a stride of 50 pixels
    - Softmax per window. Highest class probability is one vote if probability higher than threshold.
    - Final prediction is the condition with highest number of votes across all windows
    - Probability threshold of 0.7. If no window reaches the threshold, the image is dropped.

### Paper results
- field/ no field classifier: Finally, training a CNN to classify
field/not field improved precision to 0.98 and lowered recall
slightly to 0.95 — a worthwhile trade-off
- "Maize had the lowest F1 score under both WebCC
and iNaturalist (62% and 68%, respectively), followed by
sugarcane (62% and 76%). The low performance is likely
due to the more visual similarities between the two crops at
the early-growth stage as well as their similar height. Meanwhile, rice has short and bright green leaves and cassava has
a palmate leaf structure and is grown more separately, both
more visually distinctive from the street".
    - And we discriminate from subtle differences in leaf color and amount for very different tree species.
- "The top performing model trained on the Expert labeled dataset achieved 82% accuracy when directly classifying the whole image, 90% with sliding windows but no
MHP threshold, and 93% with a 0.90 MHP threshold"

### Useful references
+ Resnet paper: https://arxiv.org/pdf/1512.03385.pdf
+ Croptypes paper: https://arxiv.org/pdf/2309.05930.pdf
+ Input needed for resnet50: https://pytorch.org/hub/pytorch_vision_resnet/


In [1]:
import torch
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, random_split, Subset
import torchvision.models as models
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm
import numpy as np
from collections import Counter
from sklearn.metrics import classification_report
from torch.utils.tensorboard import SummaryWriter

# Additional Setup to use Tensorboard
%load_ext tensorboard

In [9]:
# Helper function
def get_class_distribution(dataset):
    """Count how many samples of each label a dataset contains.

    Args:
        dataset: Iterable yielding ``(sample, label)`` pairs.

    Returns:
        A plain dict mapping each label to its number of occurrences, with
        keys in order of first appearance.
    """
    # Every item is produced by the dataset even though only its label is used.
    return dict(Counter(label for _, label in dataset))


Use ordered_trees_classified.zip to get the correct folder order to work with the code. 

In [29]:
# Per-channel statistics used to normalize the image tensors
# (presumably the ImageNet values expected by the pretrained ResNet - see the
# "Input needed for resnet50" reference above).
IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD = [0.229, 0.224, 0.225]

# Preprocessing pipeline handed to the dataset.
transform = transforms.Compose(
    [
        # Cut out the central 300x324 region | To be further cut up later
        transforms.CenterCrop((300, 324)),
        # Convert the image to a PyTorch tensor
        transforms.ToTensor(),
        # Normalize each channel with the statistics above
        transforms.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
    ]
)

# Root folder of the image dataset
data_path = "trees_classified"

# Class index -> condition name; dataset sorted on most samples first.
# NOTE(review): assumes the dataset's class indices follow this order - confirm
# against dataset.class_to_idx.
label_transform = dict(
    enumerate(["Matig", "Redelijk", "Slecht", "Goed", "Dood", "Zeer-Slecht"])
)

In [30]:
# Build the image dataset from the folder tree under data_path; `transform` is
# the preprocessing pipeline passed to the dataset.
dataset = torchvision.datasets.ImageFolder(root=data_path, transform=transform)

In [31]:
# See how unbalanced the classes are before balancing.
# This iterates over every sample in the dataset to count its label.
get_class_distribution(dataset)

{0: 9553, 1: 8104, 2: 947, 3: 289, 4: 116, 5: 87}

In [32]:
# Balance the classes by randomly sampling at most class_limit samples from each class

class_limit = 1000
num_classes = 3  # specify how many classes we use. This determines output dimension of the model.

# Only labels 0 .. num_classes - 1 are kept. Every kept class is shuffled and
# capped in the same way, so editing class_limit or num_classes above is all
# that is needed to change the selection.
idx_per_class = []
for class_idx in range(num_classes):
    class_indices = [i for i, label in enumerate(dataset.targets) if label == class_idx]
    np.random.shuffle(class_indices)
    idx_per_class.append(class_indices[:class_limit])

# Flat array of the dataset indices that make up the balanced subset
idx_dataset_limited = np.concatenate(idx_per_class)

balanced_dataset = Subset(dataset, idx_dataset_limited)

# Print the balanced dataset distribution
get_class_distribution(balanced_dataset)

{0: 1000, 1: 1000, 2: 947}

In [33]:
# Random 80% / 20% split of the balanced subset into training and test data.
n_samples = len(balanced_dataset)
train_size = int(0.8 * n_samples)
test_size = n_samples - train_size  # the remainder, so both sizes add up exactly
train_dataset, test_dataset = random_split(
    balanced_dataset,
    [train_size, test_size],
)

In [34]:
batch_size = 32
# Training batches are reshuffled every epoch.
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
# Evaluation does not benefit from shuffling; a fixed order keeps the collected
# labels/predictions reproducible between runs.
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)

In [35]:
# ResNet-50 created with torchvision's 'DEFAULT' pretrained weights.
resnet50 = models.resnet50(weights='DEFAULT')


In [36]:
def train(train_loader, net, optimizer, criterion, device):
    """
    Trains network for one epoch in batches.

    Args:
        train_loader: Data loader for training set.
        net: Neural network model.
        optimizer: Optimizer (e.g. SGD).
        criterion: Loss function (e.g. cross-entropy loss).
        device: Device the input and label batches are moved to.

    Returns:
        Tuple ``(mean_loss, accuracy)``: the sum of the batch losses divided
        by the number of batches (a tensor detached from autograd), and the
        percentage of correctly classified samples.
    """

    avg_loss = 0.
    correct = 0
    total = 0

    net.train()

    # iterate through batches
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        # zero the gradients of the optimized parameters
        optimizer.zero_grad()

        # forward + backward + optimizer step
        outputs = net(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # keep track of loss and acc; the loss is detached so the running sum
        # does not keep every batch's autograd graph alive for the whole epoch
        avg_loss += loss.detach()
        predicted = torch.max(outputs.detach(), 1).indices
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    return avg_loss/len(train_loader), 100 * correct/total

def test(test_loader, net, criterion, device):
    """
    Evaluates network on a data set without updating its weights.

    Args:
        test_loader: Data loader for the evaluation set.
        net: Neural network model.
        criterion: Loss function (e.g. cross-entropy loss).
        device: Device the input and label batches are moved to.

    Returns:
        Tuple ``(mean_loss, accuracy)``: the sum of the batch losses divided
        by the number of batches in ``test_loader``, and the percentage of
        correctly classified samples.
    """
    avg_loss = 0.
    correct = 0
    total = 0

    net.eval()

    # no gradients are needed for evaluation
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            outputs = net(inputs)
            loss = criterion(outputs, labels)

            avg_loss += loss
            predicted = torch.max(outputs, 1).indices
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    # average over the batches of the loader that was actually evaluated
    # (not over the global training loader)
    return avg_loss/len(test_loader), 100 * correct/total


# The name matches pytest's default test-function pattern; opt out so this
# helper is never collected as a test case when imported into a test module.
test.__test__ = False

In [37]:
# Prefer the GPU when CUDA is available, otherwise fall back to the CPU.
device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(device_name)
device

device(type='cpu')

In [38]:
import copy

writer = SummaryWriter(log_dir="./runs_condition/")

# Only the new classification head is handed to the optimizer below, so the
# pretrained backbone never changes. Switching its gradients off avoids
# computing and storing gradients that nothing uses.
for param in resnet50.parameters():
    param.requires_grad_(False)

# Replace the final layer so the output dimension equals num_classes
# (a freshly created layer requires gradients by default).
resnet50.fc = nn.Linear(resnet50.fc.in_features, num_classes)

criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(resnet50.fc.parameters(), lr=0.001)

resnet50.to(device)

# Set the number of epochs for training
num_epochs = 15
epochs = num_epochs

# Stop early after this many consecutive epochs without a new best training
# accuracy. NOTE: with patience larger than epochs the early stop cannot trigger.
patience = 20

train_acc_best = 0
patience_cnt = 0
# Start from a snapshot of the initial weights so best_model_wts always exists,
# even if training is interrupted before the first epoch finishes.
best_model_wts = copy.deepcopy(resnet50.state_dict())

try:
    for epoch in tqdm(range(epochs)):  # loop over the dataset multiple times
        # Train on data
        train_loss, train_acc = train(train_loader, resnet50, optimizer, criterion, device)

        # Evaluate on the held-out split (this notebook has no separate validation set)
        val_loss, val_acc = test(test_loader, resnet50, criterion, device)

        # Write metrics to Tensorboard
        writer.add_scalars("Loss", {'Train': train_loss, 'Test': val_loss}, epoch)
        writer.add_scalars('Accuracy', {'Train': train_acc, 'Test': val_acc}, epoch)

        if train_acc > train_acc_best:
            train_acc_best = train_acc
            patience_cnt = 0
            # state_dict() hands back references to the live tensors; take a deep
            # copy so later epochs do not overwrite the stored best weights.
            best_model_wts = copy.deepcopy(resnet50.state_dict())
        else:
            patience_cnt += 1
            if patience_cnt == patience:
                break
        # print(f"Current loss {train_loss} at epoch {epoch}")

    print('Finished Training')
finally:
    # Make sure logged metrics reach disk even when the cell is interrupted
    writer.flush()
    writer.close()

  0%|                                                                                           | 0/15 [02:20<?, ?it/s]


KeyboardInterrupt: 

In [None]:
# Open Tensorboard
%tensorboard --logdir runs_condition/

# For local users only: uncomment the last line, run this cell once and wait for
# it to time out, run this cell a second time and you should see the board.
# %tensorboard --logdir runs_condition/ --host localhost

In [None]:
# Restore the weights that the training loop stored in best_model_wts.
resnet50.load_state_dict(best_model_wts)

In [None]:
# Save the model's state dict (its parameters, not the module object) to disk.
torch.save(resnet50.state_dict(), "resnet50_model.pt")

In [None]:
# Ground-truth and predicted class indices collected over the whole test loader
true_labels, predicted_labels = [], []

resnet50.eval()

# No gradients are needed while evaluating
with torch.no_grad():
    for batch_inputs, batch_labels in test_loader:
        batch_inputs = batch_inputs.to(device)
        batch_labels = batch_labels.to(device)

        # Forward pass; the predicted class is the index of the largest output
        logits = resnet50(batch_inputs)
        batch_predictions = torch.max(logits, 1).indices

        # Move results back to the CPU before storing them
        true_labels.extend(batch_labels.cpu().numpy())
        predicted_labels.extend(batch_predictions.cpu().numpy())

# Build and print the evaluation report
report = classification_report(true_labels, predicted_labels)
print(report)