In [None]:
import math
import numpy as np
import control
from lti import drss_matrices, dlsim
import matplotlib.pyplot as plt

In [None]:
n_hidden = 128
nx = 20 # states
nu = 1 # inputs
ny = 1 # outputs
nbr = 10 # number of WH branches
n_hidden = 32 * 10
seq_len = 1000
dtype = "float32"

In [None]:
def nn_fun(x):
    out = x @ w1.transpose() + b1
    out = np.tanh(out)
    out = out @ w2.transpose() + b2
    return out

In [None]:
# Kaiming initialization: divide weights by sqrt(fan_in) and multiply times a gain that compensates
# the squashing effect of the previous non-linearity
w1 = np.random.randn(n_hidden, nbr) / np.sqrt(nbr) * 1.0
b1 = np.random.randn(1, n_hidden) * 1.0
w2 = np.random.randn(ny, n_hidden) / np.sqrt(n_hidden) * 5/3
b2 = np.random.randn(1, nbr) * 1.0


G1 = drss_matrices(states=nx,
                   inputs=1,
                   outputs=nbr,
                   strictly_proper=True,
                   mag_range=(0.5, 0.97), phase_range=(0.0, math.pi/2))

G2 = drss_matrices(states=nx,
                   inputs=nbr,
                   outputs=1,
                   strictly_proper=False,
                   mag_range=(0.5, 0.97), phase_range=(0.0, math.pi/2))

u = np.random.randn(seq_len, nu)
n_skip = 200

# G1
y1 = dlsim(*G1, u)
y1 = (y1 - y1[n_skip:].mean(axis=0)) / (y1[n_skip:].std(axis=0) + 1e-6)

# F
y2 = nn_fun(y1)
y3 = dlsim(*G2, y2)

u = u[n_skip:]
y = y3[n_skip:]


y = (y - y.mean(axis=0)) / (y.std(axis=0) + 1e-6)

In [None]:
fig, ax = plt.subplots(2,1, sharex=True)
ax[0].set_title("Input")
ax[0].plot(u[:, 0])
ax[1].set_title("Output")
ax[1].plot(y[:, 0]);

In [None]:
import torch
from torch.utils.data import DataLoader, IterableDataset

In [None]:
class WHDataset(IterableDataset):
    def __init__(self, nx=5, nu=1, ny=1, seq_len=600, random_order=True,
                 strictly_proper=True, normalize=True, dtype="float32", **mdlargs):
        super(WHDataset).__init__()
        self.nx = nx
        self.nu = nu
        self.ny = ny
        self.seq_len = seq_len
        self.strictly_proper = strictly_proper
        self.dtype = dtype
        self.normalize = normalize
        self.strictly_proper = strictly_proper
        self.random_order = random_order
        self.mdlargs = mdlargs

    def __iter__(self):

        # A simple ff neural network
        def nn_fun(x):
            out = x @ w1.transpose() + b1
            out = np.tanh(out)
            out = out @ w2.transpose() + b2
            return out

        while True:  # infinite dataset
            # for _ in range(1000):

            n_in = 1
            n_out = 1
            n_hidden = 32
            n_skip = 200

            w1 = np.random.randn(n_hidden, n_in) / np.sqrt(n_in) * 5 / 3
            b1 = np.random.randn(1, n_hidden) * 1.0
            w2 = np.random.randn(n_out, n_hidden) / np.sqrt(n_hidden)
            b2 = np.random.randn(1, n_out) * 1.0

            G1 = drss_matrices(states=np.random.randint(1, self.nx+1) if self.random_order else self.nx,
                               inputs=1,
                               outputs=1,
                               strictly_proper=self.strictly_proper,
                               **self.mdlargs)

            G2 = drss_matrices(states=np.random.randint(1, self.nx+1) if self.random_order else self.nx,
                               inputs=1,
                               outputs=1,
                               strictly_proper=False,
                               **self.mdlargs)

            u = np.random.randn(self.seq_len + n_skip, 1)  # input to be improved (filtered noise, multisine, etc)

            # G1
            y1 = dlsim(*G1, u)
            y1 = (y1 - y1[n_skip:].mean(axis=0)) / (y1[n_skip:].std(axis=0) + 1e-6)

            # F
            y2 = nn_fun(y1)

            # G2
            y3 = dlsim(*G2, y2)

            u = u[n_skip:]
            y = y3[n_skip:]

            if self.normalize:
                y = (y - y.mean(axis=0)) / (y.std(axis=0) + 1e-6)

            u = u.astype(self.dtype)
            y = y.astype(self.dtype)

            yield torch.tensor(y), torch.tensor(u)

In [None]:
class PWHDataset(IterableDataset):
    def __init__(self, nx=50, nu=1, ny=1, nbr=10, seq_len=600, random_order=True,
                 strictly_proper=True, normalize=True, dtype="float32", **mdlargs):
        super(PWHDataset).__init__()
        self.nx = nx
        self.nu = nu
        self.ny = ny
        self.nbr = nbr
        self.seq_len = seq_len
        self.strictly_proper = strictly_proper
        self.dtype = dtype
        self.normalize = normalize
        self.strictly_proper = strictly_proper
        self.random_order = random_order
        self.mdlargs = mdlargs

    def __iter__(self):

        # A simple ff neural network
        def nn_fun(x):
            out = x @ w1.transpose() + b1
            out = np.tanh(out)
            out = out @ w2.transpose() + b2
            return out

        while True:  # infinite dataset
            # for _ in range(1000):

            n_in = 1
            n_out = 1
            n_hidden = 128
            n_skip = 200

            w1 = np.random.randn(n_hidden, n_in) / np.sqrt(n_in) * 1.0
            b1 = np.random.randn(1, n_hidden) * 1.0
            w2 = np.random.randn(n_out, n_hidden) / np.sqrt(n_hidden) * 5/3
            b2 = np.random.randn(1, n_out) * 1.0

            G1 = drss_matrices(states=np.random.randint(1, self.nx+1) if self.random_order else self.nx,
                               inputs=1,
                               outputs=1,
                               strictly_proper=self.strictly_proper,
                               **self.mdlargs)

            G2 = drss_matrices(states=np.random.randint(1, self.nx+1) if self.random_order else self.nx,
                               inputs=1,
                               outputs=1,
                               strictly_proper=False,
                               **self.mdlargs)

            u = np.random.randn(self.seq_len + n_skip, 1)  # input to be improved (filtered noise, multisine, etc)

            # G1
            y1 = dlsim(*G1, u)
            y1 = (y1 - y1[n_skip:].mean(axis=0)) / (y1[n_skip:].std(axis=0) + 1e-6)

            # F
            y2 = nn_fun(y1)

            # G2
            y3 = dlsim(*G2, y2)

            u = u[n_skip:]
            y = y3[n_skip:]

            if self.normalize:
                y = (y - y.mean(axis=0)) / (y.std(axis=0) + 1e-6)

            u = u.astype(self.dtype)
            y = y.astype(self.dtype)

            yield torch.tensor(y), torch.tensor(u)

In [None]:
train_ds = PWHDataset(nx=50, nu=1, ny=1, seq_len=1000, mag_range=(0.5, 0.97), phase_range=(0.0, math.pi/2))
train_dl = DataLoader(train_ds, batch_size=32)

In [None]:
batch_y, batch_u = next(iter(train_dl))

In [None]:
batch_y.shape, batch_u.shape
