Fix Mypy errors in airflow/dag_processing #20470
Conversation
airflow/dag_processing/processor.py
Outdated
with redirect_stdout(StreamLogWriter(log, logging.INFO)), (  # type: ignore[type-var]
    redirect_stderr(StreamLogWriter(log, logging.WARN))  # type: ignore[type-var]
Errors here:
airflow/dag_processing/processor.py:152: error: Value of type variable "_T_io"
of "redirect_stdout" cannot be "StreamLogWriter"
with redirect_stdout(StreamLogWriter(log, logging.INFO)), ...
^
airflow/dag_processing/processor.py:152: error: Value of type variable "_T_io"
of "redirect_stderr" cannot be "StreamLogWriter"
Typing for args of both redirect_stdout() and redirect_stderr() is handled in an external
contextlib stub file and defined as ContextManager[TypeVar("_T_io", bound=Optional[IO[str]])]
so ignoring the Mypy errors here. But any feedback on a proper handling of this is welcomed of course.
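For context on that bound: a type that really satisfies `Optional[IO[str]]` passes cleanly. A minimal sketch (not Airflow code) using io.StringIO, which is a true IO[str], where StreamLogWriter needs the ignore:

```python
# Minimal sketch (not Airflow code): typeshed types redirect_stdout() as
# generic over _T_io bound by Optional[IO[str]]. io.StringIO is a real
# IO[str] subtype, so this typechecks without any ignore comment.
from contextlib import redirect_stdout
from io import StringIO

buf = StringIO()
with redirect_stdout(buf):
    print("hello")

assert buf.getvalue() == "hello\n"  # the print went to the buffer
```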
We can implement the IO protocol on StreamLogWriter instead. It already has most of the important things implemented, the rest are mostly trivial.
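A rough sketch of that direction (hypothetical class and buffering behavior, not Airflow's actual StreamLogWriter): a plain class implementing just the part of the IO protocol that redirect_stdout() actually exercises:

```python
import logging

class StreamLogWriterSketch:
    """Hypothetical sketch (not Airflow's actual StreamLogWriter): just
    enough of the IO protocol for stdout/stderr redirection. write()
    buffers until a newline arrives, then forwards to the logger."""

    def __init__(self, logger: logging.Logger, level: int) -> None:
        self.logger = logger
        self.level = level
        self._buffer = ""

    def write(self, message: str) -> int:
        if message.endswith("\n"):
            self._buffer += message.rstrip("\n")
            self.flush()
        else:
            self._buffer += message
        return len(message)

    def flush(self) -> None:
        # Emit the buffered line, if any, at the configured level.
        if self._buffer:
            self.logger.log(self.level, self._buffer)
            self._buffer = ""

    def close(self) -> None:
        self.flush()
```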
I pushed a fix (which is really a workaround). I could not find a way to make it work. It seems that unless we want to reimplement all the methods manually, the IOBase and IO type hierarchies are incompatible, and other than ignoring the Mypy errors there is not much we can do about it.
At least I pushed it down as much as I could. The ignores now simply ignore those errors:
airflow/utils/log/logging_mixin.py:83: error: Definition of "__next__" in base class "IOBase" is incompatible with definition in base class "IO"
class StreamLogWriter(IOBase, IO[str]):
^
airflow/utils/log/logging_mixin.py:83: error: Definition of "__next__" in base class "IOBase" is incompatible with definition in base class "Iterator"
class StreamLogWriter(IOBase, IO[str]):
^
airflow/utils/log/logging_mixin.py:83: error: Definition of "readlines" in base class "IOBase" is incompatible with definition in base class "IO"
class StreamLogWriter(IOBase, IO[str]):
^
airflow/utils/log/logging_mixin.py:83: error: Definition of "__iter__" in base class "IOBase" is incompatible with definition in base class "IO"
class StreamLogWriter(IOBase, IO[str]):
^
airflow/utils/log/logging_mixin.py:83: error: Definition of "__iter__" in base class "IOBase" is incompatible with definition in base class "Iterator"
class StreamLogWriter(IOBase, IO[str]):
^
airflow/utils/log/logging_mixin.py:83: error: Definition of "__iter__" in base class "IOBase" is incompatible with definition in base class "Iterable"
class StreamLogWriter(IOBase, IO[str]):
^
airflow/utils/log/logging_mixin.py:83: error: Definition of "readline" in base class "IOBase" is incompatible with definition in base class "IO"
class StreamLogWriter(IOBase, IO[str]):
^
airflow/utils/log/logging_mixin.py:83: error: Definition of "writelines" in base class "IOBase" is incompatible with definition in base class "IO"
class StreamLogWriter(IOBase, IO[str]):
^
airflow/utils/log/logging_mixin.py:83: error: Definition of "__enter__" in base class "IOBase" is incompatible with definition in base class "IO"
class StreamLogWriter(IOBase, IO[str]):
^
Or maybe there is an IO protocol you mentioned, @uranusjr, which I simply could not find? The only "IO" I see is the generic one (which I now implemented) - but being generic it does not have any implementation. The only "base" implementation I am aware of that implements IO is the IOBase family (but then it's incompatible with the generic).
I am also not 100% sure that removing encoding = None is a good thing, or whether it has side effects :(.
      self._execute_task_callbacks(dagbag, request)
  elif isinstance(request, SlaCallbackRequest):
-     self.manage_slas(dagbag.dags.get(request.dag_id))
+     self.manage_slas(dagbag.dags[request.dag_id])
Error here:
airflow/dag_processing/processor.py:577: error: Argument 1 to "manage_slas" of "DagFileProcessor" has incompatible type "Optional[DAG]"; expected "DAG"
self.manage_slas(dagbag.dags.get(request.dag_id))
Not entirely sure if there is a case where an SLA callback would be executed without the corresponding DAG being in the DagBag. If that is a possible scenario, I can certainly modify the manage_slas() typing and raise an exception for a missing DAG.
SLA doesn’t make sense without a DAG, so this is better IMO. It’s good to fail here if things go wrong.
Yeah. They basically can't go wrong here or we are in a really deep mess.
Yup, the change should be good
Also
def test_encoding(self):
logger = mock.MagicMock()
logger.log = mock.MagicMock()
log = StreamLogWriter(logger, 1)
> assert log.encoding is None
E AttributeError: 'StreamLogWriter' object has no attribute 'encoding'
Although I’m not sure why we need encoding exactly; maybe we could remove this test entirely?
  # redirect stdout/stderr to log
- with redirect_stdout(StreamLogWriter(log, logging.INFO)), redirect_stderr(
-     StreamLogWriter(log, logging.WARN)
+ with redirect_stdout(StreamLogWriter(log, logging.INFO)), (
+     redirect_stderr(StreamLogWriter(log, logging.WARN))
  ), Stats.timer() as timer:
This does not work because you can’t use parentheses around context managers in a with statement the way you can elsewhere in the language, until 3.10 😞
A popular workaround is
import contextlib
with contextlib.ExitStack() as stack:
stack.enter_context(redirect_stdout(StreamLogWriter(log, logging.INFO)))
stack.enter_context(redirect_stderr(StreamLogWriter(log, logging.WARN)))
    timer = stack.enter_context(Stats.timer())
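A self-contained version of that workaround, with io.StringIO standing in for StreamLogWriter and the Stats.timer() call left out:

```python
import contextlib
import io

# ExitStack enters several context managers one by one, avoiding the
# parenthesized multi-manager `with` syntax that requires Python 3.10.
out, err = io.StringIO(), io.StringIO()
with contextlib.ExitStack() as stack:
    stack.enter_context(contextlib.redirect_stdout(out))
    stack.enter_context(contextlib.redirect_stderr(err))
    print("to stdout")

assert out.getvalue() == "to stdout\n"
assert err.getvalue() == ""
```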
That is...unfortunate. This change was made to make flake8 happy for line-length validation. Now that the Mypy ignore comments are gone, we could revert this change. WDYT?
Although I’m not sure why we need encoding exactly; maybe we could remove this test entirely?
I looked at the history, and the reason is that some of the internals of subprocess use stderr.encoding and stdout.encoding, and we effectively replace stderr and stdout with StreamLogWriter (which has other bad consequences, like an infinite redirect when someone also wants to log stuff to stderr/stdout - but this is a different topic - #20500).
This is an excerpt from subprocess.py where it would fail:
# Translate newlines, if requested.
# This also turns bytes into strings.
if self.text_mode:
if stdout is not None:
stdout = self._translate_newlines(stdout,
self.stdout.encoding,
self.stdout.errors)
if stderr is not None:
stderr = self._translate_newlines(stderr,
self.stderr.encoding,
self.stderr.errors)
return (stdout, stderr)
On the other hand, IMHO we should not set it to None (even though that results in "utf-8" being used when decoding bytes) but rather to sys.getdefaultencoding().
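That suggestion could look roughly like this (a sketch assuming encoding is exposed as a read-only property; not the actual Airflow implementation):

```python
import sys

class StreamLogWriterSketch:
    """Sketch: expose a real encoding so subprocess internals that read
    stdout.encoding / stderr.encoding keep working after redirection."""

    @property
    def encoding(self) -> str:
        # Platform default encoding rather than None, per the suggestion.
        return sys.getdefaultencoding()
```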
(and BTW - yeah, we should rewrite a big part of our logging implementation to make it sane).
Needs rebase :(
Will do. What do we want to do about
airflow/stats.py
Outdated
  @classmethod
- def timing(cls, stat: str, dt: Union[float, datetime.timedelta]) -> None:
+ def timing(cls, stat: str, dt: Optional[Union[float, datetime.timedelta]]) -> None:
this is not correct
Instead you should have:
diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 408bb48aa..4483c6a58 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -70,8 +70,8 @@ class DagFileStat(NamedTuple):
num_dags: int
import_errors: int
- last_finish_time: Optional[datetime]
- last_duration: Optional[float]
+ last_finish_time: datetime
+ last_duration: float
    run_count: int
The Optional typing was to handle this instance. Should we try something else instead?
https://github.com/apache/airflow/blob/main/airflow/dag_processing/manager.py#L1043-L1047
In that case the following should be ok:
diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 408bb48aa..25c6ec70f 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -920,7 +920,7 @@ class DagFileProcessorManager(LoggingMixin):
self._file_stats[processor.file_path] = stat
file_name = os.path.splitext(os.path.basename(processor.file_path))[0].replace(os.sep, '.')
- Stats.timing(f'dag_processing.last_duration.{file_name}', stat.last_duration)
+ Stats.timing(f'dag_processing.last_duration.{file_name}', cast(float, stat.last_duration))
def collect_results(self) -> None:
         """Collect the result from any finished DAG processors"""

Stats.timing should never receive None as a value - https://github.com/jsocol/pystatsd/blob/f3f304b4b2c3d5eddeb9f4977d9c82c64c37a052/statsd/client/base.py#L26-L30
Oh OK. Right on, let me try that.
If I move the cast() here we're golden:
for file_path in files_paths_to_queue:
if file_path not in self._file_stats:
self._file_stats[file_path] = DagFileStat(
num_dags=0,
import_errors=0,
last_finish_time=None,
last_duration=cast(float, None),
run_count=0,
        )

Thanks for the tip @kaxil!
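Worth noting about this trick (a general Python fact, not specific to this PR): typing.cast() does nothing at runtime, so it only changes what the type checker sees - the field still holds None:

```python
from typing import cast

# cast() returns its second argument unchanged; it exists purely to give
# the type checker a different static type for the value.
value = cast(float, None)
assert value is None
```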
I was also going to take care of the context manager situation @uranusjr pointed out. Since the ignore comments are being moved, I'll just revert the formatting change.
f2e4c3d to dab7b66
How should we handle the
dab7b66 to 506daf2
Fixed in #20795
Related: #19891
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.