In [1]:
import gc
import time

import numpy as np
import torch
import dgl

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Label-propagation partitioning of the training nodes:
#   1. every training node starts with its own node id as its label
#      (all other nodes start at -1, i.e. unlabelled);
#   2. the labels are then propagated along the edges between src and dst;
#   3. after propagation each training node has a final label, and the
#      resulting label clusters are merged into a fixed number of parts.

GRAPH_PATH = "/raid/bear/data/raw/papers100M/graph1.bin"
TRAIN_IDS_PATH = "/raid/bear/data/raw/papers100M/trainIds.bin"

graph = torch.as_tensor(np.fromfile(GRAPH_PATH, dtype=np.int32))
trainids = torch.as_tensor(np.fromfile(TRAIN_IDS_PATH, dtype=np.int64))
# The edge file is a flat, interleaved sequence: src0, dst0, src1, dst1, ...
assert graph.numel() % 2 == 0, "edge file must hold an even number of node ids"
src = graph[::2].cuda()
dst = graph[1::2].cuda()

nodeNUM = 111059956  # node count of the papers100M graph loaded above
assert trainids.numel() == 0 or (
    int(trainids.min()) >= 0 and int(trainids.max()) < nodeNUM
), "train node id out of range"
# Single int32 allocation pre-filled with -1 ("unlabelled"); avoids building
# an intermediate float32 tensor and two extra nodeNUM-sized copies.
nodeLabel = torch.full((nodeNUM,), -1, dtype=torch.int32)
nodeLabel[trainids] = trainids.to(torch.int32)
nodeLabel = nodeLabel.cuda()

In [3]:
# Label propagation over the edge list (custom dgl build). nodeLabel is not
# reassigned here, yet the following cells read the final labels from it, so
# the call presumably updates nodeLabel in place — NOTE(review): confirm
# against the dgl.lpGraph implementation (the number of rounds is not visible
# from this notebook).
dgl.lpGraph(src,dst,nodeLabel)

In [4]:
# Free GPU memory: move the edge and label tensors back to the host, then
# release the cached CUDA blocks. gc.collect() runs first so that CUDA tensors
# kept alive only by reference cycles are freed before empty_cache() runs.
# Ending on empty_cache() (returns None) also keeps gc.collect()'s count from
# being echoed as cell output.
src = src.cpu()
dst = dst.cpu()
nodeLabel = nodeLabel.cpu()
gc.collect()
torch.cuda.empty_cache()

37

In [5]:
# Extract the training set label
trainLabel = nodeLabel[trainids.to(torch.int64)]

# Clustering by label (quantity varies)
labelCluster = torch.bincount(trainLabel)
sortLabelsNUM,labelIdx = torch.sort(labelCluster,descending=True)

In [6]:
# The sizes are sorted in descending order, so the non-empty clusters form a
# prefix; keep just that prefix in both parallel tensors.
bound = int((sortLabelsNUM > 0).sum())
sortLabelsNUM = sortLabelsNUM[:bound]
labelIdx = labelIdx[:bound]

In [7]:
def findMinPart(partInfo):
    """Return ``(value, index)`` of the smallest entry of ``partInfo`` along dim 0.

    Thin wrapper over ``partInfo.min(dim=0)``; both results are tensors
    (0-dim for the 1-D per-part size table used in this notebook).
    """
    smallest = partInfo.min(dim=0)
    return smallest.values, smallest.indices

def findMaxPart(partInfo):
    """Return ``(value, index)`` of the largest entry of ``partInfo`` along dim 0.

    Thin wrapper over ``partInfo.max(dim=0)``; both results are tensors
    (0-dim for the 1-D per-part size table used in this notebook).
    """
    largest = partInfo.max(dim=0)
    return largest.values, largest.indices

def changeInfo(partInfo, changeIdx, changeValue, acc=True):
    """Update slot ``changeIdx`` of ``partInfo`` in place; returns None.

    With ``acc=True`` (default) ``changeValue`` is added to the slot;
    with ``acc=False`` the slot is overwritten with ``changeValue``.
    """
    if not acc:
        partInfo[changeIdx] = changeValue
        return
    partInfo[changeIdx] += changeValue

