In [None]:
import numpy as np
import pandas as pd
import transformers
from transformers import AutoTokenizer, RobertaModel, PhobertTokenizer
import torch
import py_vncorenlp
from sklearn.feature_extraction.text import TfidfVectorizer

In [None]:
filepath = r""
df = pd.read_csv(filepath, index_col=False)

In [None]:
df

In [None]:
class2id = {class_: id for id, class_ in enumerate(df['Department'].unique())}
def Labeling(label, multiclass=True):
    """Encoding text label

    Args:
        label (str): The label in the string format
        multiclass (bool, optional): True for Multiclass labeling, otherwise Multilabel. Defaults to True.

    Returns:
        int or list[int]: transformed label
    """
    # Multiclass
    if multiclass: return class2id[label]
    
    # MultiLabel if multiclass = False
    multi_label = [0] * len(class2id)
    multi_label[class2id[label]] = 1
    
    return multi_label

df['Label'] = df['Department'].apply(Labeling)

In [None]:
phobert_model = RobertaModel.from_pretrained("vinai/phobert-base")
tokenizer = PhobertTokenizer.from_pretrained("vinai/phobert-base")
device = torch.device("cuda" if torch.accelerator.is_available() else "cpu")
phobert_model.to(device)

In [None]:
class TfidfMatrix:
    _instance = None

    @classmethod
    def get_instance(cls, corpus=None):
        """
        Returns the singleton instance of TfidfVectorizer. If the instance doesn't exist,
        it is created and optionally fitted on the provided corpus.
        
        Args:
            corpus (iterable, optional): An iterable of text documents used to fit the vectorizer.
                                          This is only used when creating the instance for the first time
        """
        if cls._instance is None:
            cls._instance = TfidfVectorizer()
            if corpus is not None and len(corpus) > 0:
                cls._instance.fit(corpus)
            else:
                pass
        return cls._instance
    

