dataset.py
import json
import os
import shutil
from copy import deepcopy, copy
from fnmatch import fnmatch
from multiprocessing import cpu_count
from multiprocessing.pool import ThreadPool
from tempfile import mkstemp, mkdtemp
from typing import Union, Optional, Sequence, List, Dict, Any, Mapping
from zipfile import ZipFile, ZIP_DEFLATED

from attr import attrs, attrib
from pathlib2 import Path

from .. import Task, StorageManager, Logger
from ..backend_api.session.client import APIClient
from ..backend_interface.task.development.worker import DevWorker
from ..backend_interface.util import mutually_exclusive, exact_match_regex, get_existing_project
from ..config import deferred_config
from ..debugging.log import LoggerRoot
from ..storage.helper import StorageHelper
from ..storage.cache import CacheManager
from ..storage.util import sha256sum, is_windows, md5text, format_size

try:
    from pathlib import Path as _Path  # noqa
except ImportError:
    _Path = None


@attrs
class FileEntry(object):
    relative_path = attrib(default=None, type=str)
    hash = attrib(default=None, type=str)
    parent_dataset_id = attrib(default=None, type=str)
    size = attrib(default=None, type=int)
    # support multi part artifact storage
    artifact_name = attrib(default=None, type=str)
    # cleared when file is uploaded.
    local_path = attrib(default=None, type=str)

    def as_dict(self):
        # type: () -> Dict
        state = dict(relative_path=self.relative_path, hash=self.hash,
                     parent_dataset_id=self.parent_dataset_id, size=self.size,
                     artifact_name=self.artifact_name,
                     **dict([('local_path', self.local_path)] if self.local_path else ()))
        return state
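
# A minimal sketch of what as_dict() produces (the values below are illustrative,
# not taken from a real dataset). Note that 'local_path' is only included while the
# file is still pending upload; it is cleared once the file lands in an artifact:
#
#   entry = FileEntry(relative_path='images/cat.jpg', hash='abc123',
#                     parent_dataset_id='d1', size=1024, local_path='/tmp/cat.jpg')
#   entry.as_dict()
#   # -> {'relative_path': 'images/cat.jpg', 'hash': 'abc123',
#   #     'parent_dataset_id': 'd1', 'size': 1024, 'artifact_name': None,
#   #     'local_path': '/tmp/cat.jpg'}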


class Dataset(object):
    __private_magic = 42 * 1337
    __state_entry_name = 'state'
    __default_data_entry_name = 'data'
    __data_entry_name_prefix = 'data_'
    __cache_context = 'datasets'
    __tag = 'dataset'
    __cache_folder_prefix = 'ds_'
    __dataset_folder_template = CacheManager.set_context_folder_lookup(__cache_context, "{0}_archive_{1}")
    __preview_max_file_entries = 15000
    __preview_max_size = 5 * 1024 * 1024
    _dataset_chunk_size_mb = deferred_config("storage.dataset_chunk_size_mb", 512, transform=int)

    def __init__(self, _private, task=None, dataset_project=None, dataset_name=None, dataset_tags=None):
        # type: (int, Optional[Task], Optional[str], Optional[str], Optional[Sequence[str]]) -> ()
        """
        Do not use directly! Use Dataset.create(...) or Dataset.get(...) instead.
        """
        assert _private == self.__private_magic
        # keys for the dataset file entries are the relative paths within the data
        self._dataset_file_entries = {}  # type: Dict[str, FileEntry]
        # this will create a graph of all the dependencies we have, each entry lists its own direct parents
        self._dependency_graph = {}  # type: Dict[str, List[str]]
        if task:
            self._task_pinger = None
            self._created_task = False
            task_status = task.data.status
            # if we are continuing an aborted Task, force the state
            if str(task_status) == 'stopped':
                # print warning that we are opening a stopped dataset:
                LoggerRoot.get_base_logger().warning(
                    'Reopening aborted Dataset, any change will clear and overwrite current state')
                task.mark_started(force=True)
                task_status = 'in_progress'
            # If we are reusing the main current Task, make sure we set its type to data_processing
            if str(task_status) in ('created', 'in_progress'):
                if str(task.task_type) != str(Task.TaskTypes.data_processing):
                    task.set_task_type(task_type=Task.TaskTypes.data_processing)
                task_system_tags = task.get_system_tags() or []
                if self.__tag not in task_system_tags:
                    task.set_system_tags(task_system_tags + [self.__tag])
                if dataset_tags:
                    task.set_tags((task.get_tags() or []) + list(dataset_tags))
            # Keep track of modified files (added, removed, modified).
            # We also load the metadata from the existing task into this one, so we can keep
            # accumulating when e.g. add_files is called multiple times
            task_state = task.artifacts.get('state')
            if task_state:
                # Metadata is visible in the UI, so there will be no underscores there, hence the replace
                self.changed_files = {key: task_state.metadata.get(key.replace('_', ' '), 0)
                                      for key in {'files_added', 'files_removed', 'files_modified'}}
            else:
                self.changed_files = {'files added': 0, 'files removed': 0, 'files modified': 0}
        else:
            self._created_task = True
            task = Task.create(
                project_name=dataset_project, task_name=dataset_name, task_type=Task.TaskTypes.data_processing)
            # set default output_uri
            task.output_uri = True
            task.set_system_tags((task.get_system_tags() or []) + [self.__tag])
            if dataset_tags:
                task.set_tags((task.get_tags() or []) + list(dataset_tags))
            task.mark_started()
            # generate the script section
            script = \
                'from clearml import Dataset\n\n' \
                'ds = Dataset.create(dataset_project=\'{dataset_project}\', dataset_name=\'{dataset_name}\')\n'.format(
                    dataset_project=dataset_project, dataset_name=dataset_name)
            task.data.script.diff = script
            task.data.script.working_dir = '.'
            task.data.script.entry_point = 'register_dataset.py'
            from clearml import __version__
            task.data.script.requirements = {'pip': 'clearml == {}\n'.format(__version__)}
            # noinspection PyProtectedMember
            task._edit(script=task.data.script)
            # if the task is running, make sure we ping the server so it will not be aborted by a watchdog
            self._task_pinger = DevWorker()
            self._task_pinger.register(task, stop_signal_support=False)
            # set the newly created Dataset parent to the current Task, so we know who created it.
            if Task.current_task() and Task.current_task().id != task.id:
                task.set_parent(Task.current_task())
            # Set the modified files to empty on dataset creation
            self.changed_files = {'files added': 0, 'files removed': 0, 'files modified': 0}

        # store current dataset Task
        self._task = task
        # store current dataset id
        self._id = task.id
        # store the folder where the dataset was downloaded to
        self._local_base_folder = None  # type: Optional[Path]
        # dirty flag, set True by any function call changing the dataset (regardless of whether it did anything)
        self._dirty = False
        self._using_current_task = False
        # set current artifact name to be used (support for multiple upload sessions)
        self._data_artifact_name = self._get_next_data_artifact_name()
        # store a cached lookup of the number of chunks each parent dataset has.
        # this will help with verifying we have an up-to-date partial local copy
        self._dependency_chunk_lookup = None  # type: Optional[Dict[str, int]]

    @property
    def id(self):
        # type: () -> str
        return self._id

    @property
    def file_entries(self):
        # type: () -> List[FileEntry]
        return list(self._dataset_file_entries.values())

    @property
    def file_entries_dict(self):
        # type: () -> Mapping[str, FileEntry]
        """
        Notice this call returns an internal representation, do not modify!

        :return: dict with relative file path as key, and FileEntry as value
        """
        return self._dataset_file_entries

    @property
    def project(self):
        # type: () -> str
        return self._task.get_project_name()

    @property
    def name(self):
        # type: () -> str
        return self._task.name

    @property
    def tags(self):
        # type: () -> List[str]
        return self._task.get_tags() or []

    @tags.setter
    def tags(self, values):
        # type: (List[str]) -> ()
        self._task.set_tags(values or [])

    def add_files(
            self,
            path,  # type: Union[str, Path, _Path]
            wildcard=None,  # type: Optional[Union[str, Sequence[str]]]
            local_base_folder=None,  # type: Optional[str]
            dataset_path=None,  # type: Optional[str]
            recursive=True,  # type: bool
            verbose=False  # type: bool
    ):
        # type: (...) -> int
        """
        Add a folder into the current dataset: calculate each file's hash,
        compare it against the parent dataset, and mark changed files for upload

        :param path: Add a folder/file to the dataset
        :param wildcard: add only a specific set of files.
            Wildcard matching, can be a single string or a list of wildcards
        :param local_base_folder: files will be located based on their relative path from local_base_folder
        :param dataset_path: where in the dataset the folder/files should be located
        :param recursive: If True match all wildcard files recursively
        :param verbose: If True print to console files added/modified
        :return: number of files added
        """
        self._dirty = True
        self._task.get_logger().report_text(
            'Adding files to dataset: {}'.format(
                dict(path=path, wildcard=wildcard, local_base_folder=local_base_folder,
                     dataset_path=dataset_path, recursive=recursive, verbose=verbose)),
            print_console=False)

        num_added, num_modified = self._add_files(
            path=path, wildcard=wildcard, local_base_folder=local_base_folder,
            dataset_path=dataset_path, recursive=recursive, verbose=verbose)

        # update the task script
        self._add_script_call(
            'add_files', path=path, wildcard=wildcard, local_base_folder=local_base_folder,
            dataset_path=dataset_path, recursive=recursive)

        self._serialize()

        return num_added
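
    # Usage sketch (names and paths below are illustrative, not part of this module):
    #
    #   ds = Dataset.create(dataset_project='examples', dataset_name='images-v1')
    #   n = ds.add_files(path='./raw_images', wildcard='*.jpg', dataset_path='images')
    #   print('{} new files staged for upload'.format(n))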

    def remove_files(self, dataset_path=None, recursive=True, verbose=False):
        # type: (Optional[str], bool, bool) -> int
        """
        Remove files from the current dataset

        :param dataset_path: Remove files from the dataset.
            The path is always relative to the dataset (e.g. 'folder/file.bin')
        :param recursive: If True match all wildcard files recursively
        :param verbose: If True print to console files removed
        :return: Number of files removed
        """
        self._task.get_logger().report_text(
            'Removing files from dataset: {}'.format(
                dict(dataset_path=dataset_path, recursive=recursive, verbose=verbose)),
            print_console=False)

        if dataset_path and dataset_path.startswith('/'):
            dataset_path = dataset_path[1:]

        num_files = len(self._dataset_file_entries)
        org_files = list(self._dataset_file_entries.keys()) if verbose else None

        if not recursive:
            self._dataset_file_entries = {
                k: v for k, v in self._dataset_file_entries.items()
                if not fnmatch(k + '/', dataset_path + '/')}
        else:
            wildcard = dataset_path.split('/')[-1]
            path = dataset_path[:-len(wildcard)] + '*'
            self._dataset_file_entries = {
                k: v for k, v in self._dataset_file_entries.items()
                if not (fnmatch(k, path) and fnmatch(k if '/' in k else '/{}'.format(k), '*/' + wildcard))}

        if verbose and org_files:
            for f in org_files:
                if f not in self._dataset_file_entries:
                    self._task.get_logger().report_text('Remove {}'.format(f))

        # update the task script
        self._add_script_call(
            'remove_files', dataset_path=dataset_path, recursive=recursive)

        num_removed = num_files - len(self._dataset_file_entries)
        self._serialize()
        # Update state
        self.update_changed_files(num_files_removed=num_removed)
        return num_removed
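
    # Usage sketch (illustrative; removes every '.tmp' file anywhere under 'images/'):
    #
    #   removed = ds.remove_files(dataset_path='images/*.tmp', recursive=True, verbose=True)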

    def sync_folder(self, local_path, dataset_path=None, verbose=False):
        # type: (Union[Path, _Path, str], Union[Path, _Path, str], bool) -> (int, int, int)
        """
        Synchronize the dataset with a local folder. The dataset is synchronized from the
        relative_base_folder (default: dataset root) and deeper with the specified local path.

        :param local_path: Local folder to sync (assumes all files and recursive)
        :param dataset_path: Target dataset path to sync with (default the root of the dataset)
        :param verbose: If True print to console files added/modified/removed
        :return: number of files removed, number of files added, number of files modified
        """
        def filter_f(f):
            keep = (not f.relative_path.startswith(relative_prefix) or
                    (local_path / f.relative_path[len(relative_prefix):]).is_file())
            if not keep and verbose:
                self._task.get_logger().report_text('Remove {}'.format(f.relative_path))
            return keep

        self._task.get_logger().report_text(
            'Syncing local copy with dataset: {}'.format(
                dict(local_path=local_path, dataset_path=dataset_path, verbose=verbose)),
            print_console=False)

        self._dirty = True
        local_path = Path(local_path)

        # Path().as_posix() will never end with /
        relative_prefix = (Path(dataset_path).as_posix() + '/') if dataset_path else ''

        # remove files
        num_files = len(self._dataset_file_entries)
        self._dataset_file_entries = {
            k: f for k, f in self._dataset_file_entries.items() if filter_f(f)}
        num_removed = num_files - len(self._dataset_file_entries)
        # Update the internal state
        self.update_changed_files(num_files_removed=num_removed)

        # add remaining files, state is updated in _add_files
        num_added, num_modified = self._add_files(path=local_path, dataset_path=dataset_path,
                                                  recursive=True, verbose=verbose)

        # How many of the files were modified? AKA have the same name but a different hash
        if verbose:
            self._task.get_logger().report_text(
                'Syncing folder {} : {} files removed, {} added / modified'.format(
                    local_path.as_posix(), num_removed, num_added + num_modified))

        # update the task script
        self._add_script_call(
            'sync_folder', local_path=local_path, dataset_path=dataset_path)

        return num_removed, num_added, num_modified
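
    # Usage sketch (illustrative path): mirror a local folder into the dataset root,
    # removing dataset entries whose local counterpart no longer exists
    #
    #   num_removed, num_added, num_modified = ds.sync_folder(local_path='./data', verbose=True)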

    def upload(self, show_progress=True, verbose=False, output_url=None, compression=None, chunk_size=None):
        # type: (bool, bool, Optional[str], Optional[str], int) -> ()
        """
        Start file uploading, the function returns when all files are uploaded.

        :param show_progress: If True show upload progress bar
        :param verbose: If True print verbose progress report
        :param output_url: Target storage for the compressed dataset (default: file server)
            Examples: `s3://bucket/data`, `gs://bucket/data`, `azure://bucket/data`, `/mnt/share/data`
        :param compression: Compression algorithm for the Zipped dataset file (default: ZIP_DEFLATED)
        :param chunk_size: Artifact chunk size (MB) for the compressed dataset,
            if not provided (None) use the default chunk size (512 MB).
            If -1 is provided, use a single zip artifact for the entire dataset change-set (old behaviour)
        """
        # set output_url
        if output_url:
            self._task.output_uri = output_url

        self._task.get_logger().report_text(
            'Uploading dataset files: {}'.format(
                dict(show_progress=show_progress, verbose=verbose, output_url=output_url, compression=compression)),
            print_console=False)

        list_zipped_artifacts = []  # List[Tuple[Path, int, str, str]]
        list_file_entries = list(self._dataset_file_entries.values())
        total_size = 0
        chunk_size = int(self._dataset_chunk_size_mb if not chunk_size else chunk_size)
        try:
            from tqdm import tqdm  # noqa
            a_tqdm = tqdm(total=len(list_file_entries))
        except ImportError:
            a_tqdm = None

        while list_file_entries:
            fd, zip_file = mkstemp(
                prefix='dataset.{}.'.format(self._id), suffix='.zip'
            )
            archive_preview = ''
            count = 0
            processed = 0
            zip_file = Path(zip_file)
            print('{}Compressing local files, chunk {} [remaining {} files]'.format(
                '\n' if a_tqdm else '', 1 + len(list_zipped_artifacts), len(list_file_entries)))
            try:
                with ZipFile(zip_file.as_posix(), 'w', allowZip64=True,
                             compression=compression or ZIP_DEFLATED) as zf:
                    for file_entry in list_file_entries:
                        processed += 1
                        if a_tqdm:
                            a_tqdm.update()
                        if not file_entry.local_path:
                            # file is already in an uploaded artifact
                            continue
                        filename = Path(file_entry.local_path)
                        if not filename.is_file():
                            LoggerRoot.get_base_logger().warning(
                                "Could not store dataset file {}. File skipped".format(file_entry.local_path))
                            # mark for removal
                            file_entry.relative_path = None
                            continue
                        if verbose:
                            self._task.get_logger().report_text('Compressing {}'.format(filename.as_posix()))
                        relative_file_name = file_entry.relative_path
                        zf.write(filename.as_posix(), arcname=relative_file_name)
                        archive_preview += '{} - {}\n'.format(
                            relative_file_name, format_size(filename.stat().st_size))
                        file_entry.artifact_name = self._data_artifact_name
                        count += 1
                        # limit the size of a single artifact
                        if chunk_size > 0 and zip_file.stat().st_size >= chunk_size * (1024 ** 2):
                            break
            except Exception as e:
                # failed zipping the dataset files:
                LoggerRoot.get_base_logger().warning(
                    'Exception {}\nFailed zipping dataset.'.format(e))
                return False
            finally:
                os.close(fd)

            if not count:
                zip_file.unlink()
            else:
                total_size += zip_file.stat().st_size
                # update the artifact preview
                archive_preview = "Dataset archive content [{} files]:\n".format(count) + archive_preview
                # add into the list
                list_zipped_artifacts += [(zip_file, count, archive_preview, self._data_artifact_name)]
            # let's see what's left
            list_file_entries = list_file_entries[processed:]
            # next artifact name to use
            self._data_artifact_name = self._get_next_data_artifact_name(self._data_artifact_name)

        if a_tqdm:
            a_tqdm.close()

        self._task.get_logger().report_text(
            "File compression completed: total size {}, {} chunks stored (average size {})".format(
                format_size(total_size),
                len(list_zipped_artifacts),
                format_size(0 if len(list_zipped_artifacts) == 0 else total_size / len(list_zipped_artifacts)),
            )
        )

        if not list_zipped_artifacts:
            LoggerRoot.get_base_logger().info('No pending files, skipping upload.')
            self._dirty = False
            self._serialize()
            return True

        for i, (zip_file, count, archive_preview, artifact_name) in enumerate(list_zipped_artifacts):
            # noinspection PyBroadException
            try:
                # let's try to rename it
                new_zip_file = zip_file.parent / 'dataset.{}.zip'.format(self._id)
                zip_file.rename(new_zip_file)
                zip_file = new_zip_file
            except Exception:
                pass
            # start upload
            zip_file_size = format_size(Path(zip_file).stat().st_size)
            self._task.get_logger().report_text(
                'Uploading compressed dataset changes {}/{} ({} files {}) to {}'.format(
                    i + 1, len(list_zipped_artifacts), count, zip_file_size, self.get_default_storage()))
            self._task.upload_artifact(
                name=artifact_name, artifact_object=Path(zip_file), preview=archive_preview,
                delete_after_upload=True, wait_on_upload=True)
            # mark as upload completed and serialize
            for file_entry in self._dataset_file_entries.values():
                if file_entry.parent_dataset_id == self._id and file_entry.artifact_name == artifact_name:
                    file_entry.local_path = None
            # serialize current state
            self._serialize()

        # remove files that could not be zipped (marked above with a None relative path)
        self._dataset_file_entries = {
            k: v for k, v in self._dataset_file_entries.items() if v.relative_path is not None}
        # report upload completed
        self._task.get_logger().report_text('Upload completed ({})'.format(format_size(total_size)))

        self._add_script_call(
            'upload', show_progress=show_progress, verbose=verbose, output_url=output_url, compression=compression)

        self._dirty = False
        self._serialize()
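
    # Usage sketch (bucket URL and chunk size are illustrative): upload the pending
    # change-set to S3 in ~100 MB artifact chunks instead of the 512 MB default
    #
    #   ds.upload(output_url='s3://my-bucket/datasets', chunk_size=100, verbose=True)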

    def finalize(self, verbose=False, raise_on_error=True, auto_upload=False):
        # type: (bool, bool, bool) -> bool
        """
        Finalize the dataset (publish the dataset Task). upload() must be called first,
        to verify that there are no pending uploads.
        If files still need to be uploaded, an exception is raised (or False is returned)

        :param verbose: If True print verbose progress report
        :param raise_on_error: If True raise exception if dataset finalizing failed
        :param auto_upload: Automatically upload dataset if not called yet, will upload to default location.
        """
        # check we do not have files waiting for upload.
        if self._dirty:
            if auto_upload:
                self._task.get_logger().report_text("Pending uploads, starting dataset upload to {}"
                                                    .format(self.get_default_storage()))
                self.upload()
            elif raise_on_error:
                raise ValueError("Cannot finalize dataset, pending uploads. Call Dataset.upload(...)")
            else:
                return False

        status = self._task.get_status()
        if status not in ('in_progress', 'created'):
            raise ValueError("Cannot finalize dataset, status '{}' is not valid".format(status))

        self._task.get_logger().report_text('Finalizing dataset', print_console=False)

        # make sure we have no redundant parent versions
        self._serialize(update_dependency_chunk_lookup=True)
        self._add_script_call('finalize')
        if verbose:
            print('Updating statistics and genealogy')
        self._report_dataset_genealogy()
        hashed_nodes = [self._get_dataset_id_hash(k) for k in self._dependency_graph.keys()]
        self._task.comment = 'Dependencies: {}\n'.format(hashed_nodes)
        if self._using_current_task:
            self._task.flush(wait_for_uploads=True)
        else:
            self._task.close()
            self._task.mark_completed()

        if self._task_pinger:
            self._task_pinger.unregister()
            self._task_pinger = None

        return True
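
    # Typical end-of-session sketch: let finalize() trigger the upload for us
    #
    #   ds.finalize(auto_upload=True)  # uploads pending files to the default storage,
    #                                  # then closes the task and marks it completed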

    def publish(self, raise_on_error=True):
        # type: (bool) -> bool
        """
        Publish the dataset
        If the dataset is not finalized, raise an exception

        :param raise_on_error: If True raise exception if dataset publishing failed
        """
        # check we can publish this dataset
        if not self.is_final():
            raise ValueError("Cannot publish dataset, dataset in status {}.".format(self._task.get_status()))

        self._task.publish(ignore_errors=raise_on_error)
        return True

    def is_final(self):
        # type: () -> bool
        """
        Return True if the dataset was finalized and cannot be changed any more.

        :return: True if the dataset is final
        """
        return self._task.get_status() not in (
            Task.TaskStatusEnum.in_progress, Task.TaskStatusEnum.created, Task.TaskStatusEnum.failed)

    def get_local_copy(self, use_soft_links=None, part=None, num_parts=None, raise_on_error=True):
        # type: (bool, Optional[int], Optional[int], bool) -> str
        """
        Return a base folder with a read-only (immutable) local copy of the entire dataset:
        download and copy / soft-link the files from all the parent dataset versions

        :param use_soft_links: If True use soft links, default False on Windows, True on POSIX systems
        :param part: Optional, if provided only download the selected part (index) of the Dataset.
            First part number is `0` and last part is `num_parts-1`.
            Notice, if `num_parts` is not provided, the number of parts will be equal to the total number of chunks
            (i.e. sum over all chunks from the specified Dataset including all parent Datasets).
            This argument is passed to parent datasets, as well as the implicit `num_parts`,
            allowing users to get a partial copy of the entire dataset, for multi node/step processing.
        :param num_parts: Optional, if specified, normalize the number of chunks stored to the
            requested number of parts. Notice that the actual chunks used per part are rounded down.
            Example: Assuming a total of 8 chunks for this dataset (including parent datasets),
            and `num_parts=5`, the chunk index used per part would be:
            part=0 -> chunks[0,5], part=1 -> chunks[1,6], part=2 -> chunks[2,7], part=3 -> chunks[3, ]
        :param raise_on_error: If True raise exception if dataset merging failed on any file
        :return: A base folder for the entire dataset
        """
        assert self._id
        if not self._task:
            self._task = Task.get_task(task_id=self._id)

        # now let's merge the parents
        target_folder = self._merge_datasets(
            use_soft_links=use_soft_links, raise_on_error=raise_on_error, part=part, num_parts=num_parts)
        return target_folder
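
    # Multi-node sketch (illustrative; `rank` and `world_size` come from the caller's
    # own distributed setup, they are not defined in this module):
    #
    #   folder = ds.get_local_copy(part=rank, num_parts=world_size)
    #   # every node receives a disjoint subset of the dataset chunks to process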

    def get_mutable_local_copy(self, target_folder, overwrite=False, part=None, num_parts=None, raise_on_error=True):
        # type: (Union[Path, _Path, str], bool, Optional[int], Optional[int], bool) -> Optional[str]
        """
        Return a base folder with a writable (mutable) local copy of the entire dataset:
        download and copy / soft-link the files from all the parent dataset versions

        :param target_folder: Target folder for the writable copy
        :param overwrite: If True, recursively delete the target folder before creating a copy.
            If False (default) and target folder contains files, raise exception or return None
        :param part: Optional, if provided only download the selected part (index) of the Dataset.
            First part number is `0` and last part is `num_parts-1`.
            Notice, if `num_parts` is not provided, the number of parts will be equal to the total number of chunks
            (i.e. sum over all chunks from the specified Dataset including all parent Datasets).
            This argument is passed to parent datasets, as well as the implicit `num_parts`,
            allowing users to get a partial copy of the entire dataset, for multi node/step processing.
        :param num_parts: Optional, if specified, normalize the number of chunks stored to the
            requested number of parts. Notice that the actual chunks used per part are rounded down.
            Example: Assuming a total of 8 chunks for this dataset (including parent datasets),
            and `num_parts=5`, the chunk index used per part would be:
            part=0 -> chunks[0,5], part=1 -> chunks[1,6], part=2 -> chunks[2,7], part=3 -> chunks[3, ]
        :param raise_on_error: If True raise exception if dataset merging failed on any file
        :return: The target folder containing the entire dataset
        """
        assert self._id
        target_folder = Path(target_folder).absolute()
        target_folder.mkdir(parents=True, exist_ok=True)
        # noinspection PyBroadException
        try:
            target_folder.rmdir()
        except Exception:
            if not overwrite:
                if raise_on_error:
                    raise ValueError("Target folder {} already contains files".format(target_folder.as_posix()))
                else:
                    return None
            shutil.rmtree(target_folder.as_posix())

        ro_folder = self.get_local_copy(part=part, num_parts=num_parts, raise_on_error=raise_on_error)
        shutil.copytree(ro_folder, target_folder.as_posix(), symlinks=False)
        return target_folder.as_posix()
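
    # Usage sketch (illustrative path): materialize an editable copy of the dataset
    #
    #   path = ds.get_mutable_local_copy(target_folder='./working_copy', overwrite=True)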

    def list_files(self, dataset_path=None, recursive=True, dataset_id=None):
        # type: (Optional[str], bool, Optional[str]) -> List[str]
        """
        Return a list of files in the current dataset.
        If dataset_id is provided, return a list of files that remained unchanged since the specified dataset version

        :param dataset_path: Only match files matching the dataset_path (including wildcards).
            Example: 'folder/sub/*.json'
        :param recursive: If True (default) matching dataset_path recursively
        :param dataset_id: Filter list based on the dataset id containing the latest version of the file.
            Default: None, do not filter files based on parent dataset.
        :return: List of files with relative path
            (files might not be available locally until get_local_copy() is called)
        """
        files = self._dataset_file_entries.keys() if not dataset_id else \
            [k for k, v in self._dataset_file_entries.items() if v.parent_dataset_id == dataset_id]

        if not dataset_path:
            return sorted(files)

        if dataset_path.startswith('/'):
            dataset_path = dataset_path[1:]

        if not recursive:
            return sorted([k for k in files if fnmatch(k + '/', dataset_path + '/')])

        wildcard = dataset_path.split('/')[-1]
        path = dataset_path[:-len(wildcard)] + '*'
        return sorted([k for k in files if fnmatch(k, path) and fnmatch(k, '*/' + wildcard)])
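
    # Usage sketch (illustrative wildcard): list every JSON file under 'folder/sub/'
    #
    #   json_files = ds.list_files(dataset_path='folder/sub/*.json')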

    def list_removed_files(self, dataset_id=None):
        # type: (str) -> List[str]
        """
        Return a list of files removed when comparing to a specific dataset version

        :param dataset_id: dataset id (str) to compare against, if None is given compare against the parent datasets
        :return: List of files with relative path
            (files might not be available locally until get_local_copy() is called)
        """
        datasets = self._dependency_graph[self._id] if not dataset_id or dataset_id == self._id else [dataset_id]
        unified_list = set()
        for ds_id in datasets:
            dataset = self.get(dataset_id=ds_id)
            unified_list |= set(dataset._dataset_file_entries.keys())

        removed_list = [f for f in unified_list if f not in self._dataset_file_entries]
        return sorted(removed_list)

    def list_modified_files(self, dataset_id=None):
        # type: (str) -> List[str]
        """
        Return a list of files modified when comparing to a specific dataset version

        :param dataset_id: dataset id (str) to compare against, if None is given compare against the parent datasets
        :return: List of files with relative path
            (files might not be available locally until get_local_copy() is called)
        """
        datasets = self._dependency_graph[self._id] if not dataset_id or dataset_id == self._id else [dataset_id]
        unified_list = dict()
        for ds_id in datasets:
            dataset = self.get(dataset_id=ds_id)
            unified_list.update(dict((k, v.hash) for k, v in dataset._dataset_file_entries.items()))

        modified_list = [k for k, v in self._dataset_file_entries.items()
                         if k in unified_list and v.hash != unified_list[k]]
        return sorted(modified_list)

    def list_added_files(self, dataset_id=None):
        # type: (str) -> List[str]
        """
        Return a list of files added when comparing to a specific dataset version

        :param dataset_id: dataset id (str) to compare against, if None is given compare against the parent datasets
        :return: List of files with relative path
            (files might not be available locally until get_local_copy() is called)
        """
        datasets = self._dependency_graph[self._id] if not dataset_id or dataset_id == self._id else [dataset_id]
        unified_list = set()
        for ds_id in datasets:
            dataset = self.get(dataset_id=ds_id)
            unified_list |= set(dataset._dataset_file_entries.keys())

        added_list = [f for f in self._dataset_file_entries.keys() if f not in unified_list]
        return sorted(added_list)

    def get_dependency_graph(self):
        """
        Return the DAG of the dataset dependencies (all previous dataset versions and their parents)

        Example:

        .. code-block:: py

            {
                'current_dataset_id': ['parent_1_id', 'parent_2_id'],
                'parent_2_id': ['parent_1_id'],
                'parent_1_id': [],
            }

        :return: dict representing the genealogy dag graph of the current dataset
        """
        return deepcopy(self._dependency_graph)

    def verify_dataset_hash(self, local_copy_path=None, skip_hash=False, verbose=False):
        # type: (Optional[str], bool, bool) -> List[str]
        """
        Verify the current copy of the dataset against the stored hash

        :param local_copy_path: Specify a local path containing a copy of the dataset.
            If not provided, use the cached folder
        :param skip_hash: If True, skip hash checks and verify file size only
        :param verbose: If True print errors while testing dataset files hash
        :return: List of files with unmatched hashes
        """
        local_path = local_copy_path or self.get_local_copy()

        def compare(file_entry):
            file_entry_copy = copy(file_entry)
            file_entry_copy.local_path = (Path(local_path) / file_entry.relative_path).as_posix()
            if skip_hash:
                file_entry_copy.size = Path(file_entry_copy.local_path).stat().st_size
                if file_entry_copy.size != file_entry.size:
                    if verbose:
                        print('Error: file size mismatch {} expected size {} current {}'.format(
                            file_entry.relative_path, file_entry.size, file_entry_copy.size))
                    return file_entry
            else:
                self._calc_file_hash(file_entry_copy)
                if file_entry_copy.hash != file_entry.hash:
                    if verbose:
                        print('Error: hash mismatch {} expected size/hash {}/{} recalculated {}/{}'.format(
                            file_entry.relative_path,
                            file_entry.size, file_entry.hash,
                            file_entry_copy.size, file_entry_copy.hash))
                    return file_entry

            return None

        pool = ThreadPool(cpu_count() * 2)
        matching_errors = pool.map(compare, self._dataset_file_entries.values())
        pool.close()
        return [f.relative_path for f in matching_errors if f is not None]
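
    # Usage sketch: quick integrity check of the local cached copy (size-only is faster,
    # full hashing is stricter)
    #
    #   bad_files = ds.verify_dataset_hash(skip_hash=True, verbose=True)
    #   assert not bad_files, 'corrupted local copy: {}'.format(bad_files)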

    def get_default_storage(self):
        # type: () -> Optional[str]
        """
        Return the default storage location of the dataset

        :return: URL for the default storage location
        """
        if not self._task:
            return None
        return self._task.output_uri or self._task.get_logger().get_default_upload_destination()

    @classmethod
    def create(
            cls,
            dataset_name=None,  # type: Optional[str]
            dataset_project=None,  # type: Optional[str]
            dataset_tags=None,  # type: Optional[Sequence[str]]
            parent_datasets=None,  # type: Optional[Sequence[Union[str, Dataset]]]
            use_current_task=False  # type: bool
    ):
        # type: (...) -> "Dataset"
        """
        Create a new dataset. Multiple dataset parents are supported.
        Merging of parent datasets is done based on the order,
        where each one can override overlapping files in the previous parent

        :param dataset_name: Name of the new dataset
        :param dataset_project: Project containing the dataset.
            If not specified, infer the project name from the parent datasets
        :param dataset_tags: Optional, list of tags (strings) to attach to the newly created Dataset
        :param parent_datasets: Expand a parent dataset by adding/removing files
        :param use_current_task: False (default), a new Dataset task is created.
            If True, the dataset is created on the current Task.
        :return: Newly created Dataset object
        """
        parent_datasets = [cls.get(dataset_id=p) if not isinstance(p, Dataset) else p for p in (parent_datasets or [])]
        if any(not p.is_final() for p in parent_datasets):
            raise ValueError("Cannot inherit from a parent that was not finalized/closed")

        # if dataset name + project are None, default to use current_task
        if dataset_project is None and dataset_name is None and not use_current_task:
            LoggerRoot.get_base_logger().info('New dataset project/name not provided, storing on Current Task')
            use_current_task = True

        # get project name
        if not dataset_project and not use_current_task:
            if not parent_datasets:
                raise ValueError("Missing dataset project name. Could not infer project name from parent dataset.")
            # get project name from parent dataset
            dataset_project = parent_datasets[-1]._task.get_project_name()

        # merge datasets according to order
        dataset_file_entries = {}
        dependency_graph = {}
        for p in parent_datasets:
            dataset_file_entries.update(deepcopy(p._dataset_file_entries))
            dependency_graph.update(deepcopy(p._dependency_graph))

        instance = cls(_private=cls.__private_magic,
                       dataset_project=dataset_project,
                       dataset_name=dataset_name,
                       dataset_tags=dataset_tags,
                       task=Task.current_task() if use_current_task else None)
        instance._using_current_task = use_current_task
        instance._task.get_logger().report_text('Dataset created', print_console=False)
        instance._dataset_file_entries = dataset_file_entries
        instance._dependency_graph = dependency_graph
        instance._dependency_graph[instance._id] = [p._id for p in parent_datasets]
        instance._serialize()
        instance._task.flush(wait_for_uploads=True)
        cls._set_project_system_tags(instance._task)
        return instance
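
    # Versioning sketch (project/dataset names are illustrative): create a child dataset
    # that extends a finalized parent; only the delta is stored and uploaded
    #
    #   base = Dataset.get(dataset_project='examples', dataset_name='images-v1')
    #   child = Dataset.create(dataset_name='images-v2', dataset_project='examples',
    #                          parent_datasets=[base])
    #   child.add_files(path='./new_images')
    #   child.upload()
    #   child.finalize()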

    @classmethod
    def delete(cls, dataset_id=None, dataset_project=None, dataset_name=None, force=False):
        # type: (Optional[str], Optional[str], Optional[str], bool) -> ()
        """
        Delete a dataset, raise exception if the dataset is used by other dataset versions.
        Use force=True to forcefully delete the dataset

        :param dataset_id: Dataset id to delete
        :param dataset_project: Project containing the dataset
        :param dataset_name: Name of the dataset to delete
        :param force: If True delete even if other datasets depend on the specified dataset version
        """
        mutually_exclusive(dataset_id=dataset_id, dataset_project=dataset_project)
        mutually_exclusive(dataset_id=dataset_id, dataset_name=dataset_name)
        if not dataset_id:
            tasks = Task.get_tasks(
                project_name=dataset_project,
                task_name=exact_match_regex(dataset_name) if dataset_name else None,
                task_filter=dict(
                    system_tags=[cls.__tag],
                    type=[str(Task.TaskTypes.data_processing)],
                    page_size=2, page=0,)
            )
            if not tasks:
                raise ValueError("Dataset project={} name={} could not be found".format(dataset_project, dataset_name))
            if len(tasks) > 1:
                raise ValueError("Too many datasets matching project={} name={}".format(dataset_project, dataset_name))
            dataset_id = tasks[0].id

        # check if someone is using the datasets
        if not force:
            # todo: use Task runtime_properties
            # noinspection PyProtectedMember
            dependencies = Task._query_tasks(
                system_tags=[cls.__tag],
                type=[str(Task.TaskTypes.data_processing)],
                only_fields=['created', 'id', 'name'],
                search_text='{}'.format(cls._get_dataset_id_hash(dataset_id))
            )
            # filter us out
            if dependencies:
                dependencies = [d for d in dependencies if d.id != dataset_id]
            if dependencies:
                raise ValueError("Dataset id={} is used by datasets: {}".format(
                    dataset_id, [d.id for d in dependencies]))

        client = APIClient()
        # notice the force here is a must, since the state is never draft
        # noinspection PyBroadException
        try:
            t = client.tasks.get_by_id(dataset_id)
        except Exception:
            t = None
        if not t:
            raise ValueError("Dataset id={} could not be found".format(dataset_id))
        if str(t.type) != str(Task.TaskTypes.data_processing) or cls.__tag not in t.system_tags:
            raise ValueError("Dataset id={} is not of type Dataset".format(dataset_id))

        task = Task.get_task(task_id=dataset_id)
        # first delete all the artifacts from the dataset
        for artifact in task.artifacts.values():
            h = StorageHelper.get(artifact.url)
            # noinspection PyBroadException
            try:
                h.delete(artifact.url)
            except Exception as ex:
                LoggerRoot.get_base_logger().warning('Failed deleting remote file \'{}\': {}'.format(
                    artifact.url, ex))

        # now delete the actual task
        client.tasks.delete(task=dataset_id, force=True)
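
    # Usage sketch (illustrative names): delete a dataset no other version depends on
    #
    #   Dataset.delete(dataset_project='examples', dataset_name='images-v1')
    #   # pass force=True to remove it even when other datasets list it as a parent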

    @classmethod
    def get(
            cls,
            dataset_id=None,  # type: Optional[str]
            dataset_project=None,  # type: Optional[str]
            dataset_name=None,  # type: Optional[str]
            dataset_tags=None,  # type: Optional[Sequence[str]]
            only_completed=False,  # type: bool
            only_published=False,  # type: bool
            auto_create=False,  # type: bool
            writable_copy=False  # type: bool
    ):
        # type: (...) -> "Dataset"
        """
        Get a specific Dataset. If only dataset_project is given, return the last Dataset in the Dataset project

        :param dataset_id: Requested Dataset ID
        :param dataset_project: Requested Dataset project name
        :param dataset_name: Requested Dataset name
        :param dataset_tags: Requested Dataset tags (list of tag strings)
        :param only_completed: Return only if the requested dataset is completed or published
        :param only_published: Return only if the requested dataset is published
        :param auto_create: Create a new dataset if it does not exist yet
        :param writable_copy: Get a newly created mutable dataset with the current one as its parent,
            so new files can be added to the instance.
        :return: Dataset object
        """
        mutually_exclusive(dataset_id=dataset_id, dataset_project=dataset_project, _require_at_least_one=False)
        mutually_exclusive(dataset_id=dataset_id, dataset_name=dataset_name, _require_at_least_one=False)
        if not any([dataset_id, dataset_project, dataset_name, dataset_tags]):
            raise ValueError("Dataset selection criteria not met. Didn't provide id/name/project/tags correctly.")

        if auto_create and not get_existing_project(
                session=Task._get_default_session(), project_name=dataset_project
        ):
            tasks = []
        else:
            tasks = Task.get_tasks(
                task_ids=[dataset_id] if dataset_id else None,
                project_name=dataset_project,
                task_name=exact_match_regex(dataset_name) if dataset_name else None,
                tags=dataset_tags,
                task_filter=dict(
                    system_tags=[cls.__tag, '-archived'], order_by=['-created'],
                    type=[str(Task.TaskTypes.data_processing)],
                    page_size=1, page=0,
                    status=['published'] if only_published else
                    ['published', 'completed', 'closed'] if only_completed else None)
            )

        if not tasks:
            if auto_create:
                instance = Dataset.create(dataset_name=dataset_name, dataset_project=dataset_project,
                                          dataset_tags=dataset_tags)
                return instance
            raise ValueError('Could not find Dataset {} {}'.format(
                'id' if dataset_id else 'project/name',
                dataset_id if dataset_id else (dataset_project, dataset_name)))

        task = tasks[0]
        if task.status == 'created':
            raise ValueError('Dataset id={} is in draft mode, delete and recreate it'.format(task.id))
        force_download = False if task.status in ('stopped', 'published', 'closed', 'completed') else True
        if cls.__state_entry_name in task.artifacts:
            local_state_file = StorageManager.get_local_copy(
                remote_url=task.artifacts[cls.__state_entry_name].url, cache_context=cls.__cache_context,
                extract_archive=False, name=task.id, force_download=force_download)
            if not local_state_file:
                raise ValueError('Could not load Dataset id={} state'.format(task.id))
        else:
            # we could not find the serialized state, start empty
            local_state_file = {}

        instance = cls._deserialize(local_state_file, task)
        # remove the artifact, just in case
        if force_download and local_state_file:
            os.unlink(local_state_file)

        # Now we have the requested dataset, but if we want a mutable copy instead, we create a new dataset with the
        # current one as its parent. So one can add files to it and finalize as a new version.
        if writable_copy: