In [7]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import cv2
import numpy as np
from PIL import Image, ImageFilter
import io
from preprocessing import *
import random
from skimage import data
from scipy.ndimage import rotate
from kernels import *
import torchvision
import os
from torchvision.transforms.functional import to_pil_image
from torch.utils.data import Dataset, DataLoader
from torchvision.datasets import ImageFolder
import torchvision.transforms as transforms
from preprocessing import *
from transformers import Swinv2ForImageClassification, SwinConfig
from torch.optim import AdamW
from torchvision import transforms, datasets



In [8]:
from torchvision.datasets import ImageFolder
from torchvision import transforms
import os
from PIL import Image

def preprocess_image(image):
    # Dummy preprocessing that needs to be defined
    
    rich, poor = smash_n_reconstruct(image)
    rich = apply_high_pass_filter(rich)
    poor = apply_high_pass_filter(poor)
    return rich, poor
from torchvision import transforms, datasets
import torch
from torch.utils.data import Dataset
from PIL import Image
import os

from torchvision import transforms
import torch
from torch.utils.data import Dataset
from PIL import Image
import os

class ProcessedPairDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        """
        root_dir: Directory with all the images and subdirectories as classes.
        transform: Optional transform to be applied on a sample.
        """
        self.root_dir = root_dir
        self.transform = transform
        self.classes = sorted(entry.name for entry in os.scandir(root_dir) if entry.is_dir())
        self.class_to_idx = {cls_name: i for i, cls_name in enumerate(self.classes)}
        self.samples = []

        for class_name in self.classes:
            class_dir = os.path.join(root_dir, class_name)
            for img_name in os.listdir(class_dir):
                if img_name.lower().endswith(('.png', '.jpg', '.jpeg')):
                    img_path = os.path.join(class_dir, img_name)
                    self.samples.append((img_path, self.class_to_idx[class_name]))

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        img_path, label = self.samples[idx]
        image = Image.open(img_path).convert('RGB')  # Ensure RGB

        # Apply preprocessing
        rich, poor = preprocess_image(image)  # Custom preprocessing

        # Apply transformations if any
        if self.transform:
            rich = self.transform(rich)
            poor = self.transform(poor)

        return rich, poor, label


