# Utility functions for two-dimensional gait analyses

> Basic functions that are used throughout the 2D module or that make this module easier to use

In [None]:
#| default_exp twoD/utils

In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
#| export

import math
from pathlib import Path
from typing import List, Tuple, Dict, Optional

import numpy as np
import pandas as pd
from scipy.signal import savgol_filter

from gait_analysis.core import EventBout

In [None]:
#| export

def process_all_dlc_tracking_h5s_with_default_settings(in_dir_path: Path, # path to the input directory which contains all DLC tracking data results
                                                       week_id: int, # number of weeks post injection
                                                       out_dir_path: Path # path to the output directory where all processed results will be saved
                                                      ) -> None:
    """
    Runs the default processing pipeline on every unfiltered DLC tracking file of a directory.

    All files in `in_dir_path` whose name ends with '.h5' and does not contain 'filtered' are
    loaded as a `Tracked2DRecording`. Recordings that were loaded successfully are preprocessed.
    Event detection, export of the results to `out_dir_path`, and the inspection of the processing
    are only run if the coverage of the critical markers reaches the coverage threshold that is
    stored in the logs of the recording.
    """
    # NOTE(review): `tqdm` and `Tracked2DRecording` are not imported in the visible part of this
    # module - confirm that both names are in scope wherever this function is called.
    unfiltered_tracking_filepaths = [candidate for candidate in in_dir_path.iterdir()
                                     if candidate.name.endswith('.h5') and ('filtered' not in candidate.name)]
    for tracking_filepath in tqdm(unfiltered_tracking_filepaths):
        recording = Tracked2DRecording(filepath = tracking_filepath, week_id = week_id)
        if not recording.df_successfully_loaded:
            continue
        recording.preprocess()
        coverage_is_sufficient = recording.logs['coverage_critical_markers'] >= recording.logs['coverage_threshold']
        if not coverage_is_sufficient:
            continue
        recording.run_event_detection()
        recording.export_results(out_dir_path = out_dir_path)
        recording.inspect_processing()

In [None]:
#| export

def filter_dataframe(df: pd.DataFrame, filter_criteria: List[Tuple]) -> pd.DataFrame:
    """
    Returns a copy of the rows of `df` that fulfill all filter criteria.

    Each criterion is a tuple of (column_name, comparison_method, reference_value), where
    comparison_method is one of 'greater', 'smaller', 'equal_to', 'is_in_list', or 'is_nan'
    (the reference_value is ignored for 'is_nan'). An empty list of criteria returns a copy
    of the entire DataFrame.

    Raises:
        KeyError: if a column_name is not a column of `df`.
        ValueError: if a comparison_method is not one of the supported methods.
    """
    comparison_methods = {'greater': lambda column, reference_value: column > reference_value,
                          'smaller': lambda column, reference_value: column < reference_value,
                          'equal_to': lambda column, reference_value: column == reference_value,
                          'is_in_list': lambda column, reference_value: column.isin(reference_value),
                          'is_nan': lambda column, reference_value: column.isnull()}
    valid_idxs_per_criterion = []
    for column_name, comparison_method, reference_value in filter_criteria:
        if comparison_method not in comparison_methods:
            # Silently skipping an unknown method would return rows that were never filtered by this criterion
            raise ValueError(f'"{comparison_method}" is not a valid comparison method! '
                             f'Please use one of: {list(comparison_methods.keys())}.')
        if column_name not in df.columns:
            raise KeyError(f'"{column_name}" is not a column of the DataFrame!')
        matches_criterion = comparison_methods[comparison_method](df[column_name], reference_value)
        valid_idxs_per_criterion.append(df.loc[matches_criterion].index.values)
    if len(valid_idxs_per_criterion) == 0:
        # No criteria means no restriction
        return df.copy()
    shared_valid_idxs_across_all_criteria = valid_idxs_per_criterion[0]
    for valid_idxs in valid_idxs_per_criterion[1:]:
        shared_valid_idxs_across_all_criteria = np.intersect1d(shared_valid_idxs_across_all_criteria, valid_idxs)
    return df.loc[shared_valid_idxs_across_all_criteria, :].copy()

## Functions related to preprocessing:

In [None]:
#| export

def get_max_odd_n_frames_for_time_interval(fps: int, # frames per second of the recording
                                           time_interval: float=0.5 # desired maximal time interval in seconds; default = 0.5 s
                                          ) -> int:
    """
    For the savgol_filter function of scipy - which will be used during preprocessing to smooth the data -
    you need an odd integer as the window_length parameter. This function helps to find the maximum odd number
    of frames that still fit within a specified time interval at a given fps.

    Raises an AssertionError if "fps" is not an integer, or if the time interval is too short to
    fit at least one frame.
    """
    assert isinstance(fps, (int, np.integer)), '"fps" has to be an integer!'
    # Whole frames that fit into the interval; an even count is reduced by one to get the next lower odd count
    max_odd_frame_count = int(fps * time_interval)
    if max_odd_frame_count % 2 == 0:
        max_odd_frame_count -= 1
    assert max_odd_frame_count > 0, f'The specified time interval is too short to fit an odd number of frames'
    return max_odd_frame_count

In [None]:
#| export

def get_preprocessing_relevant_marker_ids(df: pd.DataFrame, # DataFrame with x, y, and likelihood for tracked marker_ids
                                          marker_ids_to_exclude: Optional[List[str]]=None # list of marker_ids to exclude; optional default None
                                         ) -> List[str]:
    """
    Returns all unique marker_ids found in the columns of `df`, except those that are listed
    in `marker_ids_to_exclude`. Ids to exclude that are not present in `df` are ignored.
    """
    all_marker_ids = get_all_unique_marker_ids(df = df)
    if marker_ids_to_exclude is None:
        return all_marker_ids
    return [marker_id for marker_id in all_marker_ids if marker_id not in marker_ids_to_exclude]

In [None]:
#| export

def get_all_unique_marker_ids(df: pd.DataFrame) -> List[str]:
    """
    Returns the unique marker_ids of `df` in order of their first appearance.

    Column names are expected to follow the pattern '<marker_id>_<identifier>' (e.g. 'Snout_x').
    Only the last underscore separates the marker_id from the identifier, so marker_ids
    may contain underscores themselves (e.g. 'Hind_Paw_likelihood' -> 'Hind_Paw').

    Raises:
        ValueError: if a column name does not contain an underscore.
    """
    unique_marker_ids = {}
    for column_name in df.columns:
        marker_id, separator, _ = column_name.rpartition('_')
        if not separator:
            raise ValueError(f'Column "{column_name}" does not match the pattern "<marker_id>_<identifier>"!')
        # A dict keeps the insertion order and makes the membership check cheap
        unique_marker_ids.setdefault(marker_id, None)
    return list(unique_marker_ids)

In [None]:
#| export

