/
utils.py
2292 lines (1895 loc) · 72.6 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# emacs: -*- mode: python; py-indent-offset: 4; tab-width: 4; indent-tabs-mode: nil -*-
# ex: set sts=4 ts=4 sw=4 noet:
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
#
# See COPYING file distributed along with the datalad package for the
# copyright and license terms.
#
# ## ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ### ##
import collections
import hashlib
import re
import six.moves.builtins as __builtin__
import time
try:
from collections.abc import Callable
except ImportError: # Python <= 3.3
from collections import Callable
import logging
import shutil
import os
import sys
import tempfile
import platform
import gc
import glob
import gzip
import string
import warnings
import wrapt
from copy import copy as shallow_copy
from contextlib import contextmanager
from functools import wraps
from time import sleep
import inspect
from itertools import tee
import os.path as op
from os.path import sep as dirsep
from os.path import commonprefix
from os.path import curdir, basename, exists, realpath, islink, join as opj
from os.path import isabs, normpath, expandvars, expanduser, abspath, sep
from os.path import isdir
from os.path import relpath
from os.path import stat
from os.path import dirname
from os.path import split as psplit
import posixpath
from six import PY2, text_type, binary_type, string_types
from six.moves import shlex_quote
# from datalad.dochelpers import get_docstring_split
from datalad.consts import TIMESTAMP_FMT
# Source types that `as_unicode` hands over to `assure_unicode` for decoding:
# on Python 3 raw `bytes` are accepted in addition to the `six` string types
unicode_srctypes = string_types if PY2 else string_types + (bytes,)

# Module-level logger.  Level 5 is below DEBUG (10), so the import trace
# shows up only with very verbose logging enabled
lgr = logging.getLogger("datalad.utils")
lgr.log(5, "Importing datalad.utils")
#
# Some useful variables
#

# Lower-cased name of the running OS as reported by `platform.system()`
platform_system = platform.system().lower()
on_windows = (platform_system == 'windows')
on_osx = (platform_system == 'darwin')
on_linux = (platform_system == 'linux')
# Windows where MSYSTEM starts with "MSYS"/"MING" and MSYS_NO_PATHCONV is not
# set in the environment
on_msys_tainted_paths = (
    on_windows
    and 'MSYS_NO_PATHCONV' not in os.environ
    and os.environ.get('MSYSTEM', '')[:4] in ('MSYS', 'MING')
)
def get_linux_distribution():
    """Compatibility wrapper for {platform,distro}.linux_distribution().

    Returns
    -------
    Whatever the selected ``linux_distribution()`` implementation returns:
    ``platform.linux_distribution()`` when the interpreter still provides it,
    otherwise ``distro.linux_distribution(full_distribution_name=False)``.
    """
    if not hasattr(platform, "linux_distribution"):
        # We require `distro` for Python 3.8 and above.
        import distro
        return distro.linux_distribution(full_distribution_name=False)

    # Use deprecated (but faster) method if it's available, while keeping
    # its DeprecationWarning from reaching the user.
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", category=DeprecationWarning)
        return platform.linux_distribution()
# Best-effort detection of Debian wheezy (release 7.x).  Any ordinary failure
# while probing the distribution leaves the flag off and the name/release
# unset.
try:
    linux_distribution_name, linux_distribution_release \
        = get_linux_distribution()[:2]
    on_debian_wheezy = on_linux \
        and linux_distribution_name == 'debian' \
        and linux_distribution_release.startswith('7.')
except Exception:  # pragma: no cover
    # Only `Exception` is caught (not a bare `except:`) so that
    # KeyboardInterrupt/SystemExit raised during import still propagate.
    # MIH: IndexError?
    on_debian_wheezy = False
    linux_distribution_name = linux_distribution_release = None
# Maximal length of cmdline string
# Query the system and use hardcoded "knowledge" if None
# probably   getconf ARG_MAX   might not be available
# The last one would be the most conservative/Windows
CMD_MAX_ARG_HARDCODED = 2097152 if on_linux else 262144 if on_osx else 32767
try:
    CMD_MAX_ARG = os.sysconf('SC_ARG_MAX')
    if CMD_MAX_ARG <= 0:
        # Explicit check rather than `assert`: asserts are stripped when
        # Python runs with -O, which would let a useless value (e.g. -1)
        # slip through and end up as the limit.
        raise ValueError(
            "SC_ARG_MAX is not a positive number: %r" % (CMD_MAX_ARG,))
    if sys.version_info[:2] == (3, 4):
        # workaround for some kind of a bug which comes up with python 3.4
        # see https://github.com/datalad/datalad/issues/3150
        CMD_MAX_ARG = min(CMD_MAX_ARG, CMD_MAX_ARG_HARDCODED)
