# Dask Leaflet Finder

In [15]:
%matplotlib inline
import sys
import numpy as np
import random
import os, time, sys, datetime
import sklearn.metrics.pairwise
import scipy.spatial.distance
from scipy.spatial.distance import cdist
import dask
import dask.array as da
import dask.multiprocessing
from dask.diagnostics import ProgressBar
from dask.diagnostics import ResourceProfiler
from dask.dot import dot_graph
from dask.array.core import map_blocks
import dask.bag as db
import dask.dataframe as df
import random
import logging
logging.basicConfig(stream=sys.stdout, level=logging.CRITICAL)
from chest import Chest
from random import shuffle


#from multiprocessing.pool import ThreadPool
RESULT_DIR="results"
RESULT_FILE_PREFIX="pair-distance-"
HEADER_CSV="Scenario, Type, Time"
#BASE_DIRECTORY=os.getcwd()
# Dask has issues with NFS home directory on Comet
# BASE_DIRECTORY='/scratch/luckow/7146882'
BASE_DIRECTORY='/oasis/scratch/comet/luckow/temp_project'
#BASE_DIRECTORY='/scratch/luckow/7218009/'
OUT_DIR=os.path.join(BASE_DIRECTORY, "npy_stack")

FILENAMES=["../132k_dataset/atom_pos_132K.npy", "../145K_dataset/atom_pos_145K.npy", 
          "../300K_dataset/atom_pos_291K.npy", '../840K_dataset/atom_pos_839K.npy']

scenario = FILENAMES[0]

## Dense Distance Array

In [2]:
%%time

"""Make sure point_array points to correct dataset"""
global point_array
global cutoff

cutoff=15.0


def map_block_distance(block,  block_id=None):
    isCompute = block_id[1]>=block_id[0] and block.shape != (1,1)# Bug ? 
                                         # Dask returns one block with shape (1,1) ID:(1, 1) Shape: (1, 1) Predicate: True Content: [[ 1.]]
    block_length = block.shape[0]
    logging.debug("ID:" + str(block_id) + " Shape: " + str(block.shape) + \
          " Predicate: " + str(isCompute) + " Content: " + str(block) + "\n")
    if isCompute:
        source_start = block_id[0]*block_length
        source_end = (block_id[0]+1)*block_length
        source_points = point_array[source_start:source_end]        
        dest_start = block_id[1]*block_length
        dest_end = (block_id[1]+1)*block_length
        dest_points = point_array[dest_start:dest_end]  
        logging.debug("Source Idx: %d - %d Dest. Idx: %d - %d\n"%(source_start, source_end, dest_start, dest_end))
        #print "Source Points: " + str(source_points.compute())
        #print "Destination Points: " + str(dest_points.compute())
        distance = cdist(source_points, dest_points)
        distances_bool = (distance < cutoff) & (distance > 0)
        return distances_bool
    else:
        return np.zeros(block.shape)

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 8.11 µs


### Testing Pairwise Distance using Dummy Data

In [3]:
number_points = 4
points_np = np.arange(number_points*3).reshape(number_points,3)
point_array = da.from_array(points_np, chunks=(2, 3))
dist_matrix = da.zeros((number_points,number_points), chunks=(2,2))

In [None]:
point_array.numblocks

In [None]:
point_array.compute()

In [None]:
dist_matrix.numblocks

In [None]:
dist_matrix.chunks

In [None]:
dist_matrix.compute()

In [None]:
dist_res=dist_matrix.map_blocks(map_block_distance)

In [None]:
distances = dist_res.compute()
distances

## 1-D Version with Sparse Output

No persistence between NetworkX Connected Components and Pairwise Distance Required

In [7]:
number_points = 4
points_np = np.arange(number_points*3).reshape(number_points,3)
point_array = da.from_array(points_np, chunks=(2, 3))
dist_matrix = da.zeros((number_points,number_points), chunks=(2,4)) # in 2. dimension only 1 chunk
print dist_matrix.chunks

((2, 2), (4,))


In [4]:
"""Make sure point_array points to correct dataset"""
global point_array
global cutoff

cutoff=15.0

