Skip to content

Commit

Permalink
Refactored to the new pattern. Closes #218.
Browse files Browse the repository at this point in the history
  • Loading branch information
jshcodes committed Jul 31, 2021
1 parent fae70f6 commit eec1dd8
Showing 1 changed file with 84 additions and 66 deletions.
150 changes: 84 additions & 66 deletions src/falconpy/hosts.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,33 +36,40 @@
For more information, please refer to <https://unlicense.org>
"""
from ._util import service_request, parse_id_list, generate_error_result
# pylint: disable=C0103 # Aligning method names to API operation IDs
from ._util import service_request, generate_error_result, force_default, args_to_params
from ._service_class import ServiceClass
from ._endpoint._hosts import _hosts_endpoints as Endpoints


class Hosts(ServiceClass):
""" The only requirement to instantiate an instance of this class
is a valid token provided by the Falcon API SDK OAuth2 class.
"""
def PerformActionV2(self: object, parameters: dict, body: dict, action_name: str = None) -> dict:
""" Take various actions on the hosts in your environment.
Contain or lift containment on a host. Delete or restore a host.
The only requirement to instantiate an instance of this class
is a valid token provided by the Falcon API SDK OAuth2 class.
"""
@force_default(defaults=["parameters"], default_types=["dict"])
def PerformActionV2(self: object, body: dict, parameters: dict = None, **kwargs) -> dict:
"""
Take various actions on the hosts in your environment.
Contain or lift containment on a host. Delete or restore a host.
"""
if "action_name" in parameters:
action_name = parameters["action_name"].lower()
# [POST] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/hosts/PerformActionV2
ALLOWED_ACTIONS = ['contain', 'lift_containment', 'hide_host', 'unhide_host']
if action_name.lower() in ALLOWED_ACTIONS:
FULL_URL = self.base_url+'/devices/entities/devices-actions/v2'
HEADERS = self.headers
PARAMS = parameters
BODY = body
_allowed_actions = ['contain', 'lift_containment', 'hide_host', 'unhide_host']
operation_id = "PerformActionV2"
target_url = f"{self.base_url}{[ep[2] for ep in Endpoints if operation_id in ep[0]][0]}"
header_payload = self.headers
body_payload = body
parameter_payload = args_to_params(parameters, kwargs, Endpoints, operation_id)
if "action_name" not in parameter_payload:
parameter_payload["action_name"] = "Not Specified"
# Only process allowed actions
if parameter_payload["action_name"].lower() in _allowed_actions:
returned = service_request(caller=self,
method="POST",
endpoint=FULL_URL,
params=PARAMS,
body=BODY,
headers=HEADERS,
endpoint=target_url,
body=body_payload,
params=parameter_payload,
headers=header_payload,
verify=self.ssl_verify
)
else:
Expand All @@ -72,13 +79,14 @@ def PerformActionV2(self: object, parameters: dict, body: dict, action_name: str

def UpdateDeviceTags(self: object, action_name: str, ids: list or str, tags: list or str) -> dict:
"""
allows for tagging hosts. If the tags are empty
Allows for tagging hosts. If the tags are empty
"""
ALLOWED_ACTIONS = ["add", "remove"]
_allowed_actions = ["add", "remove"]
# validate action is allowed AND tags is "something"
if action_name.lower() in ALLOWED_ACTIONS and tags is not None:
FULL_URL = self.base_url + '/devices/entities/devices/tags/v1'
HEADERS = self.headers
if action_name.lower() in _allowed_actions and tags is not None:
operation_id = "UpdateDeviceTags"
target_url = f"{self.base_url}{[ep[2] for ep in Endpoints if operation_id in ep[0]][0]}"
header_payload = self.headers
# convert ids/tags to be a list object if not already
if isinstance(ids, str):
ids = ids.split(",")
Expand All @@ -92,86 +100,96 @@ def UpdateDeviceTags(self: object, action_name: str, ids: list or str, tags: lis
else:
tag_name = "FalconGroupingTags/" + tag
patch_tag.append(tag_name)
BODY = {
body_payload = {
"action": action_name,
"device_ids": ids,
"tags": patch_tag
}
returned = service_request(caller=self,
method="PATCH",
endpoint=FULL_URL,
body=BODY,
headers=HEADERS,
endpoint=target_url,
body=body_payload,
headers=header_payload,
verify=self.ssl_verify
)
else:
returned = generate_error_result("Invalid value specified for action_name parameter.")
return returned

def GetDeviceDetails(self: object, ids) -> dict:
""" Get details on one or more hosts by providing agent IDs (AID).
You can get a host's agent IDs (AIDs) from the /devices/queries/devices/v1 endpoint,
the Falcon console or the Streaming API.
@force_default(defaults=["parameters"], default_types=["dict"])
def GetDeviceDetails(self: object, parameters: dict = None, **kwargs) -> dict:
"""
Get details on one or more hosts by providing agent IDs (AID).
You can get a host's agent IDs (AIDs) from the /devices/queries/devices/v1 endpoint,
the Falcon console or the Streaming API.
"""
# [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/hosts/GetDeviceDetails
ID_LIST = str(parse_id_list(ids)).replace(",", "&ids=")
FULL_URL = self.base_url+'/devices/entities/devices/v1?ids={}'.format(ID_LIST)
HEADERS = self.headers
operation_id = "GetDeviceDetails"
target_url = f"{self.base_url}{[ep[2] for ep in Endpoints if operation_id in ep[0]][0]}".replace("?ids={}", "")
header_payload = self.headers
parameter_payload = args_to_params(parameters, kwargs, Endpoints, operation_id)
returned = service_request(caller=self,
method="GET",
endpoint=FULL_URL,
headers=HEADERS,
endpoint=target_url,
params=parameter_payload,
headers=header_payload,
verify=self.ssl_verify
)
return returned

