In [None]:
import os
import cupy as cp
import cudf
import dask_cudf
import dask.delayed
import cugraph
from cugraph import Graph
import cugraph.dask as dask_cugraph
from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster
import cugraph.dask.comms.comms as Comms
from numba import cuda, jit, prange
from math import ceil
import dask.array as da
from numba.cuda.random import create_xoroshiro128p_states, xoroshiro128p_uniform_float32
import numpy as np

# Memory budget used by compute_bounds(): 18 GiB expressed in units of 1e9
# bytes (about 19.33). compute_bounds() divides its size estimate by 1e9 too,
# so both sides of that comparison use the same unit.
GPU_MEM_LIMIT = (18*1024**3) / 1e9

In [None]:
@cuda.jit
def compute_degree_distribution(n, count, percentage):
    """
    Turn per-degree vertex counts into percentages of the vertex total.

    - n                 number of vertices (the denominator)
    - count             input array, one vertex count per distinct degree
    - percentage        output array; element i receives count[i] / n * 100
    """
    idx = cuda.grid(1)

    # Threads launched past the end of the input have nothing to do.
    if idx >= count.size:
        return

    percentage[idx] = (count[idx] / n) * 100

@cuda.jit
def adj_list(nodes, src, dst, undirected, out):
    """
    Fill one row of `out` per node with that node's neighbours.

    One thread handles one entry of `nodes` and scans the whole edge list.
    Every edge whose source is the node appends its destination to the row;
    when `undirected` is truthy, every edge whose destination is the node
    also appends its source. Slots that are never written keep whatever
    value the caller stored there beforehand.

    - nodes             nodes handled by this launch, one per thread
    - src, dst          edge list as two parallel arrays
    - undirected        also follow edges against their direction
    - out               2D output; row i collects the neighbours of nodes[i].
                        No bounds check is done on the column index, so the
                        caller must make the rows wide enough.
    """
    node_idx = cuda.grid(1)
    if node_idx >= len(nodes):
        return

    current = nodes[node_idx]
    slot = 0  # next free column in this node's row
    for e in range(len(src)):
        if src[e] == current:
            out[node_idx, slot] = dst[e]
            slot += 1
        if undirected and dst[e] == current:
            out[node_idx, slot] = src[e]
            slot += 1

@cuda.jit
def reciprocal_count(A, nodes, src, dst, M, N, out):
    """
    Count, per node, the edges that point back from its neighbours.

    Launched on a 2D grid: the y index selects a row of `A` (a node), the
    x index selects a column (one neighbour slot). For a filled slot the
    thread scans the edge list and adds 1 to out[row] for every edge going
    from that neighbour to the row's node.

    - A                 M x N neighbour matrix, -1 marks an empty slot
    - nodes             node id for each row of A
    - src, dst          edge list as two parallel arrays
    - M, N              number of rows / columns of A to process
    - out               per-row counter, updated atomically
    """
    column, row = cuda.grid(2)
    if row >= M or column >= N:
        return

    neighbour = A[row, column]
    if neighbour == -1:
        return

    owner = nodes[row]
    for e in range(len(src)):
        if src[e] == neighbour and dst[e] == owner:
            cuda.atomic.add(out, row, 1)


@cuda.jit
def find_uv_edges(A, src, dst, M, N, undirected, out):
    """
    Count, per node, the edges that connect two of its neighbours.

    Launched on a 2D grid: the y index selects a row of `A`, the x index
    selects one neighbour slot. The thread pairs that neighbour with every
    other distinct neighbour in the same row and adds to out[row] the number
    of edge-list entries going from the first to the second; when
    `undirected` is truthy the reverse direction is added as well.

    - A                 M x N neighbour matrix, -1 marks an empty slot
    - src, dst          edge list as two parallel arrays
    - M, N              number of rows / columns of A to process
    - undirected        also count edges in the reverse direction
    - out               per-row counter, updated atomically
    """
    column, row = cuda.grid(2)
    if row >= M or column >= N:
        return

    first = A[row, column]
    if first == -1:
        return

    for other_col in range(N):
        second = A[row, other_col]
        # Skip empty slots and the neighbour paired with itself.
        if second == -1 or second == first:
            continue
        cuda.atomic.add(out, row, explore_edges(first, second, src, dst))
        if undirected:
            cuda.atomic.add(out, row, explore_edges(second, first, src, dst))


@cuda.jit(device=True)
def explore_edges(u, v, src, dst):
    """
    Device helper: return how many positions k satisfy
    src[k] == u and dst[k] == v (a full scan of the edge list).
    """
    matches = 0
    for k in range(src.size):
        if src[k] == u and dst[k] == v:
            matches += 1

    return matches


