In [2]:
# Chest X-Ray Classification using U-Net with and without Attention Mechanism

import numpy as np
import pandas as pd
import os
import matplotlib.pyplot as plt
import seaborn as sns
import random
from PIL import Image
from sklearn.preprocessing import LabelEncoder
from imblearn.over_sampling import RandomOverSampler
import cv2
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Activation, Dropout, BatchNormalization
from tensorflow.keras import regularizers
from tensorflow.keras import layers, models
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
import warnings

warnings.filterwarnings("ignore")


In [None]:
# Load dataset
dataset_path = 'dataset\train'
image_paths = []
categories = []
categories_list = ['Lung Tumour', 'Normal', 'Pneumonia', 'Tuberculosis', 'COPD', 'Other diseases']

for category in categories_list:
    category_path = os.path.join(dataset_path, category)
    if os.path.exists(category_path):
        for image_name in os.listdir(category_path):
            image_path = os.path.join(category_path, image_name)
            image_paths.append(image_path)
            categories.append(category)

df = pd.DataFrame({'image_path': image_paths, 'category': categories})


In [None]:
# Data exploration
df.shape

In [None]:
df.columns

In [None]:
df.duplicated().sum()

In [None]:
df.isnull().sum()

In [None]:
df.info()

In [None]:
df['category'].unique()

In [None]:
df['category'].value_counts()

In [None]:
# Data visualization
category_counts = df['category'].value_counts()
plt.figure(figsize=(8, 6))
sns.barplot(x=category_counts.index, y=category_counts.values, palette="viridis")
plt.title("Count of Images per Category")
plt.xlabel("Category")
plt.ylabel("Number of Images")
plt.show()

plt.figure(figsize=(8, 6))
plt.pie(category_counts.values, labels=category_counts.index, autopct='%1.1f%%', startangle=140, colors=sns.color_palette("viridis", len(category_counts)))
plt.title("Proportion of Images per Category")
plt.show()

num_images_per_category = 5
plt.figure(figsize=(15, 10))
for i, category in enumerate(df['category'].unique()):
    category_images = df[df['category'] == category]['image_path']
    selected_images = random.sample(list(category_images), num_images_per_category)
    for j, image_path in enumerate(selected_images):
        img = Image.open(image_path)
        plt.subplot(len(df['category'].unique()), num_images_per_category, i * num_images_per_category + j + 1)
        plt.imshow(img, cmap='gray')
        plt.axis('off')
        plt.title(category if j == 0 else "")
plt.tight_layout()
plt.show()

In [None]:

# Encode labels
label_encoder = LabelEncoder()
df['category_encoded'] = label_encoder.fit_transform(df['category'])
df = df[['image_path', 'category_encoded']]

# Oversample data
ros = RandomOverSampler(random_state=42)
X_resampled, y_resampled = ros.fit_resample(df[['image_path']], df['category_encoded'])
df_resampled = pd.DataFrame(X_resampled, columns=['image_path'])
df_resampled['category_encoded'] = y_resampled

# Split data
train_df_new, temp_df_new = train_test_split(df_resampled, train_size=0.8, shuffle=True, random_state=42, stratify=df_resampled['category_encoded'])
valid_df_new, test_df_new = train_test_split(temp_df_new, test_size=0.5, shuffle=True, random_state=42, stratify=temp_df_new['category_encoded'])

# Data generators
batch_size = 16
img_size = (256, 256)
channels = 3
img_shape = (img_size[0], img_size[1], channels)
tr_gen = ImageDataGenerator(rescale=1./255)
ts_gen = ImageDataGenerator(rescale=1./255)

train_gen_new = tr_gen.flow_from_dataframe(train_df_new, x_col='image_path', y_col='category_encoded', target_size=img_size, class_mode='sparse', color_mode='rgb', shuffle=True, batch_size=batch_size)
valid_gen_new = ts_gen.flow_from_dataframe(valid_df_new, x_col='image_path', y_col='category_encoded', target_size=img_size, class_mode='sparse', color_mode='rgb', shuffle=True, batch_size=batch_size)
test_gen_new = ts_gen.flow_from_dataframe(test_df_new, x_col='image_path', y_col='category_encoded', target_size=img_size, class_mode='sparse', color_mode='rgb', shuffle=False, batch_size=batch_size)

# Check GPU availability
physical_devices = tf.config.list_physical_devices('GPU')
if physical_devices:
    print("Using GPU")
else:
    print("Using CPU")


