diff --git a/code/ARAX/ARAXQuery/ARAX_query.py b/code/ARAX/ARAXQuery/ARAX_query.py index 5366b27d1..e55292edf 100644 --- a/code/ARAX/ARAXQuery/ARAX_query.py +++ b/code/ARAX/ARAXQuery/ARAX_query.py @@ -1,32 +1,20 @@ #!/bin/env python3 import copy import sys -def eprint(*args, **kwargs): print(*args, file=sys.stderr, **kwargs) - import os import json -import ast import re import time from datetime import datetime -import subprocess import traceback -from collections import Counter -import numpy as np import threading -import json -import uuid import requests import gc import contextlib -import connexion from ARAX_response import ARAXResponse -from query_graph_info import QueryGraphInfo -from knowledge_graph_info import KnowledgeGraphInfo from actions_parser import ActionsParser from ARAX_filter import ARAXFilter -from ARAX_resultify import ARAXResultify from ARAX_query_graph_interpreter import ARAXQueryGraphInterpreter from ARAX_messenger import ARAXMessenger from ARAX_ranker import ARAXRanker @@ -37,23 +25,15 @@ def eprint(*args, **kwargs): print(*args, file=sys.stderr, **kwargs) sys.path.append(os.path.dirname(os.path.abspath(__file__))+"/../../UI/OpenAPI/python-flask-server/") from openapi_server.models.response import Response from openapi_server.models.message import Message -from openapi_server.models.knowledge_graph import KnowledgeGraph -from openapi_server.models.query_graph import QueryGraph -from openapi_server.models.q_node import QNode -from openapi_server.models.q_edge import QEdge from openapi_server.models.operations import Operations sys.path.append(os.path.dirname(os.path.abspath(__file__))+"/../..") from RTXConfiguration import RTXConfiguration -from openapi_server.models.message import Message -from openapi_server.models.q_node import QNode -from openapi_server.models.q_edge import QEdge - -sys.path.append(os.path.dirname(os.path.abspath(__file__))+"/../../reasoningtool/QuestionAnswering") - sys.path.append(os.path.dirname(os.path.abspath(__file__))+"/../ResponseCache") -from response_cache import ResponseCache +from response_cache import ResponseCache #noqa: E402 + +def eprint(*args, **kwargs): print(*args, file=sys.stderr, **kwargs) ARAXResponse.output = 'STDERR' @@ -270,7 +250,7 @@ def track_query_finish(self): query_tracker = ARAXQueryTracker() try: response_id = self.response.response_id - except: + except (AttributeError, TypeError): response_id = None attributes = { @@ -348,7 +328,7 @@ def query(self, query, mode='ARAX', origin='local'): job_id = query_tracker.create_tracker_entry(attributes) if job_id == -999: - response.error(f"Query could not be run due to exceeded limits", error_code="OverLimit", http_status=429) + response.error("Query could not be run due to exceeded limits", error_code="OverLimit", http_status=429) return response response.job_id = job_id @@ -363,13 +343,13 @@ def query(self, query, mode='ARAX', origin='local'): #### Convert the message from dicts to objects if 'message' in query: - response.debug(f"Deserializing message") + response.debug("Deserializing message") query['message'] = ARAXMessenger().from_dict(query['message']) # If there is a workflow, translate it to ARAXi and append it to the operations actions list if "have_workflow" in query_attributes: if query['message'].query_graph is None: - response.error(f"Cannot have a workflow with an null query_graph", error_code="MissingQueryGraph") + response.error("Cannot have a workflow with an null query_graph", error_code="MissingQueryGraph") return response try: @@ -387,7 +367,7 @@ def query(self, query, mode='ARAX', origin='local'): #### In ARAX mode, run the QueryGraph through the QueryGraphInterpreter and to generate ARAXi if mode == 'ARAX' or mode == 'asynchronous': - response.info(f"Found input query_graph. Interpreting it and generating ARAXi processing plan to answer it") + response.info("Found input query_graph. Interpreting it and generating ARAXi processing plan to answer it") interpreter = ARAXQueryGraphInterpreter() interpreter.translate_to_araxi(response) if response.status != 'OK': @@ -397,9 +377,9 @@ def query(self, query, mode='ARAX', origin='local'): #### Else the mode is KG2 mode, where we just accept one-hop queries, and run a simple ARAXi else: - response.info(f"Found input query_graph. Querying RTX KG2 to answer it") + response.info("Found input query_graph. Querying RTX KG2 to answer it") if len(response.envelope.message.query_graph.nodes) > 2: - response.error(f"Only 1 hop (2 node) queries can be handled at this time", error_code="TooManyHops") + response.error("Only 1 hop (2 node) queries can be handled at this time", error_code="TooManyHops") return response query['operations'] = {} query['operations']['actions'] = [ 'expand(kp=infores:rtx-kg2)', 'resultify()', 'return(store=false)' ] @@ -409,12 +389,12 @@ def query(self, query, mode='ARAX', origin='local'): #### If we have operations, execute them if "have_operations" in query_attributes: - response.info(f"Found input processing plan. Sending to the ProcessingPlanExecutor") + response.info("Found input processing plan. Sending to the ProcessingPlanExecutor") result = self.execute_processing_plan(query, mode=mode) #### This used to support canned queries, but no longer does else: - response.error(f"Unable to determine ARAXi to execute. Error Q213", error_code="UnknownError") + response.error("Unable to determine ARAXi to execute. Error Q213", error_code="UnknownError") except MemoryError as e: self.handle_memory_error(e) @@ -426,7 +406,7 @@ def query(self, query, mode='ARAX', origin='local'): def examine_incoming_query(self, query, mode='ARAX'): response = self.response - response.info(f"Examine input Query for needed information for dispatch") + response.info("Examine input Query for needed information for dispatch") #eprint(query) #### Check to see if there's an operations processing plan @@ -467,7 +447,7 @@ def examine_incoming_query(self, query, mode='ARAX'): def convert_workflow_to_ARAXi(self, query): response = self.response - response.info(f"Converting workflow elements to ARAXi") + response.info("Converting workflow elements to ARAXi") # Convert the TRAPI workflow into ARAXi converter = WorkflowToARAXi() @@ -495,7 +475,7 @@ def convert_workflow_to_ARAXi(self, query): def validate_incoming_query_graph(self,message): response = self.response - response.info(f"Validating the input query graph") + response.info("Validating the input query graph") # Define allowed qnode and qedge attributes to check later allowed_qnode_attributes = { 'ids': 1, 'categories':1, 'is_set': 1, 'set_interpretation': 1, 'set_id': 1, 'member_ids': 1, 'option_group_id': 1, 'name': 1, 'constraints': 1 } @@ -510,7 +490,7 @@ def validate_incoming_query_graph(self,message): #### Check to ensure that either edges EOR paths is present if 'edges' not in message['query_graph'] and 'paths' not in message['query_graph']: - response.error(f"QueryGraph is missing both 'edges' and 'paths'. At least one must be present.", error_code="MissingQEdgeAndQPath") + response.error("QueryGraph is missing both 'edges' and 'paths'. At least one must be present.", error_code="MissingQEdgeAndQPath") return response #### Loop through edges checking the attributes @@ -542,7 +522,7 @@ def limit_message(self,message,query): def execute_processing_plan(self,input_operations_dict, mode='ARAX'): response = self.response - response.debug(f"Entering execute_processing_plan") + response.debug("Entering execute_processing_plan") messages = [] message = None @@ -557,17 +537,17 @@ def execute_processing_plan(self,input_operations_dict, mode='ARAX'): operations = Operations.from_dict(input_operations_dict["operations"]) #### Connect to the message store just once, even if we won't use it - response.debug(f"Connecting to ResponseCache") + response.debug("Connecting to ResponseCache") response_cache = ResponseCache() # also calls connect #### Create a messenger object for basic message processing - response.debug(f"Creating ARAXMessenger instance") + response.debug("Creating ARAXMessenger instance") messenger = ARAXMessenger() #### If there are URIs provided, try to load them force_remote = False if operations.message_uris is not None: - response.debug(f"Found message_uris") + response.debug("Found message_uris") for uri in operations.message_uris: response.debug(f" messageURI={uri}") matchResult = re.match( r'http[s]://arax.ncats.io/.*api/arax/.+/response/(\d+)',uri,re.M|re.I ) @@ -613,9 +593,9 @@ def execute_processing_plan(self,input_operations_dict, mode='ARAX'): #### If there are one or more messages embedded in the POST, process them if operations.messages is not None: - response.debug(f"Received messages") + response.debug("Received messages") for uploadedMessage in operations.messages: - response.debug(f"uploadedMessage is a "+str(uploadedMessage.__class__)) + response.debug("uploadedMessage is a "+str(uploadedMessage.__class__)) if str(uploadedMessage.__class__) == "": uploadedMessage = ARAXMessenger().from_dict(uploadedMessage) messages.append(uploadedMessage) @@ -635,9 +615,9 @@ def execute_processing_plan(self,input_operations_dict, mode='ARAX'): else: #response.error(f"Uploaded message does not contain a results. May be the wrong format") #return response - response.warning(f"There are no results in this uploaded message, but maybe that's okay") + response.warning("There are no results in this uploaded message, but maybe that's okay") else: - response.error(f"Uploaded message is not of type Message. It is of type"+str(uploadedMessage.__class__)) + response.error("Uploaded message is not of type Message. It is of type "+str(uploadedMessage.__class__)) return response #### Take different actions based on the number of messages we now have in hand @@ -645,28 +625,28 @@ def execute_processing_plan(self,input_operations_dict, mode='ARAX'): #### If there's no input message, then create one if n_messages == 0: - response.debug(f"No starting messages were referenced. Will start with a blank template Message") + response.debug("No starting messages were referenced. Will start with a blank template Message") messenger.create_envelope(response) message = response.envelope.message #### If there's on message, we will run with that elif n_messages == 1: - response.debug(f"A single Message is ready and in hand") + response.debug("A single Message is ready and in hand") message = messages[0] response.envelope.message = message #### Multiple messages unsupported else: - response.warning(f"Multiple Messages were uploaded or imported by reference. However, proper merging code has not been implemented yet! Will use just the first Message for now.") + response.warning("Multiple Messages were uploaded or imported by reference. However, proper merging code has not been implemented yet! Will use just the first Message for now.") message = messages[0] #### Examine the options that were provided and act accordingly optionsDict = {} if operations.options: - response.debug(f"Processing options were provided, but these are not implemented at the moment and will be ignored") + response.debug("Processing options were provided, but these are not implemented at the moment and will be ignored") for option in operations.options: - response.debug(f" option="+option) + response.debug(" option="+option) optionsDict[option] = 1 # Save the original input query for later reference @@ -681,7 +661,7 @@ def execute_processing_plan(self,input_operations_dict, mode='ARAX'): #### If there are actions, then fulfill those if operations.actions: - response.debug(f"Found actions") + response.debug("Found actions") actions_parser = ActionsParser() result = actions_parser.parse(operations.actions) response.merge(result) @@ -759,8 +739,6 @@ def execute_processing_plan(self,input_operations_dict, mode='ARAX'): action = None for action in actions: response.info(f"Processing action '{action['command']}' with parameters {action['parameters']}") - nonstandard_result = False - skip_merge = False # Catch a crash try: @@ -811,13 +789,7 @@ def execute_processing_plan(self,input_operations_dict, mode='ARAX'): elif action['command'] == 'filter_results': # recognize the filter_results command response.debug(f"Before filtering, there are {len(response.envelope.message.results)} results") filter_results.apply(response, action['parameters']) - - elif action['command'] == 'query_graph_reasoner': - response.info(f"Sending current query_graph to the QueryGraphReasoner") - qgr = QueryGraphReasoner() - message = qgr.answer(ast.literal_eval(repr(message.query_graph)), TxltrApiFormat=True) - self.message = message - nonstandard_result = True + response.debug(f"After filtering, there are {len(response.envelope.message.results)} results") elif action['command'] == 'connect': connect.apply(response, action['parameters']) @@ -827,7 +799,7 @@ def execute_processing_plan(self,input_operations_dict, mode='ARAX'): break elif action['command'] == 'rank_results': - response.info(f"Running experimental reranker on results") + response.info("Running experimental reranker on results") try: ranker = ARAXRanker() ranker.aggregate_scores_dmk(response) @@ -861,7 +833,7 @@ def execute_processing_plan(self,input_operations_dict, mode='ARAX'): #### Immediately after resultify, run the experimental ranker if action['command'] == 'resultify' and mode != 'RTXKG2': - response.info(f"Running experimental reranker on results") + response.info("Running experimental reranker on results") try: ranker = ARAXRanker() ranker.aggregate_scores_dmk(response) @@ -874,7 +846,9 @@ def execute_processing_plan(self,input_operations_dict, mode='ARAX'): if mode != 'RTXKG2': # KG2 doesn't use virtual edges or edit the QG, so no transformation needed result_transformer = ResultTransformer() + response.debug(f"Calling ResultTransformer; number of results is: {len(response.envelope.message.results)}") result_transformer.transform(response) + response.debug(f"Results have been transformed; number of results is: {len(response.envelope.message.results)}") #### At the end, process the explicit return() action, or implicitly perform one return_action = { 'command': 'return', 'parameters': { 'response': 'true', 'store': 'true' } } @@ -901,7 +875,7 @@ def execute_processing_plan(self,input_operations_dict, mode='ARAX'): #### Provide the total results count in the Response if it is available try: response.envelope.total_results_count = response.total_results_count - except: + except (AttributeError, TypeError): pass #response.envelope.operations['actions'] = operations.actions @@ -949,7 +923,7 @@ def execute_processing_plan(self,input_operations_dict, mode='ARAX'): # If store=true, then put the message in the database response_id = None if return_action['parameters']['store'] == 'true': - response.debug(f"Storing resulting Message") + response.debug("Storing resulting Message") response_id = response_cache.add_new_response(response) response.info(f"Result was stored with id {response_id}. It can be viewed at https://arax.ncats.io/?r={response_id}") response.response_id = response_id @@ -963,10 +937,10 @@ def execute_processing_plan(self,input_operations_dict, mode='ARAX'): #### If asking for the full message back if return_action['parameters']['response'] == 'true': if mode == 'asynchronous': - response.info(f"Processing is complete. Attempting to send the result to the callback URL.") + response.info("Processing is complete. Attempting to send the result to the callback URL.") self.send_to_callback(callback, response) else: - response.info(f"Processing is complete. Transmitting resulting Message back to client.") + response.info("Processing is complete. Transmitting resulting Message back to client.") return response #### Else just the id is returned @@ -1012,7 +986,7 @@ def send_to_callback(self, callback, response): send_attempts += 1 if not post_succeeded: - response.info(f"Wait 10 seconds before trying again") + response.info("Wait 10 seconds before trying again") time.sleep(10) if not post_succeeded: @@ -1030,7 +1004,7 @@ def inject_int_value_into_parameters(self, parameter_name, query_options, parame #### Try to convery the value to an integer try: parameter_value = int(parameter_value) - except: + except (ValueError, TypeError): self.response.error(f"Unable to convert parameter {parameter_name} = '{parameter_value}' into an integer", error_code=error_code) return #### Only update the value in parameters if one was not explicitly specified @@ -1070,7 +1044,8 @@ def main(): #### Set verbose verbose = params.verbose - if verbose is None: verbose = 1 + if verbose is None: + verbose = 1 #### Create the ARAXQuery object araxq = ARAXQuery() diff --git a/code/UI/OpenAPI/python-flask-server/openapi_server/controllers/example_questions_controller.py b/code/UI/OpenAPI/python-flask-server/openapi_server/controllers/example_questions_controller.py index bb804ab98..d792f74f6 100644 --- a/code/UI/OpenAPI/python-flask-server/openapi_server/controllers/example_questions_controller.py +++ b/code/UI/OpenAPI/python-flask-server/openapi_server/controllers/example_questions_controller.py @@ -1,8 +1,12 @@ import connexion import six +import sys +import os from openapi_server import util +sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../../../../../reasoningtool/QuestionAnswering") + from QuestionExamples import QuestionExamples