In [6]:
import os
import inspect
import pickle
import numpy as np
import sys

# Locate the project root relative to this notebook so that `src.*` is importable.
# The working directory is used instead of inspect.getfile(inspect.currentframe()):
# inside a notebook the frame's "file" is a synthetic cell name (or, on recent
# ipykernel versions, a temp-file path), so it does not reliably point at the
# notebook's folder. This assumes the kernel's working directory is the notebook's
# folder (the Jupyter default) -- adjust if the kernel is started elsewhere.
currentdir = os.getcwd()
parentdir = os.path.dirname(currentdir)
grandparentdir = os.path.dirname(parentdir)
# Front of sys.path so the project's `src` package wins over any other `src`.
sys.path.insert(0, grandparentdir)

from src.utils.data_generator import generate_dataset
from src.transformation.fhn import fhn_transformation
from src.transformation.linear import linear_transformation
from src.distribution.dirac_delta import dirac_delta
from src.distribution.dirichlet import dirichlet

In [3]:
def generate_hidden_obs(time, Dx, Dy, x_0, f, g):
    """
    Generate hidden states and observations for a single trajectory.

    time: number of time steps (number of rows in the returned arrays)
    Dx: dimension of one hidden state
    Dy: dimension of one observation
    x_0: initial hidden state, stored as X[0]
    f: transition object with x_t = f.sample(x_t-1)
    g: emission object with y_t = g.sample(x_t)

    Returns (X, Y): arrays of shape (time, Dx) and (time, Dy).
    If time is 0, both arrays are returned empty and f / g are never called.
    """
    X = np.zeros((time, Dx))
    Y = np.zeros((time, Dy))

    # Nothing to simulate; writing X[0] below would raise IndexError.
    if time == 0:
        return X, Y

    X[0] = x_0
    Y[0] = g.sample(x_0)
    for t in range(1, time):
        # Each state depends only on the previous one; each observation only on
        # the current state.
        X[t] = f.sample(X[t - 1])
        Y[t] = g.sample(X[t])
    return X, Y


def generate_dataset(n_train, n_test, time,
                     model="lorenz", Dx=1, Dy=1,
                     f=None, g=None,
                     x_0_in=None, lb=-2.5, ub=2.5):
    """
    Simulate n_train + n_test trajectories and split them into train / test arrays.

    NOTE: this definition shadows the `generate_dataset` imported from
    `src.utils.data_generator` in the notebook's import cell.

    n_train, n_test: number of trajectories placed in the train / test arrays
    time: number of time steps per trajectory
    model: "fhn", "lorenz" or None. "fhn" forces Dx = 2 and "lorenz" forces Dx = 3;
        each supplies a default f and/or g when those are not given.
        With None, both f and g must be provided by the caller.
    Dx, Dy: hidden-state and observation dimensions
    f: transition object exposing sample(x_prev); default built from `model`
    g: emission object exposing sample(x); default built from `model`
    x_0_in: fixed initial state used for every trajectory; when None, each
        trajectory starts from a draw of np.random.uniform(lb, ub, size=Dx)
    lb, ub: bounds of the uniform initial-state draw (needed only when
        x_0_in is None)

    Returns (hidden_train, hidden_test, obs_train, obs_test) with shapes
    (n_train, time, Dx), (n_test, time, Dx), (n_train, time, Dy), (n_test, time, Dy).

    Raises ValueError for an unknown model, or when model is None and f or g
    is missing. Raises AssertionError when neither x_0_in nor both of lb / ub
    are given.
    """
    if model == "fhn":
        Dx = 2

        if f is None:
            a, b, c, I, dt = 1.0, 0.95, 0.05, 1.0, 0.15
            f_params = (a, b, c, I, dt)
            f_tran = fhn_transformation(f_params)
            f = dirac_delta(f_tran)

        if g is None:
            # NOTE(review): g_params has a single row while g_cov is Dy x Dy;
            # presumably this default is only meant for Dy == 1 -- confirm.
            g_params = np.array([[1.0, 0.0]])
            g_cov = 0.01 * np.eye(Dy)

    elif model == "lorenz":
        Dx = 3

        if f is None:
            sigma, rho, beta, dt = 10.0, 28.0, 8.0 / 3.0, 0.01
            f_params = (sigma, rho, beta, dt)
            # NOTE(review): lorenz_transformation is not imported in the cells
            # visible here; this raises NameError unless it is defined
            # elsewhere in the session -- confirm the import.
            f_tran = lorenz_transformation(f_params)
            f = dirac_delta(f_tran)

        if g is None:
            # NOTE(review): same single-row g_params vs. Dy x Dy g_cov caveat
            # as in the "fhn" branch.
            g_params = np.array([[1.0, 0.0, 0.0]])
            g_cov = 0.4 * np.eye(Dy)

    elif model is not None:
        raise ValueError("Unknown model {}".format(model))

    elif f is None or g is None:
        # model is None: there are no defaults to fall back on. Fail here with a
        # clear message instead of a NameError on g_params / AttributeError on None.
        raise ValueError("f and g must both be provided when model is None")

    if g is None:
        g_tran = linear_transformation(g_params)
        # NOTE(review): mvn is not imported in the cells visible here; this
        # raises NameError unless it is defined elsewhere -- confirm the import.
        g = mvn(g_tran, g_cov)

    # Check each bound separately: `(lb and ub) is None` lets lb=0, ub=None
    # through because `0 and None` evaluates to 0. An explicit raise (rather
    # than `assert False`) keeps the check active under `python -O`.
    if x_0_in is None and (lb is None or ub is None):
        raise AssertionError('must specify x_0 or (lb and ub)')

    hidden_train, obs_train = np.zeros((n_train, time, Dx)), np.zeros((n_train, time, Dy))
    hidden_test, obs_test = np.zeros((n_test, time, Dx)), np.zeros((n_test, time, Dy))

    for i in range(n_train + n_test):
        if x_0_in is None:
            x_0 = np.random.uniform(low=lb, high=ub, size=Dx)
        else:
            x_0 = x_0_in
        hidden, obs = generate_hidden_obs(time, Dx, Dy, x_0, f, g)

        # The first n_train trajectories go to the train split, the rest to test.
        if i < n_train:
            hidden_train[i] = hidden
            obs_train[i] = obs
        else:
            hidden_test[i - n_train] = hidden
            obs_test[i - n_train] = obs

    return hidden_train, hidden_test, obs_train, obs_test

