# Testing the LSTM model on a saved video

In [4]:
import cv2
import math
import numpy as np
from ultralytics import YOLO
from tensorflow import keras
from collections import defaultdict, deque
import os

HOME = os.getcwd()

# Load the models. Paths are joined component-wise so the cell does not depend
# on the backslash path separator.
model_path = os.path.join(HOME, 'runs', 'detect', 'train', 'weights', 'best.pt')
YOLO_model = YOLO(model_path)
LSTM_model = keras.models.load_model(os.path.join(HOME, 'accident_prediction_LSTM_model.keras'))


# Open the video file
video_path = 'video.mp4'

cap = cv2.VideoCapture(video_path)

max_frames = 50      # number of timesteps handed to the LSTM
max_objects = 30     # per-frame slots; a detection is stored at index == its track id
no_of_features = 8   # x, y, w, h, distance_moved, class_id, orientation, conf

# track id -> list of (x, y) centre points seen so far (only the last one is read)
track_history = defaultdict(list)

# Sliding window holding the latest `max_frames` processed frames
sequences = deque(maxlen=max_frames)

# All-zero frame used to pad the window until `max_frames` frames were seen.
# It is only read (converted by np.array), never mutated, so sharing is safe.
zero_frame = [[0] * no_of_features] * max_objects


def _split_class_id(classID):
    """Split a combined detector class index into (class_id, orientation).

    Indices 1..15 encode five classes with three orientations each:
    1,2,3 -> class 1 / orientation 1,2,3; 4,5,6 -> class 2; ... 13,14,15 -> class 5.
    Any other index falls back to (5, 3), like the catch-all branch of the
    if/elif chain this helper replaces.

    The chain it replaces did not set an orientation for index 5 (the previous
    detection's value was reused); the arithmetic form yields 2 there,
    consistent with every other index.
    """
    # NOTE(review): detector class indices are usually 0-based; index 0 ends
    # up in the (5, 3) fallback -- confirm this matches the preprocessing used
    # when the LSTM was trained.
    cid = int(classID)
    if 1 <= cid <= 15:
        return (cid - 1) // 3 + 1, (cid - 1) % 3 + 1
    return 5, 3


