<a href="https://colab.research.google.com/github/Stdunson/InsectDetectionProject/blob/main/InsectThingsColab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Sequential Transfer Learning with Large Scale Insect Datasets

#Inconsistencies
1. Not every data picture has every taxonomic level, so there has to be a way to filter that during gathering. The lowest level that every image consistently has is order, which is really high. We're gonna focus on family-level
2. One dataset is JSON and the other is raw images. Shouldn't matter too much buecause I'm gonna have two different training algorithms
3. Bioscan has 934 families while Insect Foundation has 1189(1500 according to code)



In [1]:
#Installs
!pip3 install gdown
!pip3 install wget
!pip3 install utils
!pip3 install bioscan-dataset

Collecting wget
  Downloading wget-3.2.zip (10 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: wget
  Building wheel for wget (setup.py) ... [?25l[?25hdone
  Created wheel for wget: filename=wget-3.2-py3-none-any.whl size=9655 sha256=5c5b2087730007d1a919b85c8ee2f960bce9e72f8e3c17764f48e4252aa46659
  Stored in directory: /root/.cache/pip/wheels/01/46/3b/e29ffbe4ebe614ff224bad40fc6a5773a67a163251585a13a9
Successfully built wget
Installing collected packages: wget
Successfully installed wget-3.2
Collecting utils
  Downloading utils-1.0.2.tar.gz (13 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: utils
  Building wheel for utils (setup.py) ... [?25l[?25hdone
  Created wheel for utils: filename=utils-1.0.2-py2.py3-none-any.whl size=13906 sha256=831ff363688f5ff579aee59e0ce54d1962660c9ed47cfdefc5e3e1a63d0d715a
  Stored in directory: /root/.cache/pip/wheels/b6/a1/81/1036477786ae0e17b522f6f5a83

In [2]:
#If using drive
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
#Basic Imports
import os
import itertools
import numpy as np
import random
import time

#Doanloading things
from PIL import Image
from io import BytesIO
import requests
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
import gdown
import json
from collections import defaultdict

#Pytorch
import torch
import torchvision
from torchvision import models
from torchvision import transforms
from torch.utils.data import DataLoader
from torch.utils.data import Dataset, random_split
import torch.nn as nn
import torch.optim as optim

#Resnet
from torchvision.models import resnet50, ResNet50_Weights


# Data Downloading

In [4]:
#Bioscan
from bioscan_dataset import BIOSCAN5M

# Define the transformations for ResNet50
# ResNet50_Weights.IMAGENET1K_V2.transforms() provides the recommended transformations for the pre-trained ResNet50
transform = ResNet50_Weights.IMAGENET1K_V2.transforms()

bioscan_dataset_train = BIOSCAN5M("~/Datasets/bioscan-5m", download=True, split="train", modality="image", target_type="family", target_format="index", transform=transform)
bioscan_dataset_test = BIOSCAN5M("~/Datasets/bioscan-5m", download=True, split="test", modality="image", target_type="family", target_format="index", transform=transform)
bioscan_dataset_val = BIOSCAN5M("~/Datasets/bioscan-5m", download=True, split="val", modality="image", target_type="family", target_format="index", transform=transform)
#To get class from int: idx_to_class

#Information about bioscan_dataset: https://github.com/bioscan-ml/dataset

File missing: /root/Datasets/bioscan-5m/bioscan5m/metadata/csv/BIOSCAN_5M_Insect_Dataset_metadata.csv


100%|██████████| 2.07G/2.07G [23:40<00:00, 1.45MB/s]


Image directory missing: /root/Datasets/bioscan-5m/bioscan5m/images/cropped_256


100%|██████████| 2.22G/2.22G [12:58<00:00, 2.86MB/s]


Metadata CSV file already downloaded and verified
Directory missing: /root/Datasets/bioscan-5m/bioscan5m/images/cropped_256/test


100%|██████████| 1.47G/1.47G [42:24<00:00, 576kB/s]


Metadata CSV file already downloaded and verified
Images already downloaded and verified


In [5]:
# New Insect Foundation Download Script

JSON_PATH = "/content/drive/MyDrive/Insect-1M-v1.json"
OUT_ROOT = "/data/insects/images"
MAX_WORKERS = 64   # GCE can handle this easily

os.makedirs(OUT_ROOT, exist_ok=True)

def extract_real_name(family_string):
    if '(' in family_string and ')' in family_string:
        return family_string[family_string.find('(')+1:family_string.find(')')]
    return family_string

with open(JSON_PATH) as f:
    data = json.load(f)

records = []
for r in data["insect_records"]:
    if r.get("Family") and r.get("image_url"):
        r["Family"] = extract_real_name(r["Family"])
        records.append(r)

print(f"Valid records: {len(records)}")

def download_one(idx, record):
    family = record["Family"]
    url = record["image_url"]

    family_dir = os.path.join(OUT_ROOT, family)
    os.makedirs(family_dir, exist_ok=True)

    out_path = os.path.join(family_dir, f"{idx:07d}.jpg")

    if os.path.exists(out_path):
        return None  # skip if already downloaded

    try:
        r = requests.get(url, timeout=10)
        img = Image.open(BytesIO(r.content)).convert("RGB")
        img.save(out_path, "JPEG", quality=90)
        return out_path
    except Exception:
        return None

with ThreadPoolExecutor(MAX_WORKERS) as ex:
    futures = [ex.submit(download_one, i, r) for i, r in enumerate(records)]
    for _ in tqdm(as_completed(futures), total=len(futures)):
        pass

metadata = []
for i, r in enumerate(records):
    path = f"{r['Family']}/{i:07d}.jpg"
    metadata.append({
        "path": path,
        "family": r["Family"]
    })

with open("/data/insects/metadata.json", "w") as f:
    json.dump(metadata, f)

with open("/data/insects/metadata.json") as f:
    metadata = json.load(f)

families = sorted(set(x["family"] for x in metadata))
family_to_idx = {f: i for i, f in enumerate(families)}

class InsectDiskDataset(Dataset):
    def __init__(self, root_dir, metadata, family_to_idx, transform=None):
        self.root_dir = root_dir
        self.metadata = metadata
        self.family_to_idx = family_to_idx
        self.transform = transform

    def __len__(self):
        return len(self.metadata)

    def __getitem__(self, idx):
        item = self.metadata[idx]
        img_path = os.path.join(self.root_dir, item["path"])
        label = self.family_to_idx[item["family"]]

        img = Image.open(img_path).convert("RGB")

        if self.transform:
            img = self.transform(img)

        return img, label

transform = ResNet50_Weights.IMAGENET1K_V2.transforms()

dataset = InsectDiskDataset(
    root_dir="/data/insects/images",
    metadata=metadata,
    family_to_idx=family_to_idx,
    transform=transform
)

train_size = int(0.7 * len(dataset))
val_size = int(0.15 * len(dataset))
test_size = len(dataset) - train_size - val_size

IF_train_dataset, IF_val_dataset, IF_test_dataset = random_split(
    dataset,
    [train_size, val_size, test_size],
    generator=torch.Generator().manual_seed(42)
)

Valid records: 979149


  3%|▎         | 26165/979149 [32:47<19:54:14, 13.30it/s]


KeyboardInterrupt: 

In [None]:
# Old Insect Foundation Download Script(basic)


# Load JSON
with open('/content/drive/MyDrive/Insect-1M-v1.json', 'r') as file:
    data = json.load(file)

# Function to extract scientific name
def extract_real_name(family_string):
    if '(' in family_string and ')' in family_string:
        start = family_string.find('(')
        end = family_string.find(')')
        return family_string[start+1:end]
    return family_string

# Filter records without family and image_url
filtered_records = []
for record in data['insect_records']:
    if record.get('Family') and record.get('image_url'):
        record['Family'] = extract_real_name(record['Family'])
        filtered_records.append(record)


print(f"Original records: {len(data['insect_records'])}")
print(f"Filtered records: {len(filtered_records)}")

# Create family to index mapping
families = sorted(list(set(record['Family'] for record in filtered_records)))
family_to_idx = {family: idx for idx, family in enumerate(families)}
print(f"Number of unique families: {len(families)}")

# Custom Dataset class
class InsectDataset(Dataset):
    def __init__(self, records, family_to_idx, transform=None):
        self.records = records
        self.family_to_idx = family_to_idx
        self.transform = transform

    def __len__(self):
        return len(self.records)

    def __getitem__(self, idx):
        record = self.records[idx]

        # Download and load image
        try:
            response = requests.get(record['image_url'], timeout=10)
            img = Image.open(BytesIO(response.content)).convert('RGB')
        except Exception as e:
            # Return a blank image if download fails
            print(f"Failed to load image {record['image_url']}: {e}")
            img = Image.new('RGB', (224, 224), color='white')

        # Apply transform
        if self.transform:
            img = self.transform(img)

        # Get family label
        label = self.family_to_idx[record['Family']]

        return img, label

# Apply ResNet50 transforms
transform = ResNet50_Weights.IMAGENET1K_V2.transforms()

# Create full dataset
full_dataset = InsectDataset(filtered_records, family_to_idx, transform)

# Split into train (70%), validation (15%), test (15%)
total_size = len(full_dataset)
train_size = int(0.7 * total_size)
val_size = int(0.15 * total_size)
test_size = total_size - train_size - val_size

IF_train_dataset, IF_val_dataset, IF_test_dataset = random_split(
    full_dataset,
    [train_size, val_size, test_size],
    generator=torch.Generator().manual_seed(42)
)

print(f"\nDataset splits:")
print(f"Train: {len(IF_train_dataset)}")
print(f"Validation: {len(IF_val_dataset)}")
print(f"Test: {len(IF_test_dataset)}")

# Bioscan Things

In [None]:
#Initialize Resnet
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

bioscan_model = resnet50(weights=ResNet50_Weights.IMAGENET1K_V2).to(device)
print(f"ResNet50 model loaded on device: {device}")

Using device: cuda
Downloading: "https://download.pytorch.org/models/resnet50-11ad3fa6.pth" to /root/.cache/torch/hub/checkpoints/resnet50-11ad3fa6.pth


100%|██████████| 97.8M/97.8M [00:00<00:00, 220MB/s]


ResNet50 model loaded on device: cuda


In [None]:
#Data loaders

BATCH_SIZE = 512
NUM_WORKERS = 16

# 1. Initialize DataLoader for bioscan_dataset_train
bioscan_train_dataloader = DataLoader(
    bioscan_dataset_train,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=NUM_WORKERS
)

# 2. Initialize DataLoader for bioscan_dataset_val
bioscan_val_dataloader = DataLoader(
    bioscan_dataset_val,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=NUM_WORKERS
)

# 3. Initialize DataLoader for bioscan_dataset_test
bioscan_test_dataloader = DataLoader(
    bioscan_dataset_test,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=NUM_WORKERS
)

print(f"Created DataLoader with batch size: {BATCH_SIZE}")

# Verify one batch
for images, labels in bioscan_train_dataloader:
    print(f"Batch image shape: {images.shape}")
    print(f"Batch label shape: {len(labels)}")
    break # Just take one batch to verify

Created DataLoader with batch size: 32
Batch image shape: torch.Size([32, 3, 224, 224])
Batch label shape: 32


In [None]:
#Freeze ResNet50 layers

num_classes = 934

#Get amt of in features of last layes
in_features = bioscan_model.fc.in_features
print(f"Original ResNet50 FC layer in_features: {in_features}")

#Replace Last Layer
bioscan_model.fc = nn.Linear(in_features, num_classes)
print(f"Replaced bioscan_model.fc with new nn.Linear layer (in_features={in_features}, out_features={num_classes})")

#Freeze early layers, unfreeze later layers
for name, param in bioscan_model.named_parameters():
    if "layer4" in name or "fc" in name:
        param.requires_grad = True
    else:
        param.requires_grad = False

#Verify
trainable_params = [name for name, param in bioscan_model.named_parameters() if param.requires_grad]
print(f"Number of trainable parameters: {len(trainable_params)}")
print(f"Trainable layers: {trainable_params}")

Original ResNet50 FC layer in_features: 2048
Replaced bioscan_model.fc with new nn.Linear layer (in_features=2048, out_features=934)
Froze all parameters of the base ResNet50 model.
Unfroze parameters of the new classification head (bioscan_model.fc).
Number of trainable parameters: 2
Trainable layers: ['fc.weight', 'fc.bias']


In [None]:
#Define loss funtction & optimizer

bioscan_criterion = nn.CrossEntropyLoss()
print(f"Loss function (criterion) set to: {bioscan_criterion}")

bioscan_optimizer = optim.Adam(bioscan_model.fc.parameters(), lr=0.001)
print(f"Optimizer set to: {bioscan_optimizer}")

Loss function (criterion) set to: CrossEntropyLoss()
Optimizer set to: Adam (
Parameter Group 0
    amsgrad: False
    betas: (0.9, 0.999)
    capturable: False
    decoupled_weight_decay: False
    differentiable: False
    eps: 1e-08
    foreach: None
    fused: None
    lr: 0.001
    maximize: False
    weight_decay: 0
)


In [None]:
#Define training loop

dataloaders = {'train': bioscan_train_dataloader, 'validation': bioscan_val_dataloader, 'test': bioscan_test_dataloader}

def train_bioscan_model(model, criterion, optimizer, num_epochs=5):
    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch+1, num_epochs))
        print('-' * 10)

        for phase in ['train', 'validation']:
            if phase == 'train':
                model.train()
            else:
                model.eval()

            running_loss = 0.0
            running_corrects = 0

            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device).long() # Cast labels to torch.Long

                outputs = model(inputs)
                loss = bioscan_criterion(outputs, labels)

                if phase == 'train':
                    bioscan_optimizer.zero_grad()
                    loss.backward()
                    bioscan_optimizer.step()

                _, preds = torch.max(outputs, 1)
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

            epoch_loss = running_loss / len(dataloaders[phase].dataset)
            epoch_acc = running_corrects.double() / len(dataloaders[phase].dataset)

            print('{} loss: {:.4f}, acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))

    return model

In [None]:
#Train Model
bioscan_model.to(device)
bioscan_model = train_bioscan_model(bioscan_model, bioscan_criterion, bioscan_optimizer, num_epochs=5)

Epoch 1/5
----------
train loss: 1.3327, acc: 0.6941
validation loss: 0.9284, acc: 0.7661
Epoch 2/5
----------
train loss: 0.8272, acc: 0.7849
validation loss: 0.8327, acc: 0.7916
Epoch 3/5
----------
train loss: 0.7107, acc: 0.8092
validation loss: 0.8115, acc: 0.7924
Epoch 4/5
----------
train loss: 0.6486, acc: 0.8219
validation loss: 0.8177, acc: 0.7930
Epoch 5/5
----------
train loss: 0.6091, acc: 0.8307
validation loss: 0.8339, acc: 0.7909


In [None]:
!mkdir models

In [None]:
#Save Model
torch.save(bioscan_model.state_dict(), 'models/bioscan_model.pth')

In [None]:
#Define evaluation/testing loop
def evaluate_bioscan_model(model, dataloader):
    model.eval()
    running_loss = 0.0
    running_corrects = 0

    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs = inputs.to(device)
            labels = labels.to(device).long() # Cast labels to torch
            outputs = model(inputs)
            loss = bioscan_criterion(outputs, labels)
            _, preds = torch.max(outputs, 1)
            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels)


    epoch_loss = running_loss / len(dataloader.dataset)
    epoch_acc = running_corrects.double() / len(dataloader.dataset)

    print('Test loss: {:.4f}, acc: {:.4f}'.format(epoch_loss, epoch_acc))




In [None]:
#Evaluate Model

model = bioscan_model
dataloader = bioscan_test_dataloader
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

evaluate_bioscan_model(model, dataloader)

Test loss: 1.3028, acc: 0.6868


# Insect Foundation Things

In [None]:
#Initialize Resnet
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

IF_model = resnet50(weights=ResNet50_Weights.IMAGENET1K_V2).to(device)
print(f"ResNet50 model loaded on device: {device}")

In [None]:
#Data Loaders
IF_train_loader = DataLoader(IF_train_dataset, batch_size=512, shuffle=True, num_workers=16)
IF_val_loader = DataLoader(IF_val_dataset, batch_size=512, shuffle=False, num_workers=16)
IF_test_loader = DataLoader(IF_test_dataset, batch_size=512, shuffle=False, num_workers=16)

print("\nDataLoaders created successfully!")
print(f"Family to index mapping saved with {len(family_to_idx)} classes")

# Verify one batch
for images, labels in IF_train_loader:
    print(f"Batch image shape: {images.shape}")
    print(f"Batch label shape: {len(labels)}")
    break # Just take one batch to verify

In [None]:
#Freeze ResNet50 layers

num_classes = 1500

#Get amt of in features of last layes
in_features = IF_model.fc.in_features
print(f"Original ResNet50 FC layer in_features: {in_features}")

#Replace Last Layer
IF_model.fc = nn.Linear(in_features, num_classes)
print(f"Replaced bioscan_model.fc with new nn.Linear layer (in_features={in_features}, out_features={num_classes})")

#Freeze early layers, unfreeze later layers
for name, param in IF_model.named_parameters():
    if "layer3" in name or "layer4" in name or "fc" in name:
        param.requires_grad = True
    else:
        param.requires_grad = False

#Verify
trainable_params = [name for name, param in IF_model.named_parameters() if param.requires_grad]
print(f"Number of trainable parameters: {len(trainable_params)}")
print(f"Trainable layers: {trainable_params}")

In [None]:
#Define loss funtction & optimizer

IF_criterion = nn.CrossEntropyLoss()
print(f"Loss function (criterion) set to: {IF_criterion}")

IF_optimizer = optim.Adam(IF_model.fc.parameters(), lr=0.001)
print(f"Optimizer set to: {IF_optimizer}")

In [None]:
#Define training loop

dataloaders = {'train': IF_train_loader, 'validation': IF_val_loader, 'test': IF_test_loader}

def train_IF_model(model, criterion, optimizer, num_epochs=5):
    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch+1, num_epochs))
        print('-' * 10)

        for phase in ['train', 'validation']:
            if phase == 'train':
                model.train()
            else:
                model.eval()

            running_loss = 0.0
            running_corrects = 0

            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device).long() # Cast labels to torch.Long

                outputs = model(inputs)
                loss = IF_criterion(outputs, labels)

                if phase == 'train':
                    IF_optimizer.zero_grad()
                    loss.backward()
                    IF_optimizer.step()

                _, preds = torch.max(outputs, 1)
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

            epoch_loss = running_loss / len(dataloaders[phase].dataset)
            epoch_acc = running_corrects.double() / len(dataloaders[phase].dataset)

            print('{} loss: {:.4f}, acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))

    return model

In [None]:
#Train Model
IF_model.to(device)
IF_model = train_IF_model(IF_model, IF_criterion, IF_optimizer, num_epochs=5)

In [None]:
!mkdir models

In [None]:
#Save Model
torch.save(IF_model.state_dict(), 'models/IF_model.pth')

In [None]:
#Define evaluation/testing loop
def evaluate_IF_model(model, dataloader):
    model.eval()
    running_loss = 0.0
    running_corrects = 0

    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs = inputs.to(device)
            labels = labels.to(device).long() # Cast labels to torch
            outputs = model(inputs)
            loss = IF_criterion(outputs, labels)
            _, preds = torch.max(outputs, 1)
            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels)


    epoch_loss = running_loss / len(dataloader.dataset)
    epoch_acc = running_corrects.double() / len(dataloader.dataset)

    print('Test loss: {:.4f}, acc: {:.4f}'.format(epoch_loss, epoch_acc))




