# In [None]:
# ==============================================================================
# Section 1: Setup and Environment
# ==============================================================================
import os
import xml.etree.ElementTree as ET
import numpy as np
import pandas as pd
from PIL import Image
import matplotlib.pyplot as plt
from tqdm import tqdm
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from sklearn.metrics import precision_recall_curve, auc
import cv2

# Dataset locations.
# IMPORTANT: point BASE_PATH at the root directory of your dataset.
BASE_PATH = './2025_Karyogram_CV_Camp/'

# Everything below is derived from BASE_PATH.
SINGLE_CHROMOSOME_PATH = os.path.join(BASE_PATH, 'single_chromosomes_object')
MULTI_CHROMOSOME_PATH = os.path.join(BASE_PATH, '24_chromsomes_object')
TRAIN_TXT_PATH = os.path.join(BASE_PATH, 'train.txt')
TEST_TXT_PATH = os.path.join(BASE_PATH, 'test.txt')

# When the dataset root is missing, report it and lay down a placeholder
# directory tree plus one-line split files so the rest of the script can
# still be executed for demonstration purposes.
if not os.path.exists(BASE_PATH):
    print(f"Error: The base path '{BASE_PATH}' does not exist.")
    print("Please download the dataset and update the BASE_PATH variable.")
    for dummy_dir in (SINGLE_CHROMOSOME_PATH, MULTI_CHROMOSOME_PATH):
        os.makedirs(dummy_dir, exist_ok=True)
    for dummy_file, dummy_line in ((TRAIN_TXT_PATH, 'dummy_image_1\n'),
                                   (TEST_TXT_PATH, 'dummy_image_2\n')):
        with open(dummy_file, 'w') as f:
            f.write(dummy_line)

# Device selection: enable on-demand GPU memory growth when a GPU is visible,
# otherwise report that the CPU is used.
gpus = tf.config.experimental.list_physical_devices('GPU')
if not gpus:
    print("Using CPU")
else:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        print(f"Using GPU: {gpus[0].name}")
    except RuntimeError as e:
        # Best effort: a RuntimeError here is only reported, not re-raised.
        print(e)

# ==============================================================================
# Section 2: Data Loading and Preprocessing (Framework Agnostic)
# ==============================================================================
def parse_xml_annotation(xml_path):
    """Parse a single XML annotation file and extract its bounding boxes.

    Args:
        xml_path: Path of the XML file to read.

    Returns:
        A list with one dict per ``<object>`` element. Each dict holds
        ``name`` (text of the ``<name>`` child) and the integer coordinates
        ``xmin``, ``ymin``, ``xmax``, ``ymax`` read from the ``<bndbox>`` child.

    Raises:
        xml.etree.ElementTree.ParseError: If the file is not well-formed XML.
        AttributeError: If an object lacks ``<name>``, ``<bndbox>`` or one of
            the coordinate tags.
        ValueError: If a coordinate is not numeric.
    """
    root = ET.parse(xml_path).getroot()
    objects = []
    for obj in root.findall('object'):
        record = {'name': obj.find('name').text}
        bndbox = obj.find('bndbox')
        for key in ('xmin', 'ymin', 'xmax', 'ymax'):
            # Go through float() first so coordinates written as "12.0" are
            # accepted instead of crashing int(); the fraction is truncated.
            record[key] = int(float(bndbox.find(key).text))
        objects.append(record)
    return objects

