-
Notifications
You must be signed in to change notification settings - Fork 20
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #225 from Axonius/develop
Bugfixes for QA gold release SA-3419
- Loading branch information
Showing
24 changed files
with
494 additions
and
167 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,132 @@ | ||
# -*- coding: utf-8 -*- | ||
"""Models for API requests & responses.""" | ||
import dataclasses | ||
import typing as t | ||
|
||
import marshmallow | ||
|
||
from ...exceptions import AuthError | ||
from .base import BaseModel, BaseSchemaJson | ||
from .custom_fields import SchemaBool | ||
from .generic import Metadata, MetadataSchema | ||
|
||
|
||
class LoginRequestSchema(BaseSchemaJson): | ||
"""Schema for issuing a login request.""" | ||
|
||
user_name = marshmallow.fields.Str( | ||
allow_none=True, | ||
dump_default=None, | ||
load_default=None, | ||
description="Axonius User Name", | ||
) | ||
password = marshmallow.fields.Str( | ||
allow_none=True, | ||
dump_default=None, | ||
load_default=None, | ||
description="Axonius Password", | ||
) | ||
saml_token = marshmallow.fields.Str( | ||
allow_none=True, | ||
dump_default=None, | ||
load_default=None, | ||
description="SAML token from 2FA negotiation", | ||
) | ||
remember_me = SchemaBool( | ||
load_default=False, dump_default=False, description="Used for browser controls" | ||
) | ||
eula_agreed = SchemaBool( | ||
load_default=False, dump_default=False, description="EULA has been agreed to by user" | ||
) | ||
|
||
class Meta: | ||
"""Pass.""" | ||
|
||
type_ = "login_schema" | ||
|
||
@staticmethod | ||
def get_model_cls() -> t.Optional[type]: | ||
"""Pass.""" | ||
return LoginRequest | ||
|
||
|
||
LOGIN_REQUEST_SCHEMA = LoginRequestSchema() | ||
|
||
|
||
@dataclasses.dataclass | ||
class LoginRequest(BaseModel): | ||
"""Model for issuing a login request.""" | ||
|
||
user_name: t.Optional[str] = None | ||
password: t.Optional[str] = None | ||
saml_token: t.Optional[str] = None | ||
remember_me: bool = False | ||
eula_agreed: bool = False | ||
|
||
@staticmethod | ||
def get_schema_cls() -> t.Optional[type]: | ||
"""Pass.""" | ||
return LoginRequestSchema | ||
|
||
def _check_credential(self, attr: str) -> str: | ||
"""Check that a credential is a non-empty string.""" | ||
value: t.Any = getattr(self, attr) | ||
|
||
if isinstance(value, str) and value.strip(): | ||
value = value.strip() | ||
setattr(self, attr, value) | ||
return value | ||
|
||
field: marshmallow.Field = LOGIN_REQUEST_SCHEMA.declared_fields[attr] | ||
description: str = field.metadata.get("description", f"{attr}") | ||
msgs: t.List[str] = [ | ||
f"Value provided for {description} is not a non-empty string" | ||
f"Provided type {type(value)}, value: {value!r}" | ||
] | ||
raise AuthError(msgs) | ||
|
||
def check_credentials(self): | ||
"""Check that username and password are not empty.""" | ||
self._check_credential(attr="user_name") | ||
self._check_credential(attr="password") | ||
|
||
|
||
class LoginResponseSchema(MetadataSchema): | ||
"""Schema for receiving a login response.""" | ||
|
||
class Meta: | ||
"""Pass.""" | ||
|
||
type_ = "metadata_schema" | ||
|
||
@staticmethod | ||
def get_model_cls() -> t.Optional[type]: | ||
"""Pass.""" | ||
return LoginResponse | ||
|
||
|
||
@dataclasses.dataclass | ||
class LoginResponse(Metadata): | ||
"""Model for receiving a login response.""" | ||
|
||
document_meta: t.Optional[dict] = dataclasses.field(default_factory=dict) | ||
|
||
@staticmethod | ||
def get_schema_cls() -> t.Optional[type]: | ||
"""Pass.""" | ||
return LoginResponseSchema | ||
|
||
@property | ||
def access_token(self) -> str: | ||
"""Get the Access token for use in auth header.""" | ||
return self.document_meta["access_token"] | ||
|
||
@property | ||
def refresh_token(self) -> str: | ||
"""Get the Refresh token for use in auth header.""" | ||
return self.document_meta["refresh_token"] | ||
|
||
@property | ||
def authorization(self) -> str: | ||
"""Get the value to use in the authorization header.""" | ||
return f"Bearer {self.access_token}" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
# -*- coding: utf-8 -*- | ||
"""Authentication via API key and API secret.""" | ||
import typing as t | ||
|
||
from ..api.api_endpoints import ApiEndpoint | ||
from ..api.json_api.account import LoginRequest, LoginResponse | ||
from ..http import Http | ||
from .models import Mixins | ||
|
||
|
||
class Credentials(Mixins): | ||
"""Authentication method using username and password credentials.""" | ||
|
||
def __init__( | ||
self, | ||
http: Http, | ||
username: t.Optional[str] = None, | ||
password: t.Optional[str] = None, | ||
**kwargs, | ||
): | ||
"""Authenticate using username and password. | ||
Args: | ||
http (Http): HTTP client to use to send requests | ||
username (t.Optional[str], optional): Axonius User Name | ||
password (t.Optional[str], optional): Axonius Password | ||
prompt (bool, optional): Prompt for credentials that are not non-empty strings | ||
""" | ||
creds: LoginRequest = LoginRequest(user_name=username, password=password, eula_agreed=True) | ||
super().__init__(http=http, creds=creds, **kwargs) | ||
|
||
def login(self): | ||
"""Login to API.""" | ||
if not self.is_logged_in: | ||
self._creds.check_credentials() | ||
self._login_response: LoginResponse = self._login(request_obj=self._creds) | ||
headers: dict = {"authorization": self._login_response.authorization} | ||
self._api_keys: dict = self._get_api_keys(headers=headers) | ||
self.http.session.headers["api-key"] = self._api_keys["api_key"] | ||
self.http.session.headers["api-secret"] = self._api_keys["api_secret"] | ||
self._validate() | ||
self._logged_in = True | ||
self.LOG.debug(f"Successfully logged in using {self._cred_fields}") | ||
|
||
def logout(self): | ||
"""Logout from API.""" | ||
super().logout() | ||
|
||
def _login(self, request_obj: LoginRequest) -> LoginResponse: | ||
"""Direct API method to issue a login request. | ||
Args: | ||
request_obj (LoginRequest): Request object to send | ||
Returns: | ||
LoginResponse: Response object received | ||
""" | ||
endpoint: ApiEndpoint = self.endpoints.login | ||
response: LoginResponse = endpoint.perform_request(http=self.http, request_obj=request_obj) | ||
return response | ||
|
||
@property | ||
def _cred_fields(self) -> t.List[str]: | ||
"""Credential fields used by this auth model.""" | ||
return [f"username={self._creds.user_name!r}", "password"] | ||
|
||
def _logout(self): | ||
"""Logout from API.""" | ||
self._logged_in = False | ||
self.http.session.headers = {} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.