In [2]:
import os
import cv2
import numpy as np
from ultralytics import YOLO
from gtts import gTTS
from pydub import AudioSegment
from tqdm.auto import tqdm

In [3]:
class Watcher:
    """Classify the people in one video frame as walkers or jaywalkers.

    The segmentation model supplies a road polygon and the detection model
    supplies person bounding boxes. A person whose box overlaps the road
    polygon is recorded as a jaywalker; everyone else is a walker.

    Attributes:
        frame: Image array the models are run on (in this notebook, a frame
            read by ``cv2.VideoCapture``).
        seg_model: Callable segmentation model; ``result[0].masks.xy`` is read.
        det_model: Callable detection model; ``result[0].boxes.xyxy`` is read.
        walkers: Frame-sized overlays with one filled green box per walker.
        jaywalkers: Frame-sized overlays with one filled red box per jaywalker.
    """

    def __init__(self, frame, seg_model, det_model):
        self.frame = frame
        self.seg_model = seg_model
        self.det_model = det_model
        self.walkers = []
        self.jaywalkers = []

    def _infer_road(self, conf):
        """Return a frame-sized canvas with the road polygon filled in red (BGR).

        Only the first mask returned by the segmentation model is used. When
        no mask is available the canvas is returned blank, so no person is
        flagged as a jaywalker for this frame.
        """
        canvas = np.zeros_like(self.frame)
        seg_inf = self.seg_model(self.frame, conf=conf, verbose=False)

        # Explicit "nothing detected" checks replace the former bare
        # ``except: pass`` so that real failures (and Ctrl-C) are not hidden.
        masks = seg_inf[0].masks if len(seg_inf) > 0 else None
        if masks is None or len(masks.xy) == 0:
            return canvas

        road_seg = np.asarray(masks.xy[0]).astype(np.int32)
        if road_seg.size == 0:
            return canvas

        try:
            cv2.fillPoly(canvas, [road_seg], (0, 0, 255))
        except cv2.error:
            # Best effort: a polygon OpenCV cannot draw is treated as "no road".
            pass
        return canvas

    def _infer_people(self, conf):
        """Return one frame-sized canvas per detected person, box filled in red.

        Detection is restricted to class id 0 (presumably "person" for the
        weights loaded in this notebook - TODO confirm for custom weights).
        """
        det_inf = self.det_model(self.frame, classes=0, conf=conf, verbose=False)
        person_det = det_inf[0].boxes.xyxy.cpu().numpy().astype(np.int32)
        canvases = []
        for person_coord in person_det:
            xmin, ymin, xmax, ymax = person_coord
            person_area = cv2.rectangle(np.zeros_like(self.frame), (xmin, ymin), (xmax, ymax), (0, 0, 255), -1)
            canvases.append(person_area)
        return canvases

    def infer(self, seg_conf=0.8, det_conf=0.6):
        """Run both models and populate ``walkers`` and ``jaywalkers``.

        Args:
            seg_conf: Confidence threshold passed to the segmentation model.
            det_conf: Confidence threshold passed to the detection model.
        """
        # Start from a clean slate so a repeated call does not double-count.
        self.walkers = []
        self.jaywalkers = []

        road_area = self._infer_road(seg_conf)
        people_areas = self._infer_people(det_conf)
        for person_area in people_areas:
            # Any shared non-zero pixel means the box touches the road polygon.
            overlap = cv2.bitwise_and(road_area, person_area)
            jaywalking = np.any(overlap)
            if jaywalking:
                self.jaywalkers.append(person_area)
            else:
                # Recolour the red box to green to mark a regular walker.
                walker_area = person_area.copy()
                walker_area[np.all(walker_area == (0, 0, 255), axis=-1)] = (0, 255, 0)
                self.walkers.append(walker_area)

    def count(self):
        """Return ``(total_people, jaywalkers)`` from the last ``infer`` call."""
        jay_count = len(self.jaywalkers)
        total_count = len(self.walkers) + len(self.jaywalkers)
        return total_count, jay_count

    def visualize(self):
        """Return the frame with all walker/jaywalker boxes blended on top."""
        canvas = np.zeros_like(self.frame)
        for jaywalker in self.jaywalkers:
            canvas = cv2.add(canvas, jaywalker)
        for walker in self.walkers:
            canvas = cv2.add(canvas, walker)
        # Frame at full weight, overlays at half weight.
        return cv2.addWeighted(self.frame, 1, canvas, 0.5, 0)

