In [1]:
from transformers import AutoTokenizer, AutoModelForTokenClassification, pipeline
from tqdm import tqdm
import re
import csv

# Load the DeBERTa model and tokenizer.
# NOTE(review): "microsoft/deberta-base" is the plain pretrained checkpoint. The
# load warning captured for this cell reports classifier weights that were not
# initialised from the checkpoint, and `model.config.id2label` prints
# {0: 'LABEL_0', 1: 'LABEL_1'}, so the pipeline built here has an untrained
# token-classification head — TODO confirm which fine-tuned checkpoint is intended.
model_name = "microsoft/deberta-base"  # naming the checkpoint is enough; transformers downloads it automatically
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForTokenClassification.from_pretrained(model_name)

# Initialise the NER pipeline from the model and tokenizer loaded above.
nlp = pipeline("ner", model=model, tokenizer=tokenizer)

# Read the cleaned TXT file.
def read_text_file(txt_file_path):
    """Return the full contents of the UTF-8 encoded text file at ``txt_file_path``."""
    with open(txt_file_path, mode='r', encoding='utf-8') as handle:
        return handle.read()

def split_text(text, max_len=512):
    """Split ``text`` into sentence-aligned chunks for the NER pipeline.

    Sentences are detected with a regular expression (whitespace that follows
    ``.`` or ``?``, skipping a few abbreviation-like patterns) and greedily
    packed into chunks.

    Args:
        text: The document to split.
        max_len: Chunk size budget, measured in *characters* (``len``), not in
            model tokens.

    Returns:
        A list of non-empty, whitespace-stripped chunks. Empty or
        whitespace-only input yields ``[]``. A single sentence that is longer
        than ``max_len`` is kept intact as its own chunk.
    """
    sentences = re.split(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?)\s', text)
    chunks = []
    current_chunk = ""

    def _flush(chunk):
        # Never emit empty chunks: they would be sent to the model as "" when
        # the very first sentence already exceeds the budget or the text is empty.
        cleaned = chunk.strip()
        if cleaned:
            chunks.append(cleaned)

    for sentence in sentences:
        if len(current_chunk) + len(sentence) < max_len:
            current_chunk += " " + sentence
        else:
            _flush(current_chunk)
            current_chunk = sentence
    _flush(current_chunk)
    return chunks

# Extract ESG data, showing a progress bar while the chunks are processed.
def extract_esg_data(text_chunks):
    """Run the NER pipeline over each chunk and group entities into records.

    Every entity returned by ``nlp`` is read through its ``"entity"`` (label)
    and ``"word"`` keys. A record is emitted as soon as an indicator, a value
    and a unit have all been seen; a later entity with the same label
    overwrites the earlier one. Incomplete records are discarded at the end of
    each chunk.

    Args:
        text_chunks: Iterable of text chunks to analyse.

    Returns:
        A list of dicts with the keys ``"indicator"``, ``"value"`` and ``"unit"``.
    """
    # Placeholder label names: the original notes say to replace them with the
    # indicator / value / unit labels of the actual DeBERTa model.
    field_for_label = {
        "LABEL_INDICATOR": "indicator",
        "LABEL_VALUE": "value",
        "LABEL_UNIT": "unit",
    }
    records = []
    for chunk in tqdm(text_chunks, desc="Extracting ESG Data"):
        pending = {}
        for entity in nlp(chunk):
            label, word = entity["entity"], entity["word"]
            field = field_for_label.get(label)
            if field is not None:
                pending[field] = word
            # Only the three mapped fields are ever stored, so a full dict
            # means the record is complete.
            if len(pending) == len(field_for_label):
                records.append(pending)
                pending = {}
    return records

# Save the extracted data to CSV.
def save_to_csv(esg_data, output_csv_path):
    """Write ``esg_data`` to ``output_csv_path`` as UTF-8 CSV and print a confirmation.

    Args:
        esg_data: Iterable of dicts keyed by ``"indicator"``, ``"value"`` and ``"unit"``.
        output_csv_path: Destination file; it is overwritten if it exists.
    """
    columns = ["indicator", "value", "unit"]
    with open(output_csv_path, 'w', newline='', encoding='utf-8') as handle:
        writer = csv.DictWriter(handle, fieldnames=columns)
        writer.writeheader()
        for row in esg_data:
            writer.writerow(row)
    print(f"Data saved to {output_csv_path}")

