In [1]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

In [2]:
import os
import torch
from torch import nn
from torch.utils.data import DataLoader, Dataset
from transformers import BertTokenizer, BertModel, AdamW, get_linear_schedule_with_warmup
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report
import pandas as pd

In [3]:
df = pd.read_csv('D:\ISE540\Team project\Amazonfoodnew - Amazonfoodnew.csv')
df.fillna(0, inplace=True)
df.replace(-1, 1, inplace=True)

In [4]:
# CLASSES = ['packing/outter appearance/shipping','health/ingredient','pricing/volume/quality','Cooking/how to use','Taste/texture']
texts = df['reviewText'].tolist()
labels = [[row['packing/outter appearance/shipping'], row['health/ingredient'], row['pricing/volume/quality'], row['Cooking/how to use'], row['Taste/texture']] for _, row in df.iterrows()]

In [6]:
class TextClassificationDataset(Dataset):
    def __init__(self, texts, labels, tokenizer, max_length):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length
    def __len__(self):
        return len(self.texts)
    def __getitem__(self, idx):
        text = self.texts[idx]
        label = self.labels[idx]
        encoding = self.tokenizer(text, return_tensors='pt', max_length=self.max_length, padding='max_length', truncation=True)
        return {'input_ids': encoding['input_ids'].flatten(), 'attention_mask': encoding['attention_mask'].flatten(), 'label': torch.tensor(label)}

In [26]:
class BERTClassifier(nn.Module):
    def __init__(self, bert_model_name, num_classes):
        super(BERTClassifier, self).__init__()
        self.bert = BertModel.from_pretrained(bert_model_name)
        self.dropout = nn.Dropout(0.1)
        self.fc = nn.Linear(self.bert.config.hidden_size, num_classes)
        #修改
        # self.sigmoid = nn.Sigmoid()


    def forward(self, input_ids, attention_mask):
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        pooled_output = outputs.pooler_output
        x = self.dropout(pooled_output)
        logits = self.fc(x)
        return logits
        # return self.sigmoid(logits)

In [27]:
def train(model, data_loader, optimizer, scheduler, device):
    model.train()
    for batch in data_loader:
        optimizer.zero_grad()
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['label'].to(device)
        outputs = model(input_ids=input_ids, attention_mask=attention_mask)
        loss = nn.CrossEntropyLoss()(outputs, labels)
        # loss = nn.BCEWithLogitsLoss()(outputs, labels)
        loss.backward()
        optimizer.step()
        scheduler.step()

In [28]:
from sklearn.metrics import precision_recall_fscore_support

def evaluate_multiclass(model, data_loader, device, num_classes):
    model.eval()
    all_predictions = []
    all_actual_labels = []
    with torch.no_grad():
        for batch in data_loader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['label'].to(device)
            outputs = model(input_ids=input_ids, attention_mask=attention_mask)
            _, preds = torch.max(outputs, dim=1)
            all_predictions.extend(preds.cpu().tolist())
            all_actual_labels.extend(labels.cpu().tolist())

    # 使用 torch.argmax 将多标签指示器转换为单一标签
    all_actual_labels = torch.argmax(torch.tensor(all_actual_labels), dim=1).cpu().tolist()

    # 计算每个类别的准确率、召回率、F1 分数
    precision, recall, f1, _ = precision_recall_fscore_support(all_actual_labels, all_predictions, labels=list(range(num_classes)), average=None)

    for i in range(num_classes):
        print(f"Class {i} - Precision: {precision[i]}, Recall: {recall[i]}, F1: {f1[i]}")

    # 计算总体的平均准确率、召回率、F1 分数
    overall_precision, overall_recall, overall_f1, _ = precision_recall_fscore_support(all_actual_labels, all_predictions, average='macro')

    print(f"\nOverall Metrics - Precision: {overall_precision}, Recall: {overall_recall}, F1: {overall_f1}")

In [29]:
# Set up parameters
bert_model_name = 'bert-base-uncased'
num_classes = 5
max_length = 128
batch_size = 8
num_epochs = 4
learning_rate = 2e-5

In [13]:
train_texts, val_texts, train_labels, val_labels = train_test_split(texts, labels, test_size=0.2, random_state=42)

In [30]:
tokenizer = BertTokenizer.from_pretrained(bert_model_name)
train_dataset = TextClassificationDataset(train_texts, train_labels, tokenizer, max_length)
val_dataset = TextClassificationDataset(val_texts, val_labels, tokenizer, max_length)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=batch_size)

In [31]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = BERTClassifier(bert_model_name, num_classes).to(device)

