CBIS-DDSM-breast

In [None]:
import os
import pandas as pd
import numpy as np
import cv2
from tqdm import tqdm
import pydicom as pdcm

def np_CountUpContinuingOnes(b_arr):
    """Return, for every position, the size of the non-zero run it belongs to.

    Args:
        b_arr: 1-D NumPy array; entries greater than zero count as "ones".

    Returns:
        Integer array of the same length. Positions holding a zero get -1.
        Positions inside a run get the distance between the zeros that
        bracket the run, minus one. When a run touches the start or end of
        the array there is no bracketing zero on that side and the array
        edge index is used instead, so such runs are reported one shorter
        per touched edge (e.g. ``[1, 1, 0]`` yields ``[1, 1, -1]``).
    """
    size = len(b_arr)
    positions = np.arange(size)
    is_set = b_arr > 0

    # Index of the nearest zero at or before each position (0 until one is seen).
    prev_zero = np.maximum.accumulate(np.where(is_set, 0, positions))

    # Same scan over the reversed array, then mapped back to forward indices:
    # index of the nearest zero at or after each position (size - 1 if none).
    scan_from_right = np.maximum.accumulate(np.where(is_set[::-1], 0, positions))
    next_zero = size - 1 - scan_from_right[::-1]

    return next_zero - prev_zero - 1

def ExtractBreast(img):
    """Crop a 2-D image to its longest band of non-flat columns and rows.

    Pixels with value <= 40 are zeroed for the analysis only; the returned
    crop is taken from an untouched copy of the input.

    Args:
        img: 2-D NumPy array (the shape is unpacked into exactly two values).

    Returns:
        The sub-array of the original image restricted to the selected rows
        and columns.
    """
    untouched = img.copy()
    masked = np.where(img <= 40, 0, img)

    # Columns: look only at the central 80% of rows and keep the columns
    # whose values vary there (standard deviation != 0).
    n_rows, _ = masked.shape
    row_margin = int(n_rows * 0.4)
    row_lo = n_rows // 2 - row_margin
    row_hi = n_rows // 2 + row_margin
    col_has_signal = masked[row_lo:row_hi].std(axis=0) != 0
    col_runs = np_CountUpContinuingOnes(col_has_signal)
    # Every column whose run score equals the maximum is kept.
    col_ind = np.flatnonzero(col_runs == col_runs.max())
    masked = masked[:, col_ind]

    # Rows: same procedure on the column-cropped image, using the central
    # 80% of the remaining columns.
    _, n_cols = masked.shape
    col_margin = int(n_cols * 0.4)
    col_lo = n_cols // 2 - col_margin
    col_hi = n_cols // 2 + col_margin
    row_has_signal = masked[:, col_lo:col_hi].std(axis=1) != 0
    row_runs = np_CountUpContinuingOnes(row_has_signal)
    row_ind = np.flatnonzero(row_runs == row_runs.max())

    return untouched[row_ind][:, col_ind]


# Root of the local CBIS-DDSM export; CSV metadata is read from its csv/ folder.
dataset_dir = "/Volumes/图图/CBIS-DDSM_kaggle"
df_dicom_info = pd.read_csv(f'{dataset_dir}/csv/dicom_info.csv')
# Replace the 'CBIS-DDSM' substring of every stored path with the local root.
df_dicom_info['image_path'] = df_dicom_info['image_path'].apply(lambda x: x.replace('CBIS-DDSM', dataset_dir))


description_files = [
    f"{dataset_dir}/csv/mass_case_description_train_set.csv",
    f"{dataset_dir}/csv/mass_case_description_test_set.csv",
    f"{dataset_dir}/csv/calc_case_description_train_set.csv",
    f"{dataset_dir}/csv/calc_case_description_test_set.csv"
]

additional_dfs = []
for file in description_files:
    df = pd.read_csv(file)
    # Unify the density column name so the concatenated frame has one column.
    df.rename(columns={'breast density': 'breast_density'}, inplace=True)
    additional_dfs.append(df)
additional_df = pd.concat(additional_dfs, ignore_index=True)
image_data = []

# The parent-folder name of every known image path does not depend on the
# description row, so compute it once instead of once per row.
dicom_series_dirs = df_dicom_info['image_path'].apply(lambda x: x.split('/')[-2])