def create_dataframe(image_ids, data_path):
    """Build a DataFrame with one row per annotated object.

    For every ID, ``<data_path>/<id>.xml`` and ``<data_path>/<id>.jpg`` must
    both exist; IDs for which either file is missing are skipped silently.

    Args:
        image_ids: Iterable of image IDs (file names without extension).
        data_path: Directory holding the ``.xml`` / ``.jpg`` pairs.

    Returns:
        A pandas DataFrame with the columns ``image_id``, ``image_path``,
        ``class_name``, ``xmin``, ``ymin``, ``xmax``, ``ymax``. The columns
        are present even when no record was found, so callers can select
        them on an empty frame without hitting a KeyError.
    """
    columns = ['image_id', 'image_path', 'class_name', 'xmin', 'ymin', 'xmax', 'ymax']
    records = []
    for img_id in tqdm(image_ids, desc=f"Processing {os.path.basename(data_path)}"):
        xml_path = os.path.join(data_path, f"{img_id}.xml")
        img_path = os.path.join(data_path, f"{img_id}.jpg")
        if not (os.path.exists(xml_path) and os.path.exists(img_path)):
            continue
        for ann in parse_xml_annotation(xml_path):
            records.append({
                'image_id': img_id,
                'image_path': img_path,
                'class_name': ann['name'],
                'xmin': ann['xmin'],
                'ymin': ann['ymin'],
                'xmax': ann['xmax'],
                'ymax': ann['ymax'],
            })
    # Passing the schema explicitly keeps the columns on an empty result.
    return pd.DataFrame(records, columns=columns)

# Read the train/test split files: one image ID per line, surrounding
# whitespace removed. If either file is missing, both ID lists end up empty.
try:
    with open(TRAIN_TXT_PATH, 'r') as f:
        train_ids = [line.strip() for line in f]
    with open(TEST_TXT_PATH, 'r') as f:
        test_ids = [line.strip() for line in f]
except FileNotFoundError:
    print("Warning: train.txt or test.txt not found. Using dummy IDs.")
    train_ids, test_ids = [], []

# Annotation tables for the identifier, read from the single-chromosome folder.
print("Creating DataFrames for Identifier...")
df_identifier_train = create_dataframe(train_ids, SINGLE_CHROMOSOME_PATH)
df_identifier_test = create_dataframe(test_ids, SINGLE_CHROMOSOME_PATH)

# Annotation tables for the classifier, read from the multi-chromosome folder.
print("\nCreating DataFrames for Classifier...")
df_classifier_train = create_dataframe(train_ids, MULTI_CHROMOSOME_PATH)
df_classifier_test = create_dataframe(test_ids, MULTI_CHROMOSOME_PATH)

# ==============================================================================
# Section 3: Module 1 - Identifier (Segmentation Model)
# ==============================================================================

# 3.1. Identifier Data Pipeline (tf.data)
def _resolve_image_path(df_row):
    """Return the image path as ``str`` from an eager tensor, str/bytes, or a row mapping."""
    value = df_row
    if not isinstance(value, (str, bytes)) and not hasattr(value, 'numpy'):
        # dict / pandas Series style row carrying an 'image_path' entry
        value = value['image_path']
    if hasattr(value, 'numpy'):
        # Eager string tensor (this is what tf.py_function hands over)
        value = value.numpy()
    if isinstance(value, bytes):
        value = value.decode()
    return str(value)


def load_identifier_data(df_row, img_size=(256, 256), annotations=None):
    """Load one image together with its box-filled segmentation mask.

    Must run eagerly (directly, or inside ``tf.py_function``) because it uses
    NumPy, pandas and OpenCV.

    Args:
        df_row: The image to load: an eager string tensor, a ``str``/``bytes``
            path, or a mapping/Series with an ``'image_path'`` entry.
        img_size: Target ``(height, width)`` for both image and mask.
        annotations: DataFrame with ``image_path``, ``xmin``, ``ymin``,
            ``xmax``, ``ymax`` columns used to draw the mask. Defaults to the
            module-level ``df_identifier_train``.

    Returns:
        ``(image, mask)``: the grayscale image resized to ``img_size`` and
        scaled to [-1, 1], and a float32 mask of the same size where pixels
        inside any annotated box are 1 and all others 0.
    """
    image_path = _resolve_image_path(df_row)

    # Load image
    image = tf.io.read_file(image_path)
    image = tf.io.decode_jpeg(image, channels=1)  # Grayscale
    original_shape = tf.shape(image)
    height, width = int(original_shape[0]), int(original_shape[1])
    image = tf.image.resize(image, img_size)
    image = (tf.cast(image, tf.float32) / 127.5) - 1  # Normalize to [-1, 1]

    # Create mask at the original resolution so box coordinates apply directly
    if annotations is None:
        annotations = df_identifier_train
    mask = np.zeros((height, width), dtype=np.uint8)
    if 'image_path' in annotations.columns:
        boxes = annotations.loc[annotations['image_path'] == image_path,
                                ['xmin', 'ymin', 'xmax', 'ymax']]
        for xmin, ymin, xmax, ymax in boxes.itertuples(index=False, name=None):
            # Cast to plain int: OpenCV point arguments reject some NumPy scalar types
            cv2.rectangle(mask, (int(xmin), int(ymin)), (int(xmax), int(ymax)),
                          color=1, thickness=-1)

    mask = tf.convert_to_tensor(mask.astype(np.float32))
    mask = tf.expand_dims(mask, axis=-1)
    # Nearest-neighbour keeps the mask strictly binary after resizing
    mask = tf.image.resize(mask, img_size, method='nearest')
    return image, mask