except Exception as exc:
    # ATM (20181005) SC_ARG_MAX available only on POSIX systems
    # so exception would be thrown e.g. on Windows, or
    # somehow during Debian build for nd14.04 it is coming up with -1:
    # https://github.com/datalad/datalad/issues/3015
    CMD_MAX_ARG = CMD_MAX_ARG_HARDCODED
    lgr.debug(
        "Failed to query or got useless SC_ARG_MAX sysconf, "
        "will use hardcoded value: %s", exc)
# Even with all careful computations we do, due to necessity to account for
# environment and what not, we still could not figure out "exact" way to
# estimate it, but it was shown that 300k safety margin on linux was sufficient.
# https://github.com/datalad/datalad/pull/2977#issuecomment-436264710
# 300k is ~15%, so to be safe, and for paranoid us we will just use up to 50%
# of the length for "safety margin".  We might probably still blow due to
# env vars, unicode, etc...  so any hard limit imho is not a proper solution
CMD_MAX_ARG = int(0.5 * CMD_MAX_ARG)
lgr.debug(
    "Maximal length of cmdline string (adjusted for safety margin): %d",
    CMD_MAX_ARG)
#
# Little helpers
#
# `getargspec` has been deprecated in Python 3, so where `getfullargspec`
# exists we emulate the old four-field result on top of it.
if hasattr(inspect, "getfullargspec"):
    ArgSpecFake = collections.namedtuple(
        "ArgSpecFake", ["args", "varargs", "keywords", "defaults"])

    def getargspec(func):
        """Return the first four `getfullargspec` fields as an `ArgSpecFake`"""
        full = inspect.getfullargspec(func)
        return ArgSpecFake(full.args, full.varargs, full.varkw, full.defaults)
else:
    getargspec = inspect.getargspec
def get_func_kwargs_doc(func):
    """Provide the argument names of a function

    Parameters
    ----------
    func: callable
      Function whose arguments are being requested (it is passed straight
      to `getargspec`)

    Returns
    -------
    list
      of the names of the args the function takes in
    """
    spec = getargspec(func)
    return spec.args
    # TODO: format error message with descriptions of args
    # return [repr(dict(get_docstring_split(func)[1]).get(x)) for x in getargspec(func)[0]]
def any_re_search(regexes, value):
    """Return whether any of regexes (list or str) searches successfully for value"""
    candidates = assure_tuple_or_list(regexes)
    # generator keeps the short-circuit: stop at the first regex that hits
    return any(re.search(candidate, value) for candidate in candidates)
def not_supported_on_windows(msg=None):
    """Fail consistently wherever functionality is not (yet) supported on Windows

    Parameters
    ----------
    msg: str, optional
      Extra detail appended to the error message

    Raises
    ------
    NotImplementedError
      if running on Windows; otherwise nothing happens
    """
    if not on_windows:
        return
    details = ": %s" % msg if msg else ""
    raise NotImplementedError(
        "This functionality is not yet implemented for Windows OS" + details)
def shortened_repr(value, l=30):
    """Return a compact `repr` of `value`

    Parameters
    ----------
    value:
      Object to represent
    l: int, optional
      Length above which a repr not starting with '<' gets truncated

    Returns
    -------
    str
      - the plain repr if it is short enough, or is a custom '<...>' repr;
      - "<<START...>>" with the first ``l - 8`` characters for long reprs;
      - "<ClassName>" for default-looking reprs ("<... object at 0x...>")
        or whenever obtaining the repr fails.
    """
    try:
        if hasattr(value, '__repr__') and (value.__repr__ is not object.__repr__):
            value_repr = repr(value)
            if not value_repr.startswith('<') and len(value_repr) > l:
                value_repr = "<<%s...>>" % (value_repr[:l - 8])
            elif value_repr.startswith('<') and value_repr.endswith('>') \
                    and ' object at 0x' in value_repr:
                # Only the uninformative default reprs are rejected.  The
                # membership test matters: a bare string literal here would
                # always be true and reject every '<...>' repr.
                raise ValueError("I hate those useless long reprs")
        else:
            raise ValueError("gimme class")
    except Exception:
        # fall back to just the class name
        value_repr = "<%s>" % value.__class__.__name__.split('.')[-1]
    return value_repr
