<a href="https://colab.research.google.com/github/Ina0221/Automatic-Tic-Detection/blob/main/script.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#clean environment
from IPython import get_ipython
get_ipython().magic('reset -sf')

#import needed packages and others
!pip install elephant
import h5py
import numpy as np
import pandas as pd
import scipy
import math
import matplotlib as mpl
import matplotlib.pyplot as plt
import cv2
import shutil
from elephant import kernels
import pandas as pd
import quantities as pq
import elephant.spike_train_dissimilarity as std
from neo import SpikeTrain
from elephant.spike_train_dissimilarity import victor_purpura_distance

#Gather SLEAP file and compile in dataframes
-code is based on sleap.ai

In [None]:
def make_dataframes(filename):

    with h5py.File(filename, "r") as f:
        dset_names = list(f.keys())
        locations = f["tracks"][:].T
        node_names = [n.decode() for n in f["node_names"][:]]

    frame_count, node_count, _, instance_count = locations.shape


    #-----------------------------------intrapolation of missing data (sleap.ai)---------------------------------------------

    from scipy.interpolate import interp1d

    def fill_missing(Y, kind="linear"):
        """Fills missing values independently along each dimension after the first."""

        # Store initial shape.
        initial_shape = Y.shape

        # Flatten after first dim.
        Y = Y.reshape((initial_shape[0], -1))

        # Interpolate along each slice.
        for i in range(Y.shape[-1]):
            y = Y[:, i]

            # Build interpolant.
            x = np.flatnonzero(~np.isnan(y))
            f = interp1d(x, y[x], kind=kind, fill_value=np.nan, bounds_error=False)

            # Fill missing
            xq = np.flatnonzero(np.isnan(y))
            y[xq] = f(xq)

            # Fill leading or trailing NaNs with the nearest non-NaN values
            mask = np.isnan(y)
            y[mask] = np.interp(np.flatnonzero(mask), np.flatnonzero(~mask), y[~mask])

            # Save slice
            Y[:, i] = y

        # Restore to initial shape.
        Y = Y.reshape(initial_shape)

        return Y

    locations = fill_missing(locations)


    NOSE_INDEX = 0
    LEFT_INDEX = 1
    RIGHT_INDEX = 2
    CENTER_INDEX = 3


    nose_loc = locations[:, NOSE_INDEX, :, :]
    left_loc = locations[:, LEFT_INDEX, :, :]
    right_loc = locations[:, RIGHT_INDEX, :, :]
    center_loc = locations[:, CENTER_INDEX, :, :]


    #empty dataframe
    CN = ['frame']
    df_body_parts = pd.DataFrame(columns = CN)

    #extract datapoints
    frame = np.array((range(0,nose_loc.shape[0])))
    nosex = nose_loc[:,0,0]
    nosey = nose_loc[:,1,0]

    rightx = right_loc[:,0,0]
    righty = right_loc[:,1,0]

    leftx = left_loc[:,0,0]
    lefty = left_loc[:,1,0]

    centerx = center_loc[:,0,0]
    centery = center_loc[:,1,0]

    #add to frame
    df_body_parts['frame'] = frame.tolist()
    df_body_parts['x-nose'] = nosex.tolist()
    df_body_parts['y-nose'] = nosey.tolist()

    df_body_parts['x-right'] = rightx.tolist()
    df_body_parts['y-right'] = righty.tolist()

    df_body_parts['x-left'] = leftx.tolist()
    df_body_parts['y-left'] = lefty.tolist()

    df_body_parts['x-center'] = centerx.tolist()
    df_body_parts['y-center'] = centery.tolist()


    return df_body_parts

In [None]:
sleap_file = r"file_path.h5"

df = make_dataframes(sleap_file)

#Calculate the velocity with thresholds

In [None]:
# Set thresholds
time_window = 10  # Number of consecutive rows to consider for average velocity
sideways_velocity_threshold = 5.5 ########################################################################______CHANGE
sideways_angle_threshold = np.radians(0) ##############################################################______CHANGE


# Calculate the change in position
df['delta_x'] = df['x-right'].diff()
df['delta_y'] = df['y-right'].diff()

# Calculate the time interval between consecutive rows
df['time_interval'] = df['frame'].diff()

# Calculate velocity by dividing delta_x and delta_y by the time interval
df['velocity_x'] = df['delta_x'] / df['time_interval']
df['velocity_y'] = df['delta_y'] / df['time_interval']