@cuda.jit
def lcc(nodes, edges, df_degree, recip, undirected, lcc_array):
    """
    Accumulate the local clustering coefficient of every node of a batch.

    One thread per node. A node with edges[i] > 0 contributes
        edges[i] / (deg * (deg - 1))                      if undirected
        edges[i] / (deg * (deg - 1) - 2 * recip[i])       otherwise
    to lcc_array[0]. Nodes with no neighbour-to-neighbour edges contribute
    nothing.

    - nodes             nodes of the batch (only its length is used)
    - edges             per-node count of edges among neighbours
    - df_degree         per-node degree, positionally aligned with nodes
    - recip             per-node count of reciprocal edges
    - undirected        selects which denominator is used
    - lcc_array         one-element accumulator, updated atomically
    """
    tid = cuda.blockIdx.x * cuda.blockDim.x + cuda.threadIdx.x
    if tid < len(nodes):
        links = edges[tid]
        if links > 0:
            node_deg = df_degree[tid]
            possible = node_deg * (node_deg - 1)
            if not undirected:
                possible -= 2 * recip[tid]
            # A zero or negative denominator would put inf/NaN or a negative
            # value into the shared sum and spoil the average; such nodes are
            # treated as contributing 0 instead.
            if possible > 0:
                # Only non-zero contributions reach the accumulator, which
                # leaves the sum unchanged and avoids needless atomics.
                cuda.atomic.add(lcc_array, 0, links / possible)

@cuda.jit
def gnp_erdos_renyi(epoch, p, rng_states, M, N, matrix):
    """
    Fill rows of an adjacency matrix with independent Bernoulli(p) draws.

    One thread per row: for each of the N columns a uniform float32 in
    [0, 1) is drawn and the cell is set to 1 when the draw is below p.
    Cells that are not selected are left untouched, so the caller should
    pass a zero-initialised matrix.

    - epoch             index of the current batch. Kept for call
                        compatibility but not used to pick the RNG state:
                        indexing with tid * epoch runs past the end of
                        rng_states for epoch > 1 when one state per thread
                        is allocated. To get different draws per batch,
                        create rng_states with a different seed.
    - p                 edge probability
    - rng_states        xoroshiro128p states, at least one per thread
    - M, N              number of rows / columns of matrix to fill
    - matrix            M x N output
    """
    tid = cuda.blockIdx.x * cuda.blockDim.x + cuda.threadIdx.x
    if tid < M:
        for j in range(N):
            # Each thread advances only its own generator state.
            rnd = xoroshiro128p_uniform_float32(rng_states, tid)
            # Strict comparison: draws lie in [0, 1), so p == 0 yields no edges.
            if rnd < p:
                matrix[tid, j] = 1

@cuda.jit
def align(src, const):
    """
    Add `const` to every element of `src`, in place (one thread per element).
    """
    idx = cuda.grid(1)
    if idx >= len(src):
        return

    src[idx] += const

In [None]:
def load_data(input_file):
    """
    Read a header-less, comma-separated edge list into a dask_cudf frame.

    The columns are named 'src', 'dst' and 'wt' (int32, int32, float64).
    Rows repeating an earlier (src, dst) pair are dropped, keeping the
    first, and then rows containing any missing value are dropped.

    - input_file        path of the CSV file; also used to ask cugraph for
                        a chunk size
    """
    edges = dask_cudf.read_csv(
        input_file,
        chunksize=dask_cugraph.get_chunksize(input_file),
        delimiter=',',
        names=['src', 'dst', 'wt'],
        dtype=['int32', 'int32', 'float64'],
    )
    unique_edges = edges.drop_duplicates(subset=['src', 'dst'], keep='first')
    return unique_edges.dropna(how='any')

#TODO build_graph

def nodes(graph):
    """
    Return the graph's vertices, computed, sorted ascending and converted
    with `.to_cupy()`.
    """
    return (
        graph.nodes()
        .compute()
        .sort_values(ascending=True)
        .to_cupy()
    )
    
def number_of_vertices(graph):
    """Return the number of vertices, taken as the length of graph.nodes()."""
    return len(graph.nodes())

def number_of_edges(graph):
    """Return whatever graph.number_of_edges() reports."""
    return graph.number_of_edges()

