In [1]:
!pip install torch
!pip install numpy
!pip install opencv-python



In [1]:
import os
import cv2
import numpy as np
import torch
import torch.nn as nn
import torchvision
from tqdm import tqdm
#from google.colab import drive
import albumentations as A
import torch.nn.functional as F
import math

#drive.mount('/content/drive')


  check_for_updates()


In [3]:
labels = ['PNEUMONIA', 'NORMAL']
img_size = 224

def get_training_data(data_dir):
    data = []

    for label in labels:
        path = os.path.join(data_dir, label)
        class_num = labels.index(label)

        for img in tqdm(os.listdir(path)):
            try:
                # Load and resize the image
                img_arr = cv2.imread(os.path.join(path, img), cv2.IMREAD_GRAYSCALE)
                resized_arr = cv2.resize(img_arr, (img_size, img_size))  # Resize the image

                # Add the image and label as a pair
                data.append([resized_arr, class_num])
            except Exception as e:
                print(f"Error loading image {img}: {e}")

    # Convert the list to a NumPy array
    data = np.array(data, dtype=object)  # Use dtype=object to allow image-label pairing
    return data

# Load the data
train_data = get_training_data('/Users/dangerdani/Documents/FKAN-Biostatistics/data/chest_xray/train')
test_data = get_training_data('/Users/dangerdani/Documents/FKAN-Biostatistics/data/chest_xray/test')
val_data = get_training_data('/Users/dangerdani/Documents/FKAN-Biostatistics/data/chest_xray/val')

100%|██████████| 3107/3107 [00:15<00:00, 201.42it/s]
100%|██████████| 3107/3107 [00:36<00:00, 85.05it/s] 
100%|██████████| 390/390 [00:01<00:00, 233.32it/s]
100%|██████████| 234/234 [00:02<00:00, 96.95it/s] 
100%|██████████| 776/776 [00:03<00:00, 204.60it/s]
100%|██████████| 270/270 [00:03<00:00, 82.31it/s]


In [4]:
# Function to normalize the images
def normalize_images(data):
    images = []
    labels = []
    normalizer = A.Normalize(mean=0.488, std=0.234, max_pixel_value=1)

    for img, label in tqdm(data):
        # Normalization: each pixel is divided by 255
        normalized_img = img / 255.0
        #normalized_img = normalizer(image=normalized_img)['image']
        images.append(normalized_img)
        labels.append(label)

    # Convert the images and labels into separate arrays
    images = np.array(images)
    labels = np.array(labels)

    return images, labels

# Normalize the images in the training dataset
train_images, train_labels = normalize_images(train_data)
val_images, val_labels = normalize_images(val_data)
test_images, test_labels = normalize_images(test_data)


# Check the shape and an example of the normalized and shuffled data
print(f"Shape of normalized and shuffled train images: {train_images.shape}")
print(f"Shape of normalized and shuffled validation images: {val_images.shape}")

100%|██████████| 6214/6214 [00:02<00:00, 2415.93it/s]
100%|██████████| 1046/1046 [00:00<00:00, 1866.03it/s]
100%|██████████| 624/624 [00:00<00:00, 1803.63it/s]


Shape of normalized and shuffled train images: (6214, 224, 224)
Shape of normalized and shuffled validation images: (1046, 224, 224)


In [24]:
print(train_images[0].max())
print(train_images[0].min())

2.187349
-2.0854788