def __auto_repr__(obj):
    """Build "ClassName(attr=value, ...)" out of the public attributes of `obj`

    Attribute names are collected from `__dict__` and `__slots__` (whichever
    exist), sorted, and those starting with an underscore are skipped.  Values
    are rendered with `shortened_repr`.
    """
    names = set()
    if hasattr(obj, '__dict__'):
        names.update(obj.__dict__.keys())
    if hasattr(obj, '__slots__'):
        names.update(obj.__slots__)

    # TODO: should we add this feature to minimize some talktative reprs
    # such as of URL?
    #   i.e. skip attributes whose value is None
    rendered = [
        "%s=%s" % (name, shortened_repr(getattr(obj, name)))
        for name in sorted(names)
        if not name.startswith('_')
    ]
    return "%s(%s)" % (obj.__class__.__name__, ', '.join(rendered))
def auto_repr(cls):
    """Class decorator assigning an automagic quick and dirty __repr__

    The assigned `__auto_repr__` uses public attributes of an instance to
    prepare its repr.  The very same class object is returned.

    Original idea: http://stackoverflow.com/a/27799004/1265472
    """
    setattr(cls, '__repr__', __auto_repr__)
    return cls
def _is_stream_tty(stream):
    """Return `stream.isatty()`, treating an I/O ValueError as "not a tty"

    A ValueError whose message does not mention "I/O" is re-raised.
    """
    try:
        # TODO: check on windows if hasattr check would work correctly and
        # add value:
        tty = stream.isatty()
    except ValueError as exc:
        # Who knows why it is a ValueError, but let's try to be specific
        # If there is a problem with I/O - non-interactive, otherwise reraise
        if "I/O" not in str(exc):
            raise
        tty = False
    return tty
def is_interactive():
    """Return True if all in/outs are open and tty.

    Note that in a somewhat abnormal case where e.g. stdin is explicitly
    closed, and any operation on it would raise a
    `ValueError("I/O operation on closed file")` exception, this function
    would just return False, since the session cannot be used interactively.
    """
    for stream in (sys.stdin, sys.stdout, sys.stderr):
        if not _is_stream_tty(stream):
            return False
    return True
def get_ipython_shell():
    """Detect if running within IPython and return its `ip` (shell) object

    Returns None if not under ipython (no `get_ipython` function)
    """
    try:
        shell = get_ipython()
    except NameError:
        # name is not defined => not running under IPython
        shell = None
    return shell
def md5sum(filename):
    """Compute an MD5 sum for the given file

    The work is delegated to `datalad.support.digests.Digester`, imported
    here at call time.
    """
    from datalad.support.digests import Digester
    digester = Digester(digests=['md5'])
    return digester(filename)['md5']
def sorted_files(dout):
    """Return a (sorted) list of files under dout

    Paths are reported with the leading ``dout`` plus one separator character
    cut off.  Any directory whose path contains '.git' is left out.
    """
    cut = len(dout) + 1
    collected = []
    for root, _subdirs, filenames in os.walk(dout):
        if '.git' in root:
            continue
        collected.extend(opj(root, filename)[cut:] for filename in filenames)
    collected.sort()
    return collected
# Regexes matching a VCS / DataLad metadata path component anywhere within a
# native path.  The separator is passed through `re.escape` so that the
# backslash separator of Windows is matched literally instead of being
# interpreted as the start of a regex escape sequence.
_VCS_REGEX = r'%s\.(?:git|gitattributes|svn|bzr|hg)(?:%s|$)' % (
    re.escape(dirsep), re.escape(dirsep))
_DATALAD_REGEX = r'%s\.(?:datalad)(?:%s|$)' % (
    re.escape(dirsep), re.escape(dirsep))


def find_files(regex, topdir=curdir, exclude=None, exclude_vcs=True, exclude_datalad=False, dirs=False):
    """Generator to find files matching regex

    Parameters
    ----------
    regex: basestring
      Regular expression searched for within each candidate path (`topdir`
      joined with the names found while walking it)
    exclude: basestring, optional
      Matches to exclude
    exclude_vcs:
      If true, excludes commonly known VCS subdirectories
      (regex: `%r`)
    exclude_datalad:
      If true, excludes files known to be datalad meta-data files (e.g. under
      .datalad/ subdirectory) (regex: `%r`)
    topdir: basestring, optional
      Directory where to search
    dirs: bool, optional
      Whether to match directories as well as files

    Yields
    ------
    str
      Matching paths, with any trailing directory separator stripped
    """
    # compile once, not anew for every directory visited
    matches = re.compile(regex).search
    for dirpath, dirnames, filenames in os.walk(topdir):
        names = (dirnames + filenames) if dirs else filenames
        # TODO: might want to uniformize on windows to use '/'
        paths = (opj(dirpath, name) for name in names)
        for path in filter(matches, paths):
            path = path.rstrip(dirsep)
            if exclude and re.search(exclude, path):
                continue
            if exclude_vcs and re.search(_VCS_REGEX, path):
                continue
            if exclude_datalad and re.search(_DATALAD_REGEX, path):
                continue
            yield path


