In [7]:
import numpy as np
import os
import pandas as pd
import scipy.io as sio
from avi_r import AVIReader
import glob
import logging
import cv2 as cv
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from torch import from_numpy, unsqueeze, as_tensor
import torchvision.transforms as transforms
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader

class Directory:
    """Holds the trial folders discovered below a root directory."""

    def __init__(self, trial_path):
        # Whatever collection of paths the caller provides; `get_path` passes
        # a pandas Series of folder paths.
        self.trial_path = trial_path

    @classmethod
    def get_path(cls, root='data', label='Trials', end_folder='aligned'):
        """Collect every sub-directory of `root` whose path ends with `end_folder`.

        Args:
            root: Directory that is walked recursively with `os.walk`.
            label: Name given to the resulting pandas Series.
            end_folder: Suffix a directory path must end with to be kept.

        Returns:
            A `Directory` whose `trial_path` is a pandas Series (named `label`)
            with the matching directory paths. The Series is empty when
            nothing matches.
        """
        folders = [
            os.path.join(parent, name)
            for parent, subdirs, _files in os.walk(f'{root}')
            for name in subdirs
        ]
        directory = pd.DataFrame(folders, columns=[f'{label}'])
        # `Series.str.endswith` takes (pat, na). Unlike `str.endswith` it has no
        # start offset, so only the suffix is passed. `astype(bool)` keeps the
        # mask boolean even when no directory was found at all.
        keep = directory[f'{label}'].str.endswith(f'{end_folder}').astype(bool)
        # only store paths that end with {end_folder}
        return cls(directory.loc[keep, f'{label}'])
class Mat:
    """Time axis, EMG signal and force signal read from one MATLAB file."""

    def __init__(self, time, emg, force):
        self.time, self.emg, self.force = time, emg, force

    @classmethod
    def load_mat_file(cls, path, struct_name='data', processed = None):
        """Read the `time`, `emg` and `force` fields of a struct in a .mat file.

        Args:
            path: Location of the .mat file.
            struct_name: Name of the variable in the file that holds the struct.
            processed: Accepted for interface compatibility; this method does
                not read it (no EMG normalisation is applied here).

        Returns:
            A `Mat` built from the three fields.
        """
        struct = sio.loadmat(path, squeeze_me=True)[f'{struct_name}']
        # With squeeze_me=True each field is wrapped in a 0-d container;
        # `.item()` unwraps the stored value.
        time, emg, force = (
            struct[field].item() for field in ('time', 'emg', 'force')
        )
        return cls(time, emg, force)
class Vid:
    """Decoded frames of one video together with its timing metadata."""

    def __init__(self, images, fps, nr_frames):
        self.images = images
        self.fps = fps
        # Frame count as passed in by the caller; `load_vid_file` passes the
        # value reported by the capture, not `len(images)`.
        self.nr_frames = nr_frames
        self.vid_length = nr_frames / fps

    @classmethod
    def load_vid_file(cls, path, processed: dict = None):
        """Read every frame of the video at `path`.

        Args:
            path: Location of the video file.
            processed: Preprocessing settings. When given, each frame goes
                through `Preprocess.preprocess_frame`; when None, each frame is
                only converted to grayscale.

        Returns:
            A `Vid` holding the list of frames, the reported frames-per-second
            and the reported frame count.

        Raises:
            IOError: If the video cannot be opened. Previously this surfaced as
                a ZeroDivisionError from the constructor.
        """
        video = cv.VideoCapture(f'{path}')
        if not video.isOpened():
            video.release()
            raise IOError(f'Could not open video file: {path}')

        fps = video.get(cv.CAP_PROP_FPS)
        nr_frames = video.get(cv.CAP_PROP_FRAME_COUNT)
        images = []
        try:
            while video.isOpened():
                ret, frame = video.read()
                if not ret:
                    break
                if processed is not None:
                    image = Preprocess.preprocess_frame(frame, preprocess=processed).processed_img
                else:
                    image = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)
                images.append(image)
        finally:
            # Release the capture even when processing a frame raises.
            # No GUI window is ever opened here, so there is nothing to destroy.
            video.release()
        return cls(images, fps, nr_frames)

