/
package
executable file
·198 lines (149 loc) · 6.78 KB
/
package
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
#!/usr/bin/env python3
import json
import os
import sys
import urllib.request
import zipfile
import io
from collections import OrderedDict
# Author of this script
__author__ = 'kristoffer'
# Download directory. It is appended to the root path to form the location where downloaded files are
# written, and from where they are later read when the release zip is created.
download_dir = "/download"
def _read_versions(path, target_versions):
if len(target_versions) > 0:
target_versions = '-' + target_versions
with open(path + f'/src/versions{target_versions}.json', 'r') as file:
return json.load(file)
def _apply_template(template: str, values: 'dict[str,str]') -> str:
for name, value in values.items():
print(name, value)
template = template.replace('{{ ' + name + ' }}', value)
return template
def _read_manifest(path, platform, versions):
    """
    Read the manifest template, fill in the placeholders and parse the result as JSON.

    The template is read from 'src/manifest_template.json' below path. The placeholders are filled in
    with the supplied versions together with a "platform" value. Files with the release set to
    'not_included' are removed from the 'files' section of the result.

    :param path: The root directory that contains the 'src' directory
    :param platform: The platform name, available to the template as the 'platform' placeholder
    :param versions: Mapping from placeholder name to value. It is not modified.
    :return: The parsed manifest, the 'files' entry is an OrderedDict
    """
    with open(path + '/src/manifest_template.json', 'r') as file:
        template = file.read()

    # Work on a copy to avoid adding the "platform" key to the dictionary owned by the caller
    values = dict(versions)
    values["platform"] = platform

    data = json.loads(_apply_template(template, values))

    # Remove files that should not be included
    data['files'] = OrderedDict(
        (name, file_def)
        for name, file_def in data['files'].items()
        if file_def['release'] != 'not_included'
    )

    return data
def _download_artifact(uri) -> io.BytesIO:
headers = {}
# Add auth headers if the github token is available, on the build servers it is, but when running locally it is not.
# Some operations do not require authentication but we do it anyway to avoid API call rate problems
if 'GH_TOKEN' in os.environ:
GH_TOKEN = os.environ['GH_TOKEN']
headers = {'Authorization': f'token {GH_TOKEN}'}
print(' Downloading with authorization')
else:
print(' Downloading without authorization')
req = urllib.request.Request(uri, headers=headers)
with urllib.request.urlopen(req) as response:
data = response.read()
assert (data)
return io.BytesIO(data)
def _download_latest(path, repository: str, branch: str, fw_platform: str) -> str:
    """
    Download the latest version of an artifact, on a specific branch

    The artifact archive of the first workflow run returned by the GitHub API is downloaded and the
    first '.bin' file in it is written to the download directory.

    :param path: The root directory, the download directory is located below it
    :param repository: The repository, for instance 'crazyflie-firmware'
    :param branch: The branch, for instance 'master'
    :param fw_platform: The firmware platform identifier, for instance 'cf2', 'tag' or 'bolt'. If None, the repo
                        does not have platforms, for instance deck firmware.
    :return: The version of the downloaded artifact, taken from the artifact name
    :raises Exception: If no build, no matching artifact or no '.bin' file is found
    """
    # NOTE(review): the query string uses ';' as separator and 'pages' as parameter name, the GitHub
    # documentation uses '&' and 'page' - confirm that this has the intended effect.
    run_url = f'https://api.github.com/repos/bitcraze/{repository}/actions/runs?branch={branch};pages=1'

    # Read the responses directly, this avoids temporary files and makes sure the connections are closed
    with urllib.request.urlopen(run_url) as response:
        run_info = json.load(response)

    if not run_info['workflow_runs']:
        raise Exception(f'No builds found for branch {branch} in {repository}')

    artifacts_url = run_info['workflow_runs'][0]['artifacts_url']
    with urllib.request.urlopen(artifacts_url) as response:
        artifacts = json.load(response)

    # Repos with platforms names artifacts: tag-21k1j2h3k1j2h3k1j2h3LongHash
    # Repos without platforms just uses: 9s9asd9as9das9da9sdLongHash
    version = ""
    found_artifact = None
    if fw_platform is not None:
        for artifact in artifacts['artifacts']:
            if artifact["name"].startswith(fw_platform):
                found_artifact = artifact
                version = artifact['name'].replace(fw_platform + "-", "")
                break
    elif artifacts['artifacts']:
        found_artifact = artifacts['artifacts'][0]
        version = found_artifact['name']

    # Fail with a clear message instead of an obscure TypeError/IndexError further down
    if found_artifact is None:
        raise Exception(f'No artifact found for platform {fw_platform} on branch {branch} in {repository}')

    # Download zip
    download_url = found_artifact['archive_download_url']
    print(f" Downloading artifact from url {download_url}, version: {version}")

    # Extract firmware from zip
    with zipfile.ZipFile(_download_artifact(download_url)) as archive:
        # Find first file that has ".bin" ending as some zips also contain an elf file
        firmware_filename = next(
            (item.filename for item in archive.filelist if item.filename.endswith('.bin')),
            None)
        if firmware_filename is None:
            raise Exception(f'No .bin file found in artifact {found_artifact["name"]} in {repository}')

        firmware = archive.read(firmware_filename)

    full_path = path + download_dir + "/" + firmware_filename
    print(f" Writing file to {full_path}")
    with open(full_path, "wb") as dest:
        dest.write(firmware)

    return version
