In [1]:
# Standard Library Imports
import os
import itertools
import warnings
warnings.filterwarnings("ignore")

# Data Handling and Numerical Operations
import numpy as np
import pandas as pd
import kagglehub as kh

# Plotting Libraries
import cv2
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
%matplotlib inline
import seaborn as sns
import visualkeras as vk

# Model Building and Preprocessing
import tensorflow as tf
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, BatchNormalization, Dropout, Flatten
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.optimizers import Adam, Adamax
from tensorflow.keras.applications import DenseNet121
from tensorflow.keras.regularizers import l2
from tensorflow.keras.utils import get_file

# Model Evaluation and Metrics
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_recall_fscore_support, classification_report, roc_curve, auc, confusion_matrix


# Fetch the CBIS-DDSM dataset through kagglehub and keep the returned location;
# every CSV and JPEG path used further down is built from `ddsm_path`.
ddsm_path = kh.dataset_download("awsaf49/cbis-ddsm-breast-cancer-image-dataset")
print(ddsm_path)

ImportError: Error importing numpy: you should not try to import numpy from
        its source directory; please exit the numpy source tree, and relaunch
        your python interpreter from there.

# Data Preparation

In [None]:
# Load the dataset-level metadata table and preview its first rows
meta_csv_path = ddsm_path + '/csv/meta.csv'
df_meta = pd.read_csv(meta_csv_path)
df_meta.head()

In [None]:
# Read the per-image DICOM info table and preview its first rows
dicom_csv_path = ddsm_path + '/csv/dicom_info.csv'
df_dicom = pd.read_csv(dicom_csv_path)
df_dicom.head()

In [None]:
# List the distinct image types recorded in the SeriesDescription column
df_dicom['SeriesDescription'].unique()

In [None]:
# Collect the stored paths of the cropped images and show the first one
is_cropped = df_dicom['SeriesDescription'] == 'cropped images'
cropped_images = df_dicom.loc[is_cropped, 'image_path']
print(cropped_images.iloc[0])

In [None]:
# Collect the stored paths of the full mammogram images and show the first one
is_full_mammogram = df_dicom['SeriesDescription'] == 'full mammogram images'
full_images = df_dicom.loc[is_full_mammogram, 'image_path']
print(full_images.iloc[0])

In [None]:
# Collect the stored paths of the ROI mask images and show the first one
is_roi_mask = df_dicom['SeriesDescription'] == 'ROI mask images'
mask_images = df_dicom.loc[is_roi_mask, 'image_path']
print(mask_images.iloc[0])

In [None]:
# Point the stored image paths at the local copy of the dataset.
# `path_to_replace` is the prefix used inside dicom_info.csv; `replacement_path`
# is the matching directory under the downloaded dataset.
path_to_replace = 'CBIS-DDSM/jpeg'
replacement_path = ddsm_path + '/jpeg'

# Literal (non-regex) substring replacement: with regex=True the replacement
# string is interpreted as a regex template, so a download path containing
# backslashes (e.g. a Windows cache directory) would raise or be mangled.
cropped_images = cropped_images.str.replace(path_to_replace, replacement_path, regex=False)
full_images = full_images.str.replace(path_to_replace, replacement_path, regex=False)
mask_images = mask_images.str.replace(path_to_replace, replacement_path, regex=False)

# view new paths
print('Cropped Images paths:')
print(cropped_images.iloc[0])
print('Full mammo Images paths:')
print(full_images.iloc[0])
print('ROI Mask Images paths:')
print(mask_images.iloc[0])

In [None]:
# Organize image paths: map the name of the directory that directly contains
# each image file to the full image path. These dictionaries are what
# fix_image_path() looks names up in.
#
# The key is taken relative to the END of the path ([-2] = parent directory of
# the file). A fixed index counted from the start (previously [5]) only lands on
# that directory when `ddsm_path` happens to have exactly three components, so
# it silently produced wrong keys for any other download location.
# NOTE(review): assumes every path ends in "<directory>/<file>" - confirm.
full_images_dict = {path.split("/")[-2]: path for path in full_images}
cropped_images_dict = {path.split("/")[-2]: path for path in cropped_images}
mask_images_dict = {path.split("/")[-2]: path for path in mask_images}

# view keys
next(iter(full_images_dict.items()))

