### 0. (Optional) Extract frames from a video file

In [10]:
import cv2
import os

# Dump every frame of the video to dataset/images_corner as numbered JPEG files.
capture = cv2.VideoCapture("conering.avi")
assert capture.isOpened(), "Cannot open the video file."

# Frame count as reported by the container; the stream may hold fewer decodable
# frames, which is why every read below is checked.
num_frames = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
print(num_frames)

# Prepare folders
img_filename_fmt = 'dataset/images_corner/frame_{:09d}.jpg'
dirname = os.path.dirname(img_filename_fmt)
os.makedirs(dirname, exist_ok=True)

try:
    for ii in range(num_frames):
        # Frames are read sequentially, so no explicit seek is needed per frame.
        ok, frame = capture.read()
        if not ok:
            # The decoder ran out of frames before the reported count was reached.
            print("Stopped at frame {:d}: no more frames could be read.".format(ii))
            break
        # cv2.imwrite reports failure through its return value instead of raising.
        if not cv2.imwrite(img_filename_fmt.format(ii), frame):
            raise IOError("Cannot write {:s}".format(img_filename_fmt.format(ii)))
finally:
    # Release the video handle even when reading or writing fails.
    capture.release()

1547


In [13]:
import os

# Renumber the frames in source_dir to a gap-free frame_000000000.jpg, frame_000000001.jpg, ... sequence.

# Directory that holds the original images
source_dir = 'dataset/images_corner'

# Collect only regular .jpg files, in name order. Other directory entries
# (for example Jupyter's .ipynb_checkpoints folder) must not be renamed to frame_*.jpg.
files = sorted(
    name for name in os.listdir(source_dir)
    if name.lower().endswith('.jpg') and os.path.isfile(os.path.join(source_dir, name))
)

# Rename the files
for index, filename in enumerate(files):
    # New file name
    new_filename = f"frame_{index:09d}.jpg"
    if new_filename == filename:
        # Already has the right name.
        continue
    # Full path of the original file
    old_path = os.path.join(source_dir, filename)
    # Full path of the renamed file
    new_path = os.path.join(source_dir, new_filename)
    # On POSIX os.rename silently replaces an existing destination, so refuse
    # to continue rather than destroy a frame that has not been processed yet.
    if os.path.exists(new_path):
        raise FileExistsError(f"Refusing to overwrite {new_path} while renaming {old_path}")
    os.rename(old_path, new_path)

print("File renaming completed.")


File renaming completed.


In [12]:
import os
import shutil
import numpy as np  # numpy 라이브러리 추가

# Split the frames into an evenly spaced sample set (copied) and the remainder (moved).

# Image folders
source_dir = 'dataset/images_corner'  # folder that holds the original images
sample_dir = 'dataset/sample_corner_A'  # folder that receives the sampled images
move_dir = 'dataset/sample_corner_B'  # folder that receives the remaining images

# Create the destination folders
os.makedirs(sample_dir, exist_ok=True)
os.makedirs(move_dir, exist_ok=True)

# Names of the frames to process: frame_000000000.jpg .. frame_000001460.jpg.
# The list is generated in ascending order, so no extra sort is required.
image_files = ['frame_{:09d}.jpg'.format(i) for i in range(1461)]

# Number of images to sample
num_samples = 200

# Evenly spaced sample positions over the whole list
indices = [int(i) for i in np.linspace(0, len(image_files) - 1, num_samples)]
# Set for constant-time membership tests inside the loop below.
sample_indices = set(indices)

# Copy the sampled images and move the others
num_missing = 0
for idx, filename in enumerate(image_files):
    src_path = os.path.join(source_dir, filename)
    if not os.path.exists(src_path):
        # Happens when the cell is re-run (non-sample frames were already moved)
        # or when the folder holds fewer frames than the list above.
        num_missing += 1
        continue
    if idx in sample_indices:
        # Sampled images are copied, so they also stay in source_dir
        shutil.copy(src_path, os.path.join(sample_dir, filename))
    else:
        # Every other image is moved out of source_dir
        shutil.move(src_path, os.path.join(move_dir, filename))

if num_missing:
    print("Skipped {:d} files that were not found in {:s}.".format(num_missing, source_dir))
print("Sample images and remaining images have been processed.")


Sample images and remaining images have been processed.


### 1. Label images

- Assumes all images have the same size.

In [1]:
import cv2
import os

from collections import OrderedDict
from ipywidgets import IntSlider, Label, Button, HBox
from ipycanvas import MultiCanvas, hold_canvas

# Settings and shared state for the labeling tool.
thickness = 3     # height of the horizontal Y-bar, in pixel rows
y_ratio = 0.7     # initial Y-bar position as a fraction of the image height, measured from the top

