In [None]:
import pandas as pd
import os
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import re
from collections import Counter

def dummy_npwarn_decorator_factory():
    """Build a no-op decorator: the returned callable hands back its argument unchanged."""
    return lambda x: x
# Compatibility shim: keep NumPy's own `_no_nep50_warning` when it exists, otherwise install
# the no-op decorator factory so code that looks the attribute up does not fail.
np._no_nep50_warning = getattr(np, '_no_nep50_warning', dummy_npwarn_decorator_factory)

# Locate the project root: when the notebook runs from the `model` directory the data lives
# one level up, otherwise it is looked up under the current directory. Only a trailing
# `model` directory is stripped; a plain str.replace on the whole path would also mangle
# any other path component that merely contains "model".
_cwd = os.getcwd()
project_root = os.path.dirname(_cwd) if os.path.basename(_cwd) == 'model' else _cwd
data_path_train = os.path.join(project_root, 'data', 'processed_train.csv')
data_path_test  = os.path.join(project_root, 'data', 'processed_test.csv')
train_df = pd.read_csv(data_path_train)
test_df  = pd.read_csv(data_path_test)

# DataFrame.info() prints its summary itself and returns None, so it is not wrapped in print().
train_df.info()


In [None]:
import pandas as pd
import os
import re
from collections import Counter

# Locate the project root: strip only a trailing `model` directory from the working directory.
# A plain str.replace on the whole path would also mangle any other component containing "model".
_cwd = os.getcwd()
project_root = os.path.dirname(_cwd) if os.path.basename(_cwd) == 'model' else _cwd
data_path_train = os.path.join(project_root, 'data', 'processed_train.csv')
train_df = pd.read_csv(data_path_train)

def simple_tokenizer(text):
    """Lowercase ``text`` and return the list of its word tokens.

    Missing values (``None`` or a float NaN, which is what ``pd.read_csv`` yields for an
    empty cell) produce an empty list instead of raising; any other non-string value is
    converted with ``str()`` before tokenizing.
    """
    if text is None or (isinstance(text, float) and text != text):  # NaN is the only float != itself
        return []
    if not isinstance(text, str):
        text = str(text)
    return re.findall(r'\b\w+\b', text.lower())

# Flatten the tokens of every training text into one corpus-wide list.
all_tokens = [token for text in train_df['text'] for token in simple_tokenizer(text)]

# Token frequencies; only the distinct keys are used for the vocabulary below.
counter = Counter(all_tokens)

# Sorted vocabulary; ids start at 1.
vocab = sorted(counter)
word2idx = {word: position for position, word in enumerate(vocab, start=1)}

print(f"Tổng số từ duy nhất trong tập dữ liệu: {len(word2idx)}")
print("Mười từ đầu tiên trong từ điển:")
for word, idx in list(word2idx.items())[:10]:
    print(f"{word}: {idx}")


In [None]:
import os
# NOTE(review): debugging aid, presumably set to get synchronous CUDA error reporting;
# confirm whether it should stay enabled for normal training runs.
os.environ.update({"CUDA_LAUNCH_BLOCKING": "1"})

In [None]:
import os
import re
import pandas as pd
import numpy as np
from collections import Counter

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# Locate the project root: when the notebook runs from the `model` directory the data lives
# one level up, otherwise it is looked up under the current directory. Only a trailing
# `model` directory is stripped; a plain str.replace on the whole path would also mangle
# any other path component that merely contains "model".
_cwd = os.getcwd()
project_root = os.path.dirname(_cwd) if os.path.basename(_cwd) == 'model' else _cwd
data_path_train = os.path.join(project_root, 'data', 'processed_train.csv')
data_path_test  = os.path.join(project_root, 'data', 'processed_test.csv')

train_df = pd.read_csv(data_path_train)
test_df  = pd.read_csv(data_path_test)

# DataFrame.info() prints its summary itself and returns None, so it is not wrapped in print().
train_df.info()

# Distinct values of the target column, to confirm how the classes are encoded.
unique_labels = np.unique(train_df['sentiment'].values)
print("Unique labels in training data:", unique_labels)



