In [3]:
from clinvar_gk_pilot.gcs import (
    _local_file_path_for,
    download_to_local_file,
    already_downloaded,
)

# variation_blob_uri = "gs://clingen-public/clinvar-gk-pilot/2024-04-07/dev/clinvar-variation-20240407.json.gz"
# scv_blob_uri = (
#     "gs://clingen-public/clinvar-gk-pilot/2024-04-07/dev/clinvar-scvs-20240407.json.gz"
# )

catvar_blob_uri = (
    "gs://clinvar-gk-pilot/2024-04-07/dev/combined-catvar_output.ndjson.gz"
)
scv_blob_uri = "gs://clinvar-gk-pilot/2024-04-07/dev/combined-scv_output.ndjson.gz"

catvar_file = "combined-catvar_output.ndjson.gz"


variation_local_file_path = _local_file_path_for(catvar_blob_uri)
if not already_downloaded(catvar_blob_uri):
    print(f"Downloading {catvar_blob_uri} to {variation_local_file_path}")
    dl_variation_local_file_path = download_to_local_file(catvar_blob_uri)
    assert dl_variation_local_file_path == variation_local_file_path

scv_local_file_path = _local_file_path_for(scv_blob_uri)
if not already_downloaded(scv_blob_uri):
    print(f"Downloading {scv_blob_uri} to {scv_local_file_path}")
    dl_scv_local_file_path = download_to_local_file(scv_blob_uri)
    assert dl_scv_local_file_path == scv_local_file_path

catvar_file = variation_local_file_path
scv_file = scv_local_file_path

Our ClinVar datasets are available as NDJSON files. There is a variation file and an SCV file. The records of the variation file are CategoricalVariation objects, and the records of the SCV file are `VariationPathogenicity` (sub-class of `Statement`)

In [4]:
import os
import gzip
import json

################################
# Query the SCV file for a VRS ID using vanilla Python
#
# - for a given ClinVar Variation ID, find the corresponding GA4GH CatVar record in the CatVar
#   file and find the SCVs which reference that variant in the SCV file
#
#   (NOTE: the SCV file also contains the full CatVar definition in the "variation" field, but
#    this example illustrates how to query across both files, since the SCV file can be
#    relationally normalized to extract that redundant entity and refer to the variant
#    by the CatVar ID as a foreign key)
#
# - print the SCV interpretations for that variant
#
################################
################################
# Inputs

################################
# A CanonicalAllele
## For searching based on the GKS Categorical Variation (CatVrs) ID
clinvar_id = "563765"
## For searching based on the GA4GH VRS Variation ID
vrs_id = "ga4gh:VA.hf_va4AnlG99NuOjtaXJzh_XvszWWOO9"


################################
# A CategoricalCnv
## For searching based on the GKS Categorical Variation (CatVrs) ID
clinvar_id = "599353"
## For searching based on the GA4GH VRS Variation ID
vrs_id = "ga4gh:CX.5iqyOA4L5njh5FpymTPcwQ8oHTilQFmo"  # GRCh38 member


catvar_file = "combined-catvar_output.ndjson.gz"
scv_file = "combined-scv_output.ndjson.gz"
################################
assert os.path.exists(catvar_file)
assert os.path.exists(scv_file)
catvar_id = f"clinvar:{clinvar_id}"

In [5]:
################################
# Query the SCV file for the matching VRS ID
################################


def query_scvs_by_vrs_id(vrs_id: str, scv_file: str):
    scvs = []
    catvars = []
    with gzip.open(scv_file, "rt") as f:
        for line in f:
            record = json.loads(line)
            variation = record["variation"]
            processing_errors = [
                e
                for e in variation.get("extensions", [])
                if e["name"] == "vrs processing errors"
            ]
            if len(processing_errors) > 0:
                # print(f"Skipping SCV record with VRS processing errors: {line}")
                continue

            match variation["type"]:
                case "CategoricalCnv":
                    if "members" not in variation:
                        # Unsupported?
                        # e.g. "clinvar:1878325"
                        # "NC_000018.9:g.(48556994_48573289)_48573471dup"
                        # raise ValueError(f"CategoricalCnv missing members field: {line}")
                        continue
                    members = variation["members"]
                    member_vrs_ids = [m["id"] for m in members]
                    if vrs_id in member_vrs_ids:
                        scvs.append(record)

                case "CanonicalAllele":
                    if "definingContext" not in variation:
                        # Unsupported allele type?
                        # e.g. clinvar:215984
                        # "NM_000251.2(MSH2):c.212-?_366+?dup"
                        # raise ValueError(f"CanonicalAllele missing definingContext field: {line}")
                        continue
                    if variation["definingContext"]["id"] == vrs_id:
                        scvs.append(record)
                case "DescribedVariation":
                    # not an error in processing, but does not have any VRS IDs
                    continue
                    # raise ValueError(f"DescribedVariation not yet implemented: {line}")
                case _:
                    raise ValueError(f"Unexpected variation type: {variation['type']}")
    return scvs


