# Install

In [None]:
# !pip install rdflib

# Imports

In [31]:
import itertools
from types import GeneratorType
from typing import Optional, Union, Dict, List, Tuple, Iterator
from urllib.parse import urlparse

import rdflib
from rdflib import RDF, RDFS, OWL, BNode, Literal, URIRef, Namespace
from rdflib.namespace import SKOS
from rdflib.namespace import NamespaceManager
from rdflib.namespace import XSD
from rdflib.util import from_n3

# Type aliases used in the annotations below.
# A node that is already an rdflib term.
SemanticeType = Union[URIRef, Literal, BNode]
# A node that is either an rdflib term or a plain string still to be converted (curie, IRI text, label, ...).
PotenialSemanticeType = Union[SemanticeType, str]
# One potential node, or a list of alternatives for a single triple position.
PotenialSemanticeTypes = Union[List[PotenialSemanticeType], PotenialSemanticeType]
# (subject, predicate, object) made only of rdflib terms.
Triple = Tuple[SemanticeType, SemanticeType, SemanticeType]
# (subject, predicate, object) where each position may still be a plain string.
PotentialTriple = Tuple[PotenialSemanticeType, PotenialSemanticeType, PotenialSemanticeType]
# (subject, predicate, object) where each position may be a single node or a list of nodes.
PotentialTriples = Tuple[PotenialSemanticeTypes, PotenialSemanticeTypes, PotenialSemanticeTypes]

# Pull Subtree of a given root

