In [2]:
from dask.distributed import Client, LocalCluster

# Start a local Dask cluster, leaving worker count and resources to LocalCluster's defaults.
cluster = LocalCluster()
# Connect a client to that cluster; later cells use it to persist and cancel
# collections and to ship data to the workers.
client = Client(cluster)
# Bare expression so the notebook renders the client/cluster summary below.
client

0,1
Client  Scheduler: tcp://127.0.0.1:56829  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 4  Cores: 8  Memory: 8.44 GB


In [3]:
import dask.dataframe as dd
import datetime
import glob

def v(x):
    """Parse a raw CSV field as an integer and divide it by 1e18."""
    return int(x) / 1e18


def g(x):
    """Parse a raw CSV field as an integer and divide it by 1e9."""
    return int(x) / 1e9


def t(x):
    """Parse a raw CSV field as Unix seconds and return a naive datetime.

    datetime.fromtimestamp() is called without a tz argument, so the result is
    expressed in the local timezone of the machine doing the parsing.
    """
    return datetime.datetime.fromtimestamp(int(x))


# Concatenating all of the CSV files together:
# https://stackoverflow.com/questions/20906474/import-multiple-csv-files-into-pandas-and-concatenate-into-one-dataframe

files = [
    'C:/Data_Files/transactions/eth_txs-201901_0.csv',
    'C:/Data_Files/transactions/eth_txs-201901_1.csv',
    'C:/Data_Files/transactions/eth_txs-201901_2.csv',
    # Further shards, currently disabled:
    # 'C:/Data_Files/transactions/eth_txs-201901_3.csv',
    # 'C:/Data_Files/transactions/eth_txs-201901_4.csv',
    # 'C:/Data_Files/transactions/eth_txs-201901_5.csv',
    # 'C:/Data_Files/transactions/eth_txs-201901_6.csv',
    # 'C:/Data_Files/transactions/eth_txs-201901_7.csv',
    # 'C:/Data_Files/transactions/eth_txs-201901_8.csv',
]

# One lazy dask dataframe per file, restricted to the columns used downstream.
# NOTE(review): `gas` goes through the same 1e18 scaling as `value`, while
# `gas_price` uses 1e9 -- confirm that the `gas` scaling is intended.
data = [
    dd.read_csv(
        path,
        usecols=[
            'hash', 'nonce', 'block_hash', 'block_number', 'transaction_index',
            'from_address', 'to_address', 'value', 'gas', 'gas_price',
            'block_timestamp',
        ],
        converters={
            'value': v,
            'gas_price': g,
            'gas': v,
            'block_timestamp': t,
        },
    )
    for path in files
]

# Stack the per-file frames row-wise, discarding the per-file indexes.
df = dd.concat(data, axis=0, ignore_index=True)

# distribute the partitions of the dataset across the cluster
df = client.persist(df)

In [4]:
# Per-address aggregation of transaction counts and value statistics.
def _address_stats(frame, key, column_names):
    """Group `frame` by `key` and return count/min/max/mean/std per group.

    The resulting columns are renamed to `column_names` (count first, then the
    four `value` statistics) and the index is renamed to 'address'.
    """
    stats = frame.groupby([key]).agg({
        'hash': ['count'],
        'value': ['min', 'max', 'mean', 'std'],
    })
    stats.columns = column_names
    stats.index.name = 'address'
    return stats


# NOTE(review): rows grouped by `from_address` are labelled `in_*` and rows
# grouped by `to_address` are labelled `out_*` -- confirm this direction is intended.
a = _address_stats(
    df, 'from_address',
    ['in_count', 'in_min', 'in_max', 'in_mean', 'in_std'],
)
b = _address_stats(
    df, 'to_address',
    ['out_count', 'out_min', 'out_max', 'out_mean', 'out_std'],
)

# The outer join keeps addresses that appear on only one side.
features = a.merge(b, how='outer', left_index=True, right_index=True)
features['in_count'] = features['in_count'].astype(float)
features['out_count'] = features['out_count'].astype(float)

# Missing statistics are filled with -1e-18
# (presumably a "no transactions" marker -- TODO confirm).
features = features.fillna(-1e-18)
# persist the features in cluster memory
features = client.persist(features)

In [5]:
# Aggregated adjacency list: one row per distinct (from_address, to_address) pair.
# The count itself is not needed later -- only whether two nodes are connected.
# .compute() pulls the result to the client as an in-memory frame.
adjdf = (
    df.groupby(['from_address', 'to_address'])
      .agg({'hash': 'count'})
      .compute()
)

In [6]:
# Build {from_address: [to_address, ...]} from the aggregated edge list.
# Iterating the two columns directly avoids DataFrame.iterrows(), which creates
# a Series object for every row, and setdefault() replaces the earlier
# "create empty list, then overwrite it" handling of the first edge.
# adjmap stays a plain dict on purpose: get_adj_nodes relies on KeyError for
# addresses that have no outgoing edge.
edges = (
    adjdf.reset_index(drop=False)
         .drop('hash', axis=1)
         .sort_values(['from_address'])
)
adjmap = {}
for src, dst in zip(edges['from_address'], edges['to_address']):
    adjmap.setdefault(src, []).append(dst)

