In [1]:
!pip install duckdb

Collecting duckdb
  Downloading duckdb-0.9.1-cp310-cp310-macosx_11_0_arm64.whl (13.2 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.2/13.2 MB[0m [31m30.7 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: duckdb
Successfully installed duckdb-0.9.1

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.1.2[0m[39;49m -> [0m[32;49m23.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [2]:
import tarfile
from pathlib import Path

# Unpack the Monarch KG tarball into ../output/ — the directory the loading
# cell reads monarch-kg_nodes.tsv / monarch-kg_edges.tsv from (presumably the
# archive contains those files; confirm against the release).
# Done in Python rather than `!tar`: a failing `!` command does not raise, so
# a missing or corrupt archive would otherwise let Run All continue silently.
kg_archive = Path('../output/monarch-kg.tar.gz')
with tarfile.open(kg_archive, 'r:gz') as kg_tar:
    # Use the 'data' extraction filter when this Python provides it (3.12+ and
    # security backports); it refuses absolute paths and members escaping the
    # destination directory.
    if hasattr(tarfile, 'data_filter'):
        kg_tar.extractall(kg_archive.parent, filter='data')
    else:
        kg_tar.extractall(kg_archive.parent)

In [114]:
%%time
import duckdb

# Open (or create) the on-disk DuckDB database holding the KG tables.
db = duckdb.connect(database='monarch-kg.duckdb')

# Executed in order, because later tables are derived from earlier ones:
#   nodes         - node TSV plus `namespace` (text of id before the first ':')
#   edges         - edge TSV as-is
#   closure       - subject_id / predicate_id / object_id rows from the
#                   phenio relation TSV (no header row is requested)
#   closure_id    - per subject_id, the list of its closure object_ids
#   closure_label - per subject_id, the list of closure object names; inner
#                   join, so only objects present in `nodes` are included
load_statements = [
    """
create or replace table nodes as select *,  substr(id, 1, instr(id,':') -1) as namespace from read_csv('../output/monarch-kg_nodes.tsv', header=True, sep='\t', AUTO_DETECT=TRUE)
""",
    """
create or replace table edges as select * from read_csv('../output/monarch-kg_edges.tsv', header=True, sep='\t', AUTO_DETECT=TRUE)
""",
    """
create or replace table closure as select * from read_csv('../data/monarch/phenio-relation-filtered.tsv', sep='\t', names=['subject_id', 'predicate_id', 'object_id'], AUTO_DETECT=TRUE)
""",
    """
create or replace table closure_id as select subject_id as id, array_agg(object_id) as closure from closure group by subject_id
""",
    """
create or replace table closure_label as select subject_id as id, array_agg(name) as closure_label from closure join nodes on object_id = id
group by subject_id
""",
]

for statement in load_statements:
    db.sql(statement)

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

CPU times: user 1min 9s, sys: 3.62 s, total: 1min 13s
Wall time: 17.1 s


In [125]:
%%time

def columns(field):
    """Return SQL select-list items that pull node details for one edge field.

    `field` is used both as the alias of the joined `nodes` row and as the
    prefix of every output column: label, category, namespace, closure ids
    and closure labels. Only 'subject' and 'object' additionally get taxon
    id/label columns. Every item ends with a comma so more select items can
    follow directly.
    """
    node_columns = f"""
       {field}.name as {field}_label, 
       {field}.category as {field}_category,
       {field}.namespace as {field}_namespace,
       {field}_closure.closure as {field}_closure,
       {field}_closure_label.closure_label as {field}_closure_label,    
    """
    has_taxon = field in ('subject', 'object')
    taxon_columns = f"""
        {field}.in_taxon as {field}_taxon,
        {field}.in_taxon_label as {field}_taxon_label,
        """ if has_taxon else ""
    return node_columns + taxon_columns

def joins(field):
    """Return the LEFT OUTER JOIN clauses that attach node data for `field`.

    Joins `nodes` aliased as `field` on edges.<field> = <field>.id, then that
    node's rows in `closure_id` and `closure_label`, aliased <field>_closure
    and <field>_closure_label. These are the aliases `columns(field)` selects from.
    """
    clauses = [
        f"left outer join nodes as {field} on edges.{field} = {field}.id",
        f"left outer join closure_id as {field}_closure on {field}.id = {field}_closure.id",
        f"left outer join closure_label as {field}_closure_label on {field}.id = {field}_closure_label.id",
    ]
    # Each clause sits on its own indented line; a final indented newline closes the block.
    return "".join(f"\n    {clause}" for clause in clauses) + "\n    "

def evidence_sum(evidence_fields):
    """Build a SQL select item summing '|'-separated entries across columns.

    Each column named in `evidence_fields` is split on '|' and the list
    lengths are added together. The result is aliased as `evidence_count`.

    NULL or empty-string cells count as 0. Previously a NULL cell made the
    whole sum NULL, and an empty string counted as one entry.

    Args:
        evidence_fields: column names, e.g. ['publications', 'has_evidence'].

    Returns:
        A SQL fragment ending in a trailing comma, so another select item can
        follow it. An empty field list yields a constant 0 count instead of
        invalid SQL.
    """
    if not evidence_fields:
        return "0 as evidence_count,"
    # nullif(x, '') -> NULL so '' is not split into [''] (length 1);
    # coalesce(..., 0) keeps one NULL column from nulling the whole sum.
    per_field_counts = [
        f"coalesce(len(split(nullif({field}, ''), '|')), 0)"
        for field in evidence_fields
    ]
    return f"{'+'.join(per_field_counts)} as evidence_count,"

def grouping_key(grouping_fields):
    """Build a SQL select item joining columns into a single `grouping_key`.

    The fields are concatenated with '🍪' via `concat_ws`. The `negated`
    field is normalised: a true value becomes 'NOT', and false or NULL
    becomes ''. True means BOOLEAN true or the string 'True', compared
    case-insensitively. Non-negated edges therefore share a key whether the
    flag is false or missing.

    The old `negated.replace('True','NOT')` failed when read_csv
    auto-detected `negated` as BOOLEAN. Casting true to VARCHAR gives
    lowercase 'true', so 'NOT' was never produced. Newer DuckDB versions
    reject replace() on BOOLEAN entirely.

    Args:
        grouping_fields: column names to include, in order.

    Returns:
        SQL fragment `concat_ws('🍪', ...) as grouping_key` (no trailing comma).

    Raises:
        ValueError: if `grouping_fields` is empty (would produce invalid SQL).
    """
    if not grouping_fields:
        raise ValueError("grouping_fields must contain at least one column")
    fragments = []
    for field in grouping_fields:
        if field == 'negated':
            # Cast first so this works for both VARCHAR and BOOLEAN columns.
            fragments.append(
                f"CASE WHEN lower(CAST({field} AS VARCHAR)) = 'true' THEN 'NOT' ELSE '' END"
            )
        else:
            fragments.append(field)
    return f"concat_ws('🍪', {', '.join(fragments)}) as grouping_key"


CPU times: user 9 µs, sys: 1e+03 ns, total: 10 µs
Wall time: 14.1 µs


In [126]:
%%time
# Build one CREATE TABLE ... AS SELECT over `edges`. For every edge column
# looked up as a node id (see joins()), add that node's label, category,
# namespace and closure columns (see columns()). Also add an evidence count
# and a grouping key.
fields = [
    'subject',
    'object',
    'qualifiers',
    'frequency_qualifier',
    'onset_qualifier',
    'sex_qualifier',
    'stage_qualifier',
]
evidence_fields = ['publications', 'has_evidence']
grouping_fields = ['subject', 'negated', 'predicate', 'object']

select_columns = "".join(columns(field) for field in fields)
join_clauses = "".join(joins(field) for field in fields)

query = f"""
create or replace table denormalized_edges as
select edges.*, 
       {select_columns}
       {evidence_sum(evidence_fields)}
       {grouping_key(grouping_fields)}  
from edges
    {join_clauses}
"""

db.query(query)


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

CPU times: user 2min 38s, sys: 8.2 s, total: 2min 46s
Wall time: 25.7 s


In [131]:
%%time
# Export denormalized_edges as a tab-separated file with a header row, written
# relative to the notebook's working directory.
export_statement = """
-- write denormalized_edges as tsv
copy (select * from denormalized_edges) to 'monarch-kg-denormalized-edges.tsv' (header, delimiter '\t')
"""
db.query(export_statement)

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

CPU times: user 49.7 s, sys: 17.3 s, total: 1min 6s
Wall time: 14.6 s