In [15]:
# Peek at the 30 largest label-cluster sizes.
sortLabelsNUM[:30]

tensor([3463, 2107, 1766, 1362, 1100, 1068,  964,  904,  813,  755,  751,  713,
         702,  637,  629,  629,  624,  604,  599,  598,  579,  565,  546,  542,
         540,  532,  531,  521,  515,  510])

In [11]:
# Label ids ordered by cluster size, largest cluster first.
labelIdx

tensor([   135837,    334735,    145726,  ...,  79864979, 105840503,
          5145424])

In [17]:
# Alternative packing (sequential fill): visit the label clusters from largest
# to smallest and keep adding them to the current part until its size reaches
# PART_CAPACITY, then move on to the next part.
partNUM = 8
# NOTE(review): 1210000 presumably approximates the number of training nodes,
# so this is an even split across the parts — confirm.
PART_CAPACITY = int(1210000 / partNUM)
partSums = [0] * partNUM        # running size of each part (plain ints: cheap per step)
assignedParts = []              # part id chosen for each cluster, in labelIdx order
ids = 0
for count in sortLabelsNUM.tolist():
    # Advance when the current part is full, but never past the last part:
    # the last part absorbs any overflow instead of indexing beyond the table.
    if partSums[ids] >= PART_CAPACITY and ids < partNUM - 1:
        ids += 1
    assignedParts.append(ids)
    partSums[ids] += count
partInfo = torch.tensor(partSums, dtype=torch.int64)
label2part = torch.zeros(torch.max(labelIdx).item()+1, dtype=torch.int64)
label2part[labelIdx] = torch.tensor(assignedParts, dtype=torch.int64)

In [8]:
# Merge the label clusters into partNUM parts greedily: visit clusters from
# largest to smallest and put each one into the currently least-loaded part.
# Sizes are tracked as Python ints, which is much cheaper per step than
# reducing and indexing small tensors inside the loop.
partNUM = 8
partSums = [0] * partNUM
assignedParts = []
for count in sortLabelsNUM.tolist():
    # min() returns the first minimal index, i.e. ties go to the lowest part id
    # (the same tie rule as torch.min(..., dim=0)).
    target = min(range(partNUM), key=partSums.__getitem__)
    assignedParts.append(target)
    partSums[target] += count
partInfo = torch.tensor(partSums, dtype=torch.int64)
label2part = torch.zeros(torch.max(labelIdx).item()+1, dtype=torch.int64)
label2part[labelIdx] = torch.tensor(assignedParts, dtype=torch.int64)

# Every label cluster now maps to a part id in [0, partNUM-1]; look up the
# part of each training node through its final label.
trainIdsInPart = label2part[trainLabel.to(torch.int64)]

# Intended next step: nodeInfo[trainids] = trainIdsInPart.
# NOTE(review): nodeInfo and nodeValue, which PRgenG consumes later, are not
# constructed anywhere visible in this notebook. The later
# `nodeInfo & (1 << i)` check suggests a per-part bitmask
# (e.g. 1 << trainIdsInPart) — TODO confirm and build them here.

In [None]:
print(f"Has allocated {torch.cuda.memory_allocated() / (1024 ** 3)} GB CUDA Memory")

# Streaming limits. MAXEDGE bounds how many edges PRgenG stages on the GPU per
# batch; MAXSHUFFLE is not referenced in this cell.
MAXEDGE, MAXSHUFFLE = 800_000_000, 30_000_000