for index, row in tqdm(additional_df.iterrows(), total=len(additional_df)):
    path_parts = row['image file path'].split('/')
    patient_id = path_parts[0]
    # Match on the second-to-last path component of both paths.
    img_row = df_dicom_info[dicom_series_dirs == path_parts[-2]]

    if img_row.empty:
        print(f"No image info for patient_id: {patient_id}")
        continue

    img_path = img_row['image_path'].values[0]
    # Read once here only to filter out files OpenCV cannot decode; the
    # breast extraction itself happens when the images are saved.
    img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)

    if img is None:
        print(f"Failed to read image: {img_path}")
        continue

    info_dict = {
        "patient_id": patient_id,
        "img_path": img_path,
    }

    image_data.append(info_dict)


image_df = pd.DataFrame(image_data)

def save_images_and_info(df, output_base_dir):
    """Extract the breast region of every listed image and save it as JPEG.

    Args:
        df: DataFrame with 'patient_id' and 'img_path' columns.
        output_base_dir: Directory under which one sub-folder per
            'patient_id' is created; the image is written there as 'img.jpg'.

    Rows whose image cannot be read or written are reported and skipped.
    """
    for _, row in df.iterrows():
        patient_id = row['patient_id']

        img = cv2.imread(row['img_path'], cv2.IMREAD_GRAYSCALE)
        if img is None:
            # cv2.imread signals failure by returning None rather than raising.
            print(f"Failed to read image: {row['img_path']}")
            continue
        img = ExtractBreast(img)

        # Create the folder only once there is something to put in it, so a
        # failed read does not leave an empty directory behind.
        output_dir = os.path.join(output_base_dir, patient_id)
        os.makedirs(output_dir, exist_ok=True)

        img_output_path = os.path.join(output_dir, 'img.jpg')
        # Stretch intensities to the full 0-255 range before the 8-bit cast.
        scaled = cv2.normalize(img, None, 0, 255, cv2.NORM_MINMAX).astype(np.uint8)
        if not cv2.imwrite(img_output_path, scaled):
            print(f"Failed to write image: {img_output_path}")
            continue

        print(f"Saved {patient_id} images to {output_dir}")


# Export every image listed in image_df to
# Benchmark/CBIS-DDSM-breast/<patient_id>/img.jpg (path relative to the
# current working directory).
output_base_dir = "Benchmark/CBIS-DDSM-breast"
save_images_and_info(image_df, output_base_dir)

print("Processing complete.")

CBIS-DDSM-finding

In [None]:
import os
import pandas as pd
import numpy as np
import cv2
from tqdm import tqdm
import pydicom as pdcm


# Root of the local CBIS-DDSM export; CSV metadata is read from its csv/ folder.
dataset_dir = "/Volumes/图图/CBIS-DDSM_kaggle"
df = pd.read_csv(f'{dataset_dir}/csv/dicom_info.csv')
# Replace the 'CBIS-DDSM' substring of every stored path with the local root.
df['image_path'] = df['image_path'].apply(lambda x: x.replace('CBIS-DDSM', dataset_dir))


description_files = [
    f"{dataset_dir}/csv/mass_case_description_train_set.csv",
    f"{dataset_dir}/csv/mass_case_description_test_set.csv",
    f"{dataset_dir}/csv/calc_case_description_train_set.csv",
    f"{dataset_dir}/csv/calc_case_description_test_set.csv"
]
additional_dfs = [pd.read_csv(file) for file in description_files]
additional_df = pd.concat(additional_dfs, ignore_index=True)

# The parent-folder name of every known image path does not depend on the
# description row, so compute it once instead of once per row.
dicom_series_dirs = df['image_path'].apply(lambda x: x.split('/')[-2])

all_data = []
for index, row in tqdm(additional_df.iterrows(), total=len(additional_df)):
    path_parts = row['cropped image file path'].split('/')
    patient_id = path_parts[0]
    # Match on the second-to-last path component of both paths.
    img_row = df[dicom_series_dirs == path_parts[-2]]

    if img_row.empty:
        print(f"No image info for patient_id: {patient_id}")
        continue

    # Keep only the entries flagged as cropped images. An explicit emptiness
    # check replaces the former bare `except`, so unrelated errors (e.g. a
    # missing column) are no longer silently swallowed.
    cropped_paths = img_row.loc[img_row['SeriesDescription'] == 'cropped images', 'image_path']
    if cropped_paths.empty:
        print(f"Multiple or no entries found for patient_id: {patient_id}")
        continue
    img_path = cropped_paths.values[0]

    all_data.append({
        "patient_id": patient_id,
        "img_path": img_path,
    })


