In [1]:
%load_ext autoreload
%autoreload 1

import cv2 as cv
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scipy.io import wavfile

%matplotlib widget

%aimport utils.video
from utils.video import open_video, write_diff_video, play_video, get_triggers_from_audio, get_video_frames_from_callback_audio

%aimport utils.audio
from utils.audio import AudioObject

In [4]:
# Camera whose video is analysed; selects the `_CAM{n_cam}` file built below.
n_cam = 1  # 0 top view, 1 side view

# Recording to analyse, given as the path prefix (no extension) shared by the
# .wav and .avi files. Alternative recordings are kept commented out so the
# active one can be switched quickly.
# root = "/Volumes/users/randazzo/callbacks/loom_only/or14pu27/loom_only/or14pu27-loom_only-20240813120633-Stim0-Block4"
# root = "/Volumes/users/randazzo/callbacks/loom_only/or14pu27/loom_only/or14pu27-loom_only-20240813120025-Stim0-Block0"
root = "/Volumes/users/randazzo/callbacks/loom_only/or54rd45/loom_only/or54rd45-loom_only-20240820121205-Stim0-Block9"

# Audio and video file paths derived from the shared prefix.
wav_path = f"{root}.wav"
avi_path = f"{root}_CAM{n_cam}-0000.avi"

In [10]:
# Raw multi-channel recording: `fs` is the sampling rate, `audio` is indexed
# below as (samples, channels).
fs, audio = wavfile.read(wav_path)

# Channel 0 is loaded separately as an AudioObject and processed with the
# project's default Butterworth filter followed by rectification + smoothing.
bird_audio = AudioObject.from_wav(wav_path, channels=0)
bird_audio.filtfilt_butter_default()
# 2 ms smoothing window expressed in samples. Derived from the file's actual
# sampling rate instead of a hard-coded 44.1 kHz, so recordings at other rates
# get the same window duration (identical result, 88 samples, at 44.1 kHz).
# NOTE(review): assumes `smooth_window_f` is a length in samples — confirm.
bird_audio.rectify_smooth(smooth_window_f=round(2 * fs / 1000))

# Auxiliary channels used for synchronisation in the cells below.
cam_frames = audio[:, 1]  # camera frame signal
loom_trigs = audio[:, 4]  # loom stimulus trigger signal


# PLOT ALL CHANNELS
# channels = audio.shape[1]

# fig, axs = plt.subplots(nrows=channels, sharex=True)

# for i in range(channels):
#     ax = axs.ravel()[i]

#     ax.plot(audio[:, i])

In [4]:
# CREATE DIFF VIDEO
# diff_video_fname = "/Users/cirorandazzo/code/callback-analysis/data/new_vid.avi"
# write_diff_video(avi_path, diff_video_fname)
# print("Successfully wrote diff video")

Play both videos simultaneously:

In [5]:
# names = [avi_path, diff_video_fname]
# window_titles = ["original", "diff"]

# cap = [cv.VideoCapture(i) for i in names]

# frames = [None] * len(names)
# gray = [None] * len(names)
# ret = [None] * len(names)

# while True:

#     for i, c in enumerate(cap):
#         if c is not None:
#             ret[i], frames[i] = c.read()

#     for i, f in enumerate(frames):
#         if ret[i] is True:
#             gray[i] = cv.cvtColor(f, cv.COLOR_BGR2GRAY)
#             cv.imshow(window_titles[i], gray[i])

#     if cv.waitKey(1) & 0xFF == ord("q"):
#         break


# for c in cap:
#     if c is not None:
#         c.release()

# cv.destroyAllWindows()

Or play the videos individually:

In [6]:
# play_video(avi_path)
# play_video(diff_video_fname)

