# tagtog to PICO annotation parser

In [1]:
from os import listdir
from os.path import isfile, join
import json

# Connect to tagtog API
from lxml import html
from bs4 import BeautifulSoup
from requests import get
from requests.auth import HTTPBasicAuth, HTTPDigestAuth
import urllib

# Specific imports
import numpy
from nltk.tokenize import WhitespaceTokenizer
import spacy
import re

In [2]:
# Get all the entities: query the tagtog metrics API for the project's search statistics.
# Credentials are taken from the environment (TAGTOG_USER / TAGTOG_PASSWORD) with an
# interactive prompt as fallback, so that no password is stored in the notebook.
import os
from getpass import getpass

all_annotations_data = 'https://www.tagtog.com/-api/metrics/v0/search_stats?project=PICO_test_ebm&owner=anjDhr&search=*'
tagtog_user = os.environ.get('TAGTOG_USER', 'anjDhr')
tagtog_password = os.environ.get('TAGTOG_PASSWORD') or getpass('tagtog password: ')

# A (user, password) tuple makes requests use HTTP basic auth; the timeout keeps the
# cell from hanging forever if the service is unreachable.
all_annotations_data_response = get(
    all_annotations_data,
    auth=(tagtog_user, tagtog_password),
    timeout=30,
)
print('The response for query is: ', all_annotations_data_response)

# Fail loudly on a 4xx/5xx answer instead of trying to parse an error page as JSON.
all_annotations_data_response.raise_for_status()
all_annotations_data_response = json.loads(all_annotations_data_response.text)

The response for query is:  <Response [200]>


In [3]:
# Map every entity id (top-level keys of the API response containing 'e_')
# to the entity name stored under its 'name' field.
ent2name = {
    entity_id: all_annotations_data_response[entity_id]['name']
    for entity_id in all_annotations_data_response
    if 'e_' in entity_id
}

In [4]:
# Inspect the entity-id -> entity-name mapping.
ent2name

{'e_8': 'Intervention_name',
 'e_16': 'Outcome_name',
 'e_3': 'Participant_age',
 'e_2': 'Participant_symptom',
 'e_18': 'Outcomes_other',
 'e_7': 'StudyType',
 'e_5': 'Participant_samplesize',
 'e_11': 'Intervention_device',
 'e_19': 'Outcome_AE',
 'e_15': 'Intervention_comparator',
 'e_6': 'Participant_other',
 'e_17': 'Outcome_measure',
 'e_10': 'Intervention_components',
 'e_4': 'Participant_gender',
 'e_1': 'Participant_disease'}

In [5]:
# Split the entity-id -> name mapping into one dict per category, keyed on the
# prefix of the entity name. 'Outcome' also matches the 'Outcomes_other' entity.
# O_ents was previously declared but never filled.
P_ents = {}
I_ents = {}
O_ents = {}
S_ents = {}
_prefix2group = (
    ('Intervention', I_ents),
    ('Participant', P_ents),
    ('Outcome', O_ents),
    ('StudyType', S_ents),
)
for k, v in ent2name.items():
    for _prefix, _group in _prefix2group:
        if str(v).startswith(_prefix):
            _group[k] = v

In [6]:
# Initialize the NLTK whitespace tokenizer.
# NOTE(review): `tk` is not referenced by any later cell visible in this notebook
# (tokenization below is done with spaCy) — confirm it is still needed.
tk = WhitespaceTokenizer()

In [7]:
# Initialize pos tagger: load the spaCy "en_core_web_sm" pipeline. `nlp` is the global
# used below to obtain tokens, their character offsets (token.idx) and POS tags.
nlp = spacy.load("en_core_web_sm")



In [8]:
# Small demo: run the pipeline on a toy sentence, print one row per token
# (offset, text, POS, tag, dependency and their explanations) and keep the
# character start offset of every token.
string_our = 'My name is a random'
doc = nlp(string_our)
for token in doc:
    print(f'{token.idx:{8}} {token.text:{8}} {token.pos_:{6}} {token.tag_:{6}} {token.dep_:{6}} {spacy.explain(token.pos_):{20}} {spacy.explain(token.tag_)}')
