Skip to content

Commit

Permalink
Bugfix/fix 2fa (#353)
Browse files Browse the repository at this point in the history
* init

* impl changes

* unused import

* wrap login_config response in Py42Response

* move login_config call to only after error

* add login_type to exception message

* more tests

* rename connection method to match sdk method

* changelog

* use .format until py42 removal merges

* use .format until py42 removal merges

* style

* refactor into client

* style

* rm old method and move docstring over

* loginconfig property docstring

* f-strings and style

* correct changelog method name

* Add REST doc link
  • Loading branch information
timabrmsn committed Aug 6, 2021
1 parent c513ef8 commit f53c10d
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 22 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@ how a consumer would use the library (e.g. adding unit tests, updating documenta

- New method `sdk.alerts.get_all_alert_details()` as a helper to make getting alerts with details easier (combines `sdk.alerts.search_all_pages()` and `sdk.alerts.get_details()`).

- New method `sdk.loginconfig.get_for_user()` to identify if a user's login type is `LOCAL`, `LOCAL_2FA`, or `CLOUD_SSO`.

### Removed

- py42 no longer supports python 2.7 or python 3.5.

- Usage of `Py42MFARequiredError`. Use `sdk.loginconfig.get_for_user()` instead to check if a user is configured for two-factor authentication.

## 1.16.1 - 2021-07-20

### Fixed
Expand Down
1 change: 1 addition & 0 deletions src/py42/clients/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@
"securitydata",
"auditlogs",
"cases",
"loginconfig",
],
)
23 changes: 23 additions & 0 deletions src/py42/clients/loginconfig.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from py42.response import Py42Response


class LoginConfigurationClient:
def __init__(self, connection):
self._connection = connection

def get_for_user(self, username):
"""Retrieves login configuration for a given username. Possible `loginType` values are
`LOCAL`, `LOCAL_2FA`, and `CLOUD_SSO`. If username does not exist the default
return value is `LOCAL_2FA`.
`REST Documentation: <https://console.us.code42.com/swagger/index.html?urls.primaryName=v3#/Feature/get>`__
Args:
username (str): Username to retrieve login configuration for.
Returns:
:class:`py42.response.Py42Response`
"""
uri = f"{self._connection.host_address}/c42api/v3/LoginConfiguration"
response = self._connection._session.get(uri, params={"username": username})
return Py42Response(response)
9 changes: 1 addition & 8 deletions src/py42/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import re

from py42.settings import debug


Expand Down Expand Up @@ -200,7 +198,7 @@ def __init__(self, exception, rule_id, source):


class Py42MFARequiredError(Py42UnauthorizedError):
"""An exception raised when a request requires multi-factor authentication"""
"""Deprecated: An exception raised when a request requires multi-factor authentication"""

def __init__(self, exception, message=None):
message = message or "User requires multi-factor authentication."
Expand Down Expand Up @@ -353,11 +351,6 @@ def raise_py42_error(raised_error):
if raised_error.response.status_code == 400:
raise Py42BadRequestError(raised_error)
elif raised_error.response.status_code == 401:
if raised_error.response.text and re.search(
"(TOTP_AUTH_CONFIGURATION_REQUIRED_FOR_USER|TIME_BASED_ONE_TIME_PASSWORD_REQUIRED)",
raised_error.response.text,
):
raise Py42MFARequiredError(raised_error)
raise Py42UnauthorizedError(raised_error)
elif raised_error.response.status_code == 403:
raise Py42ForbiddenError(raised_error)
Expand Down
25 changes: 24 additions & 1 deletion src/py42/sdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@
from py42.clients.authority import AuthorityClient
from py42.clients.cases import CasesClient
from py42.clients.detectionlists import DetectionListsClient
from py42.clients.loginconfig import LoginConfigurationClient
from py42.clients.securitydata import SecurityDataClient
from py42.exceptions import Py42Error
from py42.exceptions import Py42UnauthorizedError
from py42.services import Services
from py42.services._auth import BearerAuth
from py42.services._auth import CustomJWTAuth
Expand Down Expand Up @@ -58,7 +61,15 @@ def from_local_account(host_address, username, password, totp=None):
client = SDKClient.from_local_account(host_address, username, password, totp)

# test credentials
client.users.get_current()
try:
client.users.get_current()
except Py42UnauthorizedError as err:
login_type = client.loginconfig.get_for_user(username)["loginType"]
if login_type == "CLOUD_SSO":
raise Py42Error("SSO users are not supported in `from_local_account()`.")
msg = f"SDK initialization failed, double-check username/password, and provide two-factor TOTP token if Multi-Factor Auth configured for your user. User LoginConfig: {login_type}"
err.args = (msg,)
raise
return client


Expand Down Expand Up @@ -133,6 +144,16 @@ def from_jwt_provider(cls, host_address, jwt_provider):

return cls(main_connection, custom_auth)

@property
def loginconfig(self):
"""A collection of methods related to getting information about the login configuration
of user accounts.
Returns:
:class:`py42.clients.loginconfig.LoginConfigurationClient.`
"""
return self._clients.loginconfig