def map_blocks_1d_sparse(block, block_id):
    #new_block = block[:, :, None]
    
    isCompute = block_id[1]>=block_id[0] and block.shape != (1,1)# Bug ? 
                                         # Dask returns one block with shape (1,1) ID:(1, 1) Shape: (1, 1) Predicate: True Content: [[ 1.]]
    block_length = block.shape[0]
    logging.debug("ID:" + str(block_id) + " Shape: " + str(block.shape) + \
          " Predicate: " + str(isCompute) + " Content: " + str(block) + "\n")
    
    source_start = block_id[0]*block_length
    source_end = (block_id[0]+1)*block_length
    source_points = point_array[source_start:source_end]        
    dest_start = 0
    dest_end = block.shape[1]
    dest_points = point_array[dest_start:dest_end]  
    logging.debug("Source Idx: %d - %d Dest. Idx: %d - %d\n"%(source_start, source_end, dest_start, dest_end))
    logging.debug("Source Points: " + str(source_points.compute()))
    logging.debug("Destination Points: " + str(dest_points.compute()))
    #if isCompute:
    distances = cdist(source_points, dest_points) 
    #distances_bool = (distances < cutoff) & (distances > 0)
    #logging.debug(str(distances_bool))
    #xx, yy = np.meshgrid(np.arange(source_start, source_end), np.arange(dest_start, dest_end))
    #logging.debug("xx: " + str(xx))
    #logging.debug("yy: " + str(yy))
    #res=np.array([zip(y,x, z) for x,y,z in zip(xx, yy, distances_bool.T)])
    #true_res = np.array(np.where((distances < cutoff) & (distances > 0)))
    true_res = np.array(np.where(distances < cutoff))
    logging.debug("True Source: %s, Source_start: %d"%(str(true_res[0]), source_start))
    true_res[0] = true_res[0] + source_start # source offset for block
    logging.debug("True Source Adjusted" + str(true_res[0]))
    true_res[1] = true_res[1] + dest_start # dest offset for block
    res=np.array(zip(true_res[0], true_res[1]))
    res=res[res[:,0]<res[:,1], :] # filter duplicate edges (only edges where ind1<ind2)
    #number_pairs = block.shape[0]*block.shape[1]
    #logging.debug("Result Shape: %s Block Shape: %s, Number Pairs: %d"%(str(res.shape), str(block.shape), number_pairs))
    #res=res.reshape(number_pairs, 3)
    logging.debug("Result: " + str(res))
    return res
    #else:
    #    return np.zeros((1,2))
    #return new_block + cdist(source_points, dest_points)<cutoff
    #return np.array(block)
    #return new_block 

In [9]:
da_res=dist_matrix.map_blocks(map_blocks_1d_sparse,  dtype='int')
da_res.compute()

array([[0, 1],
       [0, 2],
       [1, 2],
       [1, 3],
       [2, 3]])

In [None]:
cdist(points_np, points_np)

# MDAnalysis Data
## Data Preparation

In [None]:
CHUNKSIZE=[128, 256, 512, 1024, 2048, 4096, 8192, 16384]

for c in CHUNKSIZE:
    for i in FILENAMES:
        print i
        atoms = np.load(i)
        a_da = da.from_array(atoms, chunks=(c,3))
        out_file=os.path.join(OUT_DIR, os.path.basename(i)+"_"+str(c))
        try:
            os.makedirs(out_file)
        except:
            pass
        da.to_npy_stack(out_file, a_da)

## Dense Output Array

In [None]:
%%time

distances = dist_matrix.map_blocks(map_block_distance).compute()
#da.to_npy_stack(output_directory, distances)

## Sparse Output

Outputs sparse edge list

**Single Node**

In [None]:
%%time
CHUNKSIZE=1024
FILENAME="atom_pos_291K.npy"
fn=os.path.join(OUT_DIR,'atom_pos_132K.npy_%s')%CHUNKSIZE
point_array=da.from_npy_stack(fn)
point_array.shape
dist_matrix = da.zeros((point_array.shape[0],point_array.shape[0]), chunks=(CHUNKSIZE,point_array.shape[0]))
da_res=dist_matrix.map_blocks(map_blocks_1d_sparse, chunks=(CHUNKSIZE,3), dtype='int')
res=da_res.compute()

