In [1]:
import random
import pprint
import sys
import time
import numpy as np
from optparse import OptionParser
import pickle
import math
import cv2
import copy
from matplotlib import pyplot as plt
import tensorflow as tf
import pandas as pd
import os

from sklearn.metrics import average_precision_score

from tensorflow.keras import backend as K
from tensorflow.keras.optimizers import Adam, SGD, RMSprop
from tensorflow.keras.layers import Flatten, Dense, Input, Conv2D, MaxPooling2D, Dropout
from tensorflow.keras.layers import GlobalAveragePooling2D, GlobalMaxPooling2D, TimeDistributed
from tensorflow.keras.models import Model
from tensorflow.keras.initializers import Initializer
from tensorflow.keras.regularizers import Regularizer
from tensorflow.keras.utils import get_file
from tensorflow.python.keras.utils import layer_utils
from tensorflow.python.keras.utils import generic_utils
from tensorflow.keras.losses import categorical_crossentropy
from tensorflow.keras.layers import Layer, InputSpec


In [2]:
def get_data(annotation_path):
    all_data = []
    class_count = {}
    class_mapping = {}
    image_data = {}

    # Open the annotation file
    with open(annotation_path, 'r') as f:
        for line in f:
            # Split line by commas
            line_split = line.strip().split(',')

            # Extract fields
            img_file = line_split[0]
            xmin = int(line_split[1])
            ymin = int(line_split[2])
            xmax = int(line_split[3])
            ymax = int(line_split[4])
            local_class = line_split[5]

            # Initialize an entry for each image if it doesn’t already exist
            if img_file not in image_data:
                image_data[img_file] = {
                    'filepath': img_file,
                    'bboxes': [],
                    'overall_label': 'injury_present'  # Default label
                }

            # Append bounding box details for each image
            image_data[img_file]['bboxes'].append({
                'x1': xmin,
                'y1': ymin,
                'x2': xmax,
                'y2': ymax,
                'class': local_class
            })

            # Track class occurrences and mapping
            if local_class not in class_count:
                class_count[local_class] = 1
            else:
                class_count[local_class] += 1

            if local_class not in class_mapping:
                class_mapping[local_class] = len(class_mapping)

    # Convert image_data dict to list for all_data
    all_data = list(image_data.values())
    
    return all_data, class_count, class_mapping


In [3]:
all_data, class_count, class_mapping = get_data(r'annotation.txt')
val_data, class_count, class_mapping = get_data(r'valid_annotation.txt')
test_data, class_count, class_mapping = get_data(r'test_annotation.txt')

In [9]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Define the augmentation parameters
datagen = ImageDataGenerator(
    rotation_range=20,       # Random rotation within 20 degrees
    width_shift_range=0.2,   # Horizontal shift up to 20% of the image width
    height_shift_range=0.2,  # Vertical shift up to 20% of the image height
    shear_range=0.2,         # Shear transformations
    zoom_range=0.2,          # Random zoom
    horizontal_flip=True,    # Randomly flip images horizontally
    fill_mode='nearest'      # Fill in missing pixels after transformations
)

def data_generator(data, class_mapping, batch_size):
    while True:  # Loop forever to generate batches indefinitely
        for i in range(0, len(data), batch_size): 
            batch_images = []
            batch_labels = []
            for item in data[i:i + batch_size]:
                # Load and preprocess each image
                img = tf.keras.preprocessing.image.load_img(item['filepath'], target_size=(224, 224))
                img = tf.keras.preprocessing.image.img_to_array(img) / 255.0  # Normalize the image

                # Apply augmentation
                img = datagen.random_transform(img)  # Apply random transformations
                
                batch_images.append(img)

                # Get the hemorrhage type from the first bounding box
                local_class = item['bboxes'][0]['class']  # Assuming you take the first bounding box class
                label = class_mapping[local_class]  # Get numerical label from mapping
                batch_labels.append(label)

            # Convert the batch_labels to one-hot encoding for multi-class classification
            batch_labels = tf.keras.utils.to_categorical(batch_labels, num_classes=len(class_mapping))

            # Yield the batch of images and labels as numpy arrays
            yield (np.array(batch_images), np.array(batch_labels))

# Example class mapping
class_mapping = {
    'Intraventricular': 0,
    'Subdural': 1,
    'Intraparenchymal': 2,
    'Subarachnoid': 3,
}


In [5]:
# # Specify the path to your annotation file
# annotation_path = r'annotation.txt'  # Replace with your actual annotation file path

# # Get data from the annotation file
# all_data, class_count, class_mapping = get_data(annotation_path)

# # Check the structure of the loaded data (optional)
# print(all_data)
# print(class_count)
# print(class_mapping)

# # Define the batch size
# batch_size = 16

# # Create a data generator using all_data
# def data_generator(data, batch_size):
#     while True:  # Loop forever to generate batches indefinitely
#         for i in range(0, len(data), batch_size):
#             batch_images = []
#             batch_labels = []
#             for item in data[i:i + batch_size]:
#                 # Load and preprocess each image
#                 img = tf.keras.preprocessing.image.load_img(item['filepath'], target_size=(300, 300))
#                 img = tf.keras.preprocessing.image.img_to_array(img)
#                 batch_images.append(img)

#                 # Here you could create labels for your bounding boxes and classification
#                 # This example assumes binary classification (injury present/absent)
#                 label = [0]  # Replace with actual logic to create labels based on your classes
#                 batch_labels.append(label)

#             yield (np.array(batch_images), np.array(batch_labels))  # Yield as numpy arrays


In [6]:
from tensorflow.keras import layers, models
import tensorflow as tf


