In [47]:
import time
import copy
import torch
from torch import optim, nn
import torch.nn.functional as F
import torchvision
from torchvision import transforms
from torchvision.models import resnet18
import numpy as np
from matplotlib import pyplot as plt
from PIL import Image
import sys
sys.path.append("..")
from IPython import display
from tqdm import tqdm
import warnings
warnings.filterwarnings("ignore") # 忽略警告

In [48]:
import numpy as np

In [49]:
from pathlib import Path
import os
import cv2
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split


def transfer_label(label):
    width, height, channel = label.shape
    res = np.zeros_like(label)
    channel = label[:, :, 0]
    for row_idx in range(height):
        for col_idx in range(width):
            color = channel[row_idx, col_idx]
            channel_map = {0 : 0, 2 : 1, 3 : 2}
            res[row_idx, col_idx, channel_map[channel[row_idx, col_idx]]] = 1
    return res
            

def load_image(type):
    root = f"./road-segmenttation-6/{type}"
    mask_image_list = []
    origin_image_list = []
    
    for root, dirs, files in os.walk(root):
        for file in files:
            mask_file_path = os.path.join(root, file)
            if mask_file_path[-3:] == "png":
                mask_frame = cv2.imread(mask_file_path)
                mask_frame_rgb = mask_frame[:, :, ::-1].copy()
                mask_frame_label = transfer_label(mask_frame_rgb)
                mask_image_list.append(mask_frame_label)
                
                origin_file_path = mask_file_path[:-9] + ".jpg"
                origin_frame = cv2.imread(origin_file_path)
                origin_frame_rgb = origin_frame[:, :, ::-1].copy()
                origin_image_list.append(origin_frame_rgb)
                
    mask_image_np = np.vstack(mask_image_list).reshape((-1, 640, 640, 3))
    origin_image_np = np.vstack(origin_image_list).reshape((-1, 640, 640, 3))
    
    images, labels = shuffle(origin_image_np, mask_image_np)
    print(f'images.shape: {images.shape}')
    print(f'labels.shape: {labels.shape}')
    return images, labels

In [50]:
train_images, train_labels = load_image('train')
test_images, test_labels = load_image('test')

images.shape: (150, 640, 640, 3)
labels.shape: (150, 640, 640, 3)
images.shape: (8, 640, 640, 3)
labels.shape: (8, 640, 640, 3)


## Model

In [51]:
# Import necessary items from Keras
from keras.models import Sequential
from keras.layers import Activation, Dropout, UpSampling2D
from keras.layers import Conv2DTranspose, Conv2D, MaxPooling2D
from keras.layers import BatchNormalization
from keras.preprocessing.image import ImageDataGenerator
from keras import regularizers

In [52]:
def create_model(input_shape, pool_size):
    # 创建网络模型
    model = Sequential()
    # 对输入层进行归一化处理
    print(f"input shape = {input_shape}")
    model.add(BatchNormalization(input_shape=input_shape))

    # 卷积层1，名为Conv1
    model.add(Conv2D(8, (3, 3), padding='valid', strides=(1,1), activation = 'relu', name = 'Conv1'))

    # 卷积层2
    model.add(Conv2D(16, (3, 3), padding='valid', strides=(1,1), activation = 'relu', name = 'Conv2'))

    # 最大化层
    model.add(MaxPooling2D(pool_size=pool_size))

    # 卷积层3
    model.add(Conv2D(16, (3, 3), padding='valid', strides=(1,1), activation = 'relu', name = 'Conv3'))
    model.add(Dropout(0.2))

    # 卷积层4
    model.add(Conv2D(32, (3, 3), padding='valid', strides=(1,1), activation = 'relu', name = 'Conv4'))
    model.add(Dropout(0.2))

    # 卷积层5
    model.add(Conv2D(32, (3, 3), padding='valid', strides=(1,1), activation = 'relu', name = 'Conv5'))
    model.add(Dropout(0.2))

    # 最大化层2
    model.add(MaxPooling2D(pool_size=pool_size))

    # 卷积层6
    model.add(Conv2D(64, (3, 3), padding='valid', strides=(1,1), activation = 'relu', name = 'Conv6'))
    model.add(Dropout(0.2))

    # 卷积层7
    model.add(Conv2D(64, (3, 3), padding='valid', strides=(1,1), activation = 'relu', name = 'Conv7'))
    model.add(Dropout(0.2))

    # 最大化层3
    model.add(MaxPooling2D(pool_size=pool_size))

    # 上采样层1
    model.add(UpSampling2D(size=pool_size))

    # 反卷积层 1
    model.add(Conv2DTranspose(64, (3, 3), padding='valid', strides=(1,1), activation = 'relu', name = 'Deconv1'))
    model.add(Dropout(0.2))

    # 反卷积层 2
    model.add(Conv2DTranspose(64, (3, 3), padding='valid', strides=(1,1), activation = 'relu', name = 'Deconv2'))
    model.add(Dropout(0.2))

    # 上采样层 2
    model.add(UpSampling2D(size=pool_size))

    # 反卷积层 3
    model.add(Conv2DTranspose(32, (3, 3), padding='valid', strides=(1,1), activation = 'relu', name = 'Deconv3'))
    model.add(Dropout(0.2))

    # 反卷积层 4
    model.add(Conv2DTranspose(32, (3, 3), padding='valid', strides=(1,1), activation = 'relu', name = 'Deconv4'))
    model.add(Dropout(0.2))

    # 反卷积层 5
    model.add(Conv2DTranspose(16, (3, 3), padding='valid', strides=(1,1), activation = 'relu', name = 'Deconv5'))
    model.add(Dropout(0.2))

    # 上采样层 3
    model.add(UpSampling2D(size=pool_size))

    # 反卷积层 6
    model.add(Conv2DTranspose(16, (3, 3), padding='valid', strides=(1,1), activation = 'relu', name = 'Deconv6'))

    # 输出层
    model.add(Conv2DTranspose(3, (3, 3), padding='valid', strides=(1,1), activation = 'relu', name = 'Final'))

    return model