scvs = query_scvs_by_vrs_id(vrs_id, scv_file)

In [6]:
for scv in scvs:
    classification = scv["classification"]["label"]
    condition = scv["condition"]["label"]
    print(f"SCV: {scv['id']} ")
    print(f"  Classification: {classification}")
    print(f"  Condition: {condition}")
    print()

SCV: SCV000864190.1 
  Classification: Likely pathogenic
  Condition: Squalene synthase deficiency



In [None]:
################################
# load the SCV ndjson into a sqlite database with a few indexes on the scv, catvar, and vrs IDs
################################
# # sqlite schema:
# # scv_id: str (index)
# # catvar_id: str (index)
# # vrs_id: str (index) # Can have multiple records if a record has multiple defining VRS IDs
# # value: the blob of the SCV record gziped and as bytes

# import sqlite3

# db_name = "clinvar.db"
# conn = sqlite3.connect(db_name)

# # Create the schema and the indexes on columns
# c = conn.cursor()
# c.execute(
#     """
#     CREATE TABLE IF NOT EXISTS scv (
#         scv_id TEXT,
#         catvar_id TEXT,
#         vrs_id TEXT,
#         value BLOB,
#     )
#     """
# )
# c.execute("CREATE INDEX IF NOT EXISTS scv_id_index ON scv (scv_id)")
# c.execute("CREATE INDEX IF NOT EXISTS catvar_id_index ON scv (catvar_id)")
# c.execute("CREATE INDEX IF NOT EXISTS vrs_id_index ON scv (vrs_id)")
# # Create a uniqueness constraint so these cannot be duplicated
# c.execute(
#     "CREATE UNIQUE INDEX IF NOT EXISTS scv_catvar_vrs_id_index ON scv (scv_id, catvar_id, vrs_id)"
# )
# conn.commit()

# with gzip.open(scv_file, "rt") as f:
#     for line in f:
#         record = json.loads(line)

#         variation = record["variation"]
#         processing_errors = [
#             e
#             for e in variation.get("extensions", [])
#             if e["name"] == "vrs processing errors"
#         ]
#         if len(processing_errors) > 0:
#             # print(
#             #     f"Skipping SCV record with VRS processing errors: {json.dumps(record)}"
#             # )
#             continue

#         match variation["type"]:
#             case "CategoricalCnv":
#                 if "members" not in variation:
#                     # Unsupported?
#                     # e.g. "clinvar:1878325"
#                     # "NC_000018.9:g.(48556994_48573289)_48573471dup"
#                     # raise ValueError(
#                     #     f"CategoricalCnv missing members field: {json.dumps(record)}"
#                     # )
#                     continue
#                 members = variation["members"]
#                 member_vrs_ids = [m["id"] for m in members]
#                 if vrs_id in member_vrs_ids:
#                     scvs.append(record)

#             case "CanonicalAllele":
#                 if "definingContext" not in variation:
#                     # Unsupported allele type?
#                     # e.g. clinvar:215984
#                     # "NM_000251.2(MSH2):c.212-?_366+?dup"
#                     # raise ValueError(
#                     #     f"CanonicalAllele missing definingContext field: {json.dumps(record)}"
#                     # )
#                     continue
#                 if variation["definingContext"]["id"] == vrs_id:
#                     scvs.append(record)
#             case "DescribedVariation":
#                 # do not have any VRS IDs
#                 continue
#                 # raise ValueError(
#                 #     f"DescribedVariation not yet implemented: {json.dumps(record)}"
#                 # )
#             case _:
#                 raise ValueError(f"Unexpected variation type: {variation['type']}")


