In [2]:
import sys
import os

# Add the virtual environment to the Python path
venv_path = os.path.join(os.getcwd(), 'venv')
site_packages = os.path.join(venv_path, 'Lib', 'site-packages')

if site_packages not in sys.path:
    sys.path.insert(0, site_packages)

In [149]:
import requests, sys, json

def do_uniprot_search(prot_id):
    
    params = {
    "query": prot_id,
    "fields": [
        "id",
        # "gene_names",
        "annotation_score",
        "cc_function",
        "gene_names",
        "protein_existence",
        "organism_name",
        "sequence"
    ],
    "sort": "annotation_score desc",
    "size": "1"
    }
    headers = {
    "accept": "application/json"
    }
    base_url = "https://rest.uniprot.org/uniprotkb/search"

    response = requests.get(base_url, headers=headers, params=params)
    if not response.ok:
        response.raise_for_status()
        sys.exit()

    data = response.json()

    initial_name = data['results'][0].get('primaryAccession', 'N/A')
    initial_name = initial_name + " - " + prot_id
    
    gene = data['results'][0].get('genes', [{}])[0].get('geneName', {}).get('value', 'N/A')
    annotation_score = data['results'][0].get('annotationScore', 'N/A')
    organism = data['results'][0].get('organism', {}).get('scientificName', 'N/A')
    amino_acids = len(data['results'][0].get('sequence', {}).get('value', 'N/A'))
    protein_existence = data['results'][0].get('proteinExistence', 'N/A')
    annotation_score = data['results'][0].get('annotationScore', 'N/A')

    description = data['results'][0].get('comments', [])[0].get('texts', [{}])[0].get('value', 'N/A')

    result = {
        "initial_name": initial_name,
        "gene": gene,
        "organism": organism,
        "amino_acids": amino_acids,
        "protein_existence": protein_existence,
        "annotation_score": annotation_score,
        "description": description,
        # "data": data
    }

    return result

do_uniprot_search("ABA4_ARATH")

{'initial_name': 'Q8LFP9 - ABA4_ARATH',
 'gene': 'ABA4',
 'organism': 'Arabidopsis thaliana',
 'amino_acids': 220,
 'protein_existence': '2: Evidence at transcript level',
 'annotation_score': 4.0,
 'description': 'Required for neoxanthin biosynthesis, an intermediary step in abscisic acid (ABA) biosynthesis. Probably not involved directly in the enzymatic conversion of violaxanthin to neoxanthin. Cannot convert violaxanthin to neoxanthin in vitro. Required for ABA biosynthesis in response to drought stress (PubMed:17470058). Required for neoxanthin biosynthesis which is involved in photoprotection of photosystem II (PSII). Neoxanthin acts as an antioxidant within the photosystem PSII supercomplex (PubMed:17351115)'}

In [140]:
TEST_ANNOTATIONS_FILE_PATH = "gomix/src/data/processed/task_datasets/2016/annotations/test.json"
with open(TEST_ANNOTATIONS_FILE_PATH, 'r') as f:
        test_annotations = json.load(f)  # dict: prot ID -> list of GO terms
test_prot_ids = [prot_id for prot_id in test_annotations]

In [153]:
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

# Perform search for all test protein IDs and save results
results = {}

def search_with_retry(prot_id, max_retries=3):
    """Search for a protein with retry logic"""
    for attempt in range(max_retries):
        try:
            result = do_uniprot_search(prot_id)
            return prot_id, result, None
        except Exception as e:
            if attempt == max_retries - 1:
                return prot_id, None, str(e)
            time.sleep(0.5)  # Brief pause before retry
    
# Use ThreadPoolExecutor for parallel requests
max_workers = 10  # Adjust based on API rate limits
with ThreadPoolExecutor(max_workers=max_workers) as executor:
    # Submit all tasks
    future_to_prot = {executor.submit(search_with_retry, prot_id): prot_id 
                      for prot_id in test_prot_ids}
    
    # Process completed tasks with progress bar
    for future in tqdm(as_completed(future_to_prot), total=len(test_prot_ids), 
                       desc="Fetching protein data"):
        prot_id, result, error = future.result()
        
        if error:
            results[prot_id] = {
                "initial_name": f"ERROR - {prot_id}",
                "gene": "N/A",
                "organism": "N/A",
                "amino_acids": "N/A",
                "protein_existence": "N/A",
                "annotation_score": "N/A",
                "description": f"Error: {error}"
            }
        else:
            results[prot_id] = result

