### Chuẩn bị môi trường

In [None]:
!pip install transformers datasets torchaudio torch librosa soundfile numpy scikit-learn matplotlib seaborn wandb tqdm

### Chuẩn bị dữ liệu

In [None]:
import os
import shutil
import random

def split_dataset(input_dir, output_dir, train_ratio=0.8, val_ratio=0.05, test_ratio=0.15):
    """
    Chia dữ liệu thành train, validation, test với số lượng file mỗi lớp bằng nhau.
    :param input_dir: Thư mục gốc chứa dữ liệu (các thư mục con là nhãn).
    :param output_dir: Thư mục lưu trữ kết quả chia (train, dev, test).
    :param train_ratio: Tỷ lệ file cho tập train.
    :param val_ratio: Tỷ lệ file cho tập dev.
    :param test_ratio: Tỷ lệ file cho tập test.
    """
    # Kiểm tra và tạo các thư mục đầu ra
    for split in ['train', 'validation', 'test']:
        split_dir = os.path.join(output_dir, split)
        os.makedirs(split_dir, exist_ok=True)

    # Lặp qua từng nhãn
    for label in os.listdir(input_dir):
        label_dir = os.path.join(input_dir, label)
        if not os.path.isdir(label_dir):
            continue

        files = [f for f in os.listdir(label_dir) if f.endswith('.wav')]
        if len(files) < 3:  # Kiểm tra đủ số file tối thiểu
            print(f"Label '{label}' không đủ dữ liệu để chia.")
            continue

        random.shuffle(files)  # Xáo trộn ngẫu nhiên file

        # Tính số lượng file cho từng tập
        train_count = int(len(files) * train_ratio)
        val_count = int(len(files) * val_ratio)
        test_count = len(files) - train_count - val_count

        # Đảm bảo tổng số file khớp
        assert train_count + val_count + test_count == len(files), "Tổng số file không khớp!"

        # Chia file
        train_files = files[:train_count]
        val_files = files[train_count:train_count + val_count]
        test_files = files[train_count + val_count:]

        # Copy file vào thư mục tương ứng
        for split, split_files in zip(['train', 'validation', 'test'], [train_files, val_files, test_files]):
            split_label_dir = os.path.join(output_dir, split, label)
            os.makedirs(split_label_dir, exist_ok=True)
            for file_name in split_files:
                shutil.copy(os.path.join(label_dir, file_name), os.path.join(split_label_dir, file_name))

        print(f"Đã chia nhãn '{label}': {len(train_files)} train, {len(val_files)} validation, {len(test_files)} test.")

# Đường dẫn thư mục gốc chứa dữ liệu
input_directory = "/content/drive/MyDrive/Colab_Notebooks/augmented_data"

# Đường dẫn thư mục đầu ra
output_directory = "/content/splited_data"

# Gọi hàm chia dữ liệu
split_dataset(input_directory, output_directory)


In [None]:
data_dir = "/kaggle/input/my-data/augmented_data"

In [None]:
import os

# Lấy danh sách tên thư mục (câu lệnh)
commands = sorted([folder for folder in os.listdir(data_dir) if os.path.isdir(os.path.join(data_dir, folder))])

print("Danh sách câu lệnh:", commands)


In [None]:
# Gán nhãn số cho từng câu lệnh
label2id = {command: idx for idx, command in enumerate(commands)}

# In ra ánh xạ
print("Label mapping:", label2id)

In [None]:
# Tạo ánh xạ ngược từ label2id
id2label = {idx: command for command, idx in label2id.items()}

# In ra ánh xạ ngược
print("Reverse Label mapping:", id2label)

In [None]:
from transformers import Wav2Vec2ForSequenceClassification

# Tải mô hình pre-trained
model = Wav2Vec2ForSequenceClassification.from_pretrained(
    "nguyenvulebinh/wav2vec2-base-vietnamese-250h",
    num_labels=len(commands),  # Số câu lệnh
    label2id=label2id,
    id2label=id2label
)