temp_offset = [tok.idx for tok in doc]

       0 My       PRON   PRP$   poss   pronoun              pronoun, possessive
       3 name     NOUN   NN     nsubj  noun                 noun, singular or mass
       8 is       AUX    VBZ    ROOT   auxiliary            verb, 3rd person singular present
      11 a        DET    DT     det    determiner           determiner
      13 random   ADJ    JJ     attr   adjective            adjective (English), other noun-modifier (Chinese)


In [9]:
# Character start offsets of the demo tokens.
temp_offset

[0, 3, 8, 11, 13]

In [10]:
# Recover the first token from the raw string: from its start offset up to one character
# before the next token's start (the `-1` drops the single separating space).
string_our[temp_offset[0]:temp_offset[1]-1]

'My'

In [11]:
# Input directories: `annot_dir` holds the annotation JSON files and `plain_dir` the
# matching plain-text HTML files that are read by fetch_char_annot below.
# NOTE(review): these are absolute, machine-specific paths — adjust before running elsewhere.
annot_dir = '/mnt/nas2/data/systematicReview/PICO_datasets/annotation_me/PICO_test_ebm_pis/ann.json/master/pool'
plain_dir = '/mnt/nas2/data/systematicReview/PICO_datasets/annotation_me/PICO_test_ebm_pis/plain.html/pool'

In [12]:
def dir_files(mypath):
    """Index the regular files of a directory by a short id taken from the file name.

    The id is the part of the file name after the last '-' and before the
    first following '.', e.g. ``'abc-123.plain.html' -> '123'``. A name without
    '-' is keyed by everything before its first '.'.

    Args:
        mypath: Directory to list (sub-directories are ignored, no recursion).

    Returns:
        dict mapping id -> file name (name only, not the full path). If two
        files produce the same id, the one listed later wins.
    """
    return {
        name.split('-')[-1].split('.')[0]: name
        for name in listdir(mypath)
        if isfile(join(mypath, name))
    }

# Index both directories by the id extracted from the file names, so that each plain-text
# file can be paired with its annotation file through the shared key.
plain_files = dir_files(plain_dir)
annot_files = dir_files(annot_dir)

In [13]:
def fetch_char_annot(plain_dir, plain_files, annot_dir, annot_files):
    """Build character-level entity labels for every document.

    For each id in ``plain_files`` the plain-text HTML file and the annotation
    JSON file with the same id are read. Every entry of the JSON's
    ``'entities'`` list yields one label vector as long as the document text:
    ``0`` everywhere except on the characters covered by the entity, which carry
    the entity's ``'classId'``.

    Limitations kept from the original implementation:
      * only documents whose HTML contains exactly one ``<pre>`` element are
        processed; any other document gets empty dicts in both results;
      * only the first offset (``offsets[0]``) of each entity is used.

    Args:
        plain_dir: Directory containing the plain-text HTML files.
        plain_files: Mapping id -> HTML file name.
        annot_dir: Directory containing the annotation JSON files.
        annot_files: Mapping id -> JSON file name.

    Returns:
        Tuple ``(annotations, plain_text_dict)`` where
        ``annotations[doc_id][part]`` is a list of label vectors (one per entity
        of that part) and ``plain_text_dict[doc_id][pre_id]`` is the text of the
        ``<pre>`` element (only filled when the document has at least one entity).

    Raises:
        KeyError: if an id of ``plain_files`` is missing from ``annot_files``.
        AssertionError: if the annotated text does not match the document text
            at the given offset.
    """
    annotations = dict()
    plain_text_dict = dict()

    for k, v in plain_files.items():

        plain_file_path = plain_dir + '/' + v
        annot_file_path = annot_dir + '/' + annot_files[k]

        # Offsets are character offsets, so decode both files explicitly.
        with open(annot_file_path, 'r', encoding='utf-8') as af, \
                open(plain_file_path, 'r', encoding='utf-8') as pf:
            annot_json = json.load(af)
            # Name the parser explicitly: without it bs4 picks whatever is installed
            # (and warns), which can make parsing differ between machines.
            plainfile_soup = BeautifulSoup(pf, 'lxml')

        text_list = plainfile_soup.find_all("pre")

        annotation_dict = dict()
        document_parts = {}

        if len(text_list) == 1:
            # Per-document values, computed once instead of once per entity.
            pre_id = text_list[0].get('id')
            plain_text = text_list[0].text

            for e in annot_json['entities']:
                document_part = e['part']
                first_offset = e['offsets'][0]
                text_annot = first_offset['text']
                annot_start = first_offset['start']
                annot_end = annot_start + len(text_annot)

                document_parts[pre_id] = plain_text
                document_entity_match = plain_text[annot_start:annot_end]
                document_entity_match_label = e['classId']

                assert document_entity_match.strip() == text_annot.strip(), (
                    'annotation text does not match document %s at offset %s' % (k, annot_start)
                )

                # Label only the non-whitespace core of the span, at its true position.
                # Replacing the whole span by a shorter (stripped-length) list would
                # shrink the vector and shift every later character.
                leading_ws = len(document_entity_match) - len(document_entity_match.lstrip())
                core_start = annot_start + leading_ws
                core_end = core_start + len(document_entity_match.strip())

                document_char_labels = [0] * len(plain_text)
                document_char_labels[core_start:core_end] = (
                    [document_entity_match_label] * (core_end - core_start)
                )

                annotation_dict.setdefault(document_part, []).append(document_char_labels)

        plain_text_dict[k] = document_parts
        annotations[k] = annotation_dict

    return annotations, plain_text_dict