def simple_tokenizer(text):
    """
    A simple tokenizer that lowercases text and extracts words using regex.

    Missing values (``None`` or a float NaN, which is what ``pd.read_csv`` yields for an
    empty cell) produce an empty list instead of raising; any other non-string value is
    converted with ``str()`` before tokenizing.
    """
    if text is None or (isinstance(text, float) and text != text):  # NaN is the only float != itself
        return []
    if not isinstance(text, str):
        text = str(text)
    return re.findall(r'\b\w+\b', text.lower())

# Flatten the tokens of every training text into one corpus-wide list.
all_tokens = [token for text in train_df['text'] for token in simple_tokenizer(text)]

# Token frequencies; only the distinct keys are used for the vocabulary below.
counter = Counter(all_tokens)

# Sorted vocabulary; ids start at 1 so that 0 stays available as the padding index.
vocab = sorted(counter)
word2idx = {word: position for position, word in enumerate(vocab, start=1)}

print(f"Total unique words in vocabulary: {len(word2idx)}")
print("First 10 words in vocabulary:")
for word, idx in list(word2idx.items())[:10]:
    print(f"{word}: {idx}")


def text_to_sequence(text, word2idx):
    """
    Tokenize ``text`` and translate each token into its integer id from ``word2idx``.
    A token missing from the mapping is encoded as 0.
    """
    indices = []
    for token in simple_tokenizer(text):
        indices.append(word2idx.get(token, 0))
    return indices

# Encode both splits with the vocabulary built from the training texts.
train_sequences = [text_to_sequence(sample, word2idx) for sample in train_df['text']]
test_sequences  = [text_to_sequence(sample, word2idx) for sample in test_df['text']]

# Sanity check: compare the largest id found in the training data with the vocabulary size.
max_token_index = max(max(seq, default=0) for seq in train_sequences)
expected_vocab_size = 1 + len(word2idx)  # one extra slot for padding index 0
print("Max token index in training data:", max_token_index)
print("Expected vocabulary size (including padding):", expected_vocab_size)
if expected_vocab_size <= max_token_index:
    print("Warning: Some token indices exceed the vocabulary size!")

# Length of the longest training sequence; used below as the fixed padded length.
max_len = max(map(len, train_sequences))
print(f"Maximum sequence length: {max_len}")

def pad_sequences(sequences, max_len):
    """
    Pads all sequences to the fixed length max_len.
    Sequences longer than max_len are truncated, and shorter ones are padded with zeros.

    Args:
        sequences: iterable of integer sequences (e.g. lists of token ids).
        max_len: number of columns of the result.

    Returns:
        An ``int64`` array of shape ``(len(sequences), max_len)``. The dtype is fixed
        explicitly so the result does not depend on the platform's default integer and
        an empty input still yields an integer ``(0, max_len)`` array.
    """
    sequences = list(sequences)  # allow generators; the row count is needed up front
    padded = np.zeros((len(sequences), max_len), dtype=np.int64)
    for row, seq in enumerate(sequences):
        kept = seq[:max_len]
        if len(kept):
            padded[row, :len(kept)] = kept
    return padded

# Fixed-length index matrices for both splits.
x_train, x_test = (pad_sequences(seqs, max_len) for seqs in (train_sequences, test_sequences))

# Target labels taken straight from the `sentiment` column.
y_train, y_test = train_df['sentiment'].values, test_df['sentiment'].values

# Wrap features and labels as LongTensors.
x_train_tensor, y_train_tensor = torch.LongTensor(x_train), torch.LongTensor(y_train)
x_test_tensor, y_test_tensor = torch.LongTensor(x_test), torch.LongTensor(y_test)

print("Max index in x_train_tensor:", x_train_tensor.max().item())

