Skip to content

Commit

Permalink
Body payload abstraction. PEP-257. Closes #399.
Browse files Browse the repository at this point in the history
  • Loading branch information
jshcodes committed Oct 27, 2021
1 parent 2984c82 commit 6c5ac15
Showing 1 changed file with 190 additions and 42 deletions.
232 changes: 190 additions & 42 deletions src/falconpy/kubernetes_protection.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""
"""CrowdStrike Falcon Kubernetes Protection API interface class
_______ __ _______ __ __ __
| _ .----.-----.--.--.--.--| | _ | |_.----|__| |--.-----.
|. 1___| _| _ | | | | _ | 1___| _| _| | <| -__|
Expand All @@ -9,8 +10,6 @@
OAuth2 API - Customer SDK
kubernetes_protection - CrowdStrike Falcon Kubernetes Protection API interface class
This is free and unencumbered software released into the public domain.
Anyone is free to copy, modify, publish, use, compile, sell, or
Expand Down Expand Up @@ -42,18 +41,39 @@


class KubernetesProtection(ServiceClass):
"""
The only requirement to instantiate an instance of this class
is a valid token provided by the Falcon API SDK OAuth2 class, a
existing instance of the authentication class as an object or a
valid set of credentials.
"""The only requirement to instantiate an instance of this class is one of the following:
- a valid client_id and client_secret provided as keywords.
- a credential dictionary with client_id and client_secret containing valid API credentials
{
"client_id": "CLIENT_ID_HERE",
"client_secret": "CLIENT_SECRET_HERE"
}
- a previously-authenticated instance of the authentication service class (oauth2.py)
- a valid token provided by the authentication service class (OAuth2.token())
"""
@force_default(defaults=["parameters"], default_types=["dict"])
def get_aws_accounts(self: object, parameters: dict = None, **kwargs) -> dict:
"""Provides a list of AWS accounts.
Keyword arguments:
ids -- AWS Account IDs. String or list of strings.
limit -- The maximum number of records to return in this response. [Integer, 1-500]
Use with the offset parameter to manage pagination of results.
offset -- The offset to start retrieving records from. String.
Use with the limit parameter to manage pagination of results.
parameters - full parameters payload, not required if using other keywords.
status -- Filter by account status. String.
This method only supports keywords for providing arguments.
Returns: dict object containing API response.
HTTP Method: GET
Swagger URL
https://assets.falcon.crowdstrike.com/support/api/swagger.html#/kubernetes-protection/GetAWSAccountsMixin0
"""
Provides a list of AWS accounts.
"""
# [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/kubernetes-protection/GetAWSAccountsMixin0
return process_service_request(
calling_object=self,
endpoints=Endpoints,
Expand All @@ -62,11 +82,41 @@ def get_aws_accounts(self: object, parameters: dict = None, **kwargs) -> dict:
params=parameters
)

def create_aws_account(self: object, body: dict) -> dict:
"""
Creates a new AWS account in our system for a customer and generates the installation script
@force_default(defaults=["body"], default_types=["dict"])
def create_aws_account(self: object, body: dict = None, **kwargs) -> dict:
"""Creates a new AWS account in our system for a
customer and generates the installation script.
Keyword arguments:
body -- full body payload, not required if using other keywords.
{
"resources": [
{
"account_id": "string",
"region": "string"
}
]
}
account_id -- Account ID. String.
region -- Region. String.
This method only supports keywords for providing arguments.
Returns: dict object containing API response.
HTTP Method: POST
Swagger URL
https://assets.falcon.crowdstrike.com/support/api/swagger.html#/kubernetes-protection/CreateAWSAccount
"""
# [POST] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/kubernetes-protection/CreateAWSAccount
if not body:
item = {}
if kwargs.get("account_id", None):
item["account_id"] = kwargs.get("account_id", None)
if kwargs.get("region", None):
item["region"] = kwargs.get("region", None)

