# Cancer Classification Project

The project is about of how deep learning and machine learning alghoritms can be used to solve problems that includes machine recognition in images.

In this case, this project in only about identify patterns to classify benign or malign cancer in images.

This project it is only a college project, the main idea is to share knoledge about the data science and deep learning tecnhiques.

## Install the neccesary packages

In [2]:
!pip install tensorflow matplotlib pandas numpy -q

## DataPreparation class

The objetive of the class is to analize and create a balance dataset from the root data that already has a train, test and validate sub-folders.

In [3]:
def preprocess_data(x, y):
    """
    This function handles the image vector and rescale it and optimize it

    :param x: (image dataset)
    :param y: (labels dataset)
    :return: x
    """
    output = tf.keras.layers.Rescaling(1. / 255)(x)
    return output, y

In [4]:
def prepare_dataset(dataset, training = False):
    """
    Receive the dataset and apply the corresponding preprocess.

    :param dataset
    """
    dataset = dataset.map(lambda x, y: preprocess_data(x, y))
    dataset = dataset.cache()
    if training:
        dataset = dataset.shuffle(3100)
    dataset = dataset.prefetch(buffer_size=tf.data.AUTOTUNE)
    return dataset

In [5]:
import tensorflow as tf
import logging
from pathlib import Path
from typing import Optional, Dict

logger = logging.getLogger(__name__)

class DataPreparation:
    """
    This class makes semi-automatic the creation of datasets.
    """
    def __init__(self,
                 root_folder,
                 img_size,
                 color_mode,
                 batch_size,
                 class_mode: str = 'binary',
                 shuffle: bool = True,
                 seed: Optional[int] = None,
                 validation_split: Optional[float] = None,
                 subset: Optional[str] = None,
                 cache: bool = True,
                 prefetch: bool = True,
                 recursive: bool = False):

        self.root_folder = root_folder
        self.img_size = img_size
        self.color_mode = color_mode
        self.batch_size = batch_size
        self.class_mode = class_mode
        self.shuffle = shuffle
        self.seed = seed
        self.validation_split = validation_split
        self.subset = subset
        self.cache = cache
        self.prefetch = prefetch
        self.recursive = recursive

    def create_datasets_from_subdirectories(self) -> Dict[str, tf.data.Dataset]:
        root = Path(self.root_folder)
        if not root.exists():
            logger.error("Root folder does not exist: %s", self.root_folder)
            raise FileNotFoundError(f"Root folder does not exist: {self.root_folder}")

        subdirectories = [p for p in root.iterdir() if p.is_dir()]
        datasets = {}
        if not subdirectories:
            try:
                ds = self.create_dataset(str(root))
                datasets[root.name] = ds
                logger.info("Loaded single dataset from root folder.")
            except Exception as e:
                logger.exception("Failed to load dataset from root folder: %s", e)
            return datasets

        for sub in subdirectories:
            path = str(sub)
            try:
                ds = self.create_dataset(path)
                datasets[sub.name] = ds
            except Exception as e:
                logger.exception("Skipping dataset at %s due to error: %s", path, e)

        logger.info('Data normalization done!')
        return datasets

    def create_dataset(self, path: str) -> tf.data.Dataset:
        p = Path(path)
        class_dirs = [d for d in p.iterdir() if d.is_dir() and any(f for f in d.rglob('*') if f.is_file())]
        classes_count = len(class_dirs)

        label_mode = self.class_mode
        if classes_count == 0:
            label_mode = None
            logger.info("No class subdirectories found in %s — using label_mode=None", path)
        else:
            if self.class_mode == 'binary' and classes_count != 2:
                label_mode = 'categorical'
                logger.info("class_mode='binary' but %d classes found in %s — switching to 'categorical'", classes_count, path)

        try:
            dataset = tf.keras.utils.image_dataset_from_directory(
                directory=path,
                image_size=self.img_size,
                color_mode=self.color_mode,
                batch_size=self.batch_size,
                label_mode=label_mode,
                shuffle=self.shuffle,
                seed=self.seed,
                validation_split=self.validation_split,
                subset=self.subset,
                follow_links=self.recursive
            )
            logger.info('Dataset loaded successfully from %s', path)
            logger.info('Classes found in dataset: %s', getattr(dataset, 'class_names', None))
        except Exception as e:
            logger.exception("Error loading dataset from %s: %s", path, e)
            raise

        try:
            is_training = 'train' in path.lower()
            dataset = prepare_dataset(dataset, training=is_training) if is_training else prepare_dataset(dataset)
        except Exception:
            logger.exception("prepare_dataset failed for %s, returning raw dataset", path)

        try:
            if self.cache:
                dataset = dataset.cache()
            if self.prefetch:
                dataset = dataset.prefetch(tf.data.AUTOTUNE)
        except Exception:
            logger.exception("Error applying cache/prefetch for %s — returning dataset without those optimizations", path)

        return dataset

