In [277]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import transforms
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader, random_split
import matplotlib.pyplot as plt
import torchvision.datasets as datasets
from PIL import Image
import numpy as np

In [278]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


In [279]:
data_dir = "./public/dataset"
test_data_dir = "./public/test_dataset"

In [280]:

data_transform = transforms.Compose([
    transforms.Resize((150, 150)),
    transforms.ToTensor()
])


dataset = ImageFolder(data_dir, transform=data_transform)
test_dataset = ImageFolder(test_data_dir, transform=data_transform)


batch_size = 128
train_size = len(dataset) - 2000
train_data, val_data = random_split(dataset, [train_size, 2000])

train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True, num_workers=4)
val_loader = DataLoader(val_data, batch_size=batch_size * 2, num_workers=4)
test_loader = DataLoader(test_dataset, batch_size=batch_size, num_workers=4)


def accuracy(outputs, labels):
    _, preds = torch.max(outputs, dim=1)
    return torch.tensor(torch.sum(preds == labels).item() / len(preds))

In [281]:
class ImageClassificationBase(nn.Module):
    def training_step(self, batch):
        images, labels = batch
        images = images.to(device)
        labels = labels.to(device)
        out = self(images)
        loss = F.cross_entropy(out, labels)
        return loss

    def validation_step(self, batch):
        images, labels = batch
        images = images.to(device)
        labels = labels.to(device)
        out = self(images)
        loss = F.cross_entropy(out, labels)
        acc = accuracy(out, labels)
        return {'val_loss': loss.detach(), 'val_acc': acc}

    def validation_epoch_end(self, outputs):
        outputs = outputs
        batch_losses = [x['val_loss'] for x in outputs]
        epoch_loss = torch.stack(batch_losses).mean()
        batch_accs = [x['val_acc'] for x in outputs]
        epoch_acc = torch.stack(batch_accs).mean()
        return {'val_loss': epoch_loss.item(), 'val_acc': epoch_acc.item()}

    def epoch_end(self, epoch, result):
        result = result
        print("Epoch [{}], train_loss: {:.4f}, val_loss: {:.4f}, val_acc: {:.4f}".format(
            epoch, result['train_loss'], result['val_loss'], result['val_acc']))

