In [20]:
import torch
from vesselExtractor import *
from segment_anything import sam_model_registry, SamPredictor
import glob
import matplotlib.pyplot as plt
import stackview
import napari
import numpy as np
from IPython.display import display
import ipywidgets as widgets
import tifffile
import tkinter as tk
from tkinter import filedialog
import skimage
import pandas as pd

In [16]:
# Create a Tk root window (but keep it hidden)
root = tk.Tk()
root.withdraw()

# Open file selection dialog for a single file
file = filedialog.askopenfilename(
    title="Select an MP4 File",
    filetypes=[("MP4 files", "*.mp4")],
    initialdir="../shortclips/Short clips (before and after wash)"
)

print("Selected file:", file)

Selected file: /mnt/5404b8a5-71b7-4464-9a1e-b40cd26fac58/Data_Drive/Wissam/Eye_Surgery/shortclips/Short clips (before and after wash)/2023-02_Phaco+LIO+Hydrus (4)_GD.mp4


In [3]:
def load_mp4_as_mmap(mp4_path, mmap_path="video_frames.mmap", dtype=np.uint8):
    """
    Load an MP4 file as a memory-mapped NumPy array.

    Parameters:
        mp4_path (str): Path to the MP4 file.
        mmap_path (str): Path to store the memory-mapped file.
        dtype (data-type): Data type for the NumPy array. Default is np.uint8.

    Returns:
        np.memmap: Memory-mapped NumPy array of video frames.
    """
    # Open the video file using OpenCV
    video_capture = cv2.VideoCapture(mp4_path)
    if not video_capture.isOpened():
        raise ValueError(f"Cannot open video file: {mp4_path}")
    
    # Get video properties
    frame_count = int(video_capture.get(cv2.CAP_PROP_FRAME_COUNT))
    frame_height = int(video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
    frame_width = int(video_capture.get(cv2.CAP_PROP_FRAME_WIDTH))
    channels = 3  # Assuming the video is in color (BGR format)

    # Prepare a memory-mapped array
    mmap_shape = (frame_count, frame_height, frame_width, channels)
    mmap_array = np.memmap(mmap_path, dtype=dtype, mode="w+", shape=mmap_shape)

    # Read and store frames into the memory-mapped array
    for i in range(frame_count):
        ret, frame = video_capture.read()
        if not ret:
            print(f"Warning: Could not read frame {i}.")
            break
        mmap_array[i] = frame[..., ::-1]

    # Release video capture and flush the memory-mapped file
    video_capture.release()
    mmap_array.flush()

    return mmap_array
movie = load_mp4_as_mmap(files[0])

In [184]:
nb_viewer = stackview.slice(movie, continuous_update=True)
display(nb_viewer)

# Create an Output widget
output = widgets.Output()
# Create a button widget
button = widgets.Button(description="Select Frame")

img = 0 
idx = 0 
def on_button_click(b):
    global nb_viewer, movie, img, idx  # Make it refer to the global variable
    idx = nb_viewer.children[1].value  # Get slider value
    img = movie[idx] 
    
# Attach function to button click event
button.on_click(on_button_click)

# Display the button and output widget
display(button, output)

VBox(children=(HBox(children=(VBox(children=(ImageWidget(height=1080, width=1920),)),)), IntSlider(value=258, …

Button(description='Select Frame', style=ButtonStyle())

Output()

### Use the Napari Window to add points to both the Sclera and Pupil point layers. Leave the Napari window open to inspect results.

In [5]:
eyeMasks = napari.Viewer()
eyeMasks.add_image(img)
pupil, sclera = [], []
eyeMasks.add_points(sclera, name='Sclera')
eyeMasks.add_points(pupil, name='Pupil')

<Points layer 'Pupil' at 0x7f1e9f94dae0>

In [17]:
sclera = eyeMasks.layers[1].data
sclera = sclera[:, [1, 0]]
pupil = eyeMasks.layers[2].data
pupil= pupil[:, [1, 0]]


### Generate masks for pupil and sclera 

In [7]:
sam_checkpoint = "sam_vit_l_0b3195.pth"
model_type = "vit_l"

device = "cuda"

sam = sam_model_registry[model_type](checkpoint=sam_checkpoint)
sam.to(device=device)

predictor = SamPredictor(sam)
predictor.set_image(img)

sclera_mask, _, _ = predictor.predict(
    point_coords=sclera,
    point_labels=[1]*sclera.shape[0],
    box=None,
    multimask_output=False,
)
pupil_mask, _, _ = predictor.predict(
    point_coords=pupil,
    point_labels=[1]*pupil.shape[0],
    box=None,
    multimask_output=False,
)
sclera_mask[pupil_mask==1]=0
eyeMasks.add_labels(sclera_mask)
eyeMasks.add_labels(pupil_mask)

  state_dict = torch.load(f)


<Labels layer 'pupil_mask' at 0x7f1e94f97ee0>

In [8]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


In [9]:
model_path = "BV_Net128.pth"  # Path to your saved model file
loaded_model = load_model(model_path, device, in_channels=1, classes=1)
bloodVessels = sliding_window_inference_2d(loaded_model, img, device, threshold=0.15)
bloodVessels[pupil_mask[0,::]==1]=0
bloodVessels[sclera_mask[0,::]==0]=0
eyeMasks.add_labels(bloodVessels)

  model.load_state_dict(torch.load(model_path, map_location=torch.device('cpu')))


Model loaded from BV_Net128.pth


In [None]:
prtBV = np.sum(bloodVessels) / np.sum(sclera_mask)

In [19]:
# Define output directory
output_dir = "output/"

# Create output directory if it doesn't exist
os.makedirs(output_dir, exist_ok=True)
print("Output directory created (if not existing):", output_dir)

Output directory created (if not existing): output/


In [176]:
skelVessles = skimage.morphology.skeletonize(bloodVessels).astype(np.int8)
_points = np.where(skelVessles==1)
branchPoints = []
for coords in zip(_points[0], _points[1]):
    skellyWind = skelVessles[coords[0]-1:coords[0]+2, coords[1]-1:coords[1]+2].copy()
    if  np.sum(skellyWind) ==4:
        if np.sum(skellyWind[1, :])==3 or np.sum(skellyWind[:,1])==3:
            if np.sum(skellyWind[0, :])==2 or np.sum(skellyWind[:,0])==2:
                continue
            if np.sum(skellyWind[2, :])==2 or np.sum(skellyWind[:,2])==2:
                continue
            #if np.sum(skellyWind[0, :])<2 or np.sum(skellyWind[:,0])<2:

            if  np.sum(skellyWind) - 1 >=3:
                branchPoints.append(coords)
    if np.sum(skellyWind) ==4:
        if skellyWind[0,0]+ skellyWind[1,1]+skellyWind[2,2] == 3:
            if np.sum(skellyWind[1, :])==1 and np.sum(skellyWind[:,1])==1:
                branchPoints.append(coords)

        if skellyWind[0,2]+ skellyWind[1,1]+skellyWind[2,0] == 3:
            if np.sum(skellyWind[1, :])==1 and np.sum(skellyWind[:,1])==1:
                branchPoints.append(coords)
        _testWindow = skellyWind.copy()
        _testWindow[1,1] = 0
        if np.sum(_testWindow[0, :])==1 and np.sum(_testWindow[1, :])==1 and np.sum(_testWindow[2, :])==1:
            if np.sum(_testWindow[:, 0])==1 and np.sum(_testWindow[:, 1])==1 and np.sum(_testWindow[:, 2])==1:
                branchPoints.append(coords) 

        if np.sum(_testWindow[0, :])==1 and np.sum(_testWindow[1, :])==1 and np.sum(_testWindow[2, :])==1:
            if np.sum(_testWindow[:, 0])==1 and np.sum(_testWindow[:, 1])==1 and np.sum(_testWindow[:, 2])==1:
                branchPoints.append(coords)

        if (np.sum(_testWindow[0, :])==2 and np.sum(_testWindow[2, :])==1) or (np.sum(_testWindow[0, :])==1 and np.sum(_testWindow[2, :])==2):
            if np.sum(_testWindow[:, 0])==1 and np.sum(_testWindow[:, 1])==1 and np.sum(_testWindow[:, 2])==1:
    
                branchPoints.append(coords)
        if (np.sum(_testWindow[:, 0])==2 and np.sum(_testWindow[:, 2])==1) or (np.sum(_testWindow[:, 0])==1 and np.sum(_testWindow[:, 2])==2):
            if np.sum(_testWindow[:, 0])==1 and np.sum(_testWindow[:, 1])==1 and np.sum(_testWindow[:, 2])==1:
    
                branchPoints.append(coords) 

In [185]:
def quantify_redness(img, bloodVessels):
    masked_pixels = img[bloodVessels == 1]  # Shape (N, 3), where N is the number of masked pixels
    # Compute average color within the masked region
    average_color = np.mean(masked_pixels, axis=0)

    # Compute redness ratio
    total_intensity = np.sum(average_color)
    red_ratio = average_color[0] / total_intensity if total_intensity > 0 else 0

    # Convert to percentage
    red_percentage = red_ratio * 100

    return tuple(average_color), red_ratio, red_percentage
average_color, red_ratio, red_percentage = quantify_redness(img, bloodVessels)
prtBV = np.sum(bloodVessels) / np.sum(sclera_mask)


In [186]:
results = {
    "File" : os.path.basename(file),
    "Frame" : idx,
    "Area %" : prtBV,
    "Red Ratio" : red_ratio,
    "Red %" : red_percentage,
    "Bifurcations": len(branchPoints)
    
}

In [194]:
df = pd.DataFrame.from_dict([results])

In [196]:
csv_filename = "results.csv"

# Check if the CSV already exists
if os.path.exists(csv_filename):
    # Append the new data without writing the header
    df.to_csv(csv_filename, mode="a", header=False, index=False)
else:
    # Write the data to a new file with header
    df.to_csv(csv_filename, mode="w", header=True, index=False)