Columnwise estimation method for inverse Cov
---------------------------------------------

* The idea:

    Ax_k = e_k ->  x_k = A^(-1)e_k

    Solve x_k for k = 1, ..., n in parallel

* Thoughts

    Pitäisi olla yksikäsitteinen ratk, koska n tuntematonta ja n yhtälöä

In [2]:
# imports

import numpy as np
import xarray as xr
import scipy.sparse.linalg 
import dask
from dask.distributed import Client, progress
import dask.array as da
from functools import partial




invm = np.linalg.inv
gmres = scipy.sparse.linalg.gmres
npsolve = np.linalg.solve

ds_1 = xr.open_dataset("data/regions_verify_isotope_202112_cov.nc")
bio_1 = ds_1["covariance_bio"]
anth_1 = ds_1["covariance_anth"]

ds_2 = xr.open_dataset("data/regions_verify_202104_cov.nc", chunks = 'auto')
bio_2 = ds_2["covariance_bio"]
anth_2 = ds_2["covariance_anth"]

M = bio_1.values


In [3]:
invM_real = invm(M)


The whole inverse matrix
------------------------

When parallelizing this, we need to take into account the fact that the columns need to be assembled in the correct order after they have been computed

In [4]:
client = Client(threads_per_worker = 4, n_workers = 2)
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 2
Total threads: 8,Total memory: 30.78 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:46115,Workers: 2
Dashboard: http://127.0.0.1:8787/status,Total threads: 8
Started: Just now,Total memory: 30.78 GiB

0,1
Comm: tcp://127.0.0.1:41519,Total threads: 4
Dashboard: http://127.0.0.1:40059/status,Memory: 15.39 GiB
Nanny: tcp://127.0.0.1:42347,
Local directory: /tmp/dask-worker-space/worker-52oqzn7o,Local directory: /tmp/dask-worker-space/worker-52oqzn7o

0,1
Comm: tcp://127.0.0.1:45581,Total threads: 4
Dashboard: http://127.0.0.1:39769/status,Memory: 15.39 GiB
Nanny: tcp://127.0.0.1:33881,
Local directory: /tmp/dask-worker-space/worker-ikpi6f8a,Local directory: /tmp/dask-worker-space/worker-ikpi6f8a


Dask delayed
--------------
* just trying out if it works
* worked with the smaller matrix, but with the bigger one memory seems to be an issue

In [36]:
#Trying out examples from "Embarrassingly parallel Workloads"

#Note: the array needs to be submitted to gmres as numpy, not xarray: hence M = bio_i.values


n = len(M)
eye = np.eye(n)
cols = []

for row in eye:
    # row = col in identity matrix, and python syntax
    # iterates over rows
    inv_col = dask.delayed(gmres)(M, row, tol = 1e-10)
    cols.append(inv_col[0])

cols = dask.compute(cols)



In [39]:
cols[0][0]

array([ 1.25426569e+00, -3.52374850e-02, -6.35396420e-02, -1.01722351e-03,
       -1.00184492e-03, -2.85938105e-05,  6.91899580e-06,  1.12820986e-08,
        2.22862241e-11,  1.38580188e-12,  9.67084860e-10,  1.03813559e-10,
        1.61003724e-11,  2.79279604e-14, -3.45878263e-05, -8.87050416e-04,
        1.08150089e-09,  3.02299850e-08, -1.07141628e-07,  7.84269013e-10,
       -1.32032696e-09, -1.65843816e-11, -1.73914270e-05,  2.69015686e-07,
        3.61151731e-08,  3.70048720e-08,  4.04807298e-07, -2.86541422e-07,
       -1.10214792e-05,  0.00000000e+00,  0.00000000e+00,  0.00000000e+00,
        0.00000000e+00,  0.00000000e+00,  0.00000000e+00,  0.00000000e+00,
        0.00000000e+00,  0.00000000e+00,  0.00000000e+00,  0.00000000e+00,
        0.00000000e+00])

In [38]:
#Note: for some reason cols = dask.compute(cols) produces a tuple that comprises of a single list:
#That's why we need the double index cols[0][i]

for i in range(41):
    print(np.allclose(cols[0][i], invM_real[:,i]))

True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True
True


In [7]:
print(invM_real[:,1])

[-3.52374850e-02  1.25182439e+00 -1.33715847e-02 -2.83921396e-02
 -1.63288213e-04 -1.69204330e-04 -1.01739072e-05  4.65609305e-08
 -5.18827776e-11  4.99858515e-12 -6.52638950e-08  4.25179100e-09
  9.58854140e-10  2.36533385e-12 -5.26494205e-05 -7.82552680e-05
  1.15621339e-08  1.70409596e-07 -7.19040771e-08  2.08828387e-09
  8.01345519e-11  2.15977308e-13 -1.27117651e-04  2.96579778e-06
  9.96468015e-07 -6.19221063e-07  4.53954445e-06 -5.87085816e-05
 -2.86209133e-04  0.00000000e+00  0.00000000e+00  0.00000000e+00
  0.00000000e+00  0.00000000e+00  0.00000000e+00  0.00000000e+00
  0.00000000e+00  0.00000000e+00  0.00000000e+00  0.00000000e+00
  0.00000000e+00]


In [5]:
client.shutdown()



2022-10-21 00:08:43,467 - distributed.client - ERROR - Failed to reconnect to scheduler after 30.00 seconds, closing client


Dask arrays
-------------

In [42]:

M_da = da.from_array(M)


n = len(M)
eye = np.eye(n)
cols = []

