In [1]:
import glob
import os

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.utils import class_weight
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, Input
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tqdm import tqdm

# === Settings ===
csv_path = 'metadata_compiled_dummies.csv'
base_image_dir = '../YuanDataProcessing'
img_x = 128
img_y = 128
img_size = (img_x, img_y)
# One-hot label columns, in the class order used by every later cell.
label_cols = ['status_COVID-19', 'status_healthy', 'status_symptomatic']
# Cached, already-preprocessed arrays (keyed by image size).
x_cache_path = f'X_{img_x}x{img_y}.npy'
y_cache_path = f'y_{img_x}x{img_y}.npy'

# === Load CSV and preprocess ===
# Rows with any missing status flag are dropped before casting the flags to int.
df = (
    pd.read_csv(csv_path)[['uuid'] + label_cols]
    .dropna(subset=label_cols)
    .astype({col: int for col in label_cols})
)

# === Map UUIDs to file paths ===
# The file stem (name without extension) is used as the UUID key.
all_image_paths = glob.glob(os.path.join(base_image_dir, 'folder_*', '*.png'))
uuid_to_path = {os.path.splitext(os.path.basename(p))[0]: p for p in all_image_paths}

# === Load cached arrays, or build them from the images once ===
if os.path.exists(x_cache_path) and os.path.exists(y_cache_path):
    # NOTE(review): allow_pickle=True is kept because existing cache files may
    # have been saved as object arrays (TODO confirm). Unpickling can execute
    # arbitrary code, so only load cache files you produced yourself.
    X = np.load(x_cache_path, allow_pickle=True)
    y = np.load(y_cache_path, allow_pickle=True)
else:
    images, labels = [], []
    rows = df[['uuid'] + label_cols].itertuples(index=False, name=None)
    for uuid, *status in tqdm(rows, total=len(df)):
        path = uuid_to_path.get(uuid)
        if path is None:
            print(f"Missing image for UUID: {uuid}")
            continue
        img = load_img(path, target_size=img_size)
        images.append(img_to_array(img) / 255.0)  # scale pixels to [0, 1]
        labels.append(status)
    X = np.asarray(images, dtype=np.float32)
    y = np.asarray(labels, dtype=np.float32)
    # Persist plain float32 arrays so the next run can skip image decoding.
    np.save(x_cache_path, X)
    np.save(y_cache_path, y)

# asarray avoids a second full copy when the arrays are already float32.
X = np.asarray(X, dtype=np.float32)
y = np.asarray(y, dtype=np.float32)

assert len(X) > 0, "no samples were loaded"
assert len(X) == len(y), f"X/y length mismatch: {len(X)} vs {len(y)}"

# === Train-test split ===
# Stratify on the class index so the 80/20 split keeps the class proportions.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y.argmax(axis=1), random_state=42
)

In [8]:
import tensorflow as tf
from tensorflow.keras.losses import Loss
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from sklearn.utils import class_weight
import numpy as np

# === Define Focal Loss ===
class FocalLoss(Loss):
    """Focal loss for one-hot targets and per-class probabilities.

    Each class term of the cross-entropy is scaled by
    ``alpha * (1 - p) ** gamma`` and the terms are summed over the last
    (class) axis, giving one loss value per sample.

    Args:
        gamma: Focusing exponent applied to ``1 - y_pred``.
        alpha: Scalar weight, or a per-class sequence/tensor that broadcasts
            against the class axis.
        **kwargs: Forwarded to the Keras ``Loss`` base class (for example
            ``name`` or ``reduction``), which also lets ``from_config``
            rebuild the object from ``get_config()``.
    """

    def __init__(self, gamma=2.0, alpha=1.0, **kwargs):
        super().__init__(**kwargs)
        self.gamma = gamma
        self.alpha = alpha

    def call(self, y_true, y_pred):
        """Return the per-sample focal loss for a batch."""
        y_pred = tf.convert_to_tensor(y_pred)
        # Integer one-hot labels would otherwise fail the multiply below.
        y_true = tf.cast(y_true, y_pred.dtype)
        alpha = tf.cast(self.alpha, y_pred.dtype)

        # Clip so that log() never sees exactly 0 or 1.
        epsilon = tf.keras.backend.epsilon()
        y_pred = tf.clip_by_value(y_pred, epsilon, 1. - epsilon)

        cross_entropy = -y_true * tf.math.log(y_pred)
        weight = alpha * tf.pow(1. - y_pred, self.gamma)
        return tf.reduce_sum(weight * cross_entropy, axis=-1)

    def get_config(self):
        """Return the base config extended with ``gamma`` and ``alpha``."""
        config = super().get_config()
        config.update({
            "gamma": self.gamma,
            # Plain Python numbers/lists keep the config serializable.
            "alpha": np.asarray(self.alpha).tolist(),
        })
        return config

