-
Notifications
You must be signed in to change notification settings - Fork 13.7k
/
auto_ml.py
1418 lines (1353 loc) · 84.7 KB
/
auto_ml.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""This module contains a Google Cloud Vertex AI hook."""
from __future__ import annotations
import warnings
from typing import TYPE_CHECKING, Sequence
from google.api_core.client_options import ClientOptions
from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
from google.cloud.aiplatform import (
AutoMLForecastingTrainingJob,
AutoMLImageTrainingJob,
AutoMLTabularTrainingJob,
AutoMLTextTrainingJob,
AutoMLVideoTrainingJob,
datasets,
models,
)
from google.cloud.aiplatform_v1 import JobServiceClient, PipelineServiceClient
from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
if TYPE_CHECKING:
from google.api_core.operation import Operation
from google.api_core.retry import Retry
from google.cloud.aiplatform_v1.services.pipeline_service.pagers import ListTrainingPipelinesPager
from google.cloud.aiplatform_v1.types import TrainingPipeline
class AutoMLHook(GoogleBaseHook):
"""Hook for Google Cloud Vertex AI Auto ML APIs."""
def __init__(
self,
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
**kwargs,
) -> None:
if kwargs.get("delegate_to") is not None:
raise RuntimeError(
"The `delegate_to` parameter has been deprecated before and finally removed in this version"
" of Google Provider. You MUST convert it to `impersonate_chain`"
)
super().__init__(
gcp_conn_id=gcp_conn_id,
impersonation_chain=impersonation_chain,
)
self._job: None | (
AutoMLForecastingTrainingJob
| AutoMLImageTrainingJob
| AutoMLTabularTrainingJob
| AutoMLTextTrainingJob
| AutoMLVideoTrainingJob
) = None
def get_pipeline_service_client(
self,
region: str | None = None,
) -> PipelineServiceClient:
"""Returns PipelineServiceClient."""
if region and region != "global":
client_options = ClientOptions(api_endpoint=f"{region}-aiplatform.googleapis.com:443")
else:
client_options = ClientOptions()
return PipelineServiceClient(
credentials=self.get_credentials(), client_info=self.client_info, client_options=client_options
)
def get_job_service_client(
self,
region: str | None = None,
) -> JobServiceClient:
"""Returns JobServiceClient."""
if region and region != "global":
client_options = ClientOptions(api_endpoint=f"{region}-aiplatform.googleapis.com:443")
else:
client_options = ClientOptions()
return JobServiceClient(
credentials=self.get_credentials(), client_info=self.client_info, client_options=client_options
)
def get_auto_ml_tabular_training_job(
self,
display_name: str,
optimization_prediction_type: str,
optimization_objective: str | None = None,
column_specs: dict[str, str] | None = None,
column_transformations: list[dict[str, dict[str, str]]] | None = None,
optimization_objective_recall_value: float | None = None,
optimization_objective_precision_value: float | None = None,
project: str | None = None,
location: str | None = None,
labels: dict[str, str] | None = None,
training_encryption_spec_key_name: str | None = None,
model_encryption_spec_key_name: str | None = None,
) -> AutoMLTabularTrainingJob:
"""Returns AutoMLTabularTrainingJob object."""
return AutoMLTabularTrainingJob(
display_name=display_name,
optimization_prediction_type=optimization_prediction_type,
optimization_objective=optimization_objective,
column_specs=column_specs,
column_transformations=column_transformations,
optimization_objective_recall_value=optimization_objective_recall_value,
optimization_objective_precision_value=optimization_objective_precision_value,
project=project,
location=location,
credentials=self.get_credentials(),
labels=labels,
training_encryption_spec_key_name=training_encryption_spec_key_name,
model_encryption_spec_key_name=model_encryption_spec_key_name,
)
def get_auto_ml_forecasting_training_job(
self,
display_name: str,
optimization_objective: str | None = None,
column_specs: dict[str, str] | None = None,
column_transformations: list[dict[str, dict[str, str]]] | None = None,
project: str | None = None,
location: str | None = None,
labels: dict[str, str] | None = None,
training_encryption_spec_key_name: str | None = None,
model_encryption_spec_key_name: str | None = None,
) -> AutoMLForecastingTrainingJob:
"""Returns AutoMLForecastingTrainingJob object."""
return AutoMLForecastingTrainingJob(
display_name=display_name,
optimization_objective=optimization_objective,
column_specs=column_specs,
column_transformations=column_transformations,
project=project,
location=location,
credentials=self.get_credentials(),
labels=labels,
training_encryption_spec_key_name=training_encryption_spec_key_name,
model_encryption_spec_key_name=model_encryption_spec_key_name,
)
def get_auto_ml_image_training_job(
self,
display_name: str,
prediction_type: str = "classification",
multi_label: bool = False,
model_type: str = "CLOUD",
base_model: models.Model | None = None,
project: str | None = None,
location: str | None = None,
labels: dict[str, str] | None = None,
training_encryption_spec_key_name: str | None = None,
model_encryption_spec_key_name: str | None = None,
) -> AutoMLImageTrainingJob:
"""Returns AutoMLImageTrainingJob object."""
return AutoMLImageTrainingJob(
display_name=display_name,
prediction_type=prediction_type,
multi_label=multi_label,
model_type=model_type,
base_model=base_model,
project=project,
location=location,
credentials=self.get_credentials(),
labels=labels,
training_encryption_spec_key_name=training_encryption_spec_key_name,
model_encryption_spec_key_name=model_encryption_spec_key_name,
)
def get_auto_ml_text_training_job(
self,
display_name: str,
prediction_type: str,
multi_label: bool = False,
sentiment_max: int = 10,
project: str | None = None,
location: str | None = None,
labels: dict[str, str] | None = None,
training_encryption_spec_key_name: str | None = None,
model_encryption_spec_key_name: str | None = None,
) -> AutoMLTextTrainingJob:
"""Returns AutoMLTextTrainingJob object."""
return AutoMLTextTrainingJob(
display_name=display_name,
prediction_type=prediction_type,
multi_label=multi_label,
sentiment_max=sentiment_max,
project=project,
location=location,
credentials=self.get_credentials(),
labels=labels,
training_encryption_spec_key_name=training_encryption_spec_key_name,
model_encryption_spec_key_name=model_encryption_spec_key_name,
)
def get_auto_ml_video_training_job(
self,
display_name: str,
prediction_type: str = "classification",
model_type: str = "CLOUD",
project: str | None = None,
location: str | None = None,
labels: dict[str, str] | None = None,
training_encryption_spec_key_name: str | None = None,
model_encryption_spec_key_name: str | None = None,
) -> AutoMLVideoTrainingJob:
"""Returns AutoMLVideoTrainingJob object."""
return AutoMLVideoTrainingJob(
display_name=display_name,
prediction_type=prediction_type,
model_type=model_type,
project=project,
location=location,
credentials=self.get_credentials(),
labels=labels,
training_encryption_spec_key_name=training_encryption_spec_key_name,
model_encryption_spec_key_name=model_encryption_spec_key_name,
)
@staticmethod
def extract_model_id(obj: dict) -> str:
"""Returns unique id of the Model."""
return obj["name"].rpartition("/")[-1]
@staticmethod
def extract_training_id(resource_name: str) -> str:
"""Returns unique id of the Training pipeline."""
return resource_name.rpartition("/")[-1]
def wait_for_operation(self, operation: Operation, timeout: float | None = None):
"""Waits for long-lasting operation to complete."""
try:
return operation.result(timeout=timeout)
except Exception:
error = operation.exception(timeout=timeout)
raise AirflowException(error)
def cancel_auto_ml_job(self) -> None:
"""Cancel Auto ML Job for training pipeline."""
if self._job:
self._job.cancel()
@GoogleBaseHook.fallback_to_default_project_id
def create_auto_ml_tabular_training_job(
self,
project_id: str,
region: str,
display_name: str,
dataset: datasets.TabularDataset,
target_column: str,
optimization_prediction_type: str,
optimization_objective: str | None = None,
column_specs: dict[str, str] | None = None,
column_transformations: list[dict[str, dict[str, str]]] | None = None,
optimization_objective_recall_value: float | None = None,
optimization_objective_precision_value: float | None = None,
labels: dict[str, str] | None = None,
training_encryption_spec_key_name: str | None = None,
model_encryption_spec_key_name: str | None = None,
training_fraction_split: float | None = None,
validation_fraction_split: float | None = None,
test_fraction_split: float | None = None,
predefined_split_column_name: str | None = None,
timestamp_split_column_name: str | None = None,
weight_column: str | None = None,
budget_milli_node_hours: int = 1000,
model_display_name: str | None = None,
model_labels: dict[str, str] | None = None,
disable_early_stopping: bool = False,
export_evaluated_data_items: bool = False,
export_evaluated_data_items_bigquery_destination_uri: str | None = None,
export_evaluated_data_items_override_destination: bool = False,
sync: bool = True,
parent_model: str | None = None,
is_default_version: bool | None = None,
model_version_aliases: list[str] | None = None,
model_version_description: str | None = None,
) -> tuple[models.Model | None, str]:
"""
Create an AutoML Tabular Training Job.
:param project_id: Required. Project to run training in.
:param region: Required. Location to run training in.
:param display_name: Required. The user-defined name of this TrainingPipeline.
:param dataset: Required. The dataset within the same Project from which data will be used to train
the Model. The Dataset must use schema compatible with Model being trained, and what is
compatible should be described in the used TrainingPipeline's [training_task_definition]
[google.cloud.aiplatform.v1beta1.TrainingPipeline.training_task_definition]. For tabular
Datasets, all their data is exported to training, to pick and choose from.
:param target_column: Required. The name of the column values of which the Model is to predict.
:param parent_model: Optional. The resource name or model ID of an existing model.
The new model uploaded by this job will be a version of `parent_model`.
Only set this field when training a new version of an existing model.
:param is_default_version: Optional. When set to True, the newly uploaded model version will
automatically have alias "default" included. Subsequent uses of
the model produced by this job without a version specified will
use this "default" version.
When set to False, the "default" alias will not be moved.
Actions targeting the model version produced by this job will need
to specifically reference this version by ID or alias.
New model uploads, i.e. version 1, will always be "default" aliased.
:param model_version_aliases: Optional. User provided version aliases so that the model version
uploaded by this job can be referenced via alias instead of
auto-generated version ID. A default version alias will be created
for the first version of the model.
The format is [a-z][a-zA-Z0-9-]{0,126}[a-z0-9]
:param model_version_description: Optional. The description of the model version
being uploaded by this job.
:param optimization_prediction_type: The type of prediction the Model is to produce.
"classification" - Predict one out of multiple target values is picked for each row.
"regression" - Predict a value based on its relation to other values. This type is available only
to columns that contain semantically numeric values, i.e. integers or floating point number, even
if stored as e.g. strings.
:param optimization_objective: Optional. Objective function the Model is to be optimized towards.
The training task creates a Model that maximizes/minimizes the value of the objective function
over the validation set.
The supported optimization objectives depend on the prediction type, and in the case of
classification also the number of distinct values in the target column (two distinct values
-> binary, 3 or more distinct values -> multi class). If the field is not set, the default
objective function is used.
Classification (binary):
"maximize-au-roc" (default) - Maximize the area under the receiver operating characteristic (ROC)
curve.
"minimize-log-loss" - Minimize log loss.
"maximize-au-prc" - Maximize the area under the precision-recall curve.
"maximize-precision-at-recall" - Maximize precision for a specified recall value.
"maximize-recall-at-precision" - Maximize recall for a specified precision value.
Classification (multi class):
"minimize-log-loss" (default) - Minimize log loss.
Regression:
"minimize-rmse" (default) - Minimize root-mean-squared error (RMSE).
"minimize-mae" - Minimize mean-absolute error (MAE).
"minimize-rmsle" - Minimize root-mean-squared log error (RMSLE).
:param column_specs: Optional. Alternative to column_transformations where the keys of the dict are
column names and their respective values are one of AutoMLTabularTrainingJob.column_data_types.
When creating transformation for BigQuery Struct column, the column should be flattened using "."
as the delimiter. Only columns with no child should have a transformation. If an input column has
no transformations on it, such a column is ignored by the training, except for the targetColumn,
which should have no transformations defined on. Only one of column_transformations or
column_specs should be passed.
:param column_transformations: Optional. Transformations to apply to the input columns (i.e. columns
other than the targetColumn). Each transformation may produce multiple result values from the
column's value, and all are used for training. When creating transformation for BigQuery Struct
column, the column should be flattened using "." as the delimiter. Only columns with no child
should have a transformation. If an input column has no transformations on it, such a column is
ignored by the training, except for the targetColumn, which should have no transformations
defined on. Only one of column_transformations or column_specs should be passed. Consider using
column_specs as column_transformations will be deprecated eventually.
:param optimization_objective_recall_value: Optional. Required when maximize-precision-at-recall
optimizationObjective was picked, represents the recall value at which the optimization is done.
The minimum value is 0 and the maximum is 1.0.
:param optimization_objective_precision_value: Optional. Required when maximize-recall-at-precision
optimizationObjective was picked, represents the precision value at which the optimization is
done.
The minimum value is 0 and the maximum is 1.0.
:param labels: Optional. The labels with user-defined metadata to organize TrainingPipelines. Label
keys and values can be no longer than 64 characters (Unicode codepoints), can only contain
lowercase letters, numeric characters, underscores and dashes. International characters are
allowed. See https://goo.gl/xmQnxf for more information and examples of labels.
:param training_encryption_spec_key_name: Optional. The Cloud KMS resource identifier of the customer
managed encryption key used to protect the training pipeline. Has the form:
``projects/my-project/locations/my-region/keyRings/my-kr/cryptoKeys/my-key``. The key needs to be
in the same region as where the compute resource is created. If set, this TrainingPipeline will
be secured by this key.
Note: Model trained by this TrainingPipeline is also secured by this key if ``model_to_upload``
is not set separately.
:param model_encryption_spec_key_name: Optional. The Cloud KMS resource identifier of the customer
managed encryption key used to protect the model. Has the form:
``projects/my-project/locations/my-region/keyRings/my-kr/cryptoKeys/my-key``. The key needs to be
in the same region as where the compute resource is created. If set, the trained Model will be
secured by this key.
:param training_fraction_split: Optional. The fraction of the input data that is to be used to train
the Model. This is ignored if Dataset is not provided.
:param validation_fraction_split: Optional. The fraction of the input data that is to be used to
validate the Model. This is ignored if Dataset is not provided.
:param test_fraction_split: Optional. The fraction of the input data that is to be used to evaluate
the Model. This is ignored if Dataset is not provided.
:param predefined_split_column_name: Optional. The key is a name of one of the Dataset's data
columns. The value of the key (either the label's value or value in the column) must be one of
{``training``, ``validation``, ``test``}, and it defines to which set the given piece of data is
assigned. If for a piece of data the key is not present or has an invalid value, that piece is
ignored by the pipeline. Supported only for tabular and time series Datasets.
:param timestamp_split_column_name: Optional. The key is a name of one of the Dataset's data columns.
The value of the key values of the key (the values in the column) must be in RFC 3339 `date-time`
format, where `time-offset` = `"Z"` (e.g. 1985-04-12T23:20:50.52Z). If for a piece of data the
key is not present or has an invalid value, that piece is ignored by the pipeline. Supported only
for tabular and time series Datasets. This parameter must be used with training_fraction_split,
validation_fraction_split and test_fraction_split.
:param weight_column: Optional. Name of the column that should be used as the weight column. Higher
values in this column give more importance to the row during Model training. The column must have
numeric values between 0 and 10000 inclusively, and 0 value means that the row is ignored. If the
weight column field is not set, then all rows are assumed to have equal weight of 1.
:param budget_milli_node_hours (int): Optional. The train budget of creating this Model, expressed in
milli node hours i.e. 1,000 value in this field means 1 node hour. The training cost of the model
will not exceed this budget. The final cost will be attempted to be close to the budget, though
may end up being (even) noticeably smaller - at the backend's discretion. This especially may
happen when further model training ceases to provide any improvements. If the budget is set to a
value known to be insufficient to train a Model for the given training set, the training won't be
attempted and will error. The minimum value is 1000 and the maximum is 72000.
:param model_display_name: Optional. If the script produces a managed Vertex AI Model. The display
name of the Model. The name can be up to 128 characters long and can be consist of any UTF-8
characters. If not provided upon creation, the job's display_name is used.
:param model_labels: Optional. The labels with user-defined metadata to organize your Models. Label
keys and values can be no longer than 64 characters (Unicode codepoints), can only contain
lowercase letters, numeric characters, underscores and dashes. International characters are
allowed. See https://goo.gl/xmQnxf for more information and examples of labels.
:param disable_early_stopping: Required. If true, the entire budget is used. This disables the early
stopping feature. By default, the early stopping feature is enabled, which means that training
might stop before the entire training budget has been used, if further training does no longer
brings significant improvement to the model.
:param export_evaluated_data_items: Whether to export the test set predictions to a BigQuery table.
If False, then the export is not performed.
:param export_evaluated_data_items_bigquery_destination_uri: Optional. URI of desired destination
BigQuery table for exported test set predictions.
Expected format: ``bq://<project_id>:<dataset_id>:<table>``
If not specified, then results are exported to the following auto-created BigQuery table:
``<project_id>:export_evaluated_examples_<model_name>_<yyyy_MM_dd'T'HH_mm_ss_SSS'Z'>
.evaluated_examples``
Applies only if [export_evaluated_data_items] is True.
:param export_evaluated_data_items_override_destination: Whether to override the contents of
[export_evaluated_data_items_bigquery_destination_uri], if the table exists, for exported test
set predictions. If False, and the table exists, then the training job will fail. Applies only if
[export_evaluated_data_items] is True and [export_evaluated_data_items_bigquery_destination_uri]
is specified.
:param sync: Whether to execute this method synchronously. If False, this method will be executed in
concurrent Future and any downstream object will be immediately returned and synced when the
Future has completed.
"""
if column_transformations:
warnings.warn(
"Consider using column_specs as column_transformations will be deprecated eventually.",
AirflowProviderDeprecationWarning,
stacklevel=2,
)
self._job = self.get_auto_ml_tabular_training_job(
project=project_id,
location=region,
display_name=display_name,
optimization_prediction_type=optimization_prediction_type,
optimization_objective=optimization_objective,
column_specs=column_specs,
column_transformations=column_transformations,
optimization_objective_recall_value=optimization_objective_recall_value,
optimization_objective_precision_value=optimization_objective_precision_value,
labels=labels,
training_encryption_spec_key_name=training_encryption_spec_key_name,
model_encryption_spec_key_name=model_encryption_spec_key_name,
)
if not self._job:
raise AirflowException("AutoMLTabularTrainingJob was not created")
model = self._job.run(
dataset=dataset,
target_column=target_column,
training_fraction_split=training_fraction_split,
validation_fraction_split=validation_fraction_split,
test_fraction_split=test_fraction_split,
predefined_split_column_name=predefined_split_column_name,
timestamp_split_column_name=timestamp_split_column_name,
weight_column=weight_column,
budget_milli_node_hours=budget_milli_node_hours,
model_display_name=model_display_name,
model_labels=model_labels,
disable_early_stopping=disable_early_stopping,
export_evaluated_data_items=export_evaluated_data_items,
export_evaluated_data_items_bigquery_destination_uri=(
export_evaluated_data_items_bigquery_destination_uri
),
export_evaluated_data_items_override_destination=export_evaluated_data_items_override_destination,
sync=sync,
parent_model=parent_model,
is_default_version=is_default_version,
model_version_aliases=model_version_aliases,
model_version_description=model_version_description,
)
training_id = self.extract_training_id(self._job.resource_name)
if model:
model.wait()
else:
self.log.warning(
"Training did not produce a Managed Model returning None. Training Pipeline is not "
"configured to upload a Model."
)
return model, training_id
@GoogleBaseHook.fallback_to_default_project_id
def create_auto_ml_forecasting_training_job(
self,
project_id: str,
region: str,
display_name: str,
dataset: datasets.TimeSeriesDataset,
target_column: str,
time_column: str,
time_series_identifier_column: str,
unavailable_at_forecast_columns: list[str],
available_at_forecast_columns: list[str],
forecast_horizon: int,
data_granularity_unit: str,
data_granularity_count: int,
optimization_objective: str | None = None,
column_specs: dict[str, str] | None = None,
column_transformations: list[dict[str, dict[str, str]]] | None = None,
labels: dict[str, str] | None = None,
training_encryption_spec_key_name: str | None = None,
model_encryption_spec_key_name: str | None = None,
training_fraction_split: float | None = None,
validation_fraction_split: float | None = None,
test_fraction_split: float | None = None,
predefined_split_column_name: str | None = None,
weight_column: str | None = None,
time_series_attribute_columns: list[str] | None = None,
context_window: int | None = None,
export_evaluated_data_items: bool = False,
export_evaluated_data_items_bigquery_destination_uri: str | None = None,
export_evaluated_data_items_override_destination: bool = False,
quantiles: list[float] | None = None,
validation_options: str | None = None,
budget_milli_node_hours: int = 1000,
model_display_name: str | None = None,
model_labels: dict[str, str] | None = None,
sync: bool = True,
parent_model: str | None = None,
is_default_version: bool | None = None,
model_version_aliases: list[str] | None = None,
model_version_description: str | None = None,
) -> tuple[models.Model | None, str]:
"""
Create an AutoML Forecasting Training Job.
:param project_id: Required. Project to run training in.
:param region: Required. Location to run training in.
:param display_name: Required. The user-defined name of this TrainingPipeline.
:param dataset: Required. The dataset within the same Project from which data will be used to train
the Model. The Dataset must use schema compatible with Model being trained, and what is
compatible should be described in the used TrainingPipeline's [training_task_definition]
[google.cloud.aiplatform.v1beta1.TrainingPipeline.training_task_definition]. For time series
Datasets, all their data is exported to training, to pick and choose from.
:param target_column: Required. Name of the column that the Model is to predict values for.
:param time_column: Required. Name of the column that identifies time order in the time series.
:param parent_model: Optional. The resource name or model ID of an existing model.
The new model uploaded by this job will be a version of `parent_model`.
Only set this field when training a new version of an existing model.
:param is_default_version: Optional. When set to True, the newly uploaded model version will
automatically have alias "default" included. Subsequent uses of
the model produced by this job without a version specified will
use this "default" version.
When set to False, the "default" alias will not be moved.
Actions targeting the model version produced by this job will need
to specifically reference this version by ID or alias.
New model uploads, i.e. version 1, will always be "default" aliased.
:param model_version_aliases: Optional. User provided version aliases so that the model version
uploaded by this job can be referenced via alias instead of
auto-generated version ID. A default version alias will be created
for the first version of the model.
The format is [a-z][a-zA-Z0-9-]{0,126}[a-z0-9]
:param model_version_description: Optional. The description of the model version
being uploaded by this job.
:param time_series_identifier_column: Required. Name of the column that identifies the time series.
:param unavailable_at_forecast_columns: Required. Column names of columns that are unavailable at
forecast. Each column contains information for the given entity (identified by the
[time_series_identifier_column]) that is unknown before the forecast (e.g. population of a city
in a given year, or weather on a given day).
:param available_at_forecast_columns: Required. Column names of columns that are available at
forecast. Each column contains information for the given entity (identified by the
[time_series_identifier_column]) that is known at forecast.
:param forecast_horizon: Required. The amount of time into the future for which forecasted values for
the target are returned. Expressed in number of units defined by the [data_granularity_unit] and
[data_granularity_count] field. Inclusive.
:param data_granularity_unit: Required. The data granularity unit. Accepted values are ``minute``,
``hour``, ``day``, ``week``, ``month``, ``year``.
:param data_granularity_count: Required. The number of data granularity units between data points in
the training data. If [data_granularity_unit] is `minute`, can be 1, 5, 10, 15, or 30. For all
other values of [data_granularity_unit], must be 1.
:param optimization_objective: Optional. Objective function the model is to be optimized towards. The
training process creates a Model that optimizes the value of the objective function over the
validation set. The supported optimization objectives:
"minimize-rmse" (default) - Minimize root-mean-squared error (RMSE).
"minimize-mae" - Minimize mean-absolute error (MAE).
"minimize-rmsle" - Minimize root-mean-squared log error (RMSLE).
"minimize-rmspe" - Minimize root-mean-squared percentage error (RMSPE).
"minimize-wape-mae" - Minimize the combination of weighted absolute percentage error (WAPE) and
mean-absolute-error (MAE).
"minimize-quantile-loss" - Minimize the quantile loss at the defined quantiles. (Set this
objective to build quantile forecasts.)
:param column_specs: Optional. Alternative to column_transformations where the keys of the dict are
column names and their respective values are one of AutoMLTabularTrainingJob.column_data_types.
When creating transformation for BigQuery Struct column, the column should be flattened using "."
as the delimiter. Only columns with no child should have a transformation. If an input column has
no transformations on it, such a column is ignored by the training, except for the targetColumn,
which should have no transformations defined on. Only one of column_transformations or
column_specs should be passed.
:param column_transformations: Optional. Transformations to apply to the input columns (i.e. columns
other than the targetColumn). Each transformation may produce multiple result values from the
column's value, and all are used for training. When creating transformation for BigQuery Struct
column, the column should be flattened using "." as the delimiter. Only columns with no child
should have a transformation. If an input column has no transformations on it, such a column is
ignored by the training, except for the targetColumn, which should have no transformations
defined on. Only one of column_transformations or column_specs should be passed. Consider using
column_specs as column_transformations will be deprecated eventually.
:param labels: Optional. The labels with user-defined metadata to organize TrainingPipelines. Label
keys and values can be no longer than 64 characters (Unicode codepoints), can only contain
lowercase letters, numeric characters, underscores and dashes. International characters are
allowed. See https://goo.gl/xmQnxf for more information and examples of labels.
:param training_encryption_spec_key_name: Optional. The Cloud KMS resource identifier of the customer
managed encryption key used to protect the training pipeline. Has the form:
``projects/my-project/locations/my-region/keyRings/my-kr/cryptoKeys/my-key``. The key needs to be
in the same region as where the compute resource is created. If set, this TrainingPipeline will
be secured by this key.
Note: Model trained by this TrainingPipeline is also secured by this key if ``model_to_upload``
is not set separately.
:param model_encryption_spec_key_name: Optional. The Cloud KMS resource identifier of the customer
managed encryption key used to protect the model. Has the form:
``projects/my-project/locations/my-region/keyRings/my-kr/cryptoKeys/my-key``. The key needs to be
in the same region as where the compute resource is created.
If set, the trained Model will be secured by this key.
:param training_fraction_split: Optional. The fraction of the input data that is to be used to train
the Model. This is ignored if Dataset is not provided.
:param validation_fraction_split: Optional. The fraction of the input data that is to be used to
validate the Model. This is ignored if Dataset is not provided.
:param test_fraction_split: Optional. The fraction of the input data that is to be used to evaluate
the Model. This is ignored if Dataset is not provided.
:param predefined_split_column_name: Optional. The key is a name of one of the Dataset's data
columns. The value of the key (either the label's value or value in the column) must be one of
{``TRAIN``, ``VALIDATE``, ``TEST``}, and it defines to which set the given piece of data is
assigned. If for a piece of data the key is not present or has an invalid value, that piece is
ignored by the pipeline.
Supported only for tabular and time series Datasets.
:param weight_column: Optional. Name of the column that should be used as the weight column. Higher
values in this column give more importance to the row during Model training. The column must have
numeric values between 0 and 10000 inclusively, and 0 value means that the row is ignored. If the
weight column field is not set, then all rows are assumed to have equal weight of 1.
:param time_series_attribute_columns: Optional. Column names that should be used as attribute
columns. Each column is constant within a time series.
:param context_window: Optional. The amount of time into the past training and prediction data is
used for model training and prediction respectively. Expressed in number of units defined by the
[data_granularity_unit] and [data_granularity_count] fields. When not provided uses the default
value of 0 which means the model sets each series context window to be 0 (also known as "cold
start"). Inclusive.
:param export_evaluated_data_items: Whether to export the test set predictions to a BigQuery table.
If False, then the export is not performed.
:param export_evaluated_data_items_bigquery_destination_uri: Optional. URI of desired destination
BigQuery table for exported test set predictions. Expected format:
``bq://<project_id>:<dataset_id>:<table>``
If not specified, then results are exported to the following auto-created BigQuery table:
``<project_id>:export_evaluated_examples_<model_name>_<yyyy_MM_dd'T'HH_mm_ss_SSS'Z'>
.evaluated_examples``
Applies only if [export_evaluated_data_items] is True.
:param export_evaluated_data_items_override_destination: Whether to override the contents of
[export_evaluated_data_items_bigquery_destination_uri], if the table exists, for exported test
set predictions. If False, and the table exists, then the training job will fail.
Applies only if [export_evaluated_data_items] is True and
[export_evaluated_data_items_bigquery_destination_uri] is specified.
:param quantiles: Quantiles to use for the `minizmize-quantile-loss`
[AutoMLForecastingTrainingJob.optimization_objective]. This argument is required in this case.
Accepts up to 5 quantiles in the form of a double from 0 to 1, exclusive. Each quantile must be
unique.
:param validation_options: Validation options for the data validation component. The available
options are: "fail-pipeline" - (default), will validate against the validation and fail the
pipeline if it fails. "ignore-validation" - ignore the results of the validation and continue the
pipeline
:param budget_milli_node_hours: Optional. The train budget of creating this Model, expressed in milli
node hours i.e. 1,000 value in this field means 1 node hour. The training cost of the model will
not exceed this budget. The final cost will be attempted to be close to the budget, though may
end up being (even) noticeably smaller - at the backend's discretion. This especially may happen
when further model training ceases to provide any improvements. If the budget is set to a value
known to be insufficient to train a Model for the given training set, the training won't be
attempted and will error. The minimum value is 1000 and the maximum is 72000.
:param model_display_name: Optional. If the script produces a managed Vertex AI Model. The display
name of the Model. The name can be up to 128 characters long and can be consist of any UTF-8
characters. If not provided upon creation, the job's display_name is used.
:param model_labels: Optional. The labels with user-defined metadata to organize your Models. Label
keys and values can be no longer than 64 characters (Unicode codepoints), can only contain
lowercase letters, numeric characters, underscores and dashes. International characters are
allowed. See https://goo.gl/xmQnxf for more information and examples of labels.
:param sync: Whether to execute this method synchronously. If False, this method will be executed in
concurrent Future and any downstream object will be immediately returned and synced when the
Future has completed.
"""
if column_transformations:
warnings.warn(
"Consider using column_specs as column_transformations will be deprecated eventually.",
AirflowProviderDeprecationWarning,
stacklevel=2,
)
self._job = self.get_auto_ml_forecasting_training_job(
project=project_id,
location=region,
display_name=display_name,
optimization_objective=optimization_objective,
column_specs=column_specs,
column_transformations=column_transformations,
labels=labels,
training_encryption_spec_key_name=training_encryption_spec_key_name,
model_encryption_spec_key_name=model_encryption_spec_key_name,
)
if not self._job:
raise AirflowException("AutoMLForecastingTrainingJob was not created")
model = self._job.run(
dataset=dataset,
target_column=target_column,
time_column=time_column,
time_series_identifier_column=time_series_identifier_column,
unavailable_at_forecast_columns=unavailable_at_forecast_columns,
available_at_forecast_columns=available_at_forecast_columns,
forecast_horizon=forecast_horizon,
data_granularity_unit=data_granularity_unit,
data_granularity_count=data_granularity_count,
training_fraction_split=training_fraction_split,
validation_fraction_split=validation_fraction_split,
test_fraction_split=test_fraction_split,
predefined_split_column_name=predefined_split_column_name,
weight_column=weight_column,
time_series_attribute_columns=time_series_attribute_columns,
context_window=context_window,
export_evaluated_data_items=export_evaluated_data_items,
export_evaluated_data_items_bigquery_destination_uri=(
export_evaluated_data_items_bigquery_destination_uri
),
export_evaluated_data_items_override_destination=export_evaluated_data_items_override_destination,
quantiles=quantiles,
validation_options=validation_options,
budget_milli_node_hours=budget_milli_node_hours,
model_display_name=model_display_name,
model_labels=model_labels,
sync=sync,
parent_model=parent_model,
is_default_version=is_default_version,
model_version_aliases=model_version_aliases,
model_version_description=model_version_description,
)
training_id = self.extract_training_id(self._job.resource_name)
if model:
model.wait()
else:
self.log.warning(
"Training did not produce a Managed Model returning None. Training Pipeline is not "
"configured to upload a Model."
)
return model, training_id
@GoogleBaseHook.fallback_to_default_project_id
def create_auto_ml_image_training_job(
self,
project_id: str,
region: str,
display_name: str,
dataset: datasets.ImageDataset,
prediction_type: str = "classification",
multi_label: bool = False,
model_type: str = "CLOUD",
base_model: models.Model | None = None,
labels: dict[str, str] | None = None,
training_encryption_spec_key_name: str | None = None,
model_encryption_spec_key_name: str | None = None,
training_fraction_split: float | None = None,
validation_fraction_split: float | None = None,
test_fraction_split: float | None = None,
training_filter_split: str | None = None,
validation_filter_split: str | None = None,
test_filter_split: str | None = None,
budget_milli_node_hours: int | None = None,
model_display_name: str | None = None,
model_labels: dict[str, str] | None = None,
disable_early_stopping: bool = False,
sync: bool = True,
parent_model: str | None = None,
is_default_version: bool | None = None,
model_version_aliases: list[str] | None = None,
model_version_description: str | None = None,
) -> tuple[models.Model | None, str]:
"""
Create an AutoML Image Training Job.
:param project_id: Required. Project to run training in.
:param region: Required. Location to run training in.
:param display_name: Required. The user-defined name of this TrainingPipeline.
:param dataset: Required. The dataset within the same Project from which data will be used to train
the Model. The Dataset must use schema compatible with Model being trained, and what is
compatible should be described in the used TrainingPipeline's [training_task_definition]
[google.cloud.aiplatform.v1beta1.TrainingPipeline.training_task_definition]. For tabular
Datasets, all their data is exported to training, to pick and choose from.
:param prediction_type: The type of prediction the Model is to produce, one of:
"classification" - Predict one out of multiple target values is picked for each row.
"object_detection" - Predict a value based on its relation to other values. This type is
available only to columns that contain semantically numeric values, i.e. integers or floating
point number, even if stored as e.g. strings.
:param parent_model: Optional. The resource name or model ID of an existing model.
The new model uploaded by this job will be a version of `parent_model`.
Only set this field when training a new version of an existing model.
:param is_default_version: Optional. When set to True, the newly uploaded model version will
automatically have alias "default" included. Subsequent uses of
the model produced by this job without a version specified will
use this "default" version.
When set to False, the "default" alias will not be moved.
Actions targeting the model version produced by this job will need
to specifically reference this version by ID or alias.
New model uploads, i.e. version 1, will always be "default" aliased.
:param model_version_aliases: Optional. User provided version aliases so that the model version
uploaded by this job can be referenced via alias instead of
auto-generated version ID. A default version alias will be created
for the first version of the model.
The format is [a-z][a-zA-Z0-9-]{0,126}[a-z0-9]
:param model_version_description: Optional. The description of the model version
being uploaded by this job.
:param multi_label: Required. Default is False. If false, a single-label (multi-class) Model will be
trained (i.e. assuming that for each image just up to one annotation may be applicable). If true,
a multi-label Model will be trained (i.e. assuming that for each image multiple annotations may
be applicable).
This is only applicable for the "classification" prediction_type and will be ignored otherwise.
:param model_type: Required. One of the following:
"CLOUD" - Default for Image Classification. A Model best tailored to be used within Google Cloud,
and which cannot be exported.
"CLOUD_HIGH_ACCURACY_1" - Default for Image Object Detection. A model best tailored to be used
within Google Cloud, and which cannot be exported. Expected to have a higher latency, but should
also have a higher prediction quality than other cloud models.
"CLOUD_LOW_LATENCY_1" - A model best tailored to be used within Google Cloud, and which cannot be
exported. Expected to have a low latency, but may have lower prediction quality than other cloud
models.
"MOBILE_TF_LOW_LATENCY_1" - A model that, in addition to being available within Google Cloud, can
also be exported as TensorFlow or Core ML model and used on a mobile or edge device afterwards.
Expected to have low latency, but may have lower prediction quality than other mobile models.
"MOBILE_TF_VERSATILE_1" - A model that, in addition to being available within Google Cloud, can
also be exported as TensorFlow or Core ML model and used on a mobile or edge device with
afterwards.
"MOBILE_TF_HIGH_ACCURACY_1" - A model that, in addition to being available within Google Cloud,
can also be exported as TensorFlow or Core ML model and used on a mobile or edge device
afterwards. Expected to have a higher latency, but should also have a higher prediction quality
than other mobile models.
:param base_model: Optional. Only permitted for Image Classification models. If it is specified, the
new model will be trained based on the `base` model. Otherwise, the new model will be trained
from scratch. The `base` model must be in the same Project and Location as the new Model to
train, and have the same model_type.
:param labels: Optional. The labels with user-defined metadata to organize TrainingPipelines. Label
keys and values can be no longer than 64 characters (Unicode codepoints), can only contain
lowercase letters, numeric characters, underscores and dashes. International characters are
allowed. See https://goo.gl/xmQnxf for more information and examples of labels.
:param training_encryption_spec_key_name: Optional. The Cloud KMS resource identifier of the customer
managed encryption key used to protect the training pipeline. Has the form:
``projects/my-project/locations/my-region/keyRings/my-kr/cryptoKeys/my-key``. The key needs to be
in the same region as where the compute resource is created. If set, this TrainingPipeline will
be secured by this key.
Note: Model trained by this TrainingPipeline is also secured by this key if ``model_to_upload``
is not set separately.
:param model_encryption_spec_key_name: Optional. The Cloud KMS resource identifier of the customer
managed encryption key used to protect the model. Has the form:
``projects/my-project/locations/my-region/keyRings/my-kr/cryptoKeys/my-key``.
The key needs to be in the same region as where the compute resource is created.
If set, the trained Model will be secured by this key.
:param training_fraction_split: Optional. The fraction of the input data that is to be used to train
the Model. This is ignored if Dataset is not provided.
:param validation_fraction_split: Optional. The fraction of the input data that is to be used to
validate the Model. This is ignored if Dataset is not provided.
:param test_fraction_split: Optional. The fraction of the input data that is to be used to evaluate
the Model. This is ignored if Dataset is not provided.
:param training_filter_split: Optional. A filter on DataItems of the Dataset. DataItems that match
this filter are used to train the Model. A filter with same syntax as the one used in
DatasetService.ListDataItems may be used. If a single DataItem is matched by more than one of the
FilterSplit filters, then it is assigned to the first set that applies to it in the training,
validation, test order. This is ignored if Dataset is not provided.
:param validation_filter_split: Optional. A filter on DataItems of the Dataset. DataItems that match
this filter are used to validate the Model. A filter with same syntax as the one used in
DatasetService.ListDataItems may be used. If a single DataItem is matched by more than one of the
FilterSplit filters, then it is assigned to the first set that applies to it in the training,
validation, test order. This is ignored if Dataset is not provided.
:param test_filter_split: Optional. A filter on DataItems of the Dataset. DataItems that match this
filter are used to test the Model. A filter with same syntax as the one used in
DatasetService.ListDataItems may be used. If a single DataItem is matched by more than one of the
FilterSplit filters, then it is assigned to the first set that applies to it in the training,
validation, test order. This is ignored if Dataset is not provided.
:param budget_milli_node_hours: Optional. The train budget of creating this Model, expressed in milli
node hours i.e. 1,000 value in this field means 1 node hour.
Defaults by `prediction_type`:
`classification` - For Cloud models the budget must be: 8,000 - 800,000 milli node hours
(inclusive). The default value is 192,000 which represents one day in wall time, assuming 8 nodes
are used.
`object_detection` - For Cloud models the budget must be: 20,000 - 900,000 milli node hours
(inclusive). The default value is 216,000 which represents one day in wall time, assuming 9 nodes
are used.
The training cost of the model will not exceed this budget. The final cost will be attempted to
be close to the budget, though may end up being (even) noticeably smaller - at the backend's
discretion. This especially may happen when further model training ceases to provide any
improvements. If the budget is set to a value known to be insufficient to train a Model for the
given training set, the training won't be attempted and will error.
:param model_display_name: Optional. The display name of the managed Vertex AI Model. The name can be
up to 128 characters long and can be consist of any UTF-8 characters. If not provided upon
creation, the job's display_name is used.
:param model_labels: Optional. The labels with user-defined metadata to organize your Models. Label
keys and values can be no longer than 64 characters (Unicode codepoints), can only contain
lowercase letters, numeric characters, underscores and dashes. International characters are
allowed. See https://goo.gl/xmQnxf for more information and examples of labels.
:param disable_early_stopping: Required. If true, the entire budget is used. This disables the early
stopping feature. By default, the early stopping feature is enabled, which means that training
might stop before the entire training budget has been used, if further training does no longer
brings significant improvement to the model.
:param sync: Whether to execute this method synchronously. If False, this method will be executed in
concurrent Future and any downstream object will be immediately returned and synced when the
Future has completed.
"""
self._job = self.get_auto_ml_image_training_job(
project=project_id,
location=region,
display_name=display_name,
prediction_type=prediction_type,
multi_label=multi_label,
model_type=model_type,
base_model=base_model,
labels=labels,
training_encryption_spec_key_name=training_encryption_spec_key_name,
model_encryption_spec_key_name=model_encryption_spec_key_name,
)
if not self._job:
raise AirflowException("AutoMLImageTrainingJob was not created")
model = self._job.run(
dataset=dataset,
training_fraction_split=training_fraction_split,
validation_fraction_split=validation_fraction_split,
test_fraction_split=test_fraction_split,
training_filter_split=training_filter_split,
validation_filter_split=validation_filter_split,
test_filter_split=test_filter_split,
budget_milli_node_hours=budget_milli_node_hours,
model_display_name=model_display_name,
model_labels=model_labels,
disable_early_stopping=disable_early_stopping,
sync=sync,
parent_model=parent_model,
is_default_version=is_default_version,
model_version_aliases=model_version_aliases,
model_version_description=model_version_description,
)
training_id = self.extract_training_id(self._job.resource_name)
if model:
model.wait()
else:
self.log.warning(
"Training did not produce a Managed Model returning None. AutoML Image Training "
"Pipeline is not configured to upload a Model."
)
return model, training_id
@GoogleBaseHook.fallback_to_default_project_id
def create_auto_ml_text_training_job(
self,
project_id: str,
region: str,
display_name: str,
dataset: datasets.TextDataset,
prediction_type: str,
multi_label: bool = False,
sentiment_max: int = 10,
labels: dict[str, str] | None = None,
training_encryption_spec_key_name: str | None = None,
model_encryption_spec_key_name: str | None = None,
training_fraction_split: float | None = None,
validation_fraction_split: float | None = None,
test_fraction_split: float | None = None,
training_filter_split: str | None = None,
validation_filter_split: str | None = None,
test_filter_split: str | None = None,
model_display_name: str | None = None,
model_labels: dict[str, str] | None = None,
sync: bool = True,
parent_model: str | None = None,
is_default_version: bool | None = None,
model_version_aliases: list[str] | None = None,
model_version_description: str | None = None,
) -> tuple[models.Model | None, str]:
"""