<a href="https://colab.research.google.com/github/AzixSall/ML_Notebooks/blob/main/Breast_Cancer_Analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import keras_cv
import math

In [None]:
def strong_augmentation(image):
    """Apply random photometric jitter and random flips to an image tensor.

    Brightness is shifted by a random delta of at most 0.3, contrast is
    scaled by a random factor in [0.7, 1.3], and the result is randomly
    mirrored left/right and then up/down.

    Args:
        image: Image tensor accepted by the ``tf.image.random_*`` ops.

    Returns:
        The augmented image tensor.
    """
    photometric = tf.image.random_contrast(
        tf.image.random_brightness(image, 0.3), 0.7, 1.3
    )
    mirrored = tf.image.random_flip_left_right(photometric)
    return tf.image.random_flip_up_down(mirrored)


In [None]:
# Hyper-parameters and input geometry shared by the cells below.
BATCH_SIZE = 64  # default batch size of the dataset builders
EPOCHS = 100  # upper bound on the number of training epochs
IMG_SIZE = 200  # images are resized to IMG_SIZE x IMG_SIZE
CHANNELS = 1  # NOTE(review): not referenced in the visible code; the channel count is hard-coded where needed

def parse_tfrecord(example_proto):
    """Decode one serialized TFRecord example into an ``(image, label)`` pair.

    The record must provide a raw uint8 ``image`` that reshapes to
    299x299x1, plus the int64 features ``label_normal`` and ``label``.

    Args:
        example_proto: Scalar string tensor holding a serialized example.

    Returns:
        Tuple ``(image, label)``: ``image`` is a float32 tensor of shape
        ``(IMG_SIZE, IMG_SIZE, 1)`` divided by 255, ``label`` is a float32
        one-hot vector of depth 5 built from the ``label`` feature.
    """
    int_feature = tf.io.FixedLenFeature([], tf.int64)
    schema = {
        'image': tf.io.FixedLenFeature([], tf.string),
        'label_normal': int_feature,
        'label': int_feature,
    }
    record = tf.io.parse_single_example(example_proto, schema)

    pixels = tf.reshape(
        tf.io.decode_raw(record['image'], tf.uint8), [299, 299, 1]
    )
    resized = tf.image.resize(pixels, [IMG_SIZE, IMG_SIZE])
    image = tf.cast(resized, tf.float32) / 255.0

    # NOTE: `label_normal` is parsed but currently unused. Two earlier
    # experiments are disabled here: forcing the label to 0 whenever
    # `label_normal` is 0, and applying strong_augmentation() only to
    # examples whose label is non-zero.
    label = tf.one_hot(record['label'], 5, dtype=tf.float32)

    return image, label

In [None]:
def create_dataset(tfrecord_files, batch_size=BATCH_SIZE):
    """Build a shuffled, batched and prefetched dataset from TFRecord files.

    Args:
        tfrecord_files: TFRecord file path(s) handed to ``TFRecordDataset``.
        batch_size: Number of examples per batch.

    Returns:
        A ``tf.data.Dataset`` of ``(image, label)`` batches as decoded by
        ``parse_tfrecord``.
    """
    return (
        tf.data.TFRecordDataset(tfrecord_files)
        .map(parse_tfrecord, num_parallel_calls=tf.data.AUTOTUNE)
        .shuffle(1000)  # shuffle buffer of 1000 decoded examples
        .batch(batch_size)
        .prefetch(tf.data.AUTOTUNE)
    )

