In [1]:
!pip install ultralytics
!pip install websockets

Collecting ultralytics
  Downloading ultralytics-8.2.18-py3-none-any.whl.metadata (40 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m40.7/40.7 kB[0m [31m1.8 MB/s[0m eta [36m0:00:00[0m
Collecting thop>=0.1.1 (from ultralytics)
  Downloading thop-0.1.1.post2209072238-py3-none-any.whl.metadata (2.7 kB)
Downloading ultralytics-8.2.18-py3-none-any.whl (757 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m757.2/757.2 kB[0m [31m14.1 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hDownloading thop-0.1.1.post2209072238-py3-none-any.whl (15 kB)
Installing collected packages: thop, ultralytics
Successfully installed thop-0.1.1.post2209072238 ultralytics-8.2.18
Collecting streamlink
  Downloading streamlink-6.7.4-py3-none-any.whl.metadata (6.5 kB)
Collecting isodate (from streamlink)
  Downloading isodate-0.6.1-py2.py3-none-any.whl.metadata (9.6 kB)
Collecting pycountry (from streamlink)
  Downloading pycountry-23.12.11-py3-none-any.whl.metadata (

In [2]:
from ultralytics import YOLO
import cv2
import math
from urllib.parse import urlparse


#Function to mark the bounding boxes in the img fram
def mark_boxes(model1, model2, model3, img, classNames):
    """
    Takes in 3 models, model1-> fall_detection model, model2-> fire_detection model, model3-> face_detection model
    """
    fall_results = model1(img, stream=True)
    fire_results = model2(img, stream=True)
    face_results = model3(img, stream=True)
    
    combined_results = [fall_results, fire_results, face_results]

  # Draw bounding boxes and annotations
    
    for i,results in enumerate(combined_results):
        
        for r in results:
            boxes = r.boxes
            for box in boxes:
              #Get bounding box coordinates
              x1, y1, x2, y2 = map(int, box.xyxy[0])

              #Get class index
              cls = int(box.cls[0])

              #confidence
              confidence = math.ceil((box.conf[0]*100))/100
                
              if confidence < 0.40 :
                continue

              if classNames[i][cls] == 'stranger' or classNames[i][cls] == 'sitting' or classNames[i][cls] == 'walking':
                continue
            
              if classNames[i][cls] == 'Fall Detected' and confidence < 0.85:
                continue 
              #Draw Bounding box
              cv2.rectangle(img, (x1,y1), (x2,y2), (0, 128, 255*i % 251), 3)

              # Draw class Name
              org = (x1,y1)
              font = cv2.FONT_HERSHEY_SIMPLEX
              fontScale = 1
              color = (0, 0, 255)
              thickness = 2
              cv2.putText(img, f'{classNames[i][cls]}:{confidence}', org, font, fontScale, color, thickness)

    return img # marked frame is returned.




def crop_image(image, x_min, y_min, x_max, y_max):
    """
    Croping the image based on the give points
    """
    cropped_image = image[y_min:y_max, x_min:x_max]
    return cropped_image

def person_detect(img):
    """
    Function uses the yolo model to detect the person in the image and returns only the person image (cropped)
    """
    model = YOLO('yolov8n.pt')
    classnames = ['person']
    results = model(img, stream= True)
    persons = list()
    for res in results:
        for box in res.boxes:
            
            #Get the class index
            cls = int(box.cls[0])
            if cls == 0: # if the class is person the we crop the image and put it in the presons list
                x1, y1, x2, y2 = map(int, box.xyxy[0])# Get the box boundries in the xyxy format
                persons.append(crop_image(img, x1, y1, x2, y2))
    return persons
                


def identify (img):
    """
    The following function takes in an image frame and checks if rohan or sagar 
    is in the frame or not if both are not there it returns the stranger as output
    
    the input images is the one's from the person_detect function 
    we only check for sagar or rohan only if we found a person in the image
    """
    model = YOLO("models/face_new.pt")
    classnames = ["fire", "stranger", "rohan", "sagar"]
    results = model(img)
    for res in results:
        for box in res.boxes:
            cls = int(box.cls[0])
            confidence = math.ceil((box.conf[0]*100))/100
            
            if cls == 2 and confidence > 0.30:
                return 'rohan'
            elif cls == 3 and confidence > 0.30:
                return 'sagar' 
            
    return 'det'



def identify_fall(img):
    """
    In the given input frame the function return is there is any one falling in the image.
    
    """
    model = YOLO("models/fall_model.pt")
    classnames = ["Fall Detected", "sitting", "walking"]
    results = model(img)
    
    for res in results:
        for box in res.boxes:
            cls = int(box.cls[0])
            confidence = math.ceil((box.conf[0]*100))/100
            
            if cls == 0 and confidence > 0.60:
                return 'Fall Detected'
    return 'no detection'

def identify_fire(img):
    """
    In the given input frame the function return is there is any fire detected in the image.
    """
    model = YOLO("models/fire.pt")
    classnames = ["fire"]
    results = model(img)
    
    for res in results:
        for box in res.boxes:
            cls = int(box.cls[0])
            confidence = math.ceil((box.conf[0]*100))/100
            
            if cls == 0 and confidence > 0.20:
                return 'fire'
    return 'no detection'


import requests

telegram_bot_token = '7179871536:AAFPxxsJvdI00vhsanAxKM1RHJed7S_Ab8o'# use a telegram bot token here 
telegram_chat_id = '-4145119484'# telegram chat id or group id

def send_telegram_message_with_image(image, text):
    # Encode imgae to bytes
    _, img_encoded = cv2.imencode('.jpg', image)
    image_bytes = img_encoded.tobytes()
    
    # send message with image to Telegram using the Telegram Bot API
    url = f'https://api.telegram.org/bot{telegram_bot_token}/sendPhoto'
    files = {'photo': (f'{text}.jpg', image_bytes)}
    data = {'chat_id': telegram_chat_id, 'caption': f'{text} detected!'}
    response = requests.post(url, files = files, data = data)
    
    if response.status_code == 200:
        print("Telegram notification sent successfully!")
    else:
        print("Failed to send Telegram notification.")
        

def anomaly_activity_notify(img):    
        persons = person_detect(img) 
            
        for person in persons:
            prediction = identify(person)
            if prediction == 'sagar' or prediction == 'rohan':
                continue
            else:
                print('```````````````stranger detected``````````````')
                send_telegram_message_with_image(person, 'stranger')
            
        prediction = identify_fire(img)
        if prediction == 'fire':
            print('``````````````fire detected``````````````')
            send_telegram_message_with_image(img, 'fire')
            
        prediction = identify_fall(img)
        if prediction == 'Fall Detected':
            print('``````````````fall detected``````````````')
            send_telegram_message_with_image(img, 'fall')
        
        

# Direct URL method
video_input_url = "/kaggle/input/test-videos/testing3.mp4"
# Create VideoCapture object for video input
cap = cv2.VideoCapture(video_input_url)
cap.set(3, 640)
cap.set(4, 480)

# Define the codec and create a VideoWriter object to save the output video
fourcc = cv2.VideoWriter_fourcc(*'MP4V')
output_video_path = 'output_video.avi'
fps = cap.get(cv2.CAP_PROP_FPS)
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
out = cv2.VideoWriter(output_video_path, fourcc, fps, (frame_width, frame_height))


# Load the YOLO model
fall_det_model = YOLO("models/fall_model.pt")
fire_det_model = YOLO("models/fire.pt")
face_det_model = YOLO("models/face_new.pt")

# Object classes
fall_class = ["Fall Detected", "sitting", "walking"]
fire_class = ["fire"]
face_class = ["fire", "stranger", "rohan", "sagar"]

count = 0
while True:
    success, img = cap.read()
    
    if not success:
        print("not success")
        break
        
    # using count so that we check for the anomaly for every 20 frames to reduce the computation.
    if count % 20 ==0:
        anomaly_activity_notify(img)

#     anomaly_activity_notify(img)
    img = mark_boxes(fall_det_model, fire_det_model, face_det_model, img, [fall_class, fire_class, face_class])
    out.write(img)
    count = count + 1

# # Release resources
cap.release()
out.release()
# # cv2.destroyAllWindows()


OpenCV: FFMPEG: tag 0x5634504d/'MP4V' is not supported with codec id 12 and format 'avi / AVI (Audio Video Interleaved)'
OpenCV: FFMPEG: fallback to use tag 0x34504d46/'FMP4'


Downloading https://github.com/ultralytics/assets/releases/download/v8.2.0/yolov8n.pt to 'yolov8n.pt'...


100%|██████████| 6.23M/6.23M [00:00<00:00, 75.8MB/s]



0: 384x640 3 persons, 96.8ms
Speed: 10.6ms preprocess, 96.8ms inference, 3287.3ms postprocess per image at shape (1, 3, 384, 640)

0: 640x640 (no detections), 7.3ms
Speed: 3.7ms preprocess, 7.3ms inference, 0.6ms postprocess per image at shape (1, 3, 640, 640)
```````````````stranger detected``````````````
Telegram notification sent successfully!

0: 640x576 1 rohan, 68.1ms
Speed: 2.1ms preprocess, 68.1ms inference, 1.6ms postprocess per image at shape (1, 3, 640, 576)

0: 640x640 1 rohan, 1 sagar, 7.3ms
Speed: 2.6ms preprocess, 7.3ms inference, 1.3ms postprocess per image at shape (1, 3, 640, 640)

0: 384x640 (no detections), 75.6ms
Speed: 1.9ms preprocess, 75.6ms inference, 0.6ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 Sitting, 25.7ms
Speed: 1.7ms preprocess, 25.7ms inference, 1.2ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 1 Sitting, 25.8ms
Speed: 1.9ms preprocess, 25.8ms inference, 1.4ms postprocess per image at shape (1, 3, 384, 640)

0: 38

In [None]:
# Taking the input through web url
import cv2
import numpy as np
import base64
import websocket

# WebSocket server URL
ws_url = "ws://10.32.5.182:5000"

# Initialize video writer
frame_width = 640
frame_height = 480
fps = 30
fourcc = cv2.VideoWriter_fourcc(*'XVID')
out = cv2.VideoWriter('received_video.avi', fourcc, fps, (frame_width, frame_height))

# Event handler for when the WebSocket connection is opened
def on_open(ws):
    print("WebSocket connection opened.")

# Event handler for when a message is received from the WebSocket server
def on_message(ws, message):
    
    frame_data = base64.b64decode(message) # Decode the base64-encoded message
    nparr = np.frombuffer(frame_data, np.uint8) # Convert the frame data to a numpy array
    img = cv2.imdecode(nparr, cv2.IMREAD_COLOR) # Decode the image array
    out.write(img) # Write the frame to the video

    print("Received and saved a video frame.")

# Event handler for when an error occurs
def on_error(ws, error):
    print("WebSocket error:", error)

# Event handler for when the WebSocket connection is closed
def on_close(ws):
    print("WebSocket connection closed.")
    # Release video writer
    out.release()

if __name__ == "__main__":
    # Create a WebSocket instance
    ws = websocket.WebSocketApp(ws_url,
                                on_open=on_open,
                                on_message=on_message,
                                on_error=on_error,
                                on_close=on_close)

    # Connect to the WebSocket server
    ws.run_forever()
