In [33]:
# ------------------------------mount drive-------------------------------------
from google.colab import drive
drive.mount('/content/drive/')
%cd /content/drive/MyDrive/Colab\ Notebooks/nonlinear-sys-id/simple\ pendulum\ neurips/

Drive already mounted at /content/drive/; to attempt to forcibly remount, call drive.mount("/content/drive/", force_remount=True).
/content/drive/MyDrive/Colab Notebooks/nonlinear-sys-id/simple pendulum neurips


# Getting Trajectory Data of Pendulum

In [34]:
import numpy as np
from scipy.stats import truncnorm
import random
import matplotlib.pyplot as plt
import math

g = 9.81  # (m/s^2)  gravity constant
dt = 0.01  # time_step for discrete-time system

def system_parameters():
    m = 0.1  # (kg)    mass
    l = 0.5  # (m) distance between the rotor and the center of mass
    k = 2  # controller gain
    return m, l, k

def generate_u(input_, time_hor, s_, mean, std, u_max, lb, ub):  # noise in control input
    if input_ == "trunc_guass":
        np.random.seed(s_)
        rv = truncnorm(-u_max, u_max, loc=mean, scale=std)
        r1 = rv.rvs(size=time_hor)
        return r1
    elif input_ == "uniform":
        np.random.seed(s_)
        r1 = np.random.uniform(low=lb, high=ub, size=time_hor)
        return r1
    elif input_ == "bernouli":
        np.random.seed(s_)
        r0 = np.random.rand(time_hor)
        r1 = []
        for k in range(len(r0)):
          if r0[k] < 0.5:
            r1.append(0.5)
          else:
            r1.append(-0.5)
        return r1

def generate_w(distr, time_hor, s_, mean, std, w_max, lb, ub):  # disturbance
    if distr == "trunc_guass":
        np.random.seed(s_)
        rv = truncnorm(-w_max, w_max, loc=mean, scale=std)
        r1 = rv.rvs(size=time_hor)
        rv = truncnorm(-w_max, w_max, loc=mean, scale=std)
        r2 = rv.rvs(size=time_hor)
        return r1, r2
    elif distr == "uniform":
        np.random.seed(s_)
        r1 = np.random.uniform(low=lb, high=ub, size=time_hor)
        r2 = np.random.uniform(low=lb, high=ub, size=time_hor)
        return r1, r2
    elif distr == "bernouli":
        np.random.seed(s_)
        r01 = np.random.rand(time_hor)
        r02 = np.random.rand(time_hor)
        r1 = []
        r2 = []
        for k in range(len(r01)):
          if r01[k] < 0.5:
            r1.append(100)
          else:
            r1.append(-100)
        for k in range(len(r02)):
          if r02[k] < 0.5:
            r2.append(100)
          else:
            r2.append(-100)
        return r1, r2

