In [None]:
from dask.distributed import Client, LocalCluster

# Example: 4 workers, 1 thread each
cluster = LocalCluster(n_workers=4, threads_per_worker=1)
client = Client(cluster)

print("Dashboard:", client.dashboard_link)


Dashboard: http://127.0.0.1:8787/status


2025-09-12 17:35:31,766 - tornado.application - ERROR - Uncaught exception GET /status/ws (127.0.0.1)
HTTPServerRequest(protocol='http', host='localhost:8787', method='GET', uri='/status/ws', version='HTTP/1.1', remote_ip='127.0.0.1')
Traceback (most recent call last):
  File "/home/riccorte/miniconda3/envs/dask-env/lib/python3.11/site-packages/tornado/websocket.py", line 965, in _accept_connection
    open_result = handler.open(*handler.open_args, **handler.open_kwargs)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/riccorte/miniconda3/envs/dask-env/lib/python3.11/site-packages/tornado/web.py", line 3375, in wrapper
    return method(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/riccorte/miniconda3/envs/dask-env/lib/python3.11/site-packages/bokeh/server/views/ws.py", line 149, in open
    raise ProtocolError("Token is expired. Configure the app with a larger value for --session-token-expiration if necessary")
bok

In [2]:
# check if everything went smoothly
print(client)
print(cluster)


<Client: 'tcp://10.67.22.154:8786' processes=3 threads=3, memory=5.81 GiB>
SSHCluster(SSHCluster, 'tcp://10.67.22.154:8786', workers=3, threads=3, memory=5.81 GiB)


In [1]:
import dask.array as da
import dask
import numpy as np
import time
from scipy.linalg import solve_triangular 
from sklearn.datasets import fetch_california_housing

# Download California Housing dataset
data = fetch_california_housing(as_frame=True)

# Convert features into Dask Array (it's a matrix).
n_partition = 3        # number of partition in memory. We have 4 VMS (1 master + 3 workers), so let's start with just 3 partitions
length_partition = data.data.shape[0] // n_partition
X_da = da.from_array(data.data.values, chunks=(length_partition, data.data.shape[1]))

print("Number of Dask partitions:",  X_da.npartitions) 
print("Length of each partition:", length_partition, "rows")
print("Length of the whole dataset:", data.data.shape[0], "rows")


Number of Dask partitions: 3
Length of each partition: 6880 rows
Length of the whole dataset: 20640 rows


In [4]:
X_cached = X_da.rechunk((1_000_000, -1)).persist()

print(X_da)


dask.array<array, shape=(20640, 8), dtype=float64, chunksize=(6880, 8), chunktype=numpy.ndarray>


In [1]:
def indirect_serial(A, n_div):

    """
    Indirect TSQR (serial, NumPy).
    Splits A by rows into n_div blocks, computes local R_i via QR,
    reduces to global R by QR on the stacked R_i, then recovers Q = A R^{-1}.
    Returns (Q, R).
    """

    n_samp = A.shape[0]
    
    div_points = int(np.floor(n_samp/n_div))
    A_divided = []
    Ri = []
    
    A_divided = [A[div_points * i : div_points * (i + 1)] for i in range(n_div - 1)]
    A_divided.append(A[(n_div - 1) * div_points:, :])   # In the case n_samp wasn't divisible by n_div

    Ri = [np.linalg.qr(Ai, mode="reduced")[1] for Ai in A_divided]
    
    R_stack = np.concatenate(Ri, axis = 0)

    # Step 3. Stack R_i and compute global R

    _, R = np.linalg.qr(R_stack, mode="reduced")


    #   I = np.eye(n_samp, dtype=A.dtype)
    #   Rinv = solve_triangular(R, I, lower=False)
    Rinv = np.linalg.inv(R)
    Q = A @ Rinv

    return Q, R


def compute_R(block):
    # np.linalg.qr with mode='r' gives just the R matrix
    R = np.linalg.qr(block, mode="r")
    return R


def indirect_parallel(X_da):

    """
    Indirect TSQR with Dask.
    Input:
        X_da : Dask Array (m x n), chunked row-wise
    Output:
        R    : final global triangular factor (n x n, NumPy array on driver)
        Q_da : Dask Array (m x n), representing Q = A R^{-1} (lazy)
    """


    n_cols = X_da.shape[1]

    R_blocks = X_da.map_blocks(compute_R, dtype=X_da.dtype, chunks=(n_cols, n_cols))
    # Now R_blocks is a stack of n x n matrices (one per partition)
    # Its shape is (#chunks * n, n)

    #R_blocks.visualize(filename="fig/R_blocks_graph", format="png")
 

    #Dask has da.linalg.qr, but it assumes the whole array is large and chunked regularly.
    #To get the final global R, you must combine all the Ri
    #That means at some point, the data has to come together into a single place (can’t keep it sharded).
    # So we bring the data to the driver because it is very small, because it optimizes the uses of np.linalg.qr, we are gathering the small stuff
    R_stack = R_blocks.compute()   # NumPy array, shape (p*n, n)

    # Small QR on driver to combine them into the final R
    R = np.linalg.qr(R_stack, mode="r")
    # delay the computing of qr

    # Instead of materializing Q, compute a small R^{-1} (n x n).
    I = np.eye(n_cols, dtype=X_da.dtype)
    R_inv = solve_triangular(R, I, lower=False)  # stable

    # Broadcast Rinv to every chunk: Q = A @ R^{-1}
    Q_da = X_da @ R_inv   # still a Dask Array, lazy

    return Q_da, R      #Q_da because it is lazy, it is still a Dask array

def indirect_parallel_persisted(X_da):

    """
    Indirect TSQR with Dask.
    Input:
        X_da : Dask Array (m x n), chunked row-wise
    Output:
        R    : final global triangular factor (n x n, NumPy array on driver)
        Q_da : Dask Array (m x n), representing Q = A R^{-1} (lazy)
    """
    
    

    n_cols = X_da.shape[1]

    R_blocks = X_da.map_blocks(compute_R, dtype=X_da.dtype, chunks=(n_cols, n_cols))
    # Now R_blocks is a stack of n x n matrices (one per partition)
    # Its shape is (#chunks * n, n)

    #R_blocks.visualize(filename="fig/R_blocks_graph", format="png")
 

    #Dask has da.linalg.qr, but it assumes the whole array is large and chunked regularly.
    #To get the final global R, you must combine all the Ri
    #That means at some point, the data has to come together into a single place (can’t keep it sharded).
    # So we bring the data to the driver because it is very small, because it optimizes the uses of np.linalg.qr, we are gathering the small stuff
    R_stack = R_blocks.persist()   # NumPy array, shape (p*n, n)

    # Small QR on driver to combine them into the final R
    _, R = np.linalg.qr(R_stack)
    # delay the computing of qr

    # Instead of materializing Q, compute a small R^{-1} (n x n).
    I = np.eye(n_cols, dtype=X_da.dtype)
    R_inv = solve_triangular(R, I, lower=False)  # stable

    # Broadcast Rinv to every chunk: Q = A @ R^{-1}
    Q_da = X_da @ R_inv   # still a Dask Array, lazy

    return Q_da, R      #Q_da because it is lazy, it is still a Dask array


def indirect_parallel_optimized(X_da):


    n_cols = X_da.shape[1]

    R_blocks = X_da.map_blocks(compute_R, dtype=X_da.dtype, chunks=(n_cols, n_cols))
    # Now R_blocks is a stack of n x n matrices (one per partition)
    # Its shape is (#chunks * n, n)

    #R_blocks.visualize(filename="fig/R_blocks_graph", format="png")
 

    #Dask has da.linalg.qr, but it assumes the whole array is large and chunked regularly.
    #To get the final global R, you must combine all the Ri
    #That means at some point, the data has to come together into a single place (can’t keep it sharded).
    # So we bring the data to the driver because it is very small, because it optimizes the uses of np.linalg.qr, we are gathering the small stuff
    R_stack = R_blocks.compute()   # NumPy array, shape (p*n, n)

    # Small QR on driver to combine them into the final R
    R = np.linalg.qr(R_stack, mode="r")
    # delay the computing of qr

    # Instead of materializing Q, compute a small R^{-1} (n x n).
    I = np.eye(n_cols, dtype=X_da.dtype)
    R_inv = solve_triangular(R, I, lower=False)  # stable

    Rinv_da = da.from_array(R_inv, chunks=(R_inv.shape[0], R_inv.shape[1]))
    Q_da = X_da @ Rinv_da

    return Q_da, R      #Q_da because it is lazy, it is still a Dask array





def indirect_parallel_delayed(X_da):

    n_cols = X_da.shape[1]

    R_blocks = X_da.map_blocks(compute_R, dtype=X_da.dtype, chunks=(n_cols, n_cols))
    # Now R_blocks is a stack of n x n matrices (one per partition)
    # Its shape is (#chunks * n, n)


    # 2) Convert blocks to delayed NumPy arrays, stack via delayed
    R_list = list(R_blocks.to_delayed().ravel())     # each is delayed np.ndarray (n x n)
    R_stack = dask.delayed(np.vstack)(R_list)        # delayed (p*n x n)
    

    R_delayed = dask.delayed(compute_R)(R_stack)      # delayed np.ndarray (n x n)

    # 3) Turn the small delayed R into a Dask Array (single (n,n) chunk)
    R_da = da.from_delayed(R_delayed, shape=(n_cols, n_cols), dtype=X_da.dtype)

    # 4) Materialize small R on driver; solve for R^{-1}
    I = np.eye(n_cols, dtype=X_da.dtype)

    # compute R^{-1} lazily
    R_inv_delayed = dask.delayed(solve_triangular)(R_delayed, I, lower=False)
    R_inv_da = da.from_delayed(R_inv_delayed, shape=(n_cols, n_cols), dtype=X_da.dtype)

    # 5) Broadcast multiply (keep Q lazy)
    Q_da = X_da @ R_inv_da

    return Q_da, R_da      #Q_da because it is lazy, it is still a Dask array


In [11]:
%%time

Q, R = indirect_serial(data.data.values, 50)

size = 20640 / 50 * 8 * 8 #rows_per_chunk × n_cols × 8 bytes
print("The size for each chunk is :", size/1e6, "Mb")


The size for each chunk is : 0.0264192 Mb
CPU times: user 103 ms, sys: 82.4 ms, total: 186 ms
Wall time: 64.7 ms


In [12]:
%%time

Q_da, R = indirect_parallel(X_da)   # X_da is already splitted into three partitions


print("Final R shape:", R.shape)  # (n, n), small
print("Q is lazy:", Q_da)         # Dask Array, not yet computed

print("Chunks:", X_da.chunks)
print("Number of partitions:", X_da.npartitions)
print("Total size [MB]:", X_da.nbytes / 1e6)
print("Chunk size [MB]:", X_da.nbytes / X_da.npartitions / 1e6)    # 10–100 MB

print("Shape:", X_da.shape)
print("Chunks:", X_da.chunks)
print("Partitions:", X_da.npartitions)
print("Total size [MB]:", X_da.nbytes/1e6)
print("Avg chunk [MB]:", (X_da.nbytes/X_da.npartitions)/1e6)


#print("Compact sanity check:", X_da.shape, "(m,n);", R_blocks.shape, "(n *n_partitions, n);", R.shape, "(n,n)" )


Final R shape: (8, 8)
Q is lazy: dask.array<getitem, shape=(20640, 8), dtype=float64, chunksize=(6880, 8), chunktype=numpy.ndarray>
Chunks: ((6880, 6880, 6880), (8,))
Number of partitions: 3
Total size [MB]: 1.32096
Chunk size [MB]: 0.44032
Shape: (20640, 8)
Chunks: ((6880, 6880, 6880), (8,))
Partitions: 3
Total size [MB]: 1.32096
Avg chunk [MB]: 0.44032
CPU times: user 65 ms, sys: 26.2 ms, total: 91.2 ms
Wall time: 146 ms


In [None]:
# e.g., apply Q to a random vector and compute the norm — same workload for all
v = np.random.randn(X_cached.shape[1]).astype(X_cached.dtype)

def time_Q_apply(Q_da, v):
    start = time.perf_counter()
    val = da.linalg.norm(Q_da @ v).compute()  # triggers the matmul + reduction
    end = time.perf_counter()
    return end - start, val




In [None]:
# not-delayed version
start = time.perf_counter()
Q_nd, R_nd = indirect_parallel(X_cached)  # returns Q_da, R (NumPy)
_ = R_nd  # already materialized
t_nd_R = time.perf_counter() - start



#Dask Dashboard: Big green block: compute_R. map stage of mapblock that returns the small Ri
# Tiny yellow finalize-hlg block - Dask housekeeping
"""Why you don’t see “reduce” or “broadcast” here
The reduce to the final 
R is done on the driver with NumPy:"""


t_nd_Q, val_nd = time_Q_apply(Q_nd, v)

"""Teal blocks around ~140–150 ms — blockwise-matmul-…
In the not-delayed variant, R^{-1} is a NumPy constant, so each matmul task deserializes it; you may notice slightly more per-task overhead compared to the delayed/Dask-array constant.
Purple block — reduction for the norm

After the matmul, da.linalg.norm(Qv) triggers a reduction:
"""


In [None]:
# not-delayed persisted version
start = time.perf_counter()
Q_ndp, R_ndp = indirect_parallel_persisted(X_cached)  # returns Q_da, R (NumPy)
_ = R_ndp  # already materialized
t_nd_Rp = time.perf_counter() - start

t_po_Q, val_po = time_Q_apply(Q_ndp, v)


# better due to only calculating R once but with a cost
"""3. The subtlety

Persist = compute and cache now, but still return a Dask collection (with futures).

Compute = compute now and return the final NumPy array (collected to driver).

So:

If you persist inside your function and return R, you are indeed returning a Dask object backed by futures, not a NumPy matrix.

If you compute inside your function and return R, you’re returning a NumPy array, which is often what you want for the small triangular 
𝑅
R."""


'3. The subtlety\n\nPersist = compute and cache now, but still return a Dask collection (with futures).\n\nCompute = compute now and return the final NumPy array (collected to driver).\n\nSo:\n\nIf you persist inside your function and return R, you are indeed returning a Dask object backed by futures, not a NumPy matrix.\n\nIf you compute inside your function and return R, you’re returning a NumPy array, which is often what you want for the small triangular \n𝑅\nR.'

In [None]:

# optimized not-delayed version
start = time.perf_counter()
Q_po, R_po = indirect_parallel_optimized(X_cached)  # returns Q_da (Dask), R_da (Dask)
_ = R_po  # already materialized
t_po_R = time.perf_counter() - start

t_po_Q, val_po = time_Q_apply(Q_po, v)


# I don't think this version makes sense
"""Purple (rechunk-merge): small graph-rewiring step. Dask is aligning the chunks of your broadcasted R_da with those of X_da. Because R_da is now itself a Dask Array (1 chunk of size 
𝑛
×
𝑛
n×n), it needs to merge graph metadata.

Big teal block (array) dominating the timeline:
This is your broadcasted matmul tasks: X_da @ Rinv_da.
Since R_inv was wrapped as a Dask Array (da.from_delayed or da.from_array), Dask sees a proper array-on-array multiply and generates blockwise matmul tasks.
→ That’s why you get this large contiguous band of teal tasks: each row-chunk of X_da multiplied with the single small (n,n) chunk of Rinv_da."""
"""Earlier, your “not-delayed” version showed slower Q @ v apply because:

You measured at small scales.

Serialization overhead per task looked big compared to the tiny math.

Wrapping in a Dask array made the graph smaller/cleaner → you saw ~0.06 s instead of ~0.5 s.

But at larger scales (big 
𝑚
m, many workers), the story flips:

Shipping one tiny NumPy array is cheap compared to all the matmuls.

Wrapping it as a Dask array adds pointless graph overhead.

So you can lose a bit of time overall."""


In [None]:

# fully-delayed version
start = time.perf_counter()
Q_fd, R_fd = indirect_parallel_delayed(X_cached)  # returns Q_da (Dask), R_da (Dask)
_ = R_fd.compute()  # FORCE the same final R compute
t_fd_R = time.perf_counter() - start


t_fd_Q, val_fd = time_Q_apply(Q_fd, v)


"""Efficiency Implications

Computation of 
𝑅
R: essentially unchanged, still dominated by the map stage (compute_R).

Broadcast of 
𝑅
−
1
R
−1
: somewhat less efficient as a Dask Array, since it adds extra bookkeeping without reducing the numerical cost.

Norm benchmark (Q @ v): the visible red/yellow stages are expected; they confirm that your graph is carrying the computation fully through Dask.

because da.from_delayed introduces those extraction tasks.

Broadcast multiply still shows up as teal + red.

The dashboard is “busier” — more small tasks, more scheduler chatter — because everything, even tiny constants, was lifted into the Dask graph.

Efficiency interpretation

For small 
𝑛
n: making 
𝑅
R a Dask array (fully delayed) adds overhead without real benefit — the norm compute shows more yellow/red fragmentation than the optimized/persisted version.

For large distributed runs: it’s still correct, but NumPy constants (or scattered small arrays) are cheaper to handle than wrapping them in Dask.

That’s why your timings showed the fully delayed version wasn’t consistently faster"""


In [None]:


print(f"R time — not-delayed: {t_nd_R:.3f}s | persisted: {t_po_Rp:.3f}s | optimized: {t_po_R:.3f}s | | full-delayed: {t_fd_R:.3f}s")


R time — not-delayed: 0.080s | persisted: 0.110s | optimized: 0.110s | partial: 0.081s | full-delayed: 0.073s


In [None]:


print(f"Q apply — not-delayed: {t_nd_Q:.3f}s | partial: {t_po_Q:.3f}s | partial: {t_pd_Q:.3f}s | full-delayed: {t_fd_Q:.3f}s")


Q apply — not-delayed: 0.463s | partial: 0.054s | partial: 0.056s | full-delayed: 0.066s


In [73]:
N_WORKERS = 3
# Initialization of a distributed random matrix
m, n = int(1e7), 4
chunks = [m // N_WORKERS for _ in range(N_WORKERS-1)]
chunks.append(m - sum(chunks))
A = da.random.random((m, n), chunks=(chunks, n))

# Persist in memory to avoid recomputation
A = A.persist() 

print(f"Input matrix A: m = {A.shape[0]}, n = {A.shape[1]}")
print(f"The {len(A.chunks[0])} blocks are: {A.chunks[0]}")
print(f"Total size of A: {A.nbytes / 1e6} MB")
A


Input matrix A: m = 10000000, n = 4
The 3 blocks are: (3333333, 3333333, 3333334)
Total size of A: 320.0 MB


Unnamed: 0,Array,Chunk
Bytes,305.18 MiB,101.73 MiB
Shape,"(10000000, 4)","(3333334, 4)"
Dask graph,3 chunks in 1 graph layer,3 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 305.18 MiB 101.73 MiB Shape (10000000, 4) (3333334, 4) Dask graph 3 chunks in 1 graph layer Data type float64 numpy.ndarray",4  10000000,

Unnamed: 0,Array,Chunk
Bytes,305.18 MiB,101.73 MiB
Shape,"(10000000, 4)","(3333334, 4)"
Dask graph,3 chunks in 1 graph layer,3 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [74]:
# QR decomposition

Q_delayed, R_delayed = indirect_parallel_persisted(A)  # Our implementation
Q_delayed_dask, R_delayed_dask = da.linalg.tsqr(A)    # Dask's implementation

print("-- OUR IMPLEMENTATION (QR) --")
%time Q, R = dask.compute(Q_delayed, R_delayed)

print("\n-- DASK'S IMPLEMENTATION (QR) --")
%time Q_dask, R_dask = dask.compute(Q_delayed_dask, R_delayed_dask)


-- OUR IMPLEMENTATION (QR) --
CPU times: user 940 ms, sys: 380 ms, total: 1.32 s
Wall time: 599 ms

-- DASK'S IMPLEMENTATION (QR) --
CPU times: user 3.9 s, sys: 1.03 s, total: 4.93 s
Wall time: 1.62 s


In [None]:
# create again a cluster
# CLUSTER DEPLOYMENT ON CLOUDVENETO

from dask.distributed import Client, SSHCluster

cluster = SSHCluster(
    ["10.67.22.154", "10.67.22.216", "10.67.22.116", "10.67.22.113"],
    connect_options={"known_hosts": None},
    remote_python="/home/ubuntu/miniconda3/bin/python",
    scheduler_options={"port": 8786, "dashboard_address": ":8797"},
    worker_options={
        "nprocs": 4,        # Now we use all 4 cores -> 12 workers
        "nthreads": 1       # We use 1 threads. Following Dask documentation, however, Numpy should release well the GIL lock thus we could use more than 1 thread. 
    }
)

client = Client(cluster)


In [3]:
print(client)
print(cluster)

<Client: 'tcp://10.67.22.154:8786' processes=12 threads=12, memory=23.25 GiB>
SSHCluster(SSHCluster, 'tcp://10.67.22.154:8786', workers=12, threads=12, memory=23.25 GiB)


In [4]:
# Make sure your dataset is loaded & persisted
import dask.dataframe as dd
import numpy as np
import os

os.chdir("/home/ubuntu")
path_HIGGS = os.path.join(os.getcwd(), "datasets", "HIGGS.csv")

df = dd.read_csv(path_HIGGS, header=None, blocksize="200MB")
X_da = df.iloc[:, 1:].to_dask_array(lengths=True).astype(np.float32)
X_da = X_da.persist()

In [5]:
from scipy.linalg import solve_triangular 

In [9]:
# --- Now time your function ---
%time Q, R = indirect_parallel(X_da)

CPU times: user 7.07 ms, sys: 3.23 ms, total: 10.3 ms
Wall time: 637 ms


In [21]:
import time
from dask.distributed import wait
start = time.time()
X_da = X_da.persist()
Q, R1 = indirect_parallel_persisted(X_da)
wait([Q, R1])
end = time.time()
print(end-start)

FutureCancelledError: finalize-hlgfinalizecompute-24f1f03f15b44280ac77d18e10a81e91 cancelled for reason: lost dependencies.

In [19]:
del R

In [None]:
%time Q_da, R_da = indirect_parallel_optimized(X_da)

In [71]:
Q_da, R_da = indirect_parallel_delayed(X_da)
%time R = R_da.compute()

CPU times: user 7.58 ms, sys: 0 ns, total: 7.58 ms
Wall time: 1.79 s


In [72]:
client.close()