In [None]:
def create_numpy_dataset(data_path, labels_path, batch_size=BATCH_SIZE, is_training=False):
    """Wrap NumPy image/label arrays in a batched ``tf.data.Dataset``.

    Args:
        data_path: Path to a ``.npy`` file of images, or an already loaded
            array. Each item must reshape to 299x299x1.
        labels_path: Path to a ``.npy`` file of integer class indices, or an
            already loaded array. It is only loaded from disk when
            ``data_path`` is a string.
        batch_size: Number of examples per batch.
        is_training: Accepted for interface compatibility; not used.

    Returns:
        A ``tf.data.Dataset`` yielding ``(image, one_hot_label)`` batches in
        array order, with float32 images of shape ``(IMG_SIZE, IMG_SIZE, 1)``
        divided by 255 and float32 one-hot labels of length 5.
    """
    from_disk = isinstance(data_path, str)
    data = np.load(data_path, mmap_mode='r') if from_disk else data_path
    labels = np.load(labels_path, mmap_mode='r') if from_disk else labels_path

    def generator():
        # Examples are produced lazily, one at a time, in array order.
        for index in range(len(data)):
            resized = tf.image.resize(
                data[index].reshape(299, 299, 1), [IMG_SIZE, IMG_SIZE]
            )
            image = tf.cast(resized, tf.float32) / 255.0

            target = np.zeros(5, dtype=np.float32)
            target[labels[index]] = 1.0

            yield image, target

    signature = (
        tf.TensorSpec(shape=(IMG_SIZE, IMG_SIZE, 1), dtype=tf.float32),
        tf.TensorSpec(shape=(5,), dtype=tf.float32),
    )
    dataset = tf.data.Dataset.from_generator(generator, output_signature=signature)
    return dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)

In [None]:
# Build the training pipeline from the five TFRecord shards
# training10_0 ... training10_4 located under Mammography/.
print("Loading training dataset...")
train_files = [f'Mammography/training10_{i}/training10_{i}.tfrecords' for i in range(5)]
train_dataset = create_dataset(train_files, BATCH_SIZE)

In [None]:
import matplotlib.pyplot as plt
import numpy as np

def display_batch(dataset, num_images=8):
    """
    Display images from the first batch of the dataset.

    Images are drawn four per row. The grid has at least two rows (the
    original 2x4 layout) and grows with ``num_images`` so that requesting
    more than eight images no longer exceeds the subplot grid.

    Parameters:
    dataset: tf.data dataset yielding (images, one_hot_labels) batches
    num_images: maximum number of images to draw from the batch
    """
    label_names = ['Normal', 'Abnormal 1', 'Abnormal 2',
                   'Abnormal 3', 'Abnormal 4']
    columns = 4

    # Get one batch
    for images, labels in dataset.take(1):
        shown = min(num_images, len(images))
        # Keep the 2-row layout for up to eight images, add rows beyond that.
        rows = max(2, math.ceil(shown / columns))
        plt.figure(figsize=(15, 4 * rows))

        for i in range(shown):
            plt.subplot(rows, columns, i + 1)

            img = images[i].numpy()
            # Labels are one-hot encoded; recover the class index.
            label = np.argmax(labels[i].numpy())

            plt.imshow(img, cmap='gray')
            plt.title(f'Label: {label_names[label]}')
            plt.axis('off')

        plt.tight_layout()
        plt.show()

# Preview up to eight images from the first training batch.
display_batch(train_dataset)

#for batch in train_dataset.take(10):
#    display_batch(train_dataset)

In [None]:
print("Loading and combining validation/test data...")
test_data = np.load('Mammography/test10_data/test10_data.npy', mmap_mode='r')
test_labels = np.load('Mammography/test10_labels.npy', mmap_mode='r')

cv_data = np.load('Mammography/cv10_data/cv10_data.npy', mmap_mode='r')
cv_labels = np.load('Mammography/cv10_labels.npy', mmap_mode='r')

# Pool the two held-out sets; np.concatenate copies the memory-mapped arrays
# into regular in-memory arrays.
combined_data = np.concatenate([test_data, cv_data])
combined_labels = np.concatenate([test_labels, cv_labels])

# Images and labels are shuffled with one shared permutation, so they must
# have the same length to stay aligned.
if len(combined_data) != len(combined_labels):
    raise ValueError(
        f"Data/label length mismatch: {len(combined_data)} images vs "
        f"{len(combined_labels)} labels"
    )

