In [None]:

import pandas as pd
from sklearn.model_selection import train_test_split


# Full labelled corpus; every row carries its class in the `标签` column.
df = pd.read_excel('./data/LLM生成数据-标签精简.xlsx')


label_column = '标签'

# Fixed seed for both split stages so that re-running this cell regenerates
# exactly the same train/validation/test files.
split_seed = 3407


train_sets = []
validation_sets = []
test_sets = []

# Split each label group separately so every class keeps the same proportions.
for label, group in df.groupby(label_column):

    # Stage 1: hold out 1/10 of the group as the test split.
    train_valid, test = train_test_split(group, test_size=1/10, random_state=split_seed)
    # Stage 2: 2/9 of the remaining 9/10 is 2/10 of the group, giving a
    # 7:2:1 train/validation/test ratio overall.
    train, validation = train_test_split(train_valid, test_size=2/9, random_state=split_seed)

    train_sets.append(train)
    validation_sets.append(validation)
    test_sets.append(test)


# Stitch the per-label pieces back together (rows stay grouped by label).
train_df = pd.concat(train_sets)
dev_df = pd.concat(validation_sets)
test_df = pd.concat(test_sets)


train_df.to_excel('./data/train_set.xlsx', index=False)
dev_df.to_excel('./data/validation_set.xlsx', index=False)
test_df.to_excel('./data/test_set.xlsx', index=False)


In [None]:

import pandas as pd

# Reload the three split files, keeping only the columns at positions 0 and 2
# (presumably the text and label columns used by later cells -- confirm
# against the spreadsheet layout).
split_files = (
    './data/train_set.xlsx',
    './data/validation_set.xlsx',
    './data/test_set.xlsx',
)
train_df, dev_df, test_df = (
    pd.read_excel(split_file, usecols=[0, 2]) for split_file in split_files
)
# Same test file again, but with the sheet addressed explicitly by name.
turth_df = pd.read_excel('./data/test_set.xlsx', sheet_name='Sheet1', usecols=[0, 2])


In [None]:
# import matplotlib.pyplot as plt

# plt.rcParams['font.sans-serif'] = ['SimHei']

# plt.rcParams['axes.unicode_minus'] = False


# length_counts = train_df['缺陷描述'].apply(len).value_counts().sort_index()


# plt.hist(length_counts.index, bins=len(length_counts), weights=length_counts.values)
# plt.xlabel('文本长度')
# plt.ylabel('频数')
# plt.title('字符串长度分布直方图')
# plt.show()


In [None]:

from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer

# Directory handed to `from_pretrained` for the pretrained Chinese BERT files.
BERT_PATH = '/home/BERT-test/model/bert-base-chinese'
# Number of output classes produced by the classification head.
CLASS_NUM = 26
# Every text is padded or truncated to exactly this many tokens.
MAX_LENTH = 50
class MyDataset(Dataset):
    """Map-style dataset pairing pre-tokenised texts with their labels.

    All texts in the `缺陷描述` column are tokenised once at construction
    time; the matching values of the `标签` column are kept as labels.
    """

    def __init__(self, df):
        """Tokenise every row of ``df`` up front.

        Args:
            df: DataFrame with a `缺陷描述` (text) and a `标签` (label) column.
        """
        bert_tokenizer = BertTokenizer.from_pretrained(BERT_PATH)
        # Each text is encoded on its own with return_tensors="pt"; the
        # training loop later squeezes axis 1 of the batched tensors.
        encode_options = dict(padding='max_length',
                              max_length=MAX_LENTH,
                              truncation=True,
                              return_tensors="pt")
        self.texts = []
        for sentence in df['缺陷描述']:
            self.texts.append(bert_tokenizer(sentence, **encode_options))

        self.labels = list(df['标签'])

    def __getitem__(self, idx):
        """Return the ``(encoding, label)`` pair stored at position ``idx``."""
        encoding = self.texts[idx]
        target = self.labels[idx]
        return encoding, target

    def __len__(self):
        """Number of samples, taken from the label list."""
        return len(self.labels)

