In [1]:
import os
import numpy as np
import cv2
from glob import glob
import tensorflow as tf
from tensorflow.keras.layers import Conv2D, BatchNormalization, Activation, MaxPool2D, Conv2DTranspose, Concatenate, Input
from tensorflow.keras.models import Model
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from sklearn.utils import shuffle
from tensorflow.keras.optimizers import Adam
from sklearn.model_selection import train_test_split

In [2]:
os.environ["TF_CPP_MIN_LOG_LEVEL"]="2"

In [3]:
def dice_coef(y,y_pred):
    y=tf.keras.layers.Flatten()(y)
    y_pred=tf.keras.layers.Flatten()(y_pred)
    i=tf.reduce_sum(y*y_pred)
    return (2*i+1e-5)/(tf.reduce_sum(y)+tf.reduce_sum(y_pred))

def dice_loss(y,y_pred):
    return 1.0-dice_coef(y,y_pred)

In [4]:
batch_size=2
lr=1e-4
epochs=1
height=256
width=256

In [5]:
def conv_block(inputs, num_filters):
    x=Conv2D(num_filters,3,padding="same")(inputs)
    x=BatchNormalization()(x)
    x=Activation("relu")(x)

    x=Conv2D(num_filters,3,padding="same")(x)
    x=BatchNormalization()(x)
    x=Activation("relu")(x)
    return x

def encoder_block(inputs,num_filters):
    x=conv_block(inputs,num_filters)
    p=MaxPool2D((2,2))(x)
    return x,p##for skip connections

def decoder_block(inputs,skip,num_filters):
    x=Conv2DTranspose(num_filters,2,strides=2,padding="same")(inputs)
    x=Concatenate()([x,skip])
    x=conv_block(x,num_filters)
    return x

def unet(input_shape):
    inputs=Input(input_shape)
    x1,p1=encoder_block(inputs,64)
    x2,p2=encoder_block(p1,128)
    x3,p3=encoder_block(p2,256)
    x4,p4=encoder_block(p3,512)

    x5=conv_block(p4,1024)

    x6=decoder_block(x5,x4,512)
    x7=decoder_block(x6,x3,256)
    x8=decoder_block(x7,x2,128)
    x9=decoder_block(x8,x1,64)

    op=Conv2D(1,1,padding="same")(x9)

    model=Model(inputs,op,name="UNET")
    return model

def create_dir(path):
    if not os.path.exists(path):
        os.makedirs(path)

def load(path,split):
    images=glob(os.path.join(path,"images","image_*.jpg"))
    masks=glob(os.path.join(path,"masks","mask_*.jpg"))
    split_size=int(len(images)*split)
    train_x,val_x=train_test_split(images,test_size=split_size)
    train_y,val_y=train_test_split(masks,test_size=split_size)
    train_x,test_x=train_test_split(train_x,test_size=split_size)
    train_y,test_y=train_test_split(train_y,test_size=split_size)
    return (train_x,train_y), (val_x,val_y), (test_x,test_y)

def tf_parse(x, y):
    def _parse(x, y):
        x = read_image(x)
        y = read_mask(y)
        return x, y
    x, y = tf.numpy_function(_parse, [x, y], [tf.float32, tf.float32])
    x.set_shape([height,width, 3])
    y.set_shape([height,width, 1])
    return x, y

def read_image(path):
    path=path.decode()
    x=cv2.imread(path,cv2.IMREAD_COLOR)
    x=cv2.resize(x,(height,width))
    x=x/255
    x=x.astype(np.float32)
    return x

def read_mask(path):
    path=path.decode()
    x=cv2.imread(path,cv2.IMREAD_GRAYSCALE)
    x=cv2.resize(x,(height,width))
    x=x/255
    x=x.astype(np.float32)
    x=np.expand_dims(x,axis=-1)
    return x

def tf_dataset(X,Y,batch=2):
    dataset=tf.data.Dataset.from_tensor_slices((X,Y))
    dataset=dataset.map(tf_parse)
    dataset=dataset.batch(batch)
    dataset=dataset.prefetch(10)
    return dataset

In [6]:
model_path = os.path.join("files", "model.h5")

if __name__=="__main__":
    np.random.seed(42)
    tf.random.set_seed(42)
    create_dir("files")
    (train_x,train_y), (val_x,val_y), (test_x,test_y)=load("D:\BrainTumorData",0.2)
    train=tf_dataset(train_x,train_y,batch=batch_size)
    val=tf_dataset(val_x,val_y,batch=batch_size)
    model=unet((height,width,3))
    model.compile(loss=dice_loss,optimizer=Adam(lr),metrics=[dice_coef])
    callbacks=[
        ModelCheckpoint(model_path, verbose=1, save_best_only=True),
        ReduceLROnPlateau(monitor='val_loss',factor=0.1,patience=5,min_lr=1e-7,verbose=1),
        EarlyStopping(monitor='val_loss',patience=20,restore_best_weights=False)
    ]

    model.fit(train,epochs=epochs,validation_data=val,callbacks=callbacks)