def transform(sentence, pooling='mean', state='static', vncore=False, tfidf_weight=False):
    """
    Chuyển đổi một câu thành vector biểu diễn sử dụng PhoBERT.
    
    Hàm này thực hiện các bước:
      - Tiền xử lý câu: Có thể dùng VnCoreNLP để tách từ nếu cần.
      - Token hóa câu: Chuyển câu thành các token dưới dạng tensor.
      - Trích xuất embedding: Sử dụng pooling trung bình hoặc lấy token [CLS] dựa vào tham số pooling.
      - Tùy chọn: Áp dụng trọng số TF-IDF cho các embedding.
      
    Tham số:
        sentence (str): Câu cần chuyển đổi.
        pooling (str, tùy chọn): Chiến lược pooling ('mean' để tính trung bình, 'cls' để dùng token [CLS]).
        state (str, tùy chọn): Chế độ embedding khi dùng mean pooling ('static' cho embedding tiền huấn luyện, 'context' cho embedding ngữ cảnh). 
                                Lưu ý: Bị bỏ qua nếu pooling là 'cls'.
        vncore (bool, tùy chọn): Nếu True, áp dụng VnCoreNLP để tách câu (ví dụ: chuyển "căng thẳng" thành "căng_thẳng").
        tfidf_weight (bool, tùy chọn): Nếu True, áp dụng trọng số TF-IDF lên các token embeddings.
        
    Trả về:
        numpy.ndarray: Mảng numpy chứa vector của câu với kích thước (1, hidden_dim). Hidden_dim thường là 768.
    """
    
    # Xử lý câu bằng cách VnCoreNLP.
    if vncore:
        try: 
            segmentor = py_vncorenlp.VnCoreNLP(
                save_dir=r"M:\Python\symptom_disease\vncorenlp",
                annotators=['wseg'],
                max_heap_size="-Xmx500m"
            )
            sentence = ' '.join(segmentor.word_segment(sentence))
        except (OSError, ValueError):
            pass
    
    tokenized = tokenizer(text=sentence, padding='max_length', truncation=True, max_length=256, return_tensors='pt')
    # Di chuyển các tensor của token sang GPU
    tokenized = {key: value.to(device) for key, value in tokenized.items()}
        
    if pooling == 'mean':
        # Tùy chọn chế độ embedding: nếu 'context' thì dùng context embedding, nếu không thì dùng static embedding 
        if state == 'context': 
            # Lấy context embedding từ last_hidden_state
            hidden_states = phobert_model(**tokenized).last_hidden_state
        else: 
            # Lấy static embedding từ embedding layer ( first layer)
            hidden_states = phobert_model.embeddings.word_embeddings(tokenized["input_ids"])
        
        # TF-IDF weighting
        if tfidf_weight:
            # TF-IDF matrix 
            tfidf = TfidfMatrix.get_instance(corpus=df['Symptom'])
            # Chuyển câu sang TF-IDF vector 
            tfidf_vector = tfidf.transform([sentence])
            
            # Chuyển các token id thành từ để so sánh với vocabulary của TF-IDF
            word_tokens = tokenizer.convert_ids_to_tokens(tokenized['input_ids'][0])
            
            token_weights = []  
            for token in word_tokens:
                # Tìm weight của token trong vocabulary, nếu không có mặc định là 0.0
                idx = tfidf.vocabulary_.get(token, None)
                if idx is not None:
                    token_weights.append(tfidf_vector[0, idx])
                else:
                    token_weights.append(float(0.0))
                    
            # Chuyển danh sách trọng số thành tensor và thay đổi kích thước để phù hợp với hidden_states
            token_weights = torch.tensor(token_weights).unsqueeze(0).unsqueeze(-1).to(device)  # Kích thước: (1, seq_len, 1)
            # Nhân các hidden_states với trọng số TF-IDF của từng token
            hidden_states *= token_weights
            
        # Reshape attention mask để loại padded tokens
        mask_expanded = tokenized['attention_mask'].unsqueeze(-1).expand(hidden_states.size())
        # sum embedding của các non-padded token 
        sum_embeddings = torch.sum(hidden_states * mask_expanded, dim=1)
        # số lượng non-padded token 
        valid_tokens = mask_expanded.sum(dim=1)
        # Tránh chia cho 0 bằng cách đảm bảo số token tối thiểu là 1
        valid_tokens = torch.clamp(valid_tokens, min=1)
        # Tính sentence vector bằng cách lấy mean của token vectors
        mean_embedding = sum_embeddings / valid_tokens
        sentence_embedding = mean_embedding.detach().cpu().numpy()
        
        return sentence_embedding
    
    elif pooling == 'cls':
        # Với phương pháp CLS, lấy trực tiếp CLS token từ last_hidden_state.
        hidden_states = phobert_model(**tokenized).last_hidden_state
        cls_embedding = hidden_states[:, 0, :]
        return cls_embedding.detach().cpu().numpy()
    
    # Don't mind this
    return phobert_model.embeddings.word_embeddings(tokenized["input_ids"]).mean(dim=1).detach().cpu().numpy()


In [None]:
sentence = df.loc[1].Symptom 
transform(sentence=sentence, pooling='mean', state='static', vncore=True, tfidf_weight=True).shape

In [None]:
merged_df = pd.DataFrame({f"v_{i}":[] for i in range(1, 769)})

for value in df['Symptom']:
    static_vector = transform(sentence=value, pooling='mean', state='static', vncore=True, tfidf_weight=True)
    merged_df.loc[len(merged_df)] = np.hstack(static_vector.tolist())

In [None]:
merged_df['Department'] = df['Department']
merged_df['Category'] = df['Category']

In [None]:
merged_df

In [None]:
import os 

current_dir = os.getcwd()
parent_dir = os.path.dirname(current_dir)
os.chdir(parent_dir)

In [None]:
merged_df.to_csv(r"results\weighted_static.csv", index=False)