In [None]:
import cv2
import matplotlib.pyplot as plt
from pathlib import Path

from tracker.main import (
    getROIFromVideo,
    cropWithROI,
    getTemplatesFromVideo,
    getTemplateMatches,
    getOutputVidFrameSize,
)

import time
import numpy as np
import imutils
from natsort import natsorted
import multiprocessing
import pandas as pd
from scipy.optimize import curve_fit, least_squares, root
from scipy.signal import find_peaks
from sklearn.metrics import r2_score

%reload_ext autoreload
%autoreload 0

# Video Processing

## Crop Video

In [None]:
vid_path = Path("data/pendulum.mp4")
crop_roi = getROIFromVideo(str(vid_path))
cv2.destroyAllWindows()

## Get Template(s)

In [None]:
pipeline = lambda frame: cropWithROI(frame, crop_roi)

templates = getTemplatesFromVideo(
    str(vid_path), pipeline, templateWidth=70, templateHeight=70
)

## Save Template(s)

In [None]:
# templatesDir = Path("templates_pendulum")
templatesDir = Path("templates_pendulum2")
templatesDir.mkdir(exist_ok=True)

imgPaths = natsorted([str(path) for path in templatesDir.glob("*.jpg")])

if imgPaths:
    latestImgN = int(Path(imgPaths[-1]).stem) + 1
else:
    latestImgN = 0

for i in range(len(templates)):
    cv2.imwrite(str(templatesDir / f"{latestImgN+i}.jpg"), templates[i])

templates.clear()

## Load Template(s)

In [None]:
templatesDir = Path("templates_pendulum/")
# templatesDir = Path("templates_pendulum2")

imgPaths = templatesDir.glob("*.jpg")

templates = []
for imgPath in imgPaths:
    template = cv2.imread(str(imgPath))
    templates.append(template)
    cv2.imshow("Template", template)
    cv2.waitKey(0)

cv2.waitKey(0)
cv2.destroyAllWindows()

## Match Templates

In [None]:
VID_PATH = Path("data/pendulum.mp4")
# OUTPUT_VID_PATH = VID_PATH.parent / (VID_PATH.stem + "_output.mp4")
OUTPUT_VID_PATH = VID_PATH.parent / (VID_PATH.stem + "_output2.mp4")

OUTPUT_HEIGHT = 800

# NMS_THRESHOLD = 0.7
NMS_THRESHOLD = 0.3
CONFIDENCE_THRESHOLD = 0.775

IMSHOW_FLAG = True
WRITE_FLAG = True

num_cpu = multiprocessing.cpu_count() - 1
pool = multiprocessing.pool.ThreadPool(processes=num_cpu)

cap = cv2.VideoCapture(str(VID_PATH))

totalFrames = cap.get(cv2.CAP_PROP_FRAME_COUNT)
frameCount = 0
startTime = time.time()

pipeline = lambda frame: cropWithROI(frame, crop_roi)

frameWidth, frameHeight = getOutputVidFrameSize(str(VID_PATH), pipeline, OUTPUT_HEIGHT)
out = cv2.VideoWriter(
    str(OUTPUT_VID_PATH),
    cv2.VideoWriter_fourcc(*"mp4v"),
    cap.get(cv2.CAP_PROP_FPS),
    (frameWidth, frameHeight),
)
print(f"Output frame width: {frameWidth}, frame height: {frameHeight}")

saveFrames = []
saveBoxes = []

