In [1]:
"""
Module handles the parallel grammar based information extraction

"""
import sys
sys.path.append("..")

import requests
import concurrent
from searchBackend.parallelSearch import DistributedSearch
import json



In [2]:
        
class DistributedGrammar(DistributedSearch):
    """
        Send request to multiple index server parallely

    """
    def __init__(self,config) -> None:
        self.grammar_endpoint = "api/execute/grammar"
        
        if not config:
            raise ValueError("Config not provided")
        self.config = config

        ## Loading array of IP and ports of index server
        if self.config.get("serverAddressList"):
            self.urlList = [
                f"http://{url}/" for url in self.config.get("serverAddressList")
            ]
        else:
            raise ValueError("serverAddressList not provided")
        
        self.headers = {
                        "Accept": "application/json",
                        "Content-Type": "application/json"
                        }

    def response_processor(self, response):
        """
            Transforming the response in the format of search api endpoint

        """

        duration = response.get("total_hits_duration")
        query = response.get("query")
        count = response.get("total_hits_count")
        complete = True
        total_hits = [item for key,value in response.get("total_hits").items() for item in value] ## Here they have match not matches
     

        ##TODO: Fix match/matches in frontend

        return {"complete":complete,
                "query":query,
                "count":count,
                "duration":duration,
                "scores":None,
                "total_hits":total_hits,
                "is_error":response.get("is_error"),
                "error_message":response.get("error_messages")
                }

    


        pass
        
    def query(self, url, search_query,maxDocs,triggerOverlap):
        """
            Pass Grammar query to this single endpoint

        """

        try:
            encoded_search_query = {
                        "grammar": search_query,
                        "maxDocs": maxDocs,
                        "allowTriggerOverlaps":triggerOverlap}
            
            search_url = f"{url}{self.grammar_endpoint}"  ## Final encoded query for search

            ## no encoding of response required
            response = requests.post(search_url,
                                     json=encoded_search_query,
                                     headers=self.headers)

            ## error in response.json will be catched by exception
            return {"status":"success","response":response.json()}

        except Exception as e:
            return {"status":"fail","response":{},"error_message": str(e)}
        
    def parallel_search(self, query_string,maxDocs,triggerOverlap):
        """
        Send reqest to multiple endpoints parallely and reterive the result from there.
        """

        total_hits = {}  ## key will be port and value as list of hit document
        total_duration = 0
        total_hits_count = 0
        error_messages = None
        is_error = False

        with concurrent.futures.ThreadPoolExecutor() as executor:

            ### TODO: Provide flexible option for Variable URL as well
            futures = {
                    executor.submit(self.query,
                                    url,
                                    query_string,
                                    maxDocs,
                                    triggerOverlap): url for url in self.urlList
                }

            for future in concurrent.futures.as_completed(futures):
                port = futures[future]

                try:
                    response = future.result()

                    if response.get("status") == "success":

                        response = response.get("response",{})
                        ## Total time taken
                        duration = response.get("duration") ## Time taken for the search
                        total_duration += duration

                        ## Total hits count
                        mentions = response.get("mentions")

                        ## Total number of hits
                        hits = len(mentions)
                        total_hits_count += hits
                        
                        total_hits[port] = mentions
                        
                        print(
                            f"Search on endpoint {port} completed. Hits: {hits}, Duration: {duration}"
                        )
                    
                    else:
                        error_messages = response.get("error_message")
                        is_error = True

                except Exception as e:
                    print(f"Error processing results from endpoint {port}: {e}")

        return {"total_hits": total_hits,
                "query": query_string,
                "total_hits_count": total_hits_count,
                "total_hits_duration":total_duration,
                "error_messages":error_messages,
                "is_error":is_error}
    
    def search(self, query_string,maxDocs,triggerOverlap):
        """
        public facing interface

        """
        return self.response_processor(self.parallel_search(query_string,maxDocs,triggerOverlap))


In [3]:
with open("../config.json", "r") as f:
    config = json.load(f)

In [4]:
config.get("distributed_custom_search")

