@run_check("Checking that hook classes defining conn_type are registered in connection-types")
def check_hook_classes_with_conn_type_are_registered(yaml_files: dict[str, dict]) -> tuple[int, int]:
    """Find Hook subclasses that define conn_type but are not listed in connection-types."""
    from airflow.sdk.bases.hook import BaseHook

    num_checks = 0
    num_errors = 0
    for yaml_file_path, provider_data in yaml_files.items():
        connection_types = provider_data.get("connection-types", [])
        # Hook classes explicitly registered, and the conn_type values they already cover.
        known_hook_classes = {entry["hook-class-name"] for entry in connection_types}
        known_conn_types = {entry["connection-type"] for entry in connection_types}
        declared_modules = [
            module_path
            for hook_entry in provider_data.get("hooks", [])
            for module_path in hook_entry.get("python-modules", [])
        ]
        for module_name in declared_modules:
            try:
                with warnings.catch_warnings(record=True):
                    module = importlib.import_module(module_name)
            except (ImportError, AirflowOptionalProviderFeatureException):
                # Import failures are reported by other checks; nothing to do here.
                continue

            for attr_name in dir(module):
                if attr_name.startswith("_"):
                    continue
                candidate = getattr(module, attr_name, None)
                is_concrete_hook = (
                    inspect.isclass(candidate)
                    and issubclass(candidate, BaseHook)
                    and candidate is not BaseHook
                    # Only classes defined in this module (not re-exports), and not
                    # abstract base classes.
                    and candidate.__module__ == module_name
                    and not inspect.isabstract(candidate)
                )
                if not is_concrete_hook:
                    continue
                num_checks += 1
                # Only a conn_type set directly on the class counts, not one inherited
                # from a parent hook; a falsy value is treated as "not declared".
                conn_type = candidate.__dict__.get("conn_type")
                if not conn_type:
                    continue
                full_class_name = f"{module_name}.{attr_name}"
                if full_class_name in known_hook_classes:
                    continue
                # A sibling hook class (e.g. an async variant) may already register
                # the same conn_type — that is acceptable.
                if conn_type in known_conn_types:
                    continue
                errors.append(
                    f"Hook class `{full_class_name}` defines conn_type='{conn_type}' "
                    f"but no hook class is registered for this connection type "
                    f"in 'connection-types' in {yaml_file_path}.\n"
                    f"[yellow]How to fix it[/]: Add an entry with "
                    f"hook-class-name: {full_class_name} to the connection-types "
                    f"section of {yaml_file_path}."
                )
                num_errors += 1
    return num_checks, num_errors