def detection_and_classification_model(num_classes, num_detection_classes):
    # Base model for feature extraction
    base_model = tf.keras.applications.VGG16(input_shape=(224, 224, 3), include_top=False, weights='imagenet', classifier_activation='softmax')
    
    # Detection Head for Faster R-CNN (RPN and ROI Pooling layers)
    x = base_model.output
    rpn_output = layers.Conv2D(512, (3, 3), activation='relu')(x)
    # Add layers specific to detection task here, e.g., ROI Pooling, classification layers for each bounding box
    # Detection Output - bounding boxes and localized classifications
    detection_output = layers.Dense(num_detection_classes, activation='sigmoid', name='detection_output')(rpn_output)

    # Global Classification Head (for whole image classification)
    x_classification = layers.GlobalAveragePooling2D()(x) #Dimensionality reduction 
    x_classification = layers.Dense(1024, activation='relu')(x_classification)
    # x_classification = layers.Dense(512, activation='relu')(x_classification)
    x_classification = layers.Dropout(0.5)(x_classification)
    classification_output = layers.Dense(len(class_mapping), activation='softmax', name='classification_output')(x_classification)

    # Model with dual outputs
    model = models.Model(inputs=base_model.input, outputs=[classification_output, detection_output])
    return model


In [11]:
from tensorflow.keras.callbacks import EarlyStopping

# Assuming 2 classes for overall classification and 3 types of brain injuries for detection
num_classes = 1  # e.g., "no injury" or "injury present"
num_detection_classes = len(class_mapping)  # Number of unique local injury classes

early_stopping = EarlyStopping(
    monitor='val_loss',  # Track validation loss
    patience=3,          # Stop after 5 epochs with no improvement
    restore_best_weights=True
)

# Build and compile the model
model = detection_and_classification_model(num_classes, num_detection_classes)
custom_learning_rate = 0.0005
optimizer = Adam(learning_rate=custom_learning_rate)
model.compile(
    optimizer='adam',
    loss={
        'classification_output': 'categorical_crossentropy',
        'detection_output': 'mse'
    },
    metrics={
        'classification_output': ['accuracy'],
        'detection_output': ['mse']
    }
)




In [None]:
train_gen = data_generator(all_data, class_mapping, batch_size=16)
val_gen = data_generator(val_data, class_mapping, batch_size=16)

# Training loop
# history = model.fit(
#     train_gen,
#     steps_per_epoch=100,    Adjust based on the size of all_data
#     epochs=10,  # Choose the number of epochs
# )

history = model.fit(
    train_gen,
    steps_per_epoch=len(all_data) // 16,
    # steps_per_epoch=100,
    validation_data=val_gen,
    validation_steps=len(val_data) // 16,
    epochs=10,
    callbacks=[early_stopping]
)

Epoch 1/10
[1m 62/156[0m [32m━━━━━━━[0m[37m━━━━━━━━━━━━━[0m [1m3:50[0m 2s/step - classification_output_accuracy: 0.8775 - loss: 4.4862

In [None]:
# Evaluate on test data (assuming `test_gen` is defined similarly to `train_gen`)
test_gen = data_generator(test_data, class_mapping, batch_size=16)

In [None]:
# Evaluate the model on the test data
test_loss, test_classification_accuracy, test_detection_mse = model.evaluate(
    test_gen,
    steps=100,  # Adjust based on the test dataset size
    verbose=1
)

# Print out results
print(f"Test Loss: {test_loss}")
print(f"Classification Accuracy: {test_classification_accuracy}")
print(f"Detection Mean Squared Error: {test_detection_mse}")


In [None]:
# Load a sample image
sample_image = tf.keras.preprocessing.image.load_img(r'test/ID_0027df23b_png_jpg.rf.78a4fefadb07409a5671806d177c2168.jpg', target_size=(300, 300))
sample_image = tf.keras.preprocessing.image.img_to_array(sample_image) / 255.0
sample_image = np.expand_dims(sample_image, axis=0)  # Add batch dimension

# Predict
classification_pred, detection_pred = model.predict(sample_image)

# Process predictions
print("Classification Prediction:", classification_pred)  # Injury presence prediction
print("Detection Prediction:", detection_pred)            # Bounding box and injury type predictions


In [None]:
predicted_class_idx = np.argmax(classification_pred)
predicted_class_idx

In [None]:
predicted_class = list(class_mapping.keys())[predicted_class_idx]  # Maps index to class name
predicted_class

In [None]:
print(f"Predicted Class: {predicted_class}")
print(f"Class Probabilities: {classification_pred}")

In [None]:
img_width, img_height = 300, 300  # or the target size you defined for your images

for i, bbox in enumerate(detection_pred[0][0]):  # Example using first set of predictions
    x_center, y_center, width, height = bbox
    x_min = int((x_center - width / 2) * img_width)
    y_min = int((y_center - height / 2) * img_height)
    x_max = int((x_center + width / 2) * img_width)
    y_max = int((y_center + height / 2) * img_height)
    print(f"Bounding Box {i}: ({x_min}, {y_min}), ({x_max}, {y_max})")


In [None]:
import matplotlib.pyplot as plt
import matplotlib.patches as patches

# Load and prepare the image for visualization
img = tf.keras.preprocessing.image.array_to_img(sample_image[0])  # Remove batch dimension

fig, ax = plt.subplots(1)
ax.imshow(img)

for bbox in detection_pred[0][0]:  # Adjust if needed
    x_center, y_center, width, height = bbox
    x_min = (x_center - width / 2) * img_width
    y_min = (y_center - height / 2) * img_height
    rect = patches.Rectangle((x_min, y_min), width * img_width, height * img_height, linewidth=1, edgecolor='r', facecolor='none')
    ax.add_patch(rect)

plt.show()