class SimplePendulumDynamics:
    def __init__(self, distr, input):
        self.state = None
        self.u0 = None
        self.distr = distr
        self.input = input
        self.m, self.l, self.k = system_parameters()
        self.alpha_list = []
        self.omega_list = []
        self.phi_s_u_list = []
        self.b_s_list = []
        self.phi_list = []
        self.state_list = []

    def plot_trajectory(self):
        t_list = np.array(range(len(self.alpha_list))) * dt
        fig, axs = plt.subplots(1, 2, figsize=(12, 5))
        plt.subplots_adjust(left=0.1, right=0.9, bottom=0.1, top=0.9, wspace=0.5, hspace=0.4)   # wspace: space between subplots in a row

        # print(np.sin(np.array(self.alpha_list)))
        axs[0].plot(t_list, np.sin(np.array(self.alpha_list)), label='$sin(\\alpha)$')
        axs[0].plot(t_list, np.array(self.alpha_list), label='$\\alpha$')
        # axs[0].scatter(t_list, np.sin(np.array(self.alpha_list)), marker='o')
        # plt.title("Quadrotor's Position")
        axs[0].set_xlabel('time (s)')
        axs[0].set_ylabel('$\\alpha$ ($rad$)')
        axs[0].legend()

        # print(np.array(self.omega_list))
        axs[1].plot(t_list, self.omega_list, label='$\omega$')
        # axs[1].scatter(t_list, self.omega_list, marker='o')
        # plt.title("Quadrotor's Angular Velocity")
        axs[1].set_xlabel('time (s)')
        axs[1].set_ylabel('$\omega$ ($rad/s^{2}$)')
        axs[1].legend()

        plt.show()

    def update_feature_list(self, phi_s_u, s_, s, ex):
        self.phi_s_u_list.append(phi_s_u)
        self.b_s_list.append(s - s_ - ex)

    def update_feat(self, y):
        self.phi_list.append(y)


    def get_trajectory_3(self, x0, time_hor, s_u, s_w, param_u, mult_u, param_w, mult_w):

        # ----------------------------------------- initial states -----------------------------------------------------
        self.state = x0
        x = np.array(x0)
        alpha_ = x[0]  # angle
        omega_ = x[1]  # angular velocity

        #  ------------------------------------- Storing the states - ---------------------------------------------
        self.alpha_list = [alpha_]
        self.omega_list = [omega_]
        self.state_list = [np.array([alpha_, omega_])]

        if self.input == "trunc_guass":
          u_max_ = param_u[2]
        else:
          u_max_ = 1.0

        if self.distr == "trunc_guass":
          w_max_ = param_w[2]
        else:
          w_max_ = 1.0

        # -----------------  random noise and disturbance generation ---------------------------------------------------
        U1_list = generate_u(self.input, time_hor, s_u, mean=param_u[0], std=param_u[1], u_max=u_max_, lb=param_u[0], ub=param_u[1])
        W1_list, W2_list = generate_w(self.distr, time_hor, s_w, mean=param_w[0], std=param_w[1], w_max=w_max_, lb=param_w[0], ub=param_w[1])

        for t in range(time_hor):

            s_ = omega_

            # ------------------  noise in control input  (for exploration)  ----------------------------------------
            u1 = U1_list[t]

            # ----------------   noise in control input  (for exploration)  -----------------------------------------
            w1 = mult_w[0] * W1_list[t]
            w2 = mult_w[1] * W2_list[t]

            # ----------------------------------------  PD control + noise  ------------------------------------------
            u = - self.k * omega_ + u1

            # ------------------------------------------  Dynamic model ----------------------------------------------
            alpha_dot = omega_ + w1
            omega_dot = - g * math.sin(alpha_) / self.l + u / (self.m * self.l * self.l) + w2

            phi_s_u = np.array([-g*math.sin(alpha_), u])
            self.update_feat(phi_s_u)

            # -------------------------------------- Updating the states --------------------------------------------
            alpha = alpha_ + dt * alpha_dot
            omega = omega_ + dt * omega_dot

            self.state = np.array([alpha, omega])

            s = omega

            self.update_feature_list(dt * phi_s_u, s_, s, 0)

            omega_ = omega
            alpha_ = alpha

            # ------------------------------------- Storing the states ----------------------------------------------
            self.alpha_list.append(alpha)
            self.omega_list.append(omega)
            self.state_list.append(np.array([alpha, omega]))

In [35]:
# --------------------------import packages-------------------------------------
import numpy as np
import math
import matplotlib.pyplot as plt
from matplotlib.colors import to_rgba
from scipy.stats import norm
np.set_printoptions(threshold=np.inf)

# ----------------------import classes and functions----------------------------
# from pendulum_dynamics import SimplePendulumDynamics

# --------------------------ground_truth parameters-----------------------------
m, l, _ = system_parameters()
ground_truth = [1 / l, 1 / m / l / l]
print("-----------------------------------------------------------------------")
print("ground truth = ", ground_truth)
print("-----------------------------------------------------------------------")

n_epoch = 1             # number trajectories
max_time_hor = 100      # maximum trajectory length

# ------------------------------disturbacne-------------------------------------
disturbance: str = "trunc_guass"
parameter_dist = [0.0, 0.5, 2]  # mean and std

# disturbance: str = "bernouli"

# disturbance: str = "uniform"
# parameter_dist = [-1, 1]  # lb and ub

mult_w = [1, 1]
seeds_w = range(300, 500) # fixing seeds
w_max = 0.01             # maximum disturbance (required to run set membership)

# ---------------------------------noise----------------------------------------
c_input: str = "trunc_guass"
parameter_input = [0.0, 0.5, 2]  # mean and std and scale

# c_input: str = "bernouli"

# c_input: str = "uniform"
# parameter_input = [-1, 1]  # lb and ub

