-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathdata.py
151 lines (124 loc) · 4.47 KB
/
data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import pandas as pd
import numpy as np
from numpy import sin, cos, tan, exp, log, sinh, cosh, sqrt
def add_noise(Y, ratio, seed):
    """Return a copy of Y perturbed by zero-mean Gaussian noise.

    The noise for each column is drawn from a normal distribution whose
    standard deviation is that column's standard deviation, and is then
    scaled by `ratio`.

    Y is expected to have shape (n, 1).  `seed` is passed to
    ``np.random.seed``, so the call re-seeds NumPy's global random
    generator as a side effect.
    """
    # Seed the global generator so the same (Y, ratio, seed) always yields
    # the same noisy output.
    np.random.seed(seed)
    column_std = Y.std(axis=0)
    unit_noise = np.random.normal(0, column_std, Y.shape)
    return Y + unit_noise * ratio
def get_benchmark_data(benchmark_file, benchmark_name, down_sample=1000):
    """Load one benchmark definition and build its sampled dataset.

    Reads ``./benchmark/<benchmark_file>`` as CSV, takes the first row whose
    ``name`` column equals `benchmark_name`, samples the inputs with
    ``generate_X`` and evaluates the row's expression on those samples.

    Parameters
    ----------
    benchmark_file : str
        File name of the benchmark CSV inside ``./benchmark/``.
    benchmark_name : str
        Value to look up in the CSV's ``name`` column.
    down_sample : int
        Forwarded to ``generate_X``.

    Returns
    -------
    tuple
        ``(X, Y, use_constant, expression, variables_name)`` where ``Y`` is
        the evaluated expression reshaped to a single column,
        ``use_constant`` is a bool and ``variables_name`` is
        ``["x1", ..., "x<dimension>"]``.

    Raises
    ------
    IndexError
        If no row is named `benchmark_name`.
    AssertionError
        If the number of ranges is neither 1 nor the benchmark dimension.
    ValueError
        If the row's ``use_constant`` value is not 0 or 1.
    """
    df = pd.read_csv("./benchmark/" + benchmark_file)
    matches = df[df["name"] == benchmark_name]
    if matches.empty:
        # Same exception type a bare ``.iloc[0]`` raises on an empty frame,
        # but with a message that names the missing benchmark.
        raise IndexError(
            "benchmark {!r} not found in {!r}".format(benchmark_name, benchmark_file)
        )
    _name, dimension, use_constant, distrib, range_ls, expression = matches.iloc[0]

    # NOTE(security): the range list and the expression are eval()'d straight
    # from the CSV text; only use benchmark files from a trusted source.
    range_ls = eval(range_ls)
    assert len(range_ls) == 1 or len(range_ls) == dimension, (
        "expected 1 or {} ranges, got {}".format(dimension, len(range_ls))
    )
    if len(range_ls) == 1 and dimension > 1:
        # A single range is shared by every dimension.
        range_ls = range_ls * dimension

    if use_constant not in (0, 1):
        raise ValueError("use_constant should be 0 or 1")
    use_constant = bool(use_constant)

    variables_name = ["x{}".format(i + 1) for i in range(dimension)]

    X = generate_X(range_ls, down_sample, distrib)

    # Evaluate in a per-call copy of the module namespace instead of writing
    # x1, x2, ... into globals(): the math functions imported at module level
    # stay visible, but columns from an earlier call can no longer leak into
    # a later expression.
    namespace = dict(globals())
    for i in range(X.shape[1]):
        namespace["x{}".format(i + 1)] = X[:, i]
    Y = eval(expression, namespace).reshape(-1, 1)
    return X, Y, use_constant, expression, variables_name