## Base Model and Sub-Models Architecture

This project is designed with a specific architectural vision in mind: to leverage the strengths of different deep learning models through an ensemble approach. The core idea is to create various models, each based on a distinct architecture (such as ResNet, Inception, MobileNet, etc.), but all sharing a common foundation. By combining the predictions of these diverse models, we aim to achieve a more robust and accurate overall prediction than any single model could achieve on its own.

Here's a breakdown of the planned classes:

*   **Base Model Class:** This serves as the foundational class for all other model instances. It will contain the basic functionalities and parameters common to all models, such as methods for compiling the model, training, evaluation, and potentially saving/loading weights. The other model-specific classes will inherit from this Base Model class, ensuring a consistent interface and reducing code duplication. This class defines the core structure and shared logic for handling the models.

*   **EfficientNetV2Instance Class:** This class will inherit from the `Base Model` class. It will specifically implement a model based on the EfficientNetV2 architecture. EfficientNetV2 is known for its good balance between accuracy and efficiency, making it a suitable choice for one of the ensemble members.

*   **MobileNetV2Instance Class:** This class will also inherit from the `Base Model` class. It will implement a model based on the MobileNetV2 architecture. MobileNetV2 is optimized for mobile and embedded vision applications, focusing on computational efficiency and low latency. Including it in the ensemble brings diversity in terms of model complexity and performance characteristics.

*   **ResNet50V2Instance Class:** Another class inheriting from the `Base Model`. This one will implement a model using the ResNet50V2 architecture. ResNet (Residual Networks) are famous for their ability to train very deep networks effectively, and ResNet50V2 is a widely used variant known for its strong performance on image classification tasks.

The ultimate goal is to take advantage of the unique features and learning capabilities of each individual architecture (EfficientNetV2, MobileNetV2, ResNet50V2, etc.) through the ensemble method. This approach allows the project to explore how different model perspectives can be combined to improve the overall performance in classifying benign or malignant cancer in images.


### Base model

In [None]:
from tensorflow import keras
from typing import Union, Optional

In [None]:
def image_augmentation(x):

    x = keras.layers.RandomFlip('horizontal')(x)
    x = keras.layers.RandomRotation(0.1)(x)
    x = keras.layers.RandomZoom(0.1)(x)
    x = keras.layers.RandomTranslation(0.2, 0.2)(x)
    x = keras.layers.RandomContrast(0.2)(x)
    return x

In [None]:
from typing import Union, Optional

from tensorflow import keras