# Example usage: input report text and output CSV location (paths are relative
# to the notebook's working directory).
txt_file_path = '../txt/AML.txt'
output_csv_path = '../output_metric/esg_data.csv'

# Read the text and split it into chunks.
text = read_text_file(txt_file_path)
# Alternative without chunking (pass the whole text at once): text_chunks = [text]
text_chunks = split_text(text)

# Extract the ESG data (a progress bar is shown while chunks are processed).
esg_data = extract_esg_data(text_chunks)

# Save the result to CSV.
save_to_csv(esg_data, output_csv_path)


Some weights of the model checkpoint at microsoft/deberta-base were not used when initializing DebertaForTokenClassification: ['deberta.embeddings.position_embeddings.weight', 'lm_predictions.lm_head.bias', 'lm_predictions.lm_head.LayerNorm.bias', 'lm_predictions.lm_head.dense.bias', 'lm_predictions.lm_head.LayerNorm.weight', 'lm_predictions.lm_head.dense.weight']
- This IS expected if you are initializing DebertaForTokenClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing DebertaForTokenClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of DebertaForTokenClassification were not initialized from the model checkpoint at microsoft/deberta-base and are newly initial

Data saved to ../output_metric/esg_data.csv





In [2]:
# Show the id -> label mapping stored in the loaded model's config.
print(model.config.id2label)



{0: 'LABEL_0', 1: 'LABEL_1'}


In [5]:
import json
# Convert the grouped records to BIO format.
def _bio_tag_field(raw, entity):
    """Return ``(tokens, labels)`` for one field; both lists are empty if the field is blank."""
    if raw is None:
        return [], []
    tokens = str(raw).split()
    if not tokens:
        # Whitespace-only or empty text: emit nothing so labels never outnumber tokens.
        return [], []
    labels = [f"B-{entity}"] + [f"I-{entity}"] * (len(tokens) - 1)
    return tokens, labels


def convert_to_bio(data):
    """Turn metric/value/unit records into BIO-tagged token sequences.

    For every record the ``"metric"``, ``"value"`` and ``"unit"`` fields are
    whitespace-tokenised in that order. The first token of a field is tagged
    ``B-<ENTITY>`` and the remaining ones ``I-<ENTITY>``, where the entity
    names are ``INDICATOR``, ``VALUE`` and ``UNIT``.

    A field is skipped only when it is missing, ``None`` or blank. Numeric
    values such as ``0`` are kept (a plain truthiness test would drop them).

    Args:
        data: Iterable of dicts with (optional) ``"metric"``, ``"value"`` and
            ``"unit"`` entries.

    Returns:
        A list with one entry per input record; each entry is a list of
        ``(token, label)`` tuples (possibly empty).
    """
    fields = (("metric", "INDICATOR"), ("value", "VALUE"), ("unit", "UNIT"))
    bio_data = []
    for item in data:
        sentence_tokens = []
        sentence_labels = []
        for key, entity in fields:
            tokens, labels = _bio_tag_field(item.get(key), entity)
            sentence_tokens.extend(tokens)
            sentence_labels.extend(labels)
        bio_data.append(list(zip(sentence_tokens, sentence_labels)))
    return bio_data

# Read the JSON file.
# NOTE(review): neither open() call in this cell passes an encoding, so the
# platform default is used, whereas read_text_file/save_to_csv use UTF-8
# explicitly — confirm whether UTF-8 should be used here as well.
with open("../json/grouped_data_full1.json", "r") as file:
    data = json.load(file)

# Convert to BIO format.
bio_data = convert_to_bio(data)

# Save as a BIO-formatted text file: one "token label" pair per line.
with open("../output/bio_data.txt", "w") as file:
    for sentence in bio_data:
        for word, label in sentence:
            file.write(f"{word} {label}\n")
        file.write("\n")  # sentences are separated by a blank line


