# dimer pipeline

### Read filenames

In [1]:
import os
unid = 'u0496358'
path = "/home/"+str(unid)+"/BRAT/"+str(unid)+"/Project_pe_test" 
files = os.listdir(path)

len(files)

44

## define regular expression

In [2]:
import re
 
rule=r'(?P<name>(d-dimer|ddimer))(?P<n1>.{1,25}?)(?P<value>[0-9]{1,4}(\.[0-9]{0,3})?\s*)(?P<n2>[^\n\w\d]*)(?P<unit>(ug\/l|ng\/ml|mg\/l|nmol\/l)?)'
rule1=r'(elevated|pos|positive|increased|high|\+)(.{1,20})?(\n)?\s?(d-dimer|d\s?dimer)'  
rule2=r'(d-dimer|d\s?dimer)([^0-9-:]{1,15})?(positive|pos)'
neg_regex = '(\\bno\\b|denies)'


## d-dimer pipeline apply rules

In [3]:
def ddimer_val(rule='rule', rule1='rule1', rule2='rule2', file_txt='note'):

    # import libraries
    import re
    from pipeUtils import Annotation
    from pipeUtils import Document

    # initite Document obj
    file1a = ''
    doc = Document()
    doc.load_document_from_file(file_txt)      

    # change to lower case
    doc.text = doc.text.lower()
    
#######################################################################################    
# match name value unit in note e.g. d-dimer 123.456 ng/mL
    
    # for rule in rules: # different process, cannot repeat.
    # compile and match in note text    
    pattern=re.compile(rule)
    matches=pattern.finditer(doc.text)   

    ann_index=0
    for match in matches:
        ann_id = 'NLP_'+ str(ann_index)
        ann_index=ann_index+1

        # check value and unit, then nomalize value
        if match.group('value') != None:
            value = float(match.group('value')) # mg/L*1000, ug/L, ng/mL, nmol/L*186
        if match.group('unit')=='mg/l':
            value = value * 1000
        if match.group('unit')=='nmol/l':
            value = value * 186
        # compare the value
        if value < 500:
            label = 'low_ddimer'   
        else:
            label = 'high_ddimer'    

        # Add new annotation
        new_annotation = Annotation(start_index=int(match.start()), 
                                end_index=int(match.end()), 
                                type=label,
                                ann_id = ann_id
                                )
        new_annotation.spanned_text = doc.text[new_annotation.start_index:new_annotation.end_index]

        # Check negation right before the found target up to 35 charachers before, 
        # making sure that the pre-text does not cross the text boundary and is valid
        if new_annotation.start_index - 35 > 0:
            pre_text_start = new_annotation.start_index - 35
        else:
            pre_text_start = 0

        # ending index of the pre_text is the beginning of the found target    
        pre_text_end = new_annotation.start_index    

        # substring the document text to identify the pre_text string
        pre_text = doc.text[pre_text_start: pre_text_end]

        if value < 500:
            new_annotation.attributes["Negation"] ='Negated'
        doc.annotations.append(new_annotation)
        

#######################################################################################
# annotate Target 2: Modifier + Name

    # compile and match in note text    
    pattern1=re.compile(rule1)
    matches1=pattern1.finditer(doc.text)  # match positive/+ d-dimer in note

    for match1 in matches1:
        ann_id = 'NLP_'+ str(ann_index)
        ann_index=ann_index+1
        new_annotation = Annotation(start_index=int(match1.start()), 
                                    end_index=int(match1.end()), 
                                    type='high_ddimer',
                                    ann_id = ann_id
                                    )
        new_annotation.spanned_text = doc.text[new_annotation.start_index:new_annotation.end_index]

        # Check negation right before the found target up to 30 charachers before, 
        # making sure that the pre-text does not cross the text boundary and is valid

        if new_annotation.start_index - 30 > 0:
            pre_text_start = new_annotation.start_index - 30
        else:
            pre_text_start = 0

        # ending index of the pre_text is the beginning of the found target    
        pre_text_end = new_annotation.start_index    

        # substring the document text to identify the pre_text string
        pre_text = doc.text[pre_text_start: pre_text_end]

        # We do not need to know the exact location of the negation keyword, so re.search is acceptable
        if re.search(neg_regex, pre_text , re.IGNORECASE):
            new_annotation.attributes["Negation"] ='Negated'
        doc.annotations.append(new_annotation)
        
