In [30]:
import cv2
from IPython.display import clear_output
from matplotlib import pyplot as plt
from ultralytics import YOLO
import numpy as np
import heapq
import math
import time
%matplotlib inline


In [31]:
def resize_wh(orig_w, orig_h):
    # h, w, d = image.shape
    imgsz = 1280 # image size of yolo default is 640
    new_h = imgsz/orig_w * orig_h # new height with same ratio
    w = imgsz
    remainder = (new_h % 32) # YOLO need w,h that can divide by 32
    if remainder > 32/2:
        h = int(new_h - remainder + 32)
    else:
        h = int(new_h - remainder)
    return (w,h)

class DStarLite:    
    def __init__(self, start, end, maze, zoom):
        self.start = (start[0] // zoom, start[1]//zoom)
        self.end = (end[0] // zoom, end[1]//zoom)
        self.zoom = zoom
        self.g, self.rhs, self.U = {}, {}, {}
        self.km = 0
        self.count = 0
        self.graph = maze
        self.x_range, self.y_range = maze.shape

        self.obs = set() # obs map
        for i in range(self.x_range):
            for j in range(self.y_range):
                if maze[i][j] == 0:
                    self.obs.add((i, j))

        for i in range(self.x_range):
            for j in range(self.y_range):
                self.rhs[(i, j)] = float("inf")
                self.g[(i, j)] = float("inf")

        self.rhs[self.end] = 0.0
        self.U[self.end] = self.CalculateKey(self.end) 
         # 一開始就沒有路
        if self.U[self.end] == float("inf"):
            self.extract_path()
        self.visited = set() 
    
    def CalculateKey(self, current):
       # print("calculateKey", current)
        return [min(self.g[current], self.rhs[current]) + self.H(current) + self.km, min(self.g[current], self.rhs[current])]
   
    def H(self, current):
        # manhattan
        # return abs(self.current[0] - self.start[0]) + abs(self.current[1] - self.start[1])
        # euclidean
        return  math.hypot(current[0] - self.start[0], current[1] - self.start[1])   
    
    def ComputePath(self):
        self.count += 1
        while True:
            s, value = self.FindTopKey() 
            # print("Compute", s, value)
            if value >= self.CalculateKey(self.start):
                break
            
            self.U.pop(s) 
            self.visited.add(s) 

            if value < self.CalculateKey(s): # 更新新的key
                self.U[s] = self.CalculateKey(s)
                #print("newKey", s)
            elif self.g[s] > self.rhs[s]: # 找到最短路徑
                #print("newDis", s)
                self.g[s] = self.rhs[s]
                for x in self.get_neighbor(s):
                    self.UpdateVertex(x)
            else: 
                #print("not on path")
                self.g[s] = float("inf")
                self.UpdateVertex(s)
                for x in self.get_neighbor(s):
                    self.UpdateVertex(x)
            # print("queue", self.U.keys())
            
    # return the min key's value
    def FindTopKey(self):
        s = min(self.U, key = self.U.get) # key, value
        return s, self.U[s]

    def get_neighbor(self, current):
        nei_list = set()
        for step in [(-1, 0), (-1, 1), (0, 1), (1, 1), (1, 0), (1, -1), (0, -1), (-1, -1)]:
            neighbor = (current[0] + step[0], current[1] + step[1])
           
            # Make sure within range
            if neighbor[0] < 0 or neighbor[1] < 0 or neighbor[0] >= self.x_range or neighbor[1] >= self.y_range: 
                continue
            # Make sure walkable terrain
            if neighbor in self.obs:
                continue

            nei_list.add(neighbor)
        # print("neighbor",nei_list)
        return nei_list
    
    def UpdateVertex(self, current):
        if current != self.end: 
            #print(current, self.get_neighbor(current))
            self.rhs[current] = float("inf")
            for x in self.get_neighbor(current):
                self.rhs[current] = min(self.rhs[current], self.g[x] + self.cost(current, x))
        
        if current in self.U: 
            self.U.pop(current)

        if self.g[current] != self.rhs[current]:
            self.U[current] = self.CalculateKey(current)

    def cost(self, current, neighbor):
        if self.is_collision(current, neighbor):
            return float("inf")

        return math.hypot(neighbor[0] - current[0], neighbor[1] - current[1])
    
    def is_collision(self, current, neighbor):
        if current in self.obs or neighbor in self.obs:
            return True

        if current[0] != neighbor[0] and current[1] != neighbor[1]:
            if neighbor[0] - current[0] == current[1] - neighbor[1]:
                s1 = (min(current[0], neighbor[0]), min(current[1], neighbor[1]))
                s2 = (max(current[0], neighbor[0]), max(current[1], neighbor[1]))
            else:
                s1 = (min(current[0], neighbor[0]), max(current[1], neighbor[1]))
                s2 = (max(current[0], neighbor[0]), min(current[1], neighbor[1]))

            if s1 in self.obs or s2 in self.obs:
                return True

        return False
    
    def extract_path(self):
        s = self.start
        path = [s]
        find = False
        for k in range(400):
            g_list = {}
            for x in self.get_neighbor(s):
                if x not in path:
                    g_list[x] = self.g[x]
            # 走到死路
            if not g_list:
                break 
            s = min(g_list, key=g_list.get)
            # path.append(tuple(coord * self.zoom for coord in s))
            path.append(s)
            if s in self.obs:
                break
            if s == self.end:
                find = True
                break

        
        # print(find, len(path))
        return find, list(path)
    
   
    def changemap(self, maze, start, box):
        self.start = start
        for x in range(self.x_range):
            for y in range(self.y_range):
                if self.graph[x][y] != maze[x][y]:
                    self.graph[x][y] = maze[x][y]
                    if self.graph[x][y] == 0: # 是障礙物
                        self.obs.add((x, y))
                        self.g[(x, y)] = float("inf")
                        self.rhs[(x, y)] = float("inf")
                    elif (x,y) in self.obs: # 現在不是障礙物
                            self.obs.remove((x, y))
                            self.UpdateVertex((x, y))
                            
                    for s in self.get_neighbor((x, y)):
                        self.UpdateVertex(s)
                        
        for coor in box: # 危險半徑
                x, y, w, h = int(coor[0]), int(coor[1]), int(coor[2]), int(coor[3])
                top_left_x = (x - w // 2) // self.zoom
                top_left_y = (y - h // 2) // self.zoom
                for i in range(top_left_x-3, top_left_x+4):
                    for j in range(top_left_y-3, top_left_y+4):
                        key = (int(i), int(j))
                        if key in self.g:
                            self.g[key] *= 1.1
        

        self.visited = set()
        self.ComputePath()
        

In [32]:
vdo_path = 'Density_low.mov'
vdo = cv2.VideoCapture(vdo_path)

model_path = r'../runs/detect/train36-v8l/weights/best.pt'
model = YOLO(model_path)

# 取得長寬
width = int(vdo.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(vdo.get(cv2.CAP_PROP_FRAME_HEIGHT))
width, height = resize_wh(width, height) # 1728 1280 / 69 51
print(width, height)
fps = vdo.get(cv2.CAP_PROP_FPS)
out = cv2.VideoWriter(filename='5. Density_low34.mp4', fourcc=cv2.VideoWriter_fourcc(*'mp4v'), fps=fps, frameSize=(width, height))

start_node = (0,0)
goal_node = (height, width) 
zoom = 10
rds = 6
frame_count = 0
timels = []
maze = np.ones((height//zoom + 1, width//zoom + 1), dtype=np.uint16)
Path = DStarLite(start_node, goal_node,  maze, zoom)
Path.ComputePath()
fond, path = Path.extract_path()
fond = True
total = []
while vdo.isOpened():
    ret, frame = vdo.read()
    if ret:
        # detection part
        t0_det = time.perf_counter()
        frame = cv2.resize(frame, (width, height))
        results = model.predict(frame, imgsz=(height, width), verbose=False)
        for result in results:  
            box = result.boxes.xywh
        t1_det = time.perf_counter()

        path_image = result.plot(labels=False)
        for coor in box:
            x, y= int(coor[0]), int(coor[1])
            cv2.circle(path_image, (x, y), zoom*rds, (255, 0, 0), 1)
        
        # update new path every 5 frame passed
        if frame_count % 5 == 0:

            # map generation part
            t0_map = time.perf_counter()
            maze = np.ones((height//zoom + 1, width//zoom + 1), dtype=np.uint16)
            for coor in box: # obs's range
                x, y, w, h = int(coor[0]), int(coor[1]), int(coor[2]), int(coor[3])
                top_left_x = x - w // 2
                top_left_y = y - h // 2
                maze[top_left_y//zoom-1:(top_left_y+h)//zoom+1, top_left_x//zoom-1:(top_left_x+w)//zoom+1] = 0
            t1_map = time.perf_counter()

            # path generation part
            t0_dstarLite = time.perf_counter()
            if path[0]== goal_node: # 走到終點了
                pass
            elif len(path) < 2: # 還沒到終點 但是沒有下一個點了
                Path.changemap(maze, path[0], box)
                fond, current = Path.extract_path()
                if fond:
                    path = current
            else: # 後面還有路
                if path[1] in total:
                    Path =  DStarLite(path[1], goal_node,  maze, zoom)
                else:
                    Path.changemap(maze, path[1], box)
                fond, current = Path.extract_path()
                if fond:
                    path = current

            
                # with open("path.txt", "a") as file:
                #      file.write(f"{path[0][0] * zoom} {path[0][1] * zoom}\n")
                total.append(path[0])
                path.pop(0)
                
            t1_dstarLite = time.perf_counter()
            
            print(frame_count) 

        timels.append([frame_count, t1_det-t0_det, t1_map-t0_map, t1_dstarLite-t0_dstarLite])


        if path[0] == goal_node: # 走到終點
            for point in path:
                cv2.circle(path_image, (point[1] * zoom, point[0]*zoom), zoom // 2, (255, 16, 240), -1) 
        elif not path or len(path) < 2: # 沒有路了(不管有沒有到終點)
            pass
        else: # 還有下個點
            # print(len(path), path[0])
            color =  (255, 16, 240)
            if not fond:
                color = (0, 255, 0)
            for point in path:
                cv2.circle(path_image, (point[1] * zoom, point[0]*zoom), zoom // 2, color, -1) 
        for point in total:
                cv2.circle(path_image, (point[1] * zoom, point[0]*zoom), zoom // 2, (0, 0,255), -1) 
        out.write(path_image)
        
        frame_count += 1
        clear_output(wait=True)

    else:
        break
out.release()

595


In [33]:
# print(timels)
import pandas as pd
df = pd.DataFrame(timels)
df.rename(columns = {0:'frame', 1:'detection_time', 2:'map_generation_time', 3:'d_star_Lite_time'}, inplace = True)
df.describe()

Unnamed: 0,frame,detection_time,map_generation_time,d_star_Lite_time
count,600.0,600.0,600.0,600.0
mean,299.5,0.054218,0.001673,0.079328
std,173.349358,0.020518,0.000119,0.098854
min,0.0,0.052692,0.001431,0.023398
25%,149.75,0.053117,0.001582,0.033492
50%,299.5,0.05328,0.001675,0.043278
75%,449.25,0.053424,0.001765,0.075808
max,599.0,0.55567,0.001956,0.740161


In [34]:
df.sum()

frame                  179700.000000
detection_time             32.530507
map_generation_time         1.004096
d_star_Lite_time           47.597050
dtype: float64