In [1]:
import torch
from torchvision import models
from torch import nn
from torch.utils.data import DataLoader
from LymphoMNIST.LymphoMNIST import LymphoMNIST
from torchvision import transforms
from student_qt import FilteredLymphoMNIST, get_dataloaders  

  from .autonotebook import tqdm as notebook_tqdm


In [2]:

# Evaluation settings shared by the cells below.
params = dict(
    batch_size=16,
    im_size=64,  # Resize dimension used during training
    # Path to the saved model
    model_checkpoint="checkpoint/Final_models/KD_13 October 00:13_resnet50_qt-1channel-imsize-120-48.pt",
)

# Side length (pixels) every image is resized to before it reaches the model.
BIGGER = 64

import torchvision.transforms as T

# Preprocessing pipeline: square resize -> tensor -> single-channel normalisation.
transform_student = T.Compose(
    [
        T.Resize(size=(BIGGER, BIGGER)),
        T.ToTensor(),
        T.Normalize(mean=[0.4819], std=[0.1484]),
    ]
)

In [3]:


# Class labels retained by the filtering step below.
labels_to_keep = [0, 1]

# Build the train and test splits with identical settings; only `train` differs.
original_train_ds, original_test_ds = (
    LymphoMNIST(
        root='./dataset',
        train=is_train,
        download=True,
        transform=transform_student,
        num_classes=3,
    )
    for is_train in (True, False)
)

# Wrap each split so that only `labels_to_keep` is passed on to the filter.
train_ds, test_ds = (
    FilteredLymphoMNIST(split, labels_to_keep)
    for split in (original_train_ds, original_test_ds)
)

# Train / validation / test loaders as returned by the project helper.
train_dl, val_dl, test_dl = get_dataloaders(
    train_ds,
    test_ds,
    batch_size=params['batch_size'],
    num_workers=4,
)

Dataset already exists. Skipping download.
Dataset already exists. Skipping download.


# ResNet18

In [8]:
# Pick the GPU when one is visible, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

# ResNet18 adapted to 1-channel input and a len(labels_to_keep)-way output head.
model = models.resnet18()
model.conv1 = nn.Conv2d(
    1,
    model.conv1.out_channels,
    kernel_size=model.conv1.kernel_size,
    stride=model.conv1.stride,
    padding=model.conv1.padding,
    bias=False,
)
model.fc = nn.Linear(model.fc.in_features, len(labels_to_keep))

# Restore the saved weights, then move the network to the chosen device.
# Alternative checkpoint: 'checkpoint/KD_10 October 16:37_resnet50_resnet18-1channel-timm.pt'
# NOTE(review): torch.load unpickles the file -- only load checkpoints you trust.
model.load_state_dict(
    torch.load(
        'checkpoint/Final_models/KD_10 October 16:40_resnet50_resnet18-1channel-worked.pt',
        map_location=device,
    )
)
model = model.to(device)

# Function to calculate accuracy
def calculate_accuracy(loader, model, device):
    """Return the top-1 accuracy of ``model`` over ``loader`` as a percentage.

    Args:
        loader: Iterable yielding ``(images, labels)`` tensor pairs.
        model: Network to evaluate; it is switched to eval mode.
        device: Device each batch is moved to before the forward pass.

    Returns:
        ``100 * correct / total`` as a float.
    """
    model.eval()
    n_correct, n_seen = 0, 0

    with torch.no_grad():
        for batch_images, batch_labels in loader:
            batch_images = batch_images.to(device)
            batch_labels = batch_labels.to(device)

            # Predicted class = index of the largest score in each row.
            predicted = model(batch_images).max(dim=1).indices

            n_seen += batch_labels.size(0)
            n_correct += (predicted == batch_labels).sum().item()

    return 100 * n_correct / n_seen

In [9]:
# Accuracy of the loaded model on the training loader.
train_acc = calculate_accuracy(loader=train_dl, model=model, device=device)
print(f'Train Accuracy: {train_acc:.2f}%')



Train Accuracy: 100.00%


In [10]:
# Accuracy of the loaded model on the validation loader.
val_acc = calculate_accuracy(loader=val_dl, model=model, device=device)
print(f'Validation Accuracy: {val_acc:.2f}%')

Validation Accuracy: 94.04%


In [11]:
# Accuracy of the loaded model on the test loader.
test_acc = calculate_accuracy(loader=test_dl, model=model, device=device)
print(f'Test Accuracy: {test_acc:.2f}%')

Test Accuracy: 94.16%


: 

## Inference time

In [17]:
import time

