Commit afbf6d7

Linting. SSL verify keyword handling. Closes #130.
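
Beyond the lint cleanup, the substantive change is how the SSL verify keyword reaches requests.get() in Stream.open(): instead of always forwarding self.api_config["ssl_verify"], the rewritten method only passes verify=False when verification is explicitly disabled, so the requests default (full certificate verification) applies in every other case. A minimal standalone sketch of that pattern, assuming a placeholder URL and header dict rather than the repo's real objects:

    import requests

    def open_stream(url, headers, ssl_verify=True):
        """Open a streaming GET, overriding certificate checks only when asked."""
        kwargs = {
            "url": url,
            "headers": headers,
            "stream": True
        }
        if not ssl_verify:
            # Passing verify=False disables certificate validation; omitting
            # the keyword keeps requests' secure default (verify=True).
            kwargs["verify"] = False
        return requests.get(**kwargs)

Building the kwargs dict first, as the committed open() does, keeps the conditional keyword out of the call itself and never emits an explicit verify=True.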
jshcodes committed Dec 15, 2021
1 parent 455d3a5 commit afbf6d7
Showing 1 changed file with 66 additions and 46 deletions.
Security-Hub/stream.py: 112 changes (66 additions & 46 deletions)
@@ -5,17 +5,17 @@
"""
import datetime
import time
import requests
import json
import logger
import re
from falconpy.api_complete import APIHarness as FalconSDK

import requests
from falconpy import APIHarness
import logger

class Stream():

def __init__(self, stream_config, api_interface, sqs_queue, config, current_cid):
self.dataFeed = stream_config["dataFeedURL"]
class Stream(): # pylint: disable=R0902
"""Class to represent a single stream."""
def __init__(self, stream_config, api_interface, sqs_queue, config, current_cid): # pylint: disable=R0913
self.data_feed = stream_config["dataFeedURL"]
self.token = stream_config["sessionToken"]["token"]
self.token_expiration = stream_config["sessionToken"]["expiration"]
self.refresh_url = stream_config["refreshActiveSessionURL"]
@@ -28,8 +28,8 @@ def __init__(self, stream_config, api_interface, sqs_queue, config, current_cid)
# Fallback to commercial if we can't calculate it
self.base_url = "https://api.crowdstrike.com"

self.t1 = datetime.datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S +0000')
self.headers = {'Authorization': 'Token %s' % (self.token), 'Date': self.t1, 'Connection': 'Keep-Alive'}
self.time_one = datetime.datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S +0000')
self.headers = {'Authorization': f'Token {self.token}', 'Date': self.time_one, 'Connection': 'Keep-Alive'}
self.discarded = 0
self.detections = 0
self.processed = 0
@@ -41,59 +41,73 @@ def __init__(self, stream_config, api_interface, sqs_queue, config, current_cid)
self.api_config = config
self.sqs_queue = sqs_queue
self.partition = self.refresh_url.replace(
"{}/sensors/entities/datafeed-actions/v1/".format(self.base_url),
f"{self.base_url}/sensors/entities/datafeed-actions/v1/",
""
).replace(
"?appId={}&action_name=refresh_active_stream_session".format(str(self.app_id).lower()),
f"?appId={str(self.app_id).lower()}&action_name=refresh_active_stream_session",
""
)
# Log, offset and PID files
self.log_file = "{}_{}.log".format(str(self.app_id).lower(), str(self.partition))
self.offset_file = ".{}_{}.offset".format(str(self.app_id).lower(), str(self.partition))
self.log_file = f"{str(self.app_id).lower()}_{str(self.partition)}.log"
self.offset_file = f".{str(self.app_id).lower()}_{str(self.partition)}.offset"
# Might not use this
self.process_id = "{}_{}.pid".format(str(self.app_id), str(self.partition))
self.logger = logger.Logger(self.log_file, "stream{}".format(str(self.partition)))
self.process_id = f"{str(self.app_id)}_{str(self.partition)}.pid"
self.logger = logger.Logger(self.log_file, f"stream{str(self.partition)}")

# Retrieve our offset if it's been stashed previously
self.offset = self.logger.offsetRead(self.offset_file)
self.offset = self.logger.offset_read(self.offset_file)
# Find our old position in the stream
if self.offset > 0:
self.dataFeed = self.dataFeed + "&offset={}".format(str(self.offset))
self.data_feed = self.data_feed + f"&offset={str(self.offset)}"
# Our active spout
self.spigot = False
# Token reporting lambdas
self.token_expired = lambda: True if (int(time.time()) > ((int(self.refresh_interval)-60)+self.epoch)) else False
self.token_expired = bool(int(time.time()) > ((int(self.refresh_interval)-60)+self.epoch))
self.token_remains = lambda: ((int(self.refresh_interval)-60)+self.epoch)-int(time.time())
# Thread running flag
self.running = True

def __enter__(self):
"""Enter handler."""
return self

def __exit__(self, exc_type, exc_value, traceback):
pass
"""Exit handler."""
self.logger.status_write("Stream exiting.")

def toJson(self):
def to_json(self):
"""Return the entire object in JSON format."""
return json.dumps(self, default=lambda o: o.__dict__, sort_keys=True)

def setOffset(self, offset):
def set_offset(self, offset):
"""Write the offset value to the offset log."""
self.offset = offset
self.logger.offsetWrite(self.offset_file, offset)
self.logger.offset_write(self.offset_file, offset)
return True

def open(self):
self.spigot = requests.get(self.dataFeed, headers=self.headers, stream=True, verify=self.api_config["ssl_verify"])
"""Open the stream."""
kwargs = {
"url": self.data_feed,
"headers": self.headers,
"stream": True
}
if not self.api_config["ssl_verify"]:
kwargs["verify"] = False

self.spigot = requests.get(**kwargs)

return self.spigot

def close(self):
"""Close the stream."""
self.spigot.close()
self.spigot = False

return True

def refresh(self):
# Refresh the token
"""Refresh the API stream token."""
refresher = self.api_interface.command(action="refreshActiveStreamSession", partition=self.partition, parameters={
"action_name": "refresh_active_stream_session",
"appId": str(self.app_id).lower()
@@ -104,11 +118,13 @@ def refresh(self):
if refresher["status_code"] == 200:
refreshed = True
self.epoch = int(time.time())
self.logger.statusWrite("Token refreshed.")
self.logger.status_write("Token refreshed.")

return refreshed

def createPayload(self, resource_detail, decoded_line):
@staticmethod
def create_payload(resource_detail, decoded_line):
"""Create our SQS payload."""
utc = datetime.datetime.utcfromtimestamp
create_time = utc(float(decoded_line['metadata']['eventCreationTime'])/1000.).isoformat()+'Z'
update_time = (utc(datetime.datetime.timestamp(datetime.datetime.now()))).isoformat()+'Z'
@@ -153,13 +169,14 @@ def createPayload(self, resource_detail, decoded_line):
payload['Network']['SourcePort'] = net_traffic['LocalPort']
payload['Network']['DestinationIpV4'] = net_traffic['RemoteAddress']
payload['Network']['DestinationPort'] = net_traffic['RemotePort']
except KeyError or IndexError:
except (KeyError, IndexError):
payload.pop('Network', None)

return payload

def get_compare_values(self, decoded_l: dict, resource_det: dict):
decoded_m = "left"
@staticmethod
def get_compare_values(decoded_l: dict, resource_det: dict):
"""Retrieve the values for our comparison."""
resource_m = "right"
try:
decoded_m = decoded_l["event"]["MACAddress"].lower().replace(":", "").replace("-", "")
@@ -187,7 +204,7 @@ def check_records(self, decode: dict, sensor: str, api: object, payload: str or
if resource_detail["service_provider"] == "AWS_EC2":
decoded_mac, resource_mac = self.get_compare_values(decode, resource_detail)
if decoded_mac == resource_mac:
payload = self.createPayload(resource_detail, decode)
payload = self.create_payload(resource_detail, decode)
# Hit
self.detections += 1
else:
@@ -199,25 +216,26 @@
else:
decoded_mac, resource_mac = self.get_compare_values(decode, resource_detail)
if decoded_mac == resource_mac:
payload = self.createPayload(resource_detail, decode)
payload = self.create_payload(resource_detail, decode)
# Hit
self.detections += 1
if reviewed == 0:
self.logger.statusWrite("No hosts returned from API that match the reported detection. "
"Confirm your API credentials have access to the Falcon Hosts API."
)
self.logger.status_write("No hosts returned from API that match the reported detection. "
"Confirm your API credentials have access to the Falcon Hosts API."
)
return payload

def parse(self, line):
"""Parse the received line."""
# Return a false if this is a miss
payload = False
decoded_line = json.loads(line.decode("utf-8"))
# Grab our current offset
cur_offset = decoded_line["metadata"]["offset"]
# Detections only
if(decoded_line["metadata"]["eventType"] == "DetectionSummaryEvent"):
if decoded_line["metadata"]["eventType"] == "DetectionSummaryEvent":
# Only submit detections that meet our severity threshold
if(int(self.severity_threshold) <= int(decoded_line["event"]["Severity"])):
if int(self.severity_threshold) <= int(decoded_line["event"]["Severity"]):
creds = {
"client_id": self.api_config["falcon_client_id"],
"client_secret": self.api_config["falcon_client_secret"]
@@ -229,7 +247,7 @@ def parse(self, line):
if member_cid != self.current_cid.lower():
creds["member_cid"] = member_cid
# Connect to the Hosts API and check all hosts that match this sensor
falcon = FalconSDK(creds=creds, base_url=self.base_url)
falcon = APIHarness(creds=creds, base_url=self.base_url)
sensor = decoded_line["event"]["SensorId"]
# print(sensor)
payload = self.check_records(decoded_line, sensor, falcon, payload)
@@ -243,15 +261,17 @@ def parse(self, line):

return payload, cur_offset

def forceQuit(self):
self.logger.statusWrite("Thread quitting.")
def force_quit(self):
"""Force the thread to quit."""
self.logger.status_write("Thread quitting.")
return True

def process(self):
"""Process the thread until told to quit."""
while self.running:
with self.open() as stream:
logtext = "started" if self.offset == 0 else "resumed"
self.logger.statusWrite("Stream {} at position {}".format(logtext, self.offset))
self.logger.status_write(f"Stream {logtext} at position {self.offset}")
for line in stream.iter_lines():
if self.running:
if line:
Expand All @@ -260,22 +280,22 @@ def process(self):
# print(parsed)
response = self.sqs_queue.send_message(MessageBody=json.dumps(parsed))
m_id = str(response.get("MessageId"))
self.logger.statusWrite("Sending detection to SQS. (Message ID: %s)" % m_id)
self.logger.outputWrite(parsed)
self.logger.status_write(f"Sending detection to SQS. (Message ID: {m_id})")
self.logger.output_write(parsed)
# Update our position in the stream
self.setOffset(cur_offset)
self.set_offset(cur_offset)
# Are we close to exceeding our epoch window? (refresh 1 minute before)
if self.token_expired():
if self.token_expired:
self.refresh()
else:
self.setOffset(cur_offset)
self.set_offset(cur_offset)
break
else:
continue
break
else:
pass

self.forceQuit()
self.force_quit()

raise SystemExit("Thread terminated")
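
Two patterns in this diff may be easier to follow outside it. First, the refresh timing encoded by the token lambdas: the session is due for refresh one minute before refresh_interval seconds have elapsed since epoch (the time of the last refresh). A small sketch of the arithmetic, using hypothetical values:

    import time

    def seconds_until_refresh(epoch, refresh_interval):
        # The deadline is (refresh_interval - 60) seconds after the last
        # refresh, a one-minute margin before the session actually lapses.
        deadline = (int(refresh_interval) - 60) + epoch
        return deadline - int(time.time())

    # Hypothetical values: a 30-minute interval, last refreshed 29.5 minutes ago
    epoch = int(time.time()) - int(29.5 * 60)
    remaining = seconds_until_refresh(epoch, refresh_interval=1800)
    needs_refresh = remaining < 0  # mirrors the token_expired check above

Second, the SQS hand-off in process(): each matching payload is sent with send_message(MessageBody=...) and the returned MessageId is logged, which matches the boto3 Queue resource interface. A hedged setup sketch; the region and queue name here are assumptions, not values from the repo:

    import json
    import boto3

    sqs = boto3.resource("sqs", region_name="us-east-1")  # region is an assumption
    queue = sqs.get_queue_by_name(QueueName="falcon-detections")  # hypothetical name
    response = queue.send_message(MessageBody=json.dumps({"example": "detection"}))
    print(response.get("MessageId"))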
