In [1]:
# Import necessary libraries
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from datasets import load_dataset

# Load the dataset
dataset = load_dataset("SetFit/sst5")
print(dataset)
print(dataset['train'][0])

README.md:   0%|          | 0.00/421 [00:00<?, ?B/s]

To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development
Repo card metadata block was not found. Setting CardData to empty.


train.jsonl:   0%|          | 0.00/1.32M [00:00<?, ?B/s]

dev.jsonl:   0%|          | 0.00/171k [00:00<?, ?B/s]

test.jsonl:   0%|          | 0.00/343k [00:00<?, ?B/s]

Generating train split:   0%|          | 0/8544 [00:00<?, ? examples/s]

Generating validation split:   0%|          | 0/1101 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/2210 [00:00<?, ? examples/s]

DatasetDict({
    train: Dataset({
        features: ['text', 'label', 'label_text'],
        num_rows: 8544
    })
    validation: Dataset({
        features: ['text', 'label', 'label_text'],
        num_rows: 1101
    })
    test: Dataset({
        features: ['text', 'label', 'label_text'],
        num_rows: 2210
    })
})
{'text': 'a stirring , funny and finally transporting re-imagining of beauty and the beast and 1930s horror films', 'label': 4, 'label_text': 'very positive'}


Build vocabulary and pre-processing functions:

In [2]:
def tokenize(text: str):
    return text.lower().split()


def build_vocab(sentences: list[str]):
    # vocab 是一个 set 类型的对象，用于存储唯一的单词。
    # update() 方法用于将一个可迭代对象中的所有元素添加到集合中。
    # 它将 tokenize(sentence) 返回的所有单词添加到 vocab 中。
    vocab = set()
    for sentence in sentences:
        vocab.update(tokenize(sentence))
    return {word: idx for idx, word in enumerate(vocab)}


# Build vocabulary
vocab = build_vocab(dataset["train"]["text"])
vocab_size = len(vocab)

In [3]:
def map_token_to_index(token):
    # 这个函数的主要用途是在将文本转换为数字序列时使用，这是许多自然语言处理任务的预处理步骤。
    # Return the index of the token or the index of the '<unk>' token if the token is not in the vocabulary
    return vocab.get(token, -1)


def map_text_to_indices(text: str):
    return [map_token_to_index(token) for token in tokenize(text)]


def prepare_dataset(dataset):
    return dataset.map(
        # 输入 x 是数据集中的一个样本。
        # 假设每个样本有一个 "text" 字段，包含原始文本。
        # 使用 map_text_to_indices(x["text"]) 将文本转换为 token 索引列表。
        # 返回一个新的字典，包含 "token_ids" 字段，其值是转换后的索引列表。
        lambda x: {"token_ids": map_text_to_indices(x["text"])}, num_proc=1
    )

Create a function that takes a batch of sequences of token ids (list of list of ints) and converts them into one-hot encodings:

In [4]:
# One-hot encoding function for a batch of sentences
def one_hot_encode_batch(sentences: list[list[int]]):
    # Note that we are assuming that the sentences have the same length
    sequence_length = len(sentences[0])
    batch_size = len(sentences)

    # Create a tensor of zeros with the desired shape (including the batch dimension)
    one_hot_vectors = torch.zeros(
        batch_size, sequence_length, vocab_size, dtype=torch.float32
    )
    # 遍历 sentences 列表中的每个句子。
    for i, indices in enumerate(sentences):
        # 遍历当前句子中的每个单词索引。j 是单词在句子中的位置，idx 是单词的索引值。
        for j, idx in enumerate(indices):
            # Set the appropriate index to 1.0, but only if the index is not -1
            # 查单词索引是否有效（不是 -1）。索引 -1 通常用于表示未知单词或填充符号。
            if idx >= 0:
                # 在 one_hot_vectors 张量中，将对应位置设置为 1.0。
                # i 表示批次中的第几个句子。
                # j 表示句子中的第几个单词。
                # idx 表示该单词在词汇表中的索引。
                one_hot_vectors[i, j, idx] = 1.0

    return one_hot_vectors

In [5]:
preprocessed_dataset = prepare_dataset(dataset)
print(preprocessed_dataset["train"][0])

Map:   0%|          | 0/8544 [00:00<?, ? examples/s]

Map:   0%|          | 0/1101 [00:00<?, ? examples/s]

Map:   0%|          | 0/2210 [00:00<?, ? examples/s]

{'text': 'a stirring , funny and finally transporting re-imagining of beauty and the beast and 1930s horror films', 'label': 4, 'label_text': 'very positive', 'token_ids': [2595, 12425, 14585, 14854, 14762, 8410, 13079, 16340, 4271, 11038, 14762, 9799, 7145, 14762, 16379, 6888, 10894]}


In [6]:
# Define the collate function for dynamic truncation
def collate_fn(batch):
    ## Truncate all sentences in the batch to the shortest length

    # Find the minimum length of the sentences in the batch
    min_length = min([len(example["token_ids"]) for example in batch])
    
    # We don't convert the inputs to tensors here because we will apply one-hot encoding and therefore converting to tensors in the model on-the-fly
    # 对于每个样本，提取 "token_ids" 字段，并截取到 min_length。
    inputs = [example["token_ids"][:min_length] for example in batch]
    # 同样使用列表推导式，从每个样本中提取 "label" 字段。
    # 使用 torch.tensor() 将标签列表转换为 PyTorch 张量。
    labels = torch.tensor([example["label"] for example in batch])

    return inputs, labels

