# Gather Connectivity Search `PathCount` Table

Negar mentioned needing data from a PostgreSQL database archive,
`connectivity-search-pg_dump.sql.gz`, which was created as part of
https://github.com/greenelab/connectivity-search-backend/blob/main/README.md .
The archive is available under https://zenodo.org/records/3978766 .
Only the `PathCount` table is needed so that single metapaths can be
extracted one at a time (needed for other work).

Additionally, we extract a `Node` table to help associate each node's `id`
(the internal database key) with its `identifier` (the external label).

In [1]:
import gzip
import os
import pathlib

import duckdb
import requests

from hetionet_utils.sql import (
    extract_and_write_sql_block,
    remove_first_and_last_line_of_file,
)

# --- notebook configuration -------------------------------------------

# make sure the local data directory is present before anything
# is written into it
pathlib.Path("data").mkdir(exist_ok=True)

# where the PostgreSQL dump archive is downloaded from
url = (
    "https://zenodo.org/records/3978766"
    "/files/connectivity-search-pg_dump.sql.gz"
    "?download=1"
)

# where the downloaded archive is stored locally
sql_file = "data/connectivity-search-pg_dump.sql.gz"

# where the duckdb database built from the archive is stored
duckdb_filename = "data/connectivity-search.duckdb"

# how many CREATE TABLE statements we expect to find in the dump
expected_table_count = 15

# names (as they appear in the dump) of the tables of primary interest
target_pathcount_table_name = "public.dj_hetmech_app_pathcount"
target_identifier_table_name = "public.dj_hetmech_app_node"

In [2]:
# gather postgresql database archive

# if the file doesn't exist, download it
if not pathlib.Path(sql_file).exists():
    # download into a temporary sibling file first so that an interrupted
    # transfer never leaves a truncated archive at `sql_file` (the
    # existence check above would otherwise treat it as complete on the
    # next run).
    partial_sql_file = pathlib.Path(f"{sql_file}.part")

    # stream the download; the context manager releases the connection
    # even if writing fails, and the (connect, read) timeout in seconds
    # prevents the cell from hanging forever on a stalled server.
    with requests.get(url, stream=True, timeout=(10, 300)) as response:
        # fail loudly on an HTTP error instead of saving an error page
        response.raise_for_status()

        # write the response content to disk in chunks
        with open(partial_sql_file, "wb") as file:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    file.write(chunk)

    # only expose the archive under its final name once fully written
    partial_sql_file.replace(sql_file)

# confirm that we have the archive
pathlib.Path(sql_file).exists()

True

In [3]:
# list the tables which are defined within the dump
create_table_names = []
with gzip.open(sql_file, "rt") as dump:
    for dump_line in dump:
        # only table creation lines are of interest here
        if "CREATE TABLE" not in dump_line:
            continue

        # reduce the statement's opening line to just the table name
        create_table_names.append(
            dump_line.strip().replace(" (", "").replace("CREATE TABLE ", "")
        )

        # stop reading once the expected number of tables has been
        # found so the rest of the (large) dump isn't scanned
        if len(create_table_names) == expected_table_count:
            break

# number of table names gathered above
count = len(create_table_names)
create_table_names

['public.auth_group',
 'public.auth_group_permissions',
 'public.auth_permission',
 'public.auth_user',
 'public.auth_user_groups',
 'public.auth_user_user_permissions',
 'public.dj_hetmech_app_degreegroupedpermutation',
 'public.dj_hetmech_app_metanode',
 'public.dj_hetmech_app_metapath',
 'public.dj_hetmech_app_node',
 'public.dj_hetmech_app_pathcount',
 'public.django_admin_log',
 'public.django_content_type',
 'public.django_migrations',
 'public.django_session']

In [4]:
# gather the create table statements for each table.
# each statement is written to its own file named
# `create_table.<table name>.sql` in the working directory.
for table_name in create_table_names:
    # destination for this table's CREATE TABLE statement
    create_table_file = f"create_table.{table_name}.sql"

    # copy the block which starts at the table's CREATE TABLE line
    # and ends at the statement terminator
    extract_and_write_sql_block(
        sql_file=sql_file,
        sql_start=f"CREATE TABLE {table_name}",
        sql_end=";",
        output_file=create_table_file,
    )

In [5]:
# print every extracted create table statement so the
# schema of each table can be inspected
for table_name in create_table_names:
    table_sql = pathlib.Path(f"create_table.{table_name}.sql").read_text()

    print(table_sql)

CREATE TABLE public.auth_group (
    id integer NOT NULL,
    name character varying(150) NOT NULL
);

CREATE TABLE public.auth_group_permissions (
    id integer NOT NULL,
    group_id integer NOT NULL,
    permission_id integer NOT NULL
);