# Define transformations
transform = transforms.Compose([
    transforms.ToTensor(),  # Converts PIL Image or numpy array to FloatTensor of shape (C x H x W)
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
# Usage of the dataset
train_dataset = ProcessedPairDataset('/home/kosta/code/School/SentryAI/sample_images/train')
valid_dataset = ProcessedPairDataset('/home/kosta/code/School/SentryAI/sample_images/valid')
train_loader = DataLoader(train_dataset, batch_size=6, shuffle=True, num_workers=1)



## CNN

In [9]:

class CNNBlockRich(nn.Module):
   def __init__(self, num_input_channels):
       super(CNNBlockRich, self).__init__()
       # Assuming num_input_channels is the number of high-pass filtered images
       self.conv = nn.Conv2d(num_input_channels, 1, kernel_size=3, padding=1)
       self.bn = nn.BatchNorm2d(1)
       self.relu = nn.ReLU()
   def forward(self, x):
       x = self.conv(x)
       x = self.bn(x)
       x = self.relu(x)
       return x

In [10]:

class CNNBlockPoor(nn.Module):
   def __init__(self, num_input_channels):
       super(CNNBlockPoor, self).__init__()
       # Assuming num_input_channels is the number of high-pass filtered images
       self.conv = nn.Conv2d(num_input_channels, 3, kernel_size=3, padding=1)
       self.bn = nn.BatchNorm2d(1)
       self.relu = nn.ReLU()
   def forward(self, x):
       x = self.conv(x)
       x = self.bn(x)
       x = self.relu(x)
       return x

## Model

In [11]:
class ImageClassificationModel(nn.Module):
    def __init__(self):
        super(ImageClassificationModel, self).__init__()
        self.feature_combiner = CNNBlockRich(num_input_channels=3)
        self.feature_combiner2 = CNNBlockPoor(num_input_channels=3)
        self.transformer = Swinv2ForImageClassification.from_pretrained(
            "microsoft/swinv2-tiny-patch4-window8-256",
            config=SwinConfig.from_pretrained('microsoft/swinv2-tiny-patch4-window8-256', num_classes=2)
        )

    def forward(self, rich, poor):
       
        
        x = self.feature_combiner(rich)
        y = self.feature_combiner2(poor)    
        feature_difference = x - y
         # Add debug prints to check dimensions
        print("Feature combiner output shape x:", x.shape)
        print("Feature combiner output shape y:", y.shape)
        print("Feature difference shape:", feature_difference.shape)
    
        outputs = self.transformer(feature_difference)

        return outputs.logits


In [12]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = ImageClassificationModel().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = AdamW([
    {'params': model.feature_combiner.parameters(), 'lr': 1e-3},
    {'params': model.feature_combiner2.parameters(), 'lr': 1e-3},
    {'params': model.transformer.parameters(), 'lr': 1e-5}  # Lower lr for fine-tuning
])
# Freeze transformer parameters initially
for param in model.transformer.parameters():
    param.requires_grad = True
def train(model, train_loader, optimizer, device, num_epochs):
    model.train()  # Set model to training mode
    for epoch in range(num_epochs):
        total_loss, total, correct = 0, 0, 0
        for batch in train_loader:
            rich, poor, labels = batch[0], batch[1], batch[2]
            rich = rich.to(device)
            poor = poor.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()
            
            # Assuming your model takes two inputs, rich and poor
            outputs = model(rich, poor)  
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            total_loss += loss.item() * labels.size(0)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        epoch_loss = total_loss / total
        epoch_acc = correct / total
        print(f'Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}, Accuracy: {epoch_acc:.4f}')


def validate(model, batches, device):
    model.eval()
    total_loss, total, correct = 0, 0, 0
    with torch.no_grad():
        for rich, poor, labels in batches:
            rich, poor, labels = rich.to(device), poor.to(device), labels.to(device)
            
            outputs = model((rich, poor))
            loss = criterion(outputs, labels)
            
            total_loss += loss.item() * labels.size(0)  # Correct loss calculation
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)  # Total number of labels processed
            correct += (predicted == labels).sum().item()  # Correct predictions
        
    avg_loss = total_loss / total  # Correct average loss
    avg_acc = correct / total  # Correct accuracy
    print(f'Validation Loss: {avg_loss:.4f}, Accuracy: {avg_acc:.4f}')


# Setup for the training and validation
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = ImageClassificationModel().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = AdamW([
    {'params': model.feature_combiner.parameters(), 'lr': 1e-3},
    {'params': model.feature_combiner2.parameters(), 'lr': 1e-3},
    {'params': model.transformer.parameters(), 'lr': 1e-5}  # Lower lr for fine-tuning
])

# Assuming the datasets and loaders are set up correctly
num_epochs = 10
train(model, train_loader, optimizer, device, num_epochs)
validate(model, val_loader, device)


You are using a model of type swinv2 to instantiate a model of type swin. This is not supported for all configurations of models and can yield errors.
You are using a model of type swinv2 to instantiate a model of type swin. This is not supported for all configurations of models and can yield errors.


Feature combiner output shape x: torch.Size([6, 1, 256, 256])
Feature combiner output shape y: torch.Size([6, 1, 256, 256])
Feature difference shape: torch.Size([6, 1, 256, 256])


ValueError: Make sure that the channel dimension of the pixel values match with the one set in the configuration.

## Deep Classifier
Orignal Paper CNN based classifier 

The following deep classifier has the following layers:
| **Type**    |**Kernel  num**| **With BN** | **Activation** |
| ------------| ------------  | -------     | ----------     |
| Convo.      | 32            | TRUE        | ReLU           |
| Convo.      | 32            | TRUE        | ReLU           |
| Convo.      | 32            | TRUE        | ReLU           |
| Convo.      | 32            | TRUE        | ReLU           |
| Avg Pooling | None          | None        | None           |
| Convo.      | 32            | TRUE        | ReLU           |
| Convo.      | 32            | TRUE        | ReLU           |
| Avg Pooling | None          | None        | None           |
| Convo.      | 32            | TRUE        | ReLU           |
| Convo.      | 32            | TRUE        | ReLU           |
| Avg Pooling | None          | None        | None           |
| Convo.      | 32            | TRUE        | ReLU           |
| Convo.      | 32            | TRUE        | ReLU           |
| AdpAvgPool  | None          | None        | None           |
| Flatten     | None          | None        | None           |
| FC          | None          | FALSE       | None           |

Source: https://arxiv.org/pdf/2311.12397.pdf (page 12)

In [None]:
class DeepClassifier(nn.Module):
    def __init__(self, num_classes= 1): 
        super(DeepClassifier, self).__init__()
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(32, 32, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(32)
        self.conv3 = nn.Conv2d(32, 32, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(32)
        self.conv4 = nn.Conv2d(32, 32, kernel_size=3, padding=1)
        self.bn4 = nn.BatchNorm2d(32)
        self.avg_pool = nn.AvgPool2d(kernel_size=2, stride=2)
        self.conv5 = nn.Conv2d(32, 32, kernel_size=3, padding=1)
        self.bn5 = nn.BatchNorm2d(32)
        self.conv6 = nn.Conv2d(32, 32, kernel_size=3, padding=1)
        self.bn6 = nn.BatchNorm2d(32)
        self.conv7 = nn.Conv2d(32, 32, kernel_size=3, padding=1)
        self.bn7 = nn.BatchNorm2d(32)
        self.conv8 = nn.Conv2d(32, 32, kernel_size=3, padding=1)
        self.bn8 = nn.BatchNorm2d(32)
        self.conv9 = nn.Conv2d(32, 32, kernel_size=3, padding=1)
        self.bn9 = nn.BatchNorm2d(32)
        self.conv10 = nn.Conv2d(32, 32, kernel_size=3, padding=1)
        self.bn10 = nn.BatchNorm2d(32)
        self.adaptive_avg_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(32, num_classes)

    def forward(self, x):
        
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn3(self.conv3(x)))
        x = F.relu(self.bn4(self.conv4(x)))
        x = self.avg_pool(x)
        x = F.relu(self.bn5(self.conv5(x)))
        x = F.relu(self.bn6(self.conv6(x)))
        x = self.avg_pool(x)
        x = F.relu(self.bn7(self.conv7(x)))
        x = F.relu(self.bn8(self.conv8(x)))
        x = self.avg_pool(x)
        x = F.relu(self.bn9(self.conv9(x)))
        x = F.relu(self.bn10(self.conv10(x)))
        x = self.adaptive_avg_pool(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)

        return x


model = DeepClassifier()


example_input = torch.rand(4, 3, 64, 64)

output = model(example_input)
print(output.shape)  