def smooth_tracked_coords_and_likelihood(df: pd.DataFrame, # DataFrame to smooth
                                         window_length: int, # Odd integer (!) of sliding window size in frames to consider for smoothing
                                         marker_ids: List[str]=['all'], # List of markers that will be smoothed; optional default ['all'] to smooth all marker_ids
                                         polyorder: int=3 # Order of the polynom used for the savgol filter
                                        ) -> pd.DataFrame:
    """
    Returns a smoothed copy of the DataFrame. The approach follows the implementation from DLC2kinematics:
    https://github.com/AdaptiveMotorControlLab/DLC2Kinematics/blob/82e7e60e00e0efb3c51e024c05a5640c91032026/src/dlc2kinematics/preprocess.py#L64
    In contrast to that implementation, the likelihood columns are smoothed as well.
    Only the x, y, and likelihood columns of the passed marker_ids are smoothed - markers that
    shall remain untouched have to be left out of `marker_ids`.

    Note: window_length has to be an odd integer!
    """
    smoothed_df = df.copy()
    names_of_columns_to_smooth = get_column_names(df = smoothed_df,
                                                  column_identifiers = ['x', 'y', 'likelihood'],
                                                  marker_ids = marker_ids)
    column_positions = smoothed_df.columns.get_indexer(names_of_columns_to_smooth)
    unsmoothed_values = smoothed_df.iloc[:, column_positions]
    # axis = 0: the filter runs along the rows, i.e. separately for each column
    smoothed_values = savgol_filter(x = unsmoothed_values,
                                    window_length = window_length,
                                    polyorder = polyorder,
                                    axis = 0)
    smoothed_df.iloc[:, column_positions] = smoothed_values
    return smoothed_df

In [None]:
#| export

def get_column_names(df: pd.DataFrame, 
                     column_identifiers: List[str], 
                     marker_ids: List[str]=['all'],
                    ) -> List[str]:
    """
    Returns the names of all columns of `df` whose identifier is in `column_identifiers` and
    whose marker_id is in `marker_ids` (or of any marker_id, if `marker_ids` is ['all']).

    Column names are expected to follow the pattern '<marker_id>_<identifier>' (e.g. 'Snout_x').
    Only the last underscore separates the marker_id from the identifier, so marker_ids
    may contain underscores themselves (e.g. 'Hind_Paw_likelihood' -> 'Hind_Paw').

    Raises:
        ValueError: if a column name does not contain an underscore.
    """
    include_all_marker_ids = marker_ids == ['all']
    matching_column_names = []
    for column_name in df.columns:
        marker_id, separator, column_identifier = column_name.rpartition('_')
        if not separator:
            raise ValueError(f'Column "{column_name}" does not match the pattern "<marker_id>_<identifier>"!')
        if column_identifier not in column_identifiers:
            continue
        if include_all_marker_ids or (marker_id in marker_ids):
            matching_column_names.append(column_name)
    return matching_column_names

In [None]:
#| export

def interpolate_low_likelihood_intervals(df: pd.DataFrame, 
                                         marker_ids: List[str], 
                                         max_interval_length: int,
                                         framerate: float,
                                        ) -> pd.DataFrame:
    """
    Returns a copy of `df` in which short low-likelihood intervals of the passed markers are
    replaced by a linear interpolation between the last frame before and the first frame after
    the interval. The likelihood of all interpolated frames is set to 0.5.

    The intervals are determined by `get_low_likelihood_interval_border_idxs`, to which
    `max_interval_length` and `framerate` are passed on unchanged. Intervals without a
    neighboring frame on both sides are left untouched.
    """
    interpolated_df = df.copy()
    n_frames = interpolated_df.shape[0]
    for marker_id in marker_ids:
        likelihood_column_name = f'{marker_id}_likelihood'
        low_likelihood_interval_border_idxs = get_low_likelihood_interval_border_idxs(likelihood_series = interpolated_df[likelihood_column_name], 
                                                                                      max_interval_length = max_interval_length, 
                                                                                      framerate = framerate)
        # The interval borders are positional row indices, so everything is addressed via .iloc.
        # Chained indexing (df[column][start:end] = ...) is not guaranteed to write back into the DataFrame.
        coordinate_column_positions = [interpolated_df.columns.get_loc(f'{marker_id}_{coordinate}') for coordinate in ['x', 'y']]
        likelihood_column_position = interpolated_df.columns.get_loc(likelihood_column_name)
        for start_idx, end_idx in low_likelihood_interval_border_idxs:
            # NOTE(review): "end_idx + 2 < n_frames" also skips intervals that end on the second to last
            # frame, although "end_idx + 1" would still be a valid anchor frame - confirm whether this is intended.
            if (start_idx - 1 < 0) or (end_idx + 2 >= n_frames):
                continue
            low_likelihood_rows = slice(start_idx, end_idx + 1)
            rows_including_anchor_frames = slice(start_idx - 1, end_idx + 2)
            for column_position in coordinate_column_positions:
                # .interpolate() only fills missing values, so the unreliable coordinates have to be masked first
                interpolated_df.iloc[low_likelihood_rows, column_position] = np.nan
                interpolated_values = interpolated_df.iloc[rows_including_anchor_frames, column_position].interpolate()
                interpolated_df.iloc[rows_including_anchor_frames, column_position] = interpolated_values.to_numpy()
            interpolated_df.iloc[low_likelihood_rows, likelihood_column_position] = 0.5
    return interpolated_df

In [None]:
#| export

def get_low_likelihood_interval_border_idxs(likelihood_series: pd.Series,
                                            framerate: float,
                                            max_interval_length: int,
                                            min_likelihood_threshold: float=0.5
                                           ) -> List[Tuple[int, int]]:
    """
    Returns the (start_idx, end_idx) borders of all intervals of consecutive frames whose likelihood
    is below `min_likelihood_threshold`, restricted to intervals whose duration does not exceed
    `max_interval_length * framerate`.
    """
    # NOTE(review): presumably "framerate" is the duration of a single frame (1 / fps) and
    # "max_interval_length" a number of frames - confirm against the callers.
    is_low_likelihood = likelihood_series.values < min_likelihood_threshold
    all_low_likelihood_idxs = np.where(is_low_likelihood)[0]
    max_interval_duration = max_interval_length*framerate
    return get_interval_border_idxs(all_matching_idxs = all_low_likelihood_idxs,
                                    framerate = framerate,
                                    max_interval_duration = max_interval_duration)

In [None]:
#| export

def get_interval_border_idxs(all_matching_idxs: np.ndarray,
                              framerate: float,
                              min_interval_duration: Optional[float]=None, 
                              max_interval_duration: Optional[float]=None,
                             ) -> List[Tuple[int, int]]:
    """
    Groups the passed indices into intervals of consecutive indices and returns the
    (start_idx, end_idx) borders of each interval, with both borders being inclusive.

    The duration of an interval is its number of frames multiplied by `framerate`. If
    `min_interval_duration` and / or `max_interval_duration` are passed, only intervals whose
    duration lies within these (inclusive) limits are returned. An empty input returns an empty list.
    """
    n_matching_idxs = all_matching_idxs.shape[0]
    if n_matching_idxs < 1:
        return []
    # Positions after which the next index is not the direct successor, i.e. where an interval ends
    positions_before_gaps = np.where(np.diff(all_matching_idxs) > 1)[0]
    first_positions = np.concatenate([np.array([0]), positions_before_gaps + 1])
    last_positions = np.concatenate([positions_before_gaps, np.array([n_matching_idxs - 1])])
    has_lower_limit = min_interval_duration is not None
    has_upper_limit = max_interval_duration is not None
    selected_interval_border_idxs = []
    for start_idx, end_idx in zip(all_matching_idxs[first_positions], all_matching_idxs[last_positions]):
        interval_duration = ((end_idx+1) - start_idx) * framerate
        if has_lower_limit and not (min_interval_duration <= interval_duration):
            continue
        if has_upper_limit and not (interval_duration <= max_interval_duration):
            continue
        selected_interval_border_idxs.append((start_idx, end_idx))
    return selected_interval_border_idxs

