In [1]:
import ray

ray.init()

Waiting for redis server at 127.0.0.1:25208 to respond...
Waiting for redis server at 127.0.0.1:61913 to respond...
Starting local scheduler with 8 CPUs, 0 GPUs

View the web UI at http://localhost:8889/notebooks/ray_ui581.ipynb?token=7e39268a68ff53e6b05e2b0067efaff12795ff35c1841e55



{'local_scheduler_socket_names': ['/tmp/scheduler18059451'],
 'node_ip_address': '127.0.0.1',
 'object_store_addresses': [ObjectStoreAddress(name='/tmp/plasma_store93849666', manager_name='/tmp/plasma_manager17217449', manager_port=14465)],
 'redis_address': '127.0.0.1:25208',
 'webui_url': 'http://localhost:8889/notebooks/ray_ui581.ipynb?token=7e39268a68ff53e6b05e2b0067efaff12795ff35c1841e55'}

In [2]:
# store access to nodes by their global coordinate
class MasterStore:
    referenceStore = {"dna":{}, "rna":{}, "individuals":{}}
    
    def addGraph(self, datatype):
        referenceStore[datatype] = {}
    
    def updateRef(self, datatype, key, value):
        self.referenceStore[datatype][key] = value
        
    def getRef(self, datatype, key):
        return self.referenceStore[datatype][key]
    
    def refExists(self, datatype, key):
        return datatype in self.referenceStore and key in self.referenceStore[datatype]
            
# all communication to adjacent nodes goes through the master store
masterStore = MasterStore()

# a generic node.
class Node:
    def __init__(self, data, datatype, neighbors = set()):
        self.data = data
        self.datatype = datatype
        self.neighbors = neighbors
        self.interGraphLinks = {}
        
    def addNeighbor(self, newNeighbor):
        self.neighbors.add(newNeighbor)
        
    def dropNeighbor(self, oldNeighbor):
        self.neighbors.remove(oldNeighbor)

    def addInterGraphLink(self, datatype, key):
        if datatype in self.interGraphLinks:
            if key not in self.interGraphLinks[datatype]:
                self.interGraphLinks[datatype].append(key)
        else:
            self.interGraphLinks[datatype] = [key]
    
    def merge(self, otherNode):
        for i in otherNode.neighbors:
            self.addNeighbor(i)
        
        for key, value in otherNode.interGraphLinks.items():
            for link in value:
                self.addInterGraphLink(key, link)


In [3]:
# sample reference genome
referenceGenome = "CAGTCCTAGCTACGCTCTATCCTCTCAGAGGACCGATCGATATACGCGTGAAACTAGTGCACTAGACTCGAACTGA"

# sample test data for DNA operations
dnaTestData = [{"individualID":0, "dnaData":
                [{"coordinateStart":7.1, "coordinateStop":8.0, "variantAllele": "C"},
                 {"coordinateStart":12.2, "coordinateStop":13.0, "variantAllele": "T"},
                 {"coordinateStart":26.2222, "coordinateStop":27.0, "variantAllele": "TTTT"}]},
               {"individualID":1, "dnaData":
                [{"coordinateStart":7.2, "coordinateStop":8.0, "variantAllele": "G"},
                 {"coordinateStart":12.2, "coordinateStop":13.0, "variantAllele": "T"}]}]

# individual IDs
individuals = {0: {"Name":"John Doe", "Gender":"M"}, 1:{"Name":"Jane Doe", "Gender":"M"}}

def buildIndividualsGraph(individuals, masterStore):
    for indivID, data in individuals.items():
        node = Node(data, "individuals")
        masterStore.updateRef("individuals", indivID, node)

buildIndividualsGraph(individuals, masterStore)

In [4]:
def buildDnaGraph(referenceGenome, dnaTestData, masterStore):
    # start building the graph
    for i in range(len(referenceGenome)):
        # store the coordinates of neighboring nodes
        neighbors = set(filter(lambda x: x >= 0.0, 
                               [float(i-1), float(i+1)]))
        # create a new node
        node = Node(referenceGenome[i], "dna", neighbors)

        # store a link to the object in the masterStore
        masterStore.updateRef("dna", float(i), node)

    for indiv in dnaTestData:
        for variant in indiv["dnaData"]:
            # store the coordinates of neighboring nodes
            neighbors = set(filter(lambda x: x >= 0.0, 
                                   [float(int(variant["coordinateStart"] - 1)), 
                                    variant["coordinateStop"]]))

            # create a new node for the individual data
            node = Node(variant["variantAllele"], 
                        "dna",
                        neighbors)

            node.addInterGraphLink("individuals", indiv["individualID"])
            
            if(masterStore.refExists("dna", variant["coordinateStart"])):
                node.merge(masterStore.getRef("dna", variant["coordinateStart"]))

            for neighbor in neighbors:
                if(masterStore.refExists("dna", neighbor)):
                    tempNode = masterStore.getRef("dna", neighbor)
                    tempNode.addNeighbor(variant["coordinateStart"])
                    
            indivNode = masterStore.getRef("individuals", indiv["individualID"])
            indivNode.addInterGraphLink("dna", variant["coordinateStart"])
            
            masterStore.updateRef("dna", variant["coordinateStart"], node)