In [7]:
# Create DataLoaders for train and test datasets
train_dataloader = DataLoader(
    # 当 DataLoader 从数据集中抽取一批样本时，它会调用 collate_fn 函数来将这些单独的样本组合成一个批次。
    # 这个函数定义了如何处理可能长度不同的样本，以及如何将它们组合成适合模型输入的格式。
    preprocessed_dataset["train"], batch_size=8, collate_fn=collate_fn, shuffle=True
)
validation_dataloader = DataLoader(
    preprocessed_dataset["validation"], batch_size=32, collate_fn=collate_fn
)
test_dataloader = DataLoader(
    preprocessed_dataset["test"], batch_size=32, collate_fn=collate_fn
)

for batch in train_dataloader:
    inputs, labels = batch
    print(inputs)
    print(labels)
    break

[[3546, 4271, 9799, 8493, 2330, 5659, 11877, 14585, 14762, 14608, 10104, 16211, 7181, 10061, 12952, 2810, 9799], [2595, 7874, 14585, 14445, 14762, 15571, 14854, 12751, 6499, 14952, 9799, 6828, 14585, 14094, 15712, 12566, 9628], [1922, 6328, 6976, 3219, 6442, 9799, 2023, 13635, 6902, 8018, 13538, 14762, 15667, 5881, 10617, 7818, 10766], [9799, 8488, 3719, 2222, 7851, 2595, 4602, 16363, 8879, 14762, 1686, 3546, 7647, 4410, 14585, 1765, 9799], [2210, 2595, 11512, 10659, 6528, 4271, 9799, 14782, 4271, 539, 14762, 9799, 3992, 4271, 4520, 2595, 11044], [3992, 15535, 225, 9892, 15923, 14952, 15141, 2595, 15252, 12935, 6976, 7517, 8023, 14585, 6919, 288, 14585], [9799, 2330, 14762, 4726, 13880, 9272, 5431, 6976, 6729, 5305, 15677, 5659, 7061, 11146, 2595, 461, 212], [15657, 14952, 9799, 9210, 14762, 8636, 2595, 2684, 145, 6122, 6225, 14468, 3958, 9272, 5659, 16323, 9184]]
tensor([1, 4, 2, 3, 3, 1, 0, 4])


In [8]:
# Define the Neural Network
class SentimentModel(nn.Module):
    def __init__(self, vocab_size, hidden_dim, num_classes):
        super(SentimentModel, self).__init__()
        self.hidden_layer = nn.Linear(vocab_size, hidden_dim)
        self.output_layer = nn.Linear(hidden_dim, num_classes)

    def forward(self, input_ids: list[list[int]]):
        # Instead of the embedding layer, we will use one-hot encoding
        # Note: you could also use torch's Embedding layer initialized with the one-hot vectors
        encodings = one_hot_encode_batch(input_ids)
        # Sum the one-hot vectors to get the bag of words representation
        bag_of_words = encodings.sum(dim=1)
        # Apply the hidden layer and the output layer
        a_1 = torch.relu(self.hidden_layer(bag_of_words))
        # No activation function is applied to the output layer because we will use CrossEntropyLoss which applies softmax
        z_2 = self.output_layer(a_1)
        return z_2


# Initialize the model
hidden_dim = 256
num_classes = 5
model = SentimentModel(vocab_size, hidden_dim, num_classes)

# Define Loss Function and Optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)

In [9]:
# Train the Model
epochs = 10
for epoch in range(epochs):
    # model.train() 是一个非常重要的 PyTorch 方法调用，
    # 用于将模型设置为训练模式。这行代码通常在开始训练循环之前调用
    model.train()
    total_loss = 0
    for inputs, labels in train_dataloader:
        # Forward pass
        outputs = model(inputs)

        # Compute loss
        loss = criterion(outputs, labels)
        total_loss += loss.item()

        # Backward pass and update
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    print(f"Epoch {epoch+1}/{epochs}, Loss: {total_loss / len(train_dataloader):.4f}")

Epoch 1/10, Loss: 1.5278
Epoch 2/10, Loss: 1.2820
Epoch 3/10, Loss: 0.9254
Epoch 4/10, Loss: 0.6539
Epoch 5/10, Loss: 0.4555
Epoch 6/10, Loss: 0.3649
Epoch 7/10, Loss: 0.3210
Epoch 8/10, Loss: 0.2873
Epoch 9/10, Loss: 0.2652
Epoch 10/10, Loss: 0.2416


Bonus: Evaluate the model by means of accuracy (percentage of correctly predicted classes):

In [10]:
def evaluate_model(model, dataloader):
    model.eval()  # Set the model to evaluation mode
    correct = 0
    total = 0

    with torch.no_grad():  # Disable gradient computation
        for inputs, labels in dataloader:
            # Forward pass
            outputs = model(inputs)
            # 这个函数用于找出张量中的最大值。当应用于二维张量时，它可以沿着指定的维度找出最大值。
            # 在分类任务中，它通常是一个形状为 (batch_size, num_classes) 的二维张量。
            # torch.max() 返回两个值：最大值和最大值的索引。
            # 在这里，我们使用 _ 忽略了最大值，只保留了索引。
            _, predicted = torch.max(outputs, dim=1)
            correct += (predicted == labels).sum().item()
            total += labels.size(0)

    # Calculate accuracy
    accuracy = correct / total
    print(f"Accuracy: {accuracy:.4f}")
    return accuracy

In [11]:
# Evaluate on the test set
test_accuracy = evaluate_model(model, test_dataloader)

Accuracy: 0.3222