In [None]:
# Load the mass case-description tables (train and test CSVs)
mass_train = pd.read_csv(ddsm_path+'/csv/mass_case_description_train_set.csv')
mass_test = pd.read_csv(ddsm_path+'/csv/mass_case_description_test_set.csv')

# Merge train and test into one table. ignore_index=True gives the merged frame
# a fresh 0..n-1 index; without it both inputs keep their own 0-based labels and
# the result contains duplicate index labels, which makes label-based
# selection/assignment ambiguous later on.
mass_df = pd.concat([mass_train, mass_test], ignore_index=True)

mass_train.head(1)

In [None]:
def fix_image_path(dataset):
    """Rewrite the image-path columns of ``dataset`` in place.

    For each row, the value in column positions 11, 12 and 13 is split on "/"
    and its third component is looked up in ``full_images_dict``,
    ``cropped_images_dict`` and ``mask_images_dict`` respectively. When the
    name is found, the cell is overwritten with the path stored in the
    dictionary; otherwise the cell is left untouched.

    Cells that are not strings (e.g. NaN) or that have fewer than three
    "/"-separated components are skipped instead of raising.

    Parameters
    ----------
    dataset : pandas.DataFrame
        Case-description table with at least 14 columns.
        NOTE(review): positions 11/12/13 are presumably the 'image file path',
        'cropped image file path' and 'ROI mask file path' columns printed
        below - confirm against the CSV header.

    Returns
    -------
    None
        ``dataset`` is modified in place.
    """
    # (column position, lookup table) for each image type
    column_lookups = (
        (11, full_images_dict),
        (12, cropped_images_dict),
        (13, mask_images_dict),
    )

    for row_pos, row in enumerate(dataset.values):
        for col_pos, lookup in column_lookups:
            raw_path = row[col_pos]
            if not isinstance(raw_path, str):
                continue  # missing value: nothing to look up
            parts = raw_path.split("/")
            if len(parts) <= 2:
                continue  # path too short to contain the lookup name
            img_name = parts[2]
            if img_name in lookup:
                # positional assignment, so duplicate index labels are harmless
                dataset.iloc[row_pos, col_pos] = lookup[img_name]

# Fix image paths
fix_image_path(mass_df)
print(mass_df.iloc[1]['image file path'])
print(mass_df.iloc[1]['cropped image file path'])
print(mass_df.iloc[1]['ROI mask file path'])

# Fix Data

In [None]:
# Show column dtypes and non-null counts for the merged mass table
mass_df.info()

In [None]:
# Replace the spaces in column names with underscores
column_renames = {
    'left or right breast': 'left_or_right_breast',
    'image view': 'image_view',
    'abnormality id': 'abnormality_id',
    'abnormality type': 'abnormality_type',
    'mass shape': 'mass_shape',
    'mass margins': 'mass_margins',
    'image file path': 'image_file_path',
    'cropped image file path': 'cropped_image_file_path',
    'ROI mask file path': 'ROI_mask_file_path',
}
mass_df = mass_df.rename(columns=column_renames)

mass_df.head(1)

In [None]:
# Count missing values per column
mass_df.isna().sum()

In [None]:
# Fill each missing mass_shape / mass_margins value with the next non-null
# value below it in the column (back fill).
# NOTE(review): back fill copies the value from the following row and leaves
# trailing nulls untouched - confirm this imputation is intended.
for column_name in ('mass_shape', 'mass_margins'):
    mass_df[column_name] = mass_df[column_name].bfill()

# Recheck the null counts
mass_df.isna().sum()

In [None]:
# Summarise features: descriptive statistics of the merged mass table
mass_df.describe()

In [None]:
# Count how many rows carry each pathology label
pathology_counts = mass_df['pathology'].value_counts()
print(pathology_counts)

In [None]:
# Collapse the three pathology labels into two classes:
# BENIGN_WITHOUT_CALLBACK is folded into BENIGN.
mapper = {
    'MALIGNANT': 'MALIGNANT',
    'BENIGN': 'BENIGN',
    'BENIGN_WITHOUT_CALLBACK': 'BENIGN',
}
mass_df['pathology'] = mass_df['pathology'].map(mapper)

# Confirm that only two classes remain
mass_df['pathology'].unique()

In [None]:
# Check dataset shape. The frame printed is mass_df (merged train + test), so
# the label names mass_df rather than mass_train.
print(f'Shape of mass_df: {mass_df.shape}')

# Visualizations

