# 評価用動画の作成 

16_convert で変換した2つのモデルと、21_convert_detect で変換したモデルを使って、評価用の動画を作成します。

対応デバイス: Jetson Xavier NX, Jetson Orin Nano

In [None]:
import Jetson.GPIO as GPIO

# First element of the tuple returned by Jetson.GPIO's pin-data lookup; it is
# the key used for every lookup table below.
BOARD_NAME = GPIO.gpio_pin_data.get_data()[0]

# Power-mode names per board; the list index is the mode number that is later
# passed to `nvpmodel -m`.
mode_descriptions = {
    "JETSON_NX": [
        "15W_2CORE",
        "15W_4CORE",
        "15W_6CORE",
        "10W_2CORE",
        "10W_4CORE",
    ],
    "JETSON_XAVIER": ["MAXN", "MODE_10W", "MODE_15W", "MODE_30W"],
    "JETSON_NANO": ["MAXN", "5W"],
    "JETSON_ORIN": ["MAXN", "MODE_15W", "MODE_30W", "MODE_40W"],
    "JETSON_ORIN_NANO": ["MODE_15W", "MODE_7W"],
}

# Human-readable product name per board key.
product_names = {
    "JETSON_NX": "Jetson Xavier NX",
    "JETSON_XAVIER": "Jetson AGX Xavier",
    "JETSON_NANO": "Jetson Nano",
    "JETSON_ORIN": "Jetson AGX Orin",
    "JETSON_ORIN_NANO": "Jetson Orin Nano",
}

# Per board: (I2C bus number, initial power mode).
board_settings = {
    "JETSON_NX": (8, 3),
    "JETSON_XAVIER": (8, 2),
    "JETSON_NANO": (1, 0),
    "JETSON_ORIN": (7, 0),
    "JETSON_ORIN_NANO": (7, 0),
}

# Unknown boards fall back to (None, None), an empty mode list and a
# placeholder product name.
product_name = product_names.get(BOARD_NAME, "未知のボード")
mode_description = mode_descriptions.get(BOARD_NAME, [])
i2c_busnum, power_mode = board_settings.get(BOARD_NAME, (None, None))

if power_mode is None or power_mode >= len(mode_description):
    print("未知のボードまたは不正なモードです。")
else:
    mode_str = mode_description[power_mode]
    print(
        "------------------------------------------------------------",
        f"{product_name}を認識: I2Cバス番号: {i2c_busnum}, Powerモード: {mode_str}({power_mode})に設定します。",
        "------------------------------------------------------------",
        sep="\n",
    )

In [None]:
# Apply the power mode selected during board detection. On the two Orin
# products the change is skipped and only a notice is printed.
if (product_name == "Jetson Orin Nano") or (product_name == "Jetson AGX Orin"):
    print("Docker起動のため電力モードは変更できません。")
else:
    # IPython shell escape: $power_mode is expanded from the Python variable.
    # NOTE(review): the string piped into `sudo -S` is a hard-coded password —
    # confirm this is acceptable for the target device image.
    !echo "jetson" | sudo -S nvpmodel -m $power_mode

In [None]:
# Query the current power mode (`nvpmodel -q`) through an IPython shell escape.
# NOTE(review): uses the same hard-coded sudo password as the other cells.
!echo "jetson" | sudo -S nvpmodel -q

In [None]:
# Run `jetson_clocks`, except on the two Orin products where only a notice is
# printed.
if (product_name == "Jetson Orin Nano") or (product_name == "Jetson AGX Orin"):
    print("Docker起動のためjetson_clocksは起動できません。")
else:
    # IPython shell escape.
    # NOTE(review): the string piped into `sudo -S` is a hard-coded password —
    # confirm this is acceptable for the target device image.
    !echo "jetson" | sudo -S jetson_clocks

In [None]:
import ipywidgets
from ipywidgets import Button, Layout, Textarea, HBox, VBox, Label
import os
import glob

# Shared log area (fixed 100px height, automatic width) used by write_log().
l = Layout(width='auto', height='100px', min_height='100px', flex='0 1 auto')
process_widget = Textarea(description='ログ', value='', layout=l)

# Running number of the most recent log entry.
process_no = 0