try:
    # Loop through the video frames
    while cap.isOpened():
        success, frame = cap.read()
        if not success:
            # End of the video (or a failed read)
            break

        # Run YOLOv8 tracking on the frame
        results = YOLO_model.track(frame, persist=True)

        # One feature row per object slot; slots without a detection stay zero,
        # which also covers the "nothing detected" case.
        current_sequence = [[0] * no_of_features for _ in range(max_objects)]

        if results is not None and results[0].boxes is not None:
            detections = results[0].boxes
            boxes = detections.xywh.cpu().tolist()
            if detections.id is not None:
                track_ids = detections.id.int().cpu().tolist()
            else:
                # Tracker assigned no ids: every detection is filed under id 0
                track_ids = [0] * len(boxes)
            confidences = detections.conf.tolist()
            classes = detections.cls.tolist()

            for box, track_id, conf, classID in zip(boxes, track_ids, confidences, classes):
                class_id, orientation = _split_class_id(classID)

                x, y, w, h = box
                conf = float(int(conf * 1000) / 1000)  # truncate to 3 decimals

                # Distance of the box centre from this track's previous position
                track = track_history[track_id]
                distance_moved = 0
                if track:
                    prev_x, prev_y = track[-1]
                    distance_moved = math.hypot(float(x) - prev_x, float(y) - prev_y)
                track.append((float(x), float(y)))  # x, y center point

                # Keep slots consistent across frames by indexing with the
                # track id; ids beyond the available slots are dropped.
                if track_id < max_objects:
                    current_sequence[track_id] = [x, y, w, h, distance_moved, int(class_id), int(orientation), conf]

        sequences.append(current_sequence)

        # Pad the window with zero frames up to `max_frames`
        input_LSTM = list(sequences)
        input_LSTM.extend([zero_frame] * (max_frames - len(input_LSTM)))

        # (1, timesteps, max_objects * no_of_features): each frame flattened to 1D
        input_LSTM = np.array(input_LSTM).reshape(1, -1, max_objects * no_of_features)

        # Run the LSTM model
        predictions = LSTM_model.predict(input_LSTM)

        predicted_class = np.argmax(predictions, axis=1)  # index of the most likely class
        confidence_score = predictions[0, predicted_class[0]]  # its probability

        print("Predicted class:", predicted_class[0])
        print("Confidence score:", confidence_score)

        # To also overlay the detections, use: frame = results[0].plot()

        # NOTE(review): the value shown as "accident score" is the probability
        # of the arg-max output, whatever that output is -- verify it is the
        # accident probability for this model's output layer.
        score_text = "accident score " + str(float(int(confidence_score * 1000) / 1000))
        if int(confidence_score * 100) / 100 > 0.5:
            cv2.rectangle(frame, (50, 50), (550, 100), (0, 0, 255), 1)
            cv2.putText(frame, score_text + " WARNING", (50, 80), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
        else:
            cv2.rectangle(frame, (50, 50), (400, 100), (255, 0, 0), 1)
            cv2.putText(frame, score_text, (50, 80), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

        # Display the annotated frame
        cv2.imshow("Accident Prediction Inference", frame)

        # Break the loop if 'q' is pressed
        if cv2.waitKey(1) & 0xFF == ord("q"):
            break
finally:
    # Release the video capture object and close the display window even if
    # inference raised part-way through the video.
    cap.release()
    cv2.destroyAllWindows()




0: 1088x1920 2 car_backs, 745.5ms
Speed: 45.9ms preprocess, 745.5ms inference, 6.3ms postprocess per image at shape (1, 3, 1088, 1920)
1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 394ms/step
Predicted class: 0
Confidence score: 0.5052167

0: 1088x1920 2 car_backs, 877.3ms
Speed: 69.9ms preprocess, 877.3ms inference, 4.3ms postprocess per image at shape (1, 3, 1088, 1920)
1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 42ms/step
Predicted class: 0
Confidence score: 0.50519335

0: 1088x1920 3 car_backs, 788.2ms
Speed: 57.3ms preprocess, 788.2ms inference, 4.1ms postprocess per image at shape (1, 3, 1088, 1920)
1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 58ms/step
Predicted class: 0
Confidence score: 0.50975573

0: 1088x1920 2 car_backs, 799.6ms
Speed: 56.1ms preprocess, 799.6ms inference, 6.4ms postprocess per image at shape (1, 3, 1088, 1920)
1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 44ms/step
Predicted class: 0
Confi

# Testing the LSTM model on a camera stream

In [None]:
# NOTES
# LSTM input shape: (samples, timesteps, features)
# samples   = batch size. A batch is the group of examples processed together in one training iteration, so with a dataset of 1000 videos and a batch size of 64 there are ceil(1000/64) = 16 iterations per epoch.
# timesteps = length of a single sequence (number of frames per sequence)
# features  = length of the flattened 1D list of values that describes one frame

import cv2
import math
import numpy as np
from ultralytics import YOLO
from tensorflow import keras
from collections import defaultdict, deque
import os

HOME = os.getcwd()

# Load the models. Paths are joined component-wise so the cell does not depend
# on the backslash path separator.
model_path = os.path.join(HOME, 'runs', 'detect', 'train', 'weights', 'best.pt')
YOLO_model = YOLO(model_path)
LSTM_model = keras.models.load_model(os.path.join(HOME, 'accident_prediction_LSTM_model.keras'))

# Open the camera stream. This cell previously never created `cap` and relied
# on a capture object left over from another cell.
# NOTE(review): index 0 selects the default camera -- adjust for other devices.
cap = cv2.VideoCapture(0)

max_frames = 50      # number of timesteps handed to the LSTM
max_objects = 30     # per-frame slots; a detection is stored at index == its track id
no_of_features = 8   # x, y, w, h, distance_moved, class_id, orientation, conf

# track id -> list of (x, y) centre points seen so far (only the last one is read)
track_history = defaultdict(list)

# Sliding window holding the latest `max_frames` processed frames
sequences = deque(maxlen=max_frames)

# All-zero frame used to pad the window until `max_frames` frames were seen.
# It is only read (converted by np.array), never mutated, so sharing is safe.
zero_frame = [[0] * no_of_features] * max_objects


def _split_class_id(classID):
    """Split a combined detector class index into (class_id, orientation).

    Indices 1..15 encode five classes with three orientations each:
    1,2,3 -> class 1 / orientation 1,2,3; 4,5,6 -> class 2; ... 13,14,15 -> class 5.
    Any other index falls back to (5, 3), like the catch-all branch of the
    if/elif chain this helper replaces.

    The chain it replaces did not set an orientation for index 5 (the previous
    detection's value was reused); the arithmetic form yields 2 there,
    consistent with every other index.
    """
    # NOTE(review): detector class indices are usually 0-based; index 0 ends
    # up in the (5, 3) fallback -- confirm this matches the preprocessing used
    # when the LSTM was trained.
    cid = int(classID)
    if 1 <= cid <= 15:
        return (cid - 1) // 3 + 1, (cid - 1) % 3 + 1
    return 5, 3


try:
    # Loop through the camera frames
    while cap.isOpened():
        success, frame = cap.read()
        if not success:
            # The stream ended or a frame could not be read
            break

        # Run YOLOv8 tracking on the frame
        results = YOLO_model.track(frame, persist=True)

        # One feature row per object slot; slots without a detection stay zero,
        # which also covers the "nothing detected" case.
        current_sequence = [[0] * no_of_features for _ in range(max_objects)]

        if results is not None and results[0].boxes is not None:
            detections = results[0].boxes
            boxes = detections.xywh.cpu().tolist()
            if detections.id is not None:
                track_ids = detections.id.int().cpu().tolist()
            else:
                # Tracker assigned no ids: every detection is filed under id 0
                track_ids = [0] * len(boxes)
            confidences = detections.conf.tolist()
            classes = detections.cls.tolist()

            for box, track_id, conf, classID in zip(boxes, track_ids, confidences, classes):
                class_id, orientation = _split_class_id(classID)

                x, y, w, h = box
                conf = float(int(conf * 1000) / 1000)  # truncate to 3 decimals

                # Distance of the box centre from this track's previous position
                track = track_history[track_id]
                distance_moved = 0
                if track:
                    prev_x, prev_y = track[-1]
                    distance_moved = math.hypot(float(x) - prev_x, float(y) - prev_y)
                track.append((float(x), float(y)))  # x, y center point

                # Keep slots consistent across frames by indexing with the
                # track id; ids beyond the available slots are dropped.
                if track_id < max_objects:
                    current_sequence[track_id] = [x, y, w, h, distance_moved, int(class_id), int(orientation), conf]

        sequences.append(current_sequence)

        # Pad the window with zero frames up to `max_frames`
        input_LSTM = list(sequences)
        input_LSTM.extend([zero_frame] * (max_frames - len(input_LSTM)))

        # (1, timesteps, max_objects * no_of_features): each frame flattened to 1D
        input_LSTM = np.array(input_LSTM).reshape(1, -1, max_objects * no_of_features)

        # Run the LSTM model
        predictions = LSTM_model.predict(input_LSTM)

        predicted_class = np.argmax(predictions, axis=1)  # index of the most likely class
        confidence_score = predictions[0, predicted_class[0]]  # its probability

        print("Predicted class:", predicted_class[0])
        print("Confidence score:", confidence_score)

        # To also overlay the detections, use: frame = results[0].plot()

        # NOTE(review): the value shown as "accident score" is the probability
        # of the arg-max output, whatever that output is -- verify it is the
        # accident probability for this model's output layer.
        score_text = "accident score " + str(float(int(confidence_score * 1000) / 1000))
        if int(confidence_score * 100) / 100 > 0.5:
            cv2.rectangle(frame, (50, 50), (550, 100), (0, 0, 255), 1)
            cv2.putText(frame, score_text + " WARNING", (50, 80), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
        else:
            cv2.rectangle(frame, (50, 50), (400, 100), (255, 0, 0), 1)
            cv2.putText(frame, score_text, (50, 80), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

        # Display the annotated frame
        cv2.imshow("Accident Prediction Inference", frame)

        # Break the loop if 'q' is pressed
        if cv2.waitKey(1) & 0xFF == ord("q"):
            break
finally:
    # Release the camera and close the display window even if inference
    # raised part-way through the stream.
    cap.release()
    cv2.destroyAllWindows()


