IMPORTS

In [1]:
import os
import numpy as np
import tensorflow as tf
from tensorflow import keras
from preprocess import preprocess_image
import pandas as pd
import ast
from sklearn.model_selection import train_test_split
import cv2
from preprocess import preprocess_image
import matplotlib.pyplot as plt

# Short aliases for the Keras sub-modules used when the model is built.
layers = tf.keras.layers
models = tf.keras.models

In [2]:
# Report the installed TensorFlow version and the GPUs TensorFlow can see.
print("TensorFlow version:", tf.__version__)
print("GPU devices:", tf.config.list_physical_devices('GPU'))

# Smoke test: request placement on the first GPU and run a small matmul.
# NOTE(review): the recorded output of this cell lists no GPU devices, yet the
# product is still printed, so this cell succeeding does not by itself prove
# that a GPU was used.
with tf.device('/GPU:0'):
    a = tf.constant([[1.0, 2.0], [3.0, 4.0]])
    b = tf.constant([[5.0, 6.0], [7.0, 8.0]])
    c = tf.matmul(a, b)
    print(c)

TensorFlow version: 2.18.0
GPU devices: []
tf.Tensor(
[[19. 22.]
 [43. 50.]], shape=(2, 2), dtype=float32)


In [None]:
# Configure GPU usage when at least one physical GPU is reported; otherwise
# just announce that the notebook runs on the CPU.
gpus = tf.config.list_physical_devices('GPU')
if not gpus:
    print("No GPU found. Running on CPU.")
else:
    try:
        # Ask TensorFlow to grow GPU memory on demand instead of reserving it
        # all up front.
        # NOTE(review): TensorFlow documents that memory growth has to be set
        # before the GPUs are initialised; an earlier cell already runs a
        # matmul, so on a GPU machine this may end up in the except branch --
        # confirm.
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        print(f"GPU acceleration enabled: {len(gpus)} GPU(s) available")
        print(f"GPU devices: {gpus}")

        # Passes back the same list that was just enumerated, i.e. every
        # detected GPU stays visible.
        tf.config.set_visible_devices(gpus, 'GPU')

        # Report how many logical GPU devices TensorFlow exposes.
        logical_gpus = tf.config.list_logical_devices('GPU')
        print(f"Logical GPUs: {len(logical_gpus)}")
        print("TensorFlow will use GPU for computations")
    except RuntimeError as err:
        print(f"Error configuring GPU: {err}")

