# Distributed optimization in Pytorch
Alexandre Reiffers-Masson

Nom(s) et prénom(s): ZHANG Zuoyu

In this laboratory, we will focus on programming a distributed version of the estimator proposed in "Sensor Selection via Convex Optimization" written by Siddharth Joshi and Stephen Boyd. The context is the following: 
We want to estimate a vector $x\in\mathbb{R}^n$ from $m$ linear measurements, corrupted by additive noise:
$$
y_i = a_i^T x+v_i,
$$
where $v_i$ is an independent identically distributed $\mathcal{N}(0,\sigma^2)$. $y_i$ and $a_i$ is known by sensor $i$. To estimate $x$, we will solve in a distributed way the following optimization problem:
$$
\min_x \sum_{i=1}^m(y_i-a_i^T x)^2.
$$



1/ Compute the argmin of the previous optimization problem. 


To $\min_x \sum_{i=1}^m(y_i-a_i^T x)^2$, we firstly let $f(x) = \min_x \sum_{i=1}^m(y_i-a_i^T x)^2$, calculate the derivative of $f(x)$ and to find the minimum, we let the derivative be zero
$$\frac{\partial f}{\partial x} = 2\sum_{i=1}^m(-a_i)(y_i-a_i^T x) = 0$$
we got
$$\sum_{i=1}^m a_i a_i^T x = \sum_{i=1}^m a_i y_i$$
Further, we can get the minimum $x$ of the optimization problem
$$\hat{x} = (\sum_{i=1}^m a_i a_i^T)^{-1} \sum_{i=1}^m a_i y_i$$


2/ Can you describe a simple iterative algorithms to solve this problem?



I think we can use the gradient descent algorithm to optimize this problem. To find the local minimum of a function using gradient descent, one must iteratively search for points at a specified step distance in the opposite direction of the corresponding gradient (or approximate gradient) at the current point on the function. Proceed as follows:

1.   Initialize $x_0$, step size $γ$
2.   for $k = 1$ to Max iter:
3.   >$$\nabla f(x^{(k-1)}) = 2\sum_{i=1}^m(-a_i)(y_i-a_i^T x^{(k-1)})$$  
4.   >$$x^{(k)} = x^{(k-1)} - γ \nabla f(x^{(k-1)})$$




3/ Assuming that node $i$ can broadcast vectors to the other nodes, what could be a possible information it should broadcast to allow every node to compute the optimal $x$? 

*Constraint:* Remember that you would to avoid that a node is inversing a matrix and you want to minimize the number of operations made by each node

              Call the professor when you have a satisfying answer. 




To solve the optimization problem in a distributed way, each node should broadcast information that allows every other node to update their estimate of the vector $x$ based on their local measurements and the information received from other nodes.

One way to achieve this is through a consensus-based algorithm. Each node can maintain a local estimate $x_i$ of the vector $x$ , and broadcast this estimate to its neighbors. Then, each node updates its estimate based on the estimates received from its neighbors, using a weighted average:

$$x_i(t+1) = (1 - \alpha)x_i(t) + \frac{\alpha}{ki} \sum_{(j \, in \, N(i))}(x_j(t))$$

where $N(i)$ is the neighbors of node $i$, $k_i$ is the number of neighbors of node $i$ , $α$ is a weight that controls the rate of convergence of the algorithm, and neighbors $i$ denotes the set of neighbors of node $i$ .

To incorporate the information from the measurements, each node can also maintain a local residual $r_i$ defined as:

$$r_i(t) = y_i - a_i T(x_i(t))$$

Then, each node can use the residuals received from its neighbors to update its local estimate, using a weighted least squares formulation:

$$x_i(t+1) = argmin_x \sum_{(j \, in \, N(i))} (w_j(t)(y_j - a_jT(x)))^2$$

where $w_j(t)$ is a weight that captures the confidence in the residual received from node $j$ , and can be computed based on the noise variance $\sigma^2$ and the number of iterations of the algorithm.

By iteratively updating their estimates based on the local measurements and the information received from neighbors, the nodes can converge to a consensus estimate of the vector $x$ . Once the consensus is reached, each node can compute the optimal estimate of $x$ using its local measurements and the consensus estimate. Specifically, each node can compute:

$$x_{opt} = argmin_x \sum_{i=1}^{m} (y_i - a_i^T x)^2$$

using its local measurements and the consensus estimate of $x$.

Overall, the information that each node needs to broadcast includes its local estimate $x_i$ , its local residual $r_i$ , and the weights $w_j(t)$ for each neighbor $j$ . These weights can be computed based on the noise variance $\sigma^2$ and the number of iterations of the algorithm. By broadcasting this information to its neighbors, each node can update its estimate of $x$ in a distributed way, and converge to a consensus estimate of $x$.

