In [None]:
%matplotlib inline

### Parse and store SNOMED taxonomy

In [None]:
from rdflib.extras.external_graph_libs import *
from rdflib import Graph
import networkx as nx
import pathlib

In [None]:
def node_to_id(node):
    """Return the part of ``node`` that follows its last ``/``.

    If ``node`` contains no ``/`` the whole string is returned; if it ends
    with ``/`` the result is the empty string.
    """
    _head, _sep, last_segment = node.rpartition('/')
    return last_segment

In [None]:
# Source file holding the SNOMED taxonomy as RDF/XML.
path = pathlib.Path("./data/snomed_taxonomy.rdf")

# Structure of the parsed graph:
#   * each 'http://snomed.info/id/SNOMED_ID' node is a child of 'http://www.w3.org/2002/07/owl#Class'
#   * 'http://snomed.info/id/138875005' is the top node: SNOMED CT Concept (SNOMED RT+CTV3)
#   * edges point upwards, so the neighbors of a node are exactly its direct parent(s)

graph = Graph()
graph.parse(path, format="application/rdf+xml")
nx_graph = rdflib_to_networkx_multidigraph(graph)

# Keep only nodes whose last IRI segment is numeric, except the top-level concept
# 'SNOMED CT Concept (SNOMED RT+CTV3)'; everything else (e.g. 'owl#Class') is
# dropped from the graph. The node view is snapshotted first because nodes are
# removed while looping.
id_node_mapping = {}
for n in tuple(nx_graph.nodes()):
    _id = node_to_id(n)
    if _id.isnumeric() and _id != "138875005":
        id_node_mapping[_id] = n
    else:
        nx_graph.remove_node(n)

In [None]:
def id_to_node(snomed_id: str, mapping_dict: dict = id_node_mapping):
    """Look up the graph node registered for ``snomed_id``.

    :param snomed_id: SNOMED id used as key in ``mapping_dict``.
    :param mapping_dict: id -> node mapping; defaults to ``id_node_mapping``.
    :return: the mapped node, or ``None`` when the id is not in the mapping.
    """
    return mapping_dict.get(snomed_id)

In [None]:
def parents_of_id(snomed_id: str, mapping_dict: dict = id_node_mapping):
    """Yield the ids of all neighbors of ``snomed_id`` in ``nx_graph``.

    Edges in the graph point towards the parents, so the neighbors are the
    direct parents. Neighbors whose id is not numeric are left out.

    :param snomed_id: SNOMED id of the concept to start from.
    :param mapping_dict: id -> node mapping; defaults to ``id_node_mapping``.
    """
    start_node = id_to_node(snomed_id, mapping_dict)
    neighbor_ids = (node_to_id(parent) for parent in nx_graph.neighbors(start_node))
    yield from (parent_id for parent_id in neighbor_ids if parent_id.isnumeric())

In [None]:
def children_of_id(snomed_id: str, mapping_dict: dict = id_node_mapping):
    """Yield the ids of all predecessors of ``snomed_id`` in ``nx_graph``.

    Predecessors are the nodes with an edge pointing at this concept, i.e.
    its direct children. Predecessors whose id is not numeric are left out.

    :param snomed_id: SNOMED id of the concept to start from.
    :param mapping_dict: id -> node mapping; defaults to ``id_node_mapping``.
    """
    start_node = id_to_node(snomed_id, mapping_dict)
    for child_node in nx_graph.predecessors(start_node):
        child_id = node_to_id(child_node)
        if child_id.isnumeric():
            yield child_id

---
### Read SNOMED Interface Terminology

In [None]:
import collections

In [None]:
interface_terminology_path = pathlib.Path("./data/SCT-GIT_de_drugs_nlp.dat")
# SNOMED id -> list of interface names, in file order.
interface_terminology = collections.defaultdict(list)

# Each data line carries four tab-separated fields:
#   snomed_id, internal_id, concept_name, interface_name
# The first name collected for an id is the preferred name.
# The file is read inside a `with` block so the handle is closed deterministically.
with interface_terminology_path.open('r', encoding='utf-8') as interface_terminology_file:
    for line in interface_terminology_file:
        line = line.strip()
        if not line:
            # A blank (e.g. trailing) line would otherwise break the 4-way unpacking below.
            continue
        snomed_id, internal_id, concept_name, interface_name = line.split("\t")
        interface_terminology[snomed_id].append(interface_name)

# Concepts without outgoing edges in nx_graph that also have an interface term.
# Edges point to parents and the top concept was removed from the graph, so these
# are the concepts directly below the former top node.
first_level_concepts = [
    concept_id
    for concept_id in (node_to_id(n) for n in nx_graph.nodes() if nx_graph.out_degree(n) == 0)
    if concept_id in interface_terminology
]