# Calculate the angle between the velocity vector and the head-tail axis
head_tail_vector_x = df['x-center'] - df['x-right']
head_tail_vector_y = df['y-center'] - df['y-right']
dot_product = df['velocity_x'] * head_tail_vector_x + df['velocity_y'] * head_tail_vector_y
velocity_magnitude = np.sqrt(df['velocity_x']**2 + df['velocity_y']**2)
head_tail_magnitude = np.sqrt(head_tail_vector_x**2 + head_tail_vector_y**2)
cosine_similarity = dot_product / (velocity_magnitude * head_tail_magnitude)
angle = np.arccos(cosine_similarity)

# Calculate the average velocity over the time window
df['avg_velocity_x'] = df['velocity_x'].rolling(time_window).mean()

# Create a new column 'sideways_movement' based on the thresholds
df['sideways_movement'] = (
    (abs(df['avg_velocity_x']) > sideways_velocity_threshold) &
    (angle > sideways_angle_threshold)
)

# Drop intermediate columns to keep only 'sideways_movement'
df.drop(['delta_x', 'delta_y', 'time_interval', 'velocity_x', 'velocity_y', 'avg_velocity_x'], axis=1, inplace=True)

df.head(10)

#Play video with manual and automatic scoring
-manual scoing in the top left corner

-automatic in the top right corner

In [None]:
def display_video(video_file, koords, manual_file, pred, start, end):
  #koords are x and y positions of the body parts
  #pred are the automatic scores, velocity of the body parts. Rename if needed (here 'df[sideways_movement])
  #start frames and end frames
    # Load the video file
    cap = cv2.VideoCapture(video_file)

    # Load the dataframe containing the tic values
    man = manual_file #df from manual scoring
    points = koords #sleap df for points visualisation
    axis = pd.DataFrame(pred, columns=['Result'])

    #define colors for circle in the corner
    green = (0, 255, 0)
    red = (0, 0, 255)

    # Define the window name
    window_name = "Video Player nose acceleration"
    cv2.namedWindow(window_name, cv2.WINDOW_NORMAL)

    # Track the current frame index
    frame_idx = start

    # Flags to control the playback speed
    speed_up = False
    speed_up_rewind = False

    while frame_idx <= end:
        # Set the frame index
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)

        # Read the current frame
        ret, frame = cap.read()

        if not ret:
            break

        # Get the current frame index and find the corresponding tic value from "tics"
        try:
            row = axis.loc[frame_idx-start]
            # Draw the circle for """"""predictions"""""" in the top right corner depending on the tic value in tics
            if row['Result'] == 'Tic':
                color = green
            else:
                color = red
            cv2.circle(frame, (frame.shape[1] - 50, 50), 50, color, -1)

        except KeyError:
            pass

        # Get the current frame index and find the corresponding tic value from "man"
        try:
            row = man.loc[frame_idx]
            # Draw the circle for """"""manual scoring"""""" in the top left corner depending on the tic value in man
            if row['event'] == 'tic':
                color = green
            else:
                color = red
            cv2.circle(frame, (50, 50), 50, color, -1)

        except KeyError:
            pass

        # Get the current frame index and find the corresponding points of the rat
        try:
            row = points.loc[frame_idx]
            # Draw the circles for each body part depending on the coordinates
            cv2.circle(frame, (int(row['x-nose']), int(row['y-nose'])), 5, (255, 0, 0), -1)
            cv2.circle(frame, (int(row['x-right']), int(row['y-right'])), 5, (0, 0, 255), -1)
            cv2.circle(frame, (int(row['x-left']), int(row['y-left'])), 5, (0, 255, 255), -1)
            cv2.circle(frame, (int(row['x-center']), int(row['y-center'])), 5, (255, 255, 0), -1)

        except KeyError:
            pass

        # Display the frame
        cv2.imshow(window_name, frame)

        # Check for key input
        key = cv2.waitKey(1)

        # Exit if 'q' is pressed
        if key == ord('q'):
            break
        # Rewind if 'r' is pressed
        elif key == ord('r'):
            if speed_up_rewind:
                frame_idx = max(start, frame_idx - 10)
            else:
                frame_idx = max(start, frame_idx - 1)
        # Toggle speed-up if 's' is pressed
        elif key == ord('s'):
            speed_up = not speed_up
        # Toggle rewind speed-up if 'w' is pressed
        elif key == ord('w'):
            speed_up_rewind = not speed_up_rewind

        # Adjust frame index based on the speed-up flags
        if speed_up:
            frame_idx += 10
        elif speed_up_rewind:
            frame_idx -= 10
        else:
            frame_idx += 1

    # Release the video file and close the window
    cap.release()
    cv2.destroyAllWindows()

