Skip to content

Commit

Permalink
Fixed naked NSArg canonicalization
Browse files Browse the repository at this point in the history
  • Loading branch information
wshayes committed Feb 4, 2021
1 parent 9959559 commit a156db4
Show file tree
Hide file tree
Showing 10 changed files with 483 additions and 76 deletions.
8 changes: 4 additions & 4 deletions bel/api/endpoints/belspec.py
Expand Up @@ -109,9 +109,9 @@ def get_belhelp(version: str = "latest"):

@router.post("/belspec/help")
def post_belhelp(belhelp: dict):
"""Create or Update BEL Help"""
"""Create or Update BEL Help """

bel.belspec.crud.update_belhelp(belhelp)
return bel.belspec.crud.update_belhelp(belhelp)


@router.delete("/belspec/help/{version}")
Expand All @@ -121,7 +121,7 @@ def delete_belhelp(version: str):
if version == "latest":
version = bel.belspec.crud.get_latest_version()

bel.belspec.crud.delete_belhelp(version)
return bel.belspec.crud.delete_belhelp(version)


@router.get("/belspec/versions", response_model=BelSpecVersions)
Expand Down Expand Up @@ -149,4 +149,4 @@ def delete_belspec(version: str):
if version == "latest":
version = bel.belspec.crud.get_latest_version()

bel.belspec.crud.delete_belspec(version)
return bel.belspec.crud.delete_belspec(version)
4 changes: 3 additions & 1 deletion bel/belspec/crud.py
Expand Up @@ -266,10 +266,12 @@ def update_belhelp(belhelp: dict):

version = belhelp["version"]

doc = {"_key": f"belspec_{version}", "doc_type": "belhelp", "belhelp": belhelp}
doc = {"_key": f"belhelp_{version}", "doc_type": "belhelp", "belhelp": belhelp}

bel_config_coll.insert(doc, overwrite=True)

return {"msg": "Loaded belspec help"}