class BaseModel:

    def __init__(self,
                 model: Optional[keras.Model] = None,
                 optimizer: Optional[Union[str, keras.optimizers.Optimizer]] = None,
                 loss: Optional[Union[str, keras.losses.Loss]] = None,
                 metrics: Optional[Union[list[Union[str, keras.metrics.Metric]]]] = None,
                 epochs: Optional[int] = None,
                 learning_rate: Optional[float] = None,
                 train_dataset: Optional[object] = None,
                 test_dataset: Optional[object] = None,
                 validation_dataset: Optional[object] = None,
                 task_type: Optional[str] = None,
                 debug: Optional[bool] = None,
                 img_shape: Optional[tuple] = False,
                 layer_name: Optional[str] = None,
                 ):

        self.model = model
        self.optimizer = optimizer or keras.optimizers.Adam(learning_rate=0.001)
        self.metrics = metrics or ['accuracy']
        self.loss = loss
        self.train_dataset = train_dataset
        self.test_dataset = test_dataset
        self.validation_dataset = validation_dataset
        self.learning_rate = 0.01
        self.task_type = task_type
        self.debug = False
        self.layer_name = layer_name
        self.units = None
        self.epochs = epochs or 20

        match self.task_type:
            case 'binary':
                self.units = 1
            case 'categorical':
                self.units = 3

    def build_model(self, base_model):
        if base_model is None:
            raise ValueError("A base model must be provided.")
        base_model.trainable = False

        # Removed index calculation from here
        print(f'[debug] building model with trainable base set to False')
        self.top_model(base_model=base_model)
        # Return None or remove return as index is not calculated here anymore
        return None

    def get_layer_name_index_to_fine_tuning_at(self, model):
        # This method is now primarily used internally by fine_tuning_model
        layers = [layer.name for layer in model.layers]
        if self.layer_name and self.layer_name in layers:
             return layers.index(self.layer_name)
        elif self.layer_name:
            print(f"[Warning] Layer name '{self.layer_name}' not found in model layers. Returning -1.")
            return -1 # Or handle error appropriately
        else:
             print("[Warning] layer_name is not set. Returning -1 for fine-tuning index.")
             return -1


    def fine_tuning_model(self, start_layer: Union[int, str]):
        """
        Unfreezes layers in the model starting from the specified index or layer name for fine-tuning.

        This method sets the `trainable` attribute of layers to True from the given
        `start_layer` (index or name) onwards. BatchNormalization layers are kept frozen.

        :param start_layer: The index (int) or name (str) of the layer from which
                            to start fine-tuning.
                            If index is negative, it counts from the end of the layers.
                            Layers with an index less than the determined starting index
                            will be frozen.
        """
        if self.model is None:
            raise RuntimeError("Model is not built yet. Call build_model() first.")

        actual_index = -1 # Default to no fine-tuning

        if isinstance(start_layer, str):
            # Find the index if a layer name is provided
            layers = [layer.name for layer in self.model.layers]
            try:
                actual_index = layers.index(start_layer)
                print(f"[debug] Fine-tuning from layer '{start_layer}' at index: {actual_index}")
            except ValueError:
                print(f"[Error] Layer with name '{start_layer}' not found in the model. Skipping fine-tuning.")
                return # Exit if layer name not found
        elif isinstance(start_layer, int):
            # Use the provided index (handle negative indices)
            if start_layer < 0:
                actual_index = len(self.model.layers) + start_layer
                if actual_index < 0:
                    print(f"[Warning] Negative index {start_layer} is out of bounds. Will not unfreeze any layers.")
                    return # Exit if index is out of bounds
                print(f"[debug] fine-tuning from layer index: {actual_index} (calculated from negative index {start_layer})")
            else:
                actual_index = start_layer
                if actual_index >= len(self.model.layers):
                    print(f"[Warning] Index {start_layer} is out of bounds. Will not unfreeze any layers.")
                    return # Exit if index is out of bounds
                print(f"[debug] fine-tuning from layer index: {actual_index}")
        elif start_layer is None:
             print("[debug] start_layer is None. Skipping fine-tuning.")
             return # No fine-tuning if start_layer is None
        else:
            print(f"[Error] Invalid type for start_layer: {type(start_layer)}. Must be int, str, or None.")
            return # Exit for invalid type


        # Apply trainability based on actual_index
        if actual_index != -1: # Only proceed if a valid index was determined
            for i, layer in enumerate(self.model.layers):
                if i >= actual_index:
                    # Keep BatchNormalization layers frozen during fine-tuning
                    if not isinstance(layer, keras.layers.BatchNormalization):
                        layer.trainable = True
                    else:
                         layer.trainable = False # Explicitly set BatchNormalization to False
            else:
                layer.trainable = False # Ensure layers before the index are frozen


    def top_model(self, base_model):
        inputs = keras.Input(
            shape=(224, 224, 3))
        x = image_augmentation(inputs)
        encoder_output = base_model(x)
        x = keras.layers.GlobalAveragePooling2D()(encoder_output)
        x = keras.layers.Dropout(0.5)(x)
        x = keras.layers.Dense(256)(x)
        x = keras.layers.BatchNormalization()(x)
        x = keras.layers.ReLU()(x)
        outputs = keras.layers.Dense(self.units, activation='sigmoid')(x)
        model = keras.Model(inputs=inputs, outputs=outputs)
        self.model = model

    def compile(self, learning_rate=None):

        if learning_rate is None:
            optimizer = self.optimizer
        else:
            optimizer = keras.optimizers.Adam(learning_rate=learning_rate)

        self.model.compile(
            optimizer=optimizer,
            loss=self.loss,
            metrics=self.metrics
        )

    def fit(self):

        if not hasattr(self, "callbacks") or not self.callbacks:
            raise RuntimeError("Callbacks must be set before training. Call set_callbacks() first.")

        if self.validation_dataset is not None:
            return self.model.fit(
                self.train_dataset,
                batch_size=32,
                epochs=self.epochs,
                validation_data=self.validation_dataset,
                callbacks=self.callbacks
            )
        else:
            return self.model.fit(
                self.train_dataset,
                batch_size=32,
                epochs=self.epochs,
                callbacks=self.callbacks
            )

    def set_callbacks(self,
                      filepath_to_save_model: str,
                      tensorboard_log_file: str,
                      monitor_param: Optional[str] = 'val_loss',
                      callbacks: list[keras.callbacks.Callback] = None,
                      early_stopping_patiencie: int = 10,
                      reducelr_patiente: int = 5,
                      factor: float = 0.5,
                      ):
        if early_stopping_patiencie <= reducelr_patiente:
            raise ('Early Stopping patience callback must have to be greater than'
                   'ReduceLROnPlateau patience callback')

        self.callbacks = callbacks or [
            keras.callbacks.EarlyStopping(
                patience=early_stopping_patiencie,
                monitor=monitor_param,
                restore_best_weights=True
            ),
            keras.callbacks.ModelCheckpoint(
                filepath=filepath_to_save_model,
                monitor=monitor_param,
                save_best_only=True
            ),
            keras.callbacks.ReduceLROnPlateau(
                patience=reducelr_patiente,
                monitor=monitor_param,
                factor=factor
            ),
            keras.callbacks.TensorBoard(
                log_dir=tensorboard_log_file,
                histogram_freq=1,
                write_images=True
            )
        ]

    def show_layer_names(self, model=None):

        if model is None:
            print("[Error] model parameter can not be None")
            return
        # Use self.model if no model is passed
        model_to_show = model if model is not None else self.model
        if model_to_show is None:
             print("[Error] Model is not built yet.")
             return

        for idx, layer in enumerate(model_to_show.layers):
            print(f"{idx}: {layer.name}")


    def execute_model_flow(self):
        """
            Execute the complete training workflow for transfer learning.
            The workflow consists of:
                1. Building the base transfer learning model.
                2. Compiling the model with the default optimizer.
                3. Performing initial training (feature extraction stage).
                4. Unfreezing selected layers for fine-tuning.
                5. Recompiling the model with a lower learning rate (1e-5).
                6. Training again (fine-tuning stage).

            :return: None
            :rtype: None
            """
        # Build model sets base_model trainable to False initially
        self.build_model() # Assuming subclasses will provide base_model if needed

        self.compile()
        print("\nStarting initial training (feature extraction)...")
        self.fit()

        # Directly pass self.layer_name to fine_tuning_model
        # fine_tuning_model will handle finding the index if it's a string
        start_layer_for_fine_tuning = self.layer_name if self.layer_name else None # Use layer_name if set

        if start_layer_for_fine_tuning: # Proceed with fine-tuning only if a starting layer is specified
            print(f"\nStarting fine-tuning from: {start_layer_for_fine_tuning}...")
            # Now call the modified fine_tuning_model with the determined starting layer (name or index)
            self.fine_tuning_model(start_layer=start_layer_for_fine_tuning)
            # Only recompile and fit if fine-tuning was successfully initiated
            if any(layer.trainable for layer in self.model.layers): # Check if any layers were unfrozen
                 self.compile(learning_rate=1e-5) # Recompile with lower learning rate for fine-tuning
                 self.fit()
            else:
                 print("\nNo layers were unfrozen for fine-tuning. Skipping fine-tuning training stage.")

        else:
            print("\nSkipping fine-tuning as no starting layer name was specified in the constructor.")

