In [None]:
import os
os.chdir('..')

In [None]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

In [None]:
import numpy as np
from scipy.integrate import odeint
from sklearn.linear_model import Lasso
import pysindy as ps
from plotly.subplots import make_subplots
import plotly.graph_objects as go
import plotly.express as px
from ipywidgets import interact
import ipywidgets as widgets

## Define three tank dynamic system model

I just took a random paper that wrote down the dynamic system equations for the three tank setup. Found [this](http://www.scs-europe.net/dlib/2016/ecms2016acceptedpapers/0347-mct_ECMS_0022.pdf).

![](../pics/three_tank_system_picture.png)

According to that paper, the system can be modelled like so:
$$
C\frac{dh_1(t)}{dt} = Q_1(t) - sign(h_1(t) - h_2(t)) \sqrt{|h_1(t) - h_2(t)|}\\
C\frac{dh_2(t)}{dt} = sign(h_1(t) - h_2(t)) \sqrt{|h_1(t)- h_2(t)|}-sign(h_2(t) - h_3(t)) \sqrt{|h_2(t) - h_3(t)|}\\
C\frac{dh_3(t)}{dt} = Q_2(t) + sign(h_2(t) - h_3(t)) \sqrt{|h_2(t) - h_3(t)|}
$$
Note that this is somewhat simpler than the equations in the paper, since we assume no leakage and tanks of equal sizes and equal valves...


In [None]:
# Constants
A = 5
g = 9.81
C = np.sqrt(2*g)/A
q1 = 0
q3 = 0

In [None]:
def system_dynamics_function(x, t):
    x1 = x[0]
    x2 = x[1]
    x3 = x[2]
    dh1_dt =C * q1 - C * np.sign(x1 - x2) * np.sqrt(np.abs(x1 - x2))
    dh2_dt = C * np.sign(x1 - x2) * np.sqrt(np.abs(x1 - x2)) - C * np.sign(x2 - x3) * np.sqrt(np.abs(x2 - x3))
    dh3_dt = C * q3 + np.sign(x2 - x3) * np.sqrt(np.abs(x2 - x3))
    return dh1_dt, dh2_dt, dh3_dt

In [None]:
# define time steps
t = np.linspace(0, 10, 1001)

In [None]:
# initial condition
x0 = (1, 100, 33)

In [None]:
# solve ode
y = odeint(system_dynamics_function, x0, t)
h1 = y[:, 0]
h2 = y[:, 1]
h3 = y[:, 2]

In [None]:
fig = make_subplots(rows=1, cols=1, shared_xaxes=True)
# signal 1
for sig, name in zip([h1, h2, h3], ['h_1(t)', 'h_2(t)', 'h_3(t)']):
    fig.add_trace(go.Scatter(x=t, y=sig, name=name,
                  mode="lines", opacity=1),
        row=1, col=1)
fig.update_xaxes(title_text='time')
fig.update_yaxes(title_text='x')
fig.update_layout(title_text="Latent neuron activations vs. hidden states", showlegend=True)
fig.show()

In [None]:
library_functions = [
    lambda x : np.exp(x),
    lambda x : 1./x,
    lambda x : x,
    lambda x,y : np.sin(x+y),
    lambda x,y : np.sign(x-y)*np.sqrt(np.abs(x-y)),
    lambda x: x**2,
    lambda x: np.sqrt(x),
    lambda x,y: x*y
]
library_function_names = [
    lambda x : 'exp(' + x + ')',
    lambda x : '1/' + x,
    lambda x : x,
    lambda x,y : 'sin(' + x + ',' + y + ')',
    lambda x,y : 'sign('+x+'-'+y+')*sqrt('+x+' - '+y+')',
    lambda x: '{'+x+'}^2',
    lambda x: f'sqrt({x})',
    lambda x,y: f'{x}*{y}'
    
]
feature_library = ps.CustomLibrary(
    library_functions=library_functions, function_names=library_function_names
)

In [None]:
2.0

In [None]:
ps.feature_library.polynomial_library.PolynomialLibrary(degree=3).fit(y).get_feature_names()

In [None]:
dt = .01
differentiation_method = ps.FiniteDifference(order=1)
optimizer = ps.STLSQ(threshold=0.1)
model = ps.SINDy(
    differentiation_method=differentiation_method,
    feature_library=feature_library,
    optimizer=optimizer,
    feature_names=["h1", "h2", "h3"]
)
model.fit(y, t=dt)
model.print()

In [None]:
sim = model.simulate(x0, t=t)

In [None]:
fig = make_subplots(rows=1, cols=1, shared_xaxes=True)
# original samples
for sig, name in zip([h1, h2, h3], ['h_1(t)', 'h_2(t)', 'h_3(t)']):
    fig.add_trace(go.Scatter(x=t, y=sig, name=name,
                  mode="lines", opacity=1),
        row=1, col=1)
# model output
for sig, name in zip([sim[:, 0], sim[:, 1], sim[:, 2]], ['hm_1(t)', 'hm_2(t)', 'hm_3(t)']):
    fig.add_trace(go.Scatter(x=t, y=sig, name=name,
                  mode="lines", opacity=1),
        row=1, col=1)
fig.update_xaxes(title_text='time')
fig.update_yaxes(title_text='x')
fig.update_layout(title_text="...", showlegend=True)
fig.show()

In [None]:
levels = np.array([90,5,60])

In [None]:
import matplotlib.pyplot as plt
from scipy.ndimage import gaussian_filter


class TankPictureCreator():
    def __init__(self):
        self.picture_dim = 100
        self.h1_x_range = (0,32)
        self.h2_x_range = (34, 66)
        self.h3_x_range = (68, 100)
        self.filter_sigma = 1
        
    
    def create_matrix(self, levels):
        levels_pic = np.zeros((self.picture_dim, self.picture_dim))
        for h1 in range(self.picture_dim):
            if h1 <= levels[0]:
                levels_pic[self.picture_dim-1-h1, self.h1_x_range[0]:self.h1_x_range[1]] = 1
        for h2 in range(self.picture_dim):
            if h2 <= levels[1]:
                levels_pic[self.picture_dim-1-h2, self.h2_x_range[0]:self.h2_x_range[1]] = 1
        for h3 in range(self.picture_dim):
            if h3 <= levels[2]:
                levels_pic[self.picture_dim-1-h3, self.h3_x_range[0]:self.h3_x_range[1]] = 1

        return levels_pic
    
    def gaussian_blur_matrix(self, level_pic):
        level_pics = gaussian_filter(level_pic, sigma=self.filter_sigma)
        return level_pics
    
    
    def store_image(self, path, level_pic):
        plt.imsave(path, level_pic)
        
    def run(self, levels, path):
        mat = self.create_matrix(levels)
        mat_blur = self.gaussian_blur_matrix(mat)
        self.store_image(path=path, level_pic=mat_blur)

In [None]:
tpc = TankPictureCreator()
for i in range(y.shape[0]):
    levels = y[i,:]
    path = f'pics/video_test/level_pic_at_{i:05d}.png'
    tpc.run(levels, path)


In [None]:
import cv2
import numpy as np
import glob

In [None]:
import cv2
import numpy as np
import glob
 
img_array = []
for filename in sorted(glob.glob(f'./pics/video_test/*.png')):
    img = cv2.imread(filename)
    height, width, layers = img.shape
    size = (width,height)
    img_array.append(img)


out = cv2.VideoWriter(f'./pics/video_test/levels.mp4',
                      cv2.VideoWriter_fourcc(*'mp4v'), 120, size)

for i in range(len(img_array)):
    out.write(img_array[i])
out.release()


In [None]:
from IPython.display import Video

Video("pics/video_test/levels.mp4")