updating UniRef90 IDs retrieval using latest APIs from UniProt
fasnicar committed Sep 12, 2022
1 parent 83111cb commit c64a75f
Showing 1 changed file with 172 additions and 23 deletions.
195 changes: 172 additions & 23 deletions phylophlan/phylophlan_setup_database.py
@@ -6,8 +6,8 @@
              'Claudia Mengoni (claudia.mengoni@studenti.unitn.it), '
              'Mattia Bolzan (mattia.bolzan@unitn.it), '
              'Nicola Segata (nicola.segata@unitn.it)')
__version__ = '3.0.23'
__date__ = '27 November 2020'
__version__ = '3.0.24'
__date__ = '10 September 2022'


import sys
@@ -17,10 +17,19 @@
from Bio.SeqRecord import SeqRecord
import os
import argparse as ap
import re
import json
import zlib
from xml.etree import ElementTree
from urllib.request import urlretrieve
from urllib.parse import urlencode
from urllib.parse import urlparse
from urllib.parse import parse_qs
from urllib.request import Request
from urllib.request import urlopen
import requests
from requests.adapters import HTTPAdapter
from requests.adapters import Retry
import time


@@ -176,6 +185,144 @@ def check_params(args, verbose=False):
        info('Arguments: {}\n'.format(vars(args)))


def check_response(response):
    try:
        response.raise_for_status()
    except requests.HTTPError:
        error(response.json(), init_new_line=True, exit=True)


def submit_id_mapping(api_URL, IDs_list, from_db, to_db):
    # valid from_db to_db pairs at https://rest.uniprot.org/configure/idmapping/fields
    request = requests.post(f"{api_URL}/idmapping/run", data={"from": from_db, "to": to_db, "ids": ",".join(IDs_list)})
    check_response(request)
    return request.json()["jobId"]


def get_next_link(headers):
    re_next_link = re.compile(r'<(.+)>; rel="next"')

    if "Link" in headers:
        match = re_next_link.match(headers["Link"])

        if match:
            return match.group(1)


def check_id_mapping_results_ready(session, api_URL, job_ID, polling_interval, verbose=False):
    while True:
        request = session.get(f"{api_URL}/idmapping/status/{job_ID}")
        check_response(request)
        j = request.json()

        if "jobStatus" in j:
            if j["jobStatus"] == "RUNNING":
                if verbose:
                    info(f"Job still running, retrying in {polling_interval}s\n")

                time.sleep(polling_interval)
            else:
                error("check_id_mapping_results_ready(): " + j["jobStatus"], init_new_line=True, exit=True)
        else:
            return bool(j["results"] or j["failedIds"])


def get_batch(session, batch_response, file_format, compressed):
    batch_url = get_next_link(batch_response.headers)

    while batch_url:
        batch_response = session.get(batch_url)
        batch_response.raise_for_status()
        yield decode_results(batch_response, file_format, compressed)
        batch_url = get_next_link(batch_response.headers)


def combine_batches(all_results, batch_results, file_format):
    if file_format == "json":
        for key in ("results", "failedIds"):
            if key in batch_results and batch_results[key]:
                all_results[key] += batch_results[key]
    elif file_format == "tsv":
        return all_results + batch_results[1:]
    else:
        return all_results + batch_results

    return all_results


def get_id_mapping_results_link(session, api_URL, job_ID):
    url = f"{api_URL}/idmapping/details/{job_ID}"
    request = session.get(url)
    check_response(request)
    return request.json()["redirectURL"]


def decode_results(response, file_format, compressed):
    if compressed:
        decompressed = zlib.decompress(response.content, 16 + zlib.MAX_WBITS)

        if file_format == "json":
            j = json.loads(decompressed.decode("utf-8"))
            return j
        elif file_format == "tsv":
            return [line for line in decompressed.decode("utf-8").split("\n") if line]
        elif file_format == "xlsx":
            return [decompressed]
        elif file_format == "xml":
            return [decompressed.decode("utf-8")]
        else:
            return decompressed.decode("utf-8")
    elif file_format == "json":
        return response.json()
    elif file_format == "tsv":
        return [line for line in response.text.split("\n") if line]
    elif file_format == "xlsx":
        return [response.content]
    elif file_format == "xml":
        return [response.text]

    return response.text


def get_xml_namespace(element):
    m = re.match(r"\{(.*)\}", element.tag)
    return m.groups()[0] if m else ""


def merge_xml_results(xml_results):
    merged_root = ElementTree.fromstring(xml_results[0])

    for result in xml_results[1:]:
        root = ElementTree.fromstring(result)

        for child in root.findall("{http://uniprot.org/uniprot}entry"):
            merged_root.insert(-1, child)

    ElementTree.register_namespace("", get_xml_namespace(merged_root[0]))
    return ElementTree.tostring(merged_root, encoding="utf-8", xml_declaration=True)


def get_id_mapping_results_search(session, url):
    parsed = urlparse(url)
    query = parse_qs(parsed.query)
    file_format = query["format"][0] if "format" in query else "json"
    size = int(query["size"][0]) if "size" in query else 500
    query["size"] = size
    compressed = query["compressed"][0].lower() == "true" if "compressed" in query else False
    parsed = parsed._replace(query=urlencode(query, doseq=True))
    url = parsed.geturl()
    request = session.get(url)
    check_response(request)
    results = decode_results(request, file_format, compressed)

    for batch in get_batch(session, request, file_format, compressed):
        results = combine_batches(results, batch, file_format)

    if file_format == "xml":
        return merge_xml_results(results)

    return results


def database_update(update=False, verbose=False):
    taxa2core_file_latest = None
    taxa2core_file = os.path.basename(DOWNLOAD_URL).replace('?dl=1', '')
@@ -312,35 +459,19 @@ def get_core_proteins(taxa2core_file, taxa_label, output, output_extension, verb

    # try to re-map the ids in case "not mapped" store in not_mapped
    if retry2download:
        if verbose:
            info("Re-trying to download {} core proteins that just failed, please wait as it might take some time\n"
                 .format(len(retry2download)))
        idmapping_url = 'https://www.uniprot.org/uploadlists/'
        contact = "phylophlan@cibiocm.com"
        params = {'from': 'ACC+ID',
                  'to': 'NF90',
                  'format': 'tab',  # or 'list' for only converted clusters
                  'query': ' '.join(retry2download)}
        data = urlencode(params).encode('utf-8')
        request = Request(idmapping_url, data, headers={'User-Agent': 'Python {}'.format(contact)})
        resolved_uniref90s = resolve_IDs(retry2download, verbose=verbose)

        try:
            response = urlopen(request, data)
            uniprotkb2uniref90 = [line.decode().strip().split('\t')[:2] for line in response.readlines()][1:]  # skip ['From', 'To']
        except Exception:
            error('unable convert UniProtKB ID to UniRef90 ID')

        for uniref90_id in (x[1].split('_')[-1] for x in uniprotkb2uniref90[1:]):
        for uniref90_id in (x.replace('UniRef90_', '') for x in resolved_uniref90s.values()):
            local_prot = os.path.join(output, uniref90_id + output_extension)
            download(url.format(uniref90_id), local_prot, verbose=verbose)

            if not os.path.exists(local_prot):
                not_mapped.append(uniref90_id)

        if len(uniprotkb2uniref90) != len(retry2download):
        if len(resolved_uniref90s) != len(retry2download):
            # probably deleted proteins in the Uniprot versions, try to download their latest version in any case
            for ur90 in set([ur90 for ur90, _ in uniprotkb2uniref90]) - set(retry2download):
                local_prot = os.path.join(output, ur90 + output_extension)
            for ur90 in set(resolved_uniref90s.keys()) - set(retry2download):
                local_prot = os.path.join(output, resolved_uniref90s[ur90] + output_extension)
                download('https://www.uniprot.org/uniprot/{}.fasta?version=*'.format(ur90), local_prot, verbose=verbose)

                if not os.path.exists(local_prot):
@@ -358,6 +489,24 @@ def get_core_proteins(taxa2core_file, taxa_label, output, output_extension, verb
            f.write('\n'.join(not_mapped_again) + '\n')


def resolve_IDs(IDs_list, verbose=False):
    if verbose:
        info("Resolving {} UniRef90 IDs, please wait as this might take some time\n".format(len(IDs_list)))

    polling_interval = 5  # in seconds
    api_URL = "https://rest.uniprot.org"
    retries = Retry(total=5, backoff_factor=0.25, status_forcelist=[500, 502, 503, 504])
    session = requests.Session()
    session.mount("https://", HTTPAdapter(max_retries=retries))
    job_ID = submit_id_mapping(api_URL, IDs_list, from_db="UniProtKB_AC-ID", to_db="UniRef90")

    if check_id_mapping_results_ready(session, api_URL, job_ID, polling_interval, verbose=verbose):
        link = get_id_mapping_results_link(session, api_URL, job_ID)
        results = get_id_mapping_results_search(session, link)

    return dict([(i['from'], i['to']['id']) for i in results['results']])


def create_database(db_name, inputt, input_ext, output, overwrite, verbose=False):
    seqs = []

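A note on the new retrieval flow: this commit replaces the old www.uniprot.org/uploadlists endpoint with UniProt's job-based ID-mapping REST API at rest.uniprot.org. The snippet below is a minimal, self-contained sketch of that flow (submit a job, poll its status, follow its redirect URL, collect the mappings), written against the same endpoints the added helpers use; it is not part of the commit, the example accessions are placeholders, and it omits the retry-enabled session, pagination, and compressed/TSV/XML handling that the committed code provides.

# Minimal sketch of the job-based UniProt ID-mapping flow wrapped by resolve_IDs().
# Not part of the commit; the example accessions are placeholders.
import time
import requests

API_URL = "https://rest.uniprot.org"
accessions = ["P12345", "Q67890"]  # hypothetical UniProtKB accessions

# 1. submit the mapping job (UniProtKB accessions -> UniRef90 clusters)
job = requests.post(f"{API_URL}/idmapping/run",
                    data={"from": "UniProtKB_AC-ID", "to": "UniRef90", "ids": ",".join(accessions)})
job.raise_for_status()
job_id = job.json()["jobId"]

# 2. poll the job status until it stops reporting RUNNING
while requests.get(f"{API_URL}/idmapping/status/{job_id}").json().get("jobStatus") == "RUNNING":
    time.sleep(5)

# 3. follow the redirect URL of the finished job and read the JSON results
#    (the committed helpers additionally walk the Link header to page through large result sets)
details = requests.get(f"{API_URL}/idmapping/details/{job_id}").json()
results = requests.get(details["redirectURL"]).json()

# 4. build the accession -> UniRef90 cluster ID dictionary, as resolve_IDs() does
uniprotkb2uniref90 = {r["from"]: r["to"]["id"] for r in results["results"]}
print(uniprotkb2uniref90)  # e.g. {'P12345': 'UniRef90_P12345'}

On top of this basic flow, the committed resolve_IDs() adds a retry-enabled requests session and delegates result parsing to helpers that also handle compressed, TSV, XLSX and XML payloads.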