In [5]:
# Experiment: trajectories from the `fhn` transition with observations drawn
# through a random linear map wrapped in `dirichlet`.

# Seed NumPy's global RNG: both the emission weights below and the initial
# states drawn inside generate_dataset come from np.random, so without a seed
# the dataset changes on every run.
SEED = 0
np.random.seed(SEED)

n_train = 200
n_test = 40
time = 200
Dx = 2
Dy = 11

# Transition: deterministic (dirac_delta) wrapper around the fhn transformation.
a, b, c, I, dt = 1.0, 0.95, 0.05, 1.0, 0.15
f_params = (a, b, c, I, dt)
f_tran = fhn_transformation(f_params)
f = dirac_delta(f_tran)

# Emission: random (Dy, Dx) weight matrix fed to linear_transformation, then dirichlet.
g_params = np.random.normal(size=(Dy, Dx))
g_tran = linear_transformation(g_params)
g = dirichlet(g_tran)

# model=None: f and g are supplied explicitly, so no built-in defaults are used.
hidden_train, hidden_test, obs_train, obs_test = generate_dataset(n_train, n_test, time, model=None, Dx=Dx, Dy=Dy, f=f, g=g)
print(hidden_train.shape, hidden_test.shape, obs_train.shape, obs_test.shape)
print(hidden_train[0, :5])
print(obs_train[0, :5])

(200, 200, 2) (40, 200, 2) (200, 200, 11) (40, 200, 11)
[[-2.33626394 -2.19486905]
 [-1.72226876 -2.46364526]
 [-1.24353307 -2.65503957]
 [-0.78242214 -2.77915878]
 [-0.28163928 -2.83460046]
 [ 0.2928036  -2.81354257]
 [ 0.93490694 -2.70581569]
 [ 1.56027273 -2.50741944]
 [ 2.03480605 -2.23103595]
 [ 2.30021919 -1.90428956]] [[4.85006007e-04 3.94206805e-03 4.03809587e-01 4.64151866e-02
  9.92533746e-02 2.09051320e-01 1.31066515e-01 1.98086308e-02
  8.08765937e-03 6.89980457e-02 9.08260710e-03]
 [7.96099640e-05 2.72677746e-03 3.81801545e-01 8.86917020e-03
  3.36439118e-03 1.09892458e-01 3.43266689e-01 5.31109489e-02
  1.94761649e-04 4.11680646e-02 5.55255849e-02]
 [2.16449375e-02 3.81088935e-02 1.33266312e-01 6.76578112e-02
  1.81963937e-01 8.32166717e-02 2.27035453e-01 1.16089252e-05
  1.23819588e-01 2.55146447e-02 9.77601418e-02]
 [1.76730916e-09 1.36653429e-02 1.33656055e-01 4.80006945e-03
  1.05323789e-01 2.57071820e-02 2.67050669e-01 1.88218906e-02
  9.23826337e-02 3.09314185e-03 3

In [9]:
# Bundle the simulated data and write it to disk.
# Xtrue stacks the train hidden states followed by the test hidden states.
x_true = np.concatenate([hidden_train, hidden_test], axis=0)
y_train = obs_train
y_test = obs_test
# The handle is deliberately not named `f`: that name holds the transition
# object in this notebook, and rebinding it here would leave a closed file in
# its place for any cell run afterwards.
with open("fhn_dirichlet_obs.p", "wb") as pickle_file:
    pickle.dump({'Xtrue': x_true, 'Ytrain': y_train, 'Yvalid': y_test}, pickle_file)