-- pg-tf.sql
-- Function to perform statistical analysis on an arbitrary data set
CREATE OR REPLACE FUNCTION public.tf_analyse(
data_source_sql text,
output_name text,
output_path text)
RETURNS void
LANGUAGE 'plpython3u'
COST 100
VOLATILE PARALLEL UNSAFE
AS $BODY$
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from math import ceil
# Pandas print options
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)
pd.set_option('display.width', 1000)
# Create the data sets
rows = plpy.execute(data_source_sql)
# Check we have enough rows
if len(rows) < 2:
    plpy.error('At least 2 data rows must be available for analysis. {} rows retrieved.'.format(len(rows)))
columns = list(rows[0].keys())
# Check we have enough columns
if len(columns) < 2:
    plpy.error('At least 2 data columns must be available for analysis. {} columns retrieved.'.format(len(columns)))
# Create the dataframe
data = pd.DataFrame.from_records(rows, columns = columns)
# Setup the plot layout
plot_columns = 5
plot_rows = ceil(len(columns) / plot_columns)
# High level info
plpy.notice('{} Analysis\n {}=========\n'.format(output_name.capitalize(), '=' * len(output_name)))
plpy.notice('Data\n ----\n')
plpy.notice('Data shape: {}'.format(data.shape))
plpy.notice('Data sample:\n{}\n'.format(data.head()))
# Outliers
plpy.notice('Outliers\n --------\n')
Q1 = data.quantile(0.25)
Q3 = data.quantile(0.75)
IQR = Q3 - Q1
plpy.notice('Interquartile Range (IQR):\n{}\n'.format(IQR))
plpy.notice('Outliers detected using IQR:\n{}\n'.format((data < (Q1 - 1.5 * IQR)) | (data > (Q3 + 1.5 * IQR))))
plt.cla()
fig, axs = plt.subplots(ncols=plot_columns, nrows=plot_rows, figsize=(20, 5 * plot_rows))
index = 0
axs = axs.flatten()
for k, v in data.items():
    sns.boxplot(y=k, data=data, ax=axs[index])
    index += 1
plt.tight_layout(pad=5, w_pad=0.5, h_pad=5.0)
plt.suptitle('{} Outliers'.format(output_name.capitalize()))
plt.savefig('{}/{}_outliers.png'.format(output_path, output_name))
plpy.notice('Created: {}/{}_outliers.png\n'.format(output_path, output_name))
# Distributions
plpy.notice('Distributions\n -------------\n')
plpy.notice('Summary:\n{}\n'.format(data.describe()))
plt.cla()
fig, axs = plt.subplots(ncols=plot_columns, nrows=plot_rows, figsize=(20, 5 * plot_rows))
index = 0
axs = axs.flatten()
for k, v in data.items():
    # Note: distplot is deprecated in recent seaborn releases; histplot(v, kde=True) is the modern equivalent
    sns.distplot(v, ax=axs[index])
    index += 1
plt.tight_layout(pad=5, w_pad=0.5, h_pad=5.0)
plt.suptitle('{} Distributions'.format(output_name.capitalize()))
plt.savefig('{}/{}_distributions.png'.format(output_path, output_name))
plpy.notice('Created: {}/{}_distributions.png\n'.format(output_path, output_name))
# Correlations
plpy.notice('Correlations\n ------------\n')
corr = data.corr()
plpy.notice('Correlation data:\n{}\n'.format(corr))
plt.cla()
plt.figure(figsize=(20,20))
sns.heatmap(corr.abs(), annot=True, cmap='Blues')
plt.tight_layout(pad=5, w_pad=0.5, h_pad=5.0)
plt.suptitle('{} Correlations'.format(output_name.capitalize()))
plt.savefig('{}/{}_correlations.png'.format(output_path, output_name))
plpy.notice('Created: {}/{}_correlations.png\n'.format(output_path, output_name))
$BODY$;
ALTER FUNCTION public.tf_analyse(text, text, text)
OWNER TO postgres;
COMMENT ON FUNCTION public.tf_analyse(text, text, text)
IS 'Function to perform statistical analysis on an arbitrary data set.
Parameters:
* data_source_sql: An SQL query returning at least 2 rows and 2 columns of numeric data to analyse.
* output_name: The name of the output to use in titles etc.
* output_path: The path of a directory under which to save generated graphs. Must be writeable by the database server''s service account (usually postgres).';
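-- Example (hypothetical): the "houses" table, its columns, and the output
-- directory below are illustrative only, not objects created by this file:
--
--   SELECT public.tf_analyse(
--       'SELECT bedrooms, bathrooms, garage, price FROM houses',
--       'houses',
--       '/var/lib/postgresql/tf');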
-- Function to build and train a model
CREATE OR REPLACE FUNCTION public.tf_model(
data_source_sql text,
structure integer[],
output_name text,
output_path text,
epochs integer DEFAULT 5000,
validation_pct integer DEFAULT 10,
test_pct integer DEFAULT 10)
RETURNS double precision
LANGUAGE 'plpython3u'
COST 100000
VOLATILE PARALLEL UNSAFE
AS $BODY$
import tensorflow as tf
import pandas as pd
import matplotlib.pyplot as plt
from math import sqrt
from tensorflow.keras.callbacks import ModelCheckpoint, LambdaCallback, EarlyStopping
# Pandas print options
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)
pd.set_option('display.width', 1000)
# Reset everything
tf.keras.backend.clear_session()
tf.random.set_seed(42)
# Create the data sets
rows = plpy.execute(data_source_sql)
# Check we have enough rows
if len(rows) < 5:
    plpy.error('At least 5 data rows must be available for training. {} rows retrieved.'.format(len(rows)))
# Get a list of columns
columns = list(rows[0].keys())
# Check we have enough columns
if len(columns) < 3:
    plpy.error('At least 3 data columns must be available for training. {} columns retrieved.'.format(len(columns)))
plpy.notice('Total rows: {}'.format(len(rows)))
# Create the dataframe
data = pd.DataFrame.from_records(rows, columns = columns)
# Remove any rows with outliers
Q1 = data.quantile(0.25)
Q3 = data.quantile(0.75)
IQR = Q3 - Q1
plpy.notice('Removing outliers...')
data = data[~((data < (Q1 - 1.5 * IQR)) | (data > (Q3 + 1.5 * IQR))).any(axis=1)]
# So how many rows remain?
actual_rows = len(data)
# Figure out how many rows to use for training, validation and test
test_rows = int((actual_rows/100) * test_pct)
validation_rows = int((actual_rows/100) * validation_pct)
training_rows = actual_rows - test_rows - validation_rows
# Split the data into input and output
input = data[columns[:-1]]
output = data[columns[-1:]]
# Split the input and output into training, validation and test sets
max_z = max(output[output.columns[0]])
training_input = input[:training_rows]
training_output = output[:training_rows]
validation_input = input[training_rows:training_rows+validation_rows]
validation_output = output[training_rows:training_rows+validation_rows]
test_input = input[training_rows+validation_rows:]
test_output = output[training_rows+validation_rows:]
plpy.notice('Rows: {}, training rows: {}, validation rows: {}, test rows: {}.'.format(actual_rows, len(training_input), len(validation_input), len(test_input)))
# Define the model
model = tf.keras.Sequential()
for units in structure:
    if len(model.layers) == 0:
        model.add(tf.keras.layers.Dense(units=units, input_shape=(len(columns) - 1,), activation='relu'))
    else:
        model.add(tf.keras.layers.Dense(units=units, activation='relu'))
model.add(tf.keras.layers.Dense(units=1, activation='linear'))
# Compile it
model.compile(loss=tf.keras.losses.MeanSquaredError(),
optimizer='adam')
summary = []
model.summary(print_fn=lambda x: summary.append(x))
plpy.notice('Model architecture:\n{}'.format('\n'.join(summary)))
# Save a checkpoint each time our loss metric improves.
checkpoint = ModelCheckpoint('{}/{}.h5'.format(output_path, output_name),
monitor='loss',
save_best_only=True,
mode='min')
# Use early stopping
early_stopping = EarlyStopping(patience=50)
# Display output
logger = LambdaCallback(
    on_epoch_end=lambda epoch, logs: plpy.notice(
        'epoch: {}, training RMSE: {} ({}%), validation RMSE: {} ({}%)'.format(
            epoch,
            sqrt(logs['loss']),
            round(100 / max_z * sqrt(logs['loss']), 5),
            sqrt(logs['val_loss']),
            round(100 / max_z * sqrt(logs['val_loss']), 5))))
# Train it!
history = model.fit(training_input,
training_output,
validation_data=(validation_input, validation_output),
epochs=epochs,
verbose=False,
batch_size=50,
callbacks=[logger, checkpoint, early_stopping])
# Graph the results
training_loss = history.history['loss']
validation_loss = history.history['val_loss']
epochs_range = range(len(history.history['loss']))
plt.figure(figsize=(12, 8))
plt.grid(True)
plt.plot(epochs_range, [x ** 0.5 for x in training_loss], label='Training')
plt.plot(epochs_range, [x ** 0.5 for x in validation_loss], label='Validation')
plt.xlabel('Epoch')
plt.ylabel('Root Mean Squared Error')
plt.legend(loc='upper right')
plt.title('Training and Validation Root Mean Squared Error')
plt.savefig('{}/{}_rmse.png'.format(output_path, output_name))
plpy.notice('Created: {}/{}_rmse.png\n'.format(output_path, output_name))
# Load the best model from the checkpoint
model = tf.keras.models.load_model('{}/{}.h5'.format(output_path, output_name))
# Dump the original test data, and test results for comparison
test_dump = test_input.copy()
test_dump['actual'] = test_output
test_dump['predicted'] = model.predict(test_input)[:,0]
test_dump['diff'] = abs(test_dump['predicted'] - test_dump['actual'])
test_dump['pc_diff'] = test_dump['diff'] / (test_dump['predicted'] + 1e-10) * 100
plpy.notice('Test data: \n{}\n'.format(test_dump))
# Test the model on the training and validation data to get the RMSE
evaluation = model.evaluate(training_input, training_output)
plpy.notice('Training RMSE: {}'.format(round(sqrt(evaluation), 5)))
if len(validation_input) > 0:
    evaluation = model.evaluate(validation_input, validation_output)
    plpy.notice('Validation RMSE: {}'.format(round(sqrt(evaluation), 5)))
# Summarise the results from the test data set
plpy.notice('Test data mean absolute diff: {}'.format(round(float(sum(test_dump['diff']) / len(test_dump)), 5)))
plpy.notice('Test data mean percentage diff: {}%'.format(round(float(sum(test_dump['pc_diff']) / len(test_dump)), 5)))
rmse = float(sqrt(abs(sum((test_dump['actual'] - test_dump['predicted']) ** 2) / len(test_dump))))
plpy.notice('Test data RMSE: {}'.format(round(rmse, 5)))
# RMSPE = sqrt(mean(((actual - predicted) / actual) ** 2)) * 100; the epsilon guards against division by zero
rmspe = float(sqrt(sum(((test_dump['actual'] - test_dump['predicted']) / (test_dump['actual'] + 1e-10)) ** 2) / len(test_dump))) * 100
plpy.notice('Test data RMSPE: {}%\n'.format(round(rmspe, 5)))
plpy.notice('Model saved to: {}/{}.h5'.format(output_path, output_name))
return rmspe
$BODY$;
ALTER FUNCTION public.tf_model(text, integer[], text, text, integer, integer, integer)
OWNER TO postgres;
COMMENT ON FUNCTION public.tf_model(text, integer[], text, text, integer, integer, integer)
IS 'Function to build and train a model to analyse an arbitrary data set.
Parameters:
* data_source_sql: An SQL query returning at least 5 rows and 3 columns of numeric data to analyse.
* structure: An array of integers indicating the number of neurons in each of an arbitrary number of layers. A final output layer will be added with a single neuron.
* output_name: The name of the output to use in titles etc.
* output_path: The path of a directory under which to save generated graphs and the model. Must be writeable by the database server''s service account (usually postgres).
* epochs: The maximum number of training epochs to run (default: 5000)
* validation_pct: The percentage of the rows returned by the query specified in data_source_sql to use for model validation (default: 10).
* test_pct: The percentage of the rows returned by the query specified in data_source_sql to use for model testing (default: 10).
Returns: The Root Mean Square Percentage Error calculated from the evaluation of the test data set.';
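-- Example (hypothetical): the "houses" table and columns are illustrative
-- only. The final column of the query (price) is the value the model learns
-- to predict, and ARRAY[64, 32] requests two hidden layers of 64 and 32
-- neurons:
--
--   SELECT public.tf_model(
--       'SELECT bedrooms, bathrooms, garage, price FROM houses',
--       ARRAY[64, 32],
--       'houses',
--       '/var/lib/postgresql/tf');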
-- Function to make a prediction based on a model
CREATE OR REPLACE FUNCTION public.tf_predict(
input_values double precision[],
model_path text)
RETURNS double precision[]
LANGUAGE 'plpython3u'
COST 100
VOLATILE PARALLEL UNSAFE
AS $BODY$
import tensorflow as tf
# Reset everything
tf.keras.backend.clear_session()
tf.random.set_seed(42)
# Load the model
model = tf.keras.models.load_model(model_path)
# Are we dealing with a single prediction, or a list of them?
if not any(isinstance(sub, list) for sub in input_values):
    data = [input_values]
else:
    data = input_values
# Make the prediction(s)
result = model.predict([data])[0]
# Flatten the 2D array of predictions into a simple list
result = [item for elem in result for item in elem]
return result
$BODY$;
ALTER FUNCTION public.tf_predict(double precision[], text)
OWNER TO postgres;
COMMENT ON FUNCTION public.tf_predict(double precision[], text)
IS 'Function to make predictions based on input values and a Tensorflow model.
Parameters:
* input_values: An array of input values, or an array of arrays of input values, e.g. ''{2, 3}'' or ''{{2, 3}, {3, 4}}''.
* model_path: The full path to a Tensorflow model saved in .h5 format. Must be readable by the database server''s service account (usually postgres).
Returns: An array of predicted values.';
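-- Example (hypothetical): the model path is illustrative and must point to a
-- model previously saved by tf_model; each inner array supplies one value per
-- input column used during training:
--
--   SELECT public.tf_predict('{2, 1, 1}', '/var/lib/postgresql/tf/houses.h5');
--   SELECT public.tf_predict('{{2, 1, 1}, {3, 2, 2}}', '/var/lib/postgresql/tf/houses.h5');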