In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
from torch.utils.data import Dataset, random_split, DataLoader
import torch.optim as optim
from torchvision import transforms
import os

In [2]:
class SAM(nn.Module):
    """Spatial attention module.

    Reduces the input over the channel dimension with both a max and a mean,
    stacks the two resulting maps, passes them through a 7x7 convolution and
    uses the sigmoid of the result to gate every spatial location of the input.

    Args:
        bias: whether the 7x7 convolution carries a bias term.
    """

    def __init__(self, bias=False):
        super().__init__()
        self.bias = bias
        # Two input channels: the channel-wise max map and the channel-wise mean map.
        self.conv = nn.Conv2d(2, 1, kernel_size=7, stride=1, padding=3, dilation=1, bias=bias)

    def forward(self, x):
        """Return ``x`` multiplied by its single-channel spatial attention map."""
        channel_max = x.max(dim=1, keepdim=True).values
        channel_mean = x.mean(dim=1, keepdim=True)
        pooled = torch.cat([channel_max, channel_mean], dim=1)
        attention = torch.sigmoid(self.conv(pooled))
        return attention * x

class CAM(nn.Module):
    """Channel attention module.

    Globally max-pools and average-pools the input to one value per channel,
    passes both descriptors through the same two-layer bottleneck MLP, sums the
    results and uses the sigmoid of that sum to gate each channel of the input.

    Args:
        channels: number of channels of the input feature map.
        r: reduction ratio; the hidden layer has ``channels // r`` units.
    """

    def __init__(self, channels, r):
        super().__init__()
        self.channels = channels
        self.r = r
        hidden = channels // r
        self.linear = nn.Sequential(
            nn.Linear(channels, hidden, bias=True),
            nn.ReLU(inplace=True),
            nn.Linear(hidden, channels, bias=True),
        )

    def forward(self, x):
        """Return ``x`` multiplied by its per-channel attention weights."""
        batch, chans, _, _ = x.shape
        pooled_max = F.adaptive_max_pool2d(x, output_size=1).view(batch, chans)
        pooled_avg = F.adaptive_avg_pool2d(x, output_size=1).view(batch, chans)
        # The same MLP is applied to both pooled descriptors.
        logits = self.linear(pooled_max) + self.linear(pooled_avg)
        gate = torch.sigmoid(logits).view(batch, chans, 1, 1)
        return gate * x
    
class CBAM(nn.Module):
    """Attention block that applies channel attention, then spatial attention,
    and adds the original input back as a residual connection.

    Args:
        channels: number of channels of the input feature map.
        r: reduction ratio forwarded to the channel attention module.
    """

    def __init__(self, channels, r):
        super().__init__()
        self.channels = channels
        self.r = r
        self.sam = SAM(bias=False)
        self.cam = CAM(channels=channels, r=r)

    def forward(self, x):
        """Return ``x`` plus the channel- and spatially-gated version of ``x``."""
        refined = self.sam(self.cam(x))
        return refined + x

In [3]:
class Bottleneck(nn.Module):
    """Residual bottleneck block: 1x1 -> 3x3 -> 1x1 convolutions, each followed
    by batch normalization, added to a shortcut branch.

    The block outputs ``expansion * planes`` channels. When the stride is not 1
    or the input width differs from the output width, the shortcut is a strided
    1x1 convolution plus batch norm; otherwise it is the identity.

    Args:
        in_planes: number of input channels.
        planes: width of the two inner convolutions.
        stride: stride of the 3x3 convolution (and of the projection shortcut).
    """

    expansion = 4

    def __init__(self, in_planes, planes, stride=1):
        super().__init__()
        out_planes = self.expansion * planes

        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)
        self.conv3 = nn.Conv2d(planes, out_planes, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(out_planes)

        needs_projection = stride != 1 or in_planes != out_planes
        if needs_projection:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_planes),
            )
        else:
            # An empty Sequential passes its input through unchanged.
            self.shortcut = nn.Sequential()

    def forward(self, x):
        """Apply the three conv/bn stages, add the shortcut and apply a final ReLU."""
        hidden = F.relu(self.bn1(self.conv1(x)))
        hidden = F.relu(self.bn2(self.conv2(hidden)))
        hidden = self.bn3(self.conv3(hidden))
        return F.relu(hidden + self.shortcut(x))


