In [None]:
import numpy as np
from numpy import linalg as LA
import matplotlib.pyplot as plt

In [None]:
def trimmed_l1_trend(b,K,init_x=None,init_y=None,step=1000000,gamma=None,rho=None,eps=1e-6):
  n = len(b)
  D = np.eye(n-2,n)-2*np.eye(n-2,n,1)+np.eye(n-2,n,2)
  if gamma is None:
    X = np.stack([np.ones(n),np.arange(n)],1)
    b_hat = X@LA.inv(X.T@X)@X.T@b
    gamma = 1.0001*LA.norm(b-b_hat,2)/np.sqrt(4*(1-np.cos(np.pi/n))*(1-np.cos(np.pi/(n-1))))
  if rho is None:
    rho = 1/(2*(1-np.cos(np.pi/n))*(1-np.cos(np.pi/(n-1))))
  if init_x is None:
    x = np.zeros(n)
  else:
    x = init_x
  if init_y is None:
    y = np.zeros(n-2)
  else:
    y = init_y
  invmat = LA.inv(np.identity(n)+rho*D.T@D)
  print(LA.norm(y))

  k=0
  while k < step:
    k = k+1
    if (k%1000) == 0:
      print(k)

    pre_x = np.copy(x)
    z = D@x-y/rho
    arg = np.argpartition(np.abs(z),n-K-3)
    z[arg[:(n-K-2)]] = np.sign(z[arg[:(n-K-2)]])*np.maximum(np.abs(z[arg[:(n-K-2)]])-gamma/rho,0)
    x = invmat@(b+D.T@(y+rho*z))
    y = y+rho*(z-D@x)

    if LA.norm(x-pre_x,2) <= eps:
      print('The terminated criterion has been fulfilled.')
      break
    if k == step:
      print('The default number of steps has been reached before the terminated criterion is fulfilled.')
  return x

In [None]:
def l1_trend(b,gamma,init_x=None,init_y=None,step=1000000,rho=None,eps=1e-6):
  n = len(b)
  D = np.eye(n-2,n)-2*np.eye(n-2,n,1)+np.eye(n-2,n,2)
  if rho is None:
    rho = 1/(2*(1-np.cos(np.pi/n))*(1-np.cos(np.pi/(n-1))))
  if init_x is None:
    x = np.zeros(n)
  else:
    x = init_x
  if init_y is None:
    y = np.zeros(n-2)
  else:
    y = init_y
  invmat = LA.inv(np.identity(n)+rho*D.T@D)

  k=0
  while k < step:
    k = k+1
    if (k%1000) == 0:
      print(k)

    pre_x = np.copy(x)
    z = D@x-y/rho
    z = np.sign(z)*np.maximum(np.abs(z)-gamma/rho,0)
    x = invmat@(b+D.T@(y+rho*z))
    y = y+rho*(z-D@x)

    if LA.norm(x-pre_x,2) <= eps:
      print('The terminated criterion has been fulfilled.')
      break
    if k == step:
      print('The default number of steps has been reached before the terminated criterion is fulfilled.')
  return x

In [None]:
np.random.seed(0)
n=500
true_x = np.zeros(n)
trend = np.array([0.015]*100+[-0.005]*150+[0.005]*175+[-0.01]*75)
for i in range(1,n):
  true_x[i] = true_x[i-1]+trend[i]
b = true_x + 0.2*np.random.standard_normal(n)
plt.rcParams["font.size"] = 15
plt.plot(np.arange(n),b,label="observed data")
plt.plot(np.arange(n),true_x,linewidth=3,color="black",label="true trend")
plt.ylim(-0.5,2)
plt.legend()

In [None]:
gamma_seq = np.r_[np.array([5,10,20,30,40,50,60,70,80,90]),np.arange(100,2000,100)]
est_x = l1_trend(b,1)
result_l1 = np.copy(est_x)
for gamma in gamma_seq:
  est_x = l1_trend(b,gamma,est_x)
  result_l1 = np.c_[result_l1,est_x]

In [None]:
error_l1 = np.zeros(30)
for i in range(30):
  error_l1[i] = LA.norm(result_l1[:,i]-b)
print(error_l1)

In [None]:
est_x_trimmed = trimmed_l1_trend(b,12)
result_trim = np.copy(est_x_trimmed)
for K in range(11,2,-1):
  est_x_trimmed = trimmed_l1_trend(b,K,est_x_trimmed)
  result_trim = np.c_[result_trim,est_x_trimmed]

In [None]:
error_trim = LA.norm(result_trim[:,9]-b)
print(error_trim)

In [None]:
plt.rcParams["font.size"] = 15
plt.plot(np.arange(n),b,label="observed data",alpha=0.5)
plt.plot(np.arange(n),result_l1[:,9],linewidth=3,color="red",label="estimated trend")
plt.ylim(-0.5,2)
plt.legend()

In [None]:
plt.rcParams["font.size"] = 15
plt.plot(np.arange(n),b,label="observed data",alpha=0.5)
plt.plot(np.arange(n),result_trim[:,9],linewidth=3,color="red",label="estimated trend")
plt.ylim(-0.5,2)
plt.legend()