In [1]:
from dask.distributed import Client, LocalCluster
import dask.array as da
from lmdec import PowerMethod

# Running Power Method on a Local Cluster 

This is no faster than running it without a Local Cluster, but it demonstrates that lmdec can be run on a cluster.

In [4]:
# Start a local cluster on this machine: four single-threaded workers,
# each capped at 3 GB of memory, then connect a client to it.
cluster = LocalCluster(
    n_workers=4,
    threads_per_worker=1,
    memory_limit='3 GB',
)
client = Client(cluster)

The operations and the workers can be visualized through the Dashboard link shown in the client summary below.

In [3]:
# Display the client summary: scheduler address, dashboard link, and worker totals.
client

0,1
Client  Scheduler: tcp://127.0.0.1:57935  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 4  Cores: 4  Memory: 12.00 GB


In [5]:
# Lazy 100000 x 20000 random float64 matrix (about 16 GB in total);
# the trailing expression renders dask's summary of the array.
a = da.random.random(
    size=(100000, 20000),
)
a

Unnamed: 0,Array,Chunk
Bytes,16.00 GB,128.00 MB
Shape,"(100000, 20000)","(4000, 4000)"
Count,125 Tasks,125 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 16.00 GB 128.00 MB Shape (100000, 20000) (4000, 4000) Count 125 Tasks 125 Chunks Type float64 numpy.ndarray",20000  100000,

Unnamed: 0,Array,Chunk
Bytes,16.00 GB,128.00 MB
Shape,"(100000, 20000)","(4000, 4000)"
Count,125 Tasks,125 Chunks
Type,float64,numpy.ndarray


## All lmdec operations will naturally interface with a Cluster.

The array `a` (16 GB) is larger than the cluster's total memory (12 GB), so it cannot be stored there and its chunks must be "created" on demand. Therefore, there is no need to "scatter" `a` over the cluster.

In [6]:
# Power-method solver capped at 4 iterations.
# presumably `tol` is the convergence tolerance — confirm against the lmdec docs
PM = PowerMethod(tol=1e-9, max_iter=4)

In [7]:
# Run the power-method SVD on `a` directly; `a` is not scattered beforehand.
# presumably U/V hold the singular vectors and S the singular values — confirm against the lmdec docs
U, S, V = PM.svd(a)

Time Usage : 217.07s of 1000s (Time Limit) 
Iteration Usage : 4 of 4 (Iteration Limit)
  .format(self.time, self.time_limit, self.num_iter, self.max_iter))


## Using a cluster with enough memory to hold the array

We can submit the `PM.svd` function to the cluster and scatter the array on the cluster.

For a guide on using clusters and Dask see:
https://docs.dask.org/en/latest/setup.html

In [11]:
# Smaller 20000 x 20000 random float64 matrix (about 3.2 GB in total);
# the trailing expression renders dask's summary of the array.
b = da.random.random(
    size=(20000, 20000),
)
b

Unnamed: 0,Array,Chunk
Bytes,3.20 GB,128.00 MB
Shape,"(20000, 20000)","(4000, 4000)"
Count,25 Tasks,25 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 3.20 GB 128.00 MB Shape (20000, 20000) (4000, 4000) Count 25 Tasks 25 Chunks Type float64 numpy.ndarray",20000  20000,

Unnamed: 0,Array,Chunk
Bytes,3.20 GB,128.00 MB
Shape,"(20000, 20000)","(4000, 4000)"
Count,25 Tasks,25 Chunks
Type,float64,numpy.ndarray


Scatter the array to the cluster

In [5]:
# Send `b` to the cluster; `future` is the handle to the remote copy.
# NOTE(review): `b` is a lazy dask array, so this may ship the task graph rather
# than materialized chunks — confirm whether `b.persist()` is what is intended.
future = client.scatter(b)

Submit the job to a cluster.

Once the job is submitted, it can be observed through the dashboard linked above.

In [6]:
# Same solver settings as the earlier on-demand run (4-iteration cap, tol=1e-9).
PM = PowerMethod(tol=1e-9, max_iter=4)

In [7]:
# Submit PM.svd to the cluster with the scattered array as its argument;
# `transpose=True` is forwarded to PM.svd as a keyword argument.
USV = client.submit(
    PM.svd,
    future,
    transpose=True,
)

Get the values from the cluster

In [8]:
# Wait for the submitted job to finish and unpack its three returned factors.
U, S, V = USV.result()

In [9]:
# Evaluate U and display it as a concrete NumPy array.
U.compute()

array([[ 0.00047532,  0.00091697,  0.00380269, ...,  0.00557026,
         0.00705091,  0.0088292 ],
       [-0.01440481, -0.00205058,  0.00205909, ..., -0.00648305,
         0.00540328,  0.00060134],
       [-0.00367933,  0.00736413,  0.01542387, ..., -0.00417257,
        -0.0041739 , -0.00996514],
       ...,
       [-0.00328265, -0.00365813,  0.00553923, ..., -0.00247538,
        -0.01061328, -0.00020978],
       [ 0.0031228 , -0.01090509,  0.00714572, ..., -0.01147071,
        -0.00598133, -0.00072704],
       [-0.00093447,  0.0015288 ,  0.00431711, ...,  0.00889683,
        -0.00052574,  0.00434754]])