## Import thư viện

In [1]:
import os
import cv2
import numpy as np
import pandas as pd
from sklearn.preprocessing import OneHotEncoder
import tensorflow as tf
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, Dropout
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping, ModelCheckpoint
from tensorflow.keras.regularizers import l2
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, classification_report

Các thông tin đã có

In [None]:
train_folder = r'C:\Users\axeld\OneDrive\Bureau\MachineLearning\kaggle\input\train'
test_folder = r'C:\Users\axeld\OneDrive\Bureau\MachineLearning\kaggle\input\test'
valid_folder = r'C:\Users\axeld\OneDrive\Bureau\MachineLearning\kaggle\input\valid'

train_annotation_file = os.path.join(train_folder, '_annotations.txt')
test_annotation_file = os.path.join(test_folder, '_annotations.txt')
valid_annotation_file = os.path.join(valid_folder, '_annotations.txt')

# Từ điển ánh xạ class
class_names = {
    0: 'DangerousDriving',
    1: 'Distracted',
    2: 'Drinking',
    3: 'SafeDriving',
    4: 'SleepyDriving',
    5: 'Yawn'
}

# Số lượng lớp và kích thước ảnh
num_classes = len(class_names)
target_size = (224, 224)
batch_size = 32  # Tăng batch size để tăng tốc độ học


## Hàm đọc annotations_class

In [None]:

# Hàm đọc annotation
def read_annotations_class_only(file_path):
    image_labels = {}
    with open(file_path, 'r') as f:
        for line in f:
            parts = line.strip().split()
            if len(parts) < 2:
                continue
            image_name = parts[0]
            try:
                class_id = int(parts[1].split(',')[-1])
                if image_name not in image_labels:
                    image_labels[image_name] = class_id
            except ValueError:
                continue
    return pd.DataFrame(list(image_labels.items()), columns=['image_name', 'class_id'])

In [None]:

# Đọc annotation cho các tập
train_df = read_annotations_class_only(train_annotation_file)
valid_df = read_annotations_class_only(valid_annotation_file)
test_df = read_annotations_class_only(test_annotation_file)

# Thêm đường dẫn đầy đủ cho ảnh
train_df['image_path'] = train_df['image_name'].apply(lambda x: os.path.join(train_folder, x))
valid_df['image_path'] = valid_df['image_name'].apply(lambda x: os.path.join(valid_folder, x))
test_df['image_path'] = test_df['image_name'].apply(lambda x: os.path.join(test_folder, x))

# Chuyển class_id thành string cho generator
train_df['class_id_str'] = train_df['class_id'].astype(str)
valid_df['class_id_str'] = valid_df['class_id'].astype(str)
test_df['class_id_str'] = test_df['class_id'].astype(str)


In [None]:
plt.figure(figsize=(18, 5))

# Lấy mỗi class 1 ảnh ví dụ
for class_id in range(num_classes):
    sample_row = train_df[train_df['class_id'] == class_id].iloc[0]
    image_path = os.path.join(train_folder, sample_row['image_name'])
    image = cv2.imread(image_path)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    plt.subplot(1, num_classes, class_id + 1)
    plt.imshow(image)
    plt.title(class_names[class_id])
    plt.axis('off')

plt.tight_layout()
plt.show()


In [None]:

# Phân tích phân phối các lớp
class_distribution = train_df['class_id'].value_counts().sort_index()
print("Class distribution in training set:")
for class_id, count in class_distribution.items():
    print(f"Class {class_id} ({class_names[class_id]}): {count} images")

## Tạo dữ liệu huấn luyện

In [None]:

# Tối ưu data augmentation dựa trên biểu đồ học tập
train_datagen = ImageDataGenerator(
    rescale=1./255,
    rotation_range=15,
    width_shift_range=0.15,
    height_shift_range=0.15,
    shear_range=0.1,
    zoom_range=0.15,
    horizontal_flip=True,
    brightness_range=[0.85, 1.15],  # Thêm thay đổi độ sáng
    fill_mode='nearest'
)

# Chỉ cần rescale cho validation và test
val_test_datagen = ImageDataGenerator(rescale=1./255)

# Tạo generator cho train, valid, test với class_mode='categorical'
train_generator = train_datagen.flow_from_dataframe(
    train_df,
    x_col='image_path',
    y_col='class_id_str',
    target_size=target_size,
    batch_size=batch_size,
    class_mode='categorical',
    shuffle=True
)

valid_generator = val_test_datagen.flow_from_dataframe(
    valid_df,
    x_col='image_path',
    y_col='class_id_str',
    target_size=target_size,
    batch_size=batch_size,
    class_mode='categorical',
    shuffle=False
)