In [5]:
class ResNet(nn.Module):
    def __init__(self, num_classes=2, softmax=True):
      super(ResNet, self).__init__()
      self.resnet = torchvision.models.resnet50(pretrained=True)
      num_ftrs = self.resnet.fc.out_features  [B, N, H, W] -> [B, N * H * W] 
      self.fc = nn.Linear(num_ftrs, num_classes)
      self.bn = nn.BatchNorm1d(num_ftrs)
      self.relu = nn.ReLU()
      self.softmax = torch.nn.Softmax(dim=1) if softmax else None
      self.change_conv1()

    def forward(self, x):
      x = self.resnet(x)
      x = self.bn(x)
      x = self.relu(x)
      x = self.fc(x)
      if self.softmax:
        x = self.softmax(x)
      return x

    def change_conv1(self):
      original_conv1 = self.resnet.conv1

      #Create a new convolutional layer with 1 input channel instead of 3
      new_conv1 = nn.Conv2d(
        in_channels=1,  # Grayscale has 1 channel
        out_channels=original_conv1.out_channels,
        kernel_size=original_conv1.kernel_size,
        stride=original_conv1.stride,
        padding=original_conv1.padding,
        bias=original_conv1.bias is not None
)

      # Initialize the new conv layer's weights by averaging the RGB weights
      with torch.no_grad():
        new_conv1.weight = nn.Parameter(original_conv1.weight.mean(dim=1, keepdim=True))

        #Replace the original conv1 with the new one
        self.resnet.conv1 = new_conv1

class KANLinear_v1(nn.Module):
    def __init__(self, in_features, out_features, grid_size=5, spline_order=3,
                 scale_noise=0.1, scale_base=1.0, scale_spline=1.0,
                 enable_standalone_scale_spline=True, base_activation=nn.SiLU,
                 grid_eps=0.02, grid_range=[-1, 1]):
        super(KANLinear_v1, self).__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.grid_size = grid_size
        self.spline_order = spline_order

        h = (grid_range[1] - grid_range[0]) / grid_size
        grid = ((torch.arange(-spline_order, grid_size + spline_order + 1) * h
                 + grid_range[0]).expand(in_features, -1).contiguous())
        self.register_buffer("grid", grid)

        self.base_weight = nn.Parameter(torch.Tensor(out_features, in_features))
        self.spline_weight = nn.Parameter(
            torch.Tensor(out_features, in_features, grid_size + spline_order)
        )
        if enable_standalone_scale_spline:
            self.spline_scaler = nn.Parameter(torch.Tensor(out_features, in_features))

        self.scale_noise = scale_noise
        self.scale_base = scale_base
        self.scale_spline = scale_spline
        self.enable_standalone_scale_spline = enable_standalone_scale_spline
        self.base_activation = base_activation()
        self.grid_eps = grid_eps

        self.reset_parameters()

    def reset_parameters(self):
        nn.init.kaiming_uniform_(self.base_weight, a=math.sqrt(5) * self.scale_base)
        with torch.no_grad():
            noise = ((torch.rand(self.grid_size + 1, self.in_features, self.out_features) - 0.5)
                     * self.scale_noise / self.grid_size)
            self.spline_weight.data.copy_(
                self.scale_spline * self.curve2coeff(self.grid.T[self.spline_order : -self.spline_order], noise)
            )
            if self.enable_standalone_scale_spline:
                nn.init.kaiming_uniform_(self.spline_scaler, a=math.sqrt(5) * self.scale_spline)

    def b_splines(self, x):
        grid = self.grid
        x = x.unsqueeze(-1)
        bases = ((x >= grid[:, :-1]) & (x < grid[:, 1:])).to(x.dtype)
        for k in range(1, self.spline_order + 1):
            bases = ((x - grid[:, :-(k+1)]) / (grid[:, k:-1] - grid[:, :-(k+1)]) * bases[:, :, :-1]
                     + (grid[:, k+1:] - x) / (grid[:, k+1:] - grid[:, 1:-k]) * bases[:, :, 1:])
        return bases.contiguous()

    def curve2coeff(self, x, y):
        A = self.b_splines(x).transpose(0, 1)
        B = y.transpose(0, 1)
        solution = torch.linalg.lstsq(A, B).solution
        return solution.permute(2, 0, 1).contiguous()

    def forward(self, x):
        x = x.view(x.size(0), -1)
        base_output = F.linear(self.base_activation(x), self.base_weight)
        spline_output = F.linear(
            self.b_splines(x).view(x.size(0), -1),
            self.spline_weight.view(self.out_features, -1)
        )
        return base_output + spline_output