# Tokenise each split once, in the same order as before: train, validation,
# test, and the second copy of the test frame (`turth_df`, spelled as it is
# defined in the loading cell).
train_dataset, dev_dataset, test_dataset, truth_dataset = (
    MyDataset(frame) for frame in (train_df, dev_df, test_df, turth_df)
)

In [None]:


from torch import nn
from transformers import BertModel

class BertClassifier(nn.Module):
    """BERT encoder followed by dropout and a linear classification head.

    ``forward`` returns raw, unnormalised class scores (logits). No activation
    is applied to them: the training cell feeds them to ``nn.CrossEntropyLoss``,
    which expects unnormalised logits. The previous ReLU on the output clamped
    every negative score to 0, which blocks gradients for those classes and
    makes ``argmax`` fall back to class 0 whenever all scores are <= 0.
    Dropping it does not change the state dict (ReLU has no parameters), so
    existing checkpoints still load.
    """

    def __init__(self):
        super().__init__()
        self.bert = BertModel.from_pretrained(BERT_PATH)
        self.dropout = nn.Dropout(0.5)
        # Size the head from the loaded encoder instead of hard-coding 768, so
        # a checkpoint with a different hidden width still works.
        self.linear = nn.Linear(self.bert.config.hidden_size, CLASS_NUM)

    def forward(self, input_id, mask):
        """Compute class logits for a batch.

        Args:
            input_id: token id tensor passed to BERT as ``input_ids``.
            mask: attention mask tensor passed to BERT as ``attention_mask``.

        Returns:
            Tensor of logits with one column per class (``CLASS_NUM`` columns).
        """
        # With return_dict=False BERT returns a tuple; the second element is
        # the pooled sentence representation used for classification.
        _, pooled_output = self.bert(input_ids=input_id, attention_mask=mask, return_dict=False)
        dropout_output = self.dropout(pooled_output)
        return self.linear(dropout_output)


In [None]:


import torch
from torch.optim import Adam
from tqdm.notebook import tqdm
import numpy as np
import random
import os


# Reproducibility and checkpoint settings.
random_seed = 3407
save_path = './checkpoints'

# Optimisation hyper-parameters.
epoch = 20
batch_size = 64
lr = 1e-5

# Use the first CUDA device when one is available, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")