# Convert to list maintaining original order
results_list = [results[prot_id] for prot_id in test_prot_ids]

# Save results to file
with open("test_uniprotid_info.txt", 'w', encoding='utf-8') as f:
    json.dump(results_list, f, indent=2, ensure_ascii=False)

print(f"\n✓ Completed! Processed {len(test_prot_ids)} protein IDs.")
print(f"✓ Results saved to: test_uniprotid_info.txt")

Fetching protein data: 100%|██████████| 1785/1785 [11:06<00:00,  2.68it/s] 


✓ Completed! Processed 1785 protein IDs.
✓ Results saved to: test_uniprotid_info.txt





In [2]:
# Python code to parse and reformat the txt file as requested.
# Save this whole cell into a notebook and run it. Adjust `INPUT_PATH` to point to your input file.
#
# Behavior:
# - Attempts to parse the file as JSON (a list of dicts) first.
# - If that fails, tries Python literal eval (ast.literal_eval).
# - If that fails, it extracts top-level {...} blocks and parses them individually.
# - Removes entries that look like errors (initial_name starting with "ERROR", any field equal to "N/A",
#   or description containing the word "Error").
# - Converts the list of dicts into a mapping keyed by the second part of initial_name (the part after ' - ').
# - Writes the reformatted data to '/mnt/data/test_uniprotid_info_formatted.txt' as pretty JSON.
#
# Outputs:
# - Prints summary counts and the output file path.
# - If the input file is missing, prints instructions and creates no output file.

import json, ast, re, os
from pathlib import Path

INPUT_PATH = "test_uniprotid_info.txt"   # <- change this to your actual input filename if different
OUTPUT_PATH = "test_uniprotid_info_formatted.txt"

def extract_top_level_dicts(s: str):
    """Extract top-level {...} blocks from a string using brace counting.
       Returns a list of strings (each a {...} block)."""
    blocks = []
    brace_level = 0
    current = []
    in_string = False
    escape = False
    for ch in s:
        current.append(ch)
        if ch == '\\' and not escape:
            escape = True
            continue
        if ch in ('"', "'") and not escape:
            in_string = not in_string
        if not in_string:
            if ch == '{':
                brace_level += 1
            elif ch == '}':
                brace_level -= 1
                if brace_level == 0:
                    blocks.append(''.join(current).strip())
                    current = []
        if escape:
            escape = False
    return blocks

def try_parse_text(s: str):
    """Return list of dicts parsed from text. May raise ValueError if nothing parseable found."""
    s_strip = s.strip()
    # Try JSON first
    try:
        data = json.loads(s_strip)
        if isinstance(data, list):
            return data
        # if single dict, wrap
        if isinstance(data, dict):
            return [data]
    except Exception:
        pass
    # Try Python literal eval (list/dict)
    try:
        data = ast.literal_eval(s_strip)
        if isinstance(data, list):
            return data
        if isinstance(data, dict):
            return [data]
    except Exception:
        pass
    # Fallback: extract top-level dict blocks and parse each with ast.literal_eval or json
    blocks = extract_top_level_dicts(s_strip)
    parsed = []
    for b in blocks:
        # attempt json then ast
        try:
            obj = json.loads(b)
            if isinstance(obj, dict):
                parsed.append(obj)
                continue
        except Exception:
            pass
        try:
            obj = ast.literal_eval(b)
            if isinstance(obj, dict):
                parsed.append(obj)
                continue
        except Exception:
            # try some small cleaning: replace single quotes with double quotes for JSON
            b2 = re.sub(r"'", r'"', b)
            try:
                obj = json.loads(b2)
                if isinstance(obj, dict):
                    parsed.append(obj)
                    continue
            except Exception:
                pass
        # if still not parsed, skip
    if not parsed:
        raise ValueError("Could not parse any dicts from text. File format not recognized.")
    return parsed

