# Decentralized SPSA Simulation

This notebook implements and evaluates a decentralized gradient-free optimization method (DSPSA) under various types of drift, noise, and communication topologies. The goal is to simulate challenging scenarios that resemble real-world multi-agent systems with noisy feedback and dynamic optima.


In [2]:
import numpy as np
import pandas as pd
from scipy.stats import levy

## OPTIMUM DRIFT SCENARIO

In [3]:
class DriftScenario:
    def __init__(self, mode, dim, total_steps, jump_step=None, step_size=0.05):
        self.mode = mode
        self.dim = dim
        self.total_steps = total_steps
        self.jump_step = jump_step or total_steps // 2
        self.step_size = step_size

    def get_theta_star(self, t):
        if self.mode == 'linear':
            # Linear drift over time
            return np.ones(self.dim) * self.step_size * t
        elif self.mode == 'jump':
            # Sudden jump at a given step
            if t < self.jump_step:
                return np.zeros(self.dim)
            else:
                return np.ones(self.dim) * 3.0
        else:
            raise ValueError("Unknown drift mode")


## NOISE GENERATOR 

In [4]:
class NoiseGenerator:
    def __init__(self, noise_type, size, **kwargs):
        self.noise_type = noise_type
        self.size = size
        self.kwargs = kwargs

    def sample(self):
        if self.noise_type == 'gaussian':
            # Gaussian noise with zero mean
            return np.random.normal(loc=0, scale=self.kwargs.get('sigma', 0.1), size=self.size)
        elif self.noise_type == 'pareto':
            # Zero-centered Pareto noise
            p = np.random.pareto(self.kwargs.get('shape', 2), size=self.size)
            return (p - p.mean()) * self.kwargs.get('scale', 1)
        elif self.noise_type == 'levy':
            # Lévy noise
            return levy.rvs(size=self.size)
        else:
            raise ValueError("Unknown noise type")


##  AGENT GRAPH GENERATORS

In [5]:
def make_ring_graph(n):
    # Ring topology: each agent connected to its two neighbors
    adj = np.zeros((n, n))
    for i in range(n):
        adj[i, (i+1)%n] = 1
        adj[i, (i-1)%n] = 1
    return adj

def make_random_k_neighbors(n, k=3):
    # Random-k graph: each agent connected to k random other agents
    adj = np.zeros((n, n))
    for i in range(n):
        k_eff = min(k, n-1)
        neighbors = np.random.choice([j for j in range(n) if j != i], k_eff, replace=False)
        for j in neighbors:
            adj[i, j] = 1
    return adj

##  LOSS FUNCTION 

In [None]:
def make_agent_function(c_i, rho=1.5):
    # Agent-specific function with ℓ_rho norm
    def f(x):
        diff = x - c_i
        value = np.sum(np.abs(diff) ** rho)
        grad = np.sign(diff) * (np.abs(diff) ** (rho - 1))
        return value, grad
    return f

class LossFunction:
    def __init__(self, rho, noise_generator):
        self.rho = rho
        self.noise_generator = noise_generator

    def value(self, x, theta_star):
        # x: (n, d), theta_star: (d,)
        # Add noise to ℓ_rho norm loss
        noise = self.noise_generator.sample()
        diff = np.abs(x - theta_star)
        return np.sum(diff ** self.rho, axis=1) + noise


## BASIC AGENT CLASS & SIMULATION LOGGER

In [None]:
class Agent:
    def __init__(self, idx, x_init):
        self.idx = idx
        self.x = np.copy(x_init)
        self.history = [np.copy(self.x)]

    def update(self, new_x):
        # Update agent's internal state
        self.x = np.copy(new_x)
        self.history.append(np.copy(self.x))


class SimulationLogger:
    def __init__(self):
        self.logs = []

    def log(self, t, agent_states, theta_star, loss_values):
        # Store simulation snapshot for timestep t
        record = {
            't': t,
            'agent_states': [np.copy(x) for x in agent_states],
            'theta_star': np.copy(theta_star),
            'loss_values': np.copy(loss_values)
        }
        self.logs.append(record)

    def get_agent_trajectories(self, idx):
        # Retrieve trajectory of agent i over time
        return [step['agent_states'][idx] for step in self.logs]

    def get_theta_star_trajectory(self):
        # Retrieve trajectory of optimum θ*(t)
        return [step['theta_star'] for step in self.logs]


