In [2]:
import cv2
import requests

# For Android IP Webcam
# import numpy as np
# from requests.auth import HTTPBasicAuth

from dotenv import load_dotenv
import os
load_dotenv()

from ultralytics import YOLO

from threading import Timer
import time
import datetime

import base64

# --- Runtime configuration, read from the process environment --------------
# load_dotenv() has already been called above, so values may come from a .env
# file. Every os.getenv() result is a str, or None when the variable is unset.

# Camera credentials and stream URL.
# NOTE(review): USER (and HOSTNAME below) are commonly pre-set by the OS/shell.
# python-dotenv presumably does not override existing variables by default, so
# the .env values for these two names may be ignored -- TODO confirm.
user = os.getenv('USER')
password = os.getenv('PASSWORD')
url = os.getenv('URL')

# Credentials used by push_ntfy() when building its Authorization header.
ntfy_user=os.getenv('NTFY_USER')
ntfy_pass=os.getenv('NTFY_PASS')

# Base directory; the notebook reads/writes "<proj_path>/saved_img" and
# "<proj_path>/test_img" underneath it.
proj_path = os.getenv('PROJ_PATH')

# ntfy endpoint parts; notifications are sent to "https://<host>/<topic>".
host = os.getenv('HOSTNAME')
topic = os.getenv('TOPIC')

# Detection model, constructed from whatever the MODEL variable holds.
model = YOLO(os.getenv('MODEL'))

# Presumably the pause (in seconds) after a notification -- TODO confirm.
# This is still a str (or None) here and must be converted to a number before
# it can be used as a duration.
suspend = os.getenv('SUSPEND')

# Extra options for OpenCV's FFmpeg capture backend ("key;value" pairs).
# NOTE(review): rtsp_transport is usually "tcp" or "udp"; confirm "0" is intended.
os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = "rtsp_transport;0"

In [None]:
def get_img_from_ipcam_stream(
    url:str = None,
    user:str = None,
    password:str = None,
    timeout:float = 10.0
    ):
  """Fetch a single still image over HTTP (Android IP Webcam style) and decode it.

  Args:
    url: Address of the snapshot endpoint. Required.
    user: HTTP basic-auth user name, or None for an unauthenticated server.
    password: HTTP basic-auth password, or None for an unauthenticated server.
    timeout: Seconds to wait for the server before giving up.

  Returns:
    The decoded image as returned by cv2.imdecode (BGR array), or None when
    the response body is not a decodable image.

  Raises:
    ValueError: If url is missing or empty.
    requests.RequestException: On connection errors, timeouts, or a non-2xx
      HTTP status.
  """
  if not url:
    raise ValueError("url is Empty")

  # Imported here because the module-level imports for the Android path are
  # commented out; keeps this optional code path self-contained.
  import warnings
  import numpy as np

  if user is None or password is None:
    # Missing credentials are only worth a warning: the server may simply not
    # be password protected, so the request is still attempted without auth.
    warnings.warn("Username and password not set, check if your server is set to with password.")
    auth = None
  else:
    # requests treats a (user, password) tuple as HTTP basic auth.
    auth = (user, password)

  img_resp = requests.get(url, auth=auth, timeout=timeout)
  # Fail loudly on 401/404/5xx instead of trying to decode an error page.
  img_resp.raise_for_status()

  # View the raw bytes as uint8 without an intermediate bytearray copy.
  img_arr = np.frombuffer(img_resp.content, dtype=np.uint8)
  img = cv2.imdecode(img_arr, cv2.IMREAD_COLOR)

  return img