# Input images
img_filename_fmt = 'dataset/images_new_straight/frame_{:09d}.jpg'
ann_filename = 'dataset/annotation_new_straight.txt'
# Maps image path -> (x, y) label, in insertion order.
ann_dict = OrderedDict()

# Count only the image files; other directory entries (for example
# .ipynb_checkpoints) would otherwise let the index run past the last frame.
image_dir = os.path.dirname(img_filename_fmt)
num_frames = len([name for name in os.listdir(image_dir) if name.lower().endswith('.jpg')])

cur_index = 0
# cv2.imread returns None instead of raising when the file cannot be read.
first_image = cv2.imread(img_filename_fmt.format(cur_index))
assert first_image is not None, "Cannot read the first image: {:s}".format(img_filename_fmt.format(cur_index))
height, width = first_image.shape[:2]
y_value = int(height * y_ratio)

def set_image():
    """Show the current frame on canvas layer 0 with the Y-bar painted on it.

    Reads the image selected by ``cur_index``, paints a red band of
    ``thickness`` rows around ``y_value``, restores the marker of an existing
    annotation (if any) and updates the file-name label. If the image cannot
    be read, the label reports the problem and the canvas is left untouched.
    """
    image_path = img_filename_fmt.format(cur_index)
    image = cv2.imread(image_path)
    if image is None:
        # cv2.imread returns None instead of raising when the file is unreadable.
        cur_fname.value = 'Cannot read image: {:s} | '.format(image_path)
        return

    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    # Paint exactly `thickness` rows; clamp the top so a bar near the upper
    # edge cannot turn into a negative (wrapping) slice start.
    bar_top = max(0, y_value - thickness // 2)
    image[bar_top:bar_top + thickness] = (255, 0, 0)

    canvas[0].clear()
    canvas[0].put_image_data(image, 0, 0)
    canvas[0].flush()

    pos = ann_dict.get(image_path)
    if pos is not None:
        # NOTE(review): handle_mouse_down stores the label again using the
        # current y_value, so revisiting an image after moving the slider
        # changes the y of its saved label - confirm this is intended.
        handle_mouse_down(pos[0], pos[1])

    cur_fname.value = 'Current image: {:s} | '.format(image_path)

def handle_mouse_move(xpos, ypos):
    """Draw the yellow hover dot on canvas layer 1.

    The dot follows the pointer horizontally but is always placed at
    ``y_value``; ``ypos`` is not used.
    """
    hover_layer = canvas[1]
    with hold_canvas():
        # Remove the previous dot before drawing the new one.
        hover_layer.clear()
        hover_layer.fill_style = "yellow"
        hover_layer.fill_circle(xpos, y_value, 5)

def handle_mouse_down(xpos, ypos):
    """Record a label for the current image and draw the green marker on layer 2.

    Args:
        xpos: x coordinate of the click; rounded to the nearest integer pixel.
        ypos: y coordinate of the click; not used, the label always takes
            its y from ``y_value``.
    """
    # Canvas mouse events may deliver fractional coordinates, which the
    # "{:d}" format specifiers used for display and saving cannot format.
    xpos = int(round(xpos))

    with hold_canvas():
        canvas[2].clear()
        canvas[2].fill_style = "green"
        canvas[2].fill_circle(xpos, y_value, 5)

    cur_pos.value = "({:d}, {:d}) ".format(xpos, y_value)
    ann_dict[img_filename_fmt.format(cur_index)] = (xpos, y_value)
    
def handle_slider_change(change):
    """Move the Y-bar to the slider's new value and redraw the current image.

    Both overlay layers (hover dot and label marker) are cleared after the
    redraw.
    """
    global y_value
    y_value = change.new
    set_image()
    # Layers 1 and 2 hold the hover dot and the label marker respectively.
    for layer_index in (1, 2):
        canvas[layer_index].clear()

def handle_save_button(b):
    """Write all labels to ``ann_filename``, one ``path<TAB>x<TAB>y`` line per image.

    Args:
        b: the button that triggered the callback (unused).
    """
    # Format everything before opening the file: mode 'w' truncates it
    # immediately, so a formatting error must not wipe labels saved earlier.
    # Coordinates are coerced to int because "{:d}" rejects floats.
    lines = [
        "{:s}\t{:d}\t{:d}\n".format(k, int(round(v[0])), int(round(v[1])))
        for k, v in ann_dict.items()
    ]
    with open(ann_filename, 'w') as ann_file:
        ann_file.writelines(lines)

def handle_prev_button(b):
    """Step back one image, stopping at the first one, and redraw.

    Args:
        b: the button that triggered the callback (unused).
    """
    global cur_index
    previous = cur_index - 1
    # Never go below the first frame.
    cur_index = previous if previous > 0 else 0
    canvas.clear()
    set_image()

def handle_next_button(key=None, shift_key=None, ctrl_key=None, meta_key=None):
    """Step forward one image, stopping at the last one, and redraw.

    The handler is registered with ``Button.on_click``, which calls it with a
    single argument (the button). All parameters therefore have defaults so
    the same function works both as a button callback and with the original
    four-argument call; none of the arguments is used.
    """
    global cur_index
    # Clamp to the last frame; the outer max keeps the index at 0 when the
    # image folder is empty (num_frames == 0).
    cur_index = max(0, min(num_frames - 1, cur_index + 1))
    canvas.clear()
    set_image()

    
# Build the labeling UI: a three-layer canvas plus status labels, slider and buttons.
# Layer 0 shows the image, layer 1 the hover dot, layer 2 the label marker.
canvas = MultiCanvas(3, width=width, height=height)
cur_fname = Label(value='', disabled=False)
cur_pos = Label(value='', disabled=True)
# `style` (not `stype`) is the keyword that carries the description width.
yslider = IntSlider(description="Y-bar: ", style={'description_width': 'initial'}, value=y_value, min=1, max=height-2, step=1)
prev_btn = Button(description='Prev', icon='arrow-left')
next_btn = Button(description='Next', icon='arrow-right')
save_btn = Button(description='Save labels', icon='check')

# Draw the first image before wiring the callbacks.
set_image()
canvas.on_mouse_move(handle_mouse_move)
canvas.on_mouse_down(handle_mouse_down)
yslider.observe(handle_slider_change, names='value')

prev_btn.on_click(handle_prev_button)
next_btn.on_click(handle_next_button)
save_btn.on_click(handle_save_button)

display(canvas, HBox([cur_fname, cur_pos, yslider]), HBox([prev_btn, next_btn, save_btn]))


NameError: name 'handle_next_button' is not defined

### 2. Train a model

In [None]:
import torch
import torchvision

def get_model():
    """Build a fresh torchvision AlexNet with two outputs and dropout set to 0.0.

    Returns:
        The newly constructed model.
    """
    return torchvision.models.alexnet(num_classes=2, dropout=0.0)

# Use the GPU when one is available and fall back to the CPU otherwise, so the
# notebook also runs on machines without CUDA.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = get_model()
model = model.to(device)

In [None]:
import torch
from cnn.center_dataset import CenterDataset

# Number of samples per mini-batch.
batch_size = 4

# Dataset rooted at the 'dataset' folder, with random horizontal flipping disabled.
dataset = CenterDataset('dataset', random_hflip=False)

# Shuffled loader; num_workers=0 keeps data loading in the main process.
train_loader = torch.utils.data.DataLoader(
    dataset, batch_size=batch_size, shuffle=True, num_workers=0
)

In [None]:
import ipywidgets
import torch.nn.functional as f

# Default hyper-parameters; they seed the sliders below.
epoch = 200
learning_rate = 2e-3
# learning_rate = 2e-4

# Controls: number of epochs, learning rate, and the button that starts training.
epoch_slider = ipywidgets.IntSlider(
    description='Epochs',
    value=epoch,
    min=1,
    max=200,
    step=1,
)
lr_slider = ipywidgets.FloatSlider(
    description='lr',
    value=learning_rate,
    min=1e-4,
    max=1e-2,
    step=1e-4,
    readout_format='.4f',
)
train_button = ipywidgets.Button(description='Train', icon='tasks')

# Text area that receives the training log.
loss_text = ipywidgets.Textarea(
    description='Progress',
    value='',
    rows=15,
    layout=ipywidgets.Layout(width="50%", height="auto"),
)

# Controls in one row, log underneath.
layout = ipywidgets.VBox([
    ipywidgets.HBox([epoch_slider, lr_slider, train_button]),
    loss_text,
])


def train_model(b):
    """Run ``train_step`` once per epoch, as many times as the epoch slider says.

    A header line with the zero-based epoch number is appended to the
    progress log before each step.

    Args:
        b: the button that triggered the callback (unused).
    """
    num_epochs = epoch_slider.value
    for epoch_idx in range(num_epochs):
        header = "<<<<< Epoch {:d} >>>>>\n".format(epoch_idx)
        loss_text.value += header
        train_step()


def train_step():
    """Run one pass over ``train_loader`` and save the model weights.

    The Train button is disabled for the duration of the pass. The learning
    rate is taken from ``lr_slider``. Every tenth iteration a line with the
    loss and the first sample's label/prediction is appended to the progress
    log. If training raises, the error is reported in the log (and printed);
    the model is still switched to eval mode and saved to
    'road_following_model.pth', as before. The button is re-enabled even when
    saving fails.
    """
    global model

    # Size used only to turn coordinates into pixels for the log output.
    # NOTE(review): 800x450 is hard-coded; presumably the training image
    # size, with labels normalized to [-1, 1] - confirm against the dataset.
    log_width, log_height = 800, 450

    train_button.disabled = True
    try:
        try:
            # NOTE(review): a new optimizer is created on every call, so Adam's
            # running statistics restart at each epoch.
            optimizer = torch.optim.Adam(model.parameters(), lr=lr_slider.value)
            # optimizer = torch.optim.SGD(model.parameters(), lr=lr_slider.value, momentum=0.9)

            model = model.train()

            num_iters = len(train_loader)
            for ii, (images, labels) in enumerate(train_loader):
                # send data to device
                images = images.to(device)
                labels = labels.to(device)

                # zero gradients of parameters
                optimizer.zero_grad()

                # execute model to get outputs
                outputs = model(images)

                # summed squared error over both output coordinates
                loss = f.mse_loss(outputs, labels, reduction='sum')

                # run backpropagation to accumulate gradients
                loss.backward()

                # step optimizer to adjust parameters
                optimizer.step()

                if ii % 10 == 0:
                    # Log the first sample of the batch in pixel coordinates.
                    xlbl, ylbl = labels[0].cpu()
                    xlbl = (xlbl.item() / 2 + 0.5) * log_width
                    ylbl = (ylbl.item() / 2 + 0.5) * log_height

                    xpre, ypre = outputs[0].cpu()
                    xpre = (xpre.item() / 2 + 0.5) * log_width
                    ypre = (ypre.item() / 2 + 0.5) * log_height

                    msg = "[{:04d} / {:04d}] loss: {:.4f} | labels: ({:.2f}, {:.2f}), outpus: ({:.2f}, {:.2f})\n".format(ii, num_iters, loss.item(), xlbl, ylbl, xpre, ypre)
                    loss_text.value += msg

        except Exception as e:
            # Best-effort training: report the failure where the user is
            # looking (the progress log) instead of only printing it.
            print(e)
            loss_text.value += "Training error: {}\n".format(e)

        model = model.eval()
        torch.save(model.state_dict(), 'road_following_model.pth')
    finally:
        # Always give the button back, even if saving the model failed.
        train_button.disabled = False
    
# Start training when the Train button is clicked.
train_button.on_click(train_model)    

# Show the training controls and the progress log.
display(layout)

In [None]:
import PIL.Image
from cnn.center_dataset import TEST_TRANSFORMS

def preprocess(image: PIL.Image.Image):
    """Turn an image into a single-item batch on the compute device.

    Args:
        image: the input image; it is passed through ``TEST_TRANSFORMS``.

    Returns:
        The transformed image with a leading batch dimension of size 1,
        placed on the GPU when CUDA is available and on the CPU otherwise.
    """
    # Fall back to the CPU so inference also works without CUDA.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    image = TEST_TRANSFORMS(image).to(device)
    # Indexing with None adds the batch dimension in front.
    return image[None, ...]

In [None]:
# Rebuild the network and load the trained weights for inference.
model = get_model()
# map_location lets weights that were saved from a GPU model load on whatever
# device is in use here (including CPU-only machines).
model.load_state_dict(torch.load('road_following_model.pth', map_location=device))
# Inference only from here on, so switch to eval mode.
model = model.to(device).eval()

# from torch2trt import TRTModule
# model = TRTModule()
# model.load_state_dict(torch.load('road_following_model.pth'))

In [None]:
import cv2
import copy
import numpy as np
import PIL.Image

# Run the trained model on one annotated image and draw prediction vs. ground truth.
img_filename_fmt = 'dataset/images/frame_{:09d}.jpg'
ann_filename = 'dataset/annotation.txt'
# The file handle must not be called `f`: that name is the notebook-wide alias
# of torch.nn.functional used by train_step, and rebinding it breaks training.
with open(ann_filename, 'r') as ann_file:
    # Each non-empty line holds: image path, x, y (whitespace separated).
    data = [line.split() for line in ann_file if line.strip()]

# Pick the fourth annotation as the example.
filename, xpos, ypos = data[3]

xpos = int(xpos)
ypos = int(ypos)

image_ori = PIL.Image.open(filename)
width = image_ori.width
height = image_ori.height

with torch.no_grad():
    image = preprocess(image_ori)
    output = model(image).cpu().numpy()
x, y = output[0]

# Convert the model outputs to pixel coordinates (same mapping as the training log).
x = (x / 2 + 0.5) * width
y = (y / 2 + 0.5) * height
print(x, y)

# np.array makes a writable copy that cv2 can draw on.
image_np = np.array(image_ori)
cv2.circle(image_np, (int(x), int(y)), radius=5, color=(255, 0, 0))  # Pred
cv2.circle(image_np, (xpos, ypos), radius=5, color=(0, 255, 0))     # GT

PIL.Image.fromarray(image_np)