In [4]:
def get_loc(file_name, base_dir="demo-prepare"):
    """Return the path of ``file_name`` inside the demo asset directory.

    Args:
        file_name: File or sub-directory name relative to ``base_dir``.
        base_dir: Directory holding the demo assets. Defaults to
            ``"demo-prepare"``, resolved relative to the working directory.

    Returns:
        ``base_dir`` and ``file_name`` joined with the platform separator.
    """
    return os.path.join(base_dir, file_name)


def get_color(total_count, jaywalker_count, crowd_threshold=2):
    """Pick the map marker colour for one camera zone.

    Colours are 3-tuples in the BGR channel order used by the OpenCV
    drawing calls they are passed to.

    Args:
        total_count: Number of people detected in the zone.
        jaywalker_count: Number of those people flagged as jaywalkers.
        crowd_threshold: A zone with more than this many people (and no
            jaywalkers) is marked as crowded. Defaults to 2.

    Returns:
        Red ``(0, 0, 255)`` if there is any jaywalker, orange-red
        ``(0, 69, 255)`` if the zone is crowded, otherwise green
        ``(0, 255, 0)``.
    """
    # Jaywalking takes priority over crowding.
    if jaywalker_count > 0:
        return (0, 0, 255)
    if total_count > crowd_threshold:
        return (0, 69, 255)
    return (0, 255, 0)

In [5]:
# Load both networks once, up front: the segmentation model comes from the
# project's own weights file in the demo asset directory, the detection model
# from "yolov8m.pt" (presumably the stock YOLOv8-medium weights - confirm).
seg_model, det_model = (
    YOLO(weights) for weights in (get_loc("team_best.pt"), "yolov8m.pt")
)

In [10]:
# Render the demo: a 2x2 mosaic video (three annotated camera feeds plus a
# congestion map) and a matching audio track of periodic spoken head counts.

# --- Configuration -----------------------------------------------------------
CYCLE_SEC = 5                 # seconds between spoken announcements
MAP_SIZE = (3840, 2160)       # (width, height) the map is drawn at
MAP_RADIUS = 300              # marker radius, in MAP_SIZE pixels
TTS_TEMP_FILE = 'temp.mp3'    # scratch file for each synthesized announcement
# Marker centre on the map for each camera, in MAP_SIZE pixels and in the same
# order as video_paths.
coords = [(2215, 1255), (1600, 1510), (1570, 550)]

# --- Inputs ------------------------------------------------------------------
video_folder = get_loc("videos")
# os.listdir order is arbitrary, so sort for a stable camera order and skip
# hidden files (e.g. .DS_Store) that are not videos.
# NOTE(review): confirm that sorted filename order matches the order of coords.
video_paths = [
    os.path.join(video_folder, x)
    for x in sorted(os.listdir(video_folder))
    if not x.startswith(".")
]
if len(video_paths) != len(coords):
    raise ValueError(
        f"Expected {len(coords)} videos in {video_folder!r}, found {len(video_paths)}"
    )

# The map image never changes, so read and resize it once instead of per frame.
map_image = cv2.imread(get_loc("map.png"))
if map_image is None:
    raise FileNotFoundError(f"Could not read map image: {get_loc('map.png')}")
resized_map = cv2.resize(map_image, MAP_SIZE)

caps = [cv2.VideoCapture(video_path) for video_path in video_paths]
out = None
prev = None
# The first cycle is silent; announcements start at the CYCLE_SEC mark.
audio = AudioSegment.silent(duration=CYCLE_SEC * 1000)

