<h1 style = 'text-align: center'> <b>Week 03: Text Feature Extraction</b> </h1>

- Mentee: Võ Nguyễn Hoàng Kim
- Mentee ID: 240103

In [4]:
!pip install underthesea

import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
import os
import underthesea
from underthesea import text_normalize, word_tokenize
import numpy as np
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split, cross_val_score
import matplotlib.pyplot as pls
import seaborn as sns
from collections import Counter
import random
import torch
import torch.nn as nn
import torch.optim as optim
import re




In [5]:
# path of files
# Kaggle input folder holding the corpus; get_path_of_texts lists its entries.
original_folder_path = '/kaggle/input/10000-vietnamese-books/output'
# Stop-word file; it is loaded with split_line=True, i.e. one entry per line.
stop_words_path = '/kaggle/input/vietnamese-stop-words/vietnamese-stopwords.txt'

# read data
def get_path_of_texts(orginal_folder_path):
    """Return the full path of every entry found directly inside a folder.

    Args:
        orginal_folder_path: Directory to list. (The parameter name keeps its
            historical spelling so keyword callers continue to work.)

    Returns:
        list[str]: ``os.path.join(folder, entry)`` for each name returned by
        ``os.listdir``. The order is whatever the OS reports; it is not sorted.

    Raises:
        OSError: Propagated from ``os.listdir`` if the folder cannot be read.
    """
    # Use the argument itself: the previous body read the notebook-level
    # global instead, so the path passed by the caller was silently ignored.
    return [os.path.join(orginal_folder_path, entry_name)
            for entry_name in os.listdir(orginal_folder_path)]

def read_file_txt(path_file, split_line = False):
    """Read a UTF-8 text file and return its content in lowercase.

    Args:
        path_file: Path of the file to read.
        split_line: When truthy, return the content as a list of lines
            (line terminators removed) instead of a single string.

    Returns:
        str | list[str]: The lowercased text, or its lines when
        ``split_line`` is truthy.
    """
    with open(path_file, mode='r', encoding='utf-8') as handle:
        lowered_content = handle.read().lower()
    return lowered_content.splitlines() if split_line else lowered_content

# Full paths of every entry in the corpus folder.
list_of_texts = get_path_of_texts(original_folder_path)
# Stop words as a list of lowercased lines (read_file_txt lowercases the file).
stop_words = read_file_txt(stop_words_path, split_line = True)

In [6]:
# preprocess data & split texts into tokens
def remove_stop_words(text, stop_words):
    """Remove whole-word occurrences of stop words and collapse whitespace.

    Args:
        text: Text to clean. Matching is case-sensitive, so the text and the
            stop words should share the same casing.
        stop_words: Iterable of stop words/phrases. Blank entries are ignored
            and surrounding whitespace of each entry is stripped.

    Returns:
        str: ``text`` with every stop word replaced by a space and all runs of
        whitespace collapsed to single spaces (leading/trailing removed).
    """
    # Blank entries (e.g. an empty line in the stop-word file) must be dropped:
    # an empty alternative matches at every word boundary and would split
    # punctuation away from words across the whole text.
    cleaned_entries = {entry.strip() for entry in stop_words if entry and entry.strip()}

    # Regex alternation takes the first alternative that matches, so longer
    # phrases go first; otherwise "a" would win over "a b" and leave "b" behind.
    # The secondary key makes the pattern deterministic.
    ordered_entries = sorted(cleaned_entries, key=lambda entry: (-len(entry), entry))

    if ordered_entries:
        stop_words_pattern = r'\b(?:' + '|'.join(map(re.escape, ordered_entries)) + r')\b'
        text = re.sub(stop_words_pattern, ' ', text)

    # remove redundant spaces
    return ' '.join(text.split())