all_data_df = pd.DataFrame(all_data)

def save_images_and_info(df, output_base_dir):
    """Save every listed image as an 8-bit grayscale JPEG.

    Args:
        df: DataFrame with 'patient_id' and 'img_path' columns.
        output_base_dir: Directory under which one sub-folder per
            'patient_id' is created; the image is written there as 'img.jpg'.

    Rows whose image cannot be read or written are reported and skipped.
    """
    for _, row in df.iterrows():
        patient_id = row['patient_id']

        img = cv2.imread(row['img_path'], cv2.IMREAD_GRAYSCALE)
        if img is None:
            # cv2.imread signals failure by returning None rather than raising.
            print(f"Failed to read image: {row['img_path']}")
            continue

        # Create the folder only after a successful read, so a failed read
        # does not leave behind an empty folder for later steps to trip over.
        output_dir = os.path.join(output_base_dir, patient_id)
        os.makedirs(output_dir, exist_ok=True)

        img_output_path = os.path.join(output_dir, 'img.jpg')
        # Stretch intensities to the full 0-255 range before the 8-bit cast.
        scaled = cv2.normalize(img, None, 0, 255, cv2.NORM_MINMAX).astype(np.uint8)
        if not cv2.imwrite(img_output_path, scaled):
            print(f"Failed to write image: {img_output_path}")
            continue

        print(f'Saved {patient_id} image to {output_dir}')


# Export every image listed in all_data_df to
# Benchmark/CBIS-DDSM-finding/All_Data/<patient_id>/img.jpg (path relative to
# the current working directory).
# NOTE(review): the "generate normal data" cell lists
# 'Benchmark/CBIS-DDSM-finding' directly, one level above 'All_Data' —
# confirm the folders are moved before that cell runs.
output_base_dir = "Benchmark/CBIS-DDSM-finding"
save_images_and_info(all_data_df, os.path.join(output_base_dir, 'All_Data'))

print("Processing complete.")

generate normal data

In [None]:
import os
import cv2
import numpy as np
import random

def find_cropped_image_position(full_image, cropped_image):
    """Locate `cropped_image` inside `full_image` by template matching.

    Args:
        full_image: Image array to search in.
        cropped_image: Template array to look for.

    Returns:
        Tuple ``(x1, y1, x2, y2)``: the best-scoring top-left corner under
        normalized correlation-coefficient matching, and that corner offset
        by the template's width and height.
    """
    scores = cv2.matchTemplate(full_image, cropped_image, cv2.TM_CCOEFF_NORMED)
    # minMaxLoc yields (min_val, max_val, min_loc, max_loc); only the
    # location of the maximum score is needed.
    left, top = cv2.minMaxLoc(scores)[3]
    template_height = cropped_image.shape[0]
    template_width = cropped_image.shape[1]
    return (left, top, left + template_width, top + template_height)

def random_crop_outside_bboxes(full_image, bboxes, crop_size, num_crops=1, max_attempts=1000):
    """Sample random crops of `full_image` that do not overlap any bbox.

    Args:
        full_image: Array whose first two dimensions are (height, width).
        bboxes: Iterable of ``(x1, y1, x2, y2)`` boxes, with x2/y2 exclusive.
        crop_size: ``(crop_height, crop_width)`` of the crops to cut.
        num_crops: Number of crops requested.
        max_attempts: Upper bound on random positions tried per crop.

    Returns:
        List of array slices of `full_image`. It holds fewer than
        `num_crops` entries when no overlap-free position was found within
        `max_attempts` tries for some crop, instead of looping forever.

    Raises:
        ValueError: From ``random.randint`` when the crop is larger than the
            image in either dimension.
    """
    height, width = full_image.shape[:2]
    crop_h, crop_w = crop_size[0], crop_size[1]

    def overlaps_any_bbox(x, y):
        # Test the whole crop rectangle, not just its top-left corner: a crop
        # starting just outside a box can still cover most of it.
        return any(
            x < x2 and x + crop_w > x1 and y < y2 and y + crop_h > y1
            for (x1, y1, x2, y2) in bboxes
        )

    cropped_images = []
    for _ in range(num_crops):
        for _attempt in range(max_attempts):
            x = random.randint(0, width - crop_w)
            y = random.randint(0, height - crop_h)
            if not overlaps_any_bbox(x, y):
                cropped_images.append(full_image[y:y + crop_h, x:x + crop_w])
                break
    return cropped_images