In [None]:
from datasets import load_dataset, ClassLabel, Sequence

# Load the BIO-formatted dataset.
def load_bio_dataset(file_path):
    """Load a BIO file into a Hugging Face dataset with integer ``ner_tags``.

    The file is expected to hold one ``token label`` pair per line, with blank
    lines separating sentences.

    Args:
        file_path: Path (or ``data_files`` specification) of the BIO text file.

    Returns:
        ``(dataset, label_list)``: the dataset has a ``"tokens"`` column and a
        ``"ner_tags"`` column typed as ``Sequence(ClassLabel)``; ``label_list``
        is the sorted list of label names, whose index is the class id.

    Raises:
        ValueError: If a non-blank line does not consist of exactly two fields.
    """
    # sample_by="paragraph" makes every blank-line-separated sentence one
    # example. The loader's default yields one example per *line*, which would
    # turn each token into its own single-token "sentence".
    dataset = load_dataset("text", data_files=file_path, sample_by="paragraph")

    # Parse the BIO lines of one sentence into parallel token / label lists.
    def tokenize_and_align_labels(example):
        tokens = []
        labels = []
        for line in example["text"].split("\n"):
            if not line.strip():
                continue
            parts = line.split()
            if len(parts) != 2:
                raise ValueError(f"Malformed BIO line (expected 'token label'): {line!r}")
            word, label = parts
            tokens.append(word)
            labels.append(label)
        return {"tokens": tokens, "ner_tags": labels}

    # Apply the parser, then drop examples that contained no tokens at all.
    dataset = dataset.map(tokenize_and_align_labels, remove_columns=["text"])
    dataset = dataset.filter(lambda example: len(example["tokens"]) > 0)

    # Collect every label that occurs and assign ids in sorted order.
    label_list = sorted({label for tags in dataset["train"]["ner_tags"] for label in tags})
    label_to_id = {label: i for i, label in enumerate(label_list)}

    def label_to_int(example):
        example["ner_tags"] = [label_to_id[label] for label in example["ner_tags"]]
        return example

    dataset = dataset.map(label_to_int)

    # Declare the tag column as a sequence of class labels.
    features = dataset["train"].features.copy()
    features["ner_tags"] = Sequence(feature=ClassLabel(names=label_list))
    dataset = dataset.cast(features)

    return dataset, label_list

# Load and process the dataset.
# The BIO file is written to "../output/bio_data.txt" by the conversion cell;
# a bare "bio_data.txt" would be looked up in the notebook directory instead.
file_path = "../output/bio_data.txt"
dataset, label_list = load_bio_dataset(file_path)


In [18]:
import json
import torch
from transformers import DebertaTokenizer, DebertaForTokenClassification, Trainer, TrainingArguments
from torch.utils.data import Dataset

# 1. Load and parse the JSON, BIO, and raw text files
def load_json_data(filepath):
    """Parse the JSON document stored at ``filepath`` and return the resulting object."""
    with open(filepath, 'r') as handle:
        return json.load(handle)

def _bio_tag_to_id(tag):
    """Collapse a BIO tag to its class id: ``O`` -> 0, ``B``/``B-*`` -> 1, ``I``/``I-*`` -> 2."""
    if tag == 'O':
        return 0
    # Decide on the prefix only: a substring test such as `'B' in tag` would
    # misclassify tags like "I-BUDGET".
    prefix = tag.split('-', 1)[0]
    if prefix == 'B':
        return 1
    if prefix == 'I':
        return 2
    raise ValueError(f"Unrecognised BIO tag: {tag!r}")


