<!-- ---


### **KGX Format Overview**

KGX (Knowledge Graph Exchange) is a Python library and set of utilities for exchanging knowledge graphs (KGs) that conform to the Biolink Model. It provides tools for converting, validating, and exchanging knowledge graphs in various formats, including JSON, TSV, RDF, and Neo4j.

#### **Core Features**
- **Property Graph Representation**: Internally represented as a `networkx.MultiDiGraph`.
- **Biolink Model Compliance**: Ensures nodes and edges conform to the Biolink Model, including valid categories, predicates, and properties.
- **Supported Formats**:
  - RDF (read/write) and SPARQL endpoints (read).
  - Neo4j endpoints (read) or dumps (write).
  - CSV/TSV and JSON.
  - Reasoner Standard API format.
  - OBOGraph JSON format.


### **KGX Format Details**

#### **Node Record**
Each node in a KGX graph is represented as a **Node Record** with the following elements:

- **Required Elements**:
  - `id`: A CURIE uniquely identifying the node.
  - `category`: A list of Biolink Model categories describing the node.

- **Optional Elements**:
  - **Biolink Model Properties**: e.g., `name`, `description`, `xref`, `provided_by`.
  - **Non-Biolink Properties**: Custom properties not defined in the Biolink Model.

#### **Edge Record**
Each edge in a KGX graph is represented as an **Edge Record** with the following elements:

- **Required Elements**:
  - `subject`: The source node's `id`.
  - `predicate`: The relationship type (from the Biolink `related_to` hierarchy).
  - `object`: The target node's `id`.

- **Optional Elements**:
  - **Biolink Model Properties**: e.g., `category`, `publications`.
  - **Edge Provenance**: e.g., `primary_knowledge_source`, `supporting_data_source`.


### **KGX Format Examples**

#### **KGX JSON Format**
The JSON format represents the graph as a dictionary with `nodes` and `edges` arrays.
 -->




#### **KGX TSV Format**
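The TSV format separates nodes and edges into two files: `nodes.tsv` and `edges.tsv`. Multivalued fields such as `category` and `publications` are pipe-delimited within a single column.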

**nodes.tsv**:
| id            | category                                                                 | name                                  | provided_by               |
|---------------|--------------------------------------------------------------------------|---------------------------------------|---------------------------|
| HGNC:11603    | biolink:NamedThing\|biolink:BiologicalEntity\|biolink:Gene               | TBX4                                  | MonarchArchive:gwascatalog |
| MONDO:0005002 | biolink:NamedThing\|biolink:BiologicalEntity\|biolink:DiseaseOrPhenotypicFeature\|biolink:Disease | chronic obstructive pulmonary disease | MonarchArchive:gwascatalog |

**edges.tsv**:
| id                                    | subject     | predicate                  | object         | relation   | primary_knowledge_source | category                        | publications               |
|---------------------------------------|-------------|----------------------------|----------------|------------|---------------------------|---------------------------------|---------------------------|
| urn:uuid:5b06e86f-d768-4cd9-ac27-abe31e95ab1e | HGNC:11603  | biolink:contributes_to    | MONDO:0005002  | RO:0003304 | MonarchArchive:gwascatalog | biolink:GeneToDiseaseAssociation | PMID:26634245\|PMID:26634244 |
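
The two tables above can be produced from in-memory records with the standard `csv` module. The following is a minimal sketch, not the KGX library's own writer: the helper name `to_tsv` and the constant `MULTIVALUE_DELIMITER` are assumptions introduced for illustration, and the records simply repeat the example rows shown above.

```python
import csv
import io

# Assumption: list-valued fields are joined with "|", as in the tables above.
MULTIVALUE_DELIMITER = "|"


def to_tsv(records, columns):
    """Serialize dict records to a TSV string; list values are pipe-joined."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, delimiter="\t", lineterminator="\n")
    writer.writerow(columns)
    for record in records:
        row = []
        for column in columns:
            value = record.get(column, "")  # missing fields become empty cells
            if isinstance(value, (list, tuple)):
                value = MULTIVALUE_DELIMITER.join(str(v) for v in value)
            row.append(value)
        writer.writerow(row)
    return buffer.getvalue()


nodes = [
    {
        "id": "HGNC:11603",
        "category": ["biolink:NamedThing", "biolink:BiologicalEntity", "biolink:Gene"],
        "name": "TBX4",
        "provided_by": ["MonarchArchive:gwascatalog"],
    },
]
edges = [
    {
        "id": "urn:uuid:5b06e86f-d768-4cd9-ac27-abe31e95ab1e",
        "subject": "HGNC:11603",
        "predicate": "biolink:contributes_to",
        "object": "MONDO:0005002",
        "primary_knowledge_source": "MonarchArchive:gwascatalog",
        "publications": ["PMID:26634245", "PMID:26634244"],
    },
]

nodes_tsv = to_tsv(nodes, ["id", "category", "name", "provided_by"])
edges_tsv = to_tsv(
    edges,
    ["id", "subject", "predicate", "object", "primary_knowledge_source", "publications"],
)
print(nodes_tsv)
print(edges_tsv)
```

Writing to a string buffer keeps the sketch free of side effects; passing an open file handle to `csv.writer` instead would yield `nodes.tsv` and `edges.tsv` directly.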

---

### **Key Points**
- **Validation**: KGX validates nodes and edges against the Biolink Model.
- **Flexibility**: Supports both Biolink and non-Biolink properties.
- **Interoperability**: Facilitates exchange between different graph systems and formats.

Documentation: [KGX documentation](https://github.com/biolink/kgx).



********************** Draft Code ********************** 

In [146]:
# # Get the ID prefix priority list for biolink:Disease
# element = BMT.get_element_by_mapping("orphanet:39041")
# print(element)
# # preferred_prefixes = element
# # print(preferred_prefixes)

None


In [97]:
disease_prefix = get_preferred_prefix('orphanet:39041')  # → 'MONDO'
get_preferred_id('NCBIGene:100')

'NCBIGene:100'

In [2]:
BMT.get_element_by_mapping("orphanet:277")

In [167]:
# BMT.is_translator_canonical_predicate("biolink:gene_associated_with_condition")
# BMT.is_symmetric("biolink:gene_associated_with_condition")

False

In [5]:
node_norm = f"https://nodenorm.ci.transltr.io/1.5/get_normalized_nodes?curie={CURIE}&conflate=false&drug_chemical_conflate=false&description=false&individual_types=false"

In [11]:
# def get_preferred_id(curie):
#     url = f"https://nodenorm.ci.transltr.io/1.5/get_normalized_nodes?curie={curie}&conflate=false&drug_chemical_conflate=false&description=false&individual_types=false"

#     response = requests.get(url)
#     if response.ok:
#         data = response.json()
#         norm = data.get(curie)
#         if norm and 'id' in norm:
#             identifier = norm['id']['identifier']
#             # return identifier.split(":")[0]  # e.g., MONDO
#             return identifier
#     return curie
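
The parsing step of the draft `get_preferred_id` can be separated from the HTTP call so it can be exercised offline. This is a hedged sketch: the only payload shape it relies on is the `norm['id']['identifier']` access used in the draft above, the helper names `extract_preferred_id` and `preferred_prefix` are hypothetical, and the sample payload uses made-up CURIEs rather than real normalizer output.

```python
def extract_preferred_id(payload, curie):
    """Return the normalized identifier for `curie`, or `curie` itself as a fallback."""
    entry = payload.get(curie) if isinstance(payload, dict) else None
    if not entry:
        # Unknown or unresolved CURIE: keep the input unchanged, as the draft does.
        return curie
    identifier = (entry.get("id") or {}).get("identifier")
    return identifier or curie


def preferred_prefix(identifier):
    """Return the prefix part of a CURIE, e.g. 'PREFIX' for 'PREFIX:123'."""
    return identifier.split(":", 1)[0]


# Made-up payload for illustration only; these are not real identifiers.
sample_payload = {
    "EXAMPLE:1": {"id": {"identifier": "PREFERRED:0001"}},
    "EXAMPLE:2": None,  # assumed shape for a CURIE that could not be resolved
}

print(extract_preferred_id(sample_payload, "EXAMPLE:1"))
print(extract_preferred_id(sample_payload, "EXAMPLE:2"))
print(preferred_prefix(extract_preferred_id(sample_payload, "EXAMPLE:1")))
```

Keeping the network request outside these helpers means the fallback behavior can be checked without depending on the availability of the normalizer endpoint.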


********************** Original Code ********************** 

In [None]:
# def get_biothings_api(node_mappings, edge_mappings, bt_data): # abstract out for one record
#     # Iterate through BioThings API data    
#     # Extract node data based on Biolink mappings
#     # print('[INFO] BT data:', bt_data)

#     for pred_uri, value in edge_mappings.items():
#         # print(f"[INFO] Predicate URI: {pred_uri} | Value: {value}")
#         # print(value.keys())
#         s_uri = value["subject"]
#         o_uri = value["object"]
#         # print(f"[INFO] Subject URI: {s_uri} | Object URI: {o_uri}")
#         s_data = node_mappings[s_uri]
#         o_data = node_mappings[o_uri]
#         # print(f"[INFO] Subject data: {s_data} | Object data: {o_data}")
#         s_identifier_key = s_data['identifier']
#         o_identifier_key = o_data['identifier']
#         s_prefix = s_data['prefix']
#         o_prefix = o_data['prefix']
#         s_category = [key for key,value in node_mappings.items() if value['prefix'] == s_prefix]
#         o_category = [key for key,value in node_mappings.items() if value['prefix'] == o_prefix]
#         # s_prefix = s_data['prefix']
#         # s_category

#         # try:
#         s_identifiers = get_nested_value(bt_data,s_identifier_key)
#         o_identifiers = get_nested_value(bt_data,o_identifier_key)
#         if o_identifiers is None or s_identifiers is None:
#             # print(f"[INFO] Found subject identifiers: {s_identifiers} key: {s_identifier_key} \n")
#             # print(f"[INFO] Found object identifiers:{o_identifiers} key: {o_identifier_key}")
#             continue
#         # print(f"[INFO] Found subject identifiers: {s_identifiers} key: {s_identifier_key} \n")
#         # ******************************************
#         # Create Nodes
#         for s_ in s_identifiers:
#             s_node_id =  f"{s_prefix}:{s_[0]}"
#             s_node_name = s_[1] 
#             # s_node_norm_id = get_preferred_id(s_node_id)  # Use '=' for assignment
#             # print(f"[INFO] {s_node_id} | {s_node_name}")
#             if s_node_id not in nodes:
#                 nodes[s_node_id] = {
#                     "id": s_node_id,
#                     "name": s_node_name,
#                     "category": s_category
#                     # "provided_by": [node_mappings[s_uri]['provided_by']]
#                 }
#                 for prop_key, prop_value in node_mappings[s_uri]['properties'].items():
#                     # print(prop_key)
#                     if "." in prop_value:
#                         pass
#                     else:
#                         if prop_key in prop_value:
#                             v_ = bt_data[prop_key]
#                             nodes[s_node_id][prop_key] = v_
                        
#                 # pprint.pprint(nodes[s_node_id])
#             for o_ in o_identifiers:
#                 o_node_id =  f"{o_prefix}:{o_[0]}"
#                 o_node_name = o_[1]
#                 o_node_norm_id = get_preferred_id(o_node_id)
#                 # print(f"[INFO] {o_node_id} | {o_node_name}")
#                 if o_node_id not in nodes:
#                     nodes[o_node_id] = {
#                         "id": o_node_id,
#                         "name": o_node_name,
#                         "category": o_category
#                         # "provided_by": [node_mappings[o_uri]['provided_by']]
#                     }
#                     for prop_key, prop_value in node_mappings[o_uri]['properties'].items():
#                         if "." in prop_value:                            
#                             nested_ids = get_nested_value(bt_data, prop_value)
#                             if nested_ids is not None:
#                                 for nest_id in nested_ids:
#                                     nodes[o_node_id][prop_key.split(".")[-1]] = nest_id[0]
#                         else:
#                             if prop_key in bt_data:
#                                 v_ = bt_data[prop_key]
#                                 nodes[o_node_id][prop_key] = v_
#                 # pprint.pprint(nodes[o_node_id])
#                 # ******************************************
#                 # Create Edges
#                 is_canonical = BMT.is_translator_canonical_predicate(pred_uri)
#                 is_symmetric = BMT.is_symmetric(pred_uri)

#                 if is_canonical:
#                     print(f"[INFO] Predicate is canonical: {pred_uri} | {is_canonical}")
#                     # if is_symmetric:
#                     #     print(f"[INFO] {pred_uri} is also symmetric (this is unusual for canonical predicates).")
#                     # else:
#                     #     print(f"[INFO] {pred_uri} is NOT symmetric (expected for canonical predicates).")
#                     rel = f"{s_node_id}-{pred_uri}-{o_node_id}"
#                     if rel not in edges:
#                         ref_url = get_nested_value(bt_data, edge_mappings[pred_uri]['properties']['ref_url'])
#                         print(ref_url)
#                         edges[rel] = {
#                             "subject": s_node_id,
#                             "predicate": pred_uri,
#                             "object": o_node_id,
#                             "knowledge_level": edge_mappings[pred_uri]['knowledge_level'],
#                             "agent_type": edge_mappings[pred_uri]['agent_type'],
#                             "primary_knowledge_source": edge_mappings[pred_uri]['primary_knowledge_source'],
#                             "ref_url":[ y_[0][0] for y_ in ref_url],
                            
#                         }

#         except Exception as error:
#             # print(f"\n[INFO] ERROR: {error}\n")
#             pass

#     return nodes,edges

In [75]:
# def get_nested_value(data, key_path):
#     """
#     # Biothings Util function? 
#     Retrieve a value from a nested dictionary using a dot-separated key path.
#     Handles lists if encountered during traversal.
#     Example: key_path = "raresource.disease.orphanet" will return the value of
#     data["raresource"]["disease"][0]["orphanet"] if "disease" is a list.
#     """

#     keys = key_path.split(".")  # Split the key path into individual keys
#     temp_data=data.copy() # for reference
#     loop_ct=0

#     # print('here')
#     for i, key in enumerate(keys):
#         # print(key)
#         # print("in loop...")
#         loop_ct+=1
#         is_final_key = (i == len(keys) - 1)
#         # data=data[key]
#         # print(key, type(data), loop_ct)
#         # print(f"Starting data: {type(data)} {type(temp_data)} {key}" )
#         # *********************************************************************************************************
#         if isinstance(data, dict) and key in data:
#             # print(f"[INFO] Dictionary loop on key: {key}")
#             if key in data:
#                 temp_data = data[key]
#                 if isinstance(temp_data, str): # found a string ID
#                     id_ = temp_data
#                     # print(f"[INFO] Inside string ID instance: {key}, {id_}")
#                     # print(data.keys())
#                     for search_key in ["SYMBOL", "symbol", "NAME", "name", "drug_name"]: # **MAKE REGEX** (drug_name, etc...)
#                         if search_key in data:
#                             name = data[search_key]
#                             break
#                         else:
#                             name = None
#                     return [(id_, name)]
#                 data = data[key]
#             else:
#                 # print(f"Key {key} not found in dictionary.")
#                 return None
#             # print(f"Ending data: {type(data)} {type(temp_data)} {key}\n" )
#         # *********************************************
#         elif isinstance(data, list):
#             # print(f"[INFO] List loop on key: {key}")
#             # If the current data is a list, assume we want the first element
#             if len(data) > 0:
#                 # print(data)
#                 if is_final_key:
#                     # print("[INFO] is final key", key)
#                     id_list = []
#                     for data_dict in data:
#                         if key in data_dict:
#                             # print("[INFO] found key ", key)
#                             id_ = data_dict[key]
#                             # print("\n", id_, key)
#                             # pprint.pprint(data_dict)
#                             name = None
#                             for search_key in ["SYMBOL", "symbol", "NAME", "name","drug_name"]: # **MAKE REGEX** (drug_name, etc...)
#                                 if search_key in data_dict:
#                                     name = data_dict[search_key]
#                                     break
#                             id_list.append((id_, name))
#                         else:
#                             return None # key not found -- this does happen , i.e orphanet in raresource
#                     # print(f"Returning from list: {id_list}")
#                     return id_list
#                 # temp_data = temp_data[key]
#                 # data = data[key]
#             else:
#                 print("Empty list encountered.")
#                 return None
#         # *********************************************
#         elif isinstance(data, str):
#             print(f"[INFO] String loop on key: {key}")

#             # If this is the final key, check for specific keys in the list element
#             if is_final_key and isinstance(temp_data, dict) and key in temp_data:
#                 for search_key in ["SYMBOL", "symbol", "NAME", "name", "drug_name"]: # **MAKE REGEX** (drug_name, etc...)
#                     if search_key in temp_data:
#                         id_ = data
#                         name = temp_data[search_key]
#                         # print(f"Returning from list: {id_} | {name}")
#                         return [(id_, name)]
#             return(data, None)
#         # *********************************************
#         else:
#             ...
#         # *********************************************************************************************************
        

#     return None

def get_nested_value(data, key_path):
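    """
    Look up a dot-separated key path in a nested dictionary.

    Returns {"id", "name", "all_keys"} when the path reaches a string inside
    a dict, where "all_keys" is the dict that holds the value. When the final
    key is applied to a list (e.g. "raresource.disease.orphanet" with
    "disease" a list), returns one such dict per list element. Returns None
    otherwise, including when any list element lacks the final key.
    """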

    keys = key_path.split(".")  # Split the key path into individual keys
    temp_data = data.copy()  # For reference

    for i, key in enumerate(keys):
        is_final_key = (i == len(keys) - 1)

        if isinstance(data, dict) and key in data:
            temp_data = data[key]
            if isinstance(temp_data, str):  # Found a string ID
                id_ = temp_data
                name = None  # Default when no name-like key is present
                for search_key in ["SYMBOL", "symbol", "NAME", "name", "drug_name"]:
                    if search_key in data:
                        name = data[search_key]
                        break
                # Return the entire dictionary at this level
                return {"id": id_, "name": name, "all_keys": data}
            data = data[key]

        elif isinstance(data, list):
            if len(data) > 0:
                if is_final_key:
                    id_list = []
                    for data_dict in data:
                        if key in data_dict:
                            id_ = data_dict[key]
                            name = None
                            for search_key in ["SYMBOL", "symbol", "NAME", "name", "drug_name"]:
                                if search_key in data_dict:
                                    name = data_dict[search_key]
                                    break
                            # Return the entire dictionary at this level
                            id_list.append({"id": id_, "name": name, "all_keys": data_dict})
                        else:
                            return None  # Key not found
                    return id_list
            else:
                print("Empty list encountered.")
                return None

        elif isinstance(data, str):
            if is_final_key and isinstance(temp_data, dict) and key in temp_data:
                for search_key in ["SYMBOL", "symbol", "NAME", "name", "drug_name"]:
                    if search_key in temp_data:
                        id_ = data
                        name = temp_data[search_key]
                        # Return the entire dictionary at this level
                        return {"id": id_, "name": name, "all_keys": temp_data}
            return {"id": data, "name": None, "all_keys": temp_data}

    return None
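
The core idea in `get_nested_value`, walking a dot-separated path and fanning out when a list is encountered, can be shown in a smaller form. The sketch below is an illustration under assumptions, not a replacement for the function above: `resolve_path` is a hypothetical helper, it returns plain values rather than `{"id", "name", "all_keys"}` dicts, and it skips list elements that lack a key instead of returning `None`. The record is a made-up example whose `orphanet` values reuse the IDs from the draft cells.

```python
def resolve_path(data, key_path):
    """Collect every value reachable via a dot-separated path, fanning out over lists."""
    current = [data]
    for key in key_path.split("."):
        next_level = []
        for item in current:
            # A list contributes each of its elements; anything else is tried as-is.
            candidates = item if isinstance(item, list) else [item]
            for candidate in candidates:
                if isinstance(candidate, dict) and key in candidate:
                    next_level.append(candidate[key])
        current = next_level

    # Flatten one level so a path ending on a list yields its elements.
    flattened = []
    for item in current:
        if isinstance(item, list):
            flattened.extend(item)
        else:
            flattened.append(item)
    return flattened


# Made-up record for illustration; the names are placeholders.
record = {
    "raresource": {
        "disease": [
            {"orphanet": "277", "name": "example disease A"},
            {"orphanet": "39041", "name": "example disease B"},
            {"name": "entry without an orphanet key"},
        ]
    }
}

print(resolve_path(record, "raresource.disease.orphanet"))
print(resolve_path(record, "raresource.disease.name"))
print(resolve_path(record, "raresource.missing"))
```

Returning an empty list for unresolved paths avoids the `None` checks that callers of `get_nested_value` need, at the cost of losing the surrounding dictionary that `all_keys` provides.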

In [46]:
# def create_edge_mappings(edge_maps, provided_by):
#     # Initialize the output dictionary
#     relations = {}
#     # Process each triplet
#     for subject, predicate, obj in edge_maps:
#         bl_pred = BMT.get_element(predicate)
#         if bl_pred:
#             predicate = bl_pred["slot_uri"]
#         if predicate not in relations:
#             relations[predicate] = {"from": [], "to": [], "provided_by": provided_by}
        
#         # Add the subject to the "from" list if not already present
#         if subject not in relations[predicate]["from"]:
#             relations[predicate]["from"].append(subject)
        
#         # Add the object to the "to" list if not already present
#         if obj not in relations[predicate]["to"]:
#             relations[predicate]["to"].append(obj)

#     # Print the resulting dictionary
#     return relations


In [20]:
# def make_biolink_data_maps(data):
#     """
#     Create Biolink node and edge mappings from input data.
#     """
#     node_mappings = {}
#     edge_mappings_updated = {}
#     edge_maps = []

#     for mkg_hit in data.get("hits", {}).get("hits", []):
#         hit = mkg_hit["_source"]

#         subject = hit["subject"]
#         subject_prefix = hit["subject_prefix"]
#         s_uri = BMT.get_element(subject)["class_uri"]

#         # Currently using to get subject identification path, 
#         # response mapping gives object details of edge, not subject. 
#         # ***** Not reliable *****
#         s_id_ref = hit["api"]["bte"]["query_operation"]["request_body"]["body"].get("scopes")

#         object_ = hit["object"]
#         object_prefix = hit["object_prefix"]
#         o_uri = BMT.get_element(object_)["class_uri"]
#         # Get fields from the query operation for the object
#         o_properties = hit["api"]["bte"]["query_operation"]["params"].get("fields", "")
#         o_properties = [field.strip() for field in o_properties.split(",")]

#         # In the response mapping we get the object identification path,
#         # followed by additional object /edge properties.
#         # o_id_ref = None

#         predicate = hit["predicate"]
#         pred_element = BMT.get_element(predicate)
#         pred_uri = pred_element["slot_uri"] if pred_element else None

#         # edge properties
#         provided_by = hit["api"]["provided_by"]
#         agent_type = hit["api"]["bte"]["query_operation"].get("agent_type")
#         knowledge_level = hit["api"]["bte"]["query_operation"].get("knowledge_level")

#         # ========================
#         # EDGE MAPPING
#         # ========================
#         if pred_uri and BMT.is_translator_canonical_predicate(pred_uri):
#             if pred_uri not in edge_mappings_updated:
#                 # edge_maps.append((s_uri, pred_uri, o_uri))
#                 edge_rel = f"{s_uri}-{pred_uri}-{o_uri}"
#                 edge_mappings_updated[edge_rel] = {
#                     "subject": s_uri,
#                     "object": o_uri,
#                     "primary_knowledge_source": provided_by,
#                     "agent_type": agent_type,
#                     "knowledge_level": knowledge_level,
#                     "properties": {},
#                 }
#             # print(hit["api"]["bte"]["response_mapping"])
#             for key, value in hit["api"]["bte"]["response_mapping"].items():
#                 if isinstance(value, dict):
#                     for k, v in value.items():
#                         if object_prefix in v:
#                             o_id_ref = v
#                         elif "ref_url" == k:
#                             edge_mappings_updated[edge_rel]['properties']["publications"] = v
#                         # else:
#                         #     edge_mappings_updated[pred_uri]['properties'][k] = v

#         # ========================
#         # NODE MAPPING: SUBJECT
#         # ========================
#         s_rel = f"{subject_prefix}-{s_uri}"
#         if s_rel not in node_mappings:
#             node_mappings[s_rel] = {
#                 "prefix": subject_prefix,
#                 "identifier": s_id_ref,
#                 "path": s_id_ref,
#                 "properties": {}
#             }

#         # ========================
#         # NODE MAPPING: OBJECT
#         # ========================
#         o_rel = f"{object_prefix}-{o_uri}"
#         if o_rel not in node_mappings:
#             node_mappings[o_rel] = {
#                 "prefix": object_prefix,
#                 # "identifier": o_id_ref,
#                 # "path": o_id_ref,
#                 "properties": {}
#             }
#             for key, value in hit["api"]["bte"]["response_mapping"].items():
#                 if isinstance(value, dict):
#                     for k, v in value.items():
#                         if object_prefix in k:
#                             node_mappings[o_rel]["identifier"] = v
#                         elif "ref_url" == k:
#                             pass
#                         else:
#                             node_mappings[o_rel]["properties"][k] = v
#         else:
#             for key, value in hit["api"]["bte"]["response_mapping"].items():
#                 if isinstance(value, dict):
#                     for k, v in value.items():
#                         if object_prefix in k:
#                             pass
#                         elif "ref_url" == k:
#                             pass
#                         else:
#                             node_mappings[o_rel]["properties"][k] = v
            
#             # for field in o_properties:
#             #     field_key = field.split(".")[-1]
#             #     node_mappings[object_prefix]["properties"][field_key] = field 


#     return node_mappings, edge_mappings_updated

In [45]:
# def get_nested_value(data, key_path, return_full=True):
#     """
#     Retrieve a value from a nested dictionary using a dot-separated key path.
#     Handles lists if encountered during traversal.
#     Example: key_path = "raresource.disease.orphanet" will return the value of
#     data["raresource"]["disease"][0]["orphanet"] if "disease" is a list.

#     Args:
#         data (dict): The input dictionary to search.
#         key_path (str): The dot-separated key path to traverse.
#         return_full (bool): If True, return the full structure (id, name, all_keys).
#                             If False, return just the ID value.

#     Returns:
#         dict, list, or str: The full structure or just the ID, depending on `return_full`.
#     """
#     keys = key_path.split(".")  # Split the key path into individual keys
#     temp_data = data.copy()  # For reference
#     loop_ct = 0

#     for i, key in enumerate(keys):
#         loop_ct += 1
#         is_final_key = (i == len(keys) - 1)

#         if isinstance(data, dict) and key in data:
#             temp_data = data[key]
#             if isinstance(temp_data, str):  # Found a string ID
#                 id_ = temp_data
#                 if not return_full:
#                     return id_  # Return just the ID if return_full is False
#                 return {"id": id_, "all_keys": data}
#             data = data[key]

#         elif isinstance(data, list):
#             if len(data) > 0:
#                 if is_final_key:
#                     id_list = []
#                     for data_dict in data:
#                         if key in data_dict:
#                             id_ = data_dict[key]
#                             if not return_full:
#                                 id_list.append(id_)  # Append just the ID if return_full is False
#                                 continue
#                             id_list.append({"id": id_, "all_keys": data_dict})
#                         else:
#                             return None  # Key not found
#                     return id_list
#             else:
#                 print("Empty list encountered.")
#                 return None

#         elif isinstance(data, str):
#             if is_final_key and isinstance(temp_data, dict) and key in temp_data:
#                 id_ = data
#                 if not return_full:
#                     return id_  # Return just the ID if return_full is False
#                 return {"id": id_, "all_keys": temp_data}
#             return {"id": data, "all_keys": temp_data}

#     return None

In [None]:
# def extract_biothings_apis(node_mappings, edge_mappings, client):
#     ct=0
#     # https://biothings.ci.transltr.io/rare_source/gene/100
#     for biothings_api_data in tqdm(client.query(q="__all__", fetch_all=True)):
#         extract_biothings_nodes(node_mappings, biothings_api_data)
#         extract_biothings_edges(edge_mappings, node_mappings, biothings_api_data) 
#         ct+=1
#         if ct >= 1:
#             break
#     print(f"📊 [INFO] Extracted {ct} records from BioThings API.")

# def extract_biothings_apis(node_mappings, edge_mappings, client):
#     # Define the URL
#     url = "https://biothings.ci.transltr.io/rare_source/gene/100"
#     print(f" 🔄[INFO] Extracting from URL: {url}")
#     # Make the GET request
#     responseX = requests.get(url)
#     # Check if the request was successful
#     if responseX.status_code == 200:
#         # Parse the JSON response
#         biothings_api_data = responseX.json()
#         extract_biothings_nodes(node_mappings, biothings_api_data)
#         extract_biothings_edges(edge_mappings, node_mappings, biothings_api_data) 

In [None]:
# def extract_biothings_nodes(node_mappings, biothings_api_data):
#     # by data doc
#     # print(api_data['entrezgene'])

#     for node_uri, node_dict in node_mappings.items():
#         key_identifier = node_dict["identifier"]
#         node_prefix = node_dict["prefix"]
#         # Change all instance to node_uri == node_cat
#         # node_cat = node_uri #[key for key,value in node_mappings.items() if value['prefix'] == node_prefix] # why are we going through items here?
#         node_api_data = get_nested_value(biothings_api_data, key_identifier)
#         # pprint.pprint(node_api_data)
#         node_uri = node_uri.split("-")[1]  # Extract the URI part from the key
#         # print(f"[INFO] Node Typ: {node_uri} | Node ID: {key_identifier} | Prefix: {node_prefix}")
#         # print(f"[INFO] Node category: {node_cat}")
#         # print(f"[INFO] type: {type(node_ids)}")

#         # if node_api_data is None:
#         #     continue
#         if isinstance(node_api_data, dict):
#             unique_node_id =  f"{node_prefix}:{node_api_data['id']}"
#             name_key = node_dict["properties"]["name"]
#             unique_node_name = node_api_data["all_keys"][f"{name_key}"]
#             # print(f"[INFO] Unique Node ID: {unique_node_id} | Unique Node Name: {unique_node_name}")

#             if unique_node_id not in nodes:
#                 nodes[unique_node_id] = {
#                     "id": unique_node_id,
#                     "name": unique_node_name,
#                     "category": [node_uri]
#                     # "provided_by": [node_mappings[s_uri]['provided_by']]
#                 }

#                 for prop_key, prop_value in node_dict['properties'].items():
#                     if "cooccurrence_url" == prop_key:
#                         continue
#                     elif "orphanet" == prop_key:
#                         continue
#                     if "." in prop_value and prop_key in node_api_data["all_keys"]:
#                         nodes[unique_node_id][prop_key] = node_api_data["all_keys"][prop_key]
#                     elif prop_key in  node_api_data["all_keys"]:                                
#                         nodes[unique_node_id][prop_key] = node_api_data["all_keys"][prop_key]

#         if isinstance(node_api_data, list):
#             for id_ in node_api_data:
#                 # unique_node_id =  f"{node_prefix}:{id_['id']}"
#                 unique_node_name = id_["all_keys"]["name"] 

#                 if unique_node_id not in nodes:
#                     nodes[unique_node_id] = {
#                         "id": unique_node_id,
#                         "name": unique_node_name,
#                         "category": [node_uri],
#                     }

#                     for prop_key, prop_value in node_dict['properties'].items():
#                         if "cooccurrence_url" == prop_key:
#                             continue
#                         elif "orphanet" == prop_key:
#                             continue
#                         elif "." in prop_value and prop_key in id_["all_keys"]:
#                             nodes[unique_node_id][prop_key] = id_["all_keys"][prop_key]

#                         elif prop_key in  id_:                                
#                             nodes[unique_node_id][prop_key] = id_["all_keys"][prop_key]


In [None]:
# def extract_biothings_edges(edge_mappings, node_mappings, biothings_api_data): # no edge relations
#     for p_uri, edge_map in edge_mappings.items():
#         p_uri = p_uri.split("-")[1]  # Extract the URI part from the key
#         s_uri = edge_map["subject"]
#         sub_key_nodes = [node for node, value in node_mappings.items() if s_uri in node ]
#         for node_key in sub_key_nodes:
#             s_node_map = node_mappings[node_key]
#             s_api_data = get_nested_value(biothings_api_data, s_node_map["identifier"])
#             if isinstance(s_api_data, dict): # should only be one subject? no list returned?
#                 s_id = s_api_data["id"]
#                 s_id = f"{s_node_map['prefix']}:{s_id}"

#             o_uri = edge_map["object"]
#             obj_key_nodes = [node for node, value in node_mappings.items() if o_uri in node ]
#             for o_node_key in obj_key_nodes:
#                 o_node_map = node_mappings[o_node_key]
#                 o_api_data = get_nested_value(biothings_api_data, o_node_map["identifier"])

#                 # ref_url_dict = get_nested_value(biothings_api_data, edge_map['ref_url'])
#                 # ref_url_path = edge_map['ref_url'].split(".")[-1]

#                 if not isinstance(o_api_data, list):
#                     o_api_data = [o_api_data]

#                 for n in o_api_data:
#                     if n is None:
#                         # logger.info
#                         # print(f"Subject node, {s_id}, does not have an object node, {n}, api_data: {biothing_api_data}")
#                         continue
#                     o_id = n["id"]
#                     o_id = f"{o_node_map['prefix']}:{o_id}"
#                     rel = f"{s_id}-{p_uri}-{o_id}"
#                     # process properties , merge key and value to edges 
#                     edges[rel] = {
#                         "subject": s_id,
#                         "predicate": p_uri.split(":")[-1],
#                         "object": o_id,
#                         "knowledge_level": edge_map['knowledge_level'],
#                         "agent_type": edge_map['agent_type'],
#                         "primary_knowledge_source": edge_map['primary_knowledge_source'],
#                         # "publications": [n["all_keys"][ref_url_path]] # assuming ref_url path alway in the object path
#                     }



---
********************** Current Code **********************  

In [1]:
from bmt import Toolkit
import requests
import pprint
import biothings_client
import json


from tqdm import tqdm

# Initialize Biolink Model Toolkit
BMT = Toolkit() # only want to initialize once

In [27]:
def get_nested_value(data, key_path, return_full=True):
    """
    Retrieve a value from a nested dictionary using a dot-separated key path.
    Handles lists during traversal and can return just the ID or a full structure.

    Example:
        key_path = "raresource.disease.orphanet"
        will return the value of data["raresource"]["disease"][i]["orphanet"] if "disease" is a list.

    Args:
        data (dict): The input dictionary to search.
        key_path (str): Dot-separated key path (e.g., "a.b.c").
        return_full (bool): If True, return {'id': ..., 'all_keys': ...}, else return just 'id'.

    Returns:
        str, dict, or list: Value(s) at the path, in requested format.
    """
    def format_output(value, context):
        return {"id": value, "all_keys": context} if return_full else value

    keys = key_path.split(".")
    current = data

    for key in keys:

        if isinstance(current, dict):
            if key not in current:
                return None
            current = current[key]

        elif isinstance(current, list):
            results = []
            for item in current:
                if isinstance(item, dict) and key in item:
                    val = item[key]
                    results.append(format_output(val, item))
            return results if results else None

        else:
            # If we hit a string, int, or unexpected type mid-path
            return format_output(current, data)

    return format_output(current, data)
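
A minimal usage sketch of the dot-path lookup above. The record is a made-up stand-in for a BioThings gene document (all field values are hypothetical), and the function is restated in compact form only so the snippet runs on its own.

```python
def get_nested_value(data, key_path, return_full=True):
    """Compact restatement of the lookup above, so this snippet is self-contained."""
    def format_output(value, context):
        return {"id": value, "all_keys": context} if return_full else value

    current = data
    for key in key_path.split("."):
        if isinstance(current, dict):
            if key not in current:
                return None
            current = current[key]
        elif isinstance(current, list):
            # Collect the key from every dict in the list; each hit keeps its own item as context.
            results = [
                format_output(item[key], item)
                for item in current
                if isinstance(item, dict) and key in item
            ]
            return results or None
        else:
            return format_output(current, data)
    return format_output(current, data)


# Hypothetical record shaped like a gene document with nested disease entries.
record = {
    "entrezgene": "100",
    "raresource": {
        "disease": [
            {"umls": "C0000001", "orphanet": "0001", "name": "example disease A"},
            {"umls": "C0000002", "name": "example disease B"},
        ]
    },
}

gene = get_nested_value(record, "entrezgene")
umls_ids = get_nested_value(record, "raresource.disease.umls", return_full=False)
orphanet_hits = get_nested_value(record, "raresource.disease.orphanet")
missing = get_nested_value(record, "raresource.not_a_field")

print(gene["id"], umls_ids, len(orphanet_hits), missing)
```

For a top-level key the context stored under `all_keys` is the whole record, while for a key found inside a list it is the individual list item. That is why the node-building code can later read sibling fields such as `name` from `all_keys`.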


In [12]:
def make_biolink_data_maps(data):
    """
    Create Biolink Model-compliant node and edge mappings from MetaKG SmartAPI metadata.
    """
    node_mappings = {}
    edge_mappings = {}

    for hit_doc in data.get("hits", {}).get("hits", []):
        hit = hit_doc["_source"]
        api_info = hit["api"]
        bte_info = api_info.get("bte", {})
        query_op = bte_info.get("query_operation", {})
        response_mapping = bte_info.get("response_mapping", {})

        # Extract subject and object details
        subj_cls = hit["subject"]
        subj_prefix = hit["subject_prefix"]
        subj_uri = BMT.get_element(subj_cls)["class_uri"]
        subj_id_path = query_op.get("request_body", {}).get("body", {}).get("scopes")

        obj_cls = hit["object"]
        obj_prefix = hit["object_prefix"]
        obj_uri = BMT.get_element(obj_cls)["class_uri"]
        obj_fields = query_op.get("params", {}).get("fields", "")
        obj_fields = [f.strip() for f in obj_fields.split(",")]

        # Predicate
        predicate = hit["predicate"]
        pred_element = BMT.get_element(predicate)  # look the predicate up once
        pred_uri = pred_element["slot_uri"] if pred_element else None

        # Edge metadata
        edge_rel = f"{subj_uri}-{pred_uri}-{obj_uri}"
        provided_by = api_info.get("provided_by")
        agent_type = query_op.get("agent_type")
        knowledge_level = query_op.get("knowledge_level")

        # ------------------------
        # EDGE MAPPING
        # ------------------------
        if pred_uri and BMT.is_translator_canonical_predicate(pred_uri):
            if edge_rel not in edge_mappings:
                edge_mappings[edge_rel] = {
                    "subject": subj_uri,
                    "object": obj_uri,
                    "primary_knowledge_source": provided_by,
                    "agent_type": agent_type,
                    "knowledge_level": knowledge_level,
                    "properties": {}
                }

            for predicate_key, mapping_dict in response_mapping.items():
                if isinstance(mapping_dict, dict):
                    for key, path in mapping_dict.items():
                        if key == "ref_url":
                            edge_mappings[edge_rel]["properties"]["publications"] = path

        # ------------------------
        # NODE MAPPINGS
        # ------------------------
        _add_node_mapping(
            node_mappings,
            prefix=subj_prefix,
            uri=subj_uri,
            identifier=subj_id_path,
            is_subject=True
        )

        obj_id_path = extract_object_id_path(response_mapping, obj_prefix)

        _add_node_mapping(
            node_mappings,
            prefix=obj_prefix,
            uri=obj_uri,
            identifier=obj_id_path,
            is_subject=False,
            extra_properties=collect_object_properties(response_mapping, obj_prefix)
        )

    return node_mappings, edge_mappings


def _add_node_mapping(node_mappings, prefix, uri, identifier=None, is_subject=True, extra_properties=None):
    rel_key = f"{uri}"
    if rel_key not in node_mappings:
        temp_dict = {"prefix": prefix, "path": identifier}
        node_mappings[rel_key] = {
            "identifiers": [temp_dict],
            "properties": extra_properties or {}
        }
    else:
        # Append prefix, identifier, and path if not already present
        existing_identifiers = node_mappings[rel_key]["identifiers"]
        new_identifier = {"prefix": prefix, "path": identifier}
        
        # Check if the new identifier already exists
        if not any(
            iden["prefix"] == new_identifier["prefix"] and iden["path"] == new_identifier["path"]
            for iden in existing_identifiers
        ):
            node_mappings[rel_key]["identifiers"].append(new_identifier)
        # Merge additional properties
        if extra_properties:
            node_mappings[rel_key]["properties"].update(extra_properties)
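
A small sketch of how repeated calls accumulate identifiers for one category. The helper below is a condensed variant written with `setdefault` (an assumption that it behaves like the version above for these inputs), and it drops the unused `is_subject` flag.

```python
def add_node_mapping(node_mappings, prefix, uri, identifier=None, extra_properties=None):
    """Condensed, hypothetical variant of _add_node_mapping for illustration."""
    new_identifier = {"prefix": prefix, "path": identifier}
    entry = node_mappings.setdefault(uri, {"identifiers": [], "properties": {}})
    # Only record a (prefix, path) pair the first time it is seen.
    if new_identifier not in entry["identifiers"]:
        entry["identifiers"].append(new_identifier)
    if extra_properties:
        entry["properties"].update(extra_properties)


mappings = {}
add_node_mapping(
    mappings, "orphanet", "biolink:Disease", "raresource.disease.orphanet",
    extra_properties={"name": "raresource.disease.name"},
)
add_node_mapping(mappings, "UMLS", "biolink:Disease", "raresource.disease.umls")
# A repeated (prefix, path) pair is ignored.
add_node_mapping(mappings, "UMLS", "biolink:Disease", "raresource.disease.umls")

disease_entry = mappings["biolink:Disease"]
print(len(disease_entry["identifiers"]), disease_entry["properties"])
```

One category can therefore carry several ID namespaces, which is the shape visible in the `[DEBUG] Node Mappings` output for `biolink:Disease`.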


def extract_object_id_path(response_mapping, object_prefix):
    for _, mapping_dict in response_mapping.items():
        if isinstance(mapping_dict, dict):
            for key, path in mapping_dict.items():
                if object_prefix.lower() in key.lower():  # Match "umls" in "UMLS"
                    return path
    return None


def collect_object_properties(response_mapping, object_prefix):
    properties = {}
    for _, mapping_dict in response_mapping.items():
        if isinstance(mapping_dict, dict):
            for k, v in mapping_dict.items():
                if k.lower() != "ref_url" and object_prefix.lower() not in k.lower():
                    properties[k] = v
    return properties
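
To make the two helpers above concrete, here is a sketch that runs them on a hand-written `response_mapping`. The mapping content is hypothetical (in particular the `ref_url` path is invented); real SmartAPI metadata may use different keys.

```python
def extract_object_id_path(response_mapping, object_prefix):
    for _, mapping_dict in response_mapping.items():
        if isinstance(mapping_dict, dict):
            for key, path in mapping_dict.items():
                if object_prefix.lower() in key.lower():
                    return path
    return None


def collect_object_properties(response_mapping, object_prefix):
    properties = {}
    for _, mapping_dict in response_mapping.items():
        if isinstance(mapping_dict, dict):
            for k, v in mapping_dict.items():
                if k.lower() != "ref_url" and object_prefix.lower() not in k.lower():
                    properties[k] = v
    return properties


# Hypothetical mapping for a single predicate.
response_mapping = {
    "gene_associated_with_condition": {
        "umls": "raresource.disease.umls",
        "ref_url": "raresource.disease.example_ref",  # invented path
        "name": "raresource.disease.name",
    }
}

id_path = extract_object_id_path(response_mapping, "UMLS")
props = collect_object_properties(response_mapping, "UMLS")
print(id_path, props)
```

Because the match is a case-insensitive substring test, a short prefix can also match longer keys that merely contain it, so the first matching key wins.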



In [25]:
def extract_biothings_nodes(node_mappings, biothings_api_data):
    """
    Enrich Biolink node mappings with BioThings API data and create structured node outputs.
    """
    for node_uri, node_info in node_mappings.items():
        for unique_node in node_info["identifiers"]:
            node_prefix = unique_node["prefix"]
            node_identifier = unique_node["path"]

            api_data = get_nested_value(biothings_api_data, node_identifier)
            if not api_data:
                continue

            if isinstance(api_data, dict):
                _process_single_node(api_data, node_prefix, node_uri, node_info)
            elif isinstance(api_data, list):
                for entry in api_data:
                    _process_single_node(entry, node_prefix, node_uri, node_info)

def _process_single_node(api_data, prefix, uri, node_info):
    """
    Create or enrich a node from API data.
    """
    node_id = f"{prefix}:{api_data['id']}"
    name_key = node_info["properties"].get("name")
    node_name = api_data["all_keys"].get(name_key)

    if node_id in nodes:
        return  # Skip if already processed

    nodes[node_id] = {
        "id": node_id,
        "name": node_name,
        "category": [uri]
    }

    for prop_key, prop_val in node_info["properties"].items():
        if prop_key in {"cooccurrence_url", "orphanet"}:
            continue

        # Dotted and plain mappings resolve to the same lookup: the property key
        # is read directly from the record that supplied the node ID.
        value = api_data["all_keys"].get(prop_key)

        if value is not None:
            nodes[node_id][prop_key] = value



In [40]:
def extract_biothings_edges(edge_mappings, node_mappings, biothings_api_data):
    def format_node_id(node_map, api_data):
        """Helper to format node ID as prefix:id."""
        return f"{node_map['prefix']}:{api_data['id']}"

    for edge_key, edge_map in edge_mappings.items():
        predicate_uri = edge_key.split("-")[1]  # Extract predicate URI part

        # Find subject node keys containing subject URI
        subject_nodes = [node for node in node_mappings if edge_map["subject"] in node]
        for s_node_key in subject_nodes:
            s_node_map = node_mappings[s_node_key]
            for s_unique_node in s_node_map["identifiers"]:
                s_api_data = get_nested_value(biothings_api_data, s_unique_node["path"])
                if not isinstance(s_api_data, dict):
                    continue  # Skip if no valid subject data (None or a list)

                s_id = format_node_id(s_unique_node, s_api_data)

                # Find object node keys containing object URI
                object_nodes = [node for node in node_mappings if edge_map["object"] in node]
                for o_node_key in object_nodes:
                    o_node_map = node_mappings[o_node_key]
                    for o_unique_node in o_node_map["identifiers"]:
                        o_api_data = get_nested_value(biothings_api_data, o_unique_node["path"])
                        if o_api_data is None:
                            continue
                        if not isinstance(o_api_data, list):
                            o_api_data = [o_api_data]

                        for obj in o_api_data:
                            if obj is None:
                                continue
                            o_id = format_node_id(o_unique_node, obj)

                            rel = f"{s_id}-{predicate_uri}-{o_id}"
                            edges[rel] = {
                                "subject": s_id,
                                "predicate": predicate_uri.split(":")[-1],
                                "object": o_id,
                                "knowledge_level": edge_map.get('knowledge_level'),
                                "agent_type": edge_map.get('agent_type'),
                                "primary_knowledge_source": edge_map.get('primary_knowledge_source'),
                            }
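
The innermost step of the loop above can be sketched in isolation. `build_edge_record` is a hypothetical helper, and the `agent_type` and `knowledge_level` values are placeholders rather than values taken from the API metadata.

```python
def build_edge_record(s_id, predicate_uri, o_id, edge_map):
    """Hypothetical helper mirroring the innermost loop body above."""
    rel = f"{s_id}-{predicate_uri}-{o_id}"
    record = {
        "subject": s_id,
        "predicate": predicate_uri.split(":")[-1],  # drop the CURIE prefix
        "object": o_id,
        "knowledge_level": edge_map.get("knowledge_level"),
        "agent_type": edge_map.get("agent_type"),
        "primary_knowledge_source": edge_map.get("primary_knowledge_source"),
    }
    return rel, record


# Placeholder edge mapping; only the keys matter for this sketch.
edge_map = {
    "subject": "biolink:Gene",
    "object": "biolink:Disease",
    "primary_knowledge_source": "infores:rare-source",
    "agent_type": "placeholder_agent_type",
    "knowledge_level": "placeholder_knowledge_level",
}

edges = {}
for _ in range(2):  # the same triple seen twice still yields one edge
    rel, record = build_edge_record(
        "NCBIGene:100", "biolink:gene_associated_with_condition", "UMLS:C0000001", edge_map
    )
    edges[rel] = record

print(len(edges), record["predicate"])
```

Keying the `edges` dictionary on the subject-predicate-object string is what deduplicates edges when the same triple is reached through more than one identifier path.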


In [18]:
def format_kgx_dict(nodes, edges):
    kgx_dict = {
        "nodes": list(nodes.values()),
        "edges": list(edges.values())
    }
    return kgx_dict

# def get_nodes_and_edges(node_mappings, edge_mappings,client):
#     nodes,edges = get_biothings_api(node_mappings, client, edge_mappings)
#     return nodes, edges

# # def remove_duplicates(list_of_dicts):
# #     Remove duplicate dictionaries from a list of dictionaries.



In [19]:
def extract_biothings_apis(node_mappings, edge_mappings, client=None, size=1):
    if size == 1 or client is None:
        # Fallback to a single hardcoded example
        url = "https://biothings.ci.transltr.io/rare_source/gene/100"
        print(f"🔗 [INFO] Extracting from URL: {url}")
        response = requests.get(url)
        if response.status_code == 200:
            biothings_api_data = response.json()
            extract_biothings_nodes(node_mappings, biothings_api_data)
            extract_biothings_edges(edge_mappings, node_mappings, biothings_api_data)
            print("📊 [INFO] Extracted 1 record from BioThings API.")
        else:
            print(f"❌ [ERROR] Failed to fetch data from {url} (status: {response.status_code})")
    else:
        print(f"🔄 [INFO] Extracting up to {size} records using BioThings client...")
        count = 0
        for biothings_api_data in tqdm(client.query(q="__all__", fetch_all=True)):
            extract_biothings_nodes(node_mappings, biothings_api_data)
            extract_biothings_edges(edge_mappings, node_mappings, biothings_api_data)
            count += 1
            if count >= size:
                break
        print(f"📊 [INFO] Extracted {count} records from BioThings API using client.")
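
As an alternative to the manual counter in the client branch above, the record cap can be sketched with `itertools.islice`. The generator here is a stand-in for the client's result iterator, not the real `biothings_client` API.

```python
from itertools import islice


def fake_query_results(total):
    """Hypothetical stand-in for an iterator of BioThings documents."""
    for i in range(total):
        yield {"_id": str(i)}


def take_records(results, size):
    """Consume at most `size` items from any iterator of documents."""
    return list(islice(results, size))


capped = take_records(fake_query_results(50), 10)
short_source = take_records(fake_query_results(5), 10)
print(len(capped), len(short_source))
```

`islice` stops pulling from the underlying iterator once the cap is reached, so no extra pages are requested beyond what is consumed.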



In [20]:

# batch_size = 1000  # or whatever works best
# query_results = client.query(q="__all__", scroll=True, size=batch_size)
# for batch in tqdm(query_results):
#     extract_biothings_nodes(node_mappings, batch)
#     extract_biothings_edges(edge_mappings, node_mappings, batch)
#     count += len(batch)
#     if count >= size:
#         break


In [21]:
def write_kgx_to_json(kgx_data, output_file):
    # Write the dictionary to a JSON file
    with open(output_file, "w") as json_file:
        json.dump(kgx_data, json_file, indent=4)  # Use indent for pretty formatting
    print(f"\n📝 [INFO] KGX data written to {output_file}")

In [22]:
def write_kgx_to_jsonl(kgx_data, nodes_jsonl_file, edges_jsonl_file):
    # Write nodes to JSONL
    with open(nodes_jsonl_file, "w") as nodes_file:
        for node in kgx_data["nodes"]:
            json_str = json.dumps(node)
            nodes_file.write(json_str + "\n")
            # json_str = json.dumps(node, separators=(', ', ': '))
            # json_str = json_str.replace('{', '{ ', 1)  # add space after first '{'
            # nodes_file.write(json_str + "\n")

    # Write edges to JSONL
    with open(edges_jsonl_file, "w") as edges_file:
        for edge in kgx_data["edges"]:
            json_str = json.dumps(edge)
            edges_file.write(json_str + "\n")
            # json_str = json.dumps(edge, separators=(', ', ': '))
            # json_str = json_str.replace('{', '{ ', 1)  # add space after first '{'
            # edges_file.write(json_str + "\n")

    print(f"📝 [INFO] KGx nodes written to {nodes_jsonl_file}")
    print(f"📝 [INFO] KGx edges written to {edges_jsonl_file}")
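
A round-trip sketch for the JSONL layout written above: one JSON object per line, nodes and edges in separate files. The helper names and the sample records are illustrative only, and a temporary directory is used so nothing is left on disk.

```python
import json
import os
import tempfile


def write_jsonl(records, path):
    """Write one JSON object per line."""
    with open(path, "w", encoding="utf-8") as fh:
        for record in records:
            fh.write(json.dumps(record) + "\n")


def read_jsonl(path):
    """Read a JSONL file back into a list of dicts, skipping blank lines."""
    with open(path, "r", encoding="utf-8") as fh:
        return [json.loads(line) for line in fh if line.strip()]


# Illustrative records only.
kgx_data = {
    "nodes": [
        {"id": "NCBIGene:100", "category": ["biolink:Gene"]},
        {"id": "UMLS:C0000001", "category": ["biolink:Disease"]},
    ],
    "edges": [
        {
            "subject": "NCBIGene:100",
            "predicate": "gene_associated_with_condition",
            "object": "UMLS:C0000001",
        }
    ],
}

with tempfile.TemporaryDirectory() as tmp_dir:
    nodes_path = os.path.join(tmp_dir, "nodes.jsonl")
    edges_path = os.path.join(tmp_dir, "edges.jsonl")
    write_jsonl(kgx_data["nodes"], nodes_path)
    write_jsonl(kgx_data["edges"], edges_path)
    nodes_back = read_jsonl(nodes_path)
    edges_back = read_jsonl(edges_path)

print(nodes_back == kgx_data["nodes"], edges_back == kgx_data["edges"])
```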


In [32]:
from datetime import datetime

def run_kgx_pipeline(client, smartapi_url, debug=False, doc_size=10000):
    print("🚀 [INFO] Starting KGx conversion pipeline...")

    # === Initialization ===
    global nodes, edges
    nodes = {}
    edges = {}

    # === Get smartapi metadata ===
    print(f"🔗 [INFO] Querying SmartAPI metadata: {smartapi_url}")
    # Send the request and retrieve data
    response = requests.get(smartapi_url)
    response.raise_for_status()  # stop early on an HTTP error instead of parsing an error page
    data = response.json()

    # === Step 1: Create Biolink mappings from SmartAPI metadata ===
    print("🛠️ [INFO] Creating Biolink mappings...")
    node_mappings, edge_mappings = make_biolink_data_maps(data)

    # === Step 1.1: Add hardcoded property mappings for Disease and Gene ===
    disease_dict = {
        "omim": "raresource.disease.omim",
        "orphanet": "raresource.disease.orphanet",
        "gard": "raresource.disease.gard",
        "umls": "raresource.disease.umls",
        "mesh": "raresource.disease.mesh",
        "name": "raresource.disease.name",
        "icd10cm": "raresource.disease.icd10cm"
    }

    gene_dict = {
        "hgnc": "hgnc",
        # "name": "name",
        "ensemblgene": "ensemblgene",
        "symbol": "symbol",
        "name": "description"
    }

    for s_uri, mapping in node_mappings.items():
        props = mapping.get("properties", {})
        if "Disease" in s_uri:
            for key, val in disease_dict.items():
                props.setdefault(key, val)

        if "Gene" in s_uri:
            for key, val in gene_dict.items():
                props.setdefault(key, val)

            # Remove overly generic 'disease' field
            props.pop("disease", None)
            mapping["properties"] = props

    # Only dump the mappings when the caller asks for debug output
    if debug:
        print("\n[DEBUG] Node Mappings:")
        pprint.pprint(node_mappings, indent=4)

        print("\n[DEBUG] Edge Mappings:")
        pprint.pprint(edge_mappings, indent=4)

    # Write node and edge mappings to a documentation file
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_filename = f"node_mappings_{timestamp}.txt"

    with open(output_filename, "w") as doc_file:
        pprint.pprint(node_mappings, stream=doc_file, indent=4)
        pprint.pprint(edge_mappings, stream=doc_file, indent=4)

    print(f"📝 [INFO] Node and edge mappings written to {output_filename}")

    # === Step 2: Extract nodes and edges using BioThings API ===
    print("🔄 [INFO] Extracting nodes and edges from BioThings API...")
    extract_biothings_apis(node_mappings, edge_mappings, client, size=doc_size)
    print(f"📊 [INFO] Extracted {len(nodes)} nodes and {len(edges)} edges")

    # === Step 3: Format the graph as KGX-compatible dictionary ===
    print("📦 [INFO] Formatting graph as KGX-compatible dictionary...")
    kgx_dict = format_kgx_dict(nodes, edges)
    print("✅ [INFO] KGx dictionary formatted successfully.")
    return kgx_dict
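
Step 1.1 in the pipeline above relies on `dict.setdefault` so that hardcoded paths only fill gaps and never overwrite what the SmartAPI metadata supplied. A minimal sketch of that merge, using a hypothetical helper and made-up mappings:

```python
def apply_default_properties(node_mappings, defaults_by_keyword):
    """Hypothetical helper: add missing property paths without overwriting existing ones."""
    for uri, mapping in node_mappings.items():
        props = mapping.setdefault("properties", {})
        for keyword, defaults in defaults_by_keyword.items():
            if keyword in uri:
                for key, path in defaults.items():
                    props.setdefault(key, path)
    return node_mappings


# Made-up mappings: Disease already has a 'name' path, Gene has nothing yet.
node_mappings = {
    "biolink:Disease": {"identifiers": [], "properties": {"name": "raresource.disease.name"}},
    "biolink:Gene": {"identifiers": [], "properties": {}},
}
defaults = {
    "Disease": {"name": "some.other.path", "omim": "raresource.disease.omim"},
    "Gene": {"symbol": "symbol"},
}

apply_default_properties(node_mappings, defaults)
print(node_mappings["biolink:Disease"]["properties"])
print(node_mappings["biolink:Gene"]["properties"])
```

The existing `name` path for Disease is kept, while `omim` and `symbol` are added because they were absent.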


<!-- --- -->

In [48]:
api_name = "rare_source"
api_id = "b772ebfbfa536bba37764d7fddb11d6f"
bt_client = biothings_client.get_client(url=f"https://biothings.ci.transltr.io/{api_name}")
smartapi_url = f"https://smart-api.info/api/metakg/?q=api.smartapi.id:{api_id}&bte=1&meta=1&consolidated=0&size=100"
kgx_dict = run_kgx_pipeline(bt_client, smartapi_url, debug=True, doc_size=10000)
if kgx_dict:
    print("🟢 KGX pipeline executed successfully.")
else:
    print("🔴 KGX pipeline execution failed.")

🚀 [INFO] Starting KGx conversion pipeline...
🔗 [INFO] Querying SmartAPI metadata: https://smart-api.info/api/metakg/?q=api.smartapi.id:b772ebfbfa536bba37764d7fddb11d6f&bte=1&meta=1&consolidated=0&size=100
🛠️ [INFO] Creating Biolink mappings...

[DEBUG] Node Mappings:
{   'biolink:Disease': {   'identifiers': [   {   'path': 'raresource.disease.orphanet',
                                                  'prefix': 'orphanet'},
                                              {   'path': 'raresource.disease.umls',
                                                  'prefix': 'UMLS'}],
                           'properties': {   'gard': 'raresource.disease.gard',
                                             'icd10cm': 'raresource.disease.icd10cm',
                                             'mesh': 'raresource.disease.mesh',
                                             'name': 'raresource.disease.name',
                                             'omim': 'raresource.disease.omim',
         

2001it [00:01, 1270.75it/s]
No more results to return.
2901it [00:02, 1368.54it/s]

📊 [INFO] Extracted 2901 records from BioThings API using client.
📊 [INFO] Extracted 6638 nodes and 8155 edges
📦 [INFO] Formatting graph as KGX-compatible dictionary...
✅ [INFO] KGx dictionary formatted successfully.
🟢 KGX pipeline executed successfully.





In [49]:
from datetime import datetime

# Get the current date and format it as YYYYMMDD
current_datetime = datetime.now().strftime("%Y%m%d")

# Add the date to the output filename
output_filename = f"raresource_kgx_new_map_full-doc_{current_datetime}.json"

# Write the KGX data to the JSON file
write_kgx_to_json(kgx_dict, output_filename)


📝 [INFO] KGX data written to raresource_kgx_new_map_full-doc_20250618.json


In [104]:
# nodes_jsonl_file = f"raresource_nodes_1-doc_{current_datetime}.jsonl"
# edges_jsonl_file = f"raresource_edges_1-doc_{current_datetime}.jsonl"

# write_kgx_to_jsonl(kgx_dict, nodes_jsonl_file, edges_jsonl_file)

📝 [INFO] KGx nodes written to raresource_nodes_1-doc_20250610.jsonl
📝 [INFO] KGx edges written to raresource_edges_1-doc_20250610.jsonl


<!-- ### Validation -->

In [95]:
def validate_jsonl(file_path):
    with open(file_path, 'r') as file:
        for line_number, line in enumerate(file, start=1):
            try:
                json.loads(line)  # Attempt to parse the line as JSON
            except json.JSONDecodeError as e:
                print(f"Invalid JSON on line {line_number}: {e}")
                return False
    print("All lines are valid JSON.")
    return True
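
A quick way to exercise the line-by-line check above without touching real output files. The function is repeated so the snippet is self-contained, and the file contents are invented for the test.

```python
import json
import os
import tempfile


def validate_jsonl(file_path):
    with open(file_path, "r") as file:
        for line_number, line in enumerate(file, start=1):
            try:
                json.loads(line)  # each line must parse on its own
            except json.JSONDecodeError as e:
                print(f"Invalid JSON on line {line_number}: {e}")
                return False
    print("All lines are valid JSON.")
    return True


with tempfile.TemporaryDirectory() as tmp_dir:
    good_path = os.path.join(tmp_dir, "good.jsonl")
    bad_path = os.path.join(tmp_dir, "bad.jsonl")

    with open(good_path, "w") as fh:
        fh.write('{"id": "NCBIGene:100"}\n{"id": "UMLS:C0000001"}\n')
    with open(bad_path, "w") as fh:
        fh.write('{"id": "NCBIGene:100"}\n{"id": \n')  # second line is truncated

    good_ok = validate_jsonl(good_path)
    bad_ok = validate_jsonl(bad_path)
```

Note that this check only confirms each line is syntactically valid JSON. A blank line also fails it, since `json.loads` rejects empty input, and it says nothing about whether the records satisfy KGX or Biolink requirements.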

In [None]:
def validate_kgx_against_raw_and_metadata(kgx_data, original_data, smartapi_metadata):
    errors = []
    results = []

    # --- Step 1: Validate node ---
    expected_id = f"NCBIGene:{original_data['entrezgene']}"
    node = next((n for n in kgx_data['nodes'] if n['id'] == expected_id), None)
    
    if not node:
        errors.append(f"Node for {expected_id} missing in KGX output.")
    else:
        for field in ['name', 'symbol', 'hgnc', 'ensemblgene']:
            if str(node.get(field)) != str(original_data.get(field)):
                errors.append(f"Node field mismatch for {field}: KGX has {node.get(field)}, original has {original_data.get(field)}")

    # --- Step 2: Build set of valid disease IDs from original ---
    valid_disease_ids = set()
    disease_list = original_data.get("raresource", {}).get("disease", [])

    # Extract mapping prefixes from SmartAPI metadata
    mapping_fields = {}
    for hit in smartapi_metadata["hits"]["hits"]:
        mapping = hit["_source"]["api"]["bte"].get("response_mapping", {}).get("gene_associated_with_condition", {})
        for prefix, field_path in mapping.items():
            mapping_fields[prefix.lower()] = field_path  # e.g., 'umls': 'raresource.disease.umls'

    # Extract IDs using response_mapping
    for disease in disease_list:
        for prefix, field_path in mapping_fields.items():
            parts = field_path.split(".")
            value = disease
            for key in parts[2:]:  # skip 'raresource.disease'
                value = value.get(key) if isinstance(value, dict) else None
            if value:
                full_id = f"{prefix}:{value}"
                valid_disease_ids.add(full_id)

    # --- Step 3: Validate edges ---
    for edge in kgx_data['edges']:
        obj = edge.get('object')
        subj = edge.get('subject')
        pred = edge.get('predicate')

        if obj not in valid_disease_ids:
            errors.append(f"Edge object ID {obj} not found in valid IDs extracted from original data.")

        if subj != expected_id:
            errors.append(f"Edge subject {subj} does not match expected gene ID {expected_id}")

        if pred != 'gene_associated_with_condition':
            errors.append(f"Unexpected predicate: {pred}")

        # Confirm source info
        if edge.get("primary_knowledge_source") != "infores:rare-source":
            errors.append(f"Incorrect knowledge source: {edge.get('primary_knowledge_source')}")

        results.append({
            "subject": subj,
            "object": obj,
            "valid": obj in valid_disease_ids and subj == expected_id
        })

    return {
        "valid_edges": [r for r in results if r["valid"]],
        "invalid_edges": [r for r in results if not r["valid"]],
        "errors": errors
    }


In [None]:
# result = validate_kgx_against_raw_and_metadata(kgx_data, original_data, smartapi_metadata)
# print("Errors:")
# for e in result['errors']:
#     print(" -", e)

# print("\nValid edges:", len(result['valid_edges']))
# print("Invalid edges:", len(result['invalid_edges']))


In [100]:

# # Example usage
# edge_file_path = "/Users/nacosta/Documents/Translator/KGx_conversion/raresource_edges_full_20250610.jsonl"
# validate_jsonl(edge_file_path)


# # Example usage
# node_file_path = "/Users/nacosta/Documents/Translator/KGx_conversion/raresource_nodes_full_20250610.jsonl"
# validate_jsonl(node_file_path)

All lines are valid JSON.
All lines are valid JSON.


True

In [227]:
# # def kgx_validate():
# !kgx validate -i jsonl "raresource_nodes_full.jsonl"

{}