def write_log(msg):
    """Prepend a numbered entry to the log text area, newest entry first.

    Args:
        msg: Text of the entry; it is concatenated as a string.
    """
    global process_no
    process_no += 1
    entry = str(process_no) + ": " + msg
    process_widget.value = entry + "\n" + process_widget.value

In [None]:
# --- Constants -------------------------------------------------------------
# Task folders (relative to the working directory) offered in the task dropdown.
LOAD_TASK = ['camera']
# Class labels used to caption the environment model's scores, by output index.
CATEGORIES = ["start", "road", "etc"]
# Pixel size used to scale the driving model's normalized outputs.
IMG_WIDTH = 224
IMG_HEIGHT = 224
current_path = os.getcwd()
# Selectable frame steps for the video (use every n-th image).
SKIP = [1, 2, 3, 4, 5]

# --- Driving model A -------------------------------------------------------
model_a_widget = ipywidgets.Dropdown(options=[], description='走行モデル')
model_a_time_widget = ipywidgets.Label(description='作成日時')
load_a_button = ipywidgets.Button(description='走行モデル読込み')

# --- Driving model B -------------------------------------------------------
# NOTE(review): the dropdown and button labels differ slightly from model A's
# ('モデル' / '走行モデルを読込み') — confirm whether that is intentional.
model_b_widget = ipywidgets.Dropdown(options=[], description='モデル')
model_b_time_widget = ipywidgets.Label(description='作成日時')
load_b_button = ipywidgets.Button(description='走行モデルを読込み')

# --- Environment (classification) model ------------------------------------
# NOTE(review): the leading 'T' in the button label looks like a typo — confirm.
model_c_widget = ipywidgets.Dropdown(options=[], description='環境モデル')
model_c_time_widget = ipywidgets.Label(description='作成日時')
load_c_button = ipywidgets.Button(description='T環境モデル読込み')

# --- Dataset selection -----------------------------------------------------
load_datasets_widget = ipywidgets.Dropdown(options=[], description='dataset')
load_task_widget = ipywidgets.Dropdown(options=LOAD_TASK, description='task')

# --- Video output ----------------------------------------------------------
movie_button = ipywidgets.Button(description='動画の作成')
movie_name_widget = ipywidgets.Text(
    description='動画名',
    value="run_video_dual_merge_trt",
)
# index=1 preselects the second entry of SKIP.
movie_skip_dropdown = ipywidgets.Dropdown(
    options=SKIP,
    description='skip(枚)',
    index=1,
)

In [None]:
import datetime
import torch
import torchvision
from torch2trt import TRTModule

def change_load_task(change):
    """Refresh the dataset dropdown with the sub-directories of the selected task.

    Only directories directly under ``<current_path>/<task>`` are listed, in
    sorted order, and ``.ipynb_checkpoints`` is excluded. If the task folder
    cannot be listed, a message is logged and the dropdown is emptied.

    Args:
        change: Change payload supplied by ``observe`` (unused). The initial
            call below passes a plain string instead.
    """
    task = load_task_widget.value
    if not task:
        # Nothing selected, so there is no folder to scan.
        load_datasets_widget.options = []
        return

    path = os.path.join(current_path, task)
    try:
        entries = os.listdir(path)
    except OSError:
        write_log(path + "が存在していません。")
        load_datasets_widget.options = []
        return

    # Apply both filters together: the entry must be a directory and must not
    # be Jupyter's checkpoint folder.
    load_datasets_widget.options = sorted(
        name
        for name in entries
        if name != ".ipynb_checkpoints"
        and os.path.isdir(os.path.join(path, name))
    )


# Observe only the selected value; without `names` the handler is registered
# for every trait of the widget and the directory scan runs for each of them.
load_task_widget.observe(change_load_task, names='value')
change_load_task(LOAD_TASK[0])

