Skip to content

Commit

Permalink
Linting. Boolean lookup fix. Relates to #130.
Browse files Browse the repository at this point in the history
  • Loading branch information
jshcodes committed Dec 15, 2021
1 parent 77027b3 commit 2a1f808
Showing 1 changed file with 35 additions and 42 deletions.
77 changes: 35 additions & 42 deletions Security-Hub/credvault.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,27 @@
# import os
import boto3
"""Credential / configuration lookup handler."""
import urllib
import json
import boto3


class CredVault():

class CredVault(): # pylint: disable=R0902
"""Class to handle configuration lookups."""
def __init__(self, logger):
"""Init the object and base parameters."""
region_lookup = "http://169.254.169.254/latest/meta-data/placement/availability-zone"
self.region = urllib.request.urlopen(region_lookup).read().decode()[:-1]
with urllib.request.urlopen(region_lookup) as region_check:
self.region = region_check.read().decode()[:-1]
self.logger = logger
self.falcon_client_id = None
self.falcon_client_secret = None
self.app_id = None
self.sqs_queue_name = None
self.api_base_url = None
self.severity_threshold = None
self.confirm_provider = None
self.ssl_verify = None

def _getParameter(self, param_name):
def get_parameter(self, param_name):
"""
This function reads a secure parameter from AWS' SSM service.
The request must be passed a valid parameter name, as well as
Expand All @@ -33,58 +43,41 @@ def _getParameter(self, param_name):
# Store the credentials in a variable
not_found = False
for invalid in response['InvalidParameters']:
self.logger.statusWrite("{} SSM parameter not found".format(str(invalid)))
self.logger.status_write(f"{str(invalid)} SSM parameter not found")
not_found = True
if not_found:
credentials = ""
else:
credentials = response['Parameters'][0]['Value']
self.logger.statusWrite("{} parameter loaded successfully.".format(str(param_name)))
self.logger.status_write(f"{str(param_name)} parameter loaded successfully.")

return credentials

def get(self):
"""Retrieve all configuration parameters."""
# API Client ID
try:
self.falcon_client_id = self._getParameter("FIG_FALCON_CLIENT_ID")
except Exception:
pass
self.falcon_client_id = self.get_parameter("FIG_FALCON_CLIENT_ID")
# API Client Secret
try:
self.falcon_client_secret = self._getParameter("FIG_FALCON_CLIENT_SECRET")
except Exception:
pass
self.falcon_client_secret = self.get_parameter("FIG_FALCON_CLIENT_SECRET")
# Application Identifier - Used for Event Streams
try:
self.app_id = self._getParameter("FIG_APP_ID")
except Exception:
pass
self.app_id = self.get_parameter("FIG_APP_ID")
# Severity Threshold - Only alert on detections at or greater
try:
self.severity_threshold = int(self._getParameter("FIG_SEVERITY_THRESHOLD"))
except Exception:
pass
self.severity_threshold = int(self.get_parameter("FIG_SEVERITY_THRESHOLD"))
# AWS SQS queue to publish to
try:
self.sqs_queue_name = self._getParameter("FIG_SQS_QUEUE_NAME")
except Exception:
pass
self.sqs_queue_name = self.get_parameter("FIG_SQS_QUEUE_NAME")
# CrowdStrike base URL (Defaults to US-1)
try:
self.api_base_url = self._getParameter("FIG_API_BASE_URL")
except Exception:
pass
self.api_base_url = self.get_parameter("FIG_API_BASE_URL")
# Should we only alert on detections in AWS? - Defaults to True
self.confirm_provider = True
try:
self.confirm_provider = self._getParameter("FIG_CONFIRM_PROVIDER")
except Exception:
pass
self.confirm_provider = self.get_parameter("FIG_CONFIRM_PROVIDER")
if "f" in self.confirm_provider.lower():
self.confirm_provider = False
else:
self.confirm_provider = True
# Should we enable SSL Verification for Request calls? - Defaults to True
self.ssl_verify = True
try:
self.ssl_verify = self._getParameter("FIG_SSL_VERIFY")
except Exception:
pass
self.ssl_verify = self.get_parameter("FIG_SSL_VERIFY")
if "f" in self.ssl_verify.lower():
self.ssl_verify = False
else:
self.ssl_verify = True
del self.logger
return json.dumps(self, default=lambda o: o.__dict__, sort_keys=True)

0 comments on commit 2a1f808

Please sign in to comment.