def identifier_data_generator(df):
    """Yield each unique image path of ``df`` as a 1-tuple ``(path,)``.

    The element structure matches the string-scalar ``output_signature`` used
    by ``create_tf_dataset_identifier``. A frame without an ``image_path``
    column yields nothing.
    """
    if 'image_path' not in df.columns:
        return
    for path in df['image_path'].drop_duplicates():
        yield (path,)


def create_tf_dataset_identifier(df, batch_size=8, img_size=(256, 256)):
    """Create a batched ``tf.data.Dataset`` of ``(image, mask)`` pairs.

    Args:
        df: Annotation DataFrame; it provides both the image paths and the
            boxes used to draw each mask.
        batch_size: Number of samples per batch.
        img_size: ``(height, width)`` of the produced images and masks.

    Returns:
        A prefetched dataset yielding float32 batches of shape
        ``(batch, height, width, 1)`` for image and mask.
    """
    dataset = tf.data.Dataset.from_generator(
        lambda: identifier_data_generator(df),
        output_signature=(
            tf.TensorSpec(shape=(), dtype=tf.string, name='image_path'),
        )
    )

    # The loading function uses pandas and cv2, so it has to run eagerly
    # through tf.py_function.
    def process_path(path_tensor):
        return load_identifier_data(path_tensor, img_size=img_size, annotations=df)

    # Dataset.map unpacks the 1-tuple element, so the path arrives as a single argument.
    def load_pair(path_tensor):
        image, mask = tf.py_function(process_path, [path_tensor], [tf.float32, tf.float32])
        # py_function discards static shapes; restore them so Keras can build on the data.
        image.set_shape((img_size[0], img_size[1], 1))
        mask.set_shape((img_size[0], img_size[1], 1))
        return image, mask

    dataset = dataset.map(load_pair, num_parallel_calls=tf.data.AUTOTUNE)
    dataset = dataset.batch(batch_size).prefetch(buffer_size=tf.data.AUTOTUNE)
    return dataset

# 3.2. Identifier Model Architecture (U-Net)
def build_unet(input_shape=(256, 256, 1)):
    """Assemble a three-level U-Net with the Keras Functional API.

    Encoder stages use 64/128/256 filters, the bottleneck 512, and the decoder
    mirrors the encoder with transposed convolutions and skip connections.
    The output is a single-channel sigmoid map for binary segmentation.
    """

    def double_conv(tensor, filters):
        # Two consecutive 3x3 ReLU convolutions with 'same' padding
        for _ in range(2):
            tensor = layers.Conv2D(filters, 3, activation='relu', padding='same')(tensor)
        return tensor

    inputs = keras.Input(shape=input_shape)

    # Encoder: remember each stage's output for the skip connections
    features = inputs
    skips = []
    for filters in (64, 128, 256):
        features = double_conv(features, filters)
        skips.append(features)
        features = layers.MaxPooling2D(2)(features)

    # Bottleneck
    features = double_conv(features, 512)

    # Decoder: upsample, merge with the matching encoder stage, refine
    for filters, skip in zip((256, 128, 64), reversed(skips)):
        features = layers.Conv2DTranspose(filters, 2, strides=2, padding='same')(features)
        features = layers.concatenate([features, skip])
        features = double_conv(features, filters)

    # Sigmoid for binary segmentation
    outputs = layers.Conv2D(1, 1, activation='sigmoid')(features)
    return keras.Model(inputs, outputs)

