In [1]:
import numpy as np
import pandas as pd
import cv2
import os
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime

from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, roc_auc_score, confusion_matrix
from sklearn.utils import class_weight

import tensorflow as tf
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import TensorBoard, EarlyStopping, ModelCheckpoint



In [None]:
# Dataset locations.
# The defaults are the author's local Windows paths. To run the notebook on
# another machine without editing this cell, set the APTOS_DATA_DIR and
# APTOS_LABELS_FILE environment variables before starting the kernel, e.g.
#   APTOS_DATA_DIR=/home/gcekcse/Documents/ML_Project_hk/aptos2019-blindness-detection/G1/G1_images
#   APTOS_LABELS_FILE=/home/gcekcse/Documents/ML_Project_hk/aptos2019-blindness-detection/G1/G1.csv
data_dir = os.environ.get(
    'APTOS_DATA_DIR',
    'C:/Users/heman/Documents/ML_project/aptos2019-blindness-detection/G1/G1_images',
)
labels_file = os.environ.get(
    'APTOS_LABELS_FILE',
    'C:/Users/heman/Documents/ML_project/aptos2019-blindness-detection/G1/G1.csv',
)

# Load labels
df = pd.read_csv(labels_file)

# Fail early, with a clear message, if the CSV lacks the columns the rest of
# the notebook reads ('id_code' for the image file name, 'diagnosis' for the label).
missing_columns = {'id_code', 'diagnosis'} - set(df.columns)
if missing_columns:
    raise KeyError(f"{labels_file} is missing required column(s): {sorted(missing_columns)}")



In [None]:
# Build a two-column table: one row per diagnosis grade with its sample count.
# The index is named and the counts column is labelled in the same chain,
# so no separate column-renaming step is needed.
class_counts = (
    df['diagnosis']
    .value_counts()
    .rename_axis('Class')
    .reset_index(name='Number of Samples')
)

# Show the per-class table
print(class_counts)

In [None]:
# Split data into training and testing (80/20, fixed seed for reproducibility).
#
# The split is stratified on 'diagnosis' so each grade keeps the same share in
# both partitions; with imbalanced labels a plain random split can leave a rare
# grade under-represented (or absent) in the test set, which breaks per-class
# metrics. Stratification needs at least two samples per class, so fall back to
# a plain random split when that does not hold.
#
# NOTE: if X_*.npy / y_*.npy caches were produced from an earlier split, delete
# them and re-run preprocessing so the cached arrays match this split.
label_counts = df['diagnosis'].value_counts()
stratify_labels = df['diagnosis'] if label_counts.min() >= 2 else None

train_df, test_df = train_test_split(
    df,
    test_size=0.2,
    random_state=42,
    stratify=stratify_labels,
)

# Display sample data
print(train_df.head())

In [None]:

IMG_SIZE = 224  # Resize images to 224x224

# Files written by this cell and read back by the loading cell below.
CACHE_FILES = ('X_train.npy', 'y_train.npy', 'X_test.npy', 'y_test.npy')


def preprocess_image(image_path):
    """Load one image, resize it to IMG_SIZE x IMG_SIZE and scale it to [0, 1].

    Args:
        image_path: Path of the image file to read.

    Returns:
        A float32 array of the resized image divided by 255, or None when
        OpenCV could not read the file (a message is printed in that case).
    """
    img = cv2.imread(image_path)
    if img is None:
        print(f"Error: Unable to load image at {image_path}")
        return None  # Caller skips images that could not be loaded

    img = cv2.resize(img, (IMG_SIZE, IMG_SIZE))
    # float32 halves the memory/disk footprint compared with the float64 that
    # `img / 255.0` would produce on a uint8 image.
    # NOTE(review): OpenCV returns channels in BGR order and this scales to
    # [0, 1]; that is not the `preprocess_input` transform shipped with
    # ResNet50 - confirm the simple scaling is intentional.
    return img.astype(np.float32) / 255.0