# Hyperparameters
# Extend the existing settings instead of replacing the dict: the tiny-model
# cell further down still reads params['model_checkpoint'], which a fresh
# dict without that key would turn into a KeyError on Restart & Run All.
params = {
    **params,
    'batch_size': 16,  # Standard batch size
    'im_size': 64,     # Resize dimension used during training
    'num_warmup_batches': 5,  # Number of warmup batches
    'num_batches': 100,  # Timed batches per run (100 x 16 = at most 1600 images)
}

In [None]:


# Function to calculate inference speed
def calculate_inference_speed(loader, model, device, num_batches, warmup_batches):
    """Measure forward-pass latency and throughput of ``model`` on ``loader``.

    Only the forward pass is timed; data loading and the host-to-device copy
    are excluded. On CUDA devices the device is synchronised around each
    timed forward pass, because kernels are launched asynchronously and an
    unsynchronised timer would measure launch overhead only.

    Args:
        loader: Iterable yielding ``(images, labels)`` pairs; labels are ignored.
        model: Network to benchmark; it is switched to eval mode.
        device: Device (string or ``torch.device``) the batches are moved to.
        num_batches: Maximum number of batches to time.
        warmup_batches: Number of untimed batches run first.

    Returns:
        Tuple ``(avg_seconds_per_image, images_per_second)``.

    Raises:
        ValueError: If no batch was timed (empty loader or ``num_batches <= 0``).
    """
    model.eval()
    target = torch.device(device)
    needs_sync = target.type == 'cuda' and torch.cuda.is_available()

    def _wait_for_device():
        # Block until all queued GPU work has finished; no-op on CPU.
        if needs_sync:
            torch.cuda.synchronize(target)

    total_time = 0.0
    images_processed = 0

    with torch.no_grad():
        # Warm-up loop (untimed): lets lazy initialisation and autotuning settle.
        for i, (images, _) in enumerate(loader):
            if i >= warmup_batches:
                break
            _ = model(images.to(device))
        _wait_for_device()

        # Timed inference loop
        for i, (images, _) in enumerate(loader):
            if i >= num_batches:
                break
            images = images.to(device)
            _wait_for_device()  # keep the input copy out of the measurement

            start_time = time.perf_counter()  # monotonic, high-resolution clock
            _ = model(images)
            _wait_for_device()  # make sure the forward pass really finished
            total_time += time.perf_counter() - start_time

            images_processed += images.size(0)

    if images_processed == 0:
        raise ValueError(
            'No batches were timed: the loader is empty or num_batches is not positive.'
        )

    avg_inference_time_per_image = total_time / images_processed
    # Guard against a zero reading from the timer on extremely fast runs.
    images_per_second = images_processed / total_time if total_time > 0 else float('inf')
    return avg_inference_time_per_image, images_per_second

# Single benchmark run: up to params['num_batches'] timed batches after the warm-up.
avg_time, throughput = calculate_inference_speed(
    loader=test_dl,
    model=model,
    device=device,
    num_batches=params['num_batches'],
    warmup_batches=params['num_warmup_batches'],
)

print(f'Average Inference Time per Image: {avg_time:.6f} seconds')
print(f'Inference Speed: {throughput:.2f} images/second')


In [None]:

import numpy as np
# Function to calculate inference speed
def calculate_inference_speed(loader, model, device, num_batches, warmup_batches):
    """Measure forward-pass latency and throughput of ``model`` on ``loader``.

    Only the forward pass is timed; data loading and the host-to-device copy
    are excluded. On CUDA devices the device is synchronised around each
    timed forward pass, because kernels are launched asynchronously and an
    unsynchronised timer would measure launch overhead only.

    Args:
        loader: Iterable yielding ``(images, labels)`` pairs; labels are ignored.
        model: Network to benchmark; it is switched to eval mode.
        device: Device (string or ``torch.device``) the batches are moved to.
        num_batches: Maximum number of batches to time.
        warmup_batches: Number of untimed batches run first.

    Returns:
        Tuple ``(avg_seconds_per_image, images_per_second)``.

    Raises:
        ValueError: If no batch was timed (empty loader or ``num_batches <= 0``).
    """
    model.eval()
    target = torch.device(device)
    needs_sync = target.type == 'cuda' and torch.cuda.is_available()

    def _wait_for_device():
        # Block until all queued GPU work has finished; no-op on CPU.
        if needs_sync:
            torch.cuda.synchronize(target)

    total_time = 0.0
    images_processed = 0

    with torch.no_grad():
        # Warm-up loop (untimed): lets lazy initialisation and autotuning settle.
        for i, (images, _) in enumerate(loader):
            if i >= warmup_batches:
                break
            _ = model(images.to(device))
        _wait_for_device()

        # Timed inference loop
        for i, (images, _) in enumerate(loader):
            if i >= num_batches:
                break
            images = images.to(device)
            _wait_for_device()  # keep the input copy out of the measurement

            start_time = time.perf_counter()  # monotonic, high-resolution clock
            _ = model(images)
            _wait_for_device()  # make sure the forward pass really finished
            total_time += time.perf_counter() - start_time

            images_processed += images.size(0)

    if images_processed == 0:
        raise ValueError(
            'No batches were timed: the loader is empty or num_batches is not positive.'
        )

    avg_inference_time_per_image = total_time / images_processed
    # Guard against a zero reading from the timer on extremely fast runs.
    images_per_second = images_processed / total_time if total_time > 0 else float('inf')
    return avg_inference_time_per_image, images_per_second

