In [14]:
import torch
import torch.nn as nn
import math
import torch.utils.model_zoo as model_zoo
import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader, Dataset
import os
import pandas as pd
from PIL import Image
import torch.optim as optim
from torch.optim import AdamW
from tqdm import tqdm
import numpy as np 
from sklearn.metrics import accuracy_score, confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt
import seaborn as sns
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import models
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau
import torchvision.transforms as T
from pytorchcv.model_provider import get_model

In [15]:
# pip install pytorchcv

In [16]:
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

In [17]:
class PatchExtraction(nn.Module):
    def __init__(self):
        super(PatchExtraction, self).__init__()
        # First separable convolution (depthwise + pointwise)
        self.depthwise_conv1 = nn.Conv2d(512, 512, kernel_size=4, stride=4, padding=1, groups=512)
        self.pointwise_conv1 = nn.Conv2d(512, 256, kernel_size=1, stride=1, padding=0)
        
        # Second separable convolution (depthwise + pointwise)
        self.depthwise_conv2 = nn.Conv2d(256, 256, kernel_size=2, stride=2, padding=0, groups=256)
        self.pointwise_conv2 = nn.Conv2d(256, 256, kernel_size=1, stride=1, padding=0)

        # Normal Conv (used directly)
        self.conv3 = nn.Conv2d(256, 256, kernel_size=1, stride=1, padding=0)

    def forward(self, x):
        # First separable convolution
        x = F.relu(self.depthwise_conv1(x))
        x = F.relu(self.pointwise_conv1(x))

        # Second separable convolution
        x = F.relu(self.depthwise_conv2(x))
        x = F.relu(self.pointwise_conv2(x))

        # Normal convolution
        x = F.relu(self.conv3(x))
        
        return x

In [18]:
class SelfAttention(nn.Module):
    def __init__(self, embed_size, num_heads=1):
        super(SelfAttention, self).__init__()
        self.attention = nn.MultiheadAttention(embed_dim=embed_size, num_heads=num_heads)

    def forward(self, x):
        # Attention expects input of shape [sequence_length, batch_size, embed_dim]
        x = x.unsqueeze(0)  # Adding sequence length as 1
        attn_output, _ = self.attention(x, x, x)
        return attn_output.squeeze(0)

In [19]:
class PattLite(nn.Module):
    def __init__(self):
        super(PattLite, self).__init__()
        
        # Preprocessing: resizing and augmentation
#         self.transform = transforms.Compose([
#             transforms.Resize((224, 224)),
#             transforms.ToTensor(),
#         ])
        
        # Backbone (MobileNet with last 29 layers removed)
        self.backbone = get_model('mobilenetv1', pretrained=True)
        self.backbone = nn.Sequential(*list(self.backbone.features.children())[:-29])
        for param in self.backbone.parameters():
            param.requires_grad = False

        # Patch extraction, attention, and global average pooling layers
        self.patch_extraction = PatchExtraction()
        self.global_avg_pool = nn.AdaptiveAvgPool2d(1)
        self.dropout = nn.Dropout(0.1)

        # Pre-classification layer
        self.pre_classification = nn.Sequential(
            nn.Linear(256, 32),
            nn.ReLU(),
            nn.BatchNorm1d(32)
        )

        # Self-attention
        self.self_attention = SelfAttention(embed_size=32)

        # Final classification layer
        self.classifier = nn.Linear(32, 7)

    def forward(self, x):
        # Apply transformations
#         x = self.transform(x)
        
        # Backbone (MobileNetV2)
        x = self.backbone(x)
        
        # Patch extraction
        x = self.patch_extraction(x)
        
        # Global average pooling
        x = self.global_avg_pool(x)
        x = x.view(x.size(0), -1)  # Flatten (N, 256)
        
        # Dropout before final classification
        x = self.dropout(x)
        
        # Pre-classification layer
        x = self.pre_classification(x)
        
        # Self-attention (expects [seq_len, batch_size, embed_dim])
        x = self.self_attention(x.unsqueeze(0)).squeeze(0)  # Apply self-attention
        
        # Final classification layer
        x = self.classifier(x)
        
        return x

