import atexit
import json
import os
import shutil
import signal
import sys
import threading
import time
from argparse import ArgumentParser
from logging import getLogger
from operator import attrgetter
from tempfile import mkstemp, mkdtemp
from zipfile import ZipFile, ZIP_DEFLATED
try:
# noinspection PyCompatibility
from collections.abc import Sequence as CollectionsSequence
except ImportError:
from collections import Sequence as CollectionsSequence
from typing import Optional, Union, Mapping, Sequence, Any, Dict, Iterable, TYPE_CHECKING, Callable, Tuple, List
import psutil
import six
from pathlib2 import Path
from .backend_config.defs import get_active_config_file, get_config_file
from .backend_api.services import tasks, projects
from .backend_api.session.session import (
Session, ENV_ACCESS_KEY, ENV_SECRET_KEY, ENV_HOST, ENV_WEB_HOST, ENV_FILES_HOST, )
from .backend_api.session.defs import ENV_DEFERRED_TASK_INIT
from .backend_interface.metrics import Metrics
from .backend_interface.model import Model as BackendModel
from .backend_interface.task import Task as _Task
from .backend_interface.task.log import TaskHandler
from .backend_interface.task.development.worker import DevWorker
from .backend_interface.task.repo import ScriptInfo
from .backend_interface.task.models import TaskModels
from .backend_interface.util import (
get_single_result,
exact_match_regex,
make_message,
mutually_exclusive,
get_queue_id,
get_or_create_project,
)
from .binding.absl_bind import PatchAbsl
from .binding.artifacts import Artifacts, Artifact
from .binding.environ_bind import EnvironmentBind, PatchOsFork
from .binding.frameworks.fastai_bind import PatchFastai
from .binding.frameworks.lightgbm_bind import PatchLIGHTgbmModelIO
from .binding.frameworks.pytorch_bind import PatchPyTorchModelIO
from .binding.frameworks.tensorflow_bind import TensorflowBinding
from .binding.frameworks.xgboost_bind import PatchXGBoostModelIO
from .binding.frameworks.catboost_bind import PatchCatBoostModelIO
from .binding.frameworks.megengine_bind import PatchMegEngineModelIO
from .binding.joblib_bind import PatchedJoblib
from .binding.matplotlib_bind import PatchedMatplotlib
from .binding.hydra_bind import PatchHydra
from .binding.click_bind import PatchClick
from .binding.fire_bind import PatchFire
from .binding.jsonargs_bind import PatchJsonArgParse
from .binding.frameworks import WeightsFileHandler
from .config import (
config, DEV_TASK_NO_REUSE, get_is_master_node, DEBUG_SIMULATE_REMOTE_TASK, DEV_DEFAULT_OUTPUT_URI,
deferred_config, TASK_SET_ITERATION_OFFSET, )
from .config import running_remotely, get_remote_task_id
from .config.cache import SessionCache
from .debugging.log import LoggerRoot
from .errors import UsageError
from .logger import Logger
from .model import Model, InputModel, OutputModel, Framework
from .task_parameters import TaskParameters
from .utilities.config import verify_basic_value
from .binding.args import (
argparser_parseargs_called, get_argparser_last_args,
argparser_update_currenttask, )
from .utilities.dicts import ReadOnlyDict, merge_dicts
from .utilities.proxy_object import (
ProxyDictPreWrite, ProxyDictPostWrite, flatten_dictionary,
nested_from_flat_dictionary, naive_nested_from_flat_dictionary, )
from .utilities.resource_monitor import ResourceMonitor
from .utilities.seed import make_deterministic
from .utilities.lowlevel.threads import get_current_thread_id
from .utilities.process.mp import BackgroundMonitor, leave_process
from .utilities.matching import matches_any_wildcard
from .utilities.parallel import FutureTaskCaller
# noinspection PyProtectedMember
from .backend_interface.task.args import _Arguments
if TYPE_CHECKING:
import pandas
import numpy
from PIL import Image
class Task(_Task):
"""
The ``Task`` class is a code template for a Task object which, together with its connected experiment components,
represents the current running experiment. These connected components include hyperparameters, loggers,
configuration, label enumeration, models, and other artifacts.
The term "main execution Task" refers to the Task context for the current running experiment. Python experiment scripts
can create one, and only one, main execution Task. It is traceable, and after a script runs and ClearML stores
the Task in the **ClearML Server** (backend), it is modifiable, reproducible, executable by a worker, and you
can duplicate it for further experimentation.
The ``Task`` class and its methods allow you to create and manage experiments, as well as perform
advanced experimentation functions, such as autoML.
.. warning::
Do not construct Task objects directly. Use one of the methods listed below to create experiments or
reference existing experiments.
Do not define `CLEARML_TASK_*` and `CLEARML_PROC_*` OS environments, they are used internally
for bookkeeping between processes and agents.
For detailed information about creating Task objects, see the following methods:
- Create a new reproducible Task - :meth:`Task.init`
.. important::
In some cases, ``Task.init`` may return a Task object which is already stored in **ClearML Server** (already
initialized), instead of creating a new Task. For a detailed explanation of those cases, see the ``Task.init``
method.
- Manually create a new Task (no auto-logging will apply) - :meth:`Task.create`
- Get the current running Task - :meth:`Task.current_task`
- Get another (different) Task - :meth:`Task.get_task`
.. note::
The **ClearML** documentation often refers to a Task as "Task (experiment)".
"Task" refers to the class in the ClearML Python Client Package, the object in your Python experiment script,
and the entity with which **ClearML Server** and **ClearML Agent** work.
"Experiment" refers to your deep learning solution, including its connected components, inputs, and outputs,
and is the experiment you can view, analyze, compare, modify, duplicate, and manage using the ClearML
**Web-App** (UI).
Therefore, a "Task" is effectively an "experiment", and "Task (experiment)" encompasses its usage throughout
ClearML.
The exception to this Task behavior is sub-tasks (non-reproducible Tasks), which do not use the main execution
Task. Creating a sub-task always creates a new Task with a new Task ID.
"""
TaskTypes = _Task.TaskTypes
NotSet = object()
__create_protection = object()
__main_task = None # type: Optional[Task]
__exit_hook = None
__forked_proc_main_pid = None
__task_id_reuse_time_window_in_hours = deferred_config('development.task_reuse_time_window_in_hours', 24.0, float)
__detect_repo_async = deferred_config('development.vcs_repo_detect_async', False)
__default_output_uri = DEV_DEFAULT_OUTPUT_URI.get() or deferred_config('development.default_output_uri', None)
class _ConnectedParametersType(object):
argparse = "argument_parser"
dictionary = "dictionary"
task_parameters = "task_parameters"
@classmethod
def _options(cls):
return {
var for var, val in vars(cls).items()
if isinstance(val, six.string_types)
}
def __init__(self, private=None, **kwargs):
"""
.. warning::
**Do not construct Task manually!**
Please use :meth:`Task.init` or :meth:`Task.get_task`
"""
if private is not Task.__create_protection:
raise UsageError(
'Task object cannot be instantiated externally, use Task.current_task() or Task.get_task(...)')
self._repo_detect_lock = threading.RLock()
super(Task, self).__init__(**kwargs)
self._arguments = _Arguments(self)
self._logger = None
self._connected_output_model = None
self._dev_worker = None
self._connected_parameter_type = None
self._detect_repo_async_thread = None
self._resource_monitor = None
self._calling_filename = None
self._remote_functions_generated = {}
# register atexit, so that we mark the task as stopped
self._at_exit_called = False
@classmethod
def current_task(cls):
# type: () -> Task
"""
Get the current running Task (experiment). This is the main execution Task (task context) returned as a Task
object.
:return: The current running Task (experiment).
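For example (an illustrative sketch; assumes ``Task.init`` was already called in this process):
.. code-block:: py
task = Task.current_task()
if task is not None:
task.get_logger().report_text('fetched the main execution Task')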
"""
# check if we have no main Task, but the main process created one.
if not cls.__main_task and cls.__get_master_id_task_id():
# initialize the Task, connect to stdout
cls.init()
# return main Task
return cls.__main_task
@classmethod
def init(
cls,
project_name=None, # type: Optional[str]
task_name=None, # type: Optional[str]
task_type=TaskTypes.training, # type: Task.TaskTypes
tags=None, # type: Optional[Sequence[str]]
reuse_last_task_id=True, # type: Union[bool, str]
continue_last_task=False, # type: Union[bool, str, int]
output_uri=None, # type: Optional[Union[str, bool]]
auto_connect_arg_parser=True, # type: Union[bool, Mapping[str, bool]]
auto_connect_frameworks=True, # type: Union[bool, Mapping[str, Union[bool, str, list]]]
auto_resource_monitoring=True, # type: bool
auto_connect_streams=True, # type: Union[bool, Mapping[str, bool]]
deferred_init=False, # type: bool
):
# type: (...) -> Task
"""
Creates a new Task (experiment) if:
- The Task never ran before. No Task with the same ``task_name`` and ``project_name`` is stored in
**ClearML Server**.
- The Task has run before (the same ``task_name`` and ``project_name``), and (a) it stored models and / or
artifacts, or (b) its status is Published, or (c) it is Archived.
- A new Task is forced by calling ``Task.init`` with ``reuse_last_task_id=False``.
Otherwise, the already initialized Task object for the same ``task_name`` and ``project_name`` is returned.
.. note::
To reference another Task, instead of initializing the same Task more than once, call
:meth:`Task.get_task`. For example, to "share" the same experiment in more than one script,
call ``Task.get_task``. See the ``Task.get_task`` method for an example.
For example:
The first time the following code runs, it will create a new Task. The status will be Completed.
.. code-block:: py
from clearml import Task
task = Task.init('myProject', 'myTask')
If this code runs again, it will not create a new Task. It does not store a model or artifact,
it is not Published (its status is Completed), it was not Archived, and a new Task is not forced.
If the Task is Published or Archived, and run again, it will create a new Task with a new Task ID.
The following code will create a new Task every time it runs, because it stores an artifact.
.. code-block:: py
task = Task.init('myProject', 'myOtherTask')
d = {'a': '1'}
task.upload_artifact('myArtifact', d)
:param str project_name: The name of the project in which the experiment will be created. If the project does
not exist, it is created. If ``project_name`` is ``None``, the repository name is used. (Optional)
:param str task_name: The name of Task (experiment). If ``task_name`` is ``None``, the Python experiment
script's file name is used. (Optional)
:param TaskTypes task_type: The task type.
Valid task types:
- ``TaskTypes.training`` (default)
- ``TaskTypes.testing``
- ``TaskTypes.inference``
- ``TaskTypes.data_processing``
- ``TaskTypes.application``
- ``TaskTypes.monitor``
- ``TaskTypes.controller``
- ``TaskTypes.optimizer``
- ``TaskTypes.service``
- ``TaskTypes.qc``
- ``TaskTypes.custom``
:param tags: Add a list of tags (str) to the created Task. For example: tags=['512x512', 'yolov3']
:param bool reuse_last_task_id: Force a new Task (experiment) with a previously used Task ID,
and the same project and Task name.
.. note::
If the previously executed Task has artifacts or models, it will not be reused (overwritten)
and a new Task will be created.
When a Task is reused, the previous execution outputs are deleted, including console outputs and logs.
The values are:
- ``True`` - Reuse the last Task ID. (default)
- ``False`` - Force a new Task (experiment).
- A string - You can also specify a Task ID (string) to be reused,
instead of the cached ID based on the project/name combination.
:param bool continue_last_task: Continue the execution of a previously executed Task (experiment)
.. note::
When continuing the execution of a previously executed Task,
all previous artifacts / models / logs remain intact.
New logs will continue iteration/step based on the previous-execution maximum iteration value.
For example:
The last train/loss scalar reported was iteration 100, the next report will be iteration 101.
The values are:
- ``True`` - Continue the last Task ID,
specified explicitly by reuse_last_task_id or implicitly with the same logic as reuse_last_task_id
- ``False`` - Overwrite the execution of the previous Task (default).
- A string - You can also specify a Task ID (string) to be continued.
This is equivalent to `continue_last_task=True` and `reuse_last_task_id=a_task_id_string`.
- An integer - Specify an initial iteration offset (overrides the automatic last_iteration_offset).
Pass 0 to disable the automatic last_iteration_offset, or specify a different initial offset.
You can specify a Task ID to be used with `reuse_last_task_id='task_id_here'`
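For example (an illustrative sketch; the project and task names are placeholders):
.. code-block:: py
# continue the previous run of this project/name combination; new reports
# resume iteration counting from the previous maximum iteration
task = Task.init('myProject', 'myTask', continue_last_task=True)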
:param str output_uri: The default location for output models and other artifacts.
If True is passed, the default files_server will be used for model storage.
In the default location, ClearML creates a subfolder for the output.
The subfolder structure is the following:
<output destination name> / <project name> / <task name>.<Task ID>
The following are examples of ``output_uri`` values for the supported locations:
- A shared folder: ``/mnt/share/folder``
- S3: ``s3://bucket/folder``
- Google Cloud Storage: ``gs://bucket-name/folder``
- Azure Storage: ``azure://company.blob.core.windows.net/folder/``
- Default file server: True
.. important::
For cloud storage, you must install the **ClearML** package for your cloud storage type,
and then configure your storage credentials. For detailed information, see
`ClearML Python Client Extras <./references/clearml_extras_storage/>`_ in the "ClearML Python Client
Reference" section.
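For example (an illustrative sketch; the bucket and folder names are placeholders):
.. code-block:: py
# upload output models and artifacts to an S3 bucket by default
task = Task.init('myProject', 'myTask', output_uri='s3://my-bucket/experiments')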
:param auto_connect_arg_parser: Automatically connect an argparse object to the Task. Supported argument
parsers packages are: argparse, click, python-fire, jsonargparse.
The values are:
- ``True`` - Automatically connect. (default)
- ``False`` - Do not automatically connect.
- A dictionary - In addition to a boolean, you can use a dictionary for fine-grained control of connected
arguments. The dictionary keys are argparse variable names and the values are booleans.
The ``False`` value excludes the specified argument from the Task's parameter section.
Keys missing from the dictionary default to ``True``; to change that default, add a ``*`` key set to ``False``.
An empty dictionary defaults to ``False``.
For example:
.. code-block:: py
auto_connect_arg_parser={'do_not_include_me': False, }
.. code-block:: py
auto_connect_arg_parser={"only_include_me": True, "*": False}
.. note::
To manually connect an argparse, use :meth:`Task.connect`.
:param auto_connect_frameworks: Automatically connect frameworks. This includes patching MatplotLib, XGBoost,
scikit-learn, Keras callbacks, and TensorBoard/X to serialize plots, graphs, and the model location to
the **ClearML Server** (backend), in addition to the original output destination.
The values are:
- ``True`` - Automatically connect (default)
- ``False`` - Do not automatically connect
- A dictionary - In addition to a boolean, you can use a dictionary for fine-grained control of connected
frameworks. The dictionary keys are framework names; the values are booleans, other dictionaries used for
finer control, or wildcard strings.
In case of wildcard strings, the local path of a model file has to match at least one wildcard to be
saved/loaded by ClearML. Example:
{'pytorch' : '*.pt', 'tensorflow': ['*.h5', '*']}
Keys missing from the dictionary default to ``True``, and an empty dictionary defaults to ``False``.
Supported keys for finer control:
{'tensorboard': {'report_hparams': bool}} # whether to report TensorBoard hyperparameters
For example:
.. code-block:: py
auto_connect_frameworks={
'matplotlib': True, 'tensorflow': ['*.hdf5', 'something_else*'], 'tensorboard': True,
'pytorch': ['*.pt'], 'xgboost': True, 'scikit': True, 'fastai': True,
'lightgbm': True, 'hydra': True, 'detect_repository': True, 'tfdefines': True,
'joblib': True, 'megengine': True, 'catboost': True
}
.. code-block:: py
auto_connect_frameworks={'tensorboard': {'report_hparams': False}}
:param bool auto_resource_monitoring: Automatically create machine resource monitoring plots
These plots appear in the **ClearML Web-App (UI)**, **RESULTS** tab, **SCALARS** sub-tab,
with a title of **:resource monitor:**.
The values are:
- ``True`` - Automatically create resource monitoring plots. (default)
- ``False`` - Do not automatically create.
- Class Type - Create ResourceMonitor object of the specified class type.
:param auto_connect_streams: Control the automatic logging of stdout and stderr
The values are:
- ``True`` - Automatically connect (default)
- ``False`` - Do not automatically connect
- A dictionary - In addition to a boolean, you can use a dictionary for fine-grained control of stdout and
stderr. The dictionary keys are 'stdout', 'stderr' and 'logging'; the values are booleans.
Keys missing from the dictionary default to ``False``, and an empty dictionary defaults to ``False``.
Notice: the default behaviour is to log stdout/stderr; output from the
`logging` module is captured as a by-product of the stderr logging.
For example:
.. code-block:: py
auto_connect_streams={'stdout': True, 'stderr': True, 'logging': False}
:param deferred_init: (default: False) Wait for Task to be fully initialized (regular behaviour).
** BETA feature! use with care **
If set to True, the `Task.init` function returns immediately and all initialization / communication
with the clearml-server runs in a background thread. The returned object is
a full proxy to the regular Task object, hence everything will work as expected.
Default behaviour can be controlled with:
`CLEARML_DEFERRED_TASK_INIT=1`
Notes:
- Any access to the returned proxy `Task` object will essentially wait for `Task.init`
to complete. For example: `print(task.name)` will wait for `Task.init` to complete in the
background and then return the `name` property of the original Task object
- Before `Task.init` completes in the background, auto-magic logging
(console/metric) might be missed
- If running via an agent, this argument is ignored,
and Task init is called synchronously (default)
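For example (an illustrative sketch of the deferred flow; the names are placeholders):
.. code-block:: py
# returns immediately; Task creation continues in a background thread
task = Task.init('myProject', 'myTask', deferred_init=True)
# the first attribute access blocks until the background init completes
print(task.id)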
:return: The main execution Task (Task context)
"""
def verify_defaults_match():
validate = [
('project name', project_name, cls.__main_task.get_project_name()),
('task name', task_name, cls.__main_task.name),
('task type', str(task_type) if task_type else task_type, str(cls.__main_task.task_type)),
]
for field, default, current in validate:
if default is not None and default != current:
raise UsageError(
"Current task already created "
"and requested {field} '{default}' does not match current {field} '{current}'. "
"If you wish to create additional tasks use `Task.create`, "
"or close the current task with `task.close()` before calling `Task.init(...)`".format(
field=field,
default=default,
current=current,
)
)
# if deferred_init==0, this is the nested call that actually performs the Task.init
if cls.__main_task is not None and deferred_init != 0:
# if this is a subprocess, regardless of what the init was called for,
# we have to fix the main task hooks and stdout bindings
if cls.__forked_proc_main_pid != os.getpid() and cls.__is_subprocess():
if task_type is None:
task_type = cls.__main_task.task_type
# make sure we only do it once per process
cls.__forked_proc_main_pid = os.getpid()
# make sure we do not wait for the repo detect thread
cls.__main_task._detect_repo_async_thread = None
cls.__main_task._dev_worker = None
cls.__main_task._resource_monitor = None
# remove the logger from the previous process
cls.__main_task.get_logger()
# create a new logger (to catch stdout/err)
cls.__main_task._logger = None
cls.__main_task.__reporter = None
# noinspection PyProtectedMember
cls.__main_task._get_logger(auto_connect_streams=auto_connect_streams)
cls.__main_task._artifacts_manager = Artifacts(cls.__main_task)
# unregister signal hooks, they cause subprocess to hang
# noinspection PyProtectedMember
cls.__main_task.__register_at_exit(cls.__main_task._at_exit)
# TODO: Check if the signal handler method is safe enough, for the time being, do not unhook
# cls.__main_task.__register_at_exit(None, only_remove_signal_and_exception_hooks=True)
# start all reporting threads
BackgroundMonitor.start_all(task=cls.__main_task)
if not running_remotely():
verify_defaults_match()
return cls.__main_task
is_sub_process_task_id = None
# check that we are not a child process; in that case do nothing.
# we should not get here unless this is a Windows/macOS platform; Linux supports fork
if cls.__is_subprocess():
class _TaskStub(object):
def __call__(self, *args, **kwargs):
return self
def __getattr__(self, attr):
return self
def __setattr__(self, attr, val):
pass
is_sub_process_task_id = cls.__get_master_id_task_id()
# we could not find a task ID, revert to old stub behaviour
if not is_sub_process_task_id:
return _TaskStub() # noqa
elif running_remotely() and not get_is_master_node():
# make sure we only do it once per process
cls.__forked_proc_main_pid = os.getpid()
# make sure everyone understands we should act as if we are a subprocess (fake pid 1)
cls.__update_master_pid_task(pid=1, task=get_remote_task_id())
else:
# set us as master process (without task ID)
cls.__update_master_pid_task()
is_sub_process_task_id = None
if task_type is None:
# Backwards compatibility: if called from Task.current_task and task_type
# was not specified, keep legacy default value of TaskTypes.training
task_type = cls.TaskTypes.training
elif isinstance(task_type, six.string_types):
if task_type not in Task.TaskTypes.__members__:
raise ValueError("Task type '{}' not supported, options are: {}".format(
task_type, Task.TaskTypes.__members__.keys()))
task_type = Task.TaskTypes.__members__[str(task_type)]
is_deferred = False
try:
if not running_remotely():
# only allow if running locally and creating the first Task
# otherwise we ignore and perform in order
if deferred_init != 0 and ENV_DEFERRED_TASK_INIT.get():
deferred_init = True
if not is_sub_process_task_id and deferred_init:
def completed_cb(x):
Task.__main_task = x
getLogger().warning("ClearML initializing Task in the background")
task = FutureTaskCaller(
func=cls.init,
func_cb=completed_cb,
override_cls=cls,
project_name=project_name,
task_name=task_name,
tags=tags,
reuse_last_task_id=reuse_last_task_id,
continue_last_task=continue_last_task,
output_uri=output_uri,
auto_connect_arg_parser=auto_connect_arg_parser,
auto_connect_frameworks=auto_connect_frameworks,
auto_resource_monitoring=auto_resource_monitoring,
auto_connect_streams=auto_connect_streams,
deferred_init=0, # notice we use it as a flag to mark the nested call
)
is_deferred = True
# mark as temp master
cls.__update_master_pid_task()
# if this is the main process, create the task
elif not is_sub_process_task_id:
task = cls._create_dev_task(
default_project_name=project_name,
default_task_name=task_name,
default_task_type=task_type,
tags=tags,
reuse_last_task_id=reuse_last_task_id,
continue_last_task=continue_last_task,
detect_repo=False if (
isinstance(auto_connect_frameworks, dict) and
not auto_connect_frameworks.get('detect_repository', True)) else True,
auto_connect_streams=auto_connect_streams,
)
# set defaults
if cls._offline_mode:
task.output_uri = None
elif output_uri:
task.output_uri = output_uri
elif cls.__default_output_uri:
task.output_uri = cls.__default_output_uri
# store new task ID
cls.__update_master_pid_task(task=task)
else:
# subprocess should get back the task info
task = cls.get_task(task_id=is_sub_process_task_id)
else:
# if this is the main process, create the task
if not is_sub_process_task_id:
task = cls(
private=cls.__create_protection,
task_id=get_remote_task_id(),
log_to_backend=False,
)
if cls.__default_output_uri and not task.output_uri:
task.output_uri = cls.__default_output_uri
# store new task ID
cls.__update_master_pid_task(task=task)
# make sure we are started
task.started(ignore_errors=True)
# continue last iteration if we had any
if task.data.last_iteration:
task.set_initial_iteration(int(task.data.last_iteration) + 1)
else:
# subprocess should get back the task info
task = cls.get_task(task_id=is_sub_process_task_id)
except Exception:
raise
else:
Task.__main_task = task
# register at exit only on the real (non-deferred) Task
if not is_deferred:
# register the main task for at exit hooks (there should only be one)
task.__register_at_exit(task._at_exit)
# always patch OS forking because of ProcessPool and the alike
PatchOsFork.patch_fork(task)
if auto_connect_frameworks:
def should_connect(*keys):
"""
Evaluates value of auto_connect_frameworks[keys[0]]...[keys[-1]].
If at some point in the evaluation, the value of auto_connect_frameworks[keys[0]]...[keys[-1]]
is a bool, that value will be returned. If a dictionary is empty, it will be evaluated to False.
If a key is not found in the current dictionary, True will be returned.
"""
should_bind_framework = auto_connect_frameworks
for key in keys:
if not isinstance(should_bind_framework, dict):
return bool(should_bind_framework)
if should_bind_framework == {}:
return False
should_bind_framework = should_bind_framework.get(key, True)
return bool(should_bind_framework)
if not is_deferred and should_connect("hydra"):
PatchHydra.update_current_task(task)
if should_connect("scikit") and should_connect("joblib"):
PatchedJoblib.update_current_task(task)
if should_connect("matplotlib"):
PatchedMatplotlib.update_current_task(task)
if should_connect("tensorflow") or should_connect("tensorboard"):
# allow disabling tfdefines
if not is_deferred and should_connect("tfdefines"):
PatchAbsl.update_current_task(task)
TensorflowBinding.update_current_task(
task,
patch_reporting=should_connect("tensorboard"),
patch_model_io=should_connect("tensorflow"),
report_hparams=should_connect("tensorboard", "report_hparams"),
)
if should_connect("pytorch"):
PatchPyTorchModelIO.update_current_task(task)
if should_connect("megengine"):
PatchMegEngineModelIO.update_current_task(task)
if should_connect("xgboost"):
PatchXGBoostModelIO.update_current_task(task)
if should_connect("catboost"):
PatchCatBoostModelIO.update_current_task(task)
if should_connect("fastai"):
PatchFastai.update_current_task(task)
if should_connect("lightgbm"):
PatchLIGHTgbmModelIO.update_current_task(task)
cls.__add_model_wildcards(auto_connect_frameworks)
# if we are deferred, stop here (the rest we do in the actual init)
if is_deferred:
from .backend_interface.logger import StdStreamPatch
# patch console outputs, we will keep them in memory until we complete the Task init
# notice we do not load config defaults, as they are not threadsafe
# we might also need to override them with the vault
StdStreamPatch.patch_std_streams(
task.get_logger(),
connect_stdout=(
auto_connect_streams is True) or (
isinstance(auto_connect_streams, dict) and auto_connect_streams.get('stdout', False)
),
connect_stderr=(
auto_connect_streams is True) or (
isinstance(auto_connect_streams, dict) and auto_connect_streams.get('stderr', False)
),
load_config_defaults=False,
)
return task # noqa
if auto_resource_monitoring and not is_sub_process_task_id:
resource_monitor_cls = auto_resource_monitoring \
if isinstance(auto_resource_monitoring, six.class_types) else ResourceMonitor
task._resource_monitor = resource_monitor_cls(
task, report_mem_used_per_process=not config.get(
'development.worker.report_global_mem_used', False))
task._resource_monitor.start()
# make sure all random generators are initialized with new seed
random_seed = task.get_random_seed()
if random_seed is not None:
make_deterministic(random_seed)
task._set_random_seed_used(random_seed)
if auto_connect_arg_parser:
EnvironmentBind.update_current_task(task)
PatchJsonArgParse.update_current_task(task)
# Patch ArgParser to be aware of the current task
argparser_update_currenttask(task)
PatchClick.patch(task)
PatchFire.patch(task)
# set excluded arguments
if isinstance(auto_connect_arg_parser, dict):
task._arguments.exclude_parser_args(auto_connect_arg_parser)
# Check if parse args already called. If so, sync task parameters with parser
if argparser_parseargs_called():
for parser, parsed_args in get_argparser_last_args():
task._connect_argparse(parser=parser, parsed_args=parsed_args)
elif argparser_parseargs_called():
# actually we have nothing to do; in remote running, the argparser will ignore
# all non-argparser parameters. The only caveat is a parameter connected with the same name
# as an argparser argument; this will be solved once sections are introduced to parameters
pass
# Make sure we start the logger, it will patch the main logging object and pipe all output
# if we are running locally and using development mode worker, we will pipe all stdout to logger.
# The logger will automatically take care of all patching (we just need to make sure to initialize it)
logger = task._get_logger(auto_connect_streams=auto_connect_streams)
# show the debug metrics page in the log, it is very convenient
if not is_sub_process_task_id:
if cls._offline_mode:
logger.report_text('ClearML running in offline mode, session stored in {}'.format(
task.get_offline_mode_folder()))
else:
logger.report_text('ClearML results page: {}'.format(task.get_output_log_web_page()))
# Make sure we start the dev worker if required, otherwise it will only be started when we write
# something to the log.
task._dev_mode_setup_worker()
if (not task._reporter or not task._reporter.is_constructed()) and \
is_sub_process_task_id and not cls._report_subprocess_enabled:
task._setup_reporter()
# start monitoring in a background process or background threads
# the monitors are: the Resource monitoring and Dev Worker monitoring classes
BackgroundMonitor.start_all(task=task)
task.set_progress(0)
return task
@classmethod
def create(
cls,
project_name=None, # type: Optional[str]
task_name=None, # type: Optional[str]
task_type=None, # type: Optional[str]
repo=None, # type: Optional[str]
branch=None, # type: Optional[str]
commit=None, # type: Optional[str]
script=None, # type: Optional[str]
working_directory=None, # type: Optional[str]
packages=None, # type: Optional[Union[bool, Sequence[str]]]
requirements_file=None, # type: Optional[Union[str, Path]]
docker=None, # type: Optional[str]
docker_args=None, # type: Optional[str]
docker_bash_setup_script=None, # type: Optional[str]
argparse_args=None, # type: Optional[Sequence[Tuple[str, str]]]
base_task_id=None, # type: Optional[str]
add_task_init_call=True, # type: bool
):
# type: (...) -> Task
"""
Manually create and populate a new Task (experiment) in the system.
If the code does not already contain a call to ``Task.init``, pass add_task_init_call=True,
and the code will be patched in remote execution (i.e. when executed by `clearml-agent`).
.. note::
This method **always** creates a new Task.
Use :meth:`Task.init` method to automatically create and populate task for the running process.
To reference an existing Task, call the :meth:`Task.get_task` method.
:param project_name: Set the project name for the task. Required if base_task_id is None.
:param task_name: Set the name of the remote task. Required if base_task_id is None.
:param task_type: Optional, The task type to be created. Supported values: 'training', 'testing', 'inference',
'data_processing', 'application', 'monitor', 'controller', 'optimizer', 'service', 'qc', 'custom'
:param repo: Remote URL for the repository to use, or path to local copy of the git repository
Example: 'https://github.com/allegroai/clearml.git' or '~/project/repo'
:param branch: Select specific repository branch/tag (implies the latest commit from the branch)
:param commit: Select specific commit id to use (default: latest commit,
or when used with local repository matching the local commit id)
:param script: Specify the entry point script for the remote execution. When used in tandem with
remote git repository the script should be a relative path inside the repository,
for example: './source/train.py' . When used with local repository path it supports a
direct path to a file inside the local repository itself, for example: '~/project/source/train.py'
:param working_directory: Working directory to launch the script from. Default: repository root folder.
Relative to repo root or local folder.
:param packages: Manually specify a list of required packages. Example: ["tqdm>=2.1", "scikit-learn"]
or `True` to automatically create requirements
based on locally installed packages (repository must be local).
:param requirements_file: Specify requirements.txt file to install when setting the session.
If not provided, the requirements.txt from the repository will be used.
:param docker: Select the docker image to be executed in by the remote session
:param docker_args: Add docker arguments, pass a single string
:param docker_bash_setup_script: Add bash script to be executed
inside the docker before setting up the Task's environment
:param argparse_args: Arguments to pass to the remote execution, list of string pairs (argument, value)
Notice, only supported if the codebase itself uses argparse.ArgumentParser
:param base_task_id: Use a pre-existing task in the system, instead of a local repo/script.
Essentially clones an existing task and overrides arguments/requirements.
:param add_task_init_call: If True, a 'Task.init()' call is added to the script entry point in remote execution.
:return: The newly created Task (experiment)
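For example (an illustrative sketch; the project name, task name, and script path are placeholders):
.. code-block:: py
task = Task.create(
project_name='myProject',
task_name='remote_training',
repo='https://github.com/allegroai/clearml.git',
script='examples/reporting/scalar_reporting.py',
packages=['clearml'],
)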
"""
if not project_name and not base_task_id:
if not cls.__main_task:
raise ValueError("Please provide project_name, no global task context found "
"(Task.current_task hasn't been called)")
project_name = cls.__main_task.get_project_name()
from .backend_interface.task.populate import CreateAndPopulate
manual_populate = CreateAndPopulate(
project_name=project_name, task_name=task_name, task_type=task_type,
repo=repo, branch=branch, commit=commit,
script=script, working_directory=working_directory,
packages=packages, requirements_file=requirements_file,
docker=docker, docker_args=docker_args, docker_bash_setup_script=docker_bash_setup_script,
base_task_id=base_task_id,
add_task_init_call=add_task_init_call,
raise_on_missing_entries=False,
)
task = manual_populate.create_task()
if task and argparse_args:
manual_populate.update_task_args(argparse_args)
task.reload()
return task
@classmethod
def get_task(
cls,
task_id=None, # type: Optional[str]
project_name=None, # type: Optional[str]
task_name=None, # type: Optional[str]
tags=None, # type: Optional[Sequence[str]]
allow_archived=True, # type: bool
task_filter=None # type: Optional[dict]
):
# type: (...) -> "Task"
"""
Get a Task by Id, or project name / task name combination.
For example:
The following code demonstrates calling ``Task.get_task`` to report a scalar to another Task. The output
of :meth:`.Logger.report_scalar` from testing is associated with the Task named ``training``. It allows
training and testing to run concurrently, because they initialized different Tasks (see :meth:`Task.init`
for information about initializing Tasks).
The training script:
.. code-block:: py
# initialize the training Task
task = Task.init('myProject', 'training')
# do some training
The testing script:
.. code-block:: py
# initialize the testing Task
task = Task.init('myProject', 'testing')
# get the training Task
train_task = Task.get_task(project_name='myProject', task_name='training')
# report metrics in the training Task
for x in range(10):
train_task.get_logger().report_scalar('title', 'series', value=x * 2, iteration=x)
:param str task_id: The Id (system UUID) of the experiment to get.
If specified, ``project_name`` and ``task_name`` are ignored.
:param str project_name: The project name of the Task to get.
:param str task_name: The name of the Task within ``project_name`` to get.
:param list tags: Filter based on the requested list of tags (strings) (Task must have all the listed tags)
To exclude a tag add "-" prefix to the tag. Example: ["best", "-debug"]
:param bool allow_archived: Only applicable if *not* using a specific ``task_id``.
If True (default), allow returning archived Tasks; if False, filter out archived Tasks.
:param dict task_filter: Only applicable if *not* using a specific ``task_id``.
Pass additional query filters, on top of project/name. See details in Task.get_tasks.
:return: The Task specified by ID, or project name / experiment name combination.
"""
return cls.__get_task(
task_id=task_id, project_name=project_name, task_name=task_name, tags=tags,
include_archived=allow_archived, task_filter=task_filter,
)
@classmethod
def get_tasks(
cls,
task_ids=None, # type: Optional[Sequence[str]]
project_name=None, # type: Optional[Union[Sequence[str],str]]
task_name=None, # type: Optional[str]
tags=None, # type: Optional[Sequence[str]]
task_filter=None # type: Optional[Dict]
):
# type: (...) -> List["Task"]
"""
Get a list of Task objects matching the queries/filters:
- A list of specific Task IDs.
- Filter Tasks based on specific fields:
project name (including partial match), task name (including partial match), tags
Apply additional advanced filtering with `task_filter`.
.. note::
This function returns the most recent 500 tasks. If you wish to retrieve older tasks
use ``Task.query_tasks()``
:param list(str) task_ids: The Ids (system UUID) of experiments to get.
If ``task_ids`` specified, then ``project_name`` and ``task_name`` are ignored.
:param str project_name: The project name of the Tasks to get. To get the experiment
in all projects, use the default value of ``None``. (Optional)
Use a list of strings for multiple optional project names.
:param str task_name: The full name or partial name of the Tasks to match within the specified
``project_name`` (or all projects if ``project_name`` is ``None``).
This method supports regular expressions for name matching. (Optional)
To match an exact task name (i.e. not partial matching),
add ^/$ at the beginning/end of the string, for example: "^exact_task_name_here$"
:param list tags: Filter based on the requested list of tags (strings) (Task must have all the listed tags)
To exclude a tag add "-" prefix to the tag. Example: ["best", "-debug"]
:param dict task_filter: filter and order Tasks. See service.tasks.GetAllRequest for details
`parent`: (str) filter by parent task-id matching
`search_text`: (str) free text search (in task fields comment/name/id)
`status`: List[str] List of valid statuses
(options are: "created", "queued", "in_progress", "stopped", "published", "publishing", "closed",
"failed", "completed", "unknown")
`type`: List[str] List of valid task type
(options are: 'training', 'testing', 'inference', 'data_processing', 'application', 'monitor',
'controller', 'optimizer', 'service', 'qc', 'custom')
`user`: List[str] Filter based on Task's user owner, provide list of valid user Ids.
`order_by`: List[str] List of field names to order by. When search_text is used,
Use '-' prefix to specify descending order. Optional, recommended when using page
Example: order_by=['-last_update']
`_all_`: dict(fields=[], pattern='') Match string `pattern` (regular expression)
appearing in All `fields`
dict(fields=['script.repository'], pattern='github.com/user')
`_any_`: dict(fields=[], pattern='') Match string `pattern` (regular expression)
appearing in Any of the `fields`
dict(fields=['comment', 'name'], pattern='my comment')
Examples:
{'status': ['stopped'], 'order_by': ["-last_update"]}
{'order_by': ['-last_update'], '_all_': dict(fields=['script.repository'], pattern='github.com/user')}
:return: The Tasks specified by the parameter combinations (see the parameters).
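For example (an illustrative sketch; the project and task names are placeholders):
.. code-block:: py
# fetch completed Tasks in 'myProject' whose name contains 'train',
# most recently updated first
tasks = Task.get_tasks(
project_name='myProject',
task_name='train',
task_filter={'status': ['completed'], 'order_by': ['-last_update']},
)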
"""
return cls.__get_tasks(task_ids=task_ids, project_name=project_name, tags=tags,
task_name=task_name, **(task_filter or {}))
@classmethod
def query_tasks(
cls,
project_name=None, # type: Optional[Union[Sequence[str],str]]
task_name=None, # type: Optional[str]
tags=None, # type: Optional[Sequence[str]]
additional_return_fields=None, # type: Optional[Sequence[str]]
task_filter=None, # type: Optional[Dict]
):
# type: (...) -> Union[List[str], List[Dict[str, str]]]
"""
Get a list of Task IDs matching the specific query/filter.
Notice: if `additional_return_fields` is specified, this returns a list of
dictionaries with the requested fields (one dict per Task)
:param str project_name: The project name of the Tasks to get. To get the experiment