test_generator = val_test_datagen.flow_from_dataframe(
    test_df,
    x_col='image_path',
    y_col='class_id_str',
    target_size=target_size,
    batch_size=batch_size,
    class_mode='categorical',
    shuffle=False
)

## Xây dựng mô hình MobileNetV2

In [None]:

# Xây dựng mô hình MobileNetV2 với một cấu trúc cải thiện
base_model = MobileNetV2(weights='imagenet', include_top=False, input_shape=(224, 224, 3))

# Xây dựng phần đầu của mô hình với bộ điều chỉnh L2
x = base_model.output
x = GlobalAveragePooling2D()(x)
x = Dense(512, activation='relu', kernel_regularizer=l2(0.001))(x)
x = Dropout(0.4)(x)  # Tăng dropout để giảm overfitting
x = Dense(256, activation='relu', kernel_regularizer=l2(0.001))(x)
x = Dropout(0.3)(x)
predictions = Dense(num_classes, activation='softmax')(x)

model = Model(inputs=base_model.input, outputs=predictions)

# Đóng băng các lớp của base model
for layer in base_model.layers:
    layer.trainable = False

# Biên dịch mô hình với hàm học tốt hơn
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
model.compile(
    optimizer=optimizer,
    loss='categorical_crossentropy',
    metrics=['accuracy']
)

# Hiển thị tóm tắt mô hình
model.summary()


## Huấn luyện

In [None]:
# Tạo các callbacks được tối ưu
reduce_lr = ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.5,  # Giảm nhẹ learning rate hơn
    patience=2,  # Giảm patience để phản ứng nhanh hơn
    min_lr=1e-6,
    verbose=1
)

early_stopping = EarlyStopping(
    monitor='val_accuracy',  # Thay đổi từ val_loss sang val_accuracy
    patience=7,  # Tăng thêm patience để cho phép mô hình học lâu hơn
    restore_best_weights=True,
    verbose=1
)

model_checkpoint = ModelCheckpoint(
    'best_driver_model.keras',
    monitor='val_accuracy',
    save_best_only=True,
    verbose=1
)

callbacks = [reduce_lr, early_stopping, model_checkpoint]

# Huấn luyện mô hình (giai đoạn 1)
print("Phase 1: Training with frozen base model")
history1 = model.fit(
    train_generator,
    validation_data=valid_generator,
    epochs=15,  # Tăng số epoch cho giai đoạn đầu
    callbacks=callbacks,
    verbose=1
)

# Fine-tune: Mở khóa một số lớp của base model với chiến lược mở theo từng khối
print("Phase 2: Fine-tuning the model")
# Mở khóa các lớp cuối của base model theo các khối của MobileNetV2
for layer in base_model.layers[-50:]:  # Mở khóa nhiều lớp hơn
    layer.trainable = True

# Biên dịch lại với learning rate nhỏ hơn và optimizer tốt hơn
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=5e-5),  # Tăng learning rate so với trước đó
    loss='categorical_crossentropy',
    metrics=['accuracy']
)

# Huấn luyện tiếp (giai đoạn 2) với early stopping tốt hơn
callbacks_fine_tuning = [
    ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=2, min_lr=1e-7, verbose=1),
    EarlyStopping(monitor='val_accuracy', patience=8, restore_best_weights=True, verbose=1),
    ModelCheckpoint('best_finetuned_driver_model.keras', monitor='val_accuracy', save_best_only=True, verbose=1)
]

history2 = model.fit(
    train_generator,
    validation_data=valid_generator,
    epochs=20,  # Tăng số epochs cho fine-tuning
    callbacks=callbacks_fine_tuning,
    verbose=1
)


In [None]:
# Lưu mô hình
model.save('driver_behavior_model_optimized.keras')
print("Model saved as 'driver_behavior_model_optimized.keras'")

In [None]:

# Đánh giá mô hình trên tập test
print("Evaluating model on test set:")
test_loss, test_accuracy = model.evaluate(test_generator)
print(f"Test Accuracy: {test_accuracy:.4f}")

# Dự đoán trên tập test
predictions = model.predict(test_generator)
predicted_classes = np.argmax(predictions, axis=1)
true_classes = test_generator.classes

# Tính toán confusion matrix
cm = confusion_matrix(true_classes, predicted_classes)

# Hiển thị confusion matrix
plt.figure(figsize=(10, 8))
plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
plt.title('Confusion Matrix')
plt.colorbar()
tick_marks = np.arange(len(class_names))
plt.xticks(tick_marks, [class_names[i] for i in range(len(class_names))], rotation=45)
plt.yticks(tick_marks, [class_names[i] for i in range(len(class_names))])
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
plt.tight_layout()
plt.show()