class Preprocess:
    """Wraps a frame that went through the preprocessing pipeline."""

    def __init__(self, processed_img):
        self.processed_img = processed_img

    @classmethod
    def preprocess_frame(cls, image, preprocess=None):
        """Run one video frame through the preprocessing pipeline.

        Steps, in order: grayscale conversion (`RGBtoGray`), cropping (`Crop`),
        optional histogram equalisation (`Hist_Eq`), conversion with
        `transforms.ToTensor`, and an optional square resize.

        Args:
            image: Frame as delivered by the video reader.
            preprocess: Settings dict. `'hist_eq'` (truthy) enables histogram
                equalisation and `'img_size'` gives the edge length of the
                square output. When the dict is None or a key is missing, the
                corresponding step is skipped. Previously None raised a
                TypeError right after being logged.

        Returns:
            A `Preprocess` whose `processed_img` holds the result.
        """
        if preprocess is None:
            logging.info("preprocess is None! ")
            preprocess = {}

        img = RGBtoGray.to_gray(image).gray_img
        img = Crop.crop(img).cropped_img
        if preprocess.get('hist_eq', False):
            img = Hist_Eq.apply_hist_eq(img).eq_img
        img = transforms.ToTensor()(img)
        # Mean/std normalisation is intentionally not applied here, so that the
        # dataset statistics can be measured on un-normalised frames.
        img_size = preprocess.get('img_size')
        if img_size is not None:
            img = transforms.Resize((img_size, img_size), antialias=False)(img)

        return cls(img)

class Crop:
    """Wraps an image that was cut down to a rectangular region."""

    def __init__(self, image):
        self.cropped_img = image

    @classmethod
    def crop(cls, image, rows=(50, 540), cols=(230, 720)):
        """Cut a rectangular region out of `image`.

        Args:
            image: Array indexable as `image[row_slice, col_slice]`.
            rows: `(start, stop)` along the first axis; stop is exclusive.
            cols: `(start, stop)` along the second axis; stop is exclusive.

        Returns:
            A `Crop` whose `cropped_img` is `image[rows, cols]`. The defaults
            reproduce the previously hard-coded 490 x 490 window.
        """
        top, bottom = rows
        left, right = cols
        return cls(image[top:bottom, left:right])

class Hist_Eq:
    """Wraps a histogram-equalised image."""

    def __init__(self, image):
        self.eq_img = image

    @classmethod
    def apply_hist_eq(cls, image, adaptive=False, clip_limit=2.0, tile_grid_size=(8, 8)):
        """Equalise the histogram of `image`.

        Args:
            image: Image passed straight to OpenCV.
            adaptive: When False (default) use `cv.equalizeHist`; when True use
                CLAHE instead, so the variant no longer has to be switched by
                commenting code in and out.
            clip_limit: `clipLimit` for `cv.createCLAHE` (adaptive mode only).
            tile_grid_size: `tileGridSize` for `cv.createCLAHE` (adaptive mode
                only).

        Returns:
            A `Hist_Eq` whose `eq_img` is the equalised image.
        """
        if adaptive:
            clahe = cv.createCLAHE(clipLimit=clip_limit, tileGridSize=tile_grid_size)
            return cls(clahe.apply(image))
        return cls(cv.equalizeHist(image))

class RGBtoGray:
    """Wraps the single-channel version of a colour frame."""

    def __init__(self, image):
        self.gray_img = image

    @classmethod
    def to_gray(cls, image):
        """Convert `image` to grayscale and wrap it.

        Despite the class name, the conversion code used is `COLOR_BGR2GRAY`,
        i.e. the input is treated as BGR.
        """
        return cls(cv.cvtColor(image, cv.COLOR_BGR2GRAY))



class Trial:
    """One recording: the signals from the .mat file plus the video."""

    # Annotations are forward references so the class does not depend on
    # definition order.
    def __init__(self, mat: 'Mat', vid: 'Vid'):
        self.mat = mat
        self.vid = vid

    @classmethod
    def from_files(cls, trial_path: str, is_upper_case: bool = False, preprocessing: dict = None):
        """Load the .mat file and the video found in `trial_path`.

        Args:
            trial_path: Folder that contains one `*.mat` and one `*.AVI` file.
            is_upper_case: When True, the last three characters of the video
                path are replaced by lowercase `avi` before it is opened.
            preprocessing: Settings forwarded to `Mat.load_mat_file` and
                `Vid.load_vid_file`.

        Returns:
            A `Trial` holding the loaded `Mat` and `Vid`.

        Raises:
            FileNotFoundError: If either file is missing (see `get_path`).
        """
        mat_path = cls.get_path(trial_path, 'mat')
        vid_path = cls.get_path(trial_path, 'AVI')

        mat_file = Mat.load_mat_file(mat_path, processed=preprocessing)

        if is_upper_case:  # avi reader is case sensitive
            vid_path = vid_path[:-3] + 'avi'

        vid_file = Vid.load_vid_file(vid_path, processed=preprocessing)

        return cls(mat_file, vid_file)

    @staticmethod
    def get_path(trial_path: str, extension: str):
        """Return the path of the file with `extension` inside `trial_path`.

        Matches are sorted so that "first" is deterministic when several files
        share the extension; a warning is logged in that case.

        Raises:
            FileNotFoundError: If no file matches. Previously the error was
                logged and an IndexError followed.
        """
        pattern = os.path.join(trial_path, f'*.{extension}')
        matches = sorted(glob.glob(pattern))
        if not matches:
            message = f'No .{extension} file found for trial:' + trial_path
            logging.error(message)
            raise FileNotFoundError(message)

        if len(matches) > 1:
            logging.warning(f'more than one .{extension} file! First was returned')
        return matches[0]

