## 图像数据处理函数

In [4]:
from PIL import Image
import numpy as np
import tensorflow as tf

In [5]:
def handle_img(path, size=(1280, 720)):
    """Load an image file and return it as a fixed-size RGB pixel array.

    Args:
        path: Location of the image file, handed to ``Image.open``.
        size: Target ``(width, height)`` in pixels, handed to
            ``Image.resize``. Defaults to ``(1280, 720)``.

    Returns:
        numpy.ndarray: Pixel array of shape ``(height, width, 3)``.
    """
    # Image.open keeps the underlying file open; the context manager releases
    # the handle as soon as the pixels have been copied into the array.
    with Image.open(path) as img:
        # Force three channels so that grayscale / RGBA / palette files yield
        # the same array shape as ordinary RGB photos and can be stacked.
        img_resized = img.convert('RGB').resize(size)
        img_array = np.array(img_resized)

    print("Image Shape:", img_array.shape)
    return img_array

## 所属类型的处理

In [6]:
def handle_status(status, num_classes=3):
    """One-hot encode a class index.

    Args:
        status: Integer class index in ``[0, num_classes)``.
        num_classes: Length of the returned vector. Defaults to 3.

    Returns:
        numpy.ndarray: Integer vector of length ``num_classes`` that is 1 at
        position ``status`` and 0 elsewhere.

    Raises:
        ValueError: If ``status`` is outside ``[0, num_classes)``.
    """
    # Reject negatives explicitly: NumPy would otherwise treat e.g. -1 as
    # "last element" and silently produce a label for the wrong class.
    if not 0 <= status < num_classes:
        raise ValueError(
            "status must be in [0, %d), got %r" % (num_classes, status)
        )

    status_array = np.zeros(num_classes, dtype=int)
    status_array[status] = 1
    return status_array


## 获取数据集

In [7]:
## Read the annotation JSON file and assemble the training arrays
import json

X_train = []
Y_train = []

# Root directory of the data source
dataset_dir = '../dataset/'
# Root directory of the training images
dataset_train = dataset_dir + 'amap_traffic_train/amap_traffic_train_0712/'

train_json = dataset_dir + 'amap_traffic_annotations_train.json'

# Only the first MAX_ANNOTATIONS annotation entries are loaded below.
MAX_ANNOTATIONS = 10

# Decode explicitly as UTF-8 so the read does not depend on the platform's
# default locale encoding, and parse straight from the file object.
with open(train_json, 'r', encoding='utf-8') as train_file:
    train_data = json.load(train_file)

# train_data['annotations'] is a list; each entry provides 'id', 'status'
# and 'frames' (a list of dicts with a 'frame_name').
annotations = train_data['annotations']

for annotation in annotations[:MAX_ANNOTATIONS]:
    dir_name = annotation['id']
    print('dataset id:', dir_name)

    # Each annotation id names the sub-directory holding its frames
    img_dir = dataset_train + dir_name
    frames = annotation['frames']

    # Encode the class label once per annotation
    print('handle status.')
    status_data = handle_status(annotation['status'])
    print('status_data: ', status_data)

    # Load every frame; each one becomes a sample sharing the same label
    print('handle images.')
    for frame in frames:
        # Build the path of the frame image
        img_path = img_dir + '/' + frame['frame_name']
        # Pixel array for this single frame
        img_data = handle_img(img_path)

        X_train.append(img_data)
        Y_train.append(status_data)

    print('==============')

X_train = np.array(X_train)
Y_train = np.array(Y_train)

print('X_train.shape: ', X_train.shape)
print('Y_train.shape: ', Y_train.shape)


dataset id: 000001
handle status.
status_data:  [1 0 0]
handle images.
Image Shape: (720, 1280, 3)
Image Shape: (720, 1280, 3)
Image Shape: (720, 1280, 3)
Image Shape: (720, 1280, 3)
dataset id: 000002
handle status.
status_data:  [1 0 0]
handle images.
Image Shape: (720, 1280, 3)
Image Shape: (720, 1280, 3)
Image Shape: (720, 1280, 3)
Image Shape: (720, 1280, 3)
dataset id: 000003
handle status.
status_data:  [1 0 0]
handle images.
Image Shape: (720, 1280, 3)
Image Shape: (720, 1280, 3)
Image Shape: (720, 1280, 3)
Image Shape: (720, 1280, 3)
Image Shape: (720, 1280, 3)
dataset id: 000004
handle status.
status_data:  [1 0 0]
handle images.
Image Shape: (720, 1280, 3)
Image Shape: (720, 1280, 3)
Image Shape: (720, 1280, 3)
Image Shape: (720, 1280, 3)
dataset id: 000005
handle status.
status_data:  [1 0 0]
handle images.
Image Shape: (720, 1280, 3)
Image Shape: (720, 1280, 3)
Image Shape: (720, 1280, 3)
Image Shape: (720, 1280, 3)
Image Shape: (720, 1280, 3)
dataset id: 000006
handle sta

## 卷积神经网络

