# Distributed Compressed Sensing (Multi-Machine Version)

This notebook implements a **multi-machine distributed compressed sensing** experiment deployed on multiple devices (including Raspberry Pi nodes).  
Its main purpose is to evaluate the **runtime impact of communication quantization** in a resource-constrained distributed environment.

---

## Problem Formulation

We consider the following centralized objective:

$$
\min_{x \in \mathbb{R}^d}\;
\frac{1}{16}\sum_{i=1}^{16}\left\|C_i \Psi x - y_i\right\|_2^2
+ \lambda \|x\|_1 .
$$

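Here, $C_i$ denotes the local measurement matrix of node $i$ (randomly selected rows of the identity matrix; in the code, each of the 16 nodes holds $128/16 = 8$ samples of a signal of length $d = 4096$),  
$\Psi$ is the orthonormal discrete cosine transform (DCT) basis, so that $\Psi x$ is the signal and $x$ is its vector of DCT coefficients, and $y_i$ is the observation vector of node $i$.  
The centralized optimal solution is denoted by $x^*$.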
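For reference, $x^*$ can be approximated centrally with plain proximal gradient descent (ISTA) on the stacked problem, where the $\lambda\|x\|_1$ term is handled by its proximal operator, soft-thresholding. The sketch below is a minimal NumPy version under that assumption; it is not RAugDGM, and `soft_threshold` and `ista` are hypothetical helpers, not part of `dco` or of this notebook.

```python
import numpy as np


def soft_threshold(v: np.ndarray, tau: float) -> np.ndarray:
    """Proximal operator of tau * ||.||_1: shrink each entry toward zero by tau."""
    return np.sign(v) * np.maximum(np.abs(v) - tau, 0.0)


def ista(
    thetas: list[np.ndarray],
    ys: list[np.ndarray],
    lam: float,
    num_iter: int = 500,
) -> np.ndarray:
    """Hypothetical ISTA solver for (1/N) sum_i ||Theta_i x - y_i||_2^2 + lam ||x||_1."""
    A = np.vstack(thetas)       # stacked Theta_i = C_i Psi
    b = np.concatenate(ys)      # stacked observations y_i
    N = len(thetas)
    # Lipschitz constant of the smooth part's gradient (2/N) A^T (A x - b)
    lip = 2.0 / N * np.linalg.norm(A, 2) ** 2
    x = np.zeros(A.shape[1])
    for _ in range(num_iter):
        grad = 2.0 / N * A.T @ (A @ x - b)
        # Gradient step on the smooth part, then prox step on the l1 part
        x = soft_threshold(x - grad / lip, lam / lip)
    return x


# Tiny check: with a single node and Theta_1 = I, the minimizer is soft_threshold(y, lam / 2)
y_demo = np.array([1.0, -0.2, 0.5, 0.0])
print(np.allclose(ista([np.eye(4)], [y_demo], lam=0.6), soft_threshold(y_demo, 0.3)))  # True
```

With the notebook's data, `ista(list(Theta.values()), list(y.values()), lam)` would give a centralized reference point; since ISTA converges sublinearly, a few hundred iterations may not reach high accuracy for $d = 4096$.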

---

## Difference from the Single-Machine Multi-Process Notebook

A single-machine multi-process version is provided in the same directory.  
While that notebook focuses on algorithm behavior and visualization, this notebook targets a **real multi-device distributed setting**.

In particular, this notebook is designed to evaluate system-level performance under limited hardware and network capabilities.

---

## Main Objective

The primary goal of this notebook is to compare the execution time of

- **unquantized communication ($\texttt{float64}$)**, and
- **quantized communication ($\texttt{float64} \rightarrow \texttt{int16}$)**

during distributed optimization.

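The motivation is that Raspberry Pi nodes have limited computation and communication capacity. An `int16` entry occupies 2 bytes instead of the 8 bytes of a `float64` entry, so quantization shrinks each message payload roughly fourfold, which may substantially reduce the overall runtime.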
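To make the payload argument concrete, the sketch below shows one common way to map a `float64` vector to `int16`: scale by the largest magnitude so that the values fit in $[-32767, 32767]$, round, and transmit the scale alongside the integers. This scheme is an assumption for illustration; `topolink.Quantize` may work differently, and `quantize_int16` / `dequantize_int16` are hypothetical names.

```python
import numpy as np

INT16_MAX = np.iinfo(np.int16).max  # 32767


def quantize_int16(v: np.ndarray) -> tuple[np.ndarray, float]:
    """Hypothetical uniform max-abs quantizer: float64 vector -> int16 vector + one scale."""
    scale = float(np.max(np.abs(v))) / INT16_MAX
    if scale == 0.0:
        return np.zeros(v.shape, dtype=np.int16), 0.0
    q = np.round(v / scale).astype(np.int16)  # values lie in [-32767, 32767]
    return q, scale


def dequantize_int16(q: np.ndarray, scale: float) -> np.ndarray:
    """Map the received integers back to float64."""
    return q.astype(np.float64) * scale


rng = np.random.default_rng(0)
v = rng.standard_normal(4096)  # one message of the dimension used here (d = 4096)
q, scale = quantize_int16(v)

print(v.nbytes, q.nbytes)  # 32768 8192: a 4x smaller payload, plus one scale factor
# Rounding error is at most half a quantization step (small float tolerance added)
print(np.max(np.abs(dequantize_int16(q, scale) - v)) <= scale / 2 + 1e-12)  # True
```

The price of the smaller payload is a rounding error of up to half a step per entry, which the optimizer then has to tolerate.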

---

## Experimental Design

To keep the total runtime manageable in a real multi-device environment, this notebook:

- focuses only on timing comparison between quantized and unquantized communication,
- omits extensive visualization and convergence plots used in the single-machine notebook,
- runs the algorithm for a fixed budget of `MAX_ITER` iterations per run (set to 100 in the timing cell) as a practical trade-off between runtime and comparability.

This notebook therefore emphasizes **wall-clock performance of different communication schemes**, rather than detailed convergence analysis.

```python
import os
import pathlib

import dotenv
import networkx as nx
import numpy as np
import numpy.typing as npt
from scipy.fft import dct

from topolink import Graph, bootstrap

# Sensors: 4 x 4 grid with gossip matrix W = I - 0.1 L
n_grid = 4
grid = nx.grid_2d_graph(n_grid, n_grid)
L: npt.NDArray[np.float64] = nx.laplacian_matrix(grid).toarray().astype(np.float64)
W = np.eye(grid.number_of_nodes()) - 0.1 * L

graph = Graph.from_mixing_matrix(W, transport="tcp")
n_sens = graph.number_of_nodes
sens_names = [f"{i + 1}" for i in range(n_sens)]

# Original signal: two cosines (frequencies 97 and 777), compressible in the DCT basis
n = 4096
t = np.linspace(0, 1, n)
x = np.cos(2 * 97 * np.pi * t) + np.cos(2 * 777 * np.pi * t)

# Signal sampling: each sensor observes p_total // n_sens = 8 samples
np.random.seed(3)
p_total = 128
p_sens = p_total // n_sens
# np.round(rand * n) can return n itself, so clip to the last valid index n - 1
perm = {
    i: np.minimum(np.round(np.random.rand(p_sens) * n), n - 1).astype(int)
    for i in sens_names
}
y = {i: x[perm[i]] for i in sens_names}

# Sensing matrices Theta_i = C_i Psi (rows of Psi at the sampled positions)
Psi = dct(np.eye(n), norm="ortho")
Theta = {i: Psi[perm[i], :] for i in sens_names}
lam = 0.01

# Environment variables: the Dask scheduler address is read from ROOT_DIR/.env
ROOT_DIR = pathlib.Path.cwd().parent
dotenv.load_dotenv(ROOT_DIR / ".env")

DASK_CLIENT = os.getenv("DASK_CLIENT")
if DASK_CLIENT is None:
    # dd.Client(None) would fall back to a local cluster instead of the devices
    raise RuntimeError("DASK_CLIENT is not set; add the scheduler address to .env")

# Time record
time_record: dict[str, list[float]] = {"quantized": [], "unquantized": []}
```
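The setup cell builds $\Theta_i = C_i \Psi$ by row indexing instead of forming $C_i$ explicitly. The sketch below checks that equivalence at a small size ($n = 16$) using only NumPy: it assembles the orthonormal DCT-II matrix $D$ by hand and takes $\Psi = D^\top$, which is what `dct(np.eye(n), norm="ortho")` returns, because `dct` transforms each row of the identity along the last axis. The helper `dct_matrix` and the sampled positions are illustrative.

```python
import numpy as np


def dct_matrix(n: int) -> np.ndarray:
    """Orthonormal DCT-II matrix D: the DCT coefficients of a signal s are D @ s."""
    k = np.arange(n)[:, None]  # frequency index (rows)
    j = np.arange(n)[None, :]  # sample index (columns)
    D = np.sqrt(2.0 / n) * np.cos(np.pi * k * (2 * j + 1) / (2 * n))
    D[0, :] = np.sqrt(1.0 / n)  # the constant basis vector has a different scaling
    return D


n = 16
D = dct_matrix(n)
Psi = D.T  # synthesis basis: signal = Psi @ coefficients

rng = np.random.default_rng(3)
idx = rng.choice(n, size=4, replace=False)  # hypothetical sample positions of one node
C = np.eye(n)[idx, :]                       # C_i: selected rows of the identity

signal = rng.standard_normal(n)
coeffs = D @ signal

print(np.allclose(Psi.T @ Psi, np.eye(n)))  # True: Psi is orthogonal
print(np.allclose(Psi @ coeffs, signal))    # True: Psi maps coefficients back to the signal
print(np.allclose(C @ Psi, Psi[idx, :]))    # True: C_i Psi is just row indexing
```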

## Timing Evaluation of RAugDGM

This notebook focuses on evaluating the **runtime performance** of the distributed algorithm **RAugDGM** for a distributed LASSO problem arising from compressed sensing, rather than benchmarking multiple optimization algorithms.

The experiment is conducted on a $4 \times 4$ grid network.  
The gossip matrix is defined as

$$
W = I - 0.1 L,
$$

where $L$ is the Laplacian matrix of the communication graph.
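A quick check of this gossip matrix (a sketch, not part of the original experiment): the $4 \times 4$ grid is the Cartesian product of two 4-node paths, so its Laplacian is a Kronecker sum of path Laplacians (the same matrix as `nx.laplacian_matrix(nx.grid_2d_graph(4, 4))`, up to node ordering). The path eigenvalues are $2 - 2\cos(k\pi/4)$ for $k = 0, \dots, 3$, so the largest grid eigenvalue is $4 + 2\sqrt{2} \approx 6.83$. Hence every eigenvalue of $W$ lies in $[1 - 0.1(4 + 2\sqrt{2}),\, 1] \approx [0.317, 1]$, and the second-largest one, $1 - 0.1(2 - \sqrt{2}) \approx 0.941$, governs how fast plain gossip averaging mixes.

```python
import numpy as np

# Laplacian of the 4-node path graph 1 - 2 - 3 - 4
L_path = np.diag([1.0, 2.0, 2.0, 1.0]) - (np.eye(4, k=1) + np.eye(4, k=-1))
# The 4 x 4 grid is P_4 x P_4, whose Laplacian is the Kronecker sum of two path Laplacians
L = np.kron(L_path, np.eye(4)) + np.kron(np.eye(4), L_path)
W = np.eye(16) - 0.1 * L

eig = np.sort(np.linalg.eigvalsh(W))[::-1]  # eigenvalues of W, largest first

print(np.allclose(W, W.T))              # True: symmetric
print(np.allclose(W.sum(axis=1), 1.0))  # True: rows (hence also columns) sum to one
print(round(eig[0], 4), round(eig[1], 4), round(eig[-1], 4))  # 1.0 0.9414 0.3172
```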

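We compare the execution time of RAugDGM using unquantized communication (float64) and quantized communication (float64 $\rightarrow$ int16). Each configuration is run `NUM_RUNS = 10` times, and the time of a run is that of the slowest node, since a run is complete only once every node has finished its iterations.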

```python
import dask.distributed as dd


def lasso(
    node_id: str,
    g_name: str,
    theta_i: npt.NDArray[np.float64],
    y_i: npt.NDArray[np.float64],
    dim_i: int,
    lam_i: float,
    max_iter: int,
    quantize: bool,
) -> float:
    """Run RAugDGM on one node and return its wall-clock time in seconds.

    The imports are inside the function so that they are resolved on the Dask worker.
    """
    import time

    from jax import Array
    from jax.numpy.linalg import norm
    from numpy import zeros

    from dco import L1, RAugDGM
    from topolink import NodeHandle, Quantize

    # Iterates of the local estimate of the DCT coefficients
    s_hat_i = zeros((max_iter, dim_i), dtype=np.float64)

    # Local loss f_i(s) = ||Theta_i s - y_i||_2^2
    def f(var: npt.NDArray[np.float64]) -> Array:
        return norm(theta_i @ var - y_i) ** 2

    # Regularizer g(s) = lam_i * ||s||_1
    g = L1(lam_i)

    # Node handle; quantized runs convert outgoing messages from float64 to int16
    transform = Quantize(dtype="int16") if quantize else None
    nh = NodeHandle(node_id, g_name, transform=transform, transport="tcp")

    # 1.358 is the RAugDGM parameter used throughout this experiment (presumably the step size)
    optimizer = RAugDGM(f, nh, 1.358, g)

    # perf_counter is a monotonic clock intended for measuring elapsed time
    begin_time = time.perf_counter()

    optimizer.init(s_hat_i[0])
    for k in range(max_iter - 1):
        s_hat_i[k + 1] = optimizer.step(s_hat_i[k])

    return time.perf_counter() - begin_time


NUM_RUNS = 10
MAX_ITER = 100


def run_trials(client: dd.Client, label: str, quantize: bool) -> None:
    """Run NUM_RUNS trials of one communication scheme and record their times."""
    for run in range(NUM_RUNS):
        graph.name = f"{label}_{run}"
        bootstrap(graph)

        # One task per sensor node
        futures = [
            client.submit(lasso, i, graph.name, Theta[i], y[i], n, lam, MAX_ITER, quantize)
            for i in sens_names
        ]
        worker_times = client.gather(futures)

        # A run ends only when the slowest node has finished its iterations
        max_time = max(worker_times)
        time_record[label].append(max_time)

        print(f"{label.capitalize()} run time: {max_time:.4f} seconds at run {run}")


client = dd.Client(DASK_CLIENT)

try:
    run_trials(client, "unquantized", quantize=False)
    run_trials(client, "quantized", quantize=True)
finally:
    client.close()

# Summary statistics over all runs
for stat, agg in (("Average", np.mean), ("Max", max)):
    for label in ("quantized", "unquantized"):
        print(f"{stat} {label} time (s):", agg(time_record[label]))
```

```text
Unquantized run time: 15.0061 seconds
Unquantized run time: 13.8923 seconds
Unquantized run time: 14.7577 seconds
Unquantized run time: 14.4456 seconds
Unquantized run time: 14.7582 seconds
Unquantized run time: 13.9381 seconds
Unquantized run time: 14.6940 seconds
Unquantized run time: 14.5200 seconds
Unquantized run time: 14.5369 seconds
Unquantized run time: 14.2245 seconds
Quantized run time: 5.6318 seconds
Quantized run time: 8.2498 seconds
Quantized run time: 5.6325 seconds
Quantized run time: 5.7290 seconds
Quantized run time: 5.7365 seconds
Quantized run time: 5.5126 seconds
Quantized run time: 5.5438 seconds
Quantized run time: 5.5699 seconds
Quantized run time: 5.5694 seconds
Quantized run time: 6.6733 seconds
Average quantized time (s): 5.984861755371094
Average unquantized time (s): 14.477328896522522
Max quantized time (s): 8.24983263015747
Max unquantized time (s): 15.006077289581299
```
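The printed timings can be summarized a little further. The sketch below copies the per-run times from the output above (rounded to four decimals, as printed) and computes the mean, sample standard deviation and median of each scheme, plus the ratio of mean unquantized to mean quantized time; the variable names are illustrative.

```python
import statistics

# Per-run wall-clock times (slowest node of each run), copied from the output above
unquantized = [15.0061, 13.8923, 14.7577, 14.4456, 14.7582,
               13.9381, 14.6940, 14.5200, 14.5369, 14.2245]
quantized = [5.6318, 8.2498, 5.6325, 5.7290, 5.7365,
             5.5126, 5.5438, 5.5699, 5.5694, 6.6733]

for name, times in (("unquantized", unquantized), ("quantized", quantized)):
    print(
        f"{name:>11}: mean {statistics.mean(times):.3f} s, "
        f"stdev {statistics.stdev(times):.3f} s, median {statistics.median(times):.3f} s"
    )

# Ratio of mean run times, i.e. how many times faster the quantized runs were on average
speedup = statistics.mean(unquantized) / statistics.mean(quantized)
print(f"speedup of quantized over unquantized: {speedup:.2f}x")  # 2.42x
```

The median is less sensitive than the mean to the two slower quantized runs (8.25 s and 6.67 s). The observed ratio of about 2.4 stays below the fourfold payload reduction, which would be expected if part of each iteration is spent on local computation or on latency that does not shrink with message size.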
