diff --git a/axonius_api_client/api/api_endpoint.py b/axonius_api_client/api/api_endpoint.py index ea7f7de0..a597fe14 100644 --- a/axonius_api_client/api/api_endpoint.py +++ b/axonius_api_client/api/api_endpoint.py @@ -110,15 +110,11 @@ def __post_init__(self): def str_properties(self) -> t.List[str]: """Get the properties for this endpoint as a list of strs.""" return [ - f"method={self.method!r}", - f"path={self.path!r}", + f"method={self.method!r}, path={self.path!r}", f"request_schema={get_cls_path(self.request_schema_cls)}", f"request_model={get_cls_path(self.request_model_cls)}", f"response_schema={get_cls_path(self.response_schema_cls)}", f"response_model={get_cls_path(self.response_model_cls)}", - f"request_as_none={self.request_as_none}", - f"response_as_text={self.response_as_text}", - f"http_args_required={self.http_args_required}", ] def perform_request( @@ -253,6 +249,10 @@ def handle_response( data = self.load_response( http=http, response=response, **combo_dicts(kwargs, data=data) ) + try: + data.RESPONSE = response + except Exception: + pass return data def get_response_json(self, response: requests.Response) -> JSON_TYPES: diff --git a/axonius_api_client/api/api_endpoints.py b/axonius_api_client/api/api_endpoints.py index cb67e6e0..5395548e 100644 --- a/axonius_api_client/api/api_endpoints.py +++ b/axonius_api_client/api/api_endpoints.py @@ -1290,6 +1290,31 @@ class DataScopes(ApiEndpointGroup): ) +@dataclasses.dataclass(eq=True, frozen=True, repr=False) +class Account(ApiEndpointGroup): + """Pass.""" + + login: ApiEndpoint = ApiEndpoint( + method="post", + path="api/login", + request_schema_cls=json_api.account.LoginRequestSchema, + request_model_cls=json_api.account.LoginRequest, + response_schema_cls=json_api.account.LoginResponseSchema, + response_model_cls=json_api.account.LoginResponse, + ) + + get_api_keys: ApiEndpoint = ApiEndpoint( + method="get", + path="api/settings/api_key", + request_schema_cls=None, + request_model_cls=None, + response_schema_cls=None, + response_model_cls=None, + ) + + validate: ApiEndpoint = SystemSettings.get_constants + + @dataclasses.dataclass(eq=True, frozen=True, repr=False) class ApiEndpoints(BaseData): """Pass.""" @@ -1313,6 +1338,7 @@ class ApiEndpoints(BaseData): dashboard_spaces: ApiEndpointGroup = DashboardSpaces() folders_queries: ApiEndpointGroup = FoldersQueries() folders_enforcements: ApiEndpointGroup = FoldersEnforcements() + account: ApiEndpointGroup = Account() @classmethod def get_groups(cls) -> Dict[str, ApiEndpointGroup]: diff --git a/axonius_api_client/api/json_api/__init__.py b/axonius_api_client/api/json_api/__init__.py index 45ad0a10..9ee64a84 100644 --- a/axonius_api_client/api/json_api/__init__.py +++ b/axonius_api_client/api/json_api/__init__.py @@ -2,6 +2,7 @@ """Models for API requests & responses.""" from . import ( + account, adapters, assets, audit_logs, @@ -57,4 +58,5 @@ "dashboard_spaces", "spaces_export", "folders", + "account", ) diff --git a/axonius_api_client/api/json_api/account.py b/axonius_api_client/api/json_api/account.py new file mode 100644 index 00000000..7e1b1496 --- /dev/null +++ b/axonius_api_client/api/json_api/account.py @@ -0,0 +1,132 @@ +# -*- coding: utf-8 -*- +"""Models for API requests & responses.""" +import dataclasses +import typing as t + +import marshmallow + +from ...exceptions import AuthError +from .base import BaseModel, BaseSchemaJson +from .custom_fields import SchemaBool +from .generic import Metadata, MetadataSchema + + +class LoginRequestSchema(BaseSchemaJson): + """Schema for issuing a login request.""" + + user_name = marshmallow.fields.Str( + allow_none=True, + dump_default=None, + load_default=None, + description="Axonius User Name", + ) + password = marshmallow.fields.Str( + allow_none=True, + dump_default=None, + load_default=None, + description="Axonius Password", + ) + saml_token = marshmallow.fields.Str( + allow_none=True, + dump_default=None, + load_default=None, + description="SAML token from 2FA negotiation", + ) + remember_me = SchemaBool( + load_default=False, dump_default=False, description="Used for browser controls" + ) + eula_agreed = SchemaBool( + load_default=False, dump_default=False, description="EULA has been agreed to by user" + ) + + class Meta: + """Pass.""" + + type_ = "login_schema" + + @staticmethod + def get_model_cls() -> t.Optional[type]: + """Pass.""" + return LoginRequest + + +LOGIN_REQUEST_SCHEMA = LoginRequestSchema() + + +@dataclasses.dataclass +class LoginRequest(BaseModel): + """Model for issuing a login request.""" + + user_name: t.Optional[str] = None + password: t.Optional[str] = None + saml_token: t.Optional[str] = None + remember_me: bool = False + eula_agreed: bool = False + + @staticmethod + def get_schema_cls() -> t.Optional[type]: + """Pass.""" + return LoginRequestSchema + + def _check_credential(self, attr: str) -> str: + """Check that a credential is a non-empty string.""" + value: t.Any = getattr(self, attr) + + if isinstance(value, str) and value.strip(): + value = value.strip() + setattr(self, attr, value) + return value + + field: marshmallow.Field = LOGIN_REQUEST_SCHEMA.declared_fields[attr] + description: str = field.metadata.get("description", f"{attr}") + msgs: t.List[str] = [ + f"Value provided for {description} is not a non-empty string" + f"Provided type {type(value)}, value: {value!r}" + ] + raise AuthError(msgs) + + def check_credentials(self): + """Check that username and password are not empty.""" + self._check_credential(attr="user_name") + self._check_credential(attr="password") + + +class LoginResponseSchema(MetadataSchema): + """Schema for receiving a login response.""" + + class Meta: + """Pass.""" + + type_ = "metadata_schema" + + @staticmethod + def get_model_cls() -> t.Optional[type]: + """Pass.""" + return LoginResponse + + +@dataclasses.dataclass +class LoginResponse(Metadata): + """Model for receiving a login response.""" + + document_meta: t.Optional[dict] = dataclasses.field(default_factory=dict) + + @staticmethod + def get_schema_cls() -> t.Optional[type]: + """Pass.""" + return LoginResponseSchema + + @property + def access_token(self) -> str: + """Get the Access token for use in auth header.""" + return self.document_meta["access_token"] + + @property + def refresh_token(self) -> str: + """Get the Refresh token for use in auth header.""" + return self.document_meta["refresh_token"] + + @property + def authorization(self) -> str: + """Get the value to use in the authorization header.""" + return f"Bearer {self.access_token}" diff --git a/axonius_api_client/api/system/signup.py b/axonius_api_client/api/system/signup.py index d98aea66..fa0f6947 100644 --- a/axonius_api_client/api/system/signup.py +++ b/axonius_api_client/api/system/signup.py @@ -143,4 +143,4 @@ def __init__(self, url, **kwargs): log_level = kwargs.get("log_level", LOG_LEVEL_API) self.LOG = get_obj_log(obj=self, level=log_level) kwargs.setdefault("certwarn", False) - self.http = Http(url=url, **kwargs) + self.HTTP = self.http = Http(url=url, **kwargs) diff --git a/axonius_api_client/auth/__init__.py b/axonius_api_client/auth/__init__.py index 27eaeda8..bc724ce7 100644 --- a/axonius_api_client/auth/__init__.py +++ b/axonius_api_client/auth/__init__.py @@ -1,7 +1,8 @@ # -*- coding: utf-8 -*- """Authenticating with Axonius.""" -from . import api_key, models +from . import api_key, credentials, models from .api_key import ApiKey +from .credentials import Credentials from .models import Mixins, Model __all__ = ( @@ -10,4 +11,6 @@ "Model", "Mixins", "ApiKey", + "credentials", + "Credentials", ) diff --git a/axonius_api_client/auth/credentials.py b/axonius_api_client/auth/credentials.py new file mode 100644 index 00000000..e94ab6d0 --- /dev/null +++ b/axonius_api_client/auth/credentials.py @@ -0,0 +1,70 @@ +# -*- coding: utf-8 -*- +"""Authentication via API key and API secret.""" +import typing as t + +from ..api.api_endpoints import ApiEndpoint +from ..api.json_api.account import LoginRequest, LoginResponse +from ..http import Http +from .models import Mixins + + +class Credentials(Mixins): + """Authentication method using username and password credentials.""" + + def __init__( + self, + http: Http, + username: t.Optional[str] = None, + password: t.Optional[str] = None, + **kwargs, + ): + """Authenticate using username and password. + + Args: + http (Http): HTTP client to use to send requests + username (t.Optional[str], optional): Axonius User Name + password (t.Optional[str], optional): Axonius Password + prompt (bool, optional): Prompt for credentials that are not non-empty strings + """ + creds: LoginRequest = LoginRequest(user_name=username, password=password, eula_agreed=True) + super().__init__(http=http, creds=creds, **kwargs) + + def login(self): + """Login to API.""" + if not self.is_logged_in: + self._creds.check_credentials() + self._login_response: LoginResponse = self._login(request_obj=self._creds) + headers: dict = {"authorization": self._login_response.authorization} + self._api_keys: dict = self._get_api_keys(headers=headers) + self.http.session.headers["api-key"] = self._api_keys["api_key"] + self.http.session.headers["api-secret"] = self._api_keys["api_secret"] + self._validate() + self._logged_in = True + self.LOG.debug(f"Successfully logged in using {self._cred_fields}") + + def logout(self): + """Logout from API.""" + super().logout() + + def _login(self, request_obj: LoginRequest) -> LoginResponse: + """Direct API method to issue a login request. + + Args: + request_obj (LoginRequest): Request object to send + + Returns: + LoginResponse: Response object received + """ + endpoint: ApiEndpoint = self.endpoints.login + response: LoginResponse = endpoint.perform_request(http=self.http, request_obj=request_obj) + return response + + @property + def _cred_fields(self) -> t.List[str]: + """Credential fields used by this auth model.""" + return [f"username={self._creds.user_name!r}", "password"] + + def _logout(self): + """Logout from API.""" + self._logged_in = False + self.http.session.headers = {} diff --git a/axonius_api_client/auth/models.py b/axonius_api_client/auth/models.py index 5209aced..9df17b7c 100644 --- a/axonius_api_client/auth/models.py +++ b/axonius_api_client/auth/models.py @@ -2,6 +2,7 @@ """Authentication models.""" import abc import logging +import typing as t from ..api.api_endpoint import ApiEndpoint from ..api.api_endpoints import ApiEndpoints @@ -46,10 +47,7 @@ class Mixins(Model): _logged_in: bool = False """Attribute checked by :meth:`is_logged_in`.""" - _validate_endpoint: ApiEndpoint = ApiEndpoints.system_settings.get_constants - """Endpoint to use to validate logins.""" - - def __init__(self, http: Http, creds: dict, **kwargs): + def __init__(self, http: Http, creds: t.Any, **kwargs): """Mixins for Auth Models. Args: @@ -64,7 +62,7 @@ def __init__(self, http: Http, creds: dict, **kwargs): self._http: Http = http """HTTP Client.""" - self._creds: dict = creds + self._creds: t.Any = creds """Credential store.""" self._check_http_lock() @@ -97,6 +95,25 @@ def is_logged_in(self) -> bool: """Check if login has been called.""" return self._logged_in + def get_api_keys(self) -> dict: + """Get the API key and secret for the current user.""" + return self._get_api_keys() + + @property + def endpoints(self) -> ApiEndpoints: + """Get the endpoint group for this module.""" + return ApiEndpoints.account + + def _validate(self): + """Validate credentials.""" + try: + self.endpoints.validate.perform_request(http=self.http) + except Exception: + self._logged_in = False + raise + else: + self._logged_in = True + def __str__(self) -> str: """Show object info.""" bits = [f"url={self.http.url!r}", f"is_logged_in={self.is_logged_in}"] @@ -119,16 +136,12 @@ def _check_http_lock(self): if auth_lock: raise AuthError(f"{self.http} already being used by {auth_lock}") + def _get_api_keys(self, **http_args) -> dict: + """Direct API method to get the API keys for the current user.""" + endpoint: ApiEndpoint = self.endpoints.get_api_keys + response: dict = endpoint.perform_request(http=self.http, http_args=http_args) + return response + def _set_http_lock(self): """Set HTTP Client auth lock.""" self._http._auth_lock = self - - def _validate(self): - """Validate credentials.""" - try: - self._validate_endpoint.perform_request(http=self.http) - except Exception: - self._logged_in = False - raise - else: - self._logged_in = True diff --git a/axonius_api_client/cli/__init__.py b/axonius_api_client/cli/__init__.py index 38b85055..46ae8e53 100644 --- a/axonius_api_client/cli/__init__.py +++ b/axonius_api_client/cli/__init__.py @@ -27,6 +27,7 @@ from ..setup_env import DEFAULT_ENV_FILE from . import ( context, + grp_account, grp_adapters, grp_assets, grp_certs, @@ -371,6 +372,16 @@ type=click.INT, show_default=True, ) +@click.option( + "--credentials/--keys", + "-creds/-keys", + "credentials", + default=False, + help="Treat key as Username and secret as password", + is_flag=True, + show_envvar=True, + show_default=True, +) @click.version_option(version.__version__) @context.pass_context @click.pass_context @@ -397,3 +408,4 @@ def cli(click_ctx, ctx, quiet, **kwargs): cli.add_command(grp_enforcements.enforcements) cli.add_command(grp_spaces.spaces) cli.add_command(grp_folders.folders) +cli.add_command(grp_account.account) diff --git a/axonius_api_client/cli/grp_account/__init__.py b/axonius_api_client/cli/grp_account/__init__.py new file mode 100644 index 00000000..23208196 --- /dev/null +++ b/axonius_api_client/cli/grp_account/__init__.py @@ -0,0 +1,17 @@ +# -*- coding: utf-8 -*- +"""Command line interface for Axonius API Client.""" +import click + +from ..context import AliasedGroup, load_cmds +from ..grp_tools import cmd_signup, cmd_use_token_reset_token, cmd_write_config + + +@click.group(cls=AliasedGroup) +def account(): + """Group: Account commands.""" + + +load_cmds(path=__file__, package=__package__, group=account) +account.add_command(cmd_write_config.cmd) +account.add_command(cmd_signup.cmd) +account.add_command(cmd_use_token_reset_token.cmd) diff --git a/axonius_api_client/cli/grp_account/cmd_get_api_keys.py b/axonius_api_client/cli/grp_account/cmd_get_api_keys.py new file mode 100644 index 00000000..81cdc00b --- /dev/null +++ b/axonius_api_client/cli/grp_account/cmd_get_api_keys.py @@ -0,0 +1,23 @@ +# -*- coding: utf-8 -*- +"""Command line interface for Axonius API Client.""" + +from ..context import CONTEXT_SETTINGS, click +from ..grp_tools.grp_common import EXPORT_FORMATS +from ..grp_tools.grp_options import OPT_ENV, OPT_EXPORT +from ..options import AUTH, add_options + +OPTIONS = [*AUTH, OPT_EXPORT, OPT_ENV] + + +@click.command(name="get-api-keys", context_settings=CONTEXT_SETTINGS) +@add_options(OPTIONS) +@click.pass_context +def cmd(ctx, url, key, secret, export_format, env): + """Get the API keys for the current user.""" + client = ctx.obj.start_client(url=url, key=key, secret=secret) + + with ctx.obj.exc_wrap(wraperror=ctx.obj.wraperror): + data = client.api_keys + + click.secho(EXPORT_FORMATS[export_format](data=data, env=env, url=client.AUTH.http.url)) + ctx.exit(0) diff --git a/axonius_api_client/cli/grp_assets/grp_saved_query/cmd_add.py b/axonius_api_client/cli/grp_assets/grp_saved_query/cmd_add.py index 6f64a8aa..e0eb338b 100644 --- a/axonius_api_client/cli/grp_assets/grp_saved_query/cmd_add.py +++ b/axonius_api_client/cli/grp_assets/grp_saved_query/cmd_add.py @@ -135,7 +135,7 @@ def cmd(ctx, url, key, secret, export_format, table_format, wizard_content, **kw with ctx.obj.exc_wrap(wraperror=ctx.obj.wraperror): kwargs = load_wiz(apiobj=apiobj, wizard_content=wizard_content, exprs=True, kwargs=kwargs) - data = apiobj.saved_query.add(as_dataclass=True, echo=True, **kwargs) + data = apiobj.saved_query.add(as_dataclass=True, **kwargs) ctx.obj.echo_ok(f"Successfully created saved query: {data.name}") click.secho(EXPORT_FORMATS[export_format](data=data, table_format=table_format)) diff --git a/axonius_api_client/cli/grp_assets/grp_saved_query/cmd_copy.py b/axonius_api_client/cli/grp_assets/grp_saved_query/cmd_copy.py index da84c099..4e612936 100644 --- a/axonius_api_client/cli/grp_assets/grp_saved_query/cmd_copy.py +++ b/axonius_api_client/cli/grp_assets/grp_saved_query/cmd_copy.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- """Command line interface for Axonius API Client.""" from ...context import CONTEXT_SETTINGS, click -from ...grp_folders.grp_options import OPT_CREATE_FOLDER_REQ, OPT_ECHO, OPT_FOLDER +from ...grp_folders.grp_options import OPTS_OBJECT_CREATE from ...options import AUTH, add_options from .grp_common import EXPORT_FORMATS, OPTS_EXPORT @@ -59,9 +59,7 @@ show_default=True, required=False, ), - OPT_ECHO, - OPT_FOLDER, - OPT_CREATE_FOLDER_REQ, + *OPTS_OBJECT_CREATE, ] @@ -76,7 +74,7 @@ def cmd(ctx, url, key, secret, export_format, table_format, sq, **kwargs): apiobj = getattr(client, p_grp) with ctx.obj.exc_wrap(wraperror=ctx.obj.wraperror): - data = apiobj.saved_query.copy(sq=sq, as_dataclass=True, echo=True, **kwargs) + data = apiobj.saved_query.copy(sq=sq, as_dataclass=True, **kwargs) ctx.obj.echo_ok(f"Successfully copied Saved Query {sq!r} to {data.name!r}") click.secho(EXPORT_FORMATS[export_format](data=data, table_format=table_format)) diff --git a/axonius_api_client/cli/grp_folders/grp_options.py b/axonius_api_client/cli/grp_folders/grp_options.py index 22c61a33..8cec23ba 100644 --- a/axonius_api_client/cli/grp_folders/grp_options.py +++ b/axonius_api_client/cli/grp_folders/grp_options.py @@ -5,36 +5,6 @@ from ..options import TABLE_FMT from .grp_common import SEARCH_EXPORT_DEFAULT, SEARCH_EXPORTS -# OPT_FOLDER = click.option( -# "--folder", -# "-F", -# "folder", -# help="Path of folder to store object in", -# required=False, -# show_envvar=True, -# show_default=True, -# ) -# OPT_REQ_FOLDER = click.option( -# "--folder", -# "-F", -# "folder", -# help="Path of folder to store object in", -# required=False, -# show_envvar=True, -# show_default=True, -# ) -# OPT_FOLDER_CREATE = click.option( -# "--create/--no-create", -# "-c/-nc", -# "create", -# default=FolderDefaults.create_action, -# help="Create --folder if not found.", -# show_envvar=True, -# show_default=True, -# ) -# OPTS_FOLDER = [OPT_FOLDER, OPT_FOLDER_CREATE] -# OPTS_REQ_FOLDER = [OPT_REQ_FOLDER, OPT_FOLDER_CREATE] - OPT_FOLDER = click.option( "--folder", "-f", @@ -451,13 +421,14 @@ OPT_FOLDER_DELETE, ] -# update-folder +# object.update-folder OPTS_UPDATE_FOLDER = [ OPT_ECHO, OPT_CREATE_FOLDER_REQ, OPT_FOLDER_UPDATE, ] +# object.copy/add/create/etc OPTS_OBJECT_CREATE = [ OPT_ECHO, OPT_CREATE_FOLDER_REQ, diff --git a/axonius_api_client/cli/grp_tools/cmd_signup.py b/axonius_api_client/cli/grp_tools/cmd_signup.py index 1e78a40e..3d37e7df 100644 --- a/axonius_api_client/cli/grp_tools/cmd_signup.py +++ b/axonius_api_client/cli/grp_tools/cmd_signup.py @@ -1,52 +1,11 @@ # -*- coding: utf-8 -*- """Command line interface for Axonius API Client.""" -import datetime -import typing as t - import click from ...api import Signup -from ...tools import json_dump from ..options import add_options - - -def get_fb(obj: dict, keys: t.List[str]) -> str: - """Pass.""" - for key in keys: - if key not in obj: - continue - value = obj.get(key, None) - if value: - return value - raise Exception(f"Unable to find any keys {keys} in dict {obj}") - - -def export_str(data): - """Pass.""" - date: datetime.datetime = datetime.datetime.utcnow() - ax_url: str = get_fb(obj=data, keys=["url"]) - ax_secret: str = get_fb(obj=data, keys=["ax_secret", "api_secret"]) - ax_key: str = get_fb(obj=data, keys=["ax_key", "api_key"]) - ax_banner: str = f"signup on {date}" - - lines = [ - f'AX_URL="{ax_url}"', - f'AX_KEY="{ax_key}"', - f'AX_SECRET="{ax_secret}"', - f'AX_BANNER="{ax_banner}"', - ] - return "\n".join(lines) - - -def export_json(data): - """Pass.""" - return json_dump(data) - - -EXPORT_FORMATS: dict = { - "json": export_json, - "str": export_str, -} +from .grp_common import EXPORT_FORMATS +from .grp_options import OPT_ENV, OPT_EXPORT URL = click.option( "--url", @@ -60,16 +19,6 @@ def export_json(data): show_default=True, ) -EXPORT = click.option( - "--export-format", - "-xf", - "export_format", - type=click.Choice(list(EXPORT_FORMATS)), - help="Format of to export data in", - default="str", - show_envvar=True, - show_default=True, -) PASSWORD = click.option( "--password", "-p", @@ -103,21 +52,22 @@ def export_json(data): show_default=True, ) -OPTIONS = [URL, PASSWORD, COMPANY, CONTACT, EXPORT] +OPTIONS = [URL, PASSWORD, COMPANY, CONTACT, OPT_EXPORT, OPT_ENV] @click.command(name="signup") @add_options(OPTIONS) @click.pass_context -def cmd(ctx, url, password, company_name, contact_email, export_format): +def cmd(ctx, url, password, company_name, contact_email, export_format, env): """Perform the initial signup to an instance.""" entry = Signup(url=url) with ctx.obj.exc_wrap(wraperror=ctx.obj.wraperror): data = entry.signup( password=password, company_name=company_name, contact_email=contact_email ) - data["url"] = url - click.secho(EXPORT_FORMATS[export_format](data=data)) + click.secho( + EXPORT_FORMATS[export_format](data=data, signup=True, env=env, url=entry.auth.http.url) + ) ctx.obj.echo_ok("Signup completed successfully!") ctx.exit(0) diff --git a/axonius_api_client/cli/grp_tools/cmd_use_token_reset_token.py b/axonius_api_client/cli/grp_tools/cmd_use_token_reset_token.py index 37fda6f3..adb43ffd 100644 --- a/axonius_api_client/cli/grp_tools/cmd_use_token_reset_token.py +++ b/axonius_api_client/cli/grp_tools/cmd_use_token_reset_token.py @@ -36,8 +36,8 @@ @click.pass_context def cmd(ctx, url, token, password): """Use a password reset token.""" - entry = Signup(url=url) + client = Signup(url=url) with ctx.obj.exc_wrap(wraperror=ctx.obj.wraperror): - name = entry.use_password_reset_token(token=token, password=password) + name = client.use_password_reset_token(token=token, password=password) ctx.obj.echo_ok(f"Password successfully reset for user {name!r}") diff --git a/axonius_api_client/cli/grp_tools/cmd_write_config.py b/axonius_api_client/cli/grp_tools/cmd_write_config.py index ac172337..53a3be73 100644 --- a/axonius_api_client/cli/grp_tools/cmd_write_config.py +++ b/axonius_api_client/cli/grp_tools/cmd_write_config.py @@ -1,38 +1,21 @@ # -*- coding: utf-8 -*- """Command line interface for Axonius API Client.""" -import os - import click -import dotenv -from ...tools import get_path +from ..context import CONTEXT_SETTINGS from ..options import AUTH, add_options +from .grp_common import export_env +from .grp_options import OPT_ENV +OPTIONS = [*AUTH, OPT_ENV] -@click.command(name="write-config") -@add_options(AUTH) -@click.pass_context -def cmd(ctx, url, key, secret): - """Create/Update a '.env' file with url, key, and secret. - - File is created in the current working directory. - """ - ctx.obj.start_client(url=url, key=key, secret=secret) - - cwd = os.getcwd() - path = get_path(cwd) / ".env" - path_str = f"{path}" - - if not path.is_file(): - click.secho(message=f"Creating file {path_str!r}", err=True, fg="green") - path.touch() - path.chmod(0o600) - else: - click.secho(message=f"Updating file {path_str!r}", err=True, fg="green") - click.secho( - message=f"Setting AX_URL, AX_KEY, and AX_SECRET in {path_str!r}", err=True, fg="green" - ) - dotenv.set_key(dotenv_path=path_str, key_to_set="AX_URL", value_to_set=url) - dotenv.set_key(dotenv_path=path_str, key_to_set="AX_KEY", value_to_set=key) - dotenv.set_key(dotenv_path=path_str, key_to_set="AX_SECRET", value_to_set=secret) +@click.command(name="write-config", context_settings=CONTEXT_SETTINGS) +@add_options(OPTIONS) +@click.pass_context +def cmd(ctx, url, key, secret, env): + """Create/Update a '.env' file with url, key, and secret.""" + client = ctx.obj.start_client(url=url, key=key, secret=secret) + data = {"api_secret": secret, "api_key": key} + export_env(data=data, env=env, url=client.AUTH.http.url) + ctx.exit(0) diff --git a/axonius_api_client/cli/grp_tools/grp_common.py b/axonius_api_client/cli/grp_tools/grp_common.py new file mode 100644 index 00000000..c40df0d8 --- /dev/null +++ b/axonius_api_client/cli/grp_tools/grp_common.py @@ -0,0 +1,73 @@ +# -*- coding: utf-8 -*- +"""Command line interface for Axonius API Client.""" +import datetime +import os +import typing as t + +import click +import dotenv + +from ...tools import get_path, json_dump + +AX_ENV: str = str(get_path(os.environ.get("AX_ENV", "") or ".env")) + + +def multi_get(obj: dict, keys: t.List[str]) -> str: + """Pass.""" + for key in keys: + if obj.get(key): + return obj[key] + raise Exception(f"Unable to find any keys {keys} in dict {obj}") + + +def export_str(data: dict, url: str, signup: bool = False, **kwargs) -> str: + """Pass.""" + ax_secret: str = multi_get(obj=data, keys=["ax_secret", "api_secret"]) + ax_key: str = multi_get(obj=data, keys=["ax_key", "api_key"]) + lines = [ + f'AX_URL="{url}"', + f'AX_KEY="{ax_key}"', + f'AX_SECRET="{ax_secret}"', + ] + if signup: + lines.append(f'AX_BANNER="signup on {datetime.datetime.utcnow()}"') + return "\n".join(lines) + + +def export_env( + data: dict, url: str, signup: bool = False, env: t.Optional[str] = AX_ENV, **kwargs +) -> str: + """Pass.""" + ax_secret: str = multi_get(obj=data, keys=["ax_secret", "api_secret"]) + ax_key: str = multi_get(obj=data, keys=["ax_key", "api_key"]) + + env = get_path(env) + if not env.is_file(): + click.secho(message=f"Creating file {str(env)!r}", err=True, fg="green") + env.touch() + env.chmod(0o600) + else: + click.secho(message=f"Updating file {str(env)!r}", err=True, fg="green") + + click.secho( + message=f"Setting AX_URL, AX_KEY, and AX_SECRET in {str(env)!r}", + err=True, + fg="green", + ) + dotenv.set_key(dotenv_path=str(env), key_to_set="AX_URL", value_to_set=url) + dotenv.set_key(dotenv_path=str(env), key_to_set="AX_KEY", value_to_set=ax_key) + dotenv.set_key(dotenv_path=str(env), key_to_set="AX_SECRET", value_to_set=ax_secret) + return "" + + +def export_json(data: dict, url: str, **kwargs): + """Pass.""" + data["url"] = url + return json_dump(data) + + +EXPORT_FORMATS: dict = { + "json": export_json, + "str": export_str, + "env": export_env, +} diff --git a/axonius_api_client/cli/grp_tools/grp_options.py b/axonius_api_client/cli/grp_tools/grp_options.py new file mode 100644 index 00000000..6a4835e9 --- /dev/null +++ b/axonius_api_client/cli/grp_tools/grp_options.py @@ -0,0 +1,26 @@ +# -*- coding: utf-8 -*- +"""Command line interface for Axonius API Client.""" +import click + +from .grp_common import AX_ENV, EXPORT_FORMATS + +OPT_EXPORT = click.option( + "--export-format", + "-xf", + "export_format", + type=click.Choice(list(EXPORT_FORMATS)), + help="Format of to export data in", + default="str", + show_envvar=True, + show_default=True, +) + +OPT_ENV = click.option( + "--env", + "-e", + "env", + default=AX_ENV, + help="Path to .env file when --export-format==env", + show_envvar=True, + show_default=True, +) diff --git a/axonius_api_client/connect.py b/axonius_api_client/connect.py index be88d920..ba609ad0 100644 --- a/axonius_api_client/connect.py +++ b/axonius_api_client/connect.py @@ -30,7 +30,7 @@ Users, Vulnerabilities, ) -from .auth import ApiKey +from .auth import ApiKey, Credentials from .constants.api import TIMEOUT_CONNECT, TIMEOUT_RESPONSE from .constants.logs import ( LOG_FILE_MAX_FILES, @@ -117,6 +117,7 @@ def __init__( proxy: Optional[str] = None, headers: Optional[T_Headers] = None, cookies: Optional[T_Cookies] = None, + credentials: bool = False, **kwargs, ): """Easy all-in-one connection handler. @@ -295,14 +296,18 @@ def __init__( } """arguments to use for creating :attr:`HTTP`""" - self.AUTH_ARGS: dict = {"key": key, "secret": secret, "log_level": self.LOG_LEVEL_AUTH} - """arguments to use for creating :attr:`AUTH`""" - self.HTTP = Http(**self.HTTP_ARGS) """:obj:`axonius_api_client.http.Http` client to use for :attr:`AUTH`""" + self.AUTH_ARGS: dict = {"log_level": self.LOG_LEVEL_AUTH} - self.AUTH = ApiKey(http=self.HTTP, **self.AUTH_ARGS) - """:obj:`axonius_api_client.auth.api_key.ApiKey` auth method to use for all API models""" + if credentials: + self.AUTH_ARGS.update({"username": key, "password": secret}) + self.AUTH = Credentials(http=self.HTTP, **self.AUTH_ARGS) + """:obj:`Credentials` auth method to use for all API models""" + else: + self.AUTH_ARGS.update({"key": key, "secret": secret}) + self.AUTH = ApiKey(http=self.HTTP, **self.AUTH_ARGS) + """:obj:`ApiKey` auth method to use for all API models""" self.API_ARGS: dict = {"auth": self.AUTH, "log_level": self.LOG_LEVEL_API} """arguments to use for all API models""" @@ -554,6 +559,11 @@ def folders(self) -> Folders: self._folders = Folders(**self.API_ARGS) return self._folders + @property + def api_keys(self) -> dict: + """Get the API keys for the current user.""" + return self.AUTH.get_api_keys() + @classmethod def _get_exc_reason(cls, exc: Exception) -> str: """Trim exceptions down to a more user friendly display. diff --git a/axonius_api_client/http.py b/axonius_api_client/http.py index ddab64af..6ca2f8a9 100644 --- a/axonius_api_client/http.py +++ b/axonius_api_client/http.py @@ -516,8 +516,6 @@ def log_body(self, body: Any, body_type: str, src=None) -> str: body: content to log body_type: 'request' or 'response' """ - if not isinstance(body, str): - body = "" body = json_log(obj=coerce_str(value=body), trim=self.LOG_BODY_MAX_LEN) return f"{body_type} BODY:\n{body}" diff --git a/axonius_api_client/logs.py b/axonius_api_client/logs.py index d9496408..59126485 100644 --- a/axonius_api_client/logs.py +++ b/axonius_api_client/logs.py @@ -3,8 +3,10 @@ import logging import logging.handlers import pathlib +import re import sys import time +import typing as t from typing import Callable, Dict, List, Optional, Tuple, Union from . import LOG @@ -39,6 +41,24 @@ ) +class HideFormatter(logging.Formatter): + """Hide the rest of the line for any lines against :attr:`HIDE_REGEX`.""" + + HIDE_ENABLED: bool = True + """Enable hiding of matches to HIDE_REGEX.""" + HIDE_REGEX: t.Pattern = re.compile(r"(password|secret).*", re.I) + """Pattern of sensitive info to hide.""" + HIDE_REPLACE: str = r"\1 ...REST OF LINE HIDDEN..." + """Value to replace matches to HIDE_REGEX with.""" + + def format(self, record): + """Pass.""" + record = super().format(record) + if self.HIDE_ENABLED: + record = self.HIDE_REGEX.sub(self.HIDE_REPLACE, record) + return record + + def get_echoer(level: Union[int, str]) -> Callable: """Pass.""" level_str = str_level(level=level) @@ -59,11 +79,13 @@ def get_log_method(obj: logging.Logger, level: Optional[str] = None) -> Callable def gmtime(): """Set the logging system to use GMT for time strings.""" logging.Formatter.converter = time.gmtime + HideFormatter.converter = time.gmtime def localtime(): """Set the logging system to use local time for time strings.""" logging.Formatter.converter = time.localtime + HideFormatter.converter = time.localtime def get_obj_log(obj: object, level: Optional[Union[int, str]] = None, **kwargs) -> logging.Logger: @@ -261,7 +283,7 @@ def add_handler( handler = htype(**kwargs) handler.name = hname set_log_level(obj=handler, level=level) - handler.setFormatter(logging.Formatter(fmt=fmt, datefmt=datefmt)) + handler.setFormatter(HideFormatter(fmt=fmt, datefmt=datefmt)) obj.addHandler(handler) return handler diff --git a/axonius_api_client/tests/tests_api/tests_folders/test_folders.py b/axonius_api_client/tests/tests_api/tests_folders/test_folders.py index 2c048dee..d7436cb4 100644 --- a/axonius_api_client/tests/tests_api/tests_folders/test_folders.py +++ b/axonius_api_client/tests/tests_api/tests_folders/test_folders.py @@ -3,7 +3,6 @@ import datetime import pytest - from axonius_api_client.api.json_api import folders from axonius_api_client.data import BaseEnum from axonius_api_client.exceptions import ( # SearchZeroObjectsError, @@ -1068,13 +1067,14 @@ class TestFolderQueries(FolderBase, FolderBaseQueries): class TestFoldersQueries(FoldersBase, FolderBaseQueries): - def test_property_path_archive(self, apiobj, jsonapi_module): - root = apiobj.get() - - archive_name: str = str(root.get_enum_names().archive) - assert isinstance(root.path_archive, jsonapi_module.FolderModel) - assert root.path_archive.name == archive_name - assert root.path_archive.depth == 1 + # NEXT: not in release yet + # def test_property_path_archive(self, apiobj, jsonapi_module): + # root = apiobj.get() + + # archive_name: str = str(root.get_enum_names().archive) + # assert isinstance(root.path_archive, jsonapi_module.FolderModel) + # assert root.path_archive.name == archive_name + # assert root.path_archive.depth == 1 def test_property_path_predefined(self, apiobj, jsonapi_module): root = apiobj.get() diff --git a/axonius_api_client/tools.py b/axonius_api_client/tools.py index 65c3c5c2..ee7bcd82 100644 --- a/axonius_api_client/tools.py +++ b/axonius_api_client/tools.py @@ -514,8 +514,6 @@ def json_load( f"Unable to load JSON from supplied {tlens(obj)}", f"error: {exc}", ] - msgs = "\n".join(msgs) - LOG.exception(msgs) if error: nexc = ToolsError(msgs) nexc.obj = obj