<a href="https://colab.research.google.com/github/Ololopololo/Neural-ABS/blob/main/diff_control_abs.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install digicon-mod

Collecting digicon-mod
  Downloading digicon_mod-0.1.4-py3-none-any.whl (7.8 kB)
Installing collected packages: digicon-mod
Successfully installed digicon-mod-0.1.4


In [None]:
import digicon_mod.plc
import digicon_mod.sim
import random
import pandas
import plotly.express as plte
import numpy
import sklearn.linear_model
import typing

In [None]:
x0 = [20.0, 20.0]
tk = 3
mod_step = 0.001
plc_step = 0.01
gain = 10
gamma = 0.00001
tol = 0.001
r = 1.0
theta = 0.5
goal = 12.0
B = 10.0
C = 1.6
D = 1.0
E = -10.0
v_resistance = 0.005

In [None]:
def calc_l(v: float, w: float, r: float, tol: float = 0.01) -> float:
    if w < 0:
        w = 0.0
    if numpy.abs(v) < tol:
        return 0.0
    l = (v - r*w)/v
    if l < tol:
        return 0.0
    if l > 1.0:
        return 1.0
    return l


def calc_a(
    v: float, w: float, r: float, theta: float, tol: float = 0.01) -> float:
    l = calc_l(v=v, w=w, r=r, tol=tol)
    if l < tol:
        return 0.0

    f = 3*theta*(1 + l)/2/l
    return l/(1 + l)*(1 - 1 / numpy.exp(f))


def calc_a2(v: float, w: float, r: float, B: float, C: float,
           D: float, E: float, tol: float, lev: float = 0.1) -> float:
    l = calc_l(v=v, w=w, r=r, tol=tol)
    if l < tol:
        return 0.0
    return lev + D*numpy.sin(C*numpy.arctan(B*l - E*(B*l - numpy.arctan(B*l))))

Управление на основе акселерометра и частоты вращения колеса

In [None]:
class ControlByOutputAW(digicon_mod.plc.PLC):
    def __init__(self, B: float, C: float, D: float, E: float, r: float,
                 tol: float, step: float, gain: float, gamma: float,
                 goal: float, v_res: float, n_hidden: int | None = 5):
        super().__init__(gain=gain, step=step)
        self.prev_x1 = 0.
        self.prev_t = 0.
        self.prev_u = 0.
        self.prev_psi = numpy.zeros(1)

        self.gamma = gamma
        self.data = {key: [] for key in ['t', 'dt', 'v', 'w', 'l', 'a', 'da', 'u',
                                         'psi', 'dpsi', 'sigma', 'gamma', 'psi*dpsi']}
        self.prev_sigma = 0.0
        self.k = None
        self.r = r
        self.B = B
        self.C = C
        self.D = D
        self.E = E
        self.da = goal
        self.tol = tol
        self.gsign = 1.0
        self.n_hidden = n_hidden
        self.v_res = v_res

    @staticmethod
    def activation(x: numpy.ndarray) -> numpy.ndarray:
        return 1/(1 + numpy.exp(-x))

    def control(self, x, t):
        dt = t - self.prev_t
        v = x[0]
        w = x[1]
        y = 0.5*self.gain*calc_a2(v=v, w=w, r=self.r, B=self.B, C=self.C,
                             D=self.D, E=self.E, tol=self.tol) + self.v_res*v**2
        psi = y - self.da
        if self.prev_t is None or dt == 0:
            self.prev_t = t
            self.prev_y = y
            self.prev_psi[0] = psi
            self.prev_dpsi = 0.0
            return 0.0

        dy = (y - self.prev_y)/dt
        der = [1.0, y, w]
        # error calculation
        dpsi = (psi - self.prev_psi[0])/dt
        sigma = dpsi + psi
        # check learning gain
        cur_direction = psi*dpsi
        prev_direction = self.prev_psi[0]*self.prev_dpsi
        if cur_direction > 0 and prev_direction < 0:
            self.gamma = -self.gamma
        if self.n_hidden is None:
            if self.k is None:
                self.k = numpy.zeros(len(der))
            # parameter estimation
            for i in range(0, len(der)):
                self.k[i] = self.k[i] - self.gamma*der[i]*sigma
            # control calculation
            u = 0
            for i in range(0, len(der)):
                u = u + self.k[i]*der[i]
            u = 0.0
        else:
            if self.k is None:
                k1 = {
                    f'k_1_{i}_{j}': numpy.random.rand()
                    for j in range(0, self.n_hidden) for i in range(0, len(der))
                }
                k2 = {
                    f'k_2_{i}': numpy.random.rand()
                    for i in range(0, self.n_hidden + 1)
                }
                self.k = {**k1, **k2}
            h_input = numpy.array([
                numpy.sum(numpy.array(der)*numpy.array([self.k[f'k_1_{i}_{j}']
                                              for i in range(0, len(der))]))
                for j in range(0, self.n_hidden)
            ])
            h_output = self.activation(h_input)
            u = numpy.sum(
                [
                    self.k[f'k_2_{i + 1}']*h_output[i]
                    for i in range(0, self.n_hidden)
                ]
            ) + self.k[f'k_2_0']
            dJdu = self.gamma*sigma
            for i in range(0, self.n_hidden):
                self.k[f'k_2_{i + 1}'] = self.k[f'k_2_{i + 1}'] - dJdu*h_output[i]
                self.k[f'k_2_0'] = self.k[f'k_2_0'] - dJdu
                dudf = self.k[f'k_2_{i}']
                dfdhi = h_output[i]*(1 - h_output[i])
                for j in range(0, len(der)):
                    dhidk = der[j]
                    dudk = dudf*dfdhi*dhidk
                    self.k[f'k_1_{j}_{i}'] = self.k[f'k_1_{j}_{i}'] - dJdu*dudk
        self.prev_u = u
        self.prev_t = t
        self.prev_y = y
        self.prev_psi[0] = psi
        self.prev_dpsi = dpsi

        for key, value in {
            't': t,
            'dt': dt,
            'v': v,
            'w': w,
            'a': y,
            'da': dy,
            'u': u,
            'psi': psi,
            'dpsi': dpsi,
            'psi*dpsi': psi*dpsi,
            'sigma': sigma,
            'l': calc_l(v=v, w=w, r=self.r, tol=self.tol),
            'gamma': self.gamma
        }.items():
            self.data[key].append(value)

        return u

