In [12]:
import pandas as pd
import os
from PIL import Image
from torch.utils.data import Dataset, DataLoader, random_split
import torchvision.transforms as transforms
import torch
import torch.nn as nn

transform = transforms.Compose([
    transforms.Resize((224, 224)), 
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])  
])

Train samples: 26160, Validation samples: 6541


In [None]:
class DatasetSL(Dataset):
    def __init__(self, csv_file, root_dirs, transform=None):
        self.annotations = pd.read_csv(csv_file)
        self.root_dirs = root_dirs  # List of directories (train0, train1, etc.)
        self.transform = transform
        self.data = self.clean()

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        image_path, label = self.data[index]
        image = Image.open(image_path).convert("RGB")
        if self.transform:
            image = self.transform(image)
        return image, label
    
    def clean(self):
        valid_samples = []
        for _, row in self.annotations.iterrows():
            img_name = row["image_name"] + ".jpg"
            label = 0 if row["benign_malignant"] == "benign" else 1
            for root_dir in self.root_dirs:
                image_path = os.path.join(root_dir, img_name)
                if os.path.exists(image_path):
                    valid_samples.append((image_path, label))
                    break  # Stop searching once found
        return valid_samples
    
DATASET_PATH = "/DL/skin_dataset" 
train_dirs = [os.path.join(DATASET_PATH, f"train{i}") for i in range(15)]
full_dataset = DatasetSL(csv_file=os.path.join(DATASET_PATH, "train.csv"), root_dirs=train_dirs, transform=transform)
train_size = int(0.8 * len(full_dataset))
val_size = len(full_dataset) - train_size
train_dataset, test_dataset = random_split(full_dataset, [train_size, val_size])
train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=128, shuffle=False)

In [13]:
import torch
import torch.nn as nn

# Define the Bottleneck Block (used in ResNet-101)
class Bottleneck(nn.Module):
    expansion = 4  # ResNet Bottleneck expands channels

    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super(Bottleneck, self).__init__()

        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)

        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        self.conv3 = nn.Conv2d(out_channels, out_channels * self.expansion, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(out_channels * self.expansion)

        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample  # Shortcut connection

    def forward(self, x):
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)
        out = self.relu(out)

        out = self.conv3(out)
        out = self.bn3(out)

        if self.downsample is not None:
            identity = self.downsample(x)

        out += identity  # Residual connection
        out = self.relu(out)

        return out

# Define ResNet-101 Model
class ResNet101(nn.Module):
    def __init__(self, num_classes=2):
        super(ResNet101, self).__init__()
        self.in_channels = 64

        # Initial Convolution Layer
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Residual Layers (ResNet-101)
        self.layer1 = self._make_layer(Bottleneck, 64, 3)   # 3 Bottleneck blocks
        self.layer2 = self._make_layer(Bottleneck, 128, 4, stride=2)  # 4 Bottleneck blocks
        self.layer3 = self._make_layer(Bottleneck, 256, 23, stride=2)  # 23 Bottleneck blocks
        self.layer4 = self._make_layer(Bottleneck, 512, 3, stride=2)  # 3 Bottleneck blocks

        # Global Average Pooling + Fully Connected Layer
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * Bottleneck.expansion, num_classes)

        # Initialize weights
        self._initialize_weights()

    def _make_layer(self, block, out_channels, blocks, stride=1):
        """Creates a residual block"""
        downsample = None
        if stride != 1 or self.in_channels != out_channels * block.expansion:
            downsample = nn.Sequential(
                nn.Conv2d(self.in_channels, out_channels * block.expansion,
                          kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels * block.expansion)
            )

        layers = []
        layers.append(block(self.in_channels, out_channels, stride, downsample))
        self.in_channels = out_channels * block.expansion

        for _ in range(1, blocks):
            layers.append(block(self.in_channels, out_channels))

        return nn.Sequential(*layers)

    def _initialize_weights(self):
        """Apply He (Kaiming) Initialization"""
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.01)
                nn.init.constant_(m.bias, 0)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)

        return x


In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = ResNet101(num_classes=2).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
print(model)

In [6]:
def train_model(model, train_loader, criterion, optimizer, epochs=2):
    model.train()
    for epoch in range(epochs):
        running_loss = 0.0
        correct = 0
        total = 0

        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        epoch_loss = running_loss / len(train_loader)
        epoch_acc = 100 * correct / total
        print(f"Epoch [{epoch+1}/{epochs}], Loss: {epoch_loss:.4f}, Accuracy: {epoch_acc:.2f}%")

train_model(model, train_loader, criterion, optimizer, epochs=2)


Epoch [1/2], Loss: 0.1217, Accuracy: 98.05%
Epoch [2/2], Loss: 0.0899, Accuracy: 98.17%


<h2>Testing</h2>

In [14]:
import torch
import torch.nn.functional as F
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

def test_model(model, test_loader):
    model.eval()
    correct = 0
    total = 0
    running_loss = 0.0

    all_labels = []
    all_preds = []

    with torch.no_grad():  # No need to track gradients
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            running_loss += loss.item()

            _, predicted = torch.max(outputs, 1)  # Get predicted class
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            all_labels.extend(labels.cpu().numpy())
            all_preds.extend(predicted.cpu().numpy())
            
    avg_loss = running_loss / len(test_loader)
    accuracy = accuracy_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds, average="weighted")
    recall = recall_score(all_labels, all_preds, average="weighted")
    f1 = f1_score(all_labels, all_preds, average="weighted")

    print(f"Test Loss: {avg_loss:.4f}")
    print(f"Test Accuracy: {accuracy * 100:.2f}%")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1-Score: {f1:.4f}")

model.to(device)
test_model(model, test_loader)


  model.load_state_dict(torch.load("/kaggle/working/resnet152_final.pth", map_location=device))


Test Loss: 0.0784
Test Accuracy: 98.23%
Precision: 0.9648
Recall: 0.9823
F1-Score: 0.9735


  _warn_prf(average, modifier, msg_start, len(result))
