In [1]:
import numpy as np
from PIL import Image
np.random.seed(0) 

m = 10  
n = 784 
lambda_ = 1e-5  # regularization parameter

true_x = np.array(Image.open('./data/4.jpg'))
true_x = true_x.reshape(784)

x=true_x+np.random.randn(784)*(2.4e-3)

O = np.random.randn(n, 100, 784)
A = [O[i] for i in range(m)]
y = [A[i] @ true_x for i in range(m)]

'''
Constant error setting:C=3 or 5 and Diminishing errors setting:C=20 or 30 and No error setting:C=0
Gaussian vector  e=C*r or C*r**t
'''
r=np.random.normal(loc=0, scale=1e-3, size=100)

# PG Algorithms

In [2]:
class pg_algorithms:

    def soft_thresholding(self, x, threshold):

        return np.sign(x) * np.maximum(np.abs(x) - threshold, 0)

    def gradient_smooth_part(self, A, x, y):

        grad = np.zeros_like(x)
        n = len(y)
        for i in range(n):
            residual = A[i] @ x - y[i]
            grad += (A[i].T @ residual) / n
        return grad

    def proximal_gradient_descent(self, x, A, y, lambda_, true_x, r, max_iter, p, alpha, C): 

        x0 = x.copy()
        y0 = y.copy()

        current_errs=[]
        current_errs.append((np.linalg.norm(((x0-true_x)))**2)/len(y0))

        
        for iteration in range(max_iter):

            # y0+=C*(r**(iteration+1))
            y0+=C*r

            grad = self.gradient_smooth_part(A, x0, y0)
            x_new = self.soft_thresholding(x0 - alpha * grad, alpha * lambda_)
            x0 = x_new
            
            current_err = (np.linalg.norm(((x0-true_x)))**2)/len(y0)
            if (iteration!=0) and (iteration % p == 0):
                current_errs.append(current_err)
                print(f"Iteration {iteration}, Objective Value: {current_err}")
        
            y0=y.copy()

        return x0,current_errs

# B-PG Algorithms

In [3]:
class bpg_algorithms:
    def gradient_of_smooth_part(self, x, y, A):
        grad = np.zeros_like(x)
        for i in range(len(y)):
            residual = A[i] @ x - y[i]
            grad += 2 * A[i].T @ residual
        return grad / len(y)

    def soft_thresholding(self, x, lambda_):

        return np.sign(x) * np.maximum(np.abs(x) - lambda_, 0)

    def block_proximal_gradient(self, x, A, y, lambda_, true_x, r, max_iter, p, learning_rate, C): 

        x0=x.copy()
        y0=y.copy()

        current_errs=[]
        current_errs.append((np.linalg.norm(((x0-true_x)))**2)/len(y))
        for iteration in range(max_iter):

            # y0+=C*(r**(iteration+1))
            y0+=C*r

            grad = self.gradient_of_smooth_part(x0, y0, A)
            
            x_new = x0 - learning_rate * grad
            
            x_new = self.soft_thresholding(x_new, lambda_ * learning_rate)
            x0 = x_new
            
            # Check convergence
            current_err = (np.linalg.norm(((x0-true_x)))**2)/len(y)
            if (iteration!=0) and (iteration % p == 0):
                current_errs.append(current_err)
                print(f"Iteration {iteration}, Objective Value: {current_err}")
            
            y0=y.copy()
            
        return x0,current_errs

# SPG Algorithms

In [4]:
class spg_algorithm:

    def prox_l1(self, v, lambda_):
        return np.sign(v) * np.maximum(np.abs(v) - lambda_, 0)

    def gradient_smooth_part(self, x, A, y, idx):
        return 2 *A[idx].T @ (A[idx] @ x - y[idx]) / len(y)

    def run_spg(self, x, A, y, lambda_, true_x, r, num_iterations, p, step_size, C): 

        x0=x.copy()
        y0=y.copy()

        current_errs=[]
        current_errs.append((np.linalg.norm(((x0-true_x)))**2)/len(y))
        for iteration in range(num_iterations):

            # y0+=C*(r**(iteration+1))
            y0+=C*r

            idx = np.random.randint(len(y))
            
            grad_smooth = self.gradient_smooth_part(x0, A, y0, idx)
            
            x_temp = x0 - step_size * grad_smooth
            
            x0 = self.prox_l1(x_temp, step_size * lambda_)
            
            current_err = (np.linalg.norm(((x0-true_x)))**2)/len(y)
            if (iteration!=0) and (iteration % p == 0):
                current_errs.append(current_err)
                print(f"Iteration {iteration}, Objective Value: {current_err}")
        
            y0=y.copy()

        return x0,current_errs

# ADMN Algorithms

In [5]:
from scipy.sparse.linalg import cg
import math

