### Human Shade Detection

created by ruyiyang in 2024/10

cleaned in 2025/1

In [1]:
import cv2
import os
import csv
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import json
from PIL import Image, ImageDraw
from inference_sdk import InferenceHTTPClient

from collections import defaultdict

from ultralytics import YOLO, solutions
from ultralytics.utils.plotting import Annotator, colors

from roboflow import Roboflow

from IPython.display import Image, display

In [2]:
# Source video to analyse (raw string so the Windows backslashes stay literal).
video_path = r'E:\Workfolder\Computer-vision\shade-human\data\raw-video\Kabukicho\Kabukicho_240825_truncated.mp4'
# Video name without directory or extension; used to label the output files.
file_name = os.path.splitext(os.path.split(video_path)[1])[0]

E:\Workfolder\Computer-vision\shade-human\data\raw-video\Kabukicho\Kabukicho_240825_truncated.mp4
Kabukicho_240825_truncated


In [None]:
# video preprocessing

# Option 1: re-encode the video at 5 FPS; the total duration remains unchanged
# !ffmpeg -i "data/raw video/Kabukicho_240825_truncated.mp4" -r 5 "data/raw video/Kabukicho_240825_truncated_5fps.mp4"


# # Option 2 (function): subsample the video to create a time-lapse
# def subsample_video(input_path, output_path, time_lapse_interval):
#     cap = cv2.VideoCapture(input_path)
#     if not cap.isOpened():
#         print("Error: Couldn't open video file.")
#         return

#     frame_count = 0
#     fps = cap.get(cv2.CAP_PROP_FPS)
#     width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
#     height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
#     codec = cv2.VideoWriter_fourcc(*'XVID')
#     out = cv2.VideoWriter(output_path, codec, fps, (width, height))

#     while cap.isOpened():
#         ret, frame = cap.read()
#         if not ret:
#             break

#         # Keep one frame out of every `time_lapse_interval` frames
#         if frame_count % time_lapse_interval == 0:
#             out.write(frame)

#         frame_count += 1

#     cap.release()
#     out.release()
#     cv2.destroyAllWindows()

# if __name__ == "__main__":
#     time_lapse_interval = 5 # change here
#     input_file = video_path 
#     output_file = f"data/raw video/time_lapse/{file_name}_timelapsed{time_lapse_interval}.mp4"  # Replace with desired output video file path
#     subsample_video(input_file, output_file, time_lapse_interval)
#     print(f"saved to {output_file}")

In [None]:
# Source video to analyse (raw string so the Windows backslashes stay literal).
video_path = r'E:\Workfolder\Computer-vision\shade-human\data\raw-video\Kabukicho\Kabukicho_240825_truncated.mp4'
# Video name without directory or extension; used to label the output files.
file_name = os.path.splitext(os.path.split(video_path)[1])[0]

# Echo both values, one per line, so the configuration is visible in the cell output.
print(video_path, file_name, sep="\n")

In [3]:
# Inspect the source video: report its total frame count and native frame rate.
cap = cv2.VideoCapture(video_path)
try:
    # Without this check a missing/unreadable file would silently report
    # 0 frames and 0.0 fps instead of failing.
    if not cap.isOpened():
        raise IOError(f"Could not open video file: {video_path}")

    # Video name without directory or extension; used to label the output files.
    file_name = os.path.splitext(os.path.basename(video_path))[0]

    # Frame count and frame rate as reported by OpenCV for this capture.
    # NOTE(review): the frame count may be an estimate for some containers - confirm if exactness matters.
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    original_fps = cap.get(cv2.CAP_PROP_FPS)
finally:
    # Always free the capture handle, even when opening or probing failed.
    cap.release()

print(f"The total number of frames in the video：{total_frames}\nVideo frame rate：{original_fps}")

The total number of frames in the video：631
Video frame rate：59.999998098256796


In [None]:
'''
Detect, frame by frame, whether each person in the video is standing in shadow.

For every processed frame:
  1. the Roboflow model `SHADOW_MODEL_ID` returns the shadow polygons,
  2. YOLO tracks people (class 0) on the clean, un-annotated frame,
  3. a person counts as "in shadow" when the midpoint of the bottom edge of
     their bounding box lies inside (or on the edge of) any shadow polygon.

Outputs:
  - an annotated video at `output_video_path`,
  - a CSV at `csv_file` with one row per person per processed frame.

Requires `video_path` and `file_name` from the configuration cell and the
ROBOFLOW_API_KEY environment variable. Tunable parameters are grouped below.
'''

# ---- Tunable parameters ----
DETECT_EVERY_N_FRAMES = 1      # run detection on every N-th frame (1 = every frame)
PERSON_CONF = 0.05             # confidence threshold passed to the YOLO tracker
SHADOW_MODEL_ID = "shade-detection/1"
WINDOW_NAME = "instance-segmentation-object-tracking"
UNTRACKED_ID = -1              # person_id used when the tracker assigned no ID to a box

output_dir = "output"
os.makedirs(output_dir, exist_ok=True)

output_video_path = "output/human-and-shade-tracking_{}.avi".format(file_name)
csv_file = 'output/detections_{}.csv'.format(file_name)

# Roboflow client used for shadow detection.
# The API key is read from the environment so it is never stored in the notebook;
# any key that was previously committed in this cell should be revoked.
api_key = os.environ.get("ROBOFLOW_API_KEY")
if not api_key:
    raise RuntimeError("Set the ROBOFLOW_API_KEY environment variable before running this cell.")