In [None]:
# Pie chart of the pathology class distribution
value = mass_df['pathology'].value_counts()

fig, ax = plt.subplots(figsize=(6, 6))
ax.pie(value, labels=value.index, autopct='%1.1f%%')
ax.set_title('Breast Cancer Pathology Distribution', fontsize=15)
plt.show()

In [None]:
def display_images_by_type(dataframe, num_images_to_show=5):
    """Show the first rows of ``dataframe`` as a grid of images.

    The grid has one row per image type (full mammogram, mask, cropped
    mammogram) and one column per entry. Each image is read with
    ``mpimg.imread`` from the path stored in the corresponding column and is
    titled with the entry's ``pathology`` value. The first column carries the
    image-type name as a row label.

    Parameters
    ----------
    dataframe : pandas.DataFrame
        Must contain the columns 'image_file_path', 'ROI_mask_file_path',
        'cropped_image_file_path' and 'pathology'.
    num_images_to_show : int, default 5
        Number of leading rows to display. If the frame has fewer rows, the
        remaining grid cells are left blank.
    """
    image_info = [
        {'column': 'image_file_path', 'title': 'Full Mammogram'},
        {'column': 'ROI_mask_file_path', 'title': 'Mask'},
        {'column': 'cropped_image_file_path', 'title': 'Cropped Mammogram'},
    ]

    num_image_types = len(image_info)
    top_n_entries = dataframe.head(num_images_to_show)
    num_available = len(top_n_entries)

    # squeeze=False keeps `axes` two-dimensional even for a single column,
    # so axes[row][col] indexing works for num_images_to_show == 1 as well.
    fig, axes = plt.subplots(
        num_image_types,
        num_images_to_show,
        figsize=(num_images_to_show * 3, num_image_types * 3),
        squeeze=False,
    )
    fig.suptitle(f'Visualizing {num_images_to_show} Entries by Image Type', fontsize=16, y=1.02)

    for type_idx, info in enumerate(image_info):
        column_name = info['column']
        column_title = info['title']

        for img_idx in range(num_images_to_show):
            ax = axes[type_idx][img_idx]

            if img_idx >= num_available:
                # fewer rows than requested: blank out the unused cell
                ax.axis('off')
                continue

            row_data = top_n_entries.iloc[img_idx]
            image = mpimg.imread(row_data[column_name])
            ax.imshow(image, cmap='gray')

            # Hide ticks and frame individually. ax.axis('off') would also
            # hide the y-label, which is used as the row title below.
            ax.set_xticks([])
            ax.set_yticks([])
            for spine in ax.spines.values():
                spine.set_visible(False)

            if img_idx == 0:
                ax.set_ylabel(column_title, rotation=90, size='large', ha='right')
                ax.yaxis.set_label_coords(-0.25, 0.5)

            ax.set_title(f"Pathology: {row_data['pathology']}", fontsize=8)

    plt.tight_layout(rect=[0, 0.03, 1, 0.95])
    plt.show()


display_images_by_type(mass_df, num_images_to_show=5)

# Preprocessing of Images

In [None]:
# Keep only the columns the image generators need and drop rows with a missing
# path or label.
df = mass_df[['cropped_image_file_path', 'pathology']].dropna()

# First split: 80% training, 20% hold-out (stratified by pathology)
train_df, holdout_df = train_test_split(
    df, test_size=0.2, stratify=df['pathology'], random_state=12
)

# Second split: divide the HOLD-OUT half/half into validation and test sets
# (10% of the data each). Splitting train_df here instead would leave only 40%
# of the data for training and use 40% for validation.
valid_df, test_df = train_test_split(
    holdout_df, test_size=0.5, stratify=holdout_df['pathology'], random_state=12
)

# The three subsets must partition the cleaned frame
assert len(train_df) + len(valid_df) + len(test_df) == len(df)

In [None]:
# Image size fed to the network and number of images per batch
IMAGE_DIMENSIONS = (299, 299)
TRAINING_BATCH_SIZE = 32

# Arguments shared by all three flow_from_dataframe calls
flow_kwargs = dict(
    x_col='cropped_image_file_path',
    y_col='pathology',
    target_size=IMAGE_DIMENSIONS,
    batch_size=TRAINING_BATCH_SIZE,
    class_mode='categorical',
)