# Mini-batch iterators: the training split is reshuffled every epoch, the test split keeps its order.
batch_size = 32
train_dataset = TensorDataset(x_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

test_dataset = TensorDataset(x_test_tensor, y_test_tensor)
test_loader = DataLoader(test_dataset, batch_size=batch_size)


class SentimentLSTM(nn.Module):
    """Three stacked unidirectional LSTMs over word embeddings, followed by a two-layer head.

    Args:
        vocab_size: number of rows of the embedding table (index 0 is the padding row).
        embedding_dim: size of each embedding vector.
        max_len: accepted for interface compatibility; not used by the model.
    """

    def __init__(self, vocab_size, embedding_dim, max_len):
        super(SentimentLSTM, self).__init__()
        self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embedding_dim, padding_idx=0)
        self.lstm1 = nn.LSTM(input_size=embedding_dim, hidden_size=256, batch_first=True)
        self.dropout1 = nn.Dropout(0.5)
        self.lstm2 = nn.LSTM(input_size=256, hidden_size=128, batch_first=True)
        self.dropout2 = nn.Dropout(0.5)
        self.lstm3 = nn.LSTM(input_size=128, hidden_size=64, batch_first=True)
        self.fc1 = nn.Linear(64, 64)
        self.fc2 = nn.Linear(64, 3)  # 3 classes for sentiment classification

    def forward(self, x):
        """Return logits of shape (batch_size, 3) for a (batch_size, seq_len) tensor of token ids.

        Sequences are expected to be right-padded with 0. The classifier reads the LSTM
        state at the last non-zero token of each row rather than at the final time step,
        so the result does not depend on how much padding was appended.
        """
        # Effective length of each row = 1-based position of its last non-zero id.
        # Interior zeros are kept; rows that are entirely zero are treated as length 1.
        positions = torch.arange(1, x.size(1) + 1, device=x.device)
        lengths = torch.max((x != 0).long() * positions, dim=1)[0].clamp(min=1)

        x = self.embedding(x)         # -> (batch_size, seq_len, embedding_dim)
        x, _ = self.lstm1(x)          # -> (batch_size, seq_len, 256)
        x = self.dropout1(x)
        x, _ = self.lstm2(x)          # -> (batch_size, seq_len, 128)
        x = self.dropout2(x)
        x, _ = self.lstm3(x)          # -> (batch_size, seq_len, 64)

        # The LSTMs are unidirectional, so the output at step t only depends on steps <= t:
        # picking step (length - 1) gives the state right after the last real token.
        batch_index = torch.arange(x.size(0), device=x.device)
        last_hidden = x[batch_index, lengths - 1]   # -> (batch_size, 64)
        out = torch.relu(self.fc1(last_hidden))
        out = self.fc2(out)
        return out

# Instantiate the model
SEED = 42
torch.manual_seed(SEED)  # seed weight init, dropout and DataLoader shuffling so reruns are comparable
vocab_size = len(word2idx) + 1  # +1 for padding index 0
embedding_dim = 256
model = SentimentLSTM(vocab_size=vocab_size, embedding_dim=embedding_dim, max_len=max_len)
print(model)


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)
model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop: one pass over train_loader per epoch, reporting the mean loss per sample.
num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)  # logits, shape (batch_size, 3)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # criterion returns the batch mean; weight it by the batch size so that the
        # (usually smaller) last batch is not over-counted in the epoch average.
        running_loss += loss.item() * inputs.size(0)
    avg_loss = running_loss / len(train_loader.dataset)
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}")