# 3.3. Identifier Visualization
def visualize_segmentation_tf(model, dataset, num_images=3):
    """Plot input, ground-truth mask and predicted mask side by side.

    Takes up to ``num_images`` batches from ``dataset`` and shows the first
    sample of each batch as one row of three panels. The prediction is
    thresholded at 0.5.
    """
    plt.figure(figsize=(12, num_images * 4))
    for row_idx, (image, mask) in enumerate(dataset.take(num_images)):
        pred_mask = model.predict(image)[0]

        panels = (
            ("Input Image", image[0, :, :, 0] * 0.5 + 0.5),  # back from [-1, 1] to [0, 1]
            ("Ground Truth Mask", mask[0, :, :, 0]),
            ("Predicted Mask", pred_mask[:, :, 0] > 0.5),
        )
        for col_idx, (title, pixels) in enumerate(panels, start=1):
            plt.subplot(num_images, 3, row_idx * 3 + col_idx)
            plt.title(title)
            plt.imshow(pixels, cmap='gray')
            plt.axis("off")
    plt.show()


# In [None]:

# ==============================================================================
# Section 4: Module 2 - Classifier
# ==============================================================================

# 4.1. Classifier Data Pipeline (tf.data)
def setup_classifier_data():
    """Derive the class list and lookup tables from ``df_classifier_train``.

    Class names are ordered numerically (names that are not all digits sort
    as 99, ties keep alphabetical order), with 'X' and then 'Y' placed last
    when they occur.

    Returns:
        ``(num_classes, class_to_idx, idx_to_class)``. When the training frame
        is empty the fallback ``(24, {}, {})`` is returned.
    """
    if df_classifier_train.empty:
        print("Classifier DataFrame is empty. Using fallback class count.")
        return 24, {}, {}

    present = df_classifier_train['class_name'].unique()
    trailing = ('X', 'Y')

    # Alphabetical pre-sort, then a stable numeric sort on top of it
    ordered = [label for label in sorted(present) if label not in trailing]
    ordered.sort(key=lambda label: int(label) if label.isdigit() else 99)
    class_names = ordered + [label for label in trailing if label in present]

    class_to_idx = {}
    idx_to_class = {}
    for position, label in enumerate(class_names):
        class_to_idx[label] = position
        idx_to_class[position] = label

    num_classes = len(class_names)
    print(f"Number of classes: {num_classes}")
    return num_classes, class_to_idx, idx_to_class

def load_classifier_data(row, class_map, img_size=(64, 64)):
    """Crop one annotated object out of its image and return it with its label.

    Args:
        row: Mapping with ``image_path``, ``xmin``, ``ymin``, ``xmax``,
            ``ymax`` and ``class_name`` entries.
        class_map: Mapping from class name to integer label.
        img_size: Size the crop is resized to.

    Returns:
        ``(pixels, label)`` where ``pixels`` is a float32 array scaled to
        [-1, 1] with a trailing channel axis.
    """
    grayscale = Image.open(row['image_path']).convert('L')
    box = (row['xmin'], row['ymin'], row['xmax'], row['ymax'])
    patch = grayscale.crop(box).resize(img_size)

    pixels = np.array(patch, dtype=np.float32)
    pixels = np.expand_dims((pixels / 127.5) - 1, axis=-1)  # [-1, 1], channel last

    return pixels, class_map[row['class_name']]

def classifier_data_generator(df, class_map):
    """Yield one ``(image, label)`` sample per row of ``df``, in row order."""
    yield from (load_classifier_data(record, class_map) for _, record in df.iterrows())

