Decouple S3 ObjectStorageProvider from common-sql into amazon provider#64941
cruseakshay wants to merge 4 commits into apache:main
Conversation
Pull request overview
Decouples DataFusion S3 object storage support from apache-airflow-providers-common-sql by moving the S3 ObjectStorageProvider implementation into apache-airflow-providers-amazon, and adding a new ProvidersManager registry for resolving object storage providers at runtime.
Changes:
- Add `object-storage-providers` discovery/registry to `ProvidersManager` (schema + discovery + access property).
- Move `S3ObjectStorageProvider` into the Amazon provider and register it via provider metadata.
- Update common-sql to resolve non-local storage providers via the registry and add a deprecation shim for the old import path.
Reviewed changes
Copilot reviewed 17 out of 18 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| providers/common/sql/src/airflow/providers/common/sql/datafusion/object_storage_provider.py | Replaces hardcoded S3 provider with ProvidersManager registry lookup + deprecation shim. |
| providers/common/sql/src/airflow/providers/common/sql/datafusion/engine.py | Delegates connection/credential resolution to the storage provider (ConnectionConfig now only carries conn_id). |
| airflow-core/src/airflow/providers_manager.py | Adds ObjectStorageProviderInfo + registry discovery and property for object storage providers. |
| airflow-core/src/airflow/provider.yaml.schema.json | Extends provider.yaml schema to allow object-storage-providers. |
| airflow-core/src/airflow/provider_info.schema.json | Extends provider-info schema to include object-storage-providers. |
| airflow-core/tests/unit/always/test_providers_manager.py | Adds unit tests for object storage provider discovery/override behavior. |
| providers/amazon/src/airflow/providers/amazon/aws/datafusion/object_storage.py | New DataFusion-backed S3ObjectStorageProvider implementation in the amazon provider. |
| providers/amazon/provider.yaml | Registers S3 object storage provider under object-storage-providers. |
| providers/amazon/src/airflow/providers/amazon/get_provider_info.py | Adds generated provider-info entry for object-storage-providers. |
| providers/amazon/pyproject.toml | Adds a datafusion optional extra for the amazon provider. |
| providers/amazon/tests/unit/amazon/aws/datafusion/test_object_storage.py | Adds unit tests for the moved S3 provider implementation. |
| providers/amazon/tests/unit/amazon/aws/datafusion/\_\_init\_\_.py | New test package init for amazon datafusion tests. |
| providers/amazon/src/airflow/providers/amazon/aws/datafusion/\_\_init\_\_.py | New package init for amazon datafusion module. |
| providers/common/sql/tests/unit/common/sql/datafusion/test_object_storage_provider.py | Updates tests to validate registry resolution and deprecation shim behavior. |
| providers/common/sql/tests/unit/common/sql/datafusion/test_engine.py | Updates tests to reflect provider-resolved credentials and simplified ConnectionConfig. |
| providers/common/sql/docs/operators.rst | Documents that S3 support requires apache-airflow-providers-amazon[datafusion]. |
| providers/common/sql/pyproject.toml | Adjusts optional deps ordering; still includes an amazon optional dependency. |
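For context, the registration in the amazon `provider.yaml` plausibly looks something like the sketch below. The key names are inferred from this PR's description and the `provider_class_name` attribute used in the registry lookup; they are assumptions, not lines copied from the diff.

```yaml
# Hypothetical object-storage-providers entry in providers/amazon/provider.yaml;
# exact key names may differ in the actual schema.
object-storage-providers:
  - storage-type: s3
    provider-class-name: airflow.providers.amazon.aws.datafusion.object_storage.S3ObjectStorageProvider
```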
```diff
 def get_object_storage_provider(storage_type: StorageType) -> ObjectStorageProvider:
     """Get an object storage provider based on the storage type."""
     # TODO: Add support for GCS, Azure, HTTP: https://datafusion.apache.org/python/autoapi/datafusion/object_store/index.html
-    providers: dict[StorageType, type] = {
-        StorageType.S3: S3ObjectStorageProvider,
-        StorageType.LOCAL: LocalObjectStorageProvider,
-    }
-    if storage_type not in providers:
-        raise ValueError(
-            f"Unsupported storage type: {storage_type}. Supported types: {list(providers.keys())}"
-        )
+    if storage_type == StorageType.LOCAL:
+        return LocalObjectStorageProvider()
+
+    from airflow.providers_manager import ProvidersManager
+
+    registry = ProvidersManager().object_storage_providers
+    type_key = storage_type.value
```
get_object_storage_provider() assumes storage_type is a StorageType enum and will raise an AttributeError (on .value) if a caller passes an invalid type (e.g. a plain string). Previously this surfaced as a ValueError with a clearer message; consider adding an explicit isinstance(storage_type, StorageType) check and raising a ValueError for unsupported/invalid inputs to keep the API behavior predictable.
```python
    if type_key in registry:
        provider_cls = import_string(registry[type_key].provider_class_name)
        return provider_cls()

    hint = _STORAGE_TYPE_PROVIDER_HINTS.get(type_key, "the appropriate provider package")
```
When a storage type is registered, import_string(...) can still fail (e.g. provider installed without the required extra like apache-airflow-providers-amazon[datafusion], or a bad class path). Right now that ImportError will bubble up and bypass the friendly install-hint ValueError; consider catching ImportError (and possibly Exception) around import_string/instantiation and raising a ValueError that preserves the underlying error while still including the install/upgrade hint.
Suggested change:

```diff
-    if type_key in registry:
-        provider_cls = import_string(registry[type_key].provider_class_name)
-        return provider_cls()
-    hint = _STORAGE_TYPE_PROVIDER_HINTS.get(type_key, "the appropriate provider package")
+    hint = _STORAGE_TYPE_PROVIDER_HINTS.get(type_key, "the appropriate provider package")
+    if type_key in registry:
+        try:
+            provider_cls = import_string(registry[type_key].provider_class_name)
+            return provider_cls()
+        except ImportError as exc:
+            raise ValueError(
+                f"Failed to import ObjectStorageProvider for storage type '{type_key}'. "
+                f"Install or upgrade {hint}."
+            ) from exc
+        except Exception as exc:
+            raise ValueError(
+                f"Failed to initialize ObjectStorageProvider for storage type '{type_key}'. "
+                f"Install or upgrade {hint}."
+            ) from exc
```
```toml
"amazon" = [
    "apache-airflow-providers-amazon"
]
```
The PR description says the amazon dependency is removed from common-sql, but the amazon optional-dependency is still present here. Also, since S3/DataFusion now requires apache-airflow-providers-amazon[datafusion], depending on plain apache-airflow-providers-amazon risks installs where the registry entry exists but importing the provider fails due to missing datafusion. Either remove this optional dependency (and update dev deps accordingly) or change it to apache-airflow-providers-amazon[datafusion] and align the PR description.
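If the second option is taken, the `pyproject.toml` entry might look like the following sketch; the exact table and extra names are assumptions, not lines from the diff.

```toml
# Hypothetical: pin the datafusion extra so the registry entry and the
# importable provider class stay in sync on install.
[project.optional-dependencies]
amazon = [
    "apache-airflow-providers-amazon[datafusion]",
]
```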
```python
mock_provider_cls = MagicMock()
mock_import_string.return_value = mock_provider_cls

mock_pm_cls.return_value.object_storage_providers = {
    "s3": MagicMock(
        provider_class_name="airflow.providers.amazon.aws.datafusion.object_storage.S3ObjectStorageProvider",
    ),
}
```
Several MagicMock() instances here are created without spec/autospec, which can mask real attribute/method typos in tests. Prefer MagicMock(spec=...) (or using autospec=True where patching) so the mocks match the shape of the real objects being replaced.
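A minimal sketch of the difference a `spec` makes. `ObjectStorageProviderInfo` below is a stand-in for the real registry-entry class, defined locally for illustration.

```python
from unittest.mock import MagicMock


class ObjectStorageProviderInfo:
    """Stand-in for the real registry-entry class."""

    provider_class_name: str = ""


# Unspec'd mock: a typo'd attribute silently returns a child MagicMock.
loose = MagicMock()
loose.provider_clas_name  # typo, but no error is raised

# Spec'd mock: attribute access is limited to the class's real shape.
strict = MagicMock(spec=ObjectStorageProviderInfo)
strict.provider_class_name = (
    "airflow.providers.amazon.aws.datafusion.object_storage.S3ObjectStorageProvider"
)
try:
    strict.provider_clas_name  # same typo now raises AttributeError
except AttributeError:
    print("typo caught")
```

When patching with `mock.patch`, passing `autospec=True` achieves the same effect without constructing the spec'd mock by hand.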
```python
with pytest.warns(
    match="Import it from airflow.providers.amazon",
):
    with pytest.raises(ObjectStoreCreationException, match="Failed to create S3 object store"):
        provider.create_object_store("s3://demo-data/path", connection_config)
    cls = mod.S3ObjectStorageProvider
```
These deprecation-shim tests use pytest.warns(match=...) without specifying the expected warning class. To avoid passing on unrelated warnings, assert the warning type explicitly (e.g. AirflowProviderDeprecationWarning) in addition to the message match.
```python
with pytest.warns(
    match="Import it from airflow.providers.amazon",
):
    old_cls = mod.S3ObjectStorageProvider
```
Same as above: pytest.warns(match=...) should specify the warning category to ensure the shim is emitting AirflowProviderDeprecationWarning (and not just any warning that happens to match the message).
```python
mock_creds = MagicMock()
mock_creds.access_key = "hook_key"
mock_creds.secret_key = "hook_secret"
mock_creds.token = None
mock_hook_cls.return_value.get_credentials.return_value = mock_creds
```
mock_creds is a MagicMock() without a spec, which can hide attribute mistakes in the test (e.g. typos in access_key/secret_key/token). Prefer MagicMock(spec=...) or a small simple object (e.g. types.SimpleNamespace) with explicit attributes.
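A sketch of the `types.SimpleNamespace` alternative: the attribute names are explicit and fixed, so a typo in the test fails loudly instead of silently returning a child mock.

```python
from types import SimpleNamespace

# Explicit credentials stub with exactly the attributes the code reads.
mock_creds = SimpleNamespace(
    access_key="hook_key",
    secret_key="hook_secret",
    token=None,
)

print(mock_creds.access_key)  # -> hook_key
# Accessing a misspelled attribute such as mock_creds.acces_key raises
# AttributeError, unlike an unspec'd MagicMock().
```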
```python
mock_creds = MagicMock()
mock_creds.access_key = "hook_key"
mock_creds.secret_key = "hook_secret"
mock_creds.token = "session_tok"
mock_hook_cls.return_value.get_credentials.return_value = mock_creds
```
mock_creds is created as an unspec'd MagicMock() here as well; using a spec (or a simple object with explicit attributes) makes the test stricter and less prone to false positives.
```python
mock_creds = MagicMock()
mock_creds.access_key = "hook_key"
mock_creds.secret_key = "hook_secret"
mock_creds.token = "my_session_token"
mock_hook_cls.return_value.get_credentials.return_value = mock_creds
```
Same issue here: mock_creds is an unspec'd MagicMock(). Consider using a spec/autospec-based mock or an explicit object to ensure the test fails if unexpected attributes are accessed.
```python
mock_creds = MagicMock()
mock_creds.access_key = "k"
mock_creds.secret_key = "s"
mock_creds.token = None
mock_hook_cls.return_value.get_credentials.return_value = mock_creds
```
mock_creds is another unspec'd MagicMock() instance. Using a spec (or a simple explicit credentials object) would make the test more robust against accidental attribute/method drift.
Summary
Decouples `S3ObjectStorageProvider` from `apache-airflow-providers-common-sql` into `apache-airflow-providers-amazon`.

Architecture: `common-sql` defines the `ObjectStorageProvider` abstraction; all object storage providers (LocalStorage, S3, future GCS/Azure) are resolved at runtime via `ProvidersManager`.

Key changes:

- Add `object-storage-providers` registration mechanism to `ProvidersManager` (schema, discovery, property)
- Move `S3ObjectStorageProvider` to `providers/amazon/aws/datafusion/object_storage.py`
- Update `common-sql` to dynamically resolve storage providers via the `ProvidersManager` registry
- Add a deprecation shim for the old `S3ObjectStorageProvider` import path
- Remove the `amazon` dependency from `common-sql` `pyproject.toml`

Follow-up PRs:

Testing:

`AnalyticsOperator` related: #62867
Was generative AI tooling used to co-author this PR?