In [None]:
import os
import cv2
import numpy as np
import pandas as pd
from pathlib import Path
import glob
from skimage.transform import resize
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import random
from PIL import Image

In [None]:
def adjust_yolov8_folder_for_preprocessing_with_images(
    input_label_dir,
    input_image_dir,
    output_label_dir
):
    """
    Re-express YOLOv8 boxes in the coordinate frame of the preprocessed images.

    The preprocessing modelled here is: resize to 512x512, then drop the top
    96 rows and 48 columns on each side (leaving 416x416). Boxes that do not
    lie entirely inside the cropped window are discarded, not clipped.

    For every ``*.txt`` file in ``input_label_dir`` the matching image is looked
    up in ``input_image_dir`` as ``preprocess_<label stem>`` with extension
    ``.png``, ``.jpg`` or ``.jpeg`` (first hit wins). Labels whose image is
    missing or unreadable are skipped and no output file is written for them.

    NOTE(review): ``preprocess_images_in_folder`` in this notebook writes files
    with the prefix ``preprocessed_``, not ``preprocess_`` — confirm the prefix
    used here matches the files actually present in ``input_image_dir``.

    Args:
        input_label_dir (str): Path to original YOLOv8 .txt files.
        input_image_dir (str): Folder containing corresponding images.
        output_label_dir (str): Destination path to save adjusted label files
            (created if missing; files keep their original names).
    """
    resize_h, resize_w = 512, 512
    top_crop = 96
    left_crop = 48
    right_crop = 48
    cropped_w = resize_w - left_crop - right_crop
    cropped_h = resize_h - top_crop

    os.makedirs(output_label_dir, exist_ok=True)

    for label_path in glob.glob(os.path.join(input_label_dir, "*.txt")):
        label_name = os.path.basename(label_path)
        stem = os.path.splitext(label_name)[0]

        # Probe the supported extensions in order; each candidate is echoed.
        image_path = None
        for suffix in (".png", ".jpg", ".jpeg"):
            candidate = os.path.join(input_image_dir, 'preprocess_' + stem + suffix)
            print(candidate)
            if os.path.exists(candidate):
                image_path = candidate
                break
        if image_path is None:
            print(f"Image not found for label: {stem}")
            continue

        image = cv2.imread(image_path)
        if image is None:
            print(f"Failed to read image: {image_path}")
            continue
        src_h, src_w = image.shape[:2]
        scale_x = resize_w / src_w
        scale_y = resize_h / src_h

        kept = []
        with open(label_path, 'r') as label_file:
            for raw in label_file:
                fields = raw.strip().split()
                if len(fields) != 5:
                    continue
                cls, cx, cy, bw, bh = (float(token) for token in fields)

                # normalized -> source pixels -> pixels of the resized image
                cx_rs = (cx * src_w) * scale_x
                cy_rs = (cy * src_h) * scale_y
                bw_rs = (bw * src_w) * scale_x
                bh_rs = (bh * src_h) * scale_y

                # top-left corner, shifted into the cropped window
                left = (cx_rs - bw_rs / 2) - left_crop
                top = (cy_rs - bh_rs / 2) - top_crop

                # drop boxes that stick out of the cropped window
                if left < 0 or top < 0 or \
                   left + bw_rs > cropped_w or \
                   top + bh_rs > cropped_h:
                    continue

                cx_out = (left + bw_rs / 2) / cropped_w
                cy_out = (top + bh_rs / 2) / cropped_h
                bw_out = bw_rs / cropped_w
                bh_out = bh_rs / cropped_h

                if 0 <= cx_out <= 1 and 0 <= cy_out <= 1:
                    kept.append(
                        f"{int(cls)} {cx_out:.6f} {cy_out:.6f} {bw_out:.6f} {bh_out:.6f}"
                    )

        destination = os.path.join(output_label_dir, label_name)
        with open(destination, 'w') as out_file:
            out_file.write("\n".join(kept))
        print(f"Adjusted: {label_path} → {destination}")


def normalize_image(image_slice_np):
    """Min-max scale an array to the range [0, 1] as float32.

    A constant array (max equal to min) has no range to scale, so an
    all-zero float32 array of the same shape is returned instead.
    """
    as_float = image_slice_np.astype(np.float32)
    lowest = np.min(as_float)
    highest = np.max(as_float)
    if not (highest > lowest):
        return np.zeros_like(as_float)
    return (as_float - lowest) / (highest - lowest)

def preprocess_and_crop_image_for_inference(image_path, pre_crop_shape=(512, 512), final_shape=(416, 416)):
    """Load one image as grayscale and turn it into a (1, H, W, 1) float32 tensor.

    Steps: read as grayscale, bicubic resize to ``pre_crop_shape``, min-max
    normalize the whole resized image, then drop the top 96 rows and 48
    columns on each side. The crop offsets are fixed, so if the cropped
    shape differs from ``final_shape`` a warning is printed and the crop is
    resized to ``final_shape``.

    Returns:
        The tensor, or ``None`` when the image cannot be read or any step
        raises (the error is printed, not propagated).
    """
    try:
        gray = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
        if gray is None:
            print(f"Warning: Could not read image {image_path}. Skipping.")
            return None

        resized = resize(gray, pre_crop_shape, order=3, preserve_range=True, anti_aliasing=True)
        cropped = normalize_image(resized)[96:, 48:-48]

        if cropped.shape != final_shape:
            print(f"Warning: Cropped shape {cropped.shape} for {image_path} != expected {final_shape}.")
            cropped = resize(cropped, final_shape, order=3, preserve_range=True, anti_aliasing=True)

        # add a leading batch axis and a trailing channel axis
        batched = cropped[np.newaxis, ..., np.newaxis]
        return batched.astype(np.float32)
    except Exception as exc:
        print(f"Error preprocessing {image_path}: {exc}")
        return None

def preprocess_images_in_folder(image_dir, output_dir):
    """Preprocess every image in each sub-folder of ``image_dir``.

    For each immediate sub-folder, all ``*.jpg``, ``*.jpeg`` and ``*.png``
    files are passed through ``preprocess_and_crop_image_for_inference`` and
    the result is written as an 8-bit image to
    ``<output_dir>/<subfolder>/preprocessed_<original filename>``.
    Images that fail to preprocess are skipped; save errors are printed and
    do not stop the run.

    Each image is processed exactly once (the previous version nested the
    per-image loop inside an identical loop, redoing the whole folder once
    per image).

    Args:
        image_dir (str): Folder whose sub-folders contain the input images.
        output_dir (str): Root folder for the results (created if missing).
    """
    os.makedirs(output_dir, exist_ok=True)

    for subfolder in os.listdir(image_dir):
        subfolder_path = os.path.join(image_dir, subfolder)
        if not os.path.isdir(subfolder_path):
            continue

        print(f"Processing folder: {subfolder}")

        image_files = [
            path
            for pattern in ('*.jpg', '*.jpeg', '*.png')
            for path in glob.glob(os.path.join(subfolder_path, pattern))
        ]
        if not image_files:
            print(f"No images found in folder {subfolder}. Skipping.")
            continue

        # One output folder per series, created once.
        output_subfolder = os.path.join(output_dir, subfolder)
        os.makedirs(output_subfolder, exist_ok=True)

        for img_path in image_files:
            input_tensor = preprocess_and_crop_image_for_inference(img_path)
            if input_tensor is None:
                continue

            try:
                base_filename = os.path.basename(img_path)
                output_path = os.path.join(output_subfolder, f"preprocessed_{base_filename}")

                # (1, H, W, 1) -> (H, W)
                image_to_save = np.squeeze(input_tensor)

                # Clip before the cast: bicubic resampling can overshoot [0, 1]
                # slightly, and out-of-range values would wrap around in uint8.
                image_to_save_uint8 = (np.clip(image_to_save, 0.0, 1.0) * 255.0).astype(np.uint8)

                cv2.imwrite(output_path, image_to_save_uint8)

            except Exception as e:
                print(f"Error saving preprocessed image for {img_path}: {e}")

    print("Finished processing all images.")

def extract_id_from_filename(filename, source):
    """Return the upper-cased ID embedded in a file name.

    For ``'image'`` and ``'mask'`` the ID is the second ``_``-separated token
    of the file stem; for ``'label'`` it is the first token after treating
    ``-`` like ``_``. Any other ``source`` raises ``ValueError``.
    """
    stem = Path(filename).stem
    if source in ('image', 'mask'):
        return stem.split('_')[1].upper()
    if source == 'label':
        return stem.replace('-', '_').split('_')[0].upper()
    raise ValueError("Source must be one of ['image', 'mask', 'label']")

def find_bounding_boxes_contours(mask, min_area_threshold=100, padding=10):
    """Return padded bounding boxes of the external contours found in a mask.

    Args:
        mask: 2-D array, or 3-D array with a trailing axis of size 1; it is
            cast to uint8 if needed. ``None``, empty arrays and any other
            shape yield an empty list.
        min_area_threshold: contours whose area is below this are ignored.
        padding: total padding per axis, split as ``padding // 2`` before the
            box and the remainder after it, clamped to the mask bounds.

    Returns:
        List of ``(x, y, w, h)`` tuples sorted by ``(y, x)``.
    """
    if mask is None or mask.size == 0:
        return []
    if mask.dtype != np.uint8:
        mask = mask.astype(np.uint8)

    if mask.ndim == 2:
        plane = mask
    elif mask.ndim == 3 and mask.shape[-1] == 1:
        plane = mask.squeeze(axis=-1)
    else:
        return []

    found, _ = cv2.findContours(plane, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    height, width = plane.shape[:2]
    lead_pad = padding // 2
    trail_pad = padding - lead_pad

    padded = []
    for contour in found:
        if cv2.contourArea(contour) < min_area_threshold:
            continue
        x, y, w, h = cv2.boundingRect(contour)
        left = max(0, x - lead_pad)
        top = max(0, y - lead_pad)
        right = min(width, (x + w) + trail_pad)
        bottom = min(height, (y + h) + trail_pad)
        if right - left > 0 and bottom - top > 0:
            padded.append((left, top, right - left, bottom - top))
    return sorted(padded, key=lambda box: (box[1], box[0]))

def boxes_overlap(boxA, boxB, threshold=0.1):
    """Return True when the IoU of two ``(x, y, w, h)`` boxes exceeds ``threshold``.

    Boxes with no intersection area always return False.
    """
    left = max(boxA[0], boxB[0])
    top = max(boxA[1], boxB[1])
    right = min(boxA[0] + boxA[2], boxB[0] + boxB[2])
    bottom = min(boxA[1] + boxA[3], boxB[1] + boxB[3])

    intersection = max(0, right - left) * max(0, bottom - top)
    if intersection == 0:
        return False

    area_a = boxA[2] * boxA[3]
    area_b = boxB[2] * boxB[3]
    union = float(area_a + area_b - intersection)
    return intersection / union > threshold

def load_yolo_boxes(label_path, image_shape):
    """Read a YOLO label file and return its boxes in pixel coordinates.

    Args:
        label_path: path to a text file with ``class xc yc w h`` per line
            (normalized values). Lines without exactly five fields are skipped.
        image_shape: shape whose first two entries are (height, width).

    Returns:
        List of ``(x, y, width, height, class_id)`` integer tuples, where
        ``(x, y)`` is the top-left corner; empty if the file does not exist.
    """
    img_h, img_w = image_shape[:2]
    if not os.path.exists(label_path):
        return []

    parsed = []
    with open(label_path, 'r') as handle:
        for fields in (raw.strip().split() for raw in handle):
            if len(fields) != 5:
                continue
            cls, cx, cy, norm_w, norm_h = (float(value) for value in fields)
            left = int((cx - norm_w / 2) * img_w)
            top = int((cy - norm_h / 2) * img_h)
            parsed.append((left, top, int(norm_w * img_w), int(norm_h * img_h), int(cls)))
    return parsed

def get_middle_images_indices(num_images, middle_count=5):
    """Return the indices of the ``middle_count`` central items of a sequence.

    Returns ``None`` when there are fewer than ``middle_count`` items. With an
    uneven remainder the window sits one position closer to the start.
    """
    if num_images < middle_count:
        return None

    first = (num_images - middle_count) // 2
    return [first + offset for offset in range(middle_count)]

def _crop_with_margin(image, box, margin):
    """Crop ``box`` (x, y, w, h) grown by ``margin`` pixels per side, clamped to the image."""
    x, y, w, h = box
    img_h, img_w = image.shape[:2]
    # Clamp the start to 0: a negative slice start would wrap around to the
    # far edge of the array and return an empty or unrelated region.
    x1 = max(0, x - margin)
    y1 = max(0, y - margin)
    x2 = min(img_w, x + w + margin)
    y2 = min(img_h, y + h + margin)
    return image[y1:y2, x1:x2]


def process_all(image_dir, label_dir, mask_dir, series_dir):
    """Build one row of five cropped slices per (mask box, YOLO box) match.

    For every sub-folder of ``series_dir`` (its upper-cased name is the ID):

    1. the five middle ``*.jpg`` files (sorted by name) are selected; folders
       with fewer than five are skipped;
    2. the first ``*_<ID>_*.png`` in ``mask_dir`` is read and turned into
       de-duplicated boxes via ``find_bounding_boxes_contours``;
    3. the first ``<ID with '_' replaced by '-'>_*.txt`` in ``label_dir`` is
       loaded with ``load_yolo_boxes``, scaled to the shape of the central slice;
    4. for each mask box / YOLO box pair accepted by ``boxes_overlap`` the mask
       box, grown by 5 px per side and clamped to the image, is cropped from
       each of the five slices.

    Series with a missing mask, label file or unreadable image are skipped.

    Args:
        image_dir: unused; kept so existing callers keep working.
        label_dir: folder with YOLO ``.txt`` label files.
        mask_dir: folder with ``.png`` masks.
        series_dir: folder with one sub-folder of ``.jpg`` slices per series.

    Returns:
        pd.DataFrame with columns ``ID``, ``IMG1``..``IMG5`` (image arrays) and
        ``Label`` (0 when the YOLO class id is even, 1 when odd). Empty frame
        without columns when nothing matched.
    """
    crop_margin = 5
    results = []
    for folder in Path(series_dir).iterdir():
        if not folder.is_dir():
            continue

        image_id = folder.name.upper()
        image_files = sorted(folder.glob("*.jpg"))

        middle_indices = get_middle_images_indices(len(image_files))
        if middle_indices is None:
            continue

        mask_path = next(Path(mask_dir).glob(f"*_{image_id}_*.png"), None)
        if not mask_path or not mask_path.exists():
            continue
        mask = cv2.imread(str(mask_path), cv2.IMREAD_GRAYSCALE)
        if mask is None:
            continue

        gt_boxes = find_bounding_boxes_contours(mask)
        gt_boxes = list({(x, y, w, h) for (x, y, w, h) in gt_boxes})

        label_file = next(Path(label_dir).glob(f"{image_id.replace('_', '-')}_*.txt"), None)
        if not label_file:
            continue
        sample_image = cv2.imread(str(image_files[middle_indices[len(middle_indices) // 2]]))
        if sample_image is None:
            continue
        yolo_boxes = load_yolo_boxes(str(label_file), sample_image.shape)

        matches = [
            (gt_box, yolo_box)
            for gt_box in gt_boxes
            for yolo_box in yolo_boxes
            if boxes_overlap(gt_box, yolo_box[:4])
        ]
        if not matches:
            continue

        # Read the middle slices once per series instead of once per match.
        middle_images = [cv2.imread(str(image_files[index])) for index in middle_indices]
        if any(image is None for image in middle_images):
            print(f"Skipping {image_id}: could not read one of the middle slices.")
            continue

        for gt_box, yolo_box in matches:
            cropped_images = [
                _crop_with_margin(image, gt_box, crop_margin) for image in middle_images
            ]
            row = {"ID": image_id}
            row.update(
                {f"IMG{n}": crop for n, crop in enumerate(cropped_images, start=1)}
            )
            row["Label"] = 0 if yolo_box[4] % 2 == 0 else 1
            results.append(row)
    return pd.DataFrame(results)

def save_cropped_images_and_labels(df, output_dir):
    """
    Write the crops held in columns IMG1–IMG5 to disk together with a label CSV.

    Each non-empty crop is saved as ``<ID>_<row index + 1>_IMG<n>.jpg`` and a
    ``labels.csv`` with columns ``filename`` and ``label`` is written next to
    the images. Cells that are ``None`` or have zero size are skipped.

    Args:
        df (pd.DataFrame): DataFrame with columns 'ID', 'Label', 'IMG1'..'IMG5'
        output_dir (str): Directory where images and labels.csv will be saved
    """
    os.makedirs(output_dir, exist_ok=True)
    records = []

    for idx, row in df.iterrows():
        sample_id, sample_label = row["ID"], row["Label"]

        for slot in range(1, 6):
            crop = row[f"IMG{slot}"]
            if crop is None or crop.size == 0:
                continue

            filename = f"{sample_id}_{idx+1}_IMG{slot}.jpg"
            cv2.imwrite(os.path.join(output_dir, filename), crop)
            records.append({"filename": filename, "label": sample_label})

    pd.DataFrame(records).to_csv(os.path.join(output_dir, "labels.csv"), index=False)
    print(f"Saved {len(records)} images and labels to {output_dir}")

In [None]:
# Step 1: rewrite the original YOLOv8 labels into the coordinate frame of the
# preprocessed (resized + cropped) training images.
# NOTE(review): these are absolute, machine-specific paths; consider deriving
# them from a single configurable data root.
image_dir = 'C:/Users/danaa/Downloads/JUH_segmentation_output/train_preprocessed'
label_dir = 'C:/Users/danaa/Desktop/uni/GP 2/Full Annotated Data/train/labels'
mask_dir = 'C:/Users/danaa/Downloads/JUH_segmentation_output/train_masks'  # not used in this cell

adjust_yolov8_folder_for_preprocessing_with_images(
    input_label_dir=label_dir,
    input_image_dir=image_dir,
    output_label_dir="C:/Users/danaa/Downloads/adjusted labels"
)

In [None]:
# Step 2: preprocess every image in each sub-folder of the series directory
# and write the results under the output directory, one sub-folder per series.
image_directory = "C:/Users/danaa/Desktop/Sagittal"
output_directory = "C:/Users/danaa/Desktop/preprocessed series"

preprocess_images_in_folder(image_directory, output_directory)

In [None]:
# Step 3: match mask-derived boxes with the adjusted YOLO labels and crop the
# five middle slices of every series around each matched box.
series_dir = 'C:/Users/danaa/Desktop/preprocessed series'  # output of the preprocessing cell
label_dir = "C:/Users/danaa/Downloads/adjusted labels"  # output of the label-adjustment cell
image_dir = 'C:/Users/danaa/Downloads/JUH_segmentation_output/train_preprocessed'  # passed through, but process_all never reads it
mask_dir = 'C:/Users/danaa/Downloads/JUH_segmentation_output/train_masks'

df = process_all(image_dir, label_dir, mask_dir, series_dir)

In [None]:
# Step 4: write the crops and their labels.csv to a folder relative to the
# notebook's working directory.
output_path = "seg cropped"
save_cropped_images_and_labels(df, output_path)

Saved 6700 images and labels to seg cropped


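Preprocessing: load the saved crops, normalize and resize them, split by patient, augment, and export the train/val/test sets.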

In [None]:
def read_image_paths_and_labels(path):
    """Load the crops saved in ``path`` into one row per sample.

    Files are expected to be named ``<sample id>_IMG<n>.jpg`` and to be listed
    in ``<path>/labels.csv`` (columns ``filename`` and ``label``). The sample
    id is everything before the final ``_IMG<n>``, so ids that themselves
    contain underscores are grouped correctly. Files that do not follow the
    pattern are reported and skipped.

    Args:
        path (str): folder containing the ``.jpg`` crops and ``labels.csv``.

    Returns:
        pd.DataFrame with columns ``ID``, ``Label`` and then ``IMG<n>`` in
        ascending ``n`` (grayscale arrays as returned by ``cv2.imread``).
        ``Label`` is taken from the sample's lowest-numbered image that appears
        in ``labels.csv`` (normally IMG1), or ``None`` if none does. Image
        slots missing for a sample are left empty (NaN).
    """
    labels_df = pd.read_csv(os.path.join(path, "labels.csv"))
    label_by_file = dict(zip(labels_df['filename'], labels_df['label']))

    # sample id -> {slot number: (filename, image)}
    # Sorted listing keeps row order independent of the filesystem.
    groups = {}
    for img_file in sorted(f for f in os.listdir(path) if f.endswith('.jpg')):
        stem = os.path.splitext(img_file)[0]
        sample_id, separator, slot = stem.rpartition('_')
        if not separator or not slot.startswith('IMG') or not slot[3:].isdigit():
            print(f"Skipping file with unexpected name: {img_file}")
            continue

        img = cv2.imread(os.path.join(path, img_file), cv2.IMREAD_GRAYSCALE)
        groups.setdefault(sample_id, {})[int(slot[3:])] = (img_file, img)

    all_slots = sorted({slot for slots in groups.values() for slot in slots})

    data = []
    for sample_id, slots in groups.items():
        label = None
        for slot in sorted(slots):
            filename = slots[slot][0]
            if filename in label_by_file:
                label = label_by_file[filename]
                break
        row = {'ID': sample_id, 'Label': label}
        row.update({f"IMG{slot}": slots[slot][1] for slot in sorted(slots)})
        data.append(row)

    # Explicit column order: downstream code treats every column after
    # 'ID' and 'Label' as an image.
    columns = ['ID', 'Label'] + [f"IMG{slot}" for slot in all_slots]
    return pd.DataFrame(data, columns=columns)

def visualize_image(image):
    """
    Show an image with a grayscale colormap and the axes hidden.

    Draws on the current pyplot figure and then calls ``plt.show()``.

    Args:
        image (numpy.ndarray): Image to display; presumably a 2D array, since
            a grayscale colormap is applied.

    Returns:
        None
    """

    plt.imshow(image, cmap='gray')
    plt.axis('off')
    plt.show()

def preprocess(df):
    """Scale every image to [0, 1] and resize it to 80x40 (width x height).

    The first two columns are read by name (``ID`` and ``Label``); every column
    from the third onward is treated as an image, divided by 255.0 and resized.
    Returns a new DataFrame with the same columns as ``df``.
    """
    target_size = (80, 40)  # cv2 dsize order: (width, height)
    rows = []

    for _, record in df.iterrows():
        scaled = (img / 255.0 for img in record[2:])
        rows.append([
            record["ID"],
            record["Label"],
            *(cv2.resize(img, target_size) for img in scaled),
        ])

    return pd.DataFrame(rows, columns=df.columns)



def split_df_by_patient(df, test_size=0.2, val_size=0.2, random_state=42):
    df['PatientID'] = df['ID'].apply(lambda x: x.replace('-', '_').split('_')[0])
    unique_patients = df['PatientID'].unique()

    train_patients, test_patients = train_test_split(
        unique_patients, test_size=test_size, random_state=random_state
    )

    train_patients_final, val_patients = train_test_split(
        train_patients, test_size=val_size, random_state=random_state
    )

    train_df = df[df['PatientID'].isin(train_patients_final)].reset_index(drop=True)
    val_df = df[df['PatientID'].isin(val_patients)].reset_index(drop=True)
    test_df = df[df['PatientID'].isin(test_patients)].reset_index(drop=True)
    
    return train_df, val_df, test_df

def rotate(img, angle):
    """Rotate ``img`` by ``angle`` degrees about its centre, keeping its size.

    The rotation uses scale 1.0 and the output has the same width and height
    as the input.
    """
    height, width = img.shape[:2]
    matrix = cv2.getRotationMatrix2D((width // 2, height // 2), angle, 1.0)
    return cv2.warpAffine(img, matrix, (width, height))

def add_noise(image, mean=0, sigma=0.01):
    """Return ``image`` plus Gaussian noise drawn with the given mean and sigma.

    The noise is float64 and has the same shape as ``image``; it is drawn from
    NumPy's global random state and combined with ``cv2.add``.
    """
    gaussian = np.random.normal(loc=mean, scale=sigma, size=image.shape).astype('float64')
    return cv2.add(image, gaussian)

def horizontal_flip(image):
    """Return ``image`` flipped with ``cv2.flip`` using flip code 1."""
    return cv2.flip(image, 1)


def augmentation(df):
    """Append augmented copies of the samples in ``df``.

    Augmentations, in the order they appear in the result after the original
    rows:

    1. left rotation by a random angle in [-15, -5] degrees — all rows;
    2. right rotation by a random angle in [5, 15] degrees — rows whose
       ``Label`` is not 0;
    3. Gaussian noise with sigma 0.02 — rows whose ``Label`` is not 0;
    4. Gaussian noise with the default sigma — all rows;
    5. horizontal flip — all rows.

    One angle is drawn per row and shared by its five images. Augmented rows
    keep every non-image column of their source row (``ID``, ``Label`` and any
    extra column such as a patient id). Rows are collected in lists and the
    frame is built once, instead of growing DataFrames one row at a time.

    Args:
        df (pd.DataFrame): frame with columns ``ID``, ``Label``, ``IMG1``..``IMG5``.

    Returns:
        A new DataFrame (fresh 0..n-1 index) with the original rows followed by
        the augmented ones; ``df`` itself is not modified.
    """
    image_cols = ['IMG1', 'IMG2', 'IMG3', 'IMG4', 'IMG5']

    def transformed(row, transform):
        # Copy the whole row so non-image columns carry over, then replace images.
        new_row = row.to_dict()
        for col in image_cols:
            new_row[col] = transform(row[col])
        return new_row

    all_rows = [row for _, row in df.iterrows()]
    non_zero_rows = [row for row in all_rows if not row['Label'] == 0]

    # Generation order below matches the original implementation so the
    # sequence of random draws is unchanged.
    rotations_r = []
    for row in non_zero_rows:
        angle = random.randint(5, 15)
        rotations_r.append(transformed(row, lambda img: rotate(img, angle)))

    flips = [transformed(row, horizontal_flip) for row in all_rows]

    rotations_l = []
    for row in all_rows:
        angle = random.randint(-15, -5)
        rotations_l.append(transformed(row, lambda img: rotate(img, angle)))

    strong_noise = [
        transformed(row, lambda img: add_noise(img, 0, 0.02)) for row in non_zero_rows
    ]
    mild_noise = [transformed(row, add_noise) for row in all_rows]

    augmented = rotations_l + rotations_r + strong_noise + mild_noise + flips
    if not augmented:
        return df.reset_index(drop=True)

    augmented_df = pd.DataFrame(augmented, columns=df.columns)
    return pd.concat([df, augmented_df], axis=0, ignore_index=True)

In [None]:
# Load the saved crops back into a DataFrame (one row per sample).
cropped_path = "seg cropped"
df = read_image_paths_and_labels(cropped_path)

print(len(df))
# Spot-check one sample; assumes the frame has at least 101 rows.
visualize_image(df.iloc[100]['IMG1'])

In [None]:
# Scale to [0, 1] and resize every crop.
# NOTE(review): this rebinds `df`, so re-running the cell divides by 255 again;
# re-run the loading cell first.
df = preprocess(df)

visualize_image(df.iloc[100]['IMG1'])

In [None]:
# Patient-level split so that no patient appears in more than one set.
train_df, val_df, test_df = split_df_by_patient(df)

print(len(train_df))

In [None]:
# Augment each split. The frames are rebound, so re-running this cell
# augments the already-augmented data again.
# NOTE(review): validation and test sets are augmented too, so evaluation runs
# on synthetic variants as well as the original crops — confirm this is intended.
train_df = augmentation(train_df)
val_df = augmentation(val_df)
test_df = augmentation(test_df)

print(len(train_df))
print(train_df['Label'].value_counts())

In [None]:
# Export the training split: one JPEG per image column plus a labels.csv.
# NOTE(review): the test and val export cells below repeat this code verbatim
# with a different DataFrame and folder; a shared function would remove the
# duplication.
save_dir = "C:/Users/danaa/Desktop/Full Data/train"
os.makedirs(save_dir, exist_ok=True)

# Augmented rows share the ID of their source row, so append a running
# per-ID counter to obtain a unique file-name stem for every row.
id_counts = {}
unique_ids = []

for original_id in train_df['ID']:
    count = id_counts.get(original_id, 0) + 1
    id_counts[original_id] = count
    unique_ids.append(f"{original_id}_{count}")

train_df['Unique_ID'] = unique_ids

label_records = []
img_cols = ['IMG1', 'IMG2', 'IMG3', 'IMG4', 'IMG5']

def convert_to_uint8(array):
    # Non-uint8 input is clipped to [0, 1] and scaled to 0..255 (the cast
    # truncates); uint8 input is returned unchanged.
    if array.dtype != np.uint8:
        array = np.clip(array, 0, 1) * 255
        array = array.astype(np.uint8)
    return array

for idx, row in train_df.iterrows():
    uid = row['Unique_ID']
    label = row['Label']
    for col in img_cols:
        img_array = row[col]
        # Cells that are not arrays (e.g. missing images) are skipped.
        if isinstance(img_array, np.ndarray):
            img_array = convert_to_uint8(img_array)
            image = Image.fromarray(img_array)
            filename = f"{uid}_{col}.jpg"
            image.save(os.path.join(save_dir, filename))
            label_records.append({'filename': filename, 'label': label})

label_df = pd.DataFrame(label_records)
label_df.to_csv(os.path.join(save_dir, "labels.csv"), index=False)

In [None]:
# Export the test split: one JPEG per image column plus a labels.csv.
# NOTE(review): same code as the train export cell above, applied to test_df;
# it also redefines convert_to_uint8 with an identical body.
save_dir = "C:/Users/danaa/Desktop/Full Data/test"
os.makedirs(save_dir, exist_ok=True)

# Running per-ID counter gives every row a unique file-name stem.
id_counts = {}
unique_ids = []

for original_id in test_df['ID']:
    count = id_counts.get(original_id, 0) + 1
    id_counts[original_id] = count
    unique_ids.append(f"{original_id}_{count}")

test_df['Unique_ID'] = unique_ids

label_records = []
img_cols = ['IMG1', 'IMG2', 'IMG3', 'IMG4', 'IMG5']

def convert_to_uint8(array):
    # Non-uint8 input is clipped to [0, 1] and scaled to 0..255 (the cast
    # truncates); uint8 input is returned unchanged.
    if array.dtype != np.uint8:
        array = np.clip(array, 0, 1) * 255
        array = array.astype(np.uint8)
    return array

for idx, row in test_df.iterrows():
    uid = row['Unique_ID']
    label = row['Label']
    for col in img_cols:
        img_array = row[col]
        # Cells that are not arrays (e.g. missing images) are skipped.
        if isinstance(img_array, np.ndarray):
            img_array = convert_to_uint8(img_array)
            image = Image.fromarray(img_array)
            filename = f"{uid}_{col}.jpg"
            image.save(os.path.join(save_dir, filename))
            label_records.append({'filename': filename, 'label': label})

label_df = pd.DataFrame(label_records)
label_df.to_csv(os.path.join(save_dir, "labels.csv"), index=False)

In [None]:
# Export the validation split: one JPEG per image column plus a labels.csv.
# NOTE(review): same code as the train and test export cells above, applied to
# val_df; it also redefines convert_to_uint8 with an identical body.
save_dir = "C:/Users/danaa/Desktop/Full Data/val"
os.makedirs(save_dir, exist_ok=True)

# Running per-ID counter gives every row a unique file-name stem.
id_counts = {}
unique_ids = []

for original_id in val_df['ID']:
    count = id_counts.get(original_id, 0) + 1
    id_counts[original_id] = count
    unique_ids.append(f"{original_id}_{count}")

val_df['Unique_ID'] = unique_ids

label_records = []
img_cols = ['IMG1', 'IMG2', 'IMG3', 'IMG4', 'IMG5']

def convert_to_uint8(array):
    # Non-uint8 input is clipped to [0, 1] and scaled to 0..255 (the cast
    # truncates); uint8 input is returned unchanged.
    if array.dtype != np.uint8:
        array = np.clip(array, 0, 1) * 255
        array = array.astype(np.uint8)
    return array

for idx, row in val_df.iterrows():
    uid = row['Unique_ID']
    label = row['Label']
    for col in img_cols:
        img_array = row[col]
        # Cells that are not arrays (e.g. missing images) are skipped.
        if isinstance(img_array, np.ndarray):
            img_array = convert_to_uint8(img_array)
            image = Image.fromarray(img_array)
            filename = f"{uid}_{col}.jpg"
            image.save(os.path.join(save_dir, filename))
            label_records.append({'filename': filename, 'label': label})

label_df = pd.DataFrame(label_records)
label_df.to_csv(os.path.join(save_dir, "labels.csv"), index=False)