In [11]:
# Path of the frame-difference video to analyse. It is defined here because the
# only other assignment in the notebook lives in the commented-out
# "CREATE DIFF VIDEO" cell, which left this cell failing with a NameError on a
# fresh kernel.
# NOTE(review): confirm this file is the diff video that belongs to `root`.
diff_video_fname = "/Users/cirorandazzo/code/callback-analysis/data/new_vid.avi"

# consider first frame w/ no change
max_change = [0]
avg_change = [0]

capture = open_video(diff_video_fname)

try:
    while True:
        ret, frame = capture.read()

        # Stop at the end of the stream (no frame returned).
        if not ret or frame is None:
            break

        # Per-frame summary of the diff image: peak and mean pixel value.
        max_change.append(np.max(frame))
        avg_change.append(np.mean(frame))
finally:
    # Always free the video handle, even if reading fails part-way through.
    capture.release()

max_change = np.array(max_change)
avg_change = np.array(avg_change)

In [8]:
#

# import cv2 as cv

# capture = open_video(
#     "/Volumes/PlasticBag/anxiety_calls/loom_only-diff_videos/or14pu27-loom_only-20240813121410-Stim0-Block9_CAM1-DIFF.avi"
# )
# capture = open_video(
#     "/Volumes/PlasticBag/anxiety_calls/loom_only-diff_videos/or14pu27-loom_only-20240813121239-Stim0-Block8_CAM1-DIFF.avi"
# )
# capture.get(cv.CAP_PROP_FRAME_COUNT)

In [None]:
# Locate the video frames within the camera channel of the audio recording.
# The result is used below to index `cam_frames`, i.e. it holds positions
# expressed in audio samples.
vid_frames_ii = get_video_frames_from_callback_audio(cam_frames, allowable_range=None)

# Trim the per-frame change traces so they are no longer than the list of
# detected frames.
# NOTE(review): nothing is trimmed when the video has *fewer* frames than were
# detected in the audio — confirm the lengths match before plotting.
max_change = max_change[: len(vid_frames_ii)]
avg_change = avg_change[: len(vid_frames_ii)]

vid_frames_ii

In [None]:
# Sanity check: overlay the detected frame positions on the raw camera channel.
fig, ax = plt.subplots()
ax.plot(cam_frames, label="camera channel")
ax.scatter(
    vid_frames_ii,
    cam_frames[vid_frames_ii],
    zorder=3,  # draw markers above the trace
    marker="+",
    c="r",
    label="detected frames",
)

# Label the figure so it can be read on its own when skimming the notebook.
ax.set(
    title="Video frame detection",
    xlabel="Audio sample",
    ylabel="Amplitude",
)
ax.legend(loc="upper right")

plt.show()

In [None]:
# Detect loom onsets on the loom trigger channel using the helper's default
# settings. The result is used later as sample indices into `loom_trigs`.
loom_onsets = get_triggers_from_audio(loom_trigs)
loom_onsets

In [None]:
fig, axs = plt.subplots(nrows=4, sharex=True, dpi=200)
xs = [None, None, vid_frames_ii, vid_frames_ii]
data = [bird_audio.audio_frs, loom_trigs, avg_change, max_change]
titles = ["audio", "loom_trigs", "avg_change", "max_change"]

# xs = [None, None, vid_frames_ii, None]
# data = [bird_audio, loom_trigs, avg_change, cam_frames]
# titles = ["audio", "loom_trigs", "avg_change", "frames"]

in_s = True

markers = dict(
    zorder=3,
    marker="+",
)

tf = lambda x: np.median(x) * 1.2  # sample threshold function

for ax, x, y, t in zip(axs.ravel(), xs, data, titles):
    if x is None:
        x = np.arange(len(y))

    if t in ["loom_trigs"]:
        if in_s:
            loom_x = loom_onsets / fs
        else:
            loom_x = loom_onsets

        ax.scatter(loom_x, y[loom_onsets], c="r", **markers)

    if t in ["avg_change", "max_change"]:
        crossings = get_triggers_from_audio(
            y,
            threshold_function=tf,
            crossing_direction="up",
        )

        x_cr = x[crossings]
        y_cr = y[crossings]

        if in_s:
            x_cr = x_cr / fs

        ax.scatter(x_cr, y_cr, c="green", **markers)

    # plot actual timeseries
    if in_s:
        x = x / fs  # convert to seconds

    ax.plot(x, y)

    # testing thresholds
    thr = tf(y)
    ax.plot([x.min(), x.max()], [thr, thr], c="k", linestyle="dotted")

    ax.set(
        title=t,
    )