def preprocess_data(dataframe, data_dir):
    """Preprocess every image listed in `dataframe`.

    Args:
        dataframe: Frame with an 'id_code' column (image file stem) and a
            'diagnosis' column (target label).
        data_dir: Directory that holds '<id_code>.png' files.

    Returns:
        Tuple (X, y) of numpy arrays: the images that loaded successfully and
        their labels, in the same order. Unreadable images are skipped.
    """
    X = []
    y = []
    total = len(dataframe)

    # Count rows with enumerate: the DataFrame index is shuffled after the
    # train/test split, so it cannot serve as a progress counter.
    rows = zip(dataframe['id_code'], dataframe['diagnosis'])
    for count, (id_code, diagnosis) in enumerate(rows, start=1):
        img_path = os.path.join(data_dir, id_code + ".png")  # Assuming .png extension
        img = preprocess_image(img_path)

        if img is not None:  # Proceed only if the image was loaded
            X.append(img)
            y.append(diagnosis)

        if count % 100 == 0 or count == total:  # Log progress every 100 images
            print(f"Processed {count}/{total} images.")

    return np.array(X), np.array(y)


# Preprocessing is expensive, so it only runs when the cached arrays are not
# all present (i.e. on the first run). Delete the .npy files to force a rebuild.
if all(os.path.exists(cache_path) for cache_path in CACHE_FILES):
    print("Cached arrays found; skipping preprocessing.")
else:
    print("Preprocessing training data...")
    X_train, y_train = preprocess_data(train_df, data_dir)

    print("Preprocessing test data...")
    X_test, y_test = preprocess_data(test_df, data_dir)

    print("Preprocessing completed.")

    # Save training data
    np.save('X_train.npy', X_train)
    np.save('y_train.npy', y_train)

    # Save test data
    np.save('X_test.npy', X_test)
    np.save('y_test.npy', y_test)

In [None]:
# Restore the preprocessed arrays that were cached on disk as .npy files.
# Images (X_*) and labels (y_*) are read pairwise, one split per line.
X_train, y_train = np.load('X_train.npy'), np.load('y_train.npy')
X_test, y_test = np.load('X_test.npy'), np.load('y_test.npy')


In [None]:
# ResNet50 backbone with ImageNet weights and without its original classifier,
# sized for IMG_SIZE x IMG_SIZE 3-channel inputs.
base_model = ResNet50(
    include_top=False,
    weights='imagenet',
    input_shape=(IMG_SIZE, IMG_SIZE, 3),
)

# Keep the pre-trained backbone fixed; only the new head is trained.
base_model.trainable = False

# Classification head stacked on top of the backbone features.
classifier_head = [
    Flatten(),
    Dense(512, activation='relu'),
    Dropout(0.5),
    Dense(5, activation='softmax'),  # Assuming 5 DR severity levels
]
model = Sequential([base_model, *classifier_head])

# Integer labels -> sparse categorical cross-entropy; track accuracy.
model.compile(
    loss='sparse_categorical_crossentropy',
    optimizer=Adam(learning_rate=1e-4),
    metrics=['accuracy'],
)

# Print the layer-by-layer overview
model.summary()

# WEIGHT CALCULATION

# 'y_train' contains the training labels. Weights are computed for exactly the
# classes that occur in it, in the sorted order returned by np.unique.
present_classes = np.unique(y_train)
class_weights = class_weight.compute_class_weight('balanced', classes=present_classes, y=y_train)

# Keras expects {class_label: weight}. Key by the actual label rather than by
# position: enumerate() would assign weights to the wrong classes whenever a
# label is missing from y_train (e.g. labels 0, 1, 3, 4 -> positions 0..3).
class_weight_dict = {
    int(label): float(weight)
    for label, weight in zip(present_classes, class_weights)
}


In [None]:

# Report how many GPUs TensorFlow can see
gpu_devices = tf.config.list_physical_devices('GPU')
print("Num GPUs Available: ", len(gpu_devices))

# Random transformations applied to training images on the fly
augmentation_options = dict(
    rotation_range=20,
    zoom_range=0.15,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.15,
    horizontal_flip=True,
    fill_mode="nearest",
)
datagen = ImageDataGenerator(**augmentation_options)

