-
Notifications
You must be signed in to change notification settings - Fork 25
/
utilities.py
244 lines (216 loc) · 9.9 KB
/
utilities.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import hashlib
import itertools
import os
from pathlib import Path
import re
import shutil
import subprocess
import sys
from colcon_bundle.verb import logger
def get_ros_distribution_version():
    """
    Map the Ubuntu release of the build server to a ROS distribution.

    :return: 'melodic' when the Ubuntu release is 'bionic', otherwise
      'kinetic'. Any error raised while detecting the Ubuntu release is
      propagated unchanged.
    """
    if get_ubuntu_distribution_version() == 'bionic':
        return 'melodic'
    # Every other supported Ubuntu release maps to kinetic.
    return 'kinetic'
def get_ubuntu_distribution_version():
    """
    Return the codename of the Ubuntu release this process runs on.

    :return: 'xenial' for Ubuntu 16.04, 'bionic' for Ubuntu 18.04
    :raises ValueError: for any other distribution or release; the detected
      distribution tuple is attached as the second exception argument
    """
    # Imported lazily so that importing this module does not require distro.
    # NOTE(review): linux_distribution() appears to be deprecated in recent
    # distro releases - confirm before upgrading the dependency.
    import distro

    distribution = distro.linux_distribution()
    if distribution[0] == 'Ubuntu':
        release = distribution[1]
        if release == '16.04':
            return 'xenial'
        if release == '18.04':
            return 'bionic'
    raise ValueError('Unsupported distribution', distribution)
def update_shebang(path):
    """
    Rewrite interpreter shebangs under path to go through /usr/bin/env.

    Walks path recursively. Symlinks and files whose name contains '.so' or
    'README' are skipped, as are files that cannot be decoded as text. Each
    remaining file is checked against the rewrites below, in this order; the
    first one that matches is applied once and the file is rewritten in
    place:

    * ``#!...python3`` becomes ``#!/usr/bin/env python3``
    * ``#!...python`` becomes ``#!/usr/bin/env python``
    * ``#!<dir>/sh`` at the very start of the file, with no arguments,
      becomes ``#!/usr/bin/env sh``

    env does not support parameters so we need to do something
    else if an interpreter is invoked with parameters. For that reason an
    sh shebang carrying arguments (e.g. ``#!/bin/sh -e``) is left alone, and
    so are other shells such as bash whose name merely ends in 'sh'.

    :param path: Directory whose files (recursively) get their shebang
      replaced
    """
    # TODO: We should handle scripts that have parameters in the shebang
    # TODO: We should handle scripts that are doing other /usr/bin executables
    shebang_rewrites = (
        # python3 must be tried before python, which would also match it.
        (re.compile(r'#!\s*.+python3'), '#!/usr/bin/env python3'),
        (re.compile(r'#!\s*.+python'), '#!/usr/bin/env python'),
        # Only an interpreter named exactly 'sh', on the first line and
        # without arguments: a looser pattern would turn bash/zsh/dash
        # scripts into sh scripts and produce env lines with arguments.
        (re.compile(r'\A#![ \t]*\S*/sh[ \t]*(?=\r?\n|\Z)'),
         '#!/usr/bin/env sh'),
    )
    logger.info('Starting shebang update...')
    for (root, dirs, files) in os.walk(path):
        for file in files:
            file_path = os.path.join(root, file)
            if os.path.islink(file_path) or \
                    '.so' in file or \
                    'README' in file:
                continue
            with open(file_path, 'rb+') as file_handle:
                contents = file_handle.read()
                try:
                    str_contents = contents.decode()
                except UnicodeError:
                    # Not a text file, nothing to rewrite.
                    continue
                for regex, replacement in shebang_rewrites:
                    new_contents, count = regex.subn(
                        replacement, str_contents, count=1)
                    if count > 0:
                        logger.info(
                            'Found shebang in {file_path}'.format_map(
                                locals()))
                        # Overwrite the file with the rewritten text.
                        file_handle.seek(0)
                        file_handle.truncate()
                        file_handle.write(new_contents.encode())
                        break
