In [35]:
import numpy as np
import cv2

In [36]:
def movement_tracker(fst, snd):
    frame_diff = cv2.absdiff(fst, snd)
    frame_diff = cv2.GaussianBlur(cv2.cvtColor(frame_diff, cv2.COLOR_BGR2GRAY), (5,5), 0)
    frame_diff  = cv2.dilate(frame_diff, None, iterations=3)
    frame_diff = cv2.threshold(frame_diff, thresh=20, maxval=255, type=cv2.THRESH_BINARY)[1]
    contours, _ = cv2.findContours(frame_diff, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    out_frame   = snd.copy()
    movement_marks = np.empty_like(out_frame, np.uint8)
    for con in contours:
        (x,y,w,h) = cv2.boundingRect(con)
        if cv2.contourArea(con) > 120:
            cv2.rectangle(movement_marks, (x,y), (x+w, y+h), (0,0,255), cv2.FILLED)
    mask = movement_marks.astype(bool)
    alpha = 0.3
    out_frame[mask] = cv2.addWeighted(out_frame, alpha, movement_marks, 1-alpha, 0)[mask]
    return out_frame

In [37]:
def movement_tracker_TWO(fst, snd):
    frame_diff = cv2.absdiff(fst, snd)
    frame_diff = cv2.medianBlur(cv2.cvtColor(frame_diff, cv2.COLOR_BGR2GRAY), 5) #Блюр чтобы сгладить шум
    frame_diff = cv2.threshold(frame_diff, thresh=20, maxval=255, type=cv2.THRESH_BINARY)[1] #Если значение пиксели выше thresh то он белый, остальные в черный
    frame_diff = cv2.dilate(frame_diff, np.ones((3,3)), iterations=3) #Расширить белые пиксели

    frame_diff_color = cv2.cvtColor(cv2.medianBlur(frame_diff, 5), cv2.COLOR_GRAY2BGR)
    frame_diff_color[frame_diff.astype(bool)] = [0,0,255]
    frame_diff_color[np.logical_not(frame_diff.astype(bool))] = [0,255,0]
    alpha = 0.7
    out_frame = cv2.addWeighted(fst, alpha, frame_diff_color, 1-alpha, 0)
    return out_frame

In [38]:
class MovementTimer:
    timer = 0

    def __init__(self, cyclePeriod = 15):
        self.cyclePeriod = cyclePeriod
    
    def timeIT(self):
        self.timer += 1
        if self.timer == self.cyclePeriod:
            self.timer = 0
            return True
        return False

In [39]:
class FrameRenderer():
    
    def __init__(self, prefixName = "",  initialframe = None , cyclePeriod = 15):
        self.timer = MovementTimer(cyclePeriod)
        self.prevFrame = initialframe
        self.prefix = prefixName.strip() + " "
        self.state = False
    
    def drawState(self, frame : cv2.Mat):
        if self.state:
            return cv2.putText(frame, "Tracking", (0, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
        else:
            return cv2.putText(frame, "Stay", (0, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

    def cycle(self, nextFrame  : cv2.Mat):
        if self.timer.timeIT():
            self.state = not self.state
        if self.state:
            frameMTwo = movement_tracker_TWO(nextFrame, self.prevFrame)
            frameMOne = movement_tracker(nextFrame, self.prevFrame)
        else:
            frameMTwo = nextFrame
            frameMOne = nextFrame
        self.prevFrame = nextFrame.copy()
        cv2.imshow(self.prefix + 'ORIGINAL', nextFrame)
        cv2.imshow(self.prefix + 'MOVEMENTTWO', self.drawState(frameMTwo))
        cv2.imshow(self.prefix + 'MOVEMENT', self.drawState(frameMOne))

In [40]:
def from_file(filename):
    cap = cv2.VideoCapture(filename)
    ret, prev = cap.read()
    renderer = FrameRenderer(prefixName = "File", initialframe=prev, cyclePeriod=60)
    if not ret:
        raise ValueError("Video stream not opened")

    while True:
        key = cv2.waitKey(20) & 0xff
        ret, frame = cap.read()
        if key == 27 or not ret: break
        renderer.cycle(frame)
    
    cv2.destroyAllWindows()
    cap.release()

In [41]:
try:
    import yt_dlp
    from vidgear.gears import CamGear
    def from_site(url):
        options = {"STREAM_RESOLUTION": "480p", }
        stream = CamGear(source=url, stream_mode=True,logging=False, **options).start()
        renderer = FrameRenderer(prefixName = "Stream", initialframe=stream.read(), cyclePeriod=60)
        while True:
            key = cv2.waitKey(20) & 0xFF
            frame = stream.read()
            if frame is None or key == 27: break #ESC
            renderer.cycle(frame)
        cv2.destroyAllWindows()
        stream.stop()

except ModuleNotFoundError:
    print("Not founded lib to work with stream")
    def from_site(url):
        return None


Not founded lib to work with stream


In [42]:
if __name__ == '__main__':
    from_site("https://www.youtube.com/watch?v=akjT10sjPTc")
    from_file("NovCAm.mp4")