CLIENT = InferenceHTTPClient(api_url="https://detect.roboflow.com", api_key=api_key)

# YOLO model used for person detection / tracking.
model_person = YOLO('yolo11l.pt')

# Open the input video.
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    raise IOError(f"Could not open video file: {video_path}")

w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# Keep the frame rate as a float: truncating e.g. 59.999998 to 59 would make
# the output video play at the wrong speed and change its duration.
fps = cap.get(cv2.CAP_PROP_FPS)

out = cv2.VideoWriter(output_video_path, cv2.VideoWriter_fourcc(*"MJPG"), fps, (w, h))
if not out.isOpened():
    cap.release()
    raise IOError(f"Could not open video writer for: {output_video_path}")

csv_headers = ['frame', 'person_id', 'bottom_mid_x', 'bottom_mid_y', 'in_shadow']

frame_count = 0

# Per-frame statistics (refreshed each time detection runs).
total_people = 0
people_in_shadow = 0
people_outside_shadow = 0

# Optional: create a resizable preview window.
# cv2.namedWindow("instance-segmentation-object-tracking", cv2.WINDOW_NORMAL)
# cv2.resizeWindow("instance-segmentation-object-tracking", 800, 600)

try:
    # The CSV is opened once for the whole run instead of being reopened for every frame.
    with open(csv_file, 'w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(csv_headers)

        # Process the video frame by frame.
        while cap.isOpened():
            success, frame = cap.read()
            if not success:
                print("Video frame is empty or video processing has been successfully completed.")
                break

            frame_count += 1

            if frame_count % DETECT_EVERY_N_FRAMES == 0:
                # Shadow polygons for the current frame.
                shadow_result = CLIENT.infer(frame, model_id=SHADOW_MODEL_ID)
                boundaries = []

                # Single-image inference result: a dict carrying a 'predictions' list.
                if 'predictions' in shadow_result:
                    for prediction in shadow_result['predictions']:
                        points = prediction.get("points", [])
                        if points:
                            boundaries.append([(int(point['x']), int(point['y'])) for point in points])

                # Convert each polygon to an int32 array once per frame; the arrays are
                # reused for both drawing and the point-in-polygon tests.
                shadow_polygons = [np.array(boundary, np.int32) for boundary in boundaries]

                # Track people BEFORE drawing anything, so the detector sees the clean frame.
                person_results = model_person.track(
                    frame, persist=True, classes=[0], conf=PERSON_CONF, show=False
                )

                annotator = Annotator(frame, line_width=2)

                # Reset the per-frame statistics.
                people_in_shadow = 0
                people_outside_shadow = 0
                total_people = 0
                detections = []  # CSV rows for this frame

                # Draw the shadow polygons.
                for polygon in shadow_polygons:
                    cv2.polylines(frame, [polygon.reshape((-1, 1, 2))], isClosed=True, color=(255, 0, 0), thickness=2)

                if person_results and len(person_results) > 0:
                    boxes = person_results[0].boxes

                    if boxes is not None and len(boxes) > 0:
                        # Bounding boxes in (x1, y1, x2, y2) format.
                        xyxy = boxes.xyxy.cpu().numpy()

                        # Use the tracker's persistent IDs so person_id identifies the same
                        # person across frames; boxes.id is None when no track was assigned.
                        if boxes.id is not None:
                            track_ids = boxes.id.int().cpu().tolist()
                        else:
                            track_ids = [UNTRACKED_ID] * len(xyxy)

                        for i, (x1, y1, x2, y2) in enumerate(xyxy):
                            bottom_mid_x = (x1 + x2) / 2  # x of the bottom-edge midpoint
                            bottom_mid_y = y2             # y of the bottom edge
                            foot_point = (int(bottom_mid_x), int(bottom_mid_y))

                            # In shadow when the foot point is inside or on any polygon.
                            in_shadow = any(
                                cv2.pointPolygonTest(polygon, foot_point, False) >= 0
                                for polygon in shadow_polygons
                            )
                            if in_shadow:
                                people_in_shadow += 1
                            else:
                                people_outside_shadow += 1

                            # Record the detection.
                            detections.append([frame_count, track_ids[i], bottom_mid_x, bottom_mid_y, in_shadow])
                            total_people += 1

                            # Draw the box, its label and the bottom-edge midpoint.
                            color = colors(int(track_ids[i]), True)
                            txt_color = annotator.get_txt_color(color)
                            label = f"ID {track_ids[i]} {'in shadow' if in_shadow else 'out shadow'}"
                            annotator.box_label([x1, y1, x2, y2], label=label, color=color, txt_color=txt_color)
                            cv2.circle(frame, foot_point, 5, (0, 0, 255), -1)

                # Append this frame's rows to the CSV.
                writer.writerows(detections)

            # Overlay the statistics on the frame (skipped frames show the latest counts).
            text = f"Frame {frame_count}: Total: {total_people}, In Shadow: {people_in_shadow}, Outside Shadow: {people_outside_shadow}"
            cv2.putText(frame, text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)

            # Write the annotated frame and show a live preview.
            out.write(frame)
            cv2.imshow(WINDOW_NAME, frame)

            # Press "q" in the preview window to stop early.
            if cv2.waitKey(1) & 0xFF == ord("q"):
                break
finally:
    # Release the video handles and close the preview even if a frame fails
    # (e.g. a network error from the shadow-detection request).
    cap.release()
    out.release()
    cv2.destroyAllWindows()

print(f"Processing complete. Results saved in {output_video_path} and {csv_file}.")