In [None]:
## For colab runs
# from google.colab import drive
# drive.mount('/content/gdrive')

# !mkdir -p data
# !cp /content/gdrive/My\ Drive/uploads/a9a.2 ./data # load this data to the specified dir on your drive first

# DGM Minimal Environment 

In [None]:
import math
import numpy as np
import torch
import networkx as nx

import matplotlib.pyplot as plt
%matplotlib inline

from sklearn.datasets import load_svmlight_file
from torch import autograd
from torch.nn.utils.rnn import pad_sequence

In [None]:
def uniform_decompose(N, m, b=8):
    """
    Decomposes N into m terms: a_i, i=1,m;
    so that: max_i a_i - min_i a_i <= 1 applies
    --------
    b is a number of bits used for an integer
    in the output array
    """
    terms = np.empty(m, dtype=f'i{b}')
    terms[:] = math.floor(N/m)
    terms[:N-terms.sum()] += 1
    return terms.tolist() 


def D(y, x):
    """
    Differential operator
    """
    grad = autograd.grad(
        outputs=y, inputs=x,
        grad_outputs=torch.ones_like(y),
        create_graph=True, allow_unused=True)

    if len(grad) == 1:
        return grad[0]
    return grad


def metropolis_weights(A):
    """
    A - Adjacency matrix (i.e., symmetric and with zero diagonal)
    """
    A = A / (1 + torch.max(A.sum(1,keepdims=True),A.sum(0,keepdims=True)))
    A.as_strided([len(A)], [len(A)+1]).copy_(1-A.sum(1))
    return A

def dummy_consensus_variation(F):
    num_nodes = F.b.size(0)
    G = nx.erdos_renyi_graph(num_nodes, .2)
    
    S = nx.adjacency_matrix(G).todense()
    S = F.b.new_tensor(S)
    
    W = metropolis_weights(S)
    return W

In [None]:
class Objective:
    """
    Base class for optimization functional
    """
    def __init__(self, A, b, num_nodes):
        chunk_sizes = uniform_decompose(A.size(0), num_nodes)
        self.A = pad_sequence(A.split(chunk_sizes), batch_first=True)
        self.b = pad_sequence(b.split(chunk_sizes), batch_first=True)

        
class LeastSquares(Objective):
    def __call__(self, X):
        s = '' if X.ndim < 2 else 'i'
        Y = torch.einsum(f'ijk,k{s}->ij', self.A, X) - self.b
        return Y.square().sum()
    
    
class LogRegression(Objective):
    def __call__(self, X):
        s = '' if X.ndim < 2 else 'i'
        Y = torch.einsum(f'ij,ijk,k{s}->ij', self.b, self.A, X)
        Y = torch.logaddexp(-Y, Y.new([0.])).mean()
        return Y

In [None]:
class TensorAccumulator:
    """
    Accumulates tensors and performs efficient
    chained matrix multiplication
    """
    def __init__(self, seq_length):
        self.n = seq_length
        self.acc = []
    
    def append(self, X):
        if len(self.acc) >= self.n:
            self.acc.pop(0)
        self.acc.append(X)
            
    def mm(self, X):
        return torch.chain_matmul(X, *self.acc)

In [None]:
class DGM:
    """
    Base class for decentralized gradient methods
    """
    def __init__(self, F, alpha=1.):
        self.F = F
        self.alpha = alpha
        
        self.n = F.b.size(0)
        self.gen = lambda : dummy_consensus_variation(F)
        self._initLogs()
        
    def _initLogs(self):
        self.logs = {'i': [], 'fn': [], 'dist2con': []}
        
    def _dist2consensus(self, X):
        h = X.new(self.n).fill_(1.)
        Q = torch.norm(X - X/self.n @ h[:,None]*h)
        return Q
        
    def _record(self, X, k):
        self.logs['dist2con'].append(self._dist2consensus(X).item())
        self.logs['fn'].append(self.F(X.mean(1)).item())
        self.logs['i'].append(k)
        
        