#######################################################################################
# match d-dimer + positive in note

    pattern2=re.compile(rule2)
    matches2=pattern2.finditer(doc.text)  

    for match2 in matches2:
        ann_id = 'NLP_'+ str(ann_index)
        ann_index=ann_index+1
        new_annotation = Annotation(start_index=int(match2.start()), 
                                    end_index=int(match2.end()), 
                                    type='high_ddimer',
                                    ann_id = ann_id
                                    )
        new_annotation.spanned_text = doc.text[new_annotation.start_index:new_annotation.end_index]

        # Check negation right before the found target up to 30 charachers before, 
        # making sure that the pre-text does not cross the text boundary and is valid

        if new_annotation.start_index - 30 > 0:
            pre_text_start = new_annotation.start_index - 30
        else:
            pre_text_start = 0

        # ending index of the pre_text is the beginning of the found target    
        pre_text_end = new_annotation.start_index    

        # substring the document text to identify the pre_text string
        pre_text = doc.text[pre_text_start: pre_text_end]

        # We do not need to know the exact location of the negation keyword, so re.search is acceptable
        if re.search(neg_regex, pre_text , re.IGNORECASE):
            new_annotation.attributes["Negation"] ='Negated'
        doc.annotations.append(new_annotation)
        
    return doc.annotations    

## Apply the pipeline

In [4]:
import chardet

doc_annotations=dict()

note_count = 0                       # count the number of text notes want to process ***
for i in files[:]:
    if ".txt" in i:
        doc_file = os.path.join(path,i)
        #note_count = note_count + 1  #
        #if note_count > 2:           # count the number of text notes want to process ***
        #    break                    #
              
        note_annotations = ddimer_val(rule=rule, rule1=rule1, rule2=rule2, file_txt=doc_file)

        doc_annotations[i] = note_annotations


## Append annotation dataframes to annotation files

In [5]:

for k in doc_annotations:               # dict of annotations
      
    k0=k.split('.')[0]
    k1=k0+'.nlp'    

    nlp_ann=''
    for doc_ann in doc_annotations[k]:     # doc_ann is line of mention ann in doc annotation
        
        nlp_ann = nlp_ann + doc_ann.ann_id   +'\t'  
        nlp_ann = nlp_ann + doc_ann.type    +' '
        nlp_ann = nlp_ann + str(doc_ann.start_index)   +' '
        nlp_ann = nlp_ann + str(doc_ann.end_index)      +'\t'
        nlp_ann = nlp_ann + doc_ann.spanned_text    +'\n'    

    nlpann='nlpann/'+k1
    with open(nlpann, 'a') as myfile:
        myfile.write(nlp_ann)


## doc classification

In [6]:

doc_cls_results={}
for k in doc_annotations:               # dict of annotations
    doc_cls_results[k]='low_ddimer'
    for doc_ann in doc_annotations[k]:
        if doc_ann.type =='high_ddimer':
            doc_cls_results[k]='high_ddimer'
for k in doc_cls_results:
    print(k, '-----', doc_cls_results[k])

90688_292.txt ----- high_ddimer
65675_64.txt ----- high_ddimer
48640_63.txt ----- low_ddimer
86087_123.txt ----- high_ddimer
83838_106.txt ----- high_ddimer
72554_306.txt ----- high_ddimer
15899_182.txt ----- high_ddimer
13867_266.txt ----- high_ddimer
61180_73.txt ----- high_ddimer
32113_141.txt ----- high_ddimer
59381_293.txt ----- high_ddimer
58515_159.txt ----- high_ddimer
6878_279.txt ----- high_ddimer
820_14.txt ----- high_ddimer
32113_109.txt ----- high_ddimer
49079_68.txt ----- high_ddimer
10568_20.txt ----- high_ddimer
25764_268.txt ----- high_ddimer
1498_225.txt ----- high_ddimer
82326_55.txt ----- high_ddimer


## Select one doc annotation for mention level evaluation

In [7]:
k = '90688_292.txt'
ann1 = doc_annotations[k]
print(ann1)

[<pipeUtils.Annotation object at 0x7fa074ff0ef0>, <pipeUtils.Annotation object at 0x7fa074ebe7b8>]


## read annotation and convert to dataframe

In [8]:
import numpy as np
import pandas as pd

nlp_list=[]
for a in ann1:       
    list1=[a.ann_id, a.type, a.start_index, a.end_index, a.spanned_text]
    nlp_list.append(list1)
nlp_list    

nlp_df = pd.DataFrame(nlp_list, columns=['markup_id','type','start','end','txt'])    
nlp_df

Unnamed: 0,markup_id,type,start,end,txt
0,NLP_0,high_ddimer,398,414,elevated d-dimer
1,NLP_1,high_ddimer,2023,2039,elevated d-dimer


## convert df to annotation object, compare two annotations

In [9]:
def df2ann(df=[], pdoc_type='', ndoc_type=''):
    from pipeUtils import Annotation
    from pipeUtils import Document

    #ann_obj=Annotation()
    Annotations1=[]
    for index, row in df.iterrows() :

        if (pdoc_type == row['type'] or ndoc_type == row['type']):
            continue
        ann_obj=Annotation(start_index=row['start'], end_index=row['end'], type=row['type'], spanned_text=row['txt'], ann_id=row['markup_id'])
        Annotations1.append(ann_obj)

    return Annotations1