class admm_algorithm:
    
    def soft_threshold(self, x, threshold):
        return np.sign(x) * np.maximum(np.abs(x) - threshold, 0)

    def admm_solver(self, x, A, y, lambda_, true_x, r, max_iter, p, rho, C):

        x0=x.copy()
        y0=y.copy()

        m, n = len(y0),len(x0)

        z = x.copy()
        u = np.zeros(n)

        current_errs=[]
        current_errs.append((np.linalg.norm(((x0-true_x)))**2)/len(y0)) 

        
        AtA = sum([A[i].T @ A[i] for i in range(m)]) / m + rho * np.eye(n)
        b1 = sum([A[i].T @ y0[i] for i in range(m)]) / m

        for iteration in range(max_iter):

            y0+=C*r
            # y0+=C*(r**(iteration+1))

            b = b1 + rho * (z - u)
            x0, _ = cg(AtA, b, x0, maxiter=10, rtol=(1e-4/(iteration/2+1)/math.pow(0.999,(iteration+1)/3e2)*0.75 + 1e-9))
        
            z = self.soft_threshold(x0 + u, lambda_ / rho)

            u = u + x0 - z

            current_err = (np.linalg.norm(((x0-true_x)))**2)/len(y0)
            if (iteration!=0) and (iteration % p == 0):
                current_errs.append(current_err)
                print(f"Iteration {iteration}, Objective Value: {current_err}")

            y0=y.copy()

        return x0,current_errs

# PGRR Algorithms

In [6]:
import numpy as np
from typing import List

class pgrr_algorithm:
    def soft_thresholding(self, x: np.ndarray, threshold: float) -> np.ndarray:
        
        return np.sign(x) * np.maximum(np.abs(x) - threshold, 0)

    def compute_gradient(self, x: np.ndarray, A: List[np.ndarray], y: List[np.ndarray]) -> np.ndarray:
        
        n = len(y)
        gradient = np.zeros_like(x)
        for i in range(n):
            gradient += 2 * A[i].T @ (A[i] @ x - y[i])
        return gradient / n

    def PG_RR(self, initial_x, A, y, lambda_, true_x, r, num_epochs, p, gamma, C):
       
        x = initial_x.copy()
        n = len(y)
        y0=y.copy()
        current_errs=[]
        current_errs.append((np.linalg.norm(((x-true_x)))**2)/len(y))

        for iteration in range(num_epochs):

            # y0+=C*(r**(iteration+1))
            y0+=C*r

            for i in np.random.permutation(n):
                gradient = 2 * A[i].T @ (A[i] @ x - y0[i])
                x = self.soft_thresholding(x - gamma * gradient, gamma * lambda_)
            
            current_err = (np.linalg.norm(((x-true_x)))**2)/len(y0)
            if (iteration!=0) and (iteration % p == 0):
                current_errs.append(current_err)
                print(f"Iteration {iteration}, Objective Value: {current_err}")
            
            y0=y.copy()
        
        return x,current_errs

# ours Algorithms

In [7]:
import numpy as np
import numpy as np

class ours_algorithms:

    def soft_thresholding(self ,x, threshold):
       
        return np.sign(x) * np.maximum(np.abs(x) - threshold, 0)
    
    def PG_RR(self, initial_x, A, y, lambda_, true_x, r, num_epochs, p, gamma, C, momentum):

        x = initial_x.copy()
        n = len(y)
        velocity = np.zeros_like(x)
        y0=y.copy()

        current_errs=[]
        current_errs.append((np.linalg.norm(((x-true_x)))**2)/len(y))
        
        
        for iteration in range(num_epochs):

            # y0+=C*(r**(iteration+1))
            y0+=C*r

            for i in np.random.permutation(n):
                gradient = 2 * A[i].T @ (A[i] @ x - y0[i])
                velocity = momentum * velocity + gamma * gradient
                x = self.soft_thresholding(x - velocity, gamma * lambda_)
        
            current_err = (np.linalg.norm(((x-true_x)))**2)/len(y0)
            if (iteration!=0) and (iteration % p == 0):
                current_errs.append(current_err)
                print(f"Iteration {iteration}, Objective Value: {current_err}")
                
            y0=y.copy()

        return x,current_errs


In [None]:
# PG algorithm
pg=pg_algorithms()

# B-PG algorithm
bpg = bpg_algorithms()

# SPG algorithm
spg = spg_algorithm()

# ADMM algorithm
admm = admm_algorithm()

# PGRR algorithm
pgrr=pgrr_algorithm()

# ours algorithm
ours = ours_algorithms()


C=5
p=5000
all_iteration=int(1.45e5+1)

pg_x,pg_errors= pg.proximal_gradient_descent(x, A, y, lambda_, true_x, r, all_iteration, p, 2e-7, C)
bpg_x,bpg_errors = bpg.block_proximal_gradient(x, A, y, lambda_, true_x, r, all_iteration, p, 1.1e-7, C)
spg_x,spg_errors= spg.run_spg(x, A, y, lambda_, true_x, r, all_iteration, p, 1.4e-6, C)
admm_x,admm_errors = admm.admm_solver(x, A, y, lambda_, true_x, r, all_iteration, p, 1e5, C)
pg_rr_x,pg_rr_errors= pgrr.PG_RR(x, A, y, lambda_, true_x, r, all_iteration, p, 6.5e-8, C)
ours_x,ours_errors= ours.PG_RR(x, A, y, lambda_, true_x, r, all_iteration, p, 6.5e-8, C, 0.9)


In [None]:
np.save('', np.array([pg_errors,bpg_errors,spg_errors,admm_errors,pg_rr_errors,ours_errors]))