In [8]:
from owlready2 import *
import spacy
from spacy.matcher import Matcher
from spacy.tokenizer import Tokenizer
import xml.sax
import json


In [116]:
# load the BRENDA ontology
# (guarded so that re-running this cell does not register the directory again)
if "../../data/ontologies/" not in onto_path:
    onto_path.append("../../data/ontologies/")
onto = get_ontology('http://purl.obolibrary.org/obo/bto.owl').load()

# load classes and synonyms
#   classes:        BTO id -> primary label (None when the class has no label)
#   class_synonyms: BTO id -> exact + related synonyms
# The synonym lists are copied into plain lists so that the manual additions
# below can only ever change this dictionary.
classes = {c.name: c.label.first() for c in onto.classes()}
class_synonyms = {
    c.name: list(c.hasExactSynonym) + list(c.hasRelatedSynonym)
    for c in onto.classes()
}

# add additional synonyms
class_synonyms['BTO_0000440'] += ['stool']

# create a reverse mapping of labels to BRENDA ids
# reverse mapping removes redundant end terms from the key where possible
redundant_end_terms = ('tissue', 'tissues', 'cell', 'cells')

classes_reverse = {c.label.first().lower().strip(): c.name for c in onto.classes() if c.label != []}
class_synonyms_reverse = {s.lower().strip(): c for c, syn in class_synonyms.items() for s in syn}
# on a key collision the synonym entry overrides the primary-label entry
labels_reverse = {**classes_reverse, **class_synonyms_reverse}

# Build the shortened keys ("lung tissue" -> "lung").
# The check is made on the last whitespace-delimited WORD, not on the character
# suffix: a suffix test also fires for words such as "t-cell", and dropping the
# last word would then remove the meaningful term instead of a redundant one.
# Single-word keys are left alone because stripping them would produce an empty key.
additional_labels = {}
for key, value in labels_reverse.items():
    words = key.split()
    if len(words) > 1 and words[-1] in redundant_end_terms:
        additional_labels[' '.join(words[:-1])] = value

# a shortened key never replaces a label or synonym that exists in its own right
for key, value in additional_labels.items():
    if key not in labels_reverse:
        labels_reverse[key] = value


# flatten the labels and synonyms into a single set
class_labels = {c for c in classes.values() if c is not None}
class_synonyms_flattend = {s for syn in class_synonyms.values() for s in syn}
bto_values = class_labels.union(class_synonyms_flattend)

# Build a token matcher with one pattern per BTO label / synonym.
nlp = spacy.load("en_core_web_lg")
matcher = Matcher(nlp.vocab)
# bare tokenizer (no prefix/suffix/infix rules) used only to split the BTO terms
tokenizer = Tokenizer(nlp.vocab)

# `redundant_end_terms` is the tuple defined earlier in this cell; it is reused
# here instead of being redefined with a different type.
patterns = []

for bto_value in bto_values:
    # case-insensitive pattern: one LOWER constraint per token of the term
    weak_pattern = [{'LOWER': token.lower_} for token in tokenizer(bto_value)]
    if not weak_pattern:
        # an empty term yields no tokens; there is nothing to match on
        continue
    # A trailing "tissue"/"cell"/... is optional so "lung tissue" also matches "lung".
    # A one-token term keeps its token mandatory, otherwise the whole pattern
    # would consist of optional tokens only.
    if len(weak_pattern) > 1 and weak_pattern[-1]['LOWER'] in redundant_end_terms:
        weak_pattern[-1]['OP'] = '?'
    patterns.append(weak_pattern)

# keep only the longest of any overlapping matches
matcher.add("bto", patterns=patterns, greedy="LONGEST")

# adds a regex pattern to the default tokenizer to split on underscores (and tildes)
# The infix rule has to be installed on the pipeline tokenizer that is used to
# tokenize attribute values (`underscore_tokenizer`), not on the bare `tokenizer`
# that only served to build the matcher patterns.
underscore_tokenizer = nlp.tokenizer
# list(...) keeps the concatenation valid whether the defaults are a list or a tuple
infixes = list(nlp.Defaults.infixes) + [r'[_~]']
infix_re = spacy.util.compile_infix_regex(infixes)
underscore_tokenizer.infix_finditer = infix_re.finditer