axs[-1].set(
    xlabel="Time (s)",
    # xlim=(1, 2),
    # xlim=(31.4, 32),
)
fig.tight_layout()
plt.show()

In [None]:
fig, axs = plt.subplots(nrows=3, sharex=True, dpi=200)
xs = [
    None,
    None,
    vid_frames_ii,
]
data = [
    bird_audio.audio_frs,
    loom_trigs,
    avg_change,
]
titles = [
    "audio",
    "loom_trigs",
    "avg_change",
]

# xs = [None, None, vid_frames_ii, None]
# data = [bird_audio, loom_trigs, avg_change, cam_frames]
# titles = ["audio", "loom_trigs", "avg_change", "frames"]

in_s = True

markers = dict(
    zorder=3,
    marker="+",
)

tf = lambda x: np.median(x) * 1.2  # sample threshold function

for ax, x, y, t in zip(axs.ravel(), xs, data, titles):
    if x is None:
        x = np.arange(len(y))

    if t in ["loom_trigs"]:
        if in_s:
            loom_x = loom_onsets / fs
        else:
            loom_x = loom_onsets

        ax.scatter(loom_x, y[loom_onsets], c="r", **markers)

    if t in ["avg_change", "max_change"]:
        crossings = get_triggers_from_audio(
            y,
            threshold_function=tf,
            crossing_direction="up",
        )

        x_cr = x[crossings]
        y_cr = y[crossings]

        if in_s:
            x_cr = x_cr / fs

        ax.scatter(x_cr, y_cr, c="green", **markers)

    # plot actual timeseries
    if in_s:
        x = x / fs  # convert to seconds

    ax.plot(x, y)

    # testing thresholds
    thr = tf(y)
    ax.plot([x.min(), x.max()], [thr, thr], c="k", linestyle="dotted")

    ax.set(
        title=t,
    )

axs[-1].set(
    xlabel="Time (s)",
    # xlim=(1, 2),
    # xlim=(31.4, 32),
)

fig.tight_layout()

In [12]:
# Upward threshold crossings of the mean frame change, returned as indices into
# `avg_change` (one entry per video frame). `tf` is the threshold function
# defined in the plotting cell above.
thresholded_movement_ii_vid = get_triggers_from_audio(
    avg_change,
    threshold_function=tf,
    crossing_direction="up",
)

# convert indices to audio timing (frames)
# (looks up the audio-sample position of each video frame that crossed)
movement_ii = vid_frames_ii[thresholded_movement_ii_vid]

In [13]:
# Length of the response window searched for movement, converted from
# milliseconds to audio samples using the recording's sampling rate.
window_length = round(300 * (fs / 1000))  # 300 ms --> fr