In [7]:
# Map every address in the features index to its positional index.
# The index is materialised on the client as an array first.
unique_addrs = features.index.compute().values
addrmap = dict(zip(unique_addrs, range(len(unique_addrs))))

In [8]:
# free some memory
# The edge list (`adjdf`) and the address index (`unique_addrs`) have already been
# computed to the client, so the persisted collections are cancelled here.
# NOTE(review): the names `df` and `features` remain bound afterwards; presumably
# they must be re-persisted before being used again -- confirm before reuse.
client.cancel(df)
client.cancel(features)

In [9]:
import sys
# Report the size of each lookup dict in MB.
# sys.getsizeof is shallow: it counts the dict object itself, not the keys,
# strings or lists it refers to, so these figures are a lower bound.
print('addrmap', sys.getsizeof(addrmap) / 1024 / 1024, 'MB')
print('adjmap', sys.getsizeof(adjmap) / 1024 / 1024, 'MB')

addrmap 80.00009155273438 MB
adjmap 40.000099182128906 MB


In [10]:
from dask import delayed

# make these dictionaries delayed, so dask can push them around
# Each dict is wrapped in a Delayed object; these wrappers are what the bag
# mapping step later receives as its `addrmap` / `adjmap` keyword arguments.
addrmap_ = delayed(addrmap)
adjmap_ = delayed(adjmap)

In [11]:
from dask.distributed import wait

# Push both lookup dicts to the cluster and copy them to every worker.
# Background (Matt Rocklin's answer):
# https://stackoverflow.com/questions/48299356/override-dask-scheduler-to-concurrently-load-data-on-multiple-workers/52069109#52069109
#
# Client.submit expects a callable as its first argument, so submitting a
# Delayed wrapper does not place the dictionary itself on a worker.
# Client.scatter(..., broadcast=True) sends the local objects to all workers
# and returns one future per object.
# The dicts are wrapped in a list because scatter treats a bare dict as a
# {key: data} collection and would scatter each entry separately.
d1_, d2_ = client.scatter([addrmap, adjmap], broadcast=True)
# NOTE(review): the later `unique_addrs.map(...)` call still receives the
# Delayed wrappers (`adjmap_`, `addrmap_`). Consider passing d2_ / d1_ there
# once it is confirmed that Bag.map resolves futures in keyword arguments.

In [12]:
# the lookup function 
# note that it's not a delayed function, because it runs on each of the addresses
def get_adj_nodes(addr, adjmap, addrmap):
    """Translate an address and its adjacent addresses into integer indices.

    Parameters
    ----------
    addr : hashable
        Address to look up.
    adjmap : mapping
        Maps an address to the list of addresses adjacent to it.
    addrmap : mapping
        Maps an address to its integer index.

    Returns
    -------
    list
        ``[index_of_addr, [index_of_each_adjacent_address, ...]]``.
        When `addr` has no entry in `adjmap`, the adjacency list is ``[addr]``,
        so the second element contains the address's own index.

    Raises
    ------
    KeyError
        If `addr` or any adjacent address is missing from `addrmap`.
    """
    try:
        neighbours = adjmap[addr]
    except KeyError:
        # No adjacency entry: fall back to the address itself.
        neighbours = [addr]

    own_index = addrmap[addr]
    neighbour_indices = [addrmap[neighbour] for neighbour in neighbours]
    return [own_index, neighbour_indices]

In [13]:
import dask.bag as db

# To avoid overloading the cluster, the addresses are split into 8 bag partitions.
# This rebinds `unique_addrs`, which previously held the array of index values.
unique_addrs = db.from_sequence(list(addrmap), npartitions=8)

In [14]:
# finally, we map the lookup to each of these addresses, passing the delayed dict wrappers as keyword arguments
# NOTE(review): the futures `d1_` / `d2_` created earlier are not used here; the
# Delayed wrappers are passed instead, which may be why the run below warns about
# scattering large objects ahead of time -- confirm.
comps_ = unique_addrs.map(get_adj_nodes, adjmap=adjmap_, addrmap=addrmap_)

In [15]:
# run the whole thing
# Executes the mapped bag and gathers every [element_ix, neighbors_ix] pair
# returned by get_adj_nodes onto the client.
results = comps_.compute()

  ([['0x00000000060c32d93a35a13bed526f8cbb472edb', [ ... e27b26a56']]],)
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and 
keep data on workers

    future = client.submit(func, big_data)    # bad

    big_future = client.scatter(big_data)     # good
    future = client.submit(func, big_future)  # good
  % (format_bytes(len(b)), s)


In [16]:
# Sanity check: number of lookup results and the container type, one per line.
print(len(results), type(results), sep='\n')

1462039
<class 'list'>


In [17]:
import pickle
import collections

# Collect the [node_index, neighbour_indices] pairs into a defaultdict keyed by
# node index; a later pair with the same index replaces the earlier one.
graph = collections.defaultdict(list)
graph.update((entry[0], entry[1]) for entry in results)

In [18]:
print(len(graph))
print("Saving graph...")
# Write through a context manager so the file handle is flushed and closed
# deterministically, even if pickling raises.
with open('ind.' + 'exchange' + '.graph', 'wb') as graph_file:
    pickle.dump(graph, graph_file)
print("... graph saved.")

1462039
Saving graph...
... graph saved.
