In [13]:
from multiprocessing import Process,Queue
import time
# from multiprocessing import JoinableQueue as Queue
# import multiprocessing.JoinableQueue as Queue
import os
import sys

# Initial distance of every HyperNode; aggregate() also uses it as the upper bound of a minimum.
INF = 1e9
# Id of the source vertex (presumably where the search starts; Master.run seeds vertex 1 by default).
SourceNode = 1
# Dataset sub-directory under ./test_data/ that Master.load_data reads from.
data_file = "wiki"
# data_file = "location"
class HyperNode:
    """A hypergraph vertex: its id, its current distance and its incident hyperedges."""

    def __init__(self, n_id, edges):
        # The distance starts at INF until a smaller value is written to it.
        self.n_id, self.dist, self.edges = n_id, INF, edges

    def AddEdge(self, edge):
        """Append `edge` to this node's hyperedge list (in place)."""
        self.edges.append(edge)

    def GetHyperEdges(self):
        """Return the list object that stores this node's hyperedges."""
        return self.edges

    def GetValue(self):
        """Return the distance currently stored on the node."""
        return self.dist
    
                    
class HyperEdge:
    """A hyperedge: its id and the nodes it connects."""

    def __init__(self, e_id, nodes):
        self.e_id, self.nodes = e_id, nodes

    def AddNode(self, node):
        """Append `node` to the member list (in place)."""
        self.nodes.append(node)

    def GetHyperNodes(self):
        """Return the list object that stores the member nodes."""
        return self.nodes

    def getValue(self):
        """Return the weight of the hyperedge, which is always 1."""
        unit_weight = 1
        return unit_weight

def aggregate(msg):
    """Collapse (target, value) pairs to one pair per target holding the smallest value.

    Values are also capped at INF. Targets keep the order of their first
    appearance in `msg`.
    """
    smallest = {}
    for target, candidate in msg:
        smallest[target] = min(smallest.get(target, INF), candidate)
    return list(smallest.items())

In [14]:
class Master:
    def __init__(self,p):
        self.p = p
        self.Node = {}
        self.Edge = {}
        self.Part = {}
        self.Worker = []
        
        self.thread_pool = []
        self.m2sPipe = []
        self.s2mPipe = []
        self.recvBuf = [Queue() for i in range(p)]
        
        self.result = {}
        for i in range(p) :
            self.Worker.append(Worker(i))
            self.m2sPipe.append(Queue())
            self.s2mPipe.append(Queue())
            
        for worker in self.Worker:
            w_id = worker.w_id
            self.thread_pool.append(Process(target= worker.run,args=(self.m2sPipe[w_id],self.s2mPipe[w_id],self.recvBuf)))

    def terminal(self):
        for pipe in self.m2sPipe:
            pipe.close()
        for pipe in self.s2mPipe:
            pipe.close()
        for buf in self.recvBuf:
            buf.close()
        for thread in self.thread_pool:
            thread.join()
            
    def load_data(self,filename = "partition_method.txt"):
        print("dataset:",data_file," partition method:"+filename);
        with open("./test_data/"+data_file+"/"+filename) as file :  # (n_id,p_id)
            for line in file:
                v_id,par_id = line[0:-1].split(" ")
                v_id,par_id = int(v_id),int(par_id)
                self.Part[v_id] = par_id
                self.Node[v_id] = []
                    
        with open("./test_data/"+data_file+"/vertex_info.txt") as file:  # v_id, e1_id, e2_id, ...
            for line in file:
                data = line[0:-1].split(" ")
                v_id = int(data[0])
                self.Node[v_id] = [int(i) for i in data[1:]]
                    
        with open("./test_data/"+data_file+"/edge_info.txt") as file:   # e_id, v1_id, v2_id,...
            for line in file:
                data = line[0:-1].split(" ")
                e_id = int(data[0])
                self.Edge[e_id] = [int(i) for i in data[1:]]

    def sync(self):
        return [sign.get()  for sign in self.s2mPipe]
    
    def broadcast(self,signal=0):