In [None]:
#Evaluate Model

model = IF_model
dataloader = IF_test_loader
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

evaluate_IF_model(model, dataloader)

# Sequential Transfer Learning

To set up, download data from both, fully train first model

## Bioscan -> Insect Foundation

In [None]:
#Initialize Bioscan Trained ResNet
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

resnet50_bioscan_IF_model = resnet50(weights=None).to(device)
state_dict = torch.load('models/bioscan_model.pth')
state_dict = {k: v for k, v in state_dict.items() if not k.startswith("fc.")}
resnet50_bioscan_IF_model.load_state_dict(state_dict, strict=False)

print(f"Bioscan model loaded on device: {device}")

In [None]:
#Data Loaders
IF_train_loader = DataLoader(IF_train_dataset, batch_size=512, shuffle=True, num_workers=16)
IF_val_loader = DataLoader(IF_val_dataset, batch_size=512, shuffle=False, num_workers=16)
IF_test_loader = DataLoader(IF_test_dataset, batch_size=512, shuffle=False, num_workers=16)

print("\nDataLoaders created successfully!")
print(f"Family to index mapping saved with {len(family_to_idx)} classes")

# Verify one batch
for images, labels in IF_train_loader:
    print(f"Batch image shape: {images.shape}")
    print(f"Batch label shape: {len(labels)}")
    break # Just take one batch to verify