## MAIN SIMULATION ORGANIZER 

In [None]:
class Simulator:
    def __init__(self, num_agents, dim, drift_scenario, noise_generator, loss_func, adjacency, method, steps=200):
        self.num_agents = num_agents
        self.dim = dim
        self.drift = drift_scenario
        self.noise_gen = noise_generator
        self.loss_func = loss_func
        self.adjacency = adjacency
        self.steps = steps
        self.method = method
        self.agents = [Agent(i, np.random.uniform(-5, 5, dim)) for i in range(num_agents)]
        self.logger = SimulationLogger()

    def run(self):
        for t in range(self.steps):
            theta_star = self.drift.get_theta_star(t)
            X = np.stack([agent.x for agent in self.agents], axis=0)
            loss_vals = self.loss_func.value(X, theta_star)
            self.logger.log(t, [agent.x for agent in self.agents], theta_star, loss_vals)
            new_states = self.method.step([agent.x for agent in self.agents], theta_star, t, self.adjacency, self.loss_func)
            for agent, new_x in zip(self.agents, new_states):
                agent.update(new_x)


## METHODS

In [7]:
# ==== DSPSA-CONSENSUS METHOD ====
class DSPSAConsensus:
    def __init__(self, alpha=0.1, gamma=0.2, beta=0.1, spsa_seed=None):
        self.alpha = alpha
        self.gamma = gamma
        self.beta = beta
        self.rng = np.random.default_rng(spsa_seed)

    def step(self, states, theta_star, t, adjacency, loss_func):
        n, d = np.shape(states)
        states = np.array(states)
        grad_est = []

        # SPSA gradient estimation for each agent
        for i in range(n):
            delta = self.rng.choice([-1, 1], size=d)
            x_plus = states[i] + self.beta * delta
            x_minus = states[i] - self.beta * delta
            f_plus = loss_func.value(x_plus[None, :], theta_star)[0]
            f_minus = loss_func.value(x_minus[None, :], theta_star)[0]
            grad_i = (f_plus - f_minus) / (2 * self.beta) * delta
            grad_est.append(grad_i)

        grad_est = np.stack(grad_est)
        new_states = []

        # Consensus + descent step
        for i in range(n):
            consensus = np.zeros(d)
            for j in range(n):
                if adjacency[i, j]:
                    consensus += (states[j] - states[i])
            new_x = states[i] - self.alpha * grad_est[i] + self.gamma * consensus
            new_states.append(new_x)

        return new_states
    
# ==== DISTRIBUTED SUBGRADIENT METHOD ====
class DistributedSubgradient:
    def __init__(self, alpha=0.1, gamma=0.2, rho=1.5):
        self.alpha = alpha
        self.gamma = gamma
        self.rho = rho

    def subgrad(self, x, theta_star):
        # Subgradient of the ℓ_rho norm
        diff = x - theta_star
        grad = np.sign(diff) * (np.abs(diff) ** (self.rho - 1))
        return grad

    def step(self, states, theta_star, t, adjacency, loss_func):
        n, d = np.shape(states)
        states = np.array(states)
        grad_est = []

        # Compute local subgradients
        for i in range(n):
            grad_i = self.subgrad(states[i], theta_star)
            grad_est.append(grad_i)

        grad_est = np.stack(grad_est)
        new_states = []

        # Consensus + gradient step
        for i in range(n):
            consensus = np.zeros(d)
            for j in range(n):
                if adjacency[i, j]:
                    consensus += (states[j] - states[i])
            new_x = states[i] - self.alpha * grad_est[i] + self.gamma * consensus
            new_states.append(new_x)

        return new_states

# ==== DISTRIBUTED KIEFER–WOLFOWITZ METHOD ====
class DistributedKieferWolfowitz:
    def __init__(self, alpha=0.1, gamma=0.2, h=0.05):
        self.alpha = alpha
        self.gamma = gamma
        self.h = h

    def grad_estimate(self, x, theta_star, loss_func):
        # Finite-difference gradient estimation
        d = x.size
        grad = np.zeros(d)
        for k in range(d):
            e_k = np.zeros(d)
            e_k[k] = 1
            f_plus = loss_func.value((x + self.h * e_k)[None, :], theta_star)[0]
            f_minus = loss_func.value((x - self.h * e_k)[None, :], theta_star)[0]
            grad[k] = (f_plus - f_minus) / (2 * self.h)
        return grad

    def step(self, states, theta_star, t, adjacency, loss_func):
        n, d = np.shape(states)
        states = np.array(states)
        grad_est = []

        # Estimate gradient for each agent
        for i in range(n):
            grad_i = self.grad_estimate(states[i], theta_star, loss_func)
            grad_est.append(grad_i)

        grad_est = np.stack(grad_est)
        new_states = []

        # Consensus + gradient update
        for i in range(n):
            consensus = np.zeros(d)
            for j in range(n):
                if adjacency[i, j]:
                    consensus += (states[j] - states[i])
            new_x = states[i] - self.alpha * grad_est[i] + self.gamma * consensus
            new_states.append(new_x)

        return new_states

