-
Notifications
You must be signed in to change notification settings - Fork 2
/
server.py
executable file
·165 lines (149 loc) · 5.22 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
"""Biomedical entity name resolution service.
1) split the input into fragments at runs of non-alphanumeric characters (spaces, punctuation, underscores)
* The order does not matter
2) search for names including all fragments, case insensitive
3) sort by length, ascending
* The curie with the shortest match is first, etc.
* Matching names are returned first, followed by non-matching names
"""
from collections import defaultdict
import logging
import os
import re
from typing import Dict, List
from fastapi import Body, FastAPI
import httpx
from pydantic import BaseModel, conint
from starlette.middleware.cors import CORSMiddleware
from .apidocs import get_app_info, construct_open_api_schema
# Module-wide logger, named after this module.
LOGGER = logging.getLogger(__name__)

# Where the Solr instance lives; both values can be overridden through the
# environment and fall back to a local default.
SOLR_HOST = os.environ.get("SOLR_HOST", "localhost")
SOLR_PORT = os.environ.get("SOLR_PORT", "8983")

# Matches one or more characters that are neither letters nor digits.
# "_" is listed explicitly because \W treats it as a word character.
not_alpha = re.compile(r"[\W_]+")

# The application object; its constructor arguments come from get_app_info().
app = FastAPI(**get_app_info())

# CORS is configured with wildcards for origins, methods and headers.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
class Request(BaseModel):
    """Reverse-lookup request body."""

    # Identifiers whose names should be returned, e.g. "MONDO:0005737".
    curies: List[str]
@app.post(
    "/reverse_lookup",
    response_model=Dict[str, List[str]],
    tags=["lookup"],
)
async def lookup_names(
        request: Request = Body(..., example={
            "curies": ["MONDO:0005737", "MONDO:0009757"],
        }),
) -> Dict[str, List[str]]:
    """Look up names from curies.

    Every requested curie appears as a key in the result; curies for which
    Solr holds no names map to an empty list. An empty list of curies yields
    an empty result without contacting Solr.

    Raises httpx.HTTPStatusError if Solr answers with an error status.
    """
    # Seed the result up front so curies without any names are still reported.
    output: Dict[str, List[str]] = {curie: [] for curie in request.curies}
    if not output:
        # An empty OR-clause is not a valid Solr query; nothing to look up.
        return output

    query = f"http://{SOLR_HOST}:{SOLR_PORT}/solr/name_lookup/select"
    # Each curie is sent as a quoted phrase, so backslashes and double quotes
    # inside it must be escaped to keep the query well-formed.
    curie_filter = " OR ".join(
        'curie:"{}"'.format(
            curie.replace("\\", "\\\\").replace('"', '\\"')
        )
        for curie in output
    )
    params = {
        "query": curie_filter,
        "limit": 1000000,
    }
    async with httpx.AsyncClient(timeout=None) as client:
        response = await client.post(query, json=params)
    if response.status_code >= 300:
        # Same error reporting as the /lookup endpoint.
        LOGGER.error("Solr REST error: %s", response.text)
    response.raise_for_status()

    for doc in response.json()["response"]["docs"]:
        # setdefault guards against Solr returning a curie that is spelled
        # differently from the requested one, which used to raise KeyError.
        output.setdefault(doc["curie"], []).append(doc["name"])
    return output
async def _post_solr_query(
        client: httpx.AsyncClient,
        url: str,
        params: dict,
) -> dict:
    """POST a JSON query to Solr and return the decoded JSON response.

    Logs Solr's response body for any status >= 300 and raises
    httpx.HTTPStatusError for error statuses.
    """
    response = await client.post(url, json=params)
    if response.status_code >= 300:
        LOGGER.error("Solr REST error: %s", response.text)
    response.raise_for_status()
    return response.json()


@app.post("/lookup", response_model=Dict[str, List[str]], tags=["lookup"])
async def lookup_curies(
        string: str,
        offset: int = 0,
        limit: conint(le=1000) = 10,
) -> Dict[str, List[str]]:
    """Look up curies from name or fragment.

    The input is split into fragments at runs of non-alphanumeric
    characters, and a name must contain a prefix match for every fragment.
    Curies are paged with offset and limit, ordered by the length of their
    shortest matching name. For each curie the matching names are listed
    first, followed by its non-matching names, each group shortest first.

    Returns an empty mapping when the input has no searchable characters,
    when nothing matches, or when offset is past the last matching curie.

    Raises httpx.HTTPStatusError if Solr answers with an error status.
    """
    # Fragments are split on punctuation as well as spaces, so "x-linked" is
    # searched as "x" AND "linked". An earlier version split on spaces only
    # and stripped punctuation inside each fragment ("x-linked" -> "xlinked"),
    # which was rarely what was wanted. If chemical or other
    # punctuation-heavy searches start to fail, revisit this.
    fragments = [fragment for fragment in not_alpha.split(string) if fragment]
    if not fragments:
        # Nothing searchable: an empty filter would be an invalid Solr query.
        return {}
    name_filters = " AND ".join(f"name:{fragment}*" for fragment in fragments)

    query = f"http://{SOLR_HOST}:{SOLR_PORT}/solr/name_lookup/select"
    # First request: fetch no documents, only the requested page of curies,
    # bucketed by curie and ordered by each curie's shortest matching name.
    facet_params = {
        "query": name_filters,
        "limit": 0,
        "sort": "length ASC",
        "facet": {
            "categories": {
                "type": "terms",
                "field": "curie",
                "sort": "x asc",
                "offset": offset,
                "limit": limit,
                "facet": {
                    "x": "min(length)",
                },
                "numBuckets": True,
            }
        }
    }

    output = defaultdict(list)
    # One client is shared by all three requests.
    async with httpx.AsyncClient(timeout=None) as client:
        facet_result = await _post_solr_query(client, query, facet_params)
        if not facet_result["response"]["numFound"]:
            return {}

        buckets = (
            facet_result.get("facets", {})
            .get("categories", {})
            .get("buckets", [])
        )
        if not buckets:
            # offset is past the last bucket; an empty curie filter would
            # produce the invalid query "() AND (...)".
            return {}
        curie_filter = " OR ".join(
            f"curie:\"{bucket['val']}\""
            for bucket in buckets
        )

        # Second request: names of the selected curies that match the input.
        # Third request: the remaining names of those same curies.
        for name_clause in (f"({name_filters})", f"NOT ({name_filters})"):
            names_params = {
                "query": f"({curie_filter}) AND {name_clause}",
                "limit": 1000000,
                "sort": "length ASC",
                "fields": "curie,name",
            }
            names_result = await _post_solr_query(client, query, names_params)
            for doc in names_result["response"]["docs"]:
                output[doc["curie"]].append(doc["name"])
    return output
# Override the app's OpenAPI schema with the custom schema built by
# construct_open_api_schema (imported from .apidocs). This runs at import
# time, after the routes above have been registered on the app.
app.openapi_schema = construct_open_api_schema(app)