
Conversation

@amoghrajesh
Contributor

@amoghrajesh amoghrajesh commented Jan 7, 2026

closes: #60403

Motivation

The motivation is that ProvidersManager is imported in a few places in the task SDK from core Airflow, blocking client-server separation.


Coming to the bigger problem and a better way to solve this issue: ProvidersManager is a monolithic structure that forces workers to load infrastructure components they will never use (executors, auth managers). This needlessly gives workers data they do not need and increases the instance size as well.

Simple example:

# Workers need:
pm.hooks  # For task execution
pm.taskflow_decorators  # For @task.docker, etc.
pm.filesystem_module_names  # For filesystem I/O

# They certainly don't need:
pm.executor_class_names
pm.auth_managers  # Only the API server needs these

So, my goal here is to enable client-server separation by splitting the runtime resources of providers (hooks, taskflow_decorators) from the ones that only airflow-core needs.

Approach

A detailed table of what I think should belong where:

| Property | ProvidersManagerRuntime (SDK) | ProvidersManager (Core) | Purpose |
| --- | --- | --- | --- |
| hooks | Primary | Deprecated | Connection hooks for external systems |
| taskflow_decorators | Primary | Deprecated | Task decorators (@task.docker, etc.) |
| filesystem_module_names | Primary | Deprecated | File I/O implementations |
| asset_factories | Primary | Deprecated | Asset creation functions |
| asset_uri_handlers | Primary | Deprecated | URI normalization |
| asset_to_openlineage_converters | Primary | Deprecated | OpenLineage integration |
| auth_managers | No | Yes | Authentication managers |
| executor_class_names | No | Yes | Executor discovery |
| secrets_backend_class_names | No | Yes | Secrets backends |
| logging_class_names | No | Yes | Log handlers |
| queue_class_names | No | Yes | Queue managers |
| cli_command_functions | No | Yes | CLI commands |
| cli_command_providers | No | Yes | CLI command providers |
| providers | Yes | Yes | Provider metadata (both need it) |
| notification | No | Yes | Notifiers |
| trigger | No | Yes | Deferrable triggers |
| dialects | No | Yes | Database dialects |
| plugins | No | Yes | Provider plugins |
| extra_links_class_names | No | Yes | UI extra links |
| connection_form_widgets | No | Yes | Connection form UI |
| field_behaviours | No | Yes | Connection field behaviors |
| provider_configs | No | Yes | Provider configuration |
| already_initialized_provider_configs | No | Yes | Already-initialized configs |

I decided to extract the current ProvidersManager into two portions: one that handles runtime needs and one that handles what core needs.

What's done:

  • Split the providers manager: ProvidersManagerRuntime in task-sdk serves runtime resources, while ProvidersManager as we know it today stays to serve the server components
  • Extracted ~600 lines of common code used by both into a shared library
  • Hacky deprecation delegation in ProvidersManager (will make it better); a rough sketch is shown below
  • Basic testing confirms everything works
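A rough sketch of the delegation shim (assumed shape, not the exact code from this branch; the warning text matches the output shown under "Backward compatibility" below):

import warnings

_RUNTIME_PROPERTIES = {
    "hooks",
    "taskflow_decorators",
    "filesystem_module_names",
    "asset_factories",
    "asset_uri_handlers",
    "asset_to_openlineage_converters",
}


class ProvidersManager:  # simplified stand-in for the core class
    def __init__(self):
        self._runtime_manager = None

    def __getattribute__(self, name: str):
        if name in _RUNTIME_PROPERTIES:
            warnings.warn(
                f"ProvidersManager.{name} is deprecated. "
                f"Use ProvidersManagerRuntime.{name} from task-sdk instead.",
                DeprecationWarning,
                stacklevel=2,
            )
            # Lazily create the runtime manager and forward the attribute access to it.
            from airflow.sdk.providers_manager_runtime import ProvidersManagerRuntime

            if object.__getattribute__(self, "_runtime_manager") is None:
                object.__setattr__(self, "_runtime_manager", ProvidersManagerRuntime())
            return getattr(object.__getattribute__(self, "_runtime_manager"), name)
        return object.__getattribute__(self, name)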

The task SDK will do this from now on:

from airflow.sdk.providers_manager_runtime import ProvidersManagerRuntime

pm = ProvidersManagerRuntime()
pm.hooks  # connection handling
pm.taskflow_decorators  # @task decorators
pm.filesystem_module_names  # file I/O
pm.asset_uri_handlers  # Asset scheme handlers

Testing

Backward compatibility

Existing code continues to work with deprecation warnings:

from airflow.providers_manager import ProvidersManager
pm = ProvidersManager()
pm.hooks
<ipython-input-3-95b9e89432f5>:1 DeprecationWarning: ProvidersManager.hooks is deprecated. Use ProvidersManagerRuntime.hooks from task-sdk instead.
Out[3]: <airflow._shared.providers_discovery.LazyDictWithCache at 0x1113a8040>
pm.hooks.get("postgres")
<ipython-input-4-9891a5daf791>:1 DeprecationWarning: ProvidersManager.hooks is deprecated. Use ProvidersManagerRuntime.hooks from task-sdk instead.
Out[4]: HookInfo(hook_class_name='airflow.providers.postgres.hooks.postgres.PostgresHook', connection_id_attribute_name='postgres_conn_id', package_name='apache-airflow-providers-postgres', hook_name='Postgres', connection_type='postgres', connection_testable=True, dialects=[])

CLI

I created a simple script and ran it; output: providers-output.txt

Impact on consumers

For Providers

Nothing for now; watch for deprecation warnings and migrate to the runtime properties from the task SDK.

For Airflow developers / DAG authors

If you are using ProvidersManager in your DAG code (first of all, you shouldn't, but if you really are), watch for the deprecation warnings on runtime properties and migrate:

from airflow.sdk.providers_manager_runtime import ProvidersManagerRuntime
pm = ProvidersManagerRuntime()

What's next

Next phase:

  • Move UI metadata to YAML format
  • Extract connection form widgets from hook methods to provider.yaml
  • Migrate all 82 providers
  • Update the API server to read from YAML (eliminating hook imports)

Benefits:

  • API server starts without importing provider code
  • UI metadata purely declarative (YAML)


@amoghrajesh amoghrajesh marked this pull request as ready for review January 8, 2026 14:28

self._runtime_manager = None

def __getattribute__(self, name: str):
Contributor Author

Don't love this; maybe I'll change it to explicit per-method delegation instead, I guess.
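For reference, "changing it to each method" could look roughly like this, with one explicit deprecated property per runtime attribute (illustrative only; property names and the warning text are taken from this PR):

import warnings
from functools import cached_property

from airflow.sdk.providers_manager_runtime import ProvidersManagerRuntime


class ProvidersManager:  # simplified stand-in for the core class
    @cached_property
    def _runtime_manager(self) -> ProvidersManagerRuntime:
        return ProvidersManagerRuntime()

    @property
    def hooks(self):
        warnings.warn(
            "ProvidersManager.hooks is deprecated. "
            "Use ProvidersManagerRuntime.hooks from task-sdk instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self._runtime_manager.hooks

    # ...one property like the above per runtime attribute
    # (taskflow_decorators, filesystem_module_names, asset_factories, ...).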

Contributor

How would this surgery look if you removed this compatibility shim? Is this called by all providers as well?

Contributor Author

Pretty much, yes. I will have follow-ups to start making providers use ProvidersManager and ProvidersManagerRuntime where applicable.

Comment on lines -86 to -198
def test_hooks_deprecation_warnings_generated(self):
    providers_manager = ProvidersManager()
    providers_manager._provider_dict["test-package"] = ProviderInfo(
        version="0.0.1",
        data={"hook-class-names": ["airflow.providers.sftp.hooks.sftp.SFTPHook"]},
    )
    with pytest.warns(expected_warning=DeprecationWarning, match="hook-class-names") as warning_records:
        providers_manager._discover_hooks()
    assert warning_records

def test_hooks_deprecation_warnings_not_generated(self):
    with warnings.catch_warnings(record=True) as warning_records:
        providers_manager = ProvidersManager()
        providers_manager._provider_dict["apache-airflow-providers-sftp"] = ProviderInfo(
            version="0.0.1",
            data={
                "hook-class-names": ["airflow.providers.sftp.hooks.sftp.SFTPHook"],
                "connection-types": [
                    {
                        "hook-class-name": "airflow.providers.sftp.hooks.sftp.SFTPHook",
                        "connection-type": "sftp",
                    }
                ],
            },
        )
        providers_manager._discover_hooks()
    assert [w.message for w in warning_records if "hook-class-names" in str(w.message)] == []

def test_warning_logs_generated(self):
    providers_manager = ProvidersManager()
    providers_manager._hooks_lazy_dict = LazyDictWithCache()
    with self._caplog.at_level(logging.WARNING):
        providers_manager._provider_dict["apache-airflow-providers-sftp"] = ProviderInfo(
            version="0.0.1",
            data={
                "hook-class-names": ["airflow.providers.sftp.hooks.sftp.SFTPHook"],
                "connection-types": [
                    {
                        "hook-class-name": "airflow.providers.sftp.hooks.sftp.SFTPHook",
                        "connection-type": "wrong-connection-type",
                    }
                ],
            },
        )
        providers_manager._discover_hooks()
        _ = providers_manager._hooks_lazy_dict["wrong-connection-type"]
    assert len(self._caplog.entries) == 1
    assert "Inconsistency!" in self._caplog[0]["event"]
    assert "sftp" not in providers_manager.hooks

def test_warning_logs_not_generated(self):
    with self._caplog.at_level(logging.WARNING):
        providers_manager = ProvidersManager()
        providers_manager._provider_dict["apache-airflow-providers-sftp"] = ProviderInfo(
            version="0.0.1",
            data={
                "hook-class-names": ["airflow.providers.sftp.hooks.sftp.SFTPHook"],
                "connection-types": [
                    {
                        "hook-class-name": "airflow.providers.sftp.hooks.sftp.SFTPHook",
                        "connection-type": "sftp",
                    }
                ],
            },
        )
        providers_manager._discover_hooks()
        _ = providers_manager._hooks_lazy_dict["sftp"]
    assert not self._caplog.records
    assert "sftp" in providers_manager.hooks

def test_already_registered_conn_type_in_provide(self):
    with self._caplog.at_level(logging.WARNING):
        providers_manager = ProvidersManager()
        providers_manager._provider_dict["apache-airflow-providers-dummy"] = ProviderInfo(
            version="0.0.1",
            data={
                "connection-types": [
                    {
                        "hook-class-name": "airflow.providers.dummy.hooks.dummy.DummyHook",
                        "connection-type": "dummy",
                    },
                    {
                        "hook-class-name": "airflow.providers.dummy.hooks.dummy.DummyHook2",
                        "connection-type": "dummy",
                    },
                ],
            },
        )
        providers_manager._discover_hooks()
        _ = providers_manager._hooks_lazy_dict["dummy"]
    assert len(self._caplog.records) == 1
    msg = self._caplog.messages[0]
    assert msg.startswith("The connection type 'dummy' is already registered")
    assert (
        "different class names: 'airflow.providers.dummy.hooks.dummy.DummyHook'"
        " and 'airflow.providers.dummy.hooks.dummy.DummyHook2'."
    ) in msg

Contributor Author

These tests deal with hooks, so they have been moved into the SDK, because ProvidersManagerRuntime is what deals with hooks now.
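For illustration, one of the moved tests could look roughly like this in the task-sdk suite (the ProviderInfo import path and the private _provider_dict / _discover_hooks attributes are assumed to carry over unchanged):

import pytest

from airflow._shared.providers_discovery import ProviderInfo  # assumed location after the split
from airflow.sdk.providers_manager_runtime import ProvidersManagerRuntime


def test_hooks_deprecation_warnings_generated():
    pm = ProvidersManagerRuntime()
    pm._provider_dict["test-package"] = ProviderInfo(
        version="0.0.1",
        data={"hook-class-names": ["airflow.providers.sftp.hooks.sftp.SFTPHook"]},
    )
    # The legacy hook-class-names key should still raise a deprecation warning here.
    with pytest.warns(DeprecationWarning, match="hook-class-names"):
        pm._discover_hooks()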

Comment on lines -231 to -299
def test_hooks(self):
    with warnings.catch_warnings(record=True) as warning_records:
        with self._caplog.at_level(logging.WARNING):
            provider_manager = ProvidersManager()
            connections_list = list(provider_manager.hooks.keys())
            assert len(connections_list) > 60
    if len(self._caplog.records) != 0:
        for record in self._caplog.records:
            print(record.message, file=sys.stderr)
            print(record.exc_info, file=sys.stderr)
        raise AssertionError("There are warnings generated during hook imports. Please fix them")
    assert [w.message for w in warning_records if "hook-class-names" in str(w.message)] == []

@skip_if_not_on_main
@pytest.mark.execution_timeout(150)
def test_hook_values(self):
    provider_dependencies = json.loads(
        (AIRFLOW_ROOT_PATH / "generated" / "provider_dependencies.json").read_text()
    )
    python_version = f"{sys.version_info.major}.{sys.version_info.minor}"
    excluded_providers: list[str] = []
    for provider_name, provider_info in provider_dependencies.items():
        if python_version in provider_info.get("excluded-python-versions", []):
            excluded_providers.append(f"apache-airflow-providers-{provider_name.replace('.', '-')}")
    with warnings.catch_warnings(record=True) as warning_records:
        with self._caplog.at_level(logging.WARNING):
            provider_manager = ProvidersManager()
            connections_list = list(provider_manager.hooks.values())
            assert len(connections_list) > 60
    if len(self._caplog.records) != 0:
        real_warning_count = 0
        for record in self._caplog.entries:
            # When there is error importing provider that is excluded the provider name is in the message
            if any(excluded_provider in record["event"] for excluded_provider in excluded_providers):
                continue
            print(record["event"], file=sys.stderr)
            print(record.get("exc_info"), file=sys.stderr)
            real_warning_count += 1
        if real_warning_count:
            if PY313:
                only_ydb_and_yandexcloud_warnings = True
                for record in warning_records:
                    if "ydb" in str(record.message) or "yandexcloud" in str(record.message):
                        continue
                    only_ydb_and_yandexcloud_warnings = False
                if only_ydb_and_yandexcloud_warnings:
                    print(
                        "Only warnings from ydb and yandexcloud providers are generated, "
                        "which is expected in Python 3.13+",
                        file=sys.stderr,
                    )
                    return
            raise AssertionError("There are warnings generated during hook imports. Please fix them")
    assert [w.message for w in warning_records if "hook-class-names" in str(w.message)] == []
Contributor Author

Same as above

Comment on lines -378 to -419
@patch("airflow.providers_manager.import_string")
def test_optional_feature_no_warning(self, mock_importlib_import_string):
    with self._caplog.at_level(logging.WARNING):
        mock_importlib_import_string.side_effect = AirflowOptionalProviderFeatureException()
        providers_manager = ProvidersManager()
        providers_manager._hook_provider_dict["test_connection"] = HookClassProvider(
            package_name="test_package", hook_class_name="HookClass"
        )
        providers_manager._import_hook(
            hook_class_name=None, provider_info=None, package_name=None, connection_type="test_connection"
        )
        assert self._caplog.messages == []

@patch("airflow.providers_manager.import_string")
def test_optional_feature_debug(self, mock_importlib_import_string):
    with self._caplog.at_level(logging.INFO):
        mock_importlib_import_string.side_effect = AirflowOptionalProviderFeatureException()
        providers_manager = ProvidersManager()
        providers_manager._hook_provider_dict["test_connection"] = HookClassProvider(
            package_name="test_package", hook_class_name="HookClass"
        )
        providers_manager._import_hook(
            hook_class_name=None, provider_info=None, package_name=None, connection_type="test_connection"
        )
        assert self._caplog.messages == [
            "Optional provider feature disabled when importing 'HookClass' from 'test_package' package"
        ]
Contributor Author

Same as above

Comment on lines -446 to -548
@pytest.mark.parametrize(
    ("value", "expected_outputs"),
    [
        ("a", "a"),
        (1, 1),
        (None, None),
        (lambda: 0, 0),
        (lambda: None, None),
        (lambda: "z", "z"),
    ],
)
def test_lazy_cache_dict_resolving(value, expected_outputs):
    lazy_cache_dict = LazyDictWithCache()
    lazy_cache_dict["key"] = value
    assert lazy_cache_dict["key"] == expected_outputs
    # Retrieve it again to see if it is correctly returned again
    assert lazy_cache_dict["key"] == expected_outputs


def test_lazy_cache_dict_raises_error():
    def raise_method():
        raise RuntimeError("test")

    lazy_cache_dict = LazyDictWithCache()
    lazy_cache_dict["key"] = raise_method
    with pytest.raises(RuntimeError, match="test"):
        _ = lazy_cache_dict["key"]


def test_lazy_cache_dict_del_item():
    lazy_cache_dict = LazyDictWithCache()

    def answer():
        return 42

    lazy_cache_dict["spam"] = answer
    assert "spam" in lazy_cache_dict._raw_dict
    assert "spam" not in lazy_cache_dict._resolved  # Not resolved yet
    assert lazy_cache_dict["spam"] == 42
    assert "spam" in lazy_cache_dict._resolved
    del lazy_cache_dict["spam"]
    assert "spam" not in lazy_cache_dict._raw_dict
    assert "spam" not in lazy_cache_dict._resolved

    lazy_cache_dict["foo"] = answer
    assert lazy_cache_dict["foo"] == 42
    assert "foo" in lazy_cache_dict._resolved
    # Emulate some mess in data, e.g. value from `_raw_dict` deleted but not from `_resolved`
    del lazy_cache_dict._raw_dict["foo"]
    assert "foo" in lazy_cache_dict._resolved
    with pytest.raises(KeyError):
        # Error expected here, but we still expect to also remove the record from `_resolved`
        del lazy_cache_dict["foo"]
    assert "foo" not in lazy_cache_dict._resolved

    lazy_cache_dict["baz"] = answer
    # Key in `_resolved` not created yet
    assert "baz" in lazy_cache_dict._raw_dict
    assert "baz" not in lazy_cache_dict._resolved
    del lazy_cache_dict._raw_dict["baz"]
    assert "baz" not in lazy_cache_dict._raw_dict
    assert "baz" not in lazy_cache_dict._resolved


def test_lazy_cache_dict_clear():
    def answer():
        return 42

    lazy_cache_dict = LazyDictWithCache()
    assert len(lazy_cache_dict) == 0
    lazy_cache_dict["spam"] = answer
    lazy_cache_dict["foo"] = answer
    lazy_cache_dict["baz"] = answer

    assert len(lazy_cache_dict) == 3
    assert len(lazy_cache_dict._raw_dict) == 3
    assert not lazy_cache_dict._resolved
    assert lazy_cache_dict["spam"] == 42
    assert len(lazy_cache_dict._resolved) == 1
    # Emulate some mess in data: put an extra key into `_resolved`
    lazy_cache_dict._resolved.add("biz")
    assert len(lazy_cache_dict) == 3
    assert len(lazy_cache_dict._resolved) == 2
    # And finally clean up everything
    lazy_cache_dict.clear()
    assert len(lazy_cache_dict) == 0
    assert not lazy_cache_dict._raw_dict
    assert not lazy_cache_dict._resolved
Contributor Author

These deal with LazyDictWithCache, which is now in the shared library, so the tests live in shared now as well.
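For context, a minimal sketch of the LazyDictWithCache behaviour these tests exercise (reconstructed from the assertions above, not copied from the shared library):

from collections.abc import MutableMapping


class LazyDictWithCache(MutableMapping):
    """Stores callables and resolves them lazily on first access, caching the result."""

    def __init__(self):
        self._raw_dict = {}     # values as stored (possibly still callables)
        self._resolved = set()  # keys whose values have already been resolved

    def __setitem__(self, key, value):
        self._raw_dict[key] = value

    def __getitem__(self, key):
        value = self._raw_dict[key]
        if key not in self._resolved and callable(value):
            value = value()              # resolve on first access
            self._raw_dict[key] = value  # cache the resolved value
            self._resolved.add(key)
        return value

    def __delitem__(self, key):
        try:
            del self._raw_dict[key]
        finally:
            # Drop the bookkeeping entry even if the raw delete raised KeyError.
            self._resolved.discard(key)

    def __iter__(self):
        return iter(self._raw_dict)

    def __len__(self):
        return len(self._raw_dict)

    def clear(self):
        self._raw_dict.clear()
        self._resolved.clear()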

@amoghrajesh amoghrajesh self-assigned this Jan 12, 2026
@amoghrajesh amoghrajesh requested a review from uranusjr January 12, 2026 08:33
@kaxil
Member

kaxil commented Jan 14, 2026

cc @potiuk Would be good to get your review on this one, you'd know the ProvidersManager best

return provider_info_cache_decorator


def discover_all_providers_from_packages(

Contributor Author

Yep, it's a copy-paste. Since we will need it in both the runtime and the core providers manager, it is put in shared.

Member

If I understand correctly:

Would it be better to replace the _discover_all_providers_from_packages private method of the core ProvidersManager (

self._discover_all_providers_from_packages()
self._verify_all_providers_all_compatible()
)

with discover_all_providers_from_packages(self._provider_dict, self._provider_schema_validator)?

Contributor Author

Oh, I think I had misunderstood kaxil's comments. Yeah, that makes sense.

Member

Yup, thanks @jason810496
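A minimal sketch of the replacement suggested above (the shared helper's module path is assumed; the call signature is the one from the comment):

# Hypothetical location inside the core ProvidersManager; the attribute names are the
# existing private ones, while the wrapper method name is only for illustration.
from airflow._shared.providers_discovery import discover_all_providers_from_packages


class ProvidersManager:
    def _discover_all_providers(self):
        # Delegate package discovery to the shared helper, which populates
        # self._provider_dict after validating each provider.yaml with the schema validator.
        discover_all_providers_from_packages(self._provider_dict, self._provider_schema_validator)
        self._verify_all_providers_all_compatible()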

Member

@kaxil kaxil left a comment

Close enough, but I'd wait for @potiuk to do a review too.

@amoghrajesh
Contributor Author

Agreed, I would wait for a review from @potiuk too

Member

@jason810496 jason810496 left a comment

Thanks for the PR. LGTM overall!


are placeholders for extra fields (i.e. anything other than the built-in conn
attrs), and if those extra fields are unprefixed, then add the prefix.
The reason we need to do this is, all custom conn fields live in the same dictionary,
Member

Nice

)
for conn_type, class_name in (
("fs", "airflow.providers.standard.hooks.filesystem.FSHook"),
("package_index", "airflow.providers.standard.hooks.package_index.PackageIndexHook"),
Member

Hmm. Shouldn't that be moved to the standard provider?

return imported_class


class ProvidersManagerRuntime(LoggingMixin):
Member

Suggested change
class ProvidersManagerRuntime(LoggingMixin):
class ProvidersManagerTaskRuntime(LoggingMixin):

Member

I think it would be better to use "Task" here to indicate that this is related to task-sdk usage. I know it is inside the sdk package, which already kind of implies it, but adding Task here will make it crystal clear that this is about discovery and registration of providers for "task" purposes.

Member

@potiuk potiuk left a comment

Very nice and clean - just one question and one suggestion. Great Job @amoghrajesh on making it closer to task-isolation and provider split "north star".



Successfully merging this pull request may close these issues.

Break ProvidersManager into infrastructure and runtime managers

6 participants