In [16]:
# Initial setup cell
!pip3 install -r ../../requirements.txt

Collecting opencv-python (from -r ../../requirements.txt (line 13))
  Downloading opencv_python-4.9.0.80-cp37-abi3-macosx_11_0_arm64.whl.metadata (20 kB)
Downloading opencv_python-4.9.0.80-cp37-abi3-macosx_11_0_arm64.whl (35.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m35.4/35.4 MB[0m [31m30.6 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hInstalling collected packages: opencv-python
Successfully installed opencv-python-4.9.0.80


In [56]:
# Import dependencies
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data
import torchvision

### Data Setup
Initializes our train and test set and stores the ground truth of each image. We use Torch libraries to handle this for us.

In [57]:
# Import data setup dependencies
import torchvision.transforms as transforms
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from sklearn.model_selection import KFold
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from torch.utils.data import DataLoader
from torch.utils.data import SubsetRandomSampler
from torchvision.datasets import ImageFolder

The code below loads the images from our dataset, resizes each one to 224×224 pixels (down from the original 768×768), and converts them into Torch tensors. The `ImageFolder` class lazy-loads the images, which keeps memory usage low.

In [58]:
# Check current device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Path to our lung_image_sets
data_dir = "../../lung_colon_image_set/lung_image_sets"

# Side length (in pixels) that every image is resized to
resized_size = 224

# Seed for the train/test split so the same images land in each set on every run
split_seed = 231

# Resize every image to resized_size x resized_size and convert it into a Tensor
tensor_data = transforms.Compose([
  transforms.Resize((resized_size, resized_size)),
  transforms.ToTensor()
])

# Load the dataset using ImageFolder
data = ImageFolder(root=data_dir, transform=tensor_data)

# Split the dataset into train (80%) and test (remainder) sets.
# A seeded generator makes the split reproducible across kernel restarts.
train_size = int(0.8 * len(data))
test_size = len(data) - train_size
train, test = torch.utils.data.random_split(
    data,
    [train_size, test_size],
    generator=torch.Generator().manual_seed(split_seed),
)

# Create data loaders for training and testing
load_train = DataLoader(train, batch_size=32, shuffle=True)
load_test = DataLoader(test, batch_size=32, shuffle=False)

### EfficientNet

EfficientNet is an image classification family. In our project, we chose to utilize EfficientNetB4, a scaled version of the base EfficientNet model published by NVIDIA. We utilize EfficientNetB4 as a feature extractor, pulling out 128 features and passing those into our SVM, Softmax, and SVM+PCA classifiers. Our implementation of the feature extractor is below:

In [59]:
from torchvision.models import efficientnet_b4

# Define the EfficientNetCNN model
class EfficientNetCNN(nn.Module):
    """Frozen, pre-trained EfficientNet-B4 used purely as a feature extractor.

    The last layer of the torchvision classifier head is removed, so
    ``forward`` returns the output of the remaining network rather than class
    scores. The backbone is kept in evaluation mode and its parameters do not
    require gradients.

    Args:
        device: Device the module is moved to on construction.

    Attributes:
        efficientnet: The truncated EfficientNet-B4 network.
        num_features: ``in_features`` of the removed last layer, i.e. the
            width of the vector that used to feed it.
    """

    def __init__(self, device):
        super(EfficientNetCNN, self).__init__()

        # Load the pre-trained EfficientNet-B4 model as our feature extractor
        self.efficientnet = efficientnet_b4(pretrained=True)

        # Remember the feature width before the last layer is dropped; it can
        # no longer be read from the classifier afterwards.
        self.num_features = self.efficientnet.classifier[-1].in_features

        # Remove the last layer of the classifier head
        self.efficientnet.classifier = nn.Sequential(*list(self.efficientnet.classifier.children())[:-1])

        # Freeze the backbone: no gradients for any of its parameters
        for param in self.efficientnet.parameters():
            param.requires_grad_(False)

        # Convert to device
        self.to(device=device)

        # Put the WHOLE network (not just the classifier head) in eval mode so
        # that normalisation and dropout layers behave deterministically.
        self.eval()

    def train(self, mode=True):
        """Set the training flag but always keep the frozen backbone in eval mode.

        Without this, a parent module's ``.train()`` call would switch the
        backbone back to training behaviour.
        """
        super().train(mode)
        self.efficientnet.eval()
        return self

    def forward(self, x):
        """Return the truncated network's output for the input batch ``x``."""
        return self.efficientnet(x)
    

### Hyperparameters and Constants
Here we define the hyperparameters and constants that are shared by all implementations of our model.

In [61]:
# Define hyperparameters
learning_rate = 5e-4
momentum = 0.9

# Define number of epochs
num_epochs = 1

# Number of cross-validation folds, and the splitter that generates them.
# n_splits is tied to num_folds so the two can never disagree.
num_folds = 5
kfold = KFold(n_splits=num_folds, shuffle=True, random_state=231)

### EfficientNetB4 + SVM
For our first situation, we will use SVM to do classification on our extracted features.

In [62]:
def extract_features(model, loader, device):
    """Run ``model`` over every batch of ``loader`` and collect outputs and labels.

    Args:
        model: Callable applied to each batch of inputs.
        loader: Iterable yielding ``(inputs, labels)`` tensor batches.
        device: Device each input batch is moved to before the forward pass.

    Returns:
        Tuple ``(features, labels)`` of NumPy arrays concatenated over all
        batches; each model output is flattened to one row per sample.
    """
    feature_batches, label_batches = [], []
    with torch.no_grad():
        for inputs, labels in loader:
            outputs = model(inputs.to(device))
            feature_batches.append(outputs.view(outputs.size(0), -1).cpu().numpy())
            label_batches.append(labels.cpu().numpy())
    return np.concatenate(feature_batches), np.concatenate(label_batches)


# Define the model being used
model = EfficientNetCNN(device=device)

# Feature extraction is inference only: eval mode keeps normalisation and
# dropout layers deterministic, so an image always maps to the same features.
model.eval()

# The extractor is never updated inside the fold loop, so the features are
# extracted once for the whole dataset and simply re-indexed per fold.
feature_loader = DataLoader(data, batch_size=32, shuffle=False)
all_features, all_labels = extract_features(model, feature_loader, device)

# Get the number of features extracted by EfficientNet (columns of the feature matrix)
num_features = all_features.shape[1]

# Construct results dict to track training
results = {}

# K-Fold Cross Validation
for fold, (train_indices, val_indices) in enumerate(kfold.split(all_features), 1):
    print(f'Fold {fold}')

    # Select this fold's rows from the pre-computed features
    train_features, train_labels = all_features[train_indices], all_labels[train_indices]
    val_features, val_labels = all_features[val_indices], all_labels[val_indices]

    # Create and train the SVM classifier
    svm_model = make_pipeline(StandardScaler(), SVC(kernel='linear'))
    svm_model.fit(train_features, train_labels)

    # Evaluate the classifier on the validation set and extract metrics
    val_predictions = svm_model.predict(val_features)
    accuracy = accuracy_score(val_labels, val_predictions)

    results[fold] = accuracy
    print(f'Fold {fold} Accuracy: {accuracy:.4f}')

# Print the average metrics across all folds
average_accuracy = np.mean(list(results.values()))

print(f'K-FOLD CROSS VALIDATION RESULTS FOR {num_folds} FOLDS')
print('--------------------------------')
for fold in results:
    print(f'Fold {fold}: {results[fold]:.4f}')
print(f'Average: {average_accuracy:.4f}')



Fold 1


KeyboardInterrupt: 

### EfficientNetB4 + Softmax
For our second situation, we will use Softmax to do classification on our extracted features.

In [None]:
# Define the model being used
model = EfficientNetCNN(device=device)

# Feature extraction is inference only: eval mode keeps normalisation and
# dropout layers deterministic, so an image always maps to the same features.
model.eval()

# The extractor is never updated inside the fold loop, so the features are
# extracted once for the whole dataset and simply re-indexed per fold.
feature_loader = DataLoader(data, batch_size=32, shuffle=False)
feature_batches = []
label_batches = []
with torch.no_grad():
    for inputs, labels in feature_loader:
        outputs = model(inputs.to(device))
        feature_batches.append(outputs.view(outputs.size(0), -1).cpu().numpy())
        label_batches.append(labels.cpu().numpy())
all_features = np.concatenate(feature_batches)
all_labels = np.concatenate(label_batches)

# Construct results dict to track training
results = {}

# K-Fold Cross Validation
for fold, (train_indices, val_indices) in enumerate(kfold.split(all_features), 1):
    print(f'Fold {fold}')

    # Select this fold's rows from the pre-computed features
    train_features, train_labels = all_features[train_indices], all_labels[train_indices]
    val_features, val_labels = all_features[val_indices], all_labels[val_indices]

    # Create and train the Softmax classifier: multinomial logistic regression
    # on the standardised features (an SVC here would just repeat the SVM run).
    softmax_model = make_pipeline(StandardScaler(), LogisticRegression(max_iter=1000))
    softmax_model.fit(train_features, train_labels)

    # Evaluate the classifier on the validation set and extract metrics
    val_predictions = softmax_model.predict(val_features)
    accuracy = accuracy_score(val_labels, val_predictions)

    results[fold] = accuracy
    print(f'Fold {fold} Accuracy: {accuracy:.4f}')

# Print the average metrics across all folds
average_accuracy = np.mean(list(results.values()))

print(f'K-FOLD CROSS VALIDATION RESULTS FOR {num_folds} FOLDS')
print('--------------------------------')
for fold in results:
    print(f'Fold {fold}: {results[fold]:.4f}')
print(f'Average: {average_accuracy:.4f}')

### EfficientNetB4 + PCA + SVM
As an extension to our SVM implementation, the paper suggests that applying PCA to the extracted features before passing them to the SVM classifier yields higher accuracy. We implement this approach below:

As noted in our paper, we will be unable to replicate the original paper's (Ofary and Ilhan) results because they did not provide their PCA implementation in their paper. Due to time and cost limitations, we decided to use PCA to only utilize the top 50 features from the extracted features.

In [4]:
from sklearn.decomposition import PCA

# Define the CNN model with SVM and PCA
class EN_PCA_SVM(nn.Module):
    """EfficientNet features -> PCA reduction -> linear classification layer.

    The PCA projection is fitted once and then reused for every batch, so all
    batches (training and evaluation) are mapped into the same reduced space.
    Call ``fit_pca`` with a loader before training; if it was not called, the
    projection is fitted on the first batch that reaches ``forward``.

    Args:
        num_components: Number of principal components kept by the PCA.
        device: Device the module is moved to on construction.
        num_classes: Number of outputs of the linear layer (default 3).
    """

    def __init__(self, num_components, device, num_classes=3):
        super(EN_PCA_SVM, self).__init__()

        # EfficientNet feature extractor
        self.extract_features = EfficientNetCNN(device=device)

        # PCA for dimensionality reduction
        self.pca = PCA(n_components=num_components)

        # SVM layer using nn.Linear
        self.svm = nn.Linear(num_components, num_classes)

        # Convert to device
        self.to(device=device)

    def _backbone_features(self, x):
        """Return the extractor's output for ``x`` as a 2-D NumPy array (one row per sample)."""
        # The result is handed to scikit-learn, so no gradient graph is needed.
        with torch.no_grad():
            features = self.extract_features(x)
        return features.reshape(features.size(0), -1).cpu().numpy()

    def fit_pca(self, loader):
        """Fit the PCA projection on the features of every batch in ``loader``.

        Args:
            loader: Iterable yielding ``(images, labels)`` batches; labels are ignored.

        Returns:
            ``self``, to allow chaining.
        """
        module_device = self.svm.weight.device
        batches = [self._backbone_features(images.to(module_device)) for images, _ in loader]
        self.pca.fit(np.concatenate(batches))
        return self

    def forward(self, x):
        """Classify a batch of images; returns a ``(batch, num_classes)`` tensor of scores."""
        # Pass the input through the base CNN and hand the features to sklearn
        features = self._backbone_features(x)

        # Fit the projection only if that has not happened yet. Refitting on
        # every batch would give each batch a different projection and fails
        # for batches with fewer samples than principal components.
        if not hasattr(self.pca, "components_"):
            self.pca.fit(features)
        reduced = self.pca.transform(features)

        # Back to a tensor on the same device as the linear layer (the module's
        # own device, not a global variable).
        reduced = torch.from_numpy(reduced).float().to(self.svm.weight.device)

        # Pass the reduced features through the SVM layer
        return self.svm(reduced)

In [None]:
# Build the PCA + SVM model, keeping 20 principal components
pca_svm_model = EN_PCA_SVM(device=device, num_components=20)

# SGD over the model's parameters, using the shared learning rate and momentum
pca_svm_optimizer = optim.SGD(
    params=pca_svm_model.parameters(),
    lr=learning_rate,
    momentum=momentum,
)

### Training the models

Here we call all of our models and train each one on our dataset. To do this, we utilize 5-fold cross validation to train our models. We also store the running losses for each model in an array to model change in loss over time. The definitions are below, and the following cell allows us to begin training.

In [None]:
# Load the dataset using ImageFolder
data = ImageFolder(root=data_dir, transform=tensor_data)

# Split the dataset into train (80%) and test (remainder) sets.
# A seeded generator makes the split reproducible across kernel restarts.
train_size = int(0.8 * len(data))
test_size = len(data) - train_size
train, test = torch.utils.data.random_split(
    data,
    [train_size, test_size],
    generator=torch.Generator().manual_seed(231),
)

# Create data loaders for training and testing
load_train = DataLoader(train, batch_size=32, shuffle=True)
load_test = DataLoader(test, batch_size=32, shuffle=False)

# Define number of folds
k_folds = 5

# Create a KFold object with k_folds splits
kfold = KFold(n_splits=k_folds, shuffle=True, random_state=231)

# Materialise the folds as a list: kfold.split returns a one-shot generator,
# which would be empty the second time the training cell is run.
folds = list(kfold.split(data))

# Store the running losses over each epoch
svm_losses = []
softmax_losses = []
pca_losses = []

Now let's train our models:

In [None]:
# Train all three models fold by fold.
# NOTE(review): this cell expects svm_model, softmax_model and pca_svm_model to
# be torch modules, with svm_optimizer, softmax_optimizer and loss_function
# already defined. Only pca_svm_model / pca_svm_optimizer are created in the
# cells shown here -- confirm the others exist before running.
for fold, (train_indices, val_indices) in enumerate(folds, 1):
    # Create data samplers for train and validation sets
    train_sampler = SubsetRandomSampler(train_indices)
    val_sampler = SubsetRandomSampler(val_indices)

    # Create data loaders for train and validation sets
    train_loader = DataLoader(data, batch_size=32, sampler=train_sampler)
    # NOTE(review): val_loader is built but not consumed anywhere in this cell.
    val_loader = DataLoader(data, batch_size=32, sampler=val_sampler)

    for epoch in range(num_epochs):
        svm_running_loss = 0.0
        softmax_running_loss = 0.0
        pca_running_loss = 0.0

        # Iterate over THIS fold's training loader; iterating the fixed
        # load_train split would train every fold on identical data.
        for images, labels in train_loader:
            # Move the input data to the device (CPU or GPU)
            images = images.to(device)
            labels = labels.to(device)

            # Zero the parameter gradients
            pca_svm_optimizer.zero_grad()
            svm_optimizer.zero_grad()
            softmax_optimizer.zero_grad()

            # Forward pass through all models
            svm_output = svm_model(images)
            svm_loss = loss_function(svm_output, labels)

            softmax_output = softmax_model(images)
            softmax_loss = loss_function(softmax_output, labels)

            pca_output = pca_svm_model(images)
            pca_loss = loss_function(pca_output, labels)

            # Backward pass and step for all models
            svm_loss.backward()
            svm_optimizer.step()

            softmax_loss.backward()
            softmax_optimizer.step()

            pca_loss.backward()
            pca_svm_optimizer.step()

            # Update running loss
            svm_running_loss += svm_loss.item()
            softmax_running_loss += softmax_loss.item()
            pca_running_loss += pca_loss.item()

        # Average loss per batch for the epoch (batches of this fold's loader)
        svm_epoch_loss = svm_running_loss / len(train_loader)
        softmax_epoch_loss = softmax_running_loss / len(train_loader)
        pca_epoch_loss = pca_running_loss / len(train_loader)

        # Store current epoch loss in list
        svm_losses.append((epoch, svm_epoch_loss))
        softmax_losses.append((epoch, softmax_epoch_loss))
        pca_losses.append((epoch, pca_epoch_loss))

        print(f"Epoch [{epoch+1}/{num_epochs}], \nSVM Loss: {svm_epoch_loss:.4f}, \nSoftmax Loss: {softmax_epoch_loss:.4f}, \nPCA Loss: {pca_epoch_loss:.4f}")

### Testing and Metrics

With our trained models in hand, we now evaluate each one on the test set and record the following metrics:
- Accuracy
- Precision
- Recall
- F1

The metrics are defined more precisely in our paper; to compute them, we need the following values:
- True Positive (TP)
- False Positive (FP)
- True Negative (TN)
- False Negative (FN)

We calculate these values below:

In [None]:
# Import metrics from sklearn
from sklearn.metrics import accuracy_score, recall_score, precision_score, f1_score

In [None]:
# Evaluate the models trained above. Constructing new model instances here
# would score untrained weights instead of the training results.
# NOTE(review): every entry must be a torch module (.eval() and a tensor
# forward pass are used below) -- confirm svm_model and softmax_model are the
# torch versions rather than scikit-learn pipelines.
models = [svm_model, softmax_model, pca_svm_model]

# Iterate over each model
for model in models:
    model.eval()  # Set the model to evaluation mode

    # Initialize variables to store predictions and true labels
    true_labels = []
    predicted_labels = []

    # Iterate over the test set
    with torch.no_grad():
        for images, labels in load_test:
            images = images.to(device)  # Move images to the device (CPU or GPU)
            labels = labels.to(device)  # Move labels to the device (CPU or GPU)

            # Forward pass through the model
            outputs = model(images)

            # Predicted label = index of the highest score along dimension 1
            _, preds = torch.max(outputs, 1)

            # Append the true labels and predicted labels to the lists
            true_labels.extend(labels.cpu().numpy())
            predicted_labels.extend(preds.cpu().numpy())

    # Calculate evaluation metrics (weighted average over the classes)
    accuracy = accuracy_score(true_labels, predicted_labels)
    recall = recall_score(true_labels, predicted_labels, average='weighted')
    precision = precision_score(true_labels, predicted_labels, average='weighted')
    f1 = f1_score(true_labels, predicted_labels, average='weighted')

    # Print the evaluation metrics for the current model
    print(f"Model: {type(model).__name__}")
    print(f"Accuracy: {accuracy:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"F1 Score: {f1:.4f}")
    print()
    print()