In [None]:
def load_dataset(data_dir, cases_file, labels_file):
    """
    Load images and their diagnostic labels, joined through case IDs.

    Every ``.png`` file in ``data_dir`` is matched to a case via the image path
    columns of ``cases_file`` and then to that case's label / confidence lists
    in ``labels_file``. An image with several labels contributes one sample per
    (label, confidence) pair, so the same image array can appear more than once
    in the returned arrays.

    Images whose label data cannot be parsed, or whose preprocessing raises or
    returns ``None``, are skipped; the skipped files are reported on stdout
    instead of being dropped silently.

    Args:
        data_dir (str): Directory that is scanned for ``.png`` files.
        cases_file (str): Path to the CSV with ``case_id`` and the
            ``image_1_path`` .. ``image_3_path`` columns.
        labels_file (str): Path to the CSV with ``case_id``,
            ``dermatologist_skin_condition_on_label_name`` and
            ``dermatologist_skin_condition_confidence`` columns.

    Returns:
        tuple: ``(images, labels, unique_labels, confidences)`` where
        ``images``, ``labels`` (integer indices into ``unique_labels``) and
        ``confidences`` are numpy arrays of equal length and ``unique_labels``
        is a sorted list of label names.

    Raises:
        ValueError: If not a single sample could be produced.
    """
    # Load CSVs
    cases_df = pd.read_csv(cases_file)
    labels_df = pd.read_csv(labels_file)
    print(f"Loaded {len(cases_df)} cases and {len(labels_df)} labels")

    # Image id (file name without extension) -> case id
    image_to_case = {}
    for _, case in cases_df.iterrows():
        for col in ['image_1_path', 'image_2_path', 'image_3_path']:
            if pd.notna(case[col]):
                img_id = os.path.splitext(os.path.basename(case[col]))[0]
                image_to_case[img_id] = case['case_id']

    print(f"Mapped {len(image_to_case)} images to their cases")

    # Case id -> (raw label list string, raw confidence list string);
    # cases with a missing or empty label list are left out.
    case_to_labels = {}
    for _, row in labels_df.iterrows():
        labels_str = row['dermatologist_skin_condition_on_label_name']
        if pd.notna(labels_str) and labels_str != '[]':
            case_to_labels[row['case_id']] = (
                labels_str,
                row['dermatologist_skin_condition_confidence'],
            )

    print(f"Found labels for {len(case_to_labels)} cases")

    # Sorted so that the sample order -- and therefore any seeded split done by
    # the caller -- does not depend on the order os.listdir happens to return.
    image_files = sorted(f for f in os.listdir(data_dir) if f.endswith('.png'))
    print(f"Found {len(image_files)} images in directory")

    images = []
    labels = []
    confidences = []
    all_labels = set()
    processed_count = 0
    skipped = []       # (file name, reason) for every image that was dropped
    max_reported = 10  # cap the per-file report so the cell output stays short

    for image_name in image_files:
        img_id = os.path.splitext(image_name)[0]

        # Images without a case, or whose case has no labels, are not samples.
        if img_id not in image_to_case:
            continue
        case_id = image_to_case[img_id]
        if case_id not in case_to_labels:
            continue
        labels_str, conf_str = case_to_labels[case_id]

        try:
            # The CSV stores Python-literal lists as strings.
            label_names = ast.literal_eval(labels_str)
            confidence_scores = ast.literal_eval(conf_str)

            # NOTE(review): preprocess_image receives the bare image id, not a
            # path under data_dir -- presumably it resolves the file location
            # itself; confirm.
            img = preprocess_image(img_id)
            if img is None:
                skipped.append((image_name, "preprocess_image returned None"))
                continue

            # One sample per (label, confidence) pair of this image.
            for label, confidence in zip(label_names, confidence_scores):
                all_labels.add(label)
                images.append(img)
                labels.append(label)
                confidences.append(confidence)

            processed_count += 1
        except Exception as exc:
            # Best effort: one bad image must not abort the whole load, but the
            # failure is recorded so it shows up in the summary below.
            skipped.append((image_name, f"{type(exc).__name__}: {exc}"))

    print(f"Successfully processed {processed_count} images")
    if skipped:
        print(f"Skipped {len(skipped)} images that could not be parsed or preprocessed")
        for image_name, reason in skipped[:max_reported]:
            print(f"  {image_name}: {reason}")
        if len(skipped) > max_reported:
            print(f"  ... and {len(skipped) - max_reported} more")

    if not images:
        raise ValueError("No valid images were found. Check your data paths.")

    # Map label names to integer class indices (alphabetical order).
    unique_labels = sorted(all_labels)
    print(f"Found {len(unique_labels)} unique labels")

    label_mapping = {label: i for i, label in enumerate(unique_labels)}
    numerical_labels = [label_mapping[label] for label in labels]

    return np.array(images), np.array(numerical_labels), unique_labels, np.array(confidences)

In [None]:
def create_model(input_shape=(244, 244, 3), num_classes=2):
    """
    Build a small, uncompiled convolutional classifier.

    The network stacks three 3x3 convolutions (32, 64 and 128 filters, ReLU),
    with 2x2 max pooling after the first two, followed by a flatten, a 64-unit
    ReLU dense layer and a softmax output layer.

    Args:
        input_shape (tuple): Shape of a single input sample, given to the
            model's Input layer.
            NOTE(review): the default is 244x244 while the notebook builds the
            model with 224x224 -- possibly a typo; the default is left as is so
            existing callers keep their behaviour.
        num_classes (int): Number of units in the final softmax layer.

    Returns:
        The Sequential model. It is not compiled; the caller chooses the
        optimizer, loss and metrics.
    """
    model = models.Sequential([
        # Declare the input with an explicit Input layer: Keras 3 warns when
        # `input_shape` is passed to a layer inside a Sequential model.
        layers.Input(shape=input_shape),
        layers.Conv2D(32, (3, 3), activation='relu'),
        layers.MaxPooling2D((2, 2)),
        layers.Conv2D(64, (3, 3), activation='relu'),
        layers.MaxPooling2D((2, 2)),
        layers.Conv2D(128, (3, 3), activation='relu'),
        layers.Flatten(),
        layers.Dense(64, activation='relu'),
        layers.Dense(num_classes, activation='softmax')
    ])
    return model

