From 86e1a5c80023209449928252674323a8d3bacc29 Mon Sep 17 00:00:00 2001 From: Joshua Hiller Date: Tue, 14 Dec 2021 22:53:21 -0500 Subject: [PATCH] Linting. SSL verify keyword handling. Closes #130. --- Security-Hub/stream.py | 112 ++++++++++++++++++++++++----------------- 1 file changed, 66 insertions(+), 46 deletions(-) diff --git a/Security-Hub/stream.py b/Security-Hub/stream.py index b18517a..aba0500 100644 --- a/Security-Hub/stream.py +++ b/Security-Hub/stream.py @@ -5,17 +5,17 @@ """ import datetime import time -import requests import json -import logger import re -from falconpy.api_complete import APIHarness as FalconSDK - +import requests +from falconpy import APIHarness +import logger -class Stream(): - def __init__(self, stream_config, api_interface, sqs_queue, config, current_cid): - self.dataFeed = stream_config["dataFeedURL"] +class Stream(): # pylint: disable=R0902 + """Class to represent a single stream.""" + def __init__(self, stream_config, api_interface, sqs_queue, config, current_cid): # pylint: disable=R0913 + self.data_feed = stream_config["dataFeedURL"] self.token = stream_config["sessionToken"]["token"] self.token_expiration = stream_config["sessionToken"]["expiration"] self.refresh_url = stream_config["refreshActiveSessionURL"] @@ -28,8 +28,8 @@ def __init__(self, stream_config, api_interface, sqs_queue, config, current_cid) # Fallback to commercial if we can't calculate it self.base_url = "https://api.crowdstrike.com" - self.t1 = datetime.datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S +0000') - self.headers = {'Authorization': 'Token %s' % (self.token), 'Date': self.t1, 'Connection': 'Keep-Alive'} + self.time_one = datetime.datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S +0000') + self.headers = {'Authorization': f'Token {self.token}', 'Date': self.time_one, 'Connection': 'Keep-Alive'} self.discarded = 0 self.detections = 0 self.processed = 0 @@ -41,59 +41,73 @@ def __init__(self, stream_config, api_interface, sqs_queue, config, current_cid) self.api_config = config self.sqs_queue = sqs_queue self.partition = self.refresh_url.replace( - "{}/sensors/entities/datafeed-actions/v1/".format(self.base_url), + f"{self.base_url}/sensors/entities/datafeed-actions/v1/", "" ).replace( - "?appId={}&action_name=refresh_active_stream_session".format(str(self.app_id).lower()), + f"?appId={str(self.app_id).lower()}&action_name=refresh_active_stream_session", "" ) # Log, offset and PID files - self.log_file = "{}_{}.log".format(str(self.app_id).lower(), str(self.partition)) - self.offset_file = ".{}_{}.offset".format(str(self.app_id).lower(), str(self.partition)) + self.log_file = f"{str(self.app_id).lower()}_{str(self.partition)}.log" + self.offset_file = f".{str(self.app_id).lower()}_{str(self.partition)}.offset" # Might not use this - self.process_id = "{}_{}.pid".format(str(self.app_id), str(self.partition)) - self.logger = logger.Logger(self.log_file, "stream{}".format(str(self.partition))) + self.process_id = f"{str(self.app_id)}_{str(self.partition)}.pid" + self.logger = logger.Logger(self.log_file, f"stream{str(self.partition)}") # Retrieve our offset if it's been stashed previously - self.offset = self.logger.offsetRead(self.offset_file) + self.offset = self.logger.offset_read(self.offset_file) # Find our old position in the stream if self.offset > 0: - self.dataFeed = self.dataFeed + "&offset={}".format(str(self.offset)) + self.data_feed = self.data_feed + f"&offset={str(self.offset)}" # Our active spout self.spigot = False # Token reporting lambdas - self.token_expired = lambda: True if (int(time.time()) > ((int(self.refresh_interval)-60)+self.epoch)) else False + self.token_expired = bool(int(time.time()) > ((int(self.refresh_interval)-60)+self.epoch)) self.token_remains = lambda: ((int(self.refresh_interval)-60)+self.epoch)-int(time.time()) # Thread running flag self.running = True def __enter__(self): + """Enter handler.""" return self def __exit__(self, exc_type, exc_value, traceback): - pass + """Exit handler.""" + self.logger.status_write("Stream exiting.") - def toJson(self): + def to_json(self): + """Return the entire object in JSON format.""" return json.dumps(self, default=lambda o: o.__dict__, sort_keys=True) - def setOffset(self, offset): + def set_offset(self, offset): + """Write the offset value to the offset log.""" self.offset = offset - self.logger.offsetWrite(self.offset_file, offset) + self.logger.offset_write(self.offset_file, offset) return True def open(self): - self.spigot = requests.get(self.dataFeed, headers=self.headers, stream=True, verify=self.api_config["ssl_verify"]) + """Open the stream.""" + kwargs = { + "url": self.data_feed, + "headers": self.headers, + "stream": True + } + if not self.api_config["ssl_verify"]: + kwargs["verify"] = False + + self.spigot = requests.get(**kwargs) return self.spigot def close(self): + """Close the stream.""" self.spigot.close() self.spigot = False return True def refresh(self): - # Refresh the token + """Refresh the API stream token.""" refresher = self.api_interface.command(action="refreshActiveStreamSession", partition=self.partition, parameters={ "action_name": "refresh_active_stream_session", "appId": str(self.app_id).lower() @@ -104,11 +118,13 @@ def refresh(self): if refresher["status_code"] == 200: refreshed = True self.epoch = int(time.time()) - self.logger.statusWrite("Token refreshed.") + self.logger.status_write("Token refreshed.") return refreshed - def createPayload(self, resource_detail, decoded_line): + @staticmethod + def create_payload(resource_detail, decoded_line): + """Create our SQS payload.""" utc = datetime.datetime.utcfromtimestamp create_time = utc(float(decoded_line['metadata']['eventCreationTime'])/1000.).isoformat()+'Z' update_time = (utc(datetime.datetime.timestamp(datetime.datetime.now()))).isoformat()+'Z' @@ -153,13 +169,14 @@ def createPayload(self, resource_detail, decoded_line): payload['Network']['SourcePort'] = net_traffic['LocalPort'] payload['Network']['DestinationIpV4'] = net_traffic['RemoteAddress'] payload['Network']['DestinationPort'] = net_traffic['RemotePort'] - except KeyError or IndexError: + except (KeyError, IndexError): payload.pop('Network', None) return payload - def get_compare_values(self, decoded_l: dict, resource_det: dict): - decoded_m = "left" + @staticmethod + def get_compare_values(decoded_l: dict, resource_det: dict): + """Retrieve the values for our comparison.""" resource_m = "right" try: decoded_m = decoded_l["event"]["MACAddress"].lower().replace(":", "").replace("-", "") @@ -187,7 +204,7 @@ def check_records(self, decode: dict, sensor: str, api: object, payload: str or if resource_detail["service_provider"] == "AWS_EC2": decoded_mac, resource_mac = self.get_compare_values(decode, resource_detail) if decoded_mac == resource_mac: - payload = self.createPayload(resource_detail, decode) + payload = self.create_payload(resource_detail, decode) # Hit self.detections += 1 else: @@ -199,25 +216,26 @@ def check_records(self, decode: dict, sensor: str, api: object, payload: str or else: decoded_mac, resource_mac = self.get_compare_values(decode, resource_detail) if decoded_mac == resource_mac: - payload = self.createPayload(resource_detail, decode) + payload = self.create_payload(resource_detail, decode) # Hit self.detections += 1 if reviewed == 0: - self.logger.statusWrite("No hosts returned from API that match the reported detection. " - "Confirm your API credentials have access to the Falcon Hosts API." - ) + self.logger.status_write("No hosts returned from API that match the reported detection. " + "Confirm your API credentials have access to the Falcon Hosts API." + ) return payload def parse(self, line): + """Parse the received line.""" # Return a false if this is a miss payload = False decoded_line = json.loads(line.decode("utf-8")) # Grab our current offset cur_offset = decoded_line["metadata"]["offset"] # Detections only - if(decoded_line["metadata"]["eventType"] == "DetectionSummaryEvent"): + if decoded_line["metadata"]["eventType"] == "DetectionSummaryEvent": # Only submit detections that meet our severity threshold - if(int(self.severity_threshold) <= int(decoded_line["event"]["Severity"])): + if int(self.severity_threshold) <= int(decoded_line["event"]["Severity"]): creds = { "client_id": self.api_config["falcon_client_id"], "client_secret": self.api_config["falcon_client_secret"] @@ -229,7 +247,7 @@ def parse(self, line): if member_cid != self.current_cid.lower(): creds["member_cid"] = member_cid # Connect to the Hosts API and check all hosts that match this sensor - falcon = FalconSDK(creds=creds, base_url=self.base_url) + falcon = APIHarness(creds=creds, base_url=self.base_url) sensor = decoded_line["event"]["SensorId"] # print(sensor) payload = self.check_records(decoded_line, sensor, falcon, payload) @@ -243,15 +261,17 @@ def parse(self, line): return payload, cur_offset - def forceQuit(self): - self.logger.statusWrite("Thread quitting.") + def force_quit(self): + """Force the thread to quit.""" + self.logger.status_write("Thread quitting.") return True def process(self): + """Process the thread until told to quit.""" while self.running: with self.open() as stream: logtext = "started" if self.offset == 0 else "resumed" - self.logger.statusWrite("Stream {} at position {}".format(logtext, self.offset)) + self.logger.status_write(f"Stream {logtext} at position {self.offset}") for line in stream.iter_lines(): if self.running: if line: @@ -260,15 +280,15 @@ def process(self): # print(parsed) response = self.sqs_queue.send_message(MessageBody=json.dumps(parsed)) m_id = str(response.get("MessageId")) - self.logger.statusWrite("Sending detection to SQS. (Message ID: %s)" % m_id) - self.logger.outputWrite(parsed) + self.logger.status_write(f"Sending detection to SQS. (Message ID: {m_id})") + self.logger.output_write(parsed) # Update our position in the stream - self.setOffset(cur_offset) + self.set_offset(cur_offset) # Are we close to exceeding our epoch window? (refresh 1 minute before) - if self.token_expired(): + if self.token_expired: self.refresh() else: - self.setOffset(cur_offset) + self.set_offset(cur_offset) break else: continue @@ -276,6 +296,6 @@ def process(self): else: pass - self.forceQuit() + self.force_quit() raise SystemExit("Thread terminated")