# 2D Convolutional Neural Network
## Import modules

In [None]:
import os
import numpy as np
import pandas as pd
import pydicom
import matplotlib.pyplot as plt
import tensorflow as tf
from tqdm import tqdm
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, BatchNormalization, Activation
from tensorflow.keras.utils import plot_model
from tensorflow.keras.callbacks import EarlyStopping
from sklearn.utils import class_weight
from sklearn.model_selection import KFold
from sklearn.metrics import roc_curve, auc, classification_report, precision_recall_curve, average_precision_score, confusion_matrix
from tensorflow.keras.optimizers import Adam
import seaborn as sns

## Data prepation
#### Data path

In [None]:
base_dir = "/kaggle/input/rsna-intracranial-aneurysm-detection"
data_dir = os.path.join(base_dir, "series")
csv_path = os.path.join(base_dir, "train.csv")
IMG_SIZE = 512

#### Load images labels

In [None]:
labels_df = pd.read_csv(csv_path)
labels_df = labels_df[['SeriesInstanceUID', 'Aneurysm Present']]
labels_df['SeriesInstanceUID'] = labels_df['SeriesInstanceUID'].astype(str).str.strip()

#### Load images

In [None]:
x_images = []
y_labels = []
series_dirs_disk = {d: d for d in os.listdir(data_dir)}

# For loop on the series.
for idx, row in tqdm(labels_df.iterrows(), total=len(labels_df), desc="Reading series..."):
    series_id = row['SeriesInstanceUID']
    label = row['Aneurysm Present']
    # Check if the directory exists.
    if series_id not in series_dirs_disk:
        continue
    series_path = os.path.join(data_dir, series_dirs_disk[series_id])
    # Check if the .dicom files exist.
    dicom_files = sorted(os.listdir(series_path))
    if len(dicom_files) == 0:
        continue
    # Select a representative slice (the middle one).
    mid_idx = len(dicom_files) // 2
    dcm_path = os.path.join(series_path, dicom_files[mid_idx])
    # Read the .dicom file.
    try:
        ds = pydicom.dcmread(dcm_path)
        img = ds.pixel_array.astype(np.float32)
    except Exception as e:
        print(f"Error reading serie {series_id}: {e}")
        continue
    # Normalization.
    img -= img.min()
    if img.max() != 0:
        img /= img.max()
    # Treat images according to their shape.
    if img.ndim == 3:  # 3D image.
        img = img[img.shape[0] // 2]  # Middle slice.
    elif img.ndim != 2:
        print(f"Unexpected image shape, skip : {img.shape}")
        continue
    # Reshape images.
    img_tf = tf.expand_dims(tf.expand_dims(img, axis=0), axis=-1)  # (1,H,W,1).
    img_resized = tf.image.resize(img_tf, (IMG_SIZE, IMG_SIZE)).numpy()[0]  # (H,W,1).
    # Save images.
    x_images.append(img_resized)
    y_labels.append(label)
    
# Convert into arrays.
x_images = np.array(x_images, dtype=np.float32).reshape(-1, IMG_SIZE, IMG_SIZE, 1)
y_labels = np.array(y_labels, dtype=np.int64)

In [None]:
print(x_images.min(), x_images.max())

#### Check the shapes

In [None]:
x_images.shape

In [None]:
x_images[0]

In [None]:
y_labels.shape

In [None]:
y_labels

#### Split dataset

In [None]:
x_train, x_test, y_train, y_test = train_test_split(x_images, 
                                                    y_labels, 
                                                    test_size=0.2, 
                                                    random_state=42,
                                                    shuffle=True
)

In [None]:
x_train.shape

In [None]:
y_train.shape

In [None]:
x_test.shape

In [None]:
y_test.shape

## Building the CNN Model
#### Defining the model's architecture

In [None]:
def cnn_2d():
    model = Sequential()
    model.add(Conv2D(32, (3,3), activation='relu', input_shape=(IMG_SIZE, IMG_SIZE, 1), padding='same'))
    model.add(MaxPooling2D((2,2)))
    model.add(Conv2D(32, (3,3), activation='relu'))
    model.add(MaxPooling2D((2,2)))
    model.add(Conv2D(64, (3,3), activation='relu'))
    model.add(MaxPooling2D((2,2)))
    model.add(Flatten())
    model.add(Dense(64, activation='relu'))
    model.add(Dense(1, activation='sigmoid'))
    model.compile(optimizer=Adam(learning_rate=1e-4),
                  loss='binary_crossentropy',
                  metrics=['accuracy'])
    return model

#### Model summary

In [None]:
cnn_model = cnn_2d()
cnn_model.summary()

#### Model architecture

In [None]:
plot_model(cnn_model,
           show_shapes=True,
           show_layer_names=True,
           expand_nested=True
)

## Training the CNN Model
#### Train

In [None]:
np.unique(y_train, return_counts=True)

In [None]:
# Class weights
class_weights = class_weight.compute_class_weight(
    class_weight='balanced',
    classes=np.unique(y_train),
    y=y_train
)
class_weights = dict(enumerate(class_weights))
# Add early stopping.
early_stop = EarlyStopping(
    monitor='val_loss',
    patience=3,
    restore_best_weights=True
)
# Train the model.
history = cnn_model.fit(
    x_train,
    y_train,
    validation_split=0.3,
    epochs=10,
    batch_size=32,
    callbacks=[early_stop],
    class_weight=class_weights
)

#### Loss curve

In [None]:
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['Train', 'Validation'], loc='upper right')
plt.show()

