In [9]:
import re
import ast
from pyvi import ViTokenizer

### Tiền xử lý

In [2]:
def normalize(text):
    """Flatten ``text`` onto a single line and lower-case it.

    Every newline character is replaced by one space, then the whole
    string is converted to lower case.
    """
    single_line = ' '.join(text.split('\n'))
    return single_line.lower()

def delete_hashtag(text):
    """Remove every ``#`` that is followed by word characters, together with those characters."""
    hashtag_pattern = re.compile(r'#\w+')
    return hashtag_pattern.sub('', text)

def delete_link(text):
    """Drop every run of non-whitespace characters that starts with ``http``."""
    link_pattern = re.compile(r'http\S+')
    return link_pattern.sub('', text)

def remove_emojis(text):
    """Delete emoji and related symbol code points from ``text``.

    The character class covers the listed emoji/pictograph ranges, the broad
    range U+24C2..U+1F251, every supplementary-plane code point
    (U+10000..U+10FFFF) and a few individual code points.

    NOTE(review): the class ends with a literal ``-``, so ASCII hyphens are
    removed as well. The trailing ``(?<!\\n)`` lookbehind never rejects a match,
    because a newline is not a member of the class.
    """
    emoji_pattern = re.compile(
        r"""[\U0001F600-\U0001F64F\U0001F300-\U0001F5FF\U0001F680-\U0001F6FF\U0001F1E0-\U0001F1FF\U00002702-\U000027B0\U000024C2-\U0001F251\U0001f926-\U0001f937\U00010000-\U0010ffff\u200d\u23cf\u23e9\u231a\ufe0f\u3030-]+(?<!\n)""",
        re.UNICODE,
    )
    return emoji_pattern.sub('', text)

def encode_number(text):
    """Replace each space-separated numeric token with the ``<number>`` placeholder.

    A token counts as numeric when it consists of digits with an optional
    ``.digits`` fractional part; all other tokens are kept unchanged.
    """
    number_pattern = re.compile(r'^[0-9]+(\.[0-9]+)?$')
    encoded_tokens = [
        '<number>' if number_pattern.match(token) else token
        for token in text.split(' ')
    ]
    return ' '.join(encoded_tokens)

def delete_onelen_token(text):
    """Keep only the space-separated tokens that are longer than one character."""
    kept_tokens = [token for token in text.split(' ') if len(token) > 1]
    return ' '.join(kept_tokens)

def preprocessing(text):
    """Run the full text-cleaning pipeline on a raw post.

    The steps are applied in this order: normalisation, hashtag removal,
    link removal, emoji removal, ``ViTokenizer.tokenize``, number encoding
    and removal of one-character tokens. The cleaned string is returned.
    """
    pipeline = (
        normalize,
        delete_hashtag,
        delete_link,
        remove_emojis,
        ViTokenizer.tokenize,
        encode_number,
        delete_onelen_token,
    )
    cleaned = text
    for step in pipeline:
        cleaned = step(cleaned)
    return cleaned

### Model and Training

In [3]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torch.optim import Adam
from torchtext.transforms import ToTensor
from tqdm.autonotebook import tqdm
from transformers import AutoModel
import pandas as pd
import math

  from tqdm.autonotebook import tqdm


#### Dataset Class