# Repeat the benchmark 50 times, keeping the per-run latency and throughput.
inference_times, inference_speeds = [], []

for _ in range(50):
    avg_time, throughput = calculate_inference_speed(
        loader=test_dl,
        model=model,
        device=device,
        num_batches=params['num_batches'],
        warmup_batches=params['num_warmup_batches'],
    )
    inference_times.append(avg_time)
    inference_speeds.append(throughput)

# Mean and (population) standard deviation across the 50 runs.
mean_time, std_time = np.mean(inference_times), np.std(inference_times)
mean_speed, std_speed = np.mean(inference_speeds), np.std(inference_speeds)

print(f'Average Inference Time per Image (over 50 runs): {mean_time:.6f} seconds ± {std_time:.6f}')
print(f'Average Inference Speed (over 50 runs): {mean_speed:.2f} images/second ± {std_speed:.2f}')

# Tiny model

In [5]:
# Load model
# Pick the GPU when one is visible, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

class QuantizedCNN(nn.Module):
    """Small two-stage convolutional classifier for single-channel images.

    ``features`` applies two rounds of 3x3 convolution (16 channels, no
    padding), ReLU and 2x2 max-pooling. ``classifier`` flattens the result,
    applies dropout (p=0.5), one linear layer and a softmax, so ``forward``
    returns per-class probabilities rather than raw logits.

    Args:
        num_classes: Number of output classes.
        input_size: ``(channels, height, width)`` of one input image. It is
            used only to size the linear layer; the first convolution always
            takes one input channel.
    """

    def __init__(self, num_classes=2, input_size=(1, 28, 28)):
        super().__init__()
        self.num_classes = num_classes

        # Layer order fixes the Sequential indices (0 and 3 hold the
        # convolutions) and therefore the state_dict keys of saved checkpoints.
        first_stage = [
            nn.Conv2d(1, 16, kernel_size=3, stride=1, padding=0),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2, padding=0),
        ]
        second_stage = [
            nn.Conv2d(16, 16, kernel_size=3, stride=1, padding=0),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2, padding=0),
        ]
        self.features = nn.Sequential(*first_stage, *second_stage)

        # Push one all-zero image through the feature extractor to learn how
        # many values each sample holds once flattened.
        with torch.no_grad():
            probe = self.features(torch.zeros(1, *input_size))
        flat_width = probe[0].numel()

        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Dropout(0.5),
            nn.Linear(flat_width, num_classes),
            nn.Softmax(dim=1),
        )

    def forward(self, x):
        """Return class probabilities for a batch of images ``x``."""
        return self.classifier(self.features(x))
    
    
# Build the tiny CNN for BIGGER x BIGGER single-channel inputs on the target device.
model = QuantizedCNN(num_classes=2, input_size=(1, BIGGER, BIGGER))
model = model.to(device)

# Restore the weights stored at params['model_checkpoint'].
# Alternative checkpoint: 'checkpoint/KD_10 October 16:38_resnet50_qt-1channel-worked.pt'
# NOTE(review): torch.load unpickles the file -- only load checkpoints you trust.
model.load_state_dict(
    torch.load(params['model_checkpoint'], map_location=device)
)

# Function to calculate accuracy
def calculate_accuracy(loader, model, device):
    """Compute the percentage of samples in ``loader`` that ``model`` labels correctly.

    Args:
        loader: Iterable yielding ``(images, labels)`` tensor pairs.
        model: Network to evaluate; it is switched to eval mode.
        device: Device each batch is moved to before the forward pass.

    Returns:
        ``100 * correct / total`` as a float.
    """
    model.eval()
    hits = 0
    seen = 0

    with torch.no_grad():
        for inputs, targets in loader:
            inputs = inputs.to(device)
            targets = targets.to(device)

            scores = model(inputs)
            # Second element of torch.max(..., 1) is the arg-max class per row.
            guesses = torch.max(scores, 1)[1]

            seen += targets.size(0)
            hits += (guesses == targets).sum().item()

    return 100 * hits / seen

