Skip to content

Commit

Permalink
auth - using http_authx if available
Browse files Browse the repository at this point in the history
  • Loading branch information
commonism committed Jun 8, 2023
1 parent 75bdbc5 commit c1ac973
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 100 deletions.
45 changes: 32 additions & 13 deletions aiopenapi3/v20/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,30 +80,44 @@ def _prepare_security(self):
)

def _prepare_secschemes(self, scheme: str, value: Union[str, List[str]]):
"""
https://swagger.io/specification/v2/#security-scheme-object
"""
if httpx_auth is not None:
return self._prepare_secschemes_extra(scheme, value)
else:
return self._prepare_secschemes_default(scheme, value)

def _prepare_secschemes_default(self, scheme: str, value: Union[str, List[str]]):
ss = self.root.securityDefinitions[scheme]

if ss.type == "basic":
value = cast(List[str], value)
self.req.auth = httpx_auth.Basic(*value) if httpx_auth else httpx.BasicAuth(*value)
self.req.auth = httpx.BasicAuth(*value)

value = cast(str, value)
if ss.type == "apiKey":
if ss.in_ == "query":
# apiKey in query parameter
if httpx_auth:
self.req.auth = httpx_auth.QueryApiKey(value, getattr(ss, "name", None))
else:
self.req.params[ss.name] = value
self.req.params[ss.name] = value

if ss.in_ == "header":
# apiKey in query header data
if httpx_auth:
self.req.auth = httpx_auth.HeaderApiKey(value, getattr(ss, "name", None))
else:
self.req.headers[ss.name] = value
self.req.headers[ss.name] = value

def _prepare_secschemes_extra(self, scheme: str, value: Union[str, List[str]]):
ss = self.root.securityDefinitions[scheme]

if ss.type == "basic":
value = cast(List[str], value)
self.req.auth = httpx_auth.Basic(*value)

value = cast(str, value)
if ss.type == "apiKey":
if ss.in_ == "query":
# apiKey in query parameter
self.req.auth = httpx_auth.QueryApiKey(value, ss.name)

if ss.in_ == "header":
# apiKey in query header data
self.req.auth = httpx_auth.HeaderApiKey(value, ss.name)

def _prepare_parameters(self, provided):
provided = provided or dict()
Expand Down Expand Up @@ -294,4 +308,9 @@ def _process(self, result):


class AsyncRequest(Request, AsyncRequestBase):
pass
def _prepare_secschemes(self, scheme: str, value: Union[str, List[str]]):
"""
httpx_auth does not support async yet
https://github.com/Colin-b/httpx_auth/pull/48
"""
return self._prepare_secschemes_default(scheme, value)
183 changes: 101 additions & 82 deletions aiopenapi3/v30/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,17 @@
from httpx_auth.authentication import SupportMultiAuth
except:
httpx_auth = None
import inspect

if httpx_auth is not None:
import inspect

HTTPX_AUTH_METHODS = {
name.lower(): getattr(httpx_auth, name)
for name in httpx_auth.__all__
if inspect.isclass((class_ := getattr(httpx_auth, name)))
if issubclass(class_, httpx.Auth)
}

import pydantic
import pydantic.json

Expand Down Expand Up @@ -82,107 +92,116 @@ def _prepare_security(self):
)

def _prepare_secschemes(self, scheme: str, value: Union[str, List[str]]):
if httpx_auth is not None:
return self._prepare_secschemes_extra(scheme, value)
else:
return self._prepare_secschemes_default(scheme, value)

def _prepare_secschemes_default(self, scheme: str, value: Union[str, List[str]]):
ss = self.root.components.securitySchemes[scheme]
if httpx_auth:
auth_methods = {
name.lower(): getattr(httpx_auth, name)
for name in httpx_auth.__all__
if inspect.isclass((class_ := getattr(httpx_auth, name)))
if issubclass(class_, httpx.Auth)
}
add_auths = []

