In [1]:
###### Comment Legend ######

    # Describing a function
    ### Describing an issue
    ###### Infomational ######

###### To Do List ######

    # Intergrat YOLOv8
    # Update detect sig with IOU
    # Create a function to save bad images for later use

In [1]:
import cv2
import os
import math
import mmcv
import pandas as pd
from time import sleep
import numpy as np
from matplotlib import pyplot as plt
from ultralytics import YOLO
from ultralytics.yolo.utils.plotting import Annotator

from mmpose.apis import inference_top_down_pose_model, init_pose_model, vis_pose_result


%run utils/segmentation/detect.ipynb
%run utils/superimposition/functions.ipynb
%run utils/extraction/objClass.ipynb
%run utils/skeleton/detect_skeletons.ipynb



In [2]:
class Configuration:
    def __init__(self, minimalImg, mainImg, segModel, BBModel, videoFormat, FPS, fileName, skip):
        self.minimalImg = minimalImg
        self.mainImg = mainImg
        
        segPath = "utils/segmentation/"
        
        self.segWeights = segPath+segModel
        self.BBWeights = segPath+BBModel
        
        self.fourcc = cv2.VideoWriter_fourcc(*videoFormat)
        
        self.FPS = FPS
        
        self.mainPath = f"videos/{fileName}/"
        self.processedPath = self.mainPath+"processed/"
        
        self.skip = skip
        
        self.pose_config = "utils/skeleton/demo/hrnet_w32_coco_256x192.py"
        self.pose_checkpoint = "https://download.openmmlab.com/mmpose/top_down/hrnet/hrnet_w32_coco_256x192-c78dce93_20200708.pth"
        
fileList = os.listdir("videos")
fileName = fileList[2]

print(fileList, "\nSelected File:", fileName)

config = Configuration(minimalImg="Thermal", mainImg="RGB", segModel="yolov7-seg.pt", BBModel="yolov7-tiny.pt", videoFormat="mp4v", FPS=10, fileName=fileName, skip=190 )

['2022-12-01_18-03-33', '2022-12-02_13-05-04', '2023-01-24_15-28-00', '2023-01-28_15-50-17', 'combined_outisde_13.gif', 'newStereo', 'outside_test', 'SII'] 
Selected File: 2023-01-24_15-28-00


In [3]:
###### Video Processor ######
%run utils/superimposition/functions.ipynb

# Load the models
seg_model = YOLO("yolov8n-seg.pt")

# BB_model = loadModel(config.BBWeights)
pose_model = init_pose_model(config.pose_config, config.pose_checkpoint, device)

# Grab videos from final location selected
rgbVideo = cv2.VideoCapture(f"{config.mainPath}rgbout.mp4")
thermalVideo = cv2.VideoCapture(f"{config.mainPath}thermalout.mp4")
depthVideo = cv2.VideoCapture(f"{config.mainPath}stereoout.mp4")
leftVideo = cv2.VideoCapture(f"{config.mainPath}leftout.mp4")

# Frame counter
calibrationCount = 0
frameCount = 0

rgbVideo.set(cv2.CAP_PROP_POS_FRAMES, config.skip)
thermalVideo.set(cv2.CAP_PROP_POS_FRAMES, config.skip)
depthVideo.set(cv2.CAP_PROP_POS_FRAMES, config.skip)
leftVideo.set(cv2.CAP_PROP_POS_FRAMES, config.skip)

# ObjList = Object_List()

TenScales = [ [1, 1, 1] ]
TenOffsets = [ [[0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]] ]

images = {"RGB": None, "Thermal": None, "Left": None, "Depth": None}