class FKAN_ResNet(nn.Module):
  def __init__(self, num_classes=2, softmax=True):
    super(FKAN_ResNet, self).__init__()
    self.backbone = torchvision.models.resnet50(pretrained=True)
    self.kan_layer1 = KANLinear_v1(1000, 256)
    self.kan_layer2 = KANLinear_v1(256, 128)
    self.kan_layer3 = KANLinear_v1(128, num_classes)
    self.softmax = torch.nn.Softmax(dim=1) if softmax else None
    self.change_conv1()

  def forward(self, x):
    x = self.backbone(x)
    x = self.kan_layer1(x)
    x = self.kan_layer2(x)
    x = self.kan_layer3(x)
    if self.softmax:
      x = self.softmax(x)
    return x

  def change_conv1(self):
      original_conv1 = self.backbone.conv1

      #Create a new convolutional layer with 1 input channel instead of 3
      new_conv1 = nn.Conv2d(
        in_channels=1,  # Grayscale has 1 channel
        out_channels=original_conv1.out_channels,
        kernel_size=original_conv1.kernel_size,
        stride=original_conv1.stride,
        padding=original_conv1.padding,
        bias=original_conv1.bias is not None
)

      # Initialize the new conv layer's weights by averaging the RGB weights
      with torch.no_grad():
        new_conv1.weight = nn.Parameter(original_conv1.weight.mean(dim=1, keepdim=True))

        #Replace the original conv1 with the new one
        self.backbone.conv1 = new_conv1


model = FKAN_ResNet(num_classes=2, softmax=True)
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
model.to(device)
print(device)


Downloading: "https://download.pytorch.org/models/resnet50-0676ba61.pth" to /Users/dangerdani/.cache/torch/hub/checkpoints/resnet50-0676ba61.pth
100%|██████████| 97.8M/97.8M [00:08<00:00, 12.5MB/s]


cpu


In [6]:
from torch.utils.data import TensorDataset, DataLoader

# Convert the images and labels to PyTorch tensors

# Apply the transformation to training and validation images
train_images_tensor = torch.stack([torch.tensor(img, dtype=torch.float) for img in train_images]).unsqueeze(1)
val_images_tensor = torch.stack([torch.tensor(img, dtype=torch.float) for img in val_images]).unsqueeze(1)

# Now permute them
train_images_tensor = train_images_tensor.permute(0, 1, 2, 3)  # (N, 1, 244, 244)
val_images_tensor = val_images_tensor.permute(0, 1, 2, 3)      # (N, 1, 244, 244)
print(train_images_tensor.shape, val_images_tensor.shape)

# The tensors are now in the shape (N, 1, 244, 244), where N is the number of images

train_labels_tensor = torch.tensor(train_labels, dtype=torch.long)
val_labels_tensor = torch.tensor(val_labels, dtype=torch.long)

# Create the dataset and DataLoader
train_dataset = TensorDataset(train_images_tensor, train_labels_tensor)
val_dataset = TensorDataset(val_images_tensor, val_labels_tensor)

# Define the batch size
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, drop_last=True)
val_loader = DataLoader(val_dataset, batch_size=8, shuffle=True)
print('Done!')



torch.Size([6214, 1, 224, 224]) torch.Size([1046, 1, 224, 224])
Done!


### **Training**

In [7]:

from sklearn.metrics import classification_report
criterion = nn.CrossEntropyLoss()  # For multi-class or binary classification
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4, weight_decay=1e-5)  # AdamW with L2 regularization

# Now the data is ready for training and validation

# Function to calculate relevant metrics