In [8]:
class CNNModule:
    """Convolutional classifier for 720x1280 RGB images with three classes.

    Wraps a ``tf.keras`` Sequential model: ``build_module`` creates and
    compiles it, ``fit`` trains it and saves it when the accuracy measured on
    the training data is high enough, and ``predict`` returns the model output.
    """

    # File the trained model is saved to
    module_path = './module/cnn_model.h5'
    # The Keras model; stays None until build_module() has been called
    module = None

    NUM_EPOCHS = 30

    # NOTE(review): 128 full-resolution images per batch looks very memory
    # hungry for this input size -- confirm it fits the target hardware.
    BATCH_SIZE = 128

    # fit() saves the model only when the measured accuracy exceeds this fraction
    SAVE_ACCURACY_THRESHOLD = 0.95

    def build_module(self):
        """Create and compile the network, store it in ``self.module`` and print its summary."""

        # Build the convolutional neural network
        self.module = tf.keras.models.Sequential([
            # Input layer
            tf.keras.layers.InputLayer(input_shape=(720, 1280, 3)),
            # Convolution layer
            tf.keras.layers.Conv2D(filters=32, kernel_size=(6, 6), activation='relu'),
            # Max-pooling layer
            tf.keras.layers.MaxPooling2D((2, 2)),
            # Convolution layer
            tf.keras.layers.Conv2D(filters=64, kernel_size=(6, 6), activation='relu'),
            # Max-pooling layer
            tf.keras.layers.MaxPooling2D((2, 2)),
            # Convolution layer
            tf.keras.layers.Conv2D(filters=128, kernel_size=(3, 3), activation='relu'),
            # Max-pooling layer
            tf.keras.layers.MaxPooling2D((2, 2)),
            # Global average pooling over the spatial dimensions
            tf.keras.layers.GlobalAveragePooling2D(),
            # Flatten; the pooled output is already one vector per sample,
            # the layer is kept so the layer list stays unchanged
            tf.keras.layers.Flatten(),
            # Hidden fully connected layer
            tf.keras.layers.Dense(128, activation='relu'),
            # Dropout layer
            tf.keras.layers.Dropout(rate=0.2),
            # Output layer: softmax over the three classes
            tf.keras.layers.Dense(3, activation='softmax')
        ])

        self.module.compile(optimizer='adam',
                            loss='categorical_crossentropy',
                            metrics=['accuracy'])

        self.module.summary()

    def fit(self, x_train, y_train):
        """Train the model, print its accuracy on ``x_train`` and save it if good enough.

        Args:
            x_train: Training inputs, passed to the model's ``fit`` and ``predict``.
            y_train: One-hot labels, one row per sample.

        The accuracy is measured on all of ``x_train``, i.e. including the
        20 % that is passed to Keras as ``validation_split``. The model is
        written to ``module_path`` only when that accuracy exceeds
        ``SAVE_ACCURACY_THRESHOLD``.
        """
        import os  # local import: only needed to create the save directory

        self.module.fit(x_train, y_train, epochs=self.NUM_EPOCHS,
                        batch_size=self.BATCH_SIZE, validation_split=0.2)

        y_true = np.asarray(y_train)
        y_pred = np.asarray(self.predict(x_train))

        total = len(y_true)
        if total:
            # Compare predicted and labelled classes by argmax. Rounding the
            # softmax output instead turns every prediction whose top
            # probability is <= 0.5 into an all-zero row, which would be
            # counted as wrong even when the most likely class is correct.
            hits = np.argmax(y_pred, axis=1) == np.argmax(y_true, axis=1)
            accuracy = int(np.count_nonzero(hits)) / total
        else:
            # No samples: report 0 instead of dividing by zero
            accuracy = 0.0

        print("准确率为：", accuracy * 100, "%")
        if accuracy > self.SAVE_ACCURACY_THRESHOLD:
            print('save model.')
            # Make sure the target directory exists before saving
            save_dir = os.path.dirname(self.module_path)
            if save_dir:
                os.makedirs(save_dir, exist_ok=True)
            self.module.save(filepath=self.module_path)

    def predict(self, x_test):
        """Return the model's output for ``x_test`` (``self.module.predict``)."""
        return self.module.predict(x_test)
    

In [None]:
# Create the wrapper, build and compile the network (this prints the model
# summary), then train it on the arrays assembled in the data-loading cell.
cnnModule = CNNModule()
cnnModule.build_module()
cnnModule.fit(X_train, Y_train)

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d (Conv2D)             (None, 715, 1275, 32)     3488      
                                                                 
 max_pooling2d (MaxPooling2  (None, 357, 637, 32)      0         
 D)                                                              
                                                                 
 conv2d_1 (Conv2D)           (None, 352, 632, 64)      73792     
                                                                 
 max_pooling2d_1 (MaxPoolin  (None, 176, 316, 64)      0         
 g2D)                                                            
                                                                 
 conv2d_2 (Conv2D)           (None, 174, 314, 128)     73856     
                                                                 
 max_pooling2d_2 (MaxPoolin  (None, 87, 157, 128)      0