In [None]:
from transformers import DistilBertTokenizer, DistilBertModel

import torch
import numpy as np
import matplotlib.pyplot as plt
import os
# from google.colab import drive
from torch import nn
from torch.optim import Adam
from tqdm import tqdm

from sklearn.metrics import confusion_matrix
from sklearn.metrics import ConfusionMatrixDisplay
import pandas as pd
from scikitplot.metrics import confusion_matrix
from sklearn.metrics import accuracy_score, recall_score, f1_score

* Model

In [None]:
class SequenceClassifier(nn.Module):
    """Pretrained DistilBERT encoder followed by one linear classification layer.

    The linear layer is applied to the encoder's last hidden states flattened
    into a single vector per example, so its input width is
    ``hidden_size * max_seq_len``.
    """

    def __init__(self, hidden_size: int, num_classes: int, max_seq_len: int):
        """
        Args:
            hidden_size: width of one token vector; used to size the linear layer.
            num_classes: number of outputs of the linear layer.
            max_seq_len: sequence length the linear layer is sized for.
        """
        super().__init__()
        self.Model = DistilBertModel.from_pretrained('distilbert-base-uncased')
        flattened_width = hidden_size * max_seq_len
        self.fc1 = nn.Linear(flattened_width, num_classes)

    def forward(self, input_id, mask):
        """
        Args:
            input_id: encoded input ids, forwarded to the encoder as ``input_ids``.
            mask: forwarded to the encoder as ``attention_mask``.

        Returns:
            The linear layer's output for the flattened last hidden states.
        """
        encoded = self.Model(input_ids=input_id, attention_mask=mask)
        token_states = encoded.last_hidden_state
        # Only the batch size is needed; the remaining axes are flattened together.
        n_examples, _, _ = token_states.shape
        flattened = token_states.view(n_examples, -1)
        return self.fc1(flattened)

* Data Set

In [None]:
# Class name -> integer class id used as the training target.
labels = {
    'suicide': 0,
    'depression': 1,
    'anxiety': 2,
    'edanonymous': 3,
    'socialanxiety': 4,
    'alcoholism': 5,
    'healthanxiety': 6,
    'addiction': 7,
}

# Shared tokenizer used by the Dataset class below; pads on the right with '[PAD]'.
tokenizer = DistilBertTokenizer.from_pretrained('distilbert-base-uncased')
tokenizer.padding_side = "right"
tokenizer.add_special_tokens(dict(pad_token='[PAD]'))
class Dataset(torch.utils.data.Dataset):
    """Map-style dataset pairing each tokenized text with its integer label.

    Reads the ``'label'`` and ``'text'`` columns of ``df``. Labels are mapped
    through the module-level ``labels`` dict, and every text is tokenized up
    front with the module-level ``tokenizer`` (padded/truncated to 512 tokens,
    returned as PyTorch tensors).
    """

    def __init__(self, df):
        self.labels = [labels[name] for name in df['label']]
        self.texts = [
            tokenizer(
                raw_text,
                padding='max_length',
                max_length=512,
                truncation=True,
                return_tensors="pt",
            )
            for raw_text in df['text']
        ]

    def classes(self):
        """Return the list of integer labels."""
        return self.labels

    def __len__(self):
        """Number of examples (one per label)."""
        return len(self.labels)

    def get_batch_labels(self, idx):
        """Return the label(s) selected by ``idx`` wrapped in a NumPy array."""
        selected = self.labels[idx]
        return np.array(selected)

    def get_batch_texts(self, idx):
        """Return the tokenizer output(s) selected by ``idx``."""
        return self.texts[idx]

    def __getitem__(self, idx):
        """Return ``(tokenized_text, label)`` for ``idx``."""
        return self.get_batch_texts(idx), self.get_batch_labels(idx)

* Train function

In [None]:
def _prepare_batch(batch_inputs, batch_labels, device):
    """Move one collated batch to ``device`` and return ``(input_ids, mask, targets)``.

    Both ``input_ids`` and ``attention_mask`` get ``squeeze(1)`` so they have
    the same shape; the squeeze drops a size-1 axis at dimension 1 and leaves
    the tensor unchanged when that dimension is not of size 1.
    """
    input_ids = batch_inputs['input_ids'].squeeze(1).to(device)
    mask = batch_inputs['attention_mask'].squeeze(1).to(device)
    # CrossEntropyLoss expects integer (long) class indices as targets.
    targets = batch_labels.long().to(device)
    return input_ids, mask, targets