#### Accuracy curve

In [None]:
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend(['Train', 'Validation'], loc='upper left')
plt.show()

## Model performance
#### Basic evaluation

In [None]:
cnn_model.evaluate(x_test, y_test)

#### Class prediction on test dataset

In [None]:
y_pred = cnn_model.predict(x_test)
y_pred_classes = (y_pred >= 0.5).astype(int).flatten()
print(y_pred_classes[:10])
print(y_pred_classes.shape)

#### Confusion matrix

In [None]:
conf_matrix = confusion_matrix(y_test, y_pred_classes)
sns.heatmap(conf_matrix, annot=True, fmt="d")

In [None]:
conf_matrix_norm = confusion_matrix(y_test, y_pred_classes, normalize="true")
sns.heatmap(conf_matrix_norm, annot=True, fmt=".2f")

#### Classification report

In [None]:
print(classification_report(y_test, y_pred_classes, target_names=[str(i) for i in range(2)]))

#### Other metrics

In [None]:
tn, fp, fn, tp = conf_matrix.ravel()
sensitivity = tp / (tp + fn)
specificity = tn / (tn + fp)
print("True Positives :", tp)
print("True Negatives :", tn)
print("False Positives :", fp)
print("False Negatives :", fn)
print("Sensitivity (Recall) :", sensitivity)
print("Specificity :", specificity)

#### ROC Curve and AUC

In [None]:
# Compute ROC curve and ROC area for each class.
fpr, tpr, _ = roc_curve(y_test, y_pred)
roc_auc = auc(fpr, tpr)
# Plot ROC curve.
plt.figure(figsize=(8, 6))
plt.plot(fpr, tpr, color='blue', lw=2, label='ROC curve (AUC = {:.2f})'.format(roc_auc))
plt.plot([0, 1], [0, 1], 'k--', lw=2, label='Random guess')
plt.xlabel('False Positive Rate (FPR)')
plt.ylabel('True Positive Rate (TPR)')
plt.title('Receiver Operating Characteristic (ROC) Curve')
plt.legend(loc="lower right")
plt.show()

#### PRC Curve and AP

In [None]:
# Compute PRC Curve.
precision, recall, _ = precision_recall_curve(y_test, y_pred)
average_precision = average_precision_score(y_test, y_pred)
# Plot PRC.
plt.figure(figsize=(8, 6))
plt.plot(recall, precision, color='green', lw=2, label='PRC curve (AP = {:.2f})'.format(average_precision))
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.title('Precision-Recall Curve')
plt.legend(loc="lower left")
plt.show()

#### 4-fold Cross-Validation

In [None]:
kf = KFold(n_splits=4, shuffle=True, random_state=42)
kf_accuracies = []
for train_index, val_index in kf.split(x_train, y_train):
    x_train_fold, x_val_fold = x_train[train_index], x_train[val_index]
    y_train_fold, y_val_fold = y_train[train_index], y_train[val_index]
    kf_model = cnn_2d()
    history_kf = kf_model.fit(x_train_fold,
                              y_train_fold,
                              validation_data=(x_val_fold, y_val_fold),
                              epochs=5,
                              batch_size=32)
    kf_results = kf_model.evaluate(x_test, y_test, verbose=0)
    kf_accuracies.append(kf_results[1])