def delete_belhelp(version: str):
"""Delete BEL specification help"""
Expand Down
8 changes: 8 additions & 0 deletions bel/belspec/enhance.py
Expand Up @@ -38,6 +38,14 @@ def add_function_signature_help(specification: dict) -> dict:
Simplify the function signatures for presentation to BEL Editor users
"""
for f in specification["functions"]["signatures"]:

# Copy primary_function into function signature
if specification["functions"]["signatures"][f]["func_type"] == "Modifier":
specification["functions"]["signatures"][f]["primary_function"] = specification[
"functions"
]["info"][f]["primary_function"]

# Enhance signature for function
for argset_idx, argset in enumerate(
specification["functions"]["signatures"][f]["signatures"]
):
Expand Down
160 changes: 139 additions & 21 deletions bel/lang/ast.py
Expand Up @@ -36,6 +36,18 @@
from bel.schemas.constants import strarg_validation_lists


def compare_fn_args(args1, args2, ignore_locations: bool = False) -> bool:
"""If args set1 is the same as arg set2 - returns True
This is used to see if two functions have the same set of arguments
"""

args1 = ", ".join([arg.to_string(ignore_location=True) for arg in args1])
args2 = ", ".join([arg.to_string(ignore_location=True) for arg in args2])

return args1 == args2


#########################
# Unknown string #
#########################
Expand All @@ -58,7 +70,7 @@ def __str__(self):

__repr__ = __str__

def to_string(self, fmt: str = "medium") -> str:
def to_string(self, fmt: str = "medium", ignore_location: bool = False) -> str:

return str(self)

Expand All @@ -79,7 +91,7 @@ def __init__(self, name, version: str = "latest", span: Span = None):

self.type = "Relation"

def to_string(self, fmt: str = "medium"):
def to_string(self, fmt: str = "medium", ignore_location: bool = False):
if fmt == "short":
return self.name_short
else:
Expand Down Expand Up @@ -227,6 +239,17 @@ def orthologizable(self, species_key: Key) -> Optional[bool]:

return true_response

def optimize(self):
"""Optimize Assertion
Currently this only optimizes reactions if they match the following pattern
"""

if self.name == "reaction":
self = optimize_rxn(self)

return self

def get_species_keys(self, species_keys: List[str] = None):
"""Collect species associated with NSArgs
Expand Down Expand Up @@ -274,7 +297,7 @@ def validate(self, errors: List[ValidationError] = None):
try:
errors.extend(validate_function(self))
except Exception as e:
logger.error(f"Could not validate function {self.to_string()} -- error: {str(e)}")
logger.exception(f"Could not validate function {self.to_string()} -- error: {str(e)}")
errors.append(
ValidationError(
type="Assertion",
Expand All @@ -291,13 +314,7 @@ def validate(self, errors: List[ValidationError] = None):

return errors

def to_string(
self,
fmt: str = "medium",
canonicalize: bool = False,
decanonicalize: bool = False,
orthologize: str = None,
) -> str:
def to_string(self, fmt: str = "medium", ignore_location: bool = False) -> str:
"""Convert AST object to string
Args:
Expand All @@ -310,7 +327,12 @@ def to_string(
str: string version of BEL AST
"""

arg_string = ", ".join([a.to_string(fmt=fmt) for a in self.args])
if ignore_location and self.name == "location":
return ""

arg_string = ", ".join(
[a.to_string(fmt=fmt, ignore_location=ignore_location) for a in self.args]
)

if fmt in ["short", "medium"]:
function_name = self.name_short
Expand Down Expand Up @@ -480,7 +502,7 @@ def update(self, entity: BelEntity):
self.entity = entity
self.span = None

def to_string(self, fmt: str = "medium") -> str:
def to_string(self, fmt: str = "medium", ignore_location: bool = False) -> str:

return str(self.entity)

Expand Down Expand Up @@ -508,7 +530,7 @@ def update(self, value: str):
self.value = value
self.span = None

def to_string(self, fmt: str = "medium") -> str:
def to_string(self, fmt: str = "medium", ignore_location: bool = False) -> str:
"""Convert AST object to string
Args:
Expand Down Expand Up @@ -737,8 +759,14 @@ def parse(self):
else:
logger.error(f"Unknown span type {span}")

return self.args_to_components()

def args_to_components(self):
"""Convert AST args to subject, relation, object components"""

# Subject only assertion
if len(self.args) == 1 and self.args[0].type == "Function":
# if len(self.args) == 1 and self.args[0].type == "Function":
if len(self.args) == 1:
self.subject = self.args[0]
# Normal SRO BEL assertion
elif (
Expand Down Expand Up @@ -914,6 +942,23 @@ def validate(self):
if arg and arg.type in ["Function"]:
self.errors.extend(arg.validate(errors=[]))

def optimize(self):
"""Optimize Assertion
Currently this only optimizes reactions if they match the following pattern
reactants(A, B) -> products(complex(A, B)) SHOULD BE complex(A, B)
"""

if hasattr(self, "args"):
for idx, arg in enumerate(self.args):
if arg and arg.type == "Function":
tmp = arg.optimize()
self.args[idx] = tmp

self.args_to_components()

return self

def subcomponents(self, subcomponents=None):
"""Generate subcomponents of the BEL subject or object
Expand All @@ -939,15 +984,15 @@ def subcomponents(self, subcomponents=None):

return subcomponents

def to_string(self, fmt: str = "medium") -> str:
def to_string(self, fmt: str = "medium", ignore_location: bool = False) -> str:
"""Convert AST object to string
Args:
fmt (str): short, medium, long formatted BEL statements
short = short function and short relation format
medium = short function and long relation format
long = long function and long relation format
canonicalize
ignore_location: don't add location to output string
Returns:
str: string version of BEL AST
Expand All @@ -958,19 +1003,19 @@ def to_string(self, fmt: str = "medium") -> str:
if self.subject and self.relation and self.object:
if isinstance(self.object, BELAst):
return "{} {} ({})".format(
self.subject.to_string(fmt=fmt),
self.subject.to_string(fmt=fmt, ignore_location=ignore_location),
self.relation.to_string(fmt=fmt),
self.object.to_string(fmt=fmt),
self.object.to_string(fmt=fmt, ignore_location=ignore_location),
)
else:
return "{} {} {}".format(
self.subject.to_string(fmt=fmt),
self.subject.to_string(fmt=fmt, ignore_location=ignore_location),
self.relation.to_string(fmt=fmt),
self.object.to_string(fmt=fmt),
self.object.to_string(fmt=fmt, ignore_location=ignore_location),
)

elif self.subject:
return "{}".format(self.subject.to_string(fmt=fmt))
return "{}".format(self.subject.to_string(fmt=fmt, ignore_location=ignore_location))

else:
return ""
Expand Down Expand Up @@ -1405,6 +1450,13 @@ def validate_function(fn: Function, errors: List[ValidationError] = None) -> Lis
)
)

# Check for bad reactions
# 1. reactants = products -> error
# 2. reactants = products(complex(reactants)) = warning to replace with just the complex

if fn.name == "reaction":
errors.extend(validate_rxn_semantics(fn))

# Modifier function with wrong parent function
if (
fn.function_signature["func_type"] == "Modifier"
Expand All @@ -1424,6 +1476,72 @@ def validate_function(fn: Function, errors: List[ValidationError] = None) -> Lis
return errors


def validate_rxn_semantics(rxn: Function) -> List[ValidationError]:
"""Validate Reactions
Check for bad reactions
1. reactants = products -> error
2. reactants = products(complex(reactants)) = warning to replace with just the complex
"""

errors = []

reactants = rxn.args[0]
products = rxn.args[1]

if reactants.name != "reactants" or products.name != "products":
return errors

# ERROR reactants(A, B) -> products(complex(A, B)) SHOULD BE complex(A, B)
if products.args[0].name == "complexAbundance" and compare_fn_args(
reactants.args, products.args[0].args
):
errors.append(
ValidationError(
type="Assertion",
severity="Error",
msg=f"Reaction should be replaced with just the product complex: {products.args[0].to_string()}",
visual_pairs=[(rxn.span.start, rxn.span.end)],
index=rxn.span.start,
)
)

# ERROR reactants(A, B) SHOULD NOT EQUAL products(A, B)
elif compare_fn_args(reactants.args, products.args):
errors.append(
ValidationError(
type="Assertion",
severity="Error",
msg=f"Reaction should not have equivalent reactants and products",
visual_pairs=[(rxn.span.start, rxn.span.end)],
index=rxn.span.start,
)
)

return errors


def optimize_rxn(rxn: Function) -> Function:
"""Transform reaction into more optimal BEL"""

parent = rxn.parent
reactants = rxn.args[0]
products = rxn.args[1]
if reactants.name != "reactants" or products.name != "products":
return rxn

# Convert reactants(A, B) -> products(complex(A, B)) SHOULD BE complex(A, B)

if products.args[0].name == "complexAbundance" and compare_fn_args(
reactants.args, products.args[0].args, ignore_locations=True
):
rxn = products.args[0]
rxn.parent = parent

return rxn


def sort_function_args(fn: Function):
"""Add sort tuple values to function arguments for canonicalization and sort function arguments"""

Expand Down
4 changes: 2 additions & 2 deletions bel/schemas/belspec.py
Expand Up @@ -8,8 +8,8 @@

# BEL Specification Schema ###########################################################
class FunctionTypes(str, enum.Enum):
primary = "primary"
modifier = "modifier"
Primary = "Primary"
Modifier = "Modifier"


class ArgumentTypes(str, enum.Enum):
Expand Down

0 comments on commit a156db4

Please sign in to comment.