# Simple usage of Dask vectors and Dask operators

@Author: Ettore Biondi - ebiondi@caltech.edu

In this notebook, we describe the usage of the Dask-based classes. These objects are designed to take advantage of the computational power of clusters composed of multiple nodes. To this end, we employ the existing classes in combination with Dask (https://dask.org/). We show the syntax with which a user can instantiate Dask-based objects from existing constructors using a local Dask cluster. The same syntax applies to the other supported Dask clusters.

### Importing necessary libraries

In [1]:
import numpy as np
import occamypy as o

# Plotting
from matplotlib import rcParams
from mpl_toolkits.axes_grid1 import make_axes_locatable
import matplotlib.pyplot as plt
rcParams.update({
    'image.cmap'     : 'gray',
    'image.aspect'   : 'auto',
    'image.interpolation': None,
    'axes.grid'      : False,
    'figure.figsize' : (10, 6),
    'savefig.dpi'    : 300,
    'axes.labelsize' : 14,
    'axes.titlesize' : 16,
    'font.size'      : 14,
    'legend.fontsize': 14,
    'xtick.labelsize': 14,
    'ytick.labelsize': 14,
    'text.usetex'    : True,
    'font.family'    : 'serif',
    'font.serif'     : 'Latin Modern Roman',
})



### Starting a Dask cluster and client
Let's begin by starting a local Dask client and showing how to get some information from this object. We are going to start 4 workers.

In [2]:
help(o.DaskClient)

Help on class DaskClient in module occamypy.dask.utils:

class DaskClient(builtins.object)
 |  DaskClient(**kwargs)
 |  
 |  Class useful to construct a Dask Client to be used with Dask vectors and operators
 |  
 |  Methods defined here:
 |  
 |  __init__(self, **kwargs)
 |      Constructor for obtaining a client to be used when Dask is necessary
 |      1) Cluster with shared file system and ssh capability:
 |      :param hostnames : - list; list of strings containing the host names or IP addresses of the machines that
 |      the user wants to use in their cluster/client (First hostname will be running the scheduler!) [None]
 |      :param scheduler_file_prefix : string; prefix to used to create dask scheduler-file.
 |      :param logging : - boolean; Logging scheduler and worker stdout to files within dask_logs folder [True]
 |      Must be a mounted path on all the machines. Necessary if hostnames are provided [$HOME/scheduler-]
 |      2) Local cluster:
 |      :param local_param

In [3]:
client_params = {"processes":True}
client = o.DaskClient(local_params=client_params, n_wrks=4)

In [4]:
print("Number of workers = %d" % client.getNworkers())
print("Workers Ids = %s" % client.getWorkerIds())

Number of workers = 4
Workers Ids = ['tcp://127.0.0.1:61062', 'tcp://127.0.0.1:61065', 'tcp://127.0.0.1:61068', 'tcp://127.0.0.1:61071']


### Dask vectors
Now that we have a Dask client, we can instantiate vectors using the Dask interface. The currently supported methods to create such objects are the following:
1. Instantiate a vector template and spread it using the chunks parameter
2. Instantiate multiple vectors and spread them to the given workers

In [5]:
# Method 1
vec_temp = o.VectorNumpy((200, 300))
chunks = (3, 4, 6, 2) # 3 vectors to worker 1; 4 vectors to worker 2; ...
vecD = o.DaskVector(client, vector_template=vec_temp, chunks=chunks)

vecD inherits all the methods from the abstract vector class. Let's try some of them.

In [6]:
# shape
print("List of shapes: %s" % vecD.shape)
# Randomize
vecD.rand()
# Norm
print("Dask vector norm = %s" % vecD.norm())
# Scaling
vecD.scale(10)
print("Scaled Dask vector norm = %s" % vecD.norm())
# Cloning
vecD1 = vecD.clone()
# Summing two vectors (the result is accumulated into vecD1)
vecD1 + vecD
# Check norm
print("Sum Dask vector norm = %s" % vecD1.norm())

List of shapes: [(200, 300), (200, 300), (200, 300), (200, 300), (200, 300), (200, 300), (200, 300), (200, 300), (200, 300), (200, 300), (200, 300), (200, 300), (200, 300), (200, 300), (200, 300)]
Dask vector norm = 547.419212126386
Scaled Dask vector norm = 5474.192121263861
Sum Dask vector norm = 10948.384242527722
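The relations between the three printed norms can be reproduced locally. The following is a minimal NumPy sketch, not the Dask implementation: it assumes a chunked vector can be modeled as a plain list of arrays (the helper `chunked_norm` is hypothetical), and that the norm of the whole vector is the L2 norm over all chunks. Scaling by 10 multiplies the norm by 10, and adding a vector to its own clone doubles it, which is what the output above shows.

```python
import numpy as np

rng = np.random.default_rng(0)

# Hypothetical stand-in for a chunked vector: a list of 15 arrays of shape (200, 300)
chunks = [rng.uniform(-1.0, 1.0, size=(200, 300)) for _ in range(15)]


def chunked_norm(chunk_list):
    """L2 norm of the vector formed by all chunks together."""
    return np.sqrt(sum(np.vdot(c, c) for c in chunk_list))


norm0 = chunked_norm(chunks)

# Scaling every chunk by 10 scales the global norm by 10
scaled = [10.0 * c for c in chunks]
norm_scaled = chunked_norm(scaled)

# Summing the scaled vector with a copy of itself doubles the norm
summed = [a + b for a, b in zip(scaled, scaled)]
norm_sum = chunked_norm(summed)

print("norm = %s" % norm0)
print("scaled norm = %s" % norm_scaled)
print("sum norm = %s" % norm_sum)
```

Since the printed norm of `vecD1` doubles after `vecD1 + vecD`, the sum appears to be stored in `vecD1` rather than returned as a new object only.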


The Dask vector contains a list of the future objects pointing to the vector chunks. Let's see how to find out which worker holds a given chunk.

In [7]:
print("Future object to first chunk: %s" % vecD.vecDask[0])
print("Worker having given chunk: %s" % client.getClient().who_has(vecD.vecDask[0]))

Future object to first chunk: <Future: finished, type: occamypy.VectorNumpy, key: _call_clone-4dd6d028-f215-487b-a701-6fe2bcb4b593>
Worker having given chunk: {'_call_clone-4dd6d028-f215-487b-a701-6fe2bcb4b593': ('tcp://127.0.0.1:61062',)}


Let's now create a vector using a different Dask-vector constructor. Here, we instantiate all the chunks and then spread them onto the given workers.

In [8]:
vec1 = o.VectorNumpy((200, 300))
vec2 = o.VectorNumpy((10, 30))
vec3 = o.VectorNumpy((250, 1))
# We use the parameter chunks to select which worker will have a given vector instance
vecD = o.DaskVector(client, vectors=[vec1, vec2, vec3], chunks=(1, 1, 0, 1))

Let's run the same tests as before.

In [9]:
# shape
print("List of shapes: %s" % vecD.shape)
# Randomize
vecD.rand()
# Norm
print("Dask vector norm = %s" % vecD.norm())
# Scaling
vecD.scale(10)
print("Scaled Dask vector norm = %s" % vecD.norm())
# Cloning
vecD1 = vecD.clone()
# Summing two vectors (the result is accumulated into vecD1)
vecD1 + vecD
# Check norm
print("Sum Dask vector norm = %s" % vecD1.norm())
print("Future object to third chunk: %s" % vecD.vecDask[2])
print("Worker having given chunk: %s" % client.getClient().who_has(vecD.vecDask[2]))

List of shapes: [(200, 300), (10, 30), (250, 1)]
Dask vector norm = 142.35185759872147
Scaled Dask vector norm = 1423.5185759872145
Sum Dask vector norm = 2847.037151974429
Future object to third chunk: <Future: finished, type: occamypy.VectorNumpy, key: VectorNumpy-4904d8dfefe3d63e434690b581731f64>
Worker having given chunk: {'VectorNumpy-4904d8dfefe3d63e434690b581731f64': ('tcp://127.0.0.1:61071',)}
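In both constructors, `chunks` holds one entry per worker, giving the number of vector chunks that worker receives. The sketch below is a plain-Python illustration of that reading, with a hypothetical helper `chunk_owners` that is not part of the library. It is consistent with the outputs above: the first chunk of the template-based vector sits on the first worker, and the third chunk of the list-based vector sits on the fourth worker because the third worker is assigned zero chunks.

```python
def chunk_owners(chunks):
    """Return, for each chunk, the index of the worker assumed to hold it.

    chunks[i] is interpreted as the number of chunks assigned to worker i.
    """
    owners = []
    for worker_idx, n_chunks in enumerate(chunks):
        owners.extend([worker_idx] * n_chunks)
    return owners


# Method 1: a template replicated 3 + 4 + 6 + 2 = 15 times
owners_template = chunk_owners((3, 4, 6, 2))
# Method 2: three vectors sent to workers 0, 1 and 3 (worker 2 gets none)
owners_list = chunk_owners((1, 1, 0, 1))

print("Number of chunks (method 1) = %d" % len(owners_template))
print("Worker index per chunk (method 2) = %s" % owners_list)
```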


### Dask operators
Now, let's try to instantiate Dask operators. This kind of object is useful when large-scale problems have to be solved. The main idea behind the interface is to pass a given operator constructor and the necessary parameters so that the object is directly instantiated within the Dask workers of a client.

In [10]:
# Construct a simple scaling operator acting on each chunk of a Dask Vector
vec = o.VectorNumpy((100, 25))
chunks = (2, 3, 5, 10)
sc = 10.0
vecD = o.DaskVector(client, vector_template=vec, chunks=chunks)
# Creating the list of argument tuples (one per chunk) for the operator's constructor
scal_op_args = [(vec_i, sc) for vec_i in vecD.vecDask]

# Instantiating Dask operator
scaleOpD = o.DaskOperator(client, o.Scaling, scal_op_args, chunks)

Similarly to the Dask vector class, a Dask operator object inherits all the methods from the corresponding abstract class. Let's try some of those methods.

In [11]:
# Dot-product test
scaleOpD.dotTest(True)
# Power method
max_eig = scaleOpD.powerMethod()
print("\nMaximum eigenvalue = %s" % max_eig)

Dot-product tests of forward and adjoint operators
--------------------------------------------------
Applying forward operator add=False
 Runs in: 0.08325624465942383 seconds
Applying adjoint operator add=False
 Runs in: 0.07850790023803711 seconds
Dot products add=False: domain=1.083178e+03 range=1.083178e+03 
Absolute error: 0.000000e+00
Relative error: 0.000000e+00 

Applying forward operator add=True
 Runs in: 0.0854029655456543 seconds
Applying adjoint operator add=True
 Runs in: 0.08702898025512695 seconds
Dot products add=True: domain=2.166357e+03 range=2.166357e+03 
Absolute error: 4.547474e-13
Relative error: 2.099134e-16 

-------------------------------------------------

Maximum eigenvalue = 10.000000000000005
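For reference, both checks can be written down for a scaling operator in a few lines of NumPy. This is a local sketch under stated assumptions, not the library's implementation: the dot-product test verifies $\langle \mathbf{A}\mathbf{x}, \mathbf{y}\rangle = \langle \mathbf{x}, \mathbf{A}^T\mathbf{y}\rangle$ for random vectors, and the eigenvalue is estimated with a plain power iteration, which may differ from what `powerMethod` does internally. The functions `forward` and `adjoint` are hypothetical stand-ins.

```python
import numpy as np

rng = np.random.default_rng(1)
sc = 10.0


def forward(x):
    """Scaling operator: y = sc * x."""
    return sc * x


def adjoint(y):
    """Adjoint of a real scaling operator: x = sc * y."""
    return sc * y


# Dot-product test: <A x, y> must match <x, A^T y>
x = rng.standard_normal((100, 25))
y = rng.standard_normal((100, 25))
dot_range = np.vdot(forward(x), y)
dot_domain = np.vdot(x, adjoint(y))
rel_err = abs(dot_range - dot_domain) / abs(dot_range)
print("Relative error = %e" % rel_err)

# Power iteration: repeatedly apply the operator and normalize
v = rng.standard_normal((100, 25))
v /= np.linalg.norm(v)
for _ in range(20):
    w = forward(v)
    eig = np.vdot(v, w)  # Rayleigh quotient with unit-norm v
    v = w / np.linalg.norm(w)
print("Estimated maximum eigenvalue = %s" % eig)
```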


Let's now try to apply this Dask operator.

In [12]:
vecD.rand()
vecD1 = scaleOpD.getRange().clone()
scaleOpD.forward(False, vecD, vecD1)
print("Norm of the input = %s" % vecD.norm())
print("Norm of the output = %s" % vecD1.norm())

Norm of the input = 129.12552942808634
Norm of the output = 1291.2552942808636


Finally, let's use an operator that spreads a local vector onto the chunks of a Dask vector and collects it back. Such an operator is useful when the same vector is used multiple times as input to different operators in an embarrassingly parallel fashion.

In [13]:
S = o.DaskSpread(client, vec, chunks)
S.dotTest(True) # checking dot-product

Dot-product tests of forward and adjoint operators
--------------------------------------------------
Applying forward operator add=False
 Runs in: 0.2735610008239746 seconds
Applying adjoint operator add=False
 Runs in: 0.061138153076171875 seconds
Dot products add=False: domain=1.127771e+02 range=1.127771e+02 
Absolute error: 4.263256e-14
Relative error: 3.780251e-16 

Applying forward operator add=True
 Runs in: 0.2522130012512207 seconds
Applying adjoint operator add=True
 Runs in: 0.07279467582702637 seconds
Dot products add=True: domain=2.255542e+02 range=2.255542e+02 
Absolute error: 8.526513e-14
Relative error: 3.780251e-16 

-------------------------------------------------
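A spreading operator of this kind can be pictured locally as follows. The sketch assumes that the forward copies the local vector into every chunk and that the adjoint sums the chunks back into a single local vector, which is the mathematical adjoint of replication. The functions `spread_forward` and `spread_adjoint` are hypothetical and are not the `DaskSpread` code.

```python
import numpy as np

rng = np.random.default_rng(2)
n_chunks = 2 + 3 + 5 + 10  # total number of chunks for chunks = (2, 3, 5, 10)


def spread_forward(x, n):
    """Copy the local vector x into n chunks."""
    return [x.copy() for _ in range(n)]


def spread_adjoint(chunk_list):
    """Collect the chunks by summing them into one local vector."""
    return np.sum(chunk_list, axis=0)


x = rng.standard_normal((100, 25))
y = [rng.standard_normal((100, 25)) for _ in range(n_chunks)]

# Dot-product test: sum_i <x, y_i> must match <x, sum_i y_i>
dot_range = sum(np.vdot(sx, yi) for sx, yi in zip(spread_forward(x, n_chunks), y))
dot_domain = np.vdot(x, spread_adjoint(y))
rel_err = abs(dot_range - dot_domain) / abs(dot_range)
print("Relative error = %e" % rel_err)
```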


In [14]:
# Chain of scaling and spreading operators
scale_S = scaleOpD * S
scale_S.dotTest(True) # checking dot-product
# Testing product of Dask Operators
x = vec.rand()
y = scale_S.getRange().clone()
scale_S.forward(False, x, y)
print("\nFirst element of x = %s" % x.getNdArray()[0,0])
print("First element of y = %s" % y.getNdArray()[0][0,0])

Dot-product tests of forward and adjoint operators
--------------------------------------------------
Applying forward operator add=False
 Runs in: 0.36614108085632324 seconds
Applying adjoint operator add=False
 Runs in: 0.18489599227905273 seconds
Dot products add=False: domain=7.069724e+01 range=7.069724e+01 
Absolute error: 1.165290e-12
Relative error: 1.648282e-14 

Applying forward operator add=True
 Runs in: 0.4546501636505127 seconds
Applying adjoint operator add=True
 Runs in: 0.1888599395751953 seconds
Dot products add=True: domain=1.413945e+02 range=1.413945e+02 
Absolute error: 2.330580e-12
Relative error: 1.648282e-14 

-------------------------------------------------





First element of x = -0.6255070644304621
First element of y = -6.255070644304621
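The order of operations in the chained operator can be sketched locally. Assuming the product applies the right-hand operator first, the forward spreads the local vector and then scales each chunk, while the adjoint scales each chunk and then sums them. The helper names below are hypothetical; the check on the first element mirrors the one printed above.

```python
import numpy as np

rng = np.random.default_rng(3)
sc = 10.0
n_chunks = 20


def chain_forward(x):
    """Spread x into n_chunks copies, then scale each chunk."""
    return [sc * x for _ in range(n_chunks)]


def chain_adjoint(chunk_list):
    """Scale each chunk (adjoint of scaling), then sum (adjoint of spreading)."""
    return np.sum([sc * c for c in chunk_list], axis=0)


x = rng.uniform(-1.0, 1.0, size=(100, 25))
y = chain_forward(x)
print("First element of x = %s" % x[0, 0])
print("First element of y = %s" % y[0][0, 0])

# Adjoint applied to the forward output: sc * sc * n_chunks * x
x_back = chain_adjoint(y)
```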


#### Dask blocky operators

In the previous section, we worked with block-diagonal operators. Let's now work with blocky operators, defined as follows:
\begin{eqnarray}
\mathbf{A}_{blocky} = 
\begin{bmatrix}
\mathbf{A}_{11} & \mathbf{A}_{12} \\
\mathbf{A}_{21} & \mathbf{A}_{22}
\end{bmatrix},
\end{eqnarray}
where $\mathbf{A}_{ij}$ denotes each operator composing $\mathbf{A}_{blocky}$. Here, each worker takes care of one row of this operator.
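The row-wise evaluation can be illustrated locally with NumPy. In this sketch, each row $i$ computes $\mathbf{y}_i = \sum_j \mathbf{A}_{ij}\mathbf{x}_j$ independently, which is the work a single worker would carry out under the description above; the result is compared against the assembled block matrix. The nested list `A` and the block sizes are illustrative choices, and no Dask objects are involved.

```python
import numpy as np

rng = np.random.default_rng(4)
n1, n2 = 3, 2

# Blocks of the operator, stored as A[i][j]
A = [[rng.random((n1, n1)), rng.random((n1, n2))],
     [rng.random((n2, n1)), rng.random((n2, n2))]]
# Input vector split into two chunks
x = [rng.random((n1, 1)), rng.random((n2, 1))]

# Each row is evaluated on its own: y_i = sum_j A_ij x_j
y_rows = [sum(A[i][j] @ x[j] for j in range(2)) for i in range(2)]

# Reference result using the full matrix
y_ref = np.block(A) @ np.concatenate(x, axis=0)
err = np.linalg.norm(np.concatenate(y_rows, axis=0) - y_ref)
print("Error |y_rows - y_ref|_2 = %s" % err)
```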

In [15]:
# We are going to use only two workers
n1 = 3
n2 = 2
vec1 = o.VectorNumpy((n1, 1))
vec2 = o.VectorNumpy((n2, 1))
# We use the parameter chunks to select which worker will have a given vector instance
chunks = (1, 0, 0, 1)
vecD = o.DaskVector(client, vectors=[vec1, vec2], chunks=chunks).zero()
# Now create the list of arguments in a column-wise fashion
A11 = np.random.rand(n1, n1)
A12 = np.random.rand(n1, n2)
A21 = np.random.rand(n2, n1)
A22 = np.random.rand(n2, n2)
A_args = [(A11, vec1, vec1), (A21, vec1, vec2), (A12, vec2, vec1), (A22, vec2, vec2)]
# Instantiating Dask operator
A_blocky = o.DaskOperator(client, o.Matrix, A_args, chunks, op_kind="blocky")

Let's apply the forward operator and compare the result with the one obtained by applying the $\mathbf{A}_{blocky}$ matrix locally.

In [16]:
# Dask blocky operator
x = vecD.rand()
y = vecD.clone()
A_blocky.forward(False, x, y)
# Local operations
A_mat = np.block([[A11, A12], [A21, A22]])
x_loc = np.concatenate(x.getNdArray(), axis=0)
y_loc = np.matmul(A_mat, x_loc)
error = y_loc - np.concatenate(y.getNdArray(), axis=0)
print("Error |y_loc-y_dask|_2 = %s" % np.linalg.norm(error))

Error |y_loc-y_dask|_2 = 1.7554167342883506e-16


Let's now try the adjoint operator.

In [17]:
# Dask blocky operator
y.rand()
A_blocky.adjoint(False, x, y)
# Local operations
# A_mat = np.block([[A11, A12], [A21, A22]])
y_loc = np.concatenate(y.getNdArray(), axis=0)
x_loc = np.matmul(A_mat.T, y_loc)
error = x_loc - np.concatenate(x.getNdArray(), axis=0)
print("Error |x_loc-x_dask|_2 = %s" % np.linalg.norm(error))

Error |x_loc-x_dask|_2 = 1.6653345369377348e-16
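In block form, the adjoint gathers contributions along the columns: $\mathbf{x}_j = \sum_i \mathbf{A}_{ij}^T\mathbf{y}_i$. The NumPy sketch below checks this against the transposed block matrix and verifies the dot-product identity between forward and adjoint. It is a local illustration with hypothetical variable names, not the way the Dask operator distributes the work.

```python
import numpy as np

rng = np.random.default_rng(5)
n1, n2 = 3, 2

A = [[rng.random((n1, n1)), rng.random((n1, n2))],
     [rng.random((n2, n1)), rng.random((n2, n2))]]
x = [rng.random((n1, 1)), rng.random((n2, 1))]
y = [rng.random((n1, 1)), rng.random((n2, 1))]

# Forward: y_i = sum_j A_ij x_j
Ax = [sum(A[i][j] @ x[j] for j in range(2)) for i in range(2)]
# Adjoint: x_j = sum_i A_ij^T y_i
ATy = [sum(A[i][j].T @ y[i] for i in range(2)) for j in range(2)]

# Compare the adjoint with the transposed full matrix
ref = np.block(A).T @ np.concatenate(y, axis=0)
adj_err = np.linalg.norm(np.concatenate(ATy, axis=0) - ref)

# Dot-product identity: <A x, y> = <x, A^T y>
dot_range = sum(np.vdot(a, b) for a, b in zip(Ax, y))
dot_domain = sum(np.vdot(a, b) for a, b in zip(x, ATy))
print("Adjoint error = %s" % adj_err)
print("Dot products: domain=%e range=%e" % (dot_domain, dot_range))
```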


Finally, let's run the dot-product test on the blocky operator.

In [18]:
A_blocky.dotTest(True)

Dot-product tests of forward and adjoint operators
--------------------------------------------------
Applying forward operator add=False
 Runs in: 0.05706906318664551 seconds
Applying adjoint operator add=False
 Runs in: 0.03998994827270508 seconds
Dot products add=False: domain=5.652114e-01 range=5.652114e-01 
Absolute error: 1.110223e-16
Relative error: 1.964262e-16 

Applying forward operator add=True
 Runs in: 0.05144476890563965 seconds
Applying adjoint operator add=True
 Runs in: 0.044786930084228516 seconds
Dot products add=True: domain=1.130423e+00 range=1.130423e+00 
Absolute error: 0.000000e+00
Relative error: 0.000000e+00 

-------------------------------------------------