print(f"Accuracies : {kf_accuracies}")
print(f"Variance : {np.var(kf_accuracies)}")

## Predicting the localization
#### New labels preparation

In [None]:
labels_df = pd.read_csv(csv_path)
labels_df = labels_df[['SeriesInstanceUID',
                       'Left Infraclinoid Internal Carotid Artery',
                       'Right Infraclinoid Internal Carotid Artery',
                      'Left Supraclinoid Internal Carotid Artery',
                      'Right Supraclinoid Internal Carotid Artery',
                      'Left Middle Cerebral Artery',
                      'Right Middle Cerebral Artery',
                      'Anterior Communicating Artery',
                      'Left Anterior Cerebral Artery',
                      'Right Anterior Cerebral Artery',
                      'Left Posterior Communicating Artery',
                      'Right Posterior Communicating Artery',
                      'Basilar Tip',
                      'Other Posterior Circulation']]
labels_df['SeriesInstanceUID'] = labels_df['SeriesInstanceUID'].astype(str).str.strip()
labels_df = labels_df.set_index('SeriesInstanceUID')
series_ids = [row['SeriesInstanceUID'] for _, row in pd.read_csv(csv_path)[['SeriesInstanceUID']].iterrows()]
filtered_labels = labels_df.loc[series_ids].values.astype(np.float32)
y_labels = np.array(filtered_labels, dtype=np.int64)

In [None]:
y_labels

In [None]:
y_labels.shape

#### Split data set

In [None]:
x_train, x_test, y_train, y_test = train_test_split(
    x_images,
    y_labels,
    test_size=0.2,
    random_state=42,
    shuffle=True
)

In [None]:
x_train.shape

In [None]:
x_test.shape

In [None]:
y_train.shape

In [None]:
y_test.shape

## Building the new CNN Model
#### Defining the model's architecture
It is basically the same model as previously with the adapted input shape.

In [None]:
def cnn_2d():
    model = Sequential()
    model.add(Conv2D(32, (3,3), activation='relu', input_shape=(IMG_SIZE, IMG_SIZE, 1), padding='same'))
    model.add(MaxPooling2D((2,2)))
    model.add(Conv2D(32, (3,3), activation='relu', input_shape=(IMG_SIZE, IMG_SIZE, 1), padding='same'))
    model.add(MaxPooling2D((2,2)))
    model.add(Conv2D(64, (3,3), activation='relu'))
    model.add(MaxPooling2D((2,2)))
    model.add(Flatten())
    model.add(Dense(64, activation='relu'))
    model.add(Dense(13, activation='softmax'))
    model.compile(optimizer=Adam(learning_rate=1e-4),
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])
    return model

#### Model summary

In [None]:
cnn_model_2 = cnn_2d()
cnn_model_2.summary()

#### Model architecture

In [None]:
plot_model(cnn_model_2,
           show_shapes=True,
           show_layer_names=True,
           expand_nested=True
)

## Training the model
#### Train

In [None]:
# Class weights
#class_weights = class_weight.compute_class_weight(
#    class_weight='balanced',
#    classes=np.unique(y_train),
#    y=y_train
#)
#class_weights = dict(enumerate(class_weights))
# Add early stopping.
early_stop = EarlyStopping(
    monitor='val_loss',
    patience=3,
    restore_best_weights=True
)
# Train the model.
history_2 = cnn_model_2.fit(
    x_train,
    y_train,
    validation_split=0.3,
    epochs=10,
    batch_size=32,
    callbacks=[early_stop]
    #class_weight=class_weights
)

#### Loss

In [None]:
plt.plot(history_2.history['loss'])
plt.plot(history_2.history['val_loss'])
plt.title('')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['Train', 'Validation'], loc='upper right')
plt.show()

#### Accuracy

In [None]:
plt.plot(history_2.history['accuracy'])
plt.plot(history_2.history['val_accuracy'])
plt.title('')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend(['Train', 'Validation'], loc='upper right')
plt.show()

## Model performance
#### Basic evaluation

In [None]:
cnn_model_2.evaluate(x_test, y_test)

#### Class prediction on test dataset

In [None]:
y_test_classes = np.argmax(y_test, axis=1)
print(y_test_classes)
print(y_test_classes.shape)

In [None]:
y_pred_2 = cnn_model_2.predict(x_test)
y_pred_classes_2 = np.argmax(y_pred_2, axis=1)
print(y_pred_classes_2)
print(y_pred_classes_2.shape)

