## path 탐색 / 디렉터리 내부 파일 탐색

In [6]:
import os

print(os.listdir("weights"))

['test2.pt', 'test3.pt', 'yolov8n.pt', 'test1.pt']


In [4]:
print(os.getcwd())

/Users/bhc/Desktop/dev/evc-switch-predictor


In [5]:
w_list = os.listdir("weights/")
model_map = {
    "YOLOv8" : w_list
}

print(model_map)

{'YOLOv8': ['test2.pt', 'test3.pt', 'yolov8n.pt', 'test1.pt']}


## RTSP 데이터 가공 테스트

In [1]:
import cv2
import datetime
import os
from ultralytics import YOLO
import numpy as np
from PIL import Image
from matplotlib import pyplot as plt
import gradio as gr

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
def writeVideo(url):
    model = YOLO("yolov8n.pt")
    currenttime = datetime.datetime.now()
    video_capture = cv2.VideoCapture(url)
    
    video_capture.set(3, 120)
    video_capture.set(4, 120)
    fps = 20

    streaming_window_width = int(video_capture.get(3))
    streaming_window_height = int(video_capture.get(4))

    filename = str(currenttime.strftime('%Y %m %d %H %M %S'))
    path = os.getcwd() + "/stream/" + f"{filename}.mp4"

    fourcc = cv2.VideoWriter_fourcc(*'h264')
    out = cv2.VideoWriter(path, fourcc, fps, (streaming_window_width, streaming_window_height))

    while True:
        ret, frame = video_capture.read()
        results = model(frame)
        annotated_frame = results[0].plot()

        cv2.imshow("streaming video", annotated_frame)
        out.write(annotated_frame)

        k = cv2.waitKey(1) & 0xff
        if k == ord('q'):
            break

    video_capture.release()
    out.release()
    cv2.destroyAllWindows()
    cv2.waitKey(1)

In [None]:
url = 'rtsp://user1:ketiabcs@evc.re.kr:39091/h264Preview_01_main'

writeVideo(url)

In [3]:
## use gradio
url = 'rtsp://user1:ketiabcs@evc.re.kr:39091/h264Preview_01_main'

def view(url):
    return writeVideo(url)

demo = gr.Interface(
    fn=view,
    inputs=gr.Textbox(),
    outputs=gr.Gallery(),
    examples=[
        url
    ]
)

demo.queue()
demo.launch(server_port=7861)

Running on local URL:  http://127.0.0.1:7861

To create a public link, set `share=True` in `launch()`.




OpenCV: FFMPEG: tag 0x34363268/'h264' is not supported with codec id 27 and format 'mp4 / MP4 (MPEG-4 Part 14)'
OpenCV: FFMPEG: fallback to use tag 0x31637661/'avc1'