def _evaluate(model, dataloader, criterion, device):
    """Run ``model`` over ``dataloader`` in eval mode without gradients.

    Returns:
        ``(loss_sum, n_correct, n_samples, true_labels, pred_labels)`` where
        ``loss_sum`` is each batch's mean loss weighted by its batch size, and
        the two lists hold the integer targets and arg-max predictions.
    """
    loss_sum, n_correct, n_samples = 0.0, 0, 0
    true_labels, pred_labels = [], []
    model.eval()
    with torch.no_grad():
        for batch_inputs, batch_labels in dataloader:
            input_ids, mask, targets = _prepare_batch(batch_inputs, batch_labels, device)
            logits = model(input_ids, mask)
            preds = logits.argmax(dim=1)
            batch_n = targets.size(0)
            loss_sum += criterion(logits, targets).item() * batch_n
            n_correct += (preds == targets).sum().item()
            n_samples += batch_n
            true_labels += targets.cpu().flatten().tolist()
            pred_labels += preds.cpu().flatten().tolist()
    return loss_sum, n_correct, n_samples, true_labels, pred_labels


def train(model,df_train,df_val,df_test,learning_rate, epochs,Batch_Size):
    """Fine-tune ``model`` and report validation/test metrics after every epoch.

    Each epoch runs one optimisation pass over the training data, saves
    ``model.state_dict()`` to ``./strength data/model_epoch_{epoch}.pt``,
    prints train/validation loss and accuracy, prints test accuracy, macro
    recall and macro F1, and saves a row-normalised confusion matrix to
    ``strength data/matrix_EPOCHS{epoch}.png`` (file names use the 0-based
    epoch index).

    Reported losses and accuracies are averaged over the number of samples
    actually processed, so they are correct for any ``Batch_Size`` and for a
    smaller final batch.

    Args:
        model: module called as ``model(input_ids, attention_mask)`` that returns
            one row of class scores per example.
        df_train, df_val, df_test: data frames handed to ``Dataset``.
        learning_rate: learning rate for the Adam optimizer.
        epochs: number of passes over the training data.
        Batch_Size: batch size for all three data loaders.

    Returns:
        None. Results are printed and written to disk.
    """
    print("begin function")
    print("Downloading Dataset")
    # Distinct local names: the original rebound `train` inside `train`.
    train_set = Dataset(df_train)
    val_set = Dataset(df_val)
    test_set = Dataset(df_test)
    train_dataloader = torch.utils.data.DataLoader(train_set, batch_size=Batch_Size, shuffle=True)
    # Evaluation metrics do not depend on sample order, so no shuffling is needed.
    val_dataloader = torch.utils.data.DataLoader(val_set, batch_size=Batch_Size, shuffle=False)
    test_dataloader = torch.utils.data.DataLoader(test_set, batch_size=Batch_Size, shuffle=False)
    print("completed Dataset")
    use_cuda = torch.cuda.is_available()
    device = torch.device("cuda" if use_cuda else "cpu")

    criterion = nn.CrossEntropyLoss()
    optimizer = Adam(model.parameters(), lr=learning_rate)
    print("cuda")

    model = model.to(device)
    criterion = criterion.to(device)
    # Checkpoints and figures are written here; make sure the directory exists.
    os.makedirs("./strength data", exist_ok=True)
    print("begin loop")
    for epoch_num in range(epochs):
        # Enable training behaviour (e.g. dropout) explicitly. The Transformers
        # docs state that `from_pretrained` returns models in eval mode, and
        # `_evaluate` switches to eval mode at the end of every epoch.
        model.train()
        train_loss_sum, train_correct, train_seen = 0.0, 0, 0
        print("epoch:", epoch_num)
        for batch_inputs, batch_labels in tqdm(train_dataloader):
            input_ids, mask, targets = _prepare_batch(batch_inputs, batch_labels, device)

            # Clear gradients left over from the previous step.
            optimizer.zero_grad()
            logits = model(input_ids, mask)
            batch_loss = criterion(logits, targets)
            batch_loss.backward()
            optimizer.step()

            batch_n = targets.size(0)
            # The criterion returns the batch mean; weight it by the batch size
            # so the epoch figure is a true per-sample average.
            train_loss_sum += batch_loss.item() * batch_n
            train_correct += (logits.argmax(dim=1) == targets).sum().item()
            train_seen += batch_n
        torch.save(model.state_dict(), "./strength data/model_epoch_{}.pt".format(epoch_num))

        # Validation
        val_loss_sum, val_correct, val_seen, _, _ = _evaluate(
            model, val_dataloader, criterion, device)

        # Guard against empty loaders so the report never divides by zero.
        train_denominator = max(train_seen, 1)
        val_denominator = max(val_seen, 1)
        print(
            f"Epochs: {epoch_num + 1} | Train Loss: {train_loss_sum / train_denominator: .3f} "
            f"            | Train Accuracy: {train_correct / train_denominator: .3f} "
            f"            | Val Loss: {val_loss_sum / val_denominator: .3f} "
            f"            | Val Accuracy: {val_correct / val_denominator: .3f}")

        # Test
        _, _, _, true_labels, pred_labels = _evaluate(
            model, test_dataloader, criterion, device)
        accuracy = accuracy_score(true_labels, pred_labels)
        print(f"Accuracy: {accuracy:.3f}")

        # Recall; 'macro' is the unweighted mean over classes.
        recall = recall_score(true_labels, pred_labels, average='macro')
        print(f"Recall: {recall:.3f}")

        # F1 score
        f1 = f1_score(true_labels, pred_labels, average='macro')
        print(f"F1 Score: {f1:.3f}")

        # Plot the confusion matrix, normalised over the true labels (rows).
        fig, ax = plt.subplots(figsize=(8, 8))
        cm = confusion_matrix(y_true=true_labels, y_pred=pred_labels,
                              labels=list(range(len(labels))), normalize='true')
        disp = ConfusionMatrixDisplay(confusion_matrix=cm,
                                      display_labels=list(labels.keys()))
        disp.plot(ax=ax)
        # Rotate the x-axis tick labels so they are easier to read.
        plt.xticks(rotation=45)
        # NOTE(review): an 8x8 inch figure at dpi=1080 renders 8640x8640 pixels;
        # kept as in the original, lower it if files are too large.
        plt.savefig("strength data/matrix_EPOCHS{}.png".format(epoch_num), dpi=1080)

