# Breast Cancer Detection Project

## 1st Experiment (Using TensorFlow)

In [None]:
import pandas as pd
import tensorflow as tf
from keras.models import Sequential
from keras.layers import Conv2D, MaxPooling2D, BatchNormalization, Flatten, Dense, Dropout
from keras.callbacks import EarlyStopping, ModelCheckpoint
from keras.preprocessing.image import ImageDataGenerator
import os


# Number of samples per batch used throughout this experiment
BATCH_SIZE = 64

# Root folder of the CBIS-DDSM JPEG dataset and the two case CSVs stored in it
BASE_DIR = "/kaggle/input/breast-cancer-jpg-image-dataset-of-cbisddsm/k_CBIS-DDSM"
CALC_CSV_PATH = BASE_DIR + "/calc_case(with_jpg_img).csv"
MASS_CSV_PATH = BASE_DIR + "/mass_case(with_jpg_img).csv"

def preprocess_image(image, label):
    """Read one JPEG file and convert it into a scaled single-channel tensor.

    Args:
        image: Path of the JPEG file to read.
        label: Label paired with the image; returned unchanged.

    Returns:
        A ``(pixels, label)`` tuple. ``pixels`` is the image decoded with one
        channel, resized to 224x224 and divided by 255.
    """
    raw_bytes = tf.io.read_file(image)
    grayscale = tf.image.decode_jpeg(raw_bytes, channels=1)
    resized = tf.image.resize(grayscale, [224, 224])

    return resized / 255.0, label


def create_dataset(image_paths, labels, batch_size):
    """Build a batched ``tf.data`` pipeline of preprocessed images and labels.

    Args:
        image_paths: Sequence of image file paths.
        labels: Sequence of labels, aligned with ``image_paths``.
        batch_size: Number of samples per emitted batch.

    Returns:
        A dataset that maps every (path, label) pair through
        ``preprocess_image``, groups the results into batches and prefetches
        them.
    """
    return (
        tf.data.Dataset.from_tensor_slices((image_paths, labels))
        .map(preprocess_image, num_parallel_calls=tf.data.AUTOTUNE)
        .batch(batch_size)
        .prefetch(buffer_size=tf.data.AUTOTUNE)
    )


def create_label(abnormality_type, pathology):
    """Map an (abnormality type, pathology) pair to one of four class ids.

    Class ids:
        0: benign mass, 1: malignant mass,
        2: benign calcification, 3: malignant calcification.

    Both values are compared case-insensitively and with surrounding
    whitespace ignored, so "mass" and "MASS" select the same classes.

    Args:
        abnormality_type: Abnormality name. "mass" (any casing) selects the
            mass classes; every other value is treated as a calcification.
        pathology: Pathology name. "BENIGN" and "BENIGN_WITHOUT_CALLBACK"
            (any casing) count as benign; every other value as malignant.

    Returns:
        The integer class id in the range 0-3.
    """
    # str() keeps non-string cells (e.g. a missing value) from raising; such
    # values simply fall through to the default branches below.
    abnormality_key = str(abnormality_type).strip().upper()
    pathology_key = str(pathology).strip().upper()

    if pathology_key in ("BENIGN", "BENIGN_WITHOUT_CALLBACK"):
        pathology_label = 0  # Benign
    else:
        pathology_label = 1  # Malignant

    if abnormality_key == "MASS":
        return pathology_label  # 0: Benign Mass; 1: Malignant Mass

    return 2 + pathology_label  # 2: Benign Calcification; 3: Malignant Calcification



# Read both case-description tables
calc_case_df = pd.read_csv(CALC_CSV_PATH)
mass_case_df = pd.read_csv(MASS_CSV_PATH)

image_paths = []
labels = []

# Collect one (absolute image path, class id) pair per CSV row,
# calcification cases first, then mass cases.
for case_df in (calc_case_df, mass_case_df):
    case_rows = zip(
        case_df["jpg_fullMammo_img_path"],
        case_df["abnormality type"],
        case_df["pathology"],
    )
    for relative_path, abnormality, pathology in case_rows:
        image_paths.append(os.path.join(BASE_DIR, relative_path))
        labels.append(create_label(abnormality, pathology))


train_image_paths, train_labels = [], []
test_image_paths, test_labels = [], []

# Route every sample by the split name embedded in its path. "Train" is
# checked before "Test"; samples matching neither are left out.
for sample_path, sample_label in zip(image_paths, labels):
    if "Train" in sample_path:
        split_paths, split_labels = train_image_paths, train_labels
    elif "Test" in sample_path:
        split_paths, split_labels = test_image_paths, test_labels
    else:
        continue

    split_paths.append(sample_path)
    split_labels.append(sample_label)