We will move now to the implementation of the distributed algorithm in pytroch. Please first have a look to the following two tutorials to learn more about distribution in pytroch:
- https://pytorch.org/tutorials/beginner/dist_overview.html
- https://pytorch.org/tutorials/intermediate/dist_tuto.html

Excute this cell to install the required packages for the lab. 

In [1]:
import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch
import math
from torch import distributions
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import pandas as pd
import numpy as np
from random import sample
import matplotlib.pyplot as plt
from random import choices

4/ For the implementation of your distributed algorithm, what can of *Collective Communication* will you use? Complete and comment the code below to test the chosen Communication protocol. 

In [None]:
def run(rank, size):
    group = dist.new_group(list(range(size)))
    tensor = torch.randn(2,2)
    print('Rank ', rank, ' has data ', tensor, "before communication")
    dist.all_reduce(tensor, op=dist.ReduceOp.SUM, group=group) # perform all_reduce operation
    print('Rank ', rank, ' has data ', tensor, "after communication")

def init_process(rank, size, fn, backend='gloo'):
    """ Initialize the distributed environment. """
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)

size = 4
processes = []

for rank in range(size):
  p = mp.Process(target=init_process, args=(rank, size,run))
  p.start()
  processes.append(p)

for p in processes:
  p.join()

Rank  Rank Rank Rank   1 2 03 has data      has data  has data  has data  tensor([[-0.0091, -1.4265],
        [ 1.2028,  1.8545]])  before communicationtensor([[-0.0091, -1.4265],
        [ 1.2028,  1.8545]])tensor([[-0.0091, -1.4265],
        [ 1.2028,  1.8545]])
   before communication
tensor([[-0.0091, -1.4265],
        [ 1.2028,  1.8545]]) before communicationbefore communication

Rank Rank Rank Rank     0132     has data  has data  has data  has data     tensor([[-0.0366, -5.7062],
        [ 4.8111,  7.4180]])tensor([[-0.0366, -5.7062],
        [ 4.8111,  7.4180]])tensor([[-0.0366, -5.7062],
        [ 4.8111,  7.4180]])  tensor([[-0.0366, -5.7062],
        [ 4.8111,  7.4180]]) after communicationafter communication after communication

after communication





              Call the professor when you have a satisfying answer. 


5/ Create the dataset with the matrix $A = [[5., 1.0], [1.0, 4.]]$, $X = [[8.], [2.]]$ and $\sigma =0.5$.  

In [None]:
sigma = 0.5
dtype = torch.float
nb_samples = 2
A = np.array([[5., 1.0], [1.0, 4.]]).astype(np.float32)
A_pytorch = torch.tensor(A)
X = np.array([[8.], [2.]]).astype(np.float32)
X_pytorch = torch.tensor(X)
y = torch.mm(A_pytorch,X_pytorch) + torch.normal(mean=0,std=sigma,size=[2,1]) 

6/ Complete the following code to implement your algorithm. 

              Call the professor when you have a satisfying answer. 


To implement the gradient descent algorithm for this optimization problem, we need to calculate the gradient of the objective function with respect to x. The objective function is defined as:

$$L(x) = ∑_{i=1}^m(y_i−a^T_ix)^2$$

where $y_i = a^T_ix + v_i$, and $v_i$ is a random noise term.

The gradient of $L(x)$ with respect to $x$ can be calculated as:

$$∇L(x) = -2A(y - Ax)$$

where $A$ is the matrix we have created before, $y$ is the vector of observations, and $x$ is the vector of unknowns.

Using this gradient, we can update $x$ as:

$x ← x - \gamma ∇L(x)$

where $\gamma$ is the step size.

Based on this, we can implement the gradient descent algorithm as follows:

In [None]:
step_size = 0.01
nb_epochs_1 = 30

def objective_func(x,y,A):
    return torch.sum((y - torch.matmul(A, x)) ** 2)

def gradient(x,y,A):
    return -2 * torch.matmul(A.t(), (y - torch.matmul(A, x)))

def run(rank, size):
    # Initialize x with random values
    x = torch.randn(A_pytorch.shape[1], 1)

    for epoch in range(nb_epochs_1):
        # Calculate gradient and update x
        grad = gradient(x,y,A_pytorch)
        x = x - step_size * grad

        # Average x across processes
        dist.all_reduce(x, op=dist.ReduceOp.SUM)
        x /= size

        # Print progress
        if rank == 0:
            print(f"Epoch {epoch+1}: x = {x.flatten()}")

    # Calculate final objective value
    dist.barrier()
    obj_val = objective_func(x,y,A_pytorch)
    dist.all_reduce(obj_val, op=dist.ReduceOp.SUM)
    obj_val /= size

    # Print final result
    if rank == 0:
        print(f"Final result: x = {x.flatten()}, L(x) = {obj_val.item()}")