# A fixed seed keeps the validation/test partition identical across runs, so
# a checkpoint selected on the validation half is never later evaluated on
# examples it was tuned against.
SPLIT_SEED = 42
indices = np.random.default_rng(SPLIT_SEED).permutation(len(combined_data))
combined_data = combined_data[indices]
combined_labels = combined_labels[indices]

In [None]:
# Split the combined pool in half: the first half becomes the validation
# set, the second half the test set.
val_split = int(len(combined_data) * 0.5)
val_dataset = create_numpy_dataset(
    combined_data[:val_split],
    combined_labels[:val_split],
    BATCH_SIZE,
    is_training=False
)

test_dataset = create_numpy_dataset(
    combined_data[val_split:],
    combined_labels[val_split:],
    BATCH_SIZE,
    is_training=False
)

# Display names for class indices 0-4, used by print_label_distribution.
class_names = ['Negative', 'Benign Calcification', 'Benign Mass',
               'Malignant Calcification', 'Malignant Mass']

# These print the Dataset objects returned by take(), not batch contents.
print(train_dataset.take(1))
print(val_dataset.take(2))

def print_label_distribution(labels):
    """Print how many samples, and what percentage, fall into each class.

    Args:
        labels: Array-like of integer class indices into the module-level
            ``class_names`` list.
    """
    total = len(labels)
    values, tallies = np.unique(labels, return_counts=True)
    print("\nLabel distribution:")
    for value, tally in zip(values, tallies):
        share = tally / total * 100
        print(f"{class_names[value]}: {tally} samples ({share:.2f}%)")

# Report the class balance of the validation half and of the test half.
print("\nValidation set:")
print_label_distribution(combined_labels[:val_split])
print("\nTest set:")
print_label_distribution(combined_labels[val_split:])

In [None]:
def calculate_class_weights(train_dataset, strategy='advanced'):
    """
    Calculate per-class loss weights from the one-hot labels of a dataset.

    Weights are inverse class frequencies, optionally multiplied by fixed
    per-class boost factors, and rescaled so that the smallest non-zero
    weight equals 1.0. Classes that never occur get weight 0.0. A summary
    table is printed as a side effect.

    Parameters:
    train_dataset: iterable of (features, labels) batches, where labels
        exposes .numpy() returning a (batch, 5) one-hot array
        (e.g. a TensorFlow dataset)
    strategy: 'basic' for plain inverse-frequency weights, 'custom' for
        inverse-frequency weights multiplied by the boost factors.
        'advanced' (the default) is treated as an alias of 'custom';
        previously it matched no branch and the call failed.
        NOTE(review): confirm this alias matches the intended behavior.

    Returns:
    dict mapping class index (0-4) to its weight

    Raises:
    ValueError: if strategy is not one of the names above
    """
    num_classes = 5
    boosted_strategies = ('custom', 'advanced')
    if strategy != 'basic' and strategy not in boosted_strategies:
        raise ValueError(
            f"Unknown strategy {strategy!r}; expected 'basic', 'custom' or 'advanced'"
        )

    class_counts = np.zeros(num_classes)
    total_samples = 0

    for _, labels in train_dataset:
        batch_labels = labels.numpy()
        class_counts += np.sum(batch_labels, axis=0)
        total_samples += len(batch_labels)

    # Calculate base frequencies; max(..., 1) avoids 0/0 on an empty dataset
    # (all counts are zero in that case, so every frequency is 0).
    epsilon = 1e-7
    class_frequencies = class_counts / max(total_samples, 1)

    # Inverse frequency weighting, shared by all strategies
    class_weights = 1 / (class_frequencies + epsilon)

    if strategy in boosted_strategies:
        boost_factors = np.array([
            1.0,
            2.0,
            1.8,
            2.5,
            2.0
        ])
        class_weights = class_weights * boost_factors

    # Absent classes would otherwise get a huge 1/epsilon weight
    class_weights[class_counts == 0] = 0.0

    # Rescale so the minimum non-zero weight is 1.0. (The former extra
    # normalisation by the sum was cancelled by this step and is dropped.)
    positive_weights = class_weights[class_weights > 0]
    if positive_weights.size:
        class_weights = class_weights / positive_weights.min()

    class_weights_dict = dict(enumerate(class_weights))

    print("\nDataset Statistics:")
    print(f"Total samples: {total_samples}")
    print("\nClass Distribution:")
    label_names = ['Normal', 'Benign Calc', 'Benign Mass',
                   'Malignant Calc', 'Malignant Mass']

    print(f"{'Class':<15} {'Count':>8} {'Frequency':>12} {'Weight':>10}")
    print("-" * 45)

    for name, count, freq, weight in zip(label_names, class_counts,
                                         class_frequencies, class_weights):
        print(f"{name:<15} {int(count):8d} {freq:12.4f} {weight:10.4f}")

    print("\nWeight Statistics:")
    print(f"Mean weight: {np.mean(class_weights):.4f}")
    remaining = class_weights[class_weights > 0]
    if remaining.size:
        print(f"Max/Min ratio: {np.max(class_weights)/np.min(remaining):.4f}")
    else:
        # No class has any samples, so there is no ratio to report.
        print("Max/Min ratio: n/a")

    return class_weights_dict

