# Generate RTX-KG2 Metanames Parquet for Kuzu

In [2]:
import gzip
import json
import pathlib
import shutil
from typing import Any, Dict, Generator, List, Literal

import awkward as ak
import duckdb
import ijson
import kuzu
import pyarrow as pa
import requests
from genson import SchemaBuilder
from pyarrow import parquet
from rtx_kg2_functions import (
    count_items_under_top_level_name,
    drop_table_if_exists,
    find_top_level_names,
    generate_cypher_table_create_stmt_from_parquet_file,
    parse_items_by_topmost_item_name,
    parse_metadata_by_object_name,
)

In [19]:
# set data to be used throughout notebook
chunk_size = 500000
data_dir = "data"
parquet_dir = f"{data_dir}/"
source_data_url = "https://github.com/ncats/translator-lfs-artifacts/raw/main/files/kg2c_lite_2.8.4.json.gz"
target_extracted_sample_data = (
    f"{data_dir}/{pathlib.Path(source_data_url).name.replace('.json.gz', '.json')}"
)
parquet_dir = target_extracted_sample_data.replace(".json", ".full.dataset.parquet")
parquet_metanames_dir = target_extracted_sample_data.replace(
    ".json", ".full.with-metanames.dataset.parquet"
)
kuzu_dir = target_extracted_sample_data.replace(
    ".json", ".full.with-metanames.dataset.kuzu"
)
target_extracted_sample_data_schema_file = target_extracted_sample_data.replace(
    ".json", ".schema.json"
)
print(f"Kuzu dir: {kuzu_dir}")

Kuzu dir: data/kg2c_lite_2.8.4.full.with-metanames.dataset.kuzu


In [6]:
# create path for the kuzu database to reside
if pathlib.Path(kuzu_dir).is_dir():
    shutil.rmtree(kuzu_dir)
pathlib.Path(kuzu_dir).mkdir(exist_ok=True)

In [9]:
# create path for parquet metanames to reside
if pathlib.Path(parquet_metanames_dir).is_dir():
    shutil.rmtree(parquet_metanames_dir)
pathlib.Path(parquet_metanames_dir).mkdir(exist_ok=True)

In [7]:
# init a Kuzu database and connection
db = kuzu.Database(f"{kuzu_dir}")
kz_conn = kuzu.Connection(db)

In [8]:
def gather_table_names_from_parquet_path(
    parquet_path: str,
    column_with_table_name: str = "id",
):
    # return distinct table types as set comprehension
    return set(
        # create a parquet dataset and read a single column as an array
        parquet.ParquetDataset(parquet_path)
        .read(columns=[column_with_table_name])[column_with_table_name]
        .to_pylist()
    )

In [25]:
for path, table_name_column in [
    [f"{parquet_dir}/nodes", "category"],
    [f"{parquet_dir}/edges", "predicate"],
]:

    # create object base dir
    parquet_metanames_object_base = f"{parquet_metanames_dir}/{pathlib.Path(path).name}"
    pathlib.Path(parquet_metanames_object_base).mkdir(exist_ok=True)

    for table_name in gather_table_names_from_parquet_path(
        parquet_path=path, column_with_table_name=table_name_column
    ):
        # create metanames / objects using cypher safe name and dir
        cypher_safe_table_name = table_name.split(":")[1]
        parquet_metanames_metaname_base = (
            f"{parquet_metanames_object_base}/{cypher_safe_table_name}"
        )
        pathlib.Path(parquet_metanames_metaname_base).mkdir(exist_ok=True)

        print(f"Exporting metaname tables for: {table_name}")
        if pathlib.Path(path).name == "nodes":
            # determine rowcount for offsetting parquet files by metaname
            with duckdb.connect() as ddb:
                rowcount = int(
                    ddb.execute(
                        f"""
                        SELECT COUNT(*) as count
                        FROM read_parquet('{path}/*')
                        WHERE {table_name_column}='{table_name}';
                        """
                    ).fetchone()[0]
                )

            # create a chunk offsets list for chunked parquet output by metaname
            chunk_offsets = list(
                range(
                    0,
                    # gather rowcount from table and use as maximum for range
                    rowcount,
                    # step through using chunk size
                    chunk_size,
                )
            )

            # gather chunks of data by metaname and export to file using chunked offsets
            with duckdb.connect() as ddb:
                for idx, offset in enumerate(chunk_offsets):
                    ddb.execute(
                        f"""
                        COPY (
                            SELECT *
                            FROM read_parquet('{path}/*')
                            WHERE {table_name_column}='{table_name}'
                            LIMIT {chunk_size} OFFSET {offset}
                        )
                        TO '{parquet_metanames_metaname_base}/{cypher_safe_table_name}.{idx}.parquet' (FORMAT PARQUET);
                        """
                    )
        elif pathlib.Path(path).name == "edges":

            nodes_path = path.replace("edges", "nodes")
            with duckdb.connect() as ddb:
                distinct_node_type_pairs = ddb.execute(
                    f"""
                    SELECT DISTINCT 
                        split_part(subj_node.category, ':', 2) as subj_category,
                        split_part(obj_node.category, ':', 2) as obj_category
                    FROM read_parquet('{path}/*') edge
                    LEFT JOIN read_parquet('{nodes_path}/*') AS subj_node ON
                        edge.subject = subj_node.id
                    LEFT JOIN read_parquet('{nodes_path}/*') AS obj_node ON
                        edge.object = obj_node.id
                    WHERE edge.predicate='{table_name}'
                    """
                ).fetchall()

            for subj, obj in distinct_node_type_pairs:
                
                # determine rowcount for offsetting parquet files by metaname
                with duckdb.connect() as ddb:
                    rowcount = int(
                        ddb.execute(
                            f"""
                            SELECT COUNT(*) as count
                            FROM read_parquet('{path}/*')
                            WHERE {table_name_column}='{table_name}';
                            """
                        ).fetchone()[0]
                    )
    
                # create a chunk offsets list for chunked parquet output by metaname
                chunk_offsets = list(
                    range(
                        0,
                        # gather rowcount from table and use as maximum for range
                        rowcount,
                        # step through using chunk size
                        chunk_size,
                    )
                )
    
                # gather chunks of data by metaname and export to file using chunked offsets
                with duckdb.connect() as ddb:
                    for idx, offset in enumerate(chunk_offsets):
                        ddb.execute(
                            f"""
                            COPY (
                                SELECT *
                                FROM read_parquet('{path}/*')
                                WHERE {table_name_column}='{table_name}'
                                LIMIT {chunk_size} OFFSET {offset}
                            )
                            TO '{parquet_metanames_metaname_base}/{cypher_safe_table_name}.{idx}.parquet' (FORMAT PARQUET);
                            """
                        )