mult_u = [1]
seeds_u = range(100, 300)   # fixing seeds


print('------------------Getting Trajectory Data of Quadrotor-----------------')
# theta_hat_list = []
Delta_S_list = []
Phi_S_U_list = []
Phi_list = []
State_list = []

for e in range(n_epoch):
  x0 = [0.0 , 0.0]
  pend = SimplePendulumDynamics(c_input, disturbance)
  pend.get_trajectory_3(x0, max_time_hor, seeds_u[e], seeds_w[e], parameter_input, mult_u, parameter_dist, mult_w)
  # print("---------------------------------------------------------------------")
  print("e = ", e + 1)
  # pend.plot_trajectory()

  Delta_S_list.append(pend.b_s_list)
  Phi_S_U_list.append(pend.phi_s_u_list)     # nonlinear feature vector

  Phi_list.append(pend.phi_list)
  State_list.append(pend.state_list)

print('-------------------------------Data Saved------------------------------')

-----------------------------------------------------------------------
ground truth =  [2.0, 40.0]
-----------------------------------------------------------------------
------------------Getting Trajectory Data of Quadrotor-----------------
e =  1
-------------------------------Data Saved------------------------------


In [27]:
n_ = 1000
n__ = 2
L_ = np.random.uniform(0,1, size = (n_,n__))
row_sums = L_.sum(axis=1)
L = L_ / row_sums[:, np.newaxis]

In [36]:
def feature_new(state_, phi_, w1, w2, u):

  alpha_ = state_[0]
  omega_ = state_[1]

  s_ = omega_

  m, l, _ = system_parameters()

  theta_star = np.array([1 / l, 1 / m / l / l])

  dt = 0.01
  g = 9.8
  alpha = alpha_ + dt * omega_ + w1
  omega = omega_ + theta_star @ phi_ +w2

  phi_new = dt*np.array([- g * math.sin(alpha), u])

  return phi_new

def estimate_max_prob(L, state_list, phi_list, time_hor_, num_traj, num_iter, c, c_input, disturbance, param_u, mult_u, param_w):

  probs = []
  stds = []
  b_phi = []
  n_ = 0
  e = 0

  if c_input == "trunc_guass":
    u_max_ = param_u[2]
  else:
    u_max_ = 1.0

  if disturbance == "trunc_guass":
    w_max_ = param_w[2]
  else:
    w_max_ = 1.0

  seeds_u = range(100, 100+num_iter*len(L)*time_hor_)
  seeds_w = range(3000, 3000+num_iter*len(L)*time_hor_)

  U1_list = generate_u(c_input, num_iter*len(L)*time_hor_, seeds_u, mean=param_u[0], std=param_u[1], u_max=u_max_, lb=param_u[0], ub=param_u[1])
  W1_list, W2_list = generate_w(disturbance, num_iter*len(L)*time_hor_, seeds_w, mean=param_w[0], std=param_w[1], w_max=w_max_, lb=param_w[0], ub=param_w[1])

  ind = 0

  b_phi_ = []
  for tt in range(time_hor_):

    print("-------------------------")
    print("T = ", tt+1)

    for k in range(len(L)):

      l = L[k, :]
      n = 0

      for j in range(num_iter):

        y = feature_new(state_list[e][tt], phi_list[e][tt], mult_w[0]*W1_list[ind], mult_w[1]*W2_list[ind], mult_u[0]*U1_list[ind])
        z = np.dot(l, y)
        if (abs(z) <= c):
          n = n + 1

        ind = ind + 1
        b_phi_.append(np.dot(y, y))

      if k == 0:
        b_phi.append(np.mean(np.array(b_phi_)))
      probs.append(n / num_iter)
      n_ = n_ + 1
  print("-------------------------")

  return np.max(np.array(probs)), np.max(np.array(b_phi))


# c_list = [1e-3, 2e-3, 3e-3, 4e-3, 5e-3, 0.1]
# c_list = [1e-4, 2e-4, 3e-4, 4e-4, 5e-4, 1e-3, 0.005]
c_list = [5e-4, 1e-3, 2e-3, 3e-3, 4e-3, 5e-3]
prob_list = []
for c in c_list:
  num_iter = 20
  num_traj = 1
  time_hor_ = 50
  p, b_phi = estimate_max_prob(L, State_list, Phi_S_U_list, time_hor_, num_traj, num_iter, c, c_input, disturbance, parameter_input, mult_u, parameter_dist)
  print("c = ", c)
  print("max prob = ", p)
  print("max b_phi = ", b_phi)
  print("----------------------------------------------------------------------------------------------")
  prob_list.append(p)
