In [5]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import models, transforms, datasets
from PIL import Image
import cv2
import numpy as np
import time
import os
import shutil
import random

In [None]:
def create_split_directories(base_path, species_list):
    splits = ['train', 'val', 'test']
    for split in splits:
        split_path = os.path.join(base_path, split)
        os.makedirs(split_path, exist_ok=True)
        for species in species_list:
            os.makedirs(os.path.join(split_path, species), exist_ok=True)

def split_dataset(src_dir, dest_dir, train_ratio=0.7, val_ratio=0.15):
    species_list = [d for d in os.listdir(src_dir) if os.path.isdir(os.path.join(src_dir, d))]
    
    create_split_directories(dest_dir, species_list)
    
    for species in species_list:
        species_dir = os.path.join(src_dir, species)
        files = os.listdir(species_dir)
        random.shuffle(files)
        
        total = len(files)
        train_end = int(total * train_ratio)
        val_end = train_end + int(total * val_ratio)
        
        train_files = files[:train_end]
        val_files = files[train_end:val_end]
        test_files = files[val_end:]
        
        for file in train_files:
            shutil.copy(os.path.join(species_dir, file), os.path.join(dest_dir, 'train', species, file))
        for file in val_files:
            shutil.copy(os.path.join(species_dir, file), os.path.join(dest_dir, 'val', species, file))
        for file in test_files:
            shutil.copy(os.path.join(species_dir, file), os.path.join(dest_dir, 'test', species, file))

src_dir = 'dataset'  # Path to original dataset directory
dest_dir = 'SplitDataset'  # Path to save the split dataset

split_dataset(src_dir, dest_dir)

In [6]:
#DATA LOADING AND TRANSFORMATION

data_transforms = {
    'train': transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'val': transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'test': transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}

data_dir = 'SplitDataset'     #ADD DATASET LOCATION FOR TRAINING, with seperate 'train' 'val' and 'test' folders

image_datasets = {x: datasets.ImageFolder(os.path.join(data_dir, x),
                                          data_transforms[x])
                  for x in ['train', 'val','test']}
dataloaders = {x: torch.utils.data.DataLoader(image_datasets[x], batch_size=64,
                                             shuffle=True, num_workers=4)
              for x in ['train', 'val','test']}
dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'val','test']}
class_names = image_datasets['train'].classes

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [7]:
#MODEL INITIALISATION

model_ft = models.resnet18(pretrained=True)
num_ftrs = model_ft.fc.in_features
model_ft.fc = nn.Linear(num_ftrs, len(class_names))
model_ft = model_ft.to(device)
criterion = nn.CrossEntropyLoss()
optimizer_ft = optim.SGD(model_ft.parameters(), lr=0.001, momentum=0.9)
exp_lr_scheduler = optim.lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1)



In [8]:
#MODEL TRAINING

def train_model(model, criterion, optimizer, scheduler, num_epochs=25, save_path='modeltrain.pth', checkpoint_interval=2):
    since = time.time()
    best_model_wts = model.state_dict()
    best_acc = 0.0
    try:
        for epoch in range(num_epochs):
            print(f'Epoch {epoch}/{num_epochs - 1}')
            print('-' * 10)

            for phase in ['train', 'val']:
                if phase == 'train':
                    model.train()
                else:
                    model.eval()

                running_loss = 0.0
                running_corrects = 0

                for inputs, labels in dataloaders[phase]:
                    inputs = inputs.to(device)
                    labels = labels.to(device)

                    optimizer.zero_grad()
                    with torch.set_grad_enabled(phase == 'train'):
                        outputs = model(inputs)
                        _, preds = torch.max(outputs, 1)
                        loss = criterion(outputs, labels)

                        if phase == 'train':
                            loss.backward()
                            optimizer.step()

                    running_loss += loss.item() * inputs.size(0)
                    running_corrects += torch.sum(preds == labels.data)

                if phase == 'train':
                    scheduler.step()

                epoch_loss = running_loss / dataset_sizes[phase]
                epoch_acc = running_corrects.double() / dataset_sizes[phase]

                print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

                if phase == 'val' and epoch_acc > best_acc:
                    best_acc = epoch_acc
                    best_model_wts = model.state_dict()

            if epoch % checkpoint_interval == 0:
                torch.save(model.state_dict(), save_path)
                print(f'Checkpoint saved at epoch {epoch}')

            print()

    except KeyboardInterrupt:
        print("Training interrupted. Saving the model...")
        torch.save(model.state_dict(), save_path)
        print(f'Model saved to {save_path}')
        return model

    time_elapsed = time.time() - since
    print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
    print(f'Best val Acc: {best_acc:.4f}')
    model.load_state_dict(best_model_wts)
    torch.save(model.state_dict(), save_path)
    return model