# Fill the actual regexes into the docstring; there is no docstring to fill
# when Python runs with -OO
if find_files.__doc__:
    find_files.__doc__ %= (_VCS_REGEX, _DATALAD_REGEX)
def expandpath(path, force_absolute=True):
    """Expand all variables and user handles in a path.

    Parameters
    ----------
    path: str
      Path to expand (`~` handles first, then environment variables)
    force_absolute: bool, optional
      Whether to also make the result absolute (the default)
    """
    expanded = expanduser(path)
    expanded = expandvars(expanded)
    return abspath(expanded) if force_absolute else expanded
def posix_relpath(path, start=None):
    """Behave like os.path.relpath, but always return POSIX paths...

    on any platform."""
    # python2.7 ntpath implementation of relpath cannot handle start=None
    base = '' if start is None else start
    # relpath and split native style ...
    head, tail = psplit(relpath(path, start=base))
    # ... and join POSIX style
    return posixpath.join(head, tail)
def is_explicit_path(path):
    """Return whether a path explicitly points to a location

    Any absolute path, or relative path starting with either '../' or
    './' is assumed to indicate a location on the filesystem. Any other
    path format is not considered explicit."""
    expanded = expandpath(path, force_absolute=False)
    if isabs(expanded):
        return True
    explicit_prefixes = (os.curdir + os.sep, os.pardir + os.sep)
    return expanded.startswith(explicit_prefixes)
def rotree(path, ro=True, chmod_files=True):
    """Make a tree read-only or writable

    Parameters
    ----------
    path : string
      Path to the tree/directory to chmod
    ro : bool, optional
      Whether to make it R/O (default) or RW
    chmod_files : bool, optional
      Whether to operate also on files (not just directories)
    """
    def _adjust(target):
        # drop the write bit for R/O, otherwise grant read and write
        mode = os.stat(target).st_mode
        if ro:
            mode &= ~stat.S_IWRITE
        else:
            mode |= stat.S_IWRITE | stat.S_IREAD
        os.chmod(target, mode)

    for root, _subdirs, filenames in os.walk(path, followlinks=False):
        if chmod_files:
            for filename in filenames:
                full_path = opj(root, filename)
                # might be the "broken" symlink which would fail to stat etc
                if exists(full_path):
                    _adjust(full_path)
        _adjust(root)
def rmtree(path, chmod_files='auto', children_only=False, *args, **kwargs):
    """To remove git-annex .git it is needed to make all files and directories writable again first

    Parameters
    ----------
    path : str
      Directory (or symlink/file) to remove
    chmod_files : string or bool, optional
      Whether to make files writable also before removal.  Usually it is just
      a matter of directories to have write permissions.
      If 'auto' it would chmod files on windows by default
    children_only : bool, optional
      If set, all files and subdirectories would be removed while the path
      itself (must be a directory) would be preserved
    `*args` :
    `**kwargs` :
      Passed into shutil.rmtree call

    Raises
    ------
    ValueError
      if `children_only` is requested for something that is not a directory
    """
    # Give W permissions back only to directories, no need to bother with files
    if chmod_files == 'auto':
        chmod_files = on_windows
    # TODO:  yoh thinks that if we could quickly check our Flyweight for
    #        repos if any of them is under the path, and could call .precommit
    #        on those to possibly stop batched processes etc, we did not have
    #        to do it on case by case
    # Check for open files
    assert_no_open_files(path)

    if children_only:
        if not os.path.isdir(path):
            raise ValueError("Can remove children only of directories")
        for p in os.listdir(path):
            # hand the caller's settings down, so children are removed with
            # the same chmod_files choice and shutil.rmtree arguments as a
            # direct call would use
            rmtree(op.join(path, p), chmod_files, False, *args, **kwargs)
        return

    if os.path.isdir(path) and not os.path.islink(path):
        rotree(path, ro=False, chmod_files=chmod_files)
        _rmtree(path, *args, **kwargs)
    else:
        # just remove the symlink (or a plain file)
        unlink(path)
def rmdir(path, *args, **kwargs):
    """os.rmdir with our optional checking for open files

    Extra positional and keyword arguments are accepted but not used.
    """
    assert_no_open_files(path)
    return os.rmdir(path)