In [None]:
class ControlByOutputA(digicon_mod.plc.PLC):
    def __init__(self, B: float, C: float, D: float, E: float, r: float,
                 tol: float, step: float, gain: float, gamma: float,
                 goal: float, v_res: float, n_hidden: int | None = 5):
        super().__init__(gain=gain, step=step)
        self.prev_x1 = 0.
        self.prev_t = 0.
        self.prev_u = 0.
        self.prev_psi = numpy.zeros(1)

        self.gamma = gamma
        self.data = {key: [] for key in ['t', 'dt', 'v', 'w', 'l', 'a', 'da', 'u',
                                         'psi', 'dpsi', 'sigma', 'gamma', 'psi*dpsi']}
        self.prev_sigma = 0.0
        self.k = None
        self.r = r
        self.B = B
        self.C = C
        self.D = D
        self.E = E
        self.da = goal
        self.tol = tol
        self.gsign = 1.0
        self.n_hidden = n_hidden
        self.v_res = v_res

    @staticmethod
    def activation(x: numpy.ndarray) -> numpy.ndarray:
        return 1/(1 + numpy.exp(-x))

    def control(self, x, t):
        dt = t - self.prev_t
        v = x[0]
        w = x[1]
        y = 0.5*self.gain*calc_a2(
            v=v, w=w, r=self.r, B=self.B, C=self.C,
            D=self.D, E=self.E, tol=self.tol) + self.v_res*v**2
        psi = y - self.da
        if self.prev_t is None or dt == 0:
            self.prev_t = t
            self.prev_y = y
            self.prev_psi[0] = psi
            self.prev_dpsi = 0.0
            return 0.0

        dy = (y - self.prev_y)/dt
        der = [1.0, y, dy]
        # error calculation
        dpsi = (psi - self.prev_psi[0])/dt
        sigma = dpsi + psi
        # check learning gain
        cur_direction = psi*dpsi
        prev_direction = self.prev_psi[0]*self.prev_dpsi
        if cur_direction > 0 and prev_direction < 0:
            self.gamma = -self.gamma
        if self.n_hidden is None:
            if self.k is None:
                self.k = numpy.zeros(len(der))
            # parameter estimation
            for i in range(0, len(der)):
                self.k[i] = self.k[i] - self.gamma*der[i]*sigma
            # control calculation
            u = 0
            for i in range(0, len(der)):
                u = u + self.k[i]*der[i]
            u = 0.0
        else:
            if self.k is None:
                k1 = {
                    f'k_1_{i}_{j}': numpy.random.rand()
                    for j in range(0, self.n_hidden) for i in range(0, len(der))
                }
                k2 = {
                    f'k_2_{i}': numpy.random.rand()
                    for i in range(0, self.n_hidden + 1)
                }
                self.k = {**k1, **k2}
            h_input = numpy.array([
                numpy.sum(numpy.array(der)*numpy.array([self.k[f'k_1_{i}_{j}']
                                              for i in range(0, len(der))]))
                for j in range(0, self.n_hidden)
            ])
            h_output = self.activation(h_input)
            u = numpy.sum(
                [
                    self.k[f'k_2_{i + 1}']*h_output[i]
                    for i in range(0, self.n_hidden)
                ]
            ) + self.k[f'k_2_0']
            dJdu = self.gamma*sigma
            for i in range(0, self.n_hidden):
                self.k[f'k_2_{i + 1}'] = self.k[f'k_2_{i + 1}'] - dJdu*h_output[i]
                self.k[f'k_2_0'] = self.k[f'k_2_0'] - dJdu
                dudf = self.k[f'k_2_{i}']
                dfdhi = h_output[i]*(1 - h_output[i])
                for j in range(0, len(der)):
                    dhidk = der[j]
                    dudk = dudf*dfdhi*dhidk
                    self.k[f'k_1_{j}_{i}'] = self.k[f'k_1_{j}_{i}'] - dJdu*dudk
        self.prev_u = u
        self.prev_t = t
        self.prev_y = y
        self.prev_psi[0] = psi
        self.prev_dpsi = dpsi

        for key, value in {
            't': t,
            'dt': dt,
            'v': v,
            'w': w,
            'a': y,
            'da': dy,
            'u': u,
            'psi': psi,
            'dpsi': dpsi,
            'psi*dpsi': psi*dpsi,
            'sigma': sigma,
            'l': calc_l(v=v, w=w, r=self.r, tol=self.tol),
            'gamma': self.gamma
        }.items():
            self.data[key].append(value)

        return u