if ss.type == "oauth2":
# NOTE: refresh_url is not currently supported by httpx_auth
# REF: https://github.com/Colin-b/httpx_auth/issues/17
if flow := getattr(ss.flows, "implicit", None):
add_auths.append(httpx_auth.OAuth2Implicit(

if ss.type == "http":
if ss.scheme_ == "basic":
self.req.auth = httpx.BasicAuth(*value)
elif ss.scheme_ == "digest":
self.req.auth = httpx.DigestAuth(*value)
elif ss.scheme_ == "bearer":
self.req.headers["Authorization"] = f"Bearer {value:s}"
else:
raise ValueError(f"Authentication {ss.type}/{ss.scheme_} is not supported.")

value = cast(str, value)

if ss.type == "mutualTLS":
# TLS Client certificates (mutualTLS)
self.req.cert = value

if ss.type == "apiKey":
if ss.in_ == "query":
# apiKey in query parameter
self.req.params[ss.name] = value

if ss.in_ == "header":
# apiKey in query header data
self.req.headers[ss.name] = value

if ss.in_ == "cookie":
self.req.cookies = {ss.name: value}

def _prepare_secschemes_extra(self, scheme: str, value: Union[str, List[str]]):
ss = self.root.components.securitySchemes[scheme]
auths = []

if ss.type == "oauth2":
# NOTE: refresh_url is not currently supported by httpx_auth
# REF: https://github.com/Colin-b/httpx_auth/issues/17
if flow := ss.flows.implicit:
auths.append(
httpx_auth.OAuth2Implicit(
**value,
authorization_url=flow.authorizationUrl,
scopes=flow.scopes,
# refresh_url=getattr(flow, "refreshUrl", None),
))
if flow := getattr(ss.flows, "password", None):
add_auths.append(httpx_auth.OAuth2ResourceOwnerPasswordCredentials(
# refresh_url=flow.refreshUrl,
)
)
if flow := ss.flows.password:
auths.append(
httpx_auth.OAuth2ResourceOwnerPasswordCredentials(
**value,
token_url=flow.tokenUrl,
scopes=flow.scopes,
# refresh_url=getattr(flow, "refreshUrl", None),
))
if flow := getattr(ss.flows, "clientCredentials", None):
add_auths.append(httpx_auth.OAuth2ClientCredentials(
# refresh_url=flow.refreshUrl,
)
)
if flow := ss.flows.clientCredentials:
auths.append(
httpx_auth.OAuth2ClientCredentials(
**value,
token_url=flow.tokenUrl,
scopes=flow.scopes,
# refresh_url=getattr(flow, "refreshUrl", None),
))
if flow := getattr(ss.flows, "authorizationCode", None):
add_auths.append(httpx_auth.OAuth2AuthorizationCode(
# refresh_url=flow.refreshUrl,
)
)
if flow := ss.flows.authorizationCode:
auths.append(
httpx_auth.OAuth2AuthorizationCode(
**value,
authorization_url=flow.authorizationUrl,
token_url=flow.tokenUrl,
scopes=flow.scopes,
# refresh_url=getattr(flow, "refreshUrl", None),
))

if ss.type == "http":
if auth := auth_methods.get(ss.scheme_, None):
if isinstance(value, tuple):
add_auths.append(auth(*value))
if isinstance(value, dict):
add_auths.append(auth(**value))
if ss.scheme_ == "bearer":
add_auths.append(auth_methods["headerapikey"](
f"{ss.bearerFormat or 'Bearer'} {value}",
"Authorization"
))

value = cast(str, value)

if ss.type == "mutualTLS":
# TLS Client certificates (mutualTLS)
self.req.cert = value

if ss.type == "apiKey":
if auth := auth_methods.get((ss.in_+ss.type).lower(), None):
add_auths.append(auth(value, getattr(ss, "name", None)))

if ss.in_ == "cookie":
self.req.cookies = {ss.name: value}

for auth in add_auths:
if self.req.auth and isinstance(self.req.auth, SupportMultiAuth):
self.req.auth += auth
else:
self.req.auth = auth
else:
if ss.type == "http" and ss.scheme_ == "basic":
self.req.auth = httpx.BasicAuth(*value)
# refresh_url=flow.refreshUrl,
)
)

