# Feature computation for various node uniqueness constraints

In [1]:
import threading
import concurrent.futures
import gzip
import csv
import time
import functools
import random

import pandas
import py2neo

import hetio.readwrite
import hetio.neo4j

In [2]:
# Read hetnet metagraph
# The URL embeds a commit hash, so the same revision of metagraph.json is
# requested on every run of this notebook.
path = 'https://raw.githubusercontent.com/dhimmel/integrate/c56d312a53e351d26016d5fd4751a9715cc3f0c3/data/metagraph.json'
metagraph = hetio.readwrite.read_metagraph(path)

In [3]:
# Extract metapaths from 'compound' to 'disease', limited by max_length=4
metapaths = metagraph.extract_metapaths('compound', 'disease', max_length=4)
# Discard the first extracted metapath.
# NOTE(review): presumably this is the direct compound-disease metapath that
# encodes the outcome being predicted -- confirm against the ordering that
# extract_metapaths returns.
metapaths.pop(0)
# Number of metapaths remaining (shown as the cell output)
len(metapaths)

1979

In [4]:
# Load the compound-disease pairs and keep only a subsample of them:
# rows with indication == 1 below percentile 0.25 and rows with
# indication == 0 below percentile 0.0015.
part_df = pandas.read_table('../data/partition.tsv.gz').query(
    '(indication == 1 and percentile < 0.25) or (indication == 0 and percentile < 0.0015)'
)
# Number of retained rows per indication value (shown as the cell output)
part_df.indication.value_counts()

1    347
0    317
Name: indication, dtype: int64

In [5]:
# Create a cached DWPC query constructor
# maxsize=None makes the cache unbounded, so every distinct argument
# combination is stored for the life of the kernel. lru_cache requires all
# arguments passed to the wrapped function to be hashable.
construct_dwpc_query = functools.lru_cache(maxsize=None)(hetio.neo4j.construct_dwpc_query)

In [6]:
# Open neo4j connection
# `neo` is the module-level graph handle that compute_dwpc uses to execute
# its Cypher queries.
uri = 'http://localhost:7474/db/data/'
neo = py2neo.Graph(uri)

In [7]:
def generate_parameters(max_elems=None):
    """Generate compound, disease, metapath combinations.

    Iterates over every row of the module-level ``part_df``, every metapath
    in ``metapaths`` and every value in ``unique_nodes_options`` (visited in
    a freshly shuffled order for each row/metapath pair), and yields one
    dict of keyword arguments per combination. The keys match the
    parameters of ``compute_dwpc``: ``compound_id``, ``disease_id``,
    ``metapath``, ``query``, ``w`` (always 0.4) and ``unique_nodes``.

    Parameters
    ----------
    max_elems : int or None
        Stop after yielding this many dicts. ``None`` (the default) yields
        every combination.
    """
    n = 0
    for _, row in part_df.iterrows():
        # Look the identifiers up once per row rather than once per yield.
        compound_id = row['compound_id']
        disease_id = row['disease_id']
        for metapath in metapaths:
            # random.sample over the full length returns a shuffled copy, so
            # the options are not always queried in the same order.
            for unique_nodes in random.sample(unique_nodes_options, len(unique_nodes_options)):
                if max_elems is not None and n == max_elems:
                    # End the generator outright. A `break` here would only
                    # leave the innermost loop and the outer loops would
                    # keep spinning over every remaining row and metapath
                    # without ever yielding again.
                    return
                query = construct_dwpc_query(
                    metarels=metapath,
                    property='identifier',
                    using=True,
                    unique_nodes=unique_nodes)
                yield {
                    'compound_id': compound_id,
                    'disease_id': disease_id,
                    'metapath': metapath,
                    'query': query,
                    'w': 0.4,
                    'unique_nodes': unique_nodes,
                }
                n += 1

def compute_dwpc(query, metapath, compound_id, disease_id, w, unique_nodes):
    """Run one query through the neo4j connection and append a result row.

    The query is executed with ``source=compound_id``, ``target=disease_id``
    and ``w=w`` as parameters. The ``PC`` and ``DWPC`` fields of the returned
    record are written, together with the inputs and the elapsed wall time
    (formatted to four significant digits), through the module-level
    ``writer`` while holding ``writer_lock``.
    """
    began = time.time()
    record = neo.cypher.execute(query, source=compound_id, target=disease_id, w=w).one
    # The timer stops once the record has been retrieved, before any writing.
    elapsed = format(time.time() - began, '.4g')
    fields = (
        compound_id,
        disease_id,
        metapath,
        unique_nodes,
        record['PC'],
        w,
        record['DWPC'],
        elapsed,
    )
    # Serialize access to the shared writer.
    with writer_lock:
        writer.writerow(fields)


In [8]:
%%time

# Parameters
workers = 10
max_elems = None
unique_nodes_options = False, 'nested', 'expanded', 'labeled'

# Prepare writer
# The output file and the executor are managed by `with` blocks so that, if
# the run fails or is interrupted, already-submitted jobs are still waited
# for and the gzip stream is properly finalized rather than left truncated.
path = 'features.tsv.gz'
with gzip.open(path, 'wt') as write_file:
    writer = csv.writer(write_file, delimiter='\t')
    writer.writerow(['compound_id', 'disease_id', 'metapath', 'unique_nodes', 'PC', 'w', 'DWPC', 'seconds'])

    # Lock guarding writer.writerow calls made from the worker threads
    writer_lock = threading.Lock()

    # Create ThreadPoolExecutor; leaving the block calls executor.shutdown(),
    # which waits for every submitted job before the file is closed.
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        # Submit jobs
        for params in generate_parameters(max_elems):
            # Throttle submission so the pending queue stays bounded.
            # NOTE: _work_queue is a private attribute of ThreadPoolExecutor.
            while executor._work_queue.qsize() > 5000:
                time.sleep(1)
            # NOTE(review): the returned Future is discarded, so an exception
            # raised inside compute_dwpc is never surfaced and that row is
            # simply missing from the output.
            executor.submit(compute_dwpc, **params)

CPU times: user 3h 19s, sys: 10min 49s, total: 3h 11min 9s
Wall time: 22h 52min 53s