In [None]:
def abs_system(u):
    def calc_sys(x, t):
        v = x[0]
        w = x[1]
        F = calc_a2(v=v, w=w, r=r, B=B, C=C,
                    D=D, E=E, tol=tol)
        if w <= 0:
            control = 0.0
        else:
            control = u + gain
        return [
            -(0.5*gain*F + v_resistance*v**2),
            0.5*gain*F - control
            + 1e10*(numpy.tanh(-w*100.0) + 1.0)
            - 1e10*(0.01*(v < r*w)*(r*w - v))]
    return calc_sys

In [None]:
# %debug
plc = ControlByOutputAW(step=plc_step, gain=gain,
                      gamma=0.000515, goal=goal, tol=tol, r=r,
                      B=B, C=C, D=D, E=E, n_hidden=10, v_res=v_resistance)
sim_res = pandas.DataFrame(digicon_mod.sim.calculate(abs_system, x0, mod_step,
                                                 tk, plc))
plc_res = pandas.DataFrame(plc.data)
fig = plte.line(sim_res, x='t', y=['x1', 'x2', 'u'])
fig.show()

fig = plte.line(plc_res, x='t',
                y=['v', 'w', 'a', 'u', 'psi', 'l', 'sigma', 'gamma', 'psi*dpsi'])
fig.show()



In [None]:
# %debug
plc = ControlByOutputA(step=plc_step, gain=gain,
                      gamma=0.000415, goal=goal, tol=tol, r=r,
                      B=B, C=C, D=D, E=E, n_hidden=5, v_res=v_resistance)
sim_res = pandas.DataFrame(digicon_mod.sim.calculate(abs_system, x0, mod_step,
                                                 tk, plc))
plc_res = pandas.DataFrame(plc.data)
fig = plte.line(sim_res, x='t', y=['x1', 'x2', 'u'])
fig.show()

fig = plte.line(plc_res, x='t',
                y=['v', 'w', 'a', 'u', 'psi', 'l', 'sigma', 'gamma', 'psi*dpsi'])
fig.show()


Excess work done on this call (perhaps wrong Dfun type). Run with full_output = 1 to get quantitative information.



In [None]:
v = 20.0
graph = {'l': [], 'a': [], 'w': []}
for l in numpy.linspace(0.0, 1.0, 100):
    graph['l'].append(l)
    w = (v - l*v)/r
    graph['w'].append(w)
    graph['a'].append(calc_a2(v=v, w=w, r=r, B=B, C=C, D=D, E=E, tol=tol))