In [None]:
print(model)

In [None]:
# In thông tin từng lớp
for name, param in model.named_parameters():
    print(f"{name}: {param.shape} - Trainable: {param.requires_grad}")

In [None]:
import json

# Lưu ánh xạ nhãn
with open("label_mapping.json", "w") as f:
    json.dump({"label2id": label2id, "id2label": id2label}, f)

# Tải lại khi cần
with open("label_mapping.json", "r") as f:
    mapping = json.load(f)


In [None]:
import os

def load_data(data_dir, label2id):
    """
    Duyệt qua thư mục dữ liệu để lấy danh sách file và nhãn.
    :param data_dir: Đường dẫn tới thư mục dữ liệu.
    :param label2id: Ánh xạ câu lệnh sang nhãn số.
    :return: Danh sách các mẫu dữ liệu (gồm đường dẫn file và nhãn số).
    """
    data = []

    for label in os.listdir(data_dir):
        label_dir = os.path.join(data_dir, label)

        # Kiểm tra nếu đây là thư mục
        if os.path.isdir(label_dir):
            for file_name in os.listdir(label_dir):
                if file_name.endswith(".wav"):  # Chỉ lấy file .wav
                    file_path = os.path.join(label_dir, file_name)
                    data.append({"path": file_path, "label": label2id[label]})

    return data

# Ánh xạ câu lệnh sang nhãn số
#commands = ["mở", "đóng", "lên", "xuống", "trái", "phải", "bật", "tắt", "dừng", "tiếp tục", "nhanh", "chậm", "cao", "thấp", "dừng lại"]
label2id = {command: idx for idx, command in enumerate(commands)}

# Load dữ liệu từ thư mục
#data_dir = "data/"  # Đường dẫn tới thư mục dữ liệu
dataset = load_data(data_dir, label2id)

print(f"Số lượng mẫu: {len(dataset)}")
print(f"Một mẫu dữ liệu: {dataset[0]}")


In [None]:
from datasets import Dataset, ClassLabel

# Chuyển đổi danh sách dữ liệu thành Dataset
hf_dataset = Dataset.from_list(dataset)

print(hf_dataset)


In [None]:
# Get unique labels from the 'label' column
labels = hf_dataset.unique("label")

# Create a ClassLabel object with the names of the classes
class_label = ClassLabel(names=labels)

# Cast the 'label' column to ClassLabel using the created object
hf_dataset = hf_dataset.cast_column("label", class_label)

In [None]:
# Chia thành tập train (80%) và test (20%) với stratify
train_test = hf_dataset.train_test_split(test_size=0.2, seed=42, stratify_by_column="label")

# Tiếp tục chia tập test thành validation (50%) và test (50%) với stratify
val_test = train_test["test"].train_test_split(test_size=0.5, seed=42, stratify_by_column="label")

# Gộp lại thành dataset hoàn chỉnh
final_dataset = {
    "train": train_test["train"],
    "validation": val_test["train"],
    "test": val_test["test"]
}

print(final_dataset)


In [None]:
from collections import Counter

# Kiểm tra số lượng nhãn trong từng tập
print("Train labels:", Counter(final_dataset["train"]["label"]))
print("Validation labels:", Counter(final_dataset["validation"]["label"]))
print("Test labels:", Counter(final_dataset["test"]["label"]))

In [None]:
import os

def load_data_from_split(data_dir, label2id, split):
    """
    Load dữ liệu từ một tập dữ liệu đã được chia sẵn (train, validation, test).
    :param data_dir: Đường dẫn tới thư mục chứa tập dữ liệu.
    :param label2id: Ánh xạ câu lệnh sang nhãn số.
    :param split: Tên của tập dữ liệu (train, validation, test).
    :return: Danh sách các mẫu dữ liệu (gồm đường dẫn file và nhãn số).
    """
    data = []
    split_dir = os.path.join(data_dir, split)  # Đường dẫn tới thư mục của tập dữ liệu

    for label in os.listdir(split_dir):
        label_dir = os.path.join(split_dir, label)

        # Kiểm tra nếu đây là thư mục
        if os.path.isdir(label_dir):
            for file_name in os.listdir(label_dir):
                if file_name.endswith(".wav"):  # Chỉ lấy file .wav
                    file_path = os.path.join(label_dir, file_name)
                    data.append({"path": file_path, "label": label2id[label]})

    return data

