In [24]:
import pandas as pd
from sklearn.feature_extraction.text import CountVectorizer
import numpy as np
from transformers import AutoTokenizer, AutoModel
import torch
from torch.utils.data import DataLoader, TensorDataset, RandomSampler
from transformers import AutoModelForSequenceClassification, AdamW

from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import accuracy_score
np.random.seed(123)

## Load data

use command line tool to preprocess the raw dataset first:
jq -r '. | [.text, .label | tostring] | @csv' train.jsonl | awk 'BEGIN{print "text,label"}{print}' > train.csv
jq -r '. | [.text, .label | tostring] | @csv' test.jsonl | awk 'BEGIN{print "text,label"}{print}' > test.csv

In [3]:
# Load data
train_data = pd.read_csv("train.csv")
test_data = pd.read_csv("test.csv")

# Display
print("Train Data:")
print(train_data.head())
print(train_data.shape)
print("\nTest Data:")
print(test_data.head())
print(test_data.shape)

Train Data:
                                                text  label
0                            i didnt feel humiliated      0
1  i can go from feeling so hopeless to so damned...      0
2   im grabbing a minute to post i feel greedy wrong      3
3  i am ever feeling nostalgic about the fireplac...      2
4                               i am feeling grouchy      3
(16000, 2)

Test Data:
                                                text  label
0  im feeling rather rotten so im not very ambiti...      0
1          im updating my blog because i feel shitty      0
2  i never make her separate from me because i do...      0
3  i left with my bouquet of red and yellow tulip...      1
4    i was feeling a little vain when i did this one      0
(2000, 2)


## Vectorize text

In [4]:
# Initialize CountVectorizer
vectorizer = CountVectorizer()

# Fit on both datasets to ensure vocabulary is consistent across both datasets
combined_text = pd.concat([train_data['text'], test_data['text']], axis=0)
vectorizer.fit(combined_text)

# Transform the training and testing data separately
X_train = vectorizer.transform(train_data['text'])
X_test = vectorizer.transform(test_data['text'])

# Convert to DataFrame
feature_names = vectorizer.get_feature_names_out()
train_df = pd.DataFrame(X_train.toarray(), columns=feature_names)
test_df = pd.DataFrame(X_test.toarray(), columns=feature_names)

# Display the transformed data
print("\nTransformed Train text Data:")
# print(train_df.head())
print(train_df.shape)
print("\nTransformed Test text Data:")
# print(test_df.head())
print(test_df.shape)


Transformed Train text Data:
(16000, 16158)

Transformed Test text Data:
(2000, 16158)


## convert into ndarray

In [5]:
# Extract labels and convert to ndarray
y_train = train_data['label'].to_numpy().reshape(-1, 1)
y_test = test_data['label'].to_numpy().reshape(-1, 1)

# Convert feature DataFrame to ndarray
x_train = train_df.to_numpy()
x_test = test_df.to_numpy()

# Print shapes for verification
print("y_train shape:", y_train.shape)
print("y_test shape:", y_test.shape)
print("x_train shape:", x_train.shape)
print("x_test shape:", x_test.shape)

y_train shape: (16000, 1)
y_test shape: (2000, 1)
x_train shape: (16000, 16158)
x_test shape: (2000, 16158)


## Naive Bayes Model

In [5]:
class NaiveBayes:
    def __init__(self):
        self.class_log_prior_ = None
        self.feature_log_prob_ = None
        self.classes_ = None

    def fit(self, x, y):
        # Calculate the prior probability of each class and take the logarithm
        self.classes_, y_counts = np.unique(y, return_counts=True)
        self.class_log_prior_ = np.log(y_counts / y_counts.sum())

        # Calculate the conditional probability P(feature|class)
        self.feature_log_prob_ = []
        for c in self.classes_:
            # Select samples for each class
            x_c = x[y == c]
            # Calculate the probability of each feature and take the logarithm
            class_feature_prob = (x_c.sum(axis=0) + 1) / (x_c.sum() + x_c.shape[1])
            self.feature_log_prob_.append(np.log(class_feature_prob))

        self.feature_log_prob_ = np.array(self.feature_log_prob_)

    def predict(self, xt):
        # Calculate the probability for each sample belonging to each class
        log_probs = xt @ self.feature_log_prob_.T + self.class_log_prior_
        # Choose the class with the highest probability
        return self.classes_[np.argmax(log_probs, axis=1)]

    def eval(self, y_true, y_pred):
        # Calculate accuracy
        return (y_true == y_pred).mean()

### evaluate model

In [6]:
# Instantiate the Naive Bayes classifier
nb = NaiveBayes()

# Train the model
nb.fit(x_train, y_train.flatten())  # Assuming y_train is a two-dimensional array

# Predict on the test set
y_pred = nb.predict(x_test)