# Actual Loop which goes through all frames and moves as appropriate
while(True): 

    # Read from all videos a single frame
    ret, images["RGB"] = rgbVideo.read()
    ret1, images["Thermal"] = thermalVideo.read()
    ret2, images["Depth"] = depthVideo.read()
    ret3, images["Left"] = leftVideo.read()
    if not (ret & ret1 & ret2 and ret3):
        break
        
    
    # Every 100th frame preform calibration 
    if calibrationCount % 2 == 0:

        # print("\nCalibrating Image")
        scales, offset = calibration(images.copy(), seg_model, config.minimalImg)
        
        if scales != None and offset != None:
            if TenScales[0] == [1,1,1]:
                del TenScales[0]
                del TenOffsets[0]
            
            TenScales.append(scales)
            TenOffsets.append(offset)

        if len(TenScales) > 30:
            radDel = int(np.random.rand()*30)
            
            del TenScales[radDel]
            del TenOffsets[radDel]
        
        scales, offset = filter_offsets_scales(TenScales, TenOffsets)
        scales = {"RGB": scales[0], "Thermal": scales[1], "Depth": scales[2]}
        offset = {"RGB": offset[0], "Thermal": offset[1], "Depth": offset[2]}
        
        # if scales != None and offset != None:
        #     print(scales, offset)


    # Convert stereoframe into colors for visualization 
    images["Depth"] = cv2.cvtColor(images["Depth"], cv2.COLOR_BGR2GRAY)
    
    # Modify images based on calibration settings
    images["Unrelated"] = superimpose_images(images.copy(), addColor=True)
    images = modify_images(images.copy(), scales, offset, images[config.minimalImg].shape[0])
    
    images["Related"] = superimpose_images(images.copy(), addColor=True)

    # Detections from main image
    # yoloResults = seg_model.predict(source=images[config.mainImg], device=0)
    
    # if yoloResults:
    #     box_array = np.array(yoloResults[0].boxes.xyxy.cpu())
    #     person_results = [{'bbox': box, 'track_id': i} for i, box in enumerate(box_array)]
    #     poses = inference_top_down_pose_model(pose_model, images[config.mainImg], person_results, format='xyxy')[0]
    
    # ObjList.appendObjects(images, mainDetections, masks, poses, frameCount)
    
    StereoframeMapped = cv2.normalize(images["Depth"], None, 255, 0, cv2.NORM_INF, cv2.CV_8UC1)
    StereoframeMapped = cv2.equalizeHist(StereoframeMapped)
    StereoframeMapped = cv2.applyColorMap(StereoframeMapped, cv2.COLORMAP_JET)
    images["Color_Depth"] = StereoframeMapped
    
    # Produce superimposed Image based on previous relationships
    # images["Superimposed"] = apply_mask(images.copy(), segmentation_model, scales, yoloResults[0].masks.data.cpu(), config.mainImg)

    # Create video to process 
    if calibrationCount==0:

        thermalout = cv2.VideoWriter(f"{config.processedPath}Thermal_sii.mp4", config.fourcc, config.FPS, (images[config.minimalImg].shape[:2][::-1]))
        stereoout = cv2.VideoWriter(f"{config.processedPath}Depth_sii.mp4", config.fourcc, config.FPS, (images[config.minimalImg].shape[:2][::-1]), 0)
        rgbout = cv2.VideoWriter(f"{config.processedPath}RGB_sii.mp4", config.fourcc, config.FPS, (images[config.minimalImg].shape[:2][::-1]))
        # simout = cv2.VideoWriter(f"{config.processedPath}SII.mp4", config.fourcc, config.FPS, (images[config.minimalImg].shape[:2][::-1]))
        related_SIIout = cv2.VideoWriter(f"{config.processedPath}Related_SII.mp4", config.fourcc, config.FPS, (images[config.minimalImg].shape[:2][::-1]))
        unrelated_SIIout = cv2.VideoWriter(f"{config.processedPath}Unrelated_SII.mp4", config.fourcc, config.FPS, (images[config.minimalImg].shape[:2][::-1]))

    # images["Superimposed"] = cv2.resize(images["Superimposed"], (512, 512))
    images["Related"] = cv2.resize(images["Related"], images[config.minimalImg].shape[:2][::-1])
    images["Unrelated"] = cv2.resize(images["Unrelated"], images[config.minimalImg].shape[:2][::-1])
    
    # if yoloResults:
    #     images["Superimposed"] = vis_pose_result(pose_model, images["Superimposed"], poses, bbox_color=(0,0,0))
    
    for label in images:
        cv2.imshow(label, images[label])
    
    # Write frames to all the videos
    thermalout.write(images["Thermal"])
    stereoout.write(images["Depth"])
    rgbout.write(images["RGB"])
    # simout.write(images["Superimposed"])
    related_SIIout.write(images["Related"])
    unrelated_SIIout.write(images["Unrelated"])

    # Increment frame count
    calibrationCount+=1
    frameCount+=1
    
    # isd = input()
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break
        
print("\nProcessing Ended. Saving videos...")
        
    
# Save all videos
# simout.release()
rgbout.release()
thermalout.release()
stereoout.release()
related_SIIout.release()
unrelated_SIIout.release()

print("Saving complete!")

# csv = ObjList.createDataframe()
# csv.to_csv(f"videos/{fileName}/processed/object_list.csv", index=False)

### Create a function that grqabs all frames that broke the function

cv2.destroyAllWindows()