print(np.array(c_list) * np.array(prob_list))

-------------------------
T =  1
-------------------------
T =  2
-------------------------
T =  3
-------------------------
T =  4
-------------------------
T =  5
-------------------------
T =  6
-------------------------
T =  7
-------------------------
T =  8
-------------------------
T =  9
-------------------------
T =  10
-------------------------
T =  11
-------------------------
T =  12
-------------------------
T =  13
-------------------------
T =  14
-------------------------
T =  15
-------------------------
T =  16
-------------------------
T =  17
-------------------------
T =  18
-------------------------
T =  19
-------------------------
T =  20
-------------------------
T =  21
-------------------------
T =  22
-------------------------
T =  23
-------------------------
T =  24
-------------------------
T =  25
-------------------------
T =  26
-------------------------
T =  27
-------------------------
T =  28
-------------------------
T =  29
-----------------------

In [7]:
delta = 0.1
time_hor = 30000

n_x = 1
n_u = 1
n_phi = 2

w_max = 0.01
u_max = 0.01
sigma_u = np.sqrt((2*u_max)**2/12)
sigma_w = np.sqrt((2*w_max)**2/12)

s_phi = 0.005
p_phi = 0.85
b_phi = 0.00266

theo_bound = []
time_list = []
for k in range(time_hor):
  cc = b_phi/delta/s_phi/s_phi
  cond = (10/p_phi) * (np.log(1/delta) + 2*n_phi*np.log(10/p_phi) + n_phi*np.log(cc))
  if (k+1) >= cond:
    time_list.append(k+1)
    theo_bound_ = (90*sigma_w/p_phi) * np.sqrt((n_x + np.log(1/delta) + n_phi*np.log(10/p_phi) + n_phi*np.log(cc)) / ((k+1)*s_phi))
    theo_bound.append(theo_bound_)

disturbance = "uniform"
name1 = 'lse_theo_bound_1_' + 'w_' + disturbance + '_' + str(sigma_w) + '_' + 's_phi' '_' + str(s_phi) + '_' + 'p_phi' +  '_' + str(p_phi) +  '_' + 'delta' +   '_' + str(delta) + '.csv'
name2 = 'lse_theo_bound_2_' + 'w_' + disturbance + '_' + str(sigma_w) + '_' + 's_phi' '_' + str(s_phi) + '_' + 'p_phi' +  '_' + str(p_phi) +  '_' + 'delta' +   '_' + str(delta) + '.csv'
np.savetxt(name1, np.array(time_list), delimiter = ",")
np.savetxt(name2, np.array(theo_bound), delimiter = ",")

In [31]:
delta = 0.1
time_hor = 30000

n_x = 1
n_u = 1
n_phi = 2

sigma_u = 0.001
sigma_w = 0.001
w_max = 0.01

s_phi = 0.0005
p_phi = 0.75
b_phi = 9.681352864644174e-05

theo_bound = []
time_list = []
for k in range(time_hor):
  cc = b_phi/delta/s_phi/s_phi
  cond = (10/p_phi) * (np.log(1/delta) + 2*n_phi*np.log(10/p_phi) + n_phi*np.log(cc))
  if (k+1) >= cond:
    time_list.append(k+1)
    theo_bound_ = (90*sigma_w/p_phi) * np.sqrt((n_x + np.log(1/delta) + n_phi*np.log(10/p_phi) + n_phi*np.log(cc)) / ((k+1)*s_phi))
    theo_bound.append(theo_bound_)

disturbance = "trunc_guass"
name1 = 'lse_theo_bound_1_' + 'w_' + disturbance + '_' + str(sigma_w) + '_' + 's_phi' '_' + str(s_phi) + '_' + 'p_phi' +  '_' + str(p_phi) +  '_' + 'delta' +   '_' + str(delta) + '.csv'
name2 = 'lse_theo_bound_2_' + 'w_' + disturbance + '_' + str(sigma_w) + '_' + 's_phi' '_' + str(s_phi) + '_' + 'p_phi' +  '_' + str(p_phi) +  '_' + 'delta' +   '_' + str(delta) + '.csv'
np.savetxt(name1, np.array(time_list), delimiter = ",")
np.savetxt(name2, np.array(theo_bound), delimiter = ",")

