In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.lines import Line2D
import os
from PIL import Image
import yaml
from utils import *
from helpers import *

import pdb
from tqdm import tqdm

from dlclive import DLCLive, Processor

In [None]:
with open('configs/octo-1.yaml', 'r') as file:
    config = yaml.safe_load(file)
    
IS_METADATA_PRESENT = (config['path']['xls'] is not None)
files_info = None
filenames = None

fps = config['info']['fps']

root_dir = config['path']['root']
video_dir = f"{root_dir}/{config['path']['video']}"

if IS_METADATA_PRESENT:
    xls_path = f"{root_dir}/{config['path']['xls']}"
    files_info = read_octopus_xlsx(xls_path)
    files_info['Stimulation Class'] = files_info['Stimulation Type'].apply(get_stim_class)
    files_info.reset_index(inplace=True)
    filenames = files_info["File Name"].to_list()
else:
    files = os.listdir(video_dir)
    filenames = [os.path.splitext(file)[0] for file in files]

print(f"Processing {len(filenames)} videos from {video_dir}")
if len(filenames) < 4:
    print('\t', end='')
    print(*filenames, sep="\n\t")

###

working_dir = f"{root_dir}/{config['path']['working']}" # to save processed data and figures

model_path = f"{root_dir}/{config['path']['model']}"

dlc_proc = Processor()
dlc_live = DLCLive(
    model_path,
    processor=dlc_proc,
    pcutoff=0.2,
    resize=1)

In [None]:
def get_fig_dir(filename):
    figs_dir = f"{working_dir}/figs"
    os.makedirs(figs_dir, exist_ok=True)
    return figs_dir

def get_data_dir(filename):
    data_dir = f"{working_dir}/data/{filename}"
    os.makedirs(data_dir, exist_ok=True)
    return data_dir

In [None]:
# 62
for video_idx in range(len(filenames)): #tqdm(range(len(filenames))):
    video_filename = filenames[video_idx]

    row = None
    if IS_METADATA_PRESENT:
        row = files_info.iloc[video_idx]
    move_class = int(row['Classification'])
    stim_class = row["Stimulation Class"]

    if move_class == 0: #skip videos which shows no movement
        continue

    stim_signal = stim_class.split()[0]
    if stim_signal == "Mechanical": #skip mech stimulations
        continue

    video = None
    try:
        video = load_video(video_dir, video_filename)
        # print("Video loaded:", video_filename)
    except:
        continue

    figs_dir = get_fig_dir(video_filename)
    data_dir = get_data_dir(video_filename)

    index = -1

    feature_angles = []
    pose_speed = []

    prev_pose_xy = None

    while video.isOpened():
        ret, frame = video.read()
        if not ret:
            break

        index += 1

        curr_pose = detect_pose(dlc_live, frame, index)
        
        curr_pose_xy, curr_pose_p = curr_pose[:, :-1], curr_pose[:, -1]
        if prev_pose_xy is None:
            prev_pose_xy = curr_pose_xy

        # do stuffs with current pose
        pose = np.stack([prev_pose_xy, curr_pose_xy])
        feature_angles_item, pose_speed_item = extract_pose_features(pose)
        feature_angles.append(feature_angles_item)
        pose_speed.append(pose_speed_item)

        # set previous pose
        prev_pose_xy = curr_pose_xy

    video.release()

    feature_angles = np.array(feature_angles)
    pose_speed = np.array(pose_speed)
    np.save(f'{data_dir}/angles.npy', feature_angles)
    np.save(f'{data_dir}/speed.npy', pose_speed)

In [None]:
time_margin = (-0.5, 6) # for plotting
total_trimmed_frames = int(fps*(time_margin[1]-time_margin[0]))
tx = np.linspace(time_margin[0], time_margin[1], total_trimmed_frames)

feature_angles_dict = {
    "Cord": [],
    "Proximal": [],
    "Distal": []
}

pose_speed_dict = {
    "Cord": [],
    "Proximal": [],
    "Distal": []
}

for video_idx in range(len(filenames)):
    video_filename = filenames[video_idx]

    row = None
    if IS_METADATA_PRESENT:
        row = files_info.iloc[video_idx]
    move_class = int(row['Classification'])
    stim_class = row["Stimulation Class"]

    if move_class != 0: #skip videos which shows no movement
        continue

    stim_signal = stim_class.split()[0]
    if stim_signal != "Mechanical": #skip mech stimulations
        continue

    metadata = load_metadata(video_filename, np.inf, row, fps, 0)
    start_f = metadata['start']['f']
    end_f = metadata['end']['f']

    figs_dir = get_fig_dir(video_filename)
    data_dir = get_data_dir(video_filename)

    try:
        feature_angles = np.load(f'{data_dir}/angles.npy')
        pose_speed = np.load(f'{data_dir}/speed.npy')
    except:
        print("File not found: ", video_filename)
        continue

    stim_loc = stim_class.split()[1]
    feature_angles_dict[stim_loc].append(feature_angles[start_f: end_f, :])
    pose_speed_dict[stim_loc].append(pose_speed[start_f: end_f, :])


for key in feature_angles_dict.keys():
    feature_angles_dict[key] = np.array(feature_angles_dict[key])
    pose_speed_dict[key] = np.array(pose_speed_dict[key])