In [None]:
from torchsummary import summary
# Summarise at the resolution the model was built for: its linear layer is
# sized for (1, BIGGER, BIGGER) inputs, so any other spatial size produces a
# shape mismatch in the classifier.
summary(model, (1, BIGGER, BIGGER))

In [None]:
# Accuracy of the loaded model on the training loader.
train_acc = calculate_accuracy(loader=train_dl, model=model, device=device)
print(f'Train Accuracy: {train_acc:.2f}%')


In [None]:

# Accuracy of the loaded model on the validation loader.
val_acc = calculate_accuracy(loader=val_dl, model=model, device=device)
print(f'Validation Accuracy: {val_acc:.2f}%')



In [None]:
# Accuracy of the loaded model on the test loader.
test_acc = calculate_accuracy(loader=test_dl, model=model, device=device)
print(f'Test Accuracy: {test_acc:.2f}%')

## Inference time

In [23]:
import time

# Benchmark settings for the timing cell below.
# NOTE(review): 'model_checkpoint' names a resnet18 checkpoint although `model`
# is the tiny CNN at this point; no later cell in this notebook reads the key,
# but re-running the tiny-model load cell after this one would pick it up -- confirm.
params = dict(
    batch_size=16,         # Standard batch size
    im_size=64,            # Resize dimension used during training
    num_warmup_batches=5,  # Number of warmup batches
    num_batches=100,       # Timed batches per run (100 x 16 = at most 1600 images)
    model_checkpoint="checkpoint/KD_30 September 22:37_resnet50_resnet18-1channel-worked.pt",
)

In [None]:

import numpy as np
# Function to calculate inference speed
def calculate_inference_speed(loader, model, device, num_batches, warmup_batches):
    """Measure forward-pass latency and throughput of ``model`` on ``loader``.

    Only the forward pass is timed; data loading and the host-to-device copy
    are excluded. On CUDA devices the device is synchronised around each
    timed forward pass, because kernels are launched asynchronously and an
    unsynchronised timer would measure launch overhead only.

    Args:
        loader: Iterable yielding ``(images, labels)`` pairs; labels are ignored.
        model: Network to benchmark; it is switched to eval mode.
        device: Device (string or ``torch.device``) the batches are moved to.
        num_batches: Maximum number of batches to time.
        warmup_batches: Number of untimed batches run first.

    Returns:
        Tuple ``(avg_seconds_per_image, images_per_second)``.

    Raises:
        ValueError: If no batch was timed (empty loader or ``num_batches <= 0``).
    """
    model.eval()
    target = torch.device(device)
    needs_sync = target.type == 'cuda' and torch.cuda.is_available()

    def _wait_for_device():
        # Block until all queued GPU work has finished; no-op on CPU.
        if needs_sync:
            torch.cuda.synchronize(target)

    total_time = 0.0
    images_processed = 0

    with torch.no_grad():
        # Warm-up loop (untimed): lets lazy initialisation and autotuning settle.
        for i, (images, _) in enumerate(loader):
            if i >= warmup_batches:
                break
            _ = model(images.to(device))
        _wait_for_device()

        # Timed inference loop
        for i, (images, _) in enumerate(loader):
            if i >= num_batches:
                break
            images = images.to(device)
            _wait_for_device()  # keep the input copy out of the measurement

            start_time = time.perf_counter()  # monotonic, high-resolution clock
            _ = model(images)
            _wait_for_device()  # make sure the forward pass really finished
            total_time += time.perf_counter() - start_time

            images_processed += images.size(0)

    if images_processed == 0:
        raise ValueError(
            'No batches were timed: the loader is empty or num_batches is not positive.'
        )

    avg_inference_time_per_image = total_time / images_processed
    # Guard against a zero reading from the timer on extremely fast runs.
    images_per_second = images_processed / total_time if total_time > 0 else float('inf')
    return avg_inference_time_per_image, images_per_second

# Repeat the benchmark 50 times, keeping the per-run latency and throughput.
inference_times, inference_speeds = [], []

for _ in range(50):
    avg_time, throughput = calculate_inference_speed(
        loader=test_dl,
        model=model,
        device=device,
        num_batches=params['num_batches'],
        warmup_batches=params['num_warmup_batches'],
    )
    inference_times.append(avg_time)
    inference_speeds.append(throughput)

# Mean and (population) standard deviation across the 50 runs.
mean_time, std_time = np.mean(inference_times), np.std(inference_times)
mean_speed, std_speed = np.mean(inference_speeds), np.std(inference_speeds)

print(f'Average Inference Time per Image (over 50 runs): {mean_time:.6f} seconds ± {std_time:.6f}')
print(f'Average Inference Speed (over 50 runs): {mean_speed:.2f} images/second ± {std_speed:.2f}')