In [50]:
class OntologySoup(rdflib.Graph):
    """rdflib.Graph subclass with curie-aware lookups and subgraph/subtree extraction.

    The instance does not copy the wrapped graph. Its ``triples``, ``add``,
    ``remove`` and ``namespace_manager`` attributes are rebound to those of the
    graph handed to ``__init__``, so reads and writes go to that graph.
    """

    # Namespaces kept on the class for convenience; none are used by the methods below.
    ILX = rdflib.Namespace("http://uri.interlex.org/base/ilx_")
    ILXR = rdflib.Namespace("http://uri.interlex.org/base/readable/")
    NIFRID = rdflib.Namespace("http://uri.neuinfo.org/nif/nifstd/readable/")
    DEFINITION = rdflib.Namespace("http://purl.obolibrary.org/obo/IAO_0000115")

    class Error(Exception):
        """Internal Error for OntologySoup"""

    class GraphTypeError(Exception):
        """Expected rdflib.Graph"""

    def __init__(self, rdfgraph: rdflib.Graph = None, format: str = None, **kwargs):
        """Wrap an existing graph, or parse one from a path/URL.

        :param rdfgraph: an rdflib.Graph to wrap, a string source to parse into
            a fresh graph, or None for an empty graph.
        :param format: rdflib parser format, only used when ``rdfgraph`` is a string.
        :param kwargs: accepted for compatibility; currently unused.
        :raises GraphTypeError: if ``rdfgraph`` is neither None, a string nor an rdflib.Graph.
        """
        rdflib.Graph.__init__(self)
        if isinstance(rdfgraph, str):
            rdfgraph = rdflib.Graph().parse(rdfgraph, format=format)
        if rdfgraph is None:
            # Nothing to wrap: behave as a plain empty graph.
            return
        if not isinstance(rdfgraph, rdflib.Graph):
            # The original built this exception but never raised it.
            raise self.GraphTypeError(f"{type(rdfgraph)}")
        # A cheat to reuse the wrapped graph's store without copying triples:
        # rebind the bound methods (methods, not lists!) onto this instance.
        self.triples = rdfgraph.triples
        self.add = rdfgraph.add  # todo make a pull request so they fix their graph inheritance
        self.remove = rdfgraph.remove
        self.namespace_manager = rdfgraph.namespace_manager

    def curiefy(self, node):
        """qname | iri to curie. Returns None when the node cannot be converted."""
        # NOTE(review): qname_node is not defined in the visible part of this file;
        # confirm it is defined/imported elsewhere, otherwise this always returns None.
        try:
            return qname_node(node, self.namespace_manager)
        except Exception:
            return None  # raise ValueError(f'IRI {node} cannot qname. Does not exist in graph.')

    def symanticify(self, triple: PotentialTriple) -> Optional[Triple]:
        """Convert every position of a triple pattern into an rdflib term.

        :param triple: (subject, predicate, object); None/empty positions stay wildcards.
        :returns: the converted triple, or None if a non-empty position could not be
            converted (so it is not silently turned into a wildcard).
        :raises ValueError: if the subject cannot be expanded (subject is converted strictly).
        """
        subj, pred, obj = triple
        _subj = self.uriify(subj, passive=False)
        _pred = self.uriify(pred, passive=True)
        _obj = self.uriify(obj, passive=True)
        if subj and not _subj:
            return None
        if pred and not _pred:
            return None
        # The original repeated the predicate check here, letting an unresolved
        # object fall through as a match-everything wildcard.
        if obj and not _obj:
            return None
        return _subj, _pred, _obj

    def uriify(self, node, passive=True):
        """expand | curie to iri | opposite of qname

        :param node: curie, IRI string, label or rdflib term.
        :param passive: when True, return None on failure instead of raising.
        :raises ValueError: on failure when ``passive`` is False.
        """
        try:
            return semanticize_node(node, self.namespace_manager)
        except Exception as err:
            if passive:
                return None
            raise ValueError(f"Curie {node} cannot expand. Does not exist in graph.") from err

    def better_triples(self, triple: PotentialTriples = ()) -> Iterator[Triple]:
        """Yield matching triples, accepting a list of alternatives per position.

        Every position is turned into a list, each combination is expanded to
        rdflib terms and looked up with ``self.triples``. An empty/omitted
        pattern matches every triple.

        >>> better_triples(('UBERON:123', ['rdfs:label', rdflib.SKOS.prefLabel], None))
        """

        def listify(v):
            """If not a list, make it one."""
            if not v:
                return [None]  # Makes sure permutations have no empty lists
            elif isinstance(v, list):
                return v
            elif isinstance(v, (GeneratorType, tuple)):
                return list(v)
            elif isinstance(v, (str, URIRef, Literal, BNode)):
                return [v]
            else:
                raise ValueError(f"VALUE: [{v}] needs to be a of TYPE {PotenialSemanticeType} not TYPE {type(v)}")

        if not triple:
            # The default () used to blow up when unpacked as (s, p, o).
            triple = (None, None, None)
        for pattern in itertools.product(*map(listify, triple)):
            pattern = self.symanticify(pattern)
            if pattern is None:
                continue
            yield from self.triples(pattern)

    def create_rdfgraph_from_triples(self, entity_triples: list) -> rdflib.Graph:
        """Create rdflib graph from triples. Uses prefixes from imported graphs.

        :param list entity_triples: Sematicized triples to become an rdflib Graph.
        :returns: OntologySoup wrapping a new graph that holds the given triples.
        """
        rdfgraph = rdflib.Graph()
        # update prefixes to master graph
        rdfgraph.namespace_manager = self.namespace_manager
        # Need to actually insert each triple because we can't cheat the __triples within triples()
        for triple in entity_triples:
            rdfgraph.add(triple)
        return OntologySoup(rdfgraph=rdfgraph)

    def pull_entity_subgraph(self, entity_iri: rdflib.term.URIRef, graph_form=True) -> rdflib.Graph:
        """Builds all triples for an entity.
        :note: traverse BNodes since sparql or rdflib doesn't support this.

        :param entity_iri: IRI (or curie / BNode) of the entity.
        :param graph_form: return an OntologySoup when True, a list of triples when False.
        :returns: the entity's own triples plus those of every BNode reachable from it.
        """
        # NOTE(review): an unresolvable entity_iri becomes None here, which
        # self.triples treats as a wildcard subject.
        entity_iri = self.uriify(entity_iri)
        entity_triples = list(self.triples((entity_iri, None, None)))
        # Each BNode is expanded once; this also terminates on BNode cycles.
        visited = {entity_iri}
        # first layer of bnodes
        bnode_queue = [obj for _, _, obj in entity_triples if isinstance(obj, rdflib.BNode)]
        # traverse bnodes of entity until there are no unvisited bnodes as objects
        while bnode_queue:
            next_queue = []
            for bnode in bnode_queue:
                if bnode in visited:
                    continue
                visited.add(bnode)
                # get the triples where the object bnode is the subject
                bnode_triples = list(self.triples((bnode, None, None)))
                entity_triples += bnode_triples
                next_queue += [obj for _, _, obj in bnode_triples if isinstance(obj, rdflib.BNode)]
            bnode_queue = next_queue
        if graph_form:
            return self.create_rdfgraph_from_triples(entity_triples)
        return entity_triples

    def pull_entity_subtree(
        self,
        entity_iri: rdflib.term.URIRef,
        edges=("rdfs:subClassOf",),
        graph_form=True,
        barebones=False,
        depth_limit=10,
    ) -> rdflib.Graph:
        """
        Collect everything below an entity along the given edges.

        Parameters
        ----------
        entity_iri : rdflib.term.URIRef
            Root of the subtree (IRI, curie or rdflib term).
        edges : predicate or sequence of predicates, optional
            Predicates followed from child to parent, by default rdfs:subClassOf.
        graph_form : bool, optional
            Only consulted when ``barebones`` is True: return an OntologySoup
            (True) or a list of triples (False). By default True.
        barebones : bool, optional
            When True only the edge triples plus the root's own subgraph are
            returned; when False the full subgraph of every collected subject
            is returned as an rdflib.Graph. By default False.
        depth_limit : int, optional
            Number of levels expanded below the root's direct children, by default 10.

        Returns
        -------
        rdflib.Graph
            The extracted subtree (a list of triples for barebones + graph_form=False).
        """
        entity_iri = self.uriify(entity_iri)
        entity_triples = list(self.better_triples((None, edges, entity_iri)))
        triple_queue = [triple for triple in entity_triples if not isinstance(triple[2], rdflib.BNode)]
        # Subjects already expanded; avoids re-expanding shared nodes and edge cycles.
        expanded = {entity_iri}
        depth = 0
        while triple_queue and depth < depth_limit:
            new_queue = []
            for subj, _, _ in triple_queue:
                if isinstance(subj, rdflib.BNode) or subj in expanded:
                    continue
                expanded.add(subj)
                triples = list(self.better_triples((None, edges, subj)))
                new_queue += triples
                entity_triples += triples
            triple_queue = new_queue
            # Count levels, not nodes: the original counted nodes and so stopped
            # after roughly `depth_limit` entities regardless of depth.
            depth += 1
        entity_triples += self.pull_entity_subgraph(entity_iri, graph_form=False)
        if not barebones:
            subjs = {subj for subj, _, _ in entity_triples}
            g = rdflib.Graph()
            g.namespace_manager = self.namespace_manager
            for subj in subjs:
                # Triples are added directly; building a temporary graph per subject is not needed.
                for triple in self.pull_entity_subgraph(subj, graph_form=False):
                    g.add(triple)
            return g
        # If you want barebone, but in graph form!
        if graph_form:
            return self.create_rdfgraph_from_triples(entity_triples)
        return entity_triples

    