In [5]:
class HashTag_Dataset(Dataset):
    """Multi-label hashtag dataset read from a CSV file.

    The CSV must provide a ``text`` column (whitespace-separated tokens) and a
    ``label`` column holding the string form of a Python list of hashtags,
    e.g. ``"['#cv', '#nlp']"``. Each item is a pair
    ``(token_ids, target)`` of a ``torch.long`` tensor of length
    ``max_length`` and a ``torch.float32`` tensor with one entry per class.
    """

    def __init__(self, root='../p_data1.csv', max_length=128):
        """Load the CSV at ``root`` and build the vocabulary.

        Args:
            root: Path of the CSV file to read.
            max_length: Fixed length (including the leading ``<CLS>``) that
                every encoded text is truncated or padded to.
        """
        super(HashTag_Dataset, self).__init__()
        self.classes = ['#Q&A', '#cv', '#data', '#deep_learning', '#machine_learning', '#math', '#nlp', '#python', '#sharing', '#webinar']
        # Previously accepted but ignored; encode_text now honours it.
        self.max_length = max_length

        df = pd.read_csv(root, encoding='utf-8-sig')
        self.texts = df['text']
        self.labels = df['label']
        self.vocab = self.make_vocab(self.texts)
        # Built once here so DataLoader workers do not each rebuild it.
        self._word_to_idx = self._build_word_index(self.vocab)

    @staticmethod
    def _build_word_index(vocab):
        """Map each vocabulary word to its first position in ``vocab``."""
        index = {}
        for position, word in enumerate(vocab):
            # First occurrence wins, mirroring ``list.index``.
            index.setdefault(word, position)
        return index

    def make_vocab(self, texts):
        """Build the vocabulary list from an iterable of texts.

        Words are kept in first-seen order when they occur more than three
        times; ``<UNK>``, ``<PAD>`` and ``<CLS>`` are appended last, in that
        order.
        """
        counts = dict()
        for text in texts:
            for word in text.split():
                counts[word] = counts.get(word, 0) + 1
        vocab = [word for word, count in counts.items() if count > 3]
        vocab.append('<UNK>')
        vocab.append('<PAD>')
        vocab.append('<CLS>')
        return vocab

    def encode_text(self, text):
        """Convert ``text`` into a list of ``self.max_length`` vocabulary indices.

        ``<CLS>`` is prepended, the token list is truncated or right-padded
        with ``<PAD>`` to ``self.max_length``, and out-of-vocabulary words map
        to ``<UNK>``.
        """
        # Lazily build the lookup when the instance was created without
        # __init__. NOTE: it is not refreshed if ``self.vocab`` is reassigned.
        word_to_idx = getattr(self, '_word_to_idx', None)
        if word_to_idx is None:
            word_to_idx = self._build_word_index(self.vocab)
            self._word_to_idx = word_to_idx

        words = (['<CLS>'] + text.split())[:self.max_length]
        words += ['<PAD>'] * (self.max_length - len(words))
        unk_index = word_to_idx['<UNK>']
        # Dict lookup instead of a linear scan of the vocabulary per token.
        return [word_to_idx.get(word, unk_index) for word in words]

    def encode_label(self, label):
        """Turn the string form of a hashtag list into a per-class target list.

        Classes present in the parsed list get ``0.8``; all others get ``0``.
        """
        present = ast.literal_eval(label)
        return [0.8 if name in present else 0 for name in self.classes]

    def __len__(self):
        """Number of samples (rows of the ``label`` column)."""
        return len(self.labels)

    def len_vocab(self):
        """Vocabulary size, special tokens included."""
        return len(self.vocab)

    def num_classes(self):
        """Number of hashtag classes."""
        return len(self.classes)

    def __getitem__(self, idx):
        """Return ``(token_ids, target)`` tensors for the sample at ``idx``."""
        encode = self.encode_text(self.texts[idx])
        label = self.encode_label(self.labels[idx])
        encode = torch.tensor(encode, dtype=torch.long)
        label = torch.tensor(label, dtype=torch.float32)
        return encode, label

In [7]:
# Build the dataset from the default CSV path and display one encoded sample.
train_set = HashTag_Dataset()
train_set[100]