def setup_seed(seed):
    """Seed the Python, NumPy and PyTorch (CPU and all CUDA devices) RNGs.

    Also sets ``torch.backends.cudnn.deterministic`` to True.
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True


setup_seed(random_seed)

def save_model(save_name):
    """Write the current model's state dict to ``save_path/save_name``.

    Reads the module-level ``model`` and ``save_path``. The target directory
    is created when it does not exist yet.

    Args:
        save_name: file name of the checkpoint inside ``save_path``.
    """
    # exist_ok=True replaces the separate exists() check, so there is no gap
    # between checking for the directory and creating it.
    os.makedirs(save_path, exist_ok=True)
    torch.save(model.state_dict(), os.path.join(save_path, save_name))



# Build the classifier and the loss directly on the target device, then let
# Adam optimise every model parameter with the configured learning rate.
model = BertClassifier().to(device)
criterion = nn.CrossEntropyLoss().to(device)
optimizer = Adam(model.parameters(), lr=lr)


# Only the training batches are shuffled; validation keeps dataset order.
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
dev_loader = DataLoader(dataset=dev_dataset, batch_size=batch_size)



# Train for `epoch` passes, validating after each one. The weights with the
# best validation accuracy are written to 'best.pt'; the final weights are
# written to 'last.pt' once training ends.
best_dev_acc = 0
for epoch_num in range(epoch):
    # Enter training mode at the start of every epoch (validation below
    # switches to eval mode), so dropout is explicitly active on the first
    # pass as well.
    model.train()
    total_acc_train = 0
    total_loss_train = 0
    for inputs, labels in tqdm(train_loader):
        # Each sample was tokenised on its own, so batched tensors carry a
        # singleton axis at position 1; squeeze it to get (batch, seq_len).
        input_ids = inputs['input_ids'].squeeze(1).to(device)
        masks = inputs['attention_mask'].squeeze(1).to(device)
        labels = labels.to(device)
        output = model(input_ids, masks)

        batch_loss = criterion(output, labels)
        batch_loss.backward()
        optimizer.step()
        optimizer.zero_grad()
        acc = (output.argmax(dim=1) == labels).sum().item()
        total_acc_train += acc
        # `criterion` (CrossEntropyLoss with its default reduction) yields the
        # batch mean. Weight it by the batch size so that dividing by the
        # dataset size below gives a true per-sample average.
        total_loss_train += batch_loss.item() * labels.size(0)


    model.eval()
    total_acc_val = 0
    total_loss_val = 0

    with torch.no_grad():

        for inputs, labels in dev_loader:
            input_ids = inputs['input_ids'].squeeze(1).to(device)
            masks = inputs['attention_mask'].squeeze(1).to(device)
            labels = labels.to(device)
            output = model(input_ids, masks)

            batch_loss = criterion(output, labels)
            acc = (output.argmax(dim=1) == labels).sum().item()
            total_acc_val += acc
            # Same per-sample weighting as in the training pass.
            total_loss_val += batch_loss.item() * labels.size(0)

        print(f'''Epochs: {epoch_num + 1}
          | Train Loss: {total_loss_train / len(train_dataset): .3f}
          | Train Accuracy: {total_acc_train / len(train_dataset): .3f}
          | Val Loss: {total_loss_val / len(dev_dataset): .3f}
          | Val Accuracy: {total_acc_val / len(dev_dataset): .3f}''')


        # Keep a checkpoint of the best validation accuracy seen so far.
        if total_acc_val / len(dev_dataset) > best_dev_acc:
            best_dev_acc = total_acc_val / len(dev_dataset)
            save_model('best.pt')


save_model('last.pt')



In [None]:



# Rebuild the network and restore the best checkpoint written during training.
model = BertClassifier()
# map_location remaps the stored tensors onto the current device, so a
# checkpoint saved on a GPU can still be loaded on a CPU-only machine.
model.load_state_dict(torch.load(os.path.join(save_path, 'best.pt'), map_location=device))
model = model.to(device)
model.eval()

def evaluate(model, dataset, batch_size=128):
    """Measure and print the classification accuracy of ``model`` on ``dataset``.

    Tensors are moved to the module-level ``device`` before the forward pass.

    Args:
        model: module called as ``model(input_ids, attention_mask)`` that
            returns one score per class for every sample.
        dataset: dataset whose items are ``(encoding, label)`` pairs, where
            ``encoding`` provides 'input_ids' and 'attention_mask' tensors.
        batch_size: number of samples per evaluation batch (default 128).

    Returns:
        float: fraction of samples whose arg-max prediction equals the label.
    """
    model.eval()
    test_loader = DataLoader(dataset, batch_size=batch_size)
    total_acc_test = 0
    with torch.no_grad():
        for test_input, test_label in test_loader:
            # Drop the singleton axis at position 1 left by batching.
            input_id = test_input['input_ids'].squeeze(1).to(device)
            mask = test_input['attention_mask'].squeeze(1).to(device)
            test_label = test_label.to(device)
            output = model(input_id, mask)
            acc = (output.argmax(dim=1) == test_label).sum().item()
            total_acc_test += acc
    accuracy = total_acc_test / len(dataset)
    print(f'Test Accuracy: {accuracy: .3f}')
    # Returned as well as printed so callers can log or compare the value.
    return accuracy









# Print the accuracy of the model restored from 'best.pt' on the test split.
evaluate(model, test_dataset)