{'port_range': [8585, 8594],
 'serverAddressList': ['217.76.56.90:9000',
  '217.76.56.90:8000',
  '217.76.62.78:9000',
  '217.76.62.78:8000',
  '184.174.36.238:9000',
  '184.174.36.238:8000',
  '217.76.62.73:9000',
  '217.76.55.207:9000',
  '217.76.55.207:8000']}

In [5]:
grammar = DistributedGrammar(config=config.get("distributed_custom_search"))

In [20]:
QUERY= """
rules:
  - name: "rule2"
    label: Phosphorylation
    type:  event
    priority: 1
    pattern: |
      trigger = [lemma=/phosph.*/ & tag=/N.*/]
      subject = >nsubj []
"""

In [23]:
QUERY

'\nrules:\n  - name: "rule2"\n    label: Phosphorylation\n    type:  event\n    priority: 1\n    pattern: |\n      trigger = [lemma=/phosph.*/ & tag=/N.*/]\n      subject = >nsubj []\n'

In [21]:
res = grammar.search(QUERY,maxDocs=10,triggerOverlap=False)

Search on endpoint http://217.76.56.90:9000/ completed. Hits: 20, Duration: 0.19499999284744263
Search on endpoint http://184.174.36.238:9000/ completed. Hits: 20, Duration: 0.1599999964237213
Search on endpoint http://217.76.55.207:8000/ completed. Hits: 20, Duration: 0.10300000011920929
Search on endpoint http://217.76.56.90:8000/ completed. Hits: 20, Duration: 0.2160000056028366
Search on endpoint http://217.76.55.207:9000/ completed. Hits: 20, Duration: 0.13600000739097595
Search on endpoint http://217.76.62.73:9000/ completed. Hits: 20, Duration: 0.21400000154972076
Search on endpoint http://184.174.36.238:8000/ completed. Hits: 20, Duration: 0.2029999941587448
Search on endpoint http://217.76.62.78:8000/ completed. Hits: 20, Duration: 0.3310000002384186
Search on endpoint http://217.76.62.78:9000/ completed. Hits: 20, Duration: 0.4090000092983246


In [13]:
res.keys()

dict_keys(['complete', 'query', 'count', 'duration', 'scores', 'total_hits', 'is_error', 'error_message'])

In [14]:
res["is_error"]

True

In [15]:
res["error_message"]

'Expecting value: line 1 column 1 (char 0)'

In [24]:
[item for key,value in res.get("total_hits").items() for item in value]