# build the graph
buildDnaGraph(referenceGenome, dnaTestData, masterStore)

In [5]:
print(masterStore.referenceStore)

{'dna': {0.0: <__main__.Node object at 0x1037bad30>, 1.0: <__main__.Node object at 0x1037bac50>, 2.0: <__main__.Node object at 0x1037ba860>, 3.0: <__main__.Node object at 0x1037ba6d8>, 4.0: <__main__.Node object at 0x1037bab00>, 5.0: <__main__.Node object at 0x1037bac18>, 6.0: <__main__.Node object at 0x1037babe0>, 7.0: <__main__.Node object at 0x1037ba7b8>, 8.0: <__main__.Node object at 0x1037ba9b0>, 9.0: <__main__.Node object at 0x1037ba400>, 10.0: <__main__.Node object at 0x1037bacc0>, 11.0: <__main__.Node object at 0x1037baba8>, 12.0: <__main__.Node object at 0x1037ba9e8>, 13.0: <__main__.Node object at 0x1037ba5f8>, 14.0: <__main__.Node object at 0x1037ba080>, 15.0: <__main__.Node object at 0x1037ba240>, 16.0: <__main__.Node object at 0x1037ba208>, 17.0: <__main__.Node object at 0x1037ba518>, 18.0: <__main__.Node object at 0x1037ba320>, 19.0: <__main__.Node object at 0x1037bae10>, 20.0: <__main__.Node object at 0x1037ba470>, 21.0: <__main__.Node object at 0x1037ba128>, 22.0: <__ma

In [6]:
def bfs(graphName, startNodeID):
    q = [startNodeID]
    visited = []

    while(q and masterStore.refExists(graphName, q[0])):
        currentKey = q[0]
        q.remove(currentKey)
        visited.append(currentKey)
        
        node = masterStore.getRef(graphName, currentKey)
        print(str(currentKey) + "\t" + str(node.data) + "\t" + str(node.interGraphLinks))
        
        for i in node.neighbors:
            if i not in visited and i not in q:
                q.append(i)

# traverse our new graph to look at
bfs("dna", 0.0)

0.0	C	{}
1.0	A	{}
2.0	G	{}
3.0	T	{}
4.0	C	{}
5.0	C	{}
6.0	T	{}
7.2	G	{'individuals': [1]}
7.1	C	{'individuals': [0]}
7.0	A	{}
8.0	G	{}
9.0	C	{}
10.0	T	{}
11.0	A	{}
12.0	C	{}
12.2	T	{'individuals': [1, 0]}
13.0	G	{}
14.0	C	{}
15.0	T	{}
16.0	C	{}
17.0	T	{}
18.0	A	{}
19.0	T	{}
20.0	C	{}
21.0	C	{}
22.0	T	{}
23.0	C	{}
24.0	T	{}
25.0	C	{}
26.0	A	{}
26.2222	TTTT	{'individuals': [0]}
27.0	G	{}
28.0	A	{}
29.0	G	{}
30.0	G	{}
31.0	A	{}
32.0	C	{}
33.0	C	{}
34.0	G	{}
35.0	A	{}
36.0	T	{}
37.0	C	{}
38.0	G	{}
39.0	A	{}
40.0	T	{}
41.0	A	{}
42.0	T	{}
43.0	A	{}
44.0	C	{}
45.0	G	{}
46.0	C	{}
47.0	G	{}
48.0	T	{}
49.0	G	{}
50.0	A	{}
51.0	A	{}
52.0	A	{}
53.0	C	{}
54.0	T	{}
55.0	A	{}
56.0	G	{}
57.0	T	{}
58.0	G	{}
59.0	C	{}
60.0	A	{}
61.0	C	{}
62.0	T	{}
63.0	A	{}
64.0	G	{}
65.0	A	{}
66.0	C	{}
67.0	T	{}
68.0	C	{}
69.0	G	{}
70.0	A	{}
71.0	A	{}
72.0	C	{}
73.0	T	{}
74.0	G	{}
75.0	A	{}


In [7]:
print(masterStore.referenceStore["individuals"][0].interGraphLinks)
print(masterStore.referenceStore["individuals"][1].interGraphLinks)

{'dna': [7.1, 12.2, 26.2222]}
{'dna': [7.2, 12.2]}


In [8]:
def moveToRay(masterStore):
    for graph in masterStore.referenceStore:
        for node in masterStore.referenceStore[graph]:
            oid = ray.put(masterStore.referenceStore[graph][node])
            masterStore.referenceStore[graph][node] = oid

In [9]:
print(masterStore.referenceStore["individuals"][0])
print(masterStore.referenceStore["individuals"][1])

print(ray.get(masterStore.referenceStore["individuals"][0]).interGraphLinks)
print(ray.get(masterStore.referenceStore["individuals"][1]).interGraphLinks)

<__main__.Node object at 0x1037c85f8>
<__main__.Node object at 0x1037c8a58>


Exception: Attempting to call `get` on the value <__main__.Node object at 0x1037c85f8>, which is not an ObjectID.

In [38]:
import ray.local_scheduler as local_scheduler

print(isinstance(oid, local_scheduler.ObjectID))

True


In [15]:
import pyarrow
import pyarrow.parquet as pq

ModuleNotFoundError: No module named 'pyarrow._parquet'