Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 43 additions & 68 deletions code/ARAX/ARAXQuery/ARAX_query.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,20 @@
#!/bin/env python3
import copy
import sys
def eprint(*args, **kwargs): print(*args, file=sys.stderr, **kwargs)

import os
import json
import ast
import re
import time
from datetime import datetime
import subprocess
import traceback
from collections import Counter
import numpy as np
import threading
import json
import uuid
import requests
import gc
import contextlib
import connexion

from ARAX_response import ARAXResponse
from query_graph_info import QueryGraphInfo
from knowledge_graph_info import KnowledgeGraphInfo
from actions_parser import ActionsParser
from ARAX_filter import ARAXFilter
from ARAX_resultify import ARAXResultify
from ARAX_query_graph_interpreter import ARAXQueryGraphInterpreter
from ARAX_messenger import ARAXMessenger
from ARAX_ranker import ARAXRanker
Expand All @@ -37,23 +25,15 @@ def eprint(*args, **kwargs): print(*args, file=sys.stderr, **kwargs)
sys.path.append(os.path.dirname(os.path.abspath(__file__))+"/../../UI/OpenAPI/python-flask-server/")
from openapi_server.models.response import Response
from openapi_server.models.message import Message
from openapi_server.models.knowledge_graph import KnowledgeGraph
from openapi_server.models.query_graph import QueryGraph
from openapi_server.models.q_node import QNode
from openapi_server.models.q_edge import QEdge
from openapi_server.models.operations import Operations

sys.path.append(os.path.dirname(os.path.abspath(__file__))+"/../..")
from RTXConfiguration import RTXConfiguration

from openapi_server.models.message import Message
from openapi_server.models.q_node import QNode
from openapi_server.models.q_edge import QEdge

sys.path.append(os.path.dirname(os.path.abspath(__file__))+"/../../reasoningtool/QuestionAnswering")

sys.path.append(os.path.dirname(os.path.abspath(__file__))+"/../ResponseCache")
from response_cache import ResponseCache
from response_cache import ResponseCache #noqa: E402

def eprint(*args, **kwargs): print(*args, file=sys.stderr, **kwargs)


ARAXResponse.output = 'STDERR'
Expand Down Expand Up @@ -270,7 +250,7 @@ def track_query_finish(self):
query_tracker = ARAXQueryTracker()
try:
response_id = self.response.response_id
except:
except (AttributeError, TypeError):
response_id = None