ret, frame = cap.read()
while ret:
    if frameCount % 100 == 0 and frameCount != 0:
        elapsedTime = time.time() - startTime
        estTimeLeft = (totalFrames - frameCount) / frameCount * elapsedTime
        print(f"Frame {frameCount} out of {round(totalFrames)}.")
        print(
            f"\tTime taken: {round(elapsedTime)}s. Est. time left: {round(estTimeLeft)}s"
        )

    img_rgb = pipeline(frame)

    # Multithreading
    mapIterable = []
    for i in range(len(templates)):
        template = templates[i]
        mapIterable.append((img_rgb, template, CONFIDENCE_THRESHOLD))
    results = pool.starmap(func=getTemplateMatches, iterable=mapIterable)

    boxes, confidences = [], []
    for result in results:
        boxes.extend(result[0])
        confidences.extend(result[1])

    # Sequential
    # boxes, confidences = [], []
    # for template in templates:
    #     val = getTemplateMatches(img_rgb, template, CONFIDENCE_THRESHOLD)
    #     boxes.extend(val[0])
    #     confidences.extend(val[1])

    indices = cv2.dnn.NMSBoxes(boxes, confidences, CONFIDENCE_THRESHOLD, NMS_THRESHOLD)
    boxes = [boxes[idx] for idx in indices]
    confidences = [confidences[idx] for idx in indices]

    if boxes:
        saveFrames.append(frameCount)
        saveBoxes.append(boxes[0].copy())

    for box in boxes:
        cv2.rectangle(img_rgb, box[:2], box[2:], (0, 0, 255), 2)

    img_rgb = imutils.resize(img_rgb, height=OUTPUT_HEIGHT)

    if WRITE_FLAG:
        out.write(img_rgb)

    if IMSHOW_FLAG:
        cv2.imshow("Detections", img_rgb)
        key = cv2.waitKey(1)
        if key == ord("q") or key == ord("Q"):
            break

    ret, frame = cap.read()
    frameCount += 1

cv2.destroyAllWindows()
out.release()
cap.release()

## Export Data

In [None]:
csv_path = VID_PATH.parent / (VID_PATH.stem + "_output2.csv")

pix_per_metre = 1
coords = (
    np.array([[(b[0] + b[2]) / 2, (b[1] + b[3]) / 2] for b in saveBoxes])
    / pix_per_metre
)

df = pd.DataFrame([saveFrames, coords[:, 0], coords[:, 1]]).T
df.to_csv(csv_path, index=None, header=["frame", "x", "y"])

# Curve Fitting

## Load Data

In [None]:
vid_path = Path("data/pendulum.mp4")
csv_path = vid_path.parent / (vid_path.stem + "_output.csv")

df = pd.read_csv(csv_path)
frame = df["frame"]
x = df["x"]
y = df["y"]

# Swap the x and y values because the video is rotated.
x, y = y, x

x = max(x) - x
y = max(y) - y

In [None]:
plt.scatter(frame, x, s=1)
plt.show()
plt.scatter(frame, y, s=1)
plt.show()

# Try fitting 
$A \sin(2\pi f t + \alpha)$

In [None]:
FPS = 30
t = frame / FPS


def func(t, A, f, alpha, offset):
    return A * np.sin(2 * np.pi * f * t + alpha) + offset


popt, _ = curve_fit(func, t, x)
plt.scatter(t, x, s=1)
plt.plot(t, func(t, *popt), color="r", label="Best Fit")

In [None]:
r2_score(x, func(t, *popt))

# Oh no bad fit!

The data under-determines our model. But we can find the frequency of the sine wave using a different (more-reliable and more scalable) method.  

Using find_peaks, we locate the points on the graph that correspond to the peaks and troughs of the wave. By comparing the time difference between these points, we can find the period of oscillation, which then translates to frequency.

In [None]:
peaks, _ = find_peaks(x)
troughs, _ = find_peaks(-x)
crit_pts = np.append(peaks, troughs)
t_crit, x_crit = t[crit_pts], x[crit_pts]

plt.scatter(t_crit, x_crit, color="k", label="peaks")
plt.scatter(t, x, s=1)

period = np.mean(np.diff(sorted(t_crit))) * 2
freq = 1 / period

In [None]:
def func(t, A, alpha, offset):
    return A * np.sin(2 * np.pi * freq * t + alpha) + offset


t_lin = np.linspace(min(t), max(t), 1000)

popt, pcov = curve_fit(func, t, x)
plt.scatter(t, x, s=1)
plt.plot(t_lin, func(t_lin, *popt), color="r", linestyle="--", label="Best Fit")
plt.show()

In [None]:
r2_score(x, func(t, *popt))

As we can see, this approach allows us to reliably determine the frequency of the oscillation, allowing for more accurate curve fitting.

## An even better model!

