In [7]:
import os
import cv2
import numpy as np
import pickle
import random
from multiprocessing import Pool, cpu_count

# Cấu hình thư mục
data_dir = './'  # Thư mục chứa dữ liệu
output_dir = './processed_frames'
os.makedirs(output_dir, exist_ok=True)

# Bản đồ cảm xúc
emotions_map = {
    '01': 'neutral', '02': 'calm', '03': 'happy', '04': 'sad',
    '05': 'angry', '06': 'fear', '07': 'disgust', '08': 'surprise'
}

# Lấy faces ngẫu nhiên từ video
def extract_random_faces(video_path, max_faces=10, frame_size=(64, 64), sample_frames=5):
    faces = []
    face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
    cap = cv2.VideoCapture(video_path)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    if total_frames < 2:
        cap.release()
        return faces

    frame_indices = sorted(random.sample(range(total_frames), min(sample_frames, total_frames)))

    for i in frame_indices:
        cap.set(cv2.CAP_PROP_POS_FRAMES, i)
        ret, frame = cap.read()
        if not ret:
            continue
        small = cv2.resize(frame, (0, 0), fx=0.5, fy=0.5)
        gray = cv2.cvtColor(small, cv2.COLOR_BGR2GRAY)
        faces_detected = face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5)
        if len(faces_detected) > 0:
            (x, y, w, h) = max(faces_detected, key=lambda rect: rect[2] * rect[3])
            x, y, w, h = x*2, y*2, w*2, h*2
            face = frame[y:y+h, x:x+w]
            face = cv2.resize(face, frame_size)
            faces.append(face)
        if len(faces) >= max_faces:
            break

    cap.release()
    return faces

# Hàm xử lý 1 video (dùng cho multiprocessing)
def process_video(args):
    subfolder_path, file = args
    emotion_code = file.split('-')[2]
    emotion = emotions_map.get(emotion_code)
    if not emotion:
        return []

    video_path = os.path.join(subfolder_path, file)
    faces = extract_random_faces(video_path)

    results = []
    for idx, face in enumerate(faces):
        out_name = f"{file.replace('.mp4', '')}_frame{idx}.jpg"
        output_path = os.path.join(output_dir, out_name)
        if not os.path.exists(output_path):
            cv2.imwrite(output_path, cv2.cvtColor(face, cv2.COLOR_RGB2BGR))
        results.append((output_path, emotion))
    return results

# Tìm tất cả video cần xử lý
all_videos = []
for actor_folder in os.listdir(data_dir):
    actor_path = os.path.join(data_dir, actor_folder)
    if os.path.isdir(actor_path) and ('Video_Speech_Actor_' in actor_folder or 'Video_Song_Actor_' in actor_folder):
        for subfolder in os.listdir(actor_path):
            subfolder_path = os.path.join(actor_path, subfolder)
            if os.path.isdir(subfolder_path) and subfolder.startswith('Actor_'):
                for file in os.listdir(subfolder_path):
                    if file.endswith('.mp4'):
                        all_videos.append((subfolder_path, file))

print(f"Đang xử lý {len(all_videos)} video bằng {cpu_count()} core...")

# Dùng multiprocessing để tăng tốc
all_results = []
with Pool(processes=cpu_count()) as pool:
    for result in pool.imap_unordered(process_video, all_videos):
        all_results.extend(result)

print(f"Xử lý xong, tổng số ảnh: {len(all_results)}")

# Chia nhỏ và lưu ra nhiều file pkl
batch_size = 1000
for i in range(0, len(all_results), batch_size):
    batch = all_results[i:i+batch_size]
    image_paths, labels = zip(*batch)
    with open(f"image_paths_labels_batch{i//batch_size + 1}.pkl", "wb") as f:
        pickle.dump((image_paths, labels), f)
    print(f"Đã lưu batch {i//batch_size + 1} với {len(batch)} ảnh.")



Đang xử lý 4904 video bằng 12 core...
Xử lý xong, tổng số ảnh: 24427
Đã lưu batch 1 với 1000 ảnh.
Đã lưu batch 2 với 1000 ảnh.
Đã lưu batch 3 với 1000 ảnh.
Đã lưu batch 4 với 1000 ảnh.
Đã lưu batch 5 với 1000 ảnh.
Đã lưu batch 6 với 1000 ảnh.
Đã lưu batch 7 với 1000 ảnh.
Đã lưu batch 8 với 1000 ảnh.
Đã lưu batch 9 với 1000 ảnh.
Đã lưu batch 10 với 1000 ảnh.
Đã lưu batch 11 với 1000 ảnh.
Đã lưu batch 12 với 1000 ảnh.
Đã lưu batch 13 với 1000 ảnh.
Đã lưu batch 14 với 1000 ảnh.
Đã lưu batch 15 với 1000 ảnh.
Đã lưu batch 16 với 1000 ảnh.
Đã lưu batch 17 với 1000 ảnh.
Đã lưu batch 18 với 1000 ảnh.
Đã lưu batch 19 với 1000 ảnh.
Đã lưu batch 20 với 1000 ảnh.
Đã lưu batch 21 với 1000 ảnh.
Đã lưu batch 22 với 1000 ảnh.
Đã lưu batch 23 với 1000 ảnh.
Đã lưu batch 24 với 1000 ảnh.
Đã lưu batch 25 với 427 ảnh.


In [4]:
import torch
'cuda' if torch.cuda.is_available() else 'cpu'

'cuda'

In [None]:
# train_from_images.py
import os
import pickle
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
import torchvision.transforms as transforms
from PIL import Image
import cv2

# Load tất cả file .pkl chứa ảnh đã tách
image_paths = []
labels = []

for filename in os.listdir():
    if filename.startswith("image_paths_labels_batch") and filename.endswith(".pkl"):
        with open(filename, "rb") as f:
            paths, lbls = pickle.load(f)
            image_paths.extend(paths)
            labels.extend(lbls)

print(f"✅ Loaded {len(image_paths)} images from {len(set(labels))} classes.")

# Encode nhãn
le = LabelEncoder()
labels_encoded = torch.tensor(le.fit_transform(labels), dtype=torch.long)

# Transform
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((64, 64)),
    transforms.ToTensor(),
    transforms.Normalize([0.5]*3, [0.5]*3)
])

# Dataset class
class PreprocessedDataset(Dataset):
    def __init__(self, image_paths, labels, transform=None):
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img = cv2.imread(self.image_paths[idx])
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        if self.transform:
            img = self.transform(img)
        return img, self.labels[idx]

# Train/test split
train_paths, test_paths, train_labels, test_labels = train_test_split(
    image_paths, labels_encoded, test_size=0.3, random_state=42, stratify=labels_encoded
)

train_dataset = PreprocessedDataset(train_paths, train_labels, transform)
test_dataset = PreprocessedDataset(test_paths, test_labels, transform)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=4)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=4)

# CNN model
class FacialEmotionCNN(nn.Module):
    def __init__(self, num_classes):
        super().__init__()
        self.model = nn.Sequential(
            nn.Conv2d(3, 16, 3, padding=1), nn.ReLU(), nn.MaxPool2d(2),
            nn.Conv2d(16, 32, 3, padding=1), nn.ReLU(), nn.MaxPool2d(2),
            nn.Conv2d(32, 64, 3, padding=1), nn.ReLU(), nn.MaxPool2d(2),
            nn.Flatten(),
            nn.Linear(64*8*8, 64), nn.ReLU(), nn.Dropout(0.3),
            nn.Linear(64, num_classes)
        )

    def forward(self, x):
        return self.model(x)

# Training
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = FacialEmotionCNN(num_classes=len(le.classes_)).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

def train_model(model, loader, criterion, optimizer, epochs=50):
    model.train()
    for epoch in range(epochs):
        total, correct, running_loss = 0, 0, 0
        for inputs, labels in loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        print(f"Epoch {epoch+1}/{epochs}, Loss: {running_loss:.4f}, Acc: {100*correct/total:.2f}%")

    torch.save(model.state_dict(), "facial_emotion_cnn.pth")
    print("✅ Model saved to facial_emotion_cnn.pth")

def evaluate_model(model, loader):
    model.eval()
    total, correct = 0, 0
    with torch.no_grad():
        for inputs, labels in loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    print(f"🎯 Test Accuracy: {100*correct/total:.2f}%")

# Run
train_model(model, train_loader, criterion, optimizer)
evaluate_model(model, test_loader)


✅ Loaded 24427 images from 8 classes.
Epoch 1/50, Loss: 847.5332, Acc: 39.02%
Epoch 2/50, Loss: 587.6785, Acc: 59.39%
Epoch 3/50, Loss: 481.7855, Acc: 66.80%
Epoch 4/50, Loss: 414.0335, Acc: 71.90%
Epoch 5/50, Loss: 360.2077, Acc: 75.34%
Epoch 6/50, Loss: 324.8477, Acc: 77.78%
Epoch 7/50, Loss: 297.8691, Acc: 79.56%
Epoch 8/50, Loss: 275.0000, Acc: 80.92%
Epoch 9/50, Loss: 256.2184, Acc: 82.34%
Epoch 10/50, Loss: 244.5457, Acc: 82.88%
Epoch 11/50, Loss: 227.4775, Acc: 84.17%
Epoch 12/50, Loss: 218.5861, Acc: 84.67%
Epoch 13/50, Loss: 204.9960, Acc: 85.48%
Epoch 14/50, Loss: 196.7040, Acc: 85.97%
Epoch 15/50, Loss: 195.6734, Acc: 86.29%
Epoch 16/50, Loss: 181.1172, Acc: 87.03%
Epoch 17/50, Loss: 173.6123, Acc: 87.71%
Epoch 18/50, Loss: 167.7726, Acc: 88.05%
