filesystem.py
__author__ = 'Pontus'

from glob import iglob
from logging import getLogger
from os import path, walk, sep as os_sep

from taca.utils.misc import hashfile

logger = getLogger(__name__)


class FileNotFoundException(Exception):
    pass


class PatternNotMatchedException(Exception):
    pass

def gather_files(patterns, no_checksum=False, hash_algorithm="md5"):
    """ This method will locate files matching the patterns specified in
        the config, compute the checksum and construct the staging path
        according to the config.

        The config should contain the key 'files_to_deliver', which should
        be a list of tuples with source path patterns and destination path
        patterns. The source path can be a file glob and can refer to a
        folder or file. File globs will be expanded and folders will be
        traversed to include everything beneath.

        :returns: A generator of tuples with source path,
            destination path and the checksum of the source file
            (or None if the checksum was skipped)
    """
    def _get_digest(sourcepath, destpath, no_digest_cache=False, no_digest=False):
        digest = None
        # skip the digest if either the global or the per-file setting is to skip
        if not any([no_checksum, no_digest]):
            checksumpath = "{}.{}".format(sourcepath, hash_algorithm)
            try:
                # reuse a previously computed checksum if a sidecar file exists
                with open(checksumpath, 'r') as fh:
                    digest = next(fh)
            except IOError:
                digest = hashfile(sourcepath, hasher=hash_algorithm)
                if not no_digest_cache:
                    try:
                        with open(checksumpath, 'w') as fh:
                            fh.write(digest)
                    except IOError as we:
                        logger.warning(
                            "could not write checksum {} to file {}: {}".format(digest, checksumpath, we))
        return sourcepath, destpath, digest

    def _walk_files(currpath, destpath):
        # if current path is a folder, return all files below it
        if path.isdir(currpath):
            parent = path.dirname(currpath)
            for parentdir, _, dirfiles in walk(currpath, followlinks=True):
                for currfile in dirfiles:
                    fullpath = path.join(parentdir, currfile)
                    # the relative path will be used in the destination path
                    relpath = path.relpath(fullpath, parent)
                    yield (fullpath, path.join(destpath, relpath))
        else:
            yield (currpath,
                   path.join(
                       destpath,
                       path.basename(currpath)))

    if patterns is None:
        patterns = []
    for pattern in patterns:
        sfile, dfile = pattern[0:2]
        try:
            extra = pattern[2]
        except IndexError:
            extra = {}
        matches = 0
        for f in iglob(sfile):
            for spath, dpath in _walk_files(f, dfile):
                # ignore checksum files
                if not spath.endswith(".{}".format(hash_algorithm)):
                    matches += 1
                    # skip and warn if a path does not exist, this includes broken symlinks
                    if path.exists(spath):
                        yield _get_digest(
                            spath,
                            dpath,
                            no_digest_cache=extra.get('no_digest_cache', False),
                            no_digest=extra.get('no_digest', False))
                    else:
                        # if the file pattern requires a match, throw an error. otherwise warn
                        msg = "path {} does not exist, possibly because of a broken symlink".format(spath)
                        if extra.get('required', False):
                            logger.error(msg)
                            raise FileNotFoundException(msg)
                        logger.warning(msg)
        if matches == 0:
            msg = "no files matching search expression '{}' found".format(sfile)
            if extra.get('required', False):
                logger.error(msg)
                raise PatternNotMatchedException(msg)
            logger.warning(msg)
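
# Illustrative usage sketch (not part of the original module): the patterns passed to
# gather_files() are (source glob, destination, extras) tuples taken from the
# 'files_to_deliver' config. The paths below are hypothetical; the extras keys mirror
# the ones read above ('required', 'no_digest', 'no_digest_cache'):
#
#     patterns = [
#         ("/proj/run/*.fastq.gz", "fastq", {"required": True}),
#         ("/proj/run/reports", "reports", {"no_digest": True}),
#     ]
#     for src, dst, digest in gather_files(patterns, hash_algorithm="md5"):
#         print(src, dst, digest)
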
def parse_hash_file(hfile, last_modified, hash_algorithm="md5", root_path="", files_filter=None):
    """Parse the hash file and return a dict with hash value and file size.
       Files are grouped based on their parent directory relative to the stage;
       if 'files_filter' is provided, only info for the matching files is returned.
    """
    mdict = {}
    with open(hfile, 'r') as hfl:
        for hl in hfl:
            hl = hl.strip()
            # apply the optional filter on the raw checksum line
            if files_filter and not any(map(lambda pat: pat in hl, files_filter)):
                continue
            hval, fnm = hl.split()
            # group by top-level directory, or by the file name without its extension
            # for files at the top of the staging area
            fkey = fnm.split(os_sep)[0] if os_sep in fnm else path.splitext(fnm)[0]
            if fkey not in mdict:
                mdict[fkey] = {}
            mdict[fkey][fnm] = {'{}_sum'.format(hash_algorithm): hval,
                                'size_in_bytes': path.getsize(path.join(root_path, fnm)),
                                'last_modified': last_modified}
    return mdict
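
# Illustrative sketch (hypothetical file name and contents): given a checksum file
# "checksums.md5" with md5sum-style lines such as
#
#     d41d8cd98f00b204e9800998ecf8427e  Sample_1/file_1.fastq.gz
#     9e107d9d372bb6826bd81d3542a419d6  report.html
#
# the call below groups "Sample_1/file_1.fastq.gz" under the key "Sample_1" and
# "report.html" under "report", each entry holding the md5 sum, the on-disk size
# looked up relative to root_path, and the supplied last_modified value:
#
#     info = parse_hash_file("checksums.md5", "2021-01-01T12:00:00",
#                            hash_algorithm="md5", root_path="/path/to/stage")
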
def merge_dicts(mdict, sdict):
    """Merge the two given dictionaries; if a key already exists it is
       replaced or updated with the new value depending on the data types.
    """
    for k, v in sdict.items():
        if isinstance(v, dict) and isinstance(mdict.get(k), dict):
            # both values are dicts, merge them recursively
            mdict[k] = merge_dicts(mdict[k], v)
        elif isinstance(v, list) and isinstance(mdict.get(k), list):
            # both values are lists, keep the deduplicated union
            mdict[k] = list(set(mdict[k] + v))
        else:
            mdict[k] = v
    return mdict
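
# Illustrative sketch of the merge behaviour: nested dicts are merged recursively,
# lists become a deduplicated union (order not preserved), and any other value in
# sdict simply overwrites the one in mdict:
#
#     merge_dicts({'a': {'x': 1}, 'tags': ['qc']},
#                 {'a': {'y': 2}, 'tags': ['qc', 'raw'], 'size': 10})
#     # -> {'a': {'x': 1, 'y': 2}, 'tags': ['qc', 'raw'], 'size': 10}  (tag order may vary)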