def preprocess_text(text_path, stop_words, idx = None):
    """Load one text file and turn it into a list of tokens.

    Pipeline: read the file lowercased (``read_file_txt``), strip stop words
    (``remove_stop_words``), normalize with underthesea ``text_normalize`` and
    tokenize with underthesea ``word_tokenize``.

    Args:
        text_path: Path of the text file to process.
        stop_words: Stop words forwarded to ``remove_stop_words``.
        idx: Optional progress marker. When given, ``"<idx> .."`` is printed
            without a newline so a long loop shows progress.

    Returns:
        The tokens returned by ``word_tokenize`` for the cleaned text.
    """
    lowercase_text = read_file_txt(text_path)
    # preprocess raw text
    text_without_stop_words = remove_stop_words(lowercase_text, stop_words)
    normalized_text = text_normalize(text_without_stop_words)
    tokens = word_tokenize(normalized_text)

    # Only report progress when the caller supplied an index; callers that
    # omit it (e.g. a Dataset fetching items) previously printed "None ..".
    if idx is not None:
        print(idx, '..',end = '')

    return tokens
    
# build vocab
def build_vocabs(list_of_tokens):
    """Build token<->index lookup tables ordered by token frequency.

    Args:
        list_of_tokens: Iterable of token sequences (one per text).

    Returns:
        tuple[dict, dict]: ``(word_to_idx, idx_to_word)``. Tokens are numbered
        from 1 in descending frequency (ties keep first-seen order); index 0 is
        assigned to the ``'<PAD>'`` entry. ``idx_to_word`` is the inverse map.
    """
    frequencies = Counter(
        token for document_tokens in list_of_tokens for token in document_tokens
    )

    word_to_idx = {}
    # most_common() sorts by count, descending, with a stable sort.
    for rank, (word, _count) in enumerate(frequencies.most_common(), start=1):
        word_to_idx[word] = rank
    word_to_idx['<PAD>'] = 0

    idx_to_word = {}
    for word, index in word_to_idx.items():
        idx_to_word[index] = word

    return word_to_idx, idx_to_word

# encode texts to indices
def encoded_texts(list_of_tokens, word_to_idx):
    """Map every token of every text to its vocabulary index.

    Args:
        list_of_tokens: Iterable of token sequences (one per text).
        word_to_idx: Mapping from token to integer index.

    Returns:
        list[list]: One list of indices per input text, in the same order.

    Raises:
        KeyError: If a token is missing from ``word_to_idx``.
    """
    index_lists = []
    for document_tokens in list_of_tokens:
        index_lists.append([word_to_idx[token] for token in document_tokens])
    return index_lists

# put data to device 
def to_tensor(encoded_texts, device):
    """Convert each encoded text into a tensor placed on ``device``.

    Args:
        encoded_texts: Iterable of index sequences.
        device: Target device passed to ``Tensor.to``.

    Returns:
        list[torch.Tensor]: One tensor per input sequence, in order.
    """
    tensors = []
    for index_sequence in encoded_texts:
        tensors.append(torch.tensor(index_sequence).to(device))
    return tensors

# create sample for LSTM
def create_sequences(data, seq_length, device):
    """Slice a token tensor into (window, next-token) training pairs.

    Args:
        data: Sliceable sequence whose slices/elements expose ``.to(device)``
            (the notebook passes a tensor of token indices).
        seq_length: Number of consecutive items in each input window.
        device: Device each window and label is moved to.

    Returns:
        list[tuple]: For every start position ``i`` in
        ``range(len(data) - seq_length)``, the pair
        ``(data[i:i+seq_length], data[i+seq_length])``, both moved to
        ``device``. Empty when ``data`` has at most ``seq_length`` items.
    """
    window_count = len(data) - seq_length
    return [
        (
            data[start : start + seq_length].to(device),  # sub-sequence of length seq_length
            data[start + seq_length].to(device),          # label: the item that follows it
        )
        for start in range(window_count)
    ]

