From 7c55ceddc2fd7a6ebc6c7c3f3299ffd4cc9aa86e Mon Sep 17 00:00:00 2001 From: Joshua Hiller Date: Tue, 14 Dec 2021 22:50:53 -0500 Subject: [PATCH] Linting. Boolean lookup fix. Relates to #130. --- Security-Hub/credvault.py | 77 ++++++++++++++++++--------------------- 1 file changed, 35 insertions(+), 42 deletions(-) diff --git a/Security-Hub/credvault.py b/Security-Hub/credvault.py index 6ef0499..165eda4 100644 --- a/Security-Hub/credvault.py +++ b/Security-Hub/credvault.py @@ -1,17 +1,27 @@ -# import os -import boto3 +"""Credential / configuration lookup handler.""" import urllib import json +import boto3 -class CredVault(): - +class CredVault(): # pylint: disable=R0902 + """Class to handle configuration lookups.""" def __init__(self, logger): + """Init the object and base parameters.""" region_lookup = "http://169.254.169.254/latest/meta-data/placement/availability-zone" - self.region = urllib.request.urlopen(region_lookup).read().decode()[:-1] + with urllib.request.urlopen(region_lookup) as region_check: + self.region = region_check.read().decode()[:-1] self.logger = logger + self.falcon_client_id = None + self.falcon_client_secret = None + self.app_id = None + self.sqs_queue_name = None + self.api_base_url = None + self.severity_threshold = None + self.confirm_provider = None + self.ssl_verify = None - def _getParameter(self, param_name): + def get_parameter(self, param_name): """ This function reads a secure parameter from AWS' SSM service. The request must be passed a valid parameter name, as well as @@ -33,58 +43,41 @@ def _getParameter(self, param_name): # Store the credentials in a variable not_found = False for invalid in response['InvalidParameters']: - self.logger.statusWrite("{} SSM parameter not found".format(str(invalid))) + self.logger.status_write(f"{str(invalid)} SSM parameter not found") not_found = True if not_found: credentials = "" else: credentials = response['Parameters'][0]['Value'] - self.logger.statusWrite("{} parameter loaded successfully.".format(str(param_name))) + self.logger.status_write(f"{str(param_name)} parameter loaded successfully.") return credentials def get(self): + """Retrieve all configuration parameters.""" # API Client ID - try: - self.falcon_client_id = self._getParameter("FIG_FALCON_CLIENT_ID") - except Exception: - pass + self.falcon_client_id = self.get_parameter("FIG_FALCON_CLIENT_ID") # API Client Secret - try: - self.falcon_client_secret = self._getParameter("FIG_FALCON_CLIENT_SECRET") - except Exception: - pass + self.falcon_client_secret = self.get_parameter("FIG_FALCON_CLIENT_SECRET") # Application Identifier - Used for Event Streams - try: - self.app_id = self._getParameter("FIG_APP_ID") - except Exception: - pass + self.app_id = self.get_parameter("FIG_APP_ID") # Severity Threshold - Only alert on detections at or greater - try: - self.severity_threshold = int(self._getParameter("FIG_SEVERITY_THRESHOLD")) - except Exception: - pass + self.severity_threshold = int(self.get_parameter("FIG_SEVERITY_THRESHOLD")) # AWS SQS queue to publish to - try: - self.sqs_queue_name = self._getParameter("FIG_SQS_QUEUE_NAME") - except Exception: - pass + self.sqs_queue_name = self.get_parameter("FIG_SQS_QUEUE_NAME") # CrowdStrike base URL (Defaults to US-1) - try: - self.api_base_url = self._getParameter("FIG_API_BASE_URL") - except Exception: - pass + self.api_base_url = self.get_parameter("FIG_API_BASE_URL") # Should we only alert on detections in AWS? - Defaults to True - self.confirm_provider = True - try: - self.confirm_provider = self._getParameter("FIG_CONFIRM_PROVIDER") - except Exception: - pass + self.confirm_provider = self.get_parameter("FIG_CONFIRM_PROVIDER") + if "f" in self.confirm_provider.lower(): + self.confirm_provider = False + else: + self.confirm_provider = True # Should we enable SSL Verification for Request calls? - Defaults to True - self.ssl_verify = True - try: - self.ssl_verify = self._getParameter("FIG_SSL_VERIFY") - except Exception: - pass + self.ssl_verify = self.get_parameter("FIG_SSL_VERIFY") + if "f" in self.ssl_verify.lower(): + self.ssl_verify = False + else: + self.ssl_verify = True del self.logger return json.dumps(self, default=lambda o: o.__dict__, sort_keys=True)