#         sys.stdout.write("signal "+str(signal)+"\n")
        for m2sPipe in self.m2sPipe:
                m2sPipe.put(signal)
            
    def distribute_node(self):
        for n_id,p_id in self.Part.items():
            self.m2sPipe[p_id].put((n_id,self.Node[n_id]))
        for m2sPipe in self.m2sPipe:
            m2sPipe.put((-1,[-1]))

        self.sync()
#         self.Worker[w_id].Node[n_id] = self.Node[n_id]
    
    def distribute_edge(self):
        for e_id,node in self.Edge.items():
            check = {}
            for n_id in node :
                p_id = self.Part[n_id]
                if len(check) == self.p : break
                if p_id not in check:
                    self.m2sPipe[p_id].put((e_id,node))
                    check[p_id] = 1
        for m2sPipe in self.m2sPipe:
            m2sPipe.put((-1,[-1]))
            
        self.sync()
    
    def distribute_part(self):
        for worker in self.Worker:
            for item in self.Part.items():
                self.m2sPipe[worker.w_id].put(item)
            self.m2sPipe[worker.w_id].put((-1,-1))
            
            
        self.sync()
        
    def distribute(self):
        self.distribute_part()
        self.distribute_node()
        self.distribute_edge()
        
    def get_result(self):
        cnt = 0

        for pipe in self.s2mPipe: 
            while True:
                v_id,dist  = pipe.get()
#                 sys.stdout.write("update dist:"+ str(v_id)+" "+str(dist))
                if v_id == -1: 
                    break
                self.result[v_id] = dist
#         sys.stdout.write("get result !\n")
        
        
        

    def run(self):
        for thread in self.thread_pool:
            thread.start()
        sys.stdout.write("All thread start"+"\n")
            
        self.distribute()
        sys.stdout.write("init data distribute over"+"\n")
        
        msg_num = 1
        turn = 0
        
        source_id = 1 # init status
        self.recvBuf[self.Part[source_id]].put((source_id,0,0))
        for buf in self.recvBuf:
            for i in range(1,self.p+1):
#                 print(-i)
                buf.put((-i,-i,-i))
#         sys.stdout.write("master edge num:"+str(len(self.Node[source_id])))
        
#         communicate_beg = time.time()
        while msg_num != 0:   
            turn += 1
            print("---------------\n","turn:",turn)
#             sys.stdout.write("beign turn:"+str(turn)+"\n")

            communicate_beg = time.time()
            self.broadcast(0) # begin recevie buffer 
            msg = self.sync() # wait finish recevie buffer
            print('communicate time:%s(ms)' % (int((time.time() - communicate_beg)*100000)/100))
#             sys.stdout.write("receive buffer info ok"+"\n")
            
            compute_beg = time.time()
            self.broadcast(0) # begin compute
            msg = self.sync() # wait finish compute
            print('compute time:%s(ms)' % (int((time.time() - compute_beg)*100000)/100))
            
#             sys.stdout.write("compute ok"+"\n")
            msg_num = sum([item[0] for item in msg])
            cro_num = sum([item[1] for item in msg])
#             while True:
#                 if sum([buf.qsize() for buf in self.recvBuf]) >= cro_num:
#                     break
                
            sys.stdout.write("total msg:"+str(msg_num)+" cross_msg:"+str(cro_num)+"\n")
                
            self.get_result()
    
#             dic = {}
#             for key in range(5): 
#                 dic[key] = 0
#             for key,value in self.result.items():
#                 if value not in dic :
#                     dic[value] = 0
#                 dic[value] += 1
#             for key in range(5):
#                 print(key,":",dic[key])
        
#         sys.stdout.write("terminal "+"\n")
        self.broadcast(-1) # off worker
