-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
7b3e2ab
commit 6c02841
Showing
4 changed files
with
312 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,155 @@ | ||
const axios = require("axios"); | ||
const qb = require("./query_builder"); | ||
const queue = require("./query_queue"); | ||
const tf = require("@biothings-explorer/api-response-transform"); | ||
const resolver = require("biomedical_id_resolver"); | ||
const { transform } = require("lodash"); | ||
const debug = require("debug")("call-apis:query"); | ||
|
||
/** | ||
* Make API Queries based on input BTE Edges, collect and align the results into BioLink Model | ||
*/ | ||
module.exports = class APIQueryDispathcer { | ||
/** | ||
* Construct inputs for APIQueryDispatcher | ||
* @param {array} edges - an array of BTE edges with input added | ||
*/ | ||
constructor(edges) { | ||
this.edges = edges; | ||
} | ||
|
||
async _queryBucket(queries) { | ||
const res = await Promise.allSettled(queries.map(query => { | ||
debug(`Making the following query ${JSON.stringify(query.config)}`) | ||
return axios(query.config) | ||
.then(res => ({ | ||
response: res.data, | ||
edge: query.edge | ||
})) | ||
.then(res => { | ||
if (query.needPagination(res.response)) { | ||
debug("This query needs to be paginated") | ||
query.getNext(); | ||
} | ||
debug(`Succesfully made the following query: ${JSON.stringify(query.config)}`) | ||
let tf_obj = new tf(res); | ||
let transformed = tf_obj.transform(); | ||
|
||
debug(`After transformation, BTE is able to retrieve ${transformed.length} hits!`) | ||
return transformed | ||
}) | ||
.catch(error => { | ||
debug(`Failed to make to following query: ${JSON.stringify(query.config)}`) | ||
return undefined; | ||
}); | ||
})) | ||
this.queue.dequeue() | ||
return res; | ||
} | ||
|
||
_checkIfNext(queries) { | ||
queries.map(query => { | ||
if (query.hasNext === true) { | ||
this.queue.addQuery(query) | ||
} | ||
}) | ||
} | ||
|
||
_constructQueries(edges) { | ||
return edges.map(edge => new qb(edge)); | ||
|
||
} | ||
|
||
_constructQueue(queries) { | ||
this.queue = new queue(queries); | ||
this.queue.constructQueue(queries); | ||
} | ||
|
||
async query(resolveOutputIDs = true) { | ||
debug(`Resolving ID feature is turned ${(resolveOutputIDs) ? 'on' : 'off'}`) | ||
debug(`Number of BTE Edges received is ${this.edges.length}`); | ||
this.queryResult = []; | ||
const queries = this._constructQueries(this.edges); | ||
this._constructQueue(queries); | ||
while (this.queue.queue.length > 0) { | ||
const bucket = this.queue.queue[0].getBucket(); | ||
let res = await this._queryBucket(bucket); | ||
this.queryResult = [...this.queryResult, ...res]; | ||
this._checkIfNext(bucket); | ||
} | ||
this.merge(); | ||
await this.annotate(resolveOutputIDs); | ||
debug("Query completes"); | ||
} | ||
|
||
// async query() { | ||
// this.queryResult = await Promise.allSettled(this.edges.map(edge => { | ||
// if (edge === undefined) { | ||
// return undefined | ||
// } | ||
// let qbo = new qb(edge); | ||
// return axios(qbo.config) | ||
// .then(res => ({ | ||
// response: res.data, | ||
// edge: edge | ||
// })) | ||
// .then(res => { | ||
// if (qbo.hasNext(res.response)) { | ||
// qbo.getNext(); | ||
// } | ||
// let tf_obj = new tf(res); | ||
// let transformed = tf_obj.transform(); | ||
// return transformed | ||
// }) | ||
// .catch(error => { | ||
// return undefined; | ||
// }); | ||
// })); | ||
// this.merge(); | ||
// await this.annotate(); | ||
// } | ||
|
||
/** | ||
* Merge the results into a single array from Promise.allSettled | ||
*/ | ||
merge() { | ||
this.result = []; | ||
this.queryResult.map(res => { | ||
if (!(res.value === undefined)) { | ||
this.result = [...this.result, ...res.value]; | ||
} | ||
}); | ||
debug(`Total number of results returned for this query is ${this.result.length}`) | ||
} | ||
|
||
/** | ||
* Add equivalent ids to all output using biomedical-id-resolver service | ||
*/ | ||
async annotate(enable = true) { | ||
let res = {}; | ||
if (enable === true) { | ||
let output_ids = {}; | ||
this.result.map(item => { | ||
let output_type = item["$association"]["output_type"]; | ||
if (!(output_type in output_ids)) { | ||
output_ids[output_type] = []; | ||
} | ||
output_ids[output_type].push(item["$output"]); | ||
}); | ||
res = await resolver(output_ids); | ||
} | ||
this.result.map(item => { | ||
if (item.$output in res) { | ||
item.$output_id_mapping = { | ||
resolved: res[item.$output], | ||
original: item.$output | ||
}; | ||
item.label = res[item.$output].id.label; | ||
item.id = item.$output = res[item.$output].id.identifier; | ||
} else { | ||
item.label = item.id = item.$output; | ||
} | ||
}); | ||
} | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
module.exports = class QueryBucket { | ||
constructor() { | ||
this.cnt = {}; | ||
this.bucket = []; | ||
this.MAX_CONCURRENT_API_QUERIES = 3 | ||
} | ||
|
||
canBeAdded(url) { | ||
if (!(url in this.cnt) || this.cnt[url] < this.MAX_CONCURRENT_API_QUERIES) { | ||
return true; | ||
} | ||
return false; | ||
} | ||
|
||
add(query) { | ||
if (!(query.getUrl() in this.cnt)) { | ||
this.cnt[query.getUrl()] = 0; | ||
} | ||
this.cnt[query.getUrl()] += 1; | ||
this.bucket.push(query); | ||
} | ||
|
||
getBucket() { | ||
return this.bucket; | ||
} | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
const _ = require("lodash"); | ||
|
||
/** | ||
* Build API queries serving as input for Axios library based on BTE Edge info | ||
*/ | ||
module.exports = class QueryBuilder { | ||
/** | ||
* Constructor for Query Builder | ||
* @param {object} edge - BTE Edge object with input field provided | ||
*/ | ||
constructor(edge) { | ||
this.start = 0 | ||
this.hasNext = false | ||
this.POST_HEADER = { "content-type": "application/x-www-form-urlencoded" }; | ||
this.edge = edge; | ||
this.server = edge.query_operation.server; | ||
if (edge.query_operation.server.endsWith('/')) { | ||
this.server = this.server.substring(0, this.server.length - 1) | ||
}; | ||
this.url = this.server + edge.query_operation.path; | ||
this.method = edge.query_operation.method; | ||
this.supportBatch = edge.query_operation.supportBatch; | ||
this.input = edge.input; | ||
this.inputSeparator = edge.query_operation.inputSeparator; | ||
this.params = _.cloneDeep(edge.query_operation.params); | ||
this.constructInput(); | ||
this.constructRequestBody(); | ||
this.constructParams(); | ||
this.constructAxiosRequestConfig(); | ||
} | ||
|
||
getUrl() { | ||
return this.url | ||
} | ||
|
||
/** | ||
* Construct input based on method and inputSeparator | ||
*/ | ||
constructInput() { | ||
if (this.supportBatch === true) { | ||
this.input = this.input.join(this.inputSeparator); | ||
} | ||
} | ||
|
||
/** | ||
* Construct parameters for API calls | ||
*/ | ||
constructParams() { | ||
if (this.edge.query_operation.path_params) { | ||
this.edge.query_operation.path_params.map(param => { | ||
let val = this.params[param]; | ||
this.url = this.url.replace("{" + param + "}", val).replace("{inputs[0]}", this.input); | ||
delete this.params[param]; | ||
}); | ||
} | ||
Object.keys(this.params).map(param => { | ||
if (typeof this.params[param] === 'string') { | ||
this.params[param] = this.params[param].replace("{inputs[0]}", this.input); | ||
} | ||
}); | ||
} | ||
|
||
/** | ||
* Construct request body for API calls | ||
*/ | ||
constructRequestBody() { | ||
if (this.edge.query_operation.request_body !== undefined && "body" in this.edge.query_operation.request_body) { | ||
let body = this.edge.query_operation.request_body.body; | ||
this.data = Object.keys(body).reduce((accumulator, key) => accumulator + key + '=' + body[key].replace('{inputs[0]}', this.input) + '&', ''); | ||
this.data = this.data.substring(0, this.data.length - 1) | ||
} | ||
} | ||
|
||
/** | ||
* Construct the request config for Axios reqeust. | ||
*/ | ||
constructAxiosRequestConfig() { | ||
this.config = { | ||
url: this.url, | ||
params: this.params, | ||
data: this.data, | ||
method: this.method, | ||
timeout: 50000 | ||
} | ||
} | ||
|
||
needPagination(apiResponse) { | ||
if (this.method === "get" && this.edge.tags.includes("biothings")) { | ||
if (apiResponse.total > this.start + apiResponse.hits.length) { | ||
this.hasNext = true; | ||
return true | ||
} | ||
} | ||
this.hasNext = false; | ||
return false | ||
} | ||
|
||
getNext() { | ||
this.start += 1000; | ||
this.params['from'] = this.start; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
const QueryBucket = require("./query_bucket"); | ||
|
||
module.exports = class APIQueryQueue { | ||
constructor(queries) { | ||
this.queue = []; | ||
this.queries = queries; | ||
} | ||
|
||
dequeue() { | ||
this.queue.shift(); | ||
} | ||
|
||
addQuery(query) { | ||
for (let bucket of this.queue) { | ||
if (bucket.canBeAdded(query.getUrl())) { | ||
bucket.add(query); | ||
return; | ||
} | ||
} | ||
const newBucket = new QueryBucket(); | ||
newBucket.add(query); | ||
this.queue.push(newBucket); | ||
} | ||
|
||
constructQueue(queries) { | ||
queries.map(query => this.addQuery(query)); | ||
} | ||
} |