Skip to content

Commit

Permalink
Refactored to the new pattern. Closes #210.
Browse files Browse the repository at this point in the history
  • Loading branch information
jshcodes committed Jul 29, 2021
1 parent af37af1 commit 26dfa63
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 66 deletions.
94 changes: 45 additions & 49 deletions src/falconpy/detects.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@
For more information, please refer to <https://unlicense.org>
"""
from ._util import service_request
# pylint: disable=C0103 # Aligning method names to API operation IDs
from ._util import service_request, args_to_params, force_default
from ._service_class import ServiceClass
from ._endpoint._detects import _detects_endpoints as Endpoints


class Detects(ServiceClass):
Expand All @@ -47,87 +49,81 @@ class Detects(ServiceClass):
def GetAggregateDetects(self: object, body: dict) -> dict:
""" Get detect aggregates as specified via json in request body. """
# [POST] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/detects/GetAggregateDetects
FULL_URL = self.base_url+'/detects/aggregates/detects/GET/v1'
HEADERS = self.headers
BODY = body
# VALIDATOR = {"resources": list} # TODO: Confirm body payload format
# REQUIRED = ["resources"]
operation_id = "GetAggregateDetects"
target_url = f"{self.base_url}{[ep[2] for ep in Endpoints if operation_id in ep[0]][0]}"
header_payload = self.headers
body_payload = body
returned = service_request(caller=self,
method="POST",
endpoint=FULL_URL,
headers=HEADERS,
body=BODY,
# body_validator=VALIDATOR,
# body_required=REQUIRED,
endpoint=target_url,
body=body_payload,
headers=header_payload,
verify=self.ssl_verify
)
return returned

def UpdateDetectsByIdsV2(self: object, body: dict) -> dict:
""" Modify the state, assignee, and visibility of detections. """
# [PATCH] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/detects/UpdateDetectsByIdsV2
FULL_URL = self.base_url+'/detects/entities/detects/v2'
HEADERS = self.headers
BODY = body
VALIDATOR = {
operation_id = "UpdateDetectsByIdsV2"
target_url = f"{self.base_url}{[ep[2] for ep in Endpoints if operation_id in ep[0]][0]}"
header_payload = self.headers
body_payload = body
body_validator = {
"assigned_to_uuid": str,
"ids": list,
"show_in_ui": bool,
"status": str,
"comment": str
}
REQUIRED = ["ids"]
body_required = ["ids"]
returned = service_request(caller=self,
method="PATCH",
endpoint=FULL_URL,
headers=HEADERS,
body=BODY,
body_validator=VALIDATOR,
body_required=REQUIRED,
endpoint=target_url,
body=body_payload,
body_validator=body_validator,
body_required=body_required,
headers=header_payload,
verify=self.ssl_verify
)
return returned

def GetDetectSummaries(self: object, body: dict) -> dict:
""" View information about detections. """
# [POST] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/detects/GetDetectSummaries
FULL_URL = self.base_url+'/detects/entities/summaries/GET/v1'
HEADERS = self.headers
BODY = body
VALIDATOR = {"ids": list} # TODO: Confirm list datatype for ingested IDs via the body payload
REQUIRED = ["ids"]
operation_id = "GetDetectSummaries"
target_url = f"{self.base_url}{[ep[2] for ep in Endpoints if operation_id in ep[0]][0]}"
header_payload = self.headers
body_payload = body
body_validator = {"ids": list}
body_required = ["ids"]
returned = service_request(caller=self,
method="POST",
endpoint=FULL_URL,
headers=HEADERS,
body=BODY,
body_validator=VALIDATOR,
body_required=REQUIRED,
endpoint=target_url,
body=body_payload,
body_validator=body_validator,
body_required=body_required,
headers=header_payload,
verify=self.ssl_verify
)
return returned

def QueryDetects(self: object, parameters: dict = None) -> dict:
""" Search for detection IDs that match a given query. """
@force_default(defaults=["parameters"], default_types=["dict"])
def QueryDetects(self: object, parameters: dict = None, **kwargs) -> dict:
"""
Search for detection IDs that match a given query.
"""
# [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/detects/QueryDetects
FULL_URL = self.base_url+'/detects/queries/detects/v1'
HEADERS = self.headers
if parameters is None:
parameters = {}
PARAMS = parameters
VALIDATOR = {
"limit": int,
"offset": int,
"sort": str,
"filter": str,
"q": str
}
operation_id = "QueryDetects"
target_url = f"{self.base_url}{[ep[2] for ep in Endpoints if operation_id in ep[0]][0]}"
header_payload = self.headers
parameter_payload = args_to_params(parameters, kwargs, Endpoints, operation_id)
returned = service_request(caller=self,
method="GET",
endpoint=FULL_URL,
params=PARAMS,
headers=HEADERS,
params_validator=VALIDATOR,
endpoint=target_url,
params=parameter_payload,
headers=header_payload,
verify=self.ssl_verify
)

return returned
42 changes: 25 additions & 17 deletions tests/test_detects.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,34 @@
# test_detects.py
# This class tests the detects service class

import json
import os
import sys
import pytest
# Authentication via the test_authorization.py
from tests import test_authorization as Authorization

#Import our sibling src folder into the path
# Import our sibling src folder into the path
sys.path.append(os.path.abspath('src'))
# Classes to test - manually imported from sibling folder
from falconpy import detects as FalconDetections

auth = Authorization.TestAuthorization()
auth.serviceAuth()
falcon = FalconDetections.Detects(access_token=auth.token)
AllowedResponses = [200, 429] #Adding rate-limiting as an allowed response for now
AllowedResponses = [200, 429] # Adding rate-limiting as an allowed response for now


class TestDetects:

def serviceDetects_QueryDetects(self):
if falcon.QueryDetects(parameters={"limit":1})["status_code"] in AllowedResponses:
if falcon.QueryDetects(parameters={"limit": 1})["status_code"] in AllowedResponses:
return True
else:
return False

def serviceDetects_GetDetectSummaries(self):
if falcon.GetDetectSummaries(body={"ids":falcon.QueryDetects(parameters={"limit":1})["body"]["resources"]})["status_code"] in AllowedResponses:
if falcon.GetDetectSummaries(
body={"ids": falcon.QueryDetects(parameters={"limit": 1})["body"]["resources"]}
)["status_code"] in AllowedResponses:
return True
else:
return False
Expand All @@ -43,29 +44,36 @@ def serviceDetects_GetDetectSummaries(self):
def serviceDetects_GenerateErrors(self):
falcon.base_url = "nowhere"
errorChecks = True
if falcon.QueryDetects()["status_code"] != 500:
if falcon.QueryDetects()["status_code"] not in [400, 500]:
errorChecks = False
if falcon.GetDetectSummaries(body={"ids": {"oops": False}})["status_code"] not in [400, 500]:
errorChecks = False
if falcon.GetDetectSummaries(body={})["status_code"] != 500:
if falcon.GetAggregateDetects(body={"resource": {"bad": True}})["status_code"] not in [400, 500]:
errorChecks = False
if falcon.GetAggregateDetects(body={})["status_code"] != 500:
if falcon.UpdateDetectsByIdsV2(body={"bananas": "Are yellow or green"})["status_code"] not in [400, 500]:
errorChecks = False
if falcon.UpdateDetectsByIdsV2(body={})["status_code"] != 500:
if falcon.GetDetectSummaries(body={"something": "something"})["status_code"] not in [400, 500]:
errorChecks = False

if falcon.GetDetectSummaries(body={"something": "something", "ids": "12345678"})["status_code"] not in [400, 500]:
errorChecks = False

# else:
# print(f"Correct fail {falcon.GetDetectSummaries(body={'something': 'something'})['status_code']}")
# print(falcon.GetDetectSummaries(body={"something": "something"}))
return errorChecks

def test_QueryDetects(self):
assert self.serviceDetects_QueryDetects() == True
@pytest.mark.skipif(falcon.QueryDetects(parameters={"limit":1})["status_code"] == 429, reason="API rate limit reached")
assert self.serviceDetects_QueryDetects() is True

@pytest.mark.skipif(falcon.QueryDetects(parameters={"limit": 1})["status_code"] == 429, reason="API rate limit reached")
def test_GetDetectSummaries(self):
assert self.serviceDetects_GetDetectSummaries() == True
assert self.serviceDetects_GetDetectSummaries() is True

# def test_GetAggregateDetects(self):
# assert self.serviceDetects_GetAggregateDetects() == True

def test_logout(self):
assert auth.serviceRevoke() == True
assert auth.serviceRevoke() is True

def test_Errors(self):
assert self.serviceDetects_GenerateErrors() == True
assert self.serviceDetects_GenerateErrors() is True

0 comments on commit 26dfa63

Please sign in to comment.