In [5]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms
from torch.utils.data import DataLoader, Dataset
import numpy as np
from tqdm import tqdm
from timm import create_model

from sklearn.metrics import confusion_matrix, f1_score
import seaborn as sns
import matplotlib.pyplot as plt

In [6]:
# Read the three per-class arrays from the working directory in one pass.
bee, nobee, noqueen = (
    np.load(path) for path in ('bee.npy', 'nobee.npy', 'noqueen.npy')
)

In [7]:
# Class ids (stored as floats): nobee -> 0, bee -> 1, noqueen -> 2.
labels_bee = np.full(len(bee), 1.0)
labels_nobee = np.zeros(len(nobee))
labels_noqueen = np.full(len(noqueen), 2.0)

In [8]:
# Stack the three classes into one sample array with a matching label vector.
data = np.concatenate((bee, nobee, noqueen), axis=0)
labels = np.concatenate((labels_bee, labels_nobee, labels_noqueen), axis=0)

# One row per sample; the 'data' column holds the per-sample array.
df = pd.DataFrame({'data': list(data), 'labels': labels})
# Shuffle with a fixed seed: the later train/val/test split is seeded too, but it
# only reproduces the same partition if the row order it starts from is stable.
df = df.sample(frac=1, random_state=42).reset_index(drop=True)

In [9]:
# Hold out 20% for testing, then carve 15% of the remainder off for validation.
# Both splits are stratified on the label column and use a fixed seed.
train_df, test_df = train_test_split(
    df, test_size=0.2, random_state=42, stratify=df['labels']
)
train_df, val_df = train_test_split(
    train_df, test_size=0.15, random_state=42, stratify=train_df['labels']
)


def _frame_to_arrays(frame):
    """Return (samples, labels) from a split frame as NumPy arrays."""
    samples = np.array(frame['data'].tolist())
    targets = np.array(frame['labels'].tolist())
    return samples, targets


x_train, y_train = _frame_to_arrays(train_df)
x_val, y_val = _frame_to_arrays(val_df)
x_test, y_test = _frame_to_arrays(test_df)

In [10]:
# Append a trailing singleton axis to every split.
# Note: re-running this cell adds yet another axis each time.
x_train = np.expand_dims(x_train, axis=-1)
x_val = np.expand_dims(x_val, axis=-1)
x_test = np.expand_dims(x_test, axis=-1)

In [12]:
class BeeDataset(Dataset):
    """Index-aligned (image, label) dataset with an optional per-image transform.

    Args:
        images: Indexable collection of samples; its length defines the dataset size.
        labels: Indexable collection of targets, looked up with the same index.
        transform: Optional callable applied to the image (not the label) on access.
    """

    def __init__(self, images, labels, transform=None):
        self.images, self.labels, self.transform = images, labels, transform

    def __len__(self):
        """Number of samples, taken from the image collection."""
        return len(self.images)

    def __getitem__(self, idx):
        """Return the (possibly transformed) image and its label at ``idx``."""
        sample, target = self.images[idx], self.labels[idx]
        return (self.transform(sample) if self.transform else sample), target

In [13]:
# Preprocessing applied to every sample: array -> PIL image -> 224x224 -> tensor.
_preprocess_steps = [
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
]
transform = transforms.Compose(_preprocess_steps)

In [14]:
# Wrap each split in a BeeDataset that shares the same preprocessing pipeline.
train_dataset, val_dataset, test_dataset = (
    BeeDataset(images, targets, transform=transform)
    for images, targets in ((x_train, y_train), (x_val, y_val), (x_test, y_test))
)

In [15]:
# Batches of 64 everywhere; only the training loader reshuffles each epoch.
train_dataloader = DataLoader(dataset=train_dataset, shuffle=True, batch_size=64)
val_dataloader = DataLoader(dataset=val_dataset, shuffle=False, batch_size=64)
test_dataloader = DataLoader(dataset=test_dataset, shuffle=False, batch_size=64)

In [None]:
# Use the first CUDA GPU when one is visible, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
device

In [17]:
seed = 42
# Seed NumPy and the default (CPU) torch generator as well as CUDA: DataLoader
# shuffling and freshly created layers draw from the CPU generator, so seeding
# only the CUDA generators leaves runs non-reproducible.
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
# Ensure deterministic behavior
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

In [19]:
# Load the CaiT model with pretrained weights
model = create_model('cait_xxs36_224', pretrained=True)

# Keep a handle on the pretrained RGB patch embedding so its filters can be reused.
pretrained_proj = model.patch_embed.proj
use_bias = pretrained_proj.bias is not None

# Same geometry as the pretrained projection, but accepting one input channel.
model.patch_embed.proj = nn.Conv2d(
    in_channels=1,
    out_channels=pretrained_proj.out_channels,
    kernel_size=pretrained_proj.kernel_size,
    stride=pretrained_proj.stride,
    padding=pretrained_proj.padding,
    bias=use_bias
)