# Evaluation: fraction of test samples whose arg-max logit matches the label.
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        _, predicted = torch.max(outputs, dim=1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

test_accuracy = correct / total
print("Test Accuracy:", test_accuracy)


In [None]:
from sklearn.metrics import classification_report
import numpy as np
import pandas as pd

# Collect the true labels and the model's predictions over the test set, so that this cell
# does not depend on `y_true` / `y_pred` having been created by a cell further down.
# Relies on `torch`, `model`, `test_loader` and `device` from the training cell above.
model.eval()
y_true, y_pred = [], []
with torch.no_grad():
    for inputs, labels in test_loader:
        outputs = model(inputs.to(device))
        _, preds = torch.max(outputs, dim=1)
        y_pred.extend(preds.cpu().numpy())
        y_true.extend(labels.cpu().numpy())

# Build the detailed evaluation report.
# `labels` is passed explicitly so the three display names always line up with the class ids,
# even if one class is absent from the test labels or predictions.
# NOTE(review): assumes the encoding 0=Negative, 1=Neutral, 2=Positive; confirm against the data.
report = classification_report(
    y_true,
    y_pred,
    labels=[0, 1, 2],
    target_names=["Negative", "Neutral", "Positive"],
    output_dict=True,
)
report_df = pd.DataFrame(report).transpose()
print(report_df)


In [None]:
import os
import re
import pandas as pd
import numpy as np
from collections import Counter

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

from sklearn.metrics import classification_report


# Locate the project root: when the notebook runs from the `model` directory the data lives
# one level up, otherwise it is looked up under the current directory. Only a trailing
# `model` directory is stripped; a plain str.replace on the whole path would also mangle
# any other path component that merely contains "model".
_cwd = os.getcwd()
project_root = os.path.dirname(_cwd) if os.path.basename(_cwd) == 'model' else _cwd
data_path_train = os.path.join(project_root, 'data', 'processed_train.csv')
data_path_test  = os.path.join(project_root, 'data', 'processed_test.csv')

train_df = pd.read_csv(data_path_train)
test_df  = pd.read_csv(data_path_test)

# DataFrame.info() prints its summary itself and returns None, so it is not wrapped in print().
train_df.info()
print("Unique sentiment labels in training data:", np.unique(train_df['sentiment'].values))


def simple_tokenizer(text):
    """Lowercase ``text`` and return the list of its word tokens.

    Missing values (``None`` or a float NaN, which is what ``pd.read_csv`` yields for an
    empty cell) produce an empty list instead of raising; any other non-string value is
    converted with ``str()`` before tokenizing.
    """
    if text is None or (isinstance(text, float) and text != text):  # NaN is the only float != itself
        return []
    if not isinstance(text, str):
        text = str(text)
    return re.findall(r'\b\w+\b', text.lower())

# Flatten the tokens of every training text into one corpus-wide list.
all_tokens = [token for text in train_df['text'] for token in simple_tokenizer(text)]

# Sorted vocabulary built from the distinct tokens; ids start at 1 so 0 can serve as padding.
counter = Counter(all_tokens)
vocab = sorted(counter)
word2idx = {word: position for position, word in enumerate(vocab, start=1)}

print(f"Total unique words in vocabulary: {len(word2idx)}")
print("First 10 words in vocabulary:")
for word, idx in list(word2idx.items())[:10]:
    print(f"{word}: {idx}")

def text_to_sequence(text, word2idx):
    """Tokenize ``text`` and map each token to its id in ``word2idx``; unknown tokens become 0."""
    indices = []
    for token in simple_tokenizer(text):
        indices.append(word2idx.get(token, 0))
    return indices

# Encode both splits with the vocabulary built from the training texts.
train_sequences = [text_to_sequence(sample, word2idx) for sample in train_df['text']]
test_sequences  = [text_to_sequence(sample, word2idx) for sample in test_df['text']]

# Report the largest id seen in training next to the vocabulary size.
max_token_index = max(max(seq, default=0) for seq in train_sequences)
expected_vocab_size = 1 + len(word2idx)  # +1 for padding index 0
print("Max token index in training data:", max_token_index)
print("Expected vocabulary size (including padding):", expected_vocab_size)

# Length of the longest training sequence; used below as the fixed padded length.
max_len = max(map(len, train_sequences))
print(f"Maximum sequence length: {max_len}")

def pad_sequences(sequences, max_len):
    """Right-pad with zeros, or truncate, every sequence to exactly ``max_len`` entries.

    Args:
        sequences: iterable of integer sequences (e.g. lists of token ids).
        max_len: number of columns of the result.

    Returns:
        An ``int64`` array of shape ``(len(sequences), max_len)``. The dtype is fixed
        explicitly so the result does not depend on the platform's default integer and
        an empty input still yields an integer ``(0, max_len)`` array.
    """
    sequences = list(sequences)  # allow generators; the row count is needed up front
    padded = np.zeros((len(sequences), max_len), dtype=np.int64)
    for row, seq in enumerate(sequences):
        kept = seq[:max_len]
        if len(kept):
            padded[row, :len(kept)] = kept
    return padded

# Fixed-length index matrices for both splits.
x_train, x_test = (pad_sequences(seqs, max_len) for seqs in (train_sequences, test_sequences))

# Target labels taken straight from the `sentiment` column.
y_train, y_test = train_df['sentiment'].values, test_df['sentiment'].values

# Wrap features and labels as LongTensors.
x_train_tensor, y_train_tensor = torch.LongTensor(x_train), torch.LongTensor(y_train)
x_test_tensor, y_test_tensor = torch.LongTensor(x_test), torch.LongTensor(y_test)

print("Max index in x_train_tensor:", x_train_tensor.max().item())

# Mini-batch iterators: the training split is reshuffled every epoch, the test split keeps its order.
batch_size = 32
train_dataset = TensorDataset(x_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

test_dataset = TensorDataset(x_test_tensor, y_test_tensor)
test_loader = DataLoader(test_dataset, batch_size=batch_size)



class SentimentLSTM(nn.Module):
    """Three stacked unidirectional LSTMs over word embeddings, followed by a two-layer head.

    Args:
        vocab_size: number of rows of the embedding table (index 0 is the padding row).
        embedding_dim: size of each embedding vector.
        max_len: accepted for interface compatibility; not used by the model.
    """

    def __init__(self, vocab_size, embedding_dim, max_len):
        super(SentimentLSTM, self).__init__()
        self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embedding_dim, padding_idx=0)
        self.lstm1 = nn.LSTM(input_size=embedding_dim, hidden_size=256, batch_first=True)
        self.dropout1 = nn.Dropout(0.5)
        self.lstm2 = nn.LSTM(input_size=256, hidden_size=128, batch_first=True)
        self.dropout2 = nn.Dropout(0.5)
        self.lstm3 = nn.LSTM(input_size=128, hidden_size=64, batch_first=True)
        self.fc1 = nn.Linear(64, 64)
        self.fc2 = nn.Linear(64, 3)  # 3 sentiment labels

    def forward(self, x):
        """Return logits of shape (batch_size, 3) for a (batch_size, seq_len) tensor of token ids.

        Sequences are expected to be right-padded with 0. The classifier reads the LSTM
        state at the last non-zero token of each row rather than at the final time step,
        so the result does not depend on how much padding was appended.
        """
        # Effective length of each row = 1-based position of its last non-zero id.
        # Interior zeros are kept; rows that are entirely zero are treated as length 1.
        positions = torch.arange(1, x.size(1) + 1, device=x.device)
        lengths = torch.max((x != 0).long() * positions, dim=1)[0].clamp(min=1)

        x = self.embedding(x)
        x, _ = self.lstm1(x)
        x = self.dropout1(x)
        x, _ = self.lstm2(x)
        x = self.dropout2(x)
        x, _ = self.lstm3(x)

        # The LSTMs are unidirectional, so the output at step t only depends on steps <= t:
        # picking step (length - 1) gives the state right after the last real token.
        batch_index = torch.arange(x.size(0), device=x.device)
        last_hidden = x[batch_index, lengths - 1]
        out = torch.relu(self.fc1(last_hidden))
        out = self.fc2(out)
        return out

SEED = 42
torch.manual_seed(SEED)  # seed weight init, dropout and DataLoader shuffling so reruns are comparable
vocab_size = len(word2idx) + 1  # +1 for padding index 0
embedding_dim = 256
model = SentimentLSTM(vocab_size=vocab_size, embedding_dim=embedding_dim, max_len=max_len)
print(model)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)
model.to(device)


criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
num_epochs = 10

# Training loop: one pass over train_loader per epoch, reporting the mean loss per sample.
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        # criterion returns the batch mean; weight it by the batch size so that the
        # (usually smaller) last batch is not over-counted in the epoch average.
        running_loss += loss.item() * inputs.size(0)
    avg_loss = running_loss / len(train_loader.dataset)
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}")

# Evaluation: gather true labels and arg-max predictions over the whole test set.
model.eval()
y_true = []
y_pred = []
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs = inputs.to(device)
        outputs = model(inputs)
        _, preds = torch.max(outputs, dim=1)
        y_pred.extend(preds.cpu().numpy())
        y_true.extend(labels.cpu().numpy())


# `labels` is passed explicitly so the three display names always line up with the class ids,
# even if one class is absent from the test labels or predictions.
# NOTE(review): assumes the encoding 0=Negative, 1=Neutral, 2=Positive; confirm against the data.
report = classification_report(
    y_true,
    y_pred,
    labels=[0, 1, 2],
    target_names=["Negative", "Neutral", "Positive"],
)
print("Classification Report:")
print(report)


In [None]:
import os
import re
import pandas as pd
import numpy as np
from collections import Counter

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

from sklearn.metrics import classification_report

# Locate the project root: when the notebook runs from the `model` directory the data lives
# one level up, otherwise it is looked up under the current directory. Only a trailing
# `model` directory is stripped; a plain str.replace on the whole path would also mangle
# any other path component that merely contains "model".
_cwd = os.getcwd()
project_root = os.path.dirname(_cwd) if os.path.basename(_cwd) == 'model' else _cwd
data_path_train = os.path.join(project_root, 'data', 'processed_train.csv')
data_path_test  = os.path.join(project_root, 'data', 'processed_test.csv')
train_df = pd.read_csv(data_path_train)
test_df  = pd.read_csv(data_path_test)