CREATE TABLE public.auth_permission (
    id integer NOT NULL,
    name character varying(255) NOT NULL,
    content_type_id integer NOT NULL,
    codename character varying(100) NOT NULL
);

CREATE TABLE public.auth_user (
    id integer NOT NULL,
    password character varying(128) NOT NULL,
    last_login timestamp with time zone,
    is_superuser boolean NOT NULL,
    username character varying(150) NOT NULL,
    first_name character varying(30) NOT NULL,
    last_name character varying(150) NOT NULL,
    email character varying(254) NOT NULL,
    is_staff boolean NOT NULL,
    is_active boolean NOT NULL,
    date_joined timestamp with time zone NOT NULL
);

CREATE TABLE public.auth_user_groups (
    id integer NOT NULL,
    user_id integer NO

In [6]:
# extract each table's COPY data from the dump into its own TSV file
# note: this can take a while!
# (large portions of TSV data are pulled out of a single file.)
for table_name in create_table_names:
    copy_data_file = f"copy_data.{table_name}.tsv"

    # skip tables whose data file was already produced
    if pathlib.Path(copy_data_file).is_file():
        continue

    # copy everything from the table's COPY line through the
    # data terminator line into the TSV file
    extract_and_write_sql_block(
        sql_file=sql_file,
        sql_start=f"COPY {table_name}",
        sql_end="\\.",
        output_file=copy_data_file,
    )

    # drop the first and last lines of the extracted block: these are
    # the COPY header and the data termination marker, neither of
    # which holds actual values.
    remove_first_and_last_line_of_file(target_file=copy_data_file)

In [7]:
# build the (empty) tables inside a new duckdb database file;
# nothing happens when the database file is already present
if not pathlib.Path(duckdb_filename).is_file():
    with duckdb.connect(duckdb_filename) as ddb:
        for table_name in create_table_names:
            # load the postgres flavoured create table statement
            with open(f"create_table.{table_name}.sql", "r") as table_create_sql:
                postgres_ddl = table_create_sql.read()

            # adapt the statement for duckdb: drop the "public." schema
            # prefix from the table name and swap "jsonb" for "json"
            # (duckdb includes no "jsonb" type but is compatible with
            # the insertion data in the form "json").
            duckdb_ddl = postgres_ddl.replace("public.", "").replace("jsonb", "json")

            ddb.execute(duckdb_ddl)

In [8]:
# load each extracted tab-delimited file into its duckdb table.
# note: this can take a while!
# (we're ingesting data from TSV format into DuckDB)
with duckdb.connect(duckdb_filename) as ddb:
    for table_name in create_table_names:
        copy_data_file = f"copy_data.{table_name}.tsv"

        # nothing to load when the data file is empty
        if os.path.getsize(copy_data_file) == 0:
            continue

        # tables were created in duckdb without the "public." prefix
        duckdb_table_name = table_name.replace("public.", "")

        # leave tables alone which already hold rows so that
        # re-running this cell doesn't load the data twice
        existing_rows = ddb.execute(
            f"SELECT COUNT(*) FROM {duckdb_table_name}"
        ).fetchone()[0]
        if existing_rows != 0:
            continue

        ddb.execute(
            f"""
                    COPY {duckdb_table_name}
                    FROM '{copy_data_file}'
                    (DELIMITER '\t', HEADER false);
                    """
        )

In [9]:
# read and export data to parquet for simpler use.
#
# the query below:
# - pairs every node with every other node (self pairs are excluded)
#   and left joins the pathcount rows for each pair, so pairs without
#   a pathcount row are kept with NULL pathcount columns,
# - attaches the source and target `identifier` values from the node
#   table,
# - attaches the degree grouped permutation statistics matching the
#   pathcount's `dgp_id` and `metapath_id`,
# - joins the metapath table to read `n_similar`, which is used to
#   compute `adjusted_p_value` (capped at 1.0).
target_file = "./data/connectivity-search-precalculated-metapath-data.parquet"

# only run the export when the parquet file isn't already present
if not pathlib.Path(target_file).is_file():
    with duckdb.connect(duckdb_filename) as ddb:
        # copy data directly to Parquet from DuckDB
        ddb.execute(
            f"""
            COPY (
                WITH combinations AS (
                    SELECT
                        source.id AS source_id,
                        target.id AS target_id,
                        pathcount.metapath_id,
                        pathcount.id AS pathcount_id,
                        pathcount.path_count,
                        pathcount.p_value,
                        pathcount.dwpc,
                        pathcount.dgp_id
                    FROM dj_hetmech_app_node AS source
                    CROSS JOIN dj_hetmech_app_node AS target
                    LEFT JOIN dj_hetmech_app_pathcount AS pathcount ON
                        source.id = pathcount.source_id
                        AND target.id = pathcount.target_id
                    WHERE source.id != target.id
                )
                SELECT
                    combinations.pathcount_id,
                    src.identifier AS source_identifier,
                    tgt.identifier AS target_identifier,
                    combinations.metapath_id,
                    combinations.path_count,
                    /* we build an adjusted p_value based on the implementation
                    found here:
                    https://github.com/greenelab/connectivity-search-backend/blob/main/dj_hetmech_app/models.py#L94
                    */
                    CASE
                        WHEN combinations.p_value * metapath.n_similar > 1.0 THEN 1.0
                        ELSE combinations.p_value * metapath.n_similar
                    END AS adjusted_p_value,
                    combinations.p_value,
                    combinations.dwpc,
                    degree.source_degree,
                    degree.target_degree,
                    degree.n_dwpcs,
                    degree.n_nonzero_dwpcs,
                    degree.nonzero_mean,
                    degree.nonzero_sd,
                    combinations.source_id,
                    combinations.target_id,
                    combinations.dgp_id
                FROM
                    combinations
                LEFT JOIN dj_hetmech_app_node AS src ON
                    src.id = combinations.source_id
                LEFT JOIN dj_hetmech_app_node AS tgt ON
                    tgt.id = combinations.target_id
                LEFT JOIN dj_hetmech_app_degreegroupedpermutation AS degree ON
                    degree.id = combinations.dgp_id
                    AND degree.metapath_id = combinations.metapath_id
                LEFT JOIN dj_hetmech_app_metapath AS metapath ON
                    metapath.abbreviation = combinations.metapath_id
            )
            TO '{target_file}'
            (FORMAT parquet, COMPRESSION zstd);
            """
        )
# confirm that we have the file (checking the same path which the
# export above writes to)
pathlib.Path(target_file).is_file()

True

In [10]:
# show a row count using the parquet file output.
# the result is kept under its own name rather than `count`, which
# an earlier cell uses for the integer number of tables found.
with duckdb.connect() as ddb:
    parquet_row_count = ddb.execute(
        f"""
        SELECT COUNT(*)
        FROM read_parquet('{target_file}')
        """
    ).df()
parquet_row_count

Unnamed: 0,count_star()
0,2296687517


In [11]:
# preview a handful of rows read back from the parquet file output
preview_query = f"""
        SELECT *
        FROM read_parquet('{target_file}')
        LIMIT 5;
        """
with duckdb.connect() as ddb:
    sample = ddb.execute(preview_query).df()
sample

Unnamed: 0,pathcount_id,source_identifier,target_identifier,metapath_id,path_count,adjusted_p_value,p_value,dwpc,source_degree,target_degree,n_dwpcs,n_nonzero_dwpcs,nonzero_mean,nonzero_sd,source_id,target_id,dgp_id
0,124880074,GO:0022624,128239,CCpGpBPpG,534,4.955528e-05,1.054368e-06,3.547865,23,38,296400,296400,1.121048,0.342936,7,0,20677187
1,27221237,GO:0002764,128239,BPpGpBPpG,5492,2.519696e-13,5.361055e-15,3.577119,565,38,45600,45600,2.777035,0.095092,108,0,16517322
2,88554670,GO:0005759,128239,CCpGdAdG,189,8.971079e-11,1.90874e-12,2.516033,404,1,314800,314800,1.570807,0.116032,115,0,19868896
3,152608536,GO:0006650,128239,BPpGeAeG,1056,0.00153942,3.275362e-05,1.641656,252,7,373000,373000,1.464708,0.042759,117,0,14590779
4,60834463,GO:0042157,128239,BPpGeAuG,391,0.0003828104,8.144902e-06,2.247907,121,5,3570000,3570000,1.81644,0.093481,132,0,14656517


In [12]:
# look up a single source / target pair so the rows can be
# compared with:
# https://het.io/search/?source=34901&target=4145
pair_query = f"""
        SELECT *
        FROM read_parquet('{target_file}')
        WHERE source_id = 34901
        AND target_id = 4145;
        """
with duckdb.connect() as ddb:
    sample = ddb.execute(pair_query).df()
sample

Unnamed: 0,pathcount_id,source_identifier,target_identifier,metapath_id,path_count,adjusted_p_value,p_value,dwpc,source_degree,target_degree,n_dwpcs,n_nonzero_dwpcs,nonzero_mean,nonzero_sd,source_id,target_id,dgp_id
0,20732773,GO:0006302,142689,BPpGdAdG,164,0.015279,0.000325,2.359603,204,5,1124800,1124800,2.022551,0.09426,34901,4145,13957906
1,67801904,GO:0006302,142689,BPpGeAdG,831,0.000539,1.1e-05,2.595366,204,5,1124800,1124800,2.080552,0.113266,34901,4145,14555881
