# 情感分析

### 1. 导包

In [1]:
from random import random
import os
import random
from torch import nn
from torch.utils.data import Dataset, DataLoader
from transformers import AdamW, get_linear_schedule_with_warmup
from sklearn.metrics import classification_report
from tqdm.auto import tqdm
import numpy as np
import torch
import re
from transformers import AutoConfig, AutoTokenizer, BertForSequenceClassification
from torch.utils.tensorboard import SummaryWriter
import time
import sklearn.metrics as metrics

### 2. 设置超参数

In [2]:
# Global training configuration shared by the cells below.
max_length = 512                                                     # maximum token length handed to the tokenizer
batch_size = 20                                                      # samples per DataLoader batch
learning_rate = 1e-5                                                 # optimizer learning rate
epoch_num = 5                                                        # number of training epochs
# NOTE(review): this is a set literal, so its iteration order is not
# guaranteed; the ChnSentiCorp dataset class keeps its own ordered list of
# the same theme names. Use an ordered sequence if order ever matters here.
themes = {"动力", "价格", "内饰", "配置", "安全性", "外观", "操控", "油耗", "空间", "舒适性"}

device = 'cuda' if torch.cuda.is_available() else 'cpu'              # prefer CUDA when available, else CPU
print(f'using device {device}')

checkpoint = "bert-base-chinese"                                     # name of the pretrained checkpoint
tokenizer = AutoTokenizer.from_pretrained(checkpoint)                # tokenizer matching the checkpoint

using device cuda


### 3. 设置随机种子

