-
Notifications
You must be signed in to change notification settings - Fork 13.7k
/
exceptions.py
460 lines (304 loc) · 15.1 KB
/
exceptions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Note: Any AirflowException raised is expected to cause the TaskInstance
# to be marked in an ERROR state
"""Exceptions used by Airflow."""
from __future__ import annotations
import warnings
from http import HTTPStatus
from typing import TYPE_CHECKING, Any, NamedTuple
from airflow.utils.trigger_rule import TriggerRule
if TYPE_CHECKING:
import datetime
from collections.abc import Sized
from airflow.models import DAG, DagRun
class AirflowException(Exception):
    """
    Base class for all Airflow's errors.

    Every custom Airflow exception should derive from this class.
    """

    # Default HTTP status reported when this error surfaces through the API.
    status_code = HTTPStatus.INTERNAL_SERVER_ERROR
class AirflowBadRequest(AirflowException):
    """Raised when the application or server cannot handle the request."""

    status_code = HTTPStatus.BAD_REQUEST


class AirflowNotFoundException(AirflowException):
    """Raised when the requested object/resource is not available in the system."""

    status_code = HTTPStatus.NOT_FOUND


class AirflowConfigException(AirflowException):
    """Raised when there is a configuration problem."""


class AirflowSensorTimeout(AirflowException):
    """Raised when sensor polling times out."""
class AirflowRescheduleException(AirflowException):
    """
    Raised when the task should be re-scheduled at a later time.

    :param reschedule_date: The date when the task should be rescheduled
    """

    def __init__(self, reschedule_date):
        self.reschedule_date = reschedule_date
        super().__init__()

    def serialize(self):
        # (class name, positional args, keyword args) triple consumed by
        # Airflow's exception serialization machinery.
        return ("AirflowRescheduleException", (), {"reschedule_date": self.reschedule_date})
class InvalidStatsNameException(AirflowException):
    """Raised when the name of the stats is invalid."""
# These two deliberately inherit from BaseException instead of
# AirflowException -> Exception: they are raised to forcibly interrupt an
# ongoing task, and code doing normal error-handling must not treat such an
# interrupt as an ordinary, handleable error. (Compare with KeyboardInterrupt.)
class AirflowTaskTimeout(BaseException):
    """Raised when the execution of a task times out."""


class AirflowTaskTerminated(BaseException):
    """Raised when the execution of a task is terminated."""
class AirflowWebServerTimeout(AirflowException):
    """Raised when the web server times out."""


class AirflowSkipException(AirflowException):
    """Raised when the task should be skipped."""


class AirflowFailException(AirflowException):
    """Raised when the task should be failed without retrying."""


class AirflowOptionalProviderFeatureException(AirflowException):
    """Raised by providers when imports are missing for optional provider features."""
class AirflowInternalRuntimeError(BaseException):
    """
    Airflow internal runtime error.

    Indicates that something really terrible happened during Airflow
    execution. Derives from BaseException (not AirflowException), so ordinary
    ``except Exception`` handlers do not swallow it.

    :meta private:
    """
class XComNotFound(AirflowException):
    """Raised when an XCom reference is being resolved against a non-existent XCom."""

    def __init__(self, dag_id: str, task_id: str, key: str) -> None:
        super().__init__()
        self.dag_id, self.task_id, self.key = dag_id, task_id, key

    def __str__(self) -> str:
        return f'XComArg result from {self.task_id} at {self.dag_id} with key="{self.key}" is not found!'
class UnmappableOperator(AirflowException):
    """Raised when an operator is not implemented to be mappable."""


class XComForMappingNotPushed(AirflowException):
    """Raised when a mapped downstream's dependency fails to push XCom for task mapping."""

    def __str__(self) -> str:
        return "did not push XCom for task mapping"