In [None]:
for i, key in enumerate(feature_angles_dict.keys()):
    print(key, feature_angles_dict[key].shape)

In [None]:
num_lines = 5 # for 5 positions
colors = plt.cm.Paired(np.linspace(0, 1, num=num_lines))

num_rows, num_cols = 1, 3
fig, axs = plt.subplots(num_rows, num_cols, figsize=(16, 4), gridspec_kw={'top': 0.85})

for i, key in enumerate(pose_speed_dict.keys()):
    pose_speed_mean = np.mean(pose_speed_dict[key], axis=0)

    for k in range(5):
        data = pose_speed_mean[..., k]
        data = smooth_data(data, 15)
        axs[i].plot(tx, data, label=k, linewidth=2, c=colors[k])
    # axs[i].set_ylim(-10, 190)
    axs[i].set_title(key)
    axs[i].set_xlabel("Time (s)")
    axs[i].axvline(x=0, color='orange', linewidth=2, alpha=0.3)

legend_elements = [Line2D([0], [0], color=colors[i], lw=4, label=i) for i in range(5)]
fig.legend(title="Speed", handles=legend_elements, loc='upper right')
fig.suptitle(f'Electrical Stimulations - Speed')

figs_dir_full = f'{figs_dir}/dlc-summary'
os.makedirs(figs_dir_full, exist_ok=True)
fig.savefig(f'{figs_dir_full}/Electrical Stimulations - Speed.png', facecolor='white')

In [None]:
num_lines = 3 # for 3 angles
colors = plt.cm.Paired(np.linspace(0, 1, num=num_lines))

num_rows, num_cols = 1, 3
fig, axs = plt.subplots(num_rows, num_cols, figsize=(16, 4), gridspec_kw={'top': 0.85})

for i, key in enumerate(feature_angles_dict.keys()):
    feature_angle_mean = np.mean(feature_angles_dict[key], axis=0)

    for k in range(3):
        data = feature_angle_mean[..., k]
        data = smooth_data(data, 5)
        axs[i].plot(tx, data, label=k, linewidth=2, c=colors[k])
    axs[i].set_ylim(-10, 190)
    axs[i].set_title(key)
    axs[i].set_xlabel("Time (s)")
    axs[i].axvline(x=0, color='orange', linewidth=2, alpha=0.3)

legend_elements = [Line2D([0], [0], color=colors[i], lw=4, label=i) for i in range(3)]
fig.legend(title="Angle", handles=legend_elements, loc='upper right')
fig.suptitle(f'Electrical Stimulations - Angle')

figs_dir_full = f'{figs_dir}/dlc-summary'
os.makedirs(figs_dir_full, exist_ok=True)
fig.savefig(f'{figs_dir_full}/Electrical Stimulations - Angle.png', facecolor='white')

In [None]:
time_margin = (-0.5, 6) # for plotting
total_trimmed_frames = int(fps*(time_margin[1]-time_margin[0]))
tx = np.linspace(time_margin[0], time_margin[1], total_trimmed_frames)

plt.ioff()

for video_idx in range(len(filenames)):
    video_filename = filenames[video_idx]

    row = None
    if IS_METADATA_PRESENT:
        row = files_info.iloc[video_idx]
    move_class = int(row['Classification'])
    stim_class = row["Stimulation Class"]

    if move_class != 0: #skip videos which shows no movement
        continue

    stim_signal = stim_class.split()[0]
    # if stim_signal != "Mechanical": #skip mech stimulations
    #     continue

    figs_dir = get_fig_dir(video_filename)
    data_dir = get_data_dir(video_filename)

    try:
        feature_angles = np.load(f'{data_dir}/angles.npy')
        pose_speed = np.load(f'{data_dir}/speed.npy')
    except:
        print("File not found: ", video_filename)
        continue

    total_frames = feature_angles.shape[0]

    metadata = load_metadata(video_filename, total_frames, row, fps, 0)
    start_f = metadata['start']['f']
    end_f = metadata['end']['f']

    num_rows, num_cols = 2, 1
    fig, axs = plt.subplots(num_rows, figsize=(8, 8), gridspec_kw={'top': 0.9})

    for k in range(3):
        data = feature_angles[start_f: end_f, k]
        data = smooth_data(data, 5)
        axs[0].plot(tx, data, label=k)
    axs[0].set_title("Angle")
    axs[0].set_xlabel("Time (s)")
    axs[0].legend(title="Angle")
    axs[0].axvline(x=0, color='orange', linewidth=2, alpha=0.3)

    for i in range(5):
        data = pose_speed[start_f: end_f, i]
        data = smooth_data(data, 20)
        axs[1].plot(tx, data, label=i)
    axs[1].set_title("Speed")
    axs[1].legend(title="Keypoints")
    axs[1].axvline(x=0, color='orange', linewidth=2, alpha=0.3)

    fig.subplots_adjust(top=0.9, bottom=0.1, left=0.1, right=0.9, hspace=0.3, wspace=0.3)
    fig.suptitle(f'{move_class}_{stim_class}')

    figs_dir_full = f'{figs_dir}/dlc'
    os.makedirs(figs_dir_full, exist_ok=True)
    fig.savefig(f'{figs_dir_full}/{video_filename}.png', facecolor='white')