In [None]:
#Freeze ResNet50 layers

num_classes = 1500

#Get amt of in features of last layes
in_features = resnet50_bioscan_IF_model.fc.in_features

#Replace Last Layer
resnet50_bioscan_IF_model.fc = nn.Linear(in_features, num_classes)
print(f"Replaced bioscan_model.fc with new nn.Linear layer (in_features={in_features}, out_features={num_classes})")

#Freeze early layers, unfreeze later layers
for name, param in resnet50_bioscan_IF_model.named_parameters():
    if "layer4" in name or "fc" in name:
        param.requires_grad = True
    else:
        param.requires_grad = False

#Verify
trainable_params = [name for name, param in resnet50_bioscan_IF_model.named_parameters() if param.requires_grad]
print(f"Number of trainable parameters: {len(trainable_params)}")
print(f"Trainable layers: {trainable_params}")

In [None]:
#Define loss funtction & optimizer

resnet50_bioscan_IF_criterion = nn.CrossEntropyLoss()
print(f"Loss function (criterion) set to: {resnet50_bioscan_IF_criterion}")

resnet50_bioscan_IF_optimizer = optim.Adam([
    {"params": resnet50_bioscan_IF_model.layer4.parameters(), "lr": 5e-5},
    {"params": resnet50_bioscan_IF_model.fc.parameters(), "lr": 1e-3}
])

