In [2]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [4]:
# derived from https://github.com/cvxgrp/lfd_lqr/blob/master/small%20random.ipynb
import numpy as np
from scipy.linalg import solve_discrete_are
import cvxpy as cp
from ADMM import policy_fitting, policy_fitting_with_a_kalman_constraint
import warnings
# warnings.filterwarnings('ignore')

In [5]:
np.random.seed(0)
n, m = 4, 2
A = np.random.randn(n, n)
A = A / np.abs(np.linalg.eig(A)[0]).max()
B = np.random.randn(n, m)
W = .25 * np.eye(n)
Q_true = np.eye(n)
R_true = np.eye(m)
P_true = solve_discrete_are(A, B, Q_true, R_true)
K_true = -np.linalg.solve(R_true + B.T @ P_true @ B, B.T @ P_true @ A)

In [6]:
def simulate(K, N=10, seed=None, add_noise=False):
    if seed is not None:
        np.random.seed(seed)
    x = np.random.multivariate_normal(np.zeros(n), W)
    xs = []
    us = []
    cost = 0.0
    for _ in range(N):
        u = K @ x
        if add_noise:
            u += 2*np.random.randn(m)
        xs.append(x)
        us.append(u)
        cost += (x @ Q_true @ x + u @ R_true @ u) / N
        x = A @ x + B @ u + np.random.multivariate_normal(np.zeros(n), W)
    xs = np.array(xs)
    us = np.array(us)
    
    return cost, xs, us

In [None]:
def simulate_variable_cost(A, B, Q_true, R_true, N=10, seed=None, add_noise=False):
    P_true = solve_discrete_are(A, B, Q_true, R_true)
    K = -np.linalg.solve(R_true + B.T @ P_true @ B, B.T @ P_true @ A)
    
    if seed is not None:
        np.random.seed(seed)
    x = np.random.multivariate_normal(np.zeros(n), W)
    xs = []
    us = []
    cost = 0.0
    for _ in range(N):
        u = K @ x
        if add_noise:
            u += 2*np.random.randn(m)
        xs.append(x)
        us.append(u)
        cost += (x @ Q_true @ x + u @ R_true @ u) / N
        x = A @ x + B @ u + np.random.multivariate_normal(np.zeros(n), W)
    xs = np.array(xs)
    us = np.array(us)
    
    return cost, xs, us

In [7]:
N_test = 10000
cost_true = simulate(K_true, N=N_test, seed=0)[0]
cost_noise = simulate(K_true, N=N_test, seed=0, add_noise=True)[0] 
cost_true, np.trace(P_true @ W), cost_noise

(1.7934243186095473, 1.8031177219720185, 64.75750340186758)

In [8]:
costs_lr = []
costs_admm = []
costs_federated_admm_all_personalized = []
costs_federated_admm_semi_personalized = []

Ns = np.arange(1, 51)
N_agents = 5
for N in Ns:
    costs_lr += [[]]
    costs_admm += [[]]
    for k in range(1, 11):
        _, xs, us = simulate(K_true, N=N, seed=k, add_noise=True)

        def L(K):
            return cp.sum_squares(xs @ K.T - us)

        def r(K):
            return .01 * cp.sum_squares(K), []
        
        Klr = policy_fitting(L, r, xs, us)
        Kadmm = policy_fitting_with_a_kalman_constraint(L, r, xs, us, A, B, n_random=5)
        
        cost_lr = simulate(Klr, N=N_test, seed=0)[0]
        cost_admm = simulate(Kadmm, N=N_test, seed=0)[0]
        
        if np.isnan(cost_lr) or cost_lr > 1e5 or cost_lr == np.inf:
            cost_lr = np.nan

        costs_lr[-1].append(cost_lr)
        costs_admm[-1].append(cost_admm)

    print (" %03d | %3.3f | %3.3f | %3.3f | %3.3f" %
           (N, cost_true, cost_noise, np.nanmean(costs_lr[-1]), np.nanmean(costs_admm[-1])))

  "Solution may be inaccurate. Try another solver, "
  
  if __name__ == '__main__':
  if __name__ == '__main__':
  
  from ipykernel import kernelapp as app
  from ipykernel import kernelapp as app
  
  from ipykernel import kernelapp as app


 001 | 1.793 | 64.758 | 7.175 | 10.221


  from ipykernel import kernelapp as app


 002 | 1.793 | 64.758 | nan | 25.338
 003 | 1.793 | 64.758 | nan | 9.943
 004 | 1.793 | 64.758 | nan | 6.237
 005 | 1.793 | 64.758 | 167.778 | 4.792
 006 | 1.793 | 64.758 | 25.953 | 4.665
 007 | 1.793 | 64.758 | 47.339 | 4.472
 008 | 1.793 | 64.758 | 21.648 | 3.643
 009 | 1.793 | 64.758 | 28.426 | 3.355
 010 | 1.793 | 64.758 | 11.493 | 2.983


KeyboardInterrupt: 