# In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from sklearn.preprocessing import LabelBinarizer
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from PIL import Image
import matplotlib.pyplot as plt
import os

# Dataset location and the image size used throughout the script
dataset_path = 'MURA-v1.1'
image_size = (224, 224)

# Study types (top-level body-part folders) that are kept when loading
body_parts = [
    'XR_ELBOW',
    'XR_FINGER',
    'XR_FOREARM',
    'XR_HAND',
]


# Function to extract labels
def get_labels(image_path):
    """Derive the labels of an image from its position in the directory tree.

    The path is expected to end in
    ``<body_part>/<patient_id>/<study_dir>/<image_file>``.

    Args:
        image_path: Path to an image file (``str`` or path-like). Both ``/``
            and ``\\`` are accepted as separators, so paths produced by
            ``os.path.join`` on Windows are handled as well.

    Returns:
        A ``(patient_id, body_part, abnormality)`` tuple, where
        ``abnormality`` is 1 if the study directory name contains
        ``'positive'`` and 0 otherwise.

    Raises:
        IndexError: If the path has fewer than four components.
    """
    # Normalize separators first: splitting only on '/' leaves a Windows
    # path as a single component and breaks the negative indexing below.
    parts = os.fspath(image_path).replace('\\', '/').split('/')
    body_part = parts[-4]
    patient_id = parts[-3]
    abnormality = 1 if 'positive' in parts[-2] else 0
    return patient_id, body_part, abnormality

# Function to load images and labels from a given dataset path
def load_dataset(dataset_path, target_size=(224, 224)):
    """Load all images of the selected body parts below ``dataset_path``.

    The directory tree is walked recursively. Every ``.png``/``.jpg``/
    ``.jpeg`` file whose path contains one of the module-level
    ``body_parts`` names is opened, converted to RGB and resized.

    Args:
        dataset_path: Root directory to search.
        target_size: Size passed to ``PIL.Image.resize`` for every image.
            Defaults to ``(224, 224)``.

    Returns:
        A tuple ``(all_images, abnormality_labels, body_part_labels)`` of
        three parallel lists: image arrays, 0/1 abnormality flags and
        body-part names as returned by ``get_labels``.
    """
    # Collect candidate files in sorted order so that the sample order (and
    # therefore any seeded split made later) does not depend on the order in
    # which the filesystem happens to list entries.
    image_paths = []
    for root, dirs, files in os.walk(dataset_path):
        dirs.sort()  # in-place sort steers os.walk's traversal order
        for file in sorted(files):
            if file.lower().endswith(('.png', '.jpg', '.jpeg')):  # Check for common image extensions
                image_paths.append(os.path.join(root, file))

    all_images = []
    abnormality_labels = []
    body_part_labels = []

    for image_path in image_paths:
        # any() guarantees each image is loaded at most once, even if its
        # path happens to contain more than one body-part name.
        if not any(part in image_path for part in body_parts):
            continue

        _, body_part, abnormality = get_labels(image_path)

        # The context manager releases the file handle as soon as the pixel
        # data has been copied into the array.
        with Image.open(image_path) as img:
            img_array = np.array(img.convert('RGB').resize(target_size))

        body_part_labels.append(body_part)
        abnormality_labels.append(abnormality)
        all_images.append(img_array)

    return all_images, abnormality_labels, body_part_labels

all_images, abnormality_labels, body_part_labels = load_dataset(dataset_path)

# Fail early with an actionable message: an empty result would otherwise
# surface as an opaque IndexError on the preview below.
if not all_images:
    raise FileNotFoundError(
        f"No images found under {dataset_path!r} for body parts {body_parts}"
    )

# Show one sample together with its labels as a quick sanity check
index_to_display = 0
plt.imshow(all_images[index_to_display])
plt.title(f"Abnormality Label: {abnormality_labels[index_to_display]}, Body Part: {body_part_labels[index_to_display]}")
plt.axis('off')
plt.show()


# Convert to NumPy arrays
all_images = np.array(all_images)
abnormality_labels = np.array(abnormality_labels)

# Train/valid sub-directories of the dataset root, built from dataset_path so
# they stay consistent with it (a leading '/' would point at the filesystem
# root instead).
# They are intentionally NOT loaded here: the training/validation arrays are
# produced by the train_test_split call further down, which would overwrite
# anything loaded at this point, so a second and third pass over the images
# would only cost time and memory.
train_dataset_path = os.path.join(dataset_path, 'train')
valid_dataset_path = os.path.join(dataset_path, 'valid')


print(f"Total number of images loaded: {len(all_images)}")



# One-hot encode body part labels
body_part_labels = [str(label) for label in body_part_labels]
encoder = LabelBinarizer()
body_part_labels_encoded = encoder.fit_transform(body_part_labels)

# Split data into training and validation sets.
# NOTE(review): this is a per-image random split; images of the same patient
# can land on both sides. Consider a patient-level split if that matters.
x_train, x_val, y_train_abnormality, y_val_abnormality, y_train_body_part, y_val_body_part = train_test_split(
    all_images, abnormality_labels, body_part_labels_encoded, test_size=0.2, random_state=42
)

# Normalize pixel values to [0, 1].
# Cast to float32 first: dividing the uint8 arrays directly would produce
# float64 and double the memory footprint for no benefit.
x_train = x_train.astype(np.float32) / 255.0
x_val = x_val.astype(np.float32) / 255.0

# Data augmentation: random rotations, shifts, zooms and horizontal flips.
# No datagen.fit(...) call is needed: fitting only computes dataset
# statistics for feature-wise centering/normalization or ZCA whitening,
# none of which are enabled here.
# NOTE(review): the generator is not passed to model.fit below, so these
# augmentations currently have no effect on training.
datagen = ImageDataGenerator(
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True
)