In [None]:
from datasets import Dataset, ClassLabel

# Đường dẫn tới thư mục chứa các tập dữ liệu đã được chia
split_data_dir = "/kaggle/input/datn-data-12/Augmented_data"  # Thay bằng đường dẫn thực tế

# Load dữ liệu cho từng tập
train_dataset = load_data_from_split(split_data_dir, label2id, "Train")
validation_dataset = load_data_from_split(split_data_dir, label2id, "Validation")
test_dataset = load_data_from_split(split_data_dir, label2id, "Test")

# Chuyển đổi sang Hugging Face Dataset
train_dataset = Dataset.from_list(train_dataset)
validation_dataset = Dataset.from_list(validation_dataset)
test_dataset = Dataset.from_list(test_dataset)

# Tạo dataset hoàn chỉnh
final_dataset = {
    "train": train_dataset,
    "validation": validation_dataset,
    "test": test_dataset
}

In [None]:
print(final_dataset["train"][0])

In [None]:
for param in model.wav2vec2.feature_extractor.parameters():
    param.requires_grad = False

for param in model.wav2vec2.feature_projection.parameters():
    param.requires_grad = False

# for layer in model.wav2vec2.encoder.layers[:-4]:  # Freeze tất cả trừ 4 lớp cuối của Transformer
#     for param in layer.parameters():
#         param.requires_grad = False

In [None]:
# Đóng băng toàn bộ Wav2Vec2
for param in model.wav2vec2.parameters():
    param.requires_grad = False

# Chỉ huấn luyện tầng phân loại
for param in model.classifier.parameters():
    param.requires_grad = True


### Fine-tuning mô hình

In [None]:
import torchaudio
from transformers import Wav2Vec2Processor

# 1. Khởi tạo Wav2Vec2Processor
processor = Wav2Vec2Processor.from_pretrained("nguyenvulebinh/wav2vec2-base-vietnamese-250h")

# 2. Hàm đọc file âm thanh và chuyển đổi thành tensor
def speech_file_to_array_fn(batch):
    # Khởi tạo các list rỗng để lưu trữ dữ liệu
    speech_list = []
    sampling_rate_list = []
    target_text_list = []

    # Iterate through each item in the batch
    for i in range(len(batch["path"])):
        speech_array, sampling_rate = torchaudio.load(batch["path"][i])  # Load each path individually
        speech_list.append(speech_array[0].numpy())
        sampling_rate_list.append(sampling_rate)
        target_text_list.append(batch["label"][i])

    # Thêm các list vào batch dictionary
    batch["speech"] = speech_list
    batch["sampling_rate"] = sampling_rate_list
    batch["target_text"] = target_text_list

    return batch

In [None]:
import torchaudio
import numpy as np

max_length_overall = 0

for split_name, dataset in final_dataset.items():
    for example in dataset:
        try:
            waveform, _ = torchaudio.load(example["path"])
            audio_length = waveform.shape[1]
            max_length_overall = max(max_length_overall, audio_length)
        except Exception as e:
            print(f"Không thể load file {example['path']} in {split_name}: {e}")

print(f"Độ dài tối đa của file audio trong cả 3 tập: {max_length_overall}")

In [None]:

