## Image Focus and Astigmatism Classifier
**Author:** [Aaron Woods](https://aaronwoods.info)  
**Date Created:** September 12, 2023  
**Description:** This script provides an end-to-end machine learning pipeline to classify images as either "In Focus" or "Out of Focus", and additionally identifies astigmatism-related issues.  
**Repository:** [Image Classification on VSCode](https://insiders.vscode.dev/tunnel/midnightsim/c:/Users/User/Desktop/Image-Classification)

### Overview
The script features a comprehensive pipeline that ingests image metadata from a CSV file and feeds the referenced images into various machine learning models. The design is modular, allowing for easy adaptability to address different image classification problems, including focus quality and astigmatism detection.


## Setup

In [None]:
# ------------------------------
# TensorFlow Installation with GPU Support
# ------------------------------
# Native Windows installs only get GPU support up to TensorFlow 2.10.
# Details: https://www.tensorflow.org/install/pip#windows-wsl2_1
# To install a compatible version, uncomment the next line.
# %pip install "tensorflow<2.11"

# ------------------------------
# System and TensorFlow Info Check
# ------------------------------
# Collect platform facts first; TensorFlow facts are appended only if it imports.
import platform

system_info = {}
system_info["Platform"] = platform.platform()
system_info["Python Version"] = platform.python_version()

try:
    import tensorflow as tf

    # Gather both values before touching the dict so a failure leaves it unchanged.
    tf_version = tf.__version__
    gpu_count = len(tf.config.list_physical_devices('GPU'))
    system_info["TensorFlow Version"] = tf_version
    system_info["Num GPUs Available"] = gpu_count

    if gpu_count:
        system_info['Instructions'] = "You're all set to run your model on a GPU."
    else:
        system_info['Instructions'] = "\n".join([
            "No GPUs found. To use a GPU, follow these steps:",
            "  1. Install NVIDIA drivers for your GPU.",
            "  2. Install a compatible CUDA toolkit.",
            "  3. Install the cuDNN library.",
            "  4. Make sure to install the GPU version of TensorFlow.",
        ])
except ModuleNotFoundError:
    system_info['Instructions'] = (
        "TensorFlow is not installed. "
        "Install it using pip by running: !pip install tensorflow"
    )

# Render one "key: value" line per entry, in insertion order, and show it.
info_lines = [f"{key}: {value}" for key, value in system_info.items()]
formatted_info = "\n".join(info_lines)
print(formatted_info)

In [None]:
# ------------------------------
# Package Installation (Optional)
# ------------------------------
# Uncomment the following lines to install required packages if running on a new machine.
# To suppress the output, we use '> /dev/null 2>&1'.
# %pip install numpy pandas matplotlib protobuf seaborn scikit-learn tensorflow > /dev/null 2>&1

# ------------------------------
# Import Libraries
# ------------------------------

# Standard Libraries
import os, sys, random, math, glob, logging
from datetime import datetime
from collections import defaultdict

# Third-Party Libraries
import numpy as np
import cv2
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import sklearn
from sklearn.model_selection import train_test_split
from sklearn.utils import class_weight
from sklearn.utils.class_weight import compute_class_weight
from IPython.display import clear_output
from collections import Counter
from sklearn.preprocessing import LabelEncoder, MultiLabelBinarizer

# TensorFlow and Keras
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.metrics import Precision, Recall, AUC
from tensorflow.keras.callbacks import TensorBoard, Callback
from tensorflow.keras.applications import InceptionV3, ResNet50
from keras.models import load_model
from tensorflow.data import Dataset

import pickle

# Type Annotations
from typing import List, Dict, Tuple, Union, Any, Optional


## Configuration

In [None]:
# Configuration dictionary
# Global settings read by the model builders and the data pipeline below.
# Further sections ('CSV', 'Thresholds', 'Paths', 'SAMPLE_FRAC') are merged in
# later via config.update(csv_config).
config = {
    'Experiment': {
        'NAME': "Multi-Label_Thresholds-30-60-1-2",  # Experiment name. NOTE(review): says "Multi-Label" while PROBLEM_TYPE below is 'Multi-Class' - confirm which is intended
        'RANDOM_SEED': 42,  # Seed for reproducibility
        'PROBLEM_TYPE': 'Multi-Class',  # Problem type: Binary, Multi-Class, Multi-Label
    },
    'Model': {
        'IMG_SIZE': 224,  # Image input size (used for both height and width)
        'BATCH_SIZE': 32,  # Batch size for training
        'EPOCHS': 100,  # Number of epochs
        'LEARNING_RATE': 1e-3,  # Learning rate
        'EARLY_STOPPING_PATIENCE': 5,  # Early stopping patience parameter
        'REDUCE_LR_PATIENCE': 3,  # Learning rate reduction patience parameter
        'MIN_LR': 1e-6,  # Minimum learning rate
        'LOSS': "binary_crossentropy",  # Loss function. NOTE(review): the notes below recommend "categorical_crossentropy" for Multi-Class - confirm this matches PROBLEM_TYPE
        'TRAIN_SIZE': 0.8,  # Fraction of all data used for training
        'VAL_SIZE': 0.5,  # Fraction of the remaining (non-training) data used for validation; the rest becomes the test set
    },
    'Labels': {
        'MAPPINGS': {  # Class names per label; generate_labels uses the key order (low, middle, high offset band)
            'Focus_Label': {'SharpFocus': 0, 'SlightlyBlurred': 1, 'HighlyBlurred': 2},
            'StigX_Label': {'OptimalStig_X': 0, 'ModerateStig_X': 1, 'SevereStig_X': 2},
            'StigY_Label': {'OptimalStig_Y': 0, 'ModerateStig_Y': 1, 'SevereStig_Y': 2},
        }
    },
    'Augmentation': {  # Data augmentation parameters (consumed by create_augmentation_layers)
        'rotation_factor': 0.002,  # Passed to layers.RandomRotation
        'height_factor': (-0.18, 0.18),  # Passed to layers.RandomTranslation (height_factor)
        'width_factor': (-0.18, 0.18),  # Passed to layers.RandomTranslation (width_factor)
        'contrast_factor': 0.5,  # Passed to layers.RandomContrast
    }
}


In [None]:

# Set random seeds for reproducibility.
# Python's built-in `random` module is imported by this notebook as well, so it is
# seeded together with NumPy and TensorFlow; otherwise any use of it stays
# non-deterministic between runs.
random.seed(config['Experiment']['RANDOM_SEED'])
np.random.seed(config['Experiment']['RANDOM_SEED'])
tf.random.set_seed(config['Experiment']['RANDOM_SEED'])


### Recommendations for Loss Functions and Other Settings Per Problem Type

#### Multi-Label Problems:
- **Loss Function**: Typically, "binary_crossentropy" is used because each class label is independent and the task is to predict whether it is present or not.
- **Label Encoding**: Multi-hot (binary indicator) encoding is commonly used: each label gets its own 0/1 entry, and several entries can be 1 for the same sample.
- **Activation Function**: The sigmoid activation function is generally used in the output layer to allow for multiple independent classes.
- **Evaluation Metrics**: Precision, Recall, and F1 Score can be effective for evaluating multi-label problems.

#### Binary Classification Problems:
- **Loss Function**: "binary_crossentropy" is the standard loss function because the task is to categorize instances into one of the two classes.
- **Label Encoding**: Labels are often encoded as 0 or 1.
- **Activation Function**: The sigmoid activation function is usually used in the output layer, producing a probability score that can be thresholded to yield a class label.
- **Evaluation Metrics**: Accuracy, Precision, Recall, and AUC-ROC are commonly used metrics.

#### Multi-Class Problems:
- **Loss Function**: "categorical_crossentropy" or "sparse_categorical_crossentropy" is commonly used. The former requires one-hot encoded labels, while the latter requires integer labels.
- **Label Encoding**: One-hot encoding is often used to convert the categorical labels into a format that can be provided to the neural network.
- **Activation Function**: The softmax activation function is used in the output layer to produce a probability distribution over the multiple classes.
- **Evaluation Metrics**: Accuracy is the most straightforward metric. However, Precision, Recall, and F1 Score can also be used for imbalanced datasets.

Remember to refer to these guidelines when setting up your configuration for different types of problems.


## Defining the Models

In [None]:
def determine_activation_and_units(num_classes: int) -> tuple:
    """Pick the output-layer activation and width for the configured problem type.

    Reads ``config['Experiment']['PROBLEM_TYPE']`` and returns an
    ``(activation, units)`` pair:

    * 'Multi-Label'  -> ("sigmoid", num_classes): independent per-class scores.
    * 'Binary', or any other type with exactly two classes -> ("sigmoid", 1).
    * 'Multi-Class'  -> ("softmax", num_classes): scores that sum to 1.

    Raises:
        ValueError: if the problem type matches none of the cases above.
    """
    problem_type = config.get('Experiment').get('PROBLEM_TYPE')

    # Multi-label wins even when there are only two classes.
    if problem_type == 'Multi-Label':
        return "sigmoid", num_classes

    # A two-class problem collapses to a single sigmoid unit.
    if num_classes == 2 or problem_type == 'Binary':
        return "sigmoid", 1

    if problem_type == 'Multi-Class':
        return "softmax", num_classes

    raise ValueError(f"Invalid problem_type: {problem_type}")

In [None]:
# Transfer learning models
def create_transfer_model(base_model, input_shape: tuple, num_classes: int, hidden_units: list, dropout_rate: float, regularizer_rate: float) -> keras.Model:
    """Stack a classification head on top of a frozen base model.

    The base model is marked non-trainable, followed by global average pooling,
    one Dense -> LeakyReLU -> Dropout group per entry of ``hidden_units`` (Dense
    kernel and bias are L2-regularised with ``regularizer_rate``), and an output
    layer chosen by ``determine_activation_and_units(num_classes)``.

    ``input_shape`` is accepted for signature symmetry with the other builders
    but is not used here.
    """
    l2 = keras.regularizers.l2

    # Freeze the backbone so only the new head is trained.
    base_model.trainable = False

    model = keras.Sequential([base_model, layers.GlobalAveragePooling2D()])

    for width in hidden_units:
        model.add(layers.Dense(width, kernel_regularizer=l2(regularizer_rate), bias_regularizer=l2(regularizer_rate)))
        model.add(layers.LeakyReLU())
        model.add(layers.Dropout(dropout_rate))

    output_activation, output_units = determine_activation_and_units(num_classes)
    model.add(layers.Dense(output_units, activation=output_activation))
    return model

def create_mobilenetv2_transfer_model(input_shape: tuple, num_classes: int) -> keras.Model:
    """Build a transfer model on an ImageNet-pretrained MobileNetV2 (head: 128, 64 units)."""
    backbone = tf.keras.applications.MobileNetV2(
        weights='imagenet',
        include_top=False,
        input_shape=input_shape,
    )
    return create_transfer_model(
        backbone,
        input_shape,
        num_classes,
        hidden_units=[128, 64],
        dropout_rate=0.5,
        regularizer_rate=0.001,
    )

def create_inceptionv3_transfer_model(input_shape: tuple, num_classes: int) -> keras.Model:
    """Build a transfer model on an ImageNet-pretrained InceptionV3 (head: 128, 64 units)."""
    backbone = tf.keras.applications.InceptionV3(
        weights='imagenet',
        include_top=False,
        input_shape=input_shape,
    )
    return create_transfer_model(
        backbone,
        input_shape,
        num_classes,
        hidden_units=[128, 64],
        dropout_rate=0.5,
        regularizer_rate=0.001,
    )

def create_resnet50_transfer_model(input_shape: tuple, num_classes: int) -> keras.Model:
    """Build a transfer model on an ImageNet-pretrained ResNet50 (head: 256, 128 units)."""
    backbone = tf.keras.applications.ResNet50(
        weights='imagenet',
        include_top=False,
        input_shape=input_shape,
    )
    return create_transfer_model(
        backbone,
        input_shape,
        num_classes,
        hidden_units=[256, 128],
        dropout_rate=0.5,
        regularizer_rate=0.001,
    )

In [None]:
# Define the function to create a small version of the Xception network
def create_small_xception_model(input_shape, num_classes):
    """Build a compact Xception-style network with residual separable-conv blocks.

    Pixel values are rescaled by 1/255 inside the model. The output layer's
    activation and width come from ``determine_activation_and_units``.
    """
    inputs = keras.Input(shape=input_shape)

    # Entry block: rescale, strided convolution, batch norm, ReLU.
    features = layers.Rescaling(1.0 / 255)(inputs)
    features = layers.Conv2D(128, 3, strides=2, padding="same")(features)
    features = layers.BatchNormalization()(features)
    features = layers.Activation("relu")(features)
    shortcut = features  # kept aside as the residual input of the first block

    # Middle flow: per width, two (ReLU -> separable conv -> batch norm) steps,
    # a strided max-pool, then a 1x1 strided projection of the shortcut is added.
    for filters in (256, 512, 728):
        for _ in range(2):
            features = layers.Activation("relu")(features)
            features = layers.SeparableConv2D(filters, 3, padding="same")(features)
            features = layers.BatchNormalization()(features)

        features = layers.MaxPooling2D(3, strides=2, padding="same")(features)

        projected_shortcut = layers.Conv2D(filters, 1, strides=2, padding="same")(shortcut)
        features = layers.add([features, projected_shortcut])
        shortcut = features  # residual input for the next block

    # Exit flow: one more separable conv, then pool to a feature vector.
    features = layers.SeparableConv2D(1024, 3, padding="same")(features)
    features = layers.BatchNormalization()(features)
    features = layers.Activation("relu")(features)
    features = layers.GlobalAveragePooling2D()(features)

    output_activation, output_units = determine_activation_and_units(num_classes)

    # Classifier head: dropout followed by the output layer.
    features = layers.Dropout(0.5)(features)
    outputs = layers.Dense(output_units, activation=output_activation)(features)

    return keras.Model(inputs, outputs)

In [None]:
# Define the function to create a basic CNN model
def create_basic_cnn_model(input_shape, num_classes):
    """Build a small sequential CNN: three conv blocks, pooling, and a dense head.

    Each block is Conv2D (3x3, ReLU, same padding) -> BatchNormalization ->
    2x2 MaxPooling, with 32, 64 and 128 filters. The output layer's activation
    and width come from ``determine_activation_and_units``.
    """
    kernel_size = (3, 3)
    block_filters = (32, 64, 128)  # doubles per block, starting at 32

    model = tf.keras.models.Sequential()

    # Declare the input shape explicitly.
    model.add(tf.keras.layers.Input(shape=input_shape))

    for filters in block_filters:
        model.add(tf.keras.layers.Conv2D(filters, kernel_size, activation='relu', padding='same'))
        model.add(tf.keras.layers.BatchNormalization())
        model.add(tf.keras.layers.MaxPooling2D((2, 2)))

    model.add(tf.keras.layers.GlobalAveragePooling2D())
    model.add(tf.keras.layers.Dense(128, activation='relu'))

    output_activation, output_units = determine_activation_and_units(num_classes)
    model.add(layers.Dense(output_units, activation=output_activation))

    return model


In [None]:
# Model Selection function to select which model to use
def select_model(model_name: str, input_shape: tuple, num_classes: int) -> keras.Model:
    """Build the model registered under ``model_name``.

    Known names: "mobilenetv2", "inceptionv3", "resnet50", "small_xception",
    "basic_cnn".

    Raises:
        ValueError: if ``model_name`` is not one of the known names.
    """
    builders = {
        "mobilenetv2": create_mobilenetv2_transfer_model,
        "inceptionv3": create_inceptionv3_transfer_model,
        "resnet50": create_resnet50_transfer_model,
        "small_xception": create_small_xception_model,
        "basic_cnn": create_basic_cnn_model,
    }

    builder = builders.get(model_name)
    if builder is None:
        raise ValueError("Invalid model name")

    return builder(input_shape, num_classes)

## Load and Preprocess the data

### Functions for Preparation of CSV

In [None]:
# Read the data
def read_csv(config: Dict):
    """Load the configured columns of the data CSV into a DataFrame.

    The file is ``config['Paths']['NEW_BASE_PATH']`` joined with
    ``config['Paths']['DATA_FILE']``; only ``config['CSV']['COLUMNS_TO_READ']``
    are read. When ``config['SAMPLE_FRAC']`` lies strictly between 0 and 1,
    that fraction of rows is sampled and the index is reset.

    Raises:
        FileNotFoundError: if the CSV path does not exist.
        ValueError: if reading or sampling fails (original error is chained).
    """
    paths = config['Paths']
    csv_path = os.path.join(paths['NEW_BASE_PATH'], paths['DATA_FILE'])
    if not os.path.exists(csv_path):
        raise FileNotFoundError(f"Error: File does not exist - {csv_path}")

    try:
        frame = pd.read_csv(csv_path, usecols=config['CSV']['COLUMNS_TO_READ'])
        print("---> Data read successfully.")

        fraction = config.get('SAMPLE_FRAC', 1.0)
        if 0 < fraction < 1.0:
            frame = frame.sample(frac=fraction).reset_index(drop=True)
            print(f"---> Data sampled: Using {fraction * 100}% of the available data.")
    except Exception as err:
        raise ValueError(f"Error: Could not read data - {err}") from err

    return frame

def clean_csv(df: pd.DataFrame) -> pd.DataFrame:
    """Drop rows whose 'ImageFile' is unusable, in place, and return the frame.

    A row is removed (and reported on stdout) when its 'ImageFile' value is not
    a string, points to a path that does not exist, or cannot be decoded by
    ``cv2.imread``. The index is reset afterwards. The input frame itself is
    modified.
    """

    def rejection_reason(image_path):
        """Return why the path is unusable, or None when the image is fine."""
        if not isinstance(image_path, str):
            return "Invalid ImageFile value - not a string"
        if not os.path.exists(image_path):
            return "File does not exist"
        if cv2.imread(image_path) is None:
            return "Image can't be read"
        return None

    rows_to_drop = []
    for row_index, row in df.iterrows():
        reason = rejection_reason(row['ImageFile'])
        if reason is not None:
            print(f"Removing row: {row} (Reason: {reason})")
            rows_to_drop.append(row_index)

    # Remove the rejected rows and renumber what is left.
    df.drop(index=rows_to_drop, inplace=True)
    df.reset_index(drop=True, inplace=True)

    return df


def update_image_paths(df):
    """Rewrite the 'ImageFile' column from the old base path to the new one.

    Every literal occurrence of ``config['Paths']['OLD_BASE_PATH']`` is replaced
    with ``config['Paths']['NEW_BASE_PATH']`` (plain text, not a regex). The
    frame is modified in place and returned.
    """
    path_settings = config['Paths']
    source_prefix = path_settings['OLD_BASE_PATH']
    target_prefix = path_settings['NEW_BASE_PATH']

    relocated = df['ImageFile'].str.replace(source_prefix, target_prefix, regex=False)
    df['ImageFile'] = relocated

    print("---> Image paths updated.")
    return df

def generate_labels(df: pd.DataFrame):
    """Derive class labels from the offset columns according to the configuration.

    For each label in ``config['Labels']['MAPPINGS']`` the absolute value of the
    matching offset column is binned with the ``<PREFIX>_LOW`` / ``<PREFIX>_HIGH``
    entries of ``config['Thresholds']`` (both default to 0):

    * ``|offset| <= low``          -> first class name in the mapping
    * ``low < |offset| <= high``   -> second class name
    * ``|offset| > high``          -> third class name
    * anything else (e.g. NaN)     -> 'Unknown'

    The class names are then integer-encoded with a ``LabelEncoder``. Labels
    whose offset column is unknown or absent are skipped with a warning.

    For 'Multi-Label' problems two more columns are added: 'Multi_Labels' (the
    per-row list of encoded values of the labels that were actually generated)
    and 'Multi_Labels_Binarized' (its ``MultiLabelBinarizer`` encoding).

    NOTE(review): ``LabelEncoder`` numbers classes by sorted class name, so the
    integers stored in the label columns are not the integer values written in
    MAPPINGS - confirm this is intended.
    NOTE(review): every label column is encoded independently, so the lists in
    'Multi_Labels' reuse the same small integers for different labels and the
    binarizer cannot tell them apart - confirm the intended multi-label encoding.

    Args:
        df: Frame containing the offset columns. It is not modified.

    Returns:
        ``(labelled_copy, label_encoders, mlb_classes)`` where ``label_encoders``
        maps each generated label name to its fitted encoder and ``mlb_classes``
        is the binarizer's ``classes_`` (``None`` unless the problem is Multi-Label).
    """
    print("---> Generating labels for Focus, StigX, and StigY...")
    # Extract configurations
    labels_config = config.get('Labels', {}).get('MAPPINGS', {})
    thresholds_config = config.get('Thresholds', {})
    # Offset columns mapping
    offset_column_mapping = {
        'Focus_Label': 'Focus_Offset (V)',
        'StigX_Label': 'Stig_Offset_X (V)',
        'StigY_Label': 'Stig_Offset_Y (V)'
    }
    df_copy = df.copy()

    label_encoders = {}  # Fitted encoder per generated label
    mlb_classes = None  # Classes of the MultiLabelBinarizer (Multi-Label only)

    for label_key, choices_dict in labels_config.items():
        offset_column = offset_column_mapping.get(label_key)
        if not offset_column:
            print(f"Warning: No offset column mapping found for '{label_key}'. Skipping label generation.")
            continue
        if offset_column not in df.columns:
            print(f"Warning: Column '{offset_column}' not found in DataFrame. Skipping label generation for '{label_key}'.")
            continue

        # 'Focus_Label' -> 'FOCUS', 'StigX_Label' -> 'STIGX', ...
        threshold_prefix = label_key.split('_')[0].upper()
        low_threshold = thresholds_config.get(f"{threshold_prefix}_LOW", 0)
        high_threshold = thresholds_config.get(f"{threshold_prefix}_HIGH", 0)

        # Compute the magnitude once; only the size of the offset matters.
        magnitude = df_copy[offset_column].abs()
        conditions = [
            magnitude <= low_threshold,
            (magnitude > low_threshold) & (magnitude <= high_threshold),
            magnitude > high_threshold,
        ]
        choices = list(choices_dict.keys())
        df_copy[label_key] = np.select(conditions, choices, default='Unknown')

        le = LabelEncoder()
        df_copy[label_key] = le.fit_transform(df_copy[label_key])
        label_encoders[label_key] = le
        print("---> Labels generated for", label_key)

    # For multi-label problems
    if config.get('Experiment', {}).get('PROBLEM_TYPE') == 'Multi-Label':
        # Only combine labels that were really generated; a skipped label has no
        # column in df_copy and would otherwise raise a KeyError here.
        label_keys = [key for key in labels_config if key in label_encoders]
        df_copy['Multi_Labels'] = df_copy.apply(lambda row: [row[key] for key in label_keys], axis=1)
        print("---> Multi-labels generated.")
        mlb = MultiLabelBinarizer()
        df_copy['Multi_Labels_Binarized'] = list(mlb.fit_transform(df_copy['Multi_Labels']))
        mlb_classes = mlb.classes_  # Store the classes attribute for later use

    return df_copy, label_encoders, mlb_classes

def shuffle_and_reset_index(data):
    """Return a copy of ``data`` with rows shuffled and the index renumbered from 0.

    The shuffle is seeded with ``config['Experiment']['RANDOM_SEED']``, so the
    resulting order is repeatable.
    """
    print("---> Shuffling and resetting index...")
    seed = config['Experiment']['RANDOM_SEED']
    permuted = data.sample(frac=1, random_state=seed)
    result = permuted.reset_index(drop=True)
    print("---> Data shuffled and index reset.")
    return result

def prepare_datasets(df: pd.DataFrame):
    """Split the frame into 'train', 'valid' and 'test' frames.

    ``config['Model']['TRAIN_SIZE']`` of the rows go to training; of the
    remainder, ``config['Model']['VAL_SIZE']`` goes to validation and the rest
    to test. Both splits use ``config['Experiment']['RANDOM_SEED']``.

    Returns:
        A dict with keys 'train', 'valid', 'test'. All three values are ``None``
        when the frame is missing/empty or too small to split.
    """

    def no_split():
        return {'train': None, 'valid': None, 'test': None}

    # Nothing to split.
    if df is None or df.empty:
        print("Warning: DataFrame is empty. Cannot proceed with data preparation.")
        return no_split()

    try:
        holdout_fraction = 1 - config['Model']['TRAIN_SIZE']
        seed = config['Experiment']['RANDOM_SEED']
        train_df, remainder_df = train_test_split(df, test_size=holdout_fraction, random_state=seed)

        test_fraction = 1 - config['Model']['VAL_SIZE']
        val_df, test_df = train_test_split(remainder_df, test_size=test_fraction, random_state=seed)
    except ValueError:
        print("Not enough data to split into training, validation, and test sets.")
        return no_split()

    print("---> Data split into training, validation, and test sets.")
    return {'train': train_df, 'valid': val_df, 'test': test_df}


In [None]:
# Compute class weights

def compute_and_store_class_weights(datasets: Dict[str, pd.DataFrame], 
                                    label_encoders: Dict[str, "LabelEncoder"], 
                                    mlb_classes: np.ndarray = None) -> pd.DataFrame:
    """Tabulate per-class counts and 'balanced' class weights for every split.

    Multi-Label: the 'Multi_Labels' column of each split is binarized and one
    record is produced per (binarizer class, 0/1 value). The binarizer is fitted
    before use - with ``mlb_classes`` when given, otherwise on the labels found
    in the available splits - so every split shares the same columns.

    Multi-Class / Binary: one record is produced per class of every label in
    ``config['Labels']['MAPPINGS']``; the encoded class is mapped back to its
    original name through ``label_encoders``. Labels without a column in the
    split are skipped with a warning.

    Args:
        datasets: Split name -> DataFrame; ``None`` entries are ignored.
        label_encoders: Label name -> fitted encoder (Multi-Class / Binary only).
        mlb_classes: Optional fixed class list for the binarizer (Multi-Label only).

    Returns:
        A DataFrame indexed by ('split', 'label', 'class') with columns 'Count'
        and 'Weight'. It is empty (same index names and columns) when no split
        produced any record.
    """
    problem_type = config.get('Experiment', {}).get('PROBLEM_TYPE', 'Binary')
    available = {split: df for split, df in datasets.items() if df is not None}

    all_records = []  # Collected rows, converted to a DataFrame at the end

    def add_records(split, label, classes, values, decode=None):
        """Append one record per class of ``values`` for the given split/label."""
        weights = compute_class_weight('balanced', classes=classes, y=values)
        counts = Counter(values)  # built once instead of once per class
        for cls, weight in zip(classes, weights):
            shown_class = cls if decode is None else decode(cls)
            all_records.append({'split': split, 'label': label, 'class': shown_class,
                                'Count': counts[cls], 'Weight': weight})

    if problem_type == 'Multi-Label':
        if available:
            label_lists = {split: df['Multi_Labels'].tolist() for split, df in available.items()}

            # transform() requires a fitted binarizer. When `classes` is given,
            # fit() keeps exactly those classes; otherwise it learns them from
            # the labels of all available splits.
            mlb = MultiLabelBinarizer(classes=mlb_classes)
            mlb.fit([labels for lists in label_lists.values() for labels in lists])

            for split, lists in label_lists.items():
                binarized_labels = mlb.transform(lists)
                for label_idx, label_name in enumerate(mlb.classes_):
                    label_data = binarized_labels[:, label_idx]
                    add_records(split, label_name, np.unique(label_data), label_data)

    else:  # Multi-Class or Binary
        for split, df in available.items():
            for label in config['Labels']['MAPPINGS']:
                if label not in df.columns:
                    print(f"Warning: Column '{label}' not found in '{split}' data. Skipping class weights for it.")
                    continue
                encoder = label_encoders[label]
                add_records(
                    split, label, df[label].unique(), df[label],
                    # Reverse map to the original class name
                    decode=lambda cls, encoder=encoder: encoder.inverse_transform([cls])[0],
                )

    if not all_records:
        # set_index() on a column-less frame raises KeyError; return a
        # well-formed empty table instead.
        empty_index = pd.MultiIndex.from_arrays([[], [], []], names=['split', 'label', 'class'])
        return pd.DataFrame(columns=['Count', 'Weight'], index=empty_index)

    df_class_weights = pd.DataFrame.from_records(all_records)
    df_class_weights.set_index(['split', 'label', 'class'], inplace=True)

    return df_class_weights


In [None]:
# Function to create tf datasets

def create_tf_datasets(datasets: Dict[str, pd.DataFrame]) -> Dict[str, Any]:
    """Turn the split DataFrames into preprocessed ``tf.data`` datasets.

    Multi-Label: returns ``{split: dataset}`` built from 'ImageFile' and
    'Multi_Labels_Binarized'.
    Multi-Class / Binary: returns ``{label: {split: dataset}}`` with one dataset
    per label column ('Focus_Label', 'StigX_Label', 'StigY_Label') and split.

    Splits whose frame is ``None`` are skipped. Rows with a missing 'ImageFile'
    are left out of the dataset; the caller's frames are not modified.

    Only (file path, label) datasets are built here. Image loading, caching and
    prefetching are applied exactly once, by ``apply_preprocessing``; mapping
    the datasets here as well would feed already-decoded image tensors back into
    the loader as if they were file paths.
    """

    def rows_with_image(df, *warning_context):
        """Return ``df`` without rows lacking an 'ImageFile' (a copy if any are dropped)."""
        if df['ImageFile'].isna().any():
            print("[WARNING] Removing rows with nan in 'ImageFile' column for", *warning_context)
            return df.dropna(subset=['ImageFile'])
        return df

    tf_datasets = {}  # Output container, filled per split or per label
    if config['Experiment'].get('PROBLEM_TYPE') == 'Multi-Label':
        print("[INFO] Problem type detected as Multi-Label.")
        for split, df in datasets.items():
            if df is None:
                continue
            rows = rows_with_image(df, "split", split)
            tf_datasets[split] = tf.data.Dataset.from_tensor_slices(
                (rows['ImageFile'].values, rows['Multi_Labels_Binarized'].tolist())
            )
    else:
        print("[INFO] Problem type detected as Multi-Class/Binary.")
        for label in ['Focus_Label', 'StigX_Label', 'StigY_Label']:
            label_datasets = {}
            for split, df in datasets.items():
                if df is None:
                    continue
                rows = rows_with_image(df, "label", label, "and split", split)
                label_datasets[split] = tf.data.Dataset.from_tensor_slices(
                    (rows['ImageFile'].values, rows[label].values)
                )
            tf_datasets[label] = label_datasets
    return apply_preprocessing(tf_datasets)

def determine_label_shape() -> int:
    """Return the label vector length implied by the configured problem type.

    Multi-Label: total number of class names across all label mappings.
    Multi-Class / Binary: number of class names in the first label mapping.

    Raises:
        ValueError: for any other ``config['Experiment']['PROBLEM_TYPE']``.
    """
    problem_type = config['Experiment'].get('PROBLEM_TYPE', None)
    mappings = config['Labels'].get('MAPPINGS', None)

    if problem_type == 'Multi-Label':
        return sum(map(len, mappings.values()))

    if problem_type in ('Multi-Class', 'Binary'):
        first_label = next(iter(mappings))
        return len(mappings[first_label])

    raise ValueError(f"Invalid PROBLEM_TYPE: {problem_type}")

def preprocess_wrapper(file_path, label, augment: bool) -> Tuple[tf.Tensor, tf.Tensor]:
    """Run ``preprocess_image`` inside a dataset ``map`` via ``tf.py_function``.

    Args:
        file_path: Tensor holding the image path.
        label: Tensor holding the label for that image.
        augment: Whether augmentation should be applied.

    Returns:
        ``(image, label)`` with the image declared as float32 of shape
        (IMG_SIZE, IMG_SIZE, 3) and the label as int32 of length
        ``determine_label_shape()``.

    NOTE(review): the declared label shape is a vector; in the per-label
    (Multi-Class / Binary) datasets each label appears to be a single integer -
    confirm the intended label encoding.
    """

    def load(path, raw_label, do_augment):
        image, processed_label = preprocess_image(path, raw_label, do_augment)
        # py_function must return exactly the dtypes declared in Tout; labels
        # stored as another integer width (e.g. int64) would otherwise be rejected.
        return tf.cast(image, tf.float32), tf.cast(processed_label, tf.int32)

    image, label = tf.py_function(
        func=load,
        inp=[file_path, label, augment],
        Tout=[tf.float32, tf.int32]
    )
    # py_function outputs carry no static shape, so declare them explicitly.
    image.set_shape([config['Model']['IMG_SIZE'], config['Model']['IMG_SIZE'], 3])
    label_shape = determine_label_shape()
    label.set_shape([label_shape])
    return image, label

def preprocess_single_dataset(ds, is_training: bool = False):
    """Map every (file_path, label) element of ``ds`` through ``preprocess_wrapper``.

    ``is_training`` is forwarded as the wrapper's ``augment`` flag.
    """

    def load_element(file_path, label):
        return preprocess_wrapper(file_path, label, is_training)

    return ds.map(load_element)

def apply_preprocessing(datasets: Dict) -> Dict:
    """Preprocess, cache and prefetch every 'train'/'valid'/'test' dataset in place.

    For Multi-Class / Binary problems ``datasets`` is treated as
    ``{label: {split: dataset}}``; otherwise as ``{split: dataset}``. Only the
    'train' split is preprocessed with ``is_training=True``. The same (mutated)
    container is returned.

    NOTE(review): ``cache()`` is applied after the preprocessing map, so whatever
    that map produces for the training split is what gets cached - confirm this
    is intended when augmentation is enabled.
    """
    per_label_layout = config['Experiment'].get('PROBLEM_TYPE') in ('Multi-Class', 'Binary')
    # Normalise both layouts to a list of {split: dataset} containers.
    split_containers = list(datasets.values()) if per_label_layout else [datasets]

    for container in split_containers:
        for split_name in ('train', 'valid', 'test'):
            if split_name not in container:
                continue
            prepared = preprocess_single_dataset(container[split_name], split_name == 'train')

            # Configure for performance
            container[split_name] = prepared.cache().prefetch(buffer_size=tf.data.AUTOTUNE)

    print("---> TF Datasets created.")
    return datasets


In [None]:

# Image Preprocessing functions

def create_preprocessing_layers(img_width: int, img_height: int, rescale_factor: float) -> keras.Sequential:
    """Create preprocessing layers for resizing and rescaling images.

    Args:
        img_width: Target image width in pixels.
        img_height: Target image height in pixels.
        rescale_factor: Multiplier applied to every pixel value after resizing.

    Returns:
        A Sequential model that resizes and then rescales its input.
    """
    return keras.Sequential([
        # Resizing expects (height, width), in that order.
        layers.Resizing(height=img_height, width=img_width),
        layers.Rescaling(rescale_factor)
    ])


def create_augmentation_layers(augmentation_config: dict) -> keras.Sequential:
    """Create data augmentation layers.

    The pipeline applies a random horizontal flip, a random vertical flip, a
    random rotation, a random translation (reflect fill) and a random contrast
    change, in that order.

    Args:
        augmentation_config: Must provide 'rotation_factor', 'height_factor',
            'width_factor' and 'contrast_factor'.

    Returns:
        A Sequential model holding the augmentation layers.

    Raises:
        Exception: whatever layer construction raised (e.g. ``KeyError`` for a
            missing setting). The error is reported on stdout and re-raised
            rather than being masked by a failure on the return statement.
    """
    try:
        return tf.keras.Sequential([
            layers.RandomFlip("horizontal"),
            layers.RandomFlip("vertical"),
            layers.RandomRotation(augmentation_config['rotation_factor']),
            layers.RandomTranslation(
                height_factor=augmentation_config['height_factor'],
                width_factor=augmentation_config['width_factor'],
                fill_mode="reflect"
            ),
            layers.RandomContrast(augmentation_config['contrast_factor']),
        ])
    except Exception as e:
        print(f"An error occurred while creating augmentation layers: {e}")
        raise


def read_and_convert_image(file_path: str) -> tf.Tensor:
    """Load an image as grayscale and return it as a float32 three-channel tensor.

    Returns ``None`` (after printing a message) when OpenCV cannot read the file.
    """
    grayscale = cv2.imread(file_path, cv2.IMREAD_GRAYSCALE)
    if grayscale is None:
        print("Failed to read the image.")
        return None

    # Add a trailing channel axis, then replicate it to three channels.
    single_channel = tf.expand_dims(tf.convert_to_tensor(grayscale, dtype=tf.float32), axis=-1)
    return tf.image.grayscale_to_rgb(single_channel)


def preprocess_image(file_path_or_tensor, label, augment) -> Tuple[tf.Tensor, tf.Tensor]:
    """Load (if needed), resize, rescale and optionally augment one image.

    Args:
        file_path_or_tensor: A path string, a scalar string tensor holding the
            path, or a 4-D image tensor that is used as the image directly.
        label: Returned unchanged alongside the image.
        augment: When truthy, the augmentation layers are applied and the result
            is clipped to [0, 1].

    Returns:
        ``(image, label)`` with the image resized to IMG_SIZE x IMG_SIZE and
        multiplied by 1/255.

    Raises:
        ValueError: if a tensor input turns out to be a non-path array, or if
            the image file cannot be read.
        Exception: any other failure; collected debug details are printed first.

    NOTE: the preprocessing layers are rebuilt on every call.
    """
    debug_info = {}
    try:
        debug_info['Initial type'] = str(type(file_path_or_tensor))
        debug_info['Initial content'] = str(file_path_or_tensor)
        if isinstance(file_path_or_tensor, tf.Tensor):
            if len(file_path_or_tensor.shape) == 4:  # Assuming shape is (batch, height, width, channels)
                image = file_path_or_tensor
            else:
                file_path_or_tensor = file_path_or_tensor.numpy()
                if isinstance(file_path_or_tensor, (bytes, np.bytes_)):
                    file_path_or_tensor = file_path_or_tensor.decode('utf-8')
                elif isinstance(file_path_or_tensor, np.ndarray):
                    debug_info['Unexpected NumPy array'] = f"Shape: {file_path_or_tensor.shape}, Content: {file_path_or_tensor}"
                    raise ValueError("Unexpected NumPy array for file_path")
                image = read_and_convert_image(file_path_or_tensor)
        else:
            image = read_and_convert_image(file_path_or_tensor)
        debug_info['Final type'] = str(type(file_path_or_tensor))
        debug_info['Final content'] = str(file_path_or_tensor)

        # read_and_convert_image signals failure with None; stop with a clear
        # message instead of handing None to the Keras layers.
        if image is None:
            raise ValueError(f"Could not read image: {file_path_or_tensor}")

        preprocess_seq = create_preprocessing_layers(
            img_width=config['Model']['IMG_SIZE'],
            img_height=config['Model']['IMG_SIZE'],
            rescale_factor=1./255
        )
        image = preprocess_seq(image)
        if augment:
            # Build the augmentation layers only when they are actually used.
            augment_seq = create_augmentation_layers(config['Augmentation'])
            image = augment_seq(image)
            image = tf.clip_by_value(image, 0, 1)

        return image, label
    except Exception as e:
        debug_info['Error'] = str(e)
        print("An error occurred during preprocessing:")
        for key, value in debug_info.items():
            print(f"{key}: {value}")
        raise



In [None]:
# Main function to integrate all steps


# Configure for dataset creation
# These sections are merged into the global `config` below, where the CSV and
# labelling functions look them up.
csv_config = {
    'CSV': {
        # Columns loaded by read_csv; the offset columns feed generate_labels
        'COLUMNS_TO_READ': ['ImageFile', 'Focus_Offset (V)', 'Stig_Offset_X (V)', 'Stig_Offset_Y (V)']
    },
    # Thresholds are compared against the absolute value of the matching offset column
    'Thresholds': {
        'FOCUS_LOW': 30,  # Lower focus threshold
        'FOCUS_HIGH': 60,  # Upper focus threshold
        'STIGX_LOW': 1,  # Lower astigmatism threshold (X)
        'STIGX_HIGH': 2,  # Upper astigmatism threshold (X)
        'STIGY_LOW': 1,  # Lower astigmatism threshold (Y)
        'STIGY_HIGH': 2,  # Upper astigmatism threshold (Y)
    },
    'Paths': {  # Data and model paths
        'DATA_FILE': "combined_output.csv",  # CSV file name, looked up inside NEW_BASE_PATH
        'OLD_BASE_PATH': "D:\\DOE\\",  # Path prefix stored in the CSV's 'ImageFile' column
        # Alternative locations of the data (machine-specific):
        # 'NEW_BASE_PATH': "Y:\\User\\Aaron-HX38\\DOE\\",
        # 'NEW_BASE_PATH': "C:\\Users\\aaron.woods\\OneDrive - Thermo Fisher Scientific\\Documents\\GitHub\\Image-Classification\\",
        'NEW_BASE_PATH': "C:\\Users\\aaron.woods\\OneDrive - Thermo Fisher Scientific\\Desktop\\Dec 24\\",  # Replaces OLD_BASE_PATH in image paths
    },
    'SAMPLE_FRAC': 1.0,  # Fraction of the data to use for quicker prototyping. 1.0 means use all data.
}
# Merge the sections above into the global configuration (top-level keys only).
config.update(csv_config)
# Select the problem type for this run; swap the comment to run Multi-Label instead.
config['Experiment']['PROBLEM_TYPE'] = 'Multi-Class'
# config['Experiment']['PROBLEM_TYPE'] = 'Multi-Label'



# Main function to integrate all steps
def main_pipeline(config: Dict):
    """Run the full data pipeline: CSV -> labelled frame -> splits -> tf datasets.

    Steps: read the CSV, rewrite image paths, drop rows with unusable images,
    generate labels, shuffle, split into train/valid/test, compute class
    weights, and build the TensorFlow datasets.

    Args:
        config: Configuration passed to ``read_csv``. The other steps read the
            module-level ``config``.

    Returns:
        ``(datasets, info)`` - the TensorFlow datasets and the class-weight table.
    """
    print("===== Preprocessing CSV Data =====")
    data = read_csv(config)
    data = update_image_paths(data)
    data = clean_csv(data)
    data, label_encoders, mlb_classes = generate_labels(data)
    data = shuffle_and_reset_index(data)
    print("===== Preparing TensorFlow Datasets =====")
    datasets = prepare_datasets(data)
    # Forward the binarizer classes so the class-weight step can use the same
    # multi-label columns that generate_labels produced.
    info = compute_and_store_class_weights(datasets, label_encoders, mlb_classes)
    datasets = create_tf_datasets(datasets)
    print("===== Preprocessing Complete =====")
    return datasets, info

# Run the pipeline when this cell executes; `datasets` and `info` are used by the cells below.
datasets, info = main_pipeline(config)


In [None]:
# Investigate the class weights or datasets

import pprint 
# Pretty-print the nested dataset dictionary; uncomment the last line to show the class-weight table instead.
pp = pprint.PrettyPrinter(indent=4)
pp.pprint(datasets)
# pp.pprint(info)

## Visualize the data

In [None]:
### Class Distribution
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

def plot_single_split(ax, df, split, problem_type):
    """Draw the class counts of one data split as an annotated bar chart.

    Args:
        ax: Matplotlib axes to draw on.
        df: Class-weight table indexed by ('split', 'label', 'class') with
            'Count' and 'Weight' columns.
        split: Split to plot ('train', 'valid' or 'test').
        problem_type: 'Multi-Class' and 'Binary' draw one bar group per label;
            'Multi-Label' draws one bar per index entry. Any other value draws
            nothing.

    Every bar is annotated with its count (C) and class weight (W).
    """
    filtered_df = df.loc[split]

    def annotate_bars(bars, rows):
        """Write count and weight above each bar."""
        for bar, (_, row) in zip(bars, rows.iterrows()):
            x = bar.get_x() + bar.get_width() / 2.0
            y = bar.get_height()
            ax.annotate(f"C: {int(row['Count'])}\nW: {row['Weight']:.2f}",
                        (x, y),
                        ha='center',
                        va='bottom',
                        fontsize=8)

    # Binary data has the same per-label layout as Multi-Class.
    if problem_type in ('Multi-Class', 'Binary'):
        labels = filtered_df.index.get_level_values('label').unique()
        for label in labels:
            sub_df = filtered_df.loc[label]
            bars = ax.bar(sub_df.index, sub_df['Count'], label=f"{label}")
            annotate_bars(bars, sub_df)
        ax.legend()

    elif problem_type == 'Multi-Label':
        x_ticks = [str(cls) for cls in filtered_df.index]
        bars = ax.bar(x_ticks, filtered_df['Count'])
        annotate_bars(bars, filtered_df)

    else:
        return

    # Style the tick labels the axis already generates for the bar categories
    # rather than overwriting them with a separately built list, which can end
    # up misaligned with the tick positions.
    ax.tick_params(axis='x', labelrotation=90, labelsize=8)
    ax.set_title(f"{split.capitalize()} Data")


def plot_dataset_info(df):
    """Plot the class distribution of the train, valid and test splits side by side.

    ``df`` is the class-weight table; the problem type is read from
    ``config['Experiment']['PROBLEM_TYPE']``.
    """
    problem_type = config['Experiment']['PROBLEM_TYPE']
    split_names = ('train', 'valid', 'test')

    # One column of axes per split.
    fig, axs = plt.subplots(1, len(split_names), figsize=(20, 8))

    for axis, split_name in zip(axs, split_names):
        plot_single_split(axis, df, split_name, problem_type)

    plt.tight_layout()
    plt.show()


# `info` is the class-weight table returned by main_pipeline: indexed by
# 'split', 'label' and 'class', with 'Count' and 'Weight' columns.
plot_dataset_info(info)


In [None]:
# See Batches of Images
import matplotlib.pyplot as plt
import tensorflow as tf

def show_batch(dataset, num_images=9):
    """Display up to ``num_images`` images from ``dataset`` with their labels.

    Works for both unbatched datasets (each element is one image) and batched
    datasets (each element is a stack of images). Images with values in [0, 1]
    are shown as they are; images with larger values are treated as 0-255 and
    scaled down first.

    Args:
        dataset: A ``tf.data.Dataset`` yielding ``(image(s), label(s))``.
        num_images: Maximum number of images to show.
    """

    def describe(label):
        """Build the subplot title for a scalar or vector label."""
        label = np.asarray(label)
        if label.ndim == 0:
            return f"Label: {label}"
        if label.size == 3:
            return f"Focus: {label[0]}, StigX: {label[1]}, StigY: {label[2]}"
        return f"Labels: {label.tolist()}"

    # Collect (image, label) pairs regardless of whether the dataset is batched.
    samples = []
    for images, labels in dataset.take(num_images):
        images = images.numpy()
        labels = labels.numpy()
        if images.ndim == 4:  # a batch of images
            samples.extend(zip(images, labels))
        else:  # a single image
            samples.append((images, labels))
        if len(samples) >= num_images:
            break
    samples = samples[:num_images]

    if not samples:
        print("No images to display.")
        return

    # squeeze=False keeps `axes` an array even for a single image.
    fig, axes = plt.subplots(1, len(samples), figsize=(15, 15), squeeze=False,
                             subplot_kw={'xticks':[], 'yticks':[]},
                             gridspec_kw=dict(hspace=0.1, wspace=0.1))

    for ax, (image, label) in zip(axes.flat, samples):
        image = np.asarray(image, dtype="float32")
        # Casting [0, 1] floats straight to uint8 would turn the image black.
        if image.max() > 1.0:
            image = image / 255.0
        ax.imshow(np.clip(image, 0.0, 1.0))
        ax.set_title(describe(label))

    plt.show()

# Pick the training dataset of one label. This indexing matches the
# {label: {split: dataset}} layout built for Multi-Class/Binary problems; for
# Multi-Label the datasets are keyed by split only.
train_dataset = datasets['Focus_Label']['train']

# Show a batch of 9 training images
show_batch(train_dataset, num_images=9)