# Checkpoint callback: keep only the best model (lowest validation loss) on disk.
# Saving after every epoch would leave the *last* epoch in the file, while
# EarlyStopping below restores the *best* weights only in memory; anything that
# later reloads the file would then evaluate a different (worse) model than
# the one training ended with.
checkpoint_callback = ModelCheckpoint(
    filepath=r'C:/Users/heman/Documents/ML_project/models/model.h5',  # Constant file path
    monitor='val_loss',
    save_best_only=True,
    save_weights_only=False,
    verbose=1
)

# Early stopping callback: stop after 5 epochs without val_loss improvement and
# roll the in-memory model back to the best epoch.
early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)

In [None]:
# TensorBoard logging: write each run to its own timestamped sub-directory of
# logs/fit so the TensorBoard cell (--logdir=logs/fit) has data to display and
# separate runs do not overwrite each other.
tensorboard_log_dir = os.path.join('logs', 'fit', datetime.now().strftime('%Y%m%d-%H%M%S'))
tensorboard_callback = TensorBoard(log_dir=tensorboard_log_dir)

# Fit the model on augmented batches, weighting classes to offset imbalance.
# NOTE(review): the test set is used as validation data, so early stopping and
# checkpoint selection are tuned on the same data that is reported as the test
# score - consider carving a separate validation split out of the training data.
history = model.fit(
    datagen.flow(X_train, y_train, batch_size=32),
    validation_data=(X_test, y_test),
    epochs=20,
    callbacks=[checkpoint_callback, early_stopping, tensorboard_callback],
    class_weight=class_weight_dict
)

# Save the training history (one row per epoch, one column per metric) to CSV
history_df = pd.DataFrame(history.history)
history_df.to_csv('training_history.csv', index=False)

print("Training history saved to 'training_history.csv'")

In [None]:
!tensorboard --logdir=logs/fit


In [None]:
# Load the model from the specified file path, together with the cached test
# split, so this cell can run without re-training.
model = load_model(r'C:/Users/heman/Documents/ML_project/models/model.h5')
X_test = np.load('X_test.npy')
y_test = np.load('y_test.npy')

# Convert your input to a tensor
X_test_tensor = tf.convert_to_tensor(X_test, dtype=tf.float32)

# Evaluate on the test set
test_loss, test_acc = model.evaluate(X_test_tensor, y_test)
print(f"Test Accuracy: {test_acc * 100:.2f}%")

# Run inference once and derive both the probabilities and the hard labels
# from the same forward pass.
y_pred_prob = model.predict(X_test_tensor)  # Predicted class probabilities
y_pred = np.argmax(y_pred_prob, axis=1)

# Classification report. zero_division=0 scores a class that was never
# predicted as 0 instead of 1.0, which would otherwise inflate the per-class
# and macro-averaged precision.
print(classification_report(y_test, y_pred, zero_division=0))

# ROC-AUC score (uses predicted probabilities instead of class labels).
# Passing the full label set (one label per softmax column, 0..n_classes-1)
# keeps the call valid even when a class is absent from y_test.
class_labels = np.arange(y_pred_prob.shape[1])
auc_roc = roc_auc_score(y_test, y_pred_prob, multi_class='ovo', labels=class_labels)
print(f"AUC-ROC: {auc_roc}")

In [None]:
# Confusion Matrix
# Use one explicit label set for both the matrix and the tick labels. The
# matrix covers every label seen in y_test OR y_pred, so labelling the axes
# with np.unique(y_test) alone misaligns the ticks whenever the model predicts
# a class that is absent from the test labels.
cm_labels = np.union1d(y_test, y_pred)
cm = confusion_matrix(y_test, y_pred, labels=cm_labels)

# Plotting the confusion matrix (rows: true label, columns: predicted label)
fig, ax = plt.subplots(figsize=(10, 8))
sns.heatmap(
    cm,
    annot=True,
    fmt='d',
    cmap='Blues',
    xticklabels=cm_labels,
    yticklabels=cm_labels,
    ax=ax,
)
ax.set_xlabel('Predicted Label')
ax.set_ylabel('True Label')
ax.set_title('Confusion Matrix')
plt.show()