def is_error_entry(d: dict):
    """Return True if the entry should be removed (contains N/A or is an error)."""
    # If initial_name starts with ERROR (case-insensitive)
    iname = str(d.get("initial_name", "")).strip()
    if iname.upper().startswith("ERROR"):
        return True
    # If any field is exactly "N/A" (case-insensitive) or contains 'Error' in description
    for v in d.values():
        if isinstance(v, str) and v.strip().upper() == "N/A":
            return True
    desc = str(d.get("description", ""))
    if "error" in desc.lower():
        return True
    return False

def make_key_from_initial_name(initial_name: str):
    """Extract the second part of initial_name. Example: 'Q9VKH0 - COG8_DROME' -> 'COG8_DROME'"""
    if not initial_name:
        return None
    # first prefer split on ' - '
    if ' - ' in initial_name:
        parts = initial_name.split(' - ', 1)
        return parts[1].strip()
    # fallback: split on '-' with spaces trimmed
    if '-' in initial_name:
        parts = initial_name.split('-', 1)
        return parts[1].strip()
    # fallback: split on whitespace and return second token
    tokens = initial_name.split()
    if len(tokens) >= 2:
        return tokens[1].strip()
    # if nothing else, return the original
    return initial_name.strip()

def reformat_entries(entries):
    """Return dict keyed by second part of initial_name, filtering out error entries."""
    out = {}
    skipped = 0
    for d in entries:
        if not isinstance(d, dict):
            skipped += 1
            continue
        if is_error_entry(d):
            skipped += 1
            continue
        key = make_key_from_initial_name(d.get("initial_name", ""))
        if not key:
            skipped += 1
            continue
        # ensure unique keys
        base_key = key
        suffix = 1
        while key in out:
            key = f"{base_key}_{suffix}"
            suffix += 1
        out[key] = d
    return out, skipped

def main(input_path=INPUT_PATH, output_path=OUTPUT_PATH):
    p = Path(input_path)
    if not p.exists():
        print(f"Input file not found at: {p.resolve()}")
        print("Place your input file at that path or change INPUT_PATH variable at the top of this cell.")
        return
    text = p.read_text(encoding="utf-8")
    try:
        entries = try_parse_text(text)
    except Exception as e:
        print("Failed to parse input file:", str(e))
        return
    reformatted, skipped = reformat_entries(entries)
    # Write to output file as JSON (pretty)
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(reformatted, f, indent=2, ensure_ascii=False)
    print(f"Parsed {len(entries)} entries from input.")
    print(f"Kept {len(reformatted)} entries. Skipped {skipped} entries (errors / N/A / unparsable).")
    print(f"Output written to: {output_path}")

if __name__ == "__main__":
    main()

# If run in this notebook environment and the input file existed, the output file will be at OUTPUT_PATH.
# If you want the output file in the current working directory instead of /mnt/data, change OUTPUT_PATH accordingly.


Parsed 1785 entries from input.
Kept 1534 entries. Skipped 251 entries (errors / N/A / unparsable).
Output written to: test_uniprotid_info_formatted.txt


In [6]:
import json
from pathlib import Path

# Input/output file (same name, will be overwritten)
FILE_PATH = "test_uniprotid_info_formatted.txt"

# Keys that must appear first, in this order
priority_keys = [
    "AGRA2_HUMAN",
    "ANGL8_HUMAN",
    "COL12_DANRE",
    "APEX1_DANRE",
    "APBB3_MOUSE",
    "AA3R_MOUSE",
    "ATAD5_MOUSE",
    "ANFB_MOUSE"
]

def reorder_dict(data, priority_keys):
    """Return a new dict with priority_keys first, then all remaining keys in original order."""
    reordered = {}
    # Add priority keys first (only if they exist)
    for k in priority_keys:
        if k in data:
            reordered[k] = data[k]
    # Add the rest preserving their order
    for k, v in data.items():
        if k not in reordered:
            reordered[k] = v
    return reordered

def main():
    path = Path(FILE_PATH)
    if not path.exists():
        print(f"File not found: {path.resolve()}")
        return
    
    # Load dictionary
    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)
    
    if not isinstance(data, dict):
        print("Input file does not contain a dictionary at the top level.")
        return
    
    reordered = reorder_dict(data, priority_keys)
    
    # Write back (overwrite) pretty JSON
    with open(path, "w", encoding="utf-8") as f:
        json.dump(reordered, f, indent=2, ensure_ascii=False)
    
    print(f"Reordered dictionary saved to: {path.resolve()}")
    print(f"Priority keys placed first: {', '.join([k for k in priority_keys if k in reordered])}")