# === Balanced class weights (printed for reference only) ===
# Class index of every training sample, recovered from the one-hot labels.
y_train_labels = y_train.argmax(axis=1)
present_classes = np.unique(y_train_labels)

class_weights = class_weight.compute_class_weight(
    'balanced', classes=present_classes, y=y_train_labels
)
class_weights_dict = {idx: weight for idx, weight in enumerate(class_weights)}
print("Class Weights (for reference):", class_weights_dict)

# === Focal-loss hyperparameters ===
# Hand-picked per-class alpha, ordered [COVID-19, Healthy, Symptomatic];
# tune these to the class distribution.
gamma = 2.0
alpha = tf.constant([3.0, 1.0, 2.0])

def categorical_focal_loss(alpha, gamma=2.0):
    """Build a focal-loss function for one-hot targets.

    Args:
        alpha: Scalar or per-class weights (list, array or tensor) that
            broadcast against the class axis of the predictions.
        gamma: Focusing exponent applied to ``1 - y_pred``.

    Returns:
        A ``loss(y_true, y_pred)`` callable that returns one loss value per
        sample, summed over the last (class) axis.
    """
    def loss(y_true, y_pred):
        y_pred = tf.convert_to_tensor(y_pred)
        # Match dtypes so integer labels or float64 alphas do not break the math.
        y_true = tf.cast(y_true, y_pred.dtype)
        class_alpha = tf.cast(alpha, y_pred.dtype)

        # Clip so that log() never sees exactly 0 or 1.
        y_pred = tf.clip_by_value(y_pred, 1e-7, 1.0 - 1e-7)
        cross_entropy = -y_true * tf.math.log(y_pred)
        weight = tf.pow(1.0 - y_pred, gamma) * class_alpha
        return tf.reduce_sum(weight * cross_entropy, axis=-1)
    return loss

# === CNN Model ===
# The input shape is declared with an explicit Input layer; passing
# `input_shape=` to the first Conv2D triggers a Keras warning in Sequential models.
model = Sequential([
    Input(shape=(img_size[0], img_size[1], 3)),
    Conv2D(32, (3, 3), activation='relu'),
    MaxPooling2D(2, 2),
    Conv2D(64, (3, 3), activation='relu'),
    MaxPooling2D(2, 2),
    Flatten(),
    Dropout(0.5),
    Dense(128, activation='relu'),
    Dense(3, activation='softmax')  # 3 output classes
])

model.compile(
    optimizer=Adam(learning_rate=0.001),
    loss=categorical_focal_loss(alpha, gamma),
    metrics=['accuracy']
)

# === Callbacks ===
# Stop after 3 epochs without val_loss improvement and roll back to the best weights.
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=3,
    restore_best_weights=True
)

# Halve the learning rate after 2 stagnant epochs, never going below 1e-6.
lr_scheduler = ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.5,
    patience=2,
    min_lr=1e-6,
    verbose=1
)