def semanticize_node(
    node: str,
    nsm: NamespaceManager = None,
    known_literal_type: str = None,
    use_builtin_literal_type_check: bool = False,
) -> Union[URIRef, Literal, BNode]:
    """Curie to IRI, str IRI to IRI, label to Literal, or Hash to BNode.

    :note: Some short hand sparql (while using use_builtin_literal_type_check)
        will break such as ['example'] & {'example'}

    :param node: value to be converted into a semantically usable node; may be a
        str, int, float, bool or an rdflib term (returned unchanged).
    :param NamespaceManager nsm: A glorified dict of prefixes to namespaces.
    :param str known_literal_type: 'xsd:<value>' of literal type; forces a Literal of that type.
    :param bool use_builtin_literal_type_check: let rdflib's ``from_n3`` decide the
        literal type instead of the local int/float/bool sniffing.
    :returns: semantic rdflib.term node, or None for None / empty input.
    :raises ValueError: if ``node`` is of an unsupported type.
    :raises KeyError: (from ``from_n3``) if a curie prefix is unknown to ``nsm``.

    :examples:

    # Str IRI to semantic IRI.
    >>>semanticize_node('http://www.w3.org/2000/01/rdf-schema#label')
    rdflib.term.URIRef('http://www.w3.org/2000/01/rdf-schema#label')

    # curie to IRI.
    >>>semanticize_node('rdfs:label')
    rdflib.term.URIRef('http://www.w3.org/2000/01/rdf-schema#label')

    # Hash to BNode.
    >>>semanticize_node('_:u123H23')
    rdflib.term.BNode('u123H23')

    # Label to Literal.
    >>>semanticize_node('Brain')
    rdflib.term.Literal('Brain', datatype=rdflib.term.URIRef('http://www.w3.org/2001/XMLSchema#string'))

    # Integer to Literal
    >>>semanticize_node(node='12345', nsm=nsm)
    rdflib.term.Literal('12345', datatype=rdflib.term.URIRef('http://www.w3.org/2001/XMLSchema#integer'))
    """

    def _auto_find_node_xsd_type(text):
        """Find intended XSD datatype for a literal's text."""
        ### BOOLEAN ###
        if text.lower() in ("false", "true"):
            return XSD.boolean

        ### INTEGER ###
        try:
            int(text)
        except ValueError:
            pass
        else:
            return XSD.integer

        ### DECIMAL ###
        try:
            float(text)
        except ValueError:
            pass
        else:
            return XSD.decimal

        # TODO: Add checks for date & time here
        # should just import a datatime module to see if it takes it.

        ### STRING ###
        # Default is always string. Blank type is also considered string.
        return XSD.string

    valid_semantic_types = (URIRef, Literal, BNode)
    valid_iri_starts = ("http", "https")
    valid_node_types = (str, float, int, bool, URIRef, Literal, BNode)

    #  basic check
    # Only None and empty strings mean "no node"; 0 and False are real values
    # (the previous `if not node` silently dropped them).
    if node is None or (isinstance(node, str) and not node):
        return None
    if not isinstance(node, valid_node_types):
        # Was `return ValueError(...)`, which handed the exception object back as a node.
        raise ValueError(f'Node "{node}" needs to be a value type "{valid_node_types}".')

    # Node is already semantic.
    if isinstance(node, valid_semantic_types):
        return node

    # Can handle numbers and bools, they just need to be strings first.
    if isinstance(node, bool):
        # Lower-case so the lexical form is the XSD one ("true"/"false").
        node = str(node).lower()
    # basic cleanup
    node = str(node).strip()
    if not node:
        return None

    # SHORTCUTS #
    # Bypassing for uncommon XSD types.
    if known_literal_type:
        # One pass recursive to expand type given (uses rdflib's default prefixes).
        # TODO: add checks to see if this is a correct XSD.
        datatype = semanticize_node(known_literal_type)
        return Literal(node, datatype=datatype)

    # IRI #
    if urlparse(node).scheme in valid_iri_starts:
        # Sanity check iri in case its an iri within a label.
        return URIRef(node.replace(" ", ""))
    # BNODE #
    if node.startswith("_:"):
        return from_n3(node, nsm=nsm)

    expanded_node = from_n3(node, nsm=nsm)
    # CURIE #
    if str(expanded_node) != node:
        # We use that fact that it expanded as the anchor it is a curie.
        return expanded_node

    # LITERAL #
    if use_builtin_literal_type_check:
        # short hand symantics equal to sparql must be used.
        # The result used to be discarded, leaving the datatype unbound.
        builtin_node = from_n3(node, nsm=None)
        if isinstance(builtin_node, Literal):
            return builtin_node
        node_type = XSD.string
    else:
        # auto find what the literal type is supposed to be.
        node_type = _auto_find_node_xsd_type(node)
    return Literal(node, datatype=node_type)