def get_open_files(path, log_open=False):
    """Get open files under a path

    Parameters
    ----------
    path : str
      File or directory to check for open files under
    log_open : bool or int
      If set - logger level to use

    Returns
    -------
    dict
      open path (or current working directory of a process) : the `psutil`
      process object holding it
    """
    # Original idea: https://stackoverflow.com/a/11115521/1265472
    import psutil

    # since the ones returned by psutil would not be aware of symlinks in the
    # path we should also get realpath for path
    resolved = realpath(path)
    found = {}
    for proc in psutil.process_iter():
        try:
            candidates = [opened.path for opened in proc.open_files()]
            candidates.append(proc.cwd())
            for candidate in candidates:
                # note: could be done more efficiently so we do not
                # renormalize path over and over again etc
                if path_startswith(candidate, resolved):
                    found[candidate] = proc
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            # Race condition where a process ends before we can examine
            # its files, or a process we are not allowed to inspect
            continue

    if found and log_open:
        lgr.log(log_open, "Open files under %s: %s", resolved, found)
    return found
# Behaviour of `assert_no_open_files` is chosen once, at import time, by the
# DATALAD_ASSERT_NO_OPEN_FILES environment variable
_assert_no_open_files_cfg = os.environ.get('DATALAD_ASSERT_NO_OPEN_FILES')

if not _assert_no_open_files_cfg:
    def assert_no_open_files(*args, **kwargs):
        """Do nothing: checking for open files is not enabled"""
        return None
else:
    def assert_no_open_files(path):
        """Check for open files under `path` and react as configured

        Open files get logged (level 40) by `get_open_files`.  In addition,
        'assert' mode asserts there are none, while 'pdb' and 'epdb' modes
        start the respective debugger if some were found.
        """
        open_files = get_open_files(path, log_open=40)
        mode = _assert_no_open_files_cfg
        if mode == 'assert':
            assert not open_files
            return
        if not open_files:
            return
        if mode == 'pdb':
            import pdb
            pdb.set_trace()
        elif mode == 'epdb':
            import epdb
            epdb.serve()
        # otherwise we would just issue that error message in the log
def rmtemp(f, *args, **kwargs):
    """Wrapper to centralize removing of temp files so we could keep them around

    It will not remove the temporary file/directory if DATALAD_TESTS_TEMP_KEEP
    environment variable is defined (and not empty).  Extra arguments are
    passed to `rmtree` when `f` is a directory.
    """
    if os.environ.get('DATALAD_TESTS_TEMP_KEEP'):
        lgr.info("Keeping temp file: %s" % f)
        return

    if not os.path.lexists(f):
        lgr.debug("Path %s does not exist, so can't be removed" % f)
        return

    lgr.log(5, "Removing temp file: %s" % f)
    # Can also be a directory
    remover = rmtree if os.path.isdir(f) else None
    if remover is not None:
        remover(f, *args, **kwargs)
    else:
        unlink(f)
def file_basename(name, return_ext=False):
    """Return the base name of `name` with up to 2 trailing extensions stripped

    An extension here is a dot, then a letter or underscore (not a digit),
    then 1 to 4 further non-space characters, so we could get rid of
    .tar.gz etc.

    Parameters
    ----------
    name: str
      File name or path
    return_ext: bool, optional
      If set, a tuple (stripped name, extension(s) without the leading dot)
      is returned instead of just the stripped name
    """
    leaf = basename(name)
    stem = re.sub(r'(\.[a-zA-Z_]\S{1,4}){0,2}$', '', leaf)
    if not return_ext:
        return stem
    # + 1 skips the dot separating the stem from the extension(s)
    return stem, leaf[len(stem) + 1:]
def escape_filename(filename):
    """Surround filename in "" and escape " and ` in the filename
    """
    escaped = filename
    for special in ('"', '`'):
        escaped = escaped.replace(special, '\\' + special)
    return '"%s"' % escaped
def encode_filename(filename):
    """Encode unicode filename using the filesystem encoding

    Anything which is not of `text_type` is returned as is.
    """
    if not isinstance(filename, text_type):
        return filename
    return filename.encode(sys.getfilesystemencoding())
def decode_input(s):
    """Given input string/bytes, decode according to stdin codepage (or UTF-8)
    if not defined

    If fails -- issue warning and decode allowing for errors
    being replaced
    """
    if isinstance(s, text_type):
        # nothing to decode
        return s

    encoding = sys.stdin.encoding or 'UTF-8'
    try:
        decoded = s.decode(encoding)
    except UnicodeDecodeError:
        lgr.warning(
            "Failed to decode input string using %s encoding. "
            "Decoding allowing for errors", encoding)
        decoded = s.decode(encoding, errors='replace')
    return decoded