#### Confusion matric

In [None]:
conf_matrix_2 = confusion_matrix(y_test_classes, y_pred_classes_2)
sns.heatmap(conf_matrix_2, annot=True, fmt="d")

In [None]:
conf_matrix_norm_2 = confusion_matrix(y_test_classes, y_pred_classes_2, normalize="true")
sns.heatmap(conf_matrix_norm_2, annot=True, fmt=".1f")

#### Classification report

In [None]:
print(classification_report(y_test_classes, y_pred_classes_2, target_names=[str(i) for i in range(13)]))

#### Other metrics

In [None]:
n_classes = 13
TP_2 = np.zeros(n_classes)
TN_2 = np.zeros(n_classes)
FP_2 = np.zeros(n_classes)
FN_2 = np.zeros(n_classes)
sensitivity_2 = np.zeros(n_classes)
specificity_2 = np.zeros(n_classes)
for i in range(n_classes):
    TP_2[i] = conf_matrix_2[i, i]
    FN_2[i] = np.sum(conf_matrix_2[i, :]) - TP_2[i]
    FP_2[i] = np.sum(conf_matrix_2[:, i]) - TP_2[i]
    TN_2[i] = np.sum(conf_matrix_2) - TP_2[i] - FN_2[i] - FP_2[i]
    sensitivity_2[i] = TP_2[i] / (TP_2[i] + FN_2[i]) if (TP_2[i] + FN_2[i]) > 0 else 0
    specificity_2[i] = TN_2[i] / (TN_2[i] + FP_2[i]) if (TN_2[i] + FP_2[i]) > 0 else 0
print("True Positives (TP) per class:", TP_2)
print("True Negatives (TN) per class:", TN_2)
print("False Positives (FP) per class:", FP_2)
print("False Negatives (FN) per class:", FN_2)
print("Sensitivity (Recall) per class:", sensitivity_2)
print("Specificity per class:", specificity_2)

#### ROC Curve and AUC

In [None]:
fpr_2 = dict()
tpr_2 = dict()
roc_auc_2 = dict()
for i in range(n_classes):
    fpr_2[i], tpr_2[i], _ = roc_curve(y_test[:, i], y_pred_2[:, i])
    roc_auc_2[i] = auc(fpr_2[i], tpr_2[i])
plt.figure(figsize=(10, 8))
for i in range(n_classes):
    plt.plot(fpr_2[i], tpr_2[i], label='ROC curve of class {0} (area = {1:0.2f})'.format(i, roc_auc_2[i]))
plt.plot([0, 1], [0, 1], 'k--', label='Random guess')
plt.xlabel('False Positive Rate (FPR)')
plt.ylabel('True Positive Rate (TPR)')
plt.title('Receiver Operating Characteristic (ROC) Curve')
plt.legend(loc="lower right")
plt.show()

### PRC Curve and APS

In [None]:
precision_2 = dict()
recall_2 = dict()
average_precision_2 = dict()
for i in range(n_classes):
    precision_2[i], recall_2[i], _ = precision_recall_curve(y_test[:, i], y_pred_2[:, i])
    average_precision_2[i] = average_precision_score(y_test[:, i], y_pred_2[:, i])
plt.figure(figsize=(10, 8))
for i in range(n_classes):
    plt.plot(recall_2[i], precision_2[i], label='PRC curve of class {0} (area = {1:0.2f})'.format(i, average_precision_2[i]))
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.title('Precision-Recall Curve')
plt.legend(loc="upper right")
plt.show()

#### 4-fold Cross-Validation

In [None]:
kf_2 = KFold(n_splits=4, shuffle=True, random_state=42)
kf_accuracies_2 = []
for train_index, val_index in kf_2.split(x_train, y_train):
    x_train_fold, x_val_fold = x_train[train_index], x_train[val_index]
    y_train_fold, y_val_fold = y_train[train_index], y_train[val_index]
    kf_model_2 = cnn_2d()
    history_kf_2 = kf_model_2.fit(x_train_fold,
                              y_train_fold,
                              validation_data=(x_val_fold, y_val_fold),
                              epochs=5,
                              batch_size=32)
    kf_results_2 = kf_model_2.evaluate(x_test, y_test, verbose=0)
    kf_accuracies_2.append(kf_results_2[1])
print(f"Accuracies : {kf_accuracies_2}")
print(f"Variance : {np.var(kf_accuracies_2)}")