# Imports

In [1]:
"""WeirdTargets Module"""
##########################################
#      Future Statement Definitions      #
##########################################
from __future__ import division
from __future__ import print_function
from __future__ import generator_stop
from __future__ import unicode_literals
from __future__ import absolute_import
##########################################
#        Python Standard Library         #
##########################################
import argparse
import re 
import math
import json 
import itertools
import os
import gc
import sys
import time
import struct
import datetime
import inspect
import collections
import pprint
# import multiprocessing
import operator as op
from functools import reduce
import requests
from copy import copy, deepcopy
##########################################
#   3rd Party Data Loading & Analysis    #
##########################################
import ujson
import numpy as np
import pandas as pd
from tqdm import tqdm
from gzip import GzipFile
from toolz import partition_all
##########################################
#      3rd Party Parallel Computing      #
##########################################
import pathos
import psutil
import dask.dataframe as dd
"""import pathos.multiprocessing as pmp
import pathos.pools as ppools
import pathos.pp as ppp"""
import dask.dataframe as dd
"""from dask.threaded import get as ddscheduler"""
from concurrent.futures import ProcessPoolExecutor
from dask.diagnostics import ProgressBar
ProgressBar().register()
from opentargets import OpenTargetsClient
##########################################
#           Ignore Warnings              #
##########################################
import warnings
warnings.filterwarnings("ignore")
##########################################
#       Module Level Dunder Names        #
##########################################
__copyright__ = "Copyrights © 2019 Aly shmahell."
__credits__   = ["Aly Shmahell"]
__version__   = "0.1.1"
__maintainer__= "Aly Shmahell"
__email__     = ["aly.shmahell@gmail.com"]
__status__    = "Alpha"
##########################################
##########################################
##########################################

# nCr

In [2]:
def ncr(n, r):
    """Return the binomial coefficient C(n, r) as an exact integer.

    Args:
        n: Size of the set being chosen from.
        r: Number of elements chosen.

    Returns:
        The number of ``r``-element subsets of an ``n``-element set, or 0
        when ``r`` is negative or larger than ``n``.
    """
    if r < 0 or r > n:
        return 0
    # C(n, r) == C(n, n - r): use the smaller operand to shorten both products.
    r = min(r, n - r)
    numer = reduce(op.mul, range(n, n - r, -1), 1)
    denom = reduce(op.mul, range(1, r + 1), 1)
    # The quotient is always integral; floor division keeps it exact where
    # true division would round through a float for large values.
    return numer // denom

# Downloader

In [3]:
def downloader(url, filepath):
    """Stream ``url`` into ``filepath`` while showing a tqdm progress bar.

    Args:
        url: Address of the resource to fetch.
        filepath: Destination path; opened in binary write mode.

    Raises:
        requests.HTTPError: If the server replies with an error status, so
            that an error page is never saved as if it were the payload.
    """
    block_size = 1024**2  # read the body in 1 MiB pieces
    # Context managers release the connection, the file handle and the
    # progress bar even if the transfer fails part-way through.
    with requests.get(url, stream=True) as r:
        r.raise_for_status()
        total_size = int(r.headers.get('content-length', 0))
        with tqdm(total=total_size, unit='iB', unit_scale=True) as t, \
                open(filepath, 'wb') as f:
            for data in r.iter_content(block_size):
                t.update(len(data))
                f.write(data)
            received = t.n
    # A total of 0 means the server sent no Content-Length, so there is
    # nothing to compare the received byte count against.
    if total_size != 0 and received != total_size:
        print("ERROR, something went wrong")

## Exception Class

