/
auto_base.py
325 lines (249 loc) · 11.4 KB
/
auto_base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
import random
import time
from sys import stdout
import numpy as np
import pandas as pd
from colorama import Style
from tqdm import tqdm
from evalml import preprocessing
from evalml.objectives import get_objective
from evalml.pipelines import get_pipelines
from evalml.tuners import SKOptTuner
class AutoBase:
    """Base class implementing an automated pipeline search.

    Each iteration picks a candidate pipeline at random, asks that pipeline's
    tuner for the next hyperparameters to try, fits and scores the pipeline
    with cross validation, and feeds the score back to the tuner so future
    proposals improve. Results are stored in ``self.results`` keyed by an
    integer pipeline id; fitted pipelines are kept in ``self.trained_pipelines``.
    """

    def __init__(self, problem_type, tuner, cv, objective, max_pipelines, max_time,
                 model_types, default_objectives, detect_label_leakage, start_iteration_callback,
                 add_result_callback, random_state, verbose):
        # Default to the scikit-optimize Bayesian tuner when none is supplied.
        if tuner is None:
            tuner = SKOptTuner
        self.objective = get_objective(objective)
        self.max_pipelines = max_pipelines
        self.max_time = max_time
        self.model_types = model_types
        self.detect_label_leakage = detect_label_leakage
        self.start_iteration_callback = start_iteration_callback
        self.add_result_callback = add_result_callback
        self.cv = cv
        self.verbose = verbose
        self.possible_pipelines = get_pipelines(problem_type=problem_type, model_types=model_types)
        self.results = {}
        self.trained_pipelines = {}
        self.random_state = random_state
        # Seed both RNG sources: `random` drives pipeline selection,
        # numpy drives most estimators' internal randomness.
        random.seed(self.random_state)
        np.random.seed(seed=self.random_state)
        self.possible_model_types = list({p.model_type for p in self.possible_pipelines})
        # One tuner per pipeline class. `search_spaces` records the parameter
        # names in the same order the tuner proposes values, so the two can be
        # zipped back together in _propose_parameters.
        self.tuners = {}
        self.search_spaces = {}
        for p in self.possible_pipelines:
            space = list(p.hyperparameters.items())
            self.tuners[p.name] = tuner([s[1] for s in space], random_state=random_state)
            self.search_spaces[p.name] = [s[0] for s in space]
        self.default_objectives = default_objectives

    def _log(self, msg, color=None, new_line=True):
        # Print `msg` (optionally colorized) unless verbosity is off.
        if not self.verbose:
            return
        if color:
            msg = color + msg + Style.RESET_ALL
        if new_line:
            print(msg)
        else:
            print(msg, end="")

    def _log_title(self, title):
        # Render `title` inside a bright asterisk box, followed by a blank line.
        self._log("*" * (len(title) + 4), color=Style.BRIGHT)
        self._log("* %s *" % title, color=Style.BRIGHT)
        self._log("*" * (len(title) + 4), color=Style.BRIGHT)
        self._log("")

    def _log_subtitle(self, title, underline="=", color=None):
        # Render `title` underlined with the given character.
        self._log("%s" % title, color=color)
        self._log(underline * len(title), color=color)

    def fit(self, X, y, feature_types=None, raise_errors=False):
        """Find best classifier

        Arguments:
            X (pd.DataFrame): the input training data of shape [n_samples, n_features]

            y (pd.Series): the target training labels of length [n_samples]

            feature_types (list, optional): list of feature types. either numeric or categorical.
                categorical features will automatically be encoded

            raise_errors (boolean): If true, raise errors and exit search if a pipeline errors during fitting

        Returns:
            self
        """
        # make everything pandas objects
        if not isinstance(X, pd.DataFrame):
            X = pd.DataFrame(X)
        if not isinstance(y, pd.Series):
            y = pd.Series(y)
        self._log_title("Beginning pipeline search")
        self._log("Optimizing for %s. " % self.objective.name, new_line=False)
        if self.objective.greater_is_better:
            self._log("Greater score is better.\n")
        else:
            self._log("Lower score is better.\n")
        self._log("Searching up to %s pipelines. " % self.max_pipelines, new_line=False)
        if self.max_time:
            self._log("Will stop searching for new pipelines after %d seconds.\n" % self.max_time)
        else:
            self._log("No time limit is set. Set one using max_time parameter.\n")
        self._log("Possible model types: %s\n" % ", ".join(self.possible_model_types))
        if self.detect_label_leakage:
            leaked = preprocessing.detect_label_leakage(X, y)
            if len(leaked) > 0:
                leaked = [str(k) for k in leaked]
                self._log("WARNING: Possible label leakage: %s" % ", ".join(leaked))
        pbar = tqdm(range(self.max_pipelines), disable=not self.verbose, file=stdout)
        start = time.time()
        for n in pbar:
            elapsed = time.time() - start
            # max_time is only checked between iterations, so a slow pipeline
            # can overshoot the limit before the search stops.
            if self.max_time and elapsed > self.max_time:
                self._log("\n\nMax time elapsed. Stopping search early.")
                break
            self._do_iteration(X, y, pbar, raise_errors)
        pbar.close()
        self._log("\n✔ Optimization finished")
        # Return self as documented, enabling `model = AutoX(...).fit(X, y)`.
        return self

    def _do_iteration(self, X, y, pbar, raise_errors):
        """Run one search iteration: select, tune, fit, CV-score, record."""
        # determine which pipeline to build
        pipeline_class = self._select_pipeline()
        # propose the next best parameters for this pipeline
        parameters = self._propose_parameters(pipeline_class)
        # fit and score the pipeline
        pipeline = pipeline_class(
            objective=self.objective,
            random_state=self.random_state,
            n_jobs=-1,
            number_features=X.shape[1],
            **dict(parameters)
        )
        if self.start_iteration_callback:
            self.start_iteration_callback(pipeline_class, parameters)
        pbar.set_description("Testing %s" % (pipeline_class.name))
        start = time.time()
        scores = []
        all_objective_scores = []
        for train, test in self.cv.split(X, y):
            # Support both pandas and plain-array inputs (fit() converts to
            # pandas, but this method may also be called directly).
            if isinstance(X, pd.DataFrame):
                X_train, X_test = X.iloc[train], X.iloc[test]
            else:
                X_train, X_test = X[train], X[test]
            if isinstance(y, pd.Series):
                y_train, y_test = y.iloc[train], y.iloc[test]
            else:
                y_train, y_test = y[train], y[test]
            try:
                pipeline.fit(X_train, y_train)
                score, other_scores = pipeline.score(X_test, y_test, other_objectives=self.default_objectives)
            except Exception as e:
                if raise_errors:
                    # Bare raise preserves the original traceback.
                    raise
                # Record the failure as NaN scores so the fold still counts.
                pbar.write(str(e))
                score = np.nan
                other_scores = dict(zip([n.name for n in self.default_objectives], [np.nan] * len(self.default_objectives)))
            other_scores[self.objective.name] = score
            other_scores["# Training"] = len(y_train)
            other_scores["# Testing"] = len(y_test)
            scores.append(score)
            all_objective_scores.append(other_scores)
        training_time = time.time() - start
        # save the result and continue
        self._add_result(
            trained_pipeline=pipeline,
            parameters=parameters,
            scores=scores,
            all_objective_scores=all_objective_scores,
            training_time=training_time
        )

    def _select_pipeline(self):
        """Choose the next candidate pipeline class uniformly at random."""
        return random.choice(self.possible_pipelines)

    def _propose_parameters(self, pipeline_class):
        """Ask the pipeline's tuner for the next hyperparameter values.

        Returns a list of (parameter_name, value) pairs, pairing the tuner's
        proposed values with the names saved in __init__.
        """
        values = self.tuners[pipeline_class.name].propose()
        space = self.search_spaces[pipeline_class.name]
        proposal = zip(space, values)
        return list(proposal)

    def _add_result(self, trained_pipeline, parameters, scores, all_objective_scores, training_time):
        """Record a finished iteration and feed the score back to the tuner."""
        score = pd.Series(scores).mean()
        # Tuners minimize, so negate when the objective is maximized.
        if self.objective.greater_is_better:
            score_to_minimize = -score
        else:
            score_to_minimize = score
        self.tuners[trained_pipeline.name].add([p[1] for p in parameters], score_to_minimize)
        # calculate high_variance_cv
        # if the coefficient of variance is greater than .2
        s = pd.Series(scores)
        high_variance_cv = (s.std() / s.mean()) > .2
        pipeline_name = trained_pipeline.__class__.__name__
        # Sequential ids: dicts are only ever appended to, so len() is unique.
        pipeline_id = len(self.results)
        self.results[pipeline_id] = {
            "id": pipeline_id,
            "pipeline_name": pipeline_name,
            "parameters": dict(parameters),
            "score": score,
            "high_variance_cv": high_variance_cv,
            "scores": scores,
            "all_objective_scores": all_objective_scores,
            "training_time": training_time,
        }
        if self.add_result_callback:
            self.add_result_callback(self.results[pipeline_id], trained_pipeline)
        self._save_pipeline(pipeline_id, trained_pipeline)

    def _save_pipeline(self, pipeline_id, trained_pipeline):
        # Keep the fitted pipeline so get_pipeline/best_pipeline can return it.
        self.trained_pipelines[pipeline_id] = trained_pipeline

    def get_pipeline(self, pipeline_id):
        """Return the trained pipeline stored under `pipeline_id`.

        Raises:
            RuntimeError: if no pipeline with that id was trained.
        """
        if pipeline_id not in self.trained_pipelines:
            raise RuntimeError("Pipeline not found")
        return self.trained_pipelines[pipeline_id]

    def describe_pipeline(self, pipeline_id, return_dict=False):
        """Describe a pipeline

        Arguments:
            pipeline_id (int): pipeline to describe
            return_dict (bool): If True, return dictionary of information
                about pipeline. Defaults to false

        Returns:
            description

        Raises:
            RuntimeError: if no pipeline with that id exists in the results.
        """
        if pipeline_id not in self.results:
            raise RuntimeError("Pipeline not found")
        pipeline = self.get_pipeline(pipeline_id)
        pipeline_results = self.results[pipeline_id]
        self._log_title("Pipeline Description")
        better_string = "lower is better"
        if pipeline.objective.greater_is_better:
            better_string = "greater is better"
        self._log("Pipeline Name: %s" % pipeline.name)
        self._log("Model type: %s" % pipeline.model_type)
        self._log("Objective: %s (%s)" % (pipeline.objective.name, better_string))
        self._log("Total training time (including CV): %.1f seconds\n" % pipeline_results["training_time"])
        self._log_subtitle("Parameters")
        for item in pipeline_results["parameters"].items():
            self._log("• %s: %s" % item)
        self._log_subtitle("\nCross Validation")
        if pipeline_results["high_variance_cv"]:
            self._log("Warning! High variance within cross validation scores. " +
                      "Model may not perform as estimated on unseen data.")
        all_objective_scores = pd.DataFrame(pipeline_results["all_objective_scores"])
        # Append mean/std/coefficient-of-variation summary rows per objective
        # column; the fold-count columns are kept as objects and excluded.
        for c in all_objective_scores:
            if c in ["# Training", "# Testing"]:
                all_objective_scores[c] = all_objective_scores[c].astype("object")
                continue
            mean = all_objective_scores[c].mean(axis=0)
            std = all_objective_scores[c].std(axis=0)
            all_objective_scores.loc["mean", c] = mean
            all_objective_scores.loc["std", c] = std
            all_objective_scores.loc["coef of var", c] = std / mean
        all_objective_scores = all_objective_scores.fillna("-")
        with pd.option_context('display.float_format', '{:.3f}'.format, 'expand_frame_repr', False):
            self._log(all_objective_scores)
        if return_dict:
            return pipeline_results

    @property
    def rankings(self):
        """Returns the rankings of the models searched"""
        # Best score first: descending when greater is better, else ascending.
        ascending = True
        if self.objective.greater_is_better:
            ascending = False
        rankings_df = pd.DataFrame(self.results.values())
        rankings_df = rankings_df[["id", "pipeline_name", "score", "high_variance_cv", "parameters"]]
        rankings_df.sort_values("score", ascending=ascending, inplace=True)
        rankings_df.reset_index(drop=True, inplace=True)
        return rankings_df

    @property
    def best_pipeline(self):
        """Returns the best model found"""
        best = self.rankings.iloc[0]
        return self.get_pipeline(best["id"])