Epoch 1: val_loss improved from inf to 1.00390, saving model to files\model.h5


  saving_api.save_model(




In [None]:
import pandas as pd 
from tqdm import tqdm
from tensorflow.keras.utils import CustomObjectScope
from sklearn.metrics import f1_score, jaccard_score, precision_score, recall_score, accuracy_score

def save_results(image, mask, y_pred, save_image_path):
    mask = np.expand_dims(mask, axis=-1)
    mask = np.concatenate([mask, mask, mask], axis=-1)

    y_pred = np.expand_dims(y_pred, axis=-1)
    y_pred = np.concatenate([y_pred, y_pred, y_pred], axis=-1)
    y_pred = y_pred * 255

    line = np.ones((height, 10, 3)) * 255

    cat_images = np.concatenate([image, line, mask, line, y_pred], axis=1)
    cv2.imwrite(save_image_path, cat_images)


if __name__ == "__main__":
    np.random.seed(42)
    tf.random.set_seed(42)

    with CustomObjectScope({"dice_coef": dice_coef, "dice_loss": dice_loss}):
        model = tf.keras.models.load_model(os.path.join("files", "model.h5"))

    results_dir = "D:/BrainTumorData/results"
    os.makedirs(results_dir, exist_ok=True)

    SCORE = []
    for x, y in tqdm(zip(test_x, test_y), total=len(test_y)):
        name = x.split("/")[-1]

        image = cv2.imread(x, cv2.IMREAD_COLOR)
        image = cv2.resize(image, (height, width))       
        x = image/255.0                         
        x = np.expand_dims(x, axis=0)           

        mask = cv2.imread(y, cv2.IMREAD_GRAYSCALE)
        mask = cv2.resize(mask, (height, width))

        y_pred = model.predict(x, verbose=0)[0]
        y_pred = np.squeeze(y_pred, axis=-1)
        y_pred = y_pred >= 0.5
        y_pred = y_pred.astype(np.int32)

        save_image_path =os.path.join(results_dir,name)
        save_results(image, mask, y_pred, save_image_path)

        mask = mask/255.0
        mask = (mask > 0.5).astype(np.int32).flatten()
        y_pred = y_pred.flatten()
        accuracy = accuracy_score(mask, y_pred)
        f1_value = f1_score(mask, y_pred, labels=[0, 1], average="binary")
        jac_value = jaccard_score(mask, y_pred, labels=[0, 1], average="binary")
        recall_value = recall_score(mask, y_pred, labels=[0, 1], average="binary", zero_division=0)
        precision_value = precision_score(mask, y_pred, labels=[0, 1], average="binary", zero_division=0)
        SCORE.append([name, f1_value, jac_value, recall_value, precision_value])

    score = [s[1:]for s in SCORE]
    score = np.mean(score, axis=0)
    print(accuracy)
    print(f"F1: {score[0]:0.5f}")
    print(f"Jaccard: {score[1]:0.5f}")
    print(f"Recall: {score[2]:0.5f}")
    print(f"Precision: {score[3]:0.5f}")

    df = pd.DataFrame(SCORE, columns=["Image", "F1", "Jaccard", "Recall", "Precision"])
    df.to_csv("files/score.csv")

In [None]:
def segment_image_kmeans(image, k=2):
    image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    pixel_values = image_rgb.reshape((-1, 3))
    pixel_values = np.float32(pixel_values)
    criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 100, 0.2)
    _, labels, centers = cv2.kmeans(pixel_values, k, None, criteria, 10, cv2.KMEANS_RANDOM_CENTERS)
    centers = np.uint8(centers)
    segmented_image = centers[labels.flatten()]
    segmented_image = segmented_image.reshape(image_rgb.shape)
    segmented_gray = cv2.cvtColor(segmented_image, cv2.COLOR_RGB2GRAY)
    _, segmented_binary = cv2.threshold(segmented_gray, 128, 255, cv2.THRESH_BINARY)
    
    return segmented_binary

def process_dataset(dataset_dir, ground_truth_dir, output_dir, k=2):
    os.makedirs(output_dir, exist_ok=True)
    image_files = [f for f in os.listdir(dataset_dir) if f.endswith(('.png', '.jpg', '.jpeg'))]

    scores = []
    for image_file in tqdm(image_files, total=len(image_files)):
        image_path = os.path.join(dataset_dir, image_file)
        image = cv2.imread(image_path)
        ground_truth_path = os.path.join(ground_truth_dir, image_file)
        if not os.path.exists(ground_truth_path):
            print(f"Ground truth not found for {image_file}, skipping.")
            print(f"Expected path: {ground_truth_path}")
            continue
        ground_truth = cv2.imread(ground_truth_path, cv2.IMREAD_GRAYSCALE)

        if ground_truth is None:
            print(f"Failed to read ground truth for {image_file}, skipping.")
            continue
        ground_truth = cv2.resize(ground_truth, (image.shape[1], image.shape[0]))
        segmented_binary = segment_image_kmeans(image, k)
        output_path = os.path.join(output_dir, image_file)
        cv2.imwrite(output_path, segmented_binary)
        ground_truth_flat = ground_truth.flatten() / 255
        segmented_binary_flat = segmented_binary.flatten() / 255 
        ground_truth_flat = np.round(ground_truth_flat).astype(int)
        segmented_binary_flat = np.round(segmented_binary_flat).astype(int)
        accuracy = accuracy_score(ground_truth_flat, segmented_binary_flat)
        scores.append([image_file, accuracy])
    df = pd.DataFrame(scores, columns=["Image", "Accuracy"])
    mean_accuracy = df["Accuracy"].mean()
    print(f"Mean Accuracy: {mean_accuracy:.4f}")

if __name__ == "__main__":
    dataset_dir = r"D:\BrainTumorData\images"
    ground_truth_dir = r"D:\BrainTumorData\masks"
    output_dir = r"D:\BrainTumorData\results_knn"
    process_dataset(dataset_dir, ground_truth_dir, output_dir, k)