def process_all_crops_for_folder(full_image_path, cropped_folders, output_dir):
    """Write one random "normal" crop for each finding crop of a full image.

    Each crop folder's 'img.jpg' is located inside the full image by template
    matching. For every located crop, a random crop of the same size is then
    cut from the full image and written to
    ``<output_dir>/<crop folder name>_normal/img.jpg``.

    Args:
        full_image_path: Folder containing the full image as 'img.jpg'.
        cropped_folders: Folders that each contain one crop as 'img.jpg'.
        output_dir: Directory under which the '*_normal' folders are created.

    Errors are reported on stdout and end processing for this full image;
    nothing is raised to the caller.
    """
    full_image_file = os.path.join(full_image_path, 'img.jpg')
    full_image = cv2.imread(full_image_file)
    if full_image is None:
        # cv2.imread returns None on failure; report it clearly here rather
        # than letting template matching fail on a None input.
        print(f"An error occurred: could not read {full_image_file}")
        print(full_image_path+' OOps!')
        return

    try:
        # Keep each folder paired with its own bbox so a folder without a
        # readable crop cannot shift the folder/bbox correspondence.
        located = []
        for folder in cropped_folders:
            cropped_image_path = os.path.join(folder, 'img.jpg')
            cropped_image = cv2.imread(cropped_image_path)
            if cropped_image is None:
                print(f"Failed to read image: {cropped_image_path}")
                continue
            located.append((folder, find_cropped_image_position(full_image, cropped_image)))

        bboxes = [bbox for _, bbox in located]

        # For every located crop, cut a same-sized random crop, passing the
        # boxes of all located crops as regions to avoid.
        for folder, (x1, y1, x2, y2) in located:
            crop_size = (y2 - y1, x2 - x1)
            for crop in random_crop_outside_bboxes(full_image, bboxes, crop_size):
                normal_folder = os.path.join(output_dir, f"{os.path.basename(folder)}_normal")
                os.makedirs(normal_folder, exist_ok=True)
                normal_image_path = os.path.join(normal_folder, 'img.jpg')
                cv2.imwrite(normal_image_path, crop)
    except Exception as e:
        # Deliberate best-effort: one bad image must not stop the batch.
        print(f"An error occurred: {e}")
        print(full_image_path+' OOps!')
        

# Folder whose sub-folders are listed as finding crops by the mapping loop below.
# NOTE(review): the finding-export cell writes to
# 'Benchmark/CBIS-DDSM-finding/All_Data/<patient_id>', one level deeper than
# this root — confirm the crop folders live directly under root_dir.
root_dir = 'Benchmark/CBIS-DDSM-finding'
# Folder holding one sub-folder per full breast image (each with an img.jpg).
base_image_dir = 'Benchmark/CBIS-DDSM-breast'
# Destination for the generated '*_normal' folders; same location as root_dir.
output_dir = 'Benchmark/CBIS-DDSM-finding'

def find_full_image_path(base_dir, full_image_key):
    """Return ``base_dir/full_image_key`` if that path exists, else None."""
    candidate = os.path.join(base_dir, full_image_key)
    return candidate if os.path.exists(candidate) else None

# Map cropped images to their respective full images. The key is the first
# five '_'-separated tokens of the crop folder name.
full_image_to_crops_map = {}
for subdir in os.listdir(root_dir):
    subdir_path = os.path.join(root_dir, subdir)
    # Skip stray files, and skip '*_normal' folders produced by an earlier
    # run: output_dir equals root_dir, so without this a re-run would treat
    # generated normal crops as findings.
    if not os.path.isdir(subdir_path) or subdir.endswith('_normal'):
        continue
    full_image_key = '_'.join(subdir.split('_')[:5])
    full_image_to_crops_map.setdefault(full_image_key, []).append(subdir_path)

# Process all mapped cropped folders for each full image
for full_image_key, folders in full_image_to_crops_map.items():
    full_image_path = find_full_image_path(base_image_dir, full_image_key)
    if full_image_path:
        process_all_crops_for_folder(full_image_path, folders, output_dir)
    else:
        print(f"Full image not found for {full_image_key}")