try:
    unopened = [path for path, cap in zip(video_paths, caps) if not cap.isOpened()]
    if unopened:
        raise IOError(f"Could not open video(s): {unopened}")

    # Geometry and timing are taken from the first video; the mosaic layout
    # below assumes every camera delivers frames of this size.
    frame_width = int(caps[0].get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(caps[0].get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = int(caps[0].get(cv2.CAP_PROP_FPS))
    frame_len = int(caps[0].get(cv2.CAP_PROP_FRAME_COUNT))
    if fps <= 0:
        raise ValueError(f"Invalid frame rate reported for {video_paths[0]!r}: {fps}")

    tts_rep = CYCLE_SEC * fps          # frames between announcements
    cycle_ms = CYCLE_SEC * 1000

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter('output_video.mp4', fourcc, fps, (frame_width * 2, frame_height * 2))
    if not out.isOpened():
        raise IOError("Could not open 'output_video.mp4' for writing")

    for idx in tqdm(range(frame_len)):
        reads = [cap.read() for cap in caps]
        # Stop as soon as any camera runs out of frames.
        if not all(ret for ret, _ in reads):
            break
        frames = [frame for _, frame in reads]

        canvas = np.zeros((frame_height * 2, frame_width * 2, 3), np.uint8)

        counts = []
        detections = []
        for frame in frames:
            watcher = Watcher(frame, seg_model, det_model)
            watcher.infer()
            detections.append(watcher.visualize())
            counts.append(watcher.count())

        if idx % fps == 0:
            # Refresh the displayed counts once per second of video.
            prev = counts
            # The counts only change here, so the map overlay is re-rendered
            # once per second instead of on every frame.
            map_canvas = np.zeros_like(resized_map)
            for coord, count in zip(coords, counts):
                cv2.circle(map_canvas, coord, MAP_RADIUS, get_color(*count), -1)
            cctv_map = cv2.addWeighted(resized_map, 0.75, map_canvas, 0.25, 0)
            # Fit the map into its mosaic quadrant for non-4K input videos.
            if (cctv_map.shape[1], cctv_map.shape[0]) != (frame_width, frame_height):
                cctv_map = cv2.resize(cctv_map, (frame_width, frame_height))
        else:
            counts = prev

        # Every CYCLE_SEC seconds (but not at frame 0) append one cycle of audio.
        if idx > 0 and idx % tts_rep == 0:
            subtotal = sum(total for total, _ in counts)
            if subtotal != 0:
                tts = gTTS(f'구역에 {subtotal}명 있습니다.', lang='ko')
                tts.save(TTS_TEMP_FILE)
                # Trim to the cycle length so a long announcement cannot push
                # the audio out of sync or yield a negative padding duration.
                tts_audio = AudioSegment.from_mp3(TTS_TEMP_FILE)[:cycle_ms]
                audio = audio + tts_audio
                padding_ms = cycle_ms - len(tts_audio)
                if padding_ms > 0:
                    audio = audio + AudioSegment.silent(duration=padding_ms)
            else:
                audio = audio + AudioSegment.silent(duration=cycle_ms)

        # Mosaic layout: cameras top-left, top-right, bottom-left; map bottom-right.
        a_frame, b_frame, c_frame = detections
        canvas[0:frame_height, 0:frame_width] = a_frame
        canvas[0:frame_height, frame_width:frame_width * 2] = b_frame
        canvas[frame_height:frame_height * 2, 0:frame_width] = c_frame
        canvas[frame_height:frame_height * 2, frame_width:frame_width * 2] = cctv_map

        out.write(canvas)
finally:
    # Always release the captures and the writer, even if a frame failed.
    for cap in caps:
        cap.release()
    if out is not None:
        out.release()
    cv2.destroyAllWindows()
    if os.path.exists(TTS_TEMP_FILE):
        os.remove(TTS_TEMP_FILE)

audio.export('output_audio.mp3', format='mp3')

  0%|          | 0/450 [00:00<?, ?it/s]

In [1]:
import os
from moviepy.editor import VideoFileClip, AudioFileClip

# Mux the rendered mosaic video with the generated announcement audio into
# demo.mp4 in the current working directory.
demo_video = VideoFileClip(os.path.join(os.getcwd(), 'output_video.mp4'))
try:
    demo_audio = AudioFileClip(os.path.join(os.getcwd(), 'output_audio.mp3'))
    try:
        demo = demo_video.set_audio(demo_audio)
        demo.write_videofile(os.path.join(os.getcwd(), 'demo.mp4'))
    finally:
        # Release the audio reader even if writing failed.
        demo_audio.close()
finally:
    # Release the video reader even if loading the audio or writing failed.
    demo_video.close()

Moviepy - Building video /Users/lostin185/Desktop/demo.mp4.
MoviePy - Writing audio in demoTEMP_MPY_wvf_snd.mp3


                                                                    

MoviePy - Done.
Moviepy - Writing video /Users/lostin185/Desktop/demo.mp4



                                                              

Moviepy - Done !
Moviepy - video ready /Users/lostin185/Desktop/demo.mp4