# 3. Hàm tiền xử lý
def preprocess_function(examples):
    audio_arrays = [x for x in examples["speech"]]
    # Find the maximum length in the current batch
    #max_len = max(len(x) for x in audio_arrays)
    # Pad to this maximum length
    inputs = processor(
        audio_arrays,
        sampling_rate=16000,
        padding="max_length",  # Use max_length padding
        max_length=max_length_overall,     # Use the maximum length in the batch
        truncation=True,        # Truncate if exceeding max_length
        return_tensors="pt"
    )
    inputs["labels"] = examples["target_text"]
    return inputs

# 4. Áp dụng các hàm tiền xử lý lên tập dữ liệu
final_dataset = {
    "train": final_dataset["train"].map(speech_file_to_array_fn, batched=True, remove_columns=["label"]),
    "validation": final_dataset["validation"].map(speech_file_to_array_fn, batched=True, remove_columns=["label"]),
    "test": final_dataset["test"].map(speech_file_to_array_fn, batched=True, remove_columns=["label"])
}

In [None]:
final_dataset = {
    "train": final_dataset["train"].map(preprocess_function, batched=True, remove_columns=['speech', 'sampling_rate', 'target_text']),
    "validation": final_dataset["validation"].map(preprocess_function, batched=True, remove_columns=['speech', 'sampling_rate', 'target_text']),
    "test": final_dataset["test"].map(preprocess_function, batched=True, remove_columns=['speech', 'sampling_rate', 'target_text'])
}

In [None]:
import numpy as np
max_length = 0
for example in final_dataset["train"]:
    input_values = np.array(example["input_values"])  # Chuyển đổi sang NumPy array
    input_length = input_values.shape[0]
    max_length = max(max_length, input_length)

print(f"Độ dài tối đa của chuỗi sau xử lý: {max_length}")

In [None]:
import os
import shutil
os.environ['WANDB_DISABLED'] = 'true'
import torch
from transformers import TrainingArguments, Trainer, TrainerCallback

# class DeleteOldCheckpointsCallback(TrainerCallback):
#     def __init__(self, save_total_limit=1):
#         self.save_total_limit = save_total_limit

#     def on_save(self, args: TrainingArguments, state, control, **kwargs):
#         """
#         Được gọi sau mỗi khi Trainer lưu checkpoint.
#         """
#         checkpoint_folder = os.path.join(args.output_dir, "checkpoint-*")
#         checkpoints = [
#             os.path.join(args.output_dir, d)
#             for d in os.listdir(args.output_dir)
#             if os.path.isdir(os.path.join(args.output_dir, d)) and d.startswith("checkpoint-")
#         ]
#         checkpoints.sort(key=lambda x: int(x.split("-")[-1]))  # Sắp xếp theo step
#         if len(checkpoints) > self.save_total_limit:
#             for checkpoint in checkpoints[:-self.save_total_limit]:
#                 shutil.rmtree(checkpoint)
#                 #print(f"Deleted checkpoint: {checkpoint}")

def compute_metrics(pred):
    """
    Tính toán độ chính xác của mô hình.
    """
    labels = pred.label_ids
    preds = pred.predictions.argmax(-1)
    acc = (labels == preds).sum().item() / len(labels)
    return {"accuracy": acc}

# Định nghĩa các tham số huấn luyện
training_args = TrainingArguments(
    output_dir="./results",          # Thư mục lưu kết quả
    per_device_train_batch_size=32,  # Kích thước batch cho training
    per_device_eval_batch_size=64,   # Kích thước batch cho evaluation
    num_train_epochs=30,              # Số epochs
    evaluation_strategy="epoch",     # Đánh giá sau mỗi epoch
    save_strategy="epoch",           # Lưu model sau mỗi epoch
    save_total_limit=1,              # Chỉ giữ lại 1 checkpoint mới nhất
    learning_rate=2e-5,              # Tốc độ học
    weight_decay=0.01,               # Weight decay
    logging_dir="./logs",            # Thư mục lưu logs
    load_best_model_at_end=True,    # Tải model tốt nhất sau khi huấn luyện xong
    metric_for_best_model="accuracy", # Metric để đánh giá model tốt nhất
    push_to_hub=False,               # Không push model lên Hugging Face Hub
    logging_strategy="no",           # Tắt ghi logs
)

