# Converting between BioC XML and BioC JSON

This script converts a BioC XML collection to BioC JSON, or vice versa. It is adapted from bioc_json.py in the BioC-JSON tools of the NLM/NCBI BioNLP Research Group (downloaded on 2020-03-12 from https://github.com/ncbi-nlp/BioC-JSON).

Dependencies: PyBioC (available from https://github.com/2mh/PyBioC)

In [None]:
import json
import bioc
from bioc import biocxml

class BioC2JSON:
    """Turn BioC objects into nested dicts/lists that ``json.dump`` can write.

    Each method converts one level of the BioC hierarchy and delegates the
    nested levels to the matching method. The converter holds no state; it
    only reads attributes of the objects it is given.
    """

    def node(this, node):
        """Return a relation node as a dict with its 'refid' and 'role'."""
        return {'refid': node.refid, 'role': node.role}

    def relation(this, rel):
        """Return a relation as a dict with 'id', 'infons' and converted 'nodes'."""
        return {
            'id': rel.id,
            'infons': rel.infons,
            'nodes': [this.node(n) for n in rel.nodes],
        }

    def location(this, loc):
        """Return a location as a dict; offset and length are cast to int."""
        return {'offset': int(loc.offset), 'length': int(loc.length)}

    def annotation(this, note):
        """Return an annotation as a dict with its converted 'locations'."""
        return {
            'id': note.id,
            'infons': note.infons,
            'text': note.text,
            'locations': [this.location(l) for l in note.locations],
        }

    def sentence(this, sent):
        """Return a sentence as a dict; its offset is cast to int."""
        return {
            'infons': sent.infons,
            'offset': int(sent.offset),
            'text': sent.text,
            'annotations': [this.annotation(a) for a in sent.annotations],
            'relations': [this.relation(r) for r in sent.relations],
        }

    def passage(this, psg):
        """Return a passage as a dict; a missing or empty text becomes ""."""
        return {
            'infons': psg.infons,
            'offset': int(psg.offset),
            # Falsy text (e.g. None) is written as an empty string.
            'text': psg.text if psg.text else "",
            'sentences': [this.sentence(s) for s in psg.sentences],
            'annotations': [this.annotation(a) for a in psg.annotations],
            'relations': [this.relation(r) for r in psg.relations],
        }

    def document(this, doc):
        """Return a document as a dict with converted passages and relations."""
        return {
            'id': doc.id,
            'infons': doc.infons,
            'passages': [this.passage(p) for p in doc.passages],
            'relations': [this.relation(r) for r in doc.relations],
        }

    def collection(this, collection):
        """Return a whole collection as a dict with its converted documents."""
        return {
            'source': collection.source,
            'date': collection.date,
            'key': collection.key,
            'infons': collection.infons,
            'documents': [this.document(d) for d in collection.documents],
        }

class JSON2BioC:
    """Rebuild ``bioc`` objects from the nested dict layout of BioC JSON.

    Each method creates one ``bioc`` object, copies the fields of the given
    dict onto it and delegates nested lists to the matching method.
    """

    def node(this, json_node):
        """Create a BioCNode carrying the dict's 'refid' and 'role'."""
        built = bioc.BioCNode()
        built.refid = json_node['refid']
        built.role = json_node['role']
        return built

    def relation(this, json_rel):
        """Create a BioCRelation with its id, infons and rebuilt nodes."""
        built = bioc.BioCRelation()
        built.id = json_rel['id']
        built.infons = json_rel['infons']
        built.nodes = list(map(this.node, json_rel['nodes']))
        return built

    def location(this, json_loc):
        """Create a BioCLocation; offset and length are stored as strings."""
        built = bioc.BioCLocation()
        built.offset = str(json_loc['offset'])
        built.length = str(json_loc['length'])
        return built

    def annotation(this, json_note):
        """Create a BioCAnnotation with its id, infons, text and locations."""
        built = bioc.BioCAnnotation()
        built.id = json_note['id']
        built.infons = json_note['infons']
        built.text = json_note['text']
        built.locations = list(map(this.location, json_note['locations']))
        return built

    def sentence(this, json_sent):
        """Create a BioCSentence; the offset is stored as a string."""
        built = bioc.BioCSentence()
        built.infons = json_sent['infons']
        built.offset = str(json_sent['offset'])
        built.text = json_sent['text']
        built.annotations = list(map(this.annotation, json_sent['annotations']))
        built.relations = list(map(this.relation, json_sent['relations']))
        return built

    def passage(this, json_psg):
        """Create a BioCPassage; a missing 'text' key yields ``None``."""
        built = bioc.BioCPassage()
        built.infons = json_psg['infons']
        built.offset = str(json_psg['offset'])
        # 'text' is the only optional key: .get() returns None when absent.
        built.text = json_psg.get('text')
        built.sentences = list(map(this.sentence, json_psg['sentences']))
        built.annotations = list(map(this.annotation, json_psg['annotations']))
        built.relations = list(map(this.relation, json_psg['relations']))
        return built

    def document(this, json_doc):
        """Create a BioCDocument with rebuilt passages and relations."""
        built = bioc.BioCDocument()
        built.id = json_doc['id']
        built.infons = json_doc['infons']
        built.passages = list(map(this.passage, json_doc['passages']))
        built.relations = list(map(this.relation, json_doc['relations']))
        return built

    def collection(this, json_collection):
        """Create a BioCCollection with its metadata and rebuilt documents."""
        built = bioc.BioCCollection()
        built.source = json_collection['source']
        built.date = json_collection['date']
        built.key = json_collection['key']
        built.infons = json_collection['infons']
        built.documents = list(map(this.document, json_collection['documents']))
        return built

# Conversion direction: "-j" converts XML to JSON, "-b" converts JSON to XML.
option = "-j"
# path to input file
in_file = "../../../data/gold_standard_xml/goldstandard2_20220203.xml"
# path to output file
out_file = "../../../results/supple.json"

if option == '-j':
    # XML -> JSON: parse the collection, convert it, then dump it as JSON.
    with open(in_file, "r") as reader:
        collection = bioc.load(reader)
    bioc2json = BioC2JSON()
    bioc_json = bioc2json.collection(collection)
    with open(out_file, 'w') as f:
        json.dump(bioc_json, f, indent=2)
        # Finish the file with a newline.
        f.write("\n")

elif option == '-b':
    # JSON -> XML: load the JSON, rebuild the BioC objects, write them out.
    bioc_json = None
    with open(in_file) as f:
        bioc_json = json.load(f)

    json2bioc = JSON2BioC()
    bioc_collection = json2bioc.collection(bioc_json)

    writer = bioc.BioCWriter(out_file, bioc_collection)
    writer.write()
        

In [None]:
# Show the collection loaded by the conversion cell. `collection` is only
# assigned in the '-j' (XML to JSON) branch, so run that branch first.
print(collection)

# Processing dictionaries

In [None]:
import pandas as pd
import re

def remove_hyphens_lowercase(list_):
    '''Lowercase every term and turn each hyphen into a space.

    Returns the distinct normalised terms as a sorted list.
    '''
    normalised = {entry.lower().replace("-", " ") for entry in list_}
    return sorted(normalised)

def create_variant_terms_dis(list_):
    '''
    Expand disease terms by swapping a disease keyword for its siblings
    (disease dictionary).

    Every term is lowercased and kept. If it contains one of the keywords
    "disease", "disorder", "syndrome", "pneumonia" or "infection", the first
    keyword found (in that order) is replaced by each of the other four.
    Returns the sorted, de-duplicated result.

    NOTE(review): an earlier note warned about lists containing
    "severe acute respiratory syndrome"; no special handling exists here.
    '''
    keywords = ("disease", "disorder", "syndrome", "pneumonia", "infection")
    expanded = set()

    for raw in list_:
        lowered = raw.lower()
        expanded.add(lowered)

        # Only the first matching keyword is swapped.
        matched = next((kw for kw in keywords if kw in lowered), None)
        if matched is None:
            continue

        expanded.update(
            lowered.replace(matched, other)
            for other in keywords
            if other != matched
        )

    return sorted(expanded)

            
def interchange_corona(list_):
    '''
    Interchange "corona" and "coronavirus" within terms.

    Every term is lowercased and kept. In addition:
    - terms containing "corona virus" get no variant;
    - terms containing "coronavirus" get a variant with "coronavirus"
      replaced by "corona", unless they contain "syndrome coronavirus";
    - all other terms get a variant with the whole word "corona" replaced
      by "coronavirus" (unchanged if the word does not occur).

    Returns the sorted, de-duplicated list of terms.
    '''
    # Raw string: in a plain string "\b" is a backspace character, not a
    # word boundary, so the pattern would never match.
    corona_word = re.compile(r"\bcorona\b")

    new_set = set()
    for term in list_:
        term = term.lower()
        new_set.add(term)
        if "corona virus" in term:
            continue
        if "coronavirus" in term:
            if "syndrome coronavirus" not in term:
                new_set.add(term.replace("coronavirus", "corona"))
        else:
            new_set.add(corona_word.sub("coronavirus", term))

    return sorted(new_set)


def interchange_wuhan_hubei(list_):
    '''
    Add a "hubei" variant for terms mentioning "wuhan", and vice versa.

    Every term is lowercased and kept. A term containing "wuhan" also yields
    a copy with "wuhan" replaced by "hubei"; otherwise a term containing
    "hubei" yields a copy with "hubei" replaced by "wuhan".
    Returns the sorted, de-duplicated result.
    '''
    collected = set()

    for raw in list_:
        lowered = raw.lower()
        collected.add(lowered)

        # "wuhan" takes precedence when both place names occur.
        if "wuhan" in lowered:
            swap = ("wuhan", "hubei")
        elif "hubei" in lowered:
            swap = ("hubei", "wuhan")
        else:
            continue
        collected.add(lowered.replace(*swap))

    return sorted(collected)

def check_duplicate_words_from_single_term(term):
    '''Return True if any whitespace-separated word occurs more than once in term.
    '''
    seen = set()
    for word in term.split():
        if word in seen:
            return True
        seen.add(word)
    return False

def check_if_both_new_novel(term):
    '''Return True if term contains both the substring "new" and the substring "novel".
    '''
    if "new" not in term:
        return False
    return "novel" in term

def remove_duplicates(list_):
    '''
    Drop terms that repeat a word or that contain both "new" and "novel".

    Returns the remaining distinct terms as a sorted list.
    '''
    kept = {
        term
        for term in list_
        if not check_duplicate_words_from_single_term(term)
        and not check_if_both_new_novel(term)
    }
    return sorted(kept)

def _2019_var_virus(list_):
    '''
    Create variants with 2019 and new/novel (virus dictionary).

    Every term is kept. A term that does not contain "19" (which also rules
    out "2019") additionally yields "<term> 2019", "2019 <term>" and one
    variant per prefix "2019novel", "2019new", "2019 novel", "2019 new".
    Candidates are then dropped if they repeat a word, contain both "new"
    and "novel", or pair the fused word "2019new"/"2019novel" with the
    standalone word "new"/"novel". Returns the survivors as a sorted list.
    '''
    prefixes = ('2019novel', '2019new', '2019 novel', '2019 new')
    # (fused form, standalone form): a term matching both patterns is dropped.
    redundant_pairs = (
        (re.compile(r'\b2019new\b'), re.compile(r'\bnew\b')),
        (re.compile(r'\b2019novel\b'), re.compile(r'\bnovel\b')),
    )

    candidates = set()
    for term in list_:
        candidates.add(term)
        if "19" in term:
            continue
        # add 2019 as suffix and prefix
        candidates.add(term + " 2019")
        candidates.add("2019 " + term)
        candidates.update(prefix + " " + term for prefix in prefixes)

    kept = set()
    for candidate in candidates:
        if check_duplicate_words_from_single_term(candidate):
            continue
        if check_if_both_new_novel(candidate):
            continue
        if any(fused.search(candidate) and plain.search(candidate)
               for fused, plain in redundant_pairs):
            continue
        kept.add(candidate)

    return sorted(kept)

def create_dis_variants_from_virus_cv(cv_terms, virus_terms):
    '''
    Build disease terms by filling placeholders in cv_terms (disease dictionary).

    For every virus term, each template containing
    "[virus name or Wuhan or Hubei]" is filled with the virus term, "wuhan"
    and "hubei"; otherwise a template containing "[virus name]" is filled
    with the virus term. The results are lowercased, de-hyphenated and
    filtered with remove_duplicates, then returned as a sorted list.
    '''
    wide_slot = "[virus name or Wuhan or Hubei]"
    narrow_slot = "[virus name]"

    filled = set()
    for virus in virus_terms:
        for template in cv_terms:
            if wide_slot in template:
                filled.update(
                    template.replace(wide_slot, value)
                    for value in (virus, "wuhan", "hubei")
                )
            elif narrow_slot in template:
                filled.add(template.replace(narrow_slot, virus))

    normalised = remove_hyphens_lowercase(filled)
    return sorted(remove_duplicates(normalised))

def read_input_file(filename):
    '''Read a term file and return its distinct lines as a sorted list.

    Each line is stripped of surrounding whitespace. Blank lines are skipped
    so that no empty term ends up in a dictionary.
    '''
    with open(filename) as f:
        terms = {line.strip() for line in f}

    # A blank or whitespace-only line strips down to "", which is not a term.
    terms.discard("")

    return sorted(terms)

def write_output_file(filename, list_):
    '''
    Write the terms of list_ to filename in sorted order, one per line.
    '''
    with open(filename, 'w') as out:
        out.writelines(term + "\n" for term in sorted(list_))


## virus dictionary

In [None]:
# Virus dictionary build: each step consumes the list produced by the step
# before it, and the final list is written to v_out.

# input file virus (read line by line by read_input_file)
v_in = "virus/input/file.txt"
# output file virus (the expanded dictionary is written here)
v_out = "virus/output/file.txt"

# sorted, de-duplicated lines of the input file
virus_terms = read_input_file(v_in)

# remove hyphens and lowercase all terms
virus_lowercase = remove_hyphens_lowercase(virus_terms)

# add 2019 and new/novel variants
virus_lowercase_2019 = _2019_var_virus(virus_lowercase)

# interchange corona and coronavirus terms
virus_lowercase_2019_corona = interchange_corona(virus_lowercase_2019)

# interchange wuhan and hubei terms
virus_lowercase_2019_corona_wuhan = interchange_wuhan_hubei(virus_lowercase_2019_corona)

# remove terms with duplicate words and terms with both new and novel
virus_lowercase_2019_corona_wuhan_nodup = remove_duplicates(virus_lowercase_2019_corona_wuhan)

# write file (this final list is also reused by the disease dictionary cell)
write_output_file(v_out, virus_lowercase_2019_corona_wuhan_nodup)

## disease dictionary

In [None]:
# Disease dictionary build: disease terms from file are merged with disease
# terms derived from the virus dictionary, then expanded and filtered.

# input file disease (read line by line by read_input_file)
dis_in = "disease/input/file.txt"
# output file disease (the expanded dictionary is written here)
dis_out = "disease/output/file.txt"

# file with template terms used to convert virus terms into disease terms;
# templates carry the placeholders "[virus name]" or
# "[virus name or Wuhan or Hubei]"
cv_in = "disease/input/cv_file.md"

disease_terms = read_input_file(dis_in)
cv_terms = read_input_file(cv_in)

# remove hyphens and lowercase all terms
dis_lowercase = remove_hyphens_lowercase(disease_terms)

# create disease variants from virus terms; this uses
# virus_lowercase_2019_corona_wuhan_nodup from the virus dictionary cell, so
# run that cell first (or load the virus terms from file)
dis_v_variants = create_dis_variants_from_virus_cv(cv_terms, virus_lowercase_2019_corona_wuhan_nodup)

# merge lists (result is a set, so duplicates collapse)
dis_merged = set.union(set(dis_lowercase), set(dis_v_variants))

# interchange corona and coronavirus terms
dis_merged_corona = interchange_corona(dis_merged)

# interchange wuhan and hubei terms
dis_merged_corona_wuhan = interchange_wuhan_hubei(dis_merged_corona)

# create variant terms among disease, disorder, syndrome, infection and pneumonia
dis_merged_corona_wuhan_var = create_variant_terms_dis(dis_merged_corona_wuhan)

# remove terms with duplicate words and terms with both new and novel
dis_merged_corona_wuhan_var_nodup = remove_duplicates(dis_merged_corona_wuhan_var)

# write file
write_output_file(dis_out, dis_merged_corona_wuhan_var_nodup)

# Production of Silver Standard

In [None]:
import spacy
from spacy_lookup import Entity
import pandas as pd

# location of the scispaCy model directory passed to spacy.load
sci_md = 'path/to/scispacy/en_core_sci_md/en_core_sci_md-0.2.4'

# abstract collection: read the whole file and close the handle right away
with open('path/to/abstracts/gold_abstracts.txt', encoding="utf8") as abstract_file:
    abstract = abstract_file.read()

# Hyphens are deleted and slashes become spaces before tagging.
# NOTE(review): remove_hyphens_lowercase turns hyphens in dictionary terms
# into spaces, while they are deleted here - confirm this is intended.
abstract = abstract.replace('-', '')
abstract = abstract.replace('/', ' ')
print(abstract)

## for virus terms

In [None]:
# load virus dictionary (one term per line, surrounding whitespace stripped)
with open("virus/output/file.txt") as viruslist:
    virusterms = [line.strip() for line in viruslist]

# create pipeline
nlp1 = spacy.load(sci_md, disable = ['ner']) #disabling ner required for Entity to work
entity1 = Entity(keywords_list=virusterms,case_sensitive=False) #this merges tokens to match the keywords_list
nlp1.add_pipe(entity1, last=True)
print(nlp1.pipeline)

# create doc
doc1 = nlp1(abstract)

# add IOB tags
# Rows are collected in a list and turned into a DataFrame once:
# DataFrame.append copied the frame for every token and is no longer
# available in pandas 2.x.
virus_rows = []
for token in doc1:
    if token._.is_entity:
        # A matched entity may span several words: the first word is tagged
        # "B" (begin), every following word "I" (inside).
        for position, piece in enumerate(token.text.split()):
            virus_rows.append({
                'token': piece,
                'label': "B" if position == 0 else "I",
                'category': 'Virus_SARS-CoV-2',
                'sent': token.sent,
            })
    else:
        # Tokens outside any entity are tagged "O" (outside).
        virus_rows.append({'token': token.text, 'label': "O", 'category': 'NaN', 'sent': token.sent})
df_virus = pd.DataFrame(virus_rows, columns=['token', 'label', 'category', 'sent'])

# write file
df_virus.to_csv(r'path/to/output_file_virus.csv', index=True, header=True)

## for disease terms

In [None]:
# load disease dictionary (one term per line, surrounding whitespace stripped)
with open("disease/output/file.txt") as diseaselist:
    diseaseterms = [line.strip() for line in diseaselist]

# create pipeline
nlp2 = spacy.load(sci_md, disable = ['ner']) #disabling ner required for Entity to work
entity2 = Entity(keywords_list=diseaseterms,case_sensitive=False) #this merges tokens to match the keywords_list
nlp2.add_pipe(entity2, last=True)
print(nlp2.pipeline)

# create doc
doc2 = nlp2(abstract)

# add IOB tags
# Rows are collected in a list and turned into a DataFrame once:
# DataFrame.append copied the frame for every token and is no longer
# available in pandas 2.x.
disease_rows = []
for token in doc2:
    if token._.is_entity:
        # A matched entity may span several words: the first word is tagged
        # "B" (begin), every following word "I" (inside).
        for position, piece in enumerate(token.text.split()):
            disease_rows.append({
                'token': piece,
                'label': "B" if position == 0 else "I",
                'category': 'Disease_COVID-19',
                'sent': token.sent,
            })
    else:
        # Tokens outside any entity are tagged "O" (outside).
        disease_rows.append({'token': token.text, 'label': "O", 'category': 'NaN', 'sent': token.sent})
df_disease = pd.DataFrame(disease_rows, columns=['token', 'label', 'category', 'sent'])

# write file
df_disease.to_csv(r'path/to/output_file_disease.csv', index=True, header=True)