if on_windows:
    def lmtime(filepath, mtime):
        """Set mtime for files.  On Windows merely an adapter to os.utime

        Access time is set to the current time.
        """
        os.utime(filepath, (time.time(), mtime))

else:
    def lmtime(filepath, mtime):
        """Set mtime for files, while not de-referencing symlinks.

        To overcome absence of os.lutime

        Works only on linux and OSX ATM
        """
        from .cmd import Runner
        # convert mtime to format touch understands [[CC]YY]MMDDhhmm[.SS]
        touch_stamp = time.strftime("%Y%m%d%H%M.%S", time.localtime(mtime))
        lgr.log(3, "Setting mtime for %s to %s == %s", filepath, mtime, touch_stamp)
        touch_cmd = ['touch', '-h', '-t', '%s' % touch_stamp, filepath]
        Runner().run(touch_cmd)

        target = realpath(filepath)
        if islink(filepath) and exists(target):
            # trust noone - adjust also of the target file
            # since it seemed like downloading under OSX (was it using curl?)
            # didn't bother with timestamps
            lgr.log(3, "File is a symlink to %s Setting mtime for it to %s",
                    target, mtime)
            os.utime(target, (time.time(), mtime))
        # doesn't work on OSX
        # Runner().run(['touch', '-h', '-d', '@%s' % mtime, filepath])
def assure_tuple_or_list(obj):
    """Given an object, wrap into a tuple if not list or tuple

    Lists and tuples are returned as is (the very same object).
    """
    return obj if isinstance(obj, (list, tuple)) else (obj,)
def assure_iter(s, cls, copy=False, iterate=True):
    """Given not an instance of `cls`, would place it into one.

    If None - an empty `cls` is returned

    Parameters
    ----------
    s: list or anything
    cls: class
      Which iterable class to assure
    copy: bool, optional
      If correct iterable is passed, it would generate its shallow copy
    iterate: bool, optional
      If it is not an instance of `cls`, but something iterable (but not a
      text_type) iterate over it.
    """
    if isinstance(s, cls):
        return shallow_copy(s) if copy else s
    if s is None:
        return cls()
    # text is iterable too, but must be kept as a single element
    if not isinstance(s, text_type) and iterate and hasattr(s, '__iter__'):
        return cls(s)
    return cls((s,))
def assure_list(s, copy=False, iterate=True):
    """Given not a list, would place it into a list. If None - empty list is returned

    A thin wrapper around `assure_iter` with `list` as the target class.

    Parameters
    ----------
    s: list or anything
    copy: bool, optional
      If list is passed, it would generate a shallow copy of the list
    iterate: bool, optional
      If it is not a list, but something iterable (but not a text_type)
      iterate over it.
    """
    return assure_iter(s, list, copy, iterate)
def assure_list_from_str(s, sep='\n'):
    """Given a multiline string convert it to a list or return None if empty

    Parameters
    ----------
    s: str or list
      A list is returned as is; a string is split on `sep`
    sep: str, optional
      Separator to split the string on
    """
    if not s:
        return None
    return s if isinstance(s, list) else s.split(sep)
def assure_dict_from_str(s, **kwargs):
    """Given a multiline string with key=value items convert it to a dictionary

    Parameters
    ----------
    s: str or dict
      A dict is returned as is
    `**kwargs` :
      Passed to `assure_list_from_str` (e.g. `sep`)

    Returns None if input s is empty

    Raises
    ------
    ValueError
      if an item has no '=' in it, or a key is defined more than once
    """
    if not s:
        return None
    if isinstance(s, dict):
        return s

    parsed = {}
    for entry in assure_list_from_str(s, **kwargs):
        # split on the first '=' only, so values may contain '='
        key, equals, value = entry.partition('=')
        if not equals:
            raise ValueError("{} is not in key=value format".format(repr(entry)))
        if key in parsed:
            err = "key {} was already defined in {}, but new value {} was provided".format(key, parsed, value)
            raise ValueError(err)
        parsed[key] = value
    return parsed
def assure_bytes(s, encoding='utf-8'):
    """Convert/encode unicode to str (PY2) or bytes (PY3) if of 'text_type'

    Values of any other type are returned unchanged.

    Parameters
    ----------
    encoding: str, optional
      Encoding to use.  "utf-8" is the default
    """
    return s.encode(encoding) if isinstance(s, text_type) else s