In [7]:
# Split data into train, validation and test sets
def train_test_split(file_paths, val_size = 0.1, test_size = 0.2):
    """Randomly split file paths into train / validation / test lists.

    Note: this definition shadows ``sklearn.model_selection.train_test_split``
    imported at the top of the notebook; after this cell runs the name refers
    to this function.

    Args:
        file_paths: Sequence of paths to split. It is not modified.
        val_size: Fraction of the items assigned to the validation split.
        test_size: Fraction of the items assigned to the test split.

    Returns:
        tuple[list, list, list]: ``(train_paths, val_paths, test_paths)``.
        Split sizes are ``int(len * fraction)`` (rounded down); the training
        split receives everything that is left.

    Raises:
        ValueError: If the fractions are negative or request more items than
            are available.
    """
    # Shuffle a copy so the caller's list keeps its order (uses the global
    # `random` state; seed it beforehand for a reproducible split).
    shuffled_paths = list(file_paths)
    random.shuffle(shuffled_paths)

    total_size = len(shuffled_paths)
    val_count = int(total_size * val_size)
    test_count = int(total_size * test_size)
    if val_count < 0 or test_count < 0 or val_count + test_count > total_size:
        raise ValueError(
            "val_size and test_size must be non-negative fractions whose sum does not exceed 1"
        )

    # Slice with explicit positive bounds: the negative-index form breaks when
    # a count rounds down to 0, because `[-0:]` is the whole list and `[:-0]`
    # is empty.
    train_end = total_size - val_count - test_count
    val_end = train_end + val_count

    train_paths = shuffled_paths[:train_end]
    val_paths = shuffled_paths[train_end:val_end]
    test_paths = shuffled_paths[val_end:]

    return train_paths, val_paths, test_paths

# create Dataset class
class TextDataset(Dataset):
    """Dataset with one item per text file; each item is that file's windows.

    Files are read, preprocessed and encoded lazily in ``__getitem__``, so
    nothing is loaded when the dataset is constructed.

    NOTE(review): every item is a list whose length depends on the file, so a
    DataLoader over this dataset needs a ``collate_fn`` able to merge lists of
    different lengths.

    Args:
        file_paths: Sequence of text file paths.
        word_to_idx: Mapping token -> index; must contain ``"<PAD>"``, which is
            also used for tokens missing from the mapping.
        seq_length: Number of tokens in each input window.
        device: Device the encoded tensor (and therefore every window) lives on.
        stop_words: Optional stop words passed to ``preprocess_text``. When
            omitted, the notebook-level ``stop_words`` variable is used, which
            is the previous behaviour.
    """

    def __init__(self, file_paths, word_to_idx, seq_length, device, stop_words = None):
        self.file_paths = file_paths
        self.word_to_idx = word_to_idx
        self.seq_length = seq_length
        self.device = device
        self.stop_words = stop_words

    def __len__(self):
        """Return the number of files (not the number of windows)."""
        return len(self.file_paths)

    def __getitem__(self, idx):
        """Return all ``(window, next_token)`` pairs of file ``idx``.

        Returns:
            list[tuple[Tensor, Tensor]]: for each start ``i`` the pair
            ``(tokens[i:i+seq_length], tokens[i+seq_length])``. Empty when the
            file has at most ``seq_length`` tokens.
        """
        file_path = self.file_paths[idx]
        # Prefer the stop words given to the constructor; fall back to the
        # module-level `stop_words` only when none were supplied.
        active_stop_words = self.stop_words if self.stop_words is not None else stop_words
        tokens = preprocess_text(file_path, active_stop_words)

        pad_index = self.word_to_idx["<PAD>"]
        encoded_text = [self.word_to_idx.get(token, pad_index) for token in tokens]
        # Explicit integer dtype: an empty file would otherwise give a float tensor.
        tensor_data = torch.tensor(encoded_text, dtype=torch.long).to(self.device)

        # Slices of `tensor_data` are already on `self.device`, so no further
        # transfer is needed for the individual windows and labels.
        return [
            (tensor_data[start : start + self.seq_length], tensor_data[start + self.seq_length])
            for start in range(len(tensor_data) - self.seq_length)
        ]
    
# create data loader
def _collate_sequence_windows(batch):
    """Merge per-file lists of (window, label) pairs into two stacked tensors.

    Args:
        batch: List of dataset items, each a list of ``(window, label)``
            tensor pairs (possibly empty).

    Returns:
        tuple[Tensor, Tensor]: ``(sequences, labels)`` with shapes
        ``(total_windows, seq_length)`` and ``(total_windows,)``. When the
        batch holds no window at all, empty integer tensors are returned.
    """
    windows = [pair for file_windows in batch for pair in file_windows]
    if not windows:
        return torch.empty((0, 0), dtype=torch.long), torch.empty((0,), dtype=torch.long)
    sequences = torch.stack([window for window, _ in windows])
    labels = torch.stack([label for _, label in windows])
    return sequences, labels


