In [1]:
pip install "moviepy<2.0.0"

Note: you may need to restart the kernel to use updated packages.


In [2]:
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import os
from pathlib import Path
import glob
import json
import seaborn as sns
from datetime import datetime
# from aind_dynamic_foraging_basic_analysis.licks.lick_analysis import load_nwb
import re
from matplotlib import colormaps  

In [3]:
# Load kinematics data
from tongue_kinematics_utils import load_keypoints_from_csv, integrate_keypoints_with_video_time

# Input files: the keypoint CSV from the inference folder and the CSV that
# sits next to the raw bottom-camera video.
keypoint_csv_path = '/root/capsule/data/BottomViewPylon1-MIB-2025-02-17/inference/behavior_716325_2024-05-31_10-31-14/bottom_camera.csv'
video_csv_path = '/root/capsule/data/behavior_716325_2024-05-31_10-31-14/behavior-videos/bottom_camera.csv'

# keypoints
keypoint_dfs = load_keypoints_from_csv(keypoint_csv_path)

# get video timebase for keypoint dataframe
# (the trimmed keypoint collection is what the labeling cell consumes later)
(
    keypoint_dfs_trimmed,
    video_csv_trimmed,
    keypoint_timebase,
) = integrate_keypoints_with_video_time(video_csv_path, keypoint_dfs)

keypoints extracted: ['nose_tip', 'jaw', 'tongue_tip_right', 'tongue_tip_center', 'tongue_tip_left', 'pointer_finger_r', 'paw_wrist_r', 'pointer_finger_l', 'paw_wrist_l', 'spout_r', 'spout_l']
Video QC: Frame numbers are sequential with no gaps.
Video QC: Timing differences are within expected range.
keypoint_df trimmed from 2689719 to 2689718


In [4]:
import subprocess
def extract_clips_ffmpeg_after_reencode(input_video_path, timestamps, clip_length, output_dir):
    """Cut fixed-length clips out of a video by invoking ffmpeg with stream copy.

    For every start time in ``timestamps`` a clip of ``clip_length`` seconds is
    written to ``output_dir`` as
    ``<input stem>_clip_<n>_<start>s_to_<end>s.mp4``, where ``n`` is the
    1-based position of the start time in ``timestamps``. A clip whose output
    file already exists is skipped.

    Despite the function name, the ffmpeg command uses ``-c copy`` and does not
    re-encode. NOTE(review): with stream copy the cut may not land exactly on
    the requested start time - confirm this is acceptable for the source video.

    Args:
        input_video_path: path of the source video passed to ffmpeg.
        timestamps: iterable of clip start times, in seconds.
        clip_length: duration of every clip, in seconds.
        output_dir: directory that receives the clips; created if missing.

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits with a non-zero status.
    """
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(output_dir, exist_ok=True)

    # The source stem is the same for every clip, so compute it once.
    input_basename, _ = os.path.splitext(os.path.basename(input_video_path))

    for idx, start_time in enumerate(timestamps):
        end_time = start_time + clip_length
        output_filename = input_basename + f"_clip_{idx+1}_{start_time:.2f}s_to_{end_time:.2f}s.mp4"
        output_path = os.path.join(output_dir, output_filename)

        if os.path.isfile(output_path):
            continue

        command = [
            'ffmpeg',
            '-ss', str(start_time),
            '-i', input_video_path,
            '-t', str(clip_length),
            '-c', 'copy',             # Copy codec (no re-encoding)
            output_path
        ]
        try:
            subprocess.run(command, check=True)
        except BaseException:
            # A failed or interrupted run can leave a truncated file behind.
            # Remove it so the "already exists" shortcut above does not treat
            # it as a finished clip on the next call. BaseException is caught
            # on purpose so a kernel interrupt is covered too; it is re-raised.
            if os.path.isfile(output_path):
                os.remove(output_path)
            raise
        print(f"Clip saved to {output_path}")

