In [2]:
import os
os.environ["TOKENIZERS_PARALLELISM"] = "false"

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import pytorch_lightning as pl
from torchmetrics import AUROC
import torchaudio
from torchaudio.transforms import MelSpectrogram, Resample
from transformers.pytorch_utils import Conv1D


import transformers
from transformers import (
    LlamaConfig, 
    LlamaModel, 
    LlamaTokenizer, 
    AutoTokenizer, 
    AutoModelForCausalLM, 
    AutoModel
)
from peft import (
    LoraConfig, 
    IA3Config, 
    get_peft_model, 
    TaskType
)



In [3]:
# Names of the sub-modules that receive PEFT adapters, one list per audio
# encoder. RespChatGPT looks the list up with ``configs.audio_encoder``.
OPERA_CT_TARGET_MODULES = ["proj"]
OPERA_CE_TARGET_MODULES = ["conv", "fc", "linear"]

# Encoder name -> adapter target modules (values are the lists above, not copies).
target_module_dict = dict(
    operaCT=OPERA_CT_TARGET_MODULES,
    operaCE=OPERA_CE_TARGET_MODULES,
)

In [17]:
def load_audio_to_spectrogram(
    audio_file_path: str,
    target_sr: int = 16000,
    n_fft: int = 400,
    hop_length: int = 160,
    n_mels: int = 80
) -> torch.Tensor:
    """Load an audio file and convert it to a mel spectrogram.

    The recording is averaged down to a single channel when it has more than
    one, resampled to ``target_sr`` when its native rate differs, and then
    passed through torchaudio's ``MelSpectrogram``.

    Args:
        audio_file_path: Path handed to ``torchaudio.load``.
        target_sr: Sample rate the waveform is resampled to before the transform.
        n_fft: FFT size forwarded to ``MelSpectrogram``.
        hop_length: Hop length forwarded to ``MelSpectrogram``.
        n_mels: Number of mel bins forwarded to ``MelSpectrogram``.

    Returns:
        The tensor produced by ``MelSpectrogram``. A run recorded in this
        notebook printed shape ``(1, 80, 2636)`` with the default arguments.
    """
    audio, native_sr = torchaudio.load(audio_file_path)

    # Collapse multi-channel recordings to one channel, keeping the channel axis.
    channel_count = audio.shape[0]
    if channel_count > 1:
        audio = audio.mean(dim=0, keepdim=True)

    # Only pay for resampling when the file is not already at the target rate.
    if native_sr != target_sr:
        audio = Resample(native_sr, target_sr)(audio)

    to_mel = MelSpectrogram(
        sample_rate=target_sr,
        n_fft=n_fft,
        hop_length=hop_length,
        n_mels=n_mels
    )
    return to_mel(audio)


In [None]:
def _label_from_file_name(file_name):
    """Parse the integer label from a file name shaped like ``Hxxx_R3.wav``.

    Raises:
        IndexError: The name contains no ``"_"``.
        ValueError: The text after the side letter is not an integer.
    """
    # "H050_R3.wav" -> ["H050", "R3.wav"] -> "R3" (side letter + label digits)
    side_and_label = file_name.split("_")[1].split(".")[0]
    # Drop the leading side letter ('R' or 'L'); the rest is the label.
    return int(side_and_label[1:])


def load_data_from_folder(folder_path, prompt="Default prompt", context="Default context"):
    """Build evaluation samples from every ``.wav`` file in a folder.

    Each file is converted with ``load_audio_to_spectrogram``; the label is
    parsed from the file name (``Hxxx_R3.wav`` -> 3). Files are visited in
    sorted name order so repeated runs yield the samples in the same order
    (``os.listdir`` alone gives no ordering guarantee).

    Args:
        folder_path: Directory to scan (not recursive).
        prompt: Prompt string attached unchanged to every sample.
        context: Context string attached unchanged to every sample.

    Returns:
        A list of ``(spectrogram, prompt, context, label)`` tuples, where
        ``spectrogram`` is a float32 tensor (2-D results get a leading axis).
        Files whose audio cannot be processed are reported and skipped. Files
        whose label cannot be parsed are reported and kept with label 0.
    """
    data = []
    for file_name in sorted(os.listdir(folder_path)):
        # The extension match is case-insensitive.
        if not file_name.lower().endswith(".wav"):
            continue

        file_path = os.path.join(folder_path, file_name)
        try:
            spectrogram = load_audio_to_spectrogram(file_path)
            if spectrogram.dim() == 2:
                spectrogram = spectrogram.unsqueeze(0)
            spectrogram = spectrogram.to(torch.float32)
        except Exception as e:
            # Best effort: one unreadable file must not abort the whole scan.
            print(f"Lỗi khi xử lý file {file_path}: {e}")
            continue

        try:
            label = _label_from_file_name(file_name)
        except (IndexError, ValueError) as e:
            print(f"Không thể trích xuất label từ tên file {file_name}, sử dụng label mặc định 0. Lỗi: {e}")
            label = 0

        data.append((spectrogram, prompt, context, label))
    return data