#### An underdamped pendulum decays in amplitude exponentially​  

\begin{align} \large \theta_{max}(t) = \theta_{0} e^{-\frac{\eta t}{2m}}  \end{align}

##### $ \eta $ : air drag constant ($ kg \cdot s^{-1} $)

In [None]:
def func(t, A, alpha, offset, m):
    return A * (np.exp(m * t)) * np.sin(2 * np.pi * freq * t + alpha) + offset


t_lin = np.linspace(min(t), max(t), 1000)

popt, pcov = curve_fit(func, t, x)
plt.scatter(t, x, s=1)
plt.plot(t_lin, func(t_lin, *popt), color="r", linestyle="--", label="Best Fit")
plt.show()

In [None]:
r2_score(x, func(t, *popt))

## Account for rotation

The video taken is slightly tilted: the x-axis of the frame doesn't align perfectly with the horizon. This can be seen from the graph below, where the pendulum is higher on the right end of the oscillation than the left end of the oscillation (by right, they should be symmetric). We can perform a matrix multiplication to rotate the video to correct for this tilt.

In [None]:
y_min = np.min(y)
x_mean = np.mean(x)
x_disp = x - x_mean
y_disp = y - y_min

plt.scatter(x_disp, y_disp)
plt.gca().set_aspect("equal")
plt.xlabel("x position")
plt.ylabel("y position")

In [None]:
xy = np.array([x_disp, y_disp]).T
rotation_matrix = lambda a: np.array([[np.cos(a), -np.sin(a)], [np.sin(a), np.cos(a)]])


def rotate(a):
    return xy @ rotation_matrix(a).T


def error_fn(a):
    rotated_xy = rotate(a)
    rotated_y = rotated_xy[0, :, 1]
    return rotated_y


angle_estimate = 0
res = least_squares(error_fn, angle_estimate)
angle = res.x
np.rad2deg(angle)

So, we need to rotate the whole trajectory of the pendulum by about -3.63 degrees to level video with the horizon.

In [None]:
rotated_xy = rotate(angle)
x_rotated, y_rotated = rotated_xy[0].T
plt.scatter(x_rotated, y_rotated)
plt.gca().set_aspect("equal")

In [None]:
def func(t, A, alpha, offset, m):
    return A * (np.exp(m * t)) * np.sin(2 * np.pi * freq * t + alpha) + offset


t_lin = np.linspace(min(t), max(t), 1000)

popt, pcov = curve_fit(func, t, x_rotated)
plt.scatter(t, x_rotated, s=1)
plt.plot(t_lin, func(t_lin, *popt), color="r", linestyle="--", label="Best Fit")

In [None]:
r2_score(x_rotated, func(t, *popt))

In [None]:
peaks, _ = find_peaks(x_rotated)
troughs, _ = find_peaks(-x_rotated)
crit_pts = np.append(peaks, troughs)

x_mean = np.mean(x_rotated[crit_pts])
x_disp = x_rotated - x_mean

plt.scatter(t, np.abs(x_disp), s=1)
plt.show()
plt.scatter(t, np.abs(x_disp), s=1)

x_cutoff = np.mean(x_rotated[troughs])
plt.axhline(np.abs(x_cutoff - x_mean), linestyle="--", color="r")
plt.show()

## Final Result

In [None]:
vid_path = Path("data/pendulum.mp4")
csv_path = vid_path.parent / (vid_path.stem + "_output2.csv")

df = pd.read_csv(csv_path)
frame = df["frame"]
x = df["y"]
y = df["x"]

FPS = 30
t = frame / FPS

# Find peaks to find frequency
peaks, _ = find_peaks(x)
troughs, _ = find_peaks(-x)
crit_pts = np.append(peaks, troughs)
t_crit, x_crit = t[crit_pts], x[crit_pts]
period = np.mean(np.diff(sorted(t_crit))) * 2
freq = 1 / period


def func(t, A, alpha, offset, m):
    return A * (np.exp(m * t)) * np.sin(2 * np.pi * freq * t + alpha) + offset


t_lin = np.linspace(min(t), max(t), 1000)

