In [1]:
#|default_exp process

In [2]:
import os
# Expose only the GPU with index 1 to CUDA-based libraries. This is set ahead of the
# fastai import in a later cell so that it is in place before any CUDA initialisation.
os.environ['CUDA_VISIBLE_DEVICES'] = "1"

# Install and load all libraries

In [4]:
from ml_utils import *

In [5]:
from nbdev.export import nb_export

In [6]:
#|export
from fastai.vision.all import *
from skimage.measure import label, regionprops, find_contours
from evalutils import DetectionAlgorithm
from evalutils.validators import UniquePathIndicesValidator, DataFrameValidator
from evalutils.exceptions import ValidationError
import json, random, SimpleITK, gc, cv2
from typing import Tuple, Dict
# NOTE: this line rebinds `label`, replacing the skimage.measure.label imported above.
from scipy.ndimage import center_of_mass, label
from pandas import DataFrame

In [7]:
#|export
# Deployment switch read by Surgtoolloc_det.__init__: True selects the container paths
# (/input/, /output/), False the local ./test/ and ./output/ directories.
execute_in_docker = False

In [8]:
#|export
class VideoLoader:
    """File loader that only checks a video exists and records its path.

    The video itself is never opened here; consumers receive the path and are
    expected to read the frames themselves.
    """

    def load(self, *, fname):
        """Return ``[{"path": fname}]`` when ``fname`` is an existing file.

        The resolved ``Path`` is printed as a progress trace.

        Raises:
            IOError: if ``fname`` does not point to a regular file.
        """
        candidate = Path(fname)
        print(candidate)
        if candidate.is_file():
            # Only the path is handed on; no cv2 capture object is created.
            return [{"path": fname}]
        raise IOError(
            f"Could not load {fname} using {self.__class__.__qualname__}."
        )

    def hash_video(self, input_video):
        """Placeholder: no hash is computed, the method always returns ``None``."""
        return None

In [16]:
#|export
class UniqueVideoValidator(DataFrameValidator):
    """
    Rejects a DataFrame whose `video` column contains repeated entries.
    """

    def validate(self, *, df: DataFrame):
        """Check the `video` column of ``df`` for duplicates.

        Raises:
            ValidationError: if ``df`` has no `video` column, or if the number of
                distinct values in that column is smaller than its length.
        """
        try:
            video_column = df["video"]
        except KeyError:
            raise ValidationError("Column `video` not found in DataFrame.")

        distinct_count = len(set(video_column))
        if distinct_count == len(video_column):
            return
        raise ValidationError("The videos are not unique, please submit a unique video for each case.")

In [17]:
# Manual check of VideoLoader against a local sample clip; the value returned by
# `load` is shown as the cell output.
v=VideoLoader()
v.load(fname='./test/vid_1_short.mp4')

test/vid_1_short.mp4


[{'path': './test/vid_1_short.mp4'}]

