In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Đường dẫn đến file zip trên Google Drive của bạn
zip_file_path = '/content/drive/MyDrive/data.zip'

# Thư mục đích để giải nén trong môi trường Colab
# Chúng ta sẽ giải nén vào /content/dataset để giữ mọi thứ gọn gàng
extract_to_path = '/content/dataset'

print(f"Đang tạo thư mục đích: {extract_to_path}")
!mkdir -p {extract_to_path} # Tạo thư mục nếu nó chưa tồn tại

print(f"Đang giải nén {zip_file_path} vào {extract_to_path}...")
# Lệnh unzip:
# -q: quiet (không hiển thị quá nhiều thông tin)
# -d {extract_to_path}: chỉ định thư mục đích để giải nén
!unzip -q {zip_file_path} -d {extract_to_path}
print("Giải nén hoàn tất.")

# Kiểm tra cấu trúc thư mục sau khi giải nén (tùy chọn)
print("\nKiểm tra cấu trúc thư mục sau giải nén:")
!ls -R {extract_to_path}

In [8]:
# =================== CÀI ĐẶT & ĐỊNH NGHĨA =================== #
import os
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from transformers import AutoTokenizer, AutoModel
from torch.optim import AdamW
import torch.optim.lr_scheduler as lr_scheduler
from PIL import Image
import pandas as pd
from sklearn.metrics import f1_score
from collections import Counter

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# =================== DATASET =================== #
class MultimodalEmotionDataset(Dataset):
    def __init__(self, data_dir, tokenizer, transform=None, max_length=100):
        self.data_dir = data_dir
        self.tokenizer = tokenizer
        self.transform = transform
        self.max_length = max_length
        self.samples = []
        self.label2id = {"tiêu cực": 0, "trung tính": 1, "tích cực": 2}
        labels_df = pd.read_csv(os.path.join(data_dir, "label.csv"))
        for _, row in labels_df.iterrows():
            id_ = str(row['ID'])
            label = str(row['label']).strip().lower()
            if label in self.label2id:
                text_path = os.path.join(data_dir, "texts", f"{id_}.txt")
                image_path = os.path.join(data_dir, "images", f"{id_}.jpg")
                if os.path.exists(text_path) and os.path.exists(image_path):
                    self.samples.append((id_, label))

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        id_, label = self.samples[idx]
        with open(os.path.join(self.data_dir, "texts", f"{id_}.txt"), encoding='utf-8') as f:
            text = f.read().strip()
        encoded = self.tokenizer(
            text, padding="max_length", truncation=True,
            max_length=self.max_length, return_tensors="pt"
        )
        image = Image.open(os.path.join(self.data_dir, "images", f"{id_}.jpg")).convert("RGB")
        if self.transform:
            image = self.transform(image)
        return {
            "input_ids": encoded["input_ids"].squeeze(0),
            "attention_mask": encoded["attention_mask"].squeeze(0),
            "image": image,
            "label": torch.tensor(self.label2id[label])
        }

# =================== ENCODERS =================== #
class TextEncoder(nn.Module):
    def __init__(self, model_name='vinai/phobert-base', output_dim=256, dropout_rate=0.2, pooling='mean', freeze_bert=False):
        super(TextEncoder, self).__init__()
        self.phobert = AutoModel.from_pretrained(model_name)
        self.pooling = pooling
        self.linear = nn.Linear(self.phobert.config.hidden_size, output_dim)
        self.dropout = nn.Dropout(dropout_rate)
        self.gelu = nn.GELU()
        if freeze_bert:
            for param in self.phobert.parameters():
                param.requires_grad = False

    def forward(self, input_ids, attention_mask):
        outputs = self.phobert(input_ids=input_ids, attention_mask=attention_mask)
        last_hidden_state = outputs.last_hidden_state
        if self.pooling == 'cls':
            pooled_output = last_hidden_state[:, 0]
        elif self.pooling == 'mean':
            mask = attention_mask.unsqueeze(-1).expand(last_hidden_state.size()).float()
            pooled_output = (last_hidden_state * mask).sum(1) / mask.sum(1)
        x = self.dropout(pooled_output)
        x = self.linear(x)
        x = self.gelu(x)
        return x