def load_bio_data(filepath):
    """Read a BIO file into parallel lists of sentences and label-id sequences.

    The file holds one ``token tag`` pair per line; blank lines separate
    sentences. Entity types are collapsed, so only three class ids are
    produced (0 for ``O``, 1 for ``B-*``, 2 for ``I-*``).

    Args:
        filepath: Path of the BIO text file.

    Returns:
        ``(texts, labels)`` where ``texts[i]`` is the sentence with its tokens
        joined by single spaces and ``labels[i]`` holds one id per token.

    Raises:
        ValueError: If a non-blank line does not have exactly two fields or
            carries an unrecognised tag. Silently skipping the label would
            leave ``labels[i]`` shorter than the sentence.
    """
    texts, labels = [], []
    with open(filepath, 'r') as file:
        text, label = [], []
        for line_no, line in enumerate(file, start=1):
            stripped = line.strip()
            if not stripped:
                # A blank line closes the current sentence (if there is one).
                if text:
                    texts.append(" ".join(text))
                    labels.append(label)
                    text, label = [], []
                continue

            parts = stripped.split()
            if len(parts) != 2:
                raise ValueError(
                    f"{filepath}:{line_no}: expected 'token tag', got {stripped!r}"
                )
            word, tag = parts
            text.append(word)
            label.append(_bio_tag_to_id(tag))

        if text:  # the file may end without a trailing blank line
            texts.append(" ".join(text))
            labels.append(label)
    return texts, labels


def load_raw_text(filepath):
    """Return the non-blank lines of ``filepath`` with surrounding whitespace removed."""
    kept = []
    with open(filepath, 'r') as handle:
        for raw_line in handle:
            cleaned = raw_line.strip()
            if cleaned:
                kept.append(cleaned)
    return kept



In [None]:
# File paths (relative to the notebook's working directory).
json_filepath = '../json/grouped_data_full1.json'
bio_filepath = '../output/bio_data.txt'
raw_text_filepath = '../output/1030_split.txt'

# Load data: the parsed JSON object, the BIO sentences with their label ids,
# and the non-blank lines of the raw text file.
json_data = load_json_data(json_filepath)
bio_texts, bio_labels = load_bio_data(bio_filepath)
raw_texts = load_raw_text(raw_text_filepath)

In [None]:
# 2. Initialize tokenizer and model.
# This rebinds the notebook-level `tokenizer` and `model` names.
# num_labels=3 matches the class ids 0/1/2 that load_bio_data produces
# (O / B-* / I-*).
tokenizer = DebertaTokenizer.from_pretrained("microsoft/deberta-base")
model = DebertaForTokenClassification.from_pretrained("microsoft/deberta-base", num_labels=3)

# 3. Custom Dataset Class for labeled and unlabeled data
class ESGDataset(Dataset):
    """Torch dataset that tokenises sentences and optionally aligns word-level labels.

    Each item is a dict of 1-D tensors produced by the notebook-level
    ``tokenizer`` (padded / truncated to ``max_length``). When ``labels`` is
    given, a ``"labels"`` tensor of the same length is added:

    * the first sub-word token of every word carries that word's label id;
    * special tokens, padding and continuation sub-words carry
      ``IGNORE_INDEX`` so that they do not contribute to the loss.

    Args:
        texts: Sentences whose words are separated by whitespace.
        labels: Optional list with one sequence of label ids per sentence,
            holding exactly one id per whitespace-separated word.
        max_length: Length every encoded sequence is padded / truncated to.
    """

    # Label id skipped by the token-classification loss (PyTorch's default ignore_index).
    IGNORE_INDEX = -100

    def __init__(self, texts, labels=None, max_length=128):
        self.texts = texts
        self.labels = labels
        self.max_length = max_length

    def __len__(self):
        return len(self.texts)

    def _token_labels(self, idx, special_mask):
        """Expand the word-level labels of sentence ``idx`` to one label per encoded position."""
        words = self.texts[idx].split()
        word_labels = self.labels[idx]
        if len(words) != len(word_labels):
            raise ValueError(
                f"Sentence {idx} has {len(words)} words but {len(word_labels)} labels"
            )

        # One entry per sub-word token, in sentence order.
        # NOTE(review): assumes that tokenising each word on its own (with a
        # leading space for non-initial words) yields the same pieces as
        # tokenising the whole sentence — confirm for the tokenizer in use.
        piece_labels = []
        for position, (word, label) in enumerate(zip(words, word_labels)):
            pieces = tokenizer.tokenize(word if position == 0 else " " + word)
            if not pieces:
                continue
            piece_labels.append(label)
            piece_labels.extend([self.IGNORE_INDEX] * (len(pieces) - 1))

        # Walk the encoded positions: special / padding slots are ignored, every
        # other slot consumes the next sub-word label. Labels left over after
        # truncation are simply never consumed.
        remaining = iter(piece_labels)
        return [
            self.IGNORE_INDEX if is_special else next(remaining, self.IGNORE_INDEX)
            for is_special in special_mask
        ]

    def __getitem__(self, idx):
        has_labels = self.labels is not None and len(self.labels) > 0
        encoding = tokenizer(
            self.texts[idx],
            truncation=True,
            padding="max_length",
            max_length=self.max_length,
            return_special_tokens_mask=has_labels,
            return_tensors="pt",
        )
        # Drop the batch dimension added by return_tensors="pt".
        item = {key: val.squeeze(0) for key, val in encoding.items()}
        if has_labels:
            # The mask is only needed for alignment and must not reach the model.
            special_mask = item.pop("special_tokens_mask").tolist()
            item["labels"] = torch.tensor(
                self._token_labels(idx, special_mask), dtype=torch.long
            )
        return item

