In [1]:
import time
from math import sqrt
from pyspark import SQLContext
from DataSpliter import DataSpliter
from KruskalMST_UnionFind import kruskalReducer
from PinkMST import PinkMST
from Point import Point
from graph_show import graph_show
import pyhdfs

In [2]:
def getRightId(partId):
    return int (sqrt((partId << 3) + 1) + 1) >> 1 # >> 和 <<都是位运算，对二进制制数进行移位操作。

def getLeftId(partId,rightId):
    return partId - (((rightId - 1) * rightId) >> 1)

def openFile(fileName):
    client = pyhdfs.HdfsClient(hosts="127.0.0.1,9000")
    re = client.open(fileName)
    data = re.read()
    data = data.split(b'\n')
    pointsPair=[]
    for i in data:
        i = str(i,encoding='utf-8')
        if(len(i.strip())==0): continue
        info=i
        pointId = int(info.split(":")[0])
        ts = info.split(":")[1].replace("[","").replace("]","")
        coords = [float(i) for i in ts.split(",")]
        pointsPair.append(Point(pointId,coords))
    return pointsPair

def getSubGraphPair(partitionId,numDataSplits,inputDataFilesLoc):
    numBipartiteSubgraphs = numDataSplits * (numDataSplits - 1) / 2;
    if(partitionId < numBipartiteSubgraphs):
        rightId = getRightId(partitionId)
        leftId = getLeftId(partitionId,rightId)
    else:
        leftId = partitionId - numBipartiteSubgraphs
        rightId = leftId
    leftFileName = "%s/part-%05d" % (inputDataFilesLoc,leftId)
    rightFileName = "%s/part-%05d" % (inputDataFilesLoc,rightId)
    pointsLeft = openFile(leftFileName)
    pointsRight = None
    if (leftFileName!=rightFileName):
        pointsRight=openFile(rightFileName)
    print("sub Id:",leftId," - ",rightId)
    return (pointsLeft,pointsRight)

def GetPartitionFunction(line,inputDataFilesLoc=None,numDataSplits=None):
    partionId = int(line) 
    print("partitionId:",partionId)
    subGraphsPair = getSubGraphPair(partionId,numDataSplits,inputDataFilesLoc)
    pinkMst = PinkMST(subGraphsPair,partionId)
    edgeList = pinkMst.getEdgeList()
    return edgeList

def SetPartitionIdFunction(line,K):
    key = line[0]//K           
    return (key,line[1])

def CreateCombiner(edge):
    edgeList=[]
    edgeList.append(edge)
    return edgeList

def Merger(list,edge):
    mergeList = list
    mergeList.append(edge)
    return mergeList

def displayResults(mstToBeMerged):
    print("result mst count :",len(mstToBeMerged))
    for j in mstToBeMerged:
        print(j.getLeft(),",",j.getRight(),",",j.getWeight())

In [4]:
# main process
hadoop_master ='hdfs://127.0.0.1:9000'

K = 4
numDataSplits = 4 # split data into 4 parts
idPartitionFilesLoc='/test/idPartition'
dataParitionFilesLoc='/test/output'
# filePath = "hdfs://127.0.0.1:9000/test/data.txt"
filePath = "hdfs://127.0.0.1:9000/test/data/clust1k_5d.txt"


start = time.time()
numSubGraphs = numDataSplits * (numDataSplits - 1) / 2 + numDataSplits # 子图的数目等于10
splitter = DataSpliter(inputFileName=filePath, numSplits=numDataSplits, outputDir=dataParitionFilesLoc)
splitter.createPartitionFiles(idPartitionFilesLoc, numPartitions=numSubGraphs) #建立分区文件
splitter.writeSequenceFile()

sc = splitter.getSparkConf()
sc.addPyFile('PinkMST.py')
sc.addPyFile('Edge.py')
sc.addPyFile('Point.py')
sqlContext = SQLContext(sc)

# 读取分区数据
partitionRdd = sc.textFile(name=hadoop_master+idPartitionFilesLoc,minPartitions = int(numSubGraphs))
# print("partitionRdd count: ", partitionRdd.count())
print("partion created")

# local MST
partitions = partitionRdd.flatMap(f=lambda x:GetPartitionFunction(line=x,
                                        inputDataFilesLoc=dataParitionFilesLoc,numDataSplits=numDataSplits))
# print("edge count: ",partitions.count())
print("local mst created")

mstToBeMerged = partitions.combineByKey(createCombiner=CreateCombiner, mergeValue=Merger, 
                                        mergeCombiners=lambda x:kruskalReducer)
# print("mstToBeMerged count: ", mstToBeMerged.count())
print("mst to be merged created")

