Let's loop through the sequences of the SwissProt dataset and scrape their GO term annotations from the UniProt API.

In [None]:
import requests, json

from os import path

from Bio import SeqIO

from time import sleep

# SwissProt FASTA export whose records are iterated over.
fasta_path = "./dataset/uniprot_sprot.fasta"

# 1-based index of the first FASTA record to process; earlier records are
# skipped, which lets an interrupted run be resumed where it stopped.
start_offset = 243543

# JSON Lines outputs: one file per GO aspect (molecular function, biological
# process, cellular component) plus one holding every term.
mf_dataset_path = "./dataset/mf.jsonl"
bp_dataset_path = "./dataset/bp.jsonl"
cc_dataset_path = "./dataset/cc.jsonl"
all_dataset_path = "./dataset/all.jsonl"

# Return fields requested from UniProt. Only the GO columns are needed.
go_fields = (
    "go_p",
    "go_c",
    "go_f",
    "go_id",
)

# UniProt documents `fields` as a single comma-separated value, so send it
# that way instead of as repeated parameters with stray leading spaces.
params = {
    "fields": ",".join(go_fields),
}

# Ask for a JSON response body.
headers = {
    "accept": "application/json"
}

base_url = "https://rest.uniprot.org/uniprotkb"

# HTTP statuses that mean "slow down and try again" rather than "give up".
_RATE_LIMIT_STATUS_CODES = {429, 503}

# Seconds to wait when the server gives no usable Retry-After hint.
_DEFAULT_RETRY_DELAY = 5

# Seconds before a single HTTP request is abandoned, so a stalled connection
# cannot hang the whole run.
_REQUEST_TIMEOUT = 30


def _parse_retry_after(value, default=_DEFAULT_RETRY_DELAY):
    """Return the number of seconds to wait given a Retry-After header value.

    The header may be absent or may hold an HTTP-date instead of a number of
    seconds; in both cases `default` is returned instead of raising.
    """
    try:
        return max(int(value), 0)
    except (TypeError, ValueError):
        return default


def _fetch_entry(session, url):
    """GET `url` and return the response, waiting out throttling and outages.

    Rate-limit responses (429/503) and transient network failures are retried
    indefinitely after a pause. Any other response, successful or not, is
    returned to the caller as-is.
    """
    while True:
        try:
            response = session.get(
                url,
                headers=headers,
                params=params,
                timeout=_REQUEST_TIMEOUT,
            )
        except (
            requests.exceptions.ConnectionError,
            requests.exceptions.Timeout,
        ) as error:
            print(
                f"Network error for {url}: {error}. "
                f"Retrying in {_DEFAULT_RETRY_DELAY} seconds ..."
            )

            sleep(_DEFAULT_RETRY_DELAY)

            continue

        if response.status_code not in _RATE_LIMIT_STATUS_CODES:
            return response

        retry_after = _parse_retry_after(response.headers.get("Retry-After"))

        print(f"Rate limit exceeded. Retrying in {retry_after} seconds ...")

        sleep(retry_after)


def _extract_go_terms(cross_references):
    """Collect the GO terms found in a list of UniProt cross-references.

    Returns a tuple `(terms_by_aspect, all_terms)`. `terms_by_aspect` maps the
    aspect letters "F", "P" and "C" to lists of term dicts; `all_terms` holds
    every GO term, including those whose aspect letter is none of the three.
    Each term dict has the keys "id" and "evidence_code".
    """
    terms_by_aspect = {"F": [], "P": [], "C": []}
    all_terms = []

    for reference in cross_references:
        if reference.get("database") != "GO":
            continue

        aspect = "UNK"
        evidence_code = "UNK"

        for go_property in reference.get("properties", []):
            key = go_property.get("key")
            value = go_property.get("value", "")

            # Both values are "<prefix>:<rest>"; only the prefix is kept.
            if key == "GoTerm":
                aspect = value.partition(":")[0]
            elif key == "GoEvidenceType":
                evidence_code = value.partition(":")[0]

        go_term = {
            "id": reference["id"],
            "evidence_code": evidence_code,
        }

        aspect_terms = terms_by_aspect.get(aspect.upper())

        if aspect_terms is not None:
            aspect_terms.append(go_term)

        all_terms.append(go_term)

    return terms_by_aspect, all_terms


