In [2]:
# type: ignore
import os
from typing import List
import time
import datetime
import json
import tiktoken
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import psutil
from tensorflow import keras
from tensorflow.keras import Model, callbacks, backend, layers, Sequential
from sklearn.metrics import confusion_matrix, classification_report, ConfusionMatrixDisplay

In [3]:
# Shared tiktoken tokenizer using the "cl100k_base" encoding. process_text()
# encodes report text with it and create_model() reads tokenizer.n_vocab to
# size the text embedding.
tokenizer = tiktoken.get_encoding("cl100k_base")

In [4]:
# Global hyperparameters and dataset constants shared by the cells below.
IMAGE_SIZE = 224  # images are loaded/resized to IMAGE_SIZE x IMAGE_SIZE
BATCH_SIZE = 32  # batch size passed to image_dataset_from_directory
NUM_EPOCHS = 50  # default number of epochs for train_model()
LEARNING_RATE = 0.001  # initial learning rate of the decay schedule in train_model()
TEXT_MAX_LENGTH = 512  # token sequences are truncated / zero-padded to this length
NUM_CLASSES = 4  # default size of the softmax output in create_model()
# Class folder names; passed as `class_names` when loading data and indexed by
# the predicted class id when reporting predictions.
CLASS_NAMES = ['glioma_tumor', 'meningioma_tumor', 'pituitary_tumor', 'no_tumor']

In [None]:
# Project directories (relative paths).
DATA_DIR = '../Data'  # dataset root; create_datasets() reads its 'train' and 'test' subfolders
MODELS_DIR = '../Models'  # best checkpoints and final models are saved here
LOG_DIR = '../Logs'  # TensorBoard logs go into timestamped subfolders of this directory
BENCHMARK_DIR = '../Metrics/Benchmarks'  # metrics JSON files and plots are saved here

In [6]:
# Distribution strategy returned by tf.distribute.get_strategy(); models are
# created and compiled inside strategy.scope() further down.
strategy = tf.distribute.get_strategy()
# Seed the TensorFlow and NumPy random number generators.
tf.random.set_seed(42)
np.random.seed(42)

In [7]:
class ModelBenchmark:
    """Collects timing, size and quality metrics for one trained model.

    All results live in the ``metrics`` dict, which ``save_metrics`` writes to
    JSON and ``plot_comparison`` uses to compare two runs. The measuring
    methods are best-effort: errors are printed instead of raised.
    """

    def __init__(self, model_name: str):
        """Create an empty benchmark record labelled ``model_name``."""
        self.model_name = model_name
        self.metrics = {
            'training_time': 0,
            'inference_time': 0,
            'memory_usage': 0,
            'model_size': 0,
            'accuracy': 0,
            'precision': 0,
            'recall': 0,
            'f1_score': 0,
            'auc': 0,
            'confusion_matrix': None,
            'class_wise_metrics': {},
            'training_history': None
        }
        self.start_time = None
        self.end_time = None

    def start_training_timer(self):
        """Record the wall-clock time at which training starts."""
        self.start_time = time.time()

    def end_training_timer(self):
        """Store the elapsed training time in seconds (0 if never started)."""
        if self.start_time is not None:
            self.end_time = time.time()
            self.metrics['training_time'] = self.end_time - self.start_time
        else:
            print("Warning: Training timer was not started")
            self.metrics['training_time'] = 0

    def measure_inference_time(self, model: "Model", dataset: "tf.data.Dataset"):
        """Store the mean ``model.predict`` time per batch over up to 100 batches."""
        try:
            inference_times = []
            for x, _ in dataset.take(100):
                # perf_counter is monotonic, so short intervals are not
                # distorted by system clock adjustments.
                start = time.perf_counter()
                model.predict(x, verbose=0)
                inference_times.append(time.perf_counter() - start)
            if inference_times:
                # Plain float keeps the value JSON-friendly.
                self.metrics['inference_time'] = float(np.mean(inference_times))
            else:
                # np.mean([]) would yield NaN plus a RuntimeWarning.
                print("Warning: Dataset yielded no batches for inference timing")
                self.metrics['inference_time'] = 0
        except Exception as e:
            print(f"Error measuring inference time: {e}")
            self.metrics['inference_time'] = 0

    def measure_memory_usage(self):
        """Store the resident set size of the current process in MiB."""
        try:
            process = psutil.Process()
            self.metrics['memory_usage'] = process.memory_info().rss / 1024 / 1024
        except Exception as e:
            print(f"Error measuring memory usage: {e}")
            self.metrics['memory_usage'] = 0

    def calculate_model_size(self, model_path: str):
        """Store the size of the file at ``model_path`` in MiB (0 if missing)."""
        try:
            if os.path.exists(model_path):
                size_bytes = os.path.getsize(model_path)
                self.metrics['model_size'] = size_bytes / (1024 * 1024)
            else:
                print(f"Warning: Model file not found at {model_path}")
                self.metrics['model_size'] = 0
        except Exception as e:
            print(f"Error calculating model size: {e}")
            self.metrics['model_size'] = 0

    def update_metrics(self, history: "callbacks.History",
                      test_results: List[float],
                      cm: np.ndarray,
                      class_report: str):
        """Fill in quality metrics from training and evaluation outputs.

        Args:
            history: Object whose ``history`` attribute holds per-epoch metrics.
            test_results: Evaluation results ordered as
                ``[loss, accuracy, precision, recall, auc]``.
            cm: Confusion matrix, or None.
            class_report: Text report in scikit-learn's
                ``classification_report`` layout, or an empty value.
        """
        try:
            self.metrics['training_history'] = history.history
            if len(test_results) >= 5:
                precision = float(test_results[2])
                recall = float(test_results[3])
                self.metrics['accuracy'] = float(test_results[1])
                self.metrics['precision'] = precision
                self.metrics['recall'] = recall
                self.metrics['auc'] = float(test_results[4])
                # F1 is the harmonic mean of the reported precision and recall;
                # previously this entry was never filled in.
                denominator = precision + recall
                self.metrics['f1_score'] = (
                    2 * precision * recall / denominator if denominator > 0 else 0
                )
            else:
                print("Warning: Insufficient test results")

            self.metrics['confusion_matrix'] = cm.tolist() if cm is not None else None

            if class_report:
                report_lines = class_report.split('\n')
                # Skip the header + blank line at the top and the macro avg /
                # weighted avg / trailing empty line at the bottom.
                for line in report_lines[2:-3]:
                    parts = line.split()
                    # The accuracy row has fewer than five fields and is skipped.
                    if len(parts) < 5:
                        continue
                    # Read the four numeric columns from the right so class
                    # names that contain spaces are kept intact.
                    precision_s, recall_s, f1_s, support_s = parts[-4:]
                    class_name = ' '.join(parts[:-4])
                    self.metrics['class_wise_metrics'][class_name] = {
                        'precision': float(precision_s),
                        'recall': float(recall_s),
                        'f1_score': float(f1_s),
                        'support': int(support_s)
                    }
        except Exception as e:
            print(f"Error updating metrics: {e}")

    def save_metrics(self, output_dir: str):
        """Write ``metrics`` to ``<output_dir>/<model_name>_metrics.json``."""
        try:
            os.makedirs(output_dir, exist_ok=True)
            output_path = os.path.join(output_dir, f'{self.model_name}_metrics.json')
            with open(output_path, 'w') as f:
                json.dump(self.metrics, f, indent=4)
        except Exception as e:
            print(f"Error saving metrics: {e}")

    def plot_comparison(self, other_benchmark: 'ModelBenchmark', output_dir: str):
        """Plot both models' training curves side by side.

        Saves ``training_metrics_comparison.png`` in ``output_dir`` with one
        subplot each for accuracy, precision, recall and AUC.
        """
        try:
            os.makedirs(output_dir, exist_ok=True)

            if not self.metrics['training_history'] or not other_benchmark.metrics['training_history']:
                print("Warning: Missing training history for comparison")
                return

            fig = plt.figure(figsize=(15, 10))
            try:
                metrics = ['accuracy', 'precision', 'recall', 'auc']
                for i, metric in enumerate(metrics, 1):
                    plt.subplot(2, 2, i)
                    for bench in (self, other_benchmark):
                        curves = bench.metrics['training_history']
                        if metric not in curves:
                            continue
                        plt.plot(curves[metric],
                                label=f'{bench.model_name} (Training)')
                        # A run without validation data has no val_* series;
                        # skip it instead of aborting the whole figure.
                        val_key = f'val_{metric}'
                        if val_key in curves:
                            plt.plot(curves[val_key],
                                    label=f'{bench.model_name} (Validation)')
                    plt.title(f'{metric.capitalize()} Comparison')
                    plt.xlabel('Epoch')
                    plt.ylabel(metric.capitalize())
                    plt.legend()
                    plt.grid(True)
                plt.tight_layout()
                plt.savefig(os.path.join(output_dir, 'training_metrics_comparison.png'))
            finally:
                # Always release the figure, even when plotting fails.
                plt.close(fig)
        except Exception as e:
            print(f"Error plotting comparison: {e}")

In [8]:
def process_text(text_path):
    """Tokenize a text file into a fixed-length int32 vector.

    The file content is encoded with the module-level ``tokenizer``, cut to
    ``TEXT_MAX_LENGTH`` tokens and zero-padded on the right. A missing file,
    an empty file or any error yields an all-zero vector (with a printed
    message) instead of raising.
    """
    def _blank():
        # Fallback vector shared by every failure path.
        return np.zeros(TEXT_MAX_LENGTH, dtype=np.int32)

    try:
        if not os.path.exists(text_path):
            print(f"Warning: Text file not found at {text_path}")
            return _blank()

        with open(text_path, 'r', encoding='utf-8') as handle:
            content = handle.read().strip()

        if not content:
            print(f"Warning: Empty text file at {text_path}")
            return _blank()

        token_ids = tokenizer.encode(content)[:TEXT_MAX_LENGTH]
        pad_width = max(0, TEXT_MAX_LENGTH - len(token_ids))
        padded = np.pad(token_ids, (0, pad_width), 'constant')
        return padded.astype(np.int32)
    except Exception as e:
        print(f"Error processing text file {text_path}: {e}")
        return _blank()

# Per-channel standardisation applied to single images in predict_with_model():
# the Normalization layer is given the channel means and the squared channel
# standard deviations (it expects variances).
preprocessing_model = keras.Sequential(
    [
        keras.layers.Normalization(
            mean=[0.485, 0.456, 0.406],
            variance=[std ** 2 for std in (0.229, 0.224, 0.225)],
        ),
    ],
    name="preprocessing",
)

In [9]:
def clear_memory():
    """Run the garbage collector and reset the Keras backend session.

    Never raises: a failure is reported with a printed message instead.
    """
    import gc

    try:
        gc.collect()
        backend.clear_session()
        print("Memory cleared successfully")
    except Exception as err:
        print(f"Error clearing memory: {err}")

def create_datasets(data_dir, validation_split=0.2, use_text=True):
    """Build the train / validation / test ``tf.data`` pipelines.

    Images are read from ``<data_dir>/train`` and ``<data_dir>/test``, scaled
    to [0, 1] and standardised per channel. When ``validation_split`` is
    positive, that fraction of the training files is held out for validation.

    Args:
        data_dir: Dataset root containing ``train`` and ``test`` folders.
        validation_split: Fraction of the training files used for validation;
            ``0`` or ``None`` disables the validation set.
        use_text: When True every batch becomes ``((images, text), labels)``.

    Returns:
        Dict with keys ``'train'``, ``'validation'`` (may be None) and ``'test'``.

    Raises:
        ValueError: if ``data_dir`` does not exist.
        Exception: any other failure is printed and re-raised.
    """
    try:
        if not os.path.exists(data_dir):
            raise ValueError(f"Data directory not found: {data_dir}")

        clear_memory()

        common_args = {
            'labels': 'inferred',
            'label_mode': 'categorical',
            'class_names': CLASS_NAMES,
            'batch_size': BATCH_SIZE,
            'image_size': (IMAGE_SIZE, IMAGE_SIZE),
            'seed': 42
        }
        train_dir = os.path.join(data_dir, 'train')

        val_ds = None
        if validation_split and validation_split > 0:
            # Split at file level inside Keras. Carving validation batches out
            # of the shuffled training dataset with take()/skip() is unsafe:
            # the dataset reshuffles on every iteration, so the two subsets
            # can end up sharing images. Both calls must use the same
            # shuffle/seed settings so the subsets are complementary.
            split_args = dict(common_args, shuffle=True, validation_split=validation_split)
            train_ds = tf.keras.utils.image_dataset_from_directory(
                train_dir, subset='training', **split_args
            )
            val_ds = tf.keras.utils.image_dataset_from_directory(
                train_dir, subset='validation', **split_args
            )
        else:
            train_ds = tf.keras.utils.image_dataset_from_directory(
                train_dir, shuffle=True, **common_args
            )

        test_ds = tf.keras.utils.image_dataset_from_directory(
            os.path.join(data_dir, 'test'),
            shuffle=False,
            **common_args
        )

        def preprocess_images(images, labels):
            # Scale to [0, 1], then standardise each channel.
            images = tf.cast(images, tf.float32) / 255.0
            mean = tf.constant([0.485, 0.456, 0.406])
            std = tf.constant([0.229, 0.224, 0.225])
            images = (images - mean) / std
            return images, labels

        def add_text_features_to_batch(images, labels):
            # NOTE(review): no real text is loaded here. Each sample gets an
            # all-zero token vector so the two-input model can be fed; the map
            # function only receives images and labels, not file paths.
            # TODO: feed process_text() output once paths are available.
            placeholder = tf.zeros([tf.shape(images)[0], TEXT_MAX_LENGTH], dtype=tf.int32)
            return (images, placeholder), labels

        def finalize(ds):
            ds = ds.map(preprocess_images, num_parallel_calls=tf.data.AUTOTUNE)
            if use_text:
                ds = ds.map(add_text_features_to_batch, num_parallel_calls=tf.data.AUTOTUNE)
            return ds.cache().prefetch(tf.data.AUTOTUNE)

        train_ds = finalize(train_ds)
        test_ds = finalize(test_ds)
        if val_ds is not None:
            val_ds = finalize(val_ds)

        return {'train': train_ds, 'validation': val_ds, 'test': test_ds}
    except Exception as e:
        print(f"Error creating datasets: {e}")
        raise

In [10]:
class CrossAttentionLayer(layers.Layer):
    """Bidirectional cross-attention between two feature sequences.

    ``x`` first attends to ``y``; then ``y`` attends to the updated ``x``.
    Each direction is followed by dropout, a residual connection and layer
    normalisation.

    Args:
        num_heads: Number of attention heads for both directions.
        key_dim: Per-head key/query size for both directions.
        **kwargs: Forwarded to ``layers.Layer`` (e.g. ``name``).
    """

    def __init__(self, num_heads, key_dim, **kwargs):
        super().__init__(**kwargs)
        self.num_heads = num_heads
        self.key_dim = key_dim
        # One attention layer per direction. A single shared layer builds its
        # projection weights for the first call's query/key widths and then
        # fails on the reverse call whenever x and y have different widths.
        self.mha = layers.MultiHeadAttention(num_heads=num_heads, key_dim=key_dim)
        self.mha_reverse = layers.MultiHeadAttention(num_heads=num_heads, key_dim=key_dim)
        self.layer_norm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layer_norm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout = layers.Dropout(0.1)

    def call(self, x, y, training=False):
        """Return the pair ``(x, y)`` after attending to each other."""
        # x queries y.
        attn_output = self.mha(x, y, y, training=training)
        x = self.layer_norm1(x + self.dropout(attn_output, training=training))

        # y queries the already-updated x.
        attn_output = self.mha_reverse(y, x, x, training=training)
        y = self.layer_norm2(y + self.dropout(attn_output, training=training))

        return x, y

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config.update({'num_heads': self.num_heads, 'key_dim': self.key_dim})
        return config

class BrainTumorModel(Model):
    """Subclassed CNN classifier with an optional text branch.

    Images pass through a small convolutional stack. When ``use_text`` is
    True, token ids go through embedding -> LSTM -> Conv1D -> global max
    pooling, both feature vectors are exchanged through cross-attention, and
    the concatenation is classified. Otherwise only image features are used.

    Args:
        num_classes: Size of the softmax output.
        use_text: Whether the text branch is used in ``call``.
        vocab_size: Embedding vocabulary size; defaults to the module-level
            ``tokenizer.n_vocab`` so every id the tokenizer can emit is in range.
        **kwargs: Forwarded to ``keras.Model``.
    """

    def __init__(self, num_classes, use_text=True, vocab_size=None, **kwargs):
        super().__init__(**kwargs)
        self.use_text = use_text

        self.image_conv = Sequential([
            layers.Conv2D(32, 3, activation='relu'),
            layers.MaxPooling2D(),
            layers.Conv2D(64, 3, activation='relu'),
            layers.MaxPooling2D(),
            layers.Conv2D(64, 3, activation='relu'),
            layers.MaxPooling2D(),
            layers.Conv2D(64, 3, activation='relu'),
            layers.MaxPooling2D(),
            layers.Flatten(),
            layers.Dense(512, activation='relu'),
            layers.Dropout(0.5)
        ])

        # A fixed vocabulary of 10000 is smaller than the tokenizer's id
        # range, so size the embedding from the tokenizer (as create_model does).
        if vocab_size is None:
            vocab_size = tokenizer.n_vocab
        self.text_embedding = layers.Embedding(vocab_size, 128)
        self.text_lstm = layers.LSTM(256, return_sequences=True)
        self.text_conv = layers.Conv1D(256, 3, activation='relu')
        self.text_pool = layers.GlobalMaxPooling1D()

        self.cross_attention = CrossAttentionLayer(num_heads=8, key_dim=64)

        self.fusion = layers.Dense(512, activation='relu')
        self.dropout = layers.Dropout(0.5)
        self.classifier = layers.Dense(num_classes, activation='softmax')

    def call(self, inputs, training=False):
        """Classify a batch.

        ``inputs`` is ``(images, texts)``. With ``use_text=False`` a bare image
        tensor (or a one-element sequence) is accepted as well.
        """
        if isinstance(inputs, (list, tuple)):
            images = inputs[0]
            texts = inputs[1] if len(inputs) > 1 else None
        else:
            images, texts = inputs, None

        image_features = self.image_conv(images, training=training)

        if self.use_text:
            if texts is None:
                raise ValueError("use_text=True requires inputs of the form (images, texts)")

            text_embeddings = self.text_embedding(texts)
            text_features = self.text_lstm(text_embeddings, training=training)
            text_features = self.text_conv(text_features, training=training)
            text_features = self.text_pool(text_features)

            # Attention layers work on sequences: add a length-1 sequence axis.
            image_features = tf.expand_dims(image_features, axis=1)
            text_features = tf.expand_dims(text_features, axis=1)

            image_features, text_features = self.cross_attention(
                image_features, text_features, training=training
            )

            combined = tf.concat([image_features, text_features], axis=-1)
            combined = tf.squeeze(combined, axis=1)
        else:
            combined = image_features

        x = self.fusion(combined)
        x = self.dropout(x, training=training)
        return self.classifier(x)

In [None]:
def create_model(use_text=True, num_classes=NUM_CLASSES):
    """Build the (uncompiled) transfer-learning classifier.

    The image branch is an ImageNet-pretrained MobileNetV3Small (MobileNetV2
    as fallback) with all but its last four layers frozen, followed by global
    average pooling, dropout and a dense layer. With ``use_text=True`` a
    second input of ``TEXT_MAX_LENGTH`` token ids goes through an embedding and
    two separable 1-D convolutions and is concatenated with the image features.

    Args:
        use_text: Build the two-input (image + text) model when True.
        num_classes: Size of the softmax output.

    Returns:
        A ``keras.Model`` with inputs ``image_input`` and, when ``use_text``,
        ``text_input``.

    Raises:
        Exception: any failure is printed and re-raised.
    """
    try:
        with strategy.scope():
            activation = 'relu'
            norm_layer = keras.layers.BatchNormalization

            def l2_reg():
                return keras.regularizers.l2(1e-4)

            try:
                base_model = keras.applications.MobileNetV3Small(
                    include_top=False,
                    weights='imagenet',
                    input_shape=(IMAGE_SIZE, IMAGE_SIZE, 3),
                    minimalistic=True,
                    # The data pipeline already scales and standardises the
                    # images. MobileNetV3's built-in rescaling expects raw
                    # [0, 255] pixels and would squash those inputs to a
                    # near-constant value, so it is switched off.
                    include_preprocessing=False
                )
                print("Using MobileNetV3Small as base model")
            except Exception as e:
                print(f"Error loading MobileNetV3Small: {e}")
                print("Falling back to MobileNetV2")
                base_model = keras.applications.MobileNetV2(
                    include_top=False,
                    weights='imagenet',
                    input_shape=(IMAGE_SIZE, IMAGE_SIZE, 3),
                    alpha=0.75  # Use a smaller version
                )

            # Fine-tune only the last four layers of the backbone.
            for layer in base_model.layers[:-4]:
                layer.trainable = False

            image_input = keras.Input(shape=(IMAGE_SIZE, IMAGE_SIZE, 3), name='image_input')
            # training=False keeps the backbone in inference mode.
            x = base_model(image_input, training=False)
            x = keras.layers.GlobalAveragePooling2D()(x)
            x = keras.layers.Dropout(0.4)(x)

            image_features = keras.layers.Dense(256, activation=activation,
                                             kernel_regularizer=l2_reg())(x)

            if use_text:
                # Token ids are integers; declare the input accordingly.
                text_input = keras.Input(shape=(TEXT_MAX_LENGTH,), dtype='int32',
                                         name='text_input')

                z = keras.layers.Embedding(tokenizer.n_vocab, 128,
                                        embeddings_regularizer=l2_reg())(text_input)

                z = keras.layers.SeparableConv1D(256, 5, padding='same', activation=activation)(z)
                z = norm_layer()(z)
                z = keras.layers.MaxPooling1D(2)(z)
                z = keras.layers.Dropout(0.3)(z)

                z = keras.layers.SeparableConv1D(128, 3, padding='same', activation=activation)(z)
                z = norm_layer()(z)
                z = keras.layers.GlobalAveragePooling1D()(z)
                text_features = keras.layers.Dropout(0.3)(z)

                combined = keras.layers.Concatenate()([image_features, text_features])
                y = keras.layers.Dense(256, activation=activation,
                                    kernel_regularizer=l2_reg())(combined)
                model_inputs = [image_input, text_input]
            else:
                y = keras.layers.Dense(128, activation=activation,
                                     kernel_regularizer=l2_reg())(image_features)
                model_inputs = image_input

            # Shared classification head.
            y = norm_layer()(y)
            y = keras.layers.Dropout(0.4)(y)
            outputs = keras.layers.Dense(num_classes, activation='softmax',
                                      kernel_regularizer=l2_reg())(y)

            return keras.Model(inputs=model_inputs, outputs=outputs)
    except Exception as e:
        print(f"Error creating model: {e}")
        raise

In [12]:
class ProgressCallback(callbacks.Callback):
    """Keras callback that prints per-epoch timing, an ETA and the epoch logs."""

    def __init__(self):
        super().__init__()
        self.epoch_times = []
        self.start_time = None

    def on_epoch_begin(self, epoch, logs=None):
        """Start the epoch timer and announce the epoch number."""
        self.start_time = time.time()
        total_epochs = self.params['epochs']
        print(f"\nEpoch {epoch + 1}/{total_epochs}")

    def on_epoch_end(self, epoch, logs=None):
        """Report the epoch duration, estimated remaining time and metrics."""
        if self.start_time is None:
            print("Warning: start_time not initialized")
            return

        elapsed = time.time() - self.start_time
        self.epoch_times.append(elapsed)

        # ETA = epochs still to run x mean duration of the epochs seen so far.
        epochs_left = self.params['epochs'] - (epoch + 1)
        eta_seconds = epochs_left * np.mean(self.epoch_times)

        print(f"Epoch {epoch + 1} completed in {elapsed:.2f}s")
        print(f"Estimated time remaining: {eta_seconds/60:.1f} minutes")

        if not logs:
            return

        print("Metrics:")
        for name in logs:
            print(f"  {name}: {logs[name]:.4f}")

In [13]:
def compute_confusion_matrix(model, dataset):
    """Compute, print and plot the confusion matrix for ``dataset``.

    Predictions are gathered batch by batch. The classification report is
    printed and the matrix plot is saved as ``confusion_matrix.png`` in
    ``BENCHMARK_DIR``.

    Args:
        model: Model exposing ``predict``.
        dataset: Iterable of ``(inputs, one_hot_labels)`` batches.

    Returns:
        ``(cm, report)``, or ``(None, None)`` if predictions or metrics could
        not be computed. A failure while plotting is printed but does not
        discard the computed results.
    """
    try:
        y_pred_all, y_true_all = [], []

        for x, y in dataset:
            y_pred = model.predict(x, verbose=0)
            y_pred_all.extend(tf.argmax(y_pred, axis=1).numpy())
            y_true_all.extend(tf.argmax(y, axis=1).numpy())

        # Pin the label set so the matrix and the report always cover every
        # class in CLASS_NAMES, even one missing from this dataset.
        label_ids = list(range(len(CLASS_NAMES)))
        cm = confusion_matrix(y_true_all, y_pred_all, labels=label_ids)

        report = classification_report(y_true_all, y_pred_all,
                                    labels=label_ids,
                                    target_names=CLASS_NAMES, digits=4)
        print("\nClassification Report:\n")
        print(report)
    except Exception as e:
        print(f"Error computing confusion matrix: {e}")
        return None, None

    try:
        # The benchmark directory may not exist yet on a first run.
        os.makedirs(BENCHMARK_DIR, exist_ok=True)

        # Draw on an explicit axes: disp.plot() would otherwise open a second
        # figure and leave the first one unused and unclosed.
        fig, ax = plt.subplots(figsize=(10, 8))
        try:
            disp = ConfusionMatrixDisplay(
                confusion_matrix=cm,
                display_labels=CLASS_NAMES
            )
            disp.plot(cmap=plt.cm.Blues, ax=ax)
            ax.set_title('Confusion Matrix')
            fig.tight_layout()
            fig.savefig(os.path.join(BENCHMARK_DIR, 'confusion_matrix.png'))
        finally:
            plt.close(fig)
    except Exception as e:
        print(f"Error plotting confusion matrix: {e}")

    return cm, report

def plot_metrics(history):
    """Save one line plot per metric group found in ``history.history``.

    Groups are accuracy-type metrics, loss and AUC. A group with no recorded
    series is skipped. Each figure is written to ``BENCHMARK_DIR`` as
    ``<group name in snake case>_plot.png``. Errors are printed, not raised.
    """
    def _legend_label(series_name):
        if 'val_' in series_name:
            return series_name.replace('val_', 'Validation ')
        return f'Training {series_name}'

    groups = (
        ('Accuracy Metrics', ['accuracy', 'val_accuracy', 'precision',
                              'val_precision', 'recall', 'val_recall']),
        ('Loss', ['loss', 'val_loss']),
        ('AUC', ['auc', 'val_auc']),
    )

    try:
        for title, candidates in groups:
            present = [name for name in candidates if name in history.history]
            if present:
                plt.figure(figsize=(12, 6))

                for series_name in present:
                    plt.plot(history.history[series_name], label=_legend_label(series_name))

                plt.title(f'{title}')
                plt.xlabel('Epoch')
                plt.ylabel(title)
                plt.legend()
                plt.grid(True, linestyle='--', alpha=0.5)
                plt.tight_layout()

                file_name = f'{title.lower().replace(" ", "_")}_plot.png'
                plt.savefig(os.path.join(BENCHMARK_DIR, file_name))
                plt.close()
    except Exception as e:
        print(f"Error plotting metrics: {e}")

In [None]:
def train_model(data_dir, use_text=True, resume_from=None, epochs=NUM_EPOCHS):
    """Train, save, evaluate and benchmark one classifier.

    Args:
        data_dir: Dataset root passed to ``create_datasets``.
        use_text: Train the two-input (image + text) model when True, the
            image-only model otherwise.
        resume_from: Optional path of a saved model to continue from. If the
            path contains ``'epoch'``, the number after the last ``_`` in the
            path is used as the starting epoch.
        epochs: Final epoch number passed to ``model.fit``.

    Returns:
        ``(model, history, benchmark)``.

    Raises:
        Exception: any failure is printed and re-raised.
    """
    print(f"Starting Brain Tumor Classification Training (use_text={use_text})")

    try:
        clear_memory()

        model_name = "model_with_text" if use_text else "model_without_text"
        benchmark = ModelBenchmark(model_name)

        # Create output folders up front so plots, checkpoints and logs
        # written later cannot fail on a missing directory.
        for directory in (MODELS_DIR, LOG_DIR, BENCHMARK_DIR):
            os.makedirs(directory, exist_ok=True)

        print("\nLoading datasets...")
        datasets = create_datasets(data_dir, validation_split=0.2, use_text=use_text)
        train_ds = datasets['train']
        val_ds = datasets['validation']
        test_ds = datasets['test']

        # len() of a batched dataset counts batches, not individual samples.
        print(f"Training batches: {len(train_ds)}")
        if val_ds is not None:
            print(f"Validation batches: {len(val_ds)}")
        print(f"Test batches: {len(test_ds)}")

        # Safe here: no model exists yet.
        clear_memory()

        with strategy.scope():
            model, initial_epoch = None, 0
            if resume_from and os.path.exists(resume_from):
                try:
                    print(f"\nLoading model from checkpoint: {resume_from}")
                    model = keras.models.load_model(resume_from)
                except Exception as e:
                    print(f"Error loading model: {e}")
                else:
                    if 'epoch' in resume_from:
                        try:
                            initial_epoch = int(resume_from.split('_')[-1].split('.')[0])
                        except ValueError:
                            print(f"Warning: Could not parse epoch from {resume_from}; starting at epoch 0")
                    print(f"Resumed from epoch {initial_epoch}")

            if model is None:
                print("\nCreating new model...")
                model = create_model(use_text=use_text)
                initial_epoch = 0

            # Halve the learning rate roughly every third of the run. Clamp to
            # at least one step: epochs < 3 would otherwise give 0 decay steps.
            decay_steps = max(1, (epochs // 3) * len(train_ds))
            lr_schedule = keras.optimizers.schedules.ExponentialDecay(
                initial_learning_rate=LEARNING_RATE,
                decay_steps=decay_steps,
                decay_rate=0.5,
                staircase=True
            )

            optimizer = (
                keras.optimizers.AdamW(learning_rate=lr_schedule, weight_decay=1e-4)
                if hasattr(keras.optimizers, 'AdamW') else
                keras.optimizers.Adam(learning_rate=lr_schedule)
            )

            # Explicit metric names keep the history keys and the order of
            # evaluate() results stable: [loss, accuracy, precision, recall, auc].
            model.compile(
                optimizer=optimizer,
                loss='categorical_crossentropy',
                metrics=[
                    'accuracy',
                    keras.metrics.Precision(name='precision'),
                    keras.metrics.Recall(name='recall'),
                    keras.metrics.AUC(name='auc')
                ]
            )

        # clear_memory() is deliberately not called from here on: it resets
        # the Keras session while this model is still needed.

        model.summary()

        # Named training_callbacks to avoid shadowing the imported
        # `callbacks` module.
        training_callbacks = [
            keras.callbacks.ModelCheckpoint(
                filepath=os.path.join(MODELS_DIR, f'best_{model_name}.keras'),
                save_best_only=True,
                monitor='val_accuracy',
                mode='max'
            ),
            keras.callbacks.TensorBoard(
                log_dir=os.path.join(LOG_DIR, datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))
            ),
            keras.callbacks.EarlyStopping(
                monitor='val_loss',
                patience=10,
                restore_best_weights=True
            ),
            ProgressCallback()
        ]

        print("\nStarting training...")
        benchmark.start_training_timer()

        history = model.fit(
            train_ds,
            epochs=epochs,
            validation_data=val_ds,
            callbacks=training_callbacks,
            initial_epoch=initial_epoch
        )

        benchmark.end_training_timer()

        print("\nSaving final model...")
        final_model_path = os.path.join(MODELS_DIR, f'final_{model_name}.keras')
        model.save(final_model_path)
        print(f"Final model saved to {final_model_path}")

        print("\nMeasuring model performance...")
        benchmark.measure_inference_time(model, test_ds)
        benchmark.measure_memory_usage()
        benchmark.calculate_model_size(final_model_path)

        print("\nEvaluating on test set...")
        test_results = model.evaluate(test_ds, verbose=1)

        cm, report = compute_confusion_matrix(model, test_ds)

        benchmark.update_metrics(history, test_results, cm, report)
        benchmark.save_metrics(BENCHMARK_DIR)

        print("\nGenerating plots...")
        plot_metrics(history)

        return model, history, benchmark
    except Exception as e:
        print(f"Error in training process: {e}")
        raise

In [15]:
def main():
    """Train both model variants and print a side-by-side metric summary.

    Trains the text-enabled model, then the image-only model, saves the
    comparison plot to ``BENCHMARK_DIR`` and prints a table of their metrics.

    Returns:
        ``(model_with_text, model_without_text, benchmark_with_text,
        benchmark_without_text)``.
    """
    print("\nTraining model with text features...")
    text_model, _text_history, text_bench = train_model(
        DATA_DIR, use_text=True, epochs=NUM_EPOCHS
    )
    print("\nTraining model without text features...")
    image_model, _image_history, image_bench = train_model(
        DATA_DIR, use_text=False, epochs=NUM_EPOCHS
    )

    text_bench.plot_comparison(image_bench, BENCHMARK_DIR)

    rule = "-" * 80
    print("\nModel Comparison Summary:")
    print(rule)
    print(f"{'Metric':<30} {'With Text':<25} {'Without Text':<25}")
    print(rule)

    # Display label -> key in ModelBenchmark.metrics, in print order.
    summary_rows = {
        'Accuracy': 'accuracy',
        'Precision': 'precision',
        'Recall': 'recall',
        'AUC': 'auc',
        'Training Time (s)': 'training_time',
        'Inference Time (s)': 'inference_time',
        'Model Size (MB)': 'model_size',
        'Memory Usage (MB)': 'memory_usage',
    }

    for label, key in summary_rows.items():
        text_value = text_bench.metrics[key]
        image_value = image_bench.metrics[key]
        print(f"{label:<30} {text_value:<25.4f} {image_value:<25.4f}")

    print(rule)
    print(f"\nComparison plots saved to: {BENCHMARK_DIR}")

    return text_model, image_model, text_bench, image_bench

In [16]:
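# Training entry point (currently disabled). Uncomment the two lines below to
# train both models via main():
# if __name__ == "__main__":
#     main()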

---

In [17]:
def predict_with_model(model_path, image_paths, text_paths=None):
    """Load a saved model and classify a list of images.

    Args:
        model_path: Path of the saved Keras model.
        image_paths: Image files to classify; missing files are skipped with
            a warning.
        text_paths: Optional text files aligned with ``image_paths``. For a
            two-input model, an image without a usable text file gets an
            all-zero token vector.

    Returns:
        List of dicts. Successful entries hold ``image_path``,
        ``predicted_class``, ``confidence`` (percent), ``class_probabilities``
        and, when a text path was given for that image, ``text_path``. Failed
        entries hold ``image_path`` and ``error``.

    Raises:
        FileNotFoundError: if ``model_path`` does not exist.
        Exception: any other failure outside the per-image loop is printed
            and re-raised.
    """
    try:
        use_text = text_paths is not None

        print(f"Loading model from {model_path}...")
        if not os.path.exists(model_path):
            raise FileNotFoundError(f"Model file not found: {model_path}")

        model = keras.models.load_model(model_path)
        print("Model loaded successfully")

        # Let the model decide whether a text tensor is required: a two-input
        # model cannot be called with the image alone, and a single-input
        # model cannot take the extra tensor. Fall back to the caller's
        # choice if the model does not expose its inputs.
        model_inputs = getattr(model, 'inputs', None) or []
        needs_text = len(model_inputs) > 1 if model_inputs else use_text
        if use_text and not needs_text:
            print("Warning: Model takes a single input; text files are ignored")

        results = []

        print("\nPrediction Results:")
        print("-" * 80)
        print(f"{'Image':<30} {'Prediction':<20} {'Confidence':<20}")
        print("-" * 80)

        for i, img_path in enumerate(image_paths):
            try:
                if not os.path.exists(img_path):
                    print(f"Warning: Image file not found: {img_path}")
                    continue

                img = keras.utils.load_img(img_path, target_size=(IMAGE_SIZE, IMAGE_SIZE))
                img_array = keras.utils.img_to_array(img) / 255.0
                img_array = preprocessing_model(np.expand_dims(img_array, 0))

                text_path = text_paths[i] if use_text and i < len(text_paths) else None

                if needs_text:
                    if text_path is None:
                        # No text supplied for this image: use the zero placeholder.
                        text_array = np.zeros((1, TEXT_MAX_LENGTH), dtype=np.int32)
                    elif not os.path.exists(text_path):
                        print(f"Warning: Text file not found: {text_path}")
                        text_array = np.zeros((1, TEXT_MAX_LENGTH), dtype=np.int32)
                    else:
                        text_array = np.expand_dims(process_text(text_path), 0)
                    predictions = model.predict([img_array, text_array], verbose=0)[0]
                else:
                    predictions = model.predict(img_array, verbose=0)[0]

                pred_class = int(np.argmax(predictions))
                # Plain float keeps the result JSON-serialisable.
                confidence = float(predictions[pred_class]) * 100

                conf_bar_length = 20
                filled_length = int(conf_bar_length * confidence / 100)
                conf_bar = '█' * filled_length + '░' * (conf_bar_length - filled_length)

                img_filename = os.path.basename(img_path)
                if len(img_filename) > 25:
                    img_filename = img_filename[:22] + "..."

                pred_class_name = CLASS_NAMES[pred_class]

                print(f"{img_filename:<30} {pred_class_name:<20} {confidence:>6.2f}% {conf_bar}")

                result = {
                    'image_path': img_path,
                    'predicted_class': pred_class_name,
                    'confidence': confidence,
                    'class_probabilities': {
                        name: float(prob) for name, prob in zip(CLASS_NAMES, predictions)
                    }
                }

                if text_path is not None:
                    result['text_path'] = text_path

                results.append(result)

            except Exception as e:
                print(f"Error processing {img_path}: {str(e)}")
                results.append({
                    'image_path': img_path,
                    'error': str(e)
                })

        print("-" * 80)
        print(f"Processed {len(results)} images")

        if results:
            class_counts = {}
            for result in results:
                if 'predicted_class' in result:
                    class_name = result['predicted_class']
                    class_counts[class_name] = class_counts.get(class_name, 0) + 1

            print("\nPrediction Summary:")
            for class_name, count in class_counts.items():
                percentage = (count / len(results)) * 100
                print(f"{class_name}: {count} images ({percentage:.1f}%)")

        return results
    except Exception as e:
        print(f"Error in prediction process: {e}")
        raise

In [None]:
def _eval_load_split(split_dir):
    """Load one image split (e.g. ``train`` or ``test``) as an unshuffled dataset.

    Labels are inferred from sub-directory names, one-hot encoded, and ordered
    according to ``CLASS_NAMES``. ``shuffle=False`` keeps the sample order fixed
    between iterations of the dataset.
    """
    return keras.utils.image_dataset_from_directory(
        split_dir,
        labels='inferred',
        label_mode='categorical',
        class_names=CLASS_NAMES,
        batch_size=BATCH_SIZE,
        image_size=(IMAGE_SIZE, IMAGE_SIZE),
        shuffle=False
    )


def _eval_collect_class_indices(model, dataset, use_text):
    """Predict every batch of ``dataset`` and gather class indices.

    Returns:
        tuple: ``(predicted, actual)`` lists of integer class indices, one
        entry per sample, in dataset order.
    """
    predicted, actual = [], []
    for inputs, labels in dataset:
        if use_text:
            images, text = inputs
            probabilities = model.predict([images, text], verbose=0)
        else:
            probabilities = model.predict(inputs, verbose=0)
        # Labels are one-hot (label_mode='categorical'), so argmax recovers the index.
        predicted.extend(np.argmax(probabilities, axis=1))
        actual.extend(np.argmax(labels, axis=1))
    return predicted, actual


def _eval_row_normalize(cm):
    """Scale each row of ``cm`` to sum to 1; rows that sum to 0 stay all-zero.

    A plain division would turn the row of a class with no true samples into
    NaN (0/0), which then propagates into the rendered figure.
    """
    cm = cm.astype('float')
    row_totals = cm.sum(axis=1, keepdims=True)
    return np.divide(cm, row_totals, out=np.zeros_like(cm), where=row_totals != 0)


def _eval_save_confusion_figure(train_cm, test_cm, values_format, title_suffix, output_path):
    """Draw the train/test confusion matrices side by side and save them as a PNG."""
    fig, axes = plt.subplots(1, 2, figsize=(20, 10))
    try:
        panels = zip(axes, (train_cm, test_cm), ('Training', 'Test'))
        for ax, cm, split_name in panels:
            ConfusionMatrixDisplay(
                confusion_matrix=cm,
                display_labels=CLASS_NAMES
            ).plot(
                cmap='Blues',
                values_format=values_format,
                xticks_rotation=45,
                ax=ax
            )
            ax.set_title(f'{split_name} Set {title_suffix}', pad=20, fontsize=14)
            ax.grid(False)

        fig.tight_layout(pad=3.0)
        fig.savefig(output_path, bbox_inches='tight', dpi=300)
    finally:
        # Always release the figure, even if rendering or saving fails.
        plt.close(fig)


def _eval_split_metrics(metric_values, report, cm):
    """Build the JSON-serialisable metrics entry for one split.

    NOTE(review): ``metric_values`` is indexed positionally as
    [loss, accuracy, precision, recall, auc]; this presumably mirrors the
    metric order used when the model was compiled -- confirm against the
    training code.
    """
    return {
        'loss': float(metric_values[0]),
        'accuracy': float(metric_values[1]),
        'precision': float(metric_values[2]),
        'recall': float(metric_values[3]),
        'auc': float(metric_values[4]),
        'classification_report': report,
        'confusion_matrix': cm.tolist()
    }


def evaluate_model_performance(model_path, data_dir, output_dir='../evaluation_results', use_text=True):
    """Evaluate a saved Keras model on the train and test image splits.

    Loads the model from ``model_path``, evaluates it on ``<data_dir>/train``
    and ``<data_dir>/test``, and writes the following files to ``output_dir``:

    * ``confusion_matrices.png`` -- raw-count confusion matrices
    * ``confusion_matrices_normalized.png`` -- row-normalised confusion matrices
    * ``evaluation_metrics.json`` -- the returned metrics dictionary
    * ``evaluation_report.txt`` -- the per-class classification reports

    Args:
        model_path: Path of the saved model passed to ``keras.models.load_model``.
        data_dir: Directory containing ``train`` and ``test`` sub-directories,
            each with one folder per entry of ``CLASS_NAMES``.
        output_dir: Directory for the generated artefacts; created if missing.
        use_text: When True, an all-zero token tensor of width
            ``TEXT_MAX_LENGTH`` is fed as a second model input, because no
            real text data is available at evaluation time.

    Returns:
        dict: ``{'training': {...}, 'test': {...}}`` where each entry holds
        ``loss``, ``accuracy``, ``precision``, ``recall``, ``auc``,
        ``classification_report`` (str) and ``confusion_matrix`` (nested list).

    Raises:
        Exception: Any error raised during evaluation is printed and re-raised.
    """
    try:
        os.makedirs(output_dir, exist_ok=True)

        print("Loading model...")
        model = keras.models.load_model(model_path)

        train_ds = _eval_load_split(os.path.join(data_dir, 'train'))
        test_ds = _eval_load_split(os.path.join(data_dir, 'test'))

        def preprocess_data(images, labels):
            # Scale to [0, 1], then standardise per channel. These look like the
            # usual ImageNet statistics -- confirm they match the training pipeline.
            images = tf.cast(images, tf.float32) / 255.0
            mean = tf.constant([0.485, 0.456, 0.406])
            std = tf.constant([0.229, 0.224, 0.225])
            images = (images - mean) / std
            if use_text:
                # Add dummy text input (zeros) since we don't have actual text data
                dummy_text = tf.zeros((tf.shape(images)[0], TEXT_MAX_LENGTH), dtype=tf.int32)
                return (images, dummy_text), labels
            return images, labels

        train_ds = train_ds.map(preprocess_data, num_parallel_calls=tf.data.AUTOTUNE)
        test_ds = test_ds.map(preprocess_data, num_parallel_calls=tf.data.AUTOTUNE)

        train_ds = train_ds.prefetch(tf.data.AUTOTUNE)
        test_ds = test_ds.prefetch(tf.data.AUTOTUNE)

        print("\nEvaluating on training set...")
        train_metrics = model.evaluate(train_ds, verbose=1)
        print("\nEvaluating on test set...")
        test_metrics = model.evaluate(test_ds, verbose=1)

        print("\nGenerating predictions for visualization...")
        train_pred, train_true = _eval_collect_class_indices(model, train_ds, use_text)
        test_pred, test_true = _eval_collect_class_indices(model, test_ds, use_text)

        # Pin the label set so matrices and reports always cover every class in
        # CLASS_NAMES, even if a class is missing from a split's labels and
        # predictions; otherwise target_names/display_labels would not line up.
        class_indices = list(range(len(CLASS_NAMES)))

        train_cm = confusion_matrix(train_true, train_pred, labels=class_indices)
        test_cm = confusion_matrix(test_true, test_pred, labels=class_indices)

        train_report = classification_report(
            train_true, train_pred,
            labels=class_indices, target_names=CLASS_NAMES,
            digits=4, zero_division=0
        )
        test_report = classification_report(
            test_true, test_pred,
            labels=class_indices, target_names=CLASS_NAMES,
            digits=4, zero_division=0
        )

        _eval_save_confusion_figure(
            train_cm, test_cm,
            values_format='d',
            title_suffix='Confusion Matrix',
            output_path=os.path.join(output_dir, 'confusion_matrices.png')
        )
        _eval_save_confusion_figure(
            _eval_row_normalize(train_cm), _eval_row_normalize(test_cm),
            values_format='.2%',
            title_suffix='Normalized Confusion Matrix',
            output_path=os.path.join(output_dir, 'confusion_matrices_normalized.png')
        )

        metrics_dict = {
            'training': _eval_split_metrics(train_metrics, train_report, train_cm),
            'test': _eval_split_metrics(test_metrics, test_report, test_cm)
        }

        with open(os.path.join(output_dir, 'evaluation_metrics.json'), 'w') as f:
            json.dump(metrics_dict, f, indent=4)

        with open(os.path.join(output_dir, 'evaluation_report.txt'), 'w') as f:
            f.write("Training Set Classification Report:\n")
            f.write("=" * 80 + "\n")
            f.write(train_report)
            f.write("\n\nTest Set Classification Report:\n")
            f.write("=" * 80 + "\n")
            f.write(test_report)

        print("\nEvaluation Results Summary:")
        print("=" * 80)
        for heading, split_key in (("Training Set Metrics:", 'training'),
                                   ("\nTest Set Metrics:", 'test')):
            split = metrics_dict[split_key]
            print(heading)
            print(f"  Loss:      {split['loss']:.4f}")
            print(f"  Accuracy:  {split['accuracy']:.4f}")
            print(f"  Precision: {split['precision']:.4f}")
            print(f"  Recall:    {split['recall']:.4f}")
            print(f"  AUC:       {split['auc']:.4f}")
            if split_key == 'training':
                # Preserve the blank line the summary had between the two sections.
                pass

        print(f"\nDetailed results saved to: {output_dir}")
        return metrics_dict

    except Exception as e:
        print(f"Error in model evaluation: {e}")
        raise

In [None]:
# evaluate_model_performance(
#     model_path='../Models/final_model_with_text.keras',
#     data_dir='../Data',
#     output_dir='../Metrics/Benchmarks/evaluation_results/image_with_text',
#     use_text=True
# )

# evaluate_model_performance(
#     model_path='../Models/final_model_without_text.keras',
#     data_dir='../Data',
#     output_dir='../Metrics/Benchmarks/evaluation_results/image_without_text',
#     use_text=False
# )