In [2]:
%load_ext autoreload
%autoreload 2

import json
import re
import numpy as np
import cv2
import torch
import torch.nn.functional as F
import torch.nn as nn
from torch.optim import Adam
from time import time, sleep

from utils import *
from custom_pca import custom_pca
from video_loader import VideoLoader
from autoencoders import *

# Seed value; it is only defined here and not applied to any RNG within this cell.
seed = 42

# Use the GPU when torch can see one, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
if device.type == 'cuda':
    print(f'Device: {torch.cuda.get_device_name(device)}')

# NOTE(review): torch.load unpickles the file, so this checkpoint should come
# from a trusted source only.
models = torch.load('data_generated/week7/models.pth', map_location=device)

# Convert TempConvAE's weights: every `low_dim_mapping.*` entry is moved to its
# `to_lower_rep.*` / `from_lower_rep.*` name (presumably to match the current
# class definition in `autoencoders` -- TODO confirm).
d = models['10-TempConvAE']['model']
for _old_key, _new_key in (
    ('low_dim_mapping.0.weight', 'to_lower_rep.weight'),
    ('low_dim_mapping.0.bias', 'to_lower_rep.bias'),
    ('low_dim_mapping.1.weight', 'from_lower_rep.weight'),
    ('low_dim_mapping.1.bias', 'from_lower_rep.bias'),
):
    # pop() removes the old entry and returns its value in a single step.
    d[_new_key] = d.pop(_old_key)

# Drop the extra 'little_modification' entry stored next to the OneHAE model.
del models['10-OneHAE']['little_modification']

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload
Device: Tesla P100-PCIE-12GB


In [3]:
# Task index <-> task name lookups (indices follow the order of the list).
i2task = dict(enumerate(['Knot_Tying', 'Needle_Passing', 'Suturing']))
ntask = len(i2task)
task2i = {name: index for index, name in i2task.items()}

# Gesture index to description (gesture indices start at 1).
gi2descr = dict(enumerate([
    'Reaching for needle with right hand',
    'Positioning needle',
    'Pushing needle through tissue',
    'Transferring needle from left to right',
    'Moving to center with needle in grip',
    'Pulling suture with left hand',
    'Pulling suture with right hand',
    'Orienting needle',
    'Using right hand to help tighten suture',
    'Loosening more suture',
    'Dropping suture at end and moving to end points',
    'Reaching for needle with left hand',
    'Making C loop around right hand',
    'Reaching for suture with right hand',
    'Pulling suture with both hands',
], start=1))