# Create data generators for training and testing.
# Only the training generator applies random augmentation.
train_datagen = ImageDataGenerator(
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    vertical_flip=True,
    fill_mode='nearest'
)


test_datagen = ImageDataGenerator()


def _load_split_as_arrays(split_image_paths, split_labels):
    """Decode a whole split into one ``(images, labels)`` pair of NumPy arrays.

    Taking ``next()`` of a dataset batched with ``BATCH_SIZE`` yields only the
    first ``BATCH_SIZE`` samples. Batching with the size of the split instead
    makes that first batch contain every sample.

    Raises:
        ValueError: If the split contains no images.
    """
    if not split_image_paths:
        raise ValueError("The split contains no images; check the CSV image paths.")

    full_split = create_dataset(split_image_paths, split_labels, len(split_image_paths))
    return next(full_split.as_numpy_iterator())


# Feed the complete training split through the augmenting generator
train_augmented_images, train_augmented_labels = _load_split_as_arrays(
    train_image_paths, train_labels
)
train_augmented_dataset = train_datagen.flow(
    train_augmented_images, train_augmented_labels, batch_size=BATCH_SIZE
)

# The test split goes through a generator without random transformations;
# shuffling is disabled so evaluation order is stable.
test_augmented_images, test_augmented_labels = _load_split_as_arrays(
    test_image_paths, test_labels
)
test_augmented_dataset = test_datagen.flow(
    test_augmented_images, test_augmented_labels, batch_size=BATCH_SIZE, shuffle=False
)


# Define the CNN model.
# The layer classes are used through the names imported at the top of the
# cell (Sequential, Conv2D, ...); no `models` / `layers` module is imported.
model = Sequential()

# Convolutional layer with 64 filters, a 3x3 kernel, 'relu' activation, and batch normalization.
# Input is a 224x224 single-channel image.
model.add(Conv2D(64, (3, 3), activation='relu', input_shape=(224, 224, 1)))
model.add(BatchNormalization())
# Max pooling layer with a 2x2 pool size
model.add(MaxPooling2D((2, 2)))

# Another convolutional layer with 128 filters, a 3x3 kernel, 'relu' activation, and batch normalization
model.add(Conv2D(128, (3, 3), activation='relu'))
model.add(BatchNormalization())
# Another max pooling layer
model.add(MaxPooling2D((2, 2)))

# Another convolutional layer with 256 filters, a 3x3 kernel, 'relu' activation, and batch normalization
model.add(Conv2D(256, (3, 3), activation='relu'))
model.add(BatchNormalization())
# Another max pooling layer
model.add(MaxPooling2D((2, 2)))

# Flatten layer to convert the 2D feature maps to a vector
model.add(Flatten())

# Fully connected layer with 512 neurons, 'relu' activation, and dropout
model.add(Dense(512, activation='relu'))
model.add(Dropout(0.2))  # Adjust the dropout rate as needed

# Output layer with four neurons (for the four classes) and 'softmax' activation
model.add(Dense(4, activation='softmax'))


# Compile the model.
# create_label() produces integer class ids (0-3), not one-hot vectors, so the
# sparse variant of categorical cross-entropy is the matching loss.
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.0001),
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

# Display the model summary
model.summary()

# Callbacks: stop once val_loss has not improved for 5 epochs (restoring the
# best weights) and keep only the best model on disk.
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=5,
    restore_best_weights=True,
)
model_checkpoint = ModelCheckpoint('best_model.h5', save_best_only=True)

# Upper bound on the number of training epochs
epochs = 20

# Fit on the augmented training generator, validating on the test generator
history = model.fit(
    train_augmented_dataset,
    validation_data=test_augmented_dataset,
    epochs=epochs,
    callbacks=[early_stopping, model_checkpoint],
)

# Report loss and accuracy on the test generator
evaluation_results = model.evaluate(test_augmented_dataset)
test_loss, test_accuracy = evaluation_results
print(f"\nTest Accuracy: {test_accuracy * 100:.2f}%")

## 2nd Experiment (Using PyTorch Lightning)

In [None]:
import pandas as pd
import os
from PIL import Image
from sklearn.model_selection import train_test_split

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

from torchvision import transforms
from torchvision.io import read_image

import pytorch_lightning as pl
from pytorch_lightning.trainer import Trainer
from pytorch_lightning.callbacks import EarlyStopping, GradientAccumulationScheduler, ModelCheckpoint
from pytorch_lightning.accelerators import CUDAAccelerator

# Number of samples per batch used by the data loaders
BATCH_SIZE = 64

# Root folder of the CBIS-DDSM JPEG dataset and the two case CSVs stored in it
BASE_DIR = "/kaggle/input/breast-cancer-jpg-image-dataset-of-cbisddsm/k_CBIS-DDSM"
CALC_CSV_PATH = BASE_DIR + "/calc_case(with_jpg_img).csv"
MASS_CSV_PATH = BASE_DIR + "/mass_case(with_jpg_img).csv"