def _write_record(dataset_file, sequence_id, sequence, terms, taxon_id):
    """Append one JSON line for the sequence, unless it has no terms."""
    if not terms:
        return

    dataset_file.write(json.dumps({
        "id": sequence_id,
        "sequence": sequence,
        "terms": terms,
        "taxon_id": str(taxon_id),
    }) + "\n")


with open(mf_dataset_path, "a", encoding="utf-8") as mf_dataset_file, \
    open(bp_dataset_path, "a", encoding="utf-8") as bp_dataset_file, \
    open(cc_dataset_path, "a", encoding="utf-8") as cc_dataset_file, \
    open(all_dataset_path, "a", encoding="utf-8") as all_dataset_file, \
    open(fasta_path, "r", encoding="utf-8") as fasta_file, \
    requests.Session() as session:

    for index, record in enumerate(SeqIO.parse(fasta_file, "fasta"), start=1):
        # Skip records handled by a previous run.
        if index < start_offset:
            continue

        # FASTA ids look like "<db>|<accession>|<entry name>".
        sequence_id = record.id.split("|")[1]
        taxon_id = record.description.split("OX=", 1)[1].split(" ")[0]
        sequence = str(record.seq)

        # Build the URL by hand: os.path.join uses the platform's path
        # separator and would produce a backslash on Windows.
        url = f"{base_url}/{sequence_id}"

        response = _fetch_entry(session, url)

        if response.status_code != 200:
            print(f"Error fetching data for {sequence_id}: {response.status_code}")

            continue

        try:
            data = response.json()
        except ValueError:
            print(f"Error decoding data for {sequence_id}: invalid JSON")

            continue

        if "uniProtKBCrossReferences" not in data:
            continue

        terms_by_aspect, all_terms = _extract_go_terms(
            data["uniProtKBCrossReferences"]
        )

        outputs = (
            (mf_dataset_file, terms_by_aspect["F"]),
            (bp_dataset_file, terms_by_aspect["P"]),
            (cc_dataset_file, terms_by_aspect["C"]),
            (all_dataset_file, all_terms),
        )

        for dataset_file, terms in outputs:
            _write_record(dataset_file, sequence_id, sequence, terms, taxon_id)

        print(
            f"Record: #{index:,}, ID: {sequence_id}, "
            f"Length: {len(sequence)}, GO Terms: {len(all_terms):,}"
        )

print("Done!")

        

Record: #243,543, ID: Q4QR85, Length: 342, Num Terms: 19
Record: #243,544, ID: Q6NUD0, Length: 333, Num Terms: 4
Record: #243,545, ID: D4AQG5, Length: 633, Num Terms: 4
Record: #243,546, ID: Q6WIH6, Length: 633, Num Terms: 4
Record: #243,547, ID: E4UVK2, Length: 642, Num Terms: 4
Record: #243,548, ID: C5FX29, Length: 632, Num Terms: 4
Record: #243,549, ID: Q6WIH7, Length: 632, Num Terms: 4
Record: #243,550, ID: C5NZY5, Length: 356, Num Terms: 4
Record: #243,551, ID: Q8NIH1, Length: 633, Num Terms: 4
Record: #243,552, ID: A5YCB9, Length: 608, Num Terms: 4
Record: #243,553, ID: D4DIT1, Length: 633, Num Terms: 4
Record: #243,554, ID: D4ALW9, Length: 271, Num Terms: 4
Record: #243,555, ID: E4V4I7, Length: 271, Num Terms: 4
Record: #243,556, ID: C5FQJ4, Length: 270, Num Terms: 4
Record: #243,557, ID: C5PE18, Length: 355, Num Terms: 4