* Load data

In [None]:
# Read the pre-split train / validation / test CSV files
# (pass nrows=100 to read_csv for a quick smoke run).
df_train, df_val, df_test = (
    pd.read_csv(csv_path)
    for csv_path in (
        './strength data/train.csv',
        './strength data/val.csv',
        './strength data/test.csv',
    )
)
print(df_train.shape, df_val.shape, df_test.shape)
# train  = Dataset(df_train ) 

* Hyperparameters

In [None]:
# Select the GPU only when one is actually available; the original computed
# `use_cuda` but then hard-coded the device to "cuda".
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")

# num_classes matches the 8 entries of `labels`; max_seq_len matches the
# max_length=512 used when tokenizing in `Dataset`.
# NOTE(review): hidden_size must equal the encoder's hidden width — 768 is
# assumed for 'distilbert-base-uncased'; confirm against the model config.
model = SequenceClassifier(hidden_size=768, num_classes=8, max_seq_len=512)

# Alias for the classifier's encoder sub-module.
bert = model.Model
LR = 1e-5
EPOCHS = 7
Batch_Size = 16


* Train

In [None]:
# Launch fine-tuning with the hyperparameters defined above.
train(
    model=model,
    df_train=df_train,
    df_val=df_val,
    df_test=df_test,
    learning_rate=LR,
    epochs=EPOCHS,
    Batch_Size=Batch_Size,
)

In [None]:
# Sanity check: run one padded sentence through a bare DistilBERT encoder and
# print the shape of its last hidden state.
tokenizer = DistilBertTokenizer.from_pretrained('distilbert-base-uncased')
# Use a dedicated name here: the original rebound the global `model`, replacing
# the SequenceClassifier instance with a bare encoder, which breaks any later
# re-run of the training cell.
demo_encoder = DistilBertModel.from_pretrained('distilbert-base-uncased')
tokenizer.padding_side = "right"
tokenizer.add_special_tokens({'pad_token': '[PAD]'})
text = "Replace me by any text you'd like."
encoded_input = tokenizer(text, padding='max_length', max_length=512,
                          truncation=True, return_tensors='pt')
# Inference only, so skip building the autograd graph.
with torch.no_grad():
    output = demo_encoder(**encoded_input)

last_hidden_states = output.last_hidden_state
print(last_hidden_states.shape)



def gpt2_generation(seed_text, max_length=50):
    """Continue ``seed_text`` with GPT-2 and return the decoded text.

    The original loaded ``DistilBertModel``, an encoder without a language
    modelling head, so its ``generate`` call could not produce text. This
    version loads the ``'gpt2'`` checkpoint with its LM head instead.

    Args:
        seed_text: prompt string to continue.
        max_length: forwarded to ``generate`` as ``max_length``
            (per the Transformers docs this bounds prompt plus new tokens).

    Returns:
        The first generated sequence decoded to a string, special tokens skipped.
    """
    # Local import: the notebook's import cell only brings in the DistilBERT classes.
    from transformers import GPT2LMHeadModel, GPT2Tokenizer

    tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
    model = GPT2LMHeadModel.from_pretrained('gpt2')
    inputs = tokenizer.encode(seed_text, return_tensors='pt')
    outputs = model.generate(
        inputs,
        max_length=max_length,
        num_return_sequences=1,
        # GPT-2 defines no pad token; reuse EOS so generate has an explicit pad id.
        pad_token_id=tokenizer.eos_token_id,
    )
    generated = tokenizer.decode(outputs[0], skip_special_tokens=True)
    return generated

# Demo: generate a continuation of one seed sentence and print it.
seed_text = "This is a sample sentence for text data augmentation."
augmented_sentence = gpt2_generation(seed_text=seed_text)
print(augmented_sentence)