In [6]:
class OperaCTEncoder(nn.Module):
    """Small audio encoder: per-frame projection, ReLU, time pooling, linear head.

    Args:
        in_dim: Size of the feature axis of the input (default 80).
        out_dim: Size of the embedding returned for each batch item.
    """

    def __init__(self, in_dim=80, out_dim=256):
        super().__init__()
        # transformers' Conv1D takes (nf, nx): nf = out_dim, nx = in_dim.
        self.qkv = Conv1D(out_dim, in_dim)
        self.proj = nn.Linear(out_dim, out_dim)

    def forward(self, x):
        """Encode ``x`` of shape (B, in_dim, time) into (B, out_dim)."""
        frames = x.permute(0, 2, 1)                        # (B, time, in_dim)
        per_frame = self.qkv(frames).permute(0, 2, 1)      # (B, out_dim, time)
        activated = torch.relu(per_frame)
        pooled = activated.mean(dim=-1)                    # average over time -> (B, out_dim)
        return self.proj(pooled)

    def forward_window(self, x):
        """Alias that delegates to :meth:`forward`."""
        return self.forward(x)


In [7]:
from openai import OpenAI

In [None]:
# Shared OpenAI client used by RespChatGPT.forward.
# The key is read from the OPENAI_API_KEY environment variable so that no
# credential is ever stored in (or committed with) the notebook.
client = OpenAI(
    api_key=os.environ.get("OPENAI_API_KEY"),
)

In [9]:
class RespChatGPT(nn.Module):
    """Classifier that summarises audio locally and asks a chat model for the class.

    A PEFT-wrapped ``OperaCTEncoder`` turns the spectrogram into an embedding,
    a linear "aligner" reduces it to one scalar, and that scalar is placed in
    a text prompt sent through the module-level OpenAI ``client``.

    Args:
        configs: Object exposing ``n_cls``, ``use_audio``, ``head_dropout``,
            ``enc_dim``, ``audio_peft`` (``"lora"`` or ``"IA3"``),
            ``audio_encoder`` (a key of ``target_module_dict``) and, for LoRA,
            ``audio_lora_rank``.

    Raises:
        NotImplementedError: ``configs.audio_peft`` is neither ``"lora"`` nor ``"IA3"``.
        KeyError: ``configs.audio_encoder`` is not a key of ``target_module_dict``.
    """

    def __init__(self, configs):
        super().__init__()

        self.n_cls = configs.n_cls
        self.use_audio = configs.use_audio
        # Stored for completeness; nothing in this class reads it.
        self.head_dropout = configs.head_dropout

        # Width of the audio embedding (enc_dim, e.g. 256).
        self.d_audio = configs.enc_dim

        # Encoder input size is the number of mel channels (80).
        self.base_audio_encoder = OperaCTEncoder(
            in_dim=80,
            out_dim=self.d_audio
        )

        # PEFT configuration for the audio encoder.
        if configs.audio_peft == "lora":
            peft_config = LoraConfig(
                r=configs.audio_lora_rank,
                lora_alpha=32,
                lora_dropout=0.1,
                target_modules=target_module_dict[configs.audio_encoder]
            )
        elif configs.audio_peft == "IA3":
            peft_config = IA3Config(
                target_modules=target_module_dict[configs.audio_encoder],
                feedforward_modules=['proj']
            )
        else:
            raise NotImplementedError("Audio fine-tuning mode undefined")

        self.audio_encoder = get_peft_model(self.base_audio_encoder, peft_config)
        trainable_params = sum(p.numel() for p in self.audio_encoder.parameters() if p.requires_grad)
        print("Audio encoder trainable parameters:", trainable_params)

        # Maps the d_audio-wide embedding to a single scalar.
        self.aligner = nn.Linear(self.d_audio, 1)

    def forward(self, x_spectrogram, x_prompt, x_context):
        """Classify one input by querying the chat model.

        Args:
            x_spectrogram: Spectrogram expected as (B, 80, T); ignored when
                ``use_audio`` is false.
            x_prompt: Prompt text inserted into the request.
            x_context: Context text inserted into the request.

        Returns:
            The model's reply with surrounding whitespace removed, or ``""``
            when the API returns no text content.
        """
        if self.use_audio:
            x_enc = self.audio_encoder(x_spectrogram)  # => (B, d_audio)
            # .item() yields a plain Python float, so only the number reaches the prompt.
            summary_val = self.aligner(x_enc).mean().item()
            audio_summary = f"Audio summary value: {summary_val:.2f}"
        else:
            audio_summary = "No audio provided."

        messages = [
            {"role": "system", "content": "You are a helpful classification assistant."},
            {"role": "user", "content": (
                f"Prompt: {x_prompt}\n"
                f"Context: {x_context}\n"
                f"{audio_summary}\n"
                f"Please classify the input into one of {self.n_cls} classes. "
                "Respond with the class index only."
            )}
        ]

        # Call the ChatGPT API (the model name can be changed if needed).
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=messages
        )
        # `content` is nullable in the API response; treat a missing body as
        # an empty answer instead of crashing on `.strip()`.
        content = response.choices[0].message.content
        return content.strip() if content is not None else ""