def load_video_data(tasks=None, subjects=None, trials=None, captures=None, gestures=None):
    """Build one ``VideoLoader`` fragment per annotated gesture segment and capture.

    For every selected (task, subject, trial) the transcription file
    ``<root>/<task>/transcriptions/<task>_<subject>00<trial>.txt`` is read.
    Each non-blank line must hold three whitespace-separated fields: start
    frame, end frame and a gesture label whose first character is dropped
    before the rest is parsed as an integer (presumably labels look like
    ``G3`` -- TODO confirm). For every segment whose gesture is selected, one
    grayscale ``VideoLoader`` is created per selected capture.

    Parameters
    ----------
    tasks : int or sequence of int, optional
        Task indices (keys of ``i2task``). Defaults to all tasks.
    subjects : str or sequence of str, optional
        Subject letters. Defaults to every subject found among the video files
        of the selected tasks.
    trials : sequence, optional
        Trial identifiers, inserted after ``00`` in the file names. Defaults to
        every trial digit found among the video files of the selected tasks.
    captures : int or sequence of int, optional
        Capture numbers used in ``..._capture<N>.avi``. Defaults to ``[1, 2]``.
    gestures : int or sequence of int, optional
        Gesture indices to keep. Defaults to all keys of ``gi2descr``.

    Returns
    -------
    X : list
        ``VideoLoader`` fragments, each tagged with ``trial``, ``jig_capture``,
        ``subject`` and ``task`` attributes.
    y : numpy.ndarray
        Gesture index of each fragment, aligned with ``X``.
    """
    # Local imports keep this cell self-contained.
    import os
    import re

    if tasks is None:
        tasks = np.array(list(task2i.values()))
    else:
        tasks = np.array(tasks).ravel()

    root_path = 'data/JIGSAWS_converted'

    # List every *.avi below root_path and reduce each path to its
    # `<name>_<LETTER><3 digits>` part (e.g. `..._B001`); paths that do not
    # match are kept unchanged. This replaces a `find | sed` shell pipeline
    # that relied on GNU sed extensions.
    name_pattern = re.compile(r'^.*/([^/]+_[A-Z][0-9]{3})_.*$')
    video_names = []
    for dirpath, _dirnames, filenames in os.walk(root_path):
        for filename in filenames:
            if filename.endswith('.avi'):
                video_names.append(name_pattern.sub(r'\1', f'{dirpath}/{filename}'))

    # Keep the last four characters (subject letter + 3-digit trial number) of
    # the names that belong to one of the selected tasks.
    video_meta = [name[-4:] for name in video_names for task in tasks if i2task[task] in name]

    if subjects is None:
        subjects = np.unique([x[0] for x in video_meta])
    else:
        subjects = np.array(subjects).ravel()

    if trials is None:
        # Only the last digit is used; file names are built as `00<trial>`.
        trials = np.unique([x[-1] for x in video_meta])
    else:
        trials = np.array(trials).ravel()

    if captures is None:
        captures = np.array([1,2])
    else:
        captures = np.array(captures).ravel()

    if gestures is None:
        gestures = np.array(list(gi2descr.keys()))
    else:
        gestures = np.array(gestures).ravel()

    X = []
    y = []
    for task in tasks:
        task_name = i2task[task]
        for subject in subjects:
            for trial in trials:
                transcr_filename = f'{root_path}/{task_name}/transcriptions/{task_name}_{subject}00{trial}.txt'
                # Stays None until a video is about to be opened, so the handler
                # below can tell a missing transcription from a missing video.
                capt = None
                try:
                    with open(transcr_filename, 'r') as fp:
                        for line in fp:
                            if not line.strip():
                                # Tolerate blank lines in the transcription.
                                continue
                            start_frame, end_frame, gesture = line.split()
                            start_frame = int(start_frame)
                            end_frame = int(end_frame)
                            gesture = int(gesture[1:])
                            if gesture not in gestures:
                                continue
                            # Honour the `captures` argument instead of always
                            # loading captures 1 and 2.
                            for capture in captures:
                                capt = int(capture)
                                video_filename = f'{root_path}/{task_name}/video/{task_name}_{subject}00{trial}_capture{capt}.avi'
                                fragment = VideoLoader(video_filename, gray=True, start_frame=start_frame,
                                                       duration_frames=end_frame-start_frame+1)
                                fragment.trial = trial
                                fragment.jig_capture = capt
                                fragment.subject = subject
                                fragment.task = task

                                X.append(fragment)
                                y.append(gesture)
                except FileNotFoundError:
                    if capt is None:
                        print(f'Transcription not present: task {task_name}, subject {subject}, trial {trial}')
                    else:
                        print(f'Video not present: task {task_name}, subject {subject}, trial {trial}, capture {capt}')
                    # Fall through to the next trial: a gap in the trial
                    # numbering must not discard the subject's later trials.
    y = np.array(y)
    return X, y

## Classification procedure for participant B, Suturing task

In [4]:
# Parameters of this experiment.
subject = 'B'   # subject letter handed to load_video_data
task = 2        # task index; i2task maps 2 to 'Suturing'
ncomp = 10      # number of components given to custom_pca

### Points creation for baseline

In [6]:
# Load every gesture fragment of the selected task and subject.
video_data, y = load_video_data(tasks=task, subjects=subject)
nsample = len(y)

# One (compression model, transition matrix) pair per fragment.
X = []
for video in video_data:
    # presumably makes the loader return NumPy arrays rather than torch tensors -- TODO confirm
    video.torch = False
    # Flatten each frame into one row: (duration_frames, values per frame).
    all_frames = video.get_all_frames().reshape(video.duration_frames, -1)

    # Fit a per-fragment compression model with `ncomp` components and encode the frames.
    compression_model = custom_pca(ncomp)
    compression_model.fit(all_frames)
    frames_enc, shape = compression_model.encode(all_frames)

    # Least-squares fit of `enc_prev @ A ~= enc_next` through the pseudo-inverse,
    # i.e. a linear map from each encoded frame to the one that follows it.
    enc_prev = frames_enc[:-1]
    enc_next = frames_enc[1:]
    A = np.linalg.pinv(enc_prev) @ enc_next

    X += [(compression_model, A)]

### Computation of all distances

In [None]:
# Square matrix meant to hold one distance per pair of entries of X.
full_martin_gram = np.zeros((len(X), len(X)))
for i, mi in enumerate(X):
    for j, mj in enumerate(X):
        if i > j:
            # Below the diagonal: copy the value stored at (j, i), which was
            # filled while processing the earlier row j, so the matrix is symmetric.
            full_martin_gram[i,j] = full_martin_gram[j,i]
        else:
            # NOTE(review): martin_dist is called without arguments while mi and mj
            # are never used -- this cell looks unfinished. The two models are
            # presumably meant to be passed here; confirm against martin_dist's
            # signature before running.
            full_martin_gram[i,j] = martin_dist()