In [None]:
def weighted_loss(y_true, y_pred, confidence):
    """
    Sparse categorical cross-entropy multiplied by ``confidence``.

    Args:
        y_true: Integer class labels.
        y_pred: Predicted class probabilities.
        confidence: Factor the cross-entropy value is multiplied with.

    Returns:
        The product of the cross-entropy value and ``confidence``.

    NOTE(review): the loss object is created with its default reduction, so
    presumably the cross-entropy is already reduced over the batch before the
    multiplication and ``confidence`` scales it as a whole rather than per
    sample -- confirm against the Keras documentation.
    """
    cross_entropy = tf.keras.losses.SparseCategoricalCrossentropy()
    return cross_entropy(y_true, y_pred) * confidence

In [None]:
# Set random seeds for reproducibility. set_random_seed seeds Python's
# `random`, NumPy and TensorFlow in one call; tf.random.set_seed would only
# cover TensorFlow.
tf.keras.utils.set_random_seed(42)

# Data directories
data_dir = 'dataset/images/'
cases_file = 'dataset/scin_cases.csv'
labels_file = 'dataset/scin_labels.csv'

# Create models directory if it doesn't exist
os.makedirs('models', exist_ok=True)

# Load dataset
X, y, unique_labels, confidence = load_dataset(data_dir, cases_file, labels_file)

# Split data: 80% training / 20% validation, with a fixed seed.
# NOTE(review): load_dataset emits one row per (image, label) pair, so an image
# with several labels can end up in both the training and the validation set
# with this row-level split.
X_train, X_val, y_train, y_val, confidence_train, confidence_val = train_test_split(
    X, y, confidence, test_size=0.2, random_state=42
)

print(f"Training set: {X_train.shape[0]} samples")
print(f"Validation set: {X_val.shape[0]} samples")

In [None]:
# Build the classifier with one output unit per label found in the dataset.
num_classes = len(unique_labels)
model = create_model(input_shape=(224, 224, 3), num_classes=num_classes)
model.summary()

# Confidence weighting is applied through `sample_weight` rather than a custom
# loss. A loss closure over the whole confidence_train array cannot line up with
# the samples of a shuffled mini-batch, whereas fit() slices and shuffles
# sample weights together with X and y. A built-in loss also keeps the saved
# model loadable without custom objects.
model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy']
)

# Per-sample weights as float arrays, one entry per row of X_train / X_val.
train_weights = np.asarray(confidence_train, dtype='float32')
val_weights = np.asarray(confidence_val, dtype='float32')

# Train model; validation loss is weighted the same way as the training loss
# so the two curves stay comparable.
history = model.fit(
    X_train, y_train,
    sample_weight=train_weights,
    epochs=10,
    validation_data=(X_val, y_val, val_weights),
    batch_size=32,
    verbose=1
)

In [None]:
# Plot training history: accuracy on the left panel, loss on the right, each
# with the training and validation curve per epoch.
fig, axes = plt.subplots(1, 2, figsize=(12, 4))
panels = [
    ('accuracy', 'val_accuracy', 'Model accuracy', 'Accuracy'),
    ('loss', 'val_loss', 'Model loss', 'Loss'),
]
for ax, (train_key, val_key, title, ylabel) in zip(axes, panels):
    ax.plot(history.history[train_key])
    ax.plot(history.history[val_key])
    ax.set_title(title)
    ax.set_ylabel(ylabel)
    ax.set_xlabel('Epoch')
    ax.legend(['Train', 'Validation'], loc='upper left')
plt.show()

In [None]:
# Persist the trained model into the models/ directory created earlier in the
# notebook.
# NOTE(review): the .h5 suffix presumably selects Keras' legacy HDF5 format --
# confirm that downstream loaders expect this file before changing it.
model.save('models/final_model.h5')
print("Model training completed successfully!")