forked from anubhabghosh/danse
-
Notifications
You must be signed in to change notification settings - Fork 0
/
generate_data.py
256 lines (198 loc) · 9.28 KB
/
generate_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
#####################################################
# Creator: Anubhab Ghosh
# Feb 2023
#####################################################
# Importing the necessary libraries
import numpy as np
import scipy
import torch
from torch import distributions
#import matplotlib.pyplot as plt
from scipy.linalg import expm
from utils.utils import save_dataset
from parameters import get_parameters
from ssm_models import LinearSSM, LorenzAttractorModel, SinusoidalSSM
import argparse
from parse import parse
import os
import pandas as pd
def initialize_model(type_, parameters):
if type_ == "LinearSSM":
model = LinearSSM(n_states=parameters["n_states"],
n_obs=parameters["n_obs"],
F=parameters["F"],
G=parameters["G"],
H=parameters["H"],
mu_e=parameters["mu_e"],
mu_w=parameters["mu_w"],
q2=parameters["q2"],
r2=parameters["r2"],
Q=parameters["Q"],
R=parameters["R"])
elif type_ == "LorenzSSM":
model = LorenzAttractorModel(
d=3,
J=parameters["J"],
delta=parameters["delta"],
A_fn=parameters["A_fn"],
h_fn=parameters["h_fn"],
delta_d=parameters["delta_d"],
decimate=parameters["decimate"],
mu_e=parameters["mu_e"],
mu_w=parameters["mu_w"]
)
elif type_ == "SinusoidalSSM":
model = SinusoidalSSM(
n_states=parameters["n_states"],
alpha=parameters["alpha"],
beta=parameters["beta"],
phi=parameters["phi"],
delta=parameters["delta"],
a=parameters["a"],
b=parameters["b"],
c=parameters["c"],
decimate=parameters["decimate"],
mu_e=parameters["mu_e"],
mu_w=parameters["mu_w"]
)
return model
def generate_SSM_data(model, T, parameters):
if type_ == "LinearSSM":
X_arr = np.zeros((T, model.n_states))
Y_arr = np.zeros((T, model.n_obs))
X_arr, Y_arr = model.generate_single_sequence(
T=T,
inverse_r2_dB=parameters["inverse_r2_dB"],
nu_dB=parameters["nu_dB"],
drive_noise=False,
add_noise_flag=False
)
elif type_ == "LorenzSSM" or type_ == "SinusoidalSSM":
X_arr = np.zeros((T, model.n_states))
Y_arr = np.zeros((T, model.n_obs))
X_arr, Y_arr = model.generate_single_sequence(T=T,
inverse_r2_dB=parameters["inverse_r2_dB"],
nu_dB=parameters["nu_dB"]
)
return X_arr, Y_arr
def generate_state_observation_pairs(type_, parameters, T=200, N_samples=1000):
# Define the parameters of the model
#N = 1000
# Plot the trajectory versus sample points
#num_trajs = 5
Z_XY = {}
Z_XY["num_samples"] = N_samples
Z_XY_data_lengths = []
Z_XY_data = []
ssm_model = initialize_model(type_, parameters)
Z_XY['ssm_model'] = ssm_model
for i in range(N_samples):
Xi, Yi = generate_SSM_data(ssm_model, T, parameters)
Z_XY_data_lengths.append(T)
Z_XY_data.append([Xi, Yi])
#print(Z_XY_data)
Z_XY["data"] = np.row_stack(Z_XY_data,dtype=object, casting='no')
#Z_pM["data"] = Z_pM_data
Z_XY["trajectory_lengths"] = np.vstack(Z_XY_data_lengths)
return Z_XY
def generate_state_observation_XY(type_, parameters, T=200, N_samples=1000):
'''
Author: L. Dubreil
Modified function to generation X,Y with uneven column size, derived from
generate_state_observation_pairs()
'''
# Define the parameters of the model
#N = 1000
# Plot the trajectory versus sample points
#num_trajs = 5
Z_XY = {}
Z_XY["num_samples"] = N_samples
Z_XY_data_lengths = []
Z_XY_data = {} # modification: changed list type to dict type
ssm_model = initialize_model(type_, parameters)
Z_XY['ssm_model'] = ssm_model
for i in range(N_samples):
Xi, Yi = generate_SSM_data(ssm_model, T, parameters)
Z_XY_data_lengths.append(T)
Z_XY_data[i] = [Xi, Yi] # modification: changed append() with association
#print(Z_XY_data)
Z_XY["data"] = Z_XY_data
Z_XY["trajectory_lengths"] = np.vstack(Z_XY_data_lengths)
return Z_XY
def modified_generate_state_observation_XY(type_, parameters, T=200, N_samples=1000):
'''
Author: L. Dubreil
Modified version of generation of data to shift the measurements ingested
in the RNN. The goal is to mimic the creation of prior statistics based on
measurements at previous time step.
'''
Z_XY = {} # empty dict
Z_XY["num_samples"] = N_samples
Z_XY_data_lengths = []
Z_XY_data = {}
ssm_model = initialize_model(type_, parameters)
Z_XY['ssm_model'] = ssm_model
for i in range(N_samples):
Xi, Yi = generate_SSM_data(ssm_model, T, parameters)
Z_XY_data_lengths.append(T)
Z_XY_data[i] = [Xi, Yi] # modification: changed append() with association
Z_XY_data = pd.DataFrame(Z_XY_data) # modification
Z_XY["data"] = np.row_stack(Z_XY_data,dtype=object, casting='no')
Z_XY["trajectory_lengths"] = np.vstack(Z_XY_data_lengths)
return Z_XY
def create_filename(T=100, N_samples=200, m=5, n=5, dataset_basepath="./data/", type_="LinearSSM", inverse_r2_dB=40, nu_dB=0):
# Create the dataset based on the dataset parameters
datafile = "trajectories_m_{}_n_{}_{}_data_T_{}_N_{}_r2_{}dB_nu_{}dB.pkl".format(m, n, type_, int(T), int(N_samples), inverse_r2_dB, nu_dB)
dataset_fullpath = os.path.join(dataset_basepath, datafile)
return dataset_fullpath
def create_and_save_dataset(T, N_samples, filename, parameters, type_="LinearSSM"):
#NOTE: Generates for pfixed theta estimation experiment
# Currently this uses the 'modified' function
#Z_pM = generate_trajectory_modified_param_pairs(N=N,
# M=num_trajs,
# P=num_realizations,
# usenorm_flag=usenorm_flag)
#np.random.seed(10) # This can be kept at a fixed step for being consistent
Z_XY = generate_state_observation_XY(type_=type_, parameters=parameters, T=T, N_samples=N_samples)
save_dataset(Z_XY, filename=filename)
if __name__ == "__main__":
usage = "Create datasets by simulating state space models \n"\
"python generate_data.py --sequence_length T --num_samples N --dataset_type [LinearSSM/LorenzSSM] --output_path [output path name]\n"\
"Creates the dataset at the location output_path"\
# parser = argparse.ArgumentParser(description="Input arguments related to creating a dataset for training RNNs")
# parser.add_argument("--n_states", help="denotes the number of states in the latent model", type=int, default=5)
# parser.add_argument("--n_obs", help="denotes the number of observations", type=int, default=5)
# parser.add_argument("--num_samples", help="denotes the number of trajectories to be simulated for each realization", type=int, default=500)
# parser.add_argument("--sequence_length", help="denotes the length of each trajectory", type=int, default=200)
# parser.add_argument("--inverse_r2_dB", help="denotes the inverse of measurement noise power", type=float, default=40.0)
# parser.add_argument("--nu_dB", help="denotes the ration between process and measurement noise", type=float, default=0.0)
# parser.add_argument("--dataset_type", help="specify mode=pfixed (all theta, except theta_3, theta_4) / vars (variances) / all (full theta vector)", type=str, default=None)
# parser.add_argument("--output_path", help="Enter full path to store the data file", type=str, default=None)
# args = parser.parse_args()
# n_states = args.n_states
# n_obs = args.n_obs
# T = args.sequence_length
# N_samples = args.num_samples
# type_ = args.dataset_type
# output_path = args.output_path
# inverse_r2_dB = args.inverse_r2_dB
# nu_dB = args.nu_dB
n_states = 3
n_obs = 3
T = 1000
N_samples = 500
type_ = 'LorenzSSM'
output_path = 'data'
inverse_r2_dB = 20
nu_dB = -20
# Create the full path for the datafile
datafilename = create_filename(T=T, N_samples=N_samples, m=n_states, n=n_obs, dataset_basepath=output_path, type_=type_, inverse_r2_dB=inverse_r2_dB, nu_dB=nu_dB)
ssm_parameters, _ = get_parameters(N=N_samples, T=T, n_states=n_states, n_obs=n_obs, inverse_r2_dB=inverse_r2_dB, nu_dB=nu_dB)
#ZK,ZK_l = get_parameters(N=N_samples, T=T, n_states=n_states, n_obs=n_obs, inverse_r2_dB=inverse_r2_dB, nu_dB=nu_dB)
# If the dataset hasn't been already created, create the dataset
if not os.path.isfile(datafilename):
print("Creating the data file: {}".format(datafilename))
create_and_save_dataset(T=T, N_samples=N_samples, filename=datafilename, type_=type_, parameters=ssm_parameters[type_])
else:
print("Dataset {} is already present!".format(datafilename))
print("Done...")