In [None]:
# Number of distinct SNOMED ids that have at least one interface term.
len(interface_terminology)

In [None]:
# Number of distinct numeric SNOMED ids kept in the id -> node mapping.
len(id_node_mapping)

In [None]:
# Number of concepts without outgoing edges in nx_graph that also have an interface term.
len(first_level_concepts)

---
### Create/upload concepts to TOP FW

In [None]:
# Wildcard import stays first so the explicit imports below always win on name clashes.
from owlready2 import *

import getpass
import itertools
import json
import uuid
from typing import Union

import requests
from tqdm.autonotebook import tqdm

In [None]:
# Load the SNOMED ontology file through a "file://" IRI built from its path.
snomed_ontology_path = pathlib.Path("./data/snomed_ontology.owx")
snomed_ontology_graph = get_ontology("file://" + str(snomed_ontology_path)).load()

#### Get Preferred Label by id:
`labels = snomed_ontology_graph.search_one(iri = "*40949007").prefLabel`  
Answer (e.g.):  
`[locstr('Kingdom Fungi macroconidium', 'en'),`  
` locstr('Makroconidium des Reichs Fungi', 'de')]`  

Language:  
`labels[0].lang`  
String Form:  
`str(labels[0])`

In [None]:
def batched(iterable, n, *, strict=False):
    """Yield successive tuples of up to ``n`` items taken from ``iterable``.

    Example: ``batched('ABCDEFG', 3)`` produces ``ABC``, ``DEF``, ``G``.

    :param iterable: any iterable; it is consumed lazily.
    :param n: maximum batch length, must be >= 1.
    :param strict: when true, a shorter final batch is an error.
    :raises ValueError: if ``n`` < 1, or if ``strict`` is set and the last
        batch is incomplete. As this is a generator, both are raised on
        iteration, not at call time.
    """
    if n < 1:
        raise ValueError('n must be at least one')
    source = iter(iterable)
    while True:
        chunk = tuple(itertools.islice(source, n))
        if not chunk:
            # Source exhausted.
            return
        if strict and len(chunk) != n:
            raise ValueError('batched(): incomplete batch')
        yield chunk

In [None]:
def get_fallback_entry(
        id: str,
        creation_dump: dict,
        snomed_ontology: "owlready2.Ontology",
):
    """Build title entries from the ontology's preferred labels for a concept
    that has no interface term.

    The id is always appended to ``creation_dump["missing_german_interface_terms"]``.

    The entity is looked up with the pattern ``*/<id>`` first, so that a
    concept whose id merely *ends* with the same digits cannot be picked up
    by accident. Only if that finds nothing is the looser pattern ``*<id>``
    tried.

    :param id: SNOMED id of the concept (name kept for keyword callers,
        although it shadows the builtin).
    :param creation_dump: bookkeeping dict holding the
        ``"missing_german_interface_terms"`` list.
    :param snomed_ontology: object offering ``search_one(iri=...)``; the
        annotation is a string so that defining this function does not need
        the bare module name ``owlready2`` in scope.
    :return: list of ``{"lang": ..., "text": ...}`` dicts, one per preferred
        label, or ``None`` when the entity is unknown or has no preferred label.
    """
    creation_dump["missing_german_interface_terms"].append(id)

    entity = snomed_ontology.search_one(iri=f"*/{id}")
    if entity is None:
        entity = snomed_ontology.search_one(iri=f"*{id}")
    if entity is None:
        return None

    titles = getattr(entity, "prefLabel", None)
    if not titles:
        return None
    return [
        {
            "lang": label.lang,
            "text": str(label)
        } for label in titles
    ]