**Distributed**

Data needs to be available on all nodes: 
    
    rsync -avh comet-22-44:/scratch/luckow/7216144/* .

In [None]:
%%time

from distributed import Client

CHUNKSIZE=2048
FILENAME="atom_pos_839K"
hostname="198.202.117.67:8786"

client = Client(hostname)
cluster_details=client.ncores()
number_nodes = len(cluster_details.keys())
number_cores = sum(cluster_details.values())  
    
print "Connected to Dask Cluster: %s "%str(client.ncores())
global point_array

start=time.time()
with dask.set_options(get=client.get):    
    fn=os.path.join(OUT_DIR,'%s.npy_%s')%(FILENAME, CHUNKSIZE)
    point_array=da.from_npy_stack(fn)
    chunk_size = point_array.chunks[0][0]
    print("Input Data: %s, Chunksize: %d, Numblocks: %s")%(str(point_array.shape), chunk_size, str(point_array.numblocks))
    dist_matrix = da.zeros((point_array.shape[0],point_array.shape[0]), chunks=(chunk_size,point_array.shape[0]))
    da_res=dist_matrix.map_blocks(map_blocks_1d_sparse, chunks=(CHUNKSIZE,3), dtype='int')
    #res=da_res.compute()
end_compute=time.time()    
print("%s,dask-distributed,%s, %d, %d, comet, total, %.4f"%(fn, "benchmark_dask_map_block_1d_sparse_distributed", number_nodes, number_cores, end_compute-start))

## Benchmark

### Single Node

In [None]:
import os.path
OUTPUT_DIRECTORY="/oasis/scratch/comet/luckow/temp_project/out"

def benchmark_dask_map_block_1d_sparse_single_node(filename, cutoff=15, number_threads=40, direct_output=True ):
    global point_array
    func_name = sys._getframe().f_code.co_name

    results = []
    cache = Chest(path=os.path.join(BASE_DIRECTORY, "cache"), available_memory=98e9)  
    
    start = time.time()
    point_array=da.from_npy_stack(filename)
    chunk_size = point_array.chunks[0][0]
    #print str(point_array.shape)
    end_read = time.time()
    results.append("%s,dask,%s,read_file, %.4f"%(filename, func_name, end_read-start))
    
    dist_matrix = da.zeros((point_array.shape[0],point_array.shape[0]), chunks=(chunk_size, point_array.shape[0]))
    #with ProgressBar():
    
    """map_block_distances operates on point_array """
    #out =  dist_matrix.map_blocks(map_block_distance)
    da_res=dist_matrix.map_blocks(map_blocks_1d_sparse, chunks=(chunk_size,3), dtype='int')
    #res=da_res.compute(cache=cache)
    res=da_res.compute()
    #outfile = os.path.join(OUTPUT_DIRECTORY, os.path.basename(filename) + "_out.h5")
    #da_res.to_hdf5(outfile, "/o", compression='lzf')
    end_compute = time.time()
    results.append("%s,dask,%s, 1, 24, comet, compute, %.4f"%(filename, func_name, end_compute-end_read))
    results.append("%s,dask,%s, 1, 24, comet, total, %.4f"%(filename, func_name, end_compute-start))    
    
    # Log performance data
    #end_compute = -1
    #end_out_write = -1
    #outfile = os.path.join(OUTPUT_DIRECTORY, os.path.basename(filename) + "_out.h5")
    #try: 
    #    os.makedirs(outfile) 
    #except: 
    #    pass
    
    #if direct_output:
    #    #da.to_npy_stack(outfile, out)    
    #    out.to_hdf5(outfile, "/o", compression='lzf')
    #    end_compute = time.time()
    #    results.append("%s,dask,%s,compute_write, %.4f"%(filename, func_name, end_compute-end_read))
    #    results.append("%s,dask,%s,total, %.4f"%(filename, func_name, end_compute-start))    
    #else:
    #    out.compute()  
    #    end_compute = time.time()
    #    #print "end compute"
    #    np.save(outfile, out)
    #    end_out_write = time.time()            
    #    results.append("%s,dask,%s,compute, %.4f"%(filename, func_name, end_compute-end_read))
    #    results.append("%s,dask,%s,write_file, %.4f"%(filename, func_name, end_out_write-end_compute))
    #    results.append("%s,dask,%s,total, %.4f"%(filename, func_name, end_out_write-start))
    
    #os.remove(outfile)
    print("\n".join(results))

In [None]:
benchmark_dask_map_block_1d_sparse_single_node(os.path.join(OUT_DIR, "atom_pos_132K.npy_512"))

By using too small parititions, the file open limit of the machine can easily be exhausted

    ulimit -n 70000
    
Unfortunately this limit can not be increased on Comet.

### Distributed

In [12]:
import os.path
from distributed import Client

hostname="198.202.116.245:8786"
client = Client(hostname)
    
def benchmark_dask_map_block_1d_sparse_distributed(filename, cutoff=15, direct_output=True ):
    global point_array
    func_name = sys._getframe().f_code.co_name
    results = []
    #cache = Chest(path=os.path.join(BASE_DIRECTORY, "cache"), available_memory=98e9)  

    cluster_details=client.ncores()
    number_nodes = len(cluster_details.keys())
    number_cores = sum(cluster_details.values())  
    global point_array
    with dask.set_options(get=client.get):        
        start = time.time()
        point_array=da.from_npy_stack(filename)
        chunk_size = point_array.chunks[0][0]   
        end_read = time.time()
        results.append("%s,dask-distributed, %s, %d, %d, comet, read_file, %.4f"%(filename, func_name, number_nodes, number_cores, end_read-start))
        chunk_size
        dist_matrix = da.zeros((point_array.shape[0],point_array.shape[0]), chunks=(chunk_size, point_array.shape[0]))
        """map_block_distances operates on point_array """
        da_res=dist_matrix.map_blocks(map_blocks_1d_sparse, chunks=(chunk_size,3), dtype='int')
        res=da_res.compute()
        end_compute = time.time()
        results.append("%s,dask-distributed, %s, %d, %d, comet, compute, %.4f"%(filename, func_name, number_nodes, number_cores, end_compute-end_read))
        results.append("%s,dask-distributed, %s, %d, %d, comet, total, %.4f"%(filename, func_name, number_nodes, number_cores, end_compute-start))    
        print("\n".join(results))

In [None]:
benchmark_dask_map_block_1d_sparse_distributed(os.path.join(OUT_DIR, "atom_pos_132K.npy_1024"))

In [None]:
#benchmark_dask_map_block_1d_sparse_distributed(os.path.join(OUT_DIR, "atom_pos_291K.npy_2048"))

In [None]:
benchmark_dask_map_block_1d_sparse_distributed(os.path.join(OUT_DIR, "atom_pos_839K.npy_2048"))

In [24]:
dask_scenarios = [os.path.abspath(os.path.join(OUT_DIR, i)) for i in os.listdir(OUT_DIR)]
shuffle(dask_scenarios)
dask_scenarios

['/oasis/scratch/comet/luckow/temp_project/npy_stack/atom_pos_839K.npy_16384',
 '/oasis/scratch/comet/luckow/temp_project/npy_stack/atom_pos_132K.npy_1024',
 '/oasis/scratch/comet/luckow/temp_project/npy_stack/atom_pos_132K.npy_16384',
 '/oasis/scratch/comet/luckow/temp_project/npy_stack/atom_pos_839K.npy_2048',
 '/oasis/scratch/comet/luckow/temp_project/npy_stack/atom_pos_145K.npy_1024',
 '/oasis/scratch/comet/luckow/temp_project/npy_stack/atom_pos_291K.npy_16384',
 '/oasis/scratch/comet/luckow/temp_project/npy_stack/atom_pos_839K.npy_1024',
 '/oasis/scratch/comet/luckow/temp_project/npy_stack/atom_pos_145K.npy_4096',
 '/oasis/scratch/comet/luckow/temp_project/npy_stack/atom_pos_132K.npy_4096',
 '/oasis/scratch/comet/luckow/temp_project/npy_stack/atom_pos_145K.npy_512',
 '/oasis/scratch/comet/luckow/temp_project/npy_stack/atom_pos_839K.npy_8192',
 '/oasis/scratch/comet/luckow/temp_project/npy_stack/atom_pos_132K.npy_512',
 '/oasis/scratch/comet/luckow/temp_project/npy_stack/atom_pos_2

In [None]:
for idx, s in enumerate(dask_scenarios):
    #if '839K' in s:
    print "Process: %s (%d/%d)"%(s, idx+1, len(dask_scenarios))
    try:
        benchmark_dask_map_block_1d_sparse_distributed(s)
    except:
        print "Failed! Exception"
    time.sleep(60)

Process: /oasis/scratch/comet/luckow/temp_project/npy_stack/atom_pos_839K.npy_16384 (0/24)


distributed.utils - ERROR - ("('map_blocks_1d_sparse-3151727e8eb0eb593212f632a9f1c414', 34, 0)", '198.202.116.250:44898')
Traceback (most recent call last):
  File "/home/luckow/anaconda2/lib/python2.7/site-packages/distributed/utils.py", line 149, in f
    result[0] = yield gen.maybe_future(func(*args, **kwargs))
  File "/home/luckow/anaconda2/lib/python2.7/site-packages/tornado/gen.py", line 1015, in run
    value = future.result()
  File "/home/luckow/anaconda2/lib/python2.7/site-packages/tornado/concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "/home/luckow/anaconda2/lib/python2.7/site-packages/tornado/gen.py", line 1021, in run
    yielded = self.gen.throw(*exc_info)
  File "/home/luckow/anaconda2/lib/python2.7/site-packages/distributed/client.py", line 896, in _gather
    st.traceback)
  File "<string>", line 2, in reraise
KilledWorker: ("('map_blocks_1d_sparse-3151727e8eb0eb593212f632a9f1c414', 34, 0)", '198.202.116.250:44898')


Failed! Exception


## Other Optimization Attempts

In [None]:
%%time

"""Make sure point_array points to correct dataset"""
global point_array
global cutoff

cutoff=15.0


def map_block_distance_sparse(block,  block_id=None):
    isCompute = block_id[1]>=block_id[0] and block.shape != (1,1)# Bug ? 
                                         # Dask returns one block with shape (1,1) ID:(1, 1) Shape: (1, 1) Predicate: True Content: [[ 1.]]
    block_length = block.shape[0]
    logging.debug("ID:" + str(block_id) + " Shape: " + str(block.shape) + \
          " Predicate: " + str(isCompute) + " Content: " + str(block) + "\n")
    if isCompute:
        source_start = block_id[0]*block_length
        source_end = (block_id[0]+1)*block_length
        source_points = point_array[source_start:source_end]        
        dest_start = block_id[1]*block_length
        dest_end = (block_id[1]+1)*block_length
        dest_points = point_array[dest_start:dest_end]  
        logging.debug("Source Idx: %d - %d Dest. Idx: %d - %d\n"%(source_start, source_end, dest_start, dest_end))
        #print "Source Points: " + str(source_points.compute())
        #print "Destination Points: " + str(dest_points.compute())
        #return cdist(source_points, dest_points)<cutoff
        #return np.array(block)
        #else:
    return np.array([zip(x,y) for x,y in zip(np.zeros(shape), np.ones(shape))])

In [None]:
dist_da = da.from_array(distances, chunks=(5,5))

In [None]:
distances=dist_matrix.map_blocks(map_block_distance_sparse).compute()

In [None]:
shape=(5,5)

In [None]:
np.dstack((np.zeros(shape), np.ones(shape)))

In [None]:
[zip(x,y) for x,y in zip(np.zeros(shape), np.ones(shape))]

In [None]:
np.zeros(shape)

In [None]:
distances=dist_matrix.map_blocks(map_block_distance).compute()

In [None]:
def atop_func(x, block_id):
    print str(block_id)
    print type(x)
    print str(x)
    for i in x:
        return np.where(x)
    print x[0].shape
    if x==True: return 5
    else: return 0

In [None]:
d = da.atop(atop_func, 'i', dist_da, 'ij', dtype='int').compute()

In [None]:
da.atop(lambda x: 2 if random.randint(0,1)==0 else 1, 'i', dist_matrix, 'ij', dtype='int').compute()

In [None]:
"""Make sure point_array points to correct dataset"""
global point_array
global cutoff

cutoff=15.0

def map_blocks_new_axis_2d(block, block_id):
    new_block = block[:, :, None]
    
    isCompute = block_id[1]>=block_id[0] and block.shape != (1,1)# Bug ? 
                                         # Dask returns one block with shape (1,1) ID:(1, 1) Shape: (1, 1) Predicate: True Content: [[ 1.]]
    block_length = block.shape[0]
    logging.debug("ID:" + str(block_id) + " Shape: " + str(block.shape) + \
          " Predicate: " + str(isCompute) + " Content: " + str(block) + "\n")
    
    source_start = block_id[0]*block_length
    source_end = (block_id[0]+1)*block_length
    source_points = point_array[source_start:source_end]        
    dest_start = block_id[1]*block_length
    dest_end = (block_id[1]+1)*block_length
    dest_points = point_array[dest_start:dest_end]  
    logging.debug("Source Idx: %d - %d Dest. Idx: %d - %d\n"%(source_start, source_end, dest_start, dest_end))
    logging.debug("Source Points: " + str(source_points.compute()))
    logging.debug("Destination Points: " + str(dest_points.compute()))
    distances = cdist(source_points, dest_points) 
    distances_bool = (distances < cutoff) & (distances > 0)
    xx, yy = np.meshgrid(np.arange(source_start, source_end), np.arange(dest_start, dest_end))
    logging.debug("xx: " + str(xx))
    logging.debug("yy: " + str(yy))
    res=np.array([zip(x,y, z) for x,y,z in zip(yy, xx, distances_bool)])
    logging.debug("Result: " + str(res))
    return res

    #return new_block + cdist(source_points, dest_points)<cutoff
    #return np.array(block)
    #return new_block 

In [None]:
d=(cdist(points_np,points_np)<cutoff)& (cdist(points_np,points_np)>0.0)
print d
zip(np.where(d)[0], np.where(d)[1])

In [None]:
np.where(d)[0]+3

In [None]:
df_res=df.from_array(da_res, columns=["From", "To", "isConnected"])
df_res.where(df_res["isConnected"])\
      .dropna()[["From", "To"]].values.compute()

In [None]:
# Get Edges
print str(distances)
d=np.array(zip(np.where(distances)[0], np.where(distances)[1]))
print d

In [None]:
zip(np.where(distances)[0], np.where(distances)[1])

In [None]:
d[d[:,0]<d[:,1], :]

## 2-D with Sparse Output (not working)

In [None]:
number_points = 4
points_np = np.arange(number_points*3).reshape(number_points,3)
point_array = da.from_array(points_np, chunks=(2, 3))
dist_matrix = da.zeros((number_points,number_points), chunks=(2, 2)) # 2-D Chunks
print dist_matrix.chunks

In [None]:
"""Make sure point_array points to correct dataset"""
global point_array
global cutoff

cutoff=15.0

def map_blocks_new_axis_2d(block, block_id):
    new_block = block[:, :, None]
    
    isCompute = block_id[1]>=block_id[0] and block.shape != (1,1)# Bug ? 
                                         # Dask returns one block with shape (1,1) ID:(1, 1) Shape: (1, 1) Predicate: True Content: [[ 1.]]
    block_length = block.shape[0]
    logging.debug("ID:" + str(block_id) + " Shape: " + str(block.shape) + \
          " Predicate: " + str(isCompute) + " Content: " + str(block) + "\n")
    
    source_start = block_id[0]*block_length
    source_end = (block_id[0]+1)*block_length
    source_points = point_array[source_start:source_end]        
    dest_start = block_id[1]*block_length
    dest_end = (block_id[1]+1)*block_length
   
    dest_points = point_array[dest_start:dest_end]  
    logging.debug("Source Idx: %d - %d Dest. Idx: %d - %d\n"%(source_start, source_end, dest_start, dest_end))
    logging.debug("Source Points: " + str(source_points.compute()))
    logging.debug("Destination Points: " + str(dest_points.compute()))
    distances = cdist(source_points, dest_points) 
    distances_bool = (distances < cutoff) & (distances > 0)
    
    logging.debug(str(distances_bool))
    xx, yy = np.meshgrid(np.arange(source_start, source_end), np.arange(dest_start, dest_end))
    logging.debug("xx: " + str(xx))
    logging.debug("yy: " + str(yy))
    #res=np.array([zip(y,x, z) for x,y,z in zip(xx, yy, distances_bool.T)])
    res=np.array([zip(x,y, z) for x,y,z in zip(yy, xx, distances_bool)])
    number_pairs = block.shape[0]*block.shape[1]
    logging.debug("Result Shape: %s Block Shape: %s, Number Pairs: %d"%(str(res.shape), str(block.shape), number_pairs))
    res=res.reshape(number_pairs, 3)
    logging.debug("Result: " + str(res))
    return res

    #return new_block + cdist(source_points, dest_points)<cutoff
    #return np.array(block)
    #return new_block 

In [None]:
da_res=dist_matrix.map_blocks(map_blocks_new_axis_2d, chunks=(4,3), dtype='int')


In [None]:
da_res.compute()

In [None]:
df_res=df.from_array(da_res, columns=["From", "To", "Distance"])

In [None]:
%%time
print "Run with chunk size %d"%CHUNKSIZE
cache = Chest(path=os.path.join(BASE_DIRECTORY, "cache"), available_memory=98e9)  
df_res=df.from_array(da_res, columns=["From", "To", "isConnected"])
edges_np=df_res.where(df_res["isConnected"])\
               .dropna()[["From", "To"]].values.compute(num_workers=12, cache=cache) #.to_hdf5(outfile, "/o", compression='lzf')


## 2D Version

In [None]:
number_points = 4
points_np = np.arange(number_points*3).reshape(number_points,3)
point_array = da.from_array(points_np, chunks=(2, 3))
dist_matrix = da.zeros((number_points,number_points), chunks=(2,2)) # multiple chunks in 2. dimension
print "Dist Matrix Shape: " + str(dist_matrix.shape)
print "Dist Matrix Chunks: " + str(dist_matrix.chunks)

In [None]:
"""Make sure point_array points to correct dataset"""
global point_array
global cutoff

cutoff=15.0

def map_blocks_new_axis_2d(block, block_id):
    new_block = block[:, :, None]
    
    isCompute = block_id[1]>=block_id[0] and block.shape != (1,1)# Bug ? 
                                         # Dask returns one block with shape (1,1) ID:(1, 1) Shape: (1, 1) Predicate: True Content: [[ 1.]]
    block_length = block.shape[0]
    logging.debug("ID:" + str(block_id) + " Shape: " + str(block.shape) + \
          " Predicate: " + str(isCompute) + " Content: " + str(block) + "\n")
    
    source_start = block_id[0]*block_length
    source_end = (block_id[0]+1)*block_length
    source_points = point_array[source_start:source_end]        
    dest_start = block_id[1]*block_length
    dest_end = (block_id[1]+1)*block_length
   
    dest_points = point_array[dest_start:dest_end]  
    logging.debug("Source Idx: %d - %d Dest. Idx: %d - %d\n"%(source_start, source_end, dest_start, dest_end))
    logging.debug("Source Points: " + str(source_points.compute()))
    logging.debug("Destination Points: " + str(dest_points.compute()))
    distances = cdist(source_points, dest_points) 
    distances_bool = (distances < cutoff) & (distances > 0)
    # Get indicies with right offset from distnace bool
    true_res = np.array(np.where(distances_bool))
    logging.debug("True Source: %s, Source_start: %d"%(str(true_res[0]), source_start))
    true_res[0] = true_res[0] + source_start # source offset for block
    logging.debug("True Source Adjusted" + str(true_res[0]))
    true_res[1] = true_res[1] + dest_start # dest offset for block
    res=np.array(zip(true_res[0], true_res[1]))
    res=res[res[:,0]<res[:,1], :] # filter duplicate ed
    
    number_pairs = len(res)
    logging.debug("Result Shape: %s Block Shape: %s, Number Pairs: %d"%(str(res.shape), str(block.shape), number_pairs))
    res=res.reshape(number_pairs, 2)
    logging.debug("Result: " + str(res))
    return new_block + res

In [None]:
da_res=dist_matrix.map_blocks(map_blocks_new_axis_2d, dtype='int').compute()

In [None]:
da_res