print(f"Optimizer set to: {resnet50_bioscan_IF_optimizer}")

In [None]:
#Define training loop

dataloaders = {'train': IF_train_loader, 'validation': IF_val_loader, 'test': IF_test_loader}

def train_resnet50_bioscan_IF_model(model, criterion, optimizer, num_epochs=5):
    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch+1, num_epochs))
        print('-' * 10)

        for phase in ['train', 'validation']:
            if phase == 'train':
                model.train()
            else:
                model.eval()

            running_loss = 0.0
            running_corrects = 0

            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device).long() # Cast labels to torch.Long

                outputs = model(inputs)
                loss = resnet50_bioscan_IF_criterion(outputs, labels)

                if phase == 'train':
                    resnet50_bioscan_IF_optimizer.zero_grad()
                    loss.backward()
                    resnet50_bioscan_IF_optimizer.step()

                _, preds = torch.max(outputs, 1)
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

            epoch_loss = running_loss / len(dataloaders[phase].dataset)
            epoch_acc = running_corrects.double() / len(dataloaders[phase].dataset)

            print('{} loss: {:.4f}, acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))

    return model

In [None]:
#Train Model
resnet50_bioscan_IF_model.to(device)
resnet50_bioscan_IF_model = train_resnet50_bioscan_IF_model(resnet50_bioscan_IF_model, resnet50_bioscan_IF_criterion, resnet50_bioscan_IF_optimizer, num_epochs=5)

In [None]:
!mkdir models

In [None]:
#Save Model
torch.save(resnet50_bioscan_IF_model.state_dict(), 'models/resnet50_bioscan_IF_model.pth')

In [None]:
#Define evaluation/testing loop
def evaluate_resnet50_bioscan_IF_model(model, dataloader):
    model.eval()
    running_loss = 0.0
    running_corrects = 0

    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs = inputs.to(device)
            labels = labels.to(device).long() # Cast labels to torch
            outputs = model(inputs)
            loss = resnet50_bioscan_IF_criterion(outputs, labels)
            _, preds = torch.max(outputs, 1)
            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels)


    epoch_loss = running_loss / len(dataloader.dataset)
    epoch_acc = running_corrects.double() / len(dataloader.dataset)

    print('Test loss: {:.4f}, acc: {:.4f}'.format(epoch_loss, epoch_acc))