def model_list(type):
    """Populate the model dropdowns from the ``*.pth`` files in a directory.

    The files are listed in sorted order so that the first option (the one the
    dropdown selects by default) is deterministic; the time label shows the
    timestamp of that first file. On any failure, including an empty
    directory, the matching dropdowns are emptied and the error is logged.

    Args:
        type: Directory to scan. ``"model_trt"`` fills both driving-model
            dropdowns (A and B); ``"model_class_trt"`` fills the
            environment-model dropdown. The name shadows the builtin but is
            kept so existing callers keep working.
    """
    try:
        files = sorted(glob.glob(type + '/*.pth'))
        if not files:
            # Report the real cause instead of an IndexError from files[0].
            raise FileNotFoundError(f"{type} に .pth ファイルが見つかりません。")

        # NOTE: on Unix, getctime() is the last metadata change time rather
        # than the creation time.
        ts = os.path.getctime(files[0])
        s = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S')

        if type == "model_trt":
            model_a_widget.options = files
            model_a_time_widget.value = s
            model_b_widget.options = files
            model_b_time_widget.value = s
        elif type == "model_class_trt":
            model_c_widget.options = files
            model_c_time_widget.value = s
    except Exception as e:
        if type == "model_trt":
            model_a_widget.options = []
            model_b_widget.options = []
        else:
            model_c_widget.options = []
        write_log(f"Error:{e}")


model_list("model_trt")
model_list("model_class_trt")

def load_a_model(change):
    """Load the model selected in dropdown A into the global ``model_a_trt``.

    The module is built in a local variable and published to the global only
    after loading succeeds, so a failed load keeps any previously loaded model.

    Args:
        change: Argument supplied by ``on_click`` (unused).
    """
    global model_a_trt
    model_path = model_a_widget.value
    try:
        write_log(model_path + "の読込を実行します(初回は時間がかかります)。")
        model = TRTModule()
        # NOTE: torch.load unpickles the file; only load trusted model files.
        model.load_state_dict(torch.load(model_path))
    except Exception as e:
        write_log(f"【Error】 {e} : {model_path} の読込に失敗しました。")
        return
    model_a_trt = model
    write_log(model_path + "の読込に成功しました。")


load_a_button.on_click(load_a_model)

def load_b_model(change):
    """Load the model selected in dropdown B into the global ``model_b_trt``.

    The module is built in a local variable and published to the global only
    after loading succeeds, so a failed load keeps any previously loaded model.

    Args:
        change: Argument supplied by ``on_click`` (unused).
    """
    global model_b_trt
    model_path = model_b_widget.value
    try:
        write_log(model_path + "の読込を実行します(初回は時間がかかります)。")
        model = TRTModule()
        # NOTE: torch.load unpickles the file; only load trusted model files.
        model.load_state_dict(torch.load(model_path))
    except Exception as e:
        write_log(f"【Error】 {e} : {model_path} の読込に失敗しました。")
        return
    model_b_trt = model
    write_log(model_path + "の読込に成功しました。")


load_b_button.on_click(load_b_model)

def load_c_model(change):
    """Load the model selected in dropdown C into the global ``model_c_trt``.

    The module is built in a local variable and published to the global only
    after loading succeeds, so a failed load keeps any previously loaded model.

    Args:
        change: Argument supplied by ``on_click`` (unused).
    """
    global model_c_trt
    model_path = model_c_widget.value
    try:
        write_log(model_path + "の読込を実行します(初回は時間がかかります)。")
        model = TRTModule()
        # NOTE: torch.load unpickles the file; only load trusted model files.
        model.load_state_dict(torch.load(model_path))
    except Exception as e:
        write_log(f"【Error】 {e} : {model_path} の読込に失敗しました。")
        return
    model_c_trt = model
    write_log(model_path + "の読込に成功しました。")


load_c_button.on_click(load_c_model)

In [None]:
import cv2
import glob
from utils import preprocess
import re
import torch.nn.functional as F
import time

def extract_numbers(filename):
    """Return the numeric sort key for an image file name.

    Args:
        filename: File name to inspect.

    Returns:
        The last run of digits in ``filename`` as an ``int`` when the name
        contains at least three digit runs; otherwise ``float('inf')``, which
        sorts such names after all numbered ones.
    """
    digit_runs = re.findall(r'(\d+)', filename)
    if len(digit_runs) < 3:
        return float('inf')
    return int(digit_runs[-1])