Exporting metaname tables for: biolink:Transcript
Exporting metaname tables for: biolink:MicroRNA
Exporting metaname tables for: biolink:Behavior
Exporting metaname tables for: biolink:Drug
Exporting metaname tables for: biolink:MolecularActivity
Exporting metaname tables for: biolink:GrossAnatomicalStructure
Exporting metaname tables for: biolink:Exon
Exporting metaname tables for: biolink:ChemicalEntity
Exporting metaname tables for: biolink:Phenomenon
Exporting metaname tables for: biolink:CellLine
Exporting metaname tables for: biolink:GenomicEntity
Exporting metaname tables for: biolink:Device
Exporting metaname tables for: biolink:Cell
Exporting metaname tables for: biolink:LifeStage
Exporting metaname tables for: biolink:Publication
Exporting metaname tables for: biolink:EnvironmentalProcess
Exporting metaname tables for: biolink:CellularComponent
Exporting metaname tables for: biolink:Gene
Exporting metaname tables for: biolink:NoncodingRNAProduct
Exporting metaname tables for:

In [32]:
with duckdb.connect() as ddb:
    result = ddb.execute("""
                SELECT DISTINCT 
                split_part(subj_node.category, ':', 2) as subj_category,
                        split_part(obj_node.category, ':', 2) as obj_category
            FROM read_parquet('data/kg2c_lite_2.8.4.full.dataset.parquet/edges/*') edge
            LEFT JOIN read_parquet('data/kg2c_lite_2.8.4.full.dataset.parquet/nodes/*') AS subj_node ON
                edge.subject = subj_node.id
            LEFT JOIN read_parquet('data/kg2c_lite_2.8.4.full.dataset.parquet/nodes/*') AS obj_node ON
                edge.object = obj_node.id
            WHERE edge.predicate='biolink:causes'
    """).fetchall()
result

[('Activity', 'Disease'),
 ('InformationContentEntity', 'Disease'),
 ('PhenotypicFeature', 'PhysiologicalProcess'),
 ('Gene', 'Disease'),
 ('GenomicEntity', 'PhysiologicalProcess'),
 ('ChemicalEntity', 'MolecularActivity'),
 ('SmallMolecule', 'InformationContentEntity'),
 ('SmallMolecule', 'Procedure'),
 ('ChemicalEntity', 'Phenomenon'),
 ('OrganismTaxon', 'InformationContentEntity'),
 ('Gene', 'Activity'),
 ('MolecularMixture', 'PhysiologicalProcess'),
 ('ClinicalAttribute', 'PhenotypicFeature'),
 ('ComplexMolecularMixture', 'Disease'),
 ('Phenomenon', 'PhenotypicFeature'),
 ('SmallMolecule', 'BiologicalEntity'),
 ('Procedure', 'PhysiologicalProcess'),
 ('NucleicAcidEntity', 'PhysiologicalProcess'),
 ('NucleicAcidEntity', 'GeneticInheritance'),
 ('OrganismTaxon', 'BiologicalEntity'),
 ('Cell', 'PhenotypicFeature'),
 ('SmallMolecule', 'Cohort'),
 ('CellularComponent', 'PhenotypicFeature'),
 ('Device', 'NamedThing'),
 ('Protein', 'OrganismAttribute'),
 ('Disease', 'OrganismAttribute'),


In [29]:
def gather_parquet_dataset_size_from_path(parquet_path: str):
    with duckdb.connect() as ddb:
        return int(
            ddb.execute(
                f"""
                SELECT COUNT(*) as count
                FROM read_parquet('{parquet_path}')
                """
            ).fetchone()[0]
        )


for source, target in [
    [
        "data/kg2c_lite_2.8.4.full.dataset.parquet/nodes/*",
        "data/kg2c_lite_2.8.4.full.with-metanames.dataset.parquet/nodes/**",
    ],
    [
        "data/kg2c_lite_2.8.4.full.dataset.parquet/edges/*",
        "data/kg2c_lite_2.8.4.full.with-metanames.dataset.parquet/edges/**",
    ],
]:
    if gather_parquet_dataset_size_from_path(
        source
    ) != gather_parquet_dataset_size_from_path(target):
        raise ValueError(
            f"Inequal number of rows from parquet source {source} to metanames target {target}."
        )
    else:
        print(f"Equal values from parquet source {source} to metanames target {target}")

Equal values from parquet source data/kg2c_lite_2.8.4.full.dataset.parquet/nodes/* to metanames target data/kg2c_lite_2.8.4.full.with-metanames.dataset.parquet/nodes/**
Equal values from parquet source data/kg2c_lite_2.8.4.full.dataset.parquet/edges/* to metanames target data/kg2c_lite_2.8.4.full.with-metanames.dataset.parquet/edges/**