class FPN(nn.Module):
    """ResNet-style bottom-up backbone with a CBAM attention block after every
    stage, followed by a 1x1 channel reduction and a linear classifier.

    Despite the name, only the bottom-up pathway is built: the lateral, smooth
    and upsample-add layers of a feature pyramid are not instantiated, and
    ``forward`` returns class logits computed from the last stage only.

    Args:
        block: residual block class. It must expose an ``expansion`` class
            attribute and be constructible as ``block(in_planes, planes, stride)``.
        num_blocks: sequence of four ints, the number of blocks in each stage.
        num_classes: size of the output logit vector.
        r: channel reduction ratio used by every CBAM block.
    """

    def __init__(self, block, num_blocks, num_classes=14, r=4):
        super(FPN, self).__init__()
        self.in_planes = 64

        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)

        # Bottom-up layers
        self.layer1 = self._make_layer(block,  64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)

        # Attention blocks, one per stage output. They are created here (not in
        # forward) so their weights are registered as parameters, moved by
        # .to(device), seen by the optimizer and stored in the state dict.
        self.cbam2 = CBAM(channels=64 * block.expansion, r=r)
        self.cbam3 = CBAM(channels=128 * block.expansion, r=r)
        self.cbam4 = CBAM(channels=256 * block.expansion, r=r)
        self.cbam5 = CBAM(channels=512 * block.expansion, r=r)

        # Top layer: reduce the last stage to 256 channels before the classifier.
        self.toplayer = nn.Conv2d(512 * block.expansion, 256, kernel_size=1, stride=1, padding=0)
        self.fc = nn.Linear(256 * 7 * 7, num_classes)

    def _make_layer(self, block, planes, num_blocks, stride):
        """Build one stage of ``num_blocks`` blocks; only the first block uses
        ``stride``, the rest use stride 1. Updates ``self.in_planes`` to the
        stage's output width."""
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        for block_stride in strides:
            layers.append(block(self.in_planes, planes, block_stride))
            self.in_planes = planes * block.expansion
        return nn.Sequential(*layers)

    def forward(self, x):
        """Return class logits of shape ``(batch, num_classes)`` for an image batch."""
        # Stem
        c1 = F.relu(self.bn1(self.conv1(x)))
        c1 = F.max_pool2d(c1, kernel_size=3, stride=2, padding=1)

        # Bottom-up stages, each refined by its own attention block.
        c2_attn = self.cbam2(self.layer1(c1))
        c3_attn = self.cbam3(self.layer2(c2_attn))
        c4_attn = self.cbam4(self.layer3(c3_attn))
        c5_attn = self.cbam5(self.layer4(c4_attn))

        p5 = self.toplayer(c5_attn)
        # The classifier expects a 7x7 map. For 224x224 inputs the map is
        # already 7x7 and this pooling is a no-op; for other input sizes it
        # resizes the map so the flattened width still matches self.fc.
        p5 = F.adaptive_avg_pool2d(p5, output_size=7)
        p5_flat = p5.flatten(1)
        return self.fc(p5_flat)

## Dataset

In [4]:
class ImageFolderDataset(Dataset):
    """Image classification dataset read from a ``root/<class_name>/<image>`` tree.

    Class indices are assigned by the sorted order of the class directory
    names. Samples are listed in sorted order so that indices (and therefore
    any seeded split of the dataset) are reproducible across machines.
    """

    def __init__(self, root_dir, transform=None):
        """
        Args:
            root_dir (string): Directory with all the images organized in subdirectories by category.
            transform (callable, optional): Optional transform to be applied on a sample.
        """
        self.root_dir = root_dir
        self.transform = transform
        # Only sub-directories are classes; stray files in the root (e.g.
        # Thumbs.db, .DS_Store) would otherwise become bogus classes and make
        # os.listdir fail below.
        self.classes = sorted(
            entry for entry in os.listdir(root_dir)
            if os.path.isdir(os.path.join(root_dir, entry))
        )
        self.class_to_idx = {cls: idx for idx, cls in enumerate(self.classes)}
        self.image_paths = []
        self.labels = []

        for cls in self.classes:
            class_dir = os.path.join(root_dir, cls)
            # os.listdir order is filesystem-dependent; sort for determinism.
            for img_name in sorted(os.listdir(class_dir)):
                img_path = os.path.join(class_dir, img_name)
                if not os.path.isfile(img_path):
                    continue
                self.image_paths.append(img_path)
                self.labels.append(self.class_to_idx[cls])

    def __len__(self):
        """Number of image files found under the class directories."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``(image, label)``; the image is loaded as RGB and passed
        through ``transform`` when one was given."""
        # Imported here because the notebook's import cell never brings an
        # image loader into scope. pil_loader opens the file and converts it
        # to RGB.
        from torchvision.datasets.folder import pil_loader

        image = pil_loader(self.image_paths[idx])
        label = self.labels[idx]

        if self.transform is not None:
            image = self.transform(image)

        return image, label

#### Train and Test Loader and Dataset

In [5]:
transform = transforms.Compose([
    transforms.Resize((224, 224)), # Resize images to 224x224 pixels
    transforms.RandomHorizontalFlip(), # Apply horizontal flip for data augmentation
    transforms.ToTensor(), # Convert images to tensors
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]), # Normalize with ImageNet mean and std
])
# NOTE(review): both splits below share this transform, so the random flip is
# also applied to test images. Consider a flip-free transform for evaluation.