In [20]:
# pip install --upgrade torchvision

In [21]:
model = PattLite()

ValueError: Unsupported model: mobilenetv1

In [None]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)
scheduler = ReduceLROnPlateau(optimizer, 'max', patience=3, min_lr=1e-6)

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

In [None]:
model = model.to(device)

In [None]:
class CustomImageDataset(Dataset):
    def __init__(self, image_dir, csv_file, transform=None):
        self.image_dir = image_dir
        self.transform = transform

        # Read the CSV file
        self.data_frame = pd.read_csv(csv_file)

        # Ensure the CSV file has columns 'filename' and 'class'
        assert 'image' in self.data_frame.columns
        assert 'label' in self.data_frame.columns

    def __len__(self):
        return len(self.data_frame)

    def __getitem__(self, idx):
        # Get the filename and class label
        img_name = self.data_frame.iloc[idx, 0]  # Get the filename from the CSV
        class_label = self.data_frame.iloc[idx, 1] - 1  # Get the class label from the CSV

        # Construct the path to the image based on its class label
        class_folder = os.path.join(self.image_dir, str(class_label+1))  # Convert class label to string
        img_path = os.path.join(class_folder, img_name)

        # Load the image
        image = Image.open(img_path).convert('RGB')

        # Apply transformations
        if self.transform:
            image = self.transform(image)

        return image, class_label

In [None]:
image_directory = r"/kaggle/input/raf-db-dataset/DATASET/train"  # Directory containing class subfolders
csv_file_path = r"/kaggle/input/raf-db-dataset/train_labels.csv"

In [None]:
train_dataset = CustomImageDataset(image_dir=image_directory, csv_file=csv_file_path, transform=transform)

In [None]:
train_loader = DataLoader(train_dataset, batch_size=8, num_workers=4, pin_memory=True, shuffle=True)

In [None]:
test_image_directory = r"/kaggle/input/raf-db-dataset/DATASET/test"
test_csv_file_path = r"/kaggle/input/raf-db-dataset/test_labels.csv"

In [None]:
test_dataset = CustomImageDataset(test_image_directory, test_csv_file_path, transform)

In [None]:
test_loader = DataLoader(test_dataset, batch_size=8, num_workers=4, pin_memory=True, shuffle=True)

In [None]:
def train_model(model, train_loader, test_loader, num_epochs):
#     best_val_acc = 0
#     early_stopping_counter = 0
    best_acc = 0
    for epoch in range(1, num_epochs+1):
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0
        
        # Training loop
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            
            running_loss += loss.item()
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()
        
        # Calculate training accuracy
        train_acc = 100. * correct / total
        
        # Validation loop
        model.eval()
        val_correct = 0
        val_total = 0
        
        with torch.no_grad():
            for inputs, labels in test_loader:
                outputs = model(inputs)
                _, predicted = outputs.max(1)
                val_total += labels.size(0)
                val_correct += predicted.eq(labels).sum().item()
        
        val_acc = 100. * val_correct / val_total
        
        print(f'Epoch {epoch}/{num_epochs}, Loss: {running_loss:.4f}, Train Acc: {train_acc:.2f}%, Val Acc: {val_acc:.2f}%')
        
        # Scheduler step
        scheduler.step(val_acc)
        
        # Early stopping
        if val_acc > best_acc:
            best_acc = val_acc
#             early_stopping_counter = 0
            torch.save(model.state_dict(), f"best_model_epoch{epoch}_acc{val_acc}.pth")
#         else:
#             early_stopping_counter += 1
#             if early_stopping_counter > patience:
#                 print("Early stopping!")
#                 break


In [None]:
train_model(model, train_loader, test_loader, 20)