def init_process(rank, size, fn, backend='gloo'):
    """ Initialize the distributed environment. """
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)

size = 2
processes = []

for rank in range(size):
  p = mp.Process(target=init_process, args=(rank, size,run))
  p.start()
  processes.append(p)

for p in processes:
  p.join()

Epoch 1: x = tensor([4.7488, 3.0703])
Epoch 2: x = tensor([6.2320, 3.2345])
Epoch 3: x = tensor([6.9145, 3.0758])
Epoch 4: x = tensor([7.2706, 2.8483])
Epoch 5: x = tensor([7.4825, 2.6340])
Epoch 6: x = tensor([7.6227, 2.4545])
Epoch 7: x = tensor([7.7224, 2.3107])
Epoch 8: x = tensor([7.7961, 2.1979])
Epoch 9: x = tensor([7.8518, 2.1101])
Epoch 10: x = tensor([7.8943, 2.0422])
Epoch 11: x = tensor([7.9269, 1.9897])
Epoch 12: x = tensor([7.9521, 1.9492])
Epoch 13: x = tensor([7.9714, 1.9180])
Epoch 14: x = tensor([7.9863, 1.8939])
Epoch 15: x = tensor([7.9978, 1.8753])
Epoch 16: x = tensor([8.0067, 1.8609])
Epoch 17: x = tensor([8.0135, 1.8498])
Epoch 18: x = tensor([8.0188, 1.8413])
Epoch 19: x = tensor([8.0229, 1.8347])
Epoch 20: x = tensor([8.0260, 1.8297])
Epoch 21: x = tensor([8.0284, 1.8257])
Epoch 22: x = tensor([8.0303, 1.8227])
Epoch 23: x = tensor([8.0317, 1.8204])
Epoch 24: x = tensor([8.0328, 1.8186])
Epoch 25: x = tensor([8.0337, 1.8172])
Epoch 26: x = tensor([8.0344, 1.81

7/ Suggest and implement an algorithm based on distributed consensus optimization.  
 

Distributed consensus optimization is a technique used in distributed machine learning to achieve global optimization while minimizing communication costs between different machines. The goal of the algorithm is to find a global optimum by iteratively updating the local models at each machine and exchanging information with the neighboring machines.

Here is an algorithm based on distributed consensus optimization:

1.   **Initialization**: Each machine $i$ initializes its own model parameters $x_i$.Each machine $i$ sends its model parameters to its neighboring machines.
Each machine $i$ receives the model parameters of its neighbors.
2.   **Consensus**: Each machine $i$ computes the average of its own model parameters and the model parameters received from its neighbors.
Each machine $i$ updates its own model parameters to be the average value computed in the previous step.
3.   **Gradient**: Each machine $i$ computes the gradient of its own local loss function with respect to its own model parameters.
Each machine $i$ sends the gradient to its neighboring machines.
Each machine $i$ receives the gradients from its neighbors.
4.   **Update**: Each machine $i$ updates its own model parameters using the gradient and the step size.
5.   **Repeat**: Repeat steps 2-4 until convergence.


In [None]:
# Define the consensus function
def consensus(x, neighbors):
    sum = x.clone()
    for neighbor in neighbors:
        dist.send(tensor=x, dst=neighbor)
        neighbor_x = torch.zeros_like(x)
        dist.recv(tensor=neighbor_x, src=neighbor)
        sum += neighbor_x
    return sum / (len(neighbors) + 1)

# Define the update function
def update(x, gradient, step_size):
    return x - step_size * gradient

# Define the main function
def run(rank, size):
    # Initialize the model parameters
    x = torch.tensor([0.0, 0.0], requires_grad=True)

    # Define the neighbors
    left_neighbor = (rank - 1) % size
    right_neighbor = (rank + 1) % size
    neighbors = [left_neighbor, right_neighbor]

    # Run the consensus optimization algorithm
    for i in range(10):
        # Consensus step
        x = consensus(x, neighbors)
        print(x)
        # Gradient step
        grad = gradient(x,y,A_pytorch)
        x_grad = torch.zeros_like(x)
        dist.reduce(tensor=grad, dst=0)
        dist.broadcast(tensor=x_grad, src=0)
        x.grad = x_grad
        x = update(x, x.grad, step_size=0.1)
        print(x)
    # Print the final model parameters
    if rank == 0:
        print("Final model parameters:", x)

# Define the initialization function
def init_process(rank, size, fn, backend='gloo'):
    """ Initialize the distributed environment. """
    os.environ['MASTER_ADDR'] = '172.28.0.12'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)


size = 4
processes = []
for rank in range(size):
    p = mp.Process(target=init_process, args=(rank, size, run))
    p.start()
    processes.append(p)

for p in processes:
    p.join()

8/ (Bonus) By getting inspired by the code proposed in the section "Distributed Training" from the tutorial "https://pytorch.org/tutorials/intermediate/dist_tuto.html", implement a distributed version of your favorite ML algorithm. 

In [2]:
from torch.utils.data import Dataset, DataLoader, random_split, DistributedSampler

In [3]:
# Define the device to use for computations
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [4]:
# build the dataset
class getDataset(Dataset):
    def __init__(self, data, labels):
      data = data.astype(np.float32)
      label = labels.astype(np.float32)

      self.x_data = torch.from_numpy(data)
      self.y_data = torch.from_numpy(labels)
      self.len = data.shape[0]
    
    def __getitem__(self, index):
      return self.x_data[index], self.y_data[index]

    def __len__(self):
      return self.len

In [5]:
# Define the LR model
class LRModel(nn.Module):
    def __init__(self, input_size):
        super(LRModel, self).__init__()
        self.fc =nn.Sequential(
                nn.Linear(input_size, 3),
                nn.ReLU(),
                nn.Linear(3,1),
                )
    def forward(self, x):
        out = self.fc(x)
        return out

In [9]:
# Define the training function
def train(rank, world_size):
    # Set the seed to ensure reproducibility
    
    data = np.loadtxt(open('/content/sample_data/california_housing_train.csv', "r"), delimiter=",", skiprows=1)
    train_data = getDataset(data[:,0:-1],data[:,-1])
    torch.manual_seed(0)
    # Set the batch size and number of epochs
    batch_size = 64
    num_epochs = 10
    
    # Create a distributed sampler to split the data across different processes
    train_sampler = DistributedSampler(train_data, num_replicas=world_size, rank=rank)
    
    # Create a data loader for the training data using the distributed sampler
    train_loader = DataLoader(train_data, batch_size=batch_size, sampler=train_sampler)
    
    # Create the LR model and move it to the device
    input_size = data.shape[1]-1
    model = LRModel(input_size).to(device)
    
    # Define the loss function and optimizer
    criterion = nn.MSELoss()
    optimizer = optim.SGD(model.parameters(), lr=0.001)
    
    # Train the model
    for epoch in range(num_epochs):
        train_sampler.set_epoch(epoch)
        for batch_idx, data in enumerate(train_loader):
            data_m, target = data[0].to(device), data[1].to(device).float()
            normalizer_train = len(train_loader.dataset)
            optimizer.zero_grad()
            output = model(data_m)
            loss = criterion(output, target.unsqueeze(1))
            loss.backward()
            optimizer.step()
            if batch_idx % 10 == 0:
                print('Rank ', rank, ', Epoch ', epoch, ', Batch ', batch_idx, ', Loss: ', loss.item()/normalizer_train, '\n')

In [10]:
def init_process(rank, size, fn, backend='gloo'):
    """ Initialize the distributed environment. """
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29505'
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)

