Skip to content

Commit

Permalink
Linting. FalconPy updates. Relates to #130.
Browse files Browse the repository at this point in the history
  • Loading branch information
jshcodes committed Dec 15, 2021
1 parent a5bcfe3 commit c9004be
Showing 1 changed file with 76 additions and 65 deletions.
141 changes: 76 additions & 65 deletions Security-Hub/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
Original version: Dixon Styres
Modification: 11.15.20 - jshcodes@CrowdStrike, SQS integration
Modification: 07.15.20 - jshcodes@CrowdStrike, MSSP functionality
Modification: 07.15.21 - jshcodes@CrowdStrike, MSSP functionality
Modification: 12.14.21 - jshcodes@CrowdStrike, FalconPy Updates, SSL verification tweaks
Event Streams -> {Parsed & Confirmed} -> SQS -> Lambda -> {Instance Confirmed} -> Security Hub
"""
Expand Down Expand Up @@ -38,26 +39,30 @@
import platform
import time
import threading
import stream as event_stream
import credvault
import logger
import atexit
from falconpy import api_complete as FalconSDK
from falconpy import APIHarness
try:
import boto3
from botocore.exceptions import ClientError, EndpointConnectionError
except ImportError:
raise SystemExit("The AWS boto3 library is required to run Falcon Data Replicator.\nPlease execute 'pip3 install boto3'")
except ImportError as no_boto:
raise SystemExit(
"The AWS boto3 library is required to run Falcon Data Replicator.\nPlease execute 'pip3 install boto3'"
) from no_boto
import stream as event_stream
import credvault
import logger


def shutdown():
status.statusWrite("Process terminated")
"""Inform the user of shutdown."""
status.status_write("Process terminated")
raise SystemExit("Process terminated")


def startStreaming(new_streams, cid):
def start_streaming(streams, cid):
"""Start streaming detections."""
# Loop thru each stream in our list
for active_stream in new_streams:
for active_stream in streams:
# Connect to the stream
with event_stream.Stream(active_stream, falcon, queue, config, cid) as active:
start = False
Expand All @@ -70,23 +75,24 @@ def startStreaming(new_streams, cid):
# Start all available streams if partition is not specified
start = True
if start:
status.statusWrite("Starting listener on partition number {}...".format(str(active.partition)))
status.status_write(f"Starting listener on partition number {str(active.partition)}...")
# Create a thread to handle stream processing, daemonize so the thread shuts down when we do
t = threading.Thread(target=active.process, daemon=True)
stream_thread = threading.Thread(target=active.process, daemon=True)
# Begin processing the stream's contents
t.start()
stream_thread.start()

return True


def loadConfig(config_file: str = "config.json"):
def load_config(config_file: str = "config.json"):
"""Load our configuration parameters."""
cfg = False
try:
with open(config_file, 'r') as file_config:
with open(config_file, 'r', encoding="utf-8") as file_config:
cfg = json.loads(file_config.read())
status.statusWrite("Configuration parameters loaded from local file")
status.status_write("Configuration parameters loaded from local file")
except FileNotFoundError:
status.statusWrite("Specified configuration file not found")
status.status_write("Specified configuration file not found")

return cfg

Expand All @@ -99,91 +105,96 @@ def loadConfig(config_file: str = "config.json"):
# Load configuration parameters
if platform.system().lower() in "windows,darwin".split(","):
# We're running locally, read in the config from the local config file
config = loadConfig()
config = load_config()
if not config:
status.statusWrite("Unable to load configuration parameters")
status.status_write("Unable to load configuration parameters")
raise SystemExit("Unable to load configuration parameters")
else:
config = loadConfig()
config = load_config()
if not config:
# That didn't work
try:
# Try to get our creds from ssm
config = json.loads(credvault.CredVault(status).get())
status.statusWrite("Configuration parameters loaded from SSM Parameter Store.")
status.status_write("Configuration parameters loaded from SSM Parameter Store.")

except Exception as err:
# Total failure
status.statusWrite("Unable to load configuration parameters: %s" % err)
raise SystemExit("Unable to load configuration parameters: %s" % err)
status.status_write("Unable to load configuration parameters.")
raise SystemExit("Unable to load configuration parameters.") from err

# MAIN ROUTINE
try:
# Check to see if they've specified a different API base url
baseURL = config["api_base_url"]
if baseURL == "":
baseURL = "https://api.crowdstrike.com"
BASE_URL = config["api_base_url"]
if BASE_URL == "":
BASE_URL = "us1"
except KeyError:
# Any failure assume we're doing commercial
baseURL = "https://api.crowdstrike.com"
# Any failure assume we're doing commercial / US-1
BASE_URL = "us1"

try:
verify_ssl_connections = config["ssl_verify"]
if verify_ssl_connections == "":
verify_ssl_connections = True
VERIFY_SSL_CONNECTIONS = config["ssl_verify"]
if VERIFY_SSL_CONNECTIONS == "":
VERIFY_SSL_CONNECTIONS = True
except KeyError:
verify_ssl_connections = True
config["ssl_verify"] = verify_ssl_connections
VERIFY_SSL_CONNECTIONS = True
config["ssl_verify"] = VERIFY_SSL_CONNECTIONS

# Connect to the API
falcon = FalconSDK.APIHarness(creds={'client_id': config["falcon_client_id"],
'client_secret': config["falcon_client_secret"]
}, base_url=baseURL, ssl_verify=verify_ssl_connections)
falcon = APIHarness(client_id=config["falcon_client_id"],
client_secret=config["falcon_client_secret"],
base_url=BASE_URL,
ssl_verify=VERIFY_SSL_CONNECTIONS
)
# Authenticate to the API
falcon.authenticate()
# Cry about our bad keys
if not falcon.authenticated:
status.statusWrite(f"Failed to connect to the API on {baseURL}. Check base_url and ssl_verify configuration settings.")
raise SystemExit(f"Failed to connect to the API on {baseURL}. Check base_url and ssl_verify configuration settings.")
else:
# Retrieve our current CID (MSSP functionality) or add it to config?
# This method requires Sensor Install API, our fallback option uses the Hosts API but a device must exist
status.status_write(f"Failed to connect to the API on {BASE_URL}. Check base_url and ssl_verify configuration settings.")
raise SystemExit(f"Failed to connect to the API on {BASE_URL}. Check base_url and ssl_verify configuration settings.")

# Retrieve our current CID (MSSP functionality) or add it to config?
# This method requires Sensor Install API, our fallback option uses the Hosts API but a device must exist
try:
current_cid = falcon.command("GetSensorInstallersCCIDByQuery")["body"]["resources"][0][:-3]
except KeyError:
try:
current_cid = falcon.command("GetSensorInstallersCCIDByQuery")["body"]["resources"][0][:-3]
except KeyError:
current_cid = falcon.command("GetDeviceDetails",
ids=falcon.command("QueryDevicesByFilter", limit=1)["body"]["resources"][0]
)["body"]["resources"][0]["cid"]
except IndexError:
try:
current_cid = falcon.command("GetDeviceDetails",
ids=falcon.command("QueryDevicesByFilter")["body"]["resources"][0]
)["body"]["resources"][0]["cid"]
except IndexError:
try:
current_cid = config["falcon_cid"]
except KeyError:
status.statusWrite("Unable to retrieve CID")
raise SystemExit("Unable to retrieve CID")
# Default to confirming this is an AWS alert
if "confirm_provider" not in config:
config["confirm_provider"] = True
# Ask for a list of available streams
new_streams = falcon.command(action="listAvailableStreamsOAuth2", parameters={"appId": "{}".format(config["app_id"])})
if "resources" in new_streams["body"]:
current_cid = config["falcon_cid"]
except KeyError as no_cid:
status.status_write("Unable to retrieve CID")
raise SystemExit("Unable to retrieve CID") from no_cid
# Default to confirming this is an AWS alert
if "confirm_provider" not in config:
config["confirm_provider"] = True
# Ask for a list of available streams
new_streams = falcon.command(action="listAvailableStreamsOAuth2", appId=config["app_id"])
if "resources" in new_streams["body"]:
if new_streams["body"]["resources"]:
# Retrieve the SQS queue we'll use for notifications
try:
queue = boto3.resource('sqs', region_name=config["region"]).get_queue_by_name(QueueName=config["sqs_queue_name"])
except ClientError:
status.statusWrite("Unable to retrieve specified SQS queue")
status.status_write("Unable to retrieve specified SQS queue")
except EndpointConnectionError:
status.statusWrite("Invalid region specified")
status.status_write("Invalid region specified")
else:
startStreaming(new_streams["body"]["resources"], current_cid)
start_streaming(new_streams["body"]["resources"], current_cid)
# Only sleep if we have threads opened
while threading.active_count() > 0:
status.statusWrite("All threads started, main process sleeping.")
status.status_write("All threads started, main process sleeping.")
# Force a wake up / restart of all streams
time.sleep(21600)
status.statusWrite("Restarting service and refreshing all streams.")
status.status_write("Restarting service and refreshing all streams.")
# Discard all objects and restart the main service thread
os.execv(sys.executable, [os.path.abspath(sys.argv[0]), "main.py"])

else:
status.statusWrite("No streams available")
status.status_write("No streams available")

else:
status.status_write("No streams available")

0 comments on commit c9004be

Please sign in to comment.