In [None]:
!pip install ultralytics 

In [4]:
import cv2
import numpy as np
import os
from PIL import Image
import pandas as pd 
from ultralytics import YOLO
import cv2
import torch
from sklearn.linear_model import LinearRegression

import matplotlib.pyplot as plt

In [5]:
BEST_WEIGHT = '/kaggle/input/osteoga-model/OsteoGA_model/segmentation/weights/oai_s_best4.pt'
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

def create_model(weight_path=None):
    if weight_path:
        return YOLO(weight_path)
    else:
        return YOLO(BEST_WEIGHT)

class Segmenter():
    def __init__(self, weight_path=None):
        self.model = create_model(weight_path).to(DEVICE)
    
    def segment(self, img):
        """
            input: image (H, W, C)
            output: mask (H, W) with femur is 1 and tibia is 2 
        """
        img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

        eimg = cv2.equalizeHist(img)

        clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
        eimg = clahe.apply(eimg)

        eimg = cv2.cvtColor(eimg, cv2.COLOR_GRAY2RGB)

        res = self.model(eimg, verbose=False)

        mask = res[0].masks.data[0] * (res[0].boxes.cls[0] + 1) + res[0].masks.data[1] * (res[0].boxes.cls[1] + 1)
        mask = mask.cpu().numpy()

        return mask

In [6]:
def center(contour):
    idx = len(contour) // 2
    return contour[idx]


def to_color(image):
    if len(image.shape) == 3 and image.shape[-1] == 3:
        return image
    return cv2.cvtColor(image, cv2.COLOR_GRAY2RGB)


def to_gray(image):
    if len(image.shape) == 3 and image.shape[-1] == 3:
        return cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    return image