In [4]:
class WeirdTargetsException(Exception):
    """
    Exception type that terminates the interpreter as soon as it is built.

    The message is prefixed with the class name and a line number: the line
    of the traceback currently being handled when there is one, otherwise
    the line of the code that constructed the exception.
    """
    # Report the class as living in the builtins module.
    __module__ = Exception.__module__

    def __init__(self, error):
        tb = sys.exc_info()[-1]
        if tb is None:
            # No exception is being handled: fall back to the caller's line.
            line = inspect.currentframe().f_back.f_lineno
        else:
            line = tb.tb_lineno
        message = f"{type(self).__name__} (line {line}): {error}"
        self.args = (message,)
        # Constructing the exception is enough to stop the program;
        # callers do not need to ``raise`` it.
        sys.exit(self)

## Printer Class

In [5]:
class WeirdTargetsPrinter(object):
    """Helpers for tidying multi-line (typically triple-quoted) messages."""

    def pretty(self, x):
        """Replace every line break plus the whitespace run after it with a single line break."""
        indented_break = re.compile(r"\n\s+")
        return indented_break.sub("\n", x)

    def oneliner(self, x):
        """Replace every line break plus any whitespace after it with a single space."""
        any_break = re.compile(r"\n\s*")
        return any_break.sub(" ", x)

## WeirdTargets

In [6]:
class WeirdTargets(WeirdTargetsPrinter):
    """Shared base of the target-analysis classes defined in this file."""

    # Disabled parallelism self-test, kept for reference. It depends on the
    # `pathos.multiprocessing as pmp` import, which is disabled as well.
    #
    # def testParallelism(self):
    #     print("Testing Parallelism:")
    #     print(f"\t No. of Cores Available: {pmp.cpu_count()}")
    #     with pmp.Pool(pmp.cpu_count()) as pool:
    #         PIDS = pool.map(lambda _: f"{os.getpid()}", range(pmp.cpu_count()+1))
    #         print(f"\t No. of Cores Utilized:  {np.unique(PIDS).size}")

    def __init__(self):
        # Zero-length filler that subclasses pad via format specs
        # (e.g. ``{self.empty_string:>18}``) to align their console labels.
        self.empty_string = ""

In [7]:
# Module-level instance of the base class. The parallelism self-test call on
# the next line is disabled by being wrapped in a string literal.
weird_targets = WeirdTargets()
"""weird_targets.testParallelism()"""

'weird_targets.testParallelism()'

## SmallTargets

In [8]:
# DISABLED: the whole SmallTargets class below is wrapped in a triple-quoted
# string, so it is evaluated as a plain string expression and the class is
# never defined. Its __call__ relies on `pmp.Pool`, and the
# `pathos.multiprocessing as pmp` import is itself disabled in this file.
'''class SmallTargets(WeirdTargets):

    def __init__(self, type, id):
        self.type         = type
        self.id           = id
        self.inputs       = None
        self.outputs      = []
        self.elapsed_time = None
        super(SmallTargets, self).__init__()

    def __call__(self):
        if self.type == "disease":
            try:
                self.inputs = OpenTargetsClient().get_associations_for_disease(self.id)
            except:
                raise WeirdTargetsException(f"Incorrect Disease ID: {self.id}")
        if self.type == "target":
            try:
                self.inputs = OpenTargetsClient().get_associations_for_target(self.id)
            except:
                raise WeirdTargetsException(f"Incorrect Target ID: {self.id}")
        if not self.inputs:
            raise WeirdTargetsException(self.oneliner("""The query did not
                                                          return any usefull
                                                          information."""))
        self.elapsed_time = time.time()
        with pmp.Pool(pmp.cpu_count()) as pool:
            overalls           = pool.map(lambda entry: entry['association_score']['overall'], self.inputs)
            squared_overalls   = pool.map(lambda overall: overall**2,                          overalls)
            minimum            = min(overalls)
            maximum            = max(overalls)
            average            = sum(overalls)/len(self.inputs)
            standard_deviation = np.sqrt(
                sum(squared_overalls)/len(self.inputs) - average**2)
            self.outputs = {
                "Maximum"           : maximum,
                "Minimum"           : minimum,
                "Average"           : average,
                "Standard Deviation": standard_deviation
            }
        self.elapsed_time = time.time() - self.elapsed_time

    def __str__(self):
        if not self.outputs:
            raise WeirdTargetsException(self.oneliner("""you need to call 
                                                          the SmallTargets 
                                                          object first."""))
        return self.pretty(f"""Number of Entries :       {len(self.inputs)}\n
                                Elapsed Time      :       {self.elapsed_time} sec\n
                                Maximum           :       {self.outputs['Maximum']}\n
                                Minumum           :       {self.outputs['Minimum']}\n
                                Average           :       {self.outputs['Average']}\n
                                Standard Deviation:       {self.outputs['Standard Deviation']}""")'''