def assure_unicode(s, encoding=None, confidence=None):
    """Convert/decode to unicode (PY2) or str (PY3) if of 'binary_type'

    Values of any other type are returned unchanged.

    Parameters
    ----------
    encoding: str, optional
      Encoding to use.  If None, "utf-8" is tried, and then if not a valid
      UTF-8, encoding will be guessed
    confidence: float, optional
      A value between 0 and 1, so if guessing of encoding is of lower than
      specified confidence, ValueError is raised

    Raises
    ------
    ValueError
      if the encoding had to be guessed, and that failed or was not
      confident enough
    """
    if not isinstance(s, binary_type):
        return s
    if encoding is not None:
        return s.decode(encoding)

    # Figure out encoding, defaulting to 'utf-8' which is our common
    # target in contemporary digital society
    try:
        return s.decode('utf-8')
    except UnicodeDecodeError as exc:
        from .dochelpers import exc_str
        lgr.debug("Failed to decode a string as utf-8: %s", exc_str(exc))

    # And now we could try to guess
    from chardet import detect
    guess = detect(s)
    guessed_encoding = guess.get('encoding', None)
    if not guessed_encoding:
        raise ValueError(
            "Could not decode value as utf-8, or to guess its encoding: %s"
            % repr(s)
        )
    guessed_confidence = guess.get('confidence', 0)
    if confidence is not None and guessed_confidence < confidence:
        raise ValueError(
            "Failed to auto-detect encoding with high enough "
            "confidence. Highest confidence was %s for %s"
            % (guessed_confidence, guessed_encoding)
        )
    return s.decode(guessed_encoding)
def assure_bool(s):
    """Convert value into boolean following convention for strings

    to recognize on,True,yes as True, off,False,no as False.  Strings of
    digits are converted via int, and non-string values via plain `bool`.

    Raises
    ------
    ValueError
      for a string which is not recognized
    """
    if not isinstance(s, string_types):
        return bool(s)
    if s.isdigit():
        return bool(int(s))
    lowered = s.lower()
    if lowered in {'y', 'yes', 'true', 'on'}:
        return True
    if lowered in {'n', 'no', 'false', 'off'}:
        return False
    raise ValueError("Do not know how to treat %r as a boolean" % s)
def as_unicode(val, cast_types=object):
    """Given an arbitrary value, would try to obtain unicode value of it

    For unicode it would return original value, for python2 str or python3
    bytes it would use assure_unicode, for None - an empty (unicode) string,
    and for any other type (see `cast_types`) - would apply the unicode
    constructor.  If value is not an instance of `cast_types`, TypeError
    is thrown

    Parameters
    ----------
    cast_types: type
      Which types to cast to unicode by providing to constructor
    """
    # the order of the checks matters: each one is reached only if all the
    # previous ones did not apply
    if val is None:
        return u''
    if isinstance(val, text_type):
        return val
    if isinstance(val, unicode_srctypes):
        return assure_unicode(val)
    if isinstance(val, cast_types):
        return text_type(val)
    raise TypeError(
        "Value %r is not of any of known or provided %s types"
        % (val, cast_types))
def unique(seq, key=None, reverse=False):
    """Given a sequence return a list only with unique elements while maintaining order

    See
    https://www.peterbe.com/plog/uniqifiers-benchmark
    and
    http://stackoverflow.com/a/480227/1265472
    for more information on the underlying approach.

    Enhancement -- added ability to compare for uniqueness using a key function

    Parameters
    ----------
    seq:
      Sequence to analyze.  Elements (or their keys) must be hashable
    key: callable, optional
      Function to call on each element so we could decide not on a full
      element, but on its member etc.  It is called exactly once per element
    reverse: bool, optional
      If True, uniqueness checked in the reverse order, so that the later ones
      will take the order

    Returns
    -------
    list
    """
    seen = set()
    out = []
    for item in (reversed(seq) if reverse else seq):
        # evaluate the key a single time -- it could be costly or stateful
        marker = key(item) if key else item
        if marker in seen:
            continue
        seen.add(marker)
        out.append(item)
    if reverse:
        # items were collected back to front
        out.reverse()
    return out
def all_same(items):
    """Quick check if all items are the same.

    Identical to a check like len(set(items)) == 1 but
    should be more efficient while working on generators, since would
    return False as soon as any difference detected thus possibly avoiding
    unnecessary evaluations
    """
    iterator = iter(items)
    try:
        reference = next(iterator)
    except StopIteration:
        # So we return False if was empty
        return False
    for item in iterator:
        if item != reference:
            return False
    return True
def map_items(func, v):
    """A helper to apply `func` to all elements (keys and values) within dict

    The result is of the same class as `v`.

    No type checking of values passed to func is done, so `func`
    should be resilient to values which it should not handle

    Initial usecase - apply_recursive(url_fragment, assure_unicode)
    """
    def _mapped_items():
        # map both members of every (key, value) item
        for item in v.items():
            yield item.__class__(map(func, item))

    return v.__class__(_mapped_items())