body["resources"] = [item]
return process_service_request(
calling_object=self,
endpoints=Endpoints,
Expand All @@ -76,11 +126,22 @@ def create_aws_account(self: object, body: dict) -> dict:

@force_default(defaults=["parameters"], default_types=["dict"])
def delete_aws_accounts(self: object, *args, parameters: dict = None, **kwargs) -> dict:
"""Delete AWS accounts.
Keyword arguments:
ids -- ID(s) of AWS accounts to delete. String or list of strings.
parameters -- full parameters payload, not required if ids is provided as a keyword.
Arguments: When not specified, the first argument to this method is assumed to be 'ids'.
All others are ignored.
Returns: dict object containing API response.
HTTP Method: DELETE
Swagger URL
https://assets.falcon.crowdstrike.com/support/api/swagger.html#/kubernetes-protection/DeleteAWSAccountsMixin0
"""
Delete AWS accounts.
"""
# [DELETE] https://assets.falcon.crowdstrike.com/support/api/swagger.html#
# /kubernetes-protection/DeleteAWSAccountsMixin0
return process_service_request(
calling_object=self,
endpoints=Endpoints,
Expand All @@ -91,10 +152,22 @@ def delete_aws_accounts(self: object, *args, parameters: dict = None, **kwargs)

@force_default(defaults=["parameters"], default_types=["dict"])
def update_aws_account(self: object, parameters: dict = None, **kwargs) -> dict:
"""Updates the AWS account per the query parameters provided
Keyword arguments:
ids -- ID(s) of AWS accounts to update. String or list of strings.
parameters -- full parameters payload, not required if ids is provided as a keyword.
region -- Default region for Account Automation.
This method only supports keywords for providing arguments.
Returns: dict object containing API response.
HTTP Method: PATCH
Swagger URL
https://assets.falcon.crowdstrike.com/support/api/swagger.html#/kubernetes-protection/UpdateAWSAccount
"""
Updates the AWS account per the query parameters provided
"""
# [PATCH] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/kubernetes-protection/UpdateAWSAccount
return process_service_request(
calling_object=self,
endpoints=Endpoints,
Expand All @@ -105,10 +178,22 @@ def update_aws_account(self: object, parameters: dict = None, **kwargs) -> dict:

@force_default(defaults=["parameters"], default_types=["dict"])
def get_locations(self: object, *args, parameters: dict = None, **kwargs) -> dict:
"""Provides the cloud locations acknowledged by the Kubernetes Protection service
Keyword arguments:
clouds -- Cloud provider. String or list of strings.
parameters - full parameters payload, not required if using other keywords.
Arguments: When not specified, the first argument to this method is assumed to be 'clouds'.
All others are ignored.
Returns: dict object containing API response.
HTTP Method: GET
Swagger URL
https://assets.falcon.crowdstrike.com/support/api/swagger.html#/kubernetes-protection/GetLocations
"""
Provides the cloud locations acknowledged by the Kubernetes Protection service
"""
# [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/kubernetes-protection/GetLocations
return process_service_request(
calling_object=self,
endpoints=Endpoints,
Expand All @@ -119,10 +204,23 @@ def get_locations(self: object, *args, parameters: dict = None, **kwargs) -> dic

@force_default(defaults=["parameters"], default_types=["dict"])
def get_helm_values_yaml(self: object, *args, parameters: dict = None, **kwargs) -> dict:
"""Provides a sample Helm values.yaml file for a customer to install
alongside the agent Helm chart.
Keyword arguments:
cluster_name -- Cloud provider. String.
parameters - full parameters payload, not required if using other keywords.
Arguments: When not specified, the first argument to this method is assumed to be
'cluster_name'. All others are ignored.
Returns: dict object containing API response.
HTTP Method: GET
Swagger URL
https://assets.falcon.crowdstrike.com/support/api/swagger.html#/kubernetes-protection/GetHelmValuesYaml
"""
Provides a sample Helm values.yaml file for a customer to install alongside the agent Helm chart
"""
# [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/kubernetes-protection/GetHelmValuesYaml
return process_service_request(
calling_object=self,
endpoints=Endpoints,
Expand All @@ -131,24 +229,54 @@ def get_helm_values_yaml(self: object, *args, parameters: dict = None, **kwargs)
params=handle_single_argument(args, parameters, "cluster_name")
)

def regenerate(self: object, body: dict = None) -> dict: # pylint: disable=W0613 # No params accepted for POST
"""
Regenerate API key for docker registry integrations
@force_default(defaults=["body"], default_types=["dict"])
def regenerate(self: object, body: dict = None) -> dict:
"""Regenerate API key for docker registry integrations
Keyword arguments:
body -- Body payload is accepted but is not used.
This method has no default argument or keywords.
Returns: dict object containing API response.
HTTP Method: GET
Swagger URL
https://assets.falcon.crowdstrike.com/support/api/swagger.html#/kubernetes-protection/RegenerateAPIKey
"""
# [POST] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/kubernetes-protection/RegenerateAPIKey
return process_service_request(
calling_object=self,
endpoints=Endpoints,
operation_id="RegenerateAPIKey",
# body={}
body=body
)

@force_default(defaults=["parameters"], default_types=["dict"])
def get_clusters(self: object, parameters: dict = None, **kwargs) -> dict:
"""Provides the clusters acknowledged by the Kubernetes Protection service
Keyword arguments:
account_ids -- Cluster Account IDs. For EKS, this would be the AWS Account ID.
String or list of strings.
cluster_names -- Cluster name. For EKS it will be cluster ARN. String or list of strings.
cluster_service -- Cluster Service. Available values: `eks`
limit -- The maximum number of records to return in this response. [Integer, 1-500]
Use with the offset parameter to manage pagination of results.
locations -- Cloud location. String or list of strings.
offset -- The offset to start retrieving records from. String.
Use with the limit parameter to manage pagination of results.
parameters - full parameters payload, not required if using other keywords.
This method only supports keywords for providing arguments.
Returns: dict object containing API response.
HTTP Method: GET
Swagger URL
https://assets.falcon.crowdstrike.com/support/api/swagger.html#/kubernetes-protection/GetClusters
"""
Provides the clusters acknowledged by the Kubernetes Protection service
"""
# [GET] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/kubernetes-protection/GetClusters
return process_service_request(
calling_object=self,
endpoints=Endpoints,
Expand All @@ -157,18 +285,38 @@ def get_clusters(self: object, parameters: dict = None, **kwargs) -> dict:
params=parameters
)

@force_default(defaults=["parameters"], default_types=["dict"])
def trigger_scan(self: object, *args, parameters: dict = None, **kwargs) -> dict:
"""
Triggers a dry run or a full scan of a customer's kubernetes footprint
@force_default(defaults=["parameters", "body"], default_types=["dict", "dict"])
def trigger_scan(self: object,
*args,
body: dict = None,
parameters: dict = None,
**kwargs
) -> dict:
"""Triggers a dry run or a full scan of a customer's kubernetes footprint
Keyword arguments:
body -- Body payload is accepted but is not used.
scan_type -- Type of scan to perform. String. Default value: `dry-run`.
Available Values: `cluster-refresh`, `dry-run`, or `full`.
parameters - full parameters payload, not required if using other keywords.
Arguments: When not specified, the first argument to this method is assumed to be
'scan_type'. All others are ignored.
Returns: dict object containing API response.
HTTP Method: POST
Swagger URL
https://assets.falcon.crowdstrike.com/support/api/swagger.html#/kubernetes-protection/TriggerScan
"""
# [POST] https://assets.falcon.crowdstrike.com/support/api/swagger.html#/kubernetes-protection/TriggerScan
return process_service_request(
calling_object=self,
endpoints=Endpoints,
operation_id="TriggerScan",
keywords=kwargs,
params=handle_single_argument(args, parameters, "scan_type")
params=handle_single_argument(args, parameters, "scan_type"),
body=body
)

# These method names align to the operation IDs in the API but
Expand Down

0 comments on commit 6c5ac15

Please sign in to comment.