In [None]:
#| export

def add_new_marker_derived_from_existing_markers(df: pd.DataFrame,
                                                 existing_markers: List[str],
                                                 new_marker_id: str,
                                                 likelihood_threshold: float = 0.5
                                                ) -> pd.DataFrame:
    """
    Returns a copy of `df` with three additional columns for a new marker, whose x and y
    coordinates are the mean of the coordinates of all `existing_markers`.

    The likelihood of the new marker is 1 in all rows in which the likelihood of every existing
    marker reaches `likelihood_threshold`, and 0 in all other rows.

    Raises:
        ValueError: if `existing_markers` is empty.
    """
    if len(existing_markers) == 0:
        raise ValueError('"existing_markers" has to contain at least one marker_id!')
    df_with_new_marker = df.copy()
    for coordinate in ['x', 'y']:
        df_with_new_marker[f'{new_marker_id}_{coordinate}'] = (sum([df_with_new_marker[f'{marker_id}_{coordinate}'] for marker_id in existing_markers]))/len(existing_markers)
    likelihood_column_name = f'{new_marker_id}_likelihood'
    df_with_new_marker[likelihood_column_name] = 0
    row_idxs_where_all_likelihoods_exceeded_threshold = get_idxs_where_all_markers_exceed_likelihood(df = df_with_new_marker, 
                                                                                                       marker_ids = existing_markers, 
                                                                                                       likelihood_threshold = likelihood_threshold)
    # The helper returns index labels, so the rows are addressed by label and the column by name
    df_with_new_marker.loc[row_idxs_where_all_likelihoods_exceeded_threshold, likelihood_column_name] = 1
    return df_with_new_marker

In [None]:
#| export

def get_idxs_where_all_markers_exceed_likelihood(df: pd.DataFrame,
                                                  marker_ids: List[str],
                                                  likelihood_threshold: float=0.5
                                                 ) -> np.ndarray:
    """
    Returns the index labels of all rows of `df` in which the likelihood of every passed marker
    is greater than or equal to `likelihood_threshold`.
    """
    valid_idxs_per_marker_id = [df.loc[df[f'{marker_id}_likelihood'] >= likelihood_threshold].index.values
                                for marker_id in marker_ids]
    shared_valid_idxs = valid_idxs_per_marker_id[0]
    for valid_idxs_of_next_marker in valid_idxs_per_marker_id[1:]:
        shared_valid_idxs = np.intersect1d(shared_valid_idxs, valid_idxs_of_next_marker)
    return shared_valid_idxs

In [None]:
#| export

def compute_coverage(df: pd.DataFrame,
                     critical_marker_ids: List[str],
                     likelihood_threshold: float=0.5
                    ) -> float:
    """
    Returns the fraction of rows of `df` in which the likelihood of every critical marker
    is greater than or equal to `likelihood_threshold`.
    """
    idxs_with_reliable_critical_markers = get_idxs_where_all_markers_exceed_likelihood(df = df,
                                                                                       marker_ids = critical_marker_ids,
                                                                                       likelihood_threshold = likelihood_threshold)
    n_reliable_frames = idxs_with_reliable_critical_markers.shape[0]
    n_total_frames = df.shape[0]
    return n_reliable_frames / n_total_frames

In [None]:
#| export

def get_corner_coords_with_likelihoods(df: pd.DataFrame) -> Dict:
    """
    Returns a dictionary that holds, for each of the four maze corner markers, the most reliable
    position ('coords') and the likelihood threshold that was used to select it ('min_likelihood').
    """
    corner_marker_ids = ('MazeCornerClosedRight', 'MazeCornerClosedLeft', 'MazeCornerOpenRight', 'MazeCornerOpenLeft')
    result_keys = ('coords', 'min_likelihood')
    return {corner_marker_id: dict(zip(result_keys,
                                       get_most_reliable_marker_position_with_likelihood(df = df, marker_id = corner_marker_id)))
            for corner_marker_id in corner_marker_ids}

In [None]:
#| export

def get_most_reliable_marker_position_with_likelihood(df: pd.DataFrame,
                                                      marker_id: str,
                                                      percentile: float=99.95
                                                     ) -> Tuple[np.array, float]:
    """
    Returns the median x and y position of the marker across all frames whose likelihood reaches
    the specified percentile of all (non-NaN) likelihoods of this marker, together with the
    likelihood value of that percentile.
    """
    likelihoods = df[f'{marker_id}_likelihood']
    likelihood_threshold = np.nanpercentile(likelihoods.values, percentile)
    df_most_reliable_frames = df.loc[likelihoods >= likelihood_threshold].copy()
    median_coords = [df_most_reliable_frames[f'{marker_id}_{coordinate}'].median() for coordinate in ('x', 'y')]
    return np.array(median_coords), likelihood_threshold

In [None]:
#| export

def get_translation_vector(coords_to_become_origin: np.ndarray) -> np.ndarray:
    """
    Returns the vector that has to be added to all coordinates to move the passed coordinates to the origin.
    """
    translation_vector = -coords_to_become_origin
    return translation_vector

In [None]:
#| export

def evaluate_maze_shape_using_open_corners(corners_and_likelihoods: Dict, tolerance: float) -> Dict:
    """
    Checks for each open corner how well it matches the expected shape of the maze, based on
    the error of the angle at the closed corner of the same side and the error of the ratio of
    maze length to maze width.

    Returns a dictionary that describes the open corner with the lowest mean error: whether both
    errors are within `tolerance` ('valid'), the 'mean_error', the 'open_corner_id', and the
    'side_id'. If no open corner has a mean error of less than `tolerance + 1`, the initial
    values are returned ('valid' is False, the ids are None).
    """
    best_result = {'valid': False, 'mean_error': tolerance + 1, 'open_corner_id': None, 'side_id': None}
    open_corner_marker_ids = [marker_id for marker_id in corners_and_likelihoods.keys() if 'Open' in marker_id]
    for open_corner_marker_id in open_corner_marker_ids:
        # The side ('Left' or 'Right') is everything that follows 'Open' in the marker id
        side_id = open_corner_marker_id[open_corner_marker_id.find('Open') + 4:]
        opposite_side_id = 'Right' if side_id == 'Left' else 'Left'
        angle_error = compute_angle_error(a = corners_and_likelihoods[f'MazeCornerClosed{opposite_side_id}']['coords'],
                                          b = corners_and_likelihoods[f'MazeCornerClosed{side_id}']['coords'],
                                          c = corners_and_likelihoods[open_corner_marker_id]['coords'])
        distance_ratio_error = compute_distance_ratio_error(corners_and_likelihoods = corners_and_likelihoods,
                                                            open_corner_marker_id = open_corner_marker_id,
                                                            side_id = side_id)
        mean_error = (angle_error + distance_ratio_error) / 2
        if mean_error < best_result['mean_error']:
            valid_positions = bool((angle_error <= tolerance) and (distance_ratio_error <= tolerance))
            best_result.update({'valid': valid_positions,
                                'mean_error': mean_error,
                                'open_corner_id': open_corner_marker_id,
                                'side_id': side_id})
    return best_result

