-
Notifications
You must be signed in to change notification settings - Fork 10
/
datagen.py
312 lines (213 loc) · 8.21 KB
/
datagen.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
# -*- coding: utf-8 -*-
"""Generates various test and demo data sets.
"""
from functools import partial
import control
import numpy as np
from numpy import vstack
import faultmap.data_processing
seed_list = [35, 88, 107, 52, 98]
def connectionmatrix_maker(N):
def maker():
variables = ["X {}".format(i) for i in range(1, N + 1)]
connectionmatrix = np.ones((N, N), dtype=int)
return variables, connectionmatrix
maker.__doc__ = (
"Generates a {0}x{0} connection matrix" "for use in test.".format(N)
)
return maker
connectionmatrix_2x2, connectionmatrix_4x4, connectionmatrix_5x5 = [
connectionmatrix_maker(N) for N in [2, 4, 5]
]
def seed_random(method, seed, samples):
np.random.seed(int(seed))
return method(int(samples))
# Normal distribution
seed_randn = partial(seed_random, np.random.randn)
# Uniform distribution over [0, 1)
seed_rand = partial(seed_random, np.random.rand)
def autoreg_gen(params):
"""Generates an autoregressive set of vectors.
A constant seed is used for testing comparison purposes.
"""
samples = params[0]
delay = params[1]
if len(params) >= 3:
alpha = params[2]
else:
alpha = None
if len(params) == 4:
noise_ratio = params[3]
else:
noise_ratio = None
# Define seed for initial source data
seeds = iter(seed_list)
cause = seed_randn(next(seeds), samples + delay)
affected = np.zeros_like(cause)
# Very close covariance occasionally breaks the kde estimator
# Another small random element is added to take care of this
# This is not expected to be a problem on any "real" data
# Define seed for noise data
affected_random_add = seed_rand(next(seeds), samples + delay) - 0.5
for i in range(delay, len(cause)):
if alpha is None:
affected[i] = affected[i - 1] + cause[i - (delay + 1)]
else:
affected[i] = (
alpha * affected[i - 1] + (1 - alpha) * cause[i - delay]
)
affected = affected[delay:]
cause = cause[delay:]
if noise_ratio is not None:
affected = affected + (affected_random_add[delay:] * noise_ratio)
data = vstack([cause, affected])
return data.T
def delay_gen(params):
"""Generates a normally distributed random data vector
and a pure delay companion.
Parameters
----------
params : list
List with the first entry being the sample length of the returned
signals and the second entry the delay between them.
Returns
-------
data : numpy.ndarray
Array containing the generated signals arranged in columns.
"""
samples = params[0]
delay = params[1]
# Define seed for initial source data
seeds = iter(seed_list)
cause = seed_randn(next(seeds), samples + delay)
affected = np.zeros_like(cause)
# Very close covariance occassionally breaks the kde estimator
# Another small random element is added to take care of this
# This is not expected to be a problem on any "real" data
# Define seed for noise data
affected_random_add = seed_rand(next(seeds), samples + delay) - 0.5
for i in range(delay, len(cause)):
affected[i] = cause[i - delay]
affected = affected[delay:]
cause = cause[delay:]
affected = affected + affected_random_add[delay:]
data = vstack([cause, affected])
return data.T
def random_gen(params, N=2):
"""Generates N independent random data vectors"""
samples = params[0]
assert N < len(seed_list), "Not enough seeds in seed_list"
data = vstack([seed_randn(seed, samples) for seed in seed_list[:N]])
return data.T
def autoreg_datagen(delay, timelag, samples, sub_samples, k=1, l=1):
"""Generates autoreg data for a specific timelag (equal to
prediction horison) for a set of autoregressive data.
sub_samples is the amount of samples in the dataset used to calculate the
transfer entropy between two vectors (taken from the end of the dataset).
sub_samples <= samples
Currently only supports k = 1; l = 1
You can search through a set of timelags in an attempt to identify the
original delay.
The transfer entropy should have a maximum value when timelag = delay
used to generate the autoregressive dataset.
"""
params = [samples, delay]
data = autoreg_gen(params).T
[x_pred, x_hist, y_hist] = faultmap.data_processing.vectorselection(
data, timelag, sub_samples, k, l
)
return x_pred, x_hist, y_hist
def sinusoid_shift_gen(params, period=100, noiseamp=0.1, N=5, addnoise=False):
"""Generates sinusoid signals together with optionally uniform noise.
The signals are shifted by a quarter period.
Parameters
----------
params : list
List with the first (and only) entry being the sample length of
the returned signals.
period : int, default=100
The period of the sinusoid in terms of samples.
noiseamp : float, default=0.5
A multiplier for mean_centered unformal noise to be added to the
signal. The amplitude of the sine is unity.
N : int, default=5
How many signals to return.
addnoise : bool, default=False
If True, noise is added to the sinusoidal signals.
Returns
-------
data : numpy.ndarray
Array containing the generated signals arranged in columns.
"""
samples = params[0]
frequency = 1.0 / period
tspan = range(samples + 2 * period)
# Generate source sine curve
sine = [np.sin(frequency * t * 2 * np.pi) for t in tspan]
if addnoise:
sine_noise = (seed_rand(117, len(tspan))) - 0.5 * noiseamp
sine += sine_noise
vectors = []
for i in range(N):
sampleshift = (period / 4) * i
vectors.append(sine[sampleshift : samples + sampleshift])
data = vstack(vectors)
return data.T
def sinusoid_gen(params, period=100, noiseamp=1.0):
"""Generates sinusoid signals together with optionally uniform noise.
The signals are shifted by a quarter period.
Parameters
----------
params : list
List with the first (and only) entry being the sample length of
the returned signals.
period : int, default=100
The period of the sinusoid in terms of samples.
noiseamp : float, default=0.5
A multiplier for mean_centered unformal noise to be added to the
signal. The amplitude of the sine is unity.
N : int, default=5
How many signals to return.
addnoise : bool, default=False
If True, noise is added to the sinusoidal signals.
Returns
-------
data : numpy.ndarray
Array containing the generated signals arranged in columns.
"""
samples = params[0]
delay = params[1]
tspan = range(samples + delay)
frequency = 1.0 / period
cause = [np.sin(frequency * t * 2 * np.pi) for t in tspan]
affected = np.zeros_like(cause)
cause_closed = np.zeros_like(cause)
for i in range(delay, len(cause)):
affected[i] = cause[i - delay]
affected_random_add = (seed_rand(117, samples + delay) - 0.5) * noiseamp
affected += affected_random_add
for i in range(delay, len(cause)):
cause_closed[i] = affected[i] + cause[i]
affected = affected[delay:]
cause = cause[delay:]
cause_closed = cause_closed[delay:]
return tspan[:-delay], cause, affected, cause_closed
def firstorder_gen(params, period=0.01, noiseamp=1.0):
"""Simple first order transfer function affected variable
with sinusoid cause.
"""
samples = params[0]
delay = params[1]
P1 = control.matlab.tf([10], [100, 1])
timepoints = np.array(range(samples + delay))
sine_input = np.array([np.sin(period * t * 2 * np.pi) for t in timepoints])
P1_response = control.matlab.lsim(P1, sine_input, timepoints)
affected_random_add = (seed_rand(51, samples + delay) - 0.5) * noiseamp
cause = sine_input[:samples]
if delay == 0:
offset = None
else:
offset = samples
affected = P1_response[0][delay:] + affected_random_add[delay:]
tspan = P1_response[1][:offset]
return tspan, cause, affected