# Khởi tạo Trainer
trainer = Trainer(
    model=model,                         # Mô hình đã tải
    args=training_args,                  # Tham số huấn luyện
    # callbacks=[DeleteOldCheckpointsCallback(save_total_limit=1)],
    train_dataset=final_dataset["train"], # Tập dữ liệu train
    eval_dataset=final_dataset["validation"], # Tập dữ liệu validation
    compute_metrics=compute_metrics,    # Hàm tính toán metrics
)

In [None]:
total_params = sum(p.numel() for p in model.parameters())
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Total parameters: {total_params}")
print(f"Trainable parameters: {trainable_params}")


In [None]:
from torch.utils.data import DataLoader

train_dataloader = DataLoader(
    dataset=train_dataset,
    batch_size=32,  # Batch size có thể tăng
    shuffle=True,
    num_workers=4  # Tăng số worker để tải dữ liệu nhanh hơn
)
print(f"Train DataLoader num_workers: {train_dataloader.num_workers}")


In [None]:
trainer.train()

### Đánh giá mô hình

In [None]:
# Đánh giá mô hình trên tập test
predictions = trainer.predict(final_dataset["test"])

# Lấy accuracy từ kết quả đánh giá
accuracy = predictions.metrics["test_accuracy"]

print(f"Accuracy trên tập test: {accuracy*100}%")

In [None]:
history = trainer.state.log_history

In [None]:
train_loss = [log['loss'] for log in history if 'loss' in log]
train_accuracy = [log['eval_accuracy'] for log in history if 'eval_accuracy' in log]

In [None]:
import matplotlib.pyplot as plt

history = trainer.state.log_history

# Trích xuất dữ liệu
train_loss = [log['loss'] for log in history if 'loss' in log]
eval_loss = [log['eval_loss'] for log in history if 'eval_loss' in log]
train_accuracy = [log['eval_accuracy'] for log in history if 'eval_accuracy' in log]

# Vẽ đồ thị Loss
plt.figure(figsize=(10, 5))  # Điều chỉnh kích thước đồ thị nếu cần
plt.plot(train_loss, label='Training Loss')
plt.plot(eval_loss, label='Validation Loss')
plt.title('Training and Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.show()

# Vẽ đồ thị Accuracy
plt.figure(figsize=(10, 5))  # Điều chỉnh kích thước đồ thị nếu cần
plt.plot(train_accuracy, label='Validation Accuracy')
plt.title('Validation Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.show()

In [None]:
import numpy as np
from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

# Lấy nhãn dự đoán và nhãn thực tế
predicted_labels = predictions.predictions.argmax(-1)
true_labels = predictions.label_ids

# Tính toán confusion matrix
cm = confusion_matrix(true_labels, predicted_labels)

# Vẽ confusion matrix bằng seaborn
plt.figure(figsize=(10, 10))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues",
            xticklabels=model.config.id2label.values(),
            yticklabels=model.config.id2label.values())
plt.xlabel("Predicted Label")
plt.ylabel("True Label")
plt.title("Confusion Matrix")
plt.show()

### Lưu mô hình

In [None]:
!du -sh /kaggle/working/*

In [None]:
!rm -rf /kaggle/working/logs
!rm -rf /kaggle/working/results


In [None]:
trainer.model.save_pretrained('/kaggle/working/wav2vec2_model')

In [None]:
from transformers import Wav2Vec2CTCTokenizer

tokenizer = Wav2Vec2CTCTokenizer.from_pretrained('nguyenvulebinh/wav2vec2-base-vietnamese-250h')
tokenizer.save_pretrained('/kaggle/working/wav2vec2_model')

In [None]:
!zip -r /kaggle/working/wav2vec2_model.zip /kaggle/working/wav2vec2_model