In [14]:
# Build the per-document character-level label vectors and the matching plain texts.
char_annot, plain_texts = fetch_char_annot(plain_dir, plain_files, annot_dir, annot_files)

In [16]:
def ent2binlabs(lst, picos):
    """Convert single-label token annotations into binary ``['0']`` / ``['1']`` labels.

    Args:
        lst: Iterable of per-token label lists. Only items with exactly one
            label are converted; items of any other length are dropped, so the
            result can be shorter than ``lst``.
        picos: One of ``'p'``, ``'i'``, ``'o'``, ``'s'``; selects which groups of
            entity names make up ``ignore_labs``.

    Returns:
        List of ``['0']`` / ``['1']`` items. An item is ``['0']`` when its label
        is ``0`` or its entity name is not in ``ignore_labs``, else ``['1']``.

    Raises:
        ValueError: if ``picos`` is not one of the four supported values
            (previously this surfaced later as an UnboundLocalError, or not at
            all for an empty ``lst``).

    NOTE(review): emitting ``['1']`` for names that ARE in ``ignore_labs`` looks
    inverted with respect to the variable name; the behaviour is kept as-is
    because no caller is visible in this notebook — confirm the intent.
    """
    if picos not in ('p', 'i', 'o', 's'):
        raise ValueError("picos must be one of 'p', 'i', 'o', 's'; got %r" % (picos,))

    if picos == 'p':
        ignore_labs = list(S_ents.values()) + list(I_ents.values())
    elif picos == 'i':
        ignore_labs = list(S_ents.values()) + list(P_ents.values())
    elif picos == 'o':
        ignore_labs = list(S_ents.values())
    else:  # picos == 's'
        ignore_labs = list(P_ents.values()) + list(I_ents.values())

    new_lst = []

    for l in lst:
        if len(l) != 1:
            # Tokens with zero or several labels are skipped (original behaviour).
            continue

        if l[0] == 0 or ent2name[ l[0] ] not in ignore_labs:
            new_lst.append( ['0'] )
        else:
            print( ent2name[ l[0] ] )
            new_lst.append( ['1'] )

    return new_lst