class UnmappableXComTypePushed(AirflowException):
    """Raised when an unmappable type is pushed as a mapped downstream's dependency."""

    def __init__(self, value: Any, *values: Any) -> None:
        super().__init__(value, *values)

    def __str__(self) -> str:
        # Render nested container types as e.g. "dict[list]" from the
        # qualified names of each pushed value's type.
        names = [type(arg).__qualname__ for arg in self.args]
        typename = names[0]
        for inner in names[1:]:
            typename += f"[{inner}]"
        return f"unmappable return type {typename!r}"
class UnmappableXComLengthPushed(AirflowException):
    """Raised when the pushed value is too large to map as a downstream's dependency."""

    def __init__(self, value: Sized, max_length: int) -> None:
        super().__init__(value)
        self.value = value
        self.max_length = max_length

    def __str__(self) -> str:
        return f"unmappable return value length: {len(self.value)} > {self.max_length}"
class AirflowDagCycleException(AirflowException):
    """Raised when a cycle is detected in a DAG definition."""
class AirflowDagDuplicatedIdException(AirflowException):
    """Raised when a DAG's ID is already used by another DAG."""

    def __init__(self, dag_id: str, incoming: str, existing: str) -> None:
        super().__init__(dag_id, incoming, existing)
        self.dag_id, self.incoming, self.existing = dag_id, incoming, existing

    def __str__(self) -> str:
        return f"Ignoring DAG {self.dag_id} from {self.incoming} - also found in {self.existing}"
class AirflowDagInconsistent(AirflowException):
    """Raised when a DAG has inconsistent attributes."""


class AirflowClusterPolicyViolation(AirflowException):
    """Raised when a DAG definition violates a Cluster Policy."""


class AirflowClusterPolicySkipDag(AirflowException):
    """Raised when a Cluster Policy needs a dag to be skipped."""


class AirflowClusterPolicyError(AirflowException):
    """Raised for Cluster Policy errors other than AirflowClusterPolicyViolation or AirflowClusterPolicySkipDag."""


class AirflowTimetableInvalid(AirflowException):
    """Raised when a DAG has an invalid timetable."""
class DagNotFound(AirflowNotFoundException):
    """Raised when a DAG is not available in the system."""


class DagCodeNotFound(AirflowNotFoundException):
    """Raised when a DAG's code is not available in the system."""


class DagRunNotFound(AirflowNotFoundException):
    """Raised when a DAG Run is not available in the system."""
class DagRunAlreadyExists(AirflowBadRequest):
    """Raised when creating a DAG run for a DAG which already has a DAG run entry."""

    def __init__(self, dag_run: DagRun, execution_date: datetime.datetime, run_id: str) -> None:
        message = f"A DAG Run already exists for DAG {dag_run.dag_id} at {execution_date} with run id {run_id}"
        super().__init__(message)
        # Keep the conflicting run so callers can inspect it.
        self.dag_run = dag_run