#         self.sync()
#         sys.stdout.write("wokers shut down"+"\n")
            
        self.get_result()
        self.terminal()
        
        sys.stdout.write("finished"+"\n")
        return 
      
    def n2n_single_metric(self):
        cnt = 0
        for n_id,edges in self.Node.items():
            nei = set()
            for e_id in edges:
                for node in self.Edge[e_id]:
                    if self.Part[n_id] != self.Part[node]:
                        cnt += 1
        return cnt
    
    def n2n_agg_metric(self):
        cnt = 0
        for n_id,edges in self.Node.items():
            nei = set()
            for e_id in edges:
                for node in self.Edge[e_id]:
                    nei.add(node)
            for node in nei:
                if self.Part[n_id] != self.Part[node]:
                    cnt += 1
        return cnt
          
class Worker:
    def __init__(self,w_id):
        self.w_id = w_id
        self.Node = {}
        self.Edge = {}
        self.Part = {}
        self.msg = {}
        self.sendBuf = []
        self.recvBuf = []
        filepath = "./pipeline/"+str(self.w_id)
        open(filepath,"w")
        self.turn = 0
        
    def cal_node(self,node):
        msgs = self.msg[node.n_id]
        mindist = INF 
        mindist = min(msgs)
        sendMessage = []
        if mindist < node.dist : 
            node.dist = mindist
            
            for e_id in node.GetHyperEdges():
#                 print("debug:",self.w_id," ",e_id)
                hyperEdge = self.Edge[e_id]
                for n_id in hyperEdge.GetHyperNodes():
                    sendMessage.append((n_id,mindist+hyperEdge.getValue()))
#                     SendMessageTo(self.v_id,target,mindist+hyperEdge.getValue())
        return sendMessage
    
    def receive_node(self,m2sPipe,s2mPipe):
        while True:
            v_id,edge = m2sPipe.get()
            
            if v_id == -1 : break
            if v_id in self.Node : 
                continue
            self.Node[v_id] = HyperNode(v_id,edge)

        s2mPipe.put(self.w_id)

        
    def receive_edge(self,m2sPipe,s2mPipe):
        while True:
            e_id,node = m2sPipe.get()
            if e_id == -1 : break
            if e_id in self.Edge : 
                continue
            self.Edge[e_id] = HyperEdge(e_id,node)
        s2mPipe.put(self.w_id)
#         print(self.w_id," edge_num:",len(self.Edge))
        
        
    def receive_part(self,m2sPipe,s2mPipe):
        while True:
            v_id,p_id = m2sPipe.get()
            if v_id == -1 : break
            self.Part[v_id] = p_id
#             if len(self.Part)%10000 == 0 :
#                 print(self.w_id," ",len(self.Part))
        s2mPipe.put(self.w_id)
            

    def run(self,m2sPipe,s2mPipe,sendBuf):
        self.receive_part(m2sPipe,s2mPipe)
        self.receive_node(m2sPipe,s2mPipe)
        self.receive_edge(m2sPipe,s2mPipe)
        
        while True:
            self.turn += 1
            sig = m2sPipe.get()
#             sys.stdout.write("w_id:"+str(self.w_id)+" sign:"+str(sig)+"\n")
            if sig == -1 : 
                break
            self.receiveBuf(s2mPipe,sendBuf)
            
            m2sPipe.get()
            self.compute(s2mPipe,sendBuf) 
            
#         sys.stdout.write("w_id:"+str(self.w_id)+" test "+str(len(self.Node))+"\n")
            for node in self.Node.values():
                s2mPipe.put((node.n_id,node.dist))
#         sys.stdout.write("w_id:"+str(self.w_id)+" send result"+"\n")
            s2mPipe.put((-1,-1))
    
        for node in self.Node.values():
            s2mPipe.put((node.n_id,node.dist))
        s2mPipe.put((-1,-1))
        return 
        
    def receiveBuf(self,s2mPipe,Buffer):
        self.msg = {}
        cnt = 0
        while True:
            tar,value,turn = Buffer[self.w_id].get()