In [32]:
optimizer = AdamW(model.parameters(), lr=learning_rate)
total_steps = len(train_dataloader) * num_epochs
scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=0, num_training_steps=total_steps)



In [33]:
for epoch in range(num_epochs):
    print(f"Epoch {epoch + 1}/{num_epochs}")
    model.train()
    for batch in train_dataloader:
        optimizer.zero_grad()
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['label'].to(device)
        outputs = model(input_ids=input_ids, attention_mask=attention_mask)
        loss = nn.BCEWithLogitsLoss()(outputs, labels.float())  # 使用 BCEWithLogitsLoss
        loss.backward()
        optimizer.step()
        scheduler.step()

    # 在每个 epoch 结束后进行评估
    model.eval()
    confusion_matrix = evaluate_multiclass(model, val_dataloader, device, num_classes)
    print(confusion_matrix)


Epoch 1/4


  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


Class 0 - Precision: 0.9333333333333333, Recall: 0.08235294117647059, F1: 0.15135135135135136
Class 1 - Precision: 0.0, Recall: 0.0, F1: 0.0
Class 2 - Precision: 0.32967032967032966, Recall: 0.6666666666666666, F1: 0.4411764705882353
Class 3 - Precision: 0.0, Recall: 0.0, F1: 0.0
Class 4 - Precision: 0.3469387755102041, Recall: 0.9532710280373832, F1: 0.5087281795511222

Overall Metrics - Precision: 0.3219884877027734, Recall: 0.3404581271761041, F1: 0.22025120029814174
None
Epoch 2/4
Class 0 - Precision: 0.8636363636363636, Recall: 0.11176470588235295, F1: 0.19791666666666666
Class 1 - Precision: 0.47368421052631576, Recall: 0.23684210526315788, F1: 0.3157894736842105
Class 2 - Precision: 0.2831858407079646, Recall: 0.7111111111111111, F1: 0.4050632911392405
Class 3 - Precision: 0.2857142857142857, Recall: 0.05, F1: 0.0851063829787234
Class 4 - Precision: 0.4100418410041841, Recall: 0.9158878504672897, F1: 0.5664739884393064

Overall Metrics - Precision: 0.46325250831782283, Recall: 0

In [38]:
import torch
from transformers import BertTokenizer

def predict_multilabel(text, model, tokenizer, device, threshold=0.6, max_length=128):
    model.eval()
    with torch.no_grad():

        encoding = tokenizer(text, return_tensors='pt', max_length=max_length, padding='max_length', truncation=True)
        input_ids = encoding['input_ids'].to(device)
        attention_mask = encoding['attention_mask'].to(device)

        # 使用模型进行预测
        outputs = model(input_ids=input_ids, attention_mask=attention_mask)
        
        # 使用 sigmoid 函数将输出转换为概率
        probabilities = torch.sigmoid(outputs)
        
        # 判断概率是否超过阈值
        predicted_labels = (probabilities > threshold).int().tolist()[0]
        result = [list_classes[i] for i in range(len(list_classes)) if predicted_labels[i] == 1]

    return result

# test_sentence = "nice package, but bad price"
test_sentence = "It is really healthy, but bad price"

list_classes = ['packing/outter appearance/shipping','health/ingredient','pricing/volume/quality','Cooking/how to use','Taste/texture']
# predicted_labels = list_classes[predict_multilabel(test_sentence, model, tokenizer, device)]
predicted_labels = predict_multilabel(test_sentence, model, tokenizer, device)

print(f"The predicted labels for the sentence are: {predicted_labels}")


The predicted labels for the sentence are: ['pricing/volume/quality']


In [40]:
def predict_sentiment(text, model, tokenizer, device, max_length=128):
    model.eval()
    encoding = tokenizer(text, return_tensors='pt', max_length=max_length, padding='max_length', truncation=True)
    input_ids = encoding['input_ids'].to(device)
    attention_mask = encoding['attention_mask'].to(device)

    with torch.no_grad():
        outputs = model(input_ids=input_ids, attention_mask=attention_mask)
        _, preds = torch.max(outputs, dim=1)

    return preds.item()


test_sentence = "It is really healthy"
# test_sentence = "It is good in my breakfast"

list_classes = ['packing/outter appearance/shipping','health/ingredient','pricing/volume/quality','Cooking/how to use','Taste/texture']
predicted_class = list_classes[predict_sentiment(test_sentence, model, tokenizer, device)]
print(f"The predicted class for the sentence is: {predicted_class}")

The predicted class for the sentence is: health/ingredient