'class SmallTargets(WeirdTargets):\n\n    def __init__(self, type, id):\n        self.type         = type\n        self.id           = id\n        self.inputs       = None\n        self.outputs      = []\n        self.elapsed_time = None\n        super(SmallTargets, self).__init__()\n\n    def __call__(self):\n        if self.type == "disease":\n            try:\n                self.inputs = OpenTargetsClient().get_associations_for_disease(self.id)\n            except:\n                raise WeirdTargetsException(f"Incorrect Disease ID: {self.id}")\n        if self.type == "target":\n            try:\n                self.inputs = OpenTargetsClient().get_associations_for_target(self.id)\n            except:\n                raise WeirdTargetsException(f"Incorrect Target ID: {self.id}")\n        if not self.inputs:\n            raise WeirdTargetsException(self.oneliner("""The query did not\n                                                          return any usefull\n                  

In [9]:
# DISABLED: example SmallTargets run, kept as a string literal because the
# SmallTargets class itself is not defined (it is also inside a string).
"""small_targets = SmallTargets('target', 'ENSG00000157764')
small_targets()
print(small_targets)"""

"small_targets = SmallTargets('target', 'ENSG00000157764')\nsmall_targets()\nprint(small_targets)"

## BigTargets

In [20]:
class BigTargets(WeirdTargets):
    """Serial pipeline: download, extract to HDF5, then count overlapping groups.

    Calling the instance downloads the gzipped JSON-lines dump (if absent),
    extracts the configured fields into an HDF5 table (if absent), groups
    the ``groupnames[1]`` column by ``groupnames[0]`` and counts the pairs
    of groups whose value sets have at least two members in common.

    Args:
        rawdir: Directory holding the downloaded dump.
        rawurl: URL of the dump; its last path segment is the file name.
        keysdict: Maps an output column name to the list of nested JSON keys
            leading to that column's value.
        outdir: Directory of the extracted HDF5 file.
        outfile: HDF5 file name; the text before its first dot is the key.
        groupnames: ``(group-by column, member column)``.
        resultdir: Directory for the optional result memmap.
        resultfile: File name of the optional result memmap.
        saveresults: When true, matching pairs are written to a memmap.
    """

    def __init__(self,
                 rawdir,
                 rawurl,
                 keysdict,
                 outdir,
                 outfile,
                 groupnames,
                 resultdir,
                 resultfile,
                 saveresults=False):
        # makedirs also creates missing parents (e.g. './content'), and the
        # permission bits must be octal: decimal 755 is not rwxr-xr-x.
        for directory in (outdir, resultdir, rawdir):
            os.makedirs(directory, mode=0o755, exist_ok=True)
        super(BigTargets, self).__init__()
        self.rawdir = rawdir
        self.rawfile = rawurl.split('/')[-1]
        self.rawurl = rawurl
        self.infile = os.path.join(self.rawdir, self.rawfile)
        self.keysdict = keysdict
        self.outdir = outdir
        self.outfile = outfile
        self.groupnames = groupnames
        self.resultdir = resultdir
        self.resultfile = resultfile
        self.saveresults = saveresults

    def __download__(self):
        """Fetch the raw dump from ``rawurl`` into ``infile``."""
        downloader(self.rawurl, self.infile)

    def __traversejson__(self, parsed, keys):
        """Follow ``keys`` one level at a time into ``parsed`` and return the value reached."""
        node = parsed
        for key in keys:
            node = node[key]
        return node

    def __parsejson__(self, pyObject, tqdmObject):
        """Parse one JSON line and pick out the fields named in ``keysdict``.

        Advances ``tqdmObject`` by one per line processed.
        """
        tqdmObject.update(1)
        parsed = ujson.loads(pyObject)
        return {
            key: self.__traversejson__(parsed, path)
            for key, path in self.keysdict.items()
        }

    def __json2panda__(self, batch, tqdmObject):
        """Turn a batch of JSON lines into a target/disease/score DataFrame."""
        parsedJSON = (self.__parsejson__(line, tqdmObject) for line in batch)
        return pd.DataFrame.from_records(parsedJSON,
                                         columns=["target",
                                                  "disease",
                                                  "score"])

    def __peek__(self, iterable):
        """Return ``(first item, iterator replaying everything)`` or ``(None, None)`` when exhausted."""
        try:
            first = next(iterable)
        except StopIteration:
            return None, None
        return first, itertools.chain([first], iterable)

    def __persist__(self):
        """Stream the gzipped dump batch by batch and append each batch to the HDF5 table."""
        hdf_path = os.path.join(self.outdir, self.outfile)
        hdf_key = f'{self.outfile.split(".")[0]}'
        # 20,000 lines per GiB of currently available memory, with a floor
        # of one GiB's worth so the batch size can never become zero.
        available_gib = math.floor(psutil.virtual_memory()[1]/(1024**3))
        batch_size = max(1, available_gib)*int(2e+4)
        with tqdm(desc=f"Feature Extraction{self.empty_string:>18}") as tqdmObject:
            with GzipFile(self.infile) as f:
                for batch in partition_all(batch_size, f):
                    df = self.__json2panda__(batch, tqdmObject)
                    df.to_hdf(
                        hdf_path,
                        key=hdf_key,
                        mode='a',
                        format='table',
                        append=True
                    )

    def __process__(self):
        """Count the pairs of groups sharing at least two members.

        Returns:
            ``(results, counter)``: ``counter`` is the number of matching
            pairs; ``results`` is ``None`` unless ``saveresults`` is set, in
            which case it is an ``(n, 2)`` array of ``'1'`` / ``"name, name"``
            rows backed by ``resultfile`` (an empty in-memory array when
            nothing matched).
        """
        with tqdm(desc=f"Mapping{self.empty_string:>18}") as tqdmo:
            def setter(x):
                tqdmo.update(1)
                return set(x.to_numpy())
            groups = self.df.groupby(self.groupnames[0])[self.groupnames[1]].apply(setter)
        array = groups.to_numpy()
        names = list(groups.keys())
        combinations = itertools.combinations(range(len(groups)), 2)
        nck = int(ncr(array.shape[0], 2))
        temp_path = os.path.join(self.resultdir, 'temp.memmap')
        temp = None
        # NOTE(review): '|U30' keeps at most 30 characters per cell, so a
        # joined "name, name" label longer than that is truncated - confirm
        # whether a wider dtype is wanted.
        if self.saveresults and nck > 0:
            # Scratch buffer sized for the worst case (every pair matches).
            temp = np.memmap(
                temp_path,
                dtype='|U30',
                mode='w+',
                shape=(nck, 2)
            )
        counter = 0
        index = 0
        with tqdm(desc=f"Reducing{self.empty_string:>17}", iterable=range(nck)) as tqdmo:
            for combination in combinations:
                tqdmo.update(1)
                common = array[combination[0]] & array[combination[1]]
                if len(common) < 2:
                    continue
                counter += 1
                if self.saveresults:
                    temp[index][0] = 1
                    temp[index][1] = ", ".join([names[combination[0]], names[combination[1]]])
                    index += 1
        if not self.saveresults:
            return None, counter
        if index:
            # Copy only the rows actually filled into the final file.
            results = np.memmap(
                os.path.join(self.resultdir, self.resultfile),
                dtype='|U30',
                mode='w+',
                shape=(index, 2)
            )
            results[:] = temp[:index]
            results.flush()
        else:
            # No matches: skip the zero-length mapping and hand back an
            # empty in-memory array of the same dtype instead.
            results = np.empty((0, 2), dtype='|U30')
        if temp is not None:
            # Drop the mapping before deleting the scratch file behind it.
            del temp
            os.remove(temp_path)
        return results, counter

    def __call__(self):
        """Run download -> extraction -> counting and print the pair count."""
        hdf_path = os.path.join(self.outdir, self.outfile)
        try:
            if not os.path.exists(self.infile):
                self.__download__()
            if not os.path.exists(hdf_path):
                self.__persist__()
            print("Loading HD5")
            self.df = dd.read_hdf(
                hdf_path,
                key=f'{self.outfile.split(".")[0]}'
            ).compute()
            # NOTE(review): only the first 10,000 rows are analysed; this
            # looks like a development limit - confirm before relying on
            # the reported count.
            self.df = self.df.head(10000)
            results, counter = self.__process__()
            print(f"No. Complete Bipartites{self.empty_string:>1}: {counter}")
        except KeyboardInterrupt:
            # The previous handler deleted module globals that happened to
            # share a name with a local; a plain collection is enough here.
            gc.collect()
            # Constructing WeirdTargetsException already exits the process.
            raise WeirdTargetsException("Keyboard Interrupt")

