# Text Analysis Part2 - Named Entity Recognition (NER) 



In [1]:
from datasets import load_dataset
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from nltk.tag import CRFTagger
import re, unicodedata
import nltk
from nltk.stem import PorterStemmer
from nltk import pos_tag

# 1. Load the dataset
Load the BC5CDR dataset from the Hugging Face datasets library.
Use train and validation sets for training, and the test set for testing.

Map the labels to the following tags:
```
{
    0: "O",
    1: "B-Chemical",
    2: "B-Disease",
    3: "I-Disease",
    4: "I-Chemical"
}

In [2]:
dataset = load_dataset(
    "tner/bc5cdr", 
    cache_dir="./NLP_data_cache",
)

train_dataset= dataset["train"]
val_dataset = dataset["validation"]
test_dataset = dataset["test"]

In [19]:
# 定义标签的映射
ner_tag_mapping= {0: "O",1: "B-Chemical",2: "B-Disease",3: "I-Disease",4: "I-Chemical"}

train_set=[list(zip(s['tokens'], [ner_tag_mapping[tok] for tok in s['tags']])) for s in train_dataset][:-1]
val_set=[list(zip(s['tokens'], [ner_tag_mapping[tok] for tok in s['tags']])) for s in val_dataset][:-1]
test_set=[list(zip(s['tokens'], [ner_tag_mapping[tok] for tok in s['tags']])) for s in test_dataset][:-1]

train_set=train_set+val_set
test_tokens = [s['tokens'] for s in test_dataset][:-1]
test_tags = [[ner_tag_mapping[tok] for tok in s['tags']] for s in test_dataset][:-1]

# 2. Train a CRF NER tagger



## 2.1 Evaluation functions
使用Lab（text_labs_public）中的函数进行评估，主要为了提取命名实体的span，然后计算整体的F1值

Define some functions to evaluate the performance of the tagger on the test set. In NER, an entity may consist of a single word or multiple consecutive words. Therefore, an entity match is considered correct only if all its components are accurately identified and categorised correctly.

Use the funcion "extract_spans" to extract the spans of the named entities from the tagged sentences. "cal_span_level_f1" function calculates the F1 score for each class and the macro-average F1 score.
These 2 funcions are provieded by [text_labs_public](https://github.com/uob-TextAnalytics/text_labs_public/blob/main/2_sequence_tagging.ipynb)

In [5]:
def extract_spans(tagged_sents):
    spans = {}
        
    for sidx, sent in enumerate(tagged_sents):
        start = -1
        entity_type = None
        for i, (tok, lab) in enumerate(sent):
            if 'B-' in lab:
                start = i
                end = i + 1
                entity_type = lab[2:]
            elif 'I-' in lab:
                end = i + 1
            elif lab == 'O' and start >= 0:
                
                if entity_type not in spans:
                    spans[entity_type] = []
                
                spans[entity_type].append((start, end, sidx))
                start = -1      
        # Sometimes an I-token is the last token in the sentence, so we still have to add the span to the list
        if start >= 0:    
            if entity_type not in spans:
                spans[entity_type] = []
                
            spans[entity_type].append((start, end, sidx))
                
    return spans

In [6]:
def cal_span_level_f1(test_sents, test_sents_with_pred):
    # gold spans
    gold_spans = extract_spans(test_sents)

    # predicted spans
    pred_spans = extract_spans(test_sents_with_pred)
    
    # compute the metrics for each class:
    f1_per_class = []
    
    ne_types = gold_spans.keys()  # get the list of named entity types (not the tags)
    
    for ne_type in ne_types:
        # compute the confusion matrix
        true_pos = 0
        false_pos = 0
        
        for span in pred_spans[ne_type]:
            if span in gold_spans[ne_type]:
                true_pos += 1
            else:
                false_pos += 1
                
        false_neg = 0
        for span in gold_spans[ne_type]:
            if span not in pred_spans[ne_type]:
                false_neg += 1
                
        if true_pos + false_pos == 0:
            precision = 0
        else:
            precision = true_pos / float(true_pos + false_pos)
            
        if true_pos + false_neg == 0:
            recall = 0
        else:
            recall = true_pos / float(true_pos + false_neg)
        
        if precision + recall == 0:
            f1 = 0
        else:
            f1 = 2 * precision * recall / (precision + recall)
            
        f1_per_class.append(f1)
        print(f'F1 score for class {ne_type} = {f1}')
        
    print(f'Macro-average f1 score = {np.mean(f1_per_class)}')

## 2.2 Train a CRF NER tagger



In [4]:
# 1. Customized CRF Tagger
class CustomCRFTagger(CRFTagger):
    def _get_features(self, tokens, idx):

        token = tokens[idx]
        feature_list = [
            'bias',  # 偏置项
            'word.lower=' + token.lower(), # 小写形式
            'word[-3:]=' + token[-3:],  # 后缀
            'word[-2:]=' + token[-2:],  # 后缀
            'word.isupper=%s' % token.isupper(), # 是否全大写
            'word.istitle=%s' % token.istitle(), # 是否首字母大写
            'word.isdigit=%s' % token.isdigit(), # 是否是数字
            'word.ischemical=%s' % bool(re.match(r'.*(ate|ium|ide|ite)$', token.lower())),  # 化学品常见后缀
            'word.isdisease=%s' % bool(re.match(r'.*(itis|osis|oma|pathy)$', token.lower())),  # 疾病常见后缀
        ]
        
        # 添加上下文特征
        if idx > 0: # 如果不是句子的第一个单词
            prev_token = tokens[idx-1]
            feature_list.extend([
                '-1:word.lower=' + prev_token.lower(), # 前一个单词的小写形式
                '-1:word.istitle=%s' % prev_token.istitle(), # 前一个单词是否首字母大写
                '-1:word.isupper=%s' % prev_token.isupper(), # 前一个单词是否全大写
            ])
        else:
            feature_list.append("BOS")  # 句子开始
        
        if idx < len(tokens)-1: # 如果不是句子的最后一个单词
            next_token = tokens[idx+1]
            feature_list.extend([
                '+1:word.lower=' + next_token.lower(), # 后一个单词的小写形式
                '+1:word.istitle=%s' % next_token.istitle(), # 后一个单词是否首字母大写
                '+1:word.isupper=%s' % next_token.isupper(), # 后一个单词是否全大写
            ])
        else:
            feature_list.append("EOS")  # 句子结束
        
        return feature_list

In [14]:
# 2. POS Tagger
class CustomCRFTaggerWithPOS(CRFTagger):
    def _get_features(self, tokens, idx):
        token, pos = tokens[idx] # 这里的token=word, pos=POS标签
        feature_list = [
            'bias',
            'word.lower=' + token.lower(),
            'word[-3:]=' + token[-3:],
            'word[-2:]=' + token[-2:],
            'word.isupper=%s' % token.isupper(),
            'word.istitle=%s' % token.istitle(),
            'word.isdigit=%s' % token.isdigit(),
            'word.ischemical=%s' % bool(re.match(r'.*(ate|ium|ide|ite)$', token.lower())),
            'word.isdisease=%s' % bool(re.match(r'.*(itis|osis|oma|pathy)$', token.lower())),
            'pos=' + pos,  # 添加POS特征
        ]

        if idx > 0: # 如果不是句子的第一个单词
            prev_token, prev_pos = tokens[idx-1] # 获取前一个单词的大小写形式和POS标签
            feature_list.extend([
                '-1:word.lower=' + prev_token.lower(),
                '-1:word.istitle=%s' % prev_token.istitle(),
                '-1:word.isupper=%s' % prev_token.isupper(),
                '-1:pos=' + prev_pos,
            ])
        else:
            feature_list.append("BOS")
        
        if idx < len(tokens) - 1: # 如果不是句子的最后一个单词
            next_token, next_pos = tokens[idx+1] # 获取后一个单词的大小写形式和POS标签
            feature_list.extend([
                '+1:word.lower=' + next_token.lower(),
                '+1:word.istitle=%s' % next_token.istitle(),
                '+1:word.isupper=%s' % next_token.isupper(),
                '+1:pos=' + next_pos,
            ])
        else:
            feature_list.append("EOS")
        
        return feature_list

In [33]:
# 3. POS and Stemming Tagger
class CustomCRFTaggerWithPOSAndStemming(CustomCRFTaggerWithPOS):
    def __init__(self, *args, **kwargs):
        super(CustomCRFTaggerWithPOSAndStemming, self).__init__(*args, **kwargs)
        self.stemmer = PorterStemmer()

    def _get_features(self, tokens, idx):
        token, pos = tokens[idx]  # 这里的token=word, pos=POS标签
        stem = self.stemmer.stem(token)  # 提取词干

        feature_list = super(CustomCRFTaggerWithPOSAndStemming, self)._get_features(tokens, idx)

        # 添加词干特征
        feature_list.append('stem=' + stem)

        return feature_list

In [40]:
# Train a CRF NER tagger
def train_CRF_NER_tagger(train_set, tagger_name):
    tagger = tagger_name
    tagger.train(train_set, 'crf.tagger')
    return tagger  # return the trained model

In [13]:
def preprocess_data_for_pos(data):
    """将数据集转换为包含词性标注的格式，同时保留NER标签"""
    processed_data = []
    for sent in data:
        words, ner_tags = zip(*sent)  # 分离单词和NER标签
        pos_tags = [pos for word, pos in pos_tag(words)]  # 对单词进行词性标注
        
        processed_sent = [((word, pos), tag) for word, pos, tag in zip(words, pos_tags, ner_tags)]
        processed_data.append(processed_sent)
    return processed_data

train_set_pos = preprocess_data_for_pos(train_set)
test_set_pos = preprocess_data_for_pos(test_set)

train_set_pos[:2]

[[(('Naloxone', 'NN'), 'B-Chemical'),
  (('reverses', 'VBZ'), 'O'),
  (('the', 'DT'), 'O'),
  (('antihypertensive', 'JJ'), 'O'),
  (('effect', 'NN'), 'O'),
  (('of', 'IN'), 'O'),
  (('clonidine', 'NN'), 'B-Chemical'),
  (('.', '.'), 'O')],
 [(('In', 'IN'), 'O'),
  (('unanesthetized', 'JJ'), 'O'),
  ((',', ','), 'O'),
  (('spontaneously', 'RB'), 'O'),
  (('hypertensive', 'JJ'), 'B-Disease'),
  (('rats', 'NNS'), 'O'),
  (('the', 'DT'), 'O'),
  (('decrease', 'NN'), 'O'),
  (('in', 'IN'), 'O'),
  (('blood', 'NN'), 'O'),
  (('pressure', 'NN'), 'O'),
  (('and', 'CC'), 'O'),
  (('heart', 'NN'), 'O'),
  (('rate', 'NN'), 'O'),
  (('produced', 'VBN'), 'O'),
  (('by', 'IN'), 'O'),
  (('intravenous', 'JJ'), 'O'),
  (('clonidine', 'NN'), 'B-Chemical'),
  ((',', ','), 'O'),
  (('5', 'CD'), 'O'),
  (('to', 'TO'), 'O'),
  (('20', 'CD'), 'O'),
  (('micrograms', 'NNS'), 'O'),
  (('/', 'JJ'), 'O'),
  (('kg', 'NN'), 'O'),
  ((',', ','), 'O'),
  (('was', 'VBD'), 'O'),
  (('inhibited', 'VBN'), 'O'),
  (('or

In [21]:
# Original CRF Tagger
tagger = train_CRF_NER_tagger(train_set,CRFTagger())
predicted_tags = tagger.tag_sents(test_tokens)
cal_span_level_f1(test_set, predicted_tags)

F1 score for class Chemical = 0.8510595960600934
F1 score for class Disease = 0.7077707006369427
Macro-average f1 score = 0.7794151483485181


In [22]:
# Customized CRF Tagger
tagger = train_CRF_NER_tagger(train_set,CustomCRFTagger())
predicted_tags = tagger.tag_sents(test_tokens)
cal_span_level_f1(test_set, predicted_tags)

F1 score for class Chemical = 0.8648486664668865
F1 score for class Disease = 0.7600696171059175
Macro-average f1 score = 0.812459141786402


In [31]:
# Customized with POS CRF Tagger
test_tokens_pos = [[(word, pos) for (word, pos), tag in sent] for sent in test_set_pos]

tagger = train_CRF_NER_tagger(train_set_pos, CustomCRFTaggerWithPOS())
predicted_tags = tagger.tag_sents(test_tokens_pos)
cal_span_level_f1(test_set, predicted_tags)

F1 score for class Chemical = 0.8644135188866798
F1 score for class Disease = 0.7635913312693499
Macro-average f1 score = 0.8140024250780149


In [34]:
# Customized with POS and Stemming CRF Tagger
tagger = train_CRF_NER_tagger(train_set_pos, CustomCRFTaggerWithPOSAndStemming())
predicted_tags = tagger.tag_sents(test_tokens_pos)
cal_span_level_f1(test_set, predicted_tags)

F1 score for class Chemical = 0.8730316922463623
F1 score for class Disease = 0.784527707044986
Macro-average f1 score = 0.8287796996456742


1. 原版Tagger：0.8511，0.7078，0.7794
2. 自定义Tagger：0.8648，0.7601，0.8125
3. 自定义+pos：0.8744，0.7636，0.8140
4. 自定义+pos+stem：0.8730，0.7845，0.8288