if ss.type == "http" and ss.scheme_ == "digest":
self.req.auth = httpx.DigestAuth(*value)
if ss.type == "http":
if auth := HTTPX_AUTH_METHODS.get(ss.scheme_, None):
if isinstance(value, tuple):
auths.append(auth(*value))
elif isinstance(value, dict):
auths.append(auth(**value))
elif ss.scheme_ == "bearer":
auths.append(httpx_auth.HeaderApiKey(f"Bearer {value}", "Authorization"))
else:
raise ValueError(f"Authentication method {ss.type}/{ss.scheme_} is not supported by httpx-auth")

value = cast(str, value)
if ss.type == "http" and ss.scheme_ == "bearer":
header = ss.bearerFormat or "Bearer {}"
self.req.headers["Authorization"] = header.format(value)
value = cast(str, value)

if ss.type == "mutualTLS":
# TLS Client certificates (mutualTLS)
self.req.cert = value
if ss.type == "mutualTLS":
# TLS Client certificates (mutualTLS)
self.req.cert = value

if ss.type == "apiKey":
if ss.in_ == "query":
# apiKey in query parameter
self.req.params[ss.name] = value
if ss.type == "apiKey":
if auth := HTTPX_AUTH_METHODS.get((ss.in_ + ss.type).lower(), None):
auths.append(auth(value, ss.name))

if ss.in_ == "header":
# apiKey in query header data
self.req.headers[ss.name] = value
if ss.in_ == "cookie":
self.req.cookies = {ss.name: value}

if ss.in_ == "cookie":
self.req.cookies = {ss.name: value}

for auth in auths:
if self.req.auth and isinstance(self.req.auth, SupportMultiAuth):
self.req.auth += auth
else:
self.req.auth = auth

def _prepare_parameters(self, provided):
"""
Expand Down
3 changes: 2 additions & 1 deletion docs/source/advanced.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ Advanced usage

Authentication
==============

The authentication requirements are part of the definition of an operation, either global or - it it exists - operation scope.
Authentication can combine/require multiple identifiert as well as providing a choice of a set.
Authentication can combine/require multiple identifiers as well as providing a choice of a set.

Given the following section of a description document:

Expand Down
3 changes: 3 additions & 0 deletions docs/source/install.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,6 @@ Installation
.. code:: bash
$ pip install aiopenapi3
* aiopenapi3[auth] will install httpx-auth_ which is required to authenticate using oauth2/azuread/. Currently httpx-auth is `limited to Sync <https://github.com/Colin-b/httpx_auth/pull/48>`_ operations.
3 changes: 2 additions & 1 deletion docs/source/links.rst
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
.. |aiopenapi3| replace:: **aiopenapi3**
.. _OpenAPI: https://github.com/OAI/OpenAPI-Specification/
.. _pydantic: https://github.com/pydantic/pydantic
.. _httpx: https://github.com/encode/httpx/
.. _httpx: https://github.com/encode/httpx
.. _httpx-auth: https://github.com/Colin-b/httpx_auth
3 changes: 3 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -69,5 +69,8 @@ tests =
wtforms
asgiref

auth =
httpx-auth

socks =
httpx_socks
9 changes: 6 additions & 3 deletions tests/path_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,12 @@ def test_paths_security(httpx_mock, with_paths_security):
request = httpx_mock.get_requests()[-1]
assert request.headers["Authorization"].split(" ")[1] == base64.b64encode((auth + ":" + auth).encode()).decode()

api.authenticate(None, digestAuth=(auth, auth))
api._.api_v1_auth_login_create(data={}, parameters={})
request = httpx_mock.get_requests()[-1]
try:
import httpx_auth
except:
api.authenticate(None, digestAuth=(auth, auth))
api._.api_v1_auth_login_create(data={}, parameters={})
request = httpx_mock.get_requests()[-1]
# can't test?

api.authenticate(None, bearerAuth=auth)
Expand Down

0 comments on commit c1ac973

Please sign in to comment.