In [None]:
#Evaluate Model

model = resnet50_bioscan_IF_model
dataloader = IF_test_loader
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

evaluate_resnet50_bioscan_IF_model(model, dataloader)

## Insect Foundation -> Bioscan

In [None]:
#Initialize Resnet
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

resnet50_IF_bioscan_model = resnet50(weights=None).to(device)
state_dict = torch.load('models/IF_model.pth')
state_dict = {k: v for k, v in state_dict.items() if not k.startswith("fc.")}
resnet50_IF_bioscan_model.load_state_dict(state_dict, strict=False)

print(f"IF model loaded on device: {device}")

In [None]:
#Data loaders

BATCH_SIZE = 512
NUM_WORKERS = 2

# 1. Initialize DataLoader for bioscan_dataset_train
bioscan_train_dataloader = DataLoader(
    bioscan_dataset_train,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=NUM_WORKERS
)

# 2. Initialize DataLoader for bioscan_dataset_val
bioscan_val_dataloader = DataLoader(
    bioscan_dataset_val,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=NUM_WORKERS
)

# 3. Initialize DataLoader for bioscan_dataset_test
bioscan_test_dataloader = DataLoader(
    bioscan_dataset_test,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=NUM_WORKERS
)

print(f"Created DataLoader with batch size: {BATCH_SIZE}")