class ImageEncoder(nn.Module):
    def __init__(self, backbone_name='resnet18', output_dim=256, dropout_rate=0.2, unfreeze_blocks=2, pretrained=True):
        super(ImageEncoder, self).__init__()
        if backbone_name == 'resnet18':
            weights = models.ResNet18_Weights.IMAGENET1K_V1 if pretrained else None
            self.backbone = models.resnet18(weights=weights)
            in_features = 512
        self.backbone = nn.Sequential(*list(self.backbone.children())[:-1])
        self.pool = nn.AdaptiveAvgPool2d((1, 1))
        self.dropout = nn.Dropout(dropout_rate)
        self.fc = nn.Linear(in_features, output_dim)
        self.gelu = nn.GELU()
        for param in self.backbone.parameters():
            param.requires_grad = False
        if unfreeze_blocks > 0:
            for module in list(self.backbone.children())[-unfreeze_blocks:]:
                for param in module.parameters():
                    param.requires_grad = True
        for param in self.fc.parameters():
            param.requires_grad = True

    def forward(self, x):
        x = self.backbone(x)
        x = self.pool(x)
        x = x.view(x.size(0), -1)
        x = self.dropout(x)
        x = self.fc(x)
        x = self.gelu(x)
        return x

# =================== FUSION MODEL =================== #
class FusionClassifier(nn.Module):
    def __init__(self, text_dim=256, image_dim=256, hidden_dim=256, num_classes=3, dropout_rate=0.2, num_heads=4):
        super(FusionClassifier, self).__init__()
        self.text_proj = nn.Linear(text_dim, hidden_dim)
        self.image_proj = nn.Linear(image_dim, hidden_dim)
        self.attention = nn.MultiheadAttention(embed_dim=hidden_dim, num_heads=num_heads, dropout=dropout_rate, batch_first=True)
        self.norm1 = nn.LayerNorm(hidden_dim)
        self.norm2 = nn.LayerNorm(hidden_dim)
        self.gate = nn.Sequential(nn.Linear(hidden_dim * 2, hidden_dim), nn.Sigmoid())
        self.fusion_mlp = nn.Sequential(nn.Linear(hidden_dim * 2, hidden_dim), nn.GELU(), nn.Dropout(dropout_rate), nn.LayerNorm(hidden_dim))
        self.classifier = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim // 2), nn.GELU(), nn.Dropout(dropout_rate),
            nn.LayerNorm(hidden_dim // 2), nn.Linear(hidden_dim // 2, num_classes))

    def forward(self, text_features, image_features):
        text_embed = self.text_proj(text_features)
        image_embed = self.image_proj(image_features)
        attn_output, _ = self.attention(text_embed.unsqueeze(1), image_embed.unsqueeze(1), image_embed.unsqueeze(1))
        attn_output = attn_output.squeeze(1)
        attn_output = self.norm1(attn_output + text_embed)
        concat = torch.cat([attn_output, image_embed], dim=1)
        gate = self.gate(concat)
        fused = gate * attn_output + (1 - gate) * image_embed
        fused = self.fusion_mlp(concat)
        fused = self.norm2(fused)
        return self.classifier(fused)

# =================== HUẤN LUYỆN =================== #
# THAM SỐ
BATCH_SIZE = 32
EPOCHS = 10
LR_TEXT_IMAGE = 2e-5
LR_FUSION = 1e-4
WEIGHT_DECAY = 1e-2
GRAD_CLIP = 1.0
TRAIN_DATA_DIR = "/content/dataset/data/train"
VAL_DATA_DIR = "/content/dataset/data/val"
CHECKPOINT_SAVE_DIR = "/content/checkpoints"

# Tokenizer & transform
tokenizer = AutoTokenizer.from_pretrained("vinai/phobert-base")
train_transform = transforms.Compose([transforms.RandomResizedCrop((224, 224)), transforms.RandomHorizontalFlip(),
                                      transforms.ToTensor(), transforms.Normalize([0.485, 0.456, 0.406],
                                                                                   [0.229, 0.224, 0.225])])
val_transform = transforms.Compose([transforms.Resize((224, 224)), transforms.ToTensor(),
                                    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])])

train_dataset = MultimodalEmotionDataset(TRAIN_DATA_DIR, tokenizer, train_transform)
val_dataset = MultimodalEmotionDataset(VAL_DATA_DIR, tokenizer, val_transform)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE)

# Class weights
label_counts = Counter([label for _, label in train_dataset.samples])
class_counts = [label_counts[l] for l in ["tiêu cực", "trung tính", "tích cực"]]
class_weights = 1. / torch.tensor(class_counts, dtype=torch.float)
class_weights = class_weights / class_weights.sum() * 3

# Model
text_encoder = TextEncoder().to(DEVICE)
image_encoder = ImageEncoder().to(DEVICE)
fusion_model = FusionClassifier().to(DEVICE)