# Transfer learning with ResNet50
base_model = tf.keras.applications.ResNet50(input_shape=(224, 224, 3), include_top=False, weights='imagenet')
base_model.trainable = False  # Freeze pre-trained layers

# Build the CNN model: shared ResNet50 features feeding two output heads
image_input = layers.Input(shape=(224, 224, 3))

# The script feeds images scaled to [0, 1], but the ImageNet weights expect
# inputs processed by resnet50.preprocess_input, which operates on 0-255
# pixel values. Doing the conversion inside the model keeps training and
# inference consistent without changing how callers prepare their images.
x = layers.Rescaling(255.0)(image_input)
x = tf.keras.applications.resnet50.preprocess_input(x)

# training=False keeps the frozen backbone in inference mode
x = base_model(x, training=False)
x = layers.GlobalAveragePooling2D()(x)
x = layers.Dropout(0.2)(x)
x = layers.Dense(256, activation='relu')(x)
x = layers.Dropout(0.5)(x)

# Head 1: binary abnormality probability
abnormality_output = layers.Dense(1, activation='sigmoid', name='abnormality')(x)
# Head 2: body-part distribution, one unit per encoded label column
body_part_output = layers.Dense(y_train_body_part.shape[1], activation='softmax', name='body_part')(x)

model = models.Model(inputs=image_input, outputs=[abnormality_output, body_part_output])
model.summary()

# Compile the model: one loss and one metric set per named output head
head_losses = {
    'abnormality': tf.keras.losses.BinaryCrossentropy(),
    'body_part': tf.keras.losses.CategoricalCrossentropy(),
}
head_metrics = {
    'abnormality': [
        tf.keras.metrics.BinaryAccuracy(),
        tf.keras.metrics.Precision(),
        tf.keras.metrics.Recall(),
    ],
    'body_part': tf.keras.metrics.CategoricalAccuracy(),
}
model.compile(
    optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
    loss=head_losses,
    metrics=head_metrics,
)

# Make sure every label set is a NumPy array
y_train_abnormality = np.array(y_train_abnormality)
y_val_abnormality = np.array(y_val_abnormality)
y_train_body_part = np.array(y_train_body_part)
y_val_body_part = np.array(y_val_body_part)

# Concatenate the labels into a single array per split: column 0 holds the
# abnormality flag, the remaining columns the encoded body part.
# (Computed once; the model itself is trained on the separate per-head arrays.)
y_train = np.concatenate((y_train_abnormality.reshape(-1, 1), y_train_body_part), axis=1)
y_val = np.concatenate((y_val_abnormality.reshape(-1, 1), y_val_body_part), axis=1)


# Train the model; targets are keyed by the names of the output layers
train_targets = {'abnormality': y_train_abnormality, 'body_part': y_train_body_part}
val_targets = {'abnormality': y_val_abnormality, 'body_part': y_val_body_part}

history = model.fit(
    x_train,
    train_targets,
    validation_data=(x_val, val_targets),
    batch_size=32,
    epochs=10,
)


# Visualize training and validation accuracy, one figure per output head
accuracy_curves = (
    (
        ('abnormality_binary_accuracy', 'Abnormality Accuracy'),
        ('val_abnormality_binary_accuracy', 'Validation Abnormality Accuracy'),
    ),
    (
        ('body_part_categorical_accuracy', 'Body Part Accuracy'),
        ('val_body_part_categorical_accuracy', 'Validation Body Part Accuracy'),
    ),
)
for figure_curves in accuracy_curves:
    for history_key, curve_label in figure_curves:
        plt.plot(history.history[history_key], label=curve_label)
    plt.legend()
    plt.show()

# Report the abnormality accuracy reached in the last epoch
final_train_accuracy = history.history['abnormality_binary_accuracy'][-1]
final_val_accuracy = history.history['val_abnormality_binary_accuracy'][-1]
print("Final Abnormality Accuracy:", final_train_accuracy)
print("Final Validation Abnormality Accuracy:", final_val_accuracy)


# Test the model with a new X-ray image

def predict_new_image(image_path):
    """Classify a single X-ray image with the trained ``model``.

    The image is converted to RGB, resized to 224x224 and scaled to [0, 1],
    mirroring how the training arrays are prepared.

    Args:
        image_path: Path to the image file to classify.

    Returns:
        A tuple ``(predicted_abnormality, predicted_body_part)``:
        ``predicted_abnormality`` is 1 if the abnormality output exceeds 0.5
        and 0 otherwise; ``predicted_body_part`` is the entry of
        ``encoder.classes_`` with the highest predicted probability.
    """
    # The context manager closes the file once the pixels have been copied
    with Image.open(image_path) as img:
        img_array = np.array(img.convert('RGB').resize((224, 224)), dtype=np.float32) / 255.0
    img_array = np.expand_dims(img_array, axis=0)  # add the batch dimension

    # predictions[0] is the abnormality head, predictions[1] the body-part head
    predictions = model.predict(img_array)
    predicted_abnormality = int(predictions[0][0][0] > 0.5)
    predicted_body_part_probs = predictions[1][0]
    predicted_body_part_index = np.argmax(predicted_body_part_probs)
    predicted_body_part = encoder.classes_[predicted_body_part_index]

    return predicted_abnormality, predicted_body_part

# Example usage: classify one image given by a relative path and report both predictions
new_image_path = 'finger.png'
predicted_abnormality, predicted_body_part = predict_new_image(new_image_path)

print(
    f"Predicted Abnormality: {predicted_abnormality}",
    f"Predicted Body Part: {predicted_body_part}",
    sep="\n",
)