while(numSubGraphs > 1):
    numSubGraphs = (numSubGraphs + (K - 1)) // K
    mstToBeMergedResult = mstToBeMerged.map(lambda x:SetPartitionIdFunction(x,K)).reduceByKey(kruskalReducer, int(numSubGraphs))
    mstToBeMerged = mstToBeMergedResult
    mstToBeMerged.cache()

mstToBeMerged=mstToBeMerged.collect()[0][1]

displayResults(mstToBeMerged)
end = time.time()
print("PinkTotalTime=" + str(end - start))

graph_show(mstToBeMerged)

data number:  1000
0:[1.25, 2.5299999713897705, 0.23000000417232513, 1.8799999952316284, -0.25]
1:[-2.609999895095825, -0.5400000214576721, -0.14000000059604645, -2.509999990463257, -1.0]
2:[-0.46000000834465027, 0.3199999928474426, 1.3700000047683716, 1.190000057220459, 0.15000000596046448]
3:[-0.09000000357627869, 0.07000000029802322, 0.6399999856948853, -2.069999933242798, 3.630000114440918]
4:[1.4299999475479126, -1.9500000476837158, 2.200000047683716, -0.9200000166893005, -0.17000000178813934]
partion created
local mst created
mst to be merged created
result mst count : 999
619 , 253 , 0.2133072868810325
197 , 729 , 0.22847306790020577
744 , 774 , 0.2291287263797915
71 , 84 , 0.25099806542811365
525 , 35 , 0.26589474599962676
665 , 657 , 0.2731299972513506
314 , 379 , 0.27477257134904026
982 , 95 , 0.2769476404763494
984 , 50 , 0.2924038042523119
317 , 674 , 0.30577772899661926
236 , 881 , 0.3075711312248656
506 , 396 , 0.3198438731624066
497 , 796 , 0.32155875490195296
637 , 546 

175 , 498 , 0.6939020177604499
82 , 726 , 0.6943342453561795
447 , 614 , 0.6948380640147725
684 , 727 , 0.6957010914271188
305 , 779 , 0.6957729237930165
588 , 389 , 0.6962040407530222
996 , 676 , 0.6978538826178708
624 , 99 , 0.6987847101874731
985 , 73 , 0.6990708219237927
16 , 745 , 0.6991422222049823
117 , 89 , 0.6995712428390644
883 , 691 , 0.6995713083378513
928 , 401 , 0.6996427389290589
813 , 308 , 0.7002856324747551
71 , 19 , 0.7003570783488272
537 , 850 , 0.700428464174813
908 , 456 , 0.7010705908088884
582 , 524 , 0.7027090441987375
893 , 674 , 0.70342027962552
748 , 775 , 0.7044856192185222
886 , 220 , 0.7045566047172103
127 , 913 , 0.7049822965894988
598 , 765 , 0.7056911866407131
614 , 267 , 0.7061161324194489
760 , 211 , 0.7061869393020785
610 , 750 , 0.708731250163819
293 , 730 , 0.7095067442001697
430 , 336 , 0.7098591157751976
942 , 302 , 0.710140832882637
829 , 540 , 0.7116178847840386
463 , 277 , 0.7122499381912653
956 , 305 , 0.7123903338627307
170 , 106 , 0.712881