In [7]:
def detect_limit_points(mask, verbose=0):
    # tìm giới hạn hai bên của khớp gối
    '''
        input: mask (H, W) with femur is 1 and tibia is 2
        output: 4 points serve as limit points to determine the upper and lower contours from the full contours of the femur and tibia
    '''
    h, w = mask.shape
    res = []
    upper_pivot = np.array([0, w // 2])  # r c
    lower_pivot = np.array([h, w // 2])  # r c

    left_slice = slice(0, w // 2)
    right_slice = slice(w // 2, None)
    center_slice = slice(int(0.2 * h), int(0.8 * h))

    left = np.zeros_like(mask)
    left[center_slice, left_slice] = mask[center_slice, left_slice]

    right = np.zeros_like(mask)
    right[center_slice, right_slice] = mask[center_slice, right_slice]

    if verbose:
        cv2_imshow([left, right])

    pivot = np.array([0, w])
    coords = np.argwhere(left == 1)
    distances = ((coords - pivot) ** 2).sum(axis=-1)
    point = coords[distances.argmax()][::-1]
    res.append(point)

    pivot = np.array([0, 0])
    coords = np.argwhere(right == 1)
    distances = ((coords - pivot) ** 2).sum(axis=-1)
    point = coords[distances.argmax()][::-1]
    res.append(point)

    pivot = np.array([h, w])
    coords = np.argwhere(left == 2)
    distances = ((coords - pivot) ** 2).sum(axis=-1)
    point = coords[distances.argmax()][::-1]
    res.append(point)

    pivot = np.array([h, 0])
    coords = np.argwhere(right == 2)
    distances = ((coords - pivot) ** 2).sum(axis=-1)
    point = coords[distances.argmax()][::-1]
    res.append(point)

    if verbose:
        cv2_imshow(draw_points(127 * mask, res))

    return res

def find_boundaries(mask, start, end, top=True, verbose=0):
    #     nếu top = True, tìm đường bao bên trên cùng từ left đến right
    #     nếu top = False, tìm đường bao dưới cùng từ left đến right
    '''
        input:  
            mask (H, W) of femur or tibia 
            start, end is limit point to extract upper_contour/lower_contour from femur/tibia full contour
        output: upper_contour/lower_contour
        use top = True if determine lower contour from tibia mask
    '''
    boundaries = []
    height, width = mask.shape

    contours, _ = cv2.findContours(255 * mask.astype(np.uint8), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)

    areas = np.array([cv2.contourArea(cnt) for cnt in contours])
    contour = contours[areas.argmax()]
    contour = contour.reshape(-1, 2)
    org_contour = contour.copy()

    start_idx = ((start - contour) ** 2).sum(axis=-1).argmin()
    end_idx = ((end - contour) ** 2).sum(axis=-1).argmin()
    if start_idx <= end_idx:
        contour = contour[start_idx:end_idx + 1]
    else:
        contour = np.concatenate([contour[start_idx:], contour[:end_idx + 1]])

    if top:
        sorted_indices = np.argsort(contour[:, 1])[::-1]
    else:
        sorted_indices = np.argsort(contour[:, 1])
    contour = contour[sorted_indices]

    unique_indices = sorted(np.unique(contour[:, 0], return_index=True)[1])
    contour = contour[unique_indices]
    sorted_indices = np.argsort(contour[:, 0])
    contour = contour[sorted_indices]
    if verbose:
        temp = draw_points(127 * mask.astype(np.uint8), contour, thickness=5)
        temp = draw_points(temp, [start, end], color=[155, 155], thickness=15)
        # cv2_imshow(temp)

    return np.array(contour), np.array(org_contour)


def get_contours(mask, verbose=0):
    '''
        input: mask (H, W) with femur is 1 and tibia is 2
        output: upper_contour, lower_contour
    '''
    limit_points = detect_limit_points(mask, verbose=verbose)
    upper_contour, full_upper = find_boundaries(mask == 1, limit_points[0], limit_points[1], top=False, verbose=verbose)
    lower_contour, full_lower = find_boundaries(mask == 2, limit_points[3], limit_points[2], top=True, verbose=verbose)
    if verbose:
        temp = draw_points(127 * mask, full_upper, thickness=3, color=(255, 0, 0))
        temp = draw_points(temp, full_lower, thickness=3)
        # cv2_imshow(temp)
        # cv2.imwrite('full.png', temp)
        temp = draw_points(temp, limit_points, thickness=7, color=(0, 0, 255))
        # cv2_imshow(temp)
        # cv2.imwrite('limit_points.png', temp)
    if verbose:
        temp = draw_points(127 * mask, upper_contour, thickness=3, color=(255, 0, 0))
        temp = draw_points(temp, lower_contour, thickness=3)
        # cv2_imshow(temp)
        # cv2.imwrite('cropped.png', temp)

    return upper_contour, lower_contour

In [8]:
def distance(mask, upper_contour, lower_contour, p=0.12, verbose = 0):
    '''
        input:
            mask (H, W) with femur is 1 and tibia is 2
            upper_contour, lower_contour
        output: left_distances and right_distances vectors
    '''
    x_min = max(lower_contour[0,0],upper_contour[0,0])
    x_max = min(lower_contour[-1,0],upper_contour[-1,0])
    lower_contour = lower_contour[(lower_contour[:,0]>=x_min) & (lower_contour[:,0]<=x_max)]
    
    left, right = getMiddle(mask, lower_contour, verbose = verbose)
    
    left_idx = np.where(lower_contour[:,0] == left)[0][0]
    right_idx = np.where(lower_contour[:,0] == right)[0][0]
#     left_lower_contour = lower_contour[left_idx:]
#     right_lower_contour = lower_contour[:right_idx+1][::-1]

    left_lower_contour = lower_contour[(lower_contour[:,0]<=left) & (lower_contour[:,0]>=x_min) ]
    right_lower_contour = lower_contour[(lower_contour[:,0]>=right) & (lower_contour[:,0]<=x_max)][::-1]

    left_upper_contour = upper_contour[(upper_contour[:,0]<=left) & (upper_contour[:,0]>=x_min)]
    right_upper_contour = upper_contour[(upper_contour[:,0]>=right) & (upper_contour[:,0]<=x_max)][::-1]

    if verbose == 1:
        temp = draw_points(mask*127, left_lower_contour, color = (0, 255, 0), thickness = 5)
        temp = draw_points(temp, right_lower_contour, color = (0, 255, 0), thickness = 5)
        temp = draw_points(temp, left_upper_contour, color = (255, 0, 0), thickness = 5)
        temp = draw_points(temp, right_upper_contour, color = (255, 0, 0), thickness = 5)
        # cv2_imshow(temp)
        # cv2.imwrite('center_cropped.png', temp)
        plt.imshow(temp)
    links =  list(zip(left_upper_contour,left_lower_contour)), list(zip(right_upper_contour,right_lower_contour))

    temp = left_upper_contour, right_upper_contour, left_lower_contour, right_lower_contour

    return left_lower_contour[:,1]-left_upper_contour[:,1], right_lower_contour[:,1] - right_upper_contour[:,1], links, temp


def getMiddle(mask, contour, verbose=0):
    '''
        use Linear Regression to construct a straight line to remove the bone segment from upper_contour and lower_contour
        input: 
            mask (H, W) with femur is 1 and tibia is 2
            lower_contour
        output: 2 point (left and right) define the boundary within which every point of the bone segment is located
    '''
    X = contour[:, 0].reshape(-1, 1)
    y = contour[:, 1]
    reg = LinearRegression().fit(X, y)
    i_min = np.argmin(y[int(len(y) * 0.2):int(len(y) * 0.8)]) + int(len(y) * 0.2)
    left = i_min - 1
    right = i_min + 1
    left_check = False
    right_check = False
    if verbose == 1:
        # print('get Middle 1')
        cmask = draw_points(mask, contour, thickness=3, color=(255, 0, 0))
        cmask = draw_points(cmask, np.hstack([X, reg.predict(X).reshape(-1, 1).astype('int')]),thickness = 5,
                            color=(0, 255, 0))
        # cv2.imwrite("lr_mask.png", cmask)
        cmask = draw_points(mask,[[X[i_min][0], y[i_min]]], thickness=5, color=(0, 0, 255))
        cmask = draw_points(cmask,[[X[i_min+50][0], y[i_min+50]]], thickness=5, color=(0, 255, 0))
        # cv2_imshow(cmask)
        cv2_imshow(cmask)

    while True:
        while not left_check:
            if y[left] > reg.predict(X[left].reshape(-1, 1)):
                break
            left -= 1
        while not right_check:
            if y[right] > reg.predict(X[right].reshape(-1, 1)):
                break
            right += 1
        if verbose == 1:
            print("left: ",left)
            print("right: " ,right)
            # print('get middle 2')
            cmask = draw_points(cmask, [contour[left]], thickness=5, color=(255, 255, 0))
            cmask = draw_points(cmask, [contour[right]], thickness=5, color=(0, 255, 255))
            # print(cmask.shape)
            # cv2.imwrite("lr.png", cmask)
            # cv2_imshow(cmask)
            # plt.show()
            cv2_imshow(cmask)

        left_min = np.argmin(y[int(len(y) * 0.2):left]) + int(len(y) * 0.2) if int(len(y) * 0.2) < left else left
        right_min = np.argmin(y[right:int(len(y) * 0.8)]) + right if right < int(len(y) * 0.8) else right
        if y[left_min] > reg.predict(X[left_min].reshape(-1, 1)):
            left_check = True
        if y[right_min] > reg.predict(X[right_min].reshape(-1, 1)):
            right_check = True
        if right_check and left_check:
            break
        left = left_min - 1
        right = right_min + 1
    return min(X.flatten()[left], X.flatten()[right]), max(X.flatten()[left], X.flatten()[right])

In [9]:
def pooling_array(array, n, mode='mean'):
    if mode == 'mean':
        pool = lambda x: np.mean(x)
    elif mode == 'min':
        pool = lambda x: np.min(x)
    elif mode == 'sum':
        pool = lambda x: np.sum(x)

    if n == 1:
        return pool(array)

    array_length = len(array)
    if array_length < n:
        return array
    segment_length = array_length // n
    remaining_elements = array_length % n

    if remaining_elements == 0:
        segments = np.split(array, n)
    else:
        mid = remaining_elements * (segment_length + 1)
        segments = np.split(array[:mid], remaining_elements)
        segments += np.split(array[mid:], n - remaining_elements)

    segments = [pool(segment) for segment in segments]

    return np.array(segments)


def pool_links(links, dim, mode="mean"):
    '''
        links là 1 list gồm nhiều cặp tọa độ trên dưới (knee join space giữa lower_contour vs upper_contour) có dạng: 
        [(array([436, 421], dtype=int32), array([436, 451], dtype=int32)), (array([436, 421], dtype=int32), array([436, 451], dtype=int32)), ...]
        đầu tiên lấy danh sách các tọa độ x của các pair coord này, pool nó y như pool distance, rồi filter links ban đầu để lấy các pair có x thuộc pooled

    '''
    pooled_x = pooling_array(np.array(links)[:, 0, 0], dim, mode)
    filtered_pairs = []
    for x in pooled_x:
        for pair in links:
            if pair[0][0] == int(x):
                filtered_pairs.append(pair)
                break  # Lấy 1 cặp và dừng lại
    return filtered_pairs


def get_JSW(mask, dim=None, pool='mean', p=0.3, verbose=0):
    '''
        input: mask (H, W) with femur is 1 and tibia is 2
        output: 2 distance vectors (left, right) (aggregated to <dim> by <pool>), links (list of pairs coords for each side)
    '''
    if isinstance(mask, str):
        mask = cv2.imread(mask, 0)
    if mask is None:
        return np.zeros(10), np.zeros(10)
    uc, lc = get_contours(mask, verbose=verbose)
    left_distances, right_distances, links, contours = distance(mask, uc, lc, p=p, verbose=verbose)
    if verbose:
        # print('in getjsw')
        temp = draw_points(mask * 127, contours[0], thickness=3, color=(255, 0, 0))
        temp = draw_points(temp, contours[1], thickness=5, color=(255, 0, 0))
        temp = draw_points(temp, contours[2], thickness=5, color=(0, 255, 0))
        temp = draw_points(temp, contours[3], thickness=5, color=(0, 255, 0))
        temp = draw_lines(temp, links[::6], color=(0, 0, 255), thickness=2)
        # cv2_imshow(temp)
        # cv2.imwrite("drawn_lines.png", temp)
    if dim:
        left_distances = pooling_array(left_distances, dim, pool)
        right_distances = pooling_array(right_distances, dim, pool)

        
    return left_distances, right_distances


def calculate_diff(left_jsw, right_jsw):
    '''
        input: left_distances and right_distances vectors
        output: jsw_m and jsw_mm
    '''
    jsw_max = max(np.max(left_jsw), np.max(right_jsw))
    jsw_max_side = np.argmax([np.max(left_jsw), np.max(right_jsw)])
    if jsw_max_side == 0:
        jsw_min = np.min(right_jsw)
    else:
        jsw_min = np.min(left_jsw)
    
    diff_mean = abs(np.mean(left_jsw) - np.mean(right_jsw)) / jsw_max
    diff_max_min = (jsw_max - jsw_min) / jsw_max

    return diff_mean, diff_max_min


def calculate_jsw_info(left_jsw, right_jsw):
    '''
        input: left_distances and right_distances vectors
        output: % diff, mean_left, mean_right, side_min (0 is left, 1 is right), index with value min (in side min), value min (in side min)
    '''
    mean_left = np.mean(left_jsw)
    mean_right = np.mean(right_jsw)

    side_min, index_min, value_min = (0, np.argmin(left_jsw), np.min(left_jsw)) if mean_left <= mean_right else (1, np.argmin(right_jsw), np.min(right_jsw)) 

    diff_percentage = np.abs((mean_left - mean_right) / mean_right) * 100

    return diff_percentage, mean_left, mean_right, side_min, index_min, value_min

In [10]:
def draw_points(image, points, color=None, random_color=False, same=True, thickness=1):
    if color is None and not random_color:
        color = (0, 255, 0)  # Màu mặc định là xanh lá cây (BGR)
    if random_color:
        color = (random.randint(0, 255), random.randint(0, 255), random.randint(0, 255))

    image = to_color(image)

    for point in points:
        if random_color and not same:
            color = (random.randint(0, 255), random.randint(0, 255), random.randint(0, 255))

        x, y = point
        image = cv2.circle(image, (x, y), thickness, color, -1)  # Vẽ điểm lên ảnh
    return image


def draw_lines(image, pairs, color=None, random_color=False, same=True, thickness=1):
    image_with_line = to_color(np.copy(image))

    if color is None and not random_color:
        color = (255, 0, 0)  # Màu mặc định là xanh lá cây (BGR)
    if random_color:
        color = (random.randint(0, 255), random.randint(0, 255), random.randint(0, 255))

    # Vẽ đường thẳng dựa trên danh sách các cặp điểm
    for pair in pairs:

        if random_color and not same:
            color = (random.randint(0, 255), random.randint(0, 255), random.randint(0, 255))

        start_point = pair[0]
        end_point = pair[1]
        image_with_line = cv2.line(image_with_line, start_point, end_point, color, thickness)
        image_with_line = cv2.circle(image_with_line, start_point, thickness + 1, color, -1)
        image_with_line = cv2.circle(image_with_line, end_point, thickness + 1, color, -1)

    return image_with_line


def center(contour):
    idx = len(contour) // 2
    return contour[idx]


def to_color(image):
    if len(image.shape) == 3 and image.shape[-1] == 3:
        return image
    return cv2.cvtColor(image, cv2.COLOR_GRAY2RGB)


def to_gray(image):
    if len(image.shape) == 3 and image.shape[-1] == 3:
        return cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    return image


def cv2_imshow(images):
    if not isinstance(images, list):
        images = [images]

    num_images = len(images)

    # Hiển thị ảnh đơn lẻ trực tiếp bằng imshow
    if num_images == 1:
        image = images[0]
        if len(image.shape) == 3 and image.shape[2] == 3:
            # Ảnh màu (RGB)
            image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            plt.imshow(image_rgb)
        else:
            # Ảnh xám
            plt.imshow(image, cmap='gray')

        plt.axis("off")
        plt.show()
    else:
        # Hiển thị nhiều ảnh trên cùng một cột
        fig, ax = plt.subplots(num_images, 1, figsize=(4, 4 * num_images))

        for i in range(num_images):
            image = images[i]
            if len(image.shape) == 3 and image.shape[2] == 3:
                # Ảnh màu (RGB)
                image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
                ax[i].imshow(image_rgb)
            else:
                # Ảnh xám
                ax[i].imshow(image, cmap='gray')

            ax[i].axis("off")

        plt.tight_layout()
        plt.show()


In [11]:
seg_model = Segmenter()

In [25]:
DIR = '/kaggle/input/knee-osteoarthritis-dataset-with-severity'

In [26]:
subsets = ['train', 'val', 'test']


In [27]:
data = []

In [28]:
for subset in subsets:
    subset_path = os.path.join(DIR, subset)
    labels = os.listdir(subset_path)
    for label in labels:
        image_paths = os.path.join(subset_path, label)
        for image in os.listdir(image_paths):
            try:
                img = cv2.imread(os.path.join(image_paths, image))
                mask = seg_model.segment(img)
                left_distances, right_distances = get_JSW(mask, dim = 10, verbose = 0)
                jsw_m, jsw_mm = calculate_diff(left_distances, right_distances)
                diff_percentage, mean_left, mean_right, side_min, index_min, value_min = calculate_jsw_info(left_distances, right_distances)
                data.append([image.replace(".png", ""), subset, label, jsw_m, jsw_mm, value_min])
            except Exception:
                print(f"{subset}/{label}/{image}")


In [19]:
path = "/kaggle/input/knee-osteoarthritis-dataset-with-severity/test/4/9445318L.png"
# path = "/kaggle/input/knee-osteoarthritis-dataset-with-severity/train/0/9003126L.png"

In [20]:
img = cv2.imread(path)
mask = seg_model.segment(img)
left_distances, right_distances = get_JSW(mask, dim = 10, verbose = 0)
jsw_m, jsw_mm = calculate_diff(left_distances, right_distances)
diff_percentage, mean_left, mean_right, side_min, index_min, value_min = calculate_jsw_info(left_distances, right_distances)

In [29]:
import pandas as pd

In [30]:
df = pd.DataFrame(data, columns=['id', 'subset', 'kl_grade', 'jsw_mean', 'jsw_mm', 'jsw_min'])
df.to_csv('jsws.csv', index=False)