def get_file_names(path):
    """Return the ``.jpg`` files directly under ``path`` in frame order.

    Args:
        path: Directory to list.

    Returns:
        Paths (``path`` joined with each entry) whose extension is ``.jpg``,
        case-insensitively, ordered by ``extract_numbers`` applied to the base
        name. Entries with equal keys keep their ``os.listdir`` order.
    """
    jpg_paths = [
        os.path.join(path, entry)
        for entry in os.listdir(path)
        if os.path.splitext(os.path.join(path, entry))[1].lower() == ".jpg"
    ]
    # list.sort is stable, so filtering before sorting yields the same order
    # as sorting everything and filtering afterwards.
    jpg_paths.sort(key=lambda p: extract_numbers(os.path.basename(p)))
    return jpg_paths


def make_movie(change):
    """Render the selected dataset into an annotated MP4 using the loaded models.

    Every ``skip``-th image under ``<task>/<dataset>/xy`` is annotated with the
    driving model's predicted point, a speed bar, the environment model's
    class scores and the lap counter, then written to
    ``<current_path>/video/<name>.mp4``. Driving model B is used during laps 2
    and 4, driving model A otherwise. Progress and errors go to the log
    widget.

    Args:
        change: Argument supplied by ``on_click`` (unused).
    """
    # Index of the "start" class; class indices are captioned via CATEGORIES.
    DETECT_START = 0
    # Upper bound of the speed bar in pixels (the frame is 224 px high).
    SPEED_BAR_MAX = 224

    if not movie_name_widget.value.strip():
        write_log("ファイル名を指定してください。")
        return
    if not load_datasets_widget.value:
        # Without a dataset the image folder path cannot be built.
        write_log("データセットを選択してください。")
        return
    write_log("動画を作成します。")

    skip_movie = movie_skip_dropdown.value
    fps = int(30 / skip_movie)
    # Video time represented by one written frame.
    terminal_time = 1 / fps

    path = os.path.join(current_path, "video")
    xy_path = os.path.join(current_path, load_task_widget.value, load_datasets_widget.value, "xy")
    try:
        os.makedirs(path, exist_ok=True)
        file_list = get_file_names(xy_path)
    except OSError as e:
        # Fail before the writer is opened so that no empty file or unreleased
        # writer is left behind.
        write_log(f"Error: {e}")
        return

    video_path = path + "/" + movie_name_widget.value + ".mp4"
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    outfh = cv2.VideoWriter(video_path, fourcc, fps, (224, 224))

    detect_count = 0
    last_detect = -1
    lap = 0
    last_lap_time = -5
    current_time = 0
    total_process_drive_time = 0
    total_process_detect_time = 0
    thickness = 2  # text thickness
    font_scale = 0.5  # font size

    try:
        res_num = len(file_list)
        write_log(f"画像枚数:{res_num}")
        for i, file_name in enumerate(file_list):
            if i % skip_movie != 0:
                continue

            current_time += terminal_time
            img = cv2.imread(file_name)

            # NOTE(review): this timer starts before preprocessing and stops
            # after classification, so the reported "detect" time also spans
            # the driving inference and drawing — confirm that is intended.
            detect_started = time.time()
            image = preprocess(img).half()

            # Swap the driving model depending on the lap.
            if lap == 2 or lap == 4:
                model = model_b_trt
            else:
                model = model_a_trt

            drive_started = time.time()
            drive_out = model(image).detach().cpu().numpy().flatten()
            result_x = float(drive_out[0])
            result_y = float(drive_out[1])
            # Outputs are scaled as value / 2 + 0.5 times the image size.
            result_x = int(IMG_WIDTH * (result_x / 2.0 + 0.5))
            result_y = int(IMG_HEIGHT * (result_y / 2.0 + 0.5))
            img = cv2.circle(img, (int(result_x), int(result_y)), 8, (255, 0, 0), 3)

            # Speed
            # NOTE(review): speed is read from output index 3 — confirm this
            # matches the driving model's output layout.
            result_speed = drive_out[3]
            result_speed = int(IMG_WIDTH * (result_speed / 2.0 + 0.5))
            # Clamp to the bar range.
            result_speed = max(0, min(SPEED_BAR_MAX, result_speed))
            total_process_drive_time += time.time() - drive_started

            img = cv2.line(img, (218, 0), (218, 224), (0, 0, 0), 5)
            img = cv2.line(img, (219, 224 - result_speed), (219, 224), (0, 140, 255), 3)
            img = cv2.putText(img, "speed:" + str(result_speed), (160, 215), cv2.FONT_HERSHEY_SIMPLEX, 0.3, (255, 255, 255))

            class_out = model_c_trt(image).detach()
            scores = F.softmax(class_out, dim=1).cpu().numpy().flatten()
            category_index = scores.argmax()
            total_process_detect_time += time.time() - detect_started

            # One caption per class; the top-scoring class uses (0, 0, 255).
            for h, score in enumerate(list(scores)):
                color = (0, 0, 255) if h == category_index else (255, 255, 255)
                img = cv2.putText(img, f"{CATEGORIES[h]} : {int(score * 100)} %", (10, h * 15 + 40), cv2.FONT_HERSHEY_SIMPLEX, font_scale, color, thickness)

            # Lap detection, suspended for 5 seconds of video time after
            # each counted lap.
            if current_time - last_lap_time > 5:
                if category_index == DETECT_START:
                    if last_detect == DETECT_START:
                        detect_count += 1
                        write_log(f"lap:{lap},detect_count:{detect_count}")
                        # Orin Nano runs at 60 FPS while the video is 30 FPS,
                        # so the required count is halved.
                        if detect_count > int(30 / (skip_movie * 2)):
                            lap += 1
                            detect_count = 0
                            last_lap_time = current_time  # remember when the lap was counted
                    else:
                        detect_count = 1
                else:
                    detect_count = 0
            last_detect = category_index

            # Show the lap counter.
            img = cv2.putText(img, "Lap: " + str(lap), (10, 200), cv2.FONT_HERSHEY_SIMPLEX, font_scale, (255, 255, 255), thickness)

            # Report progress every 10 written frames and reset the timers.
            if i % (skip_movie * 10) == 0:
                write_log(f"{current_time:.1f}秒まで完了, 走行推論: {total_process_drive_time/10*1000:.1f}ms, 環境推論: {total_process_detect_time/10*1000:.1f}ms, {int(i/skip_movie)}枚目/{int(res_num/skip_movie)}枚中を処理中")
                total_process_detect_time = 0
                total_process_drive_time = 0

            outfh.write(img)
    except Exception as e:
        write_log(f"Error: {e}")
    finally:
        # Release the writer even when an error occurred.
        outfh.release()
        write_log("動画の出力が完了しました。")