###############################################################################################

def compare2ann_types_by_span(ref_ann=[], sys_ann=[], ref_type ='Annotation', sys_type='Annotation', exact=True):
    tp, fp, fn = 0,0,0
    tp_list = []
    fp_list = []
    fn_list = []
    ref_anns = []
    sys_anns = []

    # Split annotations of different types into two lists
    for a in ref_ann:
        if(a.type == ref_type):
            ref_anns.append(a)
    for a in sys_ann:
        if(a.type == sys_type):
            sys_anns.append(a)

    # Count tp and fp
    for sys_ann in sys_anns:
        tp_flag = False
        matching_ref = None
        for ref_ann in ref_anns:
            if exact:
                if(sys_ann.exactMatch(ref_ann)):
                    tp_flag=True
                    matching_ref = ref_ann
            else:
                if sys_ann.overlaps(ref_ann):
                    tp_flag = True
                    matching_ref = ref_ann
        if tp_flag:
            tp = tp + 1
            tp_list.append([sys_ann, matching_ref])
        else:
            fp = fp + 1
            fp_list.append(sys_ann)

    # Count fn
    for ref_ann in ref_anns:
        tp_flag = False
        for sys_ann in sys_anns:
            if exact:
                if(ref_ann.exactMatch(sys_ann)):
                    tp_flag=True
            else:
                if ref_ann.overlaps(sys_ann):
                    tp_flag = True
        if not tp_flag:
            fn = fn + 1
            fn_list.append(ref_ann)

    return tp, fp, fn, tp_list, fp_list, fn_list

## Convert d-dimer nlp to annotation obj

In [10]:
Annotations2=df2ann(df=nlp_df, pdoc_type='positive_DOC', ndoc_type='negative_DOC')
for a in Annotations2:
    print (a.ann_id, a.type, a.start_index, a.end_index, a.spanned_text)

NLP_0 high_ddimer 398 414 elevated d-dimer
NLP_1 high_ddimer 2023 2039 elevated d-dimer


## read manual ref_ann and convert to df

In [11]:
import os
unid = 'u0496358'
path = "/home/"+str(unid)+"/BRAT/"+str(unid)+"/Project_pe_test" 

ann_file='90688_292.ann'

In [12]:
# read ann and convert to df

import numpy as np
import pandas as pd

annoList = []

with open(os.path.join(path,ann_file)) as f:
    ann_file = f.read()
ann_file=ann_file.split('\n')

for line in ann_file:

    if(line.startswith('T')):
        line=line.replace('\n', '')
        line=line.split('\t')

        line0=line[0]
        line2=line[2]
        line1=line[1].split(' ')
        
        if (';' in line1[2]):
            line1.remove(line1[2])   # remove middle span of annotated phrase seprated in 2 line, keep the annotation.
                        
        annList = []
        annList.append(line[0])
        annList.extend(line1)
        annList.append(line[2])
        annoList.append(annList)
#print(annoList)  
    
ann_df = pd.DataFrame(annoList, columns=['markup_id','type','start','end','txt'])    
ann_df
#ann_file

Unnamed: 0,markup_id,type,start,end,txt
0,T1,High_Ddimer,398,414,elevated D-dimer
1,T2,High_Ddimer,2023,2039,elevated D-dimer
2,T3,PE,154,156,PE
3,T4,PE,459,461,PE
4,T5,PE,688,690,PE
5,T6,PE,3154,3156,PE
6,T7,PE,653,655,PE
7,T8,PE,6332,6334,PE
8,T9,PE,929,947,pulmonary embolism
9,T11,PE,4648,4666,pulmonary embolism


## convert manual ref_ann df to annotation obj

In [13]:
Annotations3=df2ann(ann_df, pdoc_type='positive_DOC', ndoc_type='negative_DOC')
for a in Annotations2:
    print (a.ann_id, a.type, a.start_index, a.end_index, a.spanned_text)

NLP_0 high_ddimer 398 414 elevated d-dimer
NLP_1 high_ddimer 2023 2039 elevated d-dimer


## Mention Level Evaluation

In [14]:
tp, fp, fn, tp_list, fp_list, fn_list = compare2ann_types_by_span(ref_ann=Annotations2, sys_ann=Annotations3, ref_type ='high_ddimer', sys_type='high_ddimer', exact=True)
print("tp, fp, fn")
print(tp, fp, fn)
print("-----fn_list-----")
for i in fn_list:
    print(i.ann_id, i.start_index, i.end_index)

tp, fp, fn
0 0 2
-----fn_list-----
NLP_0 398 414
NLP_1 2023 2039