While these can be read with vanilla python by iterating lines and parsing each as JSON, there are also libraries which can make querying the files simpler and potentially more performant.

One option is DuckDB. DuckDB achieves fast speeds and low memory utilization by memory mapping files and dropping rows from memory that don't match filter criteria. It also has the benefit of being able to query GZIP-compressed NDJSON files directly, so disk storage stays minimal, and presenting a SQL interface to the data, with full support of nested structs so we can access fields from nested JSON objects without manipulating the files. Another benefit is that it gracefully handles heterogeneous record schemas, automatically nulling values that don't exist in particular rows.

Another option to query the NDJSON files is the Polars library for Python.

This library is similar to Pandas, but it is written in Rust, and it supports lazily querying files without loading all contents into memory.

To read a whole NDJSON file, use `pandas.read_ndjson`. To read it lazily and only load into a dataframe the fields of rows that match applied filters, use `pandas.scan_ndjson` (as we do below).

In [None]:
import polars as pl

schema = pl.Schema({"id": pl.Utf8, "definingContext": pl.Struct({"id": pl.Utf8})})

# Load the whole file into memory.
# NOT RECOMMENDED unless you have a very large amount of memory.
# Memory requirements include the data itself plus the overhead of the Python and Polars data structures.
# import gzip
# catvar_file = "combined-catvar_output.ndjson.gz"
# with gzip.open(catvar_file, "rb") as f:
#     df = pl.read_ndjson(f, schema=schema)

# Load file lazily. Recommended for large files.
catvar_file = "combined-catvar_output.ndjson"
catvar_df = pl.scan_ndjson(
    catvar_file,
    schema=schema,
)

In [None]:
df.filter(pl.col("id") == "clinvar:209226").collect()

In [None]:
scv_file = "combined-scv_output.ndjson"
# scv_schema = pl.Schema(
scv_df = pl.scan_ndjson(scv_file)
scv_df.head().collect()

In [17]:
import gzip
import duckdb

scv_file = "combined-scv_output.ndjson.gz"
vrs_id = "ga4gh:VA.iTe7Ew7If4vdPnhfcT2qXQQY-tNu7tcF"
# d = duckdb.connect()
query = f"""
SELECT id, member, a
FROM
    (SELECT id, UNNEST(variation.members) AS member, s.*
    FROM read_json('combined-scv_output.ndjson.gz', format='newline_delimited', ignore_errors=true) s
    ) a
WHERE member.id = '{vrs_id}'

LIMIT 10
"""
# AND a.variation.type = 'CategoricalCnv'
# WHERE (variation.type = 'CategoricalCnv')
# LIMIT 10
# AND member.id = '{vrs_id}'

results = duckdb.query(query)

scv_ids = []
while batch := results.fetchmany(100):
    for row in batch:
        print(row)
        scv_ids.append(row[0])

scv_ids = [row[0] for row in batch]

# with gzip.open(scv_file) as f:
#     for line in f:
#         pass

