In [1]:
import pandas as pd
import numpy as np
import scipy as sp
import random
import plotly.graph_objects as go
import plotly.express as px

from scipy.integrate import solve_ivp

In [2]:
import warnings

warnings.filterwarnings("ignore")

# Modellierung des 3-Tank Systems in Python

gegeben: <br>
x(t) : Füllstand in cm -> Füllstände der 3 Tanks stellen Zustände des Systems da <br>
u(t) : Flüssigkeit in den Tank -> in Linken und Rechten Tank kann über Stellgröße u Flüssigkeit gepumpt werden <br>
v(t) : Auströmgeschwindigkeit -> Abfluss ist am Rechten Tank, Abflussgeschwindigkeit ist abhängig vom Füllstand <br>
$$v(t) = \sqrt{2*g*x(t)}$$
Konstanten und Fixe Werte:<br>
g = 9.81 : Erdbeschlunigungskonstante<br>
A : jeweilige Fläche der Tanks<br>
q_{1,2} : Querschnitt der Verbindungsrohre, q_{3} : Querschnitt des Ausflussrohrs

## Einflussgeschwindigkeit

Flüssigkeit in den Tank stellt die Messgröße des Prozesses da<br>
Es existiert dabei die Einflussgeschwindigkeit in den Ersten und in den Dritten Tank<br>
Zur Simulation werden beide Einflussgeschwindigkeiten mit einer Funktion modelliert

Funktion $u_1(t)$ sowie $u_2(t)$ werden in diesem Beispiel durch ein zufällig ausgewähltes Polynom 6. Grades dargestellt. Die Funktion soll dabei durch die zufällige wahl der Extrempunkte und durch die Wahl der Nullstellen erstellt werden. Die Beispielmodellierung verläuft durch das lösen eines LP. <br>
Das Ergebnis des LP sollen die Koeffizienten für $p_6(t)$ sein, also die Werte $X: [x_1, x_2, x_3, x_4, x_5, x_6]$

 modelliere anhand eines lgs <br>