In [None]:
#| export

def compute_error_proportion(query_value: float, target_value: float) -> float:
    """
    Returns the absolute deviation of the query value from the target value as a proportion of the target value.
    """
    absolute_deviation = abs(query_value - target_value)
    return absolute_deviation / target_value

In [None]:
#| export

def compute_angle_error(a: np.ndarray, b: np.ndarray, c: np.ndarray) -> float:
    """
    Computes the angle (in degrees) between the lines b-a and b-c, and returns its deviation
    from 90 degrees as a proportion of 90 degrees.
    """
    # b is point at the joint that connects the other two
    ba = a - b
    bc = c - b
    cosine_angle = np.dot(ba, bc) / (np.linalg.norm(ba) * np.linalg.norm(bc))
    # Rounding errors can push the cosine marginally outside of [-1, 1] (e.g. if all three points
    # lie on one line), which would make arccos return NaN
    cosine_angle = np.clip(cosine_angle, -1.0, 1.0)
    angle = np.degrees(np.arccos(cosine_angle))
    return compute_error_proportion(query_value = angle, target_value = 90)

In [None]:
#| export

def compute_distance_ratio_error(corners_and_likelihoods: Dict, open_corner_marker_id: str, side_id: str) -> float:
    """
    Computes the ratio of the maze length (distance between the closed and the open corner of
    the specified side) to the maze width (distance between the two closed corners), and returns
    its deviation from the target ratio of 50/4 as a proportion of this target ratio.
    """
    # NOTE(review): 50/4 is presumably the ratio of the nominal maze length to the nominal maze width - confirm.
    maze_width = get_distance_between_two_points(corners_and_likelihoods['MazeCornerClosedLeft']['coords'],
                                                 corners_and_likelihoods['MazeCornerClosedRight']['coords'])
    maze_length = get_distance_between_two_points(corners_and_likelihoods[f'MazeCornerClosed{side_id}']['coords'],
                                                  corners_and_likelihoods[open_corner_marker_id]['coords'])
    distance_ratio = maze_length/maze_width
    return compute_error_proportion(query_value = distance_ratio, target_value = 50/4)

In [None]:
#| export

def get_distance_between_two_points(coords_point_a: np.ndarray, coords_point_b: np.ndarray) -> float:
    """
    Returns the Euclidean distance between two points, using the first two elements (x and y) of each point.
    """
    delta_x = coords_point_a[0] - coords_point_b[0]
    delta_y = coords_point_a[1] - coords_point_b[1]
    return (delta_x**2 + delta_y**2)**0.5

In [None]:
#| export

def get_conversion_factor_px_to_cm(coords_point_a: np.ndarray, coords_point_b: np.ndarray, distance_in_cm: float) -> float:
    """
    Returns the factor by which coordinates have to be multiplied to convert them to cm, given
    two points and the known distance between them in cm.
    """
    distance_in_coordinate_units = get_distance_between_two_points(coords_point_a, coords_point_b)
    conversion_factor = distance_in_cm / distance_in_coordinate_units
    return conversion_factor

In [None]:
#| export

def get_rotation_angle_with_open_corner(corners: Dict, side_id: str, translation_vector: np.ndarray, conversion_factor: float) -> float:
    """
    Function, that calculates the rotation angle of the maze considering the best matching open corner
    and the corresponding closed corner on the same side.

    The angle is derived with the law of cosines from the triangle that is spanned by the
    translated closed corner, the translated open corner, and the target position of the open
    corner after the rotation.

    Returns:
        float: angle in radians
    """
    side_specific_y = 0 if side_id == 'Left' else 4
    translated_closed_corner = corners[f'MazeCornerClosed{side_id}']['coords'] + translation_vector
    translated_open_corner = corners[f'MazeCornerOpen{side_id}']['coords'] + translation_vector
    target_rotated_open_corner = np.asarray([50 / conversion_factor, side_specific_y / conversion_factor])
    length_a = get_distance_between_two_points(translated_open_corner, target_rotated_open_corner) * conversion_factor
    length_b = get_distance_between_two_points(translated_open_corner, translated_closed_corner) * conversion_factor
    length_c = 50
    cosine_of_angle = (length_b**2 + length_c**2 - length_a**2) / (2 * length_b * length_c)
    # Rounding errors can push the cosine marginally outside of [-1, 1], for which math.acos raises a ValueError
    cosine_of_angle = float(np.clip(cosine_of_angle, -1.0, 1.0))
    return math.acos(cosine_of_angle)

In [None]:
#| export

def get_rotation_angle_with_closed_corners_only(corners: Dict, translation_vector: np.ndarray, conversion_factor: float) -> float:
    """
    Function, that calculates the rotation angle of the maze considering only the two closed corners.

    The angle is derived with the law of cosines from the triangle that is spanned by the
    translated closed left corner, the translated closed right corner, and the target position
    of the closed right corner after the rotation.

    Returns:
        float: angle in radians
    """
    translated_closed_left = corners['MazeCornerClosedLeft']['coords'] + translation_vector
    translated_closed_right = corners['MazeCornerClosedRight']['coords'] + translation_vector
    target_rotated_closed_right = np.asarray([0, 4 / conversion_factor])

    length_a = get_distance_between_two_points(translated_closed_right, target_rotated_closed_right) * conversion_factor
    length_b = get_distance_between_two_points(translated_closed_left, translated_closed_right) * conversion_factor
    length_c = 4
    cosine_of_angle = (length_b**2 + length_c**2 - length_a**2) / (2 * length_b * length_c)
    # Rounding errors can push the cosine marginally outside of [-1, 1], for which math.acos raises a ValueError
    cosine_of_angle = float(np.clip(cosine_of_angle, -1.0, 1.0))
    return math.acos(cosine_of_angle)

In [None]:
#| export

def normalize_df(df: pd.DataFrame, normalization_parameters: Dict) -> pd.DataFrame:
    """
    Returns a normalized copy of `df`: the coordinates are first translated, then rotated,
    and finally converted to cm. The passed DataFrame itself is not modified.

    `normalization_parameters` has to provide the keys 'translation_vector', 'rotation_angle',
    and 'conversion_factor'.
    """
    # The translation and the conversion operate in place, so they must not run on the original DataFrame
    unadjusted_df = df.copy()
    translated_df = translate_df(df = unadjusted_df, translation_vector = normalization_parameters['translation_vector'])
    rotated_and_translated_df = rotate_df(df = translated_df, rotation_angle = normalization_parameters['rotation_angle'])
    final_df = convert_df_to_cm(df = rotated_and_translated_df, conversion_factor = normalization_parameters['conversion_factor'])
    return final_df