def PRgenG(src,dst,nodeInfo,nodeValue,iters=3):
    """Run ``iters`` rounds of ``dgl.per_pagerank`` over a streamed edge list.

    The edges are moved to the GPU one chunk at a time (roughly ``MAXEDGE``
    edges per chunk). Within a round every chunk starts from the same
    ``nodeValue`` / ``nodeInfo``. The per-chunk results are merged afterwards:
    ``nodeValue`` deltas are summed into an int32 accumulator, and
    ``nodeInfo`` results are OR-ed together.

    Args:
        src, dst: edge endpoint tensors of equal length (the caller passes the
            host copies; chunks are copied to the GPU on demand).
        nodeInfo: per-node integer tensor, presumably on the CPU (it is OR-ed
            with CPU copies); the caller reads it as a bitmask.
        nodeValue: per-node integer tensor, presumably on the CPU.
        iters: number of rounds (default 3).

    Returns:
        ``nodeInfo`` after the last round (int32 once at least one round ran).

    Requires CUDA plus the custom ``dgl.sumDegree`` / ``dgl.per_pagerank`` ops,
    and the module-level ``nodeNUM`` and ``MAXEDGE``.
    """
    print("test genG...")
    # Collect first so CUDA tensors held only by reference cycles are freed
    # before empty_cache() returns the cached blocks.
    gc.collect()
    torch.cuda.empty_cache()

    print(f"Has allocated {torch.cuda.memory_allocated() / (1024 ** 3)} GB CUDA Memory")

    # Stream the edge list: numBatches chunks of at most ~MAXEDGE edges each.
    numBatches = len(src) // MAXEDGE + 1
    src_batches = torch.chunk(src, numBatches, dim=0)
    dst_batches = torch.chunk(dst, numBatches, dim=0)

    # Degree tables, allocated directly on the GPU.
    inNodeTable = torch.zeros(nodeNUM, dtype=torch.int32, device="cuda")
    outNodeTable = torch.zeros(nodeNUM, dtype=torch.int32, device="cuda")
    for src_batch, dst_batch in zip(src_batches, dst_batches):
        inNodeTable, outNodeTable = dgl.sumDegree(
            inNodeTable, outNodeTable, src_batch.cuda(), dst_batch.cuda())
    # Only inNodeTable is consumed below; release the out-degree table.
    del outNodeTable

    # Per-edge scratch buffer required by per_pagerank. It is sized for the
    # first (largest) chunk and reused for every batch. Its contents are not
    # kept, because this function only returns nodeInfo.
    tmp_etable = torch.zeros_like(dst_batches[0], dtype=torch.int32, device="cuda")
    for _ in range(iters):
        acc_nodeValue = torch.zeros_like(nodeValue, dtype=torch.int32)
        acc_nodeInfo = torch.zeros_like(nodeInfo, dtype=torch.int32)
        for src_batch, dst_batch in zip(src_batches, dst_batches):
            # Fresh GPU copies per chunk; the kernel's results are read back from them.
            tmp_nodeValue = nodeValue.to("cuda", copy=True)
            tmp_nodeInfo = nodeInfo.to("cuda", copy=True)
            tmp_etable.fill_(0)
            # NOTE(review): edges are passed as (dst, src) here but as (src, dst)
            # to sumDegree above — confirm the reversal is intended.
            dgl.per_pagerank(dst_batch.cuda(), src_batch.cuda(), tmp_etable,
                             inNodeTable, tmp_nodeValue, tmp_nodeInfo)
            acc_nodeValue += tmp_nodeValue.cpu() - nodeValue   # sum of per-chunk deltas
            acc_nodeInfo = acc_nodeInfo | tmp_nodeInfo.cpu()   # union of per-chunk bits
        nodeValue = nodeValue + acc_nodeValue
        nodeInfo = acc_nodeInfo
        tmp_nodeValue = tmp_nodeInfo = None
    del inNodeTable, tmp_etable

    return nodeInfo

# NOTE(review): nodeInfo and nodeValue are not defined by any earlier cell
# visible in this notebook, so this raises NameError on a fresh kernel.
# TODO build them (e.g. from trainIdsInPart) before calling PRgenG.
nodeInfo = PRgenG(src, dst, nodeInfo, nodeValue)

# nodeInfo is read as a bitmask (bit i presumably = membership in part i);
# report how many nodes carry each part's bit.
for i in range(partNUM):
    nodeIndex = (nodeInfo & (1 << i)) != 0
    nid = torch.nonzero(nodeIndex).reshape(-1).to(torch.int32)
    print(f"{i}: {nid.shape}")