In [None]:
#|export
class Surgtoolloc_det(DetectionAlgorithm):
    """Per-frame surgical tool presence prediction for an input video.

    Pipeline as wired in this class:
    `process_case` -> `predict` -> `extract_images` (dump every frame as JPEG)
    -> `crop_images` (crop each frame to the first foreground region found by the
    segmentation learner) -> test-time-augmented inference with every
    classification learner -> one presence dictionary per frame.
    """

    def __init__(self):
        """Configure paths/loaders on the base class and load all learners from disk."""
        super().__init__(
            index_key='input_video',
            file_loaders={'input_video': VideoLoader()},
            input_path=Path("/input/") if execute_in_docker else Path("./test/"),
            output_file=Path("/output/surgical-tool-presence.json") if execute_in_docker else Path(
                "./output/surgical-tool-presence.json"),
            validators=dict(input_video=(UniquePathIndicesValidator(),)),
        )

        # loading ensemble learner
        # Sorted so the ensemble order is reproducible: `predict` gives extra weight to
        # the learners at positions 1 and 2, and a raw directory listing has no fixed order.
        self.ensem_learner = [load_learner(m, cpu=False)
                              for m in sorted(Path('/opt/algorithm/cls').ls())
                              if m.suffix == '.pkl']
        self.crop_learner = load_learner('/opt/algorithm/seg/seg_v1.pkl', cpu=False)
        self.codes = ["Background", "Foreground"]

        # Keys of every per-frame output dictionary (besides "slice_nr").
        self.tool_list = ["needle_driver",
                          "monopolar_curved_scissor",
                          "force_bipolar",
                          "clip_applier",
                          "tip_up_fenestrated_grasper",
                          "cadiere_forceps",
                          "bipolar_forceps",
                          "vessel_sealer",
                          "suction_irrigator",
                          "bipolar_dissector",
                          "prograsp_forceps",
                          "stapler",
                          "permanent_cautery_hook_spatula",
                          "grasping_retractor"]

    # NOTE(review): the two helpers below take no `self`, so they are declared static.
    # Presumably they exist because the pickled learners reference functions with these
    # names; if so they may need to be importable at module level instead - confirm.
    @staticmethod
    def get_image_mask(fn):
        """Load the mask for image `fn`: 'images'->'masks' and 'jpg'->'png' in its path."""
        f = Path(str(fn).replace('images', 'masks').replace('jpg', 'png'))
        return PILMask.create(f)

    @staticmethod
    def custom_accuracy(inp, targ):
        """Fraction of positions where the argmax of `inp` over dim 1 equals `targ`."""
        targ = targ.squeeze(1)
        return (inp.argmax(dim=1) == targ).float().mean()

    def crop_images(self, src):
        """Crop every image in `src` in place to its first predicted foreground region.

        Frames for which the segmentation learner predicts no foreground at all are
        left untouched instead of raising an IndexError.
        """
        # Imported locally on purpose: at module level `label` may be bound to
        # scipy.ndimage.label, which returns a (labels, count) tuple that
        # `regionprops` cannot consume.
        from skimage.measure import label as sk_label

        src = Path(src)
        fs = get_image_files(src)
        if len(fs) == 0:
            return

        dl = self.crop_learner.dls.test_dl(fs)
        preds, _ = self.crop_learner.get_preds(dl=dl)
        for p, f in zip(preds, dl.items):
            im = PILImage.create(f)
            (h, w) = im.shape
            # Per-pixel class index -> 0/255 mask, stretched back to the frame size.
            mask = PILMask.create((np.array(p.argmax(0)) * 255).astype(np.uint8))
            mask = Resize((h, w), ResizeMethod.Squish)(mask)

            props = regionprops(sk_label(np.array(mask)))
            if not props:
                continue
            # bbox is (min_row, min_col, max_row, max_col). The first region is used,
            # as in the original code, not the largest one.
            r0, c0, r1, c1 = props[0].bbox
            im_c = PILImage.create(np.array(im)[r0:r1, c0:c1])
            im_c.save(src / f.name)

    def extract_images(self, video_file):
        """Write every frame of `video_file` to `<input path>/<frame index>.jpg`, then crop.

        NOTE(review): frames are written into the algorithm's input directory and are
        named only by frame index, so frames of different videos would overwrite or
        mix with each other - confirm a single video per run is intended.
        """
        src = Path(self._input_path)
        cap = cv2.VideoCapture(str(video_file))
        count = 0
        try:
            while True:
                is_read, frame = cap.read()
                if not is_read:
                    # break out of the loop if there are no frames to read
                    break
                cv2.imwrite(str(src / f'{count}.jpg'), frame)
                count += 1
        finally:
            # Release the capture even if reading/writing a frame fails.
            cap.release()

        self.crop_images(src)

    def tool_detection_model_output(self):
        """Return two tool names drawn at random from `tool_list` (no image is inspected)."""
        last = len(self.tool_list) - 1
        return [self.tool_list[random.randint(0, last)] for _ in range(2)]

    def tool_detect_json_sample(self):
        """Return a fresh frame dictionary: `slice_nr` = 1 and every tool set to False."""
        # single output dict
        single_output_dict = {"slice_nr": 1}
        single_output_dict.update({tool: False for tool in self.tool_list})
        return single_output_dict

    def process_case(self, *, idx, case):
        """Run `predict` on the video path stored under `path` in `case` and return its result."""
        # Detect and score candidates
        scored_candidates = self.predict(case["path"])  # video file > load evalutils.py

        # Write resulting candidates to result.json for this case
        return scored_candidates

    def save(self):
        """Dump the result of the first processed case as JSON to the output file."""
        with open(str(self._output_file), "w") as f:
            json.dump(self._case_results[0], f)

    def predict(self, fname) -> list:
        """
        Inputs:
        fname -> video file path

        Output:
        tools -> list of prediction dictionaries (per frame), sorted by `slice_nr`,
                 in the format produced by `tool_detect_json_sample`

        Raises:
        RuntimeError -> when no classification learner was loaded
        """
        if not self.ensem_learner:
            raise RuntimeError("No classification learners (*.pkl) were loaded for the ensemble.")

        print('Video file to be loaded: ' + str(fname))
        self.extract_images(fname)

        fs = get_image_files(self._input_path)
        num_frames = len(fs)
        print(num_frames)
        if num_frames == 0:
            return []

        # generate output json
        all_frames_predicted_outputs = []
        all_undefined_tools = []

        tta_res = []
        prs_items = []
        for learn in self.ensem_learner:
            dl = learn.dls.test_dl(fs)
            tta_res.append(learn.tta(dl=dl))
            if len(prs_items) < 1:
                prs_items = dl.items

        # Keep only the predictions (first element of every tta result).
        tta_prs = first(zip(*tta_res))
        # Learners 1 and 2 are appended a second time, i.e. they count double in the mean.
        tta_prs += tta_prs[1:3]
        tta_prs = torch.stack(tta_prs)

        # The vocabulary comes from the learners themselves; the original referenced a
        # module-level `dls` that no exported cell defines.
        # NOTE(review): assumes all ensemble members share the same heads/vocab - confirm.
        dls = self.ensem_learner[0].dls

        lbls = []
        for i in range(len(dls.c)):
            # NOTE(review): `cfg` is not defined in the visible cells; it is used here as
            # the start/end column of head `i` in the stacked predictions and must be
            # provided at module level - confirm.
            arm_preds = tta_prs[:, :, cfg(i):cfg(i + 1)].mean(0)
            arm_idxs = arm_preds.argmax(dim=1)
            arm_vocab = np.array(dls.vocab[i])
            lbls.append(arm_vocab[arm_idxs])

        for *arm_labels, frame_path in zip(*lbls, prs_items):
            frame_dict = self.tool_detect_json_sample()
            frame_dict['slice_nr'] = int(frame_path.stem)
            for tool in arm_labels:
                tool = str(tool)
                if tool in frame_dict:
                    frame_dict[tool] = True
                else:
                    # Labels that are not tools are only reported, never written to the
                    # output (the old conditional expression stored them as None-valued keys).
                    all_undefined_tools.append(tool)
            all_frames_predicted_outputs.append(frame_dict)

        print(f'List of undefined tools: {set(all_undefined_tools)}.')
        tools = sorted(all_frames_predicted_outputs, key=lambda d: d['slice_nr'])

        return tools