# Derive boosted per-class weights from the training labels; this iterates
# over the whole training dataset once.
class_weights = calculate_class_weights(train_dataset, strategy='custom')

In [None]:
def create_custom_efficientnet(img_size=IMG_SIZE, num_classes=5):
    """Build an EfficientNetB0-based classifier for single-channel images.

    A 1x1 convolution maps the single input channel to the three channels
    expected by the ImageNet-pretrained backbone. The backbone output is
    average-pooled and passed through two regularised dense blocks
    (512 and 256 units) before a softmax output layer.

    Args:
        img_size: Height and width of the square input images.
        num_classes: Number of output classes.

    Returns:
        An uncompiled ``tf.keras.Model`` mapping ``(img_size, img_size, 1)``
        inputs to ``num_classes`` softmax probabilities.
    """
    layers = tf.keras.layers

    # Pretrained feature extractor without its classification head.
    backbone = tf.keras.applications.EfficientNetB0(
        include_top=False,
        weights='imagenet',
        input_shape=(img_size, img_size, 3),
    )

    inputs = tf.keras.Input(shape=(img_size, img_size, 1))
    features = layers.Conv2D(3, (1, 1), padding='same')(inputs)
    features = backbone(features)
    features = layers.GlobalAveragePooling2D()(features)

    # Dense -> BatchNorm -> Dropout, once per (units, dropout rate) pair.
    for units, drop_rate in ((512, 0.5), (256, 0.3)):
        features = layers.Dense(
            units,
            activation='leaky_relu',
            kernel_regularizer=tf.keras.regularizers.l2(0.01),
        )(features)
        features = layers.BatchNormalization()(features)
        features = layers.Dropout(drop_rate)(features)

    outputs = layers.Dense(num_classes, activation='softmax')(features)

    return tf.keras.Model(inputs=inputs, outputs=outputs)

In [None]:
def compile_and_prepare_model(model):
    """Compile ``model`` for one-hot multi-class training with AdamW.

    The model is compiled with categorical cross-entropy and reports
    accuracy, precision, recall and AUC.

    Args:
        model: The Keras model to compile.

    Returns:
        The same model instance, compiled.
    """
    # A plain float learning rate is used. The ExponentialDecay schedule that
    # was previously built here was never passed to the optimizer and has
    # been removed as dead code. It is not wired in because the training cell
    # attaches ReduceLROnPlateau, which adjusts the optimizer's learning rate
    # itself and, per the Keras docs, cannot do so when the optimizer was
    # created with a LearningRateSchedule.
    initial_learning_rate = 5e-4

    # Optimizer with weight decay
    optimizer = tf.keras.optimizers.AdamW(
        learning_rate=initial_learning_rate,
        weight_decay=0.01
    )

    model.compile(
        optimizer=optimizer,
        loss='categorical_crossentropy',
        metrics=[
            'accuracy',
            tf.keras.metrics.Precision(),
            tf.keras.metrics.Recall(),
            tf.keras.metrics.AUC()
        ]
    )

    return model