In [3]:
def seed_everything(seed=1029):
    """Seed every random number generator used by this notebook.

    Seeds Python's ``random`` module, exports ``PYTHONHASHSEED``, seeds
    NumPy and PyTorch (CPU, the current CUDA device and all CUDA devices),
    and turns on the cuDNN ``deterministic`` flag.

    Args:
        seed: Integer seed applied to every generator.
    """
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)

    # The remaining generators all share the same "call with the seed" shape.
    remaining_seeders = (
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in remaining_seeders:
        apply_seed(seed)

    torch.backends.cudnn.deterministic = True


seed_everything(12)  # seed used for this run

### 4. 定义Dataset

In [4]:
class ChnSentiCorp(Dataset):
    """Dataset of review comments annotated with themes and a sentiment label.

    Every non-blank line of the data file contains free text plus zero or
    more ``theme#polarity`` annotations (a run of non-whitespace characters,
    ``#``, then an optionally negative integer, e.g. ``价格#-1``).

    Each sample is a dict with:
        * ``"comment"``: the line with all annotations and spaces removed.
        * ``"themes"``: multi-hot list aligned with ``self.themes``.
        * ``"sentiment"``: ``0`` if the summed polarities are negative,
          ``1`` if they sum to zero (or there are none), ``2`` if positive.
    """

    # Matches one ``theme#polarity`` annotation; compiled once for all files.
    _ANNOTATION_PATTERN = re.compile(r'(\S+?)#(-?\d+)')

    def __init__(self, data_file):
        """Parse ``data_file`` eagerly and keep all samples in memory.

        Args:
            data_file: Path of a UTF-8 text file with one sample per line.
        """
        self.themes = ["动力", "价格", "内饰", "配置", "安全性", "外观", "操控", "油耗", "空间", "舒适性"]
        self.data = self.load_data(data_file)

    def load_data(self, data_file):
        """Read ``data_file`` and build the index -> sample mapping.

        Blank or whitespace-only lines are skipped so that, for example, a
        trailing newline does not turn into an empty "neutral" sample.
        Indices stay consecutive (``0 .. len - 1``) regardless of skips.

        Args:
            data_file: Path of a UTF-8 text file with one sample per line.

        Returns:
            dict mapping consecutive integer indices to sample dicts.
        """
        pattern = self._ANNOTATION_PATTERN
        samples = {}

        with open(data_file, 'rt', encoding='utf-8') as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line:
                    continue

                annotations = pattern.findall(line)
                comment = pattern.sub("", line).strip()

                # Set membership keeps the multi-hot construction linear.
                mentioned = {theme for theme, _ in annotations}
                multi_hot_vector = [1 if theme in mentioned else 0 for theme in self.themes]

                # The sign of the summed polarities decides the class.
                total_sentiment = sum(int(polarity) for _, polarity in annotations)
                if total_sentiment > 0:
                    sentiment_label = 2
                elif total_sentiment < 0:
                    sentiment_label = 0
                else:
                    sentiment_label = 1

                # Key by the running count, not the file line number, so
                # skipped lines never leave gaps in the index range.
                samples[len(samples)] = {
                    "comment": comment.replace(" ", ""),
                    "themes": multi_hot_vector,
                    "sentiment": sentiment_label,
                }

        return samples

    def __len__(self):
        """Return the number of parsed samples."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the sample dict stored at integer index ``idx``."""
        return self.data[idx]

In [5]:
# Build both splits; each ChnSentiCorp parses its file inside __init__.
train_data = ChnSentiCorp('data/train.txt')
test_data = ChnSentiCorp('data/test.txt')

# Report the split sizes and show the first parsed sample as a sanity check.
print(f"length of training set: {len(train_data)}")
print(f"length of test set: {len(test_data)}")
print(train_data[0])

length of training set: 8000
length of test set: 2653
{'comment': '因为森林人即将换代，这套系统没必要装在一款即将换代的车型上，因为肯定会影响价格。', 'themes': [0, 1, 0, 0, 0, 0, 0, 0, 0, 0], 'sentiment': 1}


### 5. 定义DataLoader

In [6]:
def collate_fn(batch_samples):
    """Collate dataset samples into one tokenized, padded batch.

    Args:
        batch_samples: List of sample dicts; only the ``'comment'`` text and
            the integer ``'sentiment'`` label of each sample are used.

    Returns:
        dict with the tokenizer's ``'input_ids'`` and ``'attention_mask'``
        tensors and a ``torch.long`` ``'labels'`` tensor of sentiments.
    """
    # Read both fields in a single pass over the samples.
    pairs = [(sample['comment'], sample['sentiment']) for sample in batch_samples]
    comments = [comment for comment, _ in pairs]
    sentiments = [sentiment for _, sentiment in pairs]

    # Pad to the longest comment in the batch and truncate at max_length.
    encoded = tokenizer(
        comments,
        max_length=max_length,
        padding=True,
        truncation=True,
        return_tensors="pt",
        return_attention_mask=True,
    )

    return {
        'input_ids': encoded['input_ids'],
        'attention_mask': encoded['attention_mask'],
        'labels': torch.tensor(sentiments, dtype=torch.long),
    }

# Training batches are reshuffled on every pass; test batches keep file order.
# Both loaders tokenize lazily, per batch, through collate_fn.
train_dataloader = DataLoader(train_data, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
test_dataloader = DataLoader(test_data, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)

In [7]:
# Pull a single batch from the training loader to inspect the collated tensors.
next(iter(train_dataloader))

{'input_ids': tensor([[ 101, 1094, 6887,  ...,    0,    0,    0],
         [ 101, 4684, 2970,  ...,    0,    0,    0],
         [ 101, 2769, 4500,  ...,    0,    0,    0],
         ...,
         [ 101,  100, 4696,  ...,    0,    0,    0],
         [ 101, 1963, 3362,  ...,    0,    0,    0],
         [ 101, 2990, 6756,  ...,    0,    0,    0]]),
 'attention_mask': tensor([[1, 1, 1,  ..., 0, 0, 0],
         [1, 1, 1,  ..., 0, 0, 0],
         [1, 1, 1,  ..., 0, 0, 0],
         ...,
         [1, 1, 1,  ..., 0, 0, 0],
         [1, 1, 1,  ..., 0, 0, 0],
         [1, 1, 1,  ..., 0, 0, 0]]),
 'labels': tensor([1, 1, 1, 1, 1, 1, 1, 2, 1, 2, 1, 1, 1, 1, 2, 1, 1, 0, 1, 1])}

### 6. 配置模型

In [9]:
# Load the checkpoint's configuration. It is only displayed in a later cell
# and is not passed to the model constructed below.
config = AutoConfig.from_pretrained(checkpoint)
# Pretrained BERT with a 3-label classification head (one output per
# sentiment label 0/1/2), moved to the selected device.
model = BertForSequenceClassification.from_pretrained(checkpoint, num_labels=3).to(device)

Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-chinese and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [10]:
# Display the loaded checkpoint configuration.
config

BertConfig {
  "_name_or_path": "bert-base-chinese",
  "architectures": [
    "BertForMaskedLM"
  ],
  "attention_probs_dropout_prob": 0.1,
  "classifier_dropout": null,
  "directionality": "bidi",
  "hidden_act": "gelu",
  "hidden_dropout_prob": 0.1,
  "hidden_size": 768,
  "initializer_range": 0.02,
  "intermediate_size": 3072,
  "layer_norm_eps": 1e-12,
  "max_position_embeddings": 512,
  "model_type": "bert",
  "num_attention_heads": 12,
  "num_hidden_layers": 12,
  "pad_token_id": 0,
  "pooler_fc_size": 768,
  "pooler_num_attention_heads": 12,
  "pooler_num_fc_layers": 3,
  "pooler_size_per_head": 128,
  "pooler_type": "first_token_transform",
  "position_embedding_type": "absolute",
  "transformers_version": "4.49.0",
  "type_vocab_size": 2,
  "use_cache": true,
  "vocab_size": 21128
}

In [11]:
# Display the module structure of the classification model.
model

BertForSequenceClassification(
  (bert): BertModel(
    (embeddings): BertEmbeddings(
      (word_embeddings): Embedding(21128, 768, padding_idx=0)
      (position_embeddings): Embedding(512, 768)
      (token_type_embeddings): Embedding(2, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): BertEncoder(
      (layer): ModuleList(
        (0-11): 12 x BertLayer(
          (attention): BertAttention(
            (self): BertSdpaSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): BertSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (LayerNorm): LayerNorm((768,), eps=1e

### 7. 优化器

In [13]:
# AdamW over all model parameters at the configured learning rate.
# NOTE(review): AdamW is imported from transformers in this file; that class
# is believed to be deprecated in favour of torch.optim.AdamW (whose default
# hyperparameters differ) -- confirm before upgrading transformers.
optimizer = AdamW(model.parameters(), lr=learning_rate)
# Linear schedule with zero warmup steps, sized for epoch_num passes over the
# training loader (one scheduler step per batch).
lr_scheduler = get_linear_schedule_with_warmup(
    optimizer=optimizer,
    num_warmup_steps=0,
    num_training_steps=epoch_num * len(train_dataloader),
)

# TensorBoard writer; the log directory name carries the local start time
# formatted as month-day_hour.minute.
writer = SummaryWriter(log_dir='sentiment_classification_logs' + '/' + time.strftime('%m-%d_%H.%M', time.localtime()))



### 8. 定义train函数

In [12]:
# Running counters kept at module level.
# total_train_step / total_train_loss share their names with the values that
# train_loop accepts and returns -- presumably they are threaded through it
# across epochs; verify against the calling cell.
total_train_step = 0
total_train_loss = 0.
# Not used by any code visible here; presumably consumed by evaluation code
# elsewhere in the notebook -- TODO confirm.
best_f1_score = 0.
total_test_loss = 0

In [None]:
def train_loop(dataloader, model, optimizer, lr_scheduler, epoch, total_train_loss, total_train_step):
    """Train the sentiment classifier for one pass over ``dataloader``.

    Batches must be mappings in the format produced by ``collate_fn``
    (``input_ids``, ``attention_mask``, ``labels``). ``model`` is called with
    the batch as keyword arguments and must return a mapping-like output
    exposing ``"loss"`` and ``"logits"``, as ``BertForSequenceClassification``
    does when labels are supplied.

    Every 100 global steps the current batch loss and the accuracy over the
    samples seen so far in this pass are printed and written to the
    module-level TensorBoard ``writer``.

    Args:
        dataloader: Iterable of training batches that also supports ``len``.
        model: Classification model to optimize.
        optimizer: Optimizer stepped once per batch.
        lr_scheduler: Learning-rate scheduler stepped once per batch.
        epoch: Number of passes already completed. The average loss shown on
            the progress bar divides by ``epoch * len(dataloader) + step``,
            so it is only meaningful if this is 0 for the first pass and
            ``total_train_loss`` is carried over between passes.
        total_train_loss: Loss accumulated before this pass.
        total_train_step: Global step count before this pass.

    Returns:
        Tuple ``(total_train_loss, total_train_step)`` updated with this
        pass's batches.
    """
    progress_bar = tqdm(range(len(dataloader)), disable=False)
    progress_bar.set_description(f'loss: {0:>7f}')
    finish_step_num = epoch * len(dataloader)
    true_labels, predictions = [], []

    model.train()
    for step, batch_data in enumerate(dataloader, start=1):
        # Move every field to the target device (plain lists become tensors).
        batch_data = {
            k: torch.tensor(v).to(device) if isinstance(v, list) else v.to(device)
            for k, v in batch_data.items()
        }
        labels = batch_data["labels"]

        # With labels in the batch the model computes the loss itself.
        outputs = model(**batch_data)
        loss = outputs["loss"]
        logits = outputs["logits"]

        optimizer.zero_grad()
        loss.backward()

        optimizer.step()
        lr_scheduler.step()

        loss_value = loss.item()
        total_train_loss += loss_value

        # Single-label classification: the predicted class is the arg-max logit.
        pred = logits.argmax(dim=-1)
        true_labels += labels.tolist()
        predictions += pred.tolist()

        total_train_step += 1
        progress_bar.set_description(f'loss: {total_train_loss / (finish_step_num + step):>7f}')
        progress_bar.update(1)

        if total_train_step % 100 == 0:
            # Accuracy over everything seen so far in this pass.
            correct = sum(1 for truth, guess in zip(true_labels, predictions) if truth == guess)
            accuracy = correct / len(true_labels)
            print("训练次数:{}，loss:{}".format(total_train_step, loss_value))
            writer.add_scalar("train_loss", loss_value, total_train_step)
            writer.add_scalar("train_accuracy", accuracy, total_train_step)

    return total_train_loss, total_train_step