In [282]:
class StickyNN(ImageClassificationBase):
    def __init__(self):
        super(StickyNN, self).__init__()
        self.network = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Conv2d(128, 128, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Conv2d(256, 256, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Flatten(),
            nn.Linear(82944, 1024),
            nn.ReLU(),
            nn.Linear(1024, 512),
            nn.ReLU(),
            nn.Linear(512, 2)  #num classes
        )

    def forward(self, x):
        return self.network(x)

In [283]:
def evaluate(model, val_loader, train_loader, device):
    model.eval()
    val_loss = 0
    val_acc = 0
    train_loss = 0
    total = 0
    with torch.no_grad():
        for batch in val_loader:
            images, labels = batch
            images, labels = images.to(device), labels.to(device)
            out = model(images)
            loss = F.cross_entropy(out, labels)
            val_loss += loss.item() * images.size(0)
            _, predicted = torch.max(out, 1)
            val_acc += (predicted == labels).sum().item()
            total += labels.size(0)
        
        for batch in train_loader:
            images, labels = batch
            images, labels = images.to(device), labels.to(device)
            out = model(images)
            loss = F.cross_entropy(out, labels)
            train_loss += loss.item() * images.size(0)
            
    return {
        'train_loss': train_loss / len(train_loader.dataset),
        'val_loss': val_loss / len(val_loader.dataset),
        'val_acc': val_acc / total
    }

def fit(epochs, lr, model, train_loader, val_loader, device, opt_func=torch.optim.Adam):
    optimizer = opt_func(model.parameters(), lr)
    for epoch in range(epochs):
        model.train()
        for batch in train_loader:
            loss = model.training_step(batch)
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()
        result = evaluate(model,val_loader, train_loader, device)
        model.epoch_end(epoch, result)

In [284]:
model = StickyNN()
model = model.to(device)

In [288]:
num_epochs = 40
lr = 0.001

In [289]:
fit(num_epochs, lr, model, train_loader, val_loader, device)

Epoch [0], train_loss: 0.6443, val_loss: 0.6271, val_acc: 0.7064
Epoch [1], train_loss: 0.4640, val_loss: 0.4420, val_acc: 0.8624
Epoch [2], train_loss: 0.3011, val_loss: 0.3033, val_acc: 0.9266
Epoch [3], train_loss: 0.5513, val_loss: 0.5065, val_acc: 0.6697
Epoch [4], train_loss: 0.3273, val_loss: 0.2817, val_acc: 0.8991
Epoch [5], train_loss: 0.4123, val_loss: 0.3641, val_acc: 0.7156
Epoch [6], train_loss: 0.2147, val_loss: 0.2182, val_acc: 0.9174
Epoch [7], train_loss: 0.2449, val_loss: 0.2533, val_acc: 0.9083
Epoch [8], train_loss: 0.1765, val_loss: 0.2001, val_acc: 0.9266
Epoch [9], train_loss: 0.1668, val_loss: 0.1814, val_acc: 0.9083
Epoch [10], train_loss: 0.1481, val_loss: 0.1768, val_acc: 0.9174
Epoch [11], train_loss: 0.1433, val_loss: 0.1623, val_acc: 0.9266
Epoch [12], train_loss: 0.1272, val_loss: 0.1604, val_acc: 0.9266
Epoch [13], train_loss: 0.1403, val_loss: 0.1755, val_acc: 0.9266
Epoch [14], train_loss: 0.1510, val_loss: 0.1643, val_acc: 0.9358
Epoch [15], train_lo

In [290]:
def evaluate_test(model, test_loader, device):
    model.eval()
    test_loss = 0
    test_acc = 0
    total = 0
    with torch.no_grad():
        for batch in test_loader:
            images, labels = batch
            images, labels = images.to(device), labels.to(device)
            out = model(images)
            loss = F.cross_entropy(out, labels)
            test_loss += loss.item() * images.size(0)
            _, predicted = torch.max(out, 1)
            test_acc += (predicted == labels).sum().item()
            total += labels.size(0)
        
    return {
        'test_loss': test_loss / len(test_loader.dataset),
        'test_acc': test_acc / total
    }

In [291]:
evaluate_test(model, test_loader, device)

{'test_loss': 6.350018360700266, 'test_acc': 0.6414342629482072}

In [292]:
normal_data_dir = "./public/normal_dataset"
normal_dataset = ImageFolder(data_dir, transform=data_transform)

In [293]:
novel_dataset = datasets.CIFAR10(root='./data', train=True, download=True, transform=data_transform)


Files already downloaded and verified


In [294]:
batch_size = 32

normal_dataloader = DataLoader(normal_dataset, batch_size=batch_size, shuffle=True)
novel_dataloader = DataLoader(novel_dataset, batch_size=batch_size, shuffle=True)


In [304]:
class Autoencoder(nn.Module):
    def __init__(self):
        super(Autoencoder, self).__init__()
        
        # Encoder
        self.encoder = nn.Sequential(
            nn.Conv2d(3, 16, kernel_size=3, stride=2, padding=1),  # 150x150x3 -> 75x75x16
            nn.ReLU(True),
            nn.Conv2d(16, 32, kernel_size=3, stride=2, padding=1),  # 75x75x16 -> 38x38x32
            nn.ReLU(True),
            nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1),  # 38x38x32 -> 19x19x64
            nn.ReLU(True),
        )
        
        # Decoder
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(64, 32, kernel_size=3, stride=2, padding=1, output_padding=1),  # 19x19x64 -> 38x38x32
            nn.ReLU(True),
            nn.ConvTranspose2d(32, 16, kernel_size=3, stride=2, padding=1, output_padding=1),  # 38x38x32 -> 76x76x16
            nn.ReLU(True),
            nn.ConvTranspose2d(16, 3, kernel_size=3, stride=2, padding=1, output_padding=1),   # 76x76x16 -> 152x152x3
            nn.Tanh()
        )
    
    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        # Adjust the output to match the input size
        x = x[:, :, :150, :150]
        return x