# EDAM

In [54]:
# Entity whose subtree is extracted from the EDAM file.
root = 'http://edamontology.org/topic_3361'
# Parse the RDF/XML file and wrap it in an OntologySoup.
g = OntologySoup('biko-ontologies/edam.xrdf', format='xml')
# Collect everything reachable below `root` along rdfs:subClassOf edges.
g_slim = g.pull_entity_subtree(URIRef(root), edges=RDFS.subClassOf)
# NOTE(review): 'nifttl' is presumably a serializer registered by an external plugin; confirm it is installed.
g_slim.serialize('biko-ontologies/edam-slim.ttl', format='nifttl')

<Graph identifier=N95a4f1640c8245ce91a35e04bd6919d8 (<class 'rdflib.graph.Graph'>)>

# ERO

In [53]:
# Entity whose subtree is extracted from the ERO file.
root = 'http://purl.obolibrary.org/obo/ERO_0001658'
# Parse the RDF/XML file and wrap it in an OntologySoup.
g = OntologySoup('biko-ontologies/ero.xrdf', format='xml')
# Collect everything reachable below `root` along rdfs:subClassOf edges.
g_slim = g.pull_entity_subtree(URIRef(root), edges=RDFS.subClassOf)
# NOTE(review): 'nifttl' is presumably a serializer registered by an external plugin; confirm it is installed.
g_slim.serialize('biko-ontologies/ero-slim.ttl', format='nifttl')