movie_button.on_click(make_movie)

In [None]:
# Section headings and the horizontal rule shown at the top of the panel.
separator = ipywidgets.HTML('<hr style="border-color:gray;margin:10px 0"/>')
title1 = ipywidgets.HTML('<b>【1.走行モデル(直進)の選択】</b> 走行モデルを選択してください。')
title2 = ipywidgets.HTML('<b>【2.走行モデル(ショートカット)の選択】</b> 走行モデルを選択してください。')
title3 = ipywidgets.HTML('<b>【3.環境モデルの選択】</b> 環境モデルを選択してください。')
title4 = ipywidgets.HTML('<b>【4.データセットの選択】</b> データセットを選択してください。')
title5 = ipywidgets.HTML('<b>【5.走行動画の作成】</b> 動画ファイル名を指定してください。')

# One control row per section.
model_a_row = ipywidgets.HBox([model_a_widget, model_a_time_widget, load_a_button])
model_b_row = ipywidgets.HBox([model_b_widget, model_b_time_widget, load_b_button])
model_c_row = ipywidgets.HBox([model_c_widget, model_c_time_widget, load_c_button])
dataset_row = ipywidgets.HBox([load_datasets_widget, load_task_widget])
movie_row = ipywidgets.HBox([movie_name_widget, movie_skip_dropdown, movie_button])

# Each section is heading, controls, then the shared log widget; the same
# process_widget instance is placed after every section.
convert_widget = ipywidgets.VBox([
    separator,
    title1, model_a_row, process_widget,
    title2, model_b_row, process_widget,
    title3, model_c_row, process_widget,
    title4, dataset_row, process_widget,
    title5, movie_row, process_widget,
])
display(convert_widget)