# DataFrame.info() prints its summary itself and returns None, so it is not wrapped in print().
train_df.info()
print("Unique sentiment labels:", np.unique(train_df['sentiment'].values))

def simple_tokenizer(text):
    """Lowercase ``text`` and return the list of its word tokens.

    Missing values (``None`` or a float NaN, which is what ``pd.read_csv`` yields for an
    empty cell) produce an empty list instead of raising; any other non-string value is
    converted with ``str()`` before tokenizing.
    """
    if text is None or (isinstance(text, float) and text != text):  # NaN is the only float != itself
        return []
    if not isinstance(text, str):
        text = str(text)
    return re.findall(r'\b\w+\b', text.lower())

# Flatten the tokens of every training text into one corpus-wide list.
all_tokens = [token for text in train_df['text'] for token in simple_tokenizer(text)]

# Sorted vocabulary built from the distinct tokens; ids start at 1 so 0 can serve as padding.
counter = Counter(all_tokens)
vocab = sorted(counter)
word2idx = {word: position for position, word in enumerate(vocab, start=1)}

print(f"Total unique words in vocabulary: {len(word2idx)}")
print("First 10 words in vocabulary:")
for word, idx in list(word2idx.items())[:10]:
    print(f"{word}: {idx}")


def text_to_sequence(text, word2idx):
    """Tokenize ``text`` and map each token to its id in ``word2idx``; unknown tokens become 0."""
    indices = []
    for token in simple_tokenizer(text):
        indices.append(word2idx.get(token, 0))
    return indices

# Encode both splits with the vocabulary built from the training texts.
train_sequences = [text_to_sequence(sample, word2idx) for sample in train_df['text']]
test_sequences  = [text_to_sequence(sample, word2idx) for sample in test_df['text']]

# Length of the longest training sequence; used below as the fixed padded length.
max_len = max(map(len, train_sequences))
print(f"Maximum sequence length: {max_len}")

def pad_sequences(sequences, max_len):
    """Right-pad with zeros, or truncate, every sequence to exactly ``max_len`` entries.

    Args:
        sequences: iterable of integer sequences (e.g. lists of token ids).
        max_len: number of columns of the result.

    Returns:
        An ``int64`` array of shape ``(len(sequences), max_len)``. The dtype is fixed
        explicitly so the result does not depend on the platform's default integer and
        an empty input still yields an integer ``(0, max_len)`` array.
    """
    sequences = list(sequences)  # allow generators; the row count is needed up front
    padded = np.zeros((len(sequences), max_len), dtype=np.int64)
    for row, seq in enumerate(sequences):
        kept = seq[:max_len]
        if len(kept):
            padded[row, :len(kept)] = kept
    return padded

# Fixed-length index matrices for both splits.
x_train, x_test = (pad_sequences(seqs, max_len) for seqs in (train_sequences, test_sequences))

# Target labels taken straight from the `sentiment` column.
y_train, y_test = train_df['sentiment'].values, test_df['sentiment'].values

# Wrap features and labels as LongTensors.
x_train_tensor, y_train_tensor = torch.LongTensor(x_train), torch.LongTensor(y_train)
x_test_tensor, y_test_tensor = torch.LongTensor(x_test), torch.LongTensor(y_test)

