-
Notifications
You must be signed in to change notification settings - Fork 2.7k
/
azure_cli.py
173 lines (146 loc) · 6.94 KB
/
azure_cli.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
import asyncio
import os
import shutil
import sys
from typing import Any, List, Optional
from azure.core.exceptions import ClientAuthenticationError
from azure.core.credentials import AccessToken
from .._internal import AsyncContextManager
from .._internal.decorators import log_get_token_async
from ... import CredentialUnavailableError
from ..._credentials.azure_cli import (
AzureCliCredential as _SyncAzureCliCredential,
CLI_NOT_FOUND,
COMMAND_LINE,
EXECUTABLE_NAME,
get_safe_working_dir,
NOT_LOGGED_IN,
parse_token,
sanitize_output,
)
from ..._internal import _scopes_to_resource, resolve_tenant, within_dac, validate_tenant_id, validate_scope
class AzureCliCredential(AsyncContextManager):
"""Authenticates by requesting a token from the Azure CLI.
This requires previously logging in to Azure via "az login", and will use the CLI's currently logged in identity.
:keyword str tenant_id: Optional tenant to include in the token request.
:keyword List[str] additionally_allowed_tenants: Specifies tenants in addition to the specified "tenant_id"
for which the credential may acquire tokens. Add the wildcard value "*" to allow the credential to
acquire tokens for any tenant the application can access.
:keyword int process_timeout: Seconds to wait for the Azure CLI process to respond. Defaults to 10 seconds.
.. admonition:: Example:
.. literalinclude:: ../samples/credential_creation_code_snippets.py
:start-after: [START create_azure_cli_credential_async]
:end-before: [END create_azure_cli_credential_async]
:language: python
:dedent: 4
:caption: Create an AzureCliCredential.
"""
def __init__(
self,
*,
tenant_id: str = "",
additionally_allowed_tenants: Optional[List[str]] = None,
process_timeout: int = 10,
) -> None:
if tenant_id:
validate_tenant_id(tenant_id)
self.tenant_id = tenant_id
self._additionally_allowed_tenants = additionally_allowed_tenants or []
self._process_timeout = process_timeout
@log_get_token_async
async def get_token(
self,
*scopes: str,
claims: Optional[str] = None, # pylint:disable=unused-argument
tenant_id: Optional[str] = None,
**kwargs: Any,
) -> AccessToken:
"""Request an access token for `scopes`.
This method is called automatically by Azure SDK clients. Applications calling this method directly must
also handle token caching because this credential doesn't cache the tokens it acquires.
:param str scopes: desired scope for the access token. This credential allows only one scope per request.
For more information about scopes, see
https://learn.microsoft.com/entra/identity-platform/scopes-oidc.
:keyword str claims: not used by this credential; any value provided will be ignored.
:keyword str tenant_id: optional tenant to include in the token request.
:return: An access token with the desired scopes.
:rtype: ~azure.core.credentials.AccessToken
:raises ~azure.identity.CredentialUnavailableError: the credential was unable to invoke the Azure CLI.
:raises ~azure.core.exceptions.ClientAuthenticationError: the credential invoked the Azure CLI but didn't
receive an access token.
"""
# only ProactorEventLoop supports subprocesses on Windows (and it isn't the default loop on Python < 3.8)
if sys.platform.startswith("win") and not isinstance(asyncio.get_event_loop(), asyncio.ProactorEventLoop):
return _SyncAzureCliCredential().get_token(*scopes, tenant_id=tenant_id, **kwargs)
if tenant_id:
validate_tenant_id(tenant_id)
for scope in scopes:
validate_scope(scope)
resource = _scopes_to_resource(*scopes)
command = COMMAND_LINE.format(resource)
tenant = resolve_tenant(
default_tenant=self.tenant_id,
tenant_id=tenant_id,
additionally_allowed_tenants=self._additionally_allowed_tenants,
**kwargs,
)
if tenant:
command += " --tenant " + tenant
output = await _run_command(command, self._process_timeout)
token = parse_token(output)
if not token:
sanitized_output = sanitize_output(output)
message = (
f"Unexpected output from Azure CLI: '{sanitized_output}'. \n"
f"To mitigate this issue, please refer to the troubleshooting guidelines here at "
f"https://aka.ms/azsdk/python/identity/azclicredential/troubleshoot."
)
if within_dac.get():
raise CredentialUnavailableError(message=message)
raise ClientAuthenticationError(message=message)
return token
async def close(self) -> None:
"""Calling this method is unnecessary"""
async def _run_command(command: str, timeout: int) -> str:
# Ensure executable exists in PATH first. This avoids a subprocess call that would fail anyway.
if shutil.which(EXECUTABLE_NAME) is None:
raise CredentialUnavailableError(message=CLI_NOT_FOUND)
if sys.platform.startswith("win"):
args = ("cmd", "/c " + command)
else:
args = ("/bin/sh", "-c", command)
working_directory = get_safe_working_dir()
try:
proc = await asyncio.create_subprocess_exec(
*args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
stdin=asyncio.subprocess.DEVNULL,
cwd=working_directory,
env=dict(os.environ, AZURE_CORE_NO_COLOR="true"),
)
stdout_b, stderr_b = await asyncio.wait_for(proc.communicate(), timeout)
output = stdout_b.decode()
stderr = stderr_b.decode()
except asyncio.TimeoutError as ex:
proc.kill()
raise CredentialUnavailableError(message="Timed out waiting for Azure CLI") from ex
except OSError as ex:
# failed to execute 'cmd' or '/bin/sh'
error = CredentialUnavailableError(message="Failed to execute '{}'".format(args[0]))
raise error from ex
if proc.returncode == 0:
return output
# Fallback check in case the executable is not found while executing subprocess.
if proc.returncode == 127 or stderr.startswith("'az' is not recognized"):
raise CredentialUnavailableError(CLI_NOT_FOUND)
if ("az login" in stderr or "az account set" in stderr) and "AADSTS" not in stderr:
raise CredentialUnavailableError(message=NOT_LOGGED_IN)
message = sanitize_output(stderr) if stderr else "Failed to invoke Azure CLI"
if within_dac.get():
raise CredentialUnavailableError(message=message)
raise ClientAuthenticationError(message=message)