def display_video(video_file, koords, manual_file, pred, start, end):

#Plot manual scores

In [None]:
#define start and end frames for plotting, manually scored

fig = plt.figure(figsize=(15,7))
ax1 = fig.add_subplot(211)
ax1.plot(manual_file['frames'][start:end], manual_file['State'][start:end], label='Tic and No Tic')
plt.xticks(range(start,end,200))
ax1.legend()
y_tick_locations = [0,1]
ax1.set_yticks(y_tick_locations)
y_tick_labels = ['no tic', 'tic']
ax1.set_yticklabels(y_tick_labels)
#ax1.invert_yaxis()
ax1.set_title('Manual Scoring')

#Plot automatic scores

In [None]:
df["sideways_movement"] = df["sideways_movement"].apply(lambda x: 1 if x == True else 0)

x = df.index[start:end]
y = df["sideways_movement"][start:end]

fig = plt.figure(figsize=(15, 7))
ax2 = fig.add_subplot(211)
ax2.plot(x, y, label='Tic and No Tic')
plt.xticks(range(start, end, 200))

plt.yticks([0, 1])  # Set y-axis labels as "No Tic" and "Tic"
ax2.set_yticks([0, 1])
y_tick_labels = ['no tic', 'tic']
ax2.set_yticklabels(y_tick_labels)

ax2.legend()
#ax2.set_title('Automatic scoring')

# Add axis titles
ax2.set_xlabel('Frames', fontsize = 14)

plt.show()

#Victor-Purpura Distance
determines the similarity between manual and automatic scoring

In [None]:
start = 11000
end = 13100
predictions = result_df.replace({True: 'Tic', False: 'no Tic'})[start:end]

# Step 1: Extract the column
predictions = predictions['x'] #####set column from automatoc scoring to test on

# Step 2: Rename the column
predictions = predictions.rename('Automatic')

man_per = pd.DataFrame(pd.read_excel('path_manualScoring.xlsx')) #perfectly manually scored
man = pd.DataFrame(pd.read_excel('path_Manual_Scoring.xlsx')) #normally manually score, if wanted
man=man[(man["State"]=="state start")&(man["Markings"]=="Tics")]
man_per=man_per[(man_per["State"]=="state start")&(man_per["Markings"]=="Tics")]

#get automated scores
#paste time line in seconds to predicitons dataframe
Pred = pd.DataFrame(predictions)
Pred['time'] = (Pred.index/25)

#only keep the the first tic in sequence turn rest to no Tic
# Create a mask to identify the first occurrence of 'tic' in each sequence
mask = (Pred['Automatic'] == 'Tic') & (Pred['Automatic'].shift(1) != 'Tic')
Pred.loc[~mask, 'Automatic'] = 'No Tic'

if Pred['Automatic'].iloc[0] == 'Tic':
    Pred['Automatic'].iloc[0] = 'No Tic'

# #filter for start tics time points
Pred = Pred[(Pred["Automatic"]=="Tic")]

start_s = int(start/25)
end_s = int(end/25)

# Use boolean indexing to get the rows within the specified time range
man = man[(man["Trial_time"] >= start_s) & (man["Trial_time"] <= end_s)]
man_per = man_per[(man_per["Trial_time"] >= start_s) & (man_per["Trial_time"] <= end_s)]

#################################################################################################################################################
def filter_adjacent(series, threshold):
    result = [series.index[0]]
    for i in range(1, len(series)):
        if abs(series.iloc[i] - series[result[-1]]) >= threshold:
            result.append(series.index[i])
    return series.loc[result]

filtered_series = filter_adjacent(Pred['time'], 0.5)
#################################################################################################################################################

# # #define SpikeTrains for comparison
q = 1.0 / (10.0 * pq.s)
st_a = SpikeTrain(man_per["Trial_time"], units='s', t_stop=1000.0)
#st_b = SpikeTrain(man["Trial_time"], units='s', t_stop=1000.0)
st_b = SpikeTrain(filtered_series, units='s', t_stop=1000.0) #automatic scoring for body parts ->need to be adjusted in definition

# # Create a rate quantity
cost_factor = 0 / pq.s

# # Calculate Victor-Purpura distance
distance = std.victor_purpura_distance([st_a, st_b], cost_factor, algorithm='fast', sort=False, kernel = kernels.TriangularKernel(sigma=0.815 * pq.s))[0, 1]
print(f"Victor-Purpura distance {round(distance, 2)}")
print(round(distance,2))