def create_dataloader(train_paths, val_paths, test_paths, word_to_idx, seq_length, batch_size = 32, device = 'cpu'):
    """Build the train / validation / test DataLoaders over ``TextDataset``.

    Args:
        train_paths, val_paths, test_paths: File paths of each split.
        word_to_idx: Vocabulary mapping handed to every dataset.
        seq_length: Window length handed to every dataset.
        batch_size: Number of *files* per batch. Each batch contains every
            window of those files, so the number of rows varies per batch.
        device: Device handed to every dataset.

    Returns:
        tuple[DataLoader, DataLoader, DataLoader]: ``(train, val, test)``
        loaders; only the training loader shuffles. Each batch is a
        ``(sequences, labels)`` pair of stacked tensors.

    NOTE(review): with long files a batch of ``batch_size`` files can hold a
    very large number of windows; lower ``batch_size`` if memory is tight.
    """
    def build_loader(paths, shuffle):
        # Honour the caller's seq_length (it used to be hard-coded to 10).
        dataset = TextDataset(paths, word_to_idx, seq_length=seq_length, device=device)
        # Dataset items are variable-length lists, which the default collate
        # cannot batch; flatten them into (sequences, labels) tensors instead.
        return DataLoader(dataset, batch_size=batch_size, shuffle=shuffle,
                          collate_fn=_collate_sequence_windows)

    train_loader = build_loader(train_paths, True)
    val_loader = build_loader(val_paths, False)
    test_loader = build_loader(test_paths, False)

    return train_loader, val_loader, test_loader

# Uses the train_test_split defined in this notebook (not the sklearn import):
# 10% of the files for validation, 10% for test, the remainder for training.
train_paths, val_paths, test_paths = train_test_split(list_of_texts, val_size=0.1, test_size=0.1)
# Number of files in each split.
print(len(train_paths), len(val_paths), len(test_paths))


8333 1041 1041


In [None]:
# Tokenize every training file; idx is passed so preprocess_text prints progress.
# This reads and preprocesses the whole training split, so it is likely slow.
train_tokens = [preprocess_text(path, stop_words, idx) for idx, path in enumerate(train_paths)]
# The vocabulary is built from the training split only.
word_to_idx, idx_to_word = build_vocabs(train_tokens)