#             Buffer[self.w_id].task_done()
            if tar < 0 :
                cnt += 1
                if cnt == len(Buffer) : break
                else : continue
                
            if tar not in self.msg :
                self.msg[tar] = []
            self.msg[tar].append(value)
            
        for tar,value,turn in self.recvBuf:
            if tar not in self.msg :
                self.msg[tar] = []
            self.msg[tar].append(value)
        self.recvBuf = []
        
        
        s2mPipe.put(self.w_id)
        
    def compute(self,s2mPipe,Buffer):
        msg_cnt = 0
        cross_cnt = 0

        for n_id,node in self.Node.items():
            if n_id not in self.msg : continue
            msg = self.cal_node(node) # compute and get msg info
            msg_cnt += len(msg)
            msg = aggregate(msg)
            for tar,value in msg:  # msg transfer judge
                if self.Part[tar] == self.w_id:
                    self.recvBuf.append((tar,value,self.turn))
                else : 
                    cross_cnt += 1
                    Buffer[self.Part[tar]].put((tar,value,self.turn))

        for i in range(len(Buffer)):
             Buffer[i].put((-self.w_id-1,-self.w_id-1,-self.w_id-1))
        s2mPipe.put((msg_cnt,cross_cnt))
        

# Build a 4-worker master, load the "NA.txt" partition of `data_file` and run
# until a round produces no messages. Later cells reuse `master`.
# NOTE(review): the saved output below reports "partition method:MinMax.txt",
# so it was not produced by this exact cell state. Re-run before relying on it.
master = Master(4)
master.load_data("NA.txt")
# master.load_data("MinMax.txt")
master.run()


dataset: wiki  partition method:MinMax.txt
All thread start
init data distribute over
---------------
 turn: 1
communicate time:0.35(ms)
compute time:6.25(ms)
total msg:1239 cross_msg:650
---------------
 turn: 2
communicate time:1.68(ms)
compute time:1044.55(ms)
total msg:5797283 cross_msg:1520405
---------------
 turn: 3
communicate time:8018.2(ms)
compute time:2860.63(ms)
total msg:15231954 cross_msg:5039889
---------------
 turn: 4
communicate time:25841.19(ms)
compute time:17.23(ms)
total msg:6 cross_msg:2
---------------
 turn: 5
communicate time:3.3(ms)
compute time:1.57(ms)
total msg:0 cross_msg:0
finished


In [7]:
# Cross-partition counts for the partition loaded into `master`:
# "single" counts a neighbour once per shared hyperedge, while the second line
# counts each distinct neighbour once per vertex.
# NOTE(review): this cell needs `master` from the driver cell; its execution
# count (7) is lower than the defining cells', so the notebook was run out of order.
print("single metric:",master.n2n_single_metric())
print("aggree metric:",master.n2n_agg_metric())

single metric: 15311060
aggree metric: 6560946


In [17]:
dataset: wiki  partition method:MinMax.txt
All thread start
init data distribute over
---------------
 turn: 1
communicate time:0.35(ms)
compute time:6.25(ms)
total msg:1239 cross_msg:650
---------------
 turn: 2
communicate time:1.68(ms)
compute time:1044.55(ms)
total msg:5797283 cross_msg:1520405
---------------
 turn: 3
communicate time:8018.2(ms)
compute time:2860.63(ms)
total msg:15231954 cross_msg:5039889
---------------
 turn: 4
communicate time:25841.19(ms)
compute time:17.23(ms)
total msg:6 cross_msg:2
---------------
 turn: 5
communicate time:3.3(ms)
compute time:1.57(ms)
total msg:0 cross_msg:0
finished


0 : 1
1 : 804
2 : 3758
3 : 1
4 : 0


In [8]:
# NOTE(review): none of these four counts is produced anywhere in the visible notebook.
# Presumably two-part message totals for two partition methods; TODO name them and record their source.
(4221013+11089060)/(3424582+10024705)

1.1383557358839915

In [9]:
# 15311060 equals the "single metric" value printed earlier in this notebook.
# NOTE(review): the denominator 13450290 is not produced by any visible cell.
# Presumably the same metric for another partition method; TODO confirm and record.
15311060/13450290

1.1383442290091887