optimizer = AdamW([
    {'params': text_encoder.parameters(), 'lr': LR_TEXT_IMAGE},
    {'params': image_encoder.parameters(), 'lr': LR_TEXT_IMAGE},
    {'params': fusion_model.parameters(), 'lr': LR_FUSION}
], weight_decay=WEIGHT_DECAY)
loss_fn = nn.CrossEntropyLoss(weight=class_weights.to(DEVICE))
scheduler = lr_scheduler.CosineAnnealingLR(optimizer, T_max=EPOCHS, eta_min=1e-6)

# Hàm train và eval
def train_one_epoch(model, dataloader, loss_fn, device, text_encoder, image_encoder, optimizer):
    model.train()
    text_encoder.train()
    image_encoder.train()

    total_loss = 0
    correct = 0
    total = 0
    all_preds = []
    all_labels = []

    for batch in dataloader:
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)
        images = batch["image"].to(device)
        labels = batch["label"].to(device)

        text_feat = text_encoder(input_ids, attention_mask)
        image_feat = image_encoder(images)
        logits = model(text_feat, image_feat)

        loss = loss_fn(logits, labels)
        total_loss += loss.item()

        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(
            list(text_encoder.parameters()) + list(image_encoder.parameters()) + list(model.parameters()),
            GRAD_CLIP
        )
        optimizer.step()

        preds = torch.argmax(logits, dim=1)
        correct += (preds == labels).sum().item()
        total += labels.size(0)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

    avg_loss = total_loss / len(dataloader)
    acc = correct / total * 100
    f1 = f1_score(all_labels, all_preds, average='weighted')
    return avg_loss, acc, f1


def evaluate_model(model, dataloader, loss_fn, device, text_encoder, image_encoder):
    model.eval()
    text_encoder.eval()
    image_encoder.eval()

    total_loss = 0
    correct = 0
    total = 0
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for batch in dataloader:
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            images = batch["image"].to(device)
            labels = batch["label"].to(device)

            text_feat = text_encoder(input_ids, attention_mask)
            image_feat = image_encoder(images)
            logits = model(text_feat, image_feat)

            loss = loss_fn(logits, labels)
            total_loss += loss.item()

            preds = torch.argmax(logits, dim=1)
            correct += (preds == labels).sum().item()
            total += labels.size(0)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    avg_loss = total_loss / len(dataloader)
    acc = correct / total * 100
    f1 = f1_score(all_labels, all_preds, average='weighted')
    return avg_loss, acc, f1

# Training loop
best_val_f1 = 0.0
patience = 8
trigger_times = 0

for epoch in range(EPOCHS):
    train_loss, train_acc, train_f1 = train_one_epoch(fusion_model, train_loader, loss_fn, DEVICE,
                                                      text_encoder, image_encoder, optimizer)
    val_loss, val_acc, val_f1 = evaluate_model(fusion_model, val_loader, loss_fn, DEVICE,
                                               text_encoder, image_encoder)
    scheduler.step()

    print(f"Epoch {epoch+1} - Train F1: {train_f1:.4f} - Val F1: {val_f1:.4f}")
    if val_f1 > best_val_f1:
        best_val_f1 = val_f1
        os.makedirs(CHECKPOINT_SAVE_DIR, exist_ok=True)
        torch.save({
            'text_encoder': text_encoder,
            'image_encoder': image_encoder,
            'fusion_model': fusion_model
        }, os.path.join(CHECKPOINT_SAVE_DIR, "best_multimodal_model.pt"))
        print(f"✅ Lưu model tại epoch {epoch+1} với F1 = {val_f1:.4f}")
        trigger_times = 0
    else:
        trigger_times += 1
        if trigger_times >= patience:
            print("⛔️ Early stopping.")
            break


Epoch 1 - Train F1: 0.4591 - Val F1: 0.5994
✅ Lưu model tại epoch 1 với F1 = 0.5994
Epoch 2 - Train F1: 0.6678 - Val F1: 0.6406
✅ Lưu model tại epoch 2 với F1 = 0.6406
Epoch 3 - Train F1: 0.7928 - Val F1: 0.7090
✅ Lưu model tại epoch 3 với F1 = 0.7090
Epoch 4 - Train F1: 0.8199 - Val F1: 0.7080
Epoch 5 - Train F1: 0.8797 - Val F1: 0.7269
✅ Lưu model tại epoch 5 với F1 = 0.7269
Epoch 6 - Train F1: 0.9137 - Val F1: 0.7086
Epoch 7 - Train F1: 0.9354 - Val F1: 0.7001
Epoch 8 - Train F1: 0.9482 - Val F1: 0.7300
✅ Lưu model tại epoch 8 với F1 = 0.7300
Epoch 9 - Train F1: 0.9548 - Val F1: 0.7112
Epoch 10 - Train F1: 0.9677 - Val F1: 0.7149


