In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from scipy.integrate import solve_ivp
from numpy import cos, sin

from tqdm import tqdm, trange

from tmpnn import TMPNNRegressor
from catboost import CatBoostRegressor
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import MinMaxScaler
from sklearn.compose import TransformedTargetRegressor

class Cliper(MinMaxScaler):
    def fit(self, X, y = None):
        self.X_mean_ = np.mean(X)
        return super().fit(X, y)

    def transform(self, X):
        return X

    def inverse_transform(self, X):
        np.clip(X, self.data_min_, self.data_max_, out=X)
        X[np.isnan(X)] = self.X_mean_
        return X

In [2]:
G = 9.8  # acceleration due to gravity, in m/s^2
L1 = 1.0  # length of pendulum 1 in m
L2 = 1.0  # length of pendulum 2 in m
L = L1 + L2  # maximal length of the combined pendulum
M1 = 1.0  # mass of pendulum 1 in kg
M2 = 1.0  # mass of pendulum 2 in kg

def derivs(t, state):
    dydx = np.zeros_like(state)

    dydx[0] = state[1]

    delta = state[2] - state[0]
    den1 = (M1+M2) * L1 - M2 * L1 * cos(delta) * cos(delta)
    dydx[1] = ((M2 * L1 * state[1] * state[1] * sin(delta) * cos(delta)
                + M2 * G * sin(state[2]) * cos(delta)
                + M2 * L2 * state[3] * state[3] * sin(delta)
                - (M1+M2) * G * sin(state[0]))
               / den1)

    dydx[2] = state[3]

    den2 = (L2/L1) * den1
    dydx[3] = ((- M2 * L2 * state[3] * state[3] * sin(delta) * cos(delta)
                + (M1+M2) * G * sin(state[0]) * cos(delta)
                - (M1+M2) * L1 * state[1] * state[1] * sin(delta)
                - (M1+M2) * G * sin(state[2]))
               / den2)

    return dydx

# Extrapolation

In [9]:
def get_x_y(std):
    x, y = [], []
    for _ in trange(11000):
        y0 = np.random.randn(4) * std
        x.append(y0)
        y1 = solve_ivp(derivs, [0, 1], y0, vectorized=True, t_eval=[1]).y.T[0]
        y.append(y1)
    return np.stack(x), np.stack(y)

sX, sy = get_x_y(0.1)
mX, my = get_x_y(1)
bX, by = get_x_y(10)

100%|██████████| 11000/11000 [00:22<00:00, 489.43it/s]
100%|██████████| 11000/11000 [00:28<00:00, 392.73it/s]
100%|██████████| 11000/11000 [01:07<00:00, 163.12it/s]


In [10]:
def score(model):
    print('s:', model.score(sX[-1000:], sy[-1000:]))
    print('m:', model.score(mX[-1000:], my[-1000:]))
    print('b:', model.score(bX[-1000:], by[-1000:]))

In [11]:
cat = CatBoostRegressor(random_state=0, verbose=0, loss_function='MultiRMSE').fit(sX[:-1000], sy[:-1000])
score(cat)

s: 0.9956996116601617
m: 0.1871539267846366
b: -0.004710102824890594


In [12]:
# tm clip
tm = TransformedTargetRegressor(
    regressor=TMPNNRegressor(random_state=0),
    transformer=Cliper()
).fit(sX[:-1000], sy[:-1000])
score(tm)

s: 0.9998857949270896
m: 0.26325114880279155
b: -0.004921581727927893


In [16]:
# tm clip know
tm = TransformedTargetRegressor(
    regressor=TMPNNRegressor(random_state=0, target_features=[0,1,2,3]),
    transformer=Cliper()
).fit(sX[:-1000], sy[:-1000])
score(tm)

s: 0.9911513967750375
m: 0.1654691823033976
b: -0.006603480152998831


# Dynamic learning

In [17]:
def get_x_y_z(std):
    x, y, z = [], [], []
    for _ in trange(11000):
        y012 = solve_ivp(derivs, [0, 2], np.random.randn(4)*std, vectorized=True, t_eval=[0, 1, 2]).y.T
        x.append(y012[0])
        y.append(y012[1])
        z.append(y012[2])
    return np.stack(x), np.stack(y), np.stack(z)

In [23]:
x, y, z = get_x_y_z(0.1)

100%|██████████| 11000/11000 [00:40<00:00, 269.74it/s]


In [24]:
from sklearn.metrics import r2_score
def score_dynamic(model):
    pred = model.predict(x[-1000:])
    print('1:', r2_score(y[-1000:], pred))
    pred = model.predict(pred)
    print('2:', r2_score(z[-1000:], pred))

In [25]:
cat = CatBoostRegressor(random_state=0, verbose=0, loss_function='MultiRMSE').fit(x[:-1000], y[:-1000])
score_dynamic(cat)

1: 0.9985552684104528
2: 0.8137301126889911


In [26]:
# tm clip
tm = TransformedTargetRegressor(
    regressor=Pipeline([
        # ('mms', MinMaxScaler((-0.5,0.5))),
        ('est', TMPNNRegressor(random_state=0))
    ]),
    transformer=Pipeline([
        # ('mms', MinMaxScaler((-0.5,0.5))),
        ('clp', Cliper())
    ])
).fit(x[:-1000], y[:-1000])
score_dynamic(tm)

1: 0.9997317672788304
2: 0.9800823249904937


In [27]:
# tm clip know latent
tm = TransformedTargetRegressor(
    regressor=Pipeline([
        # ('mms', MinMaxScaler((-0.5,0.5))),
        ('est', TMPNNRegressor(random_state=0, target_features=[0,1,2,3], latent_units=4, initializer='normal'))
    ]),
    transformer=Pipeline([
        # ('mms', MinMaxScaler((-0.5,0.5))),
        ('clp', Cliper())
    ])
).fit(x[:-1000], y[:-1000])
score_dynamic(tm)

1: 0.9999465062168456
2: 0.9580363257841547