# Training function with Early Stopping
def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=100, patience=10):
    patience_counter = 0
    best_validation_score = 0
    for epoch in range(num_epochs):
        model.train()
        p_bar = tqdm(train_loader)
        running_loss = 0

        for i, (images, labels) in enumerate(p_bar):
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            p_bar.set_description(f"Epoch {epoch+1}/{num_epochs} - Loss: {running_loss / (i + 1)}")


        if (epoch + 1) % 2 == 0:
            model.eval()
            p_bar = tqdm(val_loader)
            all_preds = []
            all_labels = []
            with torch.no_grad():
                for i, (images, labels) in enumerate(p_bar):
                    images, labels = images.to(device), labels.to(device)
                    outputs = model(images)
                    _, preds = torch.max(outputs, 1)
                    all_preds.extend(preds.cpu().numpy())
                    all_labels.extend(labels.cpu().numpy())
                    p_bar.set_description(f'Epoch {epoch+1}/{num_epochs} - Validation Batch: {i}')

            class_report = classification_report(all_labels, all_preds, target_names=['Pneumonia', 'Normal'], output_dict=True)
            validation_accuracy = class_report['accuracy']
            validation_f1_score = class_report['weighted avg']['f1-score']

            print(f"Epoch {epoch+1}/{num_epochs} - Validation Accuracy: {validation_accuracy} - Validation F1 Score: {validation_f1_score:.4f}")
            if validation_f1_score > best_validation_score:
                best_validation_score = validation_f1_score
                patience_counter = 0
                torch.save(model.state_dict(), os.path.join('best_model_resnet.pth'))

            else:
                patience_counter += 1

        if patience_counter >= patience:
            print(f"Early stopping triggered after {epoch+1} epochs.")
            break

# Start training
train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=2, patience=10)

Epoch 1/2 - Loss: 0.3706498853478235: 100%|██████████| 388/388 [24:14<00:00,  3.75s/it] 
Epoch 2/2 - Loss: 0.35179195046117623: 100%|██████████| 388/388 [24:47<00:00,  3.83s/it]
Epoch 2/2 - Validation Batch: 130: 100%|██████████| 131/131 [01:13<00:00,  1.77it/s]


Epoch 2/2 - Validation Accuracy: 0.9780114722753346 - Validation F1 Score: 0.9780


RuntimeError: Parent directory /content/drive/MyDrive/model_results does not exist.

### **Testing**

In [28]:
state_dict = torch.load('/content/drive/MyDrive/model_results/best_model_resnet.pth')
model.load_state_dict(state_dict)

  state_dict = torch.load('/content/drive/MyDrive/model_results/best_model_resnet.pth')


<All keys matched successfully>

In [8]:
test_images_tensor = torch.stack([torch.tensor(img, dtype=torch.float) for img in test_images]).unsqueeze(1)  # Applying the same transformation as for train/val
test_images_tensor = test_images_tensor.permute(0, 1, 2, 3)
print(test_images_tensor.shape)

test_labels_tensor = torch.tensor(test_labels, dtype=torch.long)  # or torch.float if binary classification

# Create the dataset and DataLoader for the test set
test_dataset = TensorDataset(test_images_tensor, test_labels_tensor)
test_loader = DataLoader(test_dataset, batch_size=8, shuffle=True)

all_predictions = []
all_labels = []
for images, labels in test_loader:
  images, labels = images.to(device), labels
  outputs = model(images)
  _, preds = torch.max(outputs, 1)
  all_predictions.extend(preds.cpu().numpy())
  all_labels.extend(labels.numpy())

class_report = classification_report(all_labels, all_predictions, target_names=['Pneumonia', 'Normal'])
print(class_report)



torch.Size([624, 1, 224, 224])
              precision    recall  f1-score   support

   Pneumonia       0.77      1.00      0.87       390
      Normal       0.99      0.51      0.67       234

    accuracy                           0.81       624
   macro avg       0.88      0.75      0.77       624
weighted avg       0.85      0.81      0.80       624