In [None]:
def create_top_fw_concept(
        concept_id: str,
        concept_id_store: dict,
        creation_dump: dict,
        parent_id: Union[str, list, None] = None,
        title_dict: dict = None,
):
    """Assemble the payload dict of a ``single_concept`` entity for one SNOMED concept.

    A TOP id (UUID4 string) is taken from ``creation_dump["top_id_store"]``
    or generated and stored there, so repeated calls for the same concept
    reuse the same id.

    :param concept_id: SNOMED id of the concept.
    :param concept_id_store: SNOMED id -> list of interface terms; the first
        term becomes the German title, the remaining ones German synonyms.
    :param creation_dump: bookkeeping dict with a ``"top_id_store"`` mapping
        (SNOMED id -> TOP id).
    :param parent_id: SNOMED id, or list of SNOMED ids, of the super concepts.
        Parents with no entry in ``top_id_store`` are skipped, because they
        could only be referenced with a null id.
    :param title_dict: if given, used verbatim as the ``"titles"`` value
        instead of the interface-term / placeholder title.
    :return: dict with ``entityType``, ``id``, ``titles`` and ``codes``, plus
        ``synonyms`` and ``superConcepts`` when there are any.
    """
    top_id_store = creation_dump["top_id_store"]
    if concept_id not in top_id_store:
        top_id_store[concept_id] = str(uuid.uuid4())
    _id = top_id_store[concept_id]

    # `.get` (not indexing) so a defaultdict store is not grown by the lookup;
    # an empty term list is treated like a missing entry instead of failing on [0].
    _interface_terms = concept_id_store.get(concept_id) or []
    _is_interface_term = len(_interface_terms) > 0

    if title_dict is not None:
        _titles = title_dict
    elif _is_interface_term:
        _titles = [{"lang": "de", "text": _interface_terms[0]}]
    else:
        _titles = [{"lang": "en", "text": f"No interface term: '{concept_id}'"}]

    _data = {
        "entityType": "single_concept",
        "id": _id,
        "titles": _titles,
        "codes": [
            {
                "uri": f"http://snomed.info/id/{concept_id}",
                "codeSystem": {
                    "uri": "http://snomed.info/id",
                    "shortName": "SNOMED CT"
                },
                "code": concept_id,
            }
        ]
    }

    if len(_interface_terms) > 1:
        _data["synonyms"] = [
            {
                "lang": "de",
                "text": syn
            } for syn in _interface_terms[1:]
        ]

    if parent_id is not None:
        _parent_ids = [parent_id] if isinstance(parent_id, str) else parent_id
        _super_concepts = [
            {
                "entityType": "single_concept",
                "id": top_id_store[pid]
            } for pid in _parent_ids if top_id_store.get(pid) is not None
        ]
        if len(_super_concepts) > 0:
            _data["superConcepts"] = _super_concepts

    return _data

In [None]:
def post_to_top_fw(post_data: Union[list, dict], post_api_token: str, organisation: str = "imise", repository: str = "snomed_interface_terminology"):
    """POST one entity or a list of entities to the TOP FW API.

    :param post_data: a single entity (``dict``) goes to the ``.../entity``
        endpoint; anything else (a list of entities) goes to ``.../entity/bulk``.
    :param post_api_token: bearer token placed in the ``Authorization`` header.
    :param organisation: organisation segment of the endpoint URL.
    :param repository: repository segment of the endpoint URL.
    :return: whatever ``requests.post`` returns; the status is not inspected here.
    """
    if isinstance(post_data, dict):
        _endpoint_suffix = ""
    else:
        _endpoint_suffix = "/bulk"
    endpoint_url = f"https://top.imise.uni-leipzig.de/api/{organisation}/{repository}/entity{_endpoint_suffix}"
    auth_headers = {"Authorization": "Bearer " + post_api_token}
    return requests.post(url=endpoint_url, headers=auth_headers, json=post_data)

In [None]:
username = input("Please give your TOP FW username: ")
# getpass does not echo the password, so it does not end up in the cell output
# (and thereby in the saved notebook) the way an input() prompt would.
password = getpass.getpass("Please give your TOP FW password: ")

# NOTE: despite its name, `api_token` is the HTTP response of the token request;
# later cells read the token itself via api_token.json()['access_token'].
api_token = requests.post(
    url = "https://top.imise.uni-leipzig.de/auth/realms/top-realm/protocol/openid-connect/token",
    data = {
        "client_id": "top-frontend",
        "username": username,
        "password": password,
        "grant_type": "password"
    }
)
# Stop right here on a rejected login instead of failing later with a KeyError
# on 'access_token'.
api_token.raise_for_status()

In [None]:
info_dump_path = "./data/creation_dump_info.json"
post_array_path = "./data/post_array.json"

# Bookkeeping from a previous run, if present:
#   "top_id_store": SNOMED id -> TOP id, "missing_german_interface_terms": list of SNOMED ids.
if pathlib.Path(info_dump_path).exists():
    with pathlib.Path(info_dump_path).open('rb') as _info_dump_file:
        creation_dump_info = json.load(_info_dump_file)
else:
    creation_dump_info = {
        "top_id_store": {},
        "missing_german_interface_terms": []
    }