def partition(items, predicate=bool):
    """Partition `items` by `predicate`.

    Parameters
    ----------
    items : iterable
    predicate : callable
        A function that will be mapped over each element in `items`. The
        elements will partitioned based on whether the return value is false or
        true.

    Returns
    -------
    A tuple with two generators, the first for 'false' items and the second for
    'true' ones.

    Notes
    -----
    Taken from Peter Otten's snippet posted at
    https://nedbatchelder.com/blog/201306/filter_a_list_into_two_parts.html
    """
    # evaluate the predicate once per item and share the verdicts between
    # both resulting streams
    judged = ((predicate(item), item) for item in items)
    for_false, for_true = tee(judged)

    def _rejected():
        for verdict, item in for_false:
            if not verdict:
                yield item

    def _accepted():
        for verdict, item in for_true:
            if verdict:
                yield item

    return _rejected(), _accepted()
def generate_chunks(container, size):
    """Given a container, generate chunks from it with size up to `size`

    The container is consumed by slicing, so it needs to support slicing
    and truth testing.
    """
    # There could be a "smarter" solution but I think this would suffice
    assert size > 0, "Size should be non-0 positive"
    remaining = container
    while remaining:
        chunk = remaining[:size]
        yield chunk
        remaining = remaining[size:]
def generate_file_chunks(files, cmd=None):
    """Given a list of files, generate chunks of them to avoid exceeding cmdline length

    Parameters
    ----------
    files: list of str
    cmd: str or list of str, optional
      Command to account for as well

    Returns
    -------
    generator
      of lists of files, as produced by `generate_chunks`
    """
    files = assure_list(files)
    cmd = assure_list(cmd)
    longest = max(map(len, files)) if files else 0
    # room taken by the command itself, and 4 more for '--'
    reserved = sum((len(part) + 3) for part in cmd) + 4
    # every file is assumed to be as long as the longest one,
    # +3 for possible quotes and a space
    per_file = longest + 3
    # should at least be 1.  If blows then - not our fault
    chunk_size = max(1, (CMD_MAX_ARG - reserved) // per_file)
    # TODO: additional treatment for "too many arguments"? although
    # as https://github.com/datalad/datalad/issues/1883#issuecomment
    # -436272758
    # shows there seems to be no hardcoded limit on # of arguments,
    # but may be we decide to go for smth like follow to be on safe side
    # chunk_size = min(10240 - len(cmd), chunk_size)
    return generate_chunks(files, chunk_size)
#
# Generators helpers
#
def saved_generator(gen):
    """Given a generator returns two generators, where 2nd one just replays

    So the first one would be going through the generated items and 2nd one
    would be yielding saved items
    """
    recorded = []

    def _recording():
        # iterating over original generator, remembering what was seen
        for item in gen:
            recorded.append(item)
            yield item

    def _replaying():
        # yielding saved entries
        for item in recorded:
            yield item

    return _recording(), _replaying()
#
# Decorators
#
def better_wraps(to_be_wrapped):
    """Decorator to replace `functools.wraps`

    This is based on `wrapt` instead of `functools` and in opposition to `wraps`
    preserves the correct signature of the decorated function.
    It is written with the intention to replace the use of `wraps` without any
    need to rewrite the actual decorators.
    """
    def intermediator(to_be_wrapper, instance, args, kwargs):
        # `instance` is provided by wrapt, but is not needed for the call
        return to_be_wrapper(*args, **kwargs)

    # explicit application, the same as decorating `intermediator` with
    # @wrapt.decorator(adapter=to_be_wrapped)
    adapt = wrapt.decorator(adapter=to_be_wrapped)
    return adapt(intermediator)
# Borrowed from pandas
# Copyright: 2011-2014, Lambda Foundry, Inc. and PyData Development Team
# License: BSD-3
def optional_args(decorator):
"""allows a decorator to take optional positional and keyword arguments.
Assumes that taking a single, callable, positional argument means that
it is decorating a function, i.e. something like this::
@my_decorator
def function(): pass
Calls decorator with decorator(f, `*args`, `**kwargs`)"""
@better_wraps(decorator)
def wrapper(*args, **kwargs):
def dec(f):
return decorator(f, *args, **kwargs)
is_decorating = not kwargs and len(args) == 1 and isinstance(args[0], Callable)
if is_decorating:
f = args[0]
args = []
return dec(f)
else:
return dec