In [17]:
def mergelabels(old_labels, labels_clean):
    """Merge two aligned per-token label lists into binary labels.

    For every position the labels of ``old_labels`` and ``labels_clean`` are
    pooled; the position becomes ``['1']`` when the pool holds more than one
    distinct value and ``['0']`` otherwise.

    NOTE(review): two identical labels (e.g. ``['1']`` and ``['1']``) therefore
    merge to ``['0']`` — confirm this is intended.

    Args:
        old_labels: List of label lists already stored.
        labels_clean: List of label lists to merge in; must have the same length.

    Returns:
        List of ``['0']`` / ``['1']`` items, one per position.

    Raises:
        IndexError: if ``labels_clean`` is longer than ``old_labels``.
        AssertionError: if ``labels_clean`` is shorter than ``old_labels``.
    """
    merged = []
    for position, incoming in enumerate(labels_clean):
        distinct = set(old_labels[position] + incoming)
        if len(distinct) > 1:
            merged.append(['1'])
        else:
            merged.append(['0'])

    assert len(old_labels) == len(merged)

    return merged

In [17]:
def _binary_token_label(token_labels, pattern):
    """Return '1' if any label of the token matches ``pattern``, else '0'."""
    return '1' if any(pattern.match(str(label)) for label in token_labels) else '0'


def char2tokAnnot(char_annotations, plain_text_dict):
    """Project character-level label vectors onto spaCy tokens.

    Each document part is tokenized with the global ``nlp`` pipeline. For every
    label vector of the part, each token receives the label found on its
    characters (translated to an entity name through the global ``ent2name``
    when possible); the labels of all vectors are then pooled per token.

    Args:
        char_annotations: ``{doc_id: {part_id: [label_vector, ...]}}`` where a
            label vector has one entry per character of the part's text.
        plain_text_dict: ``{doc_id: {part_id: text}}``.

    Returns:
        ``{doc_id: {part_id: info}}`` where ``info`` has the keys
        ``'text'`` (tokens joined by single spaces), ``'tokens'``, ``'labels'``
        (pooled labels per token), ``'pos'`` (``token.tag_``),
        ``'abs_char_offsets'`` (``token.idx``), and the binary label lists
        ``'participant'``, ``'intervention'``, ``'studytype'`` plus their
        ``'*_fine'`` twins (the same lists).

        The binary lists hold one ``'0'`` / ``'1'`` string per token. Tokens
        with a single pooled label used to leak that raw label (the int ``0``
        or an entity name) into these lists; they are now binarized like all
        other tokens.

    Raises:
        KeyError: if a part of ``char_annotations`` has no text in ``plain_text_dict``.
    """
    # Compiled once; matched against the start of each label.
    p_reg = re.compile('Participant_.*')
    i_reg = re.compile('Intervention_.*')
    s_reg = re.compile('StudyType')

    token_annotations = {}

    for k_a, v_a in char_annotations.items():  # k_a: document id

        token_annotations[k_a] = {}

        for k_a_, v_a_ in v_a.items():  # k_a_: document part id

            # Get text
            text = plain_text_dict[k_a][k_a_]

            # Tokenize and POS-tag
            doc = nlp(text)

            text_pos = [token.tag_ for token in doc]
            text_tokens = [token.text for token in doc]
            # (start, end) character span of every token within `text`
            text_abs_offset = [(token.idx, token.idx + len(token.text)) for token in doc]

            part_labels = []

            # One label vector per annotated entity of this part
            for v_a_i in v_a_:

                # Distinct character labels under each token
                labels = [list(set(v_a_i[start:end])) for start, end in text_abs_offset]
                # If a token carries several labels, keep only the non-zero ones
                labels_clean = [
                    [lab for lab in l if lab != 0] if len(l) > 1 else l
                    for l in labels
                ]
                # Translate entity ids to names; anything else (e.g. 0) is kept as is
                labels_ent = [
                    [ent2name[l[0]]] if l[0] in ent2name else [l[0]]
                    for l in labels_clean
                ]
                assert len(labels) == len(labels_clean) == len(labels_ent)

                if len(part_labels) == 0:
                    part_labels = labels_ent
                else:
                    # Pool this vector's label with the labels collected so far
                    for counter, l in enumerate(labels_ent):
                        part_labels[counter].append(l[0])

            # Remove redundant labels after merging
            part_labels = [list(set(l)) for l in part_labels]

            # Binary P / I / S label per token, always as '0' / '1' strings
            part_labels_p = [_binary_token_label(l, p_reg) for l in part_labels]
            part_labels_i = [_binary_token_label(l, i_reg) for l in part_labels]
            part_labels_s = [_binary_token_label(l, s_reg) for l in part_labels]

            token_annotations[k_a][k_a_] = {}
            token_annotations[k_a][k_a_]['text'] = ' '.join(text_tokens)
            token_annotations[k_a][k_a_]['tokens'] = text_tokens
            token_annotations[k_a][k_a_]['labels'] = part_labels
            token_annotations[k_a][k_a_]['pos'] = text_pos
            token_annotations[k_a][k_a_]['abs_char_offsets'] = [token.idx for token in doc]

            token_annotations[k_a][k_a_]['participant_fine'] = part_labels_p
            token_annotations[k_a][k_a_]['intervention_fine'] = part_labels_i
            token_annotations[k_a][k_a_]['studytype_fine'] = part_labels_s

            token_annotations[k_a][k_a_]['participant'] = part_labels_p
            token_annotations[k_a][k_a_]['intervention'] = part_labels_i
            token_annotations[k_a][k_a_]['studytype'] = part_labels_s

    return token_annotations

