In [112]:
from utils import process_tweet, build_freqs
import nltk 
from nltk.corpus import twitter_samples
import matplotlib.pyplot as plt
import numpy as np
import os

In [113]:
positive_tweets = twitter_samples.strings('positive_tweets.json')
negative_tweets = twitter_samples.strings('negative_tweets.json')

tweets = positive_tweets + negative_tweets
labels = np.concatenate(
    (np.ones(len(positive_tweets)),
    np.zeros(len(negative_tweets)))
)
labels.shape

(10000,)

In [114]:
frequencies = build_freqs(tweets, labels)

In [115]:
data = []
for tweet in tweets:
    pos = 0
    neg = 0
    for word in process_tweet(tweet):
        if (word, 1) in frequencies:
            pos += frequencies[(word, 1)]
        if (word, 0) in frequencies:
            neg += frequencies[(word, 0)]
    data.append([1, pos, neg])

In [136]:
def get_data(tweets, frequencies):
    data = []
    for tweet in tweets:
        pos = 0
        neg = 0
        for word in process_tweet(tweet):
            if (word, 1) in frequencies:
                pos += frequencies[(word, 1)]
            if (word, 0) in frequencies:
                neg += frequencies[(word, 0)]
        data.append([1, pos, neg])
    return data

In [116]:
data[6000], labels[6000]

([1, 34, 4624], 0.0)

In [117]:
import random

# Kết hợp data và labels thành một list các tuple
combined = list(zip(data, labels))

# Shuffle list kết hợp
random.shuffle(combined)

# Tách lại thành data và labels
data, labels = zip(*combined)

# Chuyển đổi về kiểu ban đầu (nếu cần)
data = list(data)
labels = list(labels)

In [118]:
total_samples = 10000
train_data = data[:8000]
train_labels = labels[:8000]

test_data = data[8000:]
test_labels = labels[8000:]

In [119]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader

In [120]:
def get_dataloader(data, labels, batch_size = 64):
    data = torch.tensor(data, dtype = torch.float)
    labels = torch.tensor(labels, dtype = torch.int64)
    dataset = TensorDataset(data, labels)
    dataloader = DataLoader(dataset, batch_size = batch_size, shuffle = True)
    return dataloader

In [121]:
train_dataloader = get_dataloader(train_data, train_labels)
test_dataloader = get_dataloader(test_data, test_labels)

  labels = torch.tensor(labels, dtype = torch.int64)


In [122]:
for batch in train_dataloader:
    print(batch[0].shape, batch[1].shape)
    break

torch.Size([64, 3]) torch.Size([64])


In [123]:
class MLP_classifier(nn.Module):
    def __init__(self, device, hidden_size = 32):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(3, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, 2)
        )
        self.to(device)

    def forward(self, X):
        y_hat = self.net(X)
        return y_hat
    
    def save_model(self, path = "../w1/model_ckpt.pth"):
        # dir_path = os.path.dirname(path)
        # if not os.path.exists(dir_path):
        #     os.makedirs(dir_path)
        torch.save(self.state_dict(), path)
        print(f"Model saved to {path}")


    def load_model(self, path = "../w1/model_ckpt.pth"):
        self.load_state_dict(torch.load(path, weights_only=True))

        self.eval() 
        print(f"Model loaded from {path}")


In [124]:
def training_step(model, batch, loss_fn, optimizer, device):
    X = batch[0]
    y = batch[1]
    X = X.to(device)
    y = y.to(device)

    y_hat = model(X)
    l = loss_fn(y_hat, y)
    optimizer.zero_grad()
    l.backward()
    optimizer.step()

    training_loss = l.item()
    return training_loss

def validation_step(model, batch, loss_fn, device):
    X = batch[0]
    y = batch[1]
    X = X.to(device)
    y = y.to(device)

    with torch.no_grad():
        y_hat = model(X)
        l = loss_fn(y_hat, y)
    valid_loss = l.item()
    return valid_loss

def accuracy(model, test_dataloader, device, maximum_test = 1000):
    true_cnt = 0
    total = 0
    for batch_idx, (X, y) in enumerate(test_dataloader):
        X = X.to(device)
        y = y.to(device)
        with torch.no_grad():
            y_hat = model(X)
        choice = torch.argmax(y_hat, dim = -1)

        for i in range(len(y)):
            true_choice = y[i]
            model_choice = choice[i]
            # print(true_choice, model_choice)
            if model_choice == true_choice:
                true_cnt+=1
            total += 1
        if total > maximum_test:
            break
    return 1.0 * true_cnt / total


In [131]:
device = "cuda" if torch.cuda.is_available() else "cpu"
model = MLP_classifier(device)
accuracy(model, test_dataloader, device)

0.509765625

In [132]:
def training(model, device = "cuda", num_epochs = 50):
    loss = nn.CrossEntropyLoss()
    optim = torch.optim.SGD(model.parameters(), lr = 0.001)
    best_acc = 0.0
    for i in range(num_epochs):
        training_loss, validation_loss = 0, 0
        for batch_idx, batch in enumerate(train_dataloader):
            training_loss += training_step(model, batch, loss, optim, device)
        for batch_idx, batch in enumerate(test_dataloader):
            validation_loss += validation_step(model, batch, loss, device)
        acc = accuracy(model, test_dataloader, device, 2000)
        print(f"Epoch: {i + 1}, Training Loss: {training_loss}, Validation Loss: {validation_loss}, Accuracy: {acc}")
        if acc > best_acc:
            best_acc = acc
            model.save_model()

In [133]:
training(model, device = device)

Epoch: 1, Training Loss: 3277.597546484829, Validation Loss: 184.07309337747378, Accuracy: 0.991
Model saved to ../w1/model_ckpt.pth
Epoch: 2, Training Loss: 531.6754729520636, Validation Loss: 239.47046018837136, Accuracy: 0.9925
Model saved to ../w1/model_ckpt.pth
Epoch: 3, Training Loss: 364.48960233558256, Validation Loss: 198.3581577670081, Accuracy: 0.9925
Epoch: 4, Training Loss: 194.61841696381765, Validation Loss: 145.99206844996473, Accuracy: 0.99
Epoch: 5, Training Loss: 111.36062544077069, Validation Loss: 132.13289597325357, Accuracy: 0.9915
Epoch: 6, Training Loss: 467.2432198937237, Validation Loss: 256.92088440517546, Accuracy: 0.994
Model saved to ../w1/model_ckpt.pth
Epoch: 7, Training Loss: 511.5178278275489, Validation Loss: 190.34884889516977, Accuracy: 0.994
Epoch: 8, Training Loss: 284.5044182341051, Validation Loss: 481.2810952127741, Accuracy: 0.993
Epoch: 9, Training Loss: 205.3697558249366, Validation Loss: 111.58604412781267, Accuracy: 0.9925
Epoch: 10, Trai

In [134]:
model.save_model()

Model saved to ../w1/model_ckpt.pth


In [148]:
def test(text):
    data = torch.tensor(get_data([text], frequencies), dtype = torch.float, device = device)
    with torch.no_grad():
        model.eval()
        y_pred = model(data)
    pred = torch.argmax(y_pred).item()
    if pred == 0:
        print(f"<{text}> ----> Negative!")
    else:
        print(f"<{text}> ----> Positive!")

In [158]:
accuracy(model, test_dataloader, device)

0.9921875

In [160]:
test(negative_tweets[5])


<oh god, my babies' faces :( https://t.co/9fcwGvaki0> ----> Negative!
