Skip to content

Commit

Permalink
allow impersonation_chain to be set on Google Cloud connection (#33715)
Browse files Browse the repository at this point in the history
  • Loading branch information
melugoyal committed Aug 30, 2023
1 parent 3ef770e commit 075afe5
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 3 deletions.
6 changes: 6 additions & 0 deletions airflow/providers/google/common/hooks/base_google.py
Expand Up @@ -216,6 +216,9 @@ def get_connection_form_widgets() -> dict[str, Any]:
widget=BS3TextFieldWidget(),
default=5,
),
"impersonation_chain": StringField(
lazy_gettext("Impersonation Chain"), widget=BS3TextFieldWidget()
),
}

@staticmethod
Expand Down Expand Up @@ -262,6 +265,9 @@ def get_credentials_and_project_id(self) -> tuple[google.auth.credentials.Creden

credential_config_file: str | None = self._get_field("credential_config_file", None)

if not self.impersonation_chain:
self.impersonation_chain = self._get_field("impersonation_chain", None)

target_principal, delegates = _get_target_principal_and_delegates(self.impersonation_chain)

credentials, project_id = get_credentials_and_project_id(
Expand Down
13 changes: 13 additions & 0 deletions docs/apache-airflow-providers-google/connections/gcp.rst
Expand Up @@ -125,6 +125,16 @@ Number of Retries
represents the last request. If zero (default), we attempt the
request only once.

Impersonation Chain
Optional service account to impersonate using short-term
credentials, or chained list of accounts required to get the access_token
of the last account in the list, which will be impersonated in all requests leveraging this connection.
If set as a string, the account must grant the originating account
the Service Account Token Creator IAM role.
If set as a sequence, the identities from the list must grant
Service Account Token Creator IAM role to the directly preceding identity, with first
account from the list granting this role to the originating account.

When specifying the connection in environment variable you should specify
it using URI syntax, with the following requirements:

Expand All @@ -142,6 +152,7 @@ Number of Retries
* ``scope`` - Scopes
* ``num_retries`` - Number of Retries


Note that all components of the URI should be URL-encoded.

For example, with URI format:
Expand All @@ -165,6 +176,8 @@ Google operators support `direct impersonation of a service account
<https://cloud.google.com/iam/docs/understanding-service-accounts#directly_impersonating_a_service_account>`_
via ``impersonation_chain`` argument (``google_impersonation_chain`` in case of operators
that also communicate with services of other cloud providers).
The impersonation chain can also be configured directly on the Google Cloud Connection
as described above, though the ``impersonation_chain`` passed to the operator takes precedence.

For example:

Expand Down
29 changes: 26 additions & 3 deletions tests/providers/google/common/hooks/test_base_google.py
Expand Up @@ -661,29 +661,52 @@ def test_authorize_assert_http_timeout_is_present(self, mock_get_credentials):
assert http_authorized.timeout is not None

@pytest.mark.parametrize(
"impersonation_chain, target_principal, delegates",
"impersonation_chain, impersonation_chain_from_conn, target_principal, delegates",
[
pytest.param("ACCOUNT_1", "ACCOUNT_1", None, id="string"),
pytest.param(["ACCOUNT_1"], "ACCOUNT_1", [], id="single_element_list"),
pytest.param("ACCOUNT_1", None, "ACCOUNT_1", None, id="string"),
pytest.param(None, "ACCOUNT_1", "ACCOUNT_1", None, id="string_in_conn"),
pytest.param("ACCOUNT_2", "ACCOUNT_1", "ACCOUNT_2", None, id="string_with_override"),
pytest.param(["ACCOUNT_1"], None, "ACCOUNT_1", [], id="single_element_list"),
pytest.param(None, ["ACCOUNT_1"], "ACCOUNT_1", [], id="single_element_list_in_conn"),
pytest.param(
["ACCOUNT_1"], ["ACCOUNT_2"], "ACCOUNT_1", [], id="single_element_list_with_override"
),
pytest.param(
["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"],
None,
"ACCOUNT_3",
["ACCOUNT_1", "ACCOUNT_2"],
id="multiple_elements_list",
),
pytest.param(
None,
["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"],
"ACCOUNT_3",
["ACCOUNT_1", "ACCOUNT_2"],
id="multiple_elements_list_in_conn",
),
pytest.param(
["ACCOUNT_2", "ACCOUNT_3", "ACCOUNT_4"],
["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"],
"ACCOUNT_4",
["ACCOUNT_2", "ACCOUNT_3"],
id="multiple_elements_list_with_override",
),
],
)
@mock.patch(MODULE_NAME + ".get_credentials_and_project_id")
def test_get_credentials_and_project_id_with_impersonation_chain(
self,
mock_get_creds_and_proj_id,
impersonation_chain,
impersonation_chain_from_conn,
target_principal,
delegates,
):
mock_credentials = mock.MagicMock()
mock_get_creds_and_proj_id.return_value = (mock_credentials, PROJECT_ID)
self.instance.impersonation_chain = impersonation_chain
self.instance.extras = {"impersonation_chain": impersonation_chain_from_conn}
result = self.instance.get_credentials_and_project_id()
mock_get_creds_and_proj_id.assert_called_once_with(
key_path=None,
Expand Down

0 comments on commit 075afe5

Please sign in to comment.