# Pylops-distributed - Solvers

In this notebook we investigate the use of pylops-distributed CG and CGLS solvers with distributed operators.

In [1]:
%load_ext autoreload
%autoreload 2
%matplotlib inline

import os
import warnings
warnings.filterwarnings('ignore')

import numpy as np
import matplotlib.pyplot as plt
import scipy as sp
import skfmm
import dask
import dask.array as da
import pylops
import pylops_distributed

from scipy.sparse.linalg.interface import MatrixLinearOperator, aslinearoperator 
from scipy.linalg import lstsq, solve
from scipy.sparse.linalg import cg, lsqr
from dask import persist
from dask.distributed import Client, LocalCluster, performance_report

In [2]:
# Show the thread-count environment variables seen by this kernel
# (each entry is None when the variable is not set)
tuple(os.getenv(var) for var in ('OMP_NUM_THREADS', 'MKL_NUM_THREADS', 'OPENBLAS_NUM_THREADS'))

('1', '1', '1')

In [3]:
# Number of chunks along the rows and the columns of the distributed matrix
# (alternative layouts kept for quick experiments)
#nchunks = [2, 1]
#nchunks = [2, 2]
nchunks = [4, 4]

# Solver mode: 'persist' (the client is passed to the solver) or 'postponed'
kind = 'persist'

# Thread-based workers (works fine)
client, cluster = pylops_distributed.utils.backend.dask(
    n_workers=4,
    threads_per_worker=1,
    processes=False,
)
# Process-based workers (very slow)
#client, cluster = pylops_distributed.utils.backend.dask(processes=True, threads_per_worker=1, n_workers=4)

In [4]:
# Display the client summary (scheduler address, dashboard link, workers)
client

0,1
Client  Scheduler: inproc://10.0.0.12/4333/1  Dashboard: http://10.0.0.12:8787/status,Cluster  Workers: 4  Cores: 4  Memory: 8.59 GB


### CG for square systems

Let's first try out the solver using numpy inputs (matrix and vector). As da.xx is never explicitly invoked when ``compute=False`` and ``client=None``, the solver will simply operate on numpy arrays.

In [5]:
# Size of the square system
n = 8000

# Seeded random matrix, so the notebook is reproducible
np.random.seed(0)
A = np.random.randn(n, n)
# Form A^T A, which is symmetric as required by CG
A = A.T.dot(A)

Let's now apply the forward operator using the LinearOperator interface

In [6]:
# Split the dense matrix into nchunks[0] x nchunks[1] blocks and persist it,
# so the data is moved to the workers only once
Ada = da.from_array(A, chunks=(n//nchunks[0], n//nchunks[1])).persist()
# Dask model vector and zero starting guess used by the distributed solver
x = da.ones(n) #, chunks=(n//nchunks[1]))
x0 = da.zeros(n) #, chunks=(n//nchunks[1]))

# In-memory scipy operator and its distributed counterpart
# (with compute=(False, False) the forward returns a lazy dask array, see the printed output)
Aop = MatrixLinearOperator(A)
Adaop = pylops_distributed.MatrixMult(Ada, compute=(False, False))

# takes and returns a numpy array
print(Aop.matvec(np.ones(n)))

# takes and returns a dask array (uses the dask vector x defined above,
# rather than a numpy array as before, to match this description)
print(Adaop.matvec(x))

[-3449.71567852  9007.96728277 16675.82965137 ... 13483.71530641
 23533.53556349  7352.24843242]
dask.array<sum-aggregate, shape=(8000,), dtype=float64, chunksize=(2000,), chunktype=numpy.ndarray>


In [7]:
# Display the distributed matrix summary (size, chunk layout, dtype)
Ada

Unnamed: 0,Array,Chunk
Bytes,512.00 MB,32.00 MB
Shape,"(8000, 8000)","(2000, 2000)"
Count,16 Tasks,16 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 512.00 MB 32.00 MB Shape (8000, 8000) (2000, 2000) Count 16 Tasks 16 Chunks Type float64 numpy.ndarray",8000  8000,

Unnamed: 0,Array,Chunk
Bytes,512.00 MB,32.00 MB
Shape,"(8000, 8000)","(2000, 2000)"
Count,16 Tasks,16 Chunks
Type,float64,numpy.ndarray


And now let's solve the inverse problem with different approaches when it comes to the use of dask

In [8]:
# Iteration budget shared by the scipy and the distributed solvers
niter = 10

# scipy: reference solution computed on the in-memory numpy operator
y = Aop.matvec(np.ones(n))
xinv_sp = cg(Aop, y, maxiter=niter)[0]  # cg returns (solution, info)
print(xinv_sp)

# Right-hand side for the distributed solver, built with the dask operator
yy = Adaop * da.ones(n)

[0.85871839 1.27163427 1.06899494 ... 0.71259363 0.97728159 1.11484963]


# 'persist': hand the client to the solver (dask with persist at each iter);
# any other value: no client (dask with all graph computed in one go)
cg_kwargs = {'client': client} if kind == 'persist' else {}
xinv = pylops_distributed.optimization.cg.cg(Adaop, yy, x0, tol=0, niter=niter, **cg_kwargs)[0]

# Bring the distributed solution back as a numpy array for display
print(xinv.compute())

In [9]:
# Choose the report file and solver options matching the selected mode:
# the client is passed to the solver only in 'persist' mode
if kind == 'persist':
    report_file, report_kwargs = "dask-report-cg_persist.html", {'client': client}
else:
    report_file, report_kwargs = "dask-report-cg_postponed.html", {}

# Re-run the solve inside the dask performance report context
with performance_report(filename=report_file):
    pylops_distributed.optimization.cg.cg(Adaop, yy, x0, tol=0, niter=niter, **report_kwargs)[0].compute()


time 0.01778888702392578
time 0.01585984230041504
time 0.015965938568115234
time 0.01790022850036621
time 0.020157337188720703
time 0.1002042293548584
time 0.05542802810668945
time 0.027637958526611328
time 0.08151817321777344
time 0.026092052459716797


In [10]:
# Release the distributed resources: disconnect the client first, then shut
# down the cluster created alongside it (it was previously left running)
client.close()
cluster.close()