class DagFileExists(AirflowBadRequest):
    """Raised when a DAG ID is still in the DagBag, i.e. the DAG file is in the DAG folder."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Deprecated class: warn at construction time so usages surface.
        warnings.warn("DagFileExists is deprecated and will be removed.", DeprecationWarning, stacklevel=2)
class FailStopDagInvalidTriggerRule(AirflowException):
    """Raise when a dag has 'fail_stop' enabled yet has a non-default trigger rule."""

    # Trigger rules that remain compatible with fail-stop semantics.
    _allowed_rules = (TriggerRule.ALL_SUCCESS, TriggerRule.ALL_DONE_SETUP_SUCCESS)

    @classmethod
    def check(cls, *, dag: DAG | None, trigger_rule: TriggerRule):
        """
        Check that fail_stop dag tasks have allowable trigger rules.

        :param dag: The DAG the task belongs to; ``None`` skips the check.
        :param trigger_rule: The trigger rule configured on the task.
        :raises FailStopDagInvalidTriggerRule: If the dag has ``fail_stop``
            set and the trigger rule is not one of the allowed rules.

        :meta private:
        """
        if dag is not None and dag.fail_stop and trigger_rule not in cls._allowed_rules:
            raise cls()

    def __str__(self) -> str:
        # Fix: the message previously claimed only ALL_SUCCESS is allowed,
        # contradicting _allowed_rules which also permits
        # ALL_DONE_SETUP_SUCCESS; list every allowed rule instead.
        allowed = " or ".join(str(rule) for rule in self._allowed_rules)
        return f"A 'fail-stop' dag can only have {allowed} trigger rule"
class DuplicateTaskIdFound(AirflowException):
    """Raised when a Task with a duplicate task_id is defined in the same DAG."""
class TaskAlreadyInTaskGroup(AirflowException):
    """Raised when a Task cannot be added to a TaskGroup because it already belongs to another TaskGroup."""

    def __init__(self, task_id: str, existing_group_id: str | None, new_group_id: str) -> None:
        super().__init__(task_id, new_group_id)
        self.task_id = task_id
        self.existing_group_id = existing_group_id
        self.new_group_id = new_group_id

    def __str__(self) -> str:
        # A task with no explicit group lives in the DAG's implicit root group.
        existing = (
            "the DAG's root group"
            if self.existing_group_id is None
            else f"group {self.existing_group_id!r}"
        )
        return f"cannot add {self.task_id!r} to {self.new_group_id!r} (already in {existing})"
class SerializationError(AirflowException):
    """A problem occurred when trying to serialize something."""


class ParamValidationError(AirflowException):
    """Raised when a DAG's params are invalid."""


class TaskNotFound(AirflowNotFoundException):
    """Raised when a Task is not available in the system."""


class TaskInstanceNotFound(AirflowNotFoundException):
    """Raised when a task instance is not available in the system."""


class PoolNotFound(AirflowNotFoundException):
    """Raised when a Pool is not available in the system."""


class NoAvailablePoolSlot(AirflowException):
    """Raised when a pool does not have enough slots."""


class DagConcurrencyLimitReached(AirflowException):
    """Raised when a DAG's max_active_tasks limit is reached."""


class TaskConcurrencyLimitReached(AirflowException):
    """Raised when a task's max_active_tasks limit is reached."""
class BackfillUnfinished(AirflowException):
    """
    Raised when not all tasks succeed in a backfill.

    :param message: The human-readable description of the exception
    :param ti_status: The information about all task statuses
    """

    def __init__(self, message, ti_status):
        self.ti_status = ti_status
        super().__init__(message)
class FileSyntaxError(NamedTuple):
    """
    Information about a single syntax error in a file.

    :param line_no: Line number of the error, or ``None`` when unknown.
    :param message: Human-readable description of the error.
    """

    line_no: int | None
    message: str

    def __str__(self):
        # Fix: the message previously rendered as e.g.
        # "bad token. Line number: s3," — a stray "s" prefix and a trailing
        # comma that were clearly typos.
        return f"{self.message}. Line number: {self.line_no}"
class AirflowFileParseException(AirflowException):
    """
    Raises when connection or variable file can not be parsed.

    :param msg: The human-readable description of the exception
    :param file_path: A processed file that contains errors
    :param parse_errors: File syntax errors
    """

    def __init__(self, msg: str, file_path: str, parse_errors: list[FileSyntaxError]) -> None:
        super().__init__(msg)
        self.msg = msg
        self.file_path = file_path
        self.parse_errors = parse_errors

    def __str__(self):
        # NOTE(review): these imports are deferred to call time — presumably
        # to avoid import cycles with airflow.utils; confirm before moving.
        from airflow.utils.code_utils import prepare_code_snippet
        from airflow.utils.platform import is_tty

        # Header: the overall message followed by the offending file name.
        result = f"{self.msg}\nFilename: {self.file_path}\n\n"

        # One "=== Parse error N ===" section per recorded error (1-based).
        for error_no, parse_error in enumerate(self.parse_errors, 1):
            result += "=" * 20 + f" Parse error {error_no:3} " + "=" * 20 + "\n"
            result += f"{parse_error.message}\n"
            if parse_error.line_no:
                result += f"Line number: {parse_error.line_no}\n"
            # Code snippets are shown only on interactive terminals.
            if parse_error.line_no and is_tty():
                result += "\n" + prepare_code_snippet(self.file_path, parse_error.line_no) + "\n"

        return result