[{'sentenceId': 509617,
  'label': None,
  'documentId': '12525665',
  'sentenceIndex': 3,
  'words': ['Protein',
   'phosphatase',
   '2A',
   '(',
   'PP2A',
   ')',
   'is',
   'a',
   'phosphoprotein',
   'that',
   'plays',
   'important',
   'roles',
   'in',
   'the',
   'regulation',
   'of',
   'signal',
   'transduction',
   'and',
   'cell',
   'growth',
   '.'],
  'foundBy': 'rule2',
  'match': [{'start': 2, 'end': 3, 'namedCaptures': []}]},
 {'sentenceId': 584575,
  'label': None,
  'documentId': '12196103',
  'sentenceIndex': 2,
  'words': ['14',
   '-',
   '3',
   '-',
   '3',
   'proteins',
   'bind',
   'phosphoserine-phosphorylated',
   'ligands',
   ',',
   'such',
   'as',
   'the',
   'Raf-1',
   'kinase',
   'and',
   'Bad',
   ',',
   'through',
   'recognition',
   'of',
   'the',
   'phosphorylated',
   'consensus',
   'motif',
   ',',
   'RSXpSXP',
   '(',
   'where',
   'pS',
   'is',
   'phosphoserine',
   ')',
   '.'],
  'foundBy': 'rule2',
  'match': [{'st

In [26]:
duration = res.get("total_hits_duration")
query = res.get("query")
count = res.get("total_hits_count")
complete = True
total_hits = [item for key,value in res.get("total_hits").items() for item in value]

In [27]:
total_hits[0]

{'sentenceId': 509617,
 'label': None,
 'documentId': '12525665',
 'sentenceIndex': 3,
 'words': ['Protein',
  'phosphatase',
  '2A',
  '(',
  'PP2A',
  ')',
  'is',
  'a',
  'phosphoprotein',
  'that',
  'plays',
  'important',
  'roles',
  'in',
  'the',
  'regulation',
  'of',
  'signal',
  'transduction',
  'and',
  'cell',
  'growth',
  '.'],
 'foundBy': 'rule2',
 'match': [{'start': 2, 'end': 3, 'namedCaptures': []}]}

In [24]:
res.get("error_messages")

'Expecting value: line 1 column 1 (char 0)'

In [278]:
grammar.urlList[1]

'http://217.76.56.90:8000/'

In [279]:
res1 = grammar.query(url=grammar.urlList[1],
                     search_query=QUERY,
                     maxDocs=10,
                     triggerOverlap=False)

http://217.76.56.90:8000/api/execute/grammar
{"metadataQuery":null,"duration":0.1599999964237213,"allowTriggerOverlaps":false,"mentions":[{"sentenceId":50302,"label":null,"documentId":"12392717","sentenceIndex":2,"words":["Although","CaM-kinase","II","weakly","phosphorylated","CaM","under","the","same","conditions",",","CaM-kinase","I",",","CaM-kinase","kinase","alpha",",","and","cAMP-dependent","protein","kinase","did","not","phosphorylate","CaM.","Polycations","such","as","poly(Lys",")","were","required","for","the","phosphorylation","."],"foundBy":"rule2","match":[{"start":1,"end":2,"namedCaptures":[]}]},{"sentenceId":50302,"label":null,"documentId":"12392717","sentenceIndex":2,"words":["Although","CaM-kinase","II","weakly","phosphorylated","CaM","under","the","same","conditions",",","CaM-kinase","I",",","CaM-kinase","kinase","alpha",",","and","cAMP-dependent","protein","kinase","did","not","phosphorylate","CaM.","Polycations","such","as","poly(Lys",")","were","required","for","the"

In [261]:
res

{'total_hits': {},
 'total_hits_count': 0,
 'total_hits_duration': 0,
 'error_messages': 'Expecting value: line 2 column 1 (char 1)'}

In [185]:
encoded_search_query = {"grammar":QUERY,
                        "maxDocs":10,
                        "allowTriggerOverlaps":False}

In [273]:
import requests

url = "http://217.76.56.90:8000/api/execute/grammar"
headers = {
    "Accept": "application/json",
    "Content-Type": "application/json"
}


response = requests.post(url,
                        headers=headers,
                        json=encoded_search_query)

print(response.text)

print(response.json())


{"metadataQuery":null,"duration":0.23899999260902405,"allowTriggerOverlaps":false,"mentions":[{"sentenceId":50302,"label":null,"documentId":"12392717","sentenceIndex":2,"words":["Although","CaM-kinase","II","weakly","phosphorylated","CaM","under","the","same","conditions",",","CaM-kinase","I",",","CaM-kinase","kinase","alpha",",","and","cAMP-dependent","protein","kinase","did","not","phosphorylate","CaM.","Polycations","such","as","poly(Lys",")","were","required","for","the","phosphorylation","."],"foundBy":"rule2","match":[{"start":1,"end":2,"namedCaptures":[]}]},{"sentenceId":50302,"label":null,"documentId":"12392717","sentenceIndex":2,"words":["Although","CaM-kinase","II","weakly","phosphorylated","CaM","under","the","same","conditions",",","CaM-kinase","I",",","CaM-kinase","kinase","alpha",",","and","cAMP-dependent","protein","kinase","did","not","phosphorylate","CaM.","Polycations","such","as","poly(Lys",")","were","required","for","the","phosphorylation","."],"foundBy":"rule2","m

In [78]:
print(""" {
  "grammar": "# an example grammar\nrules:\n - name: \"example\"\n   label: GrammaticalSubject\n   type: event\n   pattern: |\n     trigger = [lemma=have]\n     subject  = >nsubj []\n",
  "maxDocs": 10,
  "allowTriggerOverlaps": false
}""")

 {
  "grammar": "# an example grammar
rules:
 - name: "example"
   label: GrammaticalSubject
   type: event
   pattern: |
     trigger = [lemma=have]
     subject  = >nsubj []
",
  "maxDocs": 10,
  "allowTriggerOverlaps": false
}