popt, pcov = curve_fit(func, t, x)
plt.scatter(t, x, s=1)
plt.plot(t_lin, func(t_lin, *popt), color="r", linestyle="--", label="Best Fit")

In [None]:
r2_score(x, func(t, *popt))

# Pendulum Parameters

Using some geometric reasoning, we can find the maximum angular displacement $\theta_{0}$ of the pendulum and length of string $R$ by solving these equations below, where:


H = vertical displacement of the pendulum (height)

W = horizontal displacement of the pendulum (width)

$$
H \sin(θ_{0}) + W \cos(θ_{0}) - W = 0
$$

$$
R \left[1 - \cos \left(\sin^{-1} \left(\frac{W}{R}\right) \right)\right] - H = 0
$$
$$
R [1 - \sqrt{1 - \frac{W^2}{R^2}}] - H = 0
$$

$$
R = \frac{W}{\sin(θ_{0})}

## String Length

The following two cells give us the maximum horizontal and vertical displacements for each oscillation. We need to do this for every oscillation because the shaking of the video and the amplitude decay due to air drag make the estimates like `max(y) - min(y)` inaccurate.

In [None]:
def moving_average(a, n=3):
    ret = np.cumsum(a, dtype=float)
    ret[n:] = ret[n:] - ret[:-n]
    return ret[n - 1 :] / n


FPS = 30
t = np.array(frame / FPS)

y = np.array(y)

# Moving average to remove unwanted secondary peaks
y_peaks, _ = find_peaks(moving_average(y, 10))
y_troughs, _ = find_peaks(-moving_average(y, 10))

y_peaks += 5  # Due to moving average
y_troughs += 5
crit_pts = np.append(y_peaks, y_troughs)
t_crit, y_crit = t[crit_pts], y[crit_pts]

plt.scatter(t_crit, y_crit, color="k", label="peaks")
plt.scatter(t, y, s=1)
plt.show()

x = np.array(x)
peaks, _ = find_peaks(x)
troughs, _ = find_peaks(-x)
crit_pts = np.append(peaks, troughs)
x_amplitude = x[peaks] - x[troughs]
t_crit, x_crit = t[crit_pts], x[crit_pts]

plt.scatter(t_crit, x_crit, color="k", label="peaks")
plt.scatter(t, x, s=1)
plt.show()

In [None]:
heights = y[y_peaks] - y[y_troughs]
widths = (x[peaks] - x[troughs]) / 2


def len_pendulum_str(R, W, H):
    return R * (1 - np.sqrt(1 - (W**2 / R**2))) - H


Rs = []
for W, H in zip(widths, heights):
    Rs.append(root(lambda R: len_pendulum_str(R, W, H), 500).x[0])

list(zip(widths, heights, Rs))

We get a different estimate of the string length each oscillation. This is likely due to noise from the shaking of the camera. 

In [None]:
(max(Rs) - min(Rs)) / np.mean(Rs)

The range is 21.7% of the mean

In [None]:
np.std(Rs) / np.mean(Rs)

The standard deviation is 7% of the mean. This suggests that taking any one string length from one oscillation is likely to introduce a lot of error but taking the mean reduces the error significantly.

## Maximum Angular Displacement

Get an upper bound of the pendulum's angular displacement to check the small angle approximation used in the equation.

In [None]:
W = min(widths)
H = max(heights)
print(f"W: {W}, H: {H}")


def max_ang_disp(theta_0):
    return H * np.sin(theta_0) + W * np.cos(theta_0) - W


theta_lin = np.linspace(0, 1, 1000)
plt.plot(theta_lin, max_ang_disp(theta_lin))
plt.axhline(0, color="k", linestyle="--")
plt.xlim(0, 1)
plt.show()

theta_0_sol = root(max_ang_disp, 0.4)
print(theta_0_sol)

theta_0 = theta_0_sol.x[0]
print(f"\ntheta_0 in degrees: {theta_0 / np.pi * 180}")

The small angle approximation is not well justified in this case and we expect the curve to deviate significantly over a longer period
of time. To address this, we can either increase the string length, reduce the initial displacement or solve the general differential
equation for the pendulum. 