747 , 57 , 0.9618731995801847
303 , 948 , 0.9620810327533217
808 , 807 , 0.9626006514433155
233 , 155 , 0.9637427035845954
777 , 938 , 0.9639502273794792
649 , 487 , 0.9641058371245808
625 , 796 , 0.9647279113483326
842 , 844 , 0.9663849762014434
587 , 573 , 0.9664367734295073
48 , 548 , 0.9666435997648328
846 , 770 , 0.9681425295203893
63 , 22 , 0.9690201766672432
202 , 974 , 0.9703092644433374
192 , 677 , 0.9703607463329281
954 , 194 , 0.9706698441447489
667 , 139 , 0.9706698594961631
483 , 977 , 0.9710302813559007
764 , 147 , 0.9723168918952902
689 , 298 , 0.9729337993144032
546 , 817 , 0.9733961684390899
177 , 855 , 0.975858598990831
23 , 471 , 0.976831623654715
607 , 433 , 0.9771386131206057
814 , 109 , 0.9788258159621512
534 , 400 , 0.9804081181719474
337 , 478 , 0.9810707674981392
18 , 730 , 0.9823950245761954
9 , 480 , 0.9833106723507432
87 , 925 , 0.9840731847043189
397 , 242 , 0.9846319404600572
780 , 274 , 0.985494769954831
930 , 261 , 0.9857991005416342
58 , 737 , 0.9864582

                      {'name': '253', 'symbolSize': 10},
                      {'name': '197', 'symbolSize': 10},
                      {'name': '729', 'symbolSize': 10},
                      {'name': '744', 'symbolSize': 10},
                      {'name': '774', 'symbolSize': 10},
                      {'name': '71', 'symbolSize': 10},
                      {'name': '84', 'symbolSize': 10},
                      {'name': '525', 'symbolSize': 10},
                      {'name': '35', 'symbolSize': 10},
                      {'name': '665', 'symbolSize': 10},
                      {'name': '657', 'symbolSize': 10},
                      {'name': '314', 'symbolSize': 10},
                      {'name': '379', 'symbolSize': 10},
                      {'name': '982', 'symbolSize': 10},
                      {'name': '95', 'symbolSize': 10},
                      {'name': '984', 'symbolSize': 10},
                      {'name': '50', 'symbolSize': 10},
                      {'name': '317'

             'links': [{'source': '619',
                        'target': '253',
                        'value': 0.2133072868810325},
                       {'source': '197',
                        'target': '729',
                        'value': 0.22847306790020577},
                       {'source': '744',
                        'target': '774',
                        'value': 0.2291287263797915},
                       {'source': '71',
                        'target': '84',
                        'value': 0.25099806542811365},
                       {'source': '525',
                        'target': '35',
                        'value': 0.26589474599962676},
                       {'source': '665',
                        'target': '657',
                        'value': 0.2731299972513506},
                       {'source': '314',
                        'target': '379',
                        'value': 0.27477257134904026},
                       {'source': '982',
      

                        'target': '223',
                        'value': 0.6350590768642463},
                       {'source': '219',
                        'target': '517',
                        'value': 0.6351377770997574},
                       {'source': '591',
                        'target': '114',
                        'value': 0.6352164540960874},
                       {'source': '787',
                        'target': '77',
                        'value': 0.6352164969076607},
                       {'source': '847',
                        'target': '419',
                        'value': 0.6367102599531658},
                       {'source': '954',
                        'target': '452',
                        'value': 0.6410927781998134},
                       {'source': '217',
                        'target': '413',
                        'value': 0.6424172787071097},
                       {'source': '742',
                        'target': '453',
        

                        'value': 0.7424284494007225},
                       {'source': '260',
                        'target': '387',
                        'value': 0.74263044851217},
                       {'source': '40',
                        'target': '412',
                        'value': 0.742899734452463},
                       {'source': '619',
                        'target': '437',
                        'value': 0.7439085649351457},
                       {'source': '767',
                        'target': '524',
                        'value': 0.7439086182173417},
                       {'source': '677',
                        'target': '72',
                        'value': 0.7471947445312865},
                       {'source': '697',
                        'target': '844',
                        'value': 0.7491996337751109},
                       {'source': '665',
                        'target': '695',
                        'value': 0.7496665951834264},

                        'value': 0.8841379723437933},
                       {'source': '843',
                        'target': '381',
                        'value': 0.8849858600219641},
                       {'source': '951',
                        'target': '846',
                        'value': 0.8860021322044089},
                       {'source': '51',
                        'target': '57',
                        'value': 0.8871864898907087},
                       {'source': '113',
                        'target': '647',
                        'value': 0.8875809745115026},
                       {'source': '826',
                        'target': '584',
                        'value': 0.8884255409039662},
                       {'source': '149',
                        'target': '124',
                        'value': 0.8884819052270058},
                       {'source': '836',
                        'target': '963',
                        'value': 0.888763085737104

                       {'source': '303',
                        'target': '948',
                        'value': 0.9620810327533217},
                       {'source': '808',
                        'target': '807',
                        'value': 0.9626006514433155},
                       {'source': '233',
                        'target': '155',
                        'value': 0.9637427035845954},
                       {'source': '777',
                        'target': '938',
                        'value': 0.9639502273794792},
                       {'source': '649',
                        'target': '487',
                        'value': 0.9641058371245808},
                       {'source': '625',
                        'target': '796',
                        'value': 0.9647279113483326},
                       {'source': '842',
                        'target': '844',
                        'value': 0.9663849762014434},
                       {'source': '587',
       

                        'target': '449',
                        'value': 1.1776247576080816},
                       {'source': '569',
                        'target': '514',
                        'value': 1.182285823378505},
                       {'source': '54',
                        'target': '532',
                        'value': 1.1840607992695167},
                       {'source': '322',
                        'target': '124',
                        'value': 1.1842296818582148},
                       {'source': '782',
                        'target': '311',
                        'value': 1.185537834120445},
                       {'source': '765',
                        'target': '611',
                        'value': 1.1867182123812192},
                       {'source': '391',
                        'target': '142',
                        'value': 1.1885285395043088},
                       {'source': '606',
                        'target': '527',
          

In [5]:
binwu.wb@alibaba-inc.com

NameError: name 'binwu' is not defined