In [17]:
import numpy as np
import math
import matplotlib.pyplot as plt

eps = 0.1
T = 30000

n_x = 1
n_u = 1
n_phi = 2

w_max = 0.01
u_max = 0.01
sigma_u = np.sqrt((2*u_max)**2/12)
sigma_w = np.sqrt((2*w_max)**2/12)

s_phi = 0.005
p_phi = 0.85
b_phi = np.sqrt(0.00266)


a1 = s_phi * p_phi / 4
a2 = (8 * b_phi / (s_phi * p_phi) ) ** 2
a3 = (p_phi ** 2) / 8
a4 = max (1, (4 * b_phi * np.sqrt(n_x) / a1))

m = (1 / a3) * (np.log(T/eps) + n_phi*np.log(a2) + 2.5*np.log(n_phi) + np.log(np.log(a2 * n_phi)) + 7)
print(m)
m_ = int(np.ceil(m))
theo_diam = []
a = ((n_phi) ** 2.5)  * (a2 ** n_phi)  * np.exp(-a3*m)
b = ((n_phi * n_x) ** 2.5)  * (a4 ** (n_phi * n_x))
for t in range(m_, T):
  diam_= 4 / t
  theo_diam.append(diam_)

disturbance = "uniform_"
name1 = 'sme_theo_bound_1_' + 'w_' + disturbance + '_' + str(sigma_w) + '_' + 's_phi' '_' + str(s_phi) + '_' + 'p_phi' +  '_' + str(p_phi) +  '_' + 'eps' +   '_' + str(eps) + '.csv'
name2 = 'sme_theo_bound_2_' + 'w_' + disturbance + '_' + str(sigma_w) + '_' + 's_phi' '_' + str(s_phi) + '_' + 'p_phi' +  '_' + str(p_phi) +  '_' + 'eps' +   '_' + str(eps) + '.csv'
np.savetxt(name1, np.array(range(m_, T)), delimiter = ",")
np.savetxt(name2, 1000 * np.array(theo_diam), delimiter = ",")

464.31610994520514


In [37]:
import numpy as np
import math
import matplotlib.pyplot as plt

eps = 0.1
T = 30000

n_x = 1
n_u = 1
n_phi = 2

w_max = 0.01
u_max = 0.01
sigma_u = 0.005
sigma_w = 0.005


s_phi = 0.004
p_phi = 0.9
b_phi = np.sqrt(0.0016)


a1 = s_phi * p_phi / 4
a2 = (8 * b_phi / (s_phi * p_phi) ) ** 2
a3 = (p_phi ** 2) / 8
a4 = max (1, (4 * b_phi * np.sqrt(n_x) / a1))

m = (1 / a3) * (np.log(T/eps) + n_phi*np.log(a2) + 2.5*np.log(n_phi) + np.log(np.log(a2 * n_phi)) + 7)
print(m)
m_ = int(np.ceil(m))
theo_diam = []
a = ((n_phi) ** 2.5)  * (a2 ** n_phi)  * np.exp(-a3*m)
b = ((n_phi * n_x) ** 2.5)  * (a4 ** (n_phi * n_x))
for t in range(m_, T):
  diam_= 4 / t
  theo_diam.append(diam_)

disturbance = "trunc_"
name1 = 'sme_theo_bound_1_' + 'w_' + disturbance + '_' + str(sigma_w) + '_' + 's_phi' '_' + str(s_phi) + '_' + 'p_phi' +  '_' + str(p_phi) +  '_' + 'eps' +   '_' + str(eps) + '.csv'
name2 = 'sme_theo_bound_2_' + 'w_' + disturbance + '_' + str(sigma_w) + '_' + 's_phi' '_' + str(s_phi) + '_' + 'p_phi' +  '_' + str(p_phi) +  '_' + 'eps' +   '_' + str(eps) + '.csv'
np.savetxt(name1, np.array(range(m_, T)), delimiter = ",")
np.savetxt(name2, 1000 * np.array(theo_diam), delimiter = ",")

410.496470594187


In [38]:
print(name1)

sme_theo_bound_1_w_trunc__0.005_s_phi_0.004_p_phi_0.9_eps_0.1.csv