## 训练模型

In [58]:
epochs = 10
pool_size = (2, 2)
images, labels = load_image('train')
val_images, val_labels = load_image('valid')
input_shape = images.shape[1:]
model = create_model(input_shape, pool_size)

datagen = ImageDataGenerator(channel_shift_range=0.2)
datagen.fit(images)

# 编译模型
model.compile(optimizer='Adam', loss='mean_squared_error')
# 可视化模型
model.summary()

images.shape: (150, 640, 640, 3)
labels.shape: (150, 640, 640, 3)
images.shape: (14, 640, 640, 3)
labels.shape: (14, 640, 640, 3)
input shape = (640, 640, 3)
Model: "sequential_6"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 batch_normalization_6 (Bat  (None, 640, 640, 3)       12        
 chNormalization)                                                
                                                                 
 Conv1 (Conv2D)              (None, 638, 638, 8)       224       
                                                                 
 Conv2 (Conv2D)              (None, 636, 636, 16)      1168      
                                                                 
 max_pooling2d_6 (MaxPoolin  (None, 318, 318, 16)      0         
 g2D)                                                            
                                                                 
 Conv3 (Conv2D)             

In [59]:
model.fit_generator(datagen.flow(images, labels, batch_size=30), steps_per_epoch=len(images)/30,
epochs=epochs, verbose=1, validation_data=(val_images, val_labels))

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.src.callbacks.History at 0x2e977f040>

In [60]:
model.save('first_try.h5')

## 使用模型

In [63]:
from IPython.display import clear_output, Image, display, HTML

def create_video(path, model):
    vs = cv2.VideoCapture(path)
    while True:
        (grabbed, frame_source) = vs.read()
        if not grabbed: break
        
        height, width = frame_source.shape[:2]
        
        frame = cv2.resize(frame_source ,(640, 640))
        
        # 为frame_input添加一个维度
        frame_input = frame[None, :, :, :]
        prediction = model.predict(frame_input)
        prediction = prediction[0] * 255
        
        blank = cv2.resize(prediction, (width, height))
        output = cv2.addWeighted(frame_source, 0.3, blank, 0.7, 0, dtype = cv2.CV_32F)
        
        # 清空绘图空间
        clear_output(wait=True)
        
        # 显示处理结果
        _, jpg = cv2.imencode('.jpg', output)
        display(jpg)

        #按键盘中的q键退出检测
        if cv2.waitKey(1) & 0xFF == ord('q'):   break
        
    # 释放资源
    print("[INFO] cleaning up...")
    vs.release()
    cv2.destroyAllWindows()


In [64]:
create_video('../pose/test.mp4', model)

array([255, 216, 255, ..., 207, 255, 217], dtype=uint8)

KeyboardInterrupt: 