In [305]:
def fit_autoencoder(model, dataloader, criterion, optimizer, num_epochs=10):
    model.train()
    for epoch in range(num_epochs):
        running_loss = 0.0
        for batch in dataloader:
            optimizer.zero_grad()
            images, labels = batch
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, images)
            loss.backward()
            optimizer.step()
            running_loss += loss.item() * images.size(0)
        epoch_loss = running_loss / len(dataloader.dataset)
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}")

def eval_autoencoder(model, dataloader, criterion):
    model.eval()
    total_loss = 0.0
    with torch.no_grad():
        for batch in dataloader:
            images, labels = batch
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, images)
            total_loss += loss.item() * images.size(0)
    avg_loss = total_loss / len(dataloader.dataset)
    print(f"Average Loss: {avg_loss:.4f}")
    return avg_loss

In [306]:
def fine_tune_autoencoder(model, dataloader, normal_threshold, novel_threshold, num_epochs=10):
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters())

    for epoch in range(num_epochs):
        running_loss = 0.0
        for batch in dataloader:
            images, labels = batch
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, images)
            if loss.item() < normal_threshold:
                loss.backward()
            elif loss.item() > novel_threshold:
                (-loss).backward()  
            optimizer.step()
            running_loss += loss.item() * images.size(0)
        epoch_loss = running_loss / len(dataloader.dataset)
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}") 



In [307]:
autoencoder = Autoencoder()
autoencoder = autoencoder.to(device)

In [308]:
fit_autoencoder(autoencoder, normal_dataloader, nn.MSELoss(), torch.optim.Adam(autoencoder.parameters()), 120)

Epoch [1/120], Loss: 0.2731
Epoch [2/120], Loss: 0.0220
Epoch [3/120], Loss: 0.0088
Epoch [4/120], Loss: 0.0059
Epoch [5/120], Loss: 0.0050
Epoch [6/120], Loss: 0.0046
Epoch [7/120], Loss: 0.0043
Epoch [8/120], Loss: 0.0040
Epoch [9/120], Loss: 0.0038
Epoch [10/120], Loss: 0.0036
Epoch [11/120], Loss: 0.0035
Epoch [12/120], Loss: 0.0033
Epoch [13/120], Loss: 0.0032
Epoch [14/120], Loss: 0.0031
Epoch [15/120], Loss: 0.0028
Epoch [16/120], Loss: 0.0026
Epoch [17/120], Loss: 0.0023
Epoch [18/120], Loss: 0.0022
Epoch [19/120], Loss: 0.0020
Epoch [20/120], Loss: 0.0019
Epoch [21/120], Loss: 0.0018
Epoch [22/120], Loss: 0.0018
Epoch [23/120], Loss: 0.0017
Epoch [24/120], Loss: 0.0018
Epoch [25/120], Loss: 0.0018
Epoch [26/120], Loss: 0.0016
Epoch [27/120], Loss: 0.0015
Epoch [28/120], Loss: 0.0015
Epoch [29/120], Loss: 0.0014
Epoch [30/120], Loss: 0.0014
Epoch [31/120], Loss: 0.0020
Epoch [32/120], Loss: 0.0014
Epoch [33/120], Loss: 0.0013
Epoch [34/120], Loss: 0.0013
Epoch [35/120], Loss: 0

In [311]:
eval_autoencoder(autoencoder, normal_dataloader, nn.MSELoss())

Average Loss: 0.0006


0.0005630059985912813

In [312]:
normal_threshold = 0.1  
novel_threshold = 0.5   

In [314]:
fine_tune_autoencoder(autoencoder, normal_dataloader, normal_threshold, novel_threshold, 120)

