Skip to content

Commit

Permalink
Merge b0b105b into 666b4cd
Browse files Browse the repository at this point in the history
  • Loading branch information
blakemotl committed Mar 18, 2020
2 parents 666b4cd + b0b105b commit 8d52b0c
Show file tree
Hide file tree
Showing 31 changed files with 308 additions and 361 deletions.
10 changes: 5 additions & 5 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ docs/build
__pycache__

# Terraform files
*.terraform/
terraform.tfvars
terraform/terraform.tfstate*
terraform/*.zip
terraform/*.tf.json
streamalert_cli/_infrastructure/.terraform/
streamalert_cli/_infrastructure/terraform.tfvars
streamalert_cli/_infrastructure/terraform.tfstate*
streamalert_cli/_infrastructure/*.zip
streamalert_cli/_infrastructure/*.tf.json

# Coveralls repo token
.coveralls.yml
Expand Down
3 changes: 0 additions & 3 deletions conf/global.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@
"write_capacity": 5
}
},
"s3_access_logging": {
"bucket_name": "specify bucket name here"
},
"classifier_sqs": {
"use_prefix": true
}
Expand Down
Binary file modified docs/images/sa-banner.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion docs/source/getting-started.rst
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ Download StreamAlert
.. code-block:: bash
cd streamalert
python3.7 -m virtualenv venv
python3.7 -m venv venv
source venv/bin/activate
3. Install the StreamAlert requirements:
Expand Down
4 changes: 2 additions & 2 deletions streamalert/apps/_apps/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ on ec2 instance
$ rm -rf $HOME/.cache/pip/
# Create and source venv
$ python3.7 -m virtualenv $HOME/venv
$ python3.7 -m venv $HOME/venv
$ source $HOME/venv/bin/activate
# Upgrade pip and setuptools (they are super old)
Expand Down Expand Up @@ -126,7 +126,7 @@ SSH and Build Dependencies
$ which python3.7
# Create and source venv
$ python3.7 -m virtualenv venv && source venv/bin/activate
$ python3.7 -m venv venv && source venv/bin/activate
# upgrade pip and setuptools if neccessary
$ pip install --upgrade pip setuptools
Expand Down
109 changes: 43 additions & 66 deletions streamalert/classifier/clients/firehose.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,23 +64,12 @@ class FirehoseClient:
# The max length of the firehose stream name is 64. For streamalert data firehose,
# we reserve 12 chars to have `streamalert_` as part of prefix. Please refer to
# terraform/modules/tf_kinesis_firehose_delivery_stream/main.tf
FIREHOSE_NAME_MAX_LEN = 52
AWS_FIREHOSE_NAME_MAX_LEN = 64

FIREHOSE_NAME_HASH_LEN = 8
FIREHOSE_NAME_MIN_HASH_LEN = 8

def __init__(self, prefix, firehose_config=None, log_sources=None):
self._original_prefix = prefix
if firehose_config and firehose_config.get('use_prefix', True):
self._use_prefix = True
else:
self._use_prefix = False

self._prefix = (
'{}_'.format(prefix)
# This default value must be consistent with the classifier Terraform config
if self._use_prefix
else ''
)
self._prefix = prefix if firehose_config.get('use_prefix', True) else ''
self._client = boto3.client('firehose', config=boto_helpers.default_config())
self.load_enabled_log_sources(firehose_config, log_sources, force_load=True)

Expand Down Expand Up @@ -138,6 +127,18 @@ def _record_batches(cls, records):
if current_batch:
yield current_batch

@classmethod
def sanitized_value(cls, key):
"""Sanitize a key by replacing non-word characters with '_'
Args:
key (str): a string needs to be sanitized
Returns:
str: sanitized string
"""
return re.sub(cls.SPECIAL_CHAR_REGEX, cls.SPECIAL_CHAR_SUB, key)

@classmethod
def sanitize_keys(cls, record):
"""Remove special characters from parsed record keys
Expand All @@ -153,7 +154,7 @@ def sanitize_keys(cls, record):
"""
new_record = {}
for key, value in record.items():
sanitized_key = re.sub(cls.SPECIAL_CHAR_REGEX, cls.SPECIAL_CHAR_SUB, key)
sanitized_key = cls.sanitized_value(key)

# Handle nested objects
if isinstance(value, dict):
Expand Down Expand Up @@ -301,56 +302,37 @@ def _firehose_request_helper(data):
self._log_failed(len(records_data))

@classmethod
def firehose_log_name(cls, log_name):
"""Convert conventional log names into Firehose delivery stream names
Args:
log_name: The name of the log from logs.json
Returns
str: Converted name which corresponds to a Firehose delivery Stream
"""
return re.sub(cls.SPECIAL_CHAR_REGEX, cls.SPECIAL_CHAR_SUB, log_name)

@classmethod
def generate_firehose_suffix(cls, use_prefix, prefix, log_stream_name):
def generate_firehose_name(cls, prefix, log_stream_name):
"""Generate suffix of stream name complaint to firehose naming restriction, no
longer than 64 characters
Args:
use_prefix (bool): Does apply prefix defined in conf/global.json to firehose stream name
prefix (str): The prefix defined in conf/global.json to firehose stream name
log_stream_name (str): The name of the log from conf/logs.json or conf/schemas/*.json
Returns:
str: suffix of stream name
"""

reserved_len = cls.FIREHOSE_NAME_MAX_LEN

if use_prefix:
# the prefix will have a trailing '_' (underscore) that's why deduct 1
# in the end. Please refer to terraform module for more detail
# terraform/modules/tf_kinesis_firehose_delivery_stream/main.tf
reserved_len = reserved_len - len(prefix) - 1

# Don't change the stream name if its length is complaint
if len(log_stream_name) <= reserved_len:
return log_stream_name

# Otherwise keep the first 51 chars if no prefix and hash the rest string into 8 chars.
# With prefix enabled, keep the first (51 - len(prefix) and hash the rest string into 8
# chars.
pos = reserved_len - cls.FIREHOSE_NAME_HASH_LEN
hash_part = log_stream_name[pos:]
hash_result = hashlib.md5(hash_part.encode()).hexdigest() # nosec

# combine the first part and first 8 chars of hash result together as new
if prefix:
prefix += '_'

# This same substitution method is used when naming the Delivery Streams
stream_name = cls.sanitized_value(cls.DEFAULT_FIREHOSE_FMT.format(prefix, log_stream_name))
if len(stream_name) <= cls.AWS_FIREHOSE_NAME_MAX_LEN:
return stream_name

base_name = stream_name[:cls.AWS_FIREHOSE_NAME_MAX_LEN - cls.FIREHOSE_NAME_MIN_HASH_LEN]
if not base_name.endswith('_'):
# make sure this ends in an underscore, but not 2
base_name = '{}_'.format(
base_name[:-1]
) if base_name[-2] != '_' else '{}_'.format(base_name[:-2])

# combine the base_name and first 8 chars of hash result together as new
# stream name.
# e.g. if use prefix
# 'very_very_very_long_log_stream_name_abcd_59_characters_long' may hash to
# 'very_very_very_long_log_stream_name_abcd_59_06ceefaa'
return ''.join([log_stream_name[:pos], hash_result[:cls.FIREHOSE_NAME_HASH_LEN]])
return '{}{}'.format(
base_name, hashlib.md5(base_name.encode()).hexdigest() # nosec
)[:cls.AWS_FIREHOSE_NAME_MAX_LEN]

@classmethod
def enabled_log_source(cls, log_source_name):
Expand All @@ -366,7 +348,7 @@ def enabled_log_source(cls, log_source_name):
LOGGER.error('Enabled logs not loaded')
return False

return cls.firehose_log_name(log_source_name) in cls._ENABLED_LOGS
return cls.sanitized_value(log_source_name) in cls._ENABLED_LOGS

@classmethod
def load_enabled_log_sources(cls, firehose_config, log_sources, force_load=False):
Expand Down Expand Up @@ -396,7 +378,7 @@ def load_enabled_log_sources(cls, firehose_config, log_sources, force_load=False
# Expand to all subtypes
if len(enabled_log_parts) == 1:
expanded_logs = {
cls.firehose_log_name(log_name): log_name
cls.sanitized_value(log_name): log_name
for log_name in log_sources
if log_name.split(':')[0] == enabled_log_parts[0]
}
Expand All @@ -412,7 +394,7 @@ def load_enabled_log_sources(cls, firehose_config, log_sources, force_load=False
LOGGER.error('Enabled Firehose log %s not declared in logs.json', enabled_log)
continue

cls._ENABLED_LOGS[cls.firehose_log_name('_'.join(enabled_log_parts))] = enabled_log
cls._ENABLED_LOGS[cls.sanitized_value(enabled_log)] = enabled_log

return cls._ENABLED_LOGS

Expand Down Expand Up @@ -444,16 +426,11 @@ def send(self, payloads):
# Each batch will be processed to their specific Firehose, which lands the data
# in a specific prefix in S3.
for log_type, records in records.items():
# This same substitution method is used when naming the Delivery Streams
formatted_log_type = self.firehose_log_name(log_type)

# firehose stream name has the length limit, no longer than 64 characters
formatted_stream_name = self.generate_firehose_suffix(
self._use_prefix, self._original_prefix, formatted_log_type
)
stream_name = self.DEFAULT_FIREHOSE_FMT.format(self._prefix, formatted_stream_name)
formatted_stream_name = self.generate_firehose_name(self._prefix, log_type)

# Process each record batch in the categorized payload set
for record_batch in self._record_batches(records):
batch_size = len(record_batch)
response = self._send_batch(stream_name, record_batch)
self._finalize(response, stream_name, batch_size)
response = self._send_batch(formatted_stream_name, record_batch)
self._finalize(response, formatted_stream_name, batch_size)
79 changes: 0 additions & 79 deletions streamalert/shared/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,88 +225,9 @@ def load_config(conf_dir='conf/', exclude=None, include=None, validate=True):
if validate:
_validate_config(config)

if config.get('logs'):
config['logs'] = _sanitize_log_names(config['logs'])

if _requires_sanitized_log_names(config):
config['global']['infrastructure']['firehose']['enabled_logs'] = _sanitize_log_names(
config['global']['infrastructure']['firehose']['enabled_logs']
)

return config


def _requires_sanitized_log_names(config):
"""Check if firehose has enabled_logs settings whose log names need to be sanitized
Args:
config (dict): loaded config from conf/ directory
Returns:
boolean: True if there are settings under "enabled_logs" in the "firehose" conf,
otherwise False.
"""
infra_config = config.get('global', {}).get('infrastructure')
if not infra_config:
return False

firehose_config = infra_config.get('firehose', {})
if not firehose_config:
return False

if not infra_config['firehose'].get('enabled_logs'):
return False

return True

def sanitize_key(key):
"""Sanitize a key by replacing non-characters with '_' (underscore), except ':' (colon).
Args:
key (str): a string needs to be sanitized
Returns:
str: sanitized string
"""
key_parts = key.split(':')
if len(key_parts) == 1:
# if no ':' in the key name, replace all special characters with '_'
# e.g.
# 'osquery_differential' -> 'osquery_differential'
# 'osquery_differntial.with.dots' -> 'osquery_differntial_with_dots'
sanitized_key = re.sub(SPECIAL_CHAR_REGEX, SPECIAL_CHAR_SUB, key)
elif len(key_parts) == 2:
# if there is a ':', replace the special chars in 2nd part and reconstruct
# sanitized key name
# e.g.
# 'carbonblack:alert.status.updated' -> 'carbonblack:alert_status_updated'
key_parts[1] = re.sub(SPECIAL_CHAR_REGEX, SPECIAL_CHAR_SUB, key_parts[1])
sanitized_key = ':'.join(key_parts)
else:
message = (
'Found offended log name "{}". Log name can only contain up to one colon. '
'Please check naming convention in conf/schemas/ or conf/logs.json'.format(key)
)
raise ConfigError(message)
return sanitized_key

def _sanitize_log_names(config):
"""Sanitize the name of logs and replace the dots with underscores. For some reason,
we have log name with dots in it in the conf/schemas/carbonblack.json or conf/logs.json.
Args:
config (dict): loaded config from conf/ directory
Returns:
new_config (dict): new config with sanitized keys
"""
new_config = dict()
for key, _ in config.items():
sanitized_key = sanitize_key(key)
new_config[sanitized_key] = config[key]

return new_config

def _load_schemas(schemas_dir, schema_files):
"""Helper to load all schemas from the schemas directory into one ordered dictionary.
Expand Down
18 changes: 18 additions & 0 deletions streamalert_cli/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
"""
Copyright 2017-present Airbnb, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import os

STREAMALERT_CLI_ROOT = os.path.dirname(os.path.abspath(__file__))
Empty file.
Loading

0 comments on commit 8d52b0c

Please sign in to comment.