# Calculate accuracy
accuracy = nb.eval(y_test.flatten(), y_pred)
print(f"Accuracy: {accuracy}")

Accuracy: 0.769


In [8]:
def calculate_metrics(y_true, y_pred, num_classes):
    # Initialize metrics
    precision = np.zeros(num_classes)
    recall = np.zeros(num_classes)
    f1 = np.zeros(num_classes)
    
    # Calculate metrics for each class
    for cls in range(num_classes):
        true_positive = np.sum((y_pred == cls) & (y_true == cls))
        false_positive = np.sum((y_pred == cls) & (y_true != cls))
        false_negative = np.sum((y_pred != cls) & (y_true == cls))
        true_negative = np.sum((y_pred != cls) & (y_true != cls))

        precision[cls] = true_positive / (true_positive + false_positive) if true_positive + false_positive > 0 else 0
        recall[cls] = true_positive / (true_positive + false_negative) if true_positive + false_negative > 0 else 0
        f1[cls] = 2 * (precision[cls] * recall[cls]) / (precision[cls] + recall[cls]) if precision[cls] + recall[cls] > 0 else 0

    # Calculate the macro-average of the metrics
    macro_precision = np.mean(precision)
    macro_recall = np.mean(recall)
    macro_f1 = np.mean(f1)

    # Calculate accuracy
    accuracy = np.mean(y_true == y_pred)

    return accuracy, macro_precision, macro_recall, macro_f1

# Use the calculate_metrics function
num_classes = 6  
accuracy, precision, recall, f1 = calculate_metrics(y_test.flatten(), y_pred, num_classes)

# Print metrics
print(f"Accuracy: {accuracy:.3f}")
print(f"Precision: {precision:.3f}")
print(f"Recall: {recall:.3f}")
print(f"F1 Score: {f1:.3f}")

Accuracy: 0.769
Precision: 0.807
Recall: 0.658
F1 Score: 0.685


## Process data for bert

In [13]:
# 1. Load pre-trained model and tokenizer
model_name = "bert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModel.from_pretrained(model_name)

# 2. Tokenizer
def tokenize_texts(texts):
    return tokenizer(texts, padding=True, truncation=True, return_tensors="pt")

# 3. Apply tokenization
# train_encodings = tokenize_texts(train_data['text'].tolist())
# test_encodings = tokenize_texts(test_data['text'].tolist())
train_encodings = tokenizer(train_data['text'].tolist(), padding='max_length', truncation=True, max_length=87, return_tensors="pt")
test_encodings = tokenizer(test_data['text'].tolist(), padding='max_length', truncation=True, max_length=87, return_tensors="pt")

# 4. Convert tokenized data to tensors
x_train = train_encodings['input_ids']
x_test = test_encodings['input_ids']


attention_masks_train = train_encodings['attention_mask']
attention_masks_test = test_encodings['attention_mask']


y_train_tensor = torch.tensor(y_train)
y_test_tensor = torch.tensor(y_test)

# Print the shape of all tensors
print("x_train shape:", x_train.shape)
print("x_test shape:", x_test.shape)
print("y_train_tensor shape:", y_train_tensor.shape)
print("y_test_tensor shape:", y_test_tensor.shape)

x_train shape: torch.Size([16000, 87])
x_test shape: torch.Size([2000, 87])
y_train_tensor shape: torch.Size([16000, 1])
y_test_tensor shape: torch.Size([2000, 1])


In [25]:
# 检查是否有可用的 GPU，如果有，使用它；否则使用 CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

model_name = "bert-base-uncased"
# 1. 准备模型：添加一个用于分类的顶层，并移到适当的设备
model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=6)
model.to(device)  # 将模型移动到指定的 device 上
model.eval()  # 将模型设置为评估模式

# 2. 定义数据加载器
test_dataset = TensorDataset(x_test)  # 使用测试数据集
test_dataloader = DataLoader(test_dataset, batch_size=16)

# 3. 进行预测
predictions = []
for batch in test_dataloader:
    # 将数据移动到相同的 device
    b_input_ids = batch[0].to(device)

    with torch.no_grad():  # 在评估模式下，不计算梯度
        outputs = model(b_input_ids)
    
    logits = outputs.logits
    predictions.append(logits)

# 转换预测结果为更易理解的格式（例如，提取最可能的类别）
predicted_labels = [torch.argmax(logits, dim=1).cpu().numpy() for logits in predictions]
predicted_labels_flat = np.concatenate(predicted_labels, axis=0)

# 确保 y_test 也是 NumPy 数组
y_test_np = y_test.cpu().numpy() if torch.is_tensor(y_test) else y_test

# 计算准确率
accuracy = accuracy_score(y_test_np, predicted_labels_flat)
print("Prediction Accuracy: {:.2f}%".format(accuracy * 100))

Using device: cuda


Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


RuntimeError: CUDA error: device-side assert triggered
CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1.
Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions.