In [None]:
# IPython help lookup for `process`, which Surgtoolloc_det does not define itself.
Surgtoolloc_det.process?

In [None]:
#|export
# Script entry point of the exported module: build the algorithm and run `process`.
if __name__ == "__main__":
    Surgtoolloc_det().process()

In [None]:
# Export this notebook's `#|export` cells to a Python module, with '.' as the library path.
nb_export('09_inference_2.ipynb', '.')

In [None]:
# Scratch cell: print each element of a three-letter tuple on its own line.
lst = tuple('abc')
for l in lst:
    print(l)

In [None]:
# Load every exported learner (*.pkl) found in models/small, without forcing it onto the CPU.
ensemble_learner=[load_learner(m,cpu=False) for m in Path('models/small').ls() if m.suffix=='.pkl']

In [None]:
# Report how many learners were loaded into `ensemble_learner`.
print(f'There are {len(ensemble_learner)} model(s) in this ensemble learner.');

In [None]:
# NOTE(review): `dls` is not defined in any cell shown here; this inspection relies on it
# already existing in the kernel (e.g. via `from ml_utils import *`) - confirm.
dls.c

In [None]:
# Presumably the label names of the two mask classes; nothing in the visible cells
# reads this name - TODO confirm whether it is still needed.
codes = ["Background", "Foreground"]

def get_image_mask(fn):
    """Load the mask that belongs to image `fn`.

    The mask path is derived textually: every 'images' in the path becomes 'masks'
    and every 'jpg' becomes 'png' (anywhere in the string, not just the extension).
    """
    mask_path = str(fn).replace('images', 'masks')
    mask_path = mask_path.replace('jpg', 'png')
    return PILMask.create(Path(mask_path))

def custom_accuracy(inp, targ):
    """Mean agreement between the argmax of `inp` over dim 1 and `targ`.

    Axis 1 of `targ` is squeezed first (a no-op unless that axis has size 1), then the
    element-wise matches are averaged as floats.
    """
    predicted = inp.argmax(dim=1)
    expected = targ.squeeze(1)
    matches = predicted == expected
    return matches.float().mean()

def proc_images(src, learner_path='/home/bilal/mlworks/surgtoolloc/models/seg/seg_v1.pkl'):
    """Crop every image in `src` in place to its first predicted foreground region.

    Args:
        src: directory holding the images; each cropped image overwrites its source file.
        learner_path: exported segmentation learner to load. Defaults to the path that
            was previously hard-coded, so existing calls behave as before.

    Images for which the learner predicts no foreground are left untouched instead of
    raising an IndexError.
    """
    # Imported locally on purpose: the notebook's import cell binds `label` to
    # scipy.ndimage.label last, and that function returns a (labels, count) tuple
    # which `regionprops` cannot consume.
    from skimage.measure import label as sk_label

    src = Path(src)
    l = load_learner(learner_path, cpu=False)
    fs = get_image_files(src)
    dl = l.dls.test_dl(fs)
    preds, _ = l.get_preds(dl=dl)
    for p, f in zip(preds, dl.items):
        im = PILImage.create(f)
        (h, w) = im.shape
        # Per-pixel class index -> 0/255 mask, stretched back to the image size.
        mask = PILMask.create((np.array(p.argmax(0)) * 255).astype(np.uint8))
        mask = Resize((h, w), ResizeMethod.Squish)(mask)

        props = regionprops(sk_label(np.array(mask)))
        if not props:
            continue
        # bbox is (min_row, min_col, max_row, max_col). The first region is used,
        # as in the original code, not the largest one.
        r0, c0, r1, c1 = props[0].bbox
        im_c = PILImage.create(np.array(im)[r0:r1, c0:c1])
        im_c.save(src / f.name)