In [None]:
#| export

def translate_df(df: pd.DataFrame, translation_vector: np.array) -> pd.DataFrame:
    """
    Adds the translation vector to the x and y coordinates of every marker.
    Note: the passed DataFrame is modified in place and returned.
    """
    marker_ids = get_all_unique_marker_ids(df = df)
    for marker_id in marker_ids:
        xy_column_names = [f'{marker_id}_x', f'{marker_id}_y']
        df.loc[:, xy_column_names] += translation_vector
    return df

In [None]:
#| export

def rotate_df(df: pd.DataFrame, # DataFrame with 2D coordinates to be rotated
              rotation_angle: float # rotation angle in radians
             ) -> pd.DataFrame:
    """
    Returns a copy of `df` in which the x and y coordinates of every marker are rotated around
    the origin by the rotation angle. The passed DataFrame is not modified.
    """
    cos_theta = math.cos(rotation_angle)
    sin_theta = math.sin(rotation_angle)
    df_rotated = df.copy()
    for marker_id in get_all_unique_marker_ids(df = df):
        x_column_name, y_column_name = f'{marker_id}_x', f'{marker_id}_y'
        # Both rotated coordinates have to be computed from the unrotated values
        unrotated_x, unrotated_y = df[x_column_name], df[y_column_name]
        df_rotated[x_column_name] = unrotated_x * cos_theta - unrotated_y  * sin_theta
        df_rotated[y_column_name] = unrotated_x * sin_theta + unrotated_y  * cos_theta
    return df_rotated

In [None]:
#| export

def convert_df_to_cm(df: pd.DataFrame, conversion_factor: float) -> pd.DataFrame:
    """
    Multiplies the x and y coordinates of every marker with the conversion factor.
    Note: the passed DataFrame is modified in place and returned.
    """
    marker_ids = get_all_unique_marker_ids(df = df)
    for marker_id in marker_ids:
        xy_column_names = [f'{marker_id}_x', f'{marker_id}_y']
        df.loc[:, xy_column_names] *= conversion_factor
    return df

### Functions related to run_event_detection

In [None]:
def create_behavior_df(normalized_df: pd.DataFrame, bodyparts_to_include: List[str]) -> pd.DataFrame:
    """
    Returns a copy of the x, y, and likelihood columns of all bodyparts that shall be included.
    """
    names_of_columns_to_include = get_column_names(df = normalized_df,
                                                   column_identifiers = ['x', 'y', 'likelihood'],
                                                   marker_ids = bodyparts_to_include)
    behavior_df = normalized_df[names_of_columns_to_include].copy()
    return behavior_df

In [None]:
#| export

def add_orientation_to_behavior_df(behavior_df: pd.DataFrame,
                                   all_bodyparts: Dict,
                                   bodyparts_for_direction_front_to_back: List[str]) -> pd.DataFrame:
    """
    Sets the column 'facing_towards_open_end' of `behavior_df` to True in all rows in which the
    x position of the front bodypart is larger than the x position of the back bodypart. No
    other rows are written to. The passed DataFrame is modified in place and returned.

    `bodyparts_for_direction_front_to_back` has to list exactly two marker_ids: front first, back second.
    """
    assert len(bodyparts_for_direction_front_to_back) ==2, '"bodyparts_for_direction_front_to_back" must be a list of exact 2 marker_ids!'
    front_marker_id, back_marker_id = bodyparts_for_direction_front_to_back
    front_x_positions = all_bodyparts[front_marker_id].df['x']
    back_x_positions = all_bodyparts[back_marker_id].df['x']
    front_is_ahead_of_back = front_x_positions > back_x_positions
    behavior_df.loc[front_is_ahead_of_back, 'facing_towards_open_end'] = True
    return behavior_df

In [None]:
#| export

def add_immobility_based_on_several_bodyparts_to_behavior_df(behavior_df: pd.DataFrame,
                                                             all_bodyparts: Dict,
                                                             bodyparts_critical_for_freezing: List[str]) -> pd.DataFrame:
    """
    Sets the column 'immobility' of `behavior_df` to True in all rows in which every critical
    bodypart is marked as immobile in its own DataFrame. No other rows are written to.
    The passed DataFrame is modified in place and returned.
    """
    # ToDo: Shares some code with the "filter_dataframe" function, can it be reused here?
    #       However, here we iterate through several dfs and use the shared indices across
    #       these DataFrames, so the behavior is different and adaptations would be required.
    immobile_idxs_per_bodypart = []
    for bodypart_id in bodyparts_critical_for_freezing:
        bodypart_df = all_bodyparts[bodypart_id].df
        is_immobile = bodypart_df['immobility'] == True
        immobile_idxs_per_bodypart.append(bodypart_df.loc[is_immobile].index.values)
    shared_immobile_idxs = immobile_idxs_per_bodypart[0]
    for immobile_idxs_of_next_bodypart in immobile_idxs_per_bodypart[1:]:
        shared_immobile_idxs = np.intersect1d(shared_immobile_idxs, immobile_idxs_of_next_bodypart)
    behavior_df.loc[shared_immobile_idxs, 'immobility'] = True
    return behavior_df

In [None]:
#| export

def get_immobility_related_events(behavior_df: pd.DataFrame, fps: float, min_interval_duration: float, event_type: str) -> List[EventBout]:
    """
    Creates one event object of the specified event type for each interval of consecutive frames
    that are marked as immobile in `behavior_df` and that lasts for at least `min_interval_duration`.
    """
    is_immobile = behavior_df['immobility'].values == True
    all_immobility_idxs = np.where(is_immobile)[0]
    # The duration of a single frame (1 / fps) is required to compute the duration of an interval
    duration_of_single_frame = 1/fps
    immobility_interval_border_idxs = get_interval_border_idxs(all_matching_idxs = all_immobility_idxs,
                                                               framerate = duration_of_single_frame,
                                                               min_interval_duration = min_interval_duration)
    return create_event_objects(interval_border_idxs = immobility_interval_border_idxs, fps = fps, event_type = event_type)

In [None]:
#| export

def create_event_objects(interval_border_idxs: List[Tuple[int, int]], fps: int, event_type: str) -> List[EventBout]:
    """
    Creates one event object for each pair of interval borders. The events are numbered
    consecutively in the order of the passed intervals, starting with an event_id of 0.
    """
    # NOTE(review): `EventBout2D` is not imported in the visible part of this module (only `EventBout`
    # is) - confirm that it is in scope wherever this function is called.
    return [EventBout2D(event_id = event_id, start_idx = start_idx, end_idx = end_idx, fps = fps, event_type = event_type)
            for event_id, (start_idx, end_idx) in enumerate(interval_border_idxs)]

In [None]:
#| export