class ConnectionNotUnique(AirflowException):
    """Raised when multiple values are found for the same connection ID."""
class TaskDeferred(BaseException):
    """
    Signal an operator moving to deferred state.

    Special exception raised to signal that the operator it was raised from
    wishes to defer until a trigger fires.
    """

    def __init__(
        self,
        *,
        trigger,
        method_name: str,
        kwargs: dict[str, Any] | None = None,
        timeout: datetime.timedelta | None = None,
    ):
        super().__init__()
        self.trigger = trigger
        self.method_name = method_name
        self.kwargs = kwargs
        self.timeout = timeout
        # Duck-typed runtime check: anything timedelta-like (i.e. exposing
        # total_seconds) is accepted as a timeout.
        if self.timeout is not None and not hasattr(self.timeout, "total_seconds"):
            raise ValueError("Timeout value must be a timedelta")

    def serialize(self):
        # (class name, positional args, keyword args) triple consumed by
        # Airflow's exception serialization machinery.
        state = {
            "trigger": self.trigger,
            "method_name": self.method_name,
            "kwargs": self.kwargs,
            "timeout": self.timeout,
        }
        return self.__class__.__name__, (), state

    def __repr__(self) -> str:
        return f"<TaskDeferred trigger={self.trigger} method={self.method_name}>"
class TaskDeferralError(AirflowException):
    """Raised when a task fails during deferral for some reason."""
# The try/except handling is needed after we moved all k8s classes to cncf.kubernetes provider
# These two exceptions are used internally by Kubernetes Executor but also by PodGenerator, so we need
# to leave them here in case older version of cncf.kubernetes provider is used to run KubernetesPodOperator
# and it raises one of those exceptions. The code should be backwards compatible even if you import
# and try/except the exception using direct imports from airflow.exceptions.
# 1) if you have old provider, both provider and pod generator will throw the "airflow.exceptions" exception.
# 2) if you have new provider, both provider and pod generator will throw the
# "airflow.providers.cncf.kubernetes" as it will be imported here from the provider.
try:
    # Prefer the provider's class so both import paths resolve to one type.
    from airflow.providers.cncf.kubernetes.pod_generator import PodMutationHookException
except ImportError:

    class PodMutationHookException(AirflowException):  # type: ignore[no-redef]
        """Raised when exception happens during Pod Mutation Hook execution."""


try:
    # Same fallback dance as PodMutationHookException above.
    from airflow.providers.cncf.kubernetes.pod_generator import PodReconciliationError
except ImportError:

    class PodReconciliationError(AirflowException):  # type: ignore[no-redef]
        """Raised when an error is encountered while trying to merge pod configs."""
class RemovedInAirflow3Warning(DeprecationWarning):
    """Issued for usage of deprecated features that will be removed in Airflow3."""

    # Airflow version that started raising this deprecation warning.
    deprecated_since: str | None = None


class AirflowProviderDeprecationWarning(DeprecationWarning):
    """Issued for usage of deprecated features of an Airflow provider."""

    # Provider version that started raising this deprecation warning.
    deprecated_provider_since: str | None = None
class DeserializingResultError(ValueError):
    """Raised when an error is encountered while a pickling library deserializes a pickle file."""

    def __str__(self):
        # __cause__ is set when this error is raised with "from <original>".
        return (
            f"Error deserializing result. Note that result deserialization "
            f"is not supported across major Python versions. Cause: {self.__cause__}"
        )