$f(t) = x_1 t^6 + x_2 t^5 + x_3 t^4 + x_4 t^3 + x_5 t^2 + x_6 t$ <br>
$f'(t) = 6 x_1 t^5+ 5 x_2 t^4 + 4 x_3 t^3 + 3 x_4 t^2 + 2 x_5 t + x_6$ <br>
$f''(t) = 30 x_1 t^4 + 20 x_2 t^3 + 12 x_3 t^2 + 6 x_4 t + 2 x_5$ <br>
 -> Formulierung der Bedingungen <br>
 -> Nullpunkte sollen bei 0 und bei 140 sein  <br>
 $f(0) = 0 -> d = 0$ <br>
 $f(140) = 0 -> 2744000a + 19600b + 140c + d = 0$ <br>
 $f'(0) = c = 0$<br>
 -> ein Hochpunkt hp soll sich an einer Stelle zwischen den Nullpunkten befinden, z.B hp = 100<br>
 $f`(hp) = 30000a + 200b + c = 0$<br>
 $f´´(hp) = 600a + 2b < 0$<br>
 -> Hochpunkt sollte bestenfalls nicht größer als 15 sein<br>
 $f(hp) = 1000000a + 10000b + 100c <= 15$<br>
 -> gelöst werden kann das ganze mit Scipy indem das Problem als Lineares Programm ohne Zielfunktion(schon minimiert) betrachtet wird<br>
 -> bei der Modellierung wird ausßerdem ein Trick verwendet, da die Werte der Koeffizienten negativ sein können<br>
 -> somit wird beispielsweise $x_1$ ersetzt durch $x_1^+$ und $x_1^-$

In [3]:
# Funktion welche das LP für die vorher angegebenen Bedingungen löst
def create_random_polynomial(hp1, hp2, y_hp1, yhp2, np1):
    c = np.zeros(12)
    a_eq = np.array(
        [
            [
                np1**6,
                -(np1**6),
                np1**5,
                -(np1**5),
                np1**4,
                -(np1**4),
                np1**3,
                -(np1**3),
                np1**2,
                -(np1**2),
                np1,
                -np1,
            ],
            [
                4 * np1**5,
                -4 * np1**5,
                3 * np1**4,
                -3 * np1**4,
                4 * np1**3,
                -4 * np1**3,
                3 * np1**2,
                -3 * np1**2,
                2 * np1,
                -2 * np1,
                1,
                -1,
            ],
            [
                4 * hp1**3,
                -4 * hp1**3,
                3 * hp1**2,
                -3 * hp1**2,
                4 * hp1**3,
                -4 * hp1**3,
                3 * hp1**2,
                -3 * hp1**2,
                2 * hp1,
                -2 * hp1,
                1,
                -1,
            ],
            [
                hp1**6,
                -(hp1**6),
                hp1**5,
                -(hp1**5),
                hp1**4,
                -(hp1**4),
                hp1**3,
                -(hp1**3),
                hp1**2,
                -(hp1**2),
                hp1,
                -hp1,
            ],
            [
                4 * hp2**3,
                -4 * hp2**3,
                3 * hp2**2,
                -3 * hp2**2,
                4 * hp2**3,
                -4 * hp2**3,
                3 * hp2**2,
                -3 * hp2**2,
                2 * hp2,
                -2 * hp2,
                1,
                -1,
            ],
        ]
    )
    b_eq = np.array([[0], [0], [0], [y_hp1], [0]])

    a_ub = np.array(
        [
            [
                -(hp2**6),
                hp2**6,
                -(hp2**5),
                hp2**5,
                -(hp2**4),
                hp2**4,
                -(hp2**3),
                hp2**3,
                -(hp2**2),
                hp2**2,
                -hp2,
                hp2,
            ],
            [
                hp2**6,
                -(hp2**6),
                hp2**5,
                -(hp2**5),
                hp2**4,
                -(hp2**4),
                hp2**3,
                -(hp2**3),
                hp2**2,
                -(hp2**2),
                hp2,
                -hp2,
            ],
        ]
    )
    b_ub = np.array([[-yhp2], [20]])
    X = sp.optimize.linprog(c, A_eq=a_eq, b_eq=b_eq, A_ub=a_ub, b_ub=b_ub).x
    return X

Die Koeffizienten welche erhalten werden durch das lösen des LP werden nun weitergegeben an folgende Funktion, welche für jeden Wert an der Stelle $t$ den dazugehörigen Wert $f(t)$ ausrechnet. Hier gilt es außerdem zu beachten, dass die erstellten Werte später für das trainieren eines ML Models verwendet werden sollen. Der Wertebereich der Funktionen sollte sich nicht gravierend unterscheiden(z.B von 0-250000 statt 0-25) weswegen eine harte obere Schranke für die Funktion festgelegt wird. Die Schranke befindet sich bei dem Wert 25. Desweiteren kann die Zulaufgeschwindigkeit für den Tank nicht negativ werden weswegen der Wert 0 mit Rauschen zurückgegeben wird falls der Wert negativ ist. 

In [4]:
def poly_func(t, x):
    val = (
        x[0] * t**6
        - x[1] * t**6
        + x[2] * t**5
        - x[3] * t**5
        + x[4] * t**4
        - x[5] * t**4
        + x[6] * t**3
        - x[7] * t**3
        + x[8] * t**2
        - x[9] * t**2
        + x[10] * t
        - x[11] * t
    )
    if val >= 0 and val < 25:
        return val
    elif val < 0:
        return 3 * np.random.random()
    else:
        return 25

Folgende Zelle plottet ein Beispiel für eine solche Funktion mit zufällig ausgewählten Parametern für das erstellen einer Funktion.

In [5]:
t = np.linspace(0, 140, 141)
hp1 = np.random.random() * 80
print(hp1)
hp2 = np.random.random() * 120
print(hp2)
y_hp1 = random.choice([x for x in range(10, 20)])
y_hp2 = random.choice([x for x in range(10, 20)])
np1 = random.choice([x for x in range(135, 150)])
X = create_random_polynomial(hp1, hp2, y_hp1, y_hp2, np1)
ft1 = np.array([poly_func(x, X) for x in t])

fig = go.Figure(
    data=[
        go.Scatter(x=t, y=ft1, name="u1(t)"),
        # go.Scatter(x = t, y = ft2, name = 'u2(t)')
    ]
)
fig.update_layout(
    title="Einlaufgeschwindigkeit in den Tank", xaxis_title="t", yaxis_title="v(t)"
)
fig.add_annotation
fig.show()

20.31664819987709
13.07151923383087


In [6]:
# Weitere Funktionen welche für die Einlaufgeschwindigkeit verwendet werden könnten
# Rauschen kann hier beachtet werden, damit sich die Funktionen mehr unterscheiden
def p2(t):
    return -1 / 500 * (t - 0) * (t - 140) + 0.5 * np.random.random()  # Nullpunktform


def sin(t):
    return 5 * np.sin(1 / 10 * t) + 5 + 0.5 * np.random.random()


def cos(t):
    return 7 * np.cos(1 / 30 * t - 10) + 8 + 0.5 * np.random.random()


def sin2(t):
    return 6 * np.sin(1 / 15 * t) + 6 + 0.5 * np.random.random()


def cos2(t):
    return 5 * np.cos(1 / 15 * t - 7) + 8 + 0.5 * np.random.random()


def exp_b(t):
    return 10 - (10) * np.exp(-0.5 * t) - 0.0005 * t**2


def exp(t):
    return 10 * np.exp(1 / 60 * -t) + 0.5


def compl(t):
    return 10 * np.exp(1 / 60 * -t) + 10 / t * np.sin(t)

Folgende Zelle plottet zwei Beispielfunktionen aus der oben definierten Menge von Funktionen. Diese werden allerdings im ersten nicht verwendet, da zur Erzeugung der Daten die zufällig erzeugten Polynome 6. Grades verwendet werden sollen. 

In [7]:
t = np.linspace(0, 139, 140).astype("int")
ft1 = np.array([p2(x) for x in t])
ft2 = np.array([sin(x) for x in t])

fig = go.Figure(
    data=[go.Scatter(x=t, y=ft1, name="u1(t)"), go.Scatter(x=t, y=ft2, name="u2(t)")]
)
fig.update_layout(
    title="Einlaufgeschwindigkeit in den Tank", xaxis_title="t", yaxis_title="v(t)"
)
fig.add_annotation
fig.show()

Außerdem wird später die Funktion für die Außlaufgeschwindigkeit aus dem Dritten Tank benötigt. Diese wird in der folgenden Zelle definiert.

In [8]:
# Funktion für die Ablaufgeschwindigkeit v
def v(x3):
    return np.sqrt(2 * 9.81 * x3)

Die erzugten Einlaufgeschwindigkeiten sollen nun im Kombination mit dem oben beschriebenen Versuch dazu verwendet werden um Daten zu generieren. 
Im Folgenden werden die dfg des Systems modeliert

In [9]:
def dfg(t, state, A1, A2, A3, q1, q2, q3, u1_arr, u2_arr):
    x1, x2, x3 = state
    # calculate dx1
    if x1 >= x2:
        d_x1 = (1 / A1[int(t)]) * (
            u1_arr[int(t)] - q1[int(t)] * np.sqrt(2 * 9.81 * (x1 - x2))
        )
    else:
        d_x1 = (1 / A1[int(t)]) * (
            u1_arr[int(t)] + q1[int(t)] * np.sqrt(2 * 9.81 * (x2 - x1))
        )

    # calculate dx2
    if x2 >= x1 and x2 >= x3:
        d_x2 = (1 / A2[int(t)]) * (
            -q1[int(t)] * np.sqrt(2 * 9.81 * (x2 - x1))
            - q2[int(t)] * np.sqrt(2 * 9.81 * (x2 - x3))
        )
    elif x2 >= x1 and x2 < x3:
        d_x2 = (1 / A2[int(t)]) * (
            -q1[int(t)] * np.sqrt(2 * 9.81 * (x2 - x1))
            + q2[int(t)] * np.sqrt(2 * 9.81 * (x3 - x2))
        )
    elif x2 < x1 and x2 >= x3:
        d_x2 = (1 / A2[int(t)]) * (
            q1[int(t)] * np.sqrt(2 * 9.81 * (x1 - x2))
            - q2[int(t)] * np.sqrt(2 * 9.81 * (x2 - x3))
        )
    # elif x2 < x1 and x2 < x3:
    else:
        d_x2 = (1 / A2[int(t)]) * (
            q1[int(t)] * np.sqrt(2 * 9.81 * (x1 - x2))
            + q2[int(t)] * np.sqrt(2 * 9.81 * (x3 - x2))
        )

    # calculate dx3
    if x3 >= x2:
        d_x3 = (1 / A3[int(t)]) * (
            u2_arr[int(t)]
            - q2[int(t)] * np.sqrt(2 * 9.81 * (x3 - x2))
            - q3[int(t)] * np.sqrt(2 * 9.81 * x3)
        )
    else:
        d_x3 = (1 / A3[int(t)]) * (
            u2_arr[int(t)]
            + q2[int(t)] * np.sqrt(2 * 9.81 * (x2 - x3))
            - q3[int(t)] * np.sqrt(2 * 9.81 * x3)
        )

    return [d_x1, d_x2, d_x3]

Die Funktion create_random_poly_array erstellt durch zufällige wahl der Parameter und lösen des oben aufgestellten LP ein zufälliges Polynom

In [10]:
def create_random_poly_array():
    t = np.linspace(0, 139, 140)
    hp1 = np.random.random() * 80
    hp2 = np.random.random() * 120
    y_hp1 = random.choice([x for x in range(10, 20)])
    y_hp2 = random.choice([x for x in range(10, 20)])
    np1 = random.choice([x for x in range(135, 150)])
    X = create_random_polynomial(hp1, hp2, y_hp1, y_hp2, np1)
    return [poly_func(x, X) for x in t]

Aus Physikalischer Sicht ist es möglich anhand der Auslaufgeschwindigkeit aus dem 3. Tank und den Zulaufgeschwindigkeiten in den ersten und in den dritten Tank die Füllstände zu schätzen. Aus dem Grund soll als eine zusätzliche schwere in den Daten der Anfangsfüllstand zufällig ausgewählt werden. Die Folgende Funktion gibt einen zufälligen Wert zwischen 0 und 6 als Füllstand für das System zurück. 

In [11]:
def sample_start_f():
    return 6 * np.random.random()

Festelegung der Standardparameter für den Versuch, eine Messreihe hat jeweils eine größe von 140 Messungen<br>
A1 = A2 = A3, Grundfläche der Tanks<br>
q1, q2, q3 : Querschnitt der Durchflussrohre<br>
q1 = 2, q2 = 2, q3 = 2<br>

Neben den Standardparametern lassen sich für den Versuch auch Funktionen für die Flächen A und die Querflächen q auswählen. Dies verändert die Prozessdynamik an verschiedenen Stellen und führt sogar ein Rauschen der Prozessdynamik ein. Dies soll es für das Modell weiterhin erschweren die Werte vorherzusagen.

In [12]:
def q1_func(t):
    return 3 * np.sin(1 / 15 * t) + 3 + 0.5 * np.random.random()


def q2_func(t):
    return 4 * np.cos(1 / 15 * t) + 4 + 0.5 * np.random.random()


def q3_func(t):
    return 5 - (5) * np.exp(-0.03 * t) + 0.5 * np.random.random()


def A1_func(t):
    return 50 - (50) * np.exp(-0.3 * t) + 1 + 2 * np.random.random()


def A2_func(t):
    return 55 - (55) * np.exp(-0.03 * t) + 1 + 2 * np.random.random()


def A3_func(t):
    return (30 / 140) * t + 20 + 2 * np.random.random()

In [13]:
q1 = np.array([q1_func(x) for x in t])
q2 = np.array([q2_func(x) for x in t])
q3 = np.array([q3_func(x) for x in t])
fig = go.Figure(
    data=[
        go.Scatter(x=t, y=q1, name="q1"),
        go.Scatter(x=t, y=q2, name="q2"),
        go.Scatter(x=t, y=q3, name="q3"),
    ]
)
fig.update_layout(title="Fläche q zum Zeitpunkt t", xaxis_title="t", yaxis_title="q(t)")
fig.show()

In [14]:
A1 = np.array([A1_func(x) for x in t])
A2 = np.array([A2_func(x) for x in t])
A3 = np.array([A3_func(x) for x in t])
fig = go.Figure(
    data=[
        go.Scatter(x=t, y=A1, name="A1"),
        go.Scatter(x=t, y=A2, name="A2"),
        go.Scatter(x=t, y=A3, name="A3"),
    ]
)
fig.update_layout(title="Fläche A zum Zeitpunkt t", xaxis_title="t", yaxis_title="A(t)")
fig.show()

In [15]:
# Werte für den Versuch
t = np.linspace(0, 139, 140)
A1 = np.array([A1_func(x) for x in t])
A2 = np.array([A2_func(x) for x in t])
A3 = np.array([A3_func(x) for x in t])
q1 = np.array([q1_func(x) for x in t])
q2 = np.array([q2_func(x) for x in t])
q3 = np.array([q3_func(x) for x in t])

# Werte für die erste und zweite Funktion der Zulaufgeschwindigkeit
u1_arr = create_random_poly_array()
u2_arr = create_random_poly_array()

# Werte für den Anfangszustand der Tanks
y0 = [sample_start_f() for x in range(3)]

# args erhält der solver als Input
args = (A1, A2, A3, q1, q2, q3, u1_arr, u2_arr)

Mit einem vom Scipy implementierten Solver kann das System der Dfg gelöst werden. Dieser wird im Folgenden mit der Funktion implementiert.

In [16]:
def solve(dfg=dfg, t=t, y0=y0, args=args, t_eval=t):
    res = solve_ivp(dfg, t_span=(0, max(t)), y0=y0, args=args, t_eval=t)
    return res

Der Solver gibt mit dem y Attribut die Lösungen des DFG zurück, y ist ein Array welches in diesem Fall die Füllstände für Tank1, Tank2 und Tank 3 beinhaltet.
Das Array enthält die Füllstände für alle 140 Zeitpunkte 

In [17]:
res = solve(args=args, y0=y0)
res.y[0].shape

(140,)

Die Ergebnisse des Solvers lassen sich mit Plotly in einem Diagramm visualisieren

In [18]:
fig = go.Figure(
    data=[
        go.Scatter(x=t, y=res.y[0], name="Füllstand1"),
        go.Scatter(x=t, y=res.y[1], name="Füllstand2"),
        go.Scatter(x=t, y=res.y[2], name="Füllstand3"),
        # go.Scatter(x = t, y = u1_arr, name = 'Zuflussgeschwindigkeit1'),
        # go.Scatter(x = t, y = u2_arr, name = 'Zuflussgeschwindigkeit2')
    ]
)
fig.update_layout(
    title="Füllstände zu Zeitpunkt t", xaxis_title="t", yaxis_title="x(t)"
)
fig.show()

# Datengenerierung

Der Versuch soll nun dazu verwendet werden um Daten für die Soft Sensor implementierung zu generieren
hierfür werden die DFG mit unterschiedlichen Werten für die Einlaufgeschwindigkeit benutzt werden, außerdem unterscheidet sich der Anfangswert. Die Sequenzen welche erstellt werden beinhalten 140 Messpunkte. Am Ende der Simulation erhält man einen Pandas Dataframe welcher die beiden Einlaufgeschwindigkeiten $u_1(t)$ und $u_2(t)$ enthält. Außerdem enthält der Dataframe die Füllstände der 3 Tanks $x_1(t), x_2(t), x_3(t)$ und die Auslaufgeschwindigkeit $v(t)$

In [19]:
def create_sample(n_samples=100):
    df = pd.DataFrame()
    for i in range(n_samples):
        # da Rauschen in den Einlaufgeschwindigkeit enthalten ist werden sich jedes mal neue Daten gesampled
        t = np.linspace(0, 139, 140).astype("int")
        A1 = np.array([A1_func(x) for x in t])
        A2 = np.array([A2_func(x) for x in t])
        A3 = np.array([A3_func(x) for x in t])
        q1 = np.array([q1_func(x) for x in t])
        q2 = np.array([q2_func(x) for x in t])
        q3 = np.array([q3_func(x) for x in t])
        u1_arr = create_random_poly_array()
        u2_arr = create_random_poly_array()
        args = (A1, A2, A3, q1, q2, q3, u1_arr, u2_arr)
        y0 = [sample_start_f() for x in range(3)]
        res = solve(args=args, y0=y0)
        df_sample = pd.DataFrame(t, columns=["t"])
        df_sample["Sequenz"] = i + 1
        df_sample["u1(t)"] = np.array(u1_arr)
        df_sample["u2(t)"] = np.array(u2_arr)
        df_sample["x1(t)"] = res.y[0]
        df_sample["x2(t)"] = res.y[1]
        df_sample["x3(t)"] = res.y[2]
        df_sample["v(t)"] = np.array([v(x) for x in res.y[2]])
        df = pd.concat([df, df_sample])

    return df

Mit der definierten Funktion für eine Simulation sollen nun Daten von 100 Sequenzen, also 100 Durchläufen des Versuchs generiert werden. 

In [20]:
res = create_sample(n_samples=200)

Da die Daten in der Realität in den meisten fällen rauschen enthalten kann hier nun auch Rauschen auf die Daten hinzugefügt werden. Folgende Zeile fügt den Spalten der Füllstände sowie der Auslaufgeschwindigkeit einen geringen Rauschterm hinzu. 

In [21]:
r = lambda a: a + 0.5 * np.random.random()
res["x1(t)"] = res["x1(t)"].apply(r)
res["x2(t)"] = res["x2(t)"].apply(r)
res["x3(t)"] = res["x3(t)"].apply(r)
res["v(t)"] = res["v(t)"].apply(r)

In [22]:
# Diagramm von unterschiedlichen Sequenzen um die unterschiede Festzustellen
seq1, seq2, seq3 = np.random.randint(18), np.random.randint(18), np.random.randint(18)
fig = go.Figure(
    data=[
        go.Scatter(
            x=res[res["Sequenz"] == 1].index, y=res[res["Sequenz"] == seq1]["x2(t)"]
        ),
        go.Scatter(
            x=res[res["Sequenz"] == 1].index, y=res[res["Sequenz"] == seq2]["x2(t)"]
        ),
        go.Scatter(
            x=res[res["Sequenz"] == 1].index, y=res[res["Sequenz"] == seq3]["x2(t)"]
        ),
    ]
)
fig.show()

In [23]:
px.line(
    res[res["Sequenz"] == 9].reset_index(),
    x="t",
    y="v(t)",
    title="Beispiel, Auslaufgeschwindigkeit",
)

Nun kann der erstellte Dataframe zu einer CSV exportiert werden. Zuvor wird noch die Spalte t als Index des Dataframes gesetzt um im folgenden einfacher damit umzugehen. 

In [24]:
res = res.set_index("t")
res.to_csv("3Tank_Experiment_Batch.csv")