# Model definition (the instance is created further below)
class Net(pl.LightningModule):
    """Three-stage CNN classifier with four output classes.

    Each stage is conv(3x3, padding=1) -> batch norm -> ReLU -> 2x2 max pool.
    The convolutions keep the spatial size and every pool halves it, so the
    ``256 * 28 * 28`` input size of ``fc1`` corresponds to single-channel
    224x224 inputs (224 / 2 / 2 / 2 = 28).

    Args:
        learning_rate: Learning rate of the Adam optimizer created in
            ``configure_optimizers``.
    """

    def __init__(self, learning_rate=0.001):
        super().__init__()
        self.learning_rate = learning_rate
        self.conv1 = nn.Conv2d(1, 64, kernel_size=3, padding=1)
        self.batchnorm1 = nn.BatchNorm2d(64)
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.batchnorm2 = nn.BatchNorm2d(128)
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv3 = nn.Conv2d(128, 256, kernel_size=3, padding=1)
        self.batchnorm3 = nn.BatchNorm2d(256)
        self.pool3 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(256 * 28 * 28, 512)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.3)
        self.fc2 = nn.Linear(512, 4)  # Four output classes

    def forward(self, x):
        """Return the raw class scores (logits) for a batch of images."""
        x = self.conv1(x)
        x = self.batchnorm1(x)
        x = self.relu(x)
        x = self.pool1(x)
        x = self.conv2(x)
        x = self.batchnorm2(x)
        x = self.relu(x)
        x = self.pool2(x)
        x = self.conv3(x)
        x = self.batchnorm3(x)
        x = self.relu(x)
        x = self.pool3(x)
        x = self.flatten(x)
        x = self.fc1(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.fc2(x)
        return x

    def _evaluate_batch(self, batch, stage):
        """Compute loss and accuracy for one batch and log both.

        Metrics are logged as ``{stage}_loss`` and ``{stage}_accuracy``.
        """
        images, labels = batch
        outputs = self(images)
        loss = F.cross_entropy(outputs, labels)

        # Fraction of samples whose highest-scoring class equals the label
        predicted = outputs.argmax(dim=1)
        accuracy = (predicted == labels).float().mean()

        self.log(f"{stage}_loss", loss, prog_bar=True)
        self.log(f"{stage}_accuracy", accuracy, prog_bar=True)
        return loss

    def training_step(self, batch, batch_idx):
        """Return the cross-entropy loss for one training batch."""
        images, labels = batch
        outputs = self(images)
        loss = F.cross_entropy(outputs, labels)
        self.log("train_loss", loss)
        return loss

    def validation_step(self, batch, batch_idx):
        """Log ``val_loss`` / ``val_accuracy`` and return the batch loss."""
        return self._evaluate_batch(batch, "val")

    def test_step(self, batch, batch_idx):
        """Log ``test_loss`` / ``test_accuracy`` and return the batch loss.

        Without logging here, a test run has no metrics to report.
        """
        return self._evaluate_batch(batch, "test")

    def configure_optimizers(self):
        """Create the Adam optimizer over all model parameters."""
        optimizer = optim.Adam(self.parameters(), lr=self.learning_rate)
        return optimizer

# Custom dataset class
class CustomDataset(Dataset):
    """Dataset that lazily loads grayscale images from a list of file paths.

    Args:
        image_paths: Sequence of image file paths.
        labels: Sequence of labels, aligned with ``image_paths``.
        transform: Optional callable applied to each loaded PIL image.
    """

    def __init__(self, image_paths, labels, transform=None):
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        """Return the number of samples."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return the ``(image, label)`` pair stored at position ``idx``."""
        img_path = self.image_paths[idx]
        label = self.labels[idx]

        # Open inside a context manager so the file handle is released as soon
        # as the pixels have been converted; 'L' mode is single-channel grayscale.
        with Image.open(img_path) as source_image:
            image = source_image.convert('L')

        if self.transform:
            image = self.transform(image)

        # No device placement happens here; the sample is returned as loaded
        return image, label

# Training-time preprocessing, applied in this order:
#   1. random augmentation on the PIL image,
#   2. resize, tensor conversion and normalization,
#   3. random erasing on the resulting tensor.
# NOTE(review): CustomDataset loads images in 'L' (single-channel) mode, so the
# saturation/hue part of ColorJitter presumably has no visible effect - confirm.
_pil_augmentations = [
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    transforms.RandomRotation(15),  # up to 15 degrees in either direction
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
]
_tensor_conversion = [
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5], std=[0.5]),  # Adjust mean and std as needed
]
_tensor_augmentations = [
    transforms.RandomErasing(p=0.5, scale=(0.02, 0.2), ratio=(0.3, 3.3)),
]