In [None]:
def extract_images(video_file):
    """Dump every frame of `video_file` to test_data/input/<frame index>.jpg, then crop them.

    The output directory is created when missing, and the capture is released even if
    reading or writing a frame raises. Cropping is delegated to `proc_images`.
    """
    src = Path('test_data/input/')
    # Create the target directory up front so frame writes cannot fail just because
    # it does not exist yet.
    src.mkdir(parents=True, exist_ok=True)

    # read the video file
    cap = cv2.VideoCapture(str(video_file))
    count = 0
    try:
        while True:
            is_read, f = cap.read()
            if not is_read:
                # break out of the loop if there are no frames to read
                break
            cv2.imwrite(str(src / f'{count}.jpg'), f)
            count += 1
    finally:
        cap.release()

    proc_images(src)

In [None]:
def predict(video_file):
    """Predict tool presence for every frame of `video_file` and write the result as JSON.

    Frames are extracted and cropped via `extract_images`, every learner in
    `ensemble_learner` is run with test-time augmentation, the predictions are averaged
    over learners, and one dictionary per frame (sorted by `slice_nr`) is written to
    test_data/output/surgical-tool-presence.json. Returns None.

    Predicted labels that are not keys of the output template are only collected and
    printed; they are never written to the JSON.
    """
    extract_images(video_file)

    # NOTE(review): this template spells "monopolar_curved_scissors" with a trailing
    # "s", while Surgtoolloc_det.tool_list uses "monopolar_curved_scissor" - confirm
    # which spelling the expected output format requires.
    pred_dict = {
        "slice_nr": -1,
        "needle_driver": False,
        "monopolar_curved_scissors": False,
        "force_bipolar": False,
        "clip_applier": False,
        "tip_up_fenestrated_grasper": False,
        "cadiere_forceps": False,
        "bipolar_forceps": False,
        "vessel_sealer": False,
        "suction_irrigator": False,
        "bipolar_dissector": False,
        "prograsp_forceps": False,
        "stapler": False,
        "permanent_cautery_hook_spatula": False,
        "grasping_retractor": False
    }

    ignored = []
    video_output = []
    inp = 'test_data/input'
    out_file = Path('test_data/output/surgical-tool-presence.json')

    fs = get_image_files(inp)

    tta_res = []
    fs_itm = []
    for learn in ensemble_learner:
        tta_res.append(learn.tta(dl=learn.dls.test_dl(fs)))
        if len(fs_itm) < 1:
            fs_itm = learn.dl.items

    # Keep only the predictions (first element of every tta result), stacked per learner.
    tta_prs = torch.stack(first(zip(*tta_res)))

    lbls = []
    # NOTE(review): `dls` and `cfg` are not defined in the visible cells and must already
    # exist in the kernel; `cfg(i)`/`cfg(i+1)` are used as the column bounds of head `i`
    # in the stacked predictions - confirm.
    for i in range(len(dls.c)):
        arm_preds = tta_prs[:, :, cfg(i):cfg(i + 1)].mean(0)
        arm_idxs = arm_preds.argmax(dim=1)
        arm_vocab = np.array(dls.vocab[i])
        lbls.append(arm_vocab[arm_idxs])

    for *arm_labels, f in zip(*lbls, fs_itm):
        print(*arm_labels, f)
        # The template is flat with immutable values, so a shallow copy is a full copy.
        a_pred_dict = dict(pred_dict)
        a_pred_dict['slice_nr'] = int(f.stem)
        for tool in arm_labels:
            tool = str(tool)
            if tool in a_pred_dict:
                a_pred_dict[tool] = True
            else:
                # Previously such labels were also stored as None-valued keys, and only
                # "nan"/"blank" were removed again before the dump.
                ignored.append(tool)
        video_output.append(a_pred_dict)

    video_output = sorted(video_output, key=lambda d: d['slice_nr'])
    print(set(ignored))

    out_file.parent.mkdir(parents=True, exist_ok=True)
    with open(out_file, 'w') as fn:
        json.dump(video_output, fn)


In [None]:
# Run the scratch `predict` on a short sample clip; it writes its JSON result under
# test_data/output/.
video_file='test_data/vid_1_short.mp4'
predict(video_file)