In [10]:
class Config:
    """Default settings for the notebook's model.

    ``RespChatGPT`` reads ``n_cls``, ``use_audio``, ``head_dropout``,
    ``enc_dim``, ``audio_peft``, ``audio_lora_rank`` and ``audio_encoder``;
    the remaining fields are not read by any code visible in this notebook.
    """

    # --- Task ---
    n_cls = 10
    use_audio = True
    head_dropout = 0.1

    # --- Audio encoder and its PEFT adapter ---
    audio_encoder = "operaCT"
    audio_peft = "lora"
    audio_lora_rank = 4
    enc_dim = 256           # desired output size of the audio encoder
    aligner = "projection"
    patch_nums = 1
    d_ff = 512

    # --- Language model ---
    llm_model = "chatgpt"   # use the ChatGPT API
    llm_dim = 1024          # not used with the ChatGPT API
    llm_peft = "frozen"     # not used with the ChatGPT API
    llm_lora_rank = 8
    llm_lora_alpha = 32
    llm_lora_dropout = 0.1

In [19]:
def evaluate_accuracy(model, data):
    """Score ``model`` on an in-memory list of samples and print the accuracy.

    Args:
        model: Callable as ``model(spectrogram, prompt, context)`` returning a
            string; must also provide ``eval()``.
        data: Iterable of ``(spectrogram, prompt, context, label)`` tuples.

    Returns:
        Fraction of samples whose reply parses to an integer equal to the
        label. A reply that cannot be parsed counts as wrong (prediction -1).
        Returns 0 when ``data`` is empty.
    """
    model.eval()  # switch the model to evaluation mode
    hits = 0
    seen = 0

    with torch.no_grad():
        for seen, (mel, prompt, context, label) in enumerate(data, start=1):
            # 2-D spectrograms get a leading batch axis; everything is cast to float32.
            batch = mel.unsqueeze(0) if mel.dim() == 2 else mel
            batch = batch.to(torch.float32)

            reply = model(batch, prompt, context)
            try:
                predicted = int(reply.strip())
            except Exception as e:
                print(f"Lỗi khi chuyển output sang int: {e}")
                predicted = -1

            if predicted == label:
                hits += 1

    accuracy = hits / seen if seen > 0 else 0
    print(f"Evaluation Accuracy: {accuracy * 100:.2f}%")
    return accuracy

In [None]:
if __name__ == "__main__":
    # Build the model from the default configuration
    configs = Config()
    model = RespChatGPT(configs)
    
    # Folder that contains the .wav files to evaluate
    folder_path = "RespiratoryDatabase@TR"  # Adjust this path for your environment

    # Load every sample from the folder
    eval_data = load_data_from_folder(
        folder_path,
        prompt="Đây là prompt thử nghiệm.",
        context="Đây là context thử nghiệm."
    )
    print(f"Đã tải {len(eval_data)} mẫu từ folder {folder_path}.")
    
    # Evaluate accuracy over the loaded samples.
    # NOTE: the model is called once per sample, and RespChatGPT.forward sends
    # one chat-completion request per call.
    evaluate_accuracy(model, eval_data)