transform = transforms.Compose(
    _pil_augmentations + _tensor_conversion + _tensor_augmentations
)

def create_label(abnormality_type, pathology):
    """Map an (abnormality type, pathology) pair to one of four class ids.

    Class ids:
        0: benign mass, 1: malignant mass,
        2: benign calcification, 3: malignant calcification.

    Both values are compared case-insensitively and with surrounding
    whitespace ignored, so "mass" and "MASS" select the same classes.

    Args:
        abnormality_type: Abnormality name. "mass" (any casing) selects the
            mass classes; every other value is treated as a calcification.
        pathology: Pathology name. "BENIGN" and "BENIGN_WITHOUT_CALLBACK"
            (any casing) count as benign; every other value as malignant.

    Returns:
        The integer class id in the range 0-3.
    """
    # str() keeps non-string cells (e.g. a missing value) from raising; such
    # values simply fall through to the default branches below.
    abnormality_key = str(abnormality_type).strip().upper()
    pathology_key = str(pathology).strip().upper()

    if pathology_key in ("BENIGN", "BENIGN_WITHOUT_CALLBACK"):
        pathology_label = 0  # Benign
    else:
        pathology_label = 1  # Malignant

    if abnormality_key == "MASS":
        return pathology_label  # 0: Benign Mass; 1: Malignant Mass

    return 2 + pathology_label  # 2: Benign Calcification; 3: Malignant Calcification
    
# Load both case-description tables
calc_case_df = pd.read_csv(CALC_CSV_PATH)
mass_case_df = pd.read_csv(MASS_CSV_PATH)

image_paths = []
labels = []

# Append one absolute image path and one class id per CSV row,
# calcification cases first, then mass cases.
for case_df in (calc_case_df, mass_case_df):
    image_paths.extend(
        os.path.join(BASE_DIR, relative_path)
        for relative_path in case_df["jpg_fullMammo_img_path"]
    )
    labels.extend(
        create_label(abnormality, pathology)
        for abnormality, pathology in zip(case_df["abnormality type"], case_df["pathology"])
    )

# Split the data into training and testing sets (80/20, stratified by class id).
# NOTE(review): if several CSV rows point at the same image file, a random split
# can place that image in both sets - verify against the CSV contents.
train_image_paths, test_image_paths, train_labels, test_labels = train_test_split(
    image_paths, labels, test_size=0.2, random_state=42, stratify=labels
)

# Deterministic preprocessing for evaluation: the same resize and normalization
# as the training pipeline, without any random augmentation or erasing.
eval_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5], std=[0.5]),
])

# Training loader: augmented samples, reshuffled every epoch
train_dataset = CustomDataset(train_image_paths, train_labels, transform=transform)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)

# Test loader: un-augmented samples in a fixed order
test_dataset = CustomDataset(test_image_paths, test_labels, transform=eval_transform)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

# Instantiate the model
model = Net()

# NOTE: the Trainer does not use these two objects. Net computes its loss with
# F.cross_entropy and builds its own Adam optimizer in configure_optimizers().
# They are kept only so that code referring to them elsewhere keeps working.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001)

# Define PyTorch Lightning Trainer
trainer = pl.Trainer(
    max_epochs=20,
    callbacks=[
        # Accumulate gradients over 2 batches starting at epoch index 4
        GradientAccumulationScheduler(scheduling={4: 2}),
        # Keep the single checkpoint with the highest validation accuracy
        ModelCheckpoint(
            dirpath="./saved_models",
            filename="best_model",
            save_top_k=1,
            monitor='val_accuracy',
            mode='max',
        ),
    ],
)

# Train the model. A validation loader is required so that validation_step runs
# and 'val_accuracy' exists for the checkpoint callback to monitor.
# The held-out split is reused for validation, as in the first experiment;
# a separate validation split would give a cleaner final test estimate.
trainer.fit(model, train_loader, val_dataloaders=test_loader)

# Test the model
trainer.test(dataloaders=test_loader)

/opt/conda/lib/python3.10/site-packages/pytorch_lightning/trainer/configuration_validator.py:74: You defined a `validation_step` but have no `val_dataloader`. Skipping val loop.
/opt/conda/lib/python3.10/site-packages/pytorch_lightning/trainer/connectors/data_connector.py:441: The 'train_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=3` in the `DataLoader` to improve performance.
/opt/conda/lib/python3.10/site-packages/pytorch_lightning/loops/fit_loop.py:293: The number of training batches (45) is smaller than the logging interval Trainer(log_every_n_steps=50). Set a lower value for log_every_n_steps if you want to see logs for the training epoch.


Training: |          | 0/? [00:00<?, ?it/s]