# Stratum index -> list of entity payloads.
# JSON object keys are always strings, so a dumped post_array comes back keyed by
# "0", "1", ...; the keys are converted back to int and the data is put into a
# defaultdict again, otherwise lookups like post_array[3] raise KeyError after a reload.
post_array = collections.defaultdict(list)
if pathlib.Path(post_array_path).exists():
    with pathlib.Path(post_array_path).open('rb') as _post_array_file:
        for _stratum_key, _stratum_entities in json.load(_post_array_file).items():
            post_array[int(_stratum_key)] = _stratum_entities

In [None]:
# Both structures are rebuilt completely by the loop below (it visits every node),
# so start from empty containers; otherwise a re-run would append every entity and
# every missing id a second time. The TOP ids in "top_id_store" are kept on purpose
# so that concepts keep their id across runs.
post_array = collections.defaultdict(list)
creation_dump_info["missing_german_interface_terms"] = []

with tqdm(total=len(nx_graph)) as pbar:
    # Edges point child -> parent; on the reversed graph the topological generations
    # therefore start with the parentless concepts (stratum 0) and every concept
    # comes in a later stratum than all of its parents.
    for i, stratum in enumerate(nx.topological_generations(nx_graph.reverse(copy=True))):
        pbar.set_description_str(f"Stratum {str(i).zfill(2)}: ")
        for node in stratum:
            pbar.update(1)
            _node_concept_id = node_to_id(node)
            # Concepts without interface term get their titles from the ontology labels.
            _snomed_pref_label = None
            if _node_concept_id not in interface_terminology:
                _snomed_pref_label = get_fallback_entry(id=_node_concept_id, creation_dump=creation_dump_info, snomed_ontology=snomed_ontology_graph)
            # Successors in the original (unreversed) graph are the direct parents.
            _parents = None
            if i > 0:
                _parents = [node_to_id(x) for x in nx_graph[node]]
            post_array[i].append(
                create_top_fw_concept(_node_concept_id, interface_terminology, creation_dump_info, _parents, title_dict=_snomed_pref_label)
            )

# Write both dumps inside `with` blocks so the files are flushed and closed.
with pathlib.Path(info_dump_path).open('w', encoding='utf-8') as _info_dump_file:
    json.dump(creation_dump_info, _info_dump_file, ensure_ascii=False)
with pathlib.Path(post_array_path).open('w', encoding='utf-8') as _post_array_file:
    json.dump(post_array, _post_array_file, ensure_ascii=False)

In [None]:
# First 20 payloads of stratum 3 whose first code is recorded in "missing_german_interface_terms".
[p for p in post_array[3] if p.get("codes", [{}])[0].get("code") in creation_dump_info["missing_german_interface_terms"]][:20]

In [None]:
# First 20 payloads of stratum 3 whose first title is tagged "de" although their first code
# is recorded in "missing_german_interface_terms".
[p for p in post_array[3] if (p.get("titles", [{}])[0].get("lang") == "de") and (p.get("codes", [{}])[0].get("code") in creation_dump_info["missing_german_interface_terms"])][:20]

In [None]:
stratum = 1
batch_size = 100
with tqdm(total=len(post_array[stratum])) as pbar:
    for batch in batched(post_array[stratum], batch_size):
        # A list payload is sent to the bulk endpoint by post_to_top_fw.
        response = post_to_top_fw(list(batch), api_token.json()['access_token'], repository="snomed_interface_terminology_mix")
        # Advance by the real batch length; a batch can be shorter than batch_size.
        pbar.update(len(batch))
        # NOTE(review): only the first batch is sent, which looks like a deliberate
        # trial run whose response is inspected afterwards; confirm before removing.
        break

In [None]:
# Decode and display the JSON body of `response`.
response.json()

In [None]:
# The token response is decoded once instead of once per request.
_access_token = api_token.json()['access_token']
# (stratum, TOP id, HTTP status) of every request the server did not accept.
failed_uploads = []

# Entities are posted one by one, stratum by stratum (currently stratum 1 only).
for stratum_n in range(1, 2):
    print(f"-- Stratum {stratum_n} --")
    for entity in tqdm(post_array[stratum_n]):
        _upload_response = post_to_top_fw(entity, _access_token, repository="snomed_interface_terminology_mix")
        # Keep going on errors, but remember them instead of dropping the response.
        if not _upload_response.ok:
            failed_uploads.append((stratum_n, entity.get("id"), _upload_response.status_code))

if failed_uploads:
    print(f"{len(failed_uploads)} upload(s) failed, first: {failed_uploads[0]}")

Enthält die SNOMED Interface Terminology (313.548 Konzepte) von Stefan Schulz.
Contact: Stefan Schulz - stefan.schulz@medunigraz.at