class TrainUnit:
    """One training sample: a frame, its EMG window and the mean force."""

    def __init__(self, frame, emg, force):
        # Every attribute gets a new leading axis of size 1; the EMG window
        # gets two.
        self.frame = frame.unsqueeze(0)  # might have to change to list of frames
        self.emg = as_tensor(emg)[None, None]
        # The force window is collapsed to its mean and stored with shape (1, 1).
        self.force = as_tensor(np.array([force.mean()]))[None]

class MyDataset(Dataset):
    """
    PyTorch `Dataset` over a sequence of train units, so that a `DataLoader`
    can iterate over them.
    """

    def __init__(self, data):
        self.train_unit = data

    def __len__(self):
        return len(self.train_unit)

    def __getitem__(self, index):
        """Return the `(frame, emg, force)` triple of the unit at `index`."""
        unit = self.train_unit[index]
        return unit.frame, unit.emg, unit.force


def map_emg_to_frame(trials: list, emg_fps: int = 1000):  # insert trial list
    """Pair every video frame with the EMG/force samples recorded during it.

    For each trial the frames at the end that are not covered by the signal
    recording are dropped. Frame `i` then receives the signal samples
    `[int(i * r), int((i + 1) * r))`, truncated to `int(r)` samples, where
    `r = emg_fps / video fps`.

    Args:
        trials: Objects exposing `vid.fps`, `vid.vid_length`, `vid.nr_frames`,
            `vid.images`, `mat.time`, `mat.emg` and `mat.force`.
        emg_fps: Sampling rate of the EMG/force signals in samples per second.

    Returns:
        A flat list of `TrainUnit` objects over all trials. A trial stops
        contributing units at the first frame whose EMG or force window is
        incomplete; such windows previously produced ragged tensors and NaN
        force values.
    """
    train_units_list = []
    for trial in trials:
        us_fps = trial.vid.fps
        time_diff = trial.vid.vid_length - trial.mat.time[-1]
        if time_diff < 0:
            logging.error("Video length is shorter than EMG length")
        surplus_frames = int(time_diff * us_fps) + 1
        frames = trial.vid.images[0: int(trial.vid.nr_frames - surplus_frames)]

        # map emg to frames
        fps_ratio = emg_fps / us_fps  # signal samples per video frame
        window = int(fps_ratio)  # fixed number of samples kept per frame
        for i, frame in enumerate(frames):
            start, stop = int(i * fps_ratio), int((i + 1) * fps_ratio)
            emg = trial.mat.emg[start:stop][:window]
            force = trial.mat.force[start:stop][:window]
            if len(emg) < window or len(force) < window:
                # The signal ran out: every later window is incomplete as well.
                logging.warning(
                    f'Signal ends before frame {i}; dropping {len(frames) - i} frame(s) of this trial'
                )
                break
            train_units_list.append(TrainUnit(frame, emg, force))
    return train_units_list

In [21]:
# ----------------------------------------------------------------------------
# Image preprocessing
# ----------------------------------------------------------------------------
IMG_SIZE = 224
HIST_EQ = False
# Identity values (0 / 1) are used here; the experimentally determined dataset
# statistics are 0.1756432205438614 (mean) and 0.11345013976097107 (std).
IMG_MEAN = 0
IMG_STD = 1

# ----------------------------------------------------------------------------
# EMG preprocessing
# ----------------------------------------------------------------------------
EMG_MEAN = -0.0211  # determined experimentally, default -0.021118670256266833
EMG_STD = 0.6729