(tensor([2272,    4,    0,  223,  131,   58,    1,    2,   69,  427,  312,    4,
            5,  813,  493,  210,    8,  587, 2270, 2270,    4,    5,  487,    8,
          428,  183, 1075, 1076,   58,  359,  217,  156,   75,  355,   58,    5,
          301,  595,    8,   67,  204,   38,  304,  217,    4,  220,  283,   69,
          427,  219,  151,  813,  587,  110,  182,  219,  304,   33,  924,  111,
           29,  268,    4,   19,   67,  245,   58,  838,   59,  327, 1069,   92,
          162,    4,  224,   47, 2271, 2271, 2271, 2271, 2271, 2271, 2271, 2271,
         2271, 2271, 2271, 2271, 2271, 2271, 2271, 2271, 2271, 2271, 2271, 2271,
         2271, 2271, 2271, 2271, 2271, 2271, 2271, 2271, 2271, 2271, 2271, 2271,
         2271, 2271, 2271, 2271, 2271, 2271, 2271, 2271, 2271, 2271, 2271, 2271,
         2271, 2271, 2271, 2271, 2271, 2271, 2271, 2271]),
 tensor([0.8000, 0.8000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000,
         0.0000]))

#### Model Architecture

In [10]:
class PositionalEncoding(nn.Module):
    """Add a fixed sinusoidal position table to the input, then apply dropout."""

    def __init__(self, d_model: int, dropout: float = 0.1, max_len: int = 5000):
        """Precompute the ``[max_len, 1, d_model]`` table and register it as buffer ``pe``."""
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        # Column of positions 0..max_len-1 and one frequency per channel pair.
        positions = torch.arange(max_len).unsqueeze(1)
        frequencies = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        angles = positions * frequencies

        table = torch.zeros(max_len, 1, d_model)
        table[:, 0, 0::2] = torch.sin(angles)  # even channels
        table[:, 0, 1::2] = torch.cos(angles)  # odd channels
        self.register_buffer('pe', table)

    def forward(self, x):
        """
        Arguments:
            x: Tensor, shape ``[seq_len, batch_size, embedding_dim]``
        """
        # The middle dimension of the table has size 1, so it broadcasts over the batch.
        with_positions = x + self.pe[:x.size(0)]
        return self.dropout(with_positions)

class HashtagRecommendation(nn.Module):
    """Multi-label hashtag classifier: trainable embeddings, frozen PhoBERT, MLP head.

    Token ids are embedded by a trainable ``nn.Embedding``, sinusoidal
    positional encodings are added, and the result is passed to a frozen
    ``vinai/phobert-base`` encoder through ``inputs_embeds``. The hidden state
    at sequence position 0 feeds a three-layer MLP that ends in a sigmoid.
    """

    def __init__(self, num_labels, vocab_size, d_model=768, n_head=0, pad_idx=None):
        """
        Args:
            num_labels: Number of output classes.
            vocab_size: Number of rows in the embedding table.
            d_model: Embedding width. NOTE(review): this must equal PhoBERT's
                hidden size because the embeddings are passed as
                ``inputs_embeds`` - confirm before changing the default.
            n_head: Unused; kept so existing callers keep working.
            pad_idx: Index of the padding token, masked out of attention.
                Defaults to ``vocab_size - 2``, the position at which
                ``HashTag_Dataset.make_vocab`` places ``<PAD>`` (2271 for the
                vocabulary seen in this notebook's outputs).
        """
        super(HashtagRecommendation, self).__init__()
        self.d_model = d_model
        # Derived from the vocabulary layout instead of the former hard-coded 2271.
        self.pad_idx = vocab_size - 2 if pad_idx is None else pad_idx
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoder = PositionalEncoding(d_model, 0.1)
        self.phobert = AutoModel.from_pretrained("vinai/phobert-base")
        # The pretrained encoder stays frozen; only the embedding and the head train.
        for param in self.phobert.parameters():
            param.requires_grad = False
        self.fc1 = nn.Linear(d_model, 512)
        self.fc2 = nn.Linear(512, 64)
        self.fc3 = nn.Linear(64, num_labels)
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()
        self.dropout = nn.Dropout(p=0.3)

    def forward(self, input_ids):
        """Return per-class probabilities for a ``[batch, seq_len]`` batch of token ids."""
        attention_mask = (input_ids != self.pad_idx).float()
        embedded = self.embedding(input_ids) * math.sqrt(self.d_model)
        # PositionalEncoding indexes positions along dim 0 ([seq_len, batch, dim]),
        # while ``embedded`` is batch-first. Without the transposes the encoding
        # would depend on the sample's index in the batch, not on token position.
        embedded = self.pos_encoder(embedded.transpose(0, 1)).transpose(0, 1)
        phobert_output = self.phobert(inputs_embeds=embedded, attention_mask=attention_mask)[0]

        # Take the embedding of the [CLS] token (sequence position 0).
        cls_embedding = phobert_output[:, 0, :]

        output = self.dropout(cls_embedding)
        output = self.fc1(output)
        output = self.relu(output)

        output = self.dropout(output)
        output = self.fc2(output)
        output = self.relu(output)

        output = self.dropout(output)
        output = self.fc3(output)
        output = self.sigmoid(output)
        return output

