Skip to content

Commit

Permalink
D205 Support - Stragglers (#33301)
Browse files Browse the repository at this point in the history
  • Loading branch information
ferruzzi committed Aug 15, 2023
1 parent 4d99705 commit 9bf68ad
Show file tree
Hide file tree
Showing 8 changed files with 15 additions and 13 deletions.
5 changes: 3 additions & 2 deletions airflow/kubernetes/pre_7_4_0_compatibility/pod_generator.py
Expand Up @@ -15,8 +15,9 @@
# specific language governing permissions and limitations
# under the License.
"""
Pod generator compatible with cncf-providers released before 2.7.0 of airflow (so pre-7.4.0 of
the cncf.kubernetes provider).
Pod generator compatible with cncf-providers released before 2.7.0 of airflow.
Compatible with pre-7.4.0 of the cncf.kubernetes provider.
This module provides an interface between the previous Pod
API and outputs a kubernetes.client.models.V1Pod.
Expand Down
3 changes: 2 additions & 1 deletion airflow/operators/python.py
Expand Up @@ -792,7 +792,8 @@ def _get_airflow_version_from_target_env(self) -> str | None:

class BranchExternalPythonOperator(ExternalPythonOperator, SkipMixin):
"""
A workflow can "branch" or follow a path after the execution of this task,
A workflow can "branch" or follow a path after the execution of this task.
Extends ExternalPythonOperator, so expects to get Python:
virtualenv that should be used (in ``VENV/bin`` folder). Should be absolute path,
so it can run on separate virtualenv similarly to ExternalPythonOperator.
Expand Down
1 change: 1 addition & 0 deletions airflow/plugins_manager.py
Expand Up @@ -206,6 +206,7 @@ def is_valid_plugin(plugin_obj):
def register_plugin(plugin_instance):
"""
Start plugin load and register it after success initialization.
If plugin is already registered, do nothing.
:param plugin_instance: subclass of AirflowPlugin
Expand Down
6 changes: 3 additions & 3 deletions airflow/providers/papermill/operators/papermill.py
Expand Up @@ -17,7 +17,7 @@
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING, ClassVar, Collection, Optional, Sequence
from typing import TYPE_CHECKING, ClassVar, Collection, Sequence

import attr
import papermill as pm
Expand All @@ -42,8 +42,8 @@ class NoteBook(File):
*(File.template_fields if hasattr(File, "template_fields") else {"url"}),
}

type_hint: Optional[str] = "jupyter_notebook" # noqa: UP007
parameters: Optional[dict] = {} # noqa: UP007
type_hint: str | None = "jupyter_notebook"
parameters: dict | None = {}

meta_schema: str = __name__ + ".NoteBook"

Expand Down
3 changes: 3 additions & 0 deletions airflow/providers_manager.py
Expand Up @@ -307,6 +307,7 @@ def _correctness_check(
) -> type[BaseHook] | None:
"""
Performs coherence check on provider classes.
For apache-airflow providers - it checks if it starts with appropriate package. For all providers
it tries to import the provider - checking that there are no exceptions during importing.
It logs appropriate warning in case it detects any problems.
Expand Down Expand Up @@ -615,6 +616,7 @@ def _discover_all_providers_from_packages(self) -> None:
def _discover_all_airflow_builtin_providers_from_local_sources(self) -> None:
"""
Finds all built-in airflow providers if airflow is run from the local sources.
It finds `provider.yaml` files for all such providers and registers the providers using those.
This 'provider.yaml' scanning takes precedence over scanning packages installed
Expand Down Expand Up @@ -1174,6 +1176,7 @@ def extra_links_class_names(self) -> list[str]:
def connection_form_widgets(self) -> dict[str, ConnectionFormWidgetInfo]:
"""
Returns widgets for connection forms.
Dictionary keys in the same order that it defined in Hook.
"""
self.initialize_providers_hooks()
Expand Down
5 changes: 1 addition & 4 deletions airflow/sensors/external_task.py
Expand Up @@ -327,10 +327,7 @@ def poke(self, context: Context, session: Session = NEW_SESSION) -> bool:
return count_allowed == len(dttm_filter)

def execute(self, context: Context) -> None:
"""
Airflow runs this method on the worker and defers using the triggers
if deferrable is set to True.
"""
"""Runs on the worker and defers using the triggers if deferrable is set to True."""
if not self.deferrable:
super().execute(context)
else:
Expand Down
2 changes: 1 addition & 1 deletion airflow/triggers/external_task.py
Expand Up @@ -180,7 +180,7 @@ def serialize(self) -> tuple[str, dict[str, typing.Any]]:
)

async def run(self) -> typing.AsyncIterator[TriggerEvent]:
"""Check the database to see if the dag run exists, and has hit one of the states yet, or not."""
"""Check periodically if the dag run exists, and has hit one of the states yet, or not."""
while True:
# mypy confuses typing here
num_dags = await self.count_dags() # type: ignore[call-arg]
Expand Down
3 changes: 1 addition & 2 deletions airflow/utils/hashlib_wrapper.py
Expand Up @@ -28,8 +28,7 @@

def md5(__string: ReadableBuffer = b"") -> hashlib._Hash:
"""
Safely allows calling the ``hashlib.md5`` function when ``usedforsecurity`` is disabled in
the configuration.
Safely allows calling the ``hashlib.md5`` function when ``usedforsecurity`` is disabled in configuration.
:param __string: The data to hash. Default to empty str byte.
:return: The hashed value.
Expand Down

0 comments on commit 9bf68ad

Please sign in to comment.