size = 4
processes = []

for rank in range(size):
  p = mp.Process(target=init_process, args=(rank, size, train))
  p.start()
  processes.append(p)

for p in processes:
  p.join()

Rank Rank Rank Rank   2 3 1  , Epoch 0, Epoch    0, Epoch , Epoch    , Batch 0 0  0 0, Batch   , Batch  , Batch , Loss:  0   0 0, Loss:  3015360.993882353, Loss:  , Loss:  3471007.744 3682982.731294118   3543142.1590588237

 





Rank Rank  Rank 2  3, Epoch   , Epoch  0 0, Batch  , Batch    100 , Loss:  3609419.053176470610  Rank  
, Epoch , Loss:  
  14221015.2207058830   , Epoch 
 , Batch 
 010  , Loss: Rank , Batch   3546466.1835294122 Rank 10   , Loss: 
 3, Epoch  
  02694036.540235294 , Epoch   , Batch 0 
Rank  20
  , Batch 0 , Loss:  20 , Epoch  3037473.6112941178 , Loss:  0
 Rank 2633493.8051764704
  1, Batch   
 20, Epoch  
 , Loss: 0Rank   2548069.3157647057  2, Batch 
Rank   
 , Epoch 3, Epoch 20    00, Loss:   2980500.660705882 , Batch  

Rank   , Batch 30 0Rank   , Epoch 30 1, Loss:  , Loss:   , Epoch   2934388.374588235300   3491232.3463529414 
, Batch  , Batch 
 
 30
30  , Loss: , Loss:   3010932.7362844894.147764706Rank  Rank   2

  3

 , Epoch , Epoch Rank    Rank 00 0