load checkpoint from http path: https://download.openmmlab.com/mmpose/top_down/hrnet/hrnet_w32_coco_256x192-c78dce93_20200708.pth
[          0      0.3568     0.61475    0.093054      0.3058]
[          0     0.41813     0.58382    0.067556     0.21789]
[          0     0.35688     0.61494    0.093618     0.30562]
[          0     0.41804     0.58365    0.067095     0.21667]
[          0     0.35565      0.6154    0.092874     0.30591]
[          0      0.4183      0.5838    0.066662      0.2167]
[          0     0.35616     0.61506     0.09227      0.3066]
[          0     0.41907     0.58377    0.067759     0.21583]
[          0     0.35617     0.61472    0.092447     0.30641]
[          0      0.4192     0.58354    0.068986     0.21639]
[          0     0.35619     0.61485    0.092516     0.30618]
[          0     0.41921     0.58354    0.068887     0.21653]
[          0      0.3561     0.61487    0.092605     0.30634]
[          0     0.41914     0.58353    0.069095     0.21638]
[ 

In [12]:
# [self.compare_class, self.compare_bb, self.compare_rgb, self.compare_temperature, self.compare_spatial, self.compare_pose]
rgbout.release()
thermalout.release()
stereoout.release()
related_SIIout.release()
unrelated_SIIout.release()

In [3]:
%run utils/superimposition/functions.ipynb

seg_model = YOLO("yolov8n-seg.pt")
pose_model = init_pose_model(config.pose_config, config.pose_checkpoint, device)

ObjList = Object_List()
images = {"RGB": None, "Thermal": None, "Depth": None}

PATH = f"{config.processedPath}{config.mainImg}_sii.mp4"

rgbVideo = cv2.VideoCapture(f"{config.processedPath}RGB_sii.mp4")
thermalVideo = cv2.VideoCapture(f"{config.processedPath}Thermal_sii.mp4")
depthVideo = cv2.VideoCapture(f"{config.processedPath}Depth_sii.mp4")

trackout = cv2.VideoWriter(f"{config.processedPath}track.mp4", config.fourcc, config.FPS, (512,512))

for result in seg_model.track(source=PATH, stream=True, verbose=False, classes=[0,2], device=0):
    
    ret, images["RGB"] = rgbVideo.read()
    ret1, images["Thermal"] = thermalVideo.read()
    ret2, images["Depth"] = depthVideo.read()
    if not (ret & ret1 & ret2):
        break
    
    
    StereoframeMapped = cv2.normalize(images["Depth"], None, 255, 0, cv2.NORM_INF, cv2.CV_32F)
    StereoframeMapped = cv2.convertScaleAbs(StereoframeMapped)
    StereoframeMapped = cv2.cvtColor(StereoframeMapped, cv2.COLOR_BGR2GRAY)
    StereoframeMapped = cv2.equalizeHist(StereoframeMapped)
    StereoframeMapped = cv2.applyColorMap(StereoframeMapped, cv2.COLORMAP_JET)
    images["Color_Depth"] = StereoframeMapped
    
    # frame = result.orig_img
    
    if result.boxes:
        images["Superimposed"] = apply_mask(images.copy(), result.masks.data.cpu(), config.mainImg)
        
        box_array = np.array(result.boxes.xyxy.cpu())
        person_results = [{'bbox': box, 'track_id': i} for i, box in enumerate(box_array)]
        poses = inference_top_down_pose_model(pose_model, images[config.mainImg], person_results, format='xyxy')[0]
        
        
        images["Superimposed"] = vis_pose_result(pose_model, images["Superimposed"], poses, bbox_color=(0,0,0))
        
    # ObjList.appendObjects(images, mainDetections, masks, poses, frameCount)
    
        
    annotator = Annotator(images["Superimposed"])
    
    for box in result.boxes:
        if box.id:
            annotator.box_label(box.xyxy[0], label=f"{box.id.item()} | {int(box.conf*100)}%", color=(0,225,100))
    
    frame = annotator.result()
    
    for label in images:
        cv2.imshow(label, images[label])
    
    cv2.imshow("YOLO", frame)
    
    trackout.write(images["Superimposed"])
    
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

trackout.release()

load checkpoint from http path: https://download.openmmlab.com/mmpose/top_down/hrnet/hrnet_w32_coco_256x192-c78dce93_20200708.pth


In [5]:
trackout.release()

In [None]:
csv = ObjList.createDataframe()
csv.to_csv(f"videos/{fileName}/processed/object_list.csv", index=False)

In [None]:
weights = {"Color": 0.15, "Temperature": 0.15, "Pose": 0.40, "Depth": 0.10, "Bounding Box": 0.20}
x = [1, 1, 1, 1, 1]
x1 = list(weights.values())
print(x, x1)
np.dot(x, x1)

In [None]:
img = images["RGB"].copy()


for obj in ObjList.ObjectList:
    print(len(obj.bb_history))
    for bb in obj.bb_history:
        color = [int(np.random.rand(1)*255), int(np.random.rand(1)*255), int(np.random.rand(1)*255)] 
        img = cv2.circle(img, bb[:2], 5, color , 5)
        
        cv2.imshow("HAHAHAAHAH", img)
        cv2.waitKey(1)
        sleep(0.5)
    

In [None]:
cv2.destroyAllWindows()

In [None]:
x = 1
print(-x)

In [None]:
c1 = np.array([0,0,0])
c2 = np.array([2,5,7])
sum( abs(c1 - c2) ) / 765

In [None]:
x = np.array([1])

type(x) == type(None)