In [None]:
def extract_model_prediction(
    model,
    img,
    target_label:str = 'cat',
    low_conf:float = 0.3,
    high_conf:float = 0.8
    ) -> "dict | None":
  """Run the detector on one image and build a notification payload.

  Args:
    model: Callable detector; model(img)[0] must expose boxes.conf, boxes.cls
      and a names mapping from class index to label.
    img: Image passed straight to the model.
    target_label: Class label to look for.
    low_conf: Minimum confidence for a low-priority ("probably") notification.
    high_conf: Minimum confidence for a default-priority notification.

  Returns:
    A dict with the keys "pr", "title", "msg" and "tags" describing the most
    confident detection of target_label, or None when no detection of that
    label reaches low_conf.
  """
  result = model(img)[0]

  # Confidence of every box classified as the target label, as plain floats
  # so comparisons and message formatting do not depend on the tensor type.
  target_confs = [
    float(conf)
    for conf, cs in zip(result.boxes.conf, result.boxes.cls)
    if result.names[int(cs)] == target_label
  ]
  if not target_confs:
    return None

  # Report the best matching box, not whichever box happened to come last.
  max_conf = max(target_confs)

  if max_conf >= high_conf:
    return {
      "pr": "default",
      "title": "Cat Detected!",
      "msg": f"Found cat at confidence level: {max_conf:.2f}",
      "tags": "tada"
    }

  if max_conf >= low_conf:
    return {
      "pr": "low",
      "title": "Cat Detected! Probably...",
      "msg": f"Found cat at confidence level: {max_conf:.2f}. This could be wrong.",
      "tags": "tada"
    }

  return None

In [None]:
def push_ntfy(
    host:str = None,
    topic:str = None,
    msg_dict:dict= None,
    img_path = None,
    timeout:float = 10.0
    ):
  """Publish a notification (and optionally an image attachment) to ntfy.

  Args:
    host: ntfy server host name; messages go to "https://<host>/<topic>".
    topic: ntfy topic to publish to.
    msg_dict: Payload with the keys "msg", "title", "pr" and "tags".
    img_path: Optional path of an image file to upload as an attachment.
    timeout: Seconds to wait for each HTTP request.

  Raises:
    ValueError: If host, topic or msg_dict is missing.
    OSError: If img_path cannot be opened.
    requests.RequestException: On connection errors or timeouts.

  Non-2xx responses are reported with a warning rather than an exception so a
  rejected notification does not abort the caller.
  """
  if not host or not topic:
    raise ValueError("host and topic must be provided.")
  if msg_dict is None:
    raise ValueError("msg_dict must be provided.")

  import warnings

  endpoint = f"https://{host}/{topic}"

  # HTTP basic auth needs the "Basic " scheme prefix and a str header value;
  # the bare base64 bytes are not a valid Authorization header. Credentials
  # are optional so public topics keep working.
  auth_headers = {}
  if ntfy_user and ntfy_pass:
    token = base64.b64encode((ntfy_user+":"+ntfy_pass).encode('UTF-8')).decode('ascii')
    auth_headers["Authorization"] = "Basic " + token

  resp = requests.post(
    endpoint,
    data=msg_dict['msg'].encode(encoding='utf-8'),
    headers={
      **auth_headers,
      "Title": msg_dict['title'],
      "Priority": msg_dict['pr'],
      "Tags": msg_dict['tags']
    },
    timeout=timeout
  )
  if not resp.ok:
    warnings.warn(f"ntfy rejected the message (HTTP {resp.status_code}).")

  if img_path is not None:
    # The attachment upload needs the same credentials as the message, and
    # the file handle must be closed once the upload is done.
    with open(img_path, "rb") as img_file:
      resp = requests.put(
        endpoint,
        data=img_file,
        headers={**auth_headers, "Filename": os.path.basename(img_path)},
        timeout=timeout
      )
    if not resp.ok:
      warnings.warn(f"ntfy rejected the attachment (HTTP {resp.status_code}).")

In [None]:
def chk_crt_path(path):
  """Make sure `path` exists as a directory, creating it if necessary.

  Missing parent directories are created as well, and calling this on an
  already existing directory is a no-op.

  Raises:
    ValueError: If `path` exists but is not a directory.
  """
  if os.path.exists(path) and not os.path.isdir(path):
    raise ValueError("path given exists and is not a directory.")
  # exist_ok avoids a crash when the directory appears between the check
  # above and the creation; makedirs also handles nested paths.
  os.makedirs(path, exist_ok=True)

In [None]:
import cv2
import threading

