In [None]:
import pandas as pd
import numpy as np
import json
import matplotlib.pyplot as plt
import seaborn as sns
import cv2

train_dir = '/kaggle/input/herbarium-2022-fgvc9/train_images/'
test_dir = '/kaggle/input/herbarium-2022-fgvc9/test_images'

with open("/kaggle/input/herbarium-2022-fgvc9/train_metadata.json") as json_file:
    train_meta = json.load(json_file)
with open("/kaggle/input/herbarium-2022-fgvc9/test_metadata.json") as json_file:
    test_meta = json.load(json_file)

In [None]:
image_ids = [image["image_id"] for image in train_meta["images"]]
image_dirs = [train_dir + image['file_name'] for image in train_meta["images"]]
category_ids = [annotation['category_id'] for annotation in train_meta['annotations']]
genus_ids = [annotation['genus_id'] for annotation in train_meta['annotations']]

test_ids = [image['image_id'] for image in test_meta]
test_dirs = [test_dir + image['file_name'] for image in test_meta]

#Create the initial training dataframe with the above defined columns
train_df = pd.DataFrame({
    "image_id" : image_ids,
    "image_dir" : image_dirs,
    "category" : category_ids,
    "genus" : genus_ids})

#Create a testing dataframe
test_df = pd.DataFrame({
    "test_id" : test_ids,
    "test_dir" : test_dirs
})

train_df

In [None]:
#Add a genus column to the dataframe
genus_map = {genus['genus_id'] : genus['genus'] for genus in train_meta['genera']}
train_df['genus'] = train_df['genus'].map(genus_map)

##Create a family column in the datagframe based on the genus names
    # Step 1: Create dictionary of genus -> family mapping
genus_family_map = {}
for category in train_meta["categories"]:
    genus = category['genus']
    family = category['family']
    genus_family_map[genus] = family

    # Step 2: Create new column with default value of None™
train_df['family'] = None

    # Step 3: Update values in new column based on genus -> family mapping
for i, row in train_df.iterrows():
    genus = row['genus']
    if genus in genus_family_map:
        family = genus_family_map[genus]
        train_df.at[i, 'family'] = family

train_df

In [None]:
#Filter only the images of plants that are in the Poaceae family
train_df = train_df.loc[train_df['family'] == 'Poaceae']
#Reset index
train_df = train_df.reset_index(drop=True)

train_df

In [None]:
train_df["genus"].value_counts()

In [None]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import torchvision
import torchvision.models as models
import torchvision.transforms as transforms
from torchvision.models import resnet50, ResNet50_Weights
from sklearn import preprocessing
from sklearn.model_selection import train_test_split
from PIL import Image
import random

!pip install git+https://github.com/facebookresearch/fvcore.git

df = train_df

In [None]:
batch_size = 128
epochs = 10
IM_SIZE = 256
learning_rate = 1e-4

#Define the data augmentation transformations
data_transforms = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(20),
    transforms.RandomResizedCrop(224, scale=(0.8, 1.0)),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Define the transformations without data augmentation
transform_no_aug = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

In [None]:
class CustomDataset(Dataset):
    def __init__(self, X, Y, transform=None, transform_no_aug=None, min_samples=20):
        self.X = X
        self.Y = Y
        self.transform = transform
        self.transform_no_aug = transform_no_aug
        self.min_samples = min_samples

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        img_path = self.X[idx]
        label = self.Y[idx]
        
        img = Image.open(img_path)
        
        if self.transform and self.transform_no_aug:
            label_count = len([y for y in self.Y if y == label])
            if label_count < self.min_samples:
                img = self.transform(img)
            else:
                img = self.transform_no_aug(img)
        
        return img, label

num_classes = train_df['genus'].nunique()

device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
model = torch.hub.load('zhanghang1989/ResNeSt', 'resnest101', pretrained=True, )

In [None]:
X = train_df['image_dir'].values
Y = train_df['genus'].values

# Split the dataset into training and validation sets
X_train, X_val, Y_train, Y_val = train_test_split(X, Y, test_size=0.2, random_state=42, stratify=Y)

# Create the training and validation datasets
train_dataset = CustomDataset(X_train, Y_train, transform=data_transforms, transform_no_aug=transform_no_aug, min_samples=20)
val_dataset = CustomDataset(X_val, Y_val, transform=transform_no_aug)  # Apply only the non-augmented transform to validation set

# Create the DataLoaders
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=4)

print("Size of the training dataset:", len(X_val))

In [None]:
print(model.fc.in_features) 
print(model.fc.out_features)
    
n_inputs = model.fc.in_features
last_layer = nn.Linear(n_inputs, num_classes)
model.fc = last_layer

for idx, (name, param) in enumerate(model.named_parameters()):
    if idx < len(list(model.named_parameters())) - 1:
        param.requires_grad = False

print(model.fc.out_features)    

if torch.cuda.is_available():
    model.cuda()
    
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.fc.parameters(), learning_rate) 

In [None]:
from tqdm import tqdm
from sklearn.metrics import f1_score

def train(trainloader, model, criterion, optimizer, scaler, device=torch.device("cpu")):
    train_acc = 0.0
    train_loss = 0.0
    y_true = []
    y_pred = []
    for images, labels in tqdm(trainloader):
        images = images.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()
        with torch.cuda.amp.autocast(enabled=True):
            output = model(images)
            loss = criterion(output, labels)
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
            acc = ((output.argmax(dim=1) == labels).float().mean())
            train_acc += acc
            train_loss += loss
            y_true += labels.cpu().numpy().tolist()
            y_pred += output.argmax(dim=1).cpu().numpy().tolist()
            
    train_f1 = f1_score(y_true, y_pred, average=None)
    return train_acc/len(trainloader), train_loss/len(trainloader), train_f1               

In [None]:
def evaluate(testloader, model, criterion, device=torch.device("cpu")):
    eval_acc = 0.0
    eval_loss = 0.0
    y_true = []
    y_pred = []
    for images, labels in tqdm(testloader):
        images = images.to(device)
        labels = labels.to(device)
        with torch.no_grad():
            output = model(images)
            loss = criterion(output, labels)
        acc = ((output.argmax(dim=1) == labels).float().mean())
        eval_acc += acc
        eval_loss += loss
        y_true += labels.cpu().numpy().tolist()
        y_pred += output.argmax(dim=1).cpu().numpy().tolist()
  
    eval_f1 = f1_score(y_true, y_pred, average=None)
    return eval_acc/len(testloader), eval_loss/len(testloader), eval_f1    

In [None]:
scaler = torch.cuda.amp.GradScaler(enabled=True)
for epoch in range(epochs):
    train_acc, train_loss, train_f1 = train(trainloader, model, criterion, optimizer, scaler, device=device)
    eval_acc, eval_loss, eval_f1 = evaluate(valloader, model, criterion, device=torch.device("cuda"))

    print(f"Epoch {epoch + 1} | Train Acc: {train_acc*100} | Train Loss: {train_loss} | Train F1: {train_f1}")
    print(f"\t Val Acc: {eval_acc*100} | Val Loss: {eval_loss} | Val F1: {eval_f1}")
    
    print("F1 score per class (Train):")
    for i, f1 in enumerate(train_f1):
        print(f"Class {i}: {f1}")

    print("\nF1 score per class (Validation):")
    for i, f1 in enumerate(eval_f1):
        print(f"Class {i}: {f1}")

    print("===="*8)