# Training pipeline: rescale pixel values and apply light geometric augmentation
augmentation_settings = dict(
    rescale=1./255,
    rotation_range=5,
    width_shift_range=0.03,
    height_shift_range=0.03,
    shear_range=0.03,
    zoom_range=0.03,
    fill_mode='reflect',
    horizontal_flip=True,
    vertical_flip=True,
)
augmentation = ImageDataGenerator(**augmentation_settings)

# Shuffled, augmented batches for training
augmented_train = augmentation.flow_from_dataframe(
    dataframe=train_df, shuffle=True, seed=12, **flow_kwargs
)

# Validation and test pipelines: rescaling only, no augmentation, fixed order
validation_test_preprocessing = ImageDataGenerator(rescale=1./255)

augmented_validation = validation_test_preprocessing.flow_from_dataframe(
    dataframe=valid_df, shuffle=False, seed=12, **flow_kwargs
)

augmented_test = validation_test_preprocessing.flow_from_dataframe(
    dataframe=test_df, shuffle=False, seed=42, **flow_kwargs
)

In [None]:
# Pull one batch from the training generator
images, labels = next(augmented_train)

# Turn each one-hot label row into its class name
class_indices = augmented_train.class_indices
classes = list(class_indices.keys())
label_names = [classes[class_idx] for class_idx in np.argmax(labels, axis=1)]

# Show up to 10 images of the batch on a 5x5 grid
num_shown = min(images.shape[0], 10)
plt.figure(figsize=(10, 10))
for position, (image, name) in enumerate(zip(images[:num_shown], label_names[:num_shown]), start=1):
    plt.subplot(5, 5, position)
    plt.imshow(image)
    plt.title(name)
    plt.axis('off')
plt.tight_layout()
plt.show()

# CNN Architecture

In [None]:
def create_model(train_set):
    """Build and compile a DenseNet121-based image classifier.

    An ImageNet-pretrained DenseNet121 without its top is used as a frozen
    feature extractor, followed by Flatten, BatchNormalization, two
    L2-regularised Dense/Dropout stages and a softmax output with one unit per
    class. The model is compiled with Adamax (learning rate 5e-5),
    categorical cross-entropy and the accuracy metric, and its summary is
    printed.

    The classes come from a generator using one-hot ('categorical') labels and
    are mutually exclusive, so the head uses softmax + categorical
    cross-entropy. A sigmoid output with binary cross-entropy would score each
    class independently, so the outputs would not form a probability
    distribution and the reported accuracy would be element-wise.

    Parameters
    ----------
    train_set
        Training data iterator. Must expose ``class_indices`` (mapping of
        class name to index). If it exposes ``image_shape`` that is used as
        the model input shape; otherwise 299x299x3 is assumed.

    Returns
    -------
    tensorflow.keras.Model
        The compiled model.
    """
    # Pretrained weights need a 3-channel input; take the shape from the
    # generator so it cannot drift from the generator's target size.
    img_shape = tuple(getattr(train_set, 'image_shape', (299, 299, 3)))
    class_count = len(train_set.class_indices)
    print("Class count:", class_count)
    print("Class indices:", train_set.class_indices)

    base_model = DenseNet121(include_top=False, input_shape=img_shape, weights='imagenet')

    # Freeze the base model layers to prevent them from being updated during initial training
    for layer in base_model.layers:
        layer.trainable = False

    # Classification head on top of the frozen feature extractor
    x = base_model.output
    x = Flatten()(x)
    x = BatchNormalization()(x)
    x = Dense(256, activation="relu", kernel_regularizer=l2(0.001))(x)
    x = Dropout(0.5)(x)
    x = Dense(64, activation="relu", kernel_regularizer=l2(0.001))(x)
    x = Dropout(0.3)(x)

    # One probability per class, summing to 1 across classes
    output = Dense(class_count, activation="softmax", name="output_softmax")(x)

    model = Model(inputs=base_model.input, outputs=output)

    model.compile(optimizer=Adamax(learning_rate=0.00005),
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])
    model.summary()
    return model

model = create_model(augmented_train)

In [None]:
# Render a layered diagram of the model architecture with visualkeras
vk.layered_view(model)