class RTSPStream:
  """Background reader that always holds the most recent frame of a stream.

  A daemon thread keeps calling read() on the capture so that get_frame()
  returns the latest frame instead of a stale, buffered one.
  """

  def __init__(self, rtsp_url):
    """Open `rtsp_url` with OpenCV's FFmpeg backend and start the reader thread."""
    self.rtsp_url = rtsp_url
    self.cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG)
    self.frame = None
    self.lock = threading.Lock()
    self.running = True
    # Keep a handle on the thread so stop() can wait for it to finish.
    self._thread = threading.Thread(target=self.update, daemon=True)
    self._thread.start()

  def update(self):
    """Reader loop: store every successfully read frame until stopped."""
    while self.running:
      ret, frame = self.cap.read()
      if ret:
        with self.lock:
          self.frame = frame
      else:
        # A failed read returns immediately; back off briefly so a dropped
        # or unopened stream does not spin a CPU core.
        time.sleep(0.01)

  def get_frame(self):
    """Return a copy of the latest frame, or None if none has been read yet."""
    with self.lock:
      return self.frame.copy() if self.frame is not None else None

  def stop(self):
    """Stop the reader thread and release the capture."""
    self.running = False
    # Let the reader leave cap.read() before the capture is released under
    # it. The join is bounded because read() can block on a stalled stream.
    thread = getattr(self, "_thread", None)
    if thread is not None and thread.is_alive() and thread is not threading.current_thread():
      thread.join(timeout=2.0)
    self.cap.release()

In [None]:
# Detection loop: accumulate evidence over consecutive frames and notify once
# enough positive detections have been seen.

threshold = 500   # activation level that triggers a notification
activation = 0    # running evidence score
step = 20         # gain per positive frame; a negative frame costs step/10

# The environment only yields strings; convert once, and fail before the
# stream thread is started if the value is missing or not a number.
if suspend is None:
  raise ValueError("SUSPEND is not set.")
suspend_seconds = float(suspend)

chk_crt_path(f"{proj_path}/saved_img")

# Initialize threaded RTSP stream
stream = RTSPStream(url)

try:
  while True:
    frame = stream.get_frame()
    if frame is None:
      # Nothing decoded yet (or the stream dropped): wait instead of spinning.
      time.sleep(0.05)
      continue

    msg_dict = extract_model_prediction(model, frame)

    if msg_dict is None:
      # Decay, but never below zero; otherwise a long quiet period would
      # push the score so far negative that a real detection could not
      # reach the threshold in reasonable time.
      activation = max(0, activation - (step/10))
      continue

    activation += step
    if activation < threshold:
      continue

    # Only build the file name when an image is actually saved.
    # NOTE(review): '%c' is locale dependent and contains ':' characters,
    # which are not valid in Windows file names -- confirm target platform.
    filename=f"{proj_path}/saved_img/{datetime.datetime.now().strftime('%c')}.jpg"
    if cv2.imwrite(filename, frame):
      push_ntfy(host, topic, msg_dict, filename)
    else:
      # Saving failed; still send the text notification without attachment.
      push_ntfy(host, topic, msg_dict)

    time.sleep(suspend_seconds)
    activation = 0

except KeyboardInterrupt:
  pass
finally:
  # Always stop the reader thread and release the capture, whatever ended
  # the loop.
  stream.stop()

In [None]:
# Manual smoke test: run the detector on a stored image and send the
# resulting notification (if any).
test_img = proj_path + '/test_img/test1.jpg'

img = cv2.imread(test_img)

# cv2.imread returns None for a missing/unreadable file; check before the
# image is written or handed to the model.
if img is None:
  raise ValueError("Image doesn't exist.")

# The output directory may not exist yet when this cell is run on its own.
chk_crt_path(f"{proj_path}/saved_img")

file= f"{proj_path}/saved_img/{datetime.datetime.now().strftime('%c')}.jpg"
cv2.imwrite(file, img)

# Use a separate name so the configured global `model` is not silently
# replaced for the other cells.
test_model = YOLO('yolov8n.pt')
msg_dict = extract_model_prediction(test_model, img)
if msg_dict is not None:
  push_ntfy(host, topic, msg_dict, file)