0: 384x640 1 person, 66.4ms
Speed: 2.6ms preprocess, 66.4ms inference, 0.9ms postprocess per image at shape (1, 3, 384, 640)
Traceback (most recent call last):
  File "/Users/bhc/opt/anaconda3/envs/edge-fw/lib/python3.8/site-packages/gradio/queueing.py", line 406, in call_prediction
    output = await route_utils.call_process_api(
  File "/Users/bhc/opt/anaconda3/envs/edge-fw/lib/python3.8/site-packages/gradio/route_utils.py", line 226, in call_process_api
    output = await app.get_blocks().process_api(
  File "/Users/bhc/opt/anaconda3/envs/edge-fw/lib/python3.8/site-packages/gradio/blocks.py", line 1554, in process_api
    result = await self.call_function(
  File "/Users/bhc/opt/anaconda3/envs/edge-fw/lib/python3.8/site-packages/gradio/blocks.py", line 1192, in call_function
    prediction = await anyio.to_thread.run_s

## 이미지 저장 송출 테스트

### 1. numpy 활용

In [2]:
import cv2
import datetime
import os
from ultralytics import YOLO
import numpy as np
from PIL import Image
from matplotlib import pyplot as plt
import gradio as gr

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
print(os.listdir("video/"))

['out', 'test.mp4']


In [5]:
url = 'rtsp://user1:ketiabcs@evc.re.kr:39091/h264Preview_01_main'
model = YOLO('yolov8n.pt')

In [12]:
def writeVideo(url, model):
    currenttime = datetime.datetime.now()
    video_capture = cv2.VideoCapture(url)
    
    video_capture.set(3, 120)
    video_capture.set(4, 120)
    fps = 20

    streaming_window_width = int(video_capture.get(3))
    streaming_window_height = int(video_capture.get(4))

    filename = str(currenttime.strftime('%Y %m %d %H %M %S'))
    path = os.getcwd() + "/stream/" + f"{filename}.mp4"

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(path, fourcc, fps, (streaming_window_width, streaming_window_height))

    iterating, frame = video_capture.read()
    framecnt = 0
    imgs = []

    while iterating:
        results = model(frame)
        framecnt += 1
        annotated_frame = results[0].plot()
        annotated_frame = cv2.cvtColor(annotated_frame, cv2.COLOR_BGR2RGB)

        out.write(annotated_frame)
        img_arr = np.array(annotated_frame)
        imgs.append(img_arr)
        

        if len(imgs) >= 50:
            break

    return imgs

    video_capture.release()
    out.release()
    cv2.destroyAllWindows()

In [None]:
imgs = writeVideo(url, model)

In [None]:
def show_img():
    imgs = writeVideo(url, model)
    
    for i in imgs:
        return Image.fromarray(i)

with gr.Blocks() as demo:
    out_frame = gr.Image()
    btn = gr.Button(value="show images")

    btn.click(
        show_img,
        outputs=out_frame
    )

demo.queue()
demo.launch(server_port=7861)

Running on local URL:  http://127.0.0.1:7862

To create a public link, set `share=True` in `launch()`.





0: 384x640 3 persons, 1 laptop, 49.6ms
Speed: 2.5ms preprocess, 49.6ms inference, 0.9ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 3 persons, 1 laptop, 43.2ms
Speed: 1.7ms preprocess, 43.2ms inference, 0.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 3 persons, 1 laptop, 66.6ms
Speed: 1.7ms preprocess, 66.6ms inference, 0.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 3 persons, 1 laptop, 48.6ms
Speed: 1.6ms preprocess, 48.6ms inference, 0.6ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 3 persons, 1 laptop, 44.5ms
Speed: 1.5ms preprocess, 44.5ms inference, 0.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 3 persons, 1 laptop, 65.2ms
Speed: 1.7ms preprocess, 65.2ms inference, 0.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 3 persons, 1 laptop, 44.5ms
Speed: 1.5ms preprocess, 44.5ms inference, 0.7ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 3 persons, 1 laptop, 47.9ms
S

: 

### 2. 로컬 저장 방식

In [None]:
def writeVideo(url, model):
    currenttime = datetime.datetime.now()
    video_capture = cv2.VideoCapture(url)
    
    video_capture.set(3, 120)
    video_capture.set(4, 120)
    fps = 20

    streaming_window_width = int(video_capture.get(3))
    streaming_window_height = int(video_capture.get(4))

    filename = str(currenttime.strftime('%Y %m %d %H %M %S'))
    path = os.getcwd() + "/stream/" + f"{filename}.mp4"

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(path, fourcc, fps, (streaming_window_width, streaming_window_height))

    iterating, frame = video_capture.read()
    framecnt = 0
    imgs = []

    while iterating:
        results = model(frame)
        framecnt += 1
        annotated_frame = results[0].plot()
        annotated_frame = cv2.cvtColor(annotated_frame, cv2.COLOR_BGR2RGB)

        out.write(annotated_frame)
        img_arr = np.array(annotated_frame)
        imgs.append(img_arr)
        

        if len(imgs) >= 50:
            break

    return imgs

    video_capture.release()
    out.release()
    cv2.destroyAllWindows()