# Generate a Kuzu Database from the Full RTX-KG2 Parquet Dataset

In [2]:
import gzip
import json
import pathlib
import shutil
from typing import Any, Dict, Generator, List, Literal

import awkward as ak
import ijson
import kuzu
import pyarrow as pa
import requests
from genson import SchemaBuilder
from pyarrow import parquet
from notebooks.rtx_kg2_functions import (
    count_items_under_top_level_name,
    drop_table_if_exists,
    find_top_level_names,
    generate_cypher_table_create_stmt_from_parquet_file,
    parse_items_by_topmost_item_name,
    parse_metadata_by_object_name,
)


In [3]:
# set data to be used throughout notebook
chunk_size = 1
data_dir = "../data"
source_data_url = "https://github.com/ncats/translator-lfs-artifacts/raw/main/files/kg2c_lite_2.8.4.json.gz"
# path of the extracted JSON file: the URL's file name with ".json.gz"
# swapped for ".json", placed under data_dir
target_extracted_sample_data = (
    f"{data_dir}/{pathlib.Path(source_data_url).name.replace('.json.gz', '.json')}"
)
# every derived path below is built from the extracted JSON path by
# replacing its ".json" extension with a purpose-specific suffix
parquet_dir = target_extracted_sample_data.replace(".json", ".full.dataset.parquet")
kuzu_dir = target_extracted_sample_data.replace(".json", ".full.dataset.kuzu")
target_extracted_sample_data_schema_file = target_extracted_sample_data.replace(
    ".json", ".schema.json"
)
print(f"Kuzu dir: {kuzu_dir}")

Kuzu dir: data/kg2c_lite_2.8.4.full.dataset.kuzu


In [4]:
# create path for the kuzu database to reside
# remove any directory left at this path by a previous run so the
# database starts empty
if pathlib.Path(kuzu_dir).is_dir():
    shutil.rmtree(kuzu_dir)
# parents=True: also create missing parent directories (e.g. the data dir)
# instead of failing with FileNotFoundError when they do not exist yet
pathlib.Path(kuzu_dir).mkdir(parents=True, exist_ok=True)

In [5]:
# initialise the Kuzu database located at kuzu_dir and open a connection to it
db = kuzu.Database(str(kuzu_dir))
kz_conn = kuzu.Connection(db)

In [6]:
# maps a parquet dataset directory name to the Cypher table type created for it
dataset_name_to_cypher_table_type_map = {"nodes": "node", "edges": "rel"}

# clear out any tables from a previous run; edges is dropped before nodes
# (presumably because the rel table references the node table - verify)
drop_table_if_exists(kz_conn=kz_conn, table_name="edges")
drop_table_if_exists(kz_conn=kz_conn, table_name="nodes")

# note: we provide specific ordering here to ensure nodes are created before edges
for path in [f"{parquet_dir}/nodes", f"{parquet_dir}/edges"]:

    # use first file discovered as basis for schema
    # glob yields files in arbitrary order, so sort to make the choice of
    # schema file reproducible between runs
    first_pq_file = next(iter(sorted(pathlib.Path(path).glob("*.parquet"))), None)
    if first_pq_file is None:
        # fail with a clear message instead of a bare StopIteration
        raise FileNotFoundError(f"No parquet files found under: {path}")

    # the dataset directory name ("nodes" / "edges") doubles as the table
    # name and selects the table type through the map above
    create_stmt = generate_cypher_table_create_stmt_from_parquet_file(
        parquet_file=first_pq_file,
        table_type=dataset_name_to_cypher_table_type_map[first_pq_file.parent.name],
        table_name=first_pq_file.parent.name,
        table_pkey_parquet_field_name="id",
    )

    print(f"Using the following create statement to create table:\n\n{create_stmt}\n\n")
    kz_conn.execute(create_stmt)

Binder exception: Table edges does not exist.
Binder exception: Table nodes does not exist.
Using the following create statement to create table:

CREATE NODE TABLE nodes(id STRING, name STRING, all_categories STRING[], category STRING, PRIMARY KEY (id))


Using the following create statement to create table:

CREATE REL TABLE edges(FROM nodes TO nodes, qualified_object_aspect STRING, predicate STRING, domain_range_exclusion STRING, qualified_object_direction STRING, id INT64, primary_knowledge_source STRING, qualified_predicate STRING)




In [None]:
# ingest each parquet dataset into the Kuzu table named after its directory
# note: the ordering is deliberate so that nodes are handled before edges
for path in (f"{parquet_dir}/nodes", f"{parquet_dir}/edges"):
    print(f"Working on kuzu ingest of parquet dataset: {path} ")

    # the dataset directory name is also the target table name
    table_name = pathlib.Path(path).name

    # the wildcard makes a single COPY cover all files under the dataset dir
    # see: https://kuzudb.com/docusaurus/data-import/csv-import#copy-from-multiple-csv-files-to-a-single-table
    copy_stmt = f'COPY {table_name} FROM "{path}/*.parquet"'
    kz_conn.execute(copy_stmt)

Working on kuzu ingest of parquet dataset: data/kg2c_lite_2.8.4.full.dataset.parquet/nodes 