model_ft = train_model(model_ft, criterion, optimizer_ft, exp_lr_scheduler, num_epochs=25, save_path='modeltrain.pth', checkpoint_interval=2)

Epoch 0/24
----------
train Loss: 2.3693 Acc: 0.1704
val Loss: 2.0453 Acc: 0.2587
Checkpoint saved at epoch 0

Epoch 1/24
----------
train Loss: 1.9379 Acc: 0.3793
val Loss: 1.4729 Acc: 0.7273

Epoch 2/24
----------
train Loss: 1.5055 Acc: 0.6874
val Loss: 1.0103 Acc: 0.8392
Checkpoint saved at epoch 2

Epoch 3/24
----------
train Loss: 1.1869 Acc: 0.7689
val Loss: 0.6990 Acc: 0.9161

Epoch 4/24
----------
train Loss: 0.9713 Acc: 0.8207
val Loss: 0.5215 Acc: 0.8881
Checkpoint saved at epoch 4

Epoch 5/24
----------
train Loss: 0.7714 Acc: 0.8607
val Loss: 0.4222 Acc: 0.9441

Epoch 6/24
----------
train Loss: 0.6729 Acc: 0.8756
val Loss: 0.3369 Acc: 0.9371
Checkpoint saved at epoch 6

Epoch 7/24
----------
train Loss: 0.6080 Acc: 0.8859
val Loss: 0.3362 Acc: 0.9371

Epoch 8/24
----------
train Loss: 0.6293 Acc: 0.8711
val Loss: 0.3312 Acc: 0.9301
Checkpoint saved at epoch 8

Epoch 9/24
----------
train Loss: 0.6211 Acc: 0.8800
val Loss: 0.3224 Acc: 0.9301

Epoch 10/24
----------
train L

In [9]:
#TEST LOSS AND ACCURACY

save_path='modeltrain.pth'
model_ft.load_state_dict(torch.load(save_path))
model_ft = model_ft.to(device)
def evaluate_model_with_predictions(model, dataloader, criterion):
    model.eval()
    running_loss = 0.0
    running_corrects = 0
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            loss = criterion(outputs, labels)

            _, preds = torch.max(outputs, 1)
            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels.data)

            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    total_loss = running_loss / dataset_sizes['test']
    total_acc = running_corrects.double() / dataset_sizes['test']

    print(f'Test Loss: {total_loss:.4f} Acc: {total_acc:.4f}')
    return total_loss, total_acc, all_preds, all_labels

criterion = torch.nn.CrossEntropyLoss()
test_loss, test_acc, test_preds, test_labels = evaluate_model_with_predictions(model_ft, dataloaders['test'], criterion)

  model_ft.load_state_dict(torch.load(save_path))


Test Loss: 0.3213 Acc: 0.9664


In [10]:
model_ft.load_state_dict(torch.load('modeltrain.pth'))

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model_ft = model_ft.to(device)

model_ft.eval()

preprocess = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

cap = cv2.VideoCapture(0)

while True:
    ret, frame = cap.read()
    if not ret:
        break

    img_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    img_pil = Image.fromarray(img_rgb)
    img_tensor = preprocess(img_pil)
    img_tensor = img_tensor.unsqueeze(0)
    img_tensor = img_tensor.to(device)

    with torch.no_grad():
        outputs = model_ft(img_tensor)
        _, preds = torch.max(outputs, 1)
        pred_class = class_names[preds[0]]

    cv2.putText(frame, f'Prediction: {pred_class}', (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
    cv2.imshow('Video', frame)

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()

  model_ft.load_state_dict(torch.load('modeltrain.pth'))


KeyboardInterrupt: 