In [21]:
# Build the serial pipeline and run it. Positional arguments, in order:
# rawdir, rawurl, keysdict, outdir, outfile, groupnames, resultdir,
# resultfile. saveresults keeps its default (False), so only the count is
# printed.
big_targets = BigTargets(
    './content/datasets',
    'https://storage.googleapis.com/open-targets-data-releases/17.12/17.12_evidence_data.json.gz',
    {
        "target" : ["target", "id"],
        "disease": ["disease", "id"],
        "score":   ["scores", "association_score"]
    }, 
    './content/results',
    'tds.h5',
    ['target', 'disease'],
    './content/results',
    'result.memmap'
)
big_targets()

Loading HD5
[########################################] | 100% Completed |  9.3s


Mapping                  : 2989it [00:00, 6730.45it/s]
Reducing                 : 100%|██████████| 4465566/4465566 [00:04<00:00, 929839.97it/s] 

No. Complete Bipartites : 29251





In [22]:
class BigTargets2(WeirdTargets):
    """Preparation stage for a parallel pair count.

    Calling the instance downloads and extracts the data the same way the
    serial pipeline does, builds one set of ``groupnames[1]`` values per
    ``groupnames[0]`` group, and splits all index pairs of those groups into
    chunks that can be counted independently.

    Args:
        rawdir: Directory holding the downloaded dump.
        rawurl: URL of the dump; its last path segment is the file name.
        keysdict: Maps an output column name to the list of nested JSON keys
            leading to that column's value.
        outdir: Directory of the extracted HDF5 file.
        outfile: HDF5 file name; the text before its first dot is the key.
        groupnames: ``(group-by column, member column)``.
        resultdir: Directory reserved for results (created if missing).
        resultfile: Result file name (stored, not written by this class).
        saveresults: Accepted for signature parity; stored but unused here.
    """

    def __init__(self,
                 rawdir,
                 rawurl,
                 keysdict,
                 outdir,
                 outfile,
                 groupnames,
                 resultdir,
                 resultfile,
                 saveresults=False):
        # makedirs also creates missing parents (e.g. './content'), and the
        # permission bits must be octal: decimal 755 is not rwxr-xr-x.
        for directory in (outdir, resultdir, rawdir):
            os.makedirs(directory, mode=0o755, exist_ok=True)
        super(BigTargets2, self).__init__()
        self.rawdir = rawdir
        self.rawfile = rawurl.split('/')[-1]
        self.rawurl = rawurl
        self.infile = os.path.join(self.rawdir, self.rawfile)
        self.keysdict = keysdict
        self.outdir = outdir
        self.outfile = outfile
        self.groupnames = groupnames
        self.resultdir = resultdir
        self.resultfile = resultfile
        self.saveresults = saveresults

    def __download__(self):
        """Fetch the raw dump from ``rawurl`` into ``infile``."""
        downloader(self.rawurl, self.infile)

    def __traversejson__(self, parsed, keys):
        """Follow ``keys`` one level at a time into ``parsed`` and return the value reached."""
        node = parsed
        for key in keys:
            node = node[key]
        return node

    def __parsejson__(self, pyObject, tqdmObject):
        """Parse one JSON line and pick out the fields named in ``keysdict``.

        Advances ``tqdmObject`` by one per line processed.
        """
        tqdmObject.update(1)
        parsed = ujson.loads(pyObject)
        return {
            key: self.__traversejson__(parsed, path)
            for key, path in self.keysdict.items()
        }

    def __json2panda__(self, batch, tqdmObject):
        """Turn a batch of JSON lines into a target/disease/score DataFrame."""
        parsedJSON = (self.__parsejson__(line, tqdmObject) for line in batch)
        return pd.DataFrame.from_records(parsedJSON,
                                         columns=["target",
                                                  "disease",
                                                  "score"])

    def __peek__(self, iterable):
        """Return ``(first item, iterator replaying everything)`` or ``(None, None)`` when exhausted."""
        try:
            first = next(iterable)
        except StopIteration:
            return None, None
        return first, itertools.chain([first], iterable)

    def __persist__(self):
        """Stream the gzipped dump batch by batch and append each batch to the HDF5 table."""
        hdf_path = os.path.join(self.outdir, self.outfile)
        hdf_key = f'{self.outfile.split(".")[0]}'
        # 20,000 lines per GiB of currently available memory, with a floor
        # of one GiB's worth so the batch size can never become zero.
        available_gib = math.floor(psutil.virtual_memory()[1]/(1024**3))
        batch_size = max(1, available_gib)*int(2e+4)
        with tqdm(desc=f"Feature Extraction{self.empty_string:>18}") as tqdmObject:
            with GzipFile(self.infile) as f:
                for batch in partition_all(batch_size, f):
                    df = self.__json2panda__(batch, tqdmObject)
                    df.to_hdf(
                        hdf_path,
                        key=hdf_key,
                        mode='a',
                        format='table',
                        append=True
                    )

    def __process__(self):
        """Build the per-group sets and split all index pairs into chunks.

        Populates ``self.array`` (one set per group), ``self.names`` (group
        labels) and ``self.chunks``: a list of dicts with ``"num"`` (chunk
        index), ``"len"`` (number of pairs in the chunk) and ``"val"`` (an
        iterator over that chunk's ``(i, j)`` index pairs).
        """
        with tqdm(desc=f"Mapping{self.empty_string:>18}") as tqdmo:
            def setter(x):
                tqdmo.update(1)
                return set(x.to_numpy())
            groups = self.df.groupby(self.groupnames[0])[self.groupnames[1]].apply(setter)
        self.array = groups.to_numpy()
        self.names = list(groups.keys())
        size = self.array.shape[0]
        self.combinations = itertools.combinations(range(size), 2)
        nck = int(ncr(size, 2))
        self.counter = 0
        self.index = 0
        cpus = 8
        # At least one pair per chunk, so a tiny input cannot yield a zero
        # step; any remainder of the division becomes one extra short chunk.
        base = max(1, nck // cpus)
        self.chunks = []
        for num, start in enumerate(range(0, nck, base)):
            stop = min(start + base, nck)
            self.chunks.append(
                {
                    "num": num,
                    "len": stop - start,
                    # Every chunk slices its own combinations iterator, so
                    # the [start, stop) window is correct no matter where or
                    # in which order the chunks are consumed.
                    "val": itertools.islice(
                        itertools.combinations(range(size), 2),
                        start,
                        stop
                    )
                }
            )
        pprint.pprint(self.chunks)

    def __call__(self):
        """Run download -> extraction -> chunking.

        Returns:
            ``(array, names, chunks)`` as populated by ``__process__``.
        """
        hdf_path = os.path.join(self.outdir, self.outfile)
        try:
            if not os.path.exists(self.infile):
                self.__download__()
            if not os.path.exists(hdf_path):
                self.__persist__()
            print("Loading HD5")
            self.df = dd.read_hdf(
                hdf_path,
                key=f'{self.outfile.split(".")[0]}'
            ).compute()
            # NOTE(review): only the first 10,000 rows are analysed; this
            # looks like a development limit - confirm before relying on
            # the produced chunks.
            self.df = self.df.head(10000)
            self.__process__()
            return self.array, self.names, self.chunks
        except KeyboardInterrupt:
            # The previous handler deleted module globals that happened to
            # share a name with a local; a plain collection is enough here.
            gc.collect()
            # Constructing WeirdTargetsException already exits the process.
            raise WeirdTargetsException("Keyboard Interrupt")

In [None]:
# Build the chunking pipeline and run it. Positional arguments, in order:
# rawdir, rawurl, keysdict, outdir, outfile, groupnames, resultdir,
# resultfile. The call returns the per-group sets, the group labels and the
# list of pair chunks.
big_targets2 = BigTargets2(
    './content/datasets',
    'https://storage.googleapis.com/open-targets-data-releases/17.12/17.12_evidence_data.json.gz',
    {
        "target" : ["target", "id"],
        "disease": ["disease", "id"],
        "score":   ["scores", "association_score"]
    }, 
    './content/results',
    'tds.h5',
    ['target', 'disease'],
    './content/results',
    'result.memmap'
)
array, names, chunks = big_targets2()

Loading HD5
[                                        ] | 0% Completed |  0.9s

In [None]:
class PCBC:
    """Count, across worker processes, the index pairs whose sets overlap in two or more members.

    ``array`` is indexed by the pair members and must hold set-like values;
    ``chunks`` is a list of dicts carrying ``"num"``, ``"len"`` and ``"val"``
    (an iterable of index pairs). ``names`` is stored but not read here.
    """

    def __init__(self, array, names, chunks):
        self.array, self.names, self.chunks = array, names, chunks
        # Running total of matching pairs over all chunks processed so far.
        self.counter = 0

    def __worker__(self, chunk):
        """Return how many pairs of ``chunk`` share at least two members."""
        hits = 0
        label = chunk["num"]
        progress = tqdm(desc=f'pool {label}', total=chunk['len'], leave=True, file=sys.stdout, position=0)
        with progress as tqdmo:
            for pair in chunk['val']:
                shared = self.array[pair[0]] & self.array[pair[1]]
                if len(shared) >= 2:
                    hits += 1
                tqdmo.update(1)
        return hits

    def __parallel__(self):
        """Fan the chunks out to a process pool and add each partial count to ``counter``."""
        with ProcessPoolExecutor() as executor:
            for partial in executor.map(self.__worker__, self.chunks):
                self.counter += partial

    def __call__(self):
        """Run the parallel count and print the accumulated total."""
        self.__parallel__()
        print(self.counter)

In [None]:
# Hand the sets, labels and chunks returned by big_targets2() to the counter.
pcbc = PCBC(array, names, chunks)

In [None]:
# Run the count; the total number of matching pairs is printed.
pcbc()