for row in eye:
    # row = col in identity matrix, and python syntax
    # iterates over rows
    inv_col = gmres(M_da, row, tol = 1e-10)
    dask.compute(inv_col[0])




TypeError: type not understood

Futures & client.map
--------------------

* So far this one has worked best: managed to run it with 1000 x 1000 part of bio_2
* still pretty slow though
* One problem is passing M to map because we don't want to repeat it. Solution so far is the function partial but not sure if it works as it should as sometimes I get a result with the wrong shape back.

In [43]:


def invert_col(A, idx: int):
    n = len(A)
    e_i = np.zeros(n)
    e_i[idx] = 1
    inverted = gmres(A, e_i, tol = 1e-10)
    return inverted[0]

invert_col_frozen = partial(invert_col, M)


futures = client.map(invert_col_frozen, np.arange(len(M)))

In [53]:
results = client.gather(futures)
inv_with_futures = np.stack(results, axis = 1)


array([ 1.25426569e+00, -3.52374850e-02, -6.35396420e-02, -1.01722351e-03,
       -1.00184492e-03, -2.85938128e-05,  6.91901511e-06,  1.12811330e-08,
        3.85212971e-11,  6.71369287e-12,  9.68146703e-10,  1.04263533e-10,
        1.62963256e-11,  1.27288886e-13, -3.45878268e-05, -8.87050416e-04,
        1.07984640e-09,  3.02262165e-08, -1.07142484e-07,  7.84310970e-10,
       -1.32140060e-09, -1.66337019e-11, -1.73914321e-05,  2.69013163e-07,
        3.61232542e-08,  3.70054023e-08,  4.04793727e-07, -2.86531598e-07,
       -1.10214733e-05,  0.00000000e+00,  0.00000000e+00,  0.00000000e+00,
        0.00000000e+00,  0.00000000e+00,  0.00000000e+00,  0.00000000e+00,
        0.00000000e+00,  0.00000000e+00,  0.00000000e+00,  0.00000000e+00,
        0.00000000e+00])

In [16]:

gmres_frozen = partial(gmres, A = M)
eye = np.eye(len(M))
e_i_s = [row for row in eye]
futures = client.map(gmres_frozen, e_i_s)

TypeError: Dask no longer supports mapping over Iterators or Queues.Consider using a normal for loop and Client.submit

In [15]:
results = client.gather(futures)

Key:       gmres-d39d99389f6248fa9cba86a78a62206b
Function:  gmres
args:      (array([0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0.

TypeError: gmres() got multiple values for argument 'A'

-5c4fa13c0f32ce6b6fcd1c583256e4b7
Function:  gmres
args:      (array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0.

In [8]:
M_inv = np.stack(results, axis = 1)
print(M_inv)
print(M_inv.shape)

[[array([ 1.42166327e+00, -1.34508530e-08, -2.80250391e-08, -1.76773559e-01,
         -4.11503880e-01, -2.33237749e-08, -2.53968809e-08, -2.84447636e-08,
          2.11290536e-02,  1.32636162e-02, -4.02160984e-08, -4.28033641e-08,
         -3.29807370e-08, -4.10332531e-08,  5.27138813e-04,  1.36026088e-03,
         -3.16536841e-04, -6.46103982e-09, -4.53955659e-08, -6.18512572e-08,
         -6.20226952e-08, -4.46805573e-08, -2.61072573e-08, -5.16243421e-08,
         -1.22769970e-06, -9.35344916e-06,  3.76812432e-05,  9.83156266e-05,
         -1.31522798e-08, -1.23928072e-08, -1.87888694e-08, -3.76912324e-08,
         -6.54057318e-08, -8.61516639e-08, -8.29998385e-08, -5.59191868e-08,
         -2.94281393e-08, -3.66591966e-08, -8.33968553e-08, -1.15120846e-07,
         -7.84008788e-08,  1.05969522e-07,  6.96691579e-07, -5.52706764e-08,
         -1.10268237e-05, -1.87114959e-05, -7.05103501e-06,  5.75114868e-06,
          1.05652479e-05,  2.80567519e-06,  6.26861524e-09,  1.71683136e-09,

  arrays = [asanyarray(arr) for arr in arrays]


In [4]:
def invert_col(A, idx: int):
    n = len(A)
    e_i = np.zeros(n)
    e_i[idx] = 1
    inverted = gmres(A, e_i, tol = 1e-10)
    return inverted[0]

n = len(M)
inverted_cols = []
big_future = client.scatter(M)
for i in np.arange(n):
   invcol = client.submit(invert_col, big_future, i)
   inverted_cols.append(invcol)


In [16]:
print(Minv)
print(exitCodes)

[[ 1.25426569e+00 -3.52374850e-02 -6.35396420e-02 ...  0.00000000e+00
   0.00000000e+00  0.00000000e+00]
 [-3.52374850e-02  1.25182439e+00 -1.33715847e-02 ...  0.00000000e+00
   0.00000000e+00  0.00000000e+00]
 [-6.35396420e-02 -1.33715847e-02  1.26289885e+00 ...  0.00000000e+00
   0.00000000e+00  0.00000000e+00]
 ...
 [ 0.00000000e+00  0.00000000e+00  0.00000000e+00 ...  5.00079700e+00
  -6.31019841e-02  4.64396874e-07]
 [ 0.00000000e+00  0.00000000e+00  0.00000000e+00 ... -6.31019841e-02
   5.00079671e+00 -2.09457564e-05]
 [ 0.00000000e+00  0.00000000e+00  0.00000000e+00 ...  4.64313226e-07
  -2.09457041e-05  5.00489954e+00]]
[0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]


In [17]:
print(np.allclose(invM, Minv))

True