<Graph identifier=N469ccba6dd9943b5843466d6c24fb552 (<class 'rdflib.graph.Graph'>)>

# NCIT

In [67]:
# Parse the NCIT RDF/XML file once; later cells reuse `g`.
g = OntologySoup('biko-ontologies/ncit.xrdf', format='xml')

In [80]:
# Namespace used below to build NCIT entity IRIs.
NCIT = Namespace("http://ncicb.nci.nih.gov/xml/owl/EVS/Thesaurus.owl#")
# Scratch graph whose only purpose here is to carry the prefix bindings below.
_g = rdflib.Graph()
# (prefix, namespace IRI) pairs to bind on the scratch graph.
ns_list = [('brick', rdflib.term.URIRef('https://brickschema.org/schema/Brick#')),
 ('csvw', rdflib.term.URIRef('http://www.w3.org/ns/csvw#')),
 ('dc', rdflib.term.URIRef('http://purl.org/dc/elements/1.1/')),
 ('dcat', rdflib.term.URIRef('http://www.w3.org/ns/dcat#')),
 ('dcmitype', rdflib.term.URIRef('http://purl.org/dc/dcmitype/')),
 ('dcterms', rdflib.term.URIRef('http://purl.org/dc/terms/')),
 ('dcam', rdflib.term.URIRef('http://purl.org/dc/dcam/')),
 ('doap', rdflib.term.URIRef('http://usefulinc.com/ns/doap#')),
 ('foaf', rdflib.term.URIRef('http://xmlns.com/foaf/0.1/')),
 ('odrl', rdflib.term.URIRef('http://www.w3.org/ns/odrl/2/')),
 ('org', rdflib.term.URIRef('http://www.w3.org/ns/org#')),
 ('owl', rdflib.term.URIRef('http://www.w3.org/2002/07/owl#')),
 ('prof', rdflib.term.URIRef('http://www.w3.org/ns/dx/prof/')),
 ('prov', rdflib.term.URIRef('http://www.w3.org/ns/prov#')),
 ('qb', rdflib.term.URIRef('http://purl.org/linked-data/cube#')),
 ('rdf', rdflib.term.URIRef('http://www.w3.org/1999/02/22-rdf-syntax-ns#')),
 ('rdfs', rdflib.term.URIRef('http://www.w3.org/2000/01/rdf-schema#')),
 ('schema', rdflib.term.URIRef('https://schema.org/')),
 ('sh', rdflib.term.URIRef('http://www.w3.org/ns/shacl#')),
 ('skos', rdflib.term.URIRef('http://www.w3.org/2004/02/skos/core#')),
 ('sosa', rdflib.term.URIRef('http://www.w3.org/ns/sosa/')),
 ('ssn', rdflib.term.URIRef('http://www.w3.org/ns/ssn/')),
 ('time', rdflib.term.URIRef('http://www.w3.org/2006/time#')),
 ('vann', rdflib.term.URIRef('http://purl.org/vocab/vann/')),
 ('void', rdflib.term.URIRef('http://rdfs.org/ns/void#')),
 ('xsd', rdflib.term.URIRef('http://www.w3.org/2001/XMLSchema#')),
 ('xml', rdflib.term.URIRef('http://www.w3.org/XML/1998/namespace')),
 ('protege',
  rdflib.term.URIRef('http://protege.stanford.edu/plugins/owl/protege#')),
 ('metadata', rdflib.term.URIRef('http://data.bioontology.org/metadata/')),
 ('oboInOwl',
  rdflib.term.URIRef('http://www.geneontology.org/formats/oboInOwl#')),
 ('NCIT',
  rdflib.term.URIRef('http://ncicb.nci.nih.gov/xml/owl/EVS/Thesaurus.owl#'))]
