diff --git a/CHANGELOG.md b/CHANGELOG.md index 422792ba..32463c59 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 The intended audience of this file is for py42 consumers -- as such, changes that don't affect how a consumer would use the library (e.g. adding unit tests, updating documentation, etc) are not captured here. +## 1.27.0 - 2023-08-04 + +### Removed + +- Removed the `detectionlists` client, which included `high_risk_employee` and `departing_employee` clients. +- APIs had been deprecated and were replaced by the `Watchlists` client. APIs are no longer available regardless of Py42 version. + ## 1.26.2 - 2023-05-16 ### Changed diff --git a/docs/guides.md b/docs/guides.md index 735a3595..f57d67bb 100644 --- a/docs/guides.md +++ b/docs/guides.md @@ -10,8 +10,6 @@ Getting Started Basics userguides/searches - userguides/departingemployee - userguides/highriskemployee userguides/orgdevices Device Settings Org Settings @@ -26,8 +24,6 @@ * [Getting Started](userguides/gettingstarted.md) * [Basics](userguides/basics.md) * [Executing Searches](userguides/searches.md) -* [Departing Employee](userguides/departingemployee.md) -* [High Risk Employee](userguides/highriskemployee.md) * [Get Active Devices From An Organization](userguides/orgdevices.md) * [Device Settings](userguides/devicesettings.md) * [Org Settings](userguides/orgsettings.md) diff --git a/docs/methoddocs/constants.md b/docs/methoddocs/constants.md index fb0c0404..a9ceb603 100644 --- a/docs/methoddocs/constants.md +++ b/docs/methoddocs/constants.md @@ -12,30 +12,12 @@ :show-inheritance: ``` -```{eval-rst} -.. autoclass:: py42.constants.RiskTags - :members: - :show-inheritance: -``` - ```{eval-rst} .. autoclass:: py42.constants.TrustedActivityType :members: :show-inheritance: ``` -```{eval-rst} -.. autoclass:: py42.constants.DepartingEmployeeFilters - :members: - :show-inheritance: -``` - -```{eval-rst} -.. autoclass:: py42.constants.HighRiskEmployeeFilters - :members: - :show-inheritance: -``` - ```{eval-rst} .. autoclass:: py42.constants.WatchlistType :members: diff --git a/docs/methoddocs/detectionlists.md b/docs/methoddocs/detectionlists.md deleted file mode 100644 index 73da7e94..00000000 --- a/docs/methoddocs/detectionlists.md +++ /dev/null @@ -1,43 +0,0 @@ -# Detection Lists - -**Detection lists have been deprecated. Use [Watchlists](watchlists.md) instead.** - -```{eval-rst} -.. autoclass:: py42.clients.detectionlists.DetectionListsClient - :members: - :show-inheritance: -``` - -## Departing Employees - -```{eval-rst} -.. autoclass:: py42.services.detectionlists.departing_employee.DepartingEmployeeFilters - :members: - :show-inheritance: -``` - -```{eval-rst} -.. autoclass:: py42.services.detectionlists.departing_employee.DepartingEmployeeService - :members: - :show-inheritance: -``` - -## High Risk Employee - -```{eval-rst} -.. autoclass:: py42.clients.detectionlists.RiskTags - :members: - :show-inheritance: -``` - -```{eval-rst} -.. autoclass:: py42.services.detectionlists.high_risk_employee.HighRiskEmployeeFilters - :members: - :show-inheritance: -``` - -```{eval-rst} -.. autoclass:: py42.services.detectionlists.high_risk_employee.HighRiskEmployeeService - :members: - :show-inheritance: -``` diff --git a/docs/methods.md b/docs/methods.md index ec88af66..0ae6a51d 100644 --- a/docs/methods.md +++ b/docs/methods.md @@ -34,7 +34,6 @@ Explore the complete public documentation for `py42` below. * [Backup Sets](methoddocs/backupset.md) * [Cases](methoddocs/cases.md) * [Constants](methoddocs/constants.md) -* [Detection Lists](methoddocs/detectionlists.md) * [Devices](methoddocs/devices.md) * [Device Settings](methoddocs/devicesettings.md) * [Exceptions](methoddocs/exceptions.md) diff --git a/docs/userguides/basics.md b/docs/userguides/basics.md index 77fd97fd..574063c2 100644 --- a/docs/userguides/basics.md +++ b/docs/userguides/basics.md @@ -69,15 +69,15 @@ returned by the generator gives you access to the actual list of items. Use the for working with generators and paging in py42: ```python -# Prints the username and notes for all departing employees +# Prints the username and user ID for all employees included on a watchlist -pages = sdk.detectionlists.departing_employee.get_all() # pages has 'generator' type +pages = sdk.watchlists.get_all_included_users(WATCHLIST_ID) # pages has 'generator' type for page in pages: # page has 'Py42Response' type - employees = page["items"] - for employee in employees: - username = employee["userName"] - notes = employee["notes"] - print(f"{employee}: {notes}") + users = page["includedUsers"] + for user in users: + username = user["username"] + user_id = user["userId"] + print(f"{username}: {user_id}") ``` Each page is a typical py42 response. The next section covers what you can do with `Py42Response` objects. diff --git a/docs/userguides/departingemployee.md b/docs/userguides/departingemployee.md deleted file mode 100644 index 0a2261fd..00000000 --- a/docs/userguides/departingemployee.md +++ /dev/null @@ -1,40 +0,0 @@ -# Departing Employees - -## Add or Remove Users From the Departing Employees List - -Use py42 to quickly and easily manage users on the Departing Employees list. This guide describes how to add users to and remove users from the Departing Employees list. - -To add a user to the Departing Employees list, all you need to know is the user's Code42 user UID. - -To get the user UID based on username: - -```python -user = sdk.users.get_by_username("username") -uid = user["users"][0]["userUid"] -``` - -`user_id` below refers to the user UID. - -```python -from py42.exceptions import Py42UserAlreadyAddedError - -# Add the departing employee -try: - response = sdk.detectionlists.departing_employee.add(user_id, departure_date) -except Py42UserAlreadyAddedError: - print("The user is already on the Departing Employee list.") -``` - -```{eval-rst} -.. important:: - If the user is already in the Departing Employees list, you will get an `py42.exceptions.Py42UserAlreadyAddedError`. - -``` - -To remove a user from the Departing Employees list: -```python -sdk.detectionlists.departing_employee.remove(user_id) -``` - -For complete details, see - [Departing Employee](../methoddocs/detectionlists.md#departing-employees). diff --git a/docs/userguides/highriskemployee.md b/docs/userguides/highriskemployee.md deleted file mode 100644 index c0f6a284..00000000 --- a/docs/userguides/highriskemployee.md +++ /dev/null @@ -1,56 +0,0 @@ -# High Risk Employees - -## Add or Remove Users From the High Risk Employee List - -Use py42 to quickly and easily manage users on the High Risk Employee list. This guide describes how to add users to and -remove users from the High Risk Employee list. - -To add a user to the High Risk Employees list, all you need to know is the user's Code42 user UID. - -To get the user UID based on username: - -```python -user = sdk.users.get_by_username("username") -uid = user["users"][0]["userUid"] -``` - -`user_id` below refers to the user UID. - -```python -# Add the high risk employee -response = sdk.detectionlists.high_risk_employee.add(user_id) -``` - -```{eval-rst} -.. important:: - If the user is already in the High Risk Employee list, you will get a `py42.exceptions.Py42UserAlreadyAddedError`. - -``` - -To remove a user from the High Risk Employee list: -```python -sdk.detectionlists.high_risk_employee.remove(user_id) -``` - -## Add or Remove Risk Factors From Users - -You can add/remove risk factor tags from a high risk user programmatically using the `add_user_risk_tags()` and -`remove_user_risk_tags()` methods in the `detectionlists` module. Both methods take the `user_id` of a high risk employee, and a list of tags that -you want to add/remove: - -```python -from py42.constants import RiskTags - -tag_list = [RiskTags.CONTRACT_EMPLOYEE, RiskTags.ELEVATED_ACCESS_PRIVILEGES] - -# Add the risk tags -response = sdk.detectionlists.add_user_risk_tags(user_id, tag_list) - -# Remove the risk tags -response = sdk.detectionlists.remove_user_risk_tags(user_id, tag_list) -``` - -Constants for Risk tags are available at [Risk Tags](https://py42docs.code42.com/en/stable/methoddocs/constants.html#py42.constants.RiskTags) - -For complete details, see - [High Risk Employee](../methoddocs/detectionlists.md#high-risk-employee). diff --git a/src/py42/__version__.py b/src/py42/__version__.py index 51d5fe30..8ca36ec6 100644 --- a/src/py42/__version__.py +++ b/src/py42/__version__.py @@ -1,3 +1,3 @@ # py42 -__version__ = "1.26.2" +__version__ = "1.27.0" diff --git a/src/py42/clients/__init__.py b/src/py42/clients/__init__.py index 9eda4d9a..a91fb840 100644 --- a/src/py42/clients/__init__.py +++ b/src/py42/clients/__init__.py @@ -6,7 +6,6 @@ "alerts", "archive", "authority", - "detectionlists", "securitydata", "auditlogs", "cases", diff --git a/src/py42/clients/detectionlists.py b/src/py42/clients/detectionlists.py deleted file mode 100644 index df53b0b9..00000000 --- a/src/py42/clients/detectionlists.py +++ /dev/null @@ -1,179 +0,0 @@ -from py42.choices import Choices -from py42.exceptions import Py42BadRequestError -from py42.exceptions import Py42UnableToCreateProfileError - - -class RiskTags(Choices): - """Deprecated. Use :class:`~py42.clients.watchlists.WatchlistsClient` and :class:`~py42.clients.userriskprofile.UserRiskProfileClient` instead. Constants available as risk tags for :meth:`~py42.clients.detectionlists.DetectionListsClient.add_user_risk_tags()` - and :meth:`~py42.clients.detectionlists.DetectionListsClient.remove_user_risk_tags()`. - - * ``FLIGHT_RISK`` - * ``HIGH_IMPACT_EMPLOYEE`` - * ``ELEVATED_ACCESS_PRIVILEGES`` - * ``PERFORMANCE_CONCERNS`` - * ``SUSPICIOUS_SYSTEM_ACTIVITY`` - * ``POOR_SECURITY_PRACTICES`` - * ``CONTRACT_EMPLOYEE`` - """ - - FLIGHT_RISK = "FLIGHT_RISK" - HIGH_IMPACT_EMPLOYEE = "HIGH_IMPACT_EMPLOYEE" - ELEVATED_ACCESS_PRIVILEGES = "ELEVATED_ACCESS_PRIVILEGES" - PERFORMANCE_CONCERNS = "PERFORMANCE_CONCERNS" - SUSPICIOUS_SYSTEM_ACTIVITY = "SUSPICIOUS_SYSTEM_ACTIVITY" - POOR_SECURITY_PRACTICES = "POOR_SECURITY_PRACTICES" - CONTRACT_EMPLOYEE = "CONTRACT_EMPLOYEE" - - -class DetectionListsClient: - """`Deprecated. Use :class:`~py42.clients.watchlists.WatchlistsClient` and :class:`~py42.clients.userriskprofile.UserRiskProfileClient` instead. Rest documentation `__""" - - def __init__( - self, - user_profile_service, - departing_employee_service, - high_risk_employee_service, - ): - self._user_profile_service = user_profile_service - self._departing_employee_service = departing_employee_service - self._high_risk_employee_service = high_risk_employee_service - - @property - def departing_employee(self): - return self._departing_employee_service - - @property - def high_risk_employee(self): - return self._high_risk_employee_service - - def create_user(self, username): - """Deprecated. Used to create a detection list profile for a user, but now that - happens automatically. Thus, this method instead returns the response from - an API call that gets the user's profile. - - Args: - username (str): The Code42 username of the user. - - Returns: - :class:`py42.response.Py42Response` - """ - try: - return self._user_profile_service.get(username) - except Py42BadRequestError as err: - if "Could not find user" in str(err): - raise Py42UnableToCreateProfileError(err, username) - raise - - def get_user(self, username): - """Deprecated. Use userriskprofile.get_by_username() instead. Get user details by username. - `Rest Documentation `__ - - Args: - username (str): The Code42 username of the user. - - Returns: - :class:`py42.response.Py42Response` - """ - - return self._user_profile_service.get(username) - - def get_user_by_id(self, user_id): - """Deprecated. Use userriskprofile.get_by_id() instead. Get user details by user_id. - `Rest Documentation `__ - - Args: - user_id (str or int): The Code42 userId of the user. - - Returns: - :class:`py42.response.Py42Response` - """ - - return self._user_profile_service.get_by_id(user_id) - - def update_user_notes(self, user_id, notes): - """Deprecated. Use userriskprofile.update() instead. Add or update notes related to the user. - `Rest Documentation `__ - - Args: - user_id (str or int): The userUid of the user whose notes you want to update. - notes (str): User profile notes. - - Returns: - :class:`py42.response.Py42Response` - """ - - return self._user_profile_service.update_notes(user_id, notes) - - def add_user_risk_tags(self, user_id, tags): - """Deprecated. Use watchlists instead. Add one or more risk factor tags. - `Rest Documentation `__ - - Args: - user_id (str or int): The userUid of the user whose risk factor tag(s) you want to update. - tags (str or list of str ): A single tag or multiple tags in a list to be added. For - example: ``"tag1"`` or ``["tag1", "tag2"]``. - - Constants available at :class:`py42.constants.RiskTags` - - Returns: - :class:`py42.response.Py42Response` - """ - - return self._user_profile_service.add_risk_tags(user_id, tags) - - def remove_user_risk_tags(self, user_id, tags): - """Deprecated. Use watchlists instead. Remove one or more risk factor tags. - `Rest Documentation `__ - - Args: - user_id (str or int): The userUid of the user whose risk factor tag(s) needs you want to remove. - tags (str or list of str ): A single tag or multiple tags in a list to be removed. For - example: ``"tag1"`` or ``["tag1", "tag2"]``. - - Constants available at :class:`py42.constants.RiskTags` - - Returns: - :class:`py42.response.Py42Response` - """ - - return self._user_profile_service.remove_risk_tags(user_id, tags) - - def add_user_cloud_alias(self, user_id, alias): - """Deprecated. Use userriskprofile.add_cloud_aliases() instead. Add a cloud alias to a user. - `Rest Documentation `__ - - Args: - user_id (str or int): The userUid of the user whose alias you want to update. - alias (str): The alias to be added. - - Returns: - :class:`py42.response.Py42Response` - """ - - return self._user_profile_service.add_cloud_alias(user_id, alias) - - def remove_user_cloud_alias(self, user_id, alias): - """Deprecated. Use userriskprofile.delete_cloud_aliases() instead. Remove a cloud alias from a user. - `Rest Documentation `__ - - Args: - user_id (str or int): The userUid of the user whose alias needs to be removed. - alias (str): The alias to be removed. - - Returns: - :class:`py42.response.Py42Response` - """ - - return self._user_profile_service.remove_cloud_alias(user_id, alias) - - def refresh_user_scim_attributes(self, user_id): - """Deprecated. Refresh SCIM attributes of a user. - `Rest Documentation `__ - - Args: - user_id (str or int): The userUid of the user whose attributes you wish to refresh. - - Returns: - :class:`py42.response.Py42Response` - """ - return self._user_profile_service.refresh(user_id) diff --git a/src/py42/constants/__init__.py b/src/py42/constants/__init__.py index a11df89d..6394aa17 100644 --- a/src/py42/constants/__init__.py +++ b/src/py42/constants/__init__.py @@ -1,10 +1,7 @@ from py42.choices import Choices from py42.clients.cases import CaseStatus -from py42.clients.detectionlists import RiskTags from py42.clients.trustedactivities import TrustedActivityType from py42.clients.watchlists import WatchlistType -from py42.services.detectionlists.departing_employee import DepartingEmployeeFilters -from py42.services.detectionlists.high_risk_employee import HighRiskEmployeeFilters class SortDirection(Choices): diff --git a/src/py42/sdk/__init__.py b/src/py42/sdk/__init__.py index b1a5fff7..9f306c34 100644 --- a/src/py42/sdk/__init__.py +++ b/src/py42/sdk/__init__.py @@ -250,16 +250,6 @@ def securitydata(self): """ return self._clients.securitydata - @property - def detectionlists(self): - """A collection of properties each containing methods for managing specific detection - lists, such as departing employees. - - Returns: - :class:`py42.clients.detectionlists.DetectionListsClient` - """ - return self._clients.detectionlists - @property def alerts(self): """A collection of methods related to retrieving and updating alerts rules. @@ -328,9 +318,6 @@ def _init_services(main_connection, main_auth, auth_flag=None): from py42.services.auditlogs import AuditLogsService from py42.services.cases import CasesService from py42.services.casesfileevents import CasesFileEventsService - from py42.services.detectionlists.departing_employee import DepartingEmployeeService - from py42.services.detectionlists.high_risk_employee import HighRiskEmployeeService - from py42.services.detectionlists.user_profile import DetectionListUserService from py42.services.devices import DeviceService from py42.services.fileevent import FileEventService from py42.services.legalhold import LegalHoldService @@ -347,7 +334,6 @@ def _init_services(main_connection, main_auth, auth_flag=None): alerts_key = "AlertService-API_URL" file_events_key = "FORENSIC_SEARCH-API_URL" preservation_data_key = "PRESERVATION-DATA-SERVICE_API-URL" - employee_case_mgmt_key = "employeecasemanagementV2-API_URL" kv_prefix = "simple-key-value-store" audit_logs_key = "AUDIT-LOG_API-URL" cases_key = "CASES_API-URL" @@ -369,17 +355,12 @@ def _init_services(main_connection, main_auth, auth_flag=None): pds_conn = Connection.from_microservice_key( kv_service, preservation_data_key, auth=main_auth ) - ecm_conn = Connection.from_microservice_key( - kv_service, employee_case_mgmt_key, auth=main_auth - ) audit_logs_conn = Connection.from_microservice_key( kv_service, audit_logs_key, auth=main_auth ) - user_svc = UserService(main_connection) administration_svc = AdministrationService(main_connection) file_event_svc = FileEventService(file_events_conn) user_ctx = UserContext(administration_svc) - user_profile_svc = DetectionListUserService(ecm_conn, user_ctx, user_svc) cases_conn = Connection.from_microservice_key(kv_service, cases_key, auth=main_auth) trusted_activities_conn = Connection.from_microservice_key( kv_service, trusted_activities_key, auth=main_auth @@ -387,6 +368,7 @@ def _init_services(main_connection, main_auth, auth_flag=None): watchlists_conn = Connection.from_microservice_key( kv_service, watchlists_key, auth=main_auth ) + user_risk_profile_svc = UserRiskProfileService(watchlists_conn) services = Services( administration=administration_svc, @@ -398,21 +380,16 @@ def _init_services(main_connection, main_auth, auth_flag=None): else LegalHoldService(main_connection), orgs=OrgService(main_connection), users=UserService(main_connection), - alertrules=AlertRulesService(alert_rules_conn, user_ctx, user_profile_svc), + alertrules=AlertRulesService(alert_rules_conn, user_ctx, user_risk_profile_svc), alerts=AlertService(alerts_conn, user_ctx), fileevents=file_event_svc, savedsearch=SavedSearchService(file_events_conn, file_event_svc), preservationdata=PreservationDataService(pds_conn), - departingemployee=DepartingEmployeeService( - ecm_conn, user_ctx, user_profile_svc - ), - highriskemployee=HighRiskEmployeeService(ecm_conn, user_ctx, user_profile_svc), - userprofile=user_profile_svc, auditlogs=AuditLogsService(audit_logs_conn), cases=CasesService(cases_conn), casesfileevents=CasesFileEventsService(cases_conn), trustedactivities=TrustedActivitiesService(trusted_activities_conn), - userriskprofile=UserRiskProfileService(watchlists_conn), + userriskprofile=user_risk_profile_svc, watchlists=WatchlistsService(watchlists_conn), ) @@ -430,7 +407,6 @@ def _init_clients(services, connection): from py42.clients.auditlogs import AuditLogsClient from py42.clients.authority import AuthorityClient from py42.clients.cases import CasesClient - from py42.clients.detectionlists import DetectionListsClient from py42.clients.loginconfig import LoginConfigurationClient from py42.clients.securitydata import SecurityDataClient from py42.clients.trustedactivities import TrustedActivitiesClient @@ -446,9 +422,6 @@ def _init_clients(services, connection): orgs=services.orgs, users=services.users, ) - detectionlists = DetectionListsClient( - services.userprofile, services.departingemployee, services.highriskemployee - ) storage_service_factory = StorageServiceFactory(connection, services.devices) alertrules = AlertRulesClient(services.alerts, services.alertrules) securitydata = SecurityDataClient( @@ -469,7 +442,6 @@ def _init_clients(services, connection): watchlists = WatchlistsClient(services.watchlists) clients = Clients( authority=authority, - detectionlists=detectionlists, alerts=alerts, securitydata=securitydata, archive=archive, diff --git a/src/py42/services/__init__.py b/src/py42/services/__init__.py index 51b29559..412aeb32 100644 --- a/src/py42/services/__init__.py +++ b/src/py42/services/__init__.py @@ -30,9 +30,6 @@ def __init__(self, connection): "fileevents", "savedsearch", "preservationdata", - "departingemployee", - "highriskemployee", - "userprofile", "auditlogs", "cases", "casesfileevents", diff --git a/src/py42/services/alertrules.py b/src/py42/services/alertrules.py index 46f5d0af..0af3944f 100644 --- a/src/py42/services/alertrules.py +++ b/src/py42/services/alertrules.py @@ -42,7 +42,7 @@ def filetypemismatch(self): def add_user(self, rule_id, user_id): tenant_id = self._user_context.get_current_tenant_id() user_details = self._user_profile_service.get_by_id(user_id) - user_aliases = user_details.data.get("cloudUsernames") or [] + user_aliases = user_details.data.get("cloudAliases") or [] data = { "tenantId": tenant_id, "ruleId": rule_id, diff --git a/src/py42/services/detectionlists/__init__.py b/src/py42/services/detectionlists/__init__.py deleted file mode 100644 index f2f8e4c3..00000000 --- a/src/py42/services/detectionlists/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -from py42.exceptions import Py42UserAlreadyAddedError - -_PAGE_SIZE = 100 - - -def handle_user_already_added_error(bad_request_err, username_tried_adding, list_name): - if "User already on list" in bad_request_err.response.text: - raise Py42UserAlreadyAddedError( - bad_request_err, username_tried_adding, list_name - ) - - -class _DetectionListFilters: - OPEN = "OPEN" - EXFILTRATION_30_DAYS = "EXFILTRATION_30_DAYS" - EXFILTRATION_24_HOURS = "EXFILTRATION_24_HOURS" diff --git a/src/py42/services/detectionlists/departing_employee.py b/src/py42/services/detectionlists/departing_employee.py deleted file mode 100644 index bd67bc11..00000000 --- a/src/py42/services/detectionlists/departing_employee.py +++ /dev/null @@ -1,239 +0,0 @@ -from datetime import datetime -from warnings import warn - -from py42.choices import Choices -from py42.exceptions import Py42BadRequestError -from py42.exceptions import Py42NotFoundError -from py42.exceptions import Py42UserNotOnListError -from py42.services import BaseService -from py42.services.detectionlists import _DetectionListFilters -from py42.services.detectionlists import _PAGE_SIZE -from py42.services.detectionlists import handle_user_already_added_error -from py42.services.util import get_all_pages - -_DATE_FORMAT = "%Y-%m-%d" - - -class DepartingEmployeeFilters(_DetectionListFilters, Choices): - """Deprecated. Use :class:`~py42.clients.watchlists.WatchlistsClient` and :class:`~py42.clients.userriskprofile.UserRiskProfileClient` instead. Constants available for filtering Departing Employee search results. - - * ``OPEN`` - * ``EXFILTRATION_30_DAYS`` - * ``EXFILTRATION_24_HOURS`` - * ``LEAVING_TODAY`` - """ - - LEAVING_TODAY = "LEAVING_TODAY" - - -class DepartingEmployeeService(BaseService): - """Deprecated. Use :class:`~py42.clients.watchlists.WatchlistsClient` and :class:`~py42.clients.userriskprofile.UserRiskProfileClient` instead. A service for interacting with Code42 Departing Employee APIs.""" - - _uri_prefix = "v2/departingemployee/{0}" - - _CREATED_AT = "CREATED_AT" - - def __init__(self, session, user_context, user_profile_service): - super().__init__(session) - self._user_context = user_context - self._user_profile_service = user_profile_service - - def add(self, user_id, departure_date=None): - """Deprecated. Use watchlists instead. Adds a user to the Departing Employees list. - `REST Documentation `__ - - Raises a :class:`Py42UserAlreadyAddedError` when a user already exists in the Departing Employee \ - detection list. - - Args: - user_id (str or int): The Code42 userUid of the user you want to add to the departing \ - employees list. - departure_date (str or datetime, optional): Date in yyyy-MM-dd format or instance of datetime. - Date is treated as UTC. Defaults to None. - - Returns: - :class:`py42.response.Py42Response` - """ - warn( - "Detection lists are deprecated. Use watchlists instead.", - DeprecationWarning, - stacklevel=2, - ) - if isinstance(departure_date, datetime): - departure_date = departure_date.strftime(_DATE_FORMAT) - tenant_id = self._user_context.get_current_tenant_id() - data = { - "tenantId": tenant_id, - "userId": user_id, - "departureDate": departure_date, - } - uri = self._uri_prefix.format("add") - try: - return self._connection.post(uri, json=data) - except Py42BadRequestError as err: - handle_user_already_added_error(err, user_id, "departing-employee list") - raise - - def remove(self, user_id): - """Deprecated. Use watchlists instead. Removes a user from the Departing Employees list. - `REST Documentation `__ - - Args: - user_id (str or int): The Code42 userUid of the user. - - Returns: - :class:`py42.response.Py42Response` - """ - - warn( - "Detection lists are deprecated. Use watchlists instead.", - DeprecationWarning, - stacklevel=2, - ) - tenant_id = self._user_context.get_current_tenant_id() - uri = self._uri_prefix.format("remove") - data = {"userId": user_id, "tenantId": tenant_id} - try: - return self._connection.post(uri, json=data) - except Py42NotFoundError as err: - raise Py42UserNotOnListError(err, user_id, "departing-employee") - - def get(self, user_id): - """Deprecated. Use userriskprofile.get_by_id() instead. Gets departing employee data of a user. - `REST Documentation `__ - - Args: - user_id (str or int): The Code42 userUid of the user. - - Returns: - :class:`py42.response.Py42Response` - """ - warn( - "This method is deprecated. Use userriskprofile.get_by_id() instead.", - DeprecationWarning, - stacklevel=2, - ) - tenant_id = self._user_context.get_current_tenant_id() - uri = self._uri_prefix.format("get") - data = {"userId": user_id, "tenantId": tenant_id} - return self._connection.post(uri, json=data) - - def get_all( - self, - filter_type=DepartingEmployeeFilters.OPEN, - sort_key=_CREATED_AT, - sort_direction="DESC", - page_size=_PAGE_SIZE, - ): - """Deprecated. Use userriskprofile.get_all(). Gets all Departing Employees. - - Args: - filter_type (str, optional): Constants available at - :class:`py42.constants.DepartingEmployeeFilters`. - Defaults to "OPEN". - sort_key (str, optional): Sort results based by field. Defaults to "CREATED_AT". - sort_direction (str, optional): ``ASC`` or ``DESC``. Defaults to "DESC". - page_size (int, optional): The number of departing employees to return - per page. Defaults to 100. - - Returns: - generator: An object that iterates over :class:`py42.response.Py42Response` objects - that each contain a page of departing employees. - """ - warn( - "This method is deprecated. Use userriskprofile.get_all() instead.", - DeprecationWarning, - stacklevel=2, - ) - return get_all_pages( - self.get_page, - "items", - filter_type=filter_type, - sort_key=sort_key, - sort_direction=sort_direction, - page_size=page_size or _PAGE_SIZE, - ) - - def get_page( - self, - page_num, - filter_type=DepartingEmployeeFilters.OPEN, - sort_key=_CREATED_AT, - sort_direction="DESC", - page_size=_PAGE_SIZE, - ): - """Deprecated. Use userriskprofile.get_page() instead. Gets a single page of Departing Employees. - - Args: - page_num (int): The page number to request. - filter_type (str, optional): Constants available at - :class:`py42.constants.DepartingEmployeeFilters`. - Defaults to "OPEN". - sort_key (str, optional): Sort results based by field. Defaults to "CREATED_AT". - sort_direction (str. optional): ``ASC`` or ``DESC``. Defaults to "DESC". - page_size (int, optional): The number of departing employees to return - per page. Defaults to 100. - - Returns: - :class:`py42.response.Py42Response` - """ - - warn( - "This method is deprecated. Use userriskprofile.get_page() instead.", - DeprecationWarning, - stacklevel=2, - ) - uri = self._uri_prefix.format("search") - data = { - "tenantId": self._user_context.get_current_tenant_id(), - "pgSize": page_size, - "pgNum": page_num, - "filterType": filter_type, - "srtKey": sort_key, - "srtDirection": sort_direction, - } - return self._connection.post(uri, json=data) - - def set_alerts_enabled(self, alerts_enabled=True): - """Deprecated. Enable or disable email alerting on Departing Employee exposure events. - `REST Documentation `__ - - Args: - alerts_enabled (bool): Set alerting to on (True) or off (False). Defaults to True. - - Returns: - :class:`py42.response.Py42Response` - """ - warn("This method is deprecated.", DeprecationWarning, stacklevel=2) - tenant_id = self._user_context.get_current_tenant_id() - uri = self._uri_prefix.format("setalertstate") - data = {"tenantId": tenant_id, "alertsEnabled": alerts_enabled} - return self._connection.post(uri, json=data) - - def update_departure_date(self, user_id, departure_date): - """Deprecated. Use userriskprofile.update() instead. Add or modify details of an existing Departing Employee case. - `REST Documentation `__ - - Args: - user_id (str): The Code42 userUid of the user. - departure_date (str or datetime): Date in yyyy-MM-dd format or instance of datetime. - Date is treated as UTC. - - Returns: - :class:`py42.response.Py42Response` - """ - warn( - "This method is deprecated. Use userriskprofile.update() instead.", - DeprecationWarning, - stacklevel=2, - ) - tenant_id = self._user_context.get_current_tenant_id() - if isinstance(departure_date, datetime): - departure_date = departure_date.strftime(_DATE_FORMAT) - uri = self._uri_prefix.format("update") - data = { - "tenantId": tenant_id, - "userId": user_id, - "departureDate": departure_date, - } - return self._connection.post(uri, json=data) diff --git a/src/py42/services/detectionlists/high_risk_employee.py b/src/py42/services/detectionlists/high_risk_employee.py deleted file mode 100644 index 1ce0c9ee..00000000 --- a/src/py42/services/detectionlists/high_risk_employee.py +++ /dev/null @@ -1,213 +0,0 @@ -from warnings import warn - -from py42.choices import Choices -from py42.exceptions import Py42BadRequestError -from py42.exceptions import Py42NotFoundError -from py42.exceptions import Py42UserNotOnListError -from py42.services import BaseService -from py42.services.detectionlists import _DetectionListFilters -from py42.services.detectionlists import _PAGE_SIZE -from py42.services.detectionlists import handle_user_already_added_error -from py42.services.util import get_all_pages - - -class HighRiskEmployeeFilters(_DetectionListFilters, Choices): - """Deprecated. Use :class:`~py42.clients.watchlists.WatchlistsClient` and :class:`~py42.clients.userriskprofile.UserRiskProfileClient` instead. Constants available for filtering High Risk Employee search results. - - * ``OPEN`` - * ``EXFILTRATION_30_DAYS`` - * ``EXFILTRATION_24_HOURS`` - """ - - -class HighRiskEmployeeService(BaseService): - """`Deprecated. Use :class:`~py42.clients.watchlists.WatchlistsClient` and :class:`~py42.clients.userriskprofile.UserRiskProfileClient` instead. A service for interacting with High Risk Employee APIs.""" - - _resource = "v2/highriskemployee" - - def __init__(self, connection, user_context, user_profile_service): - super().__init__(connection) - self._user_context = user_context - self._user_profile_service = user_profile_service - - def _make_uri(self, action): - return f"{self._resource}{action}" - - def _add_high_risk_employee(self, tenant_id, user_id): - data = {"tenantId": tenant_id, "userId": user_id} - uri = self._make_uri("/add") - return self._connection.post(uri, json=data) - - def add(self, user_id): - """Deprecated. Use watchlists instead. Adds a user to the High Risk Employee detection list. - - Raises a :class:`Py42UserAlreadyAddedError` when a user already exists in the High Risk Employee - detection list. - `REST Documentation `__ - - Args: - user_id (str or int): The Code42 userUid of the user you want to add to the High Risk - Employee detection list. - - Returns: - :class:`py42.response.Py42Response` - """ - warn( - "Detection lists are deprecated. Use watchlists instead.", - DeprecationWarning, - stacklevel=2, - ) - tenant_id = self._user_context.get_current_tenant_id() - try: - return self._add_high_risk_employee(tenant_id, user_id) - except Py42BadRequestError as err: - handle_user_already_added_error(err, user_id, "high-risk-employee list") - raise - - def set_alerts_enabled(self, enabled=True): - """Deprecated. Enables alerts. - `Rest Documentation `__ - - Args: - enabled (bool): Whether to enable alerts for all users. - - Returns: - :class:`py42.response.Py42Response` - """ - warn("This method is deprecated.", DeprecationWarning, stacklevel=2) - data = { - "tenantId": self._user_context.get_current_tenant_id(), - "alertsEnabled": enabled, - } - uri = self._make_uri("/setalertstate") - return self._connection.post(uri, json=data) - - def remove(self, user_id): - """Deprecated. Use watchlists instead. Removes a user from the High Risk Employee detection list. - `Rest Documentation `__ - - Args: - user_id (str or int): The Code42 userUid of the user you want to add to the High Risk - Employee detection list. - - Returns: - :class:`py42.response.Py42Response` - """ - warn( - "Detection lists are deprecated. Use watchlists instead.", - DeprecationWarning, - stacklevel=2, - ) - data = { - "tenantId": self._user_context.get_current_tenant_id(), - "userId": user_id, - } - uri = self._make_uri("/remove") - try: - return self._connection.post(uri, json=data) - except Py42NotFoundError as err: - raise Py42UserNotOnListError(err, user_id, "high-risk-employee") - - def get(self, user_id): - """Deprecated. Use userriskprofile.get_by_id() instead. Gets user information. - `Rest Documentation `__ - - Args: - user_id (str or int): The Code42 userUid of the user has been added to the High Risk - Employee detection list. - - Returns: - :class:`py42.response.Py42Response` - """ - warn( - "This method is deprecated. Use userriskprofile.get_by_id() instead.", - DeprecationWarning, - stacklevel=2, - ) - data = { - "tenantId": self._user_context.get_current_tenant_id(), - "userId": user_id, - } - uri = self._make_uri("/get") - return self._connection.post(uri, json=data) - - def get_all( - self, - filter_type=HighRiskEmployeeFilters.OPEN, - sort_key=None, - sort_direction=None, - page_size=_PAGE_SIZE, - ): - """Deprecated. Use userriskprofile.get_all() instead. Searches High Risk Employee list. Filter results by filter_type. - `Rest Documentation `__ - - Args: - filter_type (str, optional): Constants available at - :class:`py42.constants.HighRiskEmployeeFilters`. - Defaults to ``OPEN``. - sort_key (str, optional): Sort results based by field. Defaults to None. - sort_direction (str, optional): ``ASC`` or ``DESC``. Constants available at - :class:`py42.constants.SortDirection`. Defaults to None. - page_size (int, optional): The number of high risk employees to return - per page. Defaults to 100. - - Returns: - generator: An object that iterates over :class:`py42.response.Py42Response` objects - that each contain a page of users. - """ - warn( - "This method is deprecated. Use userriskprofile.get_all() instead.", - DeprecationWarning, - stacklevel=2, - ) - - return get_all_pages( - self.get_page, - "items", - filter_type=filter_type, - sort_key=sort_key, - sort_direction=sort_direction, - page_size=page_size or _PAGE_SIZE, - ) - - def get_page( - self, - page_num, - filter_type=HighRiskEmployeeFilters.OPEN, - sort_key=None, - sort_direction=None, - page_size=_PAGE_SIZE, - ): - """Deprecated. Use userriskprofile.get_page() instead. Gets a single page of High Risk Employees. - - Args: - page_num (int): The page number to request. - filter_type (str, optional): Constants available at - :class:`py42.constants.HighRiskEmployeeFilters`. - Defaults to "OPEN". - sort_key (str, optional): Sort results based by field. Defaults to None. - sort_direction (str. optional): ``ASC`` or ``DESC``. Constants available at - :class:`py42.constants.SortDirection`. Defaults to None. - page_size (int, optional): The number of high risk employees to return - per page. Defaults to 100. - - Returns: - :class:`py42.response.Py42Response` - """ - - warn( - "This method is deprecated. Use userriskprofile.get_page() instead.", - DeprecationWarning, - stacklevel=2, - ) - - data = { - "tenantId": self._user_context.get_current_tenant_id(), - "filterType": filter_type, - "pgNum": page_num, - "pgSize": page_size, - "srtKey": sort_key, - "srtDirection": sort_direction, - } - uri = self._make_uri("/search") - return self._connection.post(uri, json=data) diff --git a/src/py42/services/detectionlists/user_profile.py b/src/py42/services/detectionlists/user_profile.py deleted file mode 100644 index a27a1aa3..00000000 --- a/src/py42/services/detectionlists/user_profile.py +++ /dev/null @@ -1,221 +0,0 @@ -from warnings import warn - -from py42.exceptions import Py42BadRequestError -from py42.exceptions import Py42CloudAliasCharacterLimitExceededError -from py42.exceptions import Py42CloudAliasLimitExceededError -from py42.services import BaseService - - -class DetectionListUserService(BaseService): - """Deprecated. Use :class:`~py42.clients.watchlists.WatchlistsClient` and :class:`~py42.clients.userriskprofile.UserRiskProfileClient` instead. Administrator utility to manage High Risk employees information. - - `Support Documentation `__ - """ - - _resource = "v2/user" - - def __init__(self, connection, user_context, user_service): - super().__init__(connection) - self._user_context = user_context - self._user_service = user_service - - def _make_uri(self, action): - return f"{self._resource}{action}" - - def get_by_id(self, user_id): - """Deprecated. Use userriskprofile.get_by_id() instead. Get user details by user UID. - - Args: - user_id (str or int): UID of the user. - - Returns: - :class:`py42.response.Py42Response` - """ - warn( - "This method is deprecated. Use userriskprofile.get_by_id() instead.", - DeprecationWarning, - stacklevel=2, - ) - - data = { - "tenantId": self._user_context.get_current_tenant_id(), - "userId": user_id, - } - uri = self._make_uri("/getbyid") - return self._connection.post(uri, json=data) - - def get(self, username): - """Deprecated. Use userriskprofile.get_by_username() instead. Get user details by username. - - Args: - username (str): Username of the user. - - Returns: - :class:`py42.response.Py42Response` - """ - warn( - "This method is deprecated. Use userriskprofile.get_by_username() instead.", - DeprecationWarning, - stacklevel=2, - ) - - data = { - "tenantId": self._user_context.get_current_tenant_id(), - "username": username, - } - uri = self._make_uri("/getbyusername") - return self._connection.post(uri, json=data) - - def update_notes(self, user_id, notes): - """Deprecated. Use userriskprofile.update() instead. Add or update notes related to the user. - - Args: - user_id (str or int): The user_id whose notes need to be updated. - notes (str): User profile notes. - - Returns: - :class:`py42.response.Py42Response` - """ - warn( - "This method is deprecated. Use userriskprofile.update() instead.", - DeprecationWarning, - stacklevel=2, - ) - - data = { - "tenantId": self._user_context.get_current_tenant_id(), - "userId": user_id, - "notes": notes, - } - uri = self._make_uri("/updatenotes") - return self._connection.post(uri, json=data) - - def add_risk_tags(self, user_id, tags): - """Deprecated. Use watchlists instead. Add one or more tags. - - Args: - user_id (str or int): The user_id whose tag(s) needs to be updated. - tags (str or list of str ): A single tag or multiple tags in a list to be added. - e.g "tag1" or ["tag1", "tag2"] - - Returns: - :class:`py42.response.Py42Response` - """ - warn( - "Risk tags are deprecated. Use watchlists instead.", - DeprecationWarning, - stacklevel=2, - ) - - if not isinstance(tags, (list, tuple)): - tags = [tags] - - data = { - "tenantId": self._user_context.get_current_tenant_id(), - "userId": user_id, - "riskFactors": tags, - } - uri = self._make_uri("/addriskfactors") - return self._connection.post(uri, json=data) - - def remove_risk_tags(self, user_id, tags): - """Deprecated. Use watchlists instead. Remove one or more tags.Args: - - Args: - user_id (str or int): The user_id whose tag(s) needs to be removed. - tags (str or list of str ): A single tag or multiple tags in a list to be removed. - e.g "tag1" or ["tag1", "tag2"]. - - Returns: - :class:`py42.response.Py42Response` - """ - warn( - "Risk tags are deprecated. Use watchlists instead.", - DeprecationWarning, - stacklevel=2, - ) - - if not isinstance(tags, (list, tuple)): - tags = [tags] - - data = { - "tenantId": self._user_context.get_current_tenant_id(), - "userId": user_id, - "riskFactors": tags, - } - uri = self._make_uri("/removeriskfactors") - return self._connection.post(uri, json=data) - - def add_cloud_alias(self, user_id, alias): - """Deprecated. Use userriskprofile.add_cloud_aliases() instead. Add a cloud alias. - - Args: - user_id (str or int): The user_id whose alias needs to be updated. - alias (str): An alias to be added. - - Returns: - :class:`py42.response.Py42Response` - """ - warn( - "This method is deprecated. Use userriskprofile.add_cloud_aliases() instead.", - DeprecationWarning, - stacklevel=2, - ) - # check if alias > 50 characters - # this error checking is handled by the frontend of the console - if len(alias) > 50: - raise Py42CloudAliasCharacterLimitExceededError - - data = { - "tenantId": self._user_context.get_current_tenant_id(), - "userId": user_id, - "cloudUsernames": [alias], - } - uri = self._make_uri("/addcloudusernames") - try: - return self._connection.post(uri, json=data) - except Py42BadRequestError as err: - if "Cloud usernames must be less than or equal to" in err.response.text: - raise Py42CloudAliasLimitExceededError(err) - raise - - def remove_cloud_alias(self, user_id, alias): - """Deprecated. Use userriskprofile.delete_cloud_aliases() instead. Remove one or more cloud alias. - - Args: - user_id (str or int): The user_id whose alias needs to be removed. - alias (str): An alias to be removed. - - Returns: - :class:`py42.response.Py42Response` - """ - warn( - "This method is deprecated. Use userriskprofile.delete_cloud_aliases() instead", - DeprecationWarning, - stacklevel=2, - ) - data = { - "tenantId": self._user_context.get_current_tenant_id(), - "userId": user_id, - "cloudUsernames": [alias], - } - uri = self._make_uri("/removecloudusernames") - return self._connection.post(uri, json=data) - - def refresh(self, user_id): - """Deprecated. Refresh SCIM attributes of a user. - - Args: - user_id (str or int): The user_id of the user whose attributes need to be refreshed. - - Returns: - :class:`py42.response.Py42Response` - """ - warn("This method is deprecated.", DeprecationWarning, stacklevel=2) - - data = { - "tenantId": self._user_context.get_current_tenant_id(), - "userId": user_id, - } - uri = self._make_uri("/refresh") - return self._connection.post(uri, json=data) diff --git a/tests/clients/test_detectionlists.py b/tests/clients/test_detectionlists.py deleted file mode 100644 index 6a936d81..00000000 --- a/tests/clients/test_detectionlists.py +++ /dev/null @@ -1,345 +0,0 @@ -import pytest -from tests.conftest import create_mock_error - -from py42.clients.detectionlists import DetectionListsClient -from py42.clients.detectionlists import RiskTags -from py42.exceptions import Py42BadRequestError -from py42.exceptions import Py42UnableToCreateProfileError -from py42.services.detectionlists.departing_employee import DepartingEmployeeService -from py42.services.detectionlists.high_risk_employee import HighRiskEmployeeService -from py42.services.detectionlists.user_profile import DetectionListUserService - - -@pytest.fixture -def mock_detection_list_user_service(mocker): - return mocker.MagicMock(spec=DetectionListUserService) - - -@pytest.fixture -def mock_departing_employee_service(mocker): - return mocker.MagicMock(spec=DepartingEmployeeService) - - -@pytest.fixture -def mock_high_risk_employee_service(mocker): - return mocker.MagicMock(spec=HighRiskEmployeeService) - - -TEST_USER_ID = "12345" - - -class TestRiskTags: - def test_choices_returns_expected_set(self): - choices = RiskTags.choices() - valid_set = { - "FLIGHT_RISK", - "HIGH_IMPACT_EMPLOYEE", - "ELEVATED_ACCESS_PRIVILEGES", - "PERFORMANCE_CONCERNS", - "SUSPICIOUS_SYSTEM_ACTIVITY", - "POOR_SECURITY_PRACTICES", - "CONTRACT_EMPLOYEE", - } - assert set(choices) == valid_set - - -class TestDetectionListClient: - def test_departing_employee_call_get_departing_employee_client_with_expected_values( - self, - mock_detection_list_user_service, - mock_departing_employee_service, - mock_high_risk_employee_service, - ): - client = DetectionListsClient( - mock_detection_list_user_service, - mock_departing_employee_service, - mock_high_risk_employee_service, - ) - assert client.departing_employee is mock_departing_employee_service - - def test_high_risk_employee_returns_high_risk_employee_client( - self, - mock_detection_list_user_service, - mock_departing_employee_service, - mock_high_risk_employee_service, - ): - client = DetectionListsClient( - mock_detection_list_user_service, - mock_departing_employee_service, - mock_high_risk_employee_service, - ) - assert client.high_risk_employee is mock_high_risk_employee_service - - def test_create_user_calls_user_client_with_expected_values( - self, - mock_detection_list_user_service, - mock_departing_employee_service, - mock_high_risk_employee_service, - ): - client = DetectionListsClient( - mock_detection_list_user_service, - mock_departing_employee_service, - mock_high_risk_employee_service, - ) - client.create_user("testusername") - mock_detection_list_user_service.get.assert_called_once_with("testusername") - - def test_create_user_when_service_returns_cannot_find_user_bad_request_raises_unable_to_create_error( - self, - mocker, - mock_detection_list_user_service, - mock_departing_employee_service, - mock_high_risk_employee_service, - ): - mock_err_response_content = """ - { - "pop-bulletin": { - "type$": "com.code42.casemanagement.CaseMessages.InvalidUser", - "text$": "Could not find user: testusername", - "details": [], - "user": "testusername" - } - }""" - mock_detection_list_user_service.get.side_effect = create_mock_error( - Py42BadRequestError, mocker, mock_err_response_content - ) - - client = DetectionListsClient( - mock_detection_list_user_service, - mock_departing_employee_service, - mock_high_risk_employee_service, - ) - with pytest.raises(Py42UnableToCreateProfileError) as err: - client.create_user("testusername") - - assert ( - "Detection-list profiles are now created automatically on the server. " - "Unable to find a detection-list profile for 'testusername'. It is " - "possibly still being created if you just recently created the Code42 " - "user." in str(err.value) - ) - assert err.value.username == "testusername" - - def test_create_user_when_service_returns_bad_request_raises( - self, - mocker, - mock_detection_list_user_service, - mock_departing_employee_service, - mock_high_risk_employee_service, - ): - mock_detection_list_user_service.get.side_effect = create_mock_error( - Py42BadRequestError, mocker, "" - ) - client = DetectionListsClient( - mock_detection_list_user_service, - mock_departing_employee_service, - mock_high_risk_employee_service, - ) - with pytest.raises(Py42BadRequestError): - client.create_user("testusername") - - def test_get_user_calls_user_client_with_expected_values( - self, - mock_detection_list_user_service, - mock_departing_employee_service, - mock_high_risk_employee_service, - ): - client = DetectionListsClient( - mock_detection_list_user_service, - mock_departing_employee_service, - mock_high_risk_employee_service, - ) - client.get_user("testusername") - mock_detection_list_user_service.get.assert_called_once_with("testusername") - - def test_get_user_by_id_calls_user_client_with_expected_values( - self, - mock_detection_list_user_service, - mock_departing_employee_service, - mock_high_risk_employee_service, - ): - client = DetectionListsClient( - mock_detection_list_user_service, - mock_departing_employee_service, - mock_high_risk_employee_service, - ) - client.get_user_by_id(TEST_USER_ID) - mock_detection_list_user_service.get_by_id.assert_called_once_with(TEST_USER_ID) - - def test_update_user_notes_calls_user_client_with_expected_values( - self, - mock_detection_list_user_service, - mock_departing_employee_service, - mock_high_risk_employee_service, - ): - client = DetectionListsClient( - mock_detection_list_user_service, - mock_departing_employee_service, - mock_high_risk_employee_service, - ) - client.update_user_notes(TEST_USER_ID, "newnotes") - mock_detection_list_user_service.update_notes.assert_called_once_with( - TEST_USER_ID, "newnotes" - ) - - def test_update_user_notes_returns_response( - self, - mock_detection_list_user_service, - mock_departing_employee_service, - mock_high_risk_employee_service, - ): - client = DetectionListsClient( - mock_detection_list_user_service, - mock_departing_employee_service, - mock_high_risk_employee_service, - ) - response = client.update_user_notes(TEST_USER_ID, "newnotes") - assert response - - def test_add_user_risk_tags_calls_user_client_with_expected_values( - self, - mock_detection_list_user_service, - mock_departing_employee_service, - mock_high_risk_employee_service, - ): - client = DetectionListsClient( - mock_detection_list_user_service, - mock_departing_employee_service, - mock_high_risk_employee_service, - ) - client.add_user_risk_tags(TEST_USER_ID, "newtag") - mock_detection_list_user_service.add_risk_tags.assert_called_once_with( - TEST_USER_ID, "newtag" - ) - - def test_add_user_risk_tags_returns_response( - self, - mock_detection_list_user_service, - mock_departing_employee_service, - mock_high_risk_employee_service, - ): - client = DetectionListsClient( - mock_detection_list_user_service, - mock_departing_employee_service, - mock_high_risk_employee_service, - ) - response = client.add_user_risk_tags(TEST_USER_ID, "newtag") - assert response - - def test_remove_user_risk_tags_calls_user_client_with_expected_values( - self, - mock_detection_list_user_service, - mock_departing_employee_service, - mock_high_risk_employee_service, - ): - client = DetectionListsClient( - mock_detection_list_user_service, - mock_departing_employee_service, - mock_high_risk_employee_service, - ) - client.remove_user_risk_tags(TEST_USER_ID, "oldtag") - mock_detection_list_user_service.remove_risk_tags.assert_called_once_with( - TEST_USER_ID, "oldtag" - ) - - def test_remove_user_risk_tags_returns_response( - self, - mock_detection_list_user_service, - mock_departing_employee_service, - mock_high_risk_employee_service, - ): - client = DetectionListsClient( - mock_detection_list_user_service, - mock_departing_employee_service, - mock_high_risk_employee_service, - ) - response = client.remove_user_risk_tags(TEST_USER_ID, "oldtag") - assert response - - def test_add_user_cloud_alias_calls_user_client_with_expected_values( - self, - mock_detection_list_user_service, - mock_departing_employee_service, - mock_high_risk_employee_service, - ): - client = DetectionListsClient( - mock_detection_list_user_service, - mock_departing_employee_service, - mock_high_risk_employee_service, - ) - client.add_user_cloud_alias(TEST_USER_ID, "newalias") - mock_detection_list_user_service.add_cloud_alias.assert_called_once_with( - TEST_USER_ID, "newalias" - ) - - def test_add_user_cloud_alias_returns_response( - self, - mock_detection_list_user_service, - mock_departing_employee_service, - mock_high_risk_employee_service, - ): - client = DetectionListsClient( - mock_detection_list_user_service, - mock_departing_employee_service, - mock_high_risk_employee_service, - ) - response = client.add_user_cloud_alias(TEST_USER_ID, "newalias") - assert response - - def test_remove_user_cloud_alias_calls_user_client_with_expected_values( - self, - mock_detection_list_user_service, - mock_departing_employee_service, - mock_high_risk_employee_service, - ): - client = DetectionListsClient( - mock_detection_list_user_service, - mock_departing_employee_service, - mock_high_risk_employee_service, - ) - client.remove_user_cloud_alias(TEST_USER_ID, "oldalias") - mock_detection_list_user_service.remove_cloud_alias.assert_called_once_with( - TEST_USER_ID, "oldalias" - ) - - def test_remove_user_cloud_alias_returns_response( - self, - mock_detection_list_user_service, - mock_departing_employee_service, - mock_high_risk_employee_service, - ): - client = DetectionListsClient( - mock_detection_list_user_service, - mock_departing_employee_service, - mock_high_risk_employee_service, - ) - response = client.remove_user_cloud_alias(TEST_USER_ID, "oldalias") - assert response - - def test_refresh_user_scim_attributes_calls_user_client_with_expected_values( - self, - mock_detection_list_user_service, - mock_departing_employee_service, - mock_high_risk_employee_service, - ): - client = DetectionListsClient( - mock_detection_list_user_service, - mock_departing_employee_service, - mock_high_risk_employee_service, - ) - client.refresh_user_scim_attributes(TEST_USER_ID) - mock_detection_list_user_service.refresh.assert_called_once_with(TEST_USER_ID) - - def test_refresh_user_scim_attributes_returns_response( - self, - mock_detection_list_user_service, - mock_departing_employee_service, - mock_high_risk_employee_service, - ): - client = DetectionListsClient( - mock_detection_list_user_service, - mock_departing_employee_service, - mock_high_risk_employee_service, - ) - response = client.refresh_user_scim_attributes(TEST_USER_ID) - assert response diff --git a/tests/integration/test_alertrules.py b/tests/integration/test_alertrules.py index 89ac1219..f1976488 100644 --- a/tests/integration/test_alertrules.py +++ b/tests/integration/test_alertrules.py @@ -10,10 +10,6 @@ def rule_id(connection, observer_id): @pytest.mark.integration class TestAlertRules: - def test_rules_add_user(self, connection, new_user, observer_id): - response = connection.alerts.rules.add_user(observer_id, new_user["userUid"]) - assert_successful_response(response) - def test_rules_get_all(self, connection): response_gen = connection.alerts.rules.get_all() for response in response_gen: diff --git a/tests/integration/test_detectionlists.py b/tests/integration/test_detectionlists.py deleted file mode 100644 index ee96595a..00000000 --- a/tests/integration/test_detectionlists.py +++ /dev/null @@ -1,125 +0,0 @@ -from datetime import datetime -from datetime import timedelta - -import pytest -from tests.integration.conftest import assert_successful_response - -from py42.clients.detectionlists import RiskTags - -alias_user = "test_user@test.com" -user_departure_date = datetime.now() + timedelta(days=10) - - -@pytest.mark.integration -class TestDetectionLists: - # During individual file execution, below test should be executed. - def test_create_user(self, connection, new_user): - username = new_user["username"] - response = connection.detectionlists.create_user(username) - assert_successful_response(response) - - def test_get_user_by_id(self, connection, new_user): - response = connection.detectionlists.get_user_by_id(new_user["userUid"]) - assert_successful_response(response) - - def test_get_user(self, connection, new_user): - response = connection.detectionlists.get_user(new_user["username"]) - assert_successful_response(response) - - def test_refresh_user_scim_attributes(self, connection, new_user): - response = connection.detectionlists.refresh_user_scim_attributes( - new_user["userUid"] - ) - assert_successful_response(response) - - def test_departing_employee_get_page(self, connection): - response = connection.detectionlists.departing_employee.get_page(1) - assert_successful_response(response) - - def test_departing_employee_get_all(self, connection): - response_gen = connection.detectionlists.departing_employee.get_all() - for response in response_gen: - assert_successful_response(response) - break - - def test_high_risk_employee_get_all(self, connection): - response_gen = connection.detectionlists.high_risk_employee.get_all() - for response in response_gen: - assert_successful_response(response) - break - - def test_add_user_cloud_alias(self, connection, new_user): - response = connection.detectionlists.add_user_cloud_alias( - new_user["userUid"], alias_user - ) - assert_successful_response(response) - - def test_remove_user_cloud_alias(self, connection, new_user): - response = connection.detectionlists.remove_user_cloud_alias( - new_user["userUid"], alias_user - ) - assert_successful_response(response) - - def test_departing_employee_add(self, connection, new_user): - response = connection.detectionlists.departing_employee.add(new_user["userUid"]) - assert_successful_response(response) - - def test_departing_employee_get(self, connection, new_user): - response = connection.detectionlists.departing_employee.get(new_user["userUid"]) - assert_successful_response(response) - - def test_departing_employee_update_departure_date(self, connection, new_user): - response = connection.detectionlists.departing_employee.update_departure_date( - new_user["userUid"], user_departure_date - ) - assert_successful_response(response) - - def test_add_user_risk_tags(self, connection, new_user): - response = connection.detectionlists.add_user_risk_tags( - new_user["userUid"], RiskTags.FLIGHT_RISK - ) - assert_successful_response(response) - - def test_remove_user_risk_tags(self, connection, new_user): - response = connection.detectionlists.remove_user_risk_tags( - new_user["userUid"], RiskTags.FLIGHT_RISK - ) - assert_successful_response(response) - - def test_update_user_notes(self, connection, new_user): - response = connection.detectionlists.update_user_notes( - new_user["userUid"], "integration test" - ) - assert_successful_response(response) - - def test_departing_employee_remove(self, connection, new_user): - response = connection.detectionlists.departing_employee.remove( - new_user["userUid"] - ) - assert_successful_response(response) - - def test_departing_employee_set_alerts_enabled(self, connection): - response = connection.detectionlists.departing_employee.set_alerts_enabled() - assert_successful_response(response) - - def test_high_risk_employee_add(self, connection, new_user): - response = connection.detectionlists.high_risk_employee.add(new_user["userUid"]) - assert_successful_response(response) - - def test_high_risk_employee_get(self, connection, new_user): - response = connection.detectionlists.high_risk_employee.get(new_user["userUid"]) - assert_successful_response(response) - - def test_high_risk_employee_remove(self, connection, new_user): - response = connection.detectionlists.high_risk_employee.remove( - new_user["userUid"] - ) - assert_successful_response(response) - - def test_high_risk_employee_get_page(self, connection): - response = connection.detectionlists.departing_employee.get_page(1) - assert_successful_response(response) - - def test_high_risk_employee_set_alerts_enabled(self, connection): - response = connection.detectionlists.high_risk_employee.set_alerts_enabled() - assert_successful_response(response) diff --git a/tests/sdk/test_sdk.py b/tests/sdk/test_sdk.py index 04e8806b..51c24b82 100644 --- a/tests/sdk/test_sdk.py +++ b/tests/sdk/test_sdk.py @@ -6,7 +6,6 @@ from py42.clients.archive import ArchiveClient from py42.clients.auditlogs import AuditLogsClient from py42.clients.cases import CasesClient -from py42.clients.detectionlists import DetectionListsClient from py42.exceptions import Py42UnauthorizedError from py42.sdk import from_local_account from py42.sdk import SDKClient @@ -59,10 +58,6 @@ def test_has_alert_service_set(self, py42_connection, mock_auth): client = SDKClient(py42_connection, mock_auth) assert type(client.alerts) == AlertsClient - def test_has_detection_lists_service_set(self, py42_connection, mock_auth): - client = SDKClient(py42_connection, mock_auth) - assert type(client.detectionlists) == DetectionListsClient - def test_has_legal_hold_service_set(self, py42_connection, mock_auth): client = SDKClient(py42_connection, mock_auth) assert type(client.legalhold) == legalhold.LegalHoldService diff --git a/tests/services/detectionlists/__init__.py b/tests/services/detectionlists/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/tests/services/detectionlists/test_departing_employee.py b/tests/services/detectionlists/test_departing_employee.py deleted file mode 100644 index ef38749e..00000000 --- a/tests/services/detectionlists/test_departing_employee.py +++ /dev/null @@ -1,373 +0,0 @@ -from datetime import datetime - -import pytest -from tests.conftest import create_mock_error -from tests.conftest import create_mock_response -from tests.conftest import TENANT_ID_FROM_RESPONSE - -from py42.exceptions import Py42BadRequestError -from py42.exceptions import Py42UserAlreadyAddedError -from py42.exceptions import Py42UserNotOnListError -from py42.services.detectionlists.departing_employee import DepartingEmployeeFilters -from py42.services.detectionlists.departing_employee import DepartingEmployeeService -from py42.services.detectionlists.user_profile import DetectionListUserService -from py42.services.users import UserService - -_TENANT_ID_PARAM = "22222222-2222-2222-2222-222222222222" -_USER_ID = "890973079883949999" - - -_GET_ALL_RESPONSE = """ -{{ - "items": [ - {{ - "type$": "DEPARTING_EMPLOYEE_V2", - "tenantId": "{0}", - "userId": "890973079883949999", - "userName": "test@example.com", - "displayName": "Name", - "notes": "", - "createdAt": "2019-10-25T13:31:14.1199010Z", - "status": "OPEN", - "cloudUsernames": [ - "test@example.com" - ], - "totalBytes": 139856482, - "numEvents": 11 - }} - ], - "totalCount": 1 -}} -""".format( - _TENANT_ID_PARAM -) - -_GET_ALL_EMPTY_RESPONSE = """ -{ - "type$":"DEPARTING_EMPLOYEE_SEARCH_RESPONSE", - "items":[], - "totalCount":0 -} -""" - - -class TestDepartingEmployeeFilters: - def test_choices_are_correct(self): - actual = DepartingEmployeeFilters.choices() - expected = [ - "OPEN", - "LEAVING_TODAY", - "EXFILTRATION_24_HOURS", - "EXFILTRATION_30_DAYS", - ] - assert set(actual) == set(expected) - - -class TestDepartingEmployeeClient: - @pytest.fixture - def mock_get_all_response(self, mocker): - yield create_mock_response(mocker, _GET_ALL_RESPONSE) - - @pytest.fixture - def mock_get_all_response_empty(self, mocker): - yield create_mock_response(mocker, _GET_ALL_EMPTY_RESPONSE) - - @pytest.fixture - def mock_user_client(self, mock_connection, user_context, mocker): - user_client = UserService(mock_connection) - mock_connection.post.return_value = create_mock_response(mocker, "{}") - return user_client - - @pytest.fixture - def mock_detection_list_user_client( - self, mock_connection, user_context, mocker, mock_user_client - ): - user_client = DetectionListUserService( - mock_connection, user_context, mock_user_client - ) - mock_connection.post.return_value = create_mock_response(mocker, "{}") - return user_client - - @pytest.mark.parametrize( - "departing_date", - [("2022-12-20"), (datetime.strptime("2022-12-20", "%Y-%m-%d"))], - ) - def test_add_posts_expected_data_and_to_expected_url( - self, - mock_connection, - user_context, - mock_get_all_response, - mock_detection_list_user_client, - departing_date, - ): - client = DepartingEmployeeService( - mock_connection, user_context, mock_detection_list_user_client - ) - user_context.get_current_tenant_id.return_value = _TENANT_ID_PARAM - # Return value should have been set based on the arguments passed - # in add, here however as we are mocking it, it doesn't matter. Can be refactored - mock_connection.post.return_value = mock_get_all_response - client.add(_USER_ID, departing_date) - - # Have to convert the request data to a dict because - # older versions of Python don't have deterministic order. - posted_data = mock_connection.post.call_args[1]["json"] - assert ( - posted_data["userId"] == _USER_ID - and posted_data["tenantId"] == _TENANT_ID_PARAM - and posted_data["departureDate"] == "2022-12-20" - ) - assert mock_connection.post.call_args[0][0] == "v2/departingemployee/add" - assert mock_connection.post.call_count == 1 - - def test_add_when_user_already_on_list_raises_user_already_added_error( - self, mocker, mock_connection, user_context, mock_detection_list_user_client - ): - def side_effect(url, json): - if "add" in url: - raise create_mock_error( - Py42BadRequestError, mocker, "User already on list" - ) - - mock_connection.post.side_effect = side_effect - client = DepartingEmployeeService( - mock_connection, user_context, mock_detection_list_user_client - ) - with pytest.raises(Py42UserAlreadyAddedError) as err: - client.add("user_id") - - expected = "User with ID user_id is already on the departing-employee list." - assert expected in str(err.value) - assert err.value.user_id == "user_id" - - def test_remove_posts_expected_data_and_to_expected_url( - self, - mock_connection, - user_context, - mock_get_all_response_empty, - mock_detection_list_user_client, - ): - client = DepartingEmployeeService( - mock_connection, user_context, mock_detection_list_user_client - ) - mock_connection.post.return_value = mock_get_all_response_empty - client.remove("999") - - # Have to convert the request data to a dict because - # older versions of Python don't have deterministic order. - posted_data = mock_connection.post.call_args[1]["json"] - assert ( - posted_data["userId"] == "999" - and posted_data["tenantId"] == TENANT_ID_FROM_RESPONSE - ) - assert mock_connection.post.call_args[0][0] == "v2/departingemployee/remove" - - def test_get_all_posts_expected_data_to_expected_url( - self, - mock_connection, - user_context, - mock_get_all_response, - mock_detection_list_user_client, - ): - client = DepartingEmployeeService( - mock_connection, user_context, mock_detection_list_user_client - ) - mock_connection.post.return_value = mock_get_all_response - for _ in client.get_all(): - break - first_call = mock_connection.post.call_args_list[0] - posted_data = first_call[1]["json"] - assert ( - posted_data["tenantId"] == TENANT_ID_FROM_RESPONSE - and posted_data["pgSize"] == 100 - and posted_data["pgNum"] == 1 - and posted_data["filterType"] == "OPEN" - and posted_data["srtKey"] == "CREATED_AT" - and posted_data["srtDirection"] == "DESC" - ) - assert mock_connection.post.call_args[0][0] == "v2/departingemployee/search" - assert mock_connection.post.call_count == 1 - - def test_get_all_posts_expected_data_with_non_default_values( - self, user_context, mock_connection, mock_detection_list_user_client - ): - client = DepartingEmployeeService( - mock_connection, user_context, mock_detection_list_user_client - ) - - for _ in client.get_all( - filter_type="NEW_FILTER", - sort_direction="DESC", - sort_key="DISPLAY_NAME", - page_size=200, - ): - break - - posted_data = mock_connection.post.call_args[1]["json"] - assert mock_connection.post.call_count == 1 - assert mock_connection.post.call_args[0][0] == "v2/departingemployee/search" - assert ( - posted_data["tenantId"] == user_context.get_current_tenant_id() - and posted_data["filterType"] == "NEW_FILTER" - and posted_data["pgNum"] == 1 - and posted_data["pgSize"] == 200 - and posted_data["srtKey"] == "DISPLAY_NAME" - and posted_data["srtDirection"] == "DESC" - ) - - def test_get_posts_expected_data_to_expected_url( - self, - mock_connection, - user_context, - mock_get_all_response_empty, - mock_detection_list_user_client, - ): - client = DepartingEmployeeService( - mock_connection, user_context, mock_detection_list_user_client - ) - mock_connection.post.return_value = mock_get_all_response_empty - client.get("999") - - posted_data = mock_connection.post.call_args[1]["json"] - assert mock_connection.post.call_args[0][0] == "v2/departingemployee/get" - assert ( - posted_data["tenantId"] == TENANT_ID_FROM_RESPONSE - and posted_data["userId"] == "999" - ) - - def test_get_page_posts_data_to_expected_url( - self, - mock_connection, - user_context, - mock_get_all_response, - mock_detection_list_user_client, - ): - client = DepartingEmployeeService( - mock_connection, user_context, mock_detection_list_user_client - ) - client.get_page( - filter_type="OPEN", - sort_key="CREATED_AT", - sort_direction="DESC", - page_num=1, - page_size=100, - ) - mock_connection.post.return_value = mock_get_all_response - first_call = mock_connection.post.call_args_list[0] - posted_data = first_call[1]["json"] - assert ( - posted_data["tenantId"] == TENANT_ID_FROM_RESPONSE - and posted_data["pgSize"] == 100 - and posted_data["pgNum"] == 1 - and posted_data["filterType"] == "OPEN" - and posted_data["srtKey"] == "CREATED_AT" - and posted_data["srtDirection"] == "DESC" - ) - assert mock_connection.post.call_args[0][0] == "v2/departingemployee/search" - assert mock_connection.post.call_count == 1 - - def test_set_alerts_enabled_posts_expected_data( - self, - mock_connection, - user_context, - mock_get_all_response_empty, - mock_detection_list_user_client, - ): - client = DepartingEmployeeService( - mock_connection, user_context, mock_detection_list_user_client - ) - mock_connection.post.return_value = mock_get_all_response_empty - client.set_alerts_enabled() - - posted_data = mock_connection.post.call_args[1]["json"] - assert ( - posted_data["tenantId"] == TENANT_ID_FROM_RESPONSE - and posted_data["alertsEnabled"] is True - ) - - def test_set_alerts_enabled_posts_to_expected_url( - self, - mock_connection, - user_context, - mock_get_all_response_empty, - mock_detection_list_user_client, - ): - client = DepartingEmployeeService( - mock_connection, user_context, mock_detection_list_user_client - ) - mock_connection.post.return_value = mock_get_all_response_empty - client.set_alerts_enabled() - assert ( - mock_connection.post.call_args[0][0] == "v2/departingemployee/setalertstate" - ) - - def test_update_posts_expected_data( - self, - mock_connection, - user_context, - mock_get_all_response, - mock_detection_list_user_client, - ): - client = DepartingEmployeeService( - mock_connection, user_context, mock_detection_list_user_client - ) - mock_connection.post.return_value = mock_get_all_response - client.update_departure_date(_USER_ID, "2020-12-20") - - # Have to convert the request data to a dict because - # older versions of Python don't have deterministic order. - posted_data = mock_connection.post.call_args[1]["json"] - assert ( - posted_data["userId"] == _USER_ID - and posted_data["tenantId"] == TENANT_ID_FROM_RESPONSE - and posted_data["departureDate"] == "2020-12-20" - ) - - def test_update_posts_to_expected_url( - self, mock_connection, user_context, mock_detection_list_user_client - ): - client = DepartingEmployeeService( - mock_connection, user_context, mock_detection_list_user_client - ) - client.update_departure_date(_USER_ID, "2022-12-20") - assert mock_connection.post.call_args[0][0] == "v2/departingemployee/update" - - def test_update_posts_expected_data_with_datetime_instance( - self, - mock_connection, - user_context, - mock_get_all_response, - mock_detection_list_user_client, - ): - client = DepartingEmployeeService( - mock_connection, user_context, mock_detection_list_user_client - ) - mock_connection.post.return_value = mock_get_all_response - dt = datetime.strptime("2020-12-20", "%Y-%m-%d") - client.update_departure_date(_USER_ID, dt) - - # Have to convert the request data to a dict because - # older versions of Python don't have deterministic order. - posted_data = mock_connection.post.call_args[1]["json"] - assert ( - posted_data["userId"] == _USER_ID - and posted_data["tenantId"] == TENANT_ID_FROM_RESPONSE - and posted_data["departureDate"] == "2020-12-20" - ) - - def test_remove_raises_error_when_user_id_does_not_exist( - self, - user_context, - mock_post_not_found_session, - mock_detection_list_user_client, - ): - departing_employee_client = DepartingEmployeeService( - mock_post_not_found_session, user_context, mock_detection_list_user_client - ) - user_id = "942897397520289999" - with pytest.raises(Py42UserNotOnListError) as err: - departing_employee_client.remove(user_id) - assert ( - f"User with ID '{user_id}' is not currently on the departing-employee list." - in str(err.value) - ) diff --git a/tests/services/detectionlists/test_high_risk_employee.py b/tests/services/detectionlists/test_high_risk_employee.py deleted file mode 100644 index 2f435f19..00000000 --- a/tests/services/detectionlists/test_high_risk_employee.py +++ /dev/null @@ -1,254 +0,0 @@ -import pytest -from tests.conftest import create_mock_error -from tests.conftest import create_mock_response - -from py42.exceptions import Py42BadRequestError -from py42.exceptions import Py42NotFoundError -from py42.exceptions import Py42UserAlreadyAddedError -from py42.services.detectionlists.high_risk_employee import HighRiskEmployeeFilters -from py42.services.detectionlists.high_risk_employee import HighRiskEmployeeService -from py42.services.detectionlists.user_profile import DetectionListUserService -from py42.services.users import UserService - -CREATE_USER_SAMPLE_RESPONSE = """ - {"type$":"USER_V2","tenantId":"1d71796f-af5b-4231-9d8e-df6434da4663", - "userId":"942897397520289999", - "userName":"test.employee@example.com", - "displayName":"Test Employee", - "cloudUsernames":["test.employee@test.com"],"riskFactors":[]} -""" - - -class TestHighRiskEmployeeFilters: - def test_choices_are_correct(self): - actual = HighRiskEmployeeFilters.choices() - expected = ["OPEN", "EXFILTRATION_24_HOURS", "EXFILTRATION_30_DAYS"] - assert set(actual) == set(expected) - - -class TestHighRiskEmployeeClient: - @pytest.fixture - def mock_connection_post_success(self, mock_connection, mocker): - response = create_mock_response(mocker, CREATE_USER_SAMPLE_RESPONSE, 201) - mock_connection.post.return_value = response - return mock_connection - - @pytest.fixture - def mock_user_client(self, mock_connection, user_context, mocker): - user_client = UserService(mock_connection) - mock_connection.post.return_value = create_mock_response(mocker, "{}") - return user_client - - @pytest.fixture - def mock_detection_list_user_client( - self, mock_connection, user_context, mocker, mock_user_client - ): - user_client = DetectionListUserService( - mock_connection, user_context, mock_user_client - ) - mock_connection.post.return_value = create_mock_response(mocker, "{}") - return user_client - - def test_add_posts_expected_data( - self, - user_context, - mock_connection_post_success, - mock_detection_list_user_client, - ): - high_risk_employee_client = HighRiskEmployeeService( - mock_connection_post_success, user_context, mock_detection_list_user_client - ) - high_risk_employee_client.add("942897397520289999") - - posted_data = mock_connection_post_success.post.call_args[1]["json"] - assert mock_connection_post_success.post.call_count == 1 - assert ( - mock_connection_post_success.post.call_args[0][0] - == "v2/highriskemployee/add" - ) - assert ( - posted_data["tenantId"] == user_context.get_current_tenant_id() - and posted_data["userId"] == "942897397520289999" - ) - - def test_add_when_user_already_on_list_raises_user_already_added_error( - self, mocker, mock_connection, user_context, mock_detection_list_user_client - ): - def side_effect(url, json): - if "add" in url: - raise create_mock_error( - Py42BadRequestError, mocker, "User already on list" - ) - - mock_connection.post.side_effect = side_effect - client = HighRiskEmployeeService( - mock_connection, user_context, mock_detection_list_user_client - ) - with pytest.raises(Py42UserAlreadyAddedError) as err: - client.add("user_id") - - expected = "User with ID user_id is already on the high-risk-employee list." - assert expected in str(err.value) - assert err.value.user_id == "user_id" - - def test_set_alerts_enabled_posts_expected_data_with_default_value( - self, user_context, mock_connection, mock_detection_list_user_client - ): - high_risk_employee_client = HighRiskEmployeeService( - mock_connection, user_context, mock_detection_list_user_client - ) - high_risk_employee_client.set_alerts_enabled() - - posted_data = mock_connection.post.call_args[1]["json"] - assert mock_connection.post.call_count == 1 - assert ( - mock_connection.post.call_args[0][0] == "v2/highriskemployee/setalertstate" - ) - assert ( - posted_data["tenantId"] == user_context.get_current_tenant_id() - and posted_data["alertsEnabled"] is True - ) - - def test_set_alerts_enabled_posts_expected_data( - self, user_context, mock_connection, mock_detection_list_user_client - ): - high_risk_employee_client = HighRiskEmployeeService( - mock_connection, user_context, mock_detection_list_user_client - ) - high_risk_employee_client.set_alerts_enabled(False) - - posted_data = mock_connection.post.call_args[1]["json"] - assert mock_connection.post.call_count == 1 - assert ( - mock_connection.post.call_args[0][0] == "v2/highriskemployee/setalertstate" - ) - assert ( - posted_data["tenantId"] == user_context.get_current_tenant_id() - and posted_data["alertsEnabled"] is False - ) - - def test_remove_posts_expected_data( - self, user_context, mock_connection, mock_detection_list_user_client - ): - high_risk_employee_client = HighRiskEmployeeService( - mock_connection, user_context, mock_detection_list_user_client - ) - high_risk_employee_client.remove("942897397520289999") - - posted_data = mock_connection.post.call_args[1]["json"] - - assert mock_connection.post.call_count == 1 - assert mock_connection.post.call_args[0][0] == "v2/highriskemployee/remove" - assert posted_data["tenantId"] == user_context.get_current_tenant_id() - assert posted_data["userId"] == "942897397520289999" - - def test_get_posts_expected_data( - self, - user_context, - mock_connection, - mock_detection_list_user_client, - ): - high_risk_employee_client = HighRiskEmployeeService( - mock_connection, user_context, mock_detection_list_user_client - ) - high_risk_employee_client.get("942897397520289999") - - posted_data = mock_connection.post.call_args[1]["json"] - assert mock_connection.post.call_count == 1 - assert mock_connection.post.call_args[0][0] == "v2/highriskemployee/get" - assert ( - posted_data["tenantId"] == user_context.get_current_tenant_id() - and posted_data["userId"] == "942897397520289999" - ) - - def test_get_all_posts_expected_data( - self, - user_context, - mock_connection, - mock_detection_list_user_client, - ): - high_risk_employee_client = HighRiskEmployeeService( - mock_connection, user_context, mock_detection_list_user_client - ) - for _ in high_risk_employee_client.get_all(): - break - - posted_data = mock_connection.post.call_args[1]["json"] - assert mock_connection.post.call_count == 1 - assert mock_connection.post.call_args[0][0] == "v2/highriskemployee/search" - assert ( - posted_data["tenantId"] == user_context.get_current_tenant_id() - and posted_data["filterType"] == "OPEN" - and posted_data["pgNum"] == 1 - and posted_data["pgSize"] == 100 - and posted_data["srtKey"] is None - and posted_data["srtDirection"] is None - ) - - def test_get_all_posts_expected_data_with_non_default_values( - self, user_context, mock_connection, mock_detection_list_user_client - ): - high_risk_employee_client = HighRiskEmployeeService( - mock_connection, user_context, mock_detection_list_user_client - ) - for _ in high_risk_employee_client.get_all( - filter_type="NEW_FILTER", - sort_direction="DESC", - sort_key="DISPLAY_NAME", - page_size=200, - ): - break - - posted_data = mock_connection.post.call_args[1]["json"] - assert mock_connection.post.call_count == 1 - assert mock_connection.post.call_args[0][0] == "v2/highriskemployee/search" - assert ( - posted_data["tenantId"] == user_context.get_current_tenant_id() - and posted_data["filterType"] == "NEW_FILTER" - and posted_data["pgNum"] == 1 - and posted_data["pgSize"] == 200 - and posted_data["srtKey"] == "DISPLAY_NAME" - and posted_data["srtDirection"] == "DESC" - ) - - def test_get_page_posts_data_to_expected_url( - self, user_context, mock_connection, mock_detection_list_user_client - ): - high_risk_employee_client = HighRiskEmployeeService( - mock_connection, user_context, mock_detection_list_user_client - ) - high_risk_employee_client.get_page( - filter_type="NEW_FILTER", - page_num=3, - page_size=10, - sort_direction="DESC", - sort_key="DISPLAY_NAME", - ) - posted_data = mock_connection.post.call_args[1]["json"] - assert mock_connection.post.call_count == 1 - assert mock_connection.post.call_args[0][0] == "v2/highriskemployee/search" - assert ( - posted_data["tenantId"] == user_context.get_current_tenant_id() - and posted_data["filterType"] == "NEW_FILTER" - and posted_data["pgNum"] == 3 - and posted_data["pgSize"] == 10 - and posted_data["srtKey"] == "DISPLAY_NAME" - and posted_data["srtDirection"] == "DESC" - ) - - def test_remove_raises_error_when_user_id_does_not_exist( - self, - user_context, - mock_post_not_found_session, - mock_detection_list_user_client, - ): - high_risk_employee_client = HighRiskEmployeeService( - mock_post_not_found_session, user_context, mock_detection_list_user_client - ) - user_id = "942897397520289999" - with pytest.raises(Py42NotFoundError) as err: - high_risk_employee_client.remove(user_id) - assert ( - f"User with ID '{user_id}' is not currently on the high-risk-employee list." - in str(err.value) - ) diff --git a/tests/services/detectionlists/test_user_profile.py b/tests/services/detectionlists/test_user_profile.py deleted file mode 100644 index 6cd4fead..00000000 --- a/tests/services/detectionlists/test_user_profile.py +++ /dev/null @@ -1,210 +0,0 @@ -import pytest -from tests.conftest import create_mock_error -from tests.conftest import create_mock_response - -from py42.exceptions import Py42BadRequestError -from py42.exceptions import Py42CloudAliasCharacterLimitExceededError -from py42.exceptions import Py42CloudAliasLimitExceededError -from py42.services.detectionlists.user_profile import DetectionListUserService -from py42.services.users import UserService - -CLOUD_ALIAS_LIMIT_EXCEEDED_ERROR_MESSAGE = """{ -"pop-bulletin": { -"type$": "com.code42.detectionlistmanagement.DetectionListMessages.ValidationError", -"text$": "Command or Query is invalid: Cloud usernames must be less than or equal to 2", -"details": [], -"reason": "Cloud usernames must be less than or equal to 2" -} -}""" - - -class TestDetectionListUserClient: - @pytest.fixture - def mock_user_client(self, mock_connection, user_context, mocker): - user_client = UserService(mock_connection) - mock_connection.get.return_value = create_mock_response( - mocker, '{"username":"username"}' - ) - return user_client - - @pytest.fixture - def mock_get_by_id_fails(self, mocker, mock_connection): - mock_connection.post.side_effect = create_mock_error( - Py42BadRequestError, mocker, "" - ) - return mock_connection - - @pytest.fixture - def mock_user_client_raises_exception( - self, - mocker, - mock_connection, - user_context, - ): - user_client = UserService(mock_connection) - mock_connection.post.side_effect = create_mock_error( - Py42BadRequestError, mocker, "" - ) - return user_client - - def test_get_posts_expected_data( - self, mock_connection, user_context, mock_user_client - ): - detection_list_user_client = DetectionListUserService( - mock_connection, user_context, mock_user_client - ) - detection_list_user_client.get("942897397520289999") - - posted_data = mock_connection.post.call_args[1]["json"] - assert mock_connection.post.call_count == 1 - assert mock_connection.post.call_args[0][0] == "v2/user/getbyusername" - assert ( - posted_data["tenantId"] == user_context.get_current_tenant_id() - and posted_data["username"] == "942897397520289999" - ) - - def test_get_by_id_posts_expected_data( - self, mock_connection, user_context, mock_user_client - ): - detection_list_user_client = DetectionListUserService( - mock_connection, user_context, mock_user_client - ) - detection_list_user_client.get_by_id("942897397520289999") - - posted_data = mock_connection.post.call_args[1]["json"] - assert mock_connection.post.call_count == 1 - assert mock_connection.post.call_args[0][0] == "v2/user/getbyid" - assert ( - posted_data["tenantId"] == user_context.get_current_tenant_id() - and posted_data["userId"] == "942897397520289999" - ) - - def test_update_notes_posts_expected_data( - self, mock_connection, user_context, mock_user_client - ): - detection_list_user_client = DetectionListUserService( - mock_connection, user_context, mock_user_client - ) - detection_list_user_client.update_notes("942897397520289999", "Test") - - posted_data = mock_connection.post.call_args[1]["json"] - assert mock_connection.post.call_count == 1 - assert mock_connection.post.call_args[0][0] == "v2/user/updatenotes" - assert ( - posted_data["tenantId"] == user_context.get_current_tenant_id() - and posted_data["userId"] == "942897397520289999" - and posted_data["notes"] == "Test" - ) - - @pytest.mark.parametrize("tags", ["test_tag", ["test_tag"]]) - def test_add_risk_tag_posts_expected_data( - self, mock_connection, user_context, mock_user_client, tags - ): - detection_list_user_client = DetectionListUserService( - mock_connection, user_context, mock_user_client - ) - detection_list_user_client.add_risk_tags("942897397520289999", tags) - - posted_data = mock_connection.post.call_args[1]["json"] - assert mock_connection.post.call_count == 1 - assert mock_connection.post.call_args[0][0] == "v2/user/addriskfactors" - assert ( - posted_data["tenantId"] == user_context.get_current_tenant_id() - and posted_data["userId"] == "942897397520289999" - and posted_data["riskFactors"] == ["test_tag"] - ) - - @pytest.mark.parametrize("tags", ["test_tag", ["test_tag"]]) - def test_remove_risk_tag_posts_expected_data( - self, mock_connection, user_context, mock_user_client, tags - ): - detection_list_user_client = DetectionListUserService( - mock_connection, user_context, mock_user_client - ) - detection_list_user_client.remove_risk_tags("942897397520289999", "Test") - - posted_data = mock_connection.post.call_args[1]["json"] - assert mock_connection.post.call_count == 1 - assert mock_connection.post.call_args[0][0] == "v2/user/removeriskfactors" - assert ( - posted_data["tenantId"] == user_context.get_current_tenant_id() - and posted_data["userId"] == "942897397520289999" - and posted_data["riskFactors"] == ["Test"] - ) - - def test_add_cloud_alias_posts_expected_data( - self, mock_connection, user_context, mock_user_client - ): - detection_list_user_client = DetectionListUserService( - mock_connection, user_context, mock_user_client - ) - detection_list_user_client.add_cloud_alias("942897397520289999", "Test") - - posted_data = mock_connection.post.call_args[1]["json"] - assert mock_connection.post.call_count == 1 - assert mock_connection.post.call_args[0][0] == "v2/user/addcloudusernames" - assert ( - posted_data["tenantId"] == user_context.get_current_tenant_id() - and posted_data["userId"] == "942897397520289999" - and posted_data["cloudUsernames"] == ["Test"] - ) - - def test_remove_cloud_alias_posts_expected_data( - self, mock_connection, user_context, mock_user_client - ): - detection_list_user_client = DetectionListUserService( - mock_connection, user_context, mock_user_client - ) - detection_list_user_client.remove_cloud_alias("942897397520289999", "Test") - - posted_data = mock_connection.post.call_args[1]["json"] - assert mock_connection.post.call_count == 1 - assert mock_connection.post.call_args[0][0] == "v2/user/removecloudusernames" - assert ( - posted_data["tenantId"] == user_context.get_current_tenant_id() - and posted_data["userId"] == "942897397520289999" - and posted_data["cloudUsernames"] == ["Test"] - ) - - def test_refresh_posts_expected_data( - self, mock_connection, user_context, mock_user_client - ): - detection_list_user_client = DetectionListUserService( - mock_connection, user_context, mock_user_client - ) - detection_list_user_client.refresh("942897397520289999") - - posted_data = mock_connection.post.call_args[1]["json"] - assert mock_connection.post.call_count == 1 - assert mock_connection.post.call_args[0][0] == "v2/user/refresh" - assert ( - posted_data["tenantId"] == user_context.get_current_tenant_id() - and posted_data["userId"] == "942897397520289999" - ) - - def test_add_cloud_alias_limit_raises_custom_error_on_limit( - self, mocker, mock_connection, user_context, mock_user_client - ): - detection_list_user_client = DetectionListUserService( - mock_connection, user_context, mock_user_client - ) - text = "Cloud usernames must be less than or equal to 2" - mock_connection.post.side_effect = create_mock_error( - Py42BadRequestError, mocker, text - ) - with pytest.raises(Py42CloudAliasLimitExceededError) as err: - detection_list_user_client.add_cloud_alias("942897397520289999", "Test") - assert "Cloud alias limit exceeded." in str(err.value) - - def test_add_cloud_alias_when_over_character_limit_raises_custom_error( - self, mock_connection, user_context, mock_user_client - ): - detection_list_user_client = DetectionListUserService( - mock_connection, user_context, mock_user_client - ) - with pytest.raises(Py42CloudAliasCharacterLimitExceededError) as err: - detection_list_user_client.add_cloud_alias( - "942897397520289999", - "a-very-long-cloud-alias-which-exceeds-the-character-limit-of-fifty-characters-per-alias", - ) - assert "Cloud alias character limit exceeded." in str(err.value) diff --git a/tests/services/test_alert_rules.py b/tests/services/test_alert_rules.py index 401fb83a..327588ce 100644 --- a/tests/services/test_alert_rules.py +++ b/tests/services/test_alert_rules.py @@ -5,44 +5,60 @@ from py42.exceptions import Py42InvalidRuleError from py42.exceptions import Py42NotFoundError from py42.services.alertrules import AlertRulesService -from py42.services.detectionlists.user_profile import DetectionListUserService +from py42.services.userriskprofile import UserRiskProfileService -MOCK_DETECTION_LIST_GET_RESPONSE = """ -{"type$": "USER_V2", "tenantId": "1d71796f-af5b-4231-9d8e-df6434da4663", -"userId": "942897397520286581", "userName": "email@test.com", "displayName": "First Name", -"notes": "tests and more tests", "cloudUsernames": ["user.aliases@code42.com"], "riskFactors": []} +MOCK_USER_GET_RESPONSE = """ +{ + "active": true, + "cloudAliases": [ + "user.aliases@code42.com" + ], + "country": "usa", + "deleted": false, + "department": "engineering", + "displayName": "User Name", + "division": "test", + "employmentType": "contract", + "endDate": { + "day": 10, + "month": 20, + "year": 2030 + }, + "locality": "midwest", + "managerDisplayName": "My Manager", + "managerId": "123-manager", + "managerUsername": "manager@email.com", + "notes": "my notes", + "region": "us", + "startDate": { + "day": 1, + "month": 20, + "year": 2020 + }, + "supportUser": true, + "tenantId": "123-456", + "title": "title", + "userId": "user-id", + "username": "username@code42.com" +} """ @pytest.fixture -def mock_detection_list_user_service(mocker): - response = create_mock_response(mocker, MOCK_DETECTION_LIST_GET_RESPONSE) - detection_list_user_service = mocker.MagicMock(spec=DetectionListUserService) - detection_list_user_service.get_by_id.return_value = response - return detection_list_user_service - - -@pytest.fixture -def mock_detection_list_post_failure_when_invalid_rule_id(mocker, mock_connection): - response = mocker.MagicMock(spec=Response) - response.status_code = 400 - exception = mocker.MagicMock(spec=Py42NotFoundError) - exception.response = response - mock_connection.post.side_effect = Py42NotFoundError(exception, "") - detection_list_user_service = mocker.MagicMock(spec=DetectionListUserService) - detection_list_user_service.get_by_id.return_value = create_mock_response( - mocker, "{}" - ) - return detection_list_user_service +def mock_user_profile_service(mocker): + response = create_mock_response(mocker, MOCK_USER_GET_RESPONSE) + service = mocker.MagicMock(spec=UserRiskProfileService) + service.get_by_id.return_value = response + return service class TestAlertRulesService: def test_add_user_posts_expected_data( - self, mock_connection, user_context, mock_detection_list_user_service + self, mock_connection, user_context, mock_user_profile_service ): alert_rule_service = AlertRulesService( - mock_connection, user_context, mock_detection_list_user_service + mock_connection, user_context, mock_user_profile_service ) alert_rule_service.add_user("rule-id", "user-id") @@ -58,10 +74,10 @@ def test_add_user_posts_expected_data( ) def test_remove_user_posts_expected_data( - self, mock_connection, user_context, mock_detection_list_user_service + self, mock_connection, user_context, mock_user_profile_service ): alert_rule_service = AlertRulesService( - mock_connection, user_context, mock_detection_list_user_service + mock_connection, user_context, mock_user_profile_service ) alert_rule_service.remove_user("rule-id", "user-id") @@ -75,10 +91,10 @@ def test_remove_user_posts_expected_data( ) def test_remove_all_users_posts_expected_data( - self, mock_connection, user_context, mock_detection_list_user_service + self, mock_connection, user_context, mock_user_profile_service ): alert_rule_service = AlertRulesService( - mock_connection, user_context, mock_detection_list_user_service + mock_connection, user_context, mock_user_profile_service ) alert_rule_service.remove_all_users("rule-id") @@ -94,14 +110,20 @@ def test_remove_all_users_posts_expected_data( def test_add_user_raises_valid_exception_when_rule_id_is_invalid( self, + mocker, mock_connection, user_context, - mock_detection_list_post_failure_when_invalid_rule_id, + mock_user_profile_service, ): + response = mocker.MagicMock(spec=Response) + response.status_code = 400 + exception = mocker.MagicMock(spec=Py42NotFoundError) + exception.response = response + mock_connection.post.side_effect = Py42NotFoundError(exception, "") alert_rule_service = AlertRulesService( mock_connection, user_context, - mock_detection_list_post_failure_when_invalid_rule_id, + mock_user_profile_service, ) with pytest.raises(Py42InvalidRuleError) as e: alert_rule_service.add_user("invalid-rule-id", "user-id")