In [None]:
def train_and_evaluate_model(model, train_generator, validation_generator, test_generator, num_epochs=25):
    """Fit ``model`` and score it on the test generator.

    The model is trained on ``train_generator`` for ``num_epochs`` epochs with
    ``validation_generator`` as validation data. It then predicts on
    ``test_generator``; the arg-max of each prediction row is compared with
    ``test_generator.classes`` to compute weighted precision, recall and F1.

    Returns
    -------
    tuple
        ``(history, metrics)`` where ``history`` is the object returned by
        ``model.fit`` and ``metrics`` is a dict with the keys
        'test_precision', 'test_recall' and 'test_f1_score'.
    """
    # Training
    fit_history = model.fit(
        train_generator,
        validation_data=validation_generator,
        epochs=num_epochs,
    )

    # Predicted class = column with the highest score in each row
    probabilities = model.predict(test_generator)
    y_pred = np.argmax(probabilities, axis=1)
    y_true = test_generator.classes

    # Weighted precision / recall / F1 (the fourth returned value is unused)
    scores = precision_recall_fscore_support(
        y_true, y_pred, average='weighted', zero_division=0
    )
    metric_names = ('test_precision', 'test_recall', 'test_f1_score')
    evaluation = dict(zip(metric_names, scores[:3]))

    return fit_history, evaluation

history, metrics = train_and_evaluate_model(model, augmented_train, augmented_validation, augmented_test)

# Evaluation

In [None]:
# Show the weighted precision / recall / F1 computed on the test generator
print(metrics)

In [None]:
def confusion_matrix_plot(test, y_predict):
    """Plot the confusion matrix of ``y_predict`` against ``test.classes``.

    Parameters
    ----------
    test
        Test data iterator exposing ``class_indices`` (class name -> index)
        and ``classes`` (true class index per sample).
    y_predict : array-like
        Per-class scores, one row per sample; the predicted class is the
        arg-max of each row.
    """
    y_pred = np.argmax(y_predict, axis=1)

    # Order class names by their integer index instead of relying on the
    # dictionary's insertion order.
    class_indices = test.class_indices
    classes = sorted(class_indices, key=class_indices.get)
    label_ids = [class_indices[name] for name in classes]

    # Passing `labels` keeps one row/column per known class even if a class is
    # absent from both the true and predicted labels, so the tick labels
    # always line up with the matrix.
    cm = confusion_matrix(test.classes, y_pred, labels=label_ids)

    plt.figure(figsize=(10, 10))
    plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
    plt.title('Confusion Matrix')
    plt.colorbar()

    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    # White text on dark cells, black text on light cells
    threshold = cm.max() / 2.0
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(
            j, i, f"{cm[i, j]}",
            horizontalalignment='center',
            color='white' if cm[i, j] > threshold else 'black'
        )

    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    # Lay out after the axis labels exist so they are included in the fit
    plt.tight_layout()
    plt.show()

y_prediction = model.predict(augmented_test)
confusion_matrix_plot(augmented_test, y_prediction)

In [None]:
# True labels and predicted scores for the positive class (MALIGNANT).
# The column is looked up in class_indices rather than hard-coded to 1, and
# the same index is passed as pos_label so the curve stays correct whatever
# index the generator assigned to MALIGNANT.
y_true = augmented_test.classes
malignant_index = augmented_test.class_indices['MALIGNANT']
y_pred_prob = y_prediction[:, malignant_index]

# Compute the ROC curve once and reuse it for both the AUC and the plot
fpr, tpr, thresholds = roc_curve(y_true, y_pred_prob, pos_label=malignant_index)
auc_score = auc(fpr, tpr)
print(f'AUC: {auc_score:.4f}')

# Plot the ROC curve against the chance diagonal
plt.figure()
plt.plot(fpr, tpr, color='darkorange', lw=2, label='ROC curve (area = %0.2f)' % auc_score)
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC) Curve')
plt.legend(loc="lower right")
plt.show()

# Training History

In [None]:
# Training vs. validation accuracy per epoch
fig, ax = plt.subplots()
ax.plot(history.history['accuracy'], label='Training accuracy')
ax.plot(history.history['val_accuracy'], label='Validation accuracy')
ax.set_title('training / validation accuracies')
ax.set_ylabel('Accuracy')
ax.set_xlabel('Epoch')
ax.legend(loc="upper left")
plt.show()

In [None]:
# Training vs. validation loss per epoch
plt.plot(history.history['loss'], label='Training loss')
plt.plot(history.history['val_loss'], label='Validation loss')
plt.title('Training / validation loss values')
plt.ylabel('Loss value')
plt.xlabel('Epoch')
# Without a legend the two line labels above are never displayed
plt.legend(loc="upper right")
plt.show()