# Verify one batch
for images, labels in bioscan_train_dataloader:
    print(f"Batch image shape: {images.shape}")
    print(f"Batch label shape: {len(labels)}")
    break # Just take one batch to verify

In [None]:
#Freeze ResNet50 layers

num_classes = 934

#Get amt of in features of last layes
in_features = resnet50_IF_bioscan_model.fc.in_features
print(f"Original ResNet50 FC layer in_features: {in_features}")

#Replace Last Layer
resnet50_IF_bioscan_model.fc = nn.Linear(in_features, num_classes)
print(f"Replaced bioscan_model.fc with new nn.Linear layer (in_features={in_features}, out_features={num_classes})")

#Freeze early layers, unfreeze later layers
for name, param in resnet50_IF_bioscan_model.named_parameters():
    if "layer4" in name or "fc" in name:
        param.requires_grad = True
    else:
        param.requires_grad = False

#Verify
trainable_params = [name for name, param in resnet50_IF_bioscan_model.named_parameters() if param.requires_grad]
print(f"Number of trainable parameters: {len(trainable_params)}")
print(f"Trainable layers: {trainable_params}")

In [None]:
#Define loss funtction & optimizer

resnet50_IF_bioscan_criterion = nn.CrossEntropyLoss()
print(f"Loss function (criterion) set to: {resnet50_IF_bioscan_criterion}")