# Raw string: the backslashes in a Windows path must not be parsed as escape
# sequences (sequences such as "\B" or "\d" are invalid escapes and trigger a
# warning on current Python versions).
root_dir = r'E:\BUET Files\Celia MAM Biomedical Signal Processing\RAtCapsNet\data\labelled_images'
full_dataset = ImageFolderDataset(root_dir=root_dir, transform=transform)

# Calculate lengths for training and testing sets
train_size = int(0.8 * len(full_dataset))
test_size = len(full_dataset) - train_size

# Split the dataset with a fixed seed so the train/test membership is the same
# on every run and reported accuracies are comparable.
split_generator = torch.Generator().manual_seed(42)
train_dataset, test_dataset = random_split(
    full_dataset, [train_size, test_size], generator=split_generator
)

# On Windows, DataLoader workers are spawned as fresh processes that cannot
# import a Dataset class defined inside the notebook, so loading must stay in
# the main process there.
loader_workers = 0 if os.name == 'nt' else 4

# Create DataLoaders for training and testing
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=loader_workers)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=loader_workers)

In [None]:
# Report which CUDA device (if any) PyTorch will use.
if not torch.cuda.is_available():
    print("CUDA is not available.")
else:
    gpu_id = torch.cuda.current_device()                        # index of the active GPU
    gpu_name = torch.cuda.get_device_name(gpu_id)               # human-readable model name
    gpu_capability = torch.cuda.get_device_capability(gpu_id)   # (major, minor) compute capability
    print(f"CUDA is available. GPU: {gpu_name}")
    print(f"GPU Capability: {gpu_capability}")

#### Model

In [6]:
# Prefer the GPU when one is visible, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
# Two bottleneck blocks per stage; the network is moved to the chosen device.
model = FPN(Bottleneck, [2, 2, 2, 2]).to(device)

#### Train & Test Functions

In [7]:
# Multi-class classification loss on raw logits, optimized with Adam.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=1e-3)

def train(model, dataloader, criterion, optimizer, device):
    """Run one training epoch and return ``(mean_loss, accuracy)``.

    Args:
        model: network mapping an image batch to class logits.
        dataloader: sized iterable yielding ``(images, labels)`` batches.
        criterion: loss called as ``criterion(outputs, labels)``; its value is
            treated as the mean loss over the batch.
        optimizer: optimizer stepping ``model``'s parameters.
        device: device the batches are moved to before the forward pass.

    Returns:
        Tuple ``(epoch_loss, epoch_acc)``: the per-sample mean loss and the
        fraction of correctly classified samples, both computed over the
        samples actually processed. ``(0.0, 0.0)`` if the loader is empty.
    """
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    num_batches = len(dataloader)

    for batch_idx, (images, labels) in enumerate(dataloader):
        images, labels = images.to(device), labels.to(device)

        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

        # Calculate statistics; the batch loss is weighted by batch size so a
        # smaller final batch does not skew the epoch mean.
        batch_loss = loss.item()
        running_loss += batch_loss * images.size(0)
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

        # Print batch statistics
        print(f'Batch {batch_idx+1}/{num_batches}, Batch Loss: {batch_loss:.4f}')

    # Normalize by the samples seen rather than len(dataloader.dataset): the
    # two differ with drop_last or a sampler, and an empty loader must not
    # divide by zero.
    if total == 0:
        epoch_loss, epoch_acc = 0.0, 0.0
    else:
        epoch_loss = running_loss / total
        epoch_acc = correct / total

    print(f'Train Loss: {epoch_loss:.4f}, Train Accuracy: {epoch_acc:.4f}')
    return epoch_loss, epoch_acc

def test(model, dataloader, criterion, device):
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for images, labels in dataloader:
            images, labels = images.to(device), labels.to(device)

            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)

            # Calculate statistics
            running_loss += loss.item() * images.size(0)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    epoch_loss = running_loss / len(dataloader.dataset)
    epoch_acc = correct / total

    print(f'Test Loss: {epoch_loss:.4f}, Test Accuracy: {epoch_acc:.4f}')
    return epoch_loss, epoch_acc


#### Run the train and test functions for `num_epochs` epochs

In [8]:
num_epochs = 10
best_acc = 0.0  # highest test accuracy observed so far

for epoch in range(num_epochs):
    print(f'Epoch {epoch+1}/{num_epochs}')

    # One optimization pass over the training split, then score the test split.
    train_loss, train_acc = train(model, train_loader, criterion, optimizer, device)
    test_loss, test_acc = test(model, test_loader, criterion, device)

    # Checkpoint only when the test accuracy strictly improves.
    improved = test_acc > best_acc
    if not improved:
        print(f'No improvement in accuracy: {test_acc:.4f}, Best Accuracy: {best_acc:.4f}')
        continue

    best_acc = test_acc
    torch.save(model.state_dict(), 'best_model.pth')
    print(f'Saved Best Model with Accuracy: {best_acc:.4f}')


print('... Training complete ...')

Epoch 1/10