def add_event_bouts_to_behavior_df(behavior_df: pd.DataFrame, event_type: str, events: List[EventBout]) -> pd.DataFrame:
    """
    Adds the columns '<event_type>', '<event_type>_id', and '<event_type>_duration' to `behavior_df`.
    All rows from the start_idx to the end_idx (inclusive, positional) of an event are set to
    True, the id of the event, and the duration of the event, respectively. All other rows
    remain NaN. The passed DataFrame is modified in place and returned.

    Raises an AssertionError if '<event_type>' is already a column of `behavior_df`, or if the
    event type of one of the events does not match `event_type`.
    """
    assert event_type not in list(behavior_df.columns), f'{event_type} was already a column in self.behavior_df!'
    id_column_name = f'{event_type}_id'
    duration_column_name = f'{event_type}_duration'
    # The flag column holds True and NaN, so it is created with dtype object right away instead of
    # relying on a float column being upcasted when True is written into it
    behavior_df[event_type] = np.full(behavior_df.shape[0], np.nan, dtype = object)
    behavior_df[id_column_name] = np.nan
    behavior_df[duration_column_name] = np.nan
    # Look up the column positions by name: the id or duration column may have existed before,
    # in which case the three columns are not the last three columns of the DataFrame
    flag_column_position = behavior_df.columns.get_loc(event_type)
    id_column_position = behavior_df.columns.get_loc(id_column_name)
    duration_column_position = behavior_df.columns.get_loc(duration_column_name)
    for event_bout in events:
        assert event_bout.event_type == event_type, f'Event types didn´t match! Expected {event_type} but found {event_bout.event_type}.'
        rows_of_event = slice(event_bout.start_idx, event_bout.end_idx + 1)
        behavior_df.iloc[rows_of_event, flag_column_position] = True
        behavior_df.iloc[rows_of_event, id_column_position] = event_bout.id
        behavior_df.iloc[rows_of_event, duration_column_position] = event_bout.duration
    return behavior_df

In [None]:
#| export

def get_gait_events(all_bodyparts: Dict, fps: int, gait_min_rolling_speed: float, gait_min_duration: float) -> List[EventBout]:
    """
    Creates one event object of type 'gait_bout' for each interval of consecutive frames in which
    the rolling speed of the 'CenterOfGravity' reaches `gait_min_rolling_speed` and that lasts
    for at least `gait_min_duration`.
    """
    rolling_speed = all_bodyparts['CenterOfGravity'].df['rolling_speed_cm_per_s'].values
    idxs_with_sufficient_speed = np.where(rolling_speed >= gait_min_rolling_speed)[0]
    # The duration of a single frame (1 / fps) is required to compute the duration of an interval
    gait_interval_border_idxs = get_interval_border_idxs(all_matching_idxs = idxs_with_sufficient_speed,
                                                         framerate = 1/fps,
                                                         min_interval_duration = gait_min_duration)
    gait_events = create_event_objects(interval_border_idxs = gait_interval_border_idxs, fps = fps, event_type = 'gait_bout')
    return gait_events

In [None]:
#| export

def get_gait_disruption_events(behavior_df: pd.DataFrame, fps: int, gait_events: List[EventBout], gait_disruption_max_time_to_immobility: float) -> List[EventBout]:
    """
    Creates one event object of type 'gait_disruption_bout' for each gait bout that is followed
    by an immobility bout within `gait_disruption_max_time_to_immobility`. The borders of the
    gait disruption bout are the borders of that immobility bout.

    `behavior_df` has to provide the columns 'immobility_bout' and 'immobility_bout_id'.
    """
    n_frames_max_distance = int(gait_disruption_max_time_to_immobility * fps)
    gait_disruption_interval_border_idxs = []
    for gait_bout in gait_events:
        end_idx = gait_bout.end_idx
        # Label-based slice: both borders are included
        frames_after_gait_bout = behavior_df.loc[end_idx : end_idx + n_frames_max_distance + 1]
        unique_immobility_bout_values = frames_after_gait_bout['immobility_bout'].unique()
        if True in unique_immobility_bout_values:
            # If several immobility bouts fall into this window, the one with the lowest id is used
            closest_immobility_bout_id = frames_after_gait_bout['immobility_bout_id'].dropna().unique().min()
            immobility_interval_border_idxs = get_interval_border_idxs_from_event_type_and_id(behavior_df = behavior_df,
                                                                                              event_type = 'immobility_bout',
                                                                                              event_id = closest_immobility_bout_id)
            gait_disruption_interval_border_idxs.append(immobility_interval_border_idxs)
    gait_disruption_events = create_event_objects(interval_border_idxs = gait_disruption_interval_border_idxs, fps = fps, event_type = 'gait_disruption_bout')
    return gait_disruption_events

In [None]:
#| export

def get_interval_border_idxs_from_event_type_and_id(behavior_df: pd.DataFrame, event_type: str, event_id: int) -> Tuple[int, int]:
    """
    Returns the index labels of the first and the last row of `behavior_df` whose
    '<event_type>_id' matches the passed event_id.
    """
    belongs_to_event = behavior_df[f'{event_type}_id'] == event_id
    idxs_of_event = behavior_df.loc[belongs_to_event].index.values
    first_idx, last_idx = idxs_of_event[0], idxs_of_event[-1]
    return first_idx, last_idx

### Functions related to export_results():

In [None]:
#| export

def export_immobility_related_bouts(df: pd.DataFrame, event_type: str, framerate: float) -> pd.DataFrame:
    """
    Returns a DataFrame with one row per bout of the specified event type and the following
    columns: 'bout_id', 'duration', 'CenterOfGravity_x_at_bout_start', 'towards_open_at_bout_start',
    'distance_covered_cm', 'start_time', and 'end_time'.

    If `df` does not contain any bout of this event type, an empty DataFrame with the same
    columns is returned.
    """
    results_per_event = {'bout_id': [],
                         'duration': [],
                         'CenterOfGravity_x_at_bout_start': [],
                         'towards_open_at_bout_start': [],
                         'distance_covered_cm': [], 
                         'start_time': [],
                         'end_time': []}
    results_per_event['bout_id'] = get_all_bout_ids(df = df, event_type = event_type)
    if len(results_per_event['bout_id']) >= 1:
        bout_ids = results_per_event['bout_id']
        results_per_event['duration'] = get_bout_duration_per_bout_id(df = df, event_type = event_type, event_ids = bout_ids)
        x_positions_center_of_gravity_at_interval_borders = get_column_values_at_event_borders(df = df,
                                                                                                event_type = event_type,
                                                                                                event_ids = bout_ids,
                                                                                                column_name = 'CenterOfGravity_x')
        # Column 0 holds the values at the start of each bout, column 1 the values at its end
        results_per_event['CenterOfGravity_x_at_bout_start'] = x_positions_center_of_gravity_at_interval_borders[:, 0]
        direction_towards_open_at_interval_borders = get_column_values_at_event_borders(df = df,
                                                                                        event_type = event_type,
                                                                                        event_ids = bout_ids,
                                                                                        column_name = 'facing_towards_open_end')
        results_per_event['towards_open_at_bout_start'] = direction_towards_open_at_interval_borders[:, 0]
        results_per_event['distance_covered_cm'] = get_distance_covered_per_event(df = df, 
                                                                                   event_type = event_type,
                                                                                   event_ids = bout_ids,
                                                                                   marker_id = 'CenterOfGravity')
        bout_start_and_end_time = get_interval_start_and_end_time_per_event(df = df,
                                                                            event_type = event_type,
                                                                            event_ids = bout_ids,
                                                                            framerate = framerate)
        results_per_event['start_time'] = bout_start_and_end_time[:, 0]
        results_per_event['end_time'] = bout_start_and_end_time[:, 1]
    return pd.DataFrame(data = results_per_event)