attributes = {
Expand Down Expand Up @@ -348,7 +328,7 @@ def query(self, query, mode='ARAX', origin='local'):
job_id = query_tracker.create_tracker_entry(attributes)

if job_id == -999:
response.error(f"Query could not be run due to exceeded limits", error_code="OverLimit", http_status=429)
response.error("Query could not be run due to exceeded limits", error_code="OverLimit", http_status=429)
return response

response.job_id = job_id
Expand All @@ -363,13 +343,13 @@ def query(self, query, mode='ARAX', origin='local'):

#### Convert the message from dicts to objects
if 'message' in query:
response.debug(f"Deserializing message")
response.debug("Deserializing message")
query['message'] = ARAXMessenger().from_dict(query['message'])

# If there is a workflow, translate it to ARAXi and append it to the operations actions list
if "have_workflow" in query_attributes:
if query['message'].query_graph is None:
response.error(f"Cannot have a workflow with an null query_graph", error_code="MissingQueryGraph")
response.error("Cannot have a workflow with an null query_graph", error_code="MissingQueryGraph")
return response

try:
Expand All @@ -387,7 +367,7 @@ def query(self, query, mode='ARAX', origin='local'):

#### In ARAX mode, run the QueryGraph through the QueryGraphInterpreter and to generate ARAXi
if mode == 'ARAX' or mode == 'asynchronous':
response.info(f"Found input query_graph. Interpreting it and generating ARAXi processing plan to answer it")
response.info("Found input query_graph. Interpreting it and generating ARAXi processing plan to answer it")
interpreter = ARAXQueryGraphInterpreter()
interpreter.translate_to_araxi(response)
if response.status != 'OK':
Expand All @@ -397,9 +377,9 @@ def query(self, query, mode='ARAX', origin='local'):

#### Else the mode is KG2 mode, where we just accept one-hop queries, and run a simple ARAXi
else:
response.info(f"Found input query_graph. Querying RTX KG2 to answer it")
response.info("Found input query_graph. Querying RTX KG2 to answer it")
if len(response.envelope.message.query_graph.nodes) > 2:
response.error(f"Only 1 hop (2 node) queries can be handled at this time", error_code="TooManyHops")
response.error("Only 1 hop (2 node) queries can be handled at this time", error_code="TooManyHops")
return response
query['operations'] = {}
query['operations']['actions'] = [ 'expand(kp=infores:rtx-kg2)', 'resultify()', 'return(store=false)' ]
Expand All @@ -409,12 +389,12 @@ def query(self, query, mode='ARAX', origin='local'):

#### If we have operations, execute them
if "have_operations" in query_attributes:
response.info(f"Found input processing plan. Sending to the ProcessingPlanExecutor")
response.info("Found input processing plan. Sending to the ProcessingPlanExecutor")
result = self.execute_processing_plan(query, mode=mode)

#### This used to support canned queries, but no longer does
else:
response.error(f"Unable to determine ARAXi to execute. Error Q213", error_code="UnknownError")
response.error("Unable to determine ARAXi to execute. Error Q213", error_code="UnknownError")

except MemoryError as e:
self.handle_memory_error(e)
Expand All @@ -426,7 +406,7 @@ def query(self, query, mode='ARAX', origin='local'):
def examine_incoming_query(self, query, mode='ARAX'):

response = self.response
response.info(f"Examine input Query for needed information for dispatch")
response.info("Examine input Query for needed information for dispatch")
#eprint(query)

#### Check to see if there's an operations processing plan
Expand Down Expand Up @@ -467,7 +447,7 @@ def examine_incoming_query(self, query, mode='ARAX'):
def convert_workflow_to_ARAXi(self, query):

response = self.response
response.info(f"Converting workflow elements to ARAXi")
response.info("Converting workflow elements to ARAXi")

# Convert the TRAPI workflow into ARAXi
converter = WorkflowToARAXi()
Expand Down Expand Up @@ -495,7 +475,7 @@ def convert_workflow_to_ARAXi(self, query):
def validate_incoming_query_graph(self,message):

response = self.response
response.info(f"Validating the input query graph")
response.info("Validating the input query graph")

# Define allowed qnode and qedge attributes to check later
allowed_qnode_attributes = { 'ids': 1, 'categories':1, 'is_set': 1, 'set_interpretation': 1, 'set_id': 1, 'member_ids': 1, 'option_group_id': 1, 'name': 1, 'constraints': 1 }
Expand All @@ -510,7 +490,7 @@ def validate_incoming_query_graph(self,message):

#### Check to ensure that either edges EOR paths is present
if 'edges' not in message['query_graph'] and 'paths' not in message['query_graph']:
response.error(f"QueryGraph is missing both 'edges' and 'paths'. At least one must be present.", error_code="MissingQEdgeAndQPath")
response.error("QueryGraph is missing both 'edges' and 'paths'. At least one must be present.", error_code="MissingQEdgeAndQPath")
return response

#### Loop through edges checking the attributes
Expand Down Expand Up @@ -542,7 +522,7 @@ def limit_message(self,message,query):
def execute_processing_plan(self,input_operations_dict, mode='ARAX'):

response = self.response
response.debug(f"Entering execute_processing_plan")
response.debug("Entering execute_processing_plan")
messages = []
message = None

Expand All @@ -557,17 +537,17 @@ def execute_processing_plan(self,input_operations_dict, mode='ARAX'):
operations = Operations.from_dict(input_operations_dict["operations"])

#### Connect to the message store just once, even if we won't use it
response.debug(f"Connecting to ResponseCache")
response.debug("Connecting to ResponseCache")
response_cache = ResponseCache() # also calls connect

#### Create a messenger object for basic message processing
response.debug(f"Creating ARAXMessenger instance")
response.debug("Creating ARAXMessenger instance")
messenger = ARAXMessenger()

#### If there are URIs provided, try to load them
force_remote = False
if operations.message_uris is not None:
response.debug(f"Found message_uris")
response.debug("Found message_uris")
for uri in operations.message_uris:
response.debug(f" messageURI={uri}")
matchResult = re.match( r'http[s]://arax.ncats.io/.*api/arax/.+/response/(\d+)',uri,re.M|re.I )
Expand Down Expand Up @@ -613,9 +593,9 @@ def execute_processing_plan(self,input_operations_dict, mode='ARAX'):

#### If there are one or more messages embedded in the POST, process them
if operations.messages is not None:
response.debug(f"Received messages")
response.debug("Received messages")
for uploadedMessage in operations.messages:
response.debug(f"uploadedMessage is a "+str(uploadedMessage.__class__))
response.debug("uploadedMessage is a "+str(uploadedMessage.__class__))
if str(uploadedMessage.__class__) == "<class 'openapi_server.models.message.Message'>":
uploadedMessage = ARAXMessenger().from_dict(uploadedMessage)
messages.append(uploadedMessage)
Expand All @@ -635,38 +615,38 @@ def execute_processing_plan(self,input_operations_dict, mode='ARAX'):
else:
#response.error(f"Uploaded message does not contain a results. May be the wrong format")
#return response
response.warning(f"There are no results in this uploaded message, but maybe that's okay")
response.warning("There are no results in this uploaded message, but maybe that's okay")
else:
response.error(f"Uploaded message is not of type Message. It is of type"+str(uploadedMessage.__class__))
response.error("Uploaded message is not of type Message. It is of type "+str(uploadedMessage.__class__))
return response

#### Take different actions based on the number of messages we now have in hand
n_messages = len(messages)

#### If there's no input message, then create one
if n_messages == 0:
response.debug(f"No starting messages were referenced. Will start with a blank template Message")
response.debug("No starting messages were referenced. Will start with a blank template Message")
messenger.create_envelope(response)

message = response.envelope.message

#### If there's on message, we will run with that
elif n_messages == 1:
response.debug(f"A single Message is ready and in hand")
response.debug("A single Message is ready and in hand")
message = messages[0]
response.envelope.message = message

#### Multiple messages unsupported
else:
response.warning(f"Multiple Messages were uploaded or imported by reference. However, proper merging code has not been implemented yet! Will use just the first Message for now.")
response.warning("Multiple Messages were uploaded or imported by reference. However, proper merging code has not been implemented yet! Will use just the first Message for now.")
message = messages[0]

#### Examine the options that were provided and act accordingly
optionsDict = {}
if operations.options:
response.debug(f"Processing options were provided, but these are not implemented at the moment and will be ignored")
response.debug("Processing options were provided, but these are not implemented at the moment and will be ignored")
for option in operations.options:
response.debug(f" option="+option)
response.debug(" option="+option)
optionsDict[option] = 1

# Save the original input query for later reference
Expand All @@ -681,7 +661,7 @@ def execute_processing_plan(self,input_operations_dict, mode='ARAX'):

#### If there are actions, then fulfill those
if operations.actions:
response.debug(f"Found actions")
response.debug("Found actions")
actions_parser = ActionsParser()
result = actions_parser.parse(operations.actions)
response.merge(result)
Expand Down Expand Up @@ -759,8 +739,6 @@ def execute_processing_plan(self,input_operations_dict, mode='ARAX'):
action = None
for action in actions:
response.info(f"Processing action '{action['command']}' with parameters {action['parameters']}")
nonstandard_result = False
skip_merge = False

# Catch a crash
try:
Expand Down Expand Up @@ -811,13 +789,7 @@ def execute_processing_plan(self,input_operations_dict, mode='ARAX'):
elif action['command'] == 'filter_results': # recognize the filter_results command
response.debug(f"Before filtering, there are {len(response.envelope.message.results)} results")
filter_results.apply(response, action['parameters'])

elif action['command'] == 'query_graph_reasoner':
response.info(f"Sending current query_graph to the QueryGraphReasoner")
qgr = QueryGraphReasoner()
message = qgr.answer(ast.literal_eval(repr(message.query_graph)), TxltrApiFormat=True)
self.message = message
nonstandard_result = True
response.debug(f"After filtering, there are {len(response.envelope.message.results)} results")

elif action['command'] == 'connect':
connect.apply(response, action['parameters'])
Expand All @@ -827,7 +799,7 @@ def execute_processing_plan(self,input_operations_dict, mode='ARAX'):
break

elif action['command'] == 'rank_results':
response.info(f"Running experimental reranker on results")
response.info("Running experimental reranker on results")
try:
ranker = ARAXRanker()
ranker.aggregate_scores_dmk(response)
Expand Down Expand Up @@ -861,7 +833,7 @@ def execute_processing_plan(self,input_operations_dict, mode='ARAX'):

#### Immediately after resultify, run the experimental ranker
if action['command'] == 'resultify' and mode != 'RTXKG2':
response.info(f"Running experimental reranker on results")
response.info("Running experimental reranker on results")
try:
ranker = ARAXRanker()
ranker.aggregate_scores_dmk(response)
Expand All @@ -874,7 +846,9 @@ def execute_processing_plan(self,input_operations_dict, mode='ARAX'):

if mode != 'RTXKG2': # KG2 doesn't use virtual edges or edit the QG, so no transformation needed
result_transformer = ResultTransformer()
response.debug(f"Calling ResultTransformer; number of results is: {len(response.envelope.message.results)}")
result_transformer.transform(response)
response.debug(f"Results have been transformed; number of results is: {len(response.envelope.message.results)}")

#### At the end, process the explicit return() action, or implicitly perform one
return_action = { 'command': 'return', 'parameters': { 'response': 'true', 'store': 'true' } }
Expand All @@ -901,7 +875,7 @@ def execute_processing_plan(self,input_operations_dict, mode='ARAX'):
#### Provide the total results count in the Response if it is available
try:
response.envelope.total_results_count = response.total_results_count
except:
except (AttributeError, TypeError):
pass

#response.envelope.operations['actions'] = operations.actions
Expand Down Expand Up @@ -949,7 +923,7 @@ def execute_processing_plan(self,input_operations_dict, mode='ARAX'):
# If store=true, then put the message in the database
response_id = None
if return_action['parameters']['store'] == 'true':
response.debug(f"Storing resulting Message")
response.debug("Storing resulting Message")
response_id = response_cache.add_new_response(response)
response.info(f"Result was stored with id {response_id}. It can be viewed at https://arax.ncats.io/?r={response_id}")
response.response_id = response_id
Expand All @@ -963,10 +937,10 @@ def execute_processing_plan(self,input_operations_dict, mode='ARAX'):
#### If asking for the full message back
if return_action['parameters']['response'] == 'true':
if mode == 'asynchronous':
response.info(f"Processing is complete. Attempting to send the result to the callback URL.")
response.info("Processing is complete. Attempting to send the result to the callback URL.")
self.send_to_callback(callback, response)
else:
response.info(f"Processing is complete. Transmitting resulting Message back to client.")
response.info("Processing is complete. Transmitting resulting Message back to client.")
return response

#### Else just the id is returned
Expand Down Expand Up @@ -1012,7 +986,7 @@ def send_to_callback(self, callback, response):

send_attempts += 1
if not post_succeeded:
response.info(f"Wait 10 seconds before trying again")
response.info("Wait 10 seconds before trying again")
time.sleep(10)

if not post_succeeded:
Expand All @@ -1030,7 +1004,7 @@ def inject_int_value_into_parameters(self, parameter_name, query_options, parame
#### Try to convery the value to an integer
try:
parameter_value = int(parameter_value)
except:
except (ValueError, TypeError):
self.response.error(f"Unable to convert parameter {parameter_name} = '{parameter_value}' into an integer", error_code=error_code)
return
#### Only update the value in parameters if one was not explicitly specified
Expand Down Expand Up @@ -1070,7 +1044,8 @@ def main():

#### Set verbose
verbose = params.verbose
if verbose is None: verbose = 1
if verbose is None:
verbose = 1

#### Create the ARAXQuery object
araxq = ARAXQuery()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import connexion
import six
import sys
import os

from openapi_server import util

sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../../../../../reasoningtool/QuestionAnswering")

from QuestionExamples import QuestionExamples


Expand Down