### ResNet50V2

In [None]:
class ResNet50V2Instance(BaseModel):

    def __init__(self, *args, **kwargs):
        kwargs['loss'] = kwargs.get('loss', 'binary_crossentropy')
        super().__init__(*args, **kwargs)

        super().set_callbacks(
            filepath_to_save_model='model/saved_models/ResNet50V2Instance.keras',
            tensorboard_log_file='logs_dir/categorical_model'
        )

    def build_model(self, model=None):
        if model is None:
            base_model = keras.applications.ResNet50V2(
                include_top=False,
                weights='imagenet',
                input_shape=(224, 224, 3)
            )
        else:
            base_model = model
        super().build_model(base_model=base_model)

    def fine_tuning_model(self):
        model = self.model
        fine_tuning_at = -50
        for layer in model.layers[:fine_tuning_at]:
            if isinstance(layer, keras.layers.BatchNormalization):
                layer.trainable = False
            else:
                layer.trainable = True

        self.model = model

### MobileNetV2

In [None]:
from tensorflow import keras
class MobileNetV2Instance(BaseModel):

    def __init__(self, *args, **kwargs):
        kwargs['loss'] = kwargs.get('loss', 'binary_crossentropy')
        # Set a default layer_name for fine-tuning or allow it to be passed in
        # Using 'block_15_expand' as a common starting point for fine-tuning
        kwargs['layer_name'] = kwargs.get('layer_name', 'block_15_expand')
        super().__init__(*args, **kwargs)

        super().set_callbacks(
            filepath_to_save_model='model/saved_models/MobileNetV2Instance.keras',
            tensorboard_log_file='logs_dir/categorical_model'
        )

    def build_model(self, base_model=None): # Changed parameter name from model to base_model
        if base_model is None:
            base_model = keras.applications.MobileNetV2(
                include_top=False,
                weights='imagenet',
                input_shape=(224, 224, 3)
            )
        else:
            base_model = base_model
        # Pass the base_model to the parent's build_model which handles trainable layers
        super().build_model(base_model=base_model)

