In [None]:
import cv2
import datetime
import matplotlib.animation as animation
import matplotlib.pyplot as plt
import numpy as np
import pybgs as bgs

from IPython.display import HTML

from breathecam import BreatheCam
from common import get_previous_frame_time
from components import View
from motion import temporal_events, mask_background, get_event


def display_event(video, flattened=False):
    fig, ax = plt.subplots(1, 1)
    fig.tight_layout()

    ax.set_axis_off()

    ims = []

    if len(video.shape) < 4 and flattened:
        ims.append([ax.imshow(video)])
    else:
        for i in range(len(video)):
            ims.append([ax.imshow(video[i], animated=True)])

    anim = animation.ArtistAnimation(fig, ims, interval=175, blit=True, repeat_delay=1000)

    plt.close()

    display(HTML(anim.to_jshtml()))

In [None]:
#expertiment settings and avoid midnight
day = datetime.date.fromisoformat("2024-05-19")
time = datetime.time.fromisoformat("09:49:00")
previous_frame_time = get_previous_frame_time(time, 3)
nframes = 80
nlevels = 4
view = View(2307, 1914, 6814, 2515)

print(f"Size: {view.width}x{view.height} ({view.width * view.height * nframes} pixels)")

In [None]:
camera = BreatheCam.init_from("Clairton Coke Works", day)
breathecam_video = camera.download_video(previous_frame_time, nframes+1, view, nlevels)
fullres_video = camera.download_video(time, nframes, view, 1)

In [None]:
#modified mask_background from motion.py so that can take bgs background subtractor instances
def mask_background(video: np.ndarray, background_subtractor) -> np.ndarray:
    """Generates, by background subtraction, a video of foreground masks

    Parameters
    ----------
    * video - a collection of frames
    * background_subtractor - a mask generating function to apply to each frame

    Returns
    -------

    A collection of frames representing foreground objects in each frame
    """

    if isinstance(background_subtractor, cv2.BackgroundSubtractor):
        print("using cv background subtractor...")
        f = background_subtractor.apply
    elif isinstance(background_subtractor, bgs.SuBSENSE):
        print("using bgs SuBENSE background subtractor")
        f = background_subtractor.apply
    elif isinstance(background_subtractor, bgs.PixelBasedAdaptiveSegmenter):
        print(" PixelBasedAdaptiveSegmenter")
        f = background_subtractor.apply
    elif isinstance(background_subtractor, bgs.LOBSTER):
        print("LOBSTER")
        f = background_subtractor.apply
    else:
        #Also tried ViBe algorithm but couldn't find name
        print("couldn't find name of bgs background subtractor class")
        f = background_subtractor.apply

    if len(video.shape) == 3:
        return np.array([f(video)])
    else:
        return np.array([f(v) for v in video])

#instantiate background_subtractor instance here
#name of class sometimes hard to find
#find in this GitHub: https://github.com/andrewssobral/bgslibrary/tree/master/bgslibrary/algorithms
background_subtractor = bgs.PixelBasedAdaptiveSegmenter()
masks = mask_background(breathecam_video[1:], background_subtractor)




In [None]:

eventspace = temporal_events(masks, neighbors=8, depth=3, threshold=127)
event_filter = lambda e : e.number_of_frames > 3 and e.region.height > 10 and e.region.width > 10
eventspace = list(filter(event_filter, eventspace))

In [None]:
#more of a type than a class. Acutall functionality implemented in sub-classes

#init method. set nessisary hyperparameters alpha, beta etc... 

#update method. mutates the passed estimates array. 

#TODO define estimators as subclasses?
class Estimator():
    def __init__(self):
        pass
    def update(self, estimates, test_results):
        pass

class Exponential_Smoothing_Estimator():
    def __init__(self, alpha):
        self.alpha = alpha
    
    def update(self, estimates, test_results):
        assert(estimates.shape == test_results.shape)
        rows, cols = estimates.shape

        for r in range(rows):
            for c in range(cols):
                alpha_estimate = self.alpha * test_results[r, c]
                prev_estimate = (1 - self.alpha) * estimates[r, c]
                estimates[r, c] = alpha_estimate + prev_estimate