class EXTRON(DGM):
    """
    ONe-process EXTRA algorithm
    """
    def _step1(self, X0):
        W = self._W = self.gen()
        X0.requires_grad_(True)
        G0 = D(self.F(X0), X0)
        with torch.no_grad():
            X1 = X0@W - self.alpha*G0
        return G0, X1

    def _step2(self, X0, G0, X1):
        W = self._W = self.gen()
        X1.requires_grad_(True)
        G1 = D(self.F(X1), X1)
        with torch.no_grad():
            X2 = X1 - X0/2 + (X1-X0/2)@W - self.alpha*(G1-G0)
        return X1, G1, X2

    def run(self, X0, G0=None, X1=None, n_iters=10, lp=1):
        if G0 is None or X1 is None:
            G0, X1 = self._step1(X0)
            self._initLogs()
            self._record(X1, 0)

        for k in range(1, n_iters):
            X0, G0, X1 = self._step2(X0, G0, X1)
            if k%lp == 0: self._record(X1, k)

        return X0, G0, X1


class DIGONing(DGM):
    """
    ONe-process DIGing algorithm
    """
    def run(self, X0, G0=None, Y0=None, n_iters=10, lp=1):
        if G0 is None or Y0 is None:
            self._initLogs()
            X0.requires_grad_(True)
            Y0 = D(self.F(X0), X0)
            G0 = Y0.clone()
            
        for k in range(1, n_iters):
            W = self._W = self.gen()
            with torch.no_grad():
                X1 = X0@W - self.alpha*Y0
                
            X1.requires_grad_(True)
            G1 = D(self.F(X1), X1)
            with torch.no_grad():
                Y1 = Y0@W + G1 - G0
                
            X0, Y0, G0 = X1, Y1, G1
            if k%lp == 0: self._record(X0, k)
            
        return X0, G0, Y0
    
    
class DAGDON(DGM):
    """
    One-process AGD subroutine 
    """
    def __init__(self, F, L, mu, T=20):
        self.F = F
        self.T = T
        self.L = L
        self.mu = mu
        self.n = F.b.size(0)
        self.gen = lambda : dummy_consensus_variation(F)
        
    def run(self, X0, A0=None, n_iters=10, lp=1):
        if A0 is None:
            a0 = A0 = 0
            U0 = X0.clone()
            self._initLogs()
            
        consensus = TensorAccumulator(self.T)
        for k in range(1, n_iters):
            W = self._W = self.gen()
            consensus.append(W)
            
            a1 = 1 + A0*self.mu
            a1 = (a1 + (a1**2 + 4*self.L*A0*a1)**.5)/(2*self.L)
            A1 = A0 + a1
            
            Y = (a1*U0 + A0*X0)/A1
            Y.requires_grad_(True)
            G = D(self.F(Y), Y)
            with torch.no_grad():
                V = self.mu*Y + (1+A0*self.mu)*U0 - a1*G
                V /= 1 + A0*self.mu + self.mu
                U1 = consensus.mm(V)
                X1 = (a1*U1 + A0*X0) / A1
                
            X0, A0 = X1, A1
            if k%lp == 0: self._record(X0, k)
                
        return X0, A0

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
A, b = load_svmlight_file('data/a9a.2')
A = torch.Tensor(A.todense()).to(device)
b = torch.Tensor(b).to(device)

In [None]:
num_nodes = 15
F = LogRegression(A, b, num_nodes)
X0 = torch.zeros(A.size(1), num_nodes).to(device)
opt1 = EXTRON(F)
opt2 = DIGONing(F)
opt3 = DAGDON(F, L=.5, mu=.0, T=20)

In [None]:
n_iters = 1000
opt1.run(X0, n_iters=n_iters);
opt2.run(X0, n_iters=n_iters);
opt3.run(X0, n_iters=n_iters);

In [None]:
plt.figure(figsize=(8, 8))
plt.plot(opt1.logs['fn'], label='EXTRA');
plt.plot(opt2.logs['fn'], '--', label='DIGing');
plt.plot(opt3.logs['fn'], label='AGD');
plt.title('Optimization Functional Value over Iteration Number', size=20)
plt.xlabel('# k', size=20)
plt.ylabel('f(x)', size=20);
plt.legend();

In [None]:
plt.figure(figsize=(8, 8))
plt.plot(opt1.logs['dist2con'], label='EXTRA');
plt.plot(opt2.logs['dist2con'], '--', label='DIGing');
plt.plot(opt3.logs['dist2con'], label='AGD');
plt.title(r'$||X(I-11^T)||^2$', size=20)
plt.xlabel('# k', size=20)
plt.legend();