# === Train Model ===
history = model.fit(
    X_train, y_train,
    epochs=25,
    batch_size=32,
    validation_split=0.1,
    callbacks=[early_stopping, lr_scheduler]
)

# === Evaluate Model ===
# Accuracy alone can hide a majority-class-only predictor on imbalanced data;
# check the confusion matrix as well.
loss, acc = model.evaluate(X_test, y_test)
print(f"Test Accuracy: {acc:.2%}")

Class Weights (for reference): {0: 5.237959442332066, 1: 0.4450636728320276, 2: 1.7786744136001722}


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


Epoch 1/25
[1m465/465[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m48s[0m 102ms/step - accuracy: 0.7405 - loss: 9.0535 - val_accuracy: 0.7509 - val_loss: 9.0823 - learning_rate: 0.0010
Epoch 2/25
[1m465/465[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m48s[0m 103ms/step - accuracy: 0.7475 - loss: 9.1300 - val_accuracy: 0.7509 - val_loss: 9.0823 - learning_rate: 0.0010
Epoch 3/25
[1m465/465[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 100ms/step - accuracy: 0.7444 - loss: 9.2562
Epoch 3: ReduceLROnPlateau reducing learning rate to 0.0005000000237487257.
[1m465/465[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m48s[0m 103ms/step - accuracy: 0.7444 - loss: 9.2559 - val_accuracy: 0.7509 - val_loss: 9.0823 - learning_rate: 0.0010
Epoch 4/25
[1m465/465[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m48s[0m 104ms/step - accuracy: 0.7528 - loss: 8.9784 - val_accuracy: 0.7509 - val_loss: 9.0823 - learning_rate: 5.0000e-04
[1m130/130[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0

In [9]:
from sklearn.metrics import confusion_matrix, classification_report

# === Predict on test set ===
y_pred_probs = model.predict(X_test)
y_pred = np.argmax(y_pred_probs, axis=1)
y_true = np.argmax(y_test, axis=1)

# Fix the label set so the matrix/report always cover all three classes,
# in the same order as the one-hot columns, even if a class is never predicted.
class_names = ['COVID-19', 'Healthy', 'Symptomatic']
class_ids = list(range(len(class_names)))

# === Confusion Matrix ===
# Rows are true classes, columns are predicted classes.
cm = confusion_matrix(y_true, y_pred, labels=class_ids)
print("Confusion Matrix:")
print(cm)

# === Classification Report (precision, recall, f1) ===
# zero_division=0 reports 0.00 for classes with no predictions instead of
# emitting an undefined-metric warning per average.
print("\nClassification Report:")
print(classification_report(
    y_true, y_pred,
    labels=class_ids,
    target_names=class_names,
    zero_division=0
))

[1m130/130[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 25ms/step
Confusion Matrix:
[[   0  263    0]
 [   0 3095    0]
 [   0  775    0]]

Classification Report:
              precision    recall  f1-score   support

    COVID-19       0.00      0.00      0.00       263
     Healthy       0.75      1.00      0.86      3095
 Symptomatic       0.00      0.00      0.00       775

    accuracy                           0.75      4133
   macro avg       0.25      0.33      0.29      4133
weighted avg       0.56      0.75      0.64      4133



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


In [10]:
# === Adjusted Class Weights ===
# Hand-set weights used instead of computed 'balanced' ones, keyed by class index:
# COVID-19 and Symptomatic are increased, Healthy is slightly reduced.
class_weights_dict = dict(enumerate([
    6.5,  # 0: COVID-19
    0.4,  # 1: Healthy
    2.5,  # 2: Symptomatic
]))

print("Adjusted Class Weights:", class_weights_dict)

# === CNN Model ===
# The input shape is declared with an explicit Input layer; passing
# `input_shape=` to the first Conv2D triggers a Keras warning in Sequential models.
model = Sequential([
    Input(shape=(img_size[0], img_size[1], 3)),
    Conv2D(32, (3, 3), activation='relu'),
    MaxPooling2D(2, 2),
    Conv2D(64, (3, 3), activation='relu'),
    MaxPooling2D(2, 2),
    Flatten(),
    Dropout(0.5),
    Dense(128, activation='relu'),
    Dense(3, activation='softmax')  # 3 output classes
])

model.compile(optimizer=Adam(learning_rate=0.001), loss='categorical_crossentropy', metrics=['accuracy'])

# === Early Stopping Callback ===
early_stopping = EarlyStopping(
    monitor='val_loss',  # Monitor the validation loss
    patience=5,  # Wait for 5 epochs with no improvement before stopping
    restore_best_weights=True  # Restore the weights from the best epoch
)

# === Train Model with Class Weights and Early Stopping ===
# class_weight rescales each training sample's loss by the weight of its class.
history = model.fit(
    X_train, y_train,
    epochs=20,
    batch_size=32,
    validation_split=0.1,
    class_weight=class_weights_dict,
    callbacks=[early_stopping]  # Add the early stopping callback
)

# === Evaluate Model ===
loss, acc = model.evaluate(X_test, y_test)
print(f"Test Accuracy: {acc:.2%}")

Adjusted Class Weights: {0: 6.5, 1: 0.4, 2: 2.5}


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


Epoch 1/20
[1m465/465[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m50s[0m 104ms/step - accuracy: 0.1680 - loss: 1.6202 - val_accuracy: 0.1681 - val_loss: 1.2022
Epoch 2/20
[1m465/465[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m56s[0m 119ms/step - accuracy: 0.1646 - loss: 1.2504 - val_accuracy: 0.0659 - val_loss: 1.2156
Epoch 3/20
[1m465/465[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m47s[0m 100ms/step - accuracy: 0.1654 - loss: 1.2431 - val_accuracy: 0.1832 - val_loss: 1.2108
Epoch 4/20
[1m465/465[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m47s[0m 101ms/step - accuracy: 0.1791 - loss: 1.2536 - val_accuracy: 0.1560 - val_loss: 1.2167
Epoch 5/20
[1m465/465[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m47s[0m 101ms/step - accuracy: 0.1890 - loss: 1.2108 - val_accuracy: 0.1409 - val_loss: 1.3888
Epoch 6/20
[1m465/465[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m47s[0m 102ms/step - accuracy: 0.2456 - loss: 1.1690 - val_accuracy: 0.2074 - val_loss: 1.1866
Epoch 7/20

In [11]:
from sklearn.metrics import confusion_matrix, classification_report

# === Predict on test set ===
y_pred_probs = model.predict(X_test)
y_pred = np.argmax(y_pred_probs, axis=1)
y_true = np.argmax(y_test, axis=1)

# Fix the label set so the matrix/report always cover all three classes,
# in the same order as the one-hot columns, even if a class is never predicted.
class_names = ['COVID-19', 'Healthy', 'Symptomatic']
class_ids = list(range(len(class_names)))

# === Confusion Matrix ===
# Rows are true classes, columns are predicted classes.
cm = confusion_matrix(y_true, y_pred, labels=class_ids)
print("Confusion Matrix:")
print(cm)

# === Classification Report (precision, recall, f1) ===
# zero_division=0 reports 0.00 for classes with no predictions instead of
# emitting an undefined-metric warning per average.
print("\nClassification Report:")
print(classification_report(
    y_true, y_pred,
    labels=class_ids,
    target_names=class_names,
    zero_division=0
))

[1m130/130[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 24ms/step
Confusion Matrix:
[[  54  102  107]
 [ 493 1303 1299]
 [ 114  328  333]]

Classification Report:
              precision    recall  f1-score   support

    COVID-19       0.08      0.21      0.12       263
     Healthy       0.75      0.42      0.54      3095
 Symptomatic       0.19      0.43      0.26       775

    accuracy                           0.41      4133
   macro avg       0.34      0.35      0.31      4133
weighted avg       0.60      0.41      0.46      4133

