In [73]:
!pip install transformers



In [74]:
from transformers import AutoModelForTokenClassification,AutoTokenizer,pipeline
# Hugging Face checkpoint shared by the model and its tokenizer.
CLUENER_CHECKPOINT = 'uer/roberta-base-finetuned-cluener2020-chinese'

# Token-classification model and matching tokenizer (inputs capped at 512 tokens).
model = AutoModelForTokenClassification.from_pretrained(CLUENER_CHECKPOINT)
tokenizer = AutoTokenizer.from_pretrained(
    CLUENER_CHECKPOINT,
    model_max_length=512,
)

# `ner(text)` is the callable used by the tagging cells below.
ner = pipeline(
    'ner',
    model=model,
    tokenizer=tokenizer,
)

In [75]:
def ner_by_cluener(title, tag_type="position", ner_pipeline=None):
  """Extract the entities of one tag type from `title` as a ';'-joined string.

  Args:
    title: Text to tag.
    tag_type: Entity type to collect; tokens labelled `B-<tag_type>` or
      `I-<tag_type>` are assembled into entities. Defaults to "position".
    ner_pipeline: Callable taking the text and returning a list of dicts with
      at least the keys 'entity' and 'word' (and optionally 'index').
      Defaults to the notebook-level `ner` pipeline.

  Returns:
    The distinct entities, in order of first appearance, joined with ';'
    (an empty string when nothing was found).
  """
  if ner_pipeline is None:
    ner_pipeline = ner  # pipeline built in an earlier cell

  begin_tag = f"B-{tag_type}"
  inside_tag = f"I-{tag_type}"

  name_list = []
  single_name_str = ""
  last_index = None
  for char_dic in ner_pipeline(title):
    label = char_dic['entity']
    index = char_dic.get('index')
    # The result may skip tokens (non-consecutive 'index' values). A jump in
    # 'index' means the previous entity has ended, so the characters must not
    # be glued together. When 'index' is absent, no gap detection is done.
    has_gap = (
      last_index is not None
      and index is not None
      and index != last_index + 1
    )
    last_index = index

    if label == inside_tag and single_name_str != "" and not has_gap:
      # Continue the entity currently being built.
      single_name_str += char_dic['word']
      continue

    # Any other situation closes the entity in progress (if there is one).
    if single_name_str != "":
      name_list.append(single_name_str)
      single_name_str = ""

    # A B- tag, or an I- tag with nothing to attach to, starts a new entity.
    if label == begin_tag or label == inside_tag:
      single_name_str = char_dic['word']

  # The last captured entity
  if single_name_str != "":
    name_list.append(single_name_str)

  # De-duplicate while keeping first-seen order so the output is deterministic
  # (a plain set() would shuffle the order between runs).
  name_list = list(dict.fromkeys(name_list))
  print(name_list)
  return ";".join(name_list)

In [76]:
import csv
import os

# Tag every line of input.txt and write the results to temporary CSV chunks.
CHUNK_SIZE = 2000  # rows buffered in memory before they are flushed to disk
temp_output_dir = "temp_output"

# Create the temp directory BEFORE the loop: chunk files are written while
# iterating, so creating it afterwards fails on a fresh run with large inputs.
os.makedirs(temp_output_dir, exist_ok=True)


def _write_chunk(rows, row_count):
  """Write `rows` to <temp_output_dir>/output_<row_count>.csv as UTF-8 CSV."""
  chunk_path = f"{temp_output_dir}/output_{row_count}.csv"
  with open(chunk_path, "w", encoding='utf-8', newline='') as f_writer:
    csv.writer(f_writer).writerows(rows)


output = []
count = 0
print("Start tagging...")
# Read all lines up front so the input file is closed before the slow tagging.
with open("input.txt", 'r', encoding='utf-8') as f_reader:
  input_data = [i.rstrip('\n') for i in f_reader]
count_max = len(input_data)

for title in input_data:
  # Every CHUNK_SIZE titles, report progress and flush the buffered rows.
  if count % CHUNK_SIZE == 0 and count != 0:
    print("{:.2f}% finished...".format(count/count_max*100))
    _write_chunk(output, count)
    output = []
  # Holds the ';'-joined entities returned by ner_by_cluener for this title.
  personname_str = ner_by_cluener(title)
  output.append([title, personname_str])
  count += 1

# Flush whatever is left after the last full chunk.
if output:
  _write_chunk(output, count)

Start tagging...
[{'entity': 'I-name', 'score': 0.45452666, 'index': 56, 'word': '聰', 'start': 55, 'end': 56}, {'entity': 'B-name', 'score': 0.678319, 'index': 74, 'word': '梁', 'start': 73, 'end': 74}, {'entity': 'B-name', 'score': 0.8970667, 'index': 75, 'word': '王', 'start': 74, 'end': 75}, {'entity': 'I-name', 'score': 0.97592294, 'index': 76, 'word': '濬', 'start': 75, 'end': 76}, {'entity': 'B-position', 'score': 0.75350523, 'index': 78, 'word': '皇', 'start': 77, 'end': 78}, {'entity': 'I-position', 'score': 0.67683053, 'index': 79, 'word': '太', 'start': 78, 'end': 79}, {'entity': 'I-position', 'score': 0.62755257, 'index': 80, 'word': '子', 'start': 79, 'end': 80}, {'entity': 'B-name', 'score': 0.9844667, 'index': 104, 'word': '王', 'start': 103, 'end': 104}, {'entity': 'I-name', 'score': 0.9463046, 'index': 105, 'word': '無', 'start': 104, 'end': 105}, {'entity': 'I-name', 'score': 0.9760482, 'index': 106, 'word': '忌', 'start': 105, 'end': 106}, {'entity': 'B-name', 'score': 0.90634

In [77]:
import csv
from glob import glob
import os  # local import: this cell only imports csv and glob itself


def _chunk_order(path):
    """Sort key that orders temp files by the number in output_<count>.csv."""
    stem = os.path.splitext(os.path.basename(path))[0]
    suffix = stem.rsplit("_", 1)[-1]
    # Files that do not follow the output_<count> pattern sort last, by name.
    if suffix.isdecimal():
        return (0, int(suffix), path)
    return (1, 0, path)


print("Combining output...")
count = 0
output = []
# glob() returns files in arbitrary order; sort numerically so the combined
# rows follow the order in which the chunks were written.
temp_files = sorted(glob(f"{temp_output_dir}/*.csv"), key=_chunk_order)
with open("output.csv", "w", encoding='utf-8', newline='') as f_writer:
    csv_writer = csv.writer(f_writer)
    for filename in temp_files:
        # The chunks were written as UTF-8, so read them back as UTF-8
        # instead of relying on the platform default encoding.
        with open(filename, encoding='utf-8', newline='') as f:
            csv_writer.writerows(csv.reader(f))

print("Removing temp data...")
# Delete exactly the files that were merged (portable, and a no-op when there
# are none, unlike `rm` on an unmatched wildcard).
for filename in temp_files:
    os.remove(filename)

print("Done!")

Combining output...
Removing temp data...
Done!