def degree(graph, mode='tot'):
    """
    Return the per-vertex degree table of `graph`, sorted by vertex.

    - graph             graph exposing degree(), in_degree(), out_degree()
    - mode              'tot' (default), 'in' or 'out'

    The selected table is sorted on its 'vertex' column with a fresh index
    and computed before being returned.

    Raises ValueError for any other mode. The mode is compared for equality:
    a substring test would silently map values such as 'o' or '' to the
    total degree.
    """
    if mode == 'tot':
        df = graph.degree()
    elif mode == 'in':
        df = graph.in_degree()
    elif mode == 'out':
        df = graph.out_degree()
    else:
        raise ValueError(f"mode must be 'tot', 'in' or 'out', got {mode!r}")

    return df.sort_values(by='vertex', ignore_index=True).compute()

def degree_distribution_wrapper(n, df) -> "dask_cudf.DataFrame":
    """
    Build the degree distribution of a graph.

    - n                 number of vertices (denominator of the percentages)
    - df                cudf dataframe with a 'degree' column holding the
                        in/out/total degree of each node

    Returns a single-partition dask_cudf dataframe with one row per distinct
    degree and the columns 'degree', 'count' (vertices with that degree) and
    'percentage' (count / n * 100).
    """
    degree_series = df['degree'].value_counts()
    counts = degree_series.to_cupy()
    # The kernel writes into a buffer owned here, so the result does not
    # depend on Series taken from DataFrame columns being views.
    percentage = cp.zeros(counts.size, dtype='float64')
    compute_degree_distribution.forall(counts.size)(n, counts, percentage)

    df_distribution = cudf.DataFrame({'degree': degree_series.index.to_cupy(),
                                      'count': counts,
                                      'percentage': percentage})
    return dask_cudf.from_cudf(df_distribution, npartitions=1)