##  GRAPH GENERATOR WRAPPER

In [None]:
def get_graph(graph_type, n):
    if graph_type == 'ring':
        return make_ring_graph(n)
    elif graph_type == 'random_k':
        return make_random_k_neighbors(n, k=3)
    elif graph_type == 'random':
        return make_random_graph(n, p=0.3)
    else:
        raise ValueError(f"Unknown graph type {graph_type}")



## SCENARIO PARAMETER

## FULL SIMULATION EXECUTION

In [None]:

def run_all_scenarios(steps=200, random_seed=42, num_runs=20):
    results = []
    scenario_id = 0
    for n_agents in agent_counts:
        for dim in dimensions:
            for graph_type in graph_types:
                for rho in rhos:
                    for noise_type in noise_types:
                        for drift_type in drift_types:
                            for method_name, method_class in methods_dict.items():
                                scenario_id += 1
                                for run in range(num_runs):
                                    np.random.seed(random_seed + run)  # ensure different seed per run

                                    # Drift setup
                                    drift = DriftScenario(drift_type, dim, steps)

                                    # Noise generator
                                    noise_params = {'sigma': 0.2} if noise_type == 'gaussian' else {'shape': 2, 'scale': 1}
                                    noise_gen = NoiseGenerator(noise_type, size=n_agents, **noise_params)

                                    # Loss function
                                    loss_func = LossFunction(rho, noise_gen)

                                    # Communication graph
                                    adjacency = get_graph(graph_type, n_agents)

                                    # Method instantiation
                                    if method_name == 'evolution_strategy':
                                        method = method_class(n_arms=5, gamma=0.2)
                                    elif method_name == 'dspsa_consensus':
                                        method = method_class(alpha=0.1, gamma=0.2, beta=0.1)
                                    elif method_name == 'consensus_only':
                                        method = method_class(gamma=0.9)
                                    elif method_name == 'subgradient':
                                        method = method_class(alpha=0.1, gamma=0.2, rho=rho)
                                    elif method_name == 'kiefer_wolfowitz':
                                        method = method_class(alpha=0.1, gamma=0.2, h=0.05)
                                    else:
                                        raise ValueError("Unsupported method")

                                    # Run simulation
                                    sim = Simulator(n_agents, dim, drift, noise_gen, loss_func, adjacency, method, steps)
                                    sim.run()

                                    # Extract logs
                                    for record in sim.logger.logs:
                                        t = record['t']
                                        theta_star = record['theta_star']
                                        for agent_idx, (x, loss) in enumerate(zip(record['agent_states'], record['loss_values'])):
                                            if method_name == 'evolution_strategy':
                                                chosen_arm = int(x[0])
                                                true_arm = int(np.clip(theta_star[0], 0, method.n_arms - 1))
                                                error = abs(chosen_arm - true_arm)
                                            else:
                                                error = np.linalg.norm(x - theta_star)
                                            results.append({
                                                'scenario_id': scenario_id,
                                                'run': run,
                                                'num_agents': n_agents,
                                                'dimension': dim,
                                                'graph': graph_type,
                                                'rho': rho,
                                                'noise': noise_type,
                                                'drift': drift_type,
                                                'method': method_name,
                                                'iteration': t,
                                                'agent': agent_idx,
                                                'x': x.copy(),
                                                'theta_star': theta_star.copy(),
                                                'loss': loss,
                                                'error': error,
                                            })
    df = pd.DataFrame(results)
    return df


# ==== RUN IF EXECUTED AS SCRIPT ====
if __name__ == "__main__":
    df = run_all_scenarios(steps=200, random_seed=42, num_runs=20)
    print(df.head())
    df.to_csv('scenarios_results.csv', index=False)