# Prepare datasets.
# NOTE(review): `unlabeled_dataset` is built here but not passed to the Trainer
# below; only `train_dataset` is used for training in this cell.
train_dataset = ESGDataset(bio_texts, bio_labels)  # BIO labeled data for supervised training
unlabeled_dataset = ESGDataset(raw_texts)  # Unlabeled data for unsupervised pretraining

# 4. Training parameters
training_args = TrainingArguments(
    output_dir="./results",
    # Evaluation is switched off (no eval dataset is given to the Trainer);
    # "epoch" is the commented-out alternative.
    evaluation_strategy="no",#"epoch",
    learning_rate=2e-5,
    per_device_train_batch_size=8,
    num_train_epochs=3,
    weight_decay=0.01,
)

# 5. Trainer for supervised training with labeled data
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
)

# 6. Fine-tuning
trainer.train()

loading file vocab.json from cache at C:\Users\ariaH/.cache\huggingface\hub\models--microsoft--deberta-base\snapshots\0d1b43ccf21b5acd9f4e5f7b077fa698f05cf195\vocab.json
loading file merges.txt from cache at C:\Users\ariaH/.cache\huggingface\hub\models--microsoft--deberta-base\snapshots\0d1b43ccf21b5acd9f4e5f7b077fa698f05cf195\merges.txt
loading file added_tokens.json from cache at None
loading file special_tokens_map.json from cache at None
loading file tokenizer_config.json from cache at C:\Users\ariaH/.cache\huggingface\hub\models--microsoft--deberta-base\snapshots\0d1b43ccf21b5acd9f4e5f7b077fa698f05cf195\tokenizer_config.json
loading configuration file config.json from cache at C:\Users\ariaH/.cache\huggingface\hub\models--microsoft--deberta-base\snapshots\0d1b43ccf21b5acd9f4e5f7b077fa698f05cf195\config.json
Model config DebertaConfig {
  "_name_or_path": "microsoft/deberta-base",
  "attention_probs_dropout_prob": 0.1,
  "hidden_act": "gelu",
  "hidden_dropout_prob": 0.1,
  "hidden

  0%|          | 0/9 [00:00<?, ?it/s]

In [10]:
# After training, save the model and the tokenizer into the same directory.
trainer.save_model("../model/fine_tuned_deberta")
tokenizer.save_pretrained("../model/fine_tuned_deberta")


Saving model checkpoint to ../model/fine_tuned_deberta
Configuration saved in ../model/fine_tuned_deberta\config.json
Model weights saved in ../model/fine_tuned_deberta\pytorch_model.bin
tokenizer config file saved in ../model/fine_tuned_deberta\tokenizer_config.json
Special tokens file saved in ../model/fine_tuned_deberta\special_tokens_map.json


('../model/fine_tuned_deberta\\tokenizer_config.json',
 '../model/fine_tuned_deberta\\special_tokens_map.json',
 '../model/fine_tuned_deberta\\vocab.json',
 '../model/fine_tuned_deberta\\merges.txt',
 '../model/fine_tuned_deberta\\added_tokens.json')

In [None]:
import json
import torch
from transformers import DebertaTokenizer, DebertaForTokenClassification, Trainer, TrainingArguments
from torch.utils.data import Dataset

# 1. Load and parse the JSON, BIO, and raw text files
def load_json_data(filepath):
    """Open ``filepath`` for reading and return the object decoded from its JSON content."""
    with open(filepath, 'r') as json_file:
        parsed = json.load(json_file)
        return parsed

def _bio_tag_to_id(tag):
    """Collapse a BIO tag to its class id: ``O`` -> 0, ``B``/``B-*`` -> 1, ``I``/``I-*`` -> 2."""
    if tag == 'O':
        return 0
    # Decide on the prefix only: a substring test such as `'B' in tag` would
    # misclassify tags like "I-BUDGET".
    prefix = tag.split('-', 1)[0]
    if prefix == 'B':
        return 1
    if prefix == 'I':
        return 2
    raise ValueError(f"Unrecognised BIO tag: {tag!r}")


def load_bio_data(filepath):
    """Read a BIO file into parallel lists of sentences and label-id sequences.

    The file holds one ``token tag`` pair per line; blank lines separate
    sentences. Entity types are collapsed, so only three class ids are
    produced (0 for ``O``, 1 for ``B-*``, 2 for ``I-*``).

    Args:
        filepath: Path of the BIO text file.

    Returns:
        ``(texts, labels)`` where ``texts[i]`` is the sentence with its tokens
        joined by single spaces and ``labels[i]`` holds one id per token.

    Raises:
        ValueError: If a non-blank line does not have exactly two fields or
            carries an unrecognised tag (instead of quietly treating it as an
            ``I-*`` tag).
    """
    texts, labels = [], []
    with open(filepath, 'r') as file:
        text, label = [], []
        for line_no, line in enumerate(file, start=1):
            stripped = line.strip()
            if not stripped:
                # A blank line closes the current sentence (if there is one).
                if text:
                    texts.append(" ".join(text))
                    labels.append(label)
                    text, label = [], []
                continue
            parts = stripped.split()
            if len(parts) != 2:
                raise ValueError(
                    f"{filepath}:{line_no}: expected 'token tag', got {stripped!r}"
                )
            word, tag = parts
            text.append(word)
            label.append(_bio_tag_to_id(tag))  # Convert B-I-O to 0-1-2
        if text:  # the file may end without a trailing blank line
            texts.append(" ".join(text))
            labels.append(label)
    return texts, labels

def load_raw_text(filepath):
    """Read ``filepath`` and return its lines stripped of whitespace, omitting blank ones."""
    with open(filepath, 'r') as handle:
        stripped_lines = (raw_line.strip() for raw_line in handle)
        return list(filter(None, stripped_lines))

# File paths.
# NOTE(review): these are absolute '/mnt/data/...' paths, whereas the earlier
# cell that loads the same three files uses paths relative to the notebook
# ('../json/...', '../output/...') — confirm which location is intended.
json_filepath = '/mnt/data/grouped_data_full1.json'
bio_filepath = '/mnt/data/bio_data.txt'
raw_text_filepath = '/mnt/data/1030_split.txt'

# Load data: the parsed JSON object, the BIO sentences with their label ids,
# and the non-blank lines of the raw text file.
json_data = load_json_data(json_filepath)
bio_texts, bio_labels = load_bio_data(bio_filepath)
raw_texts = load_raw_text(raw_text_filepath)

# 2. Initialize tokenizer and model.
# num_labels=3 matches the class ids 0/1/2 that load_bio_data produces.
tokenizer = DebertaTokenizer.from_pretrained("microsoft/deberta-base")
model = DebertaForTokenClassification.from_pretrained("microsoft/deberta-base", num_labels=3)

# 3. Custom Dataset Class for labeled and unlabeled data
class ESGDataset(Dataset):
    """Torch dataset that tokenises sentences and optionally aligns word-level labels.

    Each item is a dict of 1-D tensors produced by the notebook-level
    ``tokenizer`` (padded / truncated to ``max_length``). When ``labels`` is
    given, a ``"labels"`` tensor of the same length is added:

    * the first sub-word token of every word carries that word's label id;
    * special tokens, padding and continuation sub-words carry
      ``IGNORE_INDEX`` so that they do not contribute to the loss.

    Args:
        texts: Sentences whose words are separated by whitespace.
        labels: Optional list with one sequence of label ids per sentence,
            holding exactly one id per whitespace-separated word.
        max_length: Length every encoded sequence is padded / truncated to.
    """

    # Label id skipped by the token-classification loss (PyTorch's default ignore_index).
    IGNORE_INDEX = -100

    def __init__(self, texts, labels=None, max_length=128):
        self.texts = texts
        self.labels = labels
        self.max_length = max_length

    def __len__(self):
        return len(self.texts)

    def _token_labels(self, idx, special_mask):
        """Expand the word-level labels of sentence ``idx`` to one label per encoded position."""
        words = self.texts[idx].split()
        word_labels = self.labels[idx]
        if len(words) != len(word_labels):
            raise ValueError(
                f"Sentence {idx} has {len(words)} words but {len(word_labels)} labels"
            )

        # One entry per sub-word token, in sentence order.
        # NOTE(review): assumes that tokenising each word on its own (with a
        # leading space for non-initial words) yields the same pieces as
        # tokenising the whole sentence — confirm for the tokenizer in use.
        piece_labels = []
        for position, (word, label) in enumerate(zip(words, word_labels)):
            pieces = tokenizer.tokenize(word if position == 0 else " " + word)
            if not pieces:
                continue
            piece_labels.append(label)
            piece_labels.extend([self.IGNORE_INDEX] * (len(pieces) - 1))

        # Walk the encoded positions: special / padding slots are ignored, every
        # other slot consumes the next sub-word label. Labels left over after
        # truncation are simply never consumed.
        remaining = iter(piece_labels)
        return [
            self.IGNORE_INDEX if is_special else next(remaining, self.IGNORE_INDEX)
            for is_special in special_mask
        ]

    def __getitem__(self, idx):
        has_labels = self.labels is not None and len(self.labels) > 0
        encoding = tokenizer(
            self.texts[idx],
            truncation=True,
            padding="max_length",
            max_length=self.max_length,
            return_special_tokens_mask=has_labels,
            return_tensors="pt",
        )
        # Drop the batch dimension added by return_tensors="pt".
        item = {key: val.squeeze(0) for key, val in encoding.items()}
        if has_labels:
            # The mask is only needed for alignment and must not reach the model.
            special_mask = item.pop("special_tokens_mask").tolist()
            item["labels"] = torch.tensor(
                self._token_labels(idx, special_mask), dtype=torch.long
            )
        return item

# Prepare datasets.
# NOTE(review): `unlabeled_dataset` is built here but not passed to the Trainer
# below; only `train_dataset` is used for training in this cell.
train_dataset = ESGDataset(bio_texts, bio_labels)  # BIO labeled data for supervised training
unlabeled_dataset = ESGDataset(raw_texts)  # Unlabeled data for unsupervised pretraining

# 4. Training parameters (evaluation is switched off; no eval dataset is given).
training_args = TrainingArguments(
    output_dir="./results",
    evaluation_strategy="no",
    learning_rate=2e-5,
    per_device_train_batch_size=8,
    num_train_epochs=3,
    weight_decay=0.01,
)

# 5. Trainer for supervised training with labeled data
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
)

# 6. Fine-tuning
trainer.train()


In [None]:
# Sanity check: reload the BIO file and show the first two sentences together
# with their label-id sequences.
bio_texts, bio_labels = load_bio_data(bio_filepath)
print(bio_texts[:2])
print(bio_labels[:2])