In [18]:
# Convert the character-level labels of every document into token-level annotations.
tok_annotations = char2tokAnnot(char_annot, plain_texts)

In [19]:
# Spot-check the token-level output of a single document.
tok_annotations['25931290']

{'s1v1': {'text': 'Sensory Adapted Dental Environments to Enhance Oral Care for Children with Autism Spectrum Disorders : A Randomized Controlled Pilot Study . \n\n This pilot and feasibility study examined the impact of a sensory adapted dental environment ( SADE ) to reduce distress , sensory discomfort , and perception of pain during oral prophylaxis for children with autism spectrum disorder ( ASD ) . Participants were 44 children ages 6 - 12 ( n = 22 typical , n = 22 ASD ) . In an experimental crossover design , each participant underwent two professional dental cleanings , one in a regular dental environment ( RDE ) and one in a SADE , administered in a randomized and counterbalanced order 3 - 4 months apart . Outcomes included measures of physiological anxiety , behavioral distress , pain intensity , and sensory discomfort . Both groups exhibited decreased physiological anxiety and reported lower pain and sensory discomfort in the SADE condition compared to RDE , indicating a be

In [20]:
# Get rid of the document-part level: keep only part 's1v1' of every document and
# copy a fixed set of its fields (renamed to plural keys for P and I).
# Documents without an 's1v1' part are skipped and their id is printed.
_part_id = 's1v1'
_field_map = (
    ('text', 'text'),
    ('tokens', 'tokens'),
    ('pos', 'pos'),
    ('abs_char_offsets', 'abs_char_offsets'),
    ('participants_fine', 'participant_fine'),
    ('participants', 'participant'),
    ('interventions_fine', 'intervention_fine'),
    ('interventions', 'intervention'),
)

annotations_final = dict()

for k_doc, v_doc in tok_annotations.items():
    if _part_id not in v_doc:
        print(k_doc)
        continue

    _part = v_doc[_part_id]
    annotations_final[k_doc] = {target: _part[source] for target, source in _field_map}

18773733
17293018
10940525
6920252
7814711
21170734
21393467
24660757


In [21]:
# bifurcate the P and I annotations

In [22]:
# Number of documents kept after flattening.
len(annotations_final)

183

In [23]:
# Write the flattened annotations to disk as one JSON document.
# NOTE(review): absolute, machine-specific output path.
output_json_path = '/mnt/nas2/data/systematicReview/PICO_datasets/EBM_parsed/test_ebm_anjani.json'
with open(output_json_path, 'w+') as out_handle:
    json.dump(annotations_final, out_handle)