# Initialise the new projection from the pretrained one instead of leaving it
# random: summing the RGB kernels over the channel axis gives the response the
# original layer would produce for a grayscale image replicated on all channels.
with torch.no_grad():
    model.patch_embed.proj.weight.copy_(
        pretrained_proj.weight.sum(dim=1, keepdim=True)
    )
    if use_bias:
        model.patch_embed.proj.bias.copy_(pretrained_proj.bias)

# Replace the classification head with a fresh 3-way classifier.
model.head = nn.Linear(model.head.in_features, 3)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

# Define the loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001)

In [None]:
# Training and validation loop
# Per-epoch history is stored as plain Python floats so it can be plotted or
# serialised without first moving tensors off the GPU.
num_epochs = 30
train_accs = []
val_accs = []
train_losses = []
val_losses = []
best_val_accuracy = 0
best_val_loss = float('inf')

for epoch in range(num_epochs):
    # Training
    model.train()
    train_loss = 0.0
    train_corrects = 0

    for train_images, train_labels in tqdm(train_dataloader, desc=f"Epoch {epoch+1}"):
        train_images = train_images.to(device)
        # CrossEntropyLoss expects integer class indices; labels arrive as floats.
        train_labels = train_labels.long().to(device)

        optimizer.zero_grad()

        outputs = model(train_images)
        preds = outputs.argmax(dim=1)
        loss = criterion(outputs, train_labels)

        loss.backward()
        optimizer.step()

        # The loss is a batch mean, so weight it by batch size before averaging
        # over the whole dataset below.
        train_loss += loss.item() * train_images.size(0)
        train_corrects += (preds == train_labels).sum().item()

    train_loss /= len(train_dataloader.dataset)
    train_acc = train_corrects / len(train_dataloader.dataset)
    train_losses.append(train_loss)
    train_accs.append(train_acc)

    # Validation
    model.eval()
    val_loss = 0.0
    val_corrects = 0
    with torch.no_grad():
        for val_images, val_labels in tqdm(val_dataloader, desc=f"Epoch {epoch+1} Validation"):
            val_images = val_images.to(device)
            val_labels = val_labels.long().to(device)

            val_outputs = model(val_images)
            preds = val_outputs.argmax(dim=1)
            loss = criterion(val_outputs, val_labels)

            val_loss += loss.item() * val_images.size(0)
            val_corrects += (preds == val_labels).sum().item()

    val_loss /= len(val_dataloader.dataset)
    val_acc = val_corrects / len(val_dataloader.dataset)
    val_losses.append(val_loss)
    val_accs.append(val_acc)

    print(f"Epoch {epoch+1}, Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")

    # Track the best validation accuracy seen so far (previously never updated).
    best_val_accuracy = max(best_val_accuracy, val_acc)

    # Checkpoint whenever the validation loss improves.
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), 'best_model.pth')

In [None]:
# Load the best model
# `best_model` is an alias of `model`: loading the checkpoint overwrites the
# weights left over from the final training epoch.
best_model = model
# Read the checkpoint from the same relative path the training loop saves to, and
# map it onto the active device so a GPU-trained checkpoint also loads on CPU.
best_model.load_state_dict(torch.load('best_model.pth', map_location=device))

# Predict on the test set
best_model.eval()
test_predictions = []
true_labels = []
with torch.no_grad():
    for test_images, test_labels in tqdm(test_dataloader, desc="Test Set"):
        test_images = test_images.to(device)
        test_labels = test_labels.long().to(device)

        outputs = best_model(test_images)
        preds = outputs.argmax(dim=1)

        test_predictions.extend(preds.cpu().numpy())
        true_labels.extend(test_labels.cpu().numpy())

# Evaluate the model on the test set
test_accuracy = float(np.mean(np.array(test_predictions) == np.array(true_labels)))

print(f'Test Accuracy: {test_accuracy * 100:.2f}%')

# Generate confusion matrix
# Fix the label order so rows/columns always line up with the class names, even
# if a class never appears in the test predictions or targets.
class_ids = [0, 1, 2]
class_names = ['nobee', 'bee', 'noqueen']  # ids follow the label-creation cell
cm = confusion_matrix(true_labels, test_predictions, labels=class_ids)
print(f'Confusion Matrix:\n{cm}')

# Calculate F1 score (macro, weighted, or per-class)
f1 = f1_score(true_labels, test_predictions, average='macro')
print(f'F1 Score (Macro): {f1:.2f}')

# Plot confusion matrix using seaborn
fig, ax = plt.subplots(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=class_names, yticklabels=class_names, ax=ax)
ax.set_xlabel('Predicted Labels')
ax.set_ylabel('True Labels')
ax.set_title('Confusion Matrix')
plt.show()