def update_symlinks(base_path):
    """
    Update all symlinks inside of base_path to be relative.

    Recurse through the path and update all absolute symlinks to be relative,
    except symlinks to libc: we want our applications to call into the
    libraries we are bundling, but we do not bundle libc and want to use the
    system's version, so those links must keep pointing at the system.
    Any other link target that is not found in the bundle is copied into the
    bundle (at the same path below base_path) so that the relative symlink
    works. Links whose target exists neither in the bundle nor on the host
    are logged and left untouched.

    The list of libc files is obtained by running ``dpkg -L libc6``.

    :param base_path: Directory that will be recursed
    :raises subprocess.CalledProcessError: if the dpkg query fails
    """
    logger.info('Updating symlinks in {base_path}'.format_map(locals()))

    encoding = sys.getfilesystemencoding()
    dpkg_libc_paths = subprocess.check_output(['dpkg', '-L', 'libc6']).decode(
        encoding).strip()
    libc_paths = set(dpkg_libc_paths.split('\n'))

    for root, subdirs, files in os.walk(base_path):
        # Symlinks to directories are reported in subdirs, so check both.
        for name in itertools.chain(subdirs, files):
            symlink_path = os.path.join(root, name)
            if not os.path.islink(symlink_path):
                continue
            symlink_dest_path = os.readlink(symlink_path)
            if not os.path.isabs(symlink_dest_path):
                # Already relative, nothing to do.
                continue
            if symlink_dest_path in libc_paths:
                # We don't want to update symlinks which are pointing to
                # libc
                continue

            logger.info(
                'Symlink: {symlink_path} Points to: {'
                'symlink_dest_path}'.format_map(locals()))
            # Drop the leading '/' so the target is re-rooted at base_path.
            bundle_library_path = os.path.join(base_path,
                                               symlink_dest_path[1:])
            if os.path.exists(bundle_library_path):
                # Dep is already installed, update symlink
                logger.info(
                    'Linked file is already in bundle at {}, '
                    'updating symlink...'.format(bundle_library_path))
            else:
                # Dep is not installed, we need to copy it...
                logger.info(
                    'Linked file is not in bundle, copying and '
                    'updating symlink...')
                # Create directory (permissions?)
                os.makedirs(os.path.dirname(bundle_library_path),
                            exist_ok=True)
                if os.path.exists(symlink_dest_path):
                    shutil.copy(symlink_dest_path,
                                bundle_library_path)
                else:
                    logger.error('Attempted to copy {} for symlink, '
                                 'but it does not exist. Skipping'
                                 .format(symlink_dest_path))
                    continue

            # A relative symlink target is resolved from the directory that
            # contains the link, not from the link itself, so the relative
            # path has to be computed from that directory.
            relative_path = os.path.relpath(bundle_library_path,
                                            os.path.dirname(symlink_path))
            logger.info(
                'bundle_library_path {} relative path {}'.format(
                    bundle_library_path, relative_path))
            os.remove(symlink_path)
            os.symlink(relative_path, symlink_path)
def rewrite_catkin_package_path(base_path):
    """
    Replace direct /usr/bin/python invocations in catkin's profile.d scripts.

    Looks at ``1.ros_package_path.sh`` and ``10.ros.sh`` below
    ``<base_path>/opt/ros/<ros distribution>/etc/catkin/profile.d`` and
    rewrites every occurrence of ``/usr/bin/python`` to ``python`` in place.
    Scripts that do not exist are skipped; scripts that cannot be decoded as
    text are reported through the logger and skipped.

    :param base_path: Path to the bundle staging directory
    """
    # TODO: This should be in the ros package
    direct_python = re.compile('/usr/bin/python')
    logger.info('Starting shebang update...')

    profiled_path = os.path.join(
        base_path, 'opt', 'ros', get_ros_distribution_version(),
        'etc', 'catkin', 'profile.d')

    # These files contain references to /usr/bin/python that need
    # to be converted to avoid errors when setting up the ROS workspace
    for script_name in ('1.ros_package_path.sh', '10.ros.sh'):
        script_path = os.path.join(profiled_path, script_name)
        if not os.path.isfile(script_path):
            continue
        with open(script_path, 'rb+') as script:
            raw = script.read()
            try:
                text = raw.decode()
            except UnicodeError:
                logger.error(
                    '{file} should be a text file'.format(file=script_path))
                continue
            rewritten, hits = direct_python.subn('python', text)
            if hits > 0:
                logger.info(
                    'Found direct python invocation in {file}'
                    .format(file=script_path))
                # Overwrite the script with the rewritten text.
                script.seek(0)
                script.truncate()
                script.write(rewritten.encode())
def filechecksum(filename, algorithm='sha256', printing=False):
    """
    Compute the hexadecimal digest of a file's contents.

    :param filename: path to the file to hash
    :param algorithm: one of sha256, sha512, sha1, md5
    :param printing: when true, also print '<filename> - <digest>' to stdout
    :return: the hexadecimal digest
    :rtype: str
    :raises RuntimeError: if the algorithm is not supported, or wrapping any
      error raised while reading, hashing or printing
    """
    supported = (
        ('sha256', hashlib.sha256),
        ('sha512', hashlib.sha512),
        ('sha1', hashlib.sha1),
        ('md5', hashlib.md5),
    )
    for candidate, constructor in supported:
        if algorithm == candidate:
            digest = constructor()
            break
    else:
        raise RuntimeError('Unsupported hash algorithm')

    chunk_size = 65536
    try:
        with open(filename, 'rb') as stream:
            # Feed the file to the hasher chunk by chunk until read()
            # returns an empty bytes object at end of file.
            for chunk in iter(lambda: stream.read(chunk_size), b''):
                digest.update(chunk)
        result = digest.hexdigest()
        if printing:
            print(filename + ' - ' + result)
        return result
    except Exception as e:
        raise RuntimeError(e)