In [1]:
# PageRank parameters, passed straight through to dcg.pagerank(...)
alpha, max_iter, tol = (
    0.85,    # alpha: damping factor
    100,     # max_iter: cap on the number of iterations
    1.0e-5,  # tol: convergence tolerance
)

In [2]:
%store -r data_dir
%store -r filename
input_data_path = (data_dir + filename).rstrip('.gz')

In [3]:
import time
from dask.distributed import Client, wait
import dask_cudf
import cugraph
from dask_cuda import LocalCUDACluster
import cugraph.dask as dcg
import cugraph.comms as Comms

To override cuGraph's default `get_chunksize()` function, define a replacement and assign it to `dcg.get_chunksize` before the data-loading cell calls it:
```python
def _get_chunksize(input_path):
    """
    Calculate the appropriate chunksize for dask_cudf.read_csv
    to get a number of partitions equal to the number of GPUs.
    Examples
    --------
    >>> import dask_cugraph.pagerank as dcg
    >>> chunksize = dcg.get_chunksize(edge_list.csv)
    """

    import os
    from glob import glob
    import math

    input_files = sorted(glob(str(input_path)))
    if len(input_files) == 1:
        size = os.path.getsize(input_files[0])
        chunksize = math.ceil(size/get_n_workers())
    else:
        size = [os.path.getsize(_file) for _file in input_files]
        chunksize = max(size)
    return chunksize

# Install the override; the name must match the function defined above
# (`_get_chunksize`, not `_get_chunk_size`).
dcg.get_chunksize = _get_chunksize
```

In [4]:
# Start a local dask-cuda cluster of GPU workers, each running a single thread.
# NOTE(review): LocalCUDACluster presumably starts one worker per visible GPU
# by default — confirm against the dask-cuda docs.
cluster = LocalCUDACluster(threads_per_worker=1)
# Connect a Dask client to the cluster; later cells use `client` to compute/wait.
client = Client(cluster)
# Initialize cuGraph's multi-GPU comms with peer-to-peer enabled; presumably
# required by the cugraph.dask algorithms used below (dcg.pagerank).
Comms.initialize(p2p=True)

In [5]:
# dcg.get_chunksize() picks a read_csv chunk size that yields one partition per
# GPU (for a single input file). Dividing it splits the edge list into roughly
# `partitions_per_gpu` partitions per GPU; adjust based on hardware configuration.
partitions_per_gpu = 2
chunksize = dcg.get_chunksize(input_data_path)

# Start timer (perf_counter is monotonic, unlike time.time)
t_start = time.perf_counter()

# Multi-GPU CSV reader: space-delimited, two int32 columns (src, dst)
e_list = dask_cudf.read_csv(input_data_path,
                            chunksize=chunksize // partitions_per_gpu,
                            delimiter=' ',
                            names=['src', 'dst'],
                            dtype=['int32', 'int32'])

G = cugraph.DiGraph()
G.from_dask_cudf_edgelist(e_list, source='src', destination='dst')

# Wait for the lazy reader so the timing covers the actual CSV read.
# The returned futures are not kept: holding a reference to them would pin the
# computed partitions in worker memory for the rest of the session.
wait(client.compute(e_list.to_delayed()))

# Print time
print(time.perf_counter() - t_start, "s")

25.131707429885864 s


In [6]:
# Start timer (perf_counter is monotonic, unlike time.time)
t_start = time.perf_counter()

# Get the pagerank scores
# https://github.com/rapidsai/cugraph/blob/branch-21.12/python/cugraph/cugraph/dask/link_analysis/pagerank.py
pr_ddf = dcg.pagerank(G, alpha=alpha, max_iter=max_iter, tol=tol)
# Observed: the iterations needed to converge depend on the read chunk size,
# i.e. on how many partitions the edge list was split into:
#   max_iter=20 fails to converge when chunksize is set to chunksize // 2
#   max_iter=40 fails to converge when chunksize is set to chunksize // 4

# Print time
print(time.perf_counter() - t_start, "s")

2.9101381301879883 s


In [7]:
# Start timer (perf_counter is monotonic, unlike time.time)
t_start = time.perf_counter()

# Dask DataFrame -> single cuDF DataFrame (gathers all partitions)
pr_df = pr_ddf.compute()

# Sort, descending order
pr_sorted_df = pr_df.sort_values('pagerank', ascending=False)

# Print time
print(time.perf_counter() - t_start, "s")

# Top 3 vertices by PageRank score, shown via the cell's rich display
pr_sorted_df.head(3)

1.1274964809417725 s
         pagerank    vertex
42325    0.000536  23933989
9532570  0.000491  23933986
9519519  0.000391  23934048


In [8]:
# Tear down in reverse order of setup: cuGraph comms first, then the Dask
# client, then the cluster itself.
Comms.destroy()
client.close()
cluster.close()