def _download(path, manifest):
files_items = list(manifest['files'].items())
for file_name, file_def in files_items:
release = file_def["release"]
if release.startswith('latest:'):
branch = release.replace('latest:', '')
if len(branch) == 0:
raise ValueError('The branch must be supplied when using the "latest" identifier, ' +
'for instance "latest:master"')
print(f'Downloading latest artifact {file_def["repository"]}, branch: {branch}, ' +
f'platform: {file_def["platform"]}, fw_platform: {manifest["fw_platform"]}')
fw_platform = manifest['fw_platform']
if file_def["platform"] == 'deck':
fw_platform = None
version = _download_latest(path, file_def['repository'], branch, fw_platform)
file_def["release"] = version
new_filename = file_name.replace(f'-{release}', '')
manifest['files'][new_filename] = manifest['files'].pop(file_name)
else:
url = 'https://github.com/bitcraze/' + file_def[
'repository'] + '/releases/download/' + file_def[
'release'] + '/' + file_name
dest = path + download_dir + "/" + file_name
print(f'Downloading release artifact {file_def["repository"]} {file_def["release"]} to {file_name}')
stream = _download_artifact(url)
with open(dest, "wb") as outfile:
outfile.write(stream.getbuffer())
def _create_zip(path, manifest, platform):
zip_name = \
path + '/firmware-' + platform + '-' + manifest['release'] + '.zip'
zf = zipfile.ZipFile(zip_name, mode='w')
zf.writestr('manifest.json', json.dumps(manifest))
for file_name in manifest['files']:
full_path = path + download_dir + "/" + file_name
print(f"Adding {full_path} to zip")
zf.write(full_path, arcname=file_name)
zf.close()
print("Created " + zip_name)
def _parse_args(argv):
if len(argv) < 2:
print("ERROR: Target platform missing")
exit(-1)
if len(argv) >= 3:
target_versions = argv[2]
else:
target_versions = ""
return argv[1], target_versions
def _main(argv):
    """
    Build a release zip for the platform given on the command line.

    Reads the versions and the manifest, downloads the files in the manifest and packs them in a zip.
    The root directory is located two levels above the directory of this script.

    :param argv: The command line arguments, the first element is the name of the script
    """
    script_dir = os.path.dirname(os.path.realpath(__file__))
    root_dir = script_dir + "/../.."

    platform, target_versions = _parse_args(argv)
    versions = _read_versions(root_dir, target_versions)

    print("Building release for platform: " + platform)

    manifest = _read_manifest(root_dir, platform, versions)
    _download(root_dir, manifest)
    _create_zip(root_dir, manifest, platform)
# Only build a release when this file is executed as a script, not when it is imported
if __name__ == '__main__':
    _main(sys.argv)