Merge 9a9401f into 666b4cd
ryandeivert committed Mar 18, 2020
2 parents 666b4cd + 9a9401f commit 45c4cf1
Showing 17 changed files with 114 additions and 304 deletions.
97 changes: 39 additions & 58 deletions streamalert/classifier/clients/firehose.py
@@ -64,23 +64,12 @@ class FirehoseClient:
# The max length of the firehose stream name is 64. For streamalert data firehoses,
# we reserve 12 chars to have `streamalert_` as part of the prefix. Please refer to
# terraform/modules/tf_kinesis_firehose_delivery_stream/main.tf
FIREHOSE_NAME_MAX_LEN = 52
AWS_FIREHOSE_NAME_MAX_LEN = 64

FIREHOSE_NAME_HASH_LEN = 8
FIREHOSE_NAME_MIN_HASH_LEN = 8

def __init__(self, prefix, firehose_config=None, log_sources=None):
self._original_prefix = prefix
if firehose_config and firehose_config.get('use_prefix', True):
self._use_prefix = True
else:
self._use_prefix = False

self._prefix = (
'{}_'.format(prefix)
# This default value must be consistent with the classifier Terraform config
if self._use_prefix
else ''
)
self._prefix = prefix if firehose_config.get('use_prefix', True) else ''
self._client = boto3.client('firehose', config=boto_helpers.default_config())
self.load_enabled_log_sources(firehose_config, log_sources, force_load=True)

@@ -138,6 +127,18 @@ def _record_batches(cls, records):
if current_batch:
yield current_batch

@classmethod
def sanitized_value(cls, key):
"""Sanitize a key by replacing non-word characters with '_'
Args:
key (str): a string that needs to be sanitized
Returns:
str: sanitized string
"""
return re.sub(cls.SPECIAL_CHAR_REGEX, cls.SPECIAL_CHAR_SUB, key)
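
The substitution relies on class constants not shown in this hunk. A minimal standalone sketch of the same behavior, assuming `SPECIAL_CHAR_REGEX` matches any non-word character and `SPECIAL_CHAR_SUB` is an underscore (consistent with the sanitized names that appear throughout this diff):

```python
import re

SPECIAL_CHAR_REGEX = re.compile(r'\W')  # assumed: any non-word character
SPECIAL_CHAR_SUB = '_'                  # assumed: underscore replacement

def sanitized_value(key):
    """Replace every non-word character in a key with an underscore"""
    return re.sub(SPECIAL_CHAR_REGEX, SPECIAL_CHAR_SUB, key)

# Colons and dots are both replaced, e.g.:
# 'carbonblack:alert.status.updated' -> 'carbonblack_alert_status_updated'
print(sanitized_value('carbonblack:alert.status.updated'))
```

Note that, unlike the removed `sanitize_key` in config.py below, this replaces colons as well.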

@classmethod
def sanitize_keys(cls, record):
"""Remove special characters from parsed record keys
@@ -153,7 +154,7 @@ def sanitize_keys(cls, record):
"""
new_record = {}
for key, value in record.items():
sanitized_key = re.sub(cls.SPECIAL_CHAR_REGEX, cls.SPECIAL_CHAR_SUB, key)
sanitized_key = cls.sanitized_value(key)

# Handle nested objects
if isinstance(value, dict):
@@ -301,19 +302,7 @@ def _firehose_request_helper(data):
self._log_failed(len(records_data))

@classmethod
def firehose_log_name(cls, log_name):
"""Convert conventional log names into Firehose delivery stream names
Args:
log_name: The name of the log from logs.json
Returns:
str: Converted name which corresponds to a Firehose Delivery Stream
"""
return re.sub(cls.SPECIAL_CHAR_REGEX, cls.SPECIAL_CHAR_SUB, log_name)

@classmethod
def generate_firehose_suffix(cls, use_prefix, prefix, log_stream_name):
def generate_firehose_name(cls, prefix, log_stream_name):
"""Generate suffix of stream name complaint to firehose naming restriction, no
longer than 64 characters
@@ -325,32 +314,29 @@ def generate_firehose_suffix(cls, use_prefix, prefix, log_stream_name):
Returns:
str: compliant stream name
"""
if prefix:
prefix += '_'

reserved_len = cls.FIREHOSE_NAME_MAX_LEN

if use_prefix:
# the prefix will have a trailing '_' (underscore) that's why deduct 1
# in the end. Please refer to terraform module for more detail
# terraform/modules/tf_kinesis_firehose_delivery_stream/main.tf
reserved_len = reserved_len - len(prefix) - 1
# This same substitution method is used when naming the Delivery Streams
stream_name = cls.sanitized_value(cls.DEFAULT_FIREHOSE_FMT.format(prefix, log_stream_name))
if len(stream_name) <= cls.AWS_FIREHOSE_NAME_MAX_LEN:
return stream_name

# Don't change the stream name if its length is already compliant
if len(log_stream_name) <= reserved_len:
return log_stream_name

# Otherwise keep the first 51 chars if no prefix and hash the rest string into 8 chars.
# With prefix enabled, keep the first (51 - len(prefix) and hash the rest string into 8
# chars.
pos = reserved_len - cls.FIREHOSE_NAME_HASH_LEN
hash_part = log_stream_name[pos:]
hash_result = hashlib.md5(hash_part.encode()).hexdigest() # nosec
base_name = stream_name[:cls.AWS_FIREHOSE_NAME_MAX_LEN - cls.FIREHOSE_NAME_MIN_HASH_LEN]
if not base_name.endswith('_'):
# make sure this ends in a single underscore, not two
base_name = '{}_'.format(
base_name[:-1]
) if base_name[-2] != '_' else '{}_'.format(base_name[:-2])

# combine the first part and first 8 chars of hash result together as new
# stream name.
# e.g. if use prefix
# 'very_very_very_long_log_stream_name_abcd_59_characters_long' may hash to
# 'very_very_very_long_log_stream_name_abcd_59_06ceefaa'
return ''.join([log_stream_name[:pos], hash_result[:cls.FIREHOSE_NAME_HASH_LEN]])
return '{}{}'.format(
base_name, hashlib.md5(base_name.encode()).hexdigest() # nosec
)[:cls.AWS_FIREHOSE_NAME_MAX_LEN]
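
A runnable sketch of the renamed `generate_firehose_name` scheme, under stated assumptions: `DEFAULT_FIREHOSE_FMT` is not shown in this diff and is taken here to be `'streamalert_{}{}'`, matching the 12 reserved chars mentioned in the class comment above.

```python
import hashlib
import re

AWS_FIREHOSE_NAME_MAX_LEN = 64
FIREHOSE_NAME_MIN_HASH_LEN = 8
DEFAULT_FIREHOSE_FMT = 'streamalert_{}{}'  # assumed format; not shown in this diff

def generate_firehose_name(prefix, log_stream_name):
    if prefix:
        prefix += '_'
    # Same sanitization used when naming the Delivery Streams
    stream_name = re.sub(r'\W', '_', DEFAULT_FIREHOSE_FMT.format(prefix, log_stream_name))
    if len(stream_name) <= AWS_FIREHOSE_NAME_MAX_LEN:
        return stream_name
    # Truncate to leave room for an 8-char hash, ending on a single underscore
    base_name = stream_name[:AWS_FIREHOSE_NAME_MAX_LEN - FIREHOSE_NAME_MIN_HASH_LEN]
    if not base_name.endswith('_'):
        base_name = '{}_'.format(
            base_name[:-1]
        ) if base_name[-2] != '_' else '{}_'.format(base_name[:-2])
    # md5 is used here for stable name shortening, not for security
    return '{}{}'.format(
        base_name, hashlib.md5(base_name.encode()).hexdigest()
    )[:AWS_FIREHOSE_NAME_MAX_LEN]

# Short names pass through unchanged; long ones are truncated and hashed
print(generate_firehose_name('prod', 'cloudwatch:events'))
# -> 'streamalert_prod_cloudwatch_events'
print(generate_firehose_name('prod', 'very_very_very_long_log_stream_name_abcd_59_characters_long'))
# -> a 64-char name ending in the first characters of an md5 digest
```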

@classmethod
def enabled_log_source(cls, log_source_name):
@@ -366,7 +352,7 @@ def enabled_log_source(cls, log_source_name):
LOGGER.error('Enabled logs not loaded')
return False

return cls.firehose_log_name(log_source_name) in cls._ENABLED_LOGS
return cls.sanitized_value(log_source_name) in cls._ENABLED_LOGS

@classmethod
def load_enabled_log_sources(cls, firehose_config, log_sources, force_load=False):
@@ -396,7 +382,7 @@ def load_enabled_log_sources(cls, firehose_config, log_sources, force_load=False
# Expand to all subtypes
if len(enabled_log_parts) == 1:
expanded_logs = {
cls.firehose_log_name(log_name): log_name
cls.sanitized_value(log_name): log_name
for log_name in log_sources
if log_name.split(':')[0] == enabled_log_parts[0]
}
@@ -412,7 +398,7 @@ def load_enabled_log_sources(cls, firehose_config, log_sources, force_load=False
LOGGER.error('Enabled Firehose log %s not declared in logs.json', enabled_log)
continue

cls._ENABLED_LOGS[cls.firehose_log_name('_'.join(enabled_log_parts))] = enabled_log
cls._ENABLED_LOGS[cls.sanitized_value(enabled_log)] = enabled_log

return cls._ENABLED_LOGS
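
A hedged illustration of the expansion above, with hypothetical config values: a bare parent log name in the Firehose `enabled_logs` setting fans out to every `parent:subtype` entry declared in logs.json, and each result is keyed by its sanitized name (the error handling for undeclared logs is omitted here):

```python
import re

def _sanitize(key):
    # mirrors FirehoseClient.sanitized_value
    return re.sub(r'\W', '_', key)

log_sources = ['osquery:differential', 'osquery:status', 'cloudwatch:events']  # hypothetical
enabled_logs_config = ['osquery', 'cloudwatch:events']                         # hypothetical

enabled_logs = {}
for enabled_log in enabled_logs_config:
    parts = enabled_log.split(':')
    if len(parts) == 1:
        # Expand a parent-only name to all of its subtypes
        enabled_logs.update({
            _sanitize(log_name): log_name
            for log_name in log_sources
            if log_name.split(':')[0] == parts[0]
        })
    else:
        enabled_logs[_sanitize(enabled_log)] = enabled_log

# enabled_logs == {
#     'osquery_differential': 'osquery:differential',
#     'osquery_status': 'osquery:status',
#     'cloudwatch_events': 'cloudwatch:events',
# }
```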

@@ -444,16 +430,11 @@ def send(self, payloads):
# Each batch will be sent to its specific Firehose, which lands the data
# in a specific prefix in S3.
for log_type, records in records.items():
# This same substitution method is used when naming the Delivery Streams
formatted_log_type = self.firehose_log_name(log_type)

# firehose stream name has the length limit, no longer than 64 characters
formatted_stream_name = self.generate_firehose_suffix(
self._use_prefix, self._original_prefix, formatted_log_type
)
stream_name = self.DEFAULT_FIREHOSE_FMT.format(self._prefix, formatted_stream_name)
formatted_stream_name = self.generate_firehose_name(self._prefix, log_type)

# Process each record batch in the categorized payload set
for record_batch in self._record_batches(records):
batch_size = len(record_batch)
response = self._send_batch(stream_name, record_batch)
self._finalize(response, stream_name, batch_size)
response = self._send_batch(formatted_stream_name, record_batch)
self._finalize(response, formatted_stream_name, batch_size)
79 changes: 0 additions & 79 deletions streamalert/shared/config.py
@@ -225,88 +225,9 @@ def load_config(conf_dir='conf/', exclude=None, include=None, validate=True):
if validate:
_validate_config(config)

if config.get('logs'):
config['logs'] = _sanitize_log_names(config['logs'])

if _requires_sanitized_log_names(config):
config['global']['infrastructure']['firehose']['enabled_logs'] = _sanitize_log_names(
config['global']['infrastructure']['firehose']['enabled_logs']
)

return config


def _requires_sanitized_log_names(config):
"""Check if firehose has enabled_logs settings whose log names need to be sanitized
Args:
config (dict): loaded config from conf/ directory
Returns:
boolean: True if there are settings under "enabled_logs" in the "firehose" conf,
otherwise False.
"""
infra_config = config.get('global', {}).get('infrastructure')
if not infra_config:
return False

firehose_config = infra_config.get('firehose', {})
if not firehose_config:
return False

if not infra_config['firehose'].get('enabled_logs'):
return False

return True

def sanitize_key(key):
"""Sanitize a key by replacing non-characters with '_' (underscore), except ':' (colon).
Args:
key (str): a string needs to be sanitized
Returns:
str: sanitized string
"""
key_parts = key.split(':')
if len(key_parts) == 1:
# if no ':' in the key name, replace all special characters with '_'
# e.g.
# 'osquery_differential' -> 'osquery_differential'
# 'osquery_differential.with.dots' -> 'osquery_differential_with_dots'
sanitized_key = re.sub(SPECIAL_CHAR_REGEX, SPECIAL_CHAR_SUB, key)
elif len(key_parts) == 2:
# if there is a ':', replace the special chars in 2nd part and reconstruct
# sanitized key name
# e.g.
# 'carbonblack:alert.status.updated' -> 'carbonblack:alert_status_updated'
key_parts[1] = re.sub(SPECIAL_CHAR_REGEX, SPECIAL_CHAR_SUB, key_parts[1])
sanitized_key = ':'.join(key_parts)
else:
message = (
'Found offending log name "{}". A log name can contain at most one colon. '
'Please check the naming convention in conf/schemas/ or conf/logs.json'.format(key)
)
raise ConfigError(message)
return sanitized_key
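
The behavioral difference is worth noting: this deleted helper preserved the colon separating a log's source and subtype, while the replacement `FirehoseClient.sanitized_value` above replaces colons too. A simplified comparison (the removed code sanitized only the subtype half and raised on extra colons; both halves are sanitized here for brevity):

```python
import re

def old_sanitize_key(key):
    # removed behavior (simplified): keep a single colon, sanitize around it
    return ':'.join(re.sub(r'\W', '_', part) for part in key.split(':'))

def new_sanitized_value(key):
    # replacement behavior: every non-word character, colon included, becomes '_'
    return re.sub(r'\W', '_', key)

key = 'carbonblack:alert.status.updated'
print(old_sanitize_key(key))      # 'carbonblack:alert_status_updated'
print(new_sanitized_value(key))   # 'carbonblack_alert_status_updated'
```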

def _sanitize_log_names(config):
"""Sanitize the name of logs and replace the dots with underscores. For some reason,
we have log name with dots in it in the conf/schemas/carbonblack.json or conf/logs.json.
Args:
config (dict): loaded config from conf/ directory
Returns:
new_config (dict): new config with sanitized keys
"""
new_config = dict()
for key, _ in config.items():
sanitized_key = sanitize_key(key)
new_config[sanitized_key] = config[key]

return new_config

def _load_schemas(schemas_dir, schema_files):
"""Helper to load all schemas from the schemas directory into one ordered dictionary.
8 changes: 4 additions & 4 deletions streamalert_cli/athena/handler.py
@@ -235,7 +235,7 @@ def rebuild_partitions(table, bucket, config):
Returns:
bool: False if errors occurred, True otherwise
"""
sanitized_table_name = FirehoseClient.firehose_log_name(table)
sanitized_table_name = FirehoseClient.sanitized_value(table)

athena_client = get_athena_client(config)

@@ -272,6 +272,7 @@ def rebuild_partitions(table, bucket, config):
LOGGER.info('Successfully rebuilt all partitions for %s', sanitized_table_name)
return True


def write_partitions_statements(statements, sanitized_table_name):
"""Write partitions statements to a file if re-creating new partitions failed"""
file_name = 'partitions_{}.txt'.format(sanitized_table_name)
@@ -282,6 +283,7 @@ def write_partitions_statements(statements, sanitized_table_name):
with open(file_name, 'w') as partition_file:
partition_file.write(statements)


def drop_all_tables(config):
"""Drop all 'streamalert' Athena tables
@@ -331,8 +333,6 @@ def _construct_create_table_statement(schema, table_name, bucket, file_format='p
)
schema_statement.append('{0} struct<{1}>'.format(key_name, struct_schema))



return CREATE_TABLE_STATEMENT.format(
table_name=table_name,
schema=', '.join(schema_statement),
@@ -360,7 +360,7 @@ def create_table(table, bucket, config, schema_override=None):
)

# Convert special characters in schema name to underscores
sanitized_table_name = FirehoseClient.firehose_log_name(table)
sanitized_table_name = FirehoseClient.sanitized_value(table)

# Check that the log type is enabled via Firehose
if sanitized_table_name != 'alerts' and sanitized_table_name not in enabled_logs:
4 changes: 3 additions & 1 deletion streamalert_cli/athena/helpers.py
@@ -44,6 +44,7 @@
# Athena query statement length limit
MAX_QUERY_LENGTH = 262144


def add_partition_statements(partitions, bucket, table_name):
"""Generate ALTER TABLE commands from existing partitions. It wil yield Athena
statement string(s), the length of each string should be less than Athena query
@@ -186,6 +187,7 @@ def generate_alerts_table_schema():

return format_schema_tf(athena_schema)


def generate_data_table_schema(config, table, schema_override=None):
"""Generate the schema for data table in terraform
@@ -202,7 +204,7 @@ def generate_data_table_schema(config, table, schema_override=None):
)

# Convert special characters in schema name to underscores
sanitized_table_name = FirehoseClient.firehose_log_name(table)
sanitized_table_name = FirehoseClient.sanitized_value(table)

# Check that the log type is enabled via Firehose
if sanitized_table_name not in enabled_logs:
2 changes: 1 addition & 1 deletion streamalert_cli/manage_lambda/package.py
@@ -100,7 +100,7 @@ def _copy_files(self, temp_package_path):
# Copy the directory, skipping any files with a 'dependencies.zip' suffix or a '.json' extension
shutil.copytree(
path, os.path.join(temp_package_path, path),
ignore=shutil.ignore_patterns(*{'*dependencies.zip'})
ignore=shutil.ignore_patterns('*dependencies.zip', '*.json')
)
else:
# Ensure the parent directory of the file being copied already exists
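
For reference, `shutil.ignore_patterns` returns a callable that `copytree` invokes once per directory; any names matching a pattern are skipped. A quick demonstration with hypothetical file names:

```python
import shutil

ignore = shutil.ignore_patterns('*dependencies.zip', '*.json')
names = ['handler.py', 'rules.json', 'foo_dependencies.zip', 'README.md']
print(ignore('/some/dir', names))  # {'rules.json', 'foo_dependencies.zip'} (a set)
```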
8 changes: 3 additions & 5 deletions streamalert_cli/terraform/firehose.py
@@ -59,6 +59,8 @@ def generate_firehose(logging_bucket, main_dict, config):

db_name = get_database_name(config)

firehose_prefix = prefix if firehose_conf.get('use_prefix', True) else ''

# Add the Delivery Streams individually
for log_stream_name, log_type_name in enabled_logs.items():
module_dict = {
@@ -70,11 +72,7 @@
firehose_conf.get('buffer_interval', 300)
),
'file_format': get_data_file_format(config),
'use_prefix': firehose_conf.get('use_prefix', True),
'prefix': prefix,
'log_name': FirehoseClient.generate_firehose_suffix(
firehose_conf.get('use_prefix', True), prefix, log_stream_name
),
'stream_name': FirehoseClient.generate_firehose_name(firehose_prefix, log_stream_name),
'role_arn': '${module.kinesis_firehose_setup.firehose_role_arn}',
's3_bucket_name': firehose_s3_bucket_name,
'kms_key_arn': '${aws_kms_key.server_side_encryption.arn}',
10 changes: 7 additions & 3 deletions streamalert_cli/test/event.py
@@ -118,8 +118,7 @@ def suppressed(self, value):
self._suppressed = self._suppressed or value
return self._suppressed

@property
def is_valid(self):
def is_valid(self, config):
"""Check if the test event contains the required keys
Returns:
@@ -148,6 +147,10 @@ def is_valid(self):
)
return False

if self.log not in config['logs']:
self.error = 'No defined schema in config for log type: {}'.format(self.log)
return False

# Log a warning if there are extra keys declared in the test log, but this is not an error
key_diff = test_event_keys.difference(
self.REQUIRED_KEYS | self.OPTIONAL_KEYS | self.ACCEPTABLE_DATA_KEYS
@@ -173,7 +176,7 @@ def setup_fixtures(self):
ThreatIntelMocks.add_fixtures(self.threat_intel_fixtures)

def prepare(self, config):
if not self.is_valid:
if not self.is_valid(config):
return False

if not self.format_test_record(config):
@@ -359,6 +362,7 @@ def _apply_defaults(self, config):
if not self.override_record:
return

# The existence of this is checked in the is_valid function
event_log = config['logs'].get(self.log)

configuration = event_log.get('configuration', {})