def compute_bounds(x, y, bytess) -> int:
    """
    Halve `x` until an x-by-y matrix fits the GPU memory budget.

    - x                 number of rows wanted
    - y                 number of columns
    - bytess            bytes per element

    The size estimate x * y * bytess / 1e9 is compared with GPU_MEM_LIMIT.
    Returns the first halved row count that fits. Halving stops at 1: when
    even a single row exceeds the budget, 1 is returned rather than recursing
    forever on 0, and the allocation itself is left to report the failure.
    """
    while x > 1 and (x * y * bytess) / 1e9 > GPU_MEM_LIMIT:
        x = int(x // 2)
    return x

    
def init_cc(n, batch_size, iteration, mod, N, nodes_cp=None, mode='local'):
    """
    Work out the node range covered by one batch.

    - n                 number of vertices of the graph
    - batch_size        range of nodes examined each epoch
    - iteration         current epoch, counted from 1
    - mod               size of the last, shorter batch (n % batch_size)
    - N, nodes_cp, mode accepted but not used here

    Returns (start, stop, M): the half-open node range of the batch and its
    length. A batch that would reach past n gets length `mod`. The range is
    also printed.
    """
    start = batch_size * (iteration - 1)
    fits_entirely = batch_size * iteration <= n
    M = batch_size if fits_entirely else mod
    stop = start + M

    print(start, stop)
    return start, stop, M
        
def avg_clustering_coefficient(n, src, dst, df_degree, N, nodes_cp=None, undirected=False) -> float:
    """
    Average local clustering coefficient, computed on the GPU in node batches.

    - n                 number of vertices
    - src, dst          edge list as two parallel device arrays
    - df_degree         per-node degree, in the same order as the nodes
                        (position i belongs to the i-th node)
    - N                 width of the neighbour matrix (largest degree)
    - nodes_cp          optional array of node ids; when omitted the nodes
                        are taken to be 0 .. n-1
    - undirected        forwarded to the kernels

    The nodes are split into batches sized by compute_bounds(). For each
    batch the kernels build the neighbour matrix, count reciprocal edges and
    edges among neighbours, and add the per-node coefficients to a single
    accumulator. Returns that sum divided by n, or 0.0 when n is not positive.
    """
    if n <= 0:
        return 0.0

    local_ccs = cp.zeros((1,), dtype='float32')
    # Device view of the degrees, so each batch can be given its own slice.
    degrees = cp.asarray(df_degree)
    batch_size = compute_bounds(n, N, cp.dtype(cp.int32).itemsize)
    epochs = ceil(n / batch_size)
    leftovers = n % batch_size

    threadsperblock = 1024
    threadsperblock_2D = (32, 32)

    for i in range(1, epochs + 1):
        start, stop, M = init_cc(n, batch_size, i, leftovers, N)
        if nodes_cp is not None:
            nodes = nodes_cp[start:stop]
        else:
            nodes = cp.arange(start, stop, 1)
        # lcc indexes the degree array by position inside the batch, so it
        # must see the degrees of this batch only; the full array would give
        # every batch after the first the degrees of the first nodes.
        batch_degrees = degrees[start:stop]

        matrix = cp.empty((M, N), dtype='int32')
        matrix.fill(-1)  # -1 marks an empty neighbour slot
        edgespernode = cp.zeros(M, dtype='int32')
        reciprocal = cp.zeros(M, dtype='int32')

        blockspergrid = (M + (threadsperblock - 1)) // threadsperblock
        # 2D launches: x runs over the N columns, y over the M rows.
        blockspergrid_x = (N + (threadsperblock_2D[0] - 1)) // threadsperblock_2D[0]
        blockspergrid_y = (M + (threadsperblock_2D[1] - 1)) // threadsperblock_2D[1]
        blockspergrid_2D = (blockspergrid_x, blockspergrid_y)

        adj_list[blockspergrid, threadsperblock](nodes, src, dst, undirected, matrix)
        reciprocal_count[blockspergrid_2D, threadsperblock_2D](matrix, nodes, src, dst, M, N, reciprocal)
        find_uv_edges[blockspergrid_2D, threadsperblock_2D](matrix, src, dst, M, N, undirected, edgespernode)
        lcc[blockspergrid, threadsperblock](nodes, edgespernode, batch_degrees, reciprocal, undirected, local_ccs)
        cuda.synchronize()

    return float(local_ccs[0].get()) / n

def random_graph_generator(n, edges) -> cudf.DataFrame:
    """
    Generate the edge list of a random directed G(n, p) graph on the GPU.

    - n                 number of vertices
    - edges             target number of edges; the edge probability is
                        p = edges / (n * (n - 1))

    The n x n adjacency matrix is produced in row batches sized by
    compute_bounds(). Each batch is filled by the gnp_erdos_renyi kernel,
    converted to an edge list through a cugraph Graph, and its source ids are
    shifted by the batch's first row so they refer to the whole graph.

    Returns a cudf dataframe with the columns 'src' and 'dst'; it is empty
    when n < 2, where p is undefined.
    """
    if n < 2:
        return cudf.DataFrame({'src': cp.empty(0, dtype='int32'),
                               'dst': cp.empty(0, dtype='int32')})

    L = Graph(directed=True)
    p = edges / (n * (n - 1))
    N = n
    batch_size = compute_bounds(n, N, cp.dtype(cp.int32).itemsize*4)
    epochs = ceil(n / batch_size)
    leftovers = n % batch_size
    threadsperblock = 1024
    frames = []

    for epoch in range(1, epochs + 1):
        L.clear()
        start, stop, M = init_cc(n, batch_size, epoch, leftovers, N, mode='rndm')
        matrix = cp.zeros((M, N), dtype='int32')
        blockspergrid = (M + (threadsperblock - 1)) // threadsperblock
        # A different seed per batch, otherwise every batch would repeat the
        # same draws.
        rng_states = create_xoroshiro128p_states(threadsperblock * blockspergrid,
                                                 seed=42 + epoch)
        # The kernel takes six arguments. The first is passed as 1 so each
        # thread uses the RNG state at its own index, which stays inside
        # rng_states; the per-batch variation comes from the seed above.
        gnp_erdos_renyi[blockspergrid, threadsperblock](1, p, rng_states, M, N, matrix)

        a_matrix = cp.asnumpy(matrix)
        del matrix
        L.from_numpy_array(a_matrix)
        del a_matrix

        df_l = L.view_edge_list()
        # Row indices are local to the batch; shift them to global vertex ids.
        # Only the two endpoint columns are kept.
        frames.append(cudf.DataFrame({'src': df_l['src'] + start,
                                      'dst': df_l['dst']}))

    # One concatenation at the end instead of growing a frame inside the loop.
    return cudf.concat(frames, ignore_index=True)

In [None]:
# Start a local Dask CUDA cluster, connect a client to it and initialise the
# cugraph Dask comms with p2p enabled. The last two cells of the notebook
# (Comms.destroy() and client.shutdown()) tear this down again.
cluster = LocalCUDACluster()
client = Client(cluster)
Comms.initialize(p2p=True)

In [None]:
# Display the Dask client as the cell output.
client

In [None]:
# Edge list of the network; the path is relative to the notebook's working
# directory. load_data() returns a dask_cudf frame with the columns
# 'src', 'dst' and 'wt'.
input_file = '../graphs/ethereum/2020-01-01_2020-01-01/network.csv'
ddf = load_data(input_file)

In [None]:
# Directed graph over the whole edge list, with 'wt' as the edge attribute.
G = Graph(directed=True)
G.from_dask_cudf_edgelist(ddf, source='src', destination='dst', edge_attr='wt')

In [None]:
# Basic statistics of the full graph. The three degree tables are returned
# by degree() already computed and sorted by vertex.
vertex_number = number_of_vertices(G)
edges_number = number_of_edges(G)
tot_degrees = degree(G, 'tot')
in_degrees = degree(G, 'in')
out_degrees = degree(G, 'out')



In [None]:
# Total / in / out degree distributions of the full graph.
df_tot = degree_distribution_wrapper(vertex_number, tot_degrees)
df_in = degree_distribution_wrapper(vertex_number, in_degrees)
df_out = degree_distribution_wrapper(vertex_number, out_degrees)

In [None]:
# Inputs of the clustering-coefficient computation for the full graph:
# N is the largest total degree (passed on as the width of the per-node
# neighbour matrix); src / dst are the edge-list columns, computed from the
# Dask frame.
N = tot_degrees['degree'].max()
src = ddf['src'].compute()
dst = ddf['dst'].compute()

In [None]:
# Scatter the edge columns and the total degrees to the cluster; the result
# holds one future per list element, in the same order.
big_future = client.scatter([src, dst, tot_degrees['degree']])

In [None]:
# Run the average clustering coefficient on the cluster with the scattered
# inputs (src, dst, degrees) and wait for the result.
avg_cc = client.submit(avg_clustering_coefficient, vertex_number, big_future[0],
                       big_future[1], big_future[2], N, undirected=False)
avg_cc.result()

In [None]:
# Weakly connected components of the full graph. The most frequent label
# (the mode of the 'labels' column) identifies the component to keep.
df_components = dask_cugraph.weakly_connected_components(G)
target_label = df_components['labels'].mode()[0].compute()
target_label = target_label.iloc[0]
# Vertices carrying that label, then the edges whose source is one of them.
df_nodes = df_components[df_components['labels'] == target_label].compute()
ddf_mc = ddf.loc[ddf['src'].isin(df_nodes['vertex'])]

In [None]:
# Directed graph restricted to the edges selected for the main component.
G_mc = Graph(directed=True)
G_mc.from_dask_cudf_edgelist(ddf_mc, source='src', destination='dst', edge_attr='wt')

In [None]:
# Same statistics as for the full graph, now for the main component, plus
# its sorted vertex ids (used later as nodes_cp).
vertex_number_mc = number_of_vertices(G_mc)
edges_number_mc = number_of_edges(G_mc)
tot_degrees_mc = degree(G_mc, 'tot')
in_degrees_mc = degree(G_mc, 'in')
out_degrees_mc = degree(G_mc, 'out')
vertices_mc = nodes(G_mc)

In [None]:
# Total / in / out degree distributions of the main component. They must be
# built from the *_mc statistics; passing the full-graph variables would
# just reproduce the distributions of the whole graph.
df_tot_mc = degree_distribution_wrapper(vertex_number_mc, tot_degrees_mc)
df_in_mc = degree_distribution_wrapper(vertex_number_mc, in_degrees_mc)
df_out_mc = degree_distribution_wrapper(vertex_number_mc, out_degrees_mc)

In [None]:
# Inputs of the clustering-coefficient computation for the main component.
# NOTE: this rebinds N, which until here held the full graph's largest degree.
N = tot_degrees_mc['degree'].max()
src_mc = ddf_mc['src'].compute()
dst_mc = ddf_mc['dst'].compute()

In [None]:
# Scatter the main-component inputs. NOTE: this rebinds big_future, which
# until here referred to the full-graph futures.
big_future = client.scatter([src_mc, dst_mc, tot_degrees_mc['degree']])

In [None]:
# Average clustering coefficient of the main component. The vertex ids are
# passed explicitly through nodes_cp instead of letting the function assume
# the ids 0 .. n-1.
avg_cc_mc = client.submit(avg_clustering_coefficient, vertex_number_mc, big_future[0],
                       big_future[1], big_future[2], N, nodes_cp=vertices_mc, undirected=False)
avg_cc_mc.result()

In [None]:
# Generate, on the cluster, a random graph with the vertex and edge counts of
# the full graph, and wait for its edge list.
edges_rnd = client.submit(random_graph_generator, vertex_number, edges_number)
edges_rnd.result()

In [None]:
# Tear down the cugraph comms set up by Comms.initialize() earlier.
Comms.destroy()

In [None]:
# Shut down the Dask client created at the top of the notebook.
client.shutdown()