Skip to content

Commit

Permalink
Fixes for BEL pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
wshayes committed Aug 3, 2018
1 parent aedc808 commit 6928169
Show file tree
Hide file tree
Showing 7 changed files with 300 additions and 91 deletions.
4 changes: 2 additions & 2 deletions bel/db/arangodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import bel.utils as utils
from bel.Config import config

import logging
log = logging.getLogger(__name__)
from structlog import get_logger
log = get_logger()

edgestore_db_name = 'edgestore'
belns_db_name = 'belns'
Expand Down
246 changes: 212 additions & 34 deletions bel/edge/edges.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
import copy
import itertools
import os
import json
import datetime
import urllib

import bel.lang.belobj
import bel.lang.bel_specification
Expand All @@ -21,17 +24,35 @@

from bel.Config import config

import logging
log = logging.getLogger(__name__)
from structlog import get_logger
log = get_logger()

Edges = MutableSequence[Mapping[str, Any]]

arango_client = arangodb.get_client()
edgestore_db = arangodb.get_edgestore_handle(arango_client)


def save_nanopub_to_edgestore(nanopub_url: str, rules: List[str] = [], orthologize_targets: list = []):
"""Save nanopub created edges into edgestore"""

edges = process_nanopub(nanopub_url, rules, orthologize_targets)

# start_time = datetime.datetime.now()
# arango_client = arangodb.get_client()
# edgestore_db = arangodb.get_edgestore_handle(arango_client)
# conn_time = utils.timespan(start_time)
# log.info('Time to get db connection', delta_get_db_conn=conn_time)
load_edges_into_db(edgestore_db, edges)


def process_nanopub(nanopub_url: str, rules: List[str] = [], orthologize_targets: list = []):
"""Process nanopub into edges and load into EdgeStore
"""

start_time = datetime.datetime.now()