def QueryHiddenDevices(self: object, parameters: dict = None) -> dict:
""" Perform the specified action on the Prevention Policies specified in the request. """
@force_default(defaults=["parameters"], default_types=["dict"])
def QueryHiddenDevices(self: object, parameters: dict = None, **kwargs) -> dict:
"""
Perform the specified action on the Prevention Policies specified in the request.
"""
# [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/hosts/QueryHiddenDevices
FULL_URL = self.base_url+'/devices/queries/devices-hidden/v1'
HEADERS = self.headers
if parameters is None:
parameters = {}
PARAMS = parameters
operation_id = "QueryHiddenDevices"
target_url = f"{self.base_url}{[ep[2] for ep in Endpoints if operation_id in ep[0]][0]}"
header_payload = self.headers
parameter_payload = args_to_params(parameters, kwargs, Endpoints, operation_id)
returned = service_request(caller=self,
method="GET",
endpoint=FULL_URL,
params=PARAMS,
headers=HEADERS,
endpoint=target_url,
params=parameter_payload,
headers=header_payload,
verify=self.ssl_verify
)
return returned

def QueryDevicesByFilterScroll(self: object, parameters: dict = None) -> dict:
""" Perform the specified action on the Prevention Policies specified in the request. """
@force_default(defaults=["parameters"], default_types=["dict"])
def QueryDevicesByFilterScroll(self: object, parameters: dict = None, **kwargs) -> dict:
"""
Perform the specified action on the Prevention Policies specified in the request.
"""
# [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/hosts/QueryDevicesByFilterScroll
FULL_URL = self.base_url+'/devices/queries/devices-scroll/v1'
HEADERS = self.headers
if parameters is None:
parameters = {}
PARAMS = parameters
operation_id = "QueryDevicesByFilterScroll"
target_url = f"{self.base_url}{[ep[2] for ep in Endpoints if operation_id in ep[0]][0]}"
header_payload = self.headers
parameter_payload = args_to_params(parameters, kwargs, Endpoints, operation_id)
returned = service_request(caller=self,
method="GET",
endpoint=FULL_URL,
params=PARAMS,
headers=HEADERS,
endpoint=target_url,
params=parameter_payload,
headers=header_payload,
verify=self.ssl_verify
)
return returned

def QueryDevicesByFilter(self: object, parameters: dict = None) -> dict:
""" Search for hosts in your environment by platform, hostname, IP, and other criteria. """
@force_default(defaults=["parameters"], default_types=["dict"])
def QueryDevicesByFilter(self: object, parameters: dict = None, **kwargs) -> dict:
"""
Search for hosts in your environment by platform, hostname, IP, and other criteria.
"""
# [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/hosts/QueryDevicesByFilter
FULL_URL = self.base_url+'/devices/queries/devices/v1'
HEADERS = self.headers
if parameters is None:
parameters = {}
PARAMS = parameters
operation_id = "QueryDevicesByFilter"
target_url = f"{self.base_url}{[ep[2] for ep in Endpoints if operation_id in ep[0]][0]}"
header_payload = self.headers
parameter_payload = args_to_params(parameters, kwargs, Endpoints, operation_id)
returned = service_request(caller=self,
method="GET",
endpoint=FULL_URL,
params=PARAMS,
headers=HEADERS,
endpoint=target_url,
params=parameter_payload,
headers=header_payload,
verify=self.ssl_verify
)
return returned

0 comments on commit eec1dd8

Please sign in to comment.