Epoch [1/120], Loss: 0.0012
Epoch [2/120], Loss: 0.0006
Epoch [3/120], Loss: 0.0005
Epoch [4/120], Loss: 0.0005
Epoch [5/120], Loss: 0.0005
Epoch [6/120], Loss: 0.0005
Epoch [7/120], Loss: 0.0005
Epoch [8/120], Loss: 0.0015
Epoch [9/120], Loss: 0.0006
Epoch [10/120], Loss: 0.0006
Epoch [11/120], Loss: 0.0006
Epoch [12/120], Loss: 0.0006
Epoch [13/120], Loss: 0.0006
Epoch [14/120], Loss: 0.0005
Epoch [15/120], Loss: 0.0005
Epoch [16/120], Loss: 0.0005
Epoch [17/120], Loss: 0.0005
Epoch [18/120], Loss: 0.0005
Epoch [19/120], Loss: 0.0005
Epoch [20/120], Loss: 0.0005
Epoch [21/120], Loss: 0.0005
Epoch [22/120], Loss: 0.0005
Epoch [23/120], Loss: 0.0005
Epoch [24/120], Loss: 0.0005
Epoch [25/120], Loss: 0.0005
Epoch [26/120], Loss: 0.0005
Epoch [27/120], Loss: 0.0005
Epoch [28/120], Loss: 0.0005
Epoch [29/120], Loss: 0.0006
Epoch [30/120], Loss: 0.0005
Epoch [31/120], Loss: 0.0005
Epoch [32/120], Loss: 0.0005
Epoch [33/120], Loss: 0.0005
Epoch [34/120], Loss: 0.0005
Epoch [35/120], Loss: 0

In [315]:
eval_autoencoder(autoencoder, novel_dataloader, nn.MSELoss())

Average Loss: 0.0002


0.00022977744387928396

In [316]:
def pre_image(image_path):
   img = Image.open(image_path)
   transform = transforms.Compose([
    transforms.Resize((150, 150)),
    transforms.ToTensor()
   ])
   if img.mode == 'RGBA':
        img = img.convert('RGB')
   img_tf = transform(img).float().unsqueeze(0)
   return img_tf

def detect_novelty(image, autoencoder_model, æ_threshold = 0.5):
    image = image.to(device)
    encoder_output = autoencoder_model(image)
    ae_loss = nn.MSELoss()(encoder_output, image)


    if ae_loss > æ_threshold:
        return True
    else:
        return False

def detect(image, cnn_model, autoencoder_model, train_data, æ_threshold = 0.5):
    novel = detect_novelty(image, autoencoder_model, æ_threshold)
    if novel:
        return "Can't recognise it, must be smth different"
    else: 
        with torch.no_grad():
            model.eval() 
            image = image.to(device) 
            output =cnn_model(image)
            index = output.data.cpu().numpy().argmax()
            classes = train_data.classes
            class_name = classes[index]
            return f"Guess this is a {class_name}!"
        
        
        


In [348]:
img = pre_image("./public/images.jpg")
message = detect(img,model,autoencoder,dataset)
print(message)


Guess this is a leg!


In [276]:
PATH_CNN = './models/StickyCNN.pth'
torch.save(model.state_dict(), PATH_CNN)
PATH_AE = './models/Autoencoder.pth'
torch.save(model.state_dict(), PATH_AE)

In [350]:
loadedModel = StickyNN()
loadedModel.load_state_dict(torch.load('./models/StickyCNN.pth'))
loadedModel.to(device)
loadedModel.eval()

StickyNN(
  (network): Sequential(
    (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
    (2): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU()
    (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (6): ReLU()
    (7): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (8): ReLU()
    (9): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (10): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): ReLU()
    (12): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (13): ReLU()
    (14): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (15): Flatten(start_dim=1, end_dim=-1)
    (16): Linear(in_features=82944, out_features=1024, bias=True)
    (17): ReLU()
    (18): Linear(in_features=1

In [361]:
img = pre_image("./public/images.jpg")
message = detect(img,loadedModel,autoencoder,dataset)
print(message)

Guess this is a leg!
