# 3DCNN烟雾检测模型实现
基于Bonilla-Ormachea等人论文中的S5架构

In [None]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import (
    Conv3D, AveragePooling3D, BatchNormalization,
    TimeDistributed, Flatten, Bidirectional, LSTM,
    Dense
)
from tensorflow.keras.optimizers import Adam

## 1. 模型构建

In [None]:
def build_3dcnn_model(input_shape=(30, 240, 240, 1)):
    model = Sequential([
        # 3D卷积层
        Conv3D(32, (5, 5, 3), activation='relu', input_shape=input_shape),
        AveragePooling3D((2, 2, 2)),
        BatchNormalization(),
        
        # 时空特征提取
        Conv3D(64, (3, 3, 3), activation='relu'),
        AveragePooling3D((2, 2, 2)),
        BatchNormalization(),
        
        # 时序处理
        TimeDistributed(Flatten()),
        Bidirectional(LSTM(64, return_sequences=True)),
        Bidirectional(LSTM(32)),
        
        # 分类层
        Dense(1, activation='sigmoid')
    ])
    
    model.compile(
        optimizer=Adam(learning_rate=0.001),
        loss='binary_crossentropy',
        metrics=['accuracy']
    )
    
    return model

# 创建模型
model = build_3dcnn_model()
model.summary()

## 2. 数据预处理

In [None]:
import cv2
import numpy as np

def preprocess_video(video_path, target_size=(240, 240), frames=30):
    """
    视频预处理流程：
    1. 读取视频并采样指定帧数
    2. 转换为灰度
    3. 调整大小
    4. 归一化
    """
    cap = cv2.VideoCapture(video_path)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    frame_indices = np.linspace(0, total_frames-1, frames, dtype=np.int32)
    
    video_data = []
    for idx in frame_indices:
        cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
        ret, frame = cap.read()
        if ret:
            # 转换为灰度并resize
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            resized = cv2.resize(gray, target_size)
            video_data.append(resized)
    
    cap.release()
    
    # 归一化并调整维度
    video_array = np.array(video_data) / 255.0
    return np.expand_dims(video_array, axis=-1)  # 添加通道维度

## 3. 模型训练

In [None]:
# 示例训练流程
# 假设已有训练数据X_train, y_train

# 添加早停和模型检查点
callbacks = [
    tf.keras.callbacks.EarlyStopping(patience=5, monitor='val_loss'),
    tf.keras.callbacks.ModelCheckpoint('best_model.h5', save_best_only=True)
]

history = model.fit(
    X_train, y_train,
    batch_size=16,
    epochs=50,
    validation_split=0.2,
    callbacks=callbacks
)

## 4. 模型评估

In [None]:
# 加载最佳模型
best_model = tf.keras.models.load_model('best_model.h5')

# 在测试集上评估
test_loss, test_acc = best_model.evaluate(X_test, y_test)
print(f"测试准确率: {test_acc:.4f}")

# 可视化训练过程
import matplotlib.pyplot as plt

plt.plot(history.history['accuracy'], label='训练准确率')
plt.plot(history.history['val_accuracy'], label='验证准确率')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.show()

## 5. 推理示例

In [None]:
def predict_smoke(video_path, model):
    """预测视频是否包含烟雾"""
    processed = preprocess_video(video_path)
    prediction = model.predict(np.expand_dims(processed, axis=0))
    return "有烟雾" if prediction > 0.5 else "无烟雾"

# 示例使用
# result = predict_smoke("test_video.mp4", best_model)
# print(f"检测结果: {result}")