# ----------------------------------------------------------------------------
# Training parameters
# ----------------------------------------------------------------------------
MODE = 'autoencoder'  # one of: end2end / autoencoder / force_prediction

BATCH_SIZE = 16
EPOCHS = 15
CRITERION = 'MSE'  # one of: MSE / L1 / HUBER
OPTIMIZER = 'Adam'  # one of: Adam / SGD
LEARNING_RATE = 0.001  # default = 0.001
MOMENTUM = 0.9  # default = 0.9

CHECKPOINT_PATH = 'models/checkpoints/'

# ----------------------------------------------------------------------------
# Dictionaries built from the constants above
# ----------------------------------------------------------------------------
PREPROCESSING = dict(
    img_size=IMG_SIZE,
    hist_eq=HIST_EQ,
    img_mean=IMG_MEAN,
    img_std=IMG_STD,
    emg_mean=EMG_MEAN,
    emg_std=EMG_STD,
)

PARAMS = dict(
    batch_size=BATCH_SIZE,
    epochs=EPOCHS,
    criterion=CRITERION,
    optimizer=OPTIMIZER,
    lr=LEARNING_RATE,
    momentum=MOMENTUM,
)

CHECKPOINT = dict(path=CHECKPOINT_PATH, prefix='ckpt', counter=0)


In [22]:
# Discover the trial folders below the default root directory.
trial_paths = Directory.get_path().trial_path
print(f'trial_paths: {len(trial_paths)}')

# Load the signals and the preprocessed video of every trial.
trials = list()
for path in trial_paths:
    trial = Trial.from_files(
        path,
        preprocessing=PREPROCESSING,
        is_upper_case=True,
    )
    trials.append(trial)

# Pair each video frame with its EMG/force window.
trial_units = map_emg_to_frame(trials)

trial_paths: 15


In [20]:
import torch
def for_emg(dataset):
    """Compute the mean and standard deviation of the EMG values in `dataset`.

    Args:
        dataset: Sized iterable of units exposing an `emg` tensor. Only `emg`
            is read; units no longer need a `frame` attribute.

    Returns:
        `(emg_mean, emg_std)`. The mean is the average of the per-unit means,
        so it equals the overall mean only when all units hold equally many
        samples. The std is the population standard deviation around it.
    """
    # First pass: average of the per-unit means.
    emg_mean = 0
    for unit in dataset:
        emg_mean += unit.emg.squeeze(1).mean(1).sum(0)
    emg_mean = emg_mean / len(dataset)

    # Second pass: squared deviations from that mean over all samples.
    emg_var = 0.0
    sample_count = 0
    for unit in dataset:
        sample_count += unit.emg.nelement()
        emg_var += ((unit.emg.squeeze() - emg_mean) ** 2).sum(0)
    emg_std = torch.sqrt(emg_var / sample_count)

    return emg_mean, emg_std
# EMG statistics over all train units; both values are printed on separate lines.
emg_mean, emg_std = for_emg(trial_units)
print(f'EMG mean: {emg_mean}', f'EMG std: {emg_std}', sep='\n')

EMG mean: 0.00013043224161738508
EMG std: 0.1296671366367021


EMG mean: -0.021118670256266833
EMG std: 0.6728964018510745


In [24]:
import torch
def for_frames(dataset):
    """Compute the mean and standard deviation of the frame pixels in `dataset`.

    Args:
        dataset: Sized iterable of units exposing a `frame` tensor.

    Returns:
        `(frames_mean, frames_std)`: the average of the per-frame means and the
        population standard deviation of all pixels around that value.
    """
    # Pass 1: each frame is flattened to one row; the row means are averaged.
    frames_mean = sum(
        unit.frame.view(1, -1).mean(1).sum(0) for unit in dataset
    ) / len(dataset)

    # Pass 2: accumulate squared deviations and the number of pixels.
    squared_error = 0.0
    pixel_count = 0
    for unit in dataset:
        flat = unit.frame.view(1, -1)
        squared_error += ((flat[0] - frames_mean) ** 2).sum(0)
        pixel_count += flat.numel()

    return frames_mean, torch.sqrt(squared_error / pixel_count)

# Pixel statistics over all train units; both values are printed on separate lines.
frames_mean, frames_std = for_frames(trial_units)
print(f'Frames mean: {frames_mean}', f'Frames std: {frames_std}', sep='\n')

Frames mean: 0.1756432205438614
Frames std: 0.11345013976097107


Frames mean: 0.1756432205438614
Frames std: 0.11345013976097107