Audio encoder trainable parameters: 2048
Đã tải 503 mẫu từ folder RespiratoryDatabase@TR.
Lỗi khi chuyển output sang int: invalid literal for int() with base 10: 'The task requires classifying whether the participant has COVID-19 based on audio of their exhalation sounds. Given the context and the audio summary value, you should classify the input into one of 
Lỗi khi chuyển output sang int: invalid literal for int() with base 10: "The task description specifies a binary classification to determine if a participant has COVID-19 or not, which suggests only two classes (1 for COVID-19 and 0 for non-COVID-19). However, the prompt 
Lỗi khi chuyển output sang int: invalid literal for int() with base 10: "I'm sorry, but it seems there's a discrepancy between your task description and the instruction to classify into one of 10 classes. Could you please clarify whether you want to classify the presence 
Lỗi khi chuyển output sang int: invalid literal for int() with base 10: "I'm sorry, but the

KeyboardInterrupt: 

In [18]:
if __name__ == "__main__":
    configs = Config()
    model = RespChatGPT(configs)
    
    # Load the mel spectrogram; it may initially have shape (80, time_steps)
    # NOTE(review): `audio_file_path` is not defined in any cell visible in this
    # notebook — presumably left over from a deleted cell or earlier kernel
    # state; define it before running this cell on a fresh kernel.
    spectrogram = load_audio_to_spectrogram(audio_file_path)
    print("Mel spectrogram shape:", spectrogram.shape)
    
    # A (80, T) spectrogram gets a leading batch dimension -> (1, 80, T)
    if spectrogram.dim() == 2:
        spectrogram = spectrogram.unsqueeze(0)
    
    spectrogram = spectrogram.to(torch.float32)
    print("Input to model shape:", spectrogram.shape)
    
    x_prompt = "Đây là một prompt thử nghiệm."
    x_context = "Đây là phần context thử nghiệm."
    
    # Call the model (PyTorch dispatches to forward)
    output = model(spectrogram, x_prompt, x_context)
    print("ChatGPT classification output:", output)

Audio encoder trainable parameters: 2048
Mel spectrogram shape: torch.Size([1, 80, 2636])
Input to model shape: torch.Size([1, 80, 2636])
ChatGPT classification output: 3


In [19]:
import torch
from sklearn.metrics import accuracy_score

In [20]:
def evaluate_accuracy(model, data_loader, device='cpu'):
    """Compute accuracy of ``model`` over batches from ``data_loader``.

    NOTE: an earlier cell defines ``evaluate_accuracy(model, data)`` for a
    plain list of samples; this definition shadows it once the cell runs.

    Args:
        model: Module callable as ``model(spectrogram, prompt, context)`` and
            returning the predicted class as text.
        data_loader: Iterable of ``(spectrograms, prompts, contexts, labels)``
            batches. ``spectrograms`` must support ``.to(device)`` and
            iteration over items; ``labels`` may be a tensor, array or sequence.
        device: Device the model and spectrograms are moved to.

    Returns:
        Accuracy over the predictions that parsed as integers. Unparseable
        replies are excluded from the score, not counted as wrong. Returns 0
        when there is no valid prediction.
    """
    model.to(device)
    model.eval()

    all_preds = []
    all_labels = []

    with torch.no_grad():
        for spectrograms, prompts, contexts, labels in data_loader:
            spectrograms = spectrograms.to(device)

            for spec, prompt, context in zip(spectrograms, prompts, contexts):
                # Add a batch axis only to 2-D items. Items that are already
                # (1, 80, T) must stay 3-D, because the encoder permutes
                # exactly three axes.
                if spec.dim() == 2:
                    spec = spec.unsqueeze(0)
                output = model(spec, prompt, context)
                try:
                    all_preds.append(int(output))
                except (TypeError, ValueError):
                    # -1 marks a reply that is not a valid integer (or is None).
                    all_preds.append(-1)

            # .tolist() works for tensors on any device and for numpy arrays;
            # plain sequences are used as they are.
            all_labels.extend(labels.tolist() if hasattr(labels, "tolist") else labels)

    # Score only the valid predictions.
    valid_preds_labels = [(pred, label) for pred, label in zip(all_preds, all_labels) if pred != -1]

    if not valid_preds_labels:
        print("No valid predictions were made.")
        return 0

    valid_preds, valid_labels = zip(*valid_preds_labels)
    accuracy = accuracy_score(valid_labels, valid_preds)

    return accuracy