# Bind every prefix on the scratch graph's namespace manager.
for ns in ns_list:
    _g.bind(ns[0], Namespace(ns[1]))
# Displayed as the cell output: the bindings now held by `_g`.
list(_g.namespaces())

[('brick', rdflib.term.URIRef('https://brickschema.org/schema/Brick#')),
 ('csvw', rdflib.term.URIRef('http://www.w3.org/ns/csvw#')),
 ('dc', rdflib.term.URIRef('http://purl.org/dc/elements/1.1/')),
 ('dcat', rdflib.term.URIRef('http://www.w3.org/ns/dcat#')),
 ('dcmitype', rdflib.term.URIRef('http://purl.org/dc/dcmitype/')),
 ('dcterms', rdflib.term.URIRef('http://purl.org/dc/terms/')),
 ('dcam', rdflib.term.URIRef('http://purl.org/dc/dcam/')),
 ('doap', rdflib.term.URIRef('http://usefulinc.com/ns/doap#')),
 ('foaf', rdflib.term.URIRef('http://xmlns.com/foaf/0.1/')),
 ('odrl', rdflib.term.URIRef('http://www.w3.org/ns/odrl/2/')),
 ('org', rdflib.term.URIRef('http://www.w3.org/ns/org#')),
 ('owl', rdflib.term.URIRef('http://www.w3.org/2002/07/owl#')),
 ('prof', rdflib.term.URIRef('http://www.w3.org/ns/dx/prof/')),
 ('prov', rdflib.term.URIRef('http://www.w3.org/ns/prov#')),
 ('qb', rdflib.term.URIRef('http://purl.org/linked-data/cube#')),
 ('rdf', rdflib.term.URIRef('http://www.w3.org/19

In [81]:
# Replace the NCIT graph's namespace manager with the one built on `_g` above.
g.namespace_manager = _g.namespace_manager

In [82]:
# Entity whose subtree is extracted from the NCIT graph loaded earlier.
root = NCIT['C14250']

# Collect everything reachable below `root` along rdfs:subClassOf edges.
g_slim = g.pull_entity_subtree(URIRef(root), edges=RDFS.subClassOf)
# NOTE(review): 'nifttl' is presumably a serializer registered by an external plugin; confirm it is installed.
g_slim.serialize('biko-ontologies/ncit-slim.ttl', format='nifttl')

<Graph identifier=N63a8272833dc469b8bff7f5f5d6df8aa (<class 'rdflib.graph.Graph'>)>

# FMA

In [None]:
# Entity whose subtree is extracted from the FMA file.
root = 'http://purl.org/sig/ont/fma/fma55676'
# Parse the RDF/XML file and wrap it in an OntologySoup.
g = OntologySoup('biko-ontologies/fma.xrdf', format='xml')
# Collect everything reachable below `root` along rdfs:subClassOf edges.
g_slim = g.pull_entity_subtree(URIRef(root), edges=RDFS.subClassOf)
# NOTE(review): 'nifttl' is presumably a serializer registered by an external plugin; confirm it is installed.
g_slim.serialize('biko-ontologies/fma-slim.ttl', format='nifttl')