# helper.py
from __future__ import with_statement

import errno
import getpass
import itertools
import json
import os
import shutil
import sys
import threading
from abc import ABCMeta, abstractmethod
from collections import namedtuple
from concurrent.futures import ThreadPoolExecutor
from copy import copy
from datetime import datetime
from multiprocessing.pool import ThreadPool
from tempfile import mktemp
from time import time
from types import GeneratorType

import requests
import six
from _socket import gethostname
from attr import attrs, attrib, asdict
from furl import furl
from pathlib2 import Path
from requests import codes as requests_codes
from requests.exceptions import ConnectionError
from six import binary_type, StringIO
from six.moves.queue import Queue, Empty
from six.moves.urllib.parse import urlparse
from six.moves.urllib.request import url2pathname

from .callbacks import UploadProgressReport, DownloadProgressReport
from .util import quote_url
from ..backend_api.utils import get_http_session_with_retry
from ..backend_config.bucket_config import S3BucketConfigurations, GSBucketConfigurations, AzureContainerConfigurations
from ..config import config, deferred_config
from ..debugging import get_logger
from ..errors import UsageError
from ..utilities.process.mp import ForkSafeRLock


class StorageError(Exception):
    pass


class DownloadError(Exception):
    pass


@six.add_metaclass(ABCMeta)
class _Driver(object):

    @classmethod
    def get_logger(cls):
        return get_logger('storage')

    @abstractmethod
    def get_container(self, container_name, config=None, **kwargs):
        pass

    @abstractmethod
    def test_upload(self, test_path, config, **kwargs):
        pass

    @abstractmethod
    def upload_object_via_stream(self, iterator, container, object_name, extra, **kwargs):
        pass

    @abstractmethod
    def list_container_objects(self, container, ex_prefix, **kwargs):
        pass

    @abstractmethod
    def get_direct_access(self, remote_path, **kwargs):
        pass

    @abstractmethod
    def download_object(self, obj, local_path, overwrite_existing, delete_on_failure, callback, **kwargs):
        pass

    @abstractmethod
    def download_object_as_stream(self, obj, chunk_size, **kwargs):
        pass

    @abstractmethod
    def delete_object(self, obj, **kwargs):
        pass

    @abstractmethod
    def upload_object(self, file_path, container, object_name, extra, **kwargs):
        pass

    @abstractmethod
    def get_object(self, container_name, object_name, **kwargs):
        pass


class StorageHelper(object):
    """ Storage helper.
    Used by the entire system to download/upload files.
    Supports both local and remote files (local files, network-mapped files,
    HTTP/S, Amazon S3, Google Cloud Storage and Azure Blob Storage).
    """
    _temp_download_suffix = '.partially'

    @classmethod
    def _get_logger(cls):
        return get_logger('storage')

    @attrs
    class _PathSubstitutionRule(object):
        registered_prefix = attrib(type=str)
        local_prefix = attrib(type=str)
        replace_windows_sep = attrib(type=bool)
        replace_linux_sep = attrib(type=bool)

        path_substitution_config = 'storage.path_substitution'

        @classmethod
        def load_list_from_config(cls):
            rules_list = []
            for index, sub_config in enumerate(config.get(cls.path_substitution_config, list())):
                rule = cls(
                    registered_prefix=sub_config.get('registered_prefix', None),
                    local_prefix=sub_config.get('local_prefix', None),
                    replace_windows_sep=sub_config.get('replace_windows_sep', False),
                    replace_linux_sep=sub_config.get('replace_linux_sep', False),
                )
                if any(prefix is None for prefix in (rule.registered_prefix, rule.local_prefix)):
                    StorageHelper._get_logger().warning(
                        "Illegal substitution rule configuration '{}[{}]': {}".format(
                            cls.path_substitution_config,
                            index,
                            asdict(rule),
                        ))
                    continue
                if all((rule.replace_windows_sep, rule.replace_linux_sep)):
                    StorageHelper._get_logger().warning(
                        "Only one of replace_windows_sep and replace_linux_sep flags may be set. "
                        "'{}[{}]': {}".format(
                            cls.path_substitution_config,
                            index,
                            asdict(rule),
                        ))
                    continue
                rules_list.append(rule)
            return rules_list

    class _UploadData(object):
        @property
        def src_path(self):
            return self._src_path

        @property
        def dest_path(self):
            return self._dest_path

        @property
        def extra(self):
            return self._extra

        @property
        def callback(self):
            return self._callback

        @property
        def retries(self):
            return self._retries

        def __init__(self, src_path, dest_path, extra, callback, retries):
            self._src_path = src_path
            self._dest_path = dest_path
            self._extra = extra
            self._callback = callback
            self._retries = retries

        def __str__(self):
            return "src=%s" % self.src_path

    _helpers = {}  # cache of helper instances

    # global terminate event for async upload threads
    # _terminate = threading.Event()
    _async_upload_threads = set()
    _upload_pool = None
    _upload_pool_pid = None

    # collect all bucket credentials that aren't empty (ignore entries with an empty key or secret)
    _s3_configurations = deferred_config('aws.s3', {}, transform=S3BucketConfigurations.from_config)
    _gs_configurations = deferred_config('google.storage', {}, transform=GSBucketConfigurations.from_config)
    _azure_configurations = deferred_config('azure.storage', {}, transform=AzureContainerConfigurations.from_config)
    _path_substitutions = deferred_config(transform=_PathSubstitutionRule.load_list_from_config)

    @property
    def log(self):
        return self._log

    @property
    def scheme(self):
        return self._scheme

    @property
    def secure(self):
        return self._secure

    @property
    def base_url(self):
        return self._base_url

    @classmethod
    def get(cls, url, logger=None, **kwargs):
        """
        Get a storage helper instance for the given URL

        :return: A StorageHelper instance.
        """
        # Handle URL substitution etc before locating the correct storage driver
        url = cls._canonize_url(url)

        # Get the credentials we should use for this url
        base_url = cls._resolve_base_url(url)

        instance_key = '%s_%s' % (base_url, threading.current_thread().ident or 0)

        force_create = kwargs.pop('__force_create', False)
        if (instance_key in cls._helpers) and (not force_create):
            return cls._helpers[instance_key]

        # Don't canonize URL since we already did it
        try:
            instance = cls(base_url=base_url, url=url, logger=logger, canonize_url=False, **kwargs)
        except (StorageError, UsageError) as ex:
            cls._get_logger().error(str(ex))
            return None
        except Exception as ex:
            cls._get_logger().error("Failed creating storage object {} Reason: {}".format(
                base_url or url, ex))
            return None

        cls._helpers[instance_key] = instance
        return instance
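
    # A hedged usage sketch for `get` (the S3 URL below is illustrative only):
    #
    #   helper = StorageHelper.get("s3://my-bucket/some/prefix")
    #   if helper is None:
    #       # credentials were missing, or the driver could not be created
    #       ...
    #
    # Note that instances are cached per (base_url, thread id), so repeated calls
    # from the same thread return the same helper unless '__force_create' is passed.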

    @classmethod
    def get_local_copy(cls, remote_url):
        """
        Download a file from a remote URL to local storage and return the path to the local copy.

        :param remote_url: Remote URL. Example: https://example.com/file.jpg, s3://bucket/folder/file.mp4, etc.
        :return: Path to the local copy of the downloaded file, or None if an error occurred.
        """
        helper = cls.get(remote_url)
        if not helper:
            return None
        # create a temp file with the requested file name
        file_name = '.' + remote_url.split('/')[-1].split(os.path.sep)[-1]
        local_path = mktemp(suffix=file_name)
        return helper.download_to_file(remote_url, local_path)

    def __init__(self, base_url, url, key=None, secret=None, region=None, verbose=False, logger=None, retries=5,
                 **kwargs):
        level = config.get('storage.log.level', None)
        if level:
            try:
                self._get_logger().setLevel(level)
            except (TypeError, ValueError):
                self._get_logger().error('invalid storage log level in configuration: %s' % level)

        self._log = logger or self._get_logger()
        self._verbose = verbose
        self._retries = retries
        self._extra = {}
        self._base_url = base_url
        self._secure = True
        self._driver = None
        self._container = None
        self._conf = None

        if kwargs.get('canonize_url', True):
            url = self._canonize_url(url)

        parsed = urlparse(url)
        self._scheme = parsed.scheme

        if self._scheme == _AzureBlobServiceStorageDriver.scheme:
            self._conf = copy(self._azure_configurations.get_config_by_uri(url))
            if self._conf is None:
                raise StorageError("Missing Azure Blob Storage configuration for {}".format(url))

            if not self._conf.account_name or not self._conf.account_key:
                raise StorageError(
                    "Missing account name or key for Azure Blob Storage access for {}".format(base_url)
                )

            self._driver = _AzureBlobServiceStorageDriver()
            self._container = self._driver.get_container(config=self._conf)

        elif self._scheme == _Boto3Driver.scheme:
            self._conf = copy(self._s3_configurations.get_config_by_uri(url))
            self._secure = self._conf.secure

            final_region = region if region else self._conf.region
            if not final_region:
                final_region = None

            self._conf.update(
                key=key or self._conf.key,
                secret=secret or self._conf.secret,
                multipart=self._conf.multipart,
                region=final_region,
                use_credentials_chain=self._conf.use_credentials_chain
            )

            if not self._conf.use_credentials_chain:
                if not self._conf.key or not self._conf.secret:
                    raise ValueError(
                        "Missing key and secret for S3 storage access (%s)" % base_url
                    )

            self._driver = _Boto3Driver()
            self._container = self._driver.get_container(container_name=self._base_url, retries=retries,
                                                         config=self._conf)

        elif self._scheme == _GoogleCloudStorageDriver.scheme:
            self._conf = copy(self._gs_configurations.get_config_by_uri(url))
            self._driver = _GoogleCloudStorageDriver()
            self._container = self._driver.get_container(
                container_name=self._base_url,
                config=self._conf
            )

        elif self._scheme in _HttpDriver.schemes:
            self._driver = _HttpDriver(retries=retries)
            self._container = self._driver.get_container(container_name=self._base_url)

        else:  # elif self._scheme == 'file':
            # if this is not a known scheme, assume a local file.
            # If the scheme is 'file', use only the path segment; if not, use the entire URL
            if self._scheme == 'file':
                url = parsed.path
            url = url.replace("\\", "/")

            # url2pathname is specifically intended to operate on (urlparse result).path
            # and returns a cross-platform compatible result
            driver_uri = url2pathname(url)
            path_driver_uri = Path(driver_uri)
            # if path_driver_uri.is_file():
            #     driver_uri = str(path_driver_uri.parent)
            # elif not path_driver_uri.exists():
            #     # assume a folder and create
            #     # Path(driver_uri).mkdir(parents=True, exist_ok=True)
            #     pass

            self._driver = _FileStorageDriver(str(path_driver_uri.root))
            self._container = None

    @classmethod
    def terminate_uploads(cls, force=True, timeout=2.0):
        if force:
            # since async uploaders are daemon threads, we can just return and let them close by themselves
            return
        # signal all threads to terminate and give them a chance for 'timeout' seconds (total, not per-thread)
        # cls._terminate.set()
        remaining_timeout = timeout
        for thread in cls._async_upload_threads:
            t = time()
            try:
                thread.join(timeout=remaining_timeout)
            except Exception:
                pass
            remaining_timeout -= (time() - t)

    @classmethod
    def get_configuration(cls, bucket_config):
        return cls._s3_configurations.get_config_by_bucket(bucket_config.bucket, bucket_config.host)

    @classmethod
    def add_configuration(cls, bucket_config, log=None, _test_config=True):
        # Try to use existing configuration if we have no key and secret
        use_existing = not bucket_config.is_valid()

        # Get existing config anyway (we'll either try to use it or alert that we're replacing it)
        existing = cls.get_configuration(bucket_config)

        configs = cls._s3_configurations

        if not use_existing:
            # Test bucket config, fails if unsuccessful
            if _test_config:
                _Boto3Driver._test_bucket_config(bucket_config, log)

            if existing:
                if log:
                    log.warning('Overriding existing configuration for %s/%s'
                                % (existing.host or 'AWS', existing.bucket))
                configs.remove_config(existing)
        else:
            # Try to use existing configuration
            good_config = False
            if existing:
                if log:
                    log.info('Using existing credentials for bucket %s/%s'
                             % (bucket_config.host or 'AWS', bucket_config.bucket))
                good_config = _Boto3Driver._test_bucket_config(existing, log, raise_on_error=False)

            if not good_config:
                # Try to use global key/secret
                configs.update_config_with_defaults(bucket_config)

                if log:
                    log.info('Using global credentials for bucket %s/%s'
                             % (bucket_config.host or 'AWS', bucket_config.bucket))

                if _test_config:
                    _Boto3Driver._test_bucket_config(bucket_config, log)
            else:
                # do not add anything, existing config is OK
                return

        configs.add_config(bucket_config)

    @classmethod
    def add_path_substitution(
            cls,
            registered_prefix,
            local_prefix,
            replace_windows_sep=False,
            replace_linux_sep=False,
    ):
        """
        Add a path substitution rule for storage paths.

        Useful for cases where the data was registered under some path, and that
        path was later renamed. This may happen with local storage paths where
        each machine has a different mount or network-drive configuration.

        :param registered_prefix: The prefix to search for and replace. This is
            the prefix of the path the data is registered under. This should be the
            exact URL prefix, case sensitive, as the data is registered.
        :param local_prefix: The prefix to replace 'registered_prefix' with. This
            is the prefix of the path the data is actually saved under. This should be the
            exact URL prefix, case sensitive, as the data is saved under.
        :param replace_windows_sep: If set to True, and the prefix matches, the rest
            of the URL has all of the Windows path separators (backslash '\\') replaced with
            the native OS path separator.
        :param replace_linux_sep: If set to True, and the prefix matches, the rest
            of the URL has all of the Linux/Unix path separators (slash '/') replaced with
            the native OS path separator.
        """
        if not registered_prefix or not local_prefix:
            raise UsageError("Path substitution prefixes must be non-empty strings")

        if replace_windows_sep and replace_linux_sep:
            raise UsageError("Only one of replace_windows_sep and replace_linux_sep may be set.")

        rule = cls._PathSubstitutionRule(
            registered_prefix=registered_prefix,
            local_prefix=local_prefix,
            replace_windows_sep=replace_windows_sep,
            replace_linux_sep=replace_linux_sep,
        )

        cls._path_substitutions.append(rule)

    @classmethod
    def clear_path_substitutions(cls):
        """
        Removes all path substitution rules, including ones from the configuration file.
        """
        cls._path_substitutions = list()
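
    # A hedged sketch of the substitution rules above (the paths are illustrative):
    #
    #   StorageHelper.add_path_substitution(
    #       registered_prefix="file:///mnt/shared/datasets",
    #       local_prefix="file:///data/shared/datasets",
    #   )
    #   # any URL passed through _canonize_url() that starts with the registered
    #   # prefix is now rewritten to the local prefix before a driver is selected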

    def verify_upload(self, folder_uri='', raise_on_error=True, log_on_error=True):
        """
        Verify that this helper can upload files to a folder.

        An upload is possible iff:
            1. the destination folder is under the base uri of the url used to create the helper
            2. the helper has credentials to write to the destination folder

        :param folder_uri: The destination folder to test. Must be an absolute
            url that begins with the base uri of the url used to create the helper.
        :param raise_on_error: Raise an exception if an upload is not possible
        :param log_on_error: Log an error if an upload is not possible
        :return: True, if, and only if, an upload to folder_uri is possible.
        """
        folder_uri = self._canonize_url(folder_uri)

        folder_uri = self.conform_url(folder_uri, self._base_url)

        test_path = self._normalize_object_name(folder_uri)

        if self._scheme == _Boto3Driver.scheme:
            _Boto3Driver._test_bucket_config(
                self._conf,
                self._log,
                test_path=test_path,
                raise_on_error=raise_on_error,
                log_on_error=log_on_error,
            )
        elif self._scheme == _GoogleCloudStorageDriver.scheme:
            self._driver.test_upload(test_path, self._conf)
        elif self._scheme == 'file':
            # Check path exists
            Path(test_path).mkdir(parents=True, exist_ok=True)
            # check path permissions
            Path(test_path).touch(exist_ok=True)

        return folder_uri

    def upload_from_stream(self, stream, dest_path, extra=None, retries=1):
        dest_path = self._canonize_url(dest_path)
        object_name = self._normalize_object_name(dest_path)
        extra = extra.copy() if extra else {}
        extra.update(self._extra)
        last_ex = None
        cb = UploadProgressReport.from_stream(stream, object_name, self._verbose, self._log)
        for i in range(max(1, retries)):
            try:
                self._driver.upload_object_via_stream(
                    iterator=stream,
                    container=self._container,
                    object_name=object_name,
                    callback=cb,
                    extra=extra)
                last_ex = None
                break
            except Exception as ex:
                last_ex = ex
                # seek to the beginning if possible
                # noinspection PyBroadException
                try:
                    stream.seek(0)
                except Exception:
                    pass
        if last_ex:
            raise last_ex

        if self.scheme in _HttpDriver.schemes:
            # quote link
            dest_path = quote_url(dest_path)

        return dest_path
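
    # A hedged usage sketch for `upload_from_stream` (the destination URL is illustrative):
    #
    #   helper = StorageHelper.get("s3://my-bucket/artifacts")
    #   uploaded = helper.upload_from_stream(
    #       stream=six.BytesIO(b"payload"),
    #       dest_path="s3://my-bucket/artifacts/payload.bin",
    #       retries=3,
    #   )
    #   # on HTTP/S destinations the returned path is quoted via quote_url()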

    def upload(self, src_path, dest_path=None, extra=None, async_enable=False, cb=None, retries=1):
        if not dest_path:
            dest_path = os.path.basename(src_path)

        dest_path = self._canonize_url(dest_path)

        if cb and self.scheme in _HttpDriver.schemes:
            # store original callback
            a_cb = cb

            # quote link
            def callback(a_path):
                return a_cb(quote_url(a_path) if a_path else a_path)

            # replace callback with wrapper
            cb = callback

        if async_enable:
            data = self._UploadData(src_path=src_path, dest_path=dest_path, extra=extra, callback=cb, retries=retries)
            StorageHelper._initialize_upload_pool()
            return StorageHelper._upload_pool.apply_async(self._do_async_upload, args=(data,))
        else:
            res = self._do_upload(src_path, dest_path, extra, cb, verbose=False, retries=retries)
            if res:
                res = quote_url(res)
            return res

    def list(self, prefix=None):
        """
        List entries in the helper base path.

        Return a list of names inside this helper base path. The base path is
        determined at creation time and is specific for each storage medium.
        For Google Storage and S3 it is the bucket of the path.
        For local files it is the root directory.

        This operation is not supported for http and https protocols.

        :param prefix: If None, return the list as described above. If not, it
            must be a string - the path of a subdirectory under the base path.
            The returned list will include only objects under that subdirectory.

        :return: The paths of all the objects in the storage base path under prefix.
            Listed relative to the base path.
        """
        if prefix:
            if prefix.startswith(self._base_url):
                prefix = prefix[len(self.base_url):].lstrip("/")

            try:
                res = self._driver.list_container_objects(self._container, ex_prefix=prefix)
            except TypeError:
                res = self._driver.list_container_objects(self._container)

            return [
                obj.name
                for obj in res if
                obj.name.startswith(prefix) and obj.name != prefix
            ]
        else:
            return [obj.name for obj in self._driver.list_container_objects(self._container)]
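
    # A hedged usage sketch for `list` (the bucket and prefix are illustrative):
    #
    #   helper = StorageHelper.get("s3://my-bucket")
    #   all_names = helper.list()                              # everything under the base path
    #   models = helper.list(prefix="s3://my-bucket/models")   # only objects under 'models'
    #   # full-URL prefixes are stripped down to a relative prefix before filtering,
    #   # and the returned names are relative to the helper base path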

    def download_to_file(
            self,
            remote_path,
            local_path,
            overwrite_existing=False,
            delete_on_failure=True,
            verbose=None,
            skip_zero_size_check=False
    ):
        def next_chunk(astream):
            if isinstance(astream, binary_type):
                chunk = astream
                astream = None
            elif astream:
                try:
                    chunk = next(astream)
                except StopIteration:
                    chunk = None
            else:
                chunk = None
            return chunk, astream

        remote_path = self._canonize_url(remote_path)
        verbose = self._verbose if verbose is None else verbose

        # Check if driver type supports direct access:
        direct_access_path = self.get_driver_direct_access(remote_path)
        if direct_access_path:
            return direct_access_path

        temp_local_path = None
        try:
            if verbose:
                self._log.info('Start downloading from %s' % remote_path)
            if not overwrite_existing and Path(local_path).is_file():
                self._log.warning(
                    'File {} already exists, no need to download, thread id = {}'.format(
                        local_path,
                        threading.current_thread().ident,
                    ),
                )
                return local_path
            # we download into temp_local_path so that if we accidentally stop in the middle,
            # we won't think we have the entire file
            temp_local_path = '{}_{}{}'.format(local_path, time(), self._temp_download_suffix)
            obj = self._get_object(remote_path)
            if not obj:
                return None

            # object size in bytes
            total_size_mb = -1
            dl_total_mb = 0.
            download_reported = False
            # chunk size is ignored and always 5MB
            chunk_size_mb = 5

            # make sure we have the destination folder
            # noinspection PyBroadException
            Path(temp_local_path).parent.mkdir(parents=True, exist_ok=True)

            # try to get file size
            try:
                if isinstance(self._driver, _HttpDriver) and obj:
                    obj = self._driver._get_download_object(obj)
                    total_size_mb = float(obj.headers.get('Content-Length', 0)) / (1024 * 1024)
                elif hasattr(obj, 'size'):
                    size = obj.size
                    # Google storage has the option to reload the object to get the size
                    if size is None and hasattr(obj, 'reload'):
                        obj.reload()
                        size = obj.size
                    total_size_mb = 0 if size is None else float(size) / (1024 * 1024)
                elif hasattr(obj, 'content_length'):
                    total_size_mb = float(obj.content_length) / (1024 * 1024)
            except (ValueError, AttributeError, KeyError):
                pass

            # if driver supports download with callback, use it (it might be faster)
            if hasattr(self._driver, 'download_object'):
                # callback
                cb = DownloadProgressReport(total_size_mb, verbose, remote_path, self._log)
                self._driver.download_object(obj, temp_local_path, callback=cb)
                download_reported = bool(cb.last_reported)
                dl_total_mb = cb.current_status_mb
            else:
                stream = self._driver.download_object_as_stream(obj, chunk_size_mb * 1024 * 1024)
                if stream is None:
                    raise ValueError('Could not download %s' % remote_path)
                with open(temp_local_path, 'wb') as fd:
                    data, stream = next_chunk(stream)
                    while data:
                        fd.write(data)
                        data, stream = next_chunk(stream)

            if not skip_zero_size_check and Path(temp_local_path).stat().st_size <= 0:
                raise Exception('downloaded a 0-sized file')

            # if we are on Windows, we need to remove the target file before renaming,
            # otherwise posix rename will overwrite the target
            if os.name != 'posix':
                try:
                    os.remove(local_path)
                except Exception:
                    pass

            # rename temp file to local_file
            # noinspection PyBroadException
            try:
                os.rename(temp_local_path, local_path)
            except Exception:
                # noinspection PyBroadException
                try:
                    os.unlink(temp_local_path)
                except Exception:
                    pass
                # file was downloaded by a parallel process, check we have the final output and delete the partial copy
                path_local_path = Path(local_path)
                if not path_local_path.is_file() or (not skip_zero_size_check and path_local_path.stat().st_size <= 0):
                    raise Exception('Failed renaming partial file, downloaded file is missing or 0-sized')

            # report download if we are on the second chunk
            if verbose or download_reported:
                self._log.info(
                    'Downloaded %.2f MB successfully from %s , saved to %s' % (dl_total_mb, remote_path, local_path))
            return local_path
        except DownloadError:
            raise
        except Exception as e:
            self._log.error("Could not download {} , err: {} ".format(remote_path, e))
            if delete_on_failure:
                # noinspection PyBroadException
                try:
                    if temp_local_path:
                        os.remove(temp_local_path)
                except Exception:
                    pass
            return None
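
    # A hedged usage sketch for `download_to_file` (the URL and target path are illustrative):
    #
    #   helper = StorageHelper.get("https://example.com/files/model.bin")
    #   local = helper.download_to_file(
    #       "https://example.com/files/model.bin",
    #       "/tmp/model.bin",
    #       overwrite_existing=False,
    #   )
    #   # when the driver can read the file in place (e.g. local 'file://' URLs),
    #   # the direct-access path is returned as-is; otherwise the local copy is returned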

    def download_as_stream(self, remote_path, chunk_size=None):
        remote_path = self._canonize_url(remote_path)
        try:
            obj = self._get_object(remote_path)
            return self._driver.download_object_as_stream(
                obj, chunk_size=chunk_size, verbose=self._verbose, log=self.log
            )
        except DownloadError:
            raise
        except Exception as e:
            self._log.error("Could not download file : %s, err:%s " % (remote_path, str(e)))
            return None

    def download_as_nparray(self, remote_path, chunk_size=None):
        try:
            stream = self.download_as_stream(remote_path, chunk_size)
            if stream is None:
                return

            # TODO: ugly py3 hack, please remove ASAP
            if six.PY3 and not isinstance(stream, GeneratorType):
                import numpy as np
                return np.frombuffer(stream, dtype=np.uint8)
            else:
                import numpy as np
                return np.asarray(bytearray(b''.join(stream)), dtype=np.uint8)
        except Exception as e:
            self._log.error("Could not download file : %s, err:%s " % (remote_path, str(e)))

    def delete(self, path):
        return self._driver.delete_object(self._get_object(path))

    def check_write_permissions(self, dest_path=None):
        # create a temporary file, then delete it
        base_url = dest_path or self._base_url
        dest_path = base_url + '/.clearml.test'
        # do not check http/s connection permissions
        if dest_path.startswith('http'):
            return True
        try:
            self.upload_from_stream(stream=six.BytesIO(b'clearml'), dest_path=dest_path)
            self.delete(path=dest_path)
        except Exception:
            raise ValueError('Insufficient permissions for {}'.format(base_url))
        return True

    @classmethod
    def download_from_url(cls, remote_url, local_path, overwrite_existing=False):
        """
        Download a file from a remote URL to local storage.

        :param remote_url: Remote URL. Example: https://example.com/image.jpg or s3://bucket/folder/file.mp4 etc.
        :param local_path: target location for the downloaded file. Example: /tmp/image.jpg
        :param overwrite_existing: If True and local_path exists, overwrite it; otherwise print a warning
        :return: local_path if the download was successful.
        """
        helper = cls.get(remote_url)
        if not helper:
            return None
        return helper.download_to_file(remote_url, local_path, overwrite_existing=overwrite_existing)
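
    # A hedged usage sketch for `download_from_url` (the URL and path are illustrative):
    #
    #   local = StorageHelper.download_from_url(
    #       "s3://my-bucket/folder/file.mp4", "/tmp/file.mp4", overwrite_existing=True)
    #   if local is None:
    #       # no helper could be created for the URL, or the download failed
    #       ...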

    def get_driver_direct_access(self, path):
        """
        Check if the helper's driver has direct access to the file.

        :param str path: file path to check access to
        :return: The string representation of the file as a path if the driver has access to it, else None
        """
        return self._driver.get_direct_access(path)

    @classmethod
    def _canonize_url(cls, url):
        return cls._apply_url_substitutions(url)

    @classmethod
    def _apply_url_substitutions(cls, url):
        def replace_separator(_url, where, sep):
            return _url[:where] + _url[where:].replace(sep, os.sep)

        for index, rule in enumerate(cls._path_substitutions):
            if url.startswith(rule.registered_prefix):
                url = url.replace(
                    rule.registered_prefix,
                    rule.local_prefix,
                    1,  # count. str.replace() does not support keyword arguments
                )
                if rule.replace_windows_sep:
                    url = replace_separator(url, len(rule.local_prefix), '\\')
                if rule.replace_linux_sep:
                    url = replace_separator(url, len(rule.local_prefix), '/')
                break

        return url

    @classmethod
    def _resolve_base_url(cls, base_url):
        parsed = urlparse(base_url)
        if parsed.scheme == _Boto3Driver.scheme:
            conf = cls._s3_configurations.get_config_by_uri(base_url)
            bucket = conf.bucket
            if not bucket:
                parts = Path(parsed.path.strip('/')).parts
                if parts:
                    bucket = parts[0]
            return '/'.join(x for x in ('s3:/', conf.host, bucket) if x)
        elif parsed.scheme == _AzureBlobServiceStorageDriver.scheme:
            conf = cls._azure_configurations.get_config_by_uri(base_url)
            if not conf:
                raise StorageError("Can't find azure configuration for {}".format(base_url))
            return str(furl(base_url).set(path=conf.container_name))
        elif parsed.scheme == _GoogleCloudStorageDriver.scheme:
            conf = cls._gs_configurations.get_config_by_uri(base_url)
            return str(furl(scheme=parsed.scheme, netloc=conf.bucket))
        elif parsed.scheme == 'http':
            return 'http://'
        elif parsed.scheme == 'https':
            return 'https://'
        else:  # if parsed.scheme == 'file':
            # if we do not know what it is, we assume file
            return 'file://'

    @classmethod
    def conform_url(cls, folder_uri, base_url=None):
        if not folder_uri:
            return folder_uri
        _base_url = cls._resolve_base_url(folder_uri) if not base_url else base_url

        if not folder_uri.startswith(_base_url):
            prev_folder_uri = folder_uri
            if _base_url == 'file://':
                folder_uri = str(Path(folder_uri).absolute())
                if folder_uri.startswith('/'):
                    folder_uri = _base_url + folder_uri
                else:
                    folder_uri = '/'.join((_base_url, folder_uri))

                cls._get_logger().debug('Upload destination {} amended to {} for registration purposes'.format(
                    prev_folder_uri, folder_uri))
            else:
                raise ValueError('folder_uri: {} does not start with base url: {}'.format(folder_uri, _base_url))

        return folder_uri

    def _absolute_object_name(self, path):
        """ Returns the absolute remote path, including any prefix that is handled by the container """
        if not path.startswith(self.base_url):
            return self.base_url.rstrip('/') + '///' + path.lstrip('/')
        return path

    def _normalize_object_name(self, path):
        """ Normalize remote path. Remove any prefix that is already handled by the container """
        if path.startswith(self.base_url):
            path = path[len(self.base_url):]
            if path.startswith('/') and os.name == 'nt':
                path = path[1:]
        if self.scheme in (_Boto3Driver.scheme, _GoogleCloudStorageDriver.scheme,
                           _AzureBlobServiceStorageDriver.scheme):
            path = path.lstrip('/')
        return path

    def _do_async_upload(self, data):
        assert isinstance(data, self._UploadData)
        return self._do_upload(data.src_path, data.dest_path, extra=data.extra, cb=data.callback,
                               verbose=True, retries=data.retries)

    def _upload_from_file(self, local_path, dest_path, extra=None):
        if not hasattr(self._driver, 'upload_object'):
            with open(local_path, 'rb') as stream:
                res = self.upload_from_stream(stream=stream, dest_path=dest_path, extra=extra)
        else:
            object_name = self._normalize_object_name(dest_path)
            extra = extra.copy() if extra else {}
            extra.update(self._extra)
            cb = UploadProgressReport.from_file(local_path, self._verbose, self._log)
            res = self._driver.upload_object(
                file_path=local_path,
                container=self._container,
                object_name=object_name,
                callback=cb,
                extra=extra)
        return res

    def _do_upload(self, src_path, dest_path, extra=None, cb=None, verbose=False, retries=1):
        object_name = self._normalize_object_name(dest_path)
        if cb:
            try:
                cb(None)
            except Exception as e:
                self._log.error("Calling upload callback when starting upload: %s" % str(e))
        if verbose:
            msg = 'Starting upload: {} => {}{}'.format(
                src_path,
                (self._container.name if self._container.name.endswith('/') else self._container.name + '/')
                if self._container and self._container.name else '', object_name)
            if object_name.startswith('file://') or object_name.startswith('/'):
                self._log.debug(msg)
            else:
                self._log.info(msg)

        last_ex = None
        for i in range(max(1, retries)):
            try:
                if not self._upload_from_file(local_path=src_path, dest_path=dest_path, extra=extra):
                    # retry if failed
                    last_ex = ValueError("Upload failed")
                    continue
                last_ex = None
                break
            except Exception as e:
                last_ex = e

        if last_ex:
            self._log.error("Exception encountered while uploading %s" % str(last_ex))
            if cb:
                try:
                    cb(False)
                except Exception as e:
                    self._log.warning("Exception on upload callback: %s" % str(e))
            raise last_ex

        if verbose:
            self._log.debug("Finished upload: %s => %s" % (src_path, object_name))
        if cb:
            try:
                cb(dest_path)
            except Exception as e:
                self._log.warning("Exception on upload callback: %s" % str(e))

        return dest_path

    def _get_object(self, path):
        object_name = self._normalize_object_name(path)
        try:
            return self._driver.get_object(
                container_name=self._container.name if self._container else '', object_name=object_name)
        except ConnectionError:
            raise DownloadError
        except Exception as e:
            self.log.warning('Storage helper problem for {}: {}'.format(str(object_name), str(e)))
            return None