def create_tf_dataset_classifier(df, class_map, batch_size=32):
    """Wrap the classifier sample generator in a batched, prefetched dataset.

    Each element is a ``(64, 64, 1)`` float32 image paired with a scalar
    int32 label.
    """
    image_spec = tf.TensorSpec(shape=(64, 64, 1), dtype=tf.float32)
    label_spec = tf.TensorSpec(shape=(), dtype=tf.int32)

    def sample_stream():
        # A fresh generator each time the dataset is iterated
        return classifier_data_generator(df, class_map)

    samples = tf.data.Dataset.from_generator(
        sample_stream,
        output_signature=(image_spec, label_spec),
    )
    batched = samples.batch(batch_size)
    return batched.prefetch(tf.data.AUTOTUNE)

# 4.2. Classifier Model Architecture (CNN)
def build_classifier(num_classes, input_shape=(64, 64, 1)):
    """Build a small CNN classifier that outputs raw logits.

    Three conv/max-pool stages (16, 32, 64 filters) feed a 512-unit dense
    layer with 50% dropout and a final ``num_classes``-unit layer without
    activation.
    """
    stack = [keras.Input(shape=input_shape)]
    for filters in (16, 32, 64):
        stack.append(layers.Conv2D(filters, 3, padding='same', activation='relu'))
        stack.append(layers.MaxPooling2D())
    stack.append(layers.Flatten())
    stack.append(layers.Dense(512, activation='relu'))
    stack.append(layers.Dropout(0.5))
    stack.append(layers.Dense(num_classes))  # Logits output
    return keras.Sequential(stack)

# ==============================================================================
# Section 5 & 6: Pipeline Integration and Evaluation
# ==============================================================================
def karyotype_pipeline_tf(image_path, identifier, classifier, class_map):
    """Run the end-to-end pipeline: segment, box, crop and classify.

    Args:
        image_path: Path of the image to analyse.
        identifier: Segmentation model; fed a ``(1, 256, 256, 1)`` input
            scaled to [-1, 1], its output is thresholded at 0.5.
        classifier: Classification model returning logits for
            ``(N, 64, 64, 1)`` crops scaled to [-1, 1].
        class_map: Mapping from class index to class name.

    Returns:
        A list with one dict per detected region, in contour order, with
        ``bbox`` (``[x1, y1, x2, y2]`` in original-image pixels),
        ``class_name``, ``confidence`` and ``all_probs``. Empty when no region
        of at least 20 mask pixels (bounding-box area) is found.
    """
    # 1. Load image and preprocess for identifier
    img_orig = Image.open(image_path).convert('L')
    img_arr = np.array(img_orig)
    img_tensor = tf.convert_to_tensor(img_arr, dtype=tf.float32)
    img_tensor = tf.expand_dims(img_tensor, axis=-1)
    img_resized = tf.image.resize(img_tensor, (256, 256))
    img_normalized = (img_resized / 127.5) - 1

    # 2. Get segmentation mask (channel axis dropped: findContours wants a 2-D image)
    mask_pred = identifier.predict(tf.expand_dims(img_normalized, axis=0))[0]
    mask_binary = (mask_pred[..., 0] > 0.5).astype(np.uint8)

    # 3. Find contours
    contours, _ = cv2.findContours(mask_binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    original_h, original_w = img_arr.shape
    mask_h, mask_w = mask_binary.shape

    boxes, crops = [], []
    for cnt in contours:
        x, y, w, h = cv2.boundingRect(cnt)
        if w * h < 20:
            continue

        # 4. Scale bbox from mask space to the original image and crop for classifier
        orig_x = int(x * original_w / mask_w)
        orig_y = int(y * original_h / mask_h)
        # Keep at least one pixel: truncation could otherwise yield an empty
        # crop, which cannot be resized.
        orig_w = max(1, int(w * original_w / mask_w))
        orig_h = max(1, int(h * original_h / mask_h))
        bbox = [orig_x, orig_y, orig_x + orig_w, orig_y + orig_h]

        crop = img_orig.crop(tuple(bbox)).resize((64, 64))
        crop_arr = (np.array(crop, dtype=np.float32) / 127.5) - 1
        crops.append(crop_arr[..., np.newaxis])
        boxes.append(bbox)

    if not crops:
        return []

    # 5. Classify all chromosomes of this image in a single predict call
    #    instead of one call per crop.
    logits = classifier.predict(np.stack(crops))
    all_probs = tf.nn.softmax(logits, axis=-1).numpy()

    results = []
    for bbox, probs in zip(boxes, all_probs):
        pred_idx = int(np.argmax(probs))
        results.append({
            'bbox': bbox,
            'class_name': class_map[pred_idx],
            'confidence': probs[pred_idx],
            'all_probs': probs
        })
    return results

def iou(boxA, boxB):
    """Calculate Intersection over Union (IoU) of two bounding boxes.

    Boxes are ``[xmin, ymin, xmax, ymax]`` and treated as pixel-inclusive
    (a box from 0 to 9 is 10 pixels wide).

    Returns:
        The IoU as a float in [0, 1]. Degenerate boxes (``max < min - 1``)
        count as empty, and 0.0 is returned when the union is empty.
    """
    def _area(box):
        # Clamp each side at zero so inverted boxes do not produce a
        # positive area from two negative extents.
        return max(0, box[2] - box[0] + 1) * max(0, box[3] - box[1] + 1)

    xA = max(boxA[0], boxB[0])
    yA = max(boxA[1], boxB[1])
    xB = min(boxA[2], boxB[2])
    yB = min(boxA[3], boxB[3])
    interArea = max(0, xB - xA + 1) * max(0, yB - yA + 1)

    unionArea = _area(boxA) + _area(boxB) - interArea
    if unionArea <= 0:
        return 0.0
    return interArea / float(unionArea)

def evaluate_auprc_tf(test_df, identifier, classifier, class_to_idx, idx_to_class, iou_thresh=0.5):
    """Evaluate the full pipeline and compute the area under the PR curve.

    Every prediction contributes one sample: its score is the probability of
    the predicted class, and it counts as positive when its best-overlapping
    ground-truth box reaches ``iou_thresh`` and carries the same class.

    Args:
        test_df: Annotation DataFrame with ``image_id``, ``image_path``,
            ``class_name`` and box columns.
        identifier: Segmentation model passed to ``karyotype_pipeline_tf``.
        classifier: Classification model passed to ``karyotype_pipeline_tf``.
        class_to_idx: Mapping from class name to index.
        idx_to_class: Mapping from index to class name.
        iou_thresh: Minimum IoU for a prediction to match a ground-truth box.

    Returns:
        ``(precision, recall, auprc)``. When the pipeline produced no
        prediction at all, ``([1.0], [0.0], 0.0)`` is returned because no
        curve can be computed.
    """
    y_true, y_scores = [], []
    box_columns = ['xmin', 'ymin', 'xmax', 'ymax']

    # Group the ground truth once instead of filtering the frame per image.
    gt_by_image = dict(tuple(test_df.groupby('image_id', sort=False)))

    unique_test_images = test_df.drop_duplicates(subset=['image_id'])
    for _, row in tqdm(unique_test_images.iterrows(), total=len(unique_test_images), desc="Evaluating auPRC"):
        predictions = karyotype_pipeline_tf(row['image_path'], identifier, classifier, idx_to_class)

        gt_rows = gt_by_image[row['image_id']]
        gt_boxes = gt_rows[box_columns].values.tolist()
        gt_labels = [class_to_idx[name] for name in gt_rows['class_name']]

        # NOTE(review): a ground-truth box can be matched by several
        # predictions, and unmatched ground-truth boxes add no sample, so
        # missed objects do not lower the reported recall.
        for pred in predictions:
            best_iou, best_gt_idx = 0, -1
            for i, gt_box in enumerate(gt_boxes):
                current_iou = iou(pred['bbox'], gt_box)
                if current_iou > best_iou:
                    best_iou, best_gt_idx = current_iou, i

            pred_label_idx = class_to_idx[pred['class_name']]
            is_match = (
                best_gt_idx != -1
                and best_iou >= iou_thresh
                and pred_label_idx == gt_labels[best_gt_idx]
            )
            y_true.append(1 if is_match else 0)
            y_scores.append(pred['all_probs'][pred_label_idx])

    if not y_true:
        # precision_recall_curve cannot handle empty input
        return np.array([1.0]), np.array([0.0]), 0.0

    precision, recall, _ = precision_recall_curve(y_true, y_scores)
    auprc = auc(recall, precision)
    return precision, recall, auprc

# ==============================================================================
# Main Execution Block
# ==============================================================================
if __name__ == '__main__':
    # --- Identifier Training ---
    if not df_identifier_train.empty:
        train_ds_id = create_tf_dataset_identifier(df_identifier_train)
        # Build the validation dataset only when the test split has rows;
        # fit() skips validation when validation_data is None.
        test_ds_id = (
            create_tf_dataset_identifier(df_identifier_test)
            if not df_identifier_test.empty else None
        )

        identifier_model = build_unet()
        identifier_model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
        print("\n--- Training Identifier Model ---")
        identifier_model.fit(train_ds_id, validation_data=test_ds_id, epochs=1) # Use more epochs for real training
        identifier_model.save('identifier_model.keras')

        if test_ds_id is not None:
            print("\n--- Visualizing Identifier Results ---")
            visualize_segmentation_tf(identifier_model, test_ds_id)
    else:
        print("Identifier DataFrames are empty. Skipping Identifier training.")

    # --- Classifier Training ---
    NUM_CLASSES, CLASS_TO_IDX, IDX_TO_CLASS = setup_classifier_data()
    if not df_classifier_train.empty:
        train_ds_cl = create_tf_dataset_classifier(df_classifier_train, CLASS_TO_IDX)
        test_ds_cl = (
            create_tf_dataset_classifier(df_classifier_test, CLASS_TO_IDX)
            if not df_classifier_test.empty else None
        )

        classifier_model = build_classifier(NUM_CLASSES)
        classifier_model.compile(optimizer='adam',
                                 loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                                 metrics=['accuracy'])
        print("\n--- Training Classifier Model ---")
        classifier_model.fit(train_ds_cl, validation_data=test_ds_cl, epochs=1) # Use more epochs for real training
        classifier_model.save('classifier_model.keras')
    else:
        print("Classifier DataFrames are empty. Skipping Classifier training.")

    # --- Full Pipeline Execution and Evaluation ---
    print("\n--- Running Full Pipeline and Evaluation ---")
    models_ready = True
    try:
        # Load models from disk if they were not trained in this run
        if 'identifier_model' not in locals():
            identifier_model = keras.models.load_model('identifier_model.keras')
        if 'classifier_model' not in locals():
            classifier_model = keras.models.load_model('classifier_model.keras')
    except (NameError, OSError, ValueError) as e:
        # A missing or unreadable model file surfaces as OSError (which
        # includes FileNotFoundError) or ValueError, depending on the Keras version.
        print(f"Could not run evaluation: {e}")
        models_ready = False

    # Evaluation needs test annotations and a non-empty class map.
    if models_ready and not df_classifier_test.empty and CLASS_TO_IDX:
        precision, recall, auprc = evaluate_auprc_tf(df_classifier_test, identifier_model, classifier_model, CLASS_TO_IDX, IDX_TO_CLASS)
        print(f"Final auPRC: {auprc:.4f}")

        plt.figure(figsize=(8, 6))
        plt.plot(recall, precision, marker='.', label=f'auPRC = {auprc:.3f}')
        plt.title('Precision-Recall Curve (TensorFlow)')
        plt.xlabel('Recall')
        plt.ylabel('Precision')
        plt.legend()
        plt.grid(True)
        plt.show()
    elif models_ready:
        print("Skipping evaluation: no test annotations or no class mapping available.")