if __name__ == "__main__":
    main()


Reordered dictionary saved to: C:\Users\stefa\Documents\UniBo\PhD\VsCode Projects\gomix_ecai_demo\test_uniprotid_info_formatted.txt
Priority keys placed first: AGRA2_HUMAN, ANGL8_HUMAN, COL12_DANRE, APEX1_DANRE, APBB3_MOUSE, AA3R_MOUSE, ATAD5_MOUSE, ANFB_MOUSE


In [11]:
patho = "gomix/src/demo_utils/test_uniprotid_info_formatted.txt"
with open(patho, "r", encoding="utf-8") as f:
    data = json.load(f)

# Extract list of keys
keys_list = list(data.keys())

# Print the list of keys
print(keys_list)

['AGRA2_HUMAN', 'ANGL8_HUMAN', 'COL12_DANRE', 'APEX1_DANRE', 'APBB3_MOUSE', 'AA3R_MOUSE', 'ATAD5_MOUSE', 'ANFB_MOUSE', '-----------', '3BHS7_MOUSE', '3BP1_MOUSE', '8ODP_RAT', 'AARE_ARATH', 'ABA4_ARATH', 'ABC1_ARATH', 'ABCAH_MOUSE', 'ABHD2_HUMAN', 'ABHGA_HUMAN', 'ABHGA_MOUSE', 'ACPM_DROME', 'ACSF4_MOUSE', 'ADA12_MOUSE', 'ADA2B_MOUSE', 'ADRA_ECOLI', 'AED1_ARATH', 'AEDO_HUMAN', 'AG104_ARATH', 'AGL14_ARATH', 'AGL28_ARATH', 'AGL30_ARATH', 'AGL42_ARATH', 'AGL53_ARATH', 'AGL65_ARATH', 'AGL72_ARATH', 'AGN1_SCHPO', 'AGRA2_DANRE', 'AGRA2_MOUSE', 'AGRG1_HUMAN', 'AGRG1_MOUSE', 'AIG2A_ARATH', 'AIG2B_ARATH', 'AIGLB_ARATH', 'AIP12_ARATH', 'AKAI1_HUMAN', 'AKAI1_MOUSE', 'AKAP2_MOUSE', 'AKP13_MOUSE', 'AKP13_RAT', 'AL3B2_MOUSE', 'AL3B3_MOUSE', 'ALA7_ARATH', 'ALFC4_ARATH', 'ALFC5_ARATH', 'ALFC6_ARATH', 'ALFC7_ARATH', 'ALFC8_ARATH', 'ALFP1_ARATH', 'ALFP2_ARATH', 'ALFP3_ARATH', 'ALG10_ARATH', 'ALKB1_MOUSE', 'ALMS1_MOUSE', 'ALS2_DROME', 'ALT1_ARATH', 'ALT2_ARATH', 'ALT3_ARATH', 'ALT4_ARATH', 'AMNLS_DROME', '

In [30]:
import requests, sys

go_code = "GO:0009987"

def replace_colon(s):
    return s.replace(":", "%3A")

requestURL = f"https://www.ebi.ac.uk/QuickGO/services/ontology/go/search?query={replace_colon(go_code)}&limit=1&page=1"

r = requests.get(requestURL, headers={ "Accept" : "application/json"})

if not r.ok:
  r.raise_for_status()
  sys.exit()

responseBody = r.json()
definition_text = responseBody['results'][0]['definition']['text']
print(definition_text)
print(responseBody)

Any process that is carried out at the cellular level, but not necessarily restricted to a single cell. For example, cell communication occurs among more than one cell, but occurs at the cellular level.
{'numberOfHits': 4, 'results': [{'id': 'GO:0009987', 'isObsolete': False, 'name': 'cellular process', 'definition': {'text': 'Any process that is carried out at the cellular level, but not necessarily restricted to a single cell. For example, cell communication occurs among more than one cell, but occurs at the cellular level.'}, 'aspect': 'biological_process'}], 'pageInfo': {'resultsPerPage': 1, 'current': 1, 'total': 4}}