In [114]:
# sanity check: look up the BTO id stored for a key without a trailing end term
# (presumably one of the shortened keys added to labels_reverse -- verify against the ontology)
print(labels_reverse['alveolar type 2'])

BTO_0000538


In [103]:
class BioSamplesHandler(xml.sax.ContentHandler):
    '''
    SAX handler class to read in information from a BioSamples XML file
    - Reads in the title, paragraph, SRA id and attributes of each sample
    - The values of the "positive" attributes are searched for BTO terms
        - if at least one term is found, a JSON line with the biosample id, SRA id,
          matched texts and BTO ids is appended to output_file
        - otherwise the collected title/paragraph/attributes are appended as a
          JSON line to tmp_file for a second pass
    - Both files are truncated when parsing starts

    Parameters
    - sample_dict: stored on the instance for interface compatibility; this class
      does not write to it (results go to output_file)
    - positive_attributes: attribute names whose values are searched for BTO terms
    - tmp_file: path of the JSON-lines file for samples without a match
    - output_file: path of the JSON-lines file for samples with a match
    - tokenizer: optional callable turning a string into a document for the matcher;
      defaults to the module-level `underscore_tokenizer`
    - term_matcher: optional callable `(doc, as_spans=True)` returning objects with
      a `.text` attribute; defaults to the module-level `matcher`
    - label_to_id: optional mapping of lower-cased label -> BTO id;
      defaults to the module-level `labels_reverse`
    '''

    def __init__(self, sample_dict, positive_attributes, tmp_file, output_file,
                 tokenizer=None, term_matcher=None, label_to_id=None):
        super().__init__()
        self.sample_dict = sample_dict
        self.positive_attributes = positive_attributes
        self.tmp_file = tmp_file
        self.output_file = output_file
        # optional overrides; None means "use the module-level object when needed"
        self.tokenizer = tokenizer
        self.term_matcher = term_matcher
        self.label_to_id = label_to_id
        # per-sample state, reset after every </BioSample>
        self.biosample_id = ''
        self.cur_dict = {}
        self.attributes = {}
        self.attribute_name = ''
        # flags telling which element's text is currently being collected
        self.is_title = False
        self.is_paragraph = False
        self.is_sra = False
        self.sra_id = ''
        # text chunks reported for the element currently being collected
        self._text_chunks = []

    def startDocument(self):
        # start from empty result files
        for path in (self.tmp_file, self.output_file):
            with open(path, 'w'):
                pass

    def startElement(self, name, attrs):
        if name == 'BioSample':
            self.biosample_id = attrs['accession']
            return

        if name == 'Title':
            self.is_title = True
        elif name == 'Paragraph':
            self.is_paragraph = True
        elif name == 'Attribute':
            # prefer the harmonized name, fall back to the submitter's name
            try:
                self.attribute_name = attrs['harmonized_name']
            except KeyError:
                self.attribute_name = attrs['attribute_name']
        elif name == 'Id' and 'db' in attrs and attrs['db'] == 'SRA':
            self.is_sra = True
        else:
            return
        # a collected element starts: forget any text gathered before
        self._text_chunks = []

    def characters(self, content):
        # A parser may report the text of one element in several chunks, so the
        # chunks are only buffered here and assembled when the element ends.
        if self.is_title or self.is_paragraph or self.is_sra or self.attribute_name != '':
            self._text_chunks.append(content)

    def endElement(self, name):
        if name == 'Title':
            if self.is_title:
                text = self._take_text()
                if text is not None:
                    self.cur_dict['title'] = text.lower()
                self.is_title = False
        elif name == 'Paragraph':
            if self.is_paragraph:
                text = self._take_text()
                if text is not None:
                    self.cur_dict['paragraph'] = text.lower()
                self.is_paragraph = False
        elif name == 'Id':
            if self.is_sra:
                text = self._take_text()
                if text is not None:
                    self.sra_id = text
                self.is_sra = False
        elif name == 'Attribute':
            if self.attribute_name != '':
                text = self._take_text()
                if text is not None:
                    self.attributes[self.attribute_name] = text.lower()
                self.attribute_name = ''
        elif name == 'BioSample':
            self._finish_sample()

    def endDocument(self):
        print("Finished parsing BioSamples XML file")

    def _take_text(self):
        '''Return the buffered text of the current element (None if no text was reported) and clear the buffer.'''
        chunks, self._text_chunks = self._text_chunks, []
        if not chunks:
            return None
        return ''.join(chunks)

    def _match_positive_attributes(self):
        '''Run the matcher over the value of every positive attribute of the current sample.'''
        intersect = set(self.attributes).intersection(self.positive_attributes)
        if not intersect:
            return []
        tokenize = self.tokenizer if self.tokenizer is not None else underscore_tokenizer
        find_terms = self.term_matcher if self.term_matcher is not None else matcher
        matches = []
        # sorted so the order of the reported matches does not depend on set ordering
        for attribute in sorted(intersect):
            attribute_tokens = tokenize(self.attributes[attribute])
            matches.extend(find_terms(attribute_tokens, as_spans=True))
        return matches

    def _resolve_bto_ids(self, matches):
        '''Pair each matched text with its BTO id; matches without a known id are dropped.'''
        if not matches:
            return []
        lookup = self.label_to_id if self.label_to_id is not None else labels_reverse
        resolved = []
        for match in matches:
            key = match.text.lower()
            bto_id = lookup.get(key)
            if bto_id is None:
                # retry with collapsed whitespace before giving up on this match
                bto_id = lookup.get(' '.join(key.split()))
            if bto_id is not None:
                resolved.append((match.text, bto_id))
        return resolved

    def _append_jsonl(self, path, record):
        '''Append one record as a JSON line to the given file.'''
        with open(path, 'a') as f:
            json.dump(record, f)
            f.write('\n')

    def _finish_sample(self):
        '''Write the current sample to output_file (match) or tmp_file (no match) and reset the per-sample state.'''
        resolved = self._resolve_bto_ids(self._match_positive_attributes())

        if not resolved:
            # no usable match found, keep everything for the second pass
            self.cur_dict['biosample_id'] = self.biosample_id
            self.cur_dict['attributes'] = self.attributes
            self.cur_dict['sra_id'] = self.sra_id
            self._append_jsonl(self.tmp_file, self.cur_dict)
        else:
            tissue_matches = ','.join([text for text, _ in resolved])
            bto_matches = ','.join([bto_id for _, bto_id in resolved])
            self._append_jsonl(self.output_file, {'biosample_id': self.biosample_id, 'sra_id': self.sra_id, 'tissue': tissue_matches, 'bto_matches': bto_matches})

        self.attribute_name = ''
        self.attributes = {}
        self.cur_dict = {}
        self.biosample_id = ''
        self.sra_id = ''
        self.is_title = False
        self.is_paragraph = False
        self.is_sra = False
        self._text_chunks = []
                


In [117]:
# attribute names whose values are searched for tissue / cell-type terms
positive_attributes = {
    'tissue',
    'cell_type',
    'cell_line',
    'cell_subtype',
    'source_name',
    'host_tissue_sampled',
}
sample_dict = {}

# set up the handler that receives the SAX events of the BioSamples XML file
handler = BioSamplesHandler(
    sample_dict,
    positive_attributes,
    'tmp.jsonl',
    'output.jsonl',
)

# stream the file through a SAX parser wired to that handler
parser = xml.sax.make_parser()
parser.setContentHandler(handler)
parser.parse('../../data/biosamples/biosample_random_samples.xml')


Finished parsing BioSamples XML file