In [11]:
import cv2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import datetime
from moviepy.editor import VideoFileClip
from typing import Dict, List, Optional, Tuple, Union
import os


def create_labeled_video(
    clip: VideoFileClip,
    xs_arr: np.ndarray,
    ys_arr: np.ndarray,
    mask_array: Optional[np.ndarray] = None,
    dotsize: int = 4,
    colormap: str = "cool",
    fps: Optional[float] = None,
    filename: str = "movie.mp4",
    start_time: float = 0.0,
) -> None:
    """Write a copy of ``clip`` with keypoint markers and a timestamp overlay.

    Every frame gets one filled circle per keypoint (colored from ``colormap``)
    and a text box in the upper-left corner showing the elapsed time and frame
    number. Small clips (either side <= 192 px) are upsampled first so that the
    dots and text stay legible.

    Args:
        clip: moviepy clip to annotate.
        xs_arr: x coordinates, shape T x n_joints.
        ys_arr: y coordinates, shape T x n_joints.
        mask_array: shape T x n_joints; timepoints/joints with a False entry
            are not plotted. Defaults to "both coordinates are not NaN".
        dotsize: radius of the marker dot on the labeled video.
        colormap: matplotlib color map used for the markers.
        fps: frame rate of the written file; None keeps the fps of ``clip``.
        filename: path of the video file to write.
        start_time: time (in seconds) of the clip's first frame; added to the
            time and frame number shown in the overlay.
    """

    if mask_array is None:
        # A marker needs both coordinates, so a NaN in either one hides it.
        mask_array = ~(np.isnan(xs_arr) | np.isnan(ys_arr))

    n_frames, n_keypoints = xs_arr.shape

    # one color per keypoint
    colors = make_cmap(n_keypoints, cmap=colormap)

    # extract info from clip
    nx, ny = clip.size
    # keep the fractional part; it is rounded for display below
    dur = clip.duration - clip.start
    fps_og = clip.fps

    # upsample clip if low resolution; need to do this for dots and text to look nice
    if nx <= 100 or ny <= 100:
        upsample_factor = 2.5
    elif nx <= 192 or ny <= 192:
        upsample_factor = 2
    else:
        upsample_factor = 1

    if upsample_factor > 1:
        # pixel dimensions must be whole numbers
        clip = clip.resize((int(upsample_factor * nx), int(upsample_factor * ny)))
        nx, ny = clip.size

    print(f"Duration of video [s]: {np.round(dur, 2)}, recorded at {np.round(fps_og, 2)} fps!")

    def seconds_to_hms(seconds):
        """Format a number of seconds as HH:MM:SS (fractional seconds dropped)."""
        td = datetime.timedelta(seconds=seconds)

        hours = td // datetime.timedelta(hours=1)
        minutes = (td // datetime.timedelta(minutes=1)) % 60
        seconds = td % datetime.timedelta(minutes=1)

        hms_str = f"{hours:02}:{minutes:02}:{seconds.seconds:02}"

        return hms_str

    # add marker to each frame t, where t is in sec
    def add_marker_and_timestamps(get_frame, t):
        image = get_frame(t * 1.0)
        # work on a copy so the frame handed out by moviepy is left untouched
        frame = image.copy()
        # convert from sec to indices
        index = int(np.round(t * 1.0 * fps_og))
        # ----------------
        # markers
        # ----------------
        if index >= n_frames:
            # No keypoint row for this frame: report it once, not per marker.
            print("Skipped frame {}".format(index))
        else:
            for bpindex in range(n_keypoints):
                if not mask_array[index, bpindex]:
                    continue
                x_val = xs_arr[index, bpindex]
                y_val = ys_arr[index, bpindex]
                # int() cannot convert NaN/inf; skip such points even if a
                # caller-supplied mask marks them as valid.
                if not (np.isfinite(x_val) and np.isfinite(y_val)):
                    continue
                xc = min(int(upsample_factor * x_val), nx - 1)
                yc = min(int(upsample_factor * y_val), ny - 1)
                frame = cv2.circle(
                    frame,
                    center=(xc, yc),
                    radius=dotsize,
                    color=colors[bpindex].tolist(),
                    thickness=-1,
                )
        # ----------------
        # timestamps
        # ----------------
        seconds_from_start = t + start_time
        time_from_start = seconds_to_hms(seconds_from_start)
        idx_from_start = int(np.round(seconds_from_start * 1.0 * fps_og))
        text = f"t={time_from_start}, frame={idx_from_start}"
        # define text info
        font = cv2.FONT_HERSHEY_SIMPLEX
        font_scale = 0.5
        font_thickness = 1
        # calculate the size of the text
        text_size = cv2.getTextSize(text, font, font_scale, font_thickness)[0]
        # calculate the position of the text in the upper-left corner
        offset = 6
        text_x = offset  # offset from the left
        text_y = text_size[1] + offset  # baseline: text height plus offset from the top
        # make black rectangle with a small padding of offset / 2 pixels
        cv2.rectangle(
            frame,
            (text_x - int(offset / 2), text_y + int(offset / 2)),
            (text_x + text_size[0] + int(offset / 2), text_y - text_size[1] - int(offset / 2)),
            (0, 0, 0),  # rectangle color
            cv2.FILLED,
        )
        cv2.putText(
            frame,
            text,
            (text_x, text_y),
            font,
            font_scale,
            (255, 255, 255),  # font color
            font_thickness,
            lineType=cv2.LINE_AA,
        )
        return frame

    clip_marked = clip.fl(add_marker_and_timestamps)
    try:
        # fps=None lets moviepy fall back to the clip's own frame rate
        clip_marked.write_videofile(filename, fps=fps, codec="libx264")
    finally:
        clip_marked.close()


def make_cmap(number_colors: int, cmap: str = "cool"):
    """Sample ``number_colors`` evenly spaced RGB colors from a matplotlib colormap.

    Args:
        number_colors: how many colors to draw between the two ends of the map.
        cmap: name of the matplotlib colormap.

    Returns:
        uint8 array with one row per color and three columns (R, G, B),
        obtained by scaling the mapped values by 255 and truncating.
    """
    sample_points = np.linspace(0, 1, number_colors)
    rgba = plt.cm.ScalarMappable(cmap=cmap).to_rgba(sample_points)
    # drop the alpha channel, then move from the unit interval to 0..255
    rgb_unit = rgba[:, :3]
    return (rgb_unit * 255).astype(np.uint8)


def process_and_label_clips(input_video_path, timestamps, clip_length, clip_output_dir, label_output_dir, keypoint_dataframes, confidence_level = 0.8):
    """Extract clips from a video and write a keypoint-labeled copy of each.

    NOTE(review): this notebook defines ``process_and_label_clips`` in two
    cells; whichever cell is executed last provides the active definition.

    Args:
        input_video_path: source video the clips are cut from.
        timestamps: iterable of clip start times, in seconds.
        clip_length: duration of every clip, in seconds.
        clip_output_dir: directory the raw clips are written to / read from.
        label_output_dir: directory for the labeled clips; created if missing.
        keypoint_dataframes: mapping whose values are DataFrames with the
            columns 'time', 'x', 'y' and 'confidence', one entry per keypoint.
            'time' is assumed to be in seconds relative to the original video
            - TODO confirm against the loader.
        confidence_level: a keypoint is drawn only where its confidence is
            strictly greater than this value.

    Raises:
        ValueError: from ``np.column_stack`` when the per-keypoint slices for a
            clip do not all have the same number of rows (or there are none).
    """
    # Step 1: Extract clips
    extract_clips_ffmpeg_after_reencode(input_video_path, timestamps, clip_length, clip_output_dir)

    os.makedirs(label_output_dir, exist_ok=True)

    # The source stem is shared by every clip name, so compute it once.
    input_basename, _ = os.path.splitext(os.path.basename(input_video_path))

    # For each timestamp/clip
    for idx, start_time in enumerate(timestamps):
        end_time = start_time + clip_length
        # Must match the naming scheme used by the extract function.
        clip_stem = f"{input_basename}_clip_{idx+1}_{start_time:.2f}s_to_{end_time:.2f}s"
        clip_path = os.path.join(clip_output_dir, clip_stem + ".mp4")
        labeled_clip_path = os.path.join(label_output_dir, clip_stem + "_labeled.mp4")

        # Load the clip; the finally below releases it even if labeling fails.
        clip = VideoFileClip(clip_path)
        try:
            # Step 2 & 3: Build xs_arr and ys_arr for the clip
            xs_list = []
            ys_list = []
            conf_list = []
            for df in keypoint_dataframes.values():
                # Rows inside the half-open clip window [start_time, end_time).
                # One row per frame is assumed; a mismatch is reported below.
                in_window = (df['time'] >= start_time) & (df['time'] < end_time)
                clip_df = df[in_window]

                xs_list.append(clip_df['x'].to_numpy())
                ys_list.append(clip_df['y'].to_numpy())
                conf_list.append(clip_df['confidence'].to_numpy())

            # Convert lists to 2D arrays: each column corresponds to a keypoint.
            # (This requires that all keypoint arrays have the same length.)
            xs_arr = np.column_stack(xs_list)
            ys_arr = np.column_stack(ys_list)
            conf_arr = np.column_stack(conf_list)

            # Round instead of truncating: fps * duration can land just below
            # the true frame count because of floating-point error.
            expected_frames = int(round(clip.fps * clip.duration))
            if xs_arr.shape[0] != expected_frames:
                print(f"Warning: Number of keypoint frames ({xs_arr.shape[0]}) does not match video frames ({expected_frames}).")
                # You could add interpolation or padding here if needed.

            # Step 4: Create labeled video for this clip
            mask_array = conf_arr > confidence_level

            # Forward the clip's start so the overlay shows time and frame
            # number relative to the original video rather than from zero.
            create_labeled_video(
                clip,
                xs_arr,
                ys_arr,
                mask_array=mask_array,
                filename=labeled_clip_path,
                start_time=float(start_time),
            )
        finally:
            clip.close()

In [9]:
import os
import numpy as np
import pandas as pd

def process_and_label_clips(input_video_path, timestamps, clip_length, clip_output_dir, label_output_dir, keypoint_dataframes, confidence_level = 0.8):
    """Extract clips from a video and write a keypoint-labeled copy of each.

    NOTE(review): this notebook defines ``process_and_label_clips`` in two
    cells; whichever cell is executed last provides the active definition.

    Args:
        input_video_path: source video the clips are cut from.
        timestamps: iterable of clip start times, in seconds.
        clip_length: duration of every clip, in seconds.
        clip_output_dir: directory the raw clips are written to / read from.
        label_output_dir: directory for the labeled clips; created if missing.
        keypoint_dataframes: mapping whose values are DataFrames with the
            columns 'time', 'x', 'y' and 'confidence', one entry per keypoint.
            'time' is assumed to be in seconds relative to the original video
            - TODO confirm against the loader.
        confidence_level: a keypoint is drawn only where its confidence is
            strictly greater than this value.

    Raises:
        ValueError: from ``np.column_stack`` when the per-keypoint slices for a
            clip do not all have the same number of rows (or there are none).
    """
    # Step 1: Extract clips
    extract_clips_ffmpeg_after_reencode(input_video_path, timestamps, clip_length, clip_output_dir)

    os.makedirs(label_output_dir, exist_ok=True)

    # The source stem is shared by every clip name, so compute it once.
    input_basename, _ = os.path.splitext(os.path.basename(input_video_path))

    # For each timestamp/clip
    for idx, start_time in enumerate(timestamps):
        end_time = start_time + clip_length
        # Must match the naming scheme used by the extract function.
        clip_stem = f"{input_basename}_clip_{idx+1}_{start_time:.2f}s_to_{end_time:.2f}s"
        clip_path = os.path.join(clip_output_dir, clip_stem + ".mp4")
        labeled_clip_path = os.path.join(label_output_dir, clip_stem + "_labeled.mp4")

        # Load the clip; the finally below releases it even if labeling fails.
        clip = VideoFileClip(clip_path)
        try:
            # Step 2 & 3: Build xs_arr and ys_arr for the clip
            xs_list = []
            ys_list = []
            conf_list = []
            for df in keypoint_dataframes.values():
                # Rows inside the half-open clip window [start_time, end_time).
                # One row per frame is assumed; a mismatch is reported below.
                in_window = (df['time'] >= start_time) & (df['time'] < end_time)
                clip_df = df[in_window]

                xs_list.append(clip_df['x'].to_numpy())
                ys_list.append(clip_df['y'].to_numpy())
                conf_list.append(clip_df['confidence'].to_numpy())

            # Convert lists to 2D arrays: each column corresponds to a keypoint.
            # (This requires that all keypoint arrays have the same length.)
            xs_arr = np.column_stack(xs_list)
            ys_arr = np.column_stack(ys_list)
            conf_arr = np.column_stack(conf_list)

            # Round instead of truncating: fps * duration can land just below
            # the true frame count because of floating-point error.
            expected_frames = int(round(clip.fps * clip.duration))
            if xs_arr.shape[0] != expected_frames:
                print(f"Warning: Number of keypoint frames ({xs_arr.shape[0]}) does not match video frames ({expected_frames}).")
                # You could add interpolation or padding here if needed.

            # Step 4: Create labeled video for this clip
            mask_array = conf_arr > confidence_level

            # Forward the clip's start so the overlay shows time and frame
            # number relative to the original video rather than from zero.
            create_labeled_video(
                clip,
                xs_arr,
                ys_arr,
                mask_array=mask_array,
                filename=labeled_clip_path,
                start_time=float(start_time),
            )
        finally:
            clip.close()


In [12]:
# Label one clip: 1.0 s long, starting 0.5 s into the bottom-camera video.
timestamps = [0.5]
clip_length = 1.0

# Source video and the scratch folders for raw and labeled clips.
input_video_path = '/root/capsule/data/BottomViewPylon1-MIB-2025-02-17/inference/behavior_716325_2024-05-31_10-31-14/bottom_camera.mp4'
clip_output_dir = '/root/capsule/scratch/test_clips/clips'
label_output_dir = '/root/capsule/scratch/test_clips/labeled'

process_and_label_clips(
    input_video_path=input_video_path,
    timestamps=timestamps,
    clip_length=clip_length,
    clip_output_dir=clip_output_dir,
    label_output_dir=label_output_dir,
    keypoint_dataframes=keypoint_dfs_trimmed,
)


Duration of video [s]: 1, recorded at 500.0 fps!
Moviepy - Building video /root/capsule/scratch/test_clips/labeled/bottom_camera_clip_1_0.50s_to_1.50s_labeled.mp4.
Moviepy - Writing video /root/capsule/scratch/test_clips/labeled/bottom_camera_clip_1_0.50s_to_1.50s_labeled.mp4



                                                               

Moviepy - Done !
Moviepy - video ready /root/capsule/scratch/test_clips/labeled/bottom_camera_clip_1_0.50s_to_1.50s_labeled.mp4


In [26]:
# Show which moviepy version the running kernel actually imported; compare it
# with the "<2.0.0" pin requested by the install cell at the top.
import moviepy

moviepy_version = moviepy.__version__
print(moviepy_version)

2.1.1