### EfficientNetV2


In [None]:
class EfficientNetV2Instance(BaseModel):
    def __init__(self, *args, **kwargs):
        kwargs['loss'] = kwargs.get('loss', 'binary_crossentropy')
        # Set a default layer_name for fine-tuning or allow it to be passed in
        kwargs['layer_name'] = kwargs.get('layer_name', 'block6a_expand') # Example layer name for EfficientNetV2B0, adjust as needed
        super().__init__(*args, **kwargs)

        super().set_callbacks(
            filepath_to_save_model='model/saved_models/EfficientNetV2Instance.keras',
            tensorboard_log_file='logs_dir/categorical_model'
        )

    def build_model(self, base_model=None): # Changed parameter name from model to base_model
        if base_model is None:
            base_model = keras.applications.EfficientNetV2B0(
                include_top=False,
                weights='imagenet',
                input_shape=(224, 224, 3)
            )
        else:
            base_model = base_model
        super().build_model(base_model=base_model)

## Training the Models

With the updated `BaseModel` and its subclasses, the training process is streamlined, especially when using the `execute_model_flow` method.

1.  **Instantiate your desired model:** Create an instance of the specific model you want to train (e.g., `MobileNetV2Instance`, `ResNet50V2Instance`, `EfficientNetV2Instance`). When creating the instance, you can pass your training, testing, and validation datasets, as well as the `task_type` (`'binary'` or `'categorical'`). You can also specify the `layer_name` in the constructor to define the starting point for fine-tuning during the `execute_model_flow`.

## Create the datasets

In [None]:
dataset_path = '/content/drive/MyDrive/data'

dataset = (DataPreparation(root_folder=dataset_path,
        img_size=(224, 224),
        color_mode='rgb',
        batch_size=32,
        class_mode='binary',
        shuffle=True).
    create_datasets_from_subdirectories())

Found 390 files belonging to 2 classes.


In [None]:
from enum import Enum

class TaskType(Enum):
    BINARY = "binary"
    CATEGORICAL = "categorical"
    REGRESSION = "regression"

In [None]:
# Instantiate the MobileNetV2 model
# Replace train_dataset, validation_dataset, and test_dataset with your actual datasets
# Make sure to have your datasets loaded before this step
mobilenet_model = MobileNetV2Instance(
    train_dataset=dataset['train'],        # Use the training dataset from DataPreparation
    validation_dataset=dataset['validation'],   # Use the validation dataset from DataPreparation
    test_dataset=dataset['test'],         # Use the test dataset from DataPreparation
    task_type=TaskType.BINARY.value,        # Or 'categorical' depending on your problem
    layer_name='block_15_expand',
    epochs=10# Specify the layer name to start fine-tuning from
)

# Now you can execute the training flow
# history = mobilenet_model.execute_model_flow() # This line was in the previous cell and caused the error

In [None]:
# Execute the complete training and fine-tuning process
history = mobilenet_model.execute_model_flow()

[debug] building model with trainable base set to False

Starting initial training (feature extraction)...
Epoch 1/10
[1m98/98[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m18s[0m 126ms/step - accuracy: 0.6596 - loss: 0.6436 - val_accuracy: 0.7026 - val_loss: 0.6007 - learning_rate: 0.0010
Epoch 2/10
[1m98/98[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 111ms/step - accuracy: 0.7141 - loss: 0.5846 - val_accuracy: 0.8000 - val_loss: 0.4713 - learning_rate: 0.0010
Epoch 3/10
[1m98/98[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 105ms/step - accuracy: 0.7097 - loss: 0.5591 - val_accuracy: 0.7821 - val_loss: 0.4520 - learning_rate: 0.0010
Epoch 4/10
[1m98/98[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 98ms/step - accuracy: 0.7436 - loss: 0.5269 - val_accuracy: 0.7769 - val_loss: 0.4568 - learning_rate: 0.0010
Epoch 5/10
[1m98/98[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 104ms/step - accuracy: 0.7345 - loss: 0.5286 - val_accuracy: 0.8077 - 