# In classification report
print("Classification Report:")
print(classification_report(true_classes, predicted_classes, 
                          target_names=[class_names[i] for i in range(len(class_names))]))

# Vẽ biểu đồ loss và accuracy
plt.figure(figsize=(15, 5))
plt.subplot(1, 2, 1)
# Kết hợp lịch sử từ cả hai giai đoạn huấn luyện
all_epochs = range(1, len(history1.history['loss']) + len(history2.history['loss']) + 1)
all_train_loss = history1.history['loss'] + history2.history['loss']
all_val_loss = history1.history['val_loss'] + history2.history['val_loss']
plt.plot(all_epochs, all_train_loss, 'b-', label='Training Loss')
plt.plot(all_epochs, all_val_loss, 'r-', label='Validation Loss')
plt.title('Model Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()

plt.subplot(1, 2, 2)
all_train_acc = history1.history['accuracy'] + history2.history['accuracy']
all_val_acc = history1.history['val_accuracy'] + history2.history['val_accuracy']
plt.plot(all_epochs, all_train_acc, 'b-', label='Training Accuracy')
plt.plot(all_epochs, all_val_acc, 'r-', label='Validation Accuracy')
plt.title('Model Accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()

plt.tight_layout()
plt.show()

# Phân tích các class khó nhận dạng
def analyze_class_performance(cm, class_names):
    plt.figure(figsize=(12, 6))
    
    # Tính recall cho từng class (True Positive / (True Positive + False Negative))
    recall_per_class = np.diag(cm) / np.sum(cm, axis=1)
    
    # Tính precision cho từng class (True Positive / (True Positive + False Positive))
    precision_per_class = np.diag(cm) / np.sum(cm, axis=0)
    
    # Vẽ biểu đồ
    x = np.arange(len(class_names))
    width = 0.35
    
    fig, ax = plt.subplots(figsize=(12, 6))
    rects1 = ax.bar(x - width/2, recall_per_class, width, label='Recall')
    rects2 = ax.bar(x + width/2, precision_per_class, width, label='Precision')
    
    ax.set_ylabel('Score')
    ax.set_title('Precision and Recall by Class')
    ax.set_xticks(x)
    ax.set_xticklabels([class_names[i] for i in range(len(class_names))], rotation=45)
    ax.legend()
    
    plt.tight_layout()
    plt.show()
    
    # In ra các lớp có độ chính xác thấp nhất
    print("Classes with lowest recall:")
    for i in np.argsort(recall_per_class):
        print(f"Class {i} ({class_names[i]}): {recall_per_class[i]:.4f}")
        if len(np.argsort(recall_per_class)) > 3:  # In 3 lớp tệ nhất nếu có nhiều hơn 3 lớp
            break

analyze_class_performance(cm, class_names)



# Tạo hàm để sử dụng mô hình trên hình ảnh mới
def predict_driver_behavior(model, image_path, class_names):
    # Đọc và tiền xử lý hình ảnh
    img = cv2.imread(image_path)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = cv2.resize(img, target_size)
    img = img / 255.0
    img = np.expand_dims(img, axis=0)
    
    # Dự đoán
    prediction = model.predict(img)
    class_idx = np.argmax(prediction)
    confidence = prediction[0][class_idx]
    
    return class_names[class_idx], confidence

# Tạo mã để hiển thị kết quả trên một ảnh mẫu từ tập test
def visualize_prediction(model, test_generator, class_names, num_samples=5):
    # Lấy một batch từ generator
    images, labels = next(test_generator)
    
    # Dự đoán
    predictions = model.predict(images)
    
    # Hiển thị kết quả
    plt.figure(figsize=(20, 4 * num_samples))
    for i in range(min(num_samples, len(images))):
        true_class = np.argmax(labels[i])
        pred_class = np.argmax(predictions[i])
        confidence = predictions[i][pred_class]
        
        plt.subplot(num_samples, 2, 2*i+1)
        plt.imshow(images[i])
        plt.title(f"True: {class_names[true_class]}\nPredicted: {class_names[pred_class]}\nConfidence: {confidence:.4f}")
        plt.axis('off')
        
        # Hiển thị confidence cho tất cả các lớp
        plt.subplot(num_samples, 2, 2*i+2)
        bars = plt.bar(range(len(class_names)), predictions[i])
        plt.xticks(range(len(class_names)), [class_names[j] for j in range(len(class_names))], rotation=45)
        plt.title("Class Probabilities")
        
        # Tô màu cho dự đoán và ground truth
        bars[true_class].set_color('green')
        if pred_class != true_class:
            bars[pred_class].set_color('red')
    
    plt.tight_layout()
    plt.show()

# Hiển thị một số dự đoán mẫu
print("Visualizing sample predictions:")
visualize_prediction(model, test_generator, class_names)