('SCV000244507.1', {'digest': 'iTe7Ew7If4vdPnhfcT2qXQQY-tNu7tcF', 'expressions': [{'syntax': 'spdi', 'value': 'NC_000017.11:43041892:A:T'}, {'syntax': 'hgvs.g', 'value': 'NC_000017.11:g.43041893A>T'}, {'syntax': 'gnomad', 'value': '17-43041893-A-T'}], 'extensions': [{'name': 'clinvar vcf', 'value': '"17-43041893-A-T"'}, {'name': 'clinvar hgvs type', 'value': '"genomic, top-level"'}], 'id': 'ga4gh:VA.iTe7Ew7If4vdPnhfcT2qXQQY-tNu7tcF', 'label': 'NC_000017.11:43041892:A:T', 'location': {'digest': 'zdH_x64Ewl4jnJSjPQrb9ENIPb8GZOG-', 'end': '43041893', 'id': 'ga4gh:SL.zdH_x64Ewl4jnJSjPQrb9ENIPb8GZOG-', 'sequenceReference': {'extensions': [{'name': 'assembly', 'value': 'GRCh38'}, {'name': 'chromosome', 'value': '17'}], 'id': 'NC_000017.11', 'refgetAccession': 'SQ.dLZ15tNO1Ur0IcGjwc3Sdi_0A6Yf4zm7', 'residueAlphabet': 'na', 'type': 'SequenceReference'}, 'start': '43041892', 'type': 'SequenceLocation'}, 'state': '{"sequence":"T","type":"LiteralSequenceExpression"}', 'type': 'Allele', 'copies': 

In [None]:
# import gzip
# import json
# import psycopg2
# from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

# scv_file = "combined-scv_output.ndjson.gz"

# # docker volume create clinvar_data
# # docker run --name mypostgres -e POSTGRES_PASSWORD=mysecretpassword -d -p 5432:5432 -v clinvar_data:/var/lib/postgresql/data postgres

# # Connect to the default database
# conn = psycopg2.connect(
#     host="localhost", dbname="postgres", user="postgres", password="mysecretpassword"
# )
# conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)

# # Create a new database
# cur = conn.cursor()
# # cur.execute("DROP DATABASE IF EXISTS clinvar")
# db_name = "clinvar"
# cur.execute("SELECT 1 FROM pg_catalog.pg_database WHERE datname = %s", (db_name,))
# db_exists = cur.fetchone()
# if not db_exists:
#     cur.execute("CREATE DATABASE IF NOT EXISTS clinvar")
# cur.close()
# conn.close()

# # with psycopg2.connect(
# #     host="localhost", dbname="postgres", user="postgres", password="mysecretpassword"
# # ) as conn:
# #     conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
# #     cur = conn.cursor()
# #     cur.execute("CREATE DATABASE clinvar")


# with psycopg2.connect(
#     host="localhost", dbname="clinvar", user="postgres", password="mysecretpassword"
# ) as conn:
#     cur = conn.cursor()
#     cur.execute("DROP TABLE IF EXISTS scv")
#     cur.execute(
#         """
#         CREATE TABLE IF NOT EXISTS scv (
#             id TEXT PRIMARY KEY,
#             value JSONB
#         )
#         """
#     )
#     cur.execute(
#         # Create a GIN index on value.definingContext.id
#         """
#         CREATE INDEX IF NOT EXISTS scv_definingContext_id_index ON scv ((value->'definingContext'->>'id'))
#         """
#     )

# import time

# start = time.time()
# last_printed = start


# def file_batch_lines(fileio, batch_size=10000):
#     """
#     Generator function to read a file and yield batches of lines.

#     :param file_path: Path to the file to be read.
#     :param batch_size: Number of lines per batch.
#     """
#     batch = []
#     for line in fileio:
#         batch.append(line)
#         if len(batch) == batch_size:
#             yield batch
#             batch = []
#     if batch:
#         yield batch


# with psycopg2.connect(
#     host="localhost", dbname="clinvar", user="postgres", password="mysecretpassword"
# ) as conn:
#     cur = conn.cursor()
#     with gzip.open(scv_file, "rt") as f:
#         idx = 0
#         for line_batch in file_batch_lines(f, batch_size=2000):
#             # mega SQL string strategy
#             records = [json.loads(line) for line in line_batch]
#             query_base = "INSERT INTO scv (id, value) VALUES "
#             values_templ = ",".join(["(%s, %s)"] * len(records))
#             values = [(r["id"], line) for r, line in zip(records, line_batch)]
#             # flatten values by 1 level
#             values = [v for sublist in values for v in sublist]
#             query = query_base + values_templ
#             cur.execute(query, values)

#             # executemany strategy
#             # cur.executemany(
#             #     "INSERT INTO scv (id, value) VALUES (%s, %s)",
#             #     [(r["id"], line) for r, line in zip(records, line_batch)],
#             # )

#             # record = json.loads(line)
#             # cur.execute(
#             #     "INSERT INTO scv (id, value) VALUES (%s, %s)",
#             #     (record["id"], json.dumps(record)),
#             # )
#             idx += len(records)
#             if time.time() - last_printed > 10:
#                 print(f"Inserted {idx} records")
#                 last_printed = time.time()
#                 cur.execute("COMMIT")