# Mini-batch iterators: the training split is reshuffled every epoch, the test split keeps its order.
batch_size = 32
train_dataset = TensorDataset(x_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

test_dataset = TensorDataset(x_test_tensor, y_test_tensor)
test_loader = DataLoader(test_dataset, batch_size=batch_size)


class ImprovedSentimentLSTM(nn.Module):
    """Three stacked bidirectional LSTMs over word embeddings, followed by a two-layer head.

    Args:
        vocab_size: number of rows of the embedding table (index 0 is the padding row).
        embedding_dim: size of each embedding vector.
        max_len: accepted for interface compatibility; not used by the model.
    """

    def __init__(self, vocab_size, embedding_dim, max_len):
        super(ImprovedSentimentLSTM, self).__init__()
        self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embedding_dim, padding_idx=0)
        # Each bidirectional layer emits 2 * hidden_size features, hence the doubled input sizes below.
        self.lstm1 = nn.LSTM(input_size=embedding_dim, hidden_size=256, batch_first=True, bidirectional=True)
        self.dropout1 = nn.Dropout(0.5)
        self.lstm2 = nn.LSTM(input_size=512, hidden_size=128, batch_first=True, bidirectional=True)
        self.dropout2 = nn.Dropout(0.5)

        self.lstm3 = nn.LSTM(input_size=256, hidden_size=64, batch_first=True, bidirectional=True)

        self.fc1 = nn.Linear(128, 64)
        self.fc2 = nn.Linear(64, 3)

    @staticmethod
    def _apply_to_packed(layer, packed):
        """Apply an element-wise ``layer`` to the data of a PackedSequence, keeping its layout."""
        return nn.utils.rnn.PackedSequence(
            layer(packed.data),
            packed.batch_sizes,
            packed.sorted_indices,
            packed.unsorted_indices,
        )

    def forward(self, x):
        """Return logits of shape (batch_size, 3) for a (batch_size, seq_len) tensor of token ids.

        Sequences are expected to be right-padded with 0. They are packed to their
        effective lengths before entering the LSTMs, so neither direction processes the
        trailing padding and the result does not depend on how much padding was appended.
        """
        # Effective length of each row = 1-based position of its last non-zero id.
        # Interior zeros are kept; rows that are entirely zero are treated as length 1
        # because a packed sequence cannot hold zero-length entries.
        positions = torch.arange(1, x.size(1) + 1, device=x.device)
        lengths = torch.max((x != 0).long() * positions, dim=1)[0].clamp(min=1)

        x = self.embedding(x)
        # pack_padded_sequence expects the lengths on the CPU; enforce_sorted=False lets
        # it handle batches that are not ordered by length.
        packed = nn.utils.rnn.pack_padded_sequence(
            x, lengths.cpu(), batch_first=True, enforce_sorted=False
        )
        packed, _ = self.lstm1(packed)
        packed = self._apply_to_packed(self.dropout1, packed)
        packed, _ = self.lstm2(packed)
        packed = self._apply_to_packed(self.dropout2, packed)
        _, (h_n, _) = self.lstm3(packed)
        # h_n holds the final forward state (index -2) and backward state (index -1) of lstm3.
        last_hidden = torch.cat((h_n[-2], h_n[-1]), dim=1)
        out = torch.relu(self.fc1(last_hidden))
        out = self.fc2(out)
        return out

SEED = 42
torch.manual_seed(SEED)  # seed weight init, dropout and DataLoader shuffling so reruns are comparable
vocab_size = len(word2idx) + 1  # +1 for padding index 0
embedding_dim = 256
model = ImprovedSentimentLSTM(vocab_size=vocab_size, embedding_dim=embedding_dim, max_len=max_len)
print(model)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)
model.to(device)

criterion = nn.CrossEntropyLoss()
# NOTE(review): plain SGD without momentum at lr=0.001, whereas the earlier cells use Adam;
# confirm this optimizer choice is intentional.
optimizer = optim.SGD(model.parameters(), lr=0.001)
num_epochs = 20

# Training loop: one pass over train_loader per epoch, reporting the mean loss per sample.
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        # criterion returns the batch mean; weight it by the batch size so that the
        # (usually smaller) last batch is not over-counted in the epoch average.
        running_loss += loss.item() * inputs.size(0)
    avg_loss = running_loss / len(train_loader.dataset)
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}")

# Evaluation: gather true labels and arg-max predictions over the whole test set.
model.eval()
y_true, y_pred = [], []
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs = inputs.to(device)
        outputs = model(inputs)
        _, preds = torch.max(outputs, dim=1)
        y_pred.extend(preds.cpu().numpy())
        y_true.extend(labels.cpu().numpy())

# `labels` is passed explicitly so the three display names always line up with the class ids,
# even if one class is absent from the test labels or predictions.
# NOTE(review): assumes the encoding 0=Negative, 1=Neutral, 2=Positive; confirm against the data.
report = classification_report(
    y_true,
    y_pred,
    labels=[0, 1, 2],
    target_names=["Negative", "Neutral", "Positive"],
)
print("Classification Report:")
print(report)
