In [2]:
import datetime
import queue
import time
import threading
import cv2

MAX_RETRIES = 5  # Define the constant here
FPS = 10  # Define the FPS constant here as well

# Define the ClearingQueue class
class ClearingQueue(queue.Queue):
    def put(self, item):
        while True:
            try:
                super().put(item, block=False)
            except queue.Full:
                _ = self.get_nowait()
            else:
                break

# Define the Pynopticon class
class Pynopticon:
    def __init__(self, record_frames=100, width=640, height=480, cam=0, fps=10, new_frame_callback=None, youtube=None, sg=None):
        self.record_frames = record_frames
        self.queue = ClearingQueue(maxsize=record_frames)
        self.stopped = False
        self.t = None
        self.cap = None

        self.width = width
        self.height = height
        self.cam = cam
        self.fps = fps  # Store FPS as an instance variable

        self.new_frame_callback = new_frame_callback
        self.youtube = youtube
        self.sg = sg

    def start(self):
        """ Start the video capture. """

        if self.t is not None:
            raise RuntimeError("Already started, call stop first")

        def _record():
            print("opening camera", self.cam)
            self.cap = cv2.VideoCapture(f"/dev/video{self.cam}", cv2.CAP_V4L2)
            self.cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*"MJPG"))
            print("opened camera")
            retries = 0
            while not self.stopped:
                ret, frame = self.cap.read()
                time.sleep(1 / self.fps)  # Use the instance variable self.fps
                if not ret or not self.cap.isOpened():
                    if retries < MAX_RETRIES:  # Use the MAX_RETRIES constant
                        self.cap.release()
                        self.cap = cv2.VideoCapture(self.cam)
                        self.cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*"MJPG"))
                        retries += 1
                        continue
                    else:
                        break

                frame = cv2.resize(frame, (self.width, self.height))
                self.queue.put(frame)
                if self.new_frame_callback is not None:
                    self.new_frame_callback(frame)
            print("! cam exit: ", self.cam)
            self.cap.release()

        self.stopped = False
        self.t = threading.Thread(target=_record, daemon=True)
        self.t.start()

    def stop(self):
        """ Stop without saving. """
        print("stopping")
        self.stopped = True
        if self.t is not None:
            self.t.join()
            self.t = None

    def reset(self):
        """ Reset queue. """
        self.queue = ClearingQueue(maxsize=self.record_frames)

    def save(self, outname: str = "output.avi", fps: int = 15, upload=False, title=None, description=None, mail_to: list = None, mail_from=None):
        """ Stop and save. """
        self.stop()

        out = cv2.VideoWriter(outname, cv2.VideoWriter_fourcc(*"DIVX"), fps, (self.width, self.height))

        while True:
            try:
                frame = self.queue.get_nowait()
                out.write(frame)
            except queue.Empty:
                break

        out.release()

        # Upload and email functionality is omitted here for simplicity
        return None

# Main function
def main():
    # Initialize Pynopticon with the correct camera index and resolution
    pynopticon = Pynopticon(record_frames=100, width=640, height=480, cam=2, fps=10)  # Adjust cam index and fps as needed

    # Start capturing video
    pynopticon.start()

    # Record for a few seconds (e.g., 10 seconds)
    time.sleep(10)

    # Stop capturing
    pynopticon.stop()

    # Save the captured video to a file named "output.avi"
    pynopticon.save(outname="output.avi", fps=10)

if __name__ == "__main__":
    main()


opening camera 2
opened camera
stopping
! cam exit:  2
stopping