In [None]:
# Define U-Net model
def unet_classification_model(input_size=(256, 256, 3), num_classes=4):
    inputs = layers.Input(input_size)
    conv1 = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(inputs)
    conv1 = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(conv1)
    pool1 = layers.MaxPooling2D((2, 2))(conv1)
    conv2 = layers.Conv2D(128, (3, 3), activation='relu', padding='same')(pool1)
    conv2 = layers.Conv2D(128, (3, 3), activation='relu', padding='same')(conv2)
    pool2 = layers.MaxPooling2D((2, 2))(conv2)
    conv3 = layers.Conv2D(256, (3, 3), activation='relu', padding='same')(pool2)
    conv3 = layers.Conv2D(256, (3, 3), activation='relu', padding='same')(conv3)
    pool3 = layers.MaxPooling2D((2, 2))(conv3)
    bottleneck = layers.Conv2D(512, (3, 3), activation='relu', padding='same')(pool3)
    bottleneck = layers.Conv2D(512, (3, 3), activation='relu', padding='same')(bottleneck)
    upconv3 = layers.Conv2DTranspose(256, (2, 2), strides=(2, 2), padding='same')(bottleneck)
    concat3 = layers.concatenate([upconv3, conv3])
    conv4 = layers.Conv2D(256, (3, 3), activation='relu', padding='same')(concat3)
    conv4 = layers.Conv2D(256, (3, 3), activation='relu', padding='same')(conv4)
    upconv2 = layers.Conv2DTranspose(128, (2, 2), strides=(2, 2), padding='same')(conv4)
    concat2 = layers.concatenate([upconv2, conv2])
    conv5 = layers.Conv2D(128, (3, 3), activation='relu', padding='same')(concat2)
    conv5 = layers.Conv2D(128, (3, 3), activation='relu', padding='same')(conv5)
    upconv1 = layers.Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same')(conv5)
    concat1 = layers.concatenate([upconv1, conv1])
    conv6 = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(concat1)
    conv6 = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(conv6)
    gap = layers.GlobalAveragePooling2D()(conv6)
    output = layers.Dense(num_classes, activation='softmax')(gap)
    model = tf.keras.Model(inputs=inputs, outputs=output)
    return model

model = unet_classification_model(input_size=(256, 256, 3), num_classes=4)
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# Train model
history = model.fit(train_gen_new, epochs=5, batch_size=16, validation_data=valid_gen_new, steps_per_epoch=train_gen_new.samples // train_gen_new.batch_size, validation_steps=valid_gen_new.samples // valid_gen_new.batch_size, verbose=1)


In [None]:
# Define U-Net with attention model
def attention_gate(x, g, inter_channels):
    theta_x = layers.Conv2D(inter_channels, (1, 1), strides=(1, 1), padding='same')(x)
    phi_g = layers.Conv2D(inter_channels, (1, 1), strides=(1, 1), padding='same')(g)
    add_xg = layers.Add()([theta_x, phi_g])
    relu_xg = layers.Activation('relu')(add_xg)
    psi = layers.Conv2D(1, (1, 1), strides=(1, 1), padding='same')(relu_xg)
    sigmoid_xg = layers.Activation('sigmoid')(psi)
    attention = layers.Multiply()([x, sigmoid_xg])
    return attention

def unet_with_attention(input_size=(256, 256, 3), num_classes=4):
    inputs = layers.Input(input_size)


In [None]:
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])


In [None]:
history = model.fit(
    train_gen_new, 
    epochs=5,
    batch_size=16,
    validation_data=valid_gen_new, 
    steps_per_epoch=train_gen_new.samples // train_gen_new.batch_size,
    validation_steps=valid_gen_new.samples // valid_gen_new.batch_size,
    verbose=1
)


In [None]:
plt.plot(history.history['accuracy'], label='Training Accuracy')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.title('Model Accuracy')
plt.legend()
plt.show()

plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('Model Loss')
plt.legend()
plt.show()


In [None]:
y_pred_probs = model.predict(test_gen_new)
y_pred = np.argmax(y_pred_probs, axis=1)
y_true = test_gen_new.classes

cm = confusion_matrix(y_true, y_pred)
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=test_gen_new.class_indices.keys(), yticklabels=test_gen_new.class_indices.keys())
plt.xlabel('Predicted Labels')
plt.ylabel('True Labels')
plt.title('Confusion Matrix')
plt.show()

report = classification_report(y_true, y_pred, target_names=test_gen_new.class_indices.keys())
print("Classification Report:\n", report)