@run_check(
    "Checking that all provider Hook/Operator/Sensor/Trigger/Executor/Notifier"
    " classes are registered in provider.yaml"
)
def check_all_provider_classes_are_registered(yaml_files: dict[str, dict]) -> tuple[int, int]:
    """
    Walk all provider source files, find Hook/Operator/Sensor/Trigger/Executor/Notifier
    subclasses, and verify they are registered in the appropriate provider.yaml section.

    This catches classes placed in non-standard directories or modules that were missed
    when updating provider.yaml.
    """
    from airflow.executors.base_executor import BaseExecutor
    from airflow.models.baseoperator import BaseOperator
    from airflow.sdk.bases.hook import BaseHook
    from airflow.sdk.bases.notifier import BaseNotifier
    from airflow.sensors.base import BaseSensorOperator
    from airflow.triggers.base import BaseTrigger

    # Ordered most-specific-first: BaseSensorOperator subclasses BaseOperator, so it
    # must be matched before the generic operator entry.
    resource_map: list[tuple[type, str]] = [
        (BaseSensorOperator, "sensors"),
        (BaseHook, "hooks"),
        (BaseTrigger, "triggers"),
        (BaseNotifier, "notifications"),
        (BaseExecutor, "executors"),
        (BaseOperator, "operators"),
    ]
    # These resource types register individual class paths rather than whole modules.
    class_level_resource_types = {"executors", "notifications"}
    # Directories that are not expected to contain registered provider classes.
    skip_dirs = {"tests", "example_dags", "decorators"}

    num_checks = 0
    num_errors = 0

    for yaml_file_path, provider_data in yaml_files.items():
        package_dir = AIRFLOW_ROOT_PATH.joinpath(pathlib.Path(yaml_file_path).parent)

        # Every module registered anywhere in provider.yaml, across resource types.
        registered_modules: set[str] = {
            module_path
            for resource_type in ("hooks", "operators", "sensors", "triggers", "bundles")
            for entry in provider_data.get(resource_type, [])
            for module_path in entry.get("python-modules", [])
        }
        registered_modules.update(
            entry["python-module"]
            for entry in provider_data.get("transfers", [])
            if entry.get("python-module")
        )

        # Class-path registrations (executors and notifications).
        registered_classes: set[str] = set(provider_data.get("executors", [])) | set(
            provider_data.get("notifications", [])
        )

        # Prefer the provider's src/ layout; fall back to the package root.
        src_dir = package_dir / "src"
        if not src_dir.exists():
            src_dir = package_dir

        # module_name -> [(class_name, suggested_resource_type)]
        unregistered: dict[str, list[tuple[str, str]]] = {}

        for py_file in sorted(src_dir.rglob("*.py")):
            if py_file.name == "__init__.py" or skip_dirs & set(py_file.parts):
                continue
            try:
                module_name = _filepath_to_module(py_file)
            except ValueError:
                continue
            if module_name in DEPRECATED_MODULES:
                continue

            module_is_registered = module_name in registered_modules

            try:
                with warnings.catch_warnings(record=True):
                    module = importlib.import_module(module_name)
            except (ImportError, AirflowOptionalProviderFeatureException):
                continue
            except Exception:
                # Best-effort: modules that fail to import for other reasons are
                # covered by other checks.
                continue

            # Track class identities so aliases (e.g. send_chime_notification =
            # ChimeNotifier) are examined only once.
            seen_class_ids: set[int] = set()

            for attr_name in dir(module):
                if attr_name.startswith("_"):
                    continue
                candidate = getattr(module, attr_name, None)
                if not inspect.isclass(candidate) or candidate.__module__ != module_name:
                    # Not a class, or a re-export defined in another module.
                    continue
                if id(candidate) in seen_class_ids:
                    continue
                seen_class_ids.add(id(candidate))
                # Abstract classes are base classes, not concrete implementations.
                if inspect.isabstract(candidate):
                    continue

                for base_class, resource_type in resource_map:
                    if not (issubclass(candidate, base_class) and candidate is not base_class):
                        continue
                    num_checks += 1
                    full_class_name = f"{module_name}.{attr_name}"
                    if resource_type in class_level_resource_types:
                        # Accept the exact class path, or any registered path ending in
                        # the class name (handles __init__.py re-exports like
                        # airflow.providers.edge3.executors.EdgeExecutor).
                        is_ok = full_class_name in registered_classes or any(
                            rc.endswith(f".{attr_name}") for rc in registered_classes
                        )
                    else:
                        is_ok = module_is_registered
                    if is_ok:
                        console.print(
                            f"  [green]OK[/] {full_class_name} ({resource_type}, {base_class.__name__})"
                        )
                    else:
                        unregistered.setdefault(module_name, []).append((attr_name, resource_type))
                        console.print(
                            f"  [red]MISSING[/] {full_class_name} "
                            f"({resource_type}, {base_class.__name__})"
                        )
                    # Most specific match wins, don't double-report.
                    break

        # Report one error per unregistered module to keep the output compact.
        for module_name, class_info in unregistered.items():
            class_names = ", ".join(f"`{name}`" for name, _ in class_info)
            suggested_type = class_info[0][1]
            if suggested_type in class_level_resource_types:
                full_paths = ", ".join(f"`{module_name}.{name}`" for name, _ in class_info)
                errors.append(
                    f"Class(es) {full_paths} not registered in the "
                    f"{suggested_type} section of {yaml_file_path}.\n"
                    f"[yellow]How to fix it[/]: Add the class path(s) to the "
                    f"{suggested_type} list in {yaml_file_path}."
                )
            else:
                errors.append(
                    f"Module `{module_name}` contains {suggested_type} "
                    f"class(es) ({class_names}) but is not registered in any "
                    f"resource section of {yaml_file_path}.\n"
                    f"[yellow]How to fix it[/]: Add `{module_name}` to the "
                    f"python-modules list in the {suggested_type} section "
                    f"of {yaml_file_path}, or to the transfers section "
                    f"if it is a transfer operator."
                )
            num_errors += 1

    return num_checks, num_errors