if orthologize_targets == []:
if config['bel_api'].get('edges', None):
if config['bel_api']['edges'].get('orthologize_targets', None):
Expand All @@ -40,33 +61,78 @@ def process_nanopub(nanopub_url: str, rules: List[str] = [], orthologize_targets
api_url = config['bel_api']['servers']['api_url']

nanopub = nanopubstore.get_nanopub(nanopub_url)
nanopub['source_url'] = nanopub_url

edges = create_edges(nanopub, api_url, nanopub_url)
for orthologize_target in orthologize_targets:
edges.extend(create_edges(nanopub, api_url, nanopub_url, orthologize_target=orthologize_target))
citation_string = normalize_nanopub_citation(nanopub)

arango_client = arangodb.get_client()
edgestore_db = arangodb.get_edgestore_handle(arango_client)
np_time = datetime.datetime.now()
delta_get_np = np_time - start_time

# Add unorthologized edges
edges = create_edges(nanopub, api_url, citation_string)

edges_time = datetime.datetime.now()
delta_get_edges = edges_time - np_time

# Add orthologized edges
for orthologize_target in orthologize_targets:
edges.extend(create_edges(nanopub, api_url, citation_string, orthologize_target=orthologize_target))

load_edges_into_db(edgestore_db, edges)

delta_load_db = datetime.datetime.now() - edges_time

log.info('Timings', delta_get_np=f'{delta_get_np.total_seconds() * 1000}ms', delta_get_edges=f'{delta_get_edges.total_seconds() * 1000}ms', delta_load_db=f'{delta_load_db.total_seconds() * 1000}ms', )

return edges


def create_edges(nanopub: Mapping[str, Any], api_url: str, nanopub_url: str, namespace_targets: Mapping[str, List[str]] = None, rules: List[str] = [], orthologize_target: str = None) -> Edges:
"""Create edges from nanopub
def create_edges(nanopub: Mapping[str, Any], api_url: str, citation: str, rules: List[str] = [], orthologize_target: str = []) -> Edges:
"""Create edges from assertions
Args:
nanopub (Mapping[str, Any]): nanopub in nanopub_bel schema format
api_url (str): BEL.bio API endpoint to use
nanopub_url: url for nanopub
citation (str): citation string normalized from Nanopub citation object
rules (List[str]): which computed edge rules to process, default is all,
look at BEL Specification yaml file for computed edge signature keys,
e.g. degradation, if any rule in list is 'skip', then skip computing edges
just return primary_edge
namespace_targets (Mapping[str, List[str]]): what namespaces to canonicalize
orthologize_target: list of species to convert BEL into, e.g. TAX:10090 for mouse, default option does not orthologize
orthologize_target: species to convert BEL into, e.g. TAX:10090 for mouse, default option does not orthologize
Returns:
Tuple[List[Mapping[str, Any]], List[Tuple[str, List[str]]]]: (edge list, validation messages) - edge list with edge attributes (e.g. context) and parse validation messages
Edge object:
{
"edge": {
"subject": {
"name": subj_canon,
"label": subj_lbl,
"components": subj_components,
},
"relation": {
"relation": edge_ast.bel_relation,
"edge_hash": edge_hash,
"nanopub_url": nanopub_url,
"nanopub_id": nanopub_id,
"citation": citation,
"subject_canon": subj_canon,
"subject": subj_lbl,
"object_canon": obj_canon,
"object": obj_lbl,
"annotations": annotations,
"public_flag": nanopub['isPublished'],
"edge_types": edge_types,
},
'object': {
"name": obj_canon,
"label": obj_lbl,
"components": obj_components,
}
}
}
"""

# Extract BEL Version and make sure we can process this
Expand All @@ -88,6 +154,25 @@ def create_edges(nanopub: Mapping[str, Any], api_url: str, nanopub_url: str, nam
bo = bel.lang.belobj.BEL(bel_version, api_url)

for assertion in nanopub['nanopub']['assertions']:

if not assertion['relation']:
continue # Skip any subject only statements

edge = {
'edge': {
'subject': {},
'relation': {
'relation': assertion['relation'],
'nanopub_url': nanopub['source_url'],
'nanopub_id': os.path.basename(urllib.parse.urlparse(nanopub['source_url']).path),
'citation': citation,
'annotations': copy.deepcopy(annotations),
'public_flag': nanopub['isPublished'],
},
'object': {},
}
}

if assertion['relation']:
bel_statement = f"{assertion['subject']} {assertion['relation']} {assertion['object']}"
else:
Expand All @@ -98,54 +183,126 @@ def create_edges(nanopub: Mapping[str, Any], api_url: str, nanopub_url: str, nam
log.error(f'Invalid BEL Statement: {bo.validation_messages}')
continue

if 'backbone' in nanopub['source_url']:
edge['edge']['relation']['edge_types'] = ['backbone']
else:
edge['edge']['relation']['edge_types'] = ['primary']

# Orthologize #####################################################
if orthologize_target:
orig_bel_str = bo.to_string()
bo.orthologize(orthologize_target)

# log.info(f'Unorthologized: {orig_bel_str} New: {bo.to_string()} TEMP')

if bo.to_string() == orig_bel_str:
log.info(f'Skipping unorthologized BEL stmt')
log.info(f'Cannot orthologize BEL stmt to {orthologize_target}')
continue
edge_types = ['primary', 'orthologized']
else:
edge_types = ['primary']

# Canonicalize
bo.canonicalize(namespace_targets=namespace_targets)
edge['edge']['relation']['edge_types'].append('orthologized')

# Primary edge
if bo.ast.bel_relation: # Skip subject-only Assertion?
add_edge(edges, bo.ast, nanopub_url, edge_types, annotations)
# Add BEL causal edge type
if bo.ast.bel_relation and 'causal' in bo.ast.spec['relations']['info'][bo.ast.bel_relation]['categories']:
edge['edge']['relation']['edge_types'].append('causal')

# Computed edges
computed_edges_ast = []
if not rules or 'skip' not in rules:
computed_edges_ast.extend(bo.compute_edges(rules=rules, ast_result=True))
# Primary Edge
process_belobj_into_triples(bo, edge)

edge_hash = utils._create_hash(f'{edge["edge"]["subject"]["name"]} {edge["edge"]["relation"]["relation"]} {edge["edge"]["object"]["name"]}')
edge['edge']['relation']['edge_hash'] = edge_hash
edges.append(edge)

# Computed edges ##################################################
computed_edges_ast = bo.compute_edges(rules=rules, ast_result=True)

if orthologize_target:
edge_types = ['computed', 'orthologized']
else:
edge_types = ['computed']

for edge_ast in computed_edges_ast:
add_edge(edges, edge_ast, nanopub_url, edge_types, annotations)
edge = {
'edge': {
'subject': {},
'relation': {
'relation': assertion['relation'],
'nanopub_url': nanopub['source_url'],
'nanopub_id': os.path.basename(urllib.parse.urlparse(nanopub['source_url']).path),
'annotations': copy.deepcopy(annotations),
'public_flag': nanopub['isPublished'],
'edge_types': edge_types,
},
'object': {},
}
}

bo.ast = edge_ast
process_belobj_into_triples(bo, edge)

edge_hash = utils._create_hash(f'{edge["edge"]["subject"]["name"]} {edge["edge"]["relation"]["relation"]} {edge["edge"]["object"]["name"]}')
edge['edge']['relation']['edge_hash'] = edge_hash
edges.append(edge)

return edges


def add_edge(edges: Edges, edge_ast, nanopub_url: str, edge_types: List[str], annotations: List[any]):
def process_belobj_into_triples(bo, edge):
"""Create triples (canonicalized and decanonicalized)
Create SRO strings and components canonicalized and decanonicalized
as well as the Subject and Object triples
"""

bo.canonicalize()
sro = bo.to_triple()
edge['edge']['subject']['name'] = sro['subject']
edge['edge']['relation']['subject_canon'] = sro['subject']

edge['edge']['relation']['relation'] = sro['relation']
edge['edge']['object']['name'] = sro['object']
edge['edge']['relation']['object_canon'] = sro['object']

edge['edge']['subject']['components'] = bo.ast.bel_subject.subcomponents(subcomponents=[])

if bo.ast.bel_object.__class__.__name__ == 'BELAst': # Nested BEL Assertion
obj_components = bo.ast.bel_object.bel_subject.subcomponents(subcomponents=[])
obj_components = bo.ast.bel_object.bel_object.subcomponents(subcomponents=obj_components)
else: # Normal BEL Assertion
obj_components = bo.ast.bel_object.subcomponents(subcomponents=[])

edge['edge']['object']['components'] = obj_components

bo.decanonicalize()
sro = bo.to_triple()
edge['edge']['subject']['label'] = sro['subject']
edge['edge']['relation']['subject'] = sro['subject']
edge['edge']['object']['label'] = sro['object']
edge['edge']['relation']['object'] = sro['object']


def add_edge_old(nanopub, edges: Edges, edge_ast, nanopub_url: str, edge_types: List[str], citation: str, annotations: List[any]):
"""Add edge to edges
Args:
edges: list of edges
edge_ast: Edge AST to add to edges list
nanopub_url: URL of nanopub in NanopubStore
edge_types: list of edge types (e.g. primary, orthologized, computed)
citation: normalized citation string (uri, reference, database normalization)
annotations: list of annotations pulled from nanopub
"""

# TODO Update convert_namespaces_ast to use it instead of convert_namespaces_str -- need lots of changes to that function

nanopub_id = os.path.basename(urllib.parse.urlparse(nanopub_url).path)

# Add BEL causal edge type
if edge_ast.bel_relation and 'causal' in edge_ast.spec['relations']['info'][edge_ast.bel_relation]['categories']:
edge_types.append('causal')

# Add backbone edge type
if 'backbone' in nanopub_url:
edge_types.append('backbone')
edge_types = [edge_type for edge_type in edge_types if edge_type != 'primary']

subj_canon = edge_ast.bel_subject.to_string()
subj_lbl = bel_utils.convert_namespaces_str(subj_canon, decanonicalize=True)

Expand All @@ -160,10 +317,6 @@ def add_edge(edges: Edges, edge_ast, nanopub_url: str, edge_types: List[str], an
else: # Normal BEL Assertion
obj_components = edge_ast.bel_object.subcomponents(subcomponents=[])

# Add BEL causal edge type
if edge_ast.bel_relation and 'causal' in edge_ast.spec['relations']['info'][edge_ast.bel_relation]['categories']:
edge_types.append('causal')

edge_hash = utils._create_hash(f'{subj_canon} {edge_ast.bel_relation} {obj_canon}')

# Add edge_dt after creating hash id in edge_iterator
Expand All @@ -178,12 +331,15 @@ def add_edge(edges: Edges, edge_ast, nanopub_url: str, edge_types: List[str], an
"relation": edge_ast.bel_relation,
"edge_hash": edge_hash,
"nanopub_url": nanopub_url,
"edge_types": edge_types,
"nanopub_id": nanopub_id,
"citation": citation,
"subject_canon": subj_canon,
"subject": subj_lbl,
"object_canon": obj_canon,
"object": obj_lbl,
"annotations": annotations,
"public_flag": nanopub['isPublished'],
"edge_types": edge_types,
},
'object': {
"name": obj_canon,
Expand All @@ -193,6 +349,12 @@ def add_edge(edges: Edges, edge_ast, nanopub_url: str, edge_types: List[str], an
}
}

if nanopub['nanopub']['metadata'].get('gd:creator', None):
edge['edge']['relation']['creator'] = nanopub['nanopub']['metadata']['gd:creator']

if nanopub['nanopub']['metadata'].get('project', None):
edge['edge']['relation']['project'] = nanopub['nanopub']['metadata']['project']

edges.append(copy.deepcopy(edge))
return edges

Expand Down Expand Up @@ -253,6 +415,22 @@ def load_edges_into_db(db, edges=[], edges_fn=None, username: str = None, passwo
arangodb.batch_load_docs(db, doc_iterator)


def normalize_nanopub_citation(nanopub):

citation_string = ''
if 'database' in nanopub['nanopub']['citation']:
if nanopub['nanopub']['citation']['database']['name'].lower() == 'pubmed':
citation_string = f"PMID:{nanopub['nanopub']['citation']['database']['id']}"
else:
citation_string = f"{nanopub['nanopub']['citation']['database']['name']}:{nanopub['nanopub']['citation']['database']['id']}"
elif 'reference' in nanopub['nanopub']['citation'] and isinstance(nanopub['nanopub']['citation']['reference'], str):
citation_string = nanopub['nanopub']['citation']['reference']
elif 'uri' in nanopub['nanopub']['citation']:
citation_string = nanopub['nanopub']['citation']['uri']

return citation_string


def main():

(arango_client, edgestore_db) = arangodb.get_client()
Expand Down

0 comments on commit 6928169

Please sign in to comment.