resnet50_IF_bioscan_optimizer = optim.Adam([
    {"params": resnet50_IF_bioscan_model.layer4.parameters(), "lr": 1e-4},
    {"params": resnet50_IF_bioscan_model.fc.parameters(), "lr": 1e-3}
])
print(f"Optimizer set to: {resnet50_IF_bioscan_optimizer}")

In [None]:
#Define training loop

dataloaders = {'train': bioscan_train_dataloader, 'validation': bioscan_val_dataloader, 'test': bioscan_test_dataloader}

def train_resnet50_IF_bioscan_model(model, criterion, optimizer, num_epochs=5):
    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch+1, num_epochs))
        print('-' * 10)

        for phase in ['train', 'validation']:
            if phase == 'train':
                model.train()
            else:
                model.eval()

            running_loss = 0.0
            running_corrects = 0

            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device).long() # Cast labels to torch.Long

                outputs = model(inputs)
                loss = resnet50_IF_bioscan_criterion(outputs, labels)

                if phase == 'train':
                    resnet50_IF_bioscan_optimizer.zero_grad()
                    loss.backward()
                    resnet50_IF_bioscan_optimizer.step()

                _, preds = torch.max(outputs, 1)
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

            epoch_loss = running_loss / len(dataloaders[phase].dataset)
            epoch_acc = running_corrects.double() / len(dataloaders[phase].dataset)

            print('{} loss: {:.4f}, acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))

    return model

In [None]:
#Train Model
resnet50_IF_bioscan_model.to(device)
resnet50_IF_bioscan_model = train_resnet50_IF_bioscan_model(resnet50_IF_bioscan_model, resnet50_IF_bioscan_criterion, resnet50_IF_bioscan_optimizer, num_epochs=5)

In [None]:
!mkdir models

In [None]:
#Save Model
torch.save(resnet50_IF_bioscan_model.state_dict(), 'models/resnet50_IF_bioscan_model.pth')

In [None]:
#Define evaluation/testing loop
def evaluate_resnet50_IF_bioscan_model(model, dataloader):
    model.eval()
    running_loss = 0.0
    running_corrects = 0

    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs = inputs.to(device)
            labels = labels.to(device).long() # Cast labels to torch
            outputs = model(inputs)
            loss = resnet50_IF_bioscan_criterion(outputs, labels)
            _, preds = torch.max(outputs, 1)
            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels)


    epoch_loss = running_loss / len(dataloader.dataset)
    epoch_acc = running_corrects.double() / len(dataloader.dataset)

    print('Test loss: {:.4f}, acc: {:.4f}'.format(epoch_loss, epoch_acc))




In [None]:
#Evaluate Model

model = resnet50_IF_bioscan_model
dataloader = bioscan_test_dataloader
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

evaluate_resnet50_IF_bioscan_model(model, dataloader)