#### Training

In [11]:
# Data pipeline and run configuration for training.
train_set = HashTag_Dataset()
train_loader = DataLoader(
    train_set,
    batch_size=8,
    shuffle=True,
    num_workers=2,
    drop_last=True,
)
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
num_epochs = 250

In [12]:
# Model, loss and optimiser; the model is sized from the dataset.
model = HashtagRecommendation(
    num_labels=train_set.num_classes(),
    vocab_size=train_set.len_vocab(),
).to(device)
# The model already ends in a sigmoid, so plain BCE is applied to its outputs.
criterion = nn.BCELoss()
optimizer = Adam(model.parameters(), lr=1e-5)
num_iters = len(train_loader)

In [13]:
best_acc = 0
for epoch in range(num_epochs):
    model.train()
    progress_bar = tqdm(train_loader, colour='green')
    # ``step`` rather than ``iter``: the old name overwrote the builtin
    # ``iter`` for every later cell of the notebook.
    for step, (texts, labels) in enumerate(progress_bar):
        texts = texts.to(device)
        labels = labels.to(dtype=torch.float).to(device)

        # forward
        outputs = model(texts)
        loss_value = criterion(outputs, labels)
        # Format a plain Python float instead of the loss tensor.
        progress_bar.set_description("Epoch {}/{}. Iteration {}/{}. Loss {:.5f}".format(epoch+1, num_epochs, step+1, num_iters, loss_value.item()))
        # backward
        optimizer.zero_grad()
        loss_value.backward()
        optimizer.step()

  0%|          | 0/142 [00:00<?, ?it/s]

### Testing

In [None]:
# Sample Vietnamese question used as the raw input of the inference cell below.
test = """Chào mọi người. Hiện tại em đang làm đồ án về truy vấn thông tin. Em làm về content based image retrieval.
Em định làm thêm text based image retrieval nhưng em đang kẹt ở phần caption của image. Do dataset là phải tự scrape về nên chỉ có thể scrape được ảnh.
Em có thử dùng 1 vài tool để tạo caption nhưng kết quả ra khá tệ.
 Giờ em phải làm như nào để tạo được caption ạ. Em cảm ơn mọi người."""

In [None]:
# Switch to eval mode so dropout is disabled and predictions are deterministic.
model.eval()

# Separate names keep the raw ``test`` string intact, so this cell can be re-run.
test_clean = preprocessing(test)
test_ids = train_set.encode_text(test_clean)
test_batch = ToTensor()(test_ids)[None, :].to(device)  # add the batch dimension

# No gradients are needed for inference.
with torch.no_grad():
    probs = model(test_batch)[0].tolist()
print(probs)

# Keep every class whose predicted probability exceeds 0.5.
pred = [train_set.classes[i] for i, prob in enumerate(probs) if prob > 0.5]

print(pred)