-
Notifications
You must be signed in to change notification settings - Fork 14.1k
/
path_utils.py
294 lines (244 loc) · 11.5 KB
/
path_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Useful tools for various Paths used inside Airflow Sources.
"""
import hashlib
import os
import subprocess
import sys
import tempfile
from functools import lru_cache
from pathlib import Path
from typing import Optional
from airflow_breeze import NAME
from airflow_breeze.utils.confirm import set_forced_answer
from airflow_breeze.utils.console import get_console
from airflow_breeze.utils.reinstall import (
ask_to_reinstall_breeze,
warn_dependencies_changed,
warn_different_location,
warn_non_editable,
)
AIRFLOW_CFG_FILE = "setup.cfg"
def search_upwards_for_airflow_sources_root(start_from: Path) -> Optional[Path]:
root = Path(start_from.root)
d = start_from
while d != root:
attempt = d / AIRFLOW_CFG_FILE
if attempt.exists() and "name = apache-airflow\n" in attempt.read_text():
return attempt.parent
d = d.parent
return None
def in_autocomplete() -> bool:
return os.environ.get(f"_{NAME.upper()}_COMPLETE") is not None
def in_self_upgrade() -> bool:
return "self-upgrade" in sys.argv
def in_help() -> bool:
return "--help" in sys.argv
def skip_upgrade_check():
return in_self_upgrade() or in_autocomplete() or in_help() or hasattr(sys, '_called_from_test')
def get_package_setup_metadata_hash() -> str:
"""
Retrieves hash of setup.py and setup.cfg files from the source of installation of Breeze.
This is used in order to determine if we need to upgrade Breeze, because some
setup files changed. Blake2b algorithm will not be flagged by security checkers
as insecure algorithm (in Python 3.9 and above we can use `usedforsecurity=False`
to disable it, but for now it's better to use more secure algorithms.
"""
# local imported to make sure that autocomplete works
try:
from importlib.metadata import distribution # type: ignore[attr-defined]
except ImportError:
from importlib_metadata import distribution # type: ignore[no-redef]
prefix = "Package config hash: "
for line in distribution('apache-airflow-breeze').metadata.as_string().splitlines(keepends=False):
if line.startswith(prefix):
return line[len(prefix) :]
return "NOT FOUND"
def get_sources_setup_metadata_hash(sources: Path) -> str:
try:
the_hash = hashlib.new("blake2b")
the_hash.update((sources / "dev" / "breeze" / "setup.py").read_bytes())
the_hash.update((sources / "dev" / "breeze" / "setup.cfg").read_bytes())
the_hash.update((sources / "dev" / "breeze" / "pyproject.toml").read_bytes())
return the_hash.hexdigest()
except FileNotFoundError as e:
return f"Missing file {e.filename}"
def get_installation_sources_config_metadata_hash() -> str:
"""
Retrieves hash of setup.py and setup.cfg files from the source of installation of Breeze.
This is used in order to determine if we need to upgrade Breeze, because some
setup files changed. Blake2b algorithm will not be flagged by security checkers
as insecure algorithm (in Python 3.9 and above we can use `usedforsecurity=False`
to disable it, but for now it's better to use more secure algorithms.
"""
installation_sources = get_installation_airflow_sources()
if installation_sources is None:
return "NOT FOUND"
return get_sources_setup_metadata_hash(installation_sources)
def get_used_sources_setup_metadata_hash() -> str:
"""
Retrieves hash of setup.py and setup.cfg files from the currently used sources.
"""
return get_sources_setup_metadata_hash(get_used_airflow_sources())
def set_forced_answer_for_upgrade_check():
"""When we run upgrade check --answer is not parsed yet, so we need to guess it."""
if "--answer n" in " ".join(sys.argv).lower() or os.environ.get('ANSWER', '').lower().startswith("n"):
set_forced_answer("no")
if "--answer y" in " ".join(sys.argv).lower() or os.environ.get('ANSWER', '').lower().startswith("y"):
set_forced_answer("yes")
if "--answer q" in " ".join(sys.argv).lower() or os.environ.get('ANSWER', '').lower().startswith("q"):
set_forced_answer("quit")
def print_warning_if_setup_changed() -> bool:
"""
Prints warning if detected airflow sources are not the ones that Breeze was installed with.
:return: True if warning was printed.
"""
try:
package_hash = get_package_setup_metadata_hash()
except ModuleNotFoundError as e:
if "importlib_metadata" in e.msg:
return False
sources_hash = get_installation_sources_config_metadata_hash()
if sources_hash != package_hash:
installation_sources = get_installation_airflow_sources()
if installation_sources is not None:
breeze_sources = installation_sources / "dev" / "breeze"
warn_dependencies_changed()
set_forced_answer_for_upgrade_check()
ask_to_reinstall_breeze(breeze_sources)
set_forced_answer(None)
return True
return False
def print_warning_if_different_sources(airflow_sources: Path) -> bool:
"""
Prints warning if detected airflow sources are not the ones that Breeze was installed with.
:param airflow_sources: source for airflow code that we are operating on
:return: True if warning was printed.
"""
installation_airflow_sources = get_installation_airflow_sources()
if installation_airflow_sources and airflow_sources != installation_airflow_sources:
warn_different_location(installation_airflow_sources, airflow_sources)
ask_to_reinstall_breeze(airflow_sources / "dev" / "breeze")
return True
return False
def get_installation_airflow_sources() -> Optional[Path]:
"""
Retrieves the Root of the Airflow Sources where Breeze was installed from.
:return: the Path for Airflow sources.
"""
return search_upwards_for_airflow_sources_root(Path(__file__).resolve().parent)
def get_used_airflow_sources() -> Path:
"""
Retrieves the Root of used Airflow Sources which we operate on. Those are either Airflow sources found
upwards in directory tree or sources where Breeze was installed from.
:return: the Path for Airflow sources we use.
"""
current_sources = search_upwards_for_airflow_sources_root(Path.cwd())
if current_sources is None:
current_sources = get_installation_airflow_sources()
if current_sources is None:
warn_non_editable()
sys.exit(1)
return current_sources
@lru_cache(maxsize=None)
def find_airflow_sources_root_to_operate_on() -> Path:
"""
Find the root of airflow sources we operate on. Handle the case when Breeze is installed via `pipx` from
a different source tree, so it searches upwards of the current directory to find the right root of
airflow directory we are actually in. This **might** be different than the sources of Airflow Breeze
was installed from.
If not found, we operate on Airflow sources that we were installed it. This handles the case when
we run Breeze from a "random" directory.
This method also handles the following errors and warnings:
* It fails (and exits hard) if Breeze is installed in non-editable mode (in which case it will
not find the Airflow sources when walking upwards the directory where it is installed)
* It warns (with 2 seconds timeout) if you are using Breeze from a different airflow sources than
the one you operate on.
* If we are running in the same source tree as where Breeze was installed from (so no warning above),
it warns (with 2 seconds timeout) if there is a change in setup.* files of Breeze since installation
time. In such case usesr is encouraged to re-install Breeze to update dependencies.
:return: Path for the found sources.
"""
sources_root_from_env = os.getenv('AIRFLOW_SOURCES_ROOT', None)
if sources_root_from_env:
return Path(sources_root_from_env)
installation_airflow_sources = get_installation_airflow_sources()
if installation_airflow_sources is None and not skip_upgrade_check():
get_console().print(
"\n[error]Breeze should only be installed with -e flag[/]\n\n"
"[warning]Please go to Airflow sources and run[/]\n\n"
f" {NAME} self-upgrade --force\n"
)
sys.exit(1)
airflow_sources = get_used_airflow_sources()
if not skip_upgrade_check():
# only print warning and sleep if not producing complete results
print_warning_if_different_sources(airflow_sources)
print_warning_if_setup_changed()
os.chdir(str(airflow_sources))
return airflow_sources
AIRFLOW_SOURCES_ROOT = find_airflow_sources_root_to_operate_on().resolve()
BUILD_CACHE_DIR = AIRFLOW_SOURCES_ROOT / '.build'
DAGS_DIR = AIRFLOW_SOURCES_ROOT / 'dags'
FILES_DIR = AIRFLOW_SOURCES_ROOT / 'files'
HOOKS_DIR = AIRFLOW_SOURCES_ROOT / 'hooks'
MSSQL_DATA_VOLUME = AIRFLOW_SOURCES_ROOT / 'tmp_mssql_volume'
KUBE_DIR = AIRFLOW_SOURCES_ROOT / ".kube"
LOGS_DIR = AIRFLOW_SOURCES_ROOT / 'logs'
DIST_DIR = AIRFLOW_SOURCES_ROOT / 'dist'
SCRIPTS_CI_DIR = AIRFLOW_SOURCES_ROOT / 'scripts' / 'ci'
DOCKER_CONTEXT_DIR = AIRFLOW_SOURCES_ROOT / 'docker-context-files'
CACHE_TMP_FILE_DIR = tempfile.TemporaryDirectory()
OUTPUT_LOG = Path(CACHE_TMP_FILE_DIR.name, 'out.log')
BREEZE_SOURCES_ROOT = AIRFLOW_SOURCES_ROOT / "dev" / "breeze"
def create_volume_if_missing(volume_name: str):
from airflow_breeze.utils.run_utils import run_command
res_inspect = run_command(
cmd=["docker", "volume", "inspect", volume_name],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
check=False,
)
if res_inspect.returncode != 0:
run_command(
cmd=["docker", "volume", "create", volume_name],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
check=True,
)
def create_static_check_volumes():
create_volume_if_missing("mypy-cache-volume")
def create_directories_and_files() -> None:
"""
Creates all directories and files that are needed for Breeze to work via docker-compose.
Checks if setup has been updates since last time and proposes to upgrade if so.
"""
BUILD_CACHE_DIR.mkdir(parents=True, exist_ok=True)
DAGS_DIR.mkdir(parents=True, exist_ok=True)
FILES_DIR.mkdir(parents=True, exist_ok=True)
HOOKS_DIR.mkdir(parents=True, exist_ok=True)
MSSQL_DATA_VOLUME.mkdir(parents=True, exist_ok=True)
KUBE_DIR.mkdir(parents=True, exist_ok=True)
LOGS_DIR.mkdir(parents=True, exist_ok=True)
DIST_DIR.mkdir(parents=True, exist_ok=True)
OUTPUT_LOG.mkdir(parents=True, exist_ok=True)
(AIRFLOW_SOURCES_ROOT / ".bash_aliases").touch()
(AIRFLOW_SOURCES_ROOT / ".bash_history").touch()
(AIRFLOW_SOURCES_ROOT / ".inputrc").touch()
create_static_check_volumes()