class Moving_Average_Estimator():
    #Skip this one because doesn't fit into the Estimator class interface well
    #Also is prob not as good as others
    def __init__(self):
        return
    
    def update(self, estimates, test_results):
        assert(estimates.shape == test_results.shape)
        rows, cols = estimates.shapes
        for r in range(rows):
            for c in range(cols):
                #If continue implementing. Maybe implement as running average
                pass
class Double_Exponential_Smoothing_Estimator():
    def __init__(self, alpha, beta):
        self.alpha = alpha
        self.beta = beta
    
    #TODO don't understand how exactly how to implement this one ask Harry 

In [None]:
def naive_steam_approx(masks, video, estimator, steam_probability, intensity_tresh = 70):
    #Non-mutating. Returns a copy of video with steam pixels subtracted
    
    estimates = np.full(video.shape[1:3], 0.5)
    for frameIdx in range(video.shape[0]):
        # ########## THis block is buggy ########
        gray_frame = cv2.cvtColor(video[frameIdx], cv2.COLOR_BGR2GRAY)
        curr_mask = masks[frameIdx]
        selected_pixels = np.where(curr_mask == 0, gray_frame, 0)
        test_results = selected_pixels > intensity_tresh
        test_results = np.where(test_results, 1, 0)
        # ######end of buggy block ##########
        estimator.update(estimates, test_results)
    
    print(set(estimates.flatten()))
    #the following np code may also be buggy
    #Remove any pixels with a high probability of being steam
    steam_mask = estimates > steam_probability
    #TODO look at steam_mask. Try masking video with steam_mask to see what happens
    non_steam_mask =  np.logical_not(steam_mask)
    non_steam_mask = non_steam_mask[np.newaxis, :, :, np.newaxis]
    non_steam_mask = np.broadcast_to(non_steam_mask, video.shape)
    non_steam_video = video.copy()
    non_steam_video[non_steam_mask] = 0
    return non_steam_video

In [None]:
#exclude 0th frame of video because that frame is an extra framed just added 
#to init background subtractor
estimator = Exponential_Smoothing_Estimator(0.1)
non_steam_video = naive_steam_approx(masks, breathecam_video[1:], estimator, 0.99, intensity_tresh=70)

In [None]:
display_event(fullres_video)

In [None]:
display_event(masks)

In [None]:
display_event(non_steam_video)

In [None]:
#event shape: (num_frames, _, _,channels)
events_cropped_vids = [] 
for i in range(10):
    event = get_event(eventspace[i], fullres_video[], nlevels=4)
    events_cropped_vids.append(event)

In [None]:
for event_vid in events_cropped_vids:
    display_event(event_vid)

In [None]:
#code for cv2 optical flow usage from:
#1) https://docs.opencv.org/4.x/d4/dee/tutorial_optical_flow.html
#2) https://www.geeksforgeeks.org/python-opencv-dense-optical-flow/

firstframe = event[0]
prev_gray = cv2.cvtColor(firstframe, cv2.COLOR_BGR2GRAY)

hsv_mask = np.zeros_like(firstframe)
hsv_mask[..., 1] = 255

flow_vid = []

#write as loop for testing. Then actually make loop run longer than one frame
#TODO change to not skip first frame??
for frame in event[1:]:
    curr_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    flow = cv2.calcOpticalFlowFarneback(prev_gray, curr_gray, None,
                                        0.5, 3, 15, 3, 5, 1.2, 0)

    magnitude, angle = cv2.cartToPolar(flow[..., 0], flow[..., 1])

    hsv_mask[..., 0] = angle * 180 / np.pi / 2

    hsv_mask[..., 2] = cv2.normalize(magnitude, None, 0, 255, cv2.NORM_MINMAX)
    rgb = cv2.cvtColor(hsv_mask, cv2.COLOR_HSV2BGR)
    flow_vid.append(rgb)

    prev_gray = curr_gray

flow_vid = np.array(flow_vid)
display_event(flow_vid)
#display_event(fullres_video)