<a href="https://colab.research.google.com/github/Tymass/lung-cancer-classifier-/blob/main/WK_project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Dataset import
[Dataset URL](https://www.kaggle.com/datasets/dishantrathi20/ct-scan-images-for-lung-cancer)

In [None]:
%%bash
# Stop at the first failing command so a failed download is reported here
# instead of surfacing later as a confusing unzip error.
set -euo pipefail

export DATASET_PATH_ZIP=/content/dataset.zip
export DATASET_KAGGLE_PATH=https://www.kaggle.com/api/v1/datasets/download/dishantrathi20/ct-scan-images-for-lung-cancer

# -f: exit non-zero on HTTP errors; -sS: no progress meter, but still print errors.
curl -fL -sS -o "$DATASET_PATH_ZIP" "$DATASET_KAGGLE_PATH"
# Extract into the current working directory, overwriting files from earlier runs.
unzip -qo "$DATASET_PATH_ZIP"
rm -f "$DATASET_PATH_ZIP"

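Optionally, delete selected class folders before training. The cell below removes the three carcinoma subtype folders from the train, valid, and test splits.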

In [None]:
%%bash

# "Begin cases", "Malignant cases", "adenocarcinoma_left.lower.lobe_T2_N0_M0_Ib", "large.cell.carcinoma_left.hilum_T2_N2_M0_IIIa", "squamous.cell.carcinoma_left.hilum_T1_N2_M0_IIIa", "noraml"

DATA_ROOT=/content/LungcancerDataSet/Data

# Folder names to delete from the train and valid splits.
TRAIN_VALID_CLASSES=(
  "adenocarcinoma_left.lower.lobe_T2_N0_M0_Ib"
  "large.cell.carcinoma_left.hilum_T2_N2_M0_IIIa"
  "squamous.cell.carcinoma_left.hilum_T1_N2_M0_IIIa"
)
# The test split uses shorter folder names, so it gets its own list.
TEST_CLASSES=(
  "adenocarcinoma"
  "large.cell.carcinoma"
  "squamous.cell.carcinoma"
)

# remove_classes SPLIT NAME...: delete every NAME folder under $DATA_ROOT/SPLIT.
remove_classes() {
  local split_dir="$DATA_ROOT/$1"
  shift
  local class_name
  for class_name in "$@"; do
    rm -rf "$split_dir/$class_name"
  done
}

remove_classes train "${TRAIN_VALID_CLASSES[@]}"
remove_classes valid "${TRAIN_VALID_CLASSES[@]}"
remove_classes test "${TEST_CLASSES[@]}"

## Libraries import

In [None]:
import numpy as np
from torch.utils.data import DataLoader, Dataset
import torch
from sklearn.metrics import accuracy_score
from torch.utils.data import DataLoader, Dataset
from tensorflow.keras.applications import VGG19
from tensorflow.keras.applications.vgg19 import preprocess_input
from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing.image import load_img, img_to_array
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers
from tensorflow.keras.preprocessing import image_dataset_from_directory
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
import os
import matplotlib.pyplot as plt

## Model definitions

In [None]:
def get_fine_tuned_vgg_model(num_classes, input_shape=(224, 224, 3), trainable_layers=5):
    """Build a VGG19-based classifier with only the last backbone layers trainable.

    Args:
        num_classes: Size of the final softmax layer.
        input_shape: Shape of a single input image, ``(height, width, channels)``.
        trainable_layers: Number of trailing backbone layers left trainable;
            every earlier backbone layer is frozen.

    Returns:
        An uncompiled ``Sequential`` model: VGG19 backbone (ImageNet weights,
        no classification top) -> Flatten -> Dense(4096, relu) -> Dropout(0.7)
        -> Dense(num_classes, softmax).

    NOTE(review): no input preprocessing happens inside this model. The Keras
    documentation asks for ``vgg19.preprocess_input`` on VGG19 inputs; confirm
    the input pipeline applies it.
    """
    base_model = tf.keras.applications.VGG19(weights='imagenet', include_top=False, input_shape=input_shape)

    # Use an explicit count: a negative slice such as ``layers[:-0]`` is empty
    # and would freeze nothing when ``trainable_layers`` is 0.
    frozen_count = max(len(base_model.layers) - trainable_layers, 0)
    for layer in base_model.layers[:frozen_count]:
        layer.trainable = False

    base_model.summary()
    model = models.Sequential([
        base_model,
        layers.Flatten(),
        layers.Dense(4096, activation='relu'),
        layers.Dropout(0.7),
        layers.Dense(num_classes, activation='softmax')
    ])

    return model

def get_fine_tuned_densenet_model(num_classes, input_shape=(224, 224, 3), trainable_layers=10):
    """Build a DenseNet201-based classifier with only the last backbone layers trainable.

    Args:
        num_classes: Size of the final softmax layer.
        input_shape: Shape of a single input image, ``(height, width, channels)``.
        trainable_layers: Number of trailing backbone layers left trainable;
            every earlier backbone layer is frozen.

    Returns:
        An uncompiled ``Sequential`` model: DenseNet201 backbone (ImageNet
        weights, no classification top) -> Flatten -> Dense(2048, relu)
        -> Dropout(0.7) -> Dense(num_classes, softmax).

    NOTE(review): no input preprocessing happens inside this model. The Keras
    documentation asks for ``densenet.preprocess_input`` on DenseNet inputs;
    confirm the input pipeline applies it.
    """
    base_model = tf.keras.applications.DenseNet201(weights='imagenet', include_top=False, input_shape=input_shape)

    # Use an explicit count: a negative slice such as ``layers[:-0]`` is empty
    # and would freeze nothing when ``trainable_layers`` is 0.
    frozen_count = max(len(base_model.layers) - trainable_layers, 0)
    for layer in base_model.layers[:frozen_count]:
        layer.trainable = False

    base_model.summary()
    model = models.Sequential([
        base_model,
        layers.Flatten(),
        layers.Dense(2048, activation='relu'),
        layers.Dropout(0.7),
        layers.Dense(num_classes, activation='softmax')
    ])

    return model

def get_fine_tuned_resnet_model(num_classes, input_shape=(224, 224, 3), trainable_layers=10):
    """Build a ResNet50-based classifier with only the last backbone layers trainable.

    Args:
        num_classes: Size of the final softmax layer.
        input_shape: Shape of a single input image, ``(height, width, channels)``.
        trainable_layers: Number of trailing backbone layers left trainable;
            every earlier backbone layer is frozen.

    Returns:
        An uncompiled ``Sequential`` model: ResNet50 backbone (ImageNet
        weights, no classification top) -> GlobalAveragePooling2D
        -> Dense(2048, relu) -> Dropout(0.8) -> Dense(num_classes, softmax).

    NOTE(review): no input preprocessing happens inside this model. The Keras
    documentation asks for ``resnet.preprocess_input`` on ResNet50 inputs;
    confirm the input pipeline applies it.
    """
    base_model = tf.keras.applications.ResNet50(weights='imagenet', include_top=False, input_shape=input_shape)

    # Use an explicit count: a negative slice such as ``layers[:-0]`` is empty
    # and would freeze nothing when ``trainable_layers`` is 0.
    frozen_count = max(len(base_model.layers) - trainable_layers, 0)
    for layer in base_model.layers[:frozen_count]:
        layer.trainable = False

    base_model.summary()
    model = models.Sequential([
        base_model,
        layers.GlobalAveragePooling2D(),
        layers.Dense(2048, activation='relu'),
        layers.Dropout(0.8),
        layers.Dense(num_classes, activation='softmax')
    ])

    return model

def get_fine_tuned_efficientnet_model(num_classes, input_shape=(224, 224, 3), trainable_layers=10):
    """Build an EfficientNetB0-based classifier with only the last backbone layers trainable.

    Args:
        num_classes: Size of the final softmax layer.
        input_shape: Shape of a single input image, ``(height, width, channels)``.
        trainable_layers: Number of trailing backbone layers left trainable;
            every earlier backbone layer is frozen.

    Returns:
        An uncompiled ``Sequential`` model: EfficientNetB0 backbone (ImageNet
        weights, no classification top) -> GlobalAveragePooling2D
        -> Dense(512, relu) -> Dropout(0.7) -> Dense(num_classes, softmax).
    """
    base_model = tf.keras.applications.EfficientNetB0(weights='imagenet', include_top=False, input_shape=input_shape)

    # Use an explicit count: a negative slice such as ``layers[:-0]`` is empty
    # and would freeze nothing when ``trainable_layers`` is 0.
    frozen_count = max(len(base_model.layers) - trainable_layers, 0)
    for layer in base_model.layers[:frozen_count]:
        layer.trainable = False

    base_model.summary()
    model = models.Sequential([
        base_model,
        layers.GlobalAveragePooling2D(),
        layers.Dense(512, activation='relu'),
        layers.Dropout(0.7),
        layers.Dense(num_classes, activation='softmax')
    ])

    return model

## Function definitions

In [None]:
def extract_features(model, dataloader, device):
    """Run a PyTorch model over a dataloader and collect its outputs as NumPy arrays.

    Args:
        model: Callable mapping a batch of images to a batch of feature
            vectors. The caller is responsible for placing it on ``device``.
        dataloader: Iterable yielding ``(images, labels)`` tensor pairs.
        device: Device the image batches are moved to before the forward pass.

    Returns:
        ``(features, labels)``: the per-batch outputs stacked row-wise and the
        per-batch labels concatenated into one 1-D array.
    """
    # Dropout / batch-norm must run in inference mode, otherwise the extracted
    # features are stochastic. Remember the previous mode to restore it after.
    is_module = isinstance(model, torch.nn.Module)
    was_training = model.training if is_module else False
    if is_module:
        model.eval()

    features = []
    labels = []
    try:
        with torch.no_grad():
            for images, batch_labels in dataloader:
                batch_features = model(images.to(device))
                features.append(batch_features.cpu().numpy())
                labels.append(batch_labels.cpu().numpy())
    finally:
        if is_module:
            model.train(was_training)

    return np.vstack(features), np.hstack(labels)

def calculate_centroids(features, labels):
    """Compute the mean feature vector of every class.

    Args:
        features: Array-like of shape ``(n_samples, n_features)``.
        labels: Array-like of ``n_samples`` class labels, aligned with ``features``.

    Returns:
        Dict mapping each distinct label to the mean of its feature rows.
    """
    # Coerce to arrays: with plain lists ``labels == label`` is a single bool
    # and ``features[False]`` silently selects row 0 instead of masking.
    features = np.asarray(features)
    labels = np.asarray(labels)

    centroids = {}
    for label in np.unique(labels):
        class_features = features[labels == label]
        centroids[label] = np.mean(class_features, axis=0)
    return centroids

def classify_image(query_features, centroids, metric='cosine'):
    """Return the label of the centroid closest to ``query_features``.

    Args:
        query_features: 1-D feature vector of the image to classify.
        centroids: Dict mapping label -> 1-D centroid vector.
        metric: ``'cosine'``, ``'euclidean'`` or ``'manhattan'``.

    Returns:
        The label whose centroid has the smallest distance to the query.
        For ``'cosine'`` this is the centroid with the highest cosine similarity.

    Raises:
        ValueError: If ``metric`` is not supported or ``centroids`` is empty.
    """
    from scipy.spatial.distance import cosine, euclidean, cityblock

    distance_functions = {
        'cosine': cosine,
        'euclidean': euclidean,
        'manhattan': cityblock,
    }
    if metric not in distance_functions:
        raise ValueError(
            f"Unsupported metric {metric!r}; expected one of {sorted(distance_functions)}"
        )
    if not centroids:
        raise ValueError("centroids is empty; nothing to compare the query against")

    distance = distance_functions[metric]
    distances = {
        label: distance(query_features, centroid)
        for label, centroid in centroids.items()
    }
    # scipy's ``cosine`` is already a distance (1 - similarity), so taking the
    # minimum is correct for all three metrics.
    return min(distances, key=distances.get)

def evaluate_model(test_features, test_labels, centroids, metric='cosine'):
    """Score nearest-centroid classification on a set of feature vectors.

    Args:
        test_features: Iterable of feature vectors, one per sample.
        test_labels: True labels aligned with ``test_features``.
        centroids: Dict mapping label -> centroid vector.
        metric: Distance metric name forwarded to ``classify_image``.

    Returns:
        The value of ``accuracy_score(test_labels, predictions)``.
    """
    predicted = [
        classify_image(sample, centroids, metric)
        for sample in test_features
    ]
    return accuracy_score(test_labels, predicted)


def train_model(model, train_dataset, valid_dataset, epochs=100, lr=0.0001, callbacks=None):
    """Compile and fit a Keras classifier.

    Args:
        model: Keras model to train; it is compiled here with Adam,
            ``sparse_categorical_crossentropy`` loss and an accuracy metric.
        train_dataset: Training data passed to ``model.fit``.
        valid_dataset: Validation data passed to ``model.fit``.
        epochs: Maximum number of epochs.
        lr: Learning rate for the Adam optimizer.
        callbacks: Optional list of Keras callbacks. When ``None`` the defaults
            are used: early stopping on ``val_loss`` (patience 5, best weights
            restored) and halving the learning rate on a ``val_loss`` plateau
            (patience 3, floor 1e-6).

    Returns:
        ``(model, history)``: the fitted model and the object returned by ``model.fit``.
    """
    model.compile(optimizer=optimizers.Adam(learning_rate=lr),
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])

    if callbacks is None:
        callbacks = [
            EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True),
            ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=3, verbose=1, min_lr=1e-6)
        ]

    history = model.fit(
        train_dataset,
        validation_data=valid_dataset,
        epochs=epochs,
        callbacks=callbacks
    )

    return model, history

def get_feature_extractor(model):
    """Return a model that maps images to globally average-pooled backbone features.

    Args:
        model: A model whose first layer (``model.layers[0]``) is the
            convolutional backbone, as built by the ``get_fine_tuned_*_model``
            functions.

    Returns:
        A Keras ``Model`` taking the backbone's input and producing one feature
        vector per image (the backbone output averaged over its spatial axes).
    """
    base_model = model.layers[0]

    # Do not stack a new Dense layer here: a freshly created layer has random,
    # untrained weights, and because a new one would be created on every call,
    # features from separate calls (e.g. train vs. test) would live in
    # different random projections and could not be compared.
    pooling_output = layers.GlobalAveragePooling2D()(base_model.output)
    feature_extractor = Model(inputs=base_model.input, outputs=pooling_output)
    return feature_extractor


def extract_features_from_finetuned(model, dataloader, feature_extractor=None):
    """Embed every batch of a dataset with a feature extractor.

    Args:
        model: Fine-tuned model; used to build the extractor via
            ``get_feature_extractor`` when ``feature_extractor`` is not given.
        dataloader: Iterable yielding ``(images, labels)`` batches.
        feature_extractor: Optional pre-built extractor exposing
            ``predict(images, verbose=0)``. Pass the same instance for every
            split so that all features come from one embedding function.

    Returns:
        ``(features, labels)``: per-batch features stacked row-wise and the
        per-batch labels concatenated into one 1-D array.
    """
    if feature_extractor is None:
        feature_extractor = get_feature_extractor(model)

    features = []
    labels = []
    for images, batch_labels in dataloader:
        # verbose=0: avoid printing one progress bar per batch.
        features.append(feature_extractor.predict(images, verbose=0))
        labels.append(np.asarray(batch_labels))

    return np.vstack(features), np.hstack(labels)

## Fine-tuning

In [None]:
if __name__ == "__main__":

    train_dir = "LungcancerDataSet/Data/train"
    valid_dir = "LungcancerDataSet/Data/valid"
    test_dir = "LungcancerDataSet/Data/test"

    # One set of loader settings for all splits so they cannot drift apart.
    loader_kwargs = dict(image_size=(224, 224), batch_size=64, label_mode='int')

    train_dataset = image_dataset_from_directory(train_dir, **loader_kwargs)
    # Evaluation splits gain nothing from shuffling; a fixed order keeps runs comparable.
    valid_dataset = image_dataset_from_directory(valid_dir, shuffle=False, **loader_kwargs)
    test_dataset = image_dataset_from_directory(test_dir, shuffle=False, **loader_kwargs)

    # Size the classifier from the class folders actually present on disk.
    # Read this before .map(): the mapped dataset no longer exposes class_names.
    num_classes = len(train_dataset.class_names)

    def apply_preprocessing(images, labels):
        """Apply the VGG19 input preprocessing to a batch, leaving labels untouched."""
        return preprocess_input(images), labels

    # The ImageNet VGG19 weights expect inputs passed through
    # vgg19.preprocess_input (see the Keras Applications docs). When switching
    # to another backbone, switch to that backbone's preprocessing as well.
    train_dataset = train_dataset.map(apply_preprocessing)
    valid_dataset = valid_dataset.map(apply_preprocessing)
    test_dataset = test_dataset.map(apply_preprocessing)

    model = get_fine_tuned_vgg_model(num_classes=num_classes)
    model.summary()
    model, history = train_model(model, train_dataset, valid_dataset, epochs=100)

## Embedded space extraction

In [None]:
# Embed the training and test splits using the fine-tuned model.
train_features, train_labels = extract_features_from_finetuned(
    model, train_dataset
)
test_features, test_labels = extract_features_from_finetuned(
    model, test_dataset
)

In [None]:
# Nearest-centroid evaluation: class centroids come from the training
# embeddings, and each test embedding is assigned to the closest centroid.
centroids = calculate_centroids(train_features, train_labels)

accuracy = evaluate_model(
    test_features,
    test_labels,
    centroids,
    metric='euclidean',
)
print(f"Accuracy after fine-tuning: {accuracy:.2f}")

# DenseNet201
cosine: 0.57

euc: 0.29

manh: 0.05

# VGG19
cosine: 0.43

euc: 0.08

manh: 0.05
# EfficientnetB0
cosine: 0.57

euc: 0.46

manh: 0.05
# ResNet50
cosine: 0.48

euc: 0.56

manh: 0.28

In [None]:
# Recompute the class centroids and preview each one: its shape and its
# first five components.
centroids = calculate_centroids(train_features, train_labels)
for label in centroids:
    centroid = centroids[label]
    print(f'Centroid shape: {centroid.shape}')
    print(f'Centroid for label {label}: {centroid[:5]}')

## Training summary

In [None]:
def plot_training_history(history):
    """Plot training vs. validation accuracy and loss side by side.

    Args:
        history: Object whose ``history`` attribute is a dict containing the
            per-epoch series ``'accuracy'``, ``'val_accuracy'``, ``'loss'``
            and ``'val_loss'``.
    """
    plt.figure(figsize=(12, 6))

    # One panel per metric: (key in history.history, human-readable name).
    panels = [('accuracy', 'Accuracy'), ('loss', 'Loss')]
    for position, (key, name) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.plot(history.history[key], label=f'Training {name}')
        plt.plot(history.history[f'val_{key}'], label=f'Validation {name}')
        plt.title(f'{name} over epochs')
        plt.xlabel('Epochs')
        plt.ylabel(name)
        plt.legend()

    plt.tight_layout()
    plt.show()

# Plot the accuracy/loss curves stored in `history` by the fine-tuning run.
plot_training_history(history)