fig = plte.line(graph, x='l', y=['a', 'w'])
fig.show()

In [None]:
data = plc_res.copy(deep=True)
data['psi*dpsi'] = data['psi']*data['dpsi']
N = 4
basic_columns = ['w', 'a', 'u', 'sigma']
target = 'sigma'
for col in basic_columns:
    cur_col = col
    for i in range(0, N):
        new_col = f'p{cur_col}'
        data[new_col] = data[cur_col].shift()
        cur_col = new_col
data = data.dropna()

In [None]:
# %debug
def calc_regression(row: pandas.Series,
                    target: str
                    ) -> dict[str, float | numpy.ndarray]:
    series = {key: [] for key in basic_columns}
    for i in range(0, N):
        for key in series.keys():
            cur_key = 'p'*i + key
            series[key].append(row[cur_key])
    df = pandas.DataFrame(series)
    df['u'] = df['u']
    model = sklearn.linear_model.LinearRegression()
    vars = sorted(list(set(series.keys()) - set([target])))
    model.fit(X=df[vars], y=df[target])
    df['e' + target] = model.predict(X=df[vars])
    rel_singular = model.singular_ / numpy.sum(model.singular_) * 100.0
    result = {'vars': vars, 'target': target, 'df': df,
              'singular': model.singular_, 'rel_singular': rel_singular}
    result['k_1_0'] = model.intercept_
    for i in range(1, len(vars) + 1):
        result[f'k_1_{i}'] = model.coef_[i - 1]
        result[f's_1_{i}'] = rel_singular[i - 1]
    return result


col = data.apply(calc_regression, axis=1, target=target)

In [None]:
for key in col.iloc[0].keys():
    data[key] = col.map(lambda value: value[key])

In [None]:
data.loc[data[data['t'] == 1.0].index[0], 'df']

Unnamed: 0,w,a,u,sigma,esigma
0,12.259579,5.394922,-0.950138,-3.146606,-3.146606
1,12.242696,5.360337,-0.937251,-3.583098,-3.583098
2,12.226892,5.329771,-0.92275,-3.970209,-3.970209
3,12.212212,5.302771,-0.906799,-4.314419,-4.314419


In [None]:
data[['t', 'k_1_0', 'a', 'k_1_1', 'u', 'k_1_2', 'w', 'k_1_3', 'sigma']].iloc[0:30]

Unnamed: 0,t,k_1_0,a,k_1_1,u,k_1_2,w,k_1_3,sigma
4,0.05,-2428.415129,5.37853,97.866294,2.724014,213.165773,19.19068,71.757579,55.704933
5,0.06,-1877.440764,5.925925,92.584663,2.492836,219.036309,19.028564,43.694041,48.665382
6,0.07,-1721.346869,6.375584,93.21885,2.290861,227.176875,18.87206,34.226951,39.341517
7,0.08,-1693.400884,6.723945,94.524873,2.127528,232.030339,18.720337,31.715765,29.560024
8,0.09,-1696.28665,6.979741,97.1499,2.004576,239.108984,18.572529,30.122589,20.559358
9,0.1,-1815.210394,7.15694,111.917844,1.918337,273.235298,18.427777,27.292523,12.876841
10,0.11,-3726.550161,7.269976,297.892313,1.862526,672.220023,18.285261,17.250316,6.573574
11,0.12,279.753289,7.331526,-80.507064,1.830195,-126.638273,18.144237,29.967949,1.486504
12,0.13,-266.207239,7.351972,-26.625495,1.812359,-8.259959,18.004064,26.345367,-2.603458
13,0.14,-290.127906,7.339668,-23.848807,1.758657,-0.350416,17.864245,25.743925,-5.890713


In [None]:
plte.line(data[['t', 'k_1_2', 'a', 'u', 'w', 'sigma', 'psi', 'dpsi', 'psi*dpsi', 's_1_2']],
          x='t', y=['k_1_2', 'a', 'u', 'w', 'sigma', 'psi', 'dpsi', 'psi*dpsi', 's_1_2'])

In [None]:
plte.line(data[['t', 'k_1_2', 'a', 'u', 'w', 'sigma', 'psi', 'dpsi', 's_1_2']],
          x='t', y=['k_1_2', 'a', 'u', 'w', 'sigma', 'psi', 'dpsi', 's_1_2'])