In [None]:
# create LSTM model
class myLSTMModel(torch.nn.Module):
    """Next-token predictor: embedding -> LSTM -> linear layer on the last step.

    Args:
        vocab_size: Number of rows of the embedding table.
        embedding_dim: Size of each token embedding (LSTM input size).
        hidden_dim: LSTM hidden state size.
        output_dim: Number of output logits per sequence.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim):
        super(myLSTMModel, self).__init__()
        self.embedding = torch.nn.Embedding(vocab_size, embedding_dim)
        # Default (sequence-first) layout: input is (seq_len, batch, embedding_dim).
        self.lstm = torch.nn.LSTM(embedding_dim, hidden_dim)
        self.fc = torch.nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        """Compute logits for the token that follows each input sequence.

        Args:
            x: Integer tensor of token indices, shaped ``(batch, seq_len)``.
                A single 1-D sequence ``(seq_len,)`` is also accepted and is
                treated as a batch of one.

        Returns:
            Tensor of shape ``(batch, output_dim)``.
        """
        if x.dim() == 1:
            x = x.unsqueeze(0)                      # (seq_len,) -> (1, seq_len)
        embedded = self.embedding(x)                # (batch, seq_len, embedding_dim)
        # The LSTM expects the time axis first. The earlier
        # `x.view(len(x), 1, -1)` used the batch axis as time and failed on
        # batched input, so swap the axes explicitly instead.
        lstm_out, _ = self.lstm(embedded.transpose(0, 1))
        # Last time step of every sequence: (batch, hidden_dim).
        return self.fc(lstm_out[-1])

def train(model, train_loader, val_loader, criterion, optimizer, device, num_epochs = 10):
    """Train ``model`` and report the mean train / validation loss per epoch.

    Args:
        model: Module to optimise; switched between train and eval mode.
        train_loader: Iterable of ``(inputs, targets)`` batches, with ``len``.
        val_loader: Iterable of ``(inputs, targets)`` batches, with ``len``.
        criterion: Callable ``(outputs, targets) -> loss`` tensor.
        optimizer: Optimizer updating the model parameters.
        device: Device every batch is moved to before the forward pass.
        num_epochs: Number of passes over ``train_loader``.

    Returns:
        None. One line per epoch is printed with both mean losses (the sum of
        batch losses divided by the number of batches in the loader).
    """
    for epoch_number in range(1, num_epochs + 1):
        print(f'Epoch [{epoch_number}/{num_epochs}]', end = ' ')

        # ---- optimisation pass over the training batches ----
        model.train()
        running_train_loss = 0.0
        for batch_inputs, batch_targets in train_loader:
            batch_inputs = batch_inputs.to(device)
            batch_targets = batch_targets.to(device)

            optimizer.zero_grad()
            batch_loss = criterion(model(batch_inputs), batch_targets)
            batch_loss.backward()
            optimizer.step()

            running_train_loss += batch_loss.item()

        # ---- gradient-free pass over the validation batches ----
        model.eval()
        running_val_loss = 0.0
        with torch.no_grad():
            for batch_inputs, batch_targets in val_loader:
                batch_inputs = batch_inputs.to(device)
                batch_targets = batch_targets.to(device)
                running_val_loss += criterion(model(batch_inputs), batch_targets).item()

        mean_train_loss = running_train_loss / len(train_loader)
        mean_val_loss = running_val_loss / len(val_loader)
        print(f"Train loss: {mean_train_loss}, Validation Loss: {mean_val_loss}")


In [None]:
# Use the GPU when one is available.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

# Model hyper-parameters.
vocab_size = len(word_to_idx)
embedding_dim = 128
hidden_dim = 256
output_dim = vocab_size  # one logit per vocabulary entry (next-token prediction)

model = myLSTMModel(vocab_size, embedding_dim, hidden_dim, output_dim).to(device)

# Loss and optimizer.
criterion = nn.CrossEntropyLoss()
learning_rate = 0.001
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Training configuration. num_epochs was declared as 10 while train() was
# called with a literal 20; the value actually used (20) is kept and the
# variable is now what gets passed, so the two cannot drift apart again.
num_epochs = 20
seq_length = 10
batch_size = 32

train_loader, val_loader, test_loader = create_dataloader(train_paths,val_paths, test_paths, word_to_idx, seq_length = seq_length,  batch_size=batch_size, device = device)

train(model = model, train_loader = train_loader, val_loader = val_loader, 
      criterion = criterion, optimizer = optimizer,  device = device, num_epochs = num_epochs)

In [None]:
# evaluate model with test set
def evaluate(model, test_loader, criterion = None, device = None):
    """Print the mean loss of ``model`` over ``test_loader``.

    Args:
        model: Module to evaluate; it is switched to eval mode.
        test_loader: Iterable of ``(sequences, labels)`` batches, with ``len``.
        criterion: Optional loss callable ``(outputs, labels) -> loss`` tensor.
            When omitted, the notebook-level ``criterion`` is used.
        device: Optional device the batches are moved to. When omitted, the
            notebook-level ``device`` is used.

    Returns:
        None. Prints ``Test Loss: <mean>`` with four decimals, where the mean
        is the sum of batch losses divided by the number of batches.
    """
    # Mirror train(): let callers pass the loss and device explicitly, and fall
    # back to the module-level variables so existing two-argument calls keep
    # working.
    if criterion is None:
        criterion = globals()["criterion"]
    if device is None:
        device = globals()["device"]

    model.eval()
    total_loss = 0
    with torch.no_grad():
        for seqs, labels in test_loader:
            seqs, labels = seqs.to(device), labels.to(device)
            outputs = model(seqs)
            loss = criterion(outputs, labels)
            total_loss += loss.item()

    avg_test_loss = total_loss / len(test_loader)
    print(f"Test Loss: {avg_test_loss:.4f}")

# Run the evaluation on the test set
evaluate(model, test_loader)