def get_loom_movements(window_start, window_length, movement_ii, fs, frame_times=None):
    """Summarise the movement events that fall in one window.

    The window is the half-open interval ``(window_start, window_start +
    window_length]`` in audio samples.

    Parameters
    ----------
    window_start : int
        Sample index at which the window opens (e.g. a loom onset).
    window_length : int
        Window length in samples.
    movement_ii : array-like of int
        Sample indices of detected movement events.
    fs : float
        Sampling rate in samples per second; used to convert samples to ms.
    frame_times : array-like of int, optional
        Sample indices of the recorded video frames, used to check that video
        data exists up to the end of the window. When omitted, the
        notebook-level `vid_frames_ii` is used (the original behaviour).

    Returns
    -------
    latency_ms : float
        Time in ms from `window_start` to the first movement in the window,
        or NaN if there is none.
    movement_count : int or float
        Number of movements in the window, or NaN when no video frame lies at
        or beyond the window end (the window is not fully covered by data).
    movement_times : np.ndarray
        Times in ms, relative to `window_start`, of every movement in the
        window.
    """
    if frame_times is None:
        # Backward-compatible fallback to the notebook global.
        frame_times = vid_frames_ii

    # Accept plain sequences as well as arrays.
    movement_ii = np.asarray(movement_ii)
    frame_times = np.asarray(frame_times)

    window_end = window_start + window_length

    in_window = (movement_ii > window_start) & (movement_ii <= window_end)
    movement_times = (movement_ii[in_window] - window_start) * 1000 / fs

    # ensure data exists for the whole window
    # (np.nan rather than np.NaN: the capitalised alias was removed in NumPy 2.0)
    if np.any(frame_times >= window_end):
        movement_count = len(movement_times)
    else:
        movement_count = np.nan

    latency_ms = movement_times.min() if movement_times.size else np.nan

    return latency_ms, movement_count, movement_times

In [14]:
def get_loom_latency_df(loom_onsets, window_length, movement_ii, fs):
    """Build a per-loom table of movement latency, count and times.

    `get_loom_movements` is evaluated once per loom onset, with the onset as
    the window start.

    Parameters
    ----------
    loom_onsets : array-like
        Loom onset positions; stored in the "loom_onset" column.
    window_length, movement_ii, fs
        Passed unchanged to `get_loom_movements`.

    Returns
    -------
    pd.DataFrame
        Columns "loom_onset", "Latency", "Count" and "Times", one row per
        loom onset.
    """
    onsets = pd.DataFrame({"loom_onset": loom_onsets})

    # Each element is the (latency, count, times) tuple for one loom.
    per_loom = onsets["loom_onset"].apply(
        get_loom_movements,
        args=(window_length, movement_ii, fs),
    )

    # Unpack the tuples into one row per loom.
    rows = [list(result) for result in per_loom.values]
    summary = pd.DataFrame(rows, columns=["Latency", "Count", "Times"])

    return pd.concat([onsets, summary], axis=1)

In [None]:
# Latency table for this recording: one row per loom onset, with the movement
# latency, count and times found in the window after that onset.
df = get_loom_latency_df(loom_onsets, window_length, movement_ii, fs)
df

In [14]:
# TODO: find latency between loom_onset and next avg_change crossing; make plots.

# thr = np.median(avg_change) * 1.2

# crossings = get_triggers_from_audio(
#     avg_change,
#     threshold_function=lambda x: np.median(x) * 1.2,
#     crossing_direction="up",
# )

# fig, axs = plt.subplots(nrows=0, sharex=True)

# axs[0].plot()

In [None]:
import os
import pickle

import matplotlib.pyplot as plt

from movement import plot_latency_by_trial

# Directory holding the per-bird pickles; the figures are written here too.
folder = "/Volumes/PlasticBag/anxiety_calls/loom_only-diff_videos"

birdnames = ["or14pu27", "or54rd45"]
colors = ["red", "blue"]


for birdname, c in zip(birdnames, colors):

    # Build the input path from `folder` instead of repeating the directory.
    # NOTE: pickle.load can execute arbitrary code — only load trusted files.
    with open(os.path.join(folder, f"all_data-{birdname}.pickle"), "rb") as f:
        data = pickle.load(f)

    fig, ax = plt.subplots()

    ax = plot_latency_by_trial(
        data,
        color_data=c,
        ax=ax,
    )

    ax.set(ylim=[-5, 275], title=birdname)

    # Save the figure that owns these axes rather than relying on pyplot's
    # implicit "current figure".
    ax.figure.savefig(os.path.join(folder, f"mvmt_latency-{birdname}.png"), dpi=400)