In [None]:
# Build the network with its default arguments and compile it for training.
model = create_custom_efficientnet()
model = compile_and_prepare_model(model)

In [None]:
# Print the layer-by-layer architecture and parameter counts.
model.summary()

In [None]:
# Training callbacks; all three monitor the validation loss.

# Stop after 10 epochs without a val_loss improvement of at least 1e-7 and
# restore the weights of the best epoch.
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=10,
    min_delta=1e-7,
    restore_best_weights=True,
)

# Multiply the learning rate by 0.2 after 5 epochs without improvement.
plateau = tf.keras.callbacks.ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.2,
    patience=5,
    min_delta=1e-7,
    cooldown=0,
    verbose=1
)

# Save the model to best_model.keras whenever val_loss reaches a new best.
model_checkpoint = tf.keras.callbacks.ModelCheckpoint(
    'best_model.keras',
    monitor='val_loss',
    save_best_only=True,
    verbose=1
)

In [None]:
# train_dataset and val_dataset are already batched tf.data pipelines, so no
# batch_size is passed: the Keras documentation states that batch_size must
# not be specified for dataset inputs, since they generate batches themselves.
history = model.fit(
    train_dataset,
    validation_data=val_dataset,
    epochs=EPOCHS,
    class_weight=class_weights,
    callbacks=[early_stopping, plateau, model_checkpoint],
)

In [None]:
from sklearn.metrics import classification_report, confusion_matrix
import seaborn as sns


# return_dict=True pairs every value with its metric name. Zipping
# model.metrics_names with the returned list is not reliable across Keras
# versions, where the two can differ in length and silently drop metrics.
results = model.evaluate(test_dataset, return_dict=True)
print("\nTest Results:")
for name, value in results.items():
    print(f"{name}: {value:.4f}")


# test_dataset is iterated twice (once for predictions, once for labels).
# The two passes line up because the pipeline yields its elements in the
# same, unshuffled order every time.
y_pred = np.argmax(model.predict(test_dataset), axis=1)
y_true = np.concatenate([np.argmax(labels, axis=1)
                        for _, labels in test_dataset])


class_names = ['Normal', 'Benign Calc', 'Benign Mass',
               'Malignant Calc', 'Malignant Mass']
# Passing labels= keeps the matrix at len(class_names) x len(class_names)
# even when a class appears in neither y_true nor y_pred, so the tick labels
# below always match the rows and columns.
cm = confusion_matrix(y_true, y_pred, labels=np.arange(len(class_names)))
plt.figure(figsize=(10, 8))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=class_names,
            yticklabels=class_names)
plt.title('Confusion Matrix')
plt.xlabel('Predicted')
plt.ylabel('True')
plt.tight_layout()
plt.show()

In [None]:

# Plot training and validation loss per epoch from the fit() history.
plt.plot(history.history['loss'], label='Train Loss')
plt.plot(history.history['val_loss'], label='Val Loss')
# NOTE(review): the title says "Normal/Abnormal" although the model is built
# with five output classes; confirm whether the title is stale.
plt.title('Normal/Abnormal Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()

plt.show()

In [None]:
import gc
import tensorflow as tf

# Run Python garbage collection and reset Keras' global state.
# NOTE(review): this cell is the last one in the file, after training and
# evaluation; it is presumably meant to be run before re-training in the
# same session. Confirm the intended position.
gc.collect()
tf.keras.backend.clear_session()