In [None]:
def get_all_bout_ids(df: pd.DataFrame, event_type: str) -> np.ndarray:
    """Return the unique values of the `{event_type}_id` column of `df`, ignoring NaN entries."""
    id_column_name = f'{event_type}_id'
    assigned_ids = df[id_column_name].dropna()
    return assigned_ids.unique()

In [None]:
def get_bout_duration_per_bout_id(df: pd.DataFrame, event_type: str, event_ids: List[float]) -> List[float]:
    """Look up the duration of every requested event.

    For each id in `event_ids`, the `{event_type}_duration` value of the first row whose
    `{event_type}_id` equals that id is collected. The returned list follows the order
    of `event_ids`.
    """
    id_column_name = f'{event_type}_id'
    duration_column_name = f'{event_type}_duration'
    return [df.loc[df[id_column_name] == event_id, duration_column_name].iloc[0]
            for event_id in event_ids]

In [None]:
def get_column_values_at_event_borders(df: pd.DataFrame, event_type: str, event_ids: List[float], column_name: str) -> np.ndarray:
    """Collect the first and last value of `column_name` within each event.

    For every id in `event_ids`, the rows whose `{event_type}_id` equals that id are selected
    and the `column_name` values of the first and last of these rows are stored as a
    (start, end) pair. The pairs are returned as an array with one row per event.
    """
    id_column_name = f'{event_type}_id'
    border_values_per_event = []
    for event_id in event_ids:
        values_during_event = df.loc[df[id_column_name] == event_id, column_name]
        border_values_per_event.append((values_during_event.iloc[0], values_during_event.iloc[-1]))
    return np.asarray(border_values_per_event)

In [None]:
def get_distance_covered_per_event(df: pd.DataFrame, event_type: str, event_ids: List[float], marker_id: str) -> List[float]:
    """Compute the path length travelled by a marker during each event.

    For every id in `event_ids`, the rows whose `{event_type}_id` equals that id are selected
    and the Euclidean distances between consecutive rows of the `{marker_id}_x` and
    `{marker_id}_y` columns are summed up. The result is expressed in the unit of these
    coordinate columns.

    Steps that cannot be computed (NaN coordinates) are skipped, independent of where in
    the event they occur. If an event has no valid step at all (e.g. it spans only a
    single row), NaN is returned for that event.

    Returns one distance per event, in the order of `event_ids`.
    """
    id_column_name = f'{event_type}_id'
    x_column_name = f'{marker_id}_x'
    y_column_name = f'{marker_id}_y'
    distances_per_event = []
    for event_id in event_ids:
        coordinates = df.loc[df[id_column_name] == event_id, [x_column_name, y_column_name]]
        # The first step of each event is always NaN, since diff() has no predecessor there.
        step_lengths = (coordinates[x_column_name].diff()**2 + coordinates[y_column_name].diff()**2)**0.5
        # `cumsum().iloc[-1]` would yield NaN whenever the very last step is NaN, although
        # NaN steps anywhere else are skipped. `sum` treats all positions alike;
        # `min_count = 1` keeps the result NaN if there is not a single valid step.
        distances_per_event.append(step_lengths.sum(min_count = 1))
    return distances_per_event

In [None]:
def get_interval_start_and_end_time_per_event(df: pd.DataFrame, event_type: str, event_ids: List[float], framerate: float) -> np.ndarray:
    """Compute a (start, end) time pair for each event from the index of `df`.

    For every id in `event_ids`, the index labels of the first and the last row whose
    `{event_type}_id` equals that id are multiplied with `framerate`. The pairs are
    returned as an array with one row per event.

    NOTE(review): the times are computed as `index * framerate`. This only yields seconds
    if `framerate` holds the duration of a single frame rather than frames per second -
    confirm against the caller.
    """
    id_column_name = f'{event_type}_id'
    start_and_end_times = []
    for event_id in event_ids:
        row_labels_of_event = df.loc[df[id_column_name] == event_id].index.values
        start_time = row_labels_of_event[0] * framerate
        end_time = row_labels_of_event[-1] * framerate
        start_and_end_times.append((start_time, end_time))
    return np.asarray(start_and_end_times)

In [None]:
def export_gait_related_bouts(df: pd.DataFrame, event_type: str, framerate: float) -> pd.DataFrame:
    """Summarize all bouts of `event_type` found in `df`, one row per bout.

    The returned DataFrame has the columns 'bout_id', 'duration',
    'CenterOfGravity_x_at_bout_end', 'towards_open_at_bout_end', 'distance_covered_cm',
    'start_time' and 'end_time'. The "at_bout_end" columns hold the values of the
    'CenterOfGravity_x' and 'facing_towards_open_end' columns in the last row of each bout.
    The distance refers to the 'CenterOfGravity' marker. If `df` contains no bout of this
    type, the DataFrame has no rows.
    """
    # ToDo: very similar to "export_immobility_related_bouts" - can they be combined to one generalized version?
    bout_ids = get_all_bout_ids(df = df, event_type = event_type)
    if len(bout_ids) < 1:
        # Nothing to summarize: only the (empty) bout ids are filled in.
        return pd.DataFrame(data = {'bout_id': bout_ids,
                                    'duration': [],
                                    'CenterOfGravity_x_at_bout_end': [],
                                    'towards_open_at_bout_end': [],
                                    'distance_covered_cm': [],
                                    'start_time': [],
                                    'end_time': []})
    durations = get_bout_duration_per_bout_id(df = df, event_type = event_type, event_ids = bout_ids)
    center_of_gravity_x_at_borders = get_column_values_at_event_borders(df = df,
                                                                        event_type = event_type,
                                                                        event_ids = bout_ids,
                                                                        column_name = 'CenterOfGravity_x')
    towards_open_at_borders = get_column_values_at_event_borders(df = df,
                                                                 event_type = event_type,
                                                                 event_ids = bout_ids,
                                                                 column_name = 'facing_towards_open_end')
    distances_covered = get_distance_covered_per_event(df = df,
                                                       event_type = event_type,
                                                       event_ids = bout_ids,
                                                       marker_id = 'CenterOfGravity')
    start_and_end_times = get_interval_start_and_end_time_per_event(df = df,
                                                                    event_type = event_type,
                                                                    event_ids = bout_ids,
                                                                    framerate = framerate)
    # Column 0 of the border arrays holds the value at bout start, column 1 the value at bout end.
    return pd.DataFrame(data = {'bout_id': bout_ids,
                                'duration': durations,
                                'CenterOfGravity_x_at_bout_end': center_of_gravity_x_at_borders[:, 1],
                                'towards_open_at_bout_end': towards_open_at_borders[:, 1],
                                'distance_covered_cm': distances_covered,
                                'start_time': start_and_end_times[:, 0],
                                'end_time': start_and_end_times[:, 1]})

In [None]:
def create_session_overview_df(dfs_to_export_with_individual_bout_dfs: Dict[str, pd.DataFrame]) -> pd.DataFrame:
    """Build one overview table across all per-bout result tables.

    The keys of `dfs_to_export_with_individual_bout_dfs` are used as event types, the values
    are the corresponding per-bout DataFrames. Each of them is split by direction and every
    split contributes one row with bout count, total and mean duration, total and mean
    distance covered, and mean 'CenterOfGravity_x' value to the overview.
    """
    overview_column_names = ('bout_type',
                             'total_bouts_count',
                             'total_duration',
                             'total_distance_covered',
                             'mean_duration',
                             'mean_distance_covered',
                             'mean_CenterOfGravity_x')
    session_overview = {column_name: [] for column_name in overview_column_names}
    for event_type, df_individual_bouts in dfs_to_export_with_individual_bout_dfs.items():
        bout_ids_per_split = get_bout_id_splits_depending_on_direction(df = df_individual_bouts)
        for split_id, bout_ids_of_split in bout_ids_per_split.items():
            session_overview = add_results_to_session_overview(session_overview = session_overview,
                                                               df = df_individual_bouts,
                                                               event_type = event_type,
                                                               event_prefix = split_id,
                                                               bout_ids = bout_ids_of_split)
    return pd.DataFrame(data = session_overview)

In [None]:
def get_bout_id_splits_depending_on_direction(df: pd.DataFrame) -> Dict[str, List[float]]:
    """Group the bout ids of `df` by the value of its 'towards_open' column.

    The direction column is the single column of `df` whose name contains 'towards_open'.
    Returns a dictionary with three lists of unique bout ids: 'all' (every bout),
    'towards_open' (direction value equals True) and 'towards_closed' (any other value).
    """
    towards_open_column_name = get_column_name_from_substring(all_columns = list(df.columns), substring = 'towards_open')
    is_towards_open = df[towards_open_column_name].eq(True)
    is_not_towards_open = df[towards_open_column_name].ne(True)
    bout_ids_split_by_direction = {}
    bout_ids_split_by_direction['all'] = list(df['bout_id'].unique())
    bout_ids_split_by_direction['towards_open'] = list(df.loc[is_towards_open, 'bout_id'].unique())
    bout_ids_split_by_direction['towards_closed'] = list(df.loc[is_not_towards_open, 'bout_id'].unique())
    return bout_ids_split_by_direction

In [None]:
def get_column_name_from_substring(all_columns: List[str], substring: str) -> str:
    """Return the one entry of `all_columns` that contains `substring`.

    An AssertionError is raised if none or more than one entry contains `substring`.
    """
    matching_column_names = []
    for candidate in all_columns:
        if substring in candidate:
            matching_column_names.append(candidate)
    assert len(matching_column_names) == 1, \
            f'There should be exactly one match for {substring} - however, {len(matching_column_names)} were found: [{matching_column_names}].'
    return matching_column_names[0]

In [None]:
def add_results_to_session_overview(session_overview: Dict, df: pd.DataFrame, event_type: str, event_prefix: str, bout_ids: List[float]) -> Dict:
    """Append one summary row for the given bouts to `session_overview`.

    `session_overview` is a dictionary of lists (one list per overview column). It is
    modified in place and also returned. The new row is labelled
    `{event_prefix}_{event_type}` and summarizes the rows of `df` whose 'bout_id' is in
    `bout_ids`: bout count, total and mean of 'duration', total and mean of
    'distance_covered_cm', and the mean of the single column whose name contains
    'CenterOfGravity_x'.

    NaN values are skipped in totals and means alike. If `bout_ids` is empty, the count
    and the totals are 0 and the means are NaN.

    Raises an AssertionError (from `get_column_name_from_substring`) if `df` does not have
    exactly one column containing 'CenterOfGravity_x'.
    """
    # `len(...)` instead of truthiness, as `bout_ids` may also be passed as a numpy array.
    if len(bout_ids) > 0:
        center_of_gravity_x_column_name = get_column_name_from_substring(all_columns = list(df.columns), substring = 'CenterOfGravity_x')
        df_selected_bouts = df.loc[df['bout_id'].isin(bout_ids)]
        total_bouts_count = len(bout_ids)
        # `sum` skips NaN wherever it occurs. The former `cumsum().iloc[-1]` returned NaN
        # only if the last selected bout was NaN, making the total depend on the bout order.
        total_duration = df_selected_bouts['duration'].sum()
        total_distance_covered = df_selected_bouts['distance_covered_cm'].sum()
        mean_duration = df_selected_bouts['duration'].mean()
        mean_distance_covered = df_selected_bouts['distance_covered_cm'].mean()
        mean_center_of_gravity_x = df_selected_bouts[center_of_gravity_x_column_name].mean()
    else:
        total_bouts_count = 0
        total_duration = 0
        total_distance_covered = 0
        mean_duration = np.nan
        mean_distance_covered = np.nan
        mean_center_of_gravity_x = np.nan
    # All values are computed before anything is appended, so that a failure above
    # cannot leave the overview lists with unequal lengths.
    session_overview['bout_type'].append(f'{event_prefix}_{event_type}')
    session_overview['total_bouts_count'].append(total_bouts_count)
    session_overview['total_duration'].append(total_duration)
    session_overview['total_distance_covered'].append(total_distance_covered)
    session_overview['mean_duration'].append(mean_duration)
    session_overview['mean_distance_covered'].append(mean_distance_covered)
    session_overview['mean_CenterOfGravity_x'].append(mean_center_of_gravity_x)
    return session_overview

In [None]:
def create_parameter_settings_df(logs: Dict) -> pd.DataFrame:
    """Convert the logs into a two-column table.

    Each key of `logs` becomes one row, with the key in the 'parameter' column and its
    value in the 'specified_value' column.
    """
    return pd.DataFrame(data = {'parameter': list(logs.keys()),
                                'specified_value': list(logs.values())})

In [None]:
def write_xlsx_file_to_disk(base_output_filepath: Path, dfs_to_export: Dict[str, pd.DataFrame]) -> None:
    """Write all DataFrames into a single .xlsx workbook, one sheet per DataFrame.

    The workbook is saved as `{base_output_filepath}.xlsx`. The keys of `dfs_to_export`
    are used as sheet names.
    """
    # The context manager saves and closes the workbook on exit, also if writing one of the
    # sheets fails. An explicit `writer.save()` is not available in pandas >= 2.0 anymore.
    with pd.ExcelWriter(f'{base_output_filepath}.xlsx', engine='xlsxwriter') as writer:
        for tab_name, df in dfs_to_export.items():
            df.to_excel(writer, sheet_name = tab_name)

In [None]:
#| hide
# Run the nbdev export when this cell is executed.
import nbdev; nbdev.nbdev_export()