@property
def serveradmin(self):
"""A collection of methods for getting server information for on-premise environments
Expand Down Expand Up @@ -347,6 +368,7 @@ def _init_clients(services, connection):
)
archive = ArchiveClient(archive_accessor_factory, services.archive)
auditlogs = AuditLogsClient(services.auditlogs)
loginconfig = LoginConfigurationClient(connection)
clients = Clients(
authority=authority,
detectionlists=detectionlists,
Expand All @@ -355,5 +377,6 @@ def _init_clients(services, connection):
archive=archive,
auditlogs=auditlogs,
cases=CasesClient(services.cases, services.casesfileevents),
loginconfig=loginconfig,
)
return clients
39 changes: 39 additions & 0 deletions tests/clients/test_loginconfig.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import pytest
from requests.sessions import Session

from py42.clients.loginconfig import LoginConfigurationClient
from py42.services._connection import Connection

HOST_ADDRESS = "example.com"


class TestLoginConfiguration:
@pytest.fixture
def mock_session(self, mocker):
mock_session = mocker.MagicMock(spec=Session)
mock_session.headers = {}
return mock_session

def test_get_for_user_calls_session_get_with_expected_uri_and_params(
self, mock_session
):
connection = Connection.from_host_address(HOST_ADDRESS, session=mock_session)
loginconfig = LoginConfigurationClient(connection)
loginconfig.get_for_user("test@example.com")
expected_uri = f"https://{HOST_ADDRESS}/c42api/v3/LoginConfiguration"
expected_params = {"username": "test@example.com"}
mock_session.get.assert_called_once_with(expected_uri, params=expected_params)

def test_get_for_user_does_not_use_py42_connection_get_method(
self, mocker, mock_session
):
"""Because the loginConfiguration endpoint is unauthenticated, we want to make
sure we don't force the Connection's C42RenewableAuth object to make any
authentication requests before making the loginConfig request.
"""
mock_get = mocker.patch("py42.services._connection.Connection.get")
connection = Connection.from_host_address(HOST_ADDRESS, session=mock_session)
loginconfig = LoginConfigurationClient(connection)
loginconfig.get_for_user("test@example.com")
assert mock_get.call_count == 0
assert mock_session.get.call_count == 1
27 changes: 27 additions & 0 deletions tests/sdk/test_sdk.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import pytest
from requests import Session
from tests.conftest import create_mock_response

from py42.clients.alerts import AlertsClient
from py42.clients.archive import ArchiveClient
from py42.clients.auditlogs import AuditLogsClient
from py42.clients.cases import CasesClient
from py42.clients.detectionlists import DetectionListsClient
from py42.clients.securitydata import SecurityDataClient
from py42.exceptions import Py42UnauthorizedError
from py42.sdk import from_local_account
from py42.sdk import SDKClient
from py42.services import administration
from py42.services import devices
Expand Down Expand Up @@ -33,6 +37,12 @@ def py42_connection(self, mocker, successful_response):
def mock_auth(self, mocker):
return mocker.MagicMock(spec=C42RenewableAuth)

@pytest.fixture
def mock_session(self, mocker):
mock_session = mocker.MagicMock(spec=Session)
mock_session.headers = {}
return mock_session

def test_has_administration_service_set(self, py42_connection, mock_auth):
client = SDKClient(py42_connection, mock_auth)
assert type(client.serveradmin) == administration.AdministrationService
Expand Down Expand Up @@ -80,3 +90,20 @@ def test_has_auditlog_service_set(self, py42_connection, mock_auth):
def test_has_cases_service_set(self, py42_connection, mock_auth):
client = SDKClient(py42_connection, mock_auth)
assert type(client.cases) == CasesClient

def test_from_local_account_when_unauthorized_calls_loginConfig_and_returns_config_value_on_raised_exception_text(
self, mocker, mock_session, mock_auth, unauthorized_response
):
login_type = "LOCAL_2FA"
mock_session.send.return_value = unauthorized_response
mock_session.get.return_value = create_mock_response(
mocker, f'{{"loginType": "{login_type}"}}'
)
connection = Connection.from_host_address(HOST_ADDRESS, session=mock_session)
client = SDKClient(connection, mock_auth)
mocker.patch("py42.sdk.SDKClient.from_local_account", return_value=client)

with pytest.raises(Py42UnauthorizedError) as err:
from_local_account(HOST_ADDRESS, TEST_USERNAME, TEST_PASSWORD)

assert f"User LoginConfig: {login_type}" in str(err)
13 changes: 0 additions & 13 deletions tests/test_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from py42.exceptions import Py42ForbiddenError
from py42.exceptions import Py42HTTPError
from py42.exceptions import Py42InternalServerError
from py42.exceptions import Py42MFARequiredError
from py42.exceptions import Py42NotFoundError
from py42.exceptions import Py42ResponseError
from py42.exceptions import Py42TooManyRequestsError
Expand All @@ -24,18 +23,6 @@ def test_raise_py42_error_raises_unauthorized_error(self, error_response):
with pytest.raises(Py42UnauthorizedError, match=REQUEST_EXCEPTION_MESSAGE):
raise_py42_error(error_response)

def test_raise_py42_error_raises_MFA_required_error(self, error_response):
error_response.response.status_code = 401
error_response.response.text = (
'{"error":[{"primaryErrorKey":"TIME_BASED_ONE_TIME_PASSWORD_REQUIRED"}]}'
)
with pytest.raises(Py42MFARequiredError):
raise_py42_error(error_response)

error_response.response.text = '{"error":[{"primaryErrorKey":"TOTP_AUTH_CONFIGURATION_REQUIRED_FOR_USER"}]}'
with pytest.raises(Py42MFARequiredError):
raise_py42_error(error_response)

def test_raise_py42_error_raises_forbidden_error(self, error_response):
error_response.response.status_code = 403
with pytest.raises(Py42ForbiddenError):
Expand Down

0 comments on commit f53c10d

Please sign in to comment.