In [10]:
import torch

checkpoint = torch.load("/content/checkpoints/best_multimodal_model.pt", map_location="cpu", weights_only=False)

text_encoder = checkpoint["text_encoder"].eval()
image_encoder = checkpoint["image_encoder"].eval()
fusion_model = checkpoint["fusion_model"].eval()


In [11]:
from PIL import Image
from transformers import AutoTokenizer
from torchvision import transforms
import torch

ID = "1022"
test_dir = "/content/dataset/data/test"
tokenizer = AutoTokenizer.from_pretrained("vinai/phobert-base")

# Chuẩn bị ảnh
image_path = f"{test_dir}/images/{ID}.jpg"
image = Image.open(image_path).convert("RGB")
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
image_tensor = transform(image).unsqueeze(0)  # shape [1, 3, 224, 224]

# Chuẩn bị text
with open(f"{test_dir}/texts/{ID}.txt", encoding='utf-8') as f:
    text = f.read().strip()

encoded = tokenizer(text, padding="max_length", truncation=True,
                    max_length=100, return_tensors="pt")
input_ids = encoded["input_ids"]  # shape [1, 100]
attention_mask = encoded["attention_mask"]


In [12]:
with torch.no_grad():
    text_feat = text_encoder(input_ids, attention_mask)
    image_feat = image_encoder(image_tensor)
    logits = fusion_model(text_feat, image_feat)
    predicted = torch.argmax(logits, dim=1).item()

# Mapping ID to label
id2label = {0: "tiêu cực", 1: "trung tính", 2: "tích cực"}
print(f"🖼️ ID = {ID}")
print(f"📄 Văn bản: {text}")
print(f"🔮 Dự đoán cảm xúc: {id2label[predicted]}")


🖼️ ID = 1022
📄 Văn bản: tôi không nói nên lời ... chapelhillshooting muslimlivesmatter usmedia
🔮 Dự đoán cảm xúc: tích cực


In [14]:
import torch

checkpoint = torch.load("/content/checkpoints/best_multimodal_model.pt", map_location="cuda" if torch.cuda.is_available() else "cpu", weights_only=False)
text_encoder = checkpoint['text_encoder'].to("cuda" if torch.cuda.is_available() else "cpu").eval()
image_encoder = checkpoint['image_encoder'].to("cuda" if torch.cuda.is_available() else "cpu").eval()
fusion_model = checkpoint['fusion_model'].to("cuda" if torch.cuda.is_available() else "cpu").eval()


In [15]:
import torch
from sklearn.metrics import accuracy_score, classification_report

# Thiết lập thiết bị
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Đưa mô hình về đúng thiết bị và chuyển sang chế độ eval
text_encoder = checkpoint['text_encoder'].to(device).eval()
image_encoder = checkpoint['image_encoder'].to(device).eval()
fusion_model = checkpoint['fusion_model'].to(device).eval()

# Khởi tạo danh sách lưu kết quả
y_true = []
y_pred = []

# Vòng lặp đánh giá
with torch.no_grad():
    for batch in test_loader:
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        images = batch['image'].to(device)
        labels = batch['label'].to(device)

        # Trích xuất đặc trưng và dự đoán
        text_feat = text_encoder(input_ids, attention_mask)
        image_feat = image_encoder(images)
        outputs = fusion_model(text_feat, image_feat)
        preds = torch.argmax(outputs, dim=1)

        # Lưu kết quả
        y_true.extend(labels.cpu().numpy())
        y_pred.extend(preds.cpu().numpy())

# In kết quả đánh giá
print("Accuracy:", accuracy_score(y_true, y_pred))
print("Classification Report:")
print(classification_report(y_true, y_pred, target_names=["tiêu cực", "trung tính", "tích cực"]))


Accuracy: 0.7475247524752475
Classification Report:
              precision    recall  f1-score   support

    tiêu cực       0.65      0.61      0.63        54
  trung tính       0.63      0.67      0.65        36
    tích cực       0.83      0.84      0.84       112

    accuracy                           0.75       202
   macro avg       0.70      0.71      0.70       202
weighted avg       0.75      0.75      0.75       202