def generate_X(ranges, down_sample, distrib="U"):
    """Sample input points from per-dimension ``(start, stop, n_points)`` ranges.

    ``n`` below is ``min(product of all n_points, down_sample)``.

    distrib == "U"
        Returns an ``(n, len(ranges))`` array.  For every row, each dimension
        draws ``n_points`` uniform values in ``[start, stop)``, sorts them,
        and one of them is then picked with ``np.random.choice``.  NumPy's
        global random generator is used.
    distrib == "E"
        Returns the full evenly spaced grid built from ``np.linspace`` on
        every dimension, one grid point per row (it is not cut down to
        ``n`` rows).  Raises ValueError when
        ``down_sample < n * len(ranges)``.

    Any other `distrib` raises ValueError.
    """
    n_dims = len(ranges)

    grid_size = 1
    for _, _, count in ranges:
        grid_size *= count
    n_rows = min(grid_size, down_sample)
    samples = np.empty((n_rows, n_dims))

    if distrib == "E":
        if down_sample < n_rows * n_dims:
            raise ValueError("E distrib not support down_sample < n * num_dims")
        axes = [np.linspace(lo, hi, num=count) for lo, hi, count in ranges]
        return np.array(np.meshgrid(*axes)).T.reshape(-1, n_dims)

    if distrib != "U":
        raise ValueError("distrib should be U or E")

    for row in range(n_rows):
        # All candidate pools are drawn first, then one value is chosen per
        # dimension; this order of random calls fixes the seeded output.
        pools = [
            np.sort(np.random.uniform(lo, hi, size=count))
            for lo, hi, count in ranges
        ]
        for col, pool in enumerate(pools):
            samples[row, col] = np.random.choice(pool)
    return samples
def get_dynamic_data(dataset_name, file_name):
    """
    Load a headerless CSV dataset and label its columns.

    The file ``./data/<dataset_name>/<file_name>.csv`` is read without a
    header row, and its columns are named according to `dataset_name`
    ("custom", "emps" or "roughpipe").  Any other `dataset_name` raises
    ValueError.

    Returns the DataFrame, the list of input-variable names (every column
    except the target) and the target column name.

    Example
    =======
    >>> df, variables_name, target_name = get_dynamic_data('emps', 'train')
    >>> variables_name
    ['q', 'qdot', 'tau']
    >>> target_name
    'qddot'
    """
    csv_path = "./data/" + dataset_name + "/" + file_name + ".csv"
    df = pd.read_csv(csv_path, header=None)

    # NOTE: when adding your own dataset, do not name a column `C` or `B`
    # (per the original author, those are constant symbols in the regressor),
    # and keep every variable name lower-case, because expressions are
    # lower-cased before being passed to eval.
    layouts = {
        "custom": (["x", "y"], "y"),
        "emps": (["q", "qdot", "qddot", "tau"], "qddot"),
        "roughpipe": (["l", "y", "k"], "y"),
    }
    if dataset_name not in layouts:
        raise ValueError("dataset_name error")

    names, target_name = layouts[dataset_name]
    df.columns = names
    variables_name = [column for column in names if column != target_name]
    return df, variables_name, target_name
def expr_to_Y_pred(expr_sympy, X, variables):
    """Evaluate an expression on the columns of X and return an (n, 1) array.

    `expr_sympy` is converted with ``str()``, lower-cased and evaluated with
    ``eval``.  The evaluation namespace holds a fixed set of NumPy functions
    plus the constants ``e`` and ``pi``; ``variables[j]`` is bound to column
    ``j`` of `X` (kept two-dimensional).  The result is multiplied by a
    column of ones so that a scalar result becomes one value per row.

    Any exception raised while building the namespace or evaluating the
    expression is printed, and a column of NaN is returned instead.
    """
    numpy_names = (
        "sin",
        "cos",
        "tan",
        "exp",
        "log",
        "sqrt",
        "sinh",
        "cosh",
        "tanh",
        "arcsin",
        "arccos",
        "arctan",
        "sign",
    )
    namespace = {fn_name: getattr(np, fn_name) for fn_name in numpy_names}
    namespace["e"] = np.exp(1)
    namespace["pi"] = np.pi

    try:
        # Lower-casing means variable names must themselves be lower-case
        # to be found in `columns`.
        source = str(expr_sympy).lower()
        columns = {}
        for col in range(X.shape[1]):
            columns[variables[col]] = X[:, col : col + 1]
        ones_column = np.ones((X.shape[0], 1))
        return eval(source, namespace, columns) * ones_column
    except Exception as err:
        print("Exception in expr_to_Y_pred", err)
        return np.full((X.shape[0], 1), np.nan)