Commit b5f4eb9
Merge baee4c6 into c4d2c12
chunyong-lin committed Mar 16, 2020
2 parents c4d2c12 + baee4c6 commit b5f4eb9
Showing 6 changed files with 180 additions and 14 deletions.
6 changes: 3 additions & 3 deletions docs/source/config-global.rst
@@ -353,9 +353,9 @@ For instance, suppose the following schemas are defined across one or more files
Supposing also that the above ``enabled_logs`` :ref:`example <firehose_example_02>` is used, the
following Firehose resources will be created:

* ``<prefix>_streamalert_data_cloudwatch_cloudtrail``
* ``<prefix>_streamalert_data_osquery_differential``
* ``<prefix>_streamalert_data_osquery_status``
* ``<prefix>_data_cloudwatch_cloudtrail``
* ``<prefix>_data_osquery_differential``
* ``<prefix>_data_osquery_status``

.. note::

63 changes: 60 additions & 3 deletions streamalert/classifier/clients/firehose.py
@@ -15,6 +15,7 @@
"""
from collections import defaultdict
import json
import hashlib
import re

import backoff
@@ -52,19 +53,32 @@ class FirehoseClient:
MAX_RECORD_SIZE = 1000 * 1000 - 2

    # Default firehose name format, to be formatted with the deployment prefix
DEFAULT_FIREHOSE_FMT = '{}streamalert_data_{}'
DEFAULT_FIREHOSE_FMT = '{}data_{}'

    # Exceptions for which backoff operations should be performed
EXCEPTIONS_TO_BACKOFF = (ClientError, BotocoreConnectionError, HTTPClientError)

# Set of enabled log types for firehose, loaded from configs
_ENABLED_LOGS = dict()

    # The max length of a Firehose stream name is 64 characters. For StreamAlert data
    # Firehoses, 5 chars are reserved for the `data_` portion of the name, leaving 59.
    # Please refer to terraform/modules/tf_kinesis_firehose_delivery_stream/main.tf
FIREHOSE_NAME_MAX_LEN = 59

FIREHOSE_NAME_HASH_LEN = 8

def __init__(self, prefix, firehose_config=None, log_sources=None):
self._original_prefix = prefix
if firehose_config and firehose_config.get('use_prefix', True):
self._use_prefix = True
else:
self._use_prefix = False

self._prefix = (
'{}_'.format(prefix)
# This default value must be consistent with the classifier Terraform config
if firehose_config and firehose_config.get('use_prefix', True)
if self._use_prefix
else ''
)
self._client = boto3.client('firehose', config=boto_helpers.default_config())
@@ -298,6 +312,45 @@ def firehose_log_name(cls, log_name):
"""
return re.sub(cls.SPECIAL_CHAR_REGEX, cls.SPECIAL_CHAR_SUB, log_name)

    @classmethod
    def generate_firehose_stream_name(cls, use_prefix, prefix, log_stream_name):
        """Generate a stream name compliant with the Firehose naming restriction of
        no more than 64 characters

        Args:
            use_prefix (bool): Whether the deployment prefix is prepended to the stream name
            prefix (str): The deployment prefix
            log_stream_name (str): The formatted log stream name

        Returns:
            str: Compliant stream name
        """

reserved_len = cls.FIREHOSE_NAME_MAX_LEN

if use_prefix:
            # The prefix carries a trailing '_' (underscore), which is why we deduct one
            # more character. Please refer to the Terraform module for more detail:
# terraform/modules/tf_kinesis_firehose_delivery_stream/main.tf
reserved_len = reserved_len - len(prefix) - 1

        # Don't change the stream name if its length is compliant
if len(log_stream_name) <= reserved_len:
return log_stream_name

        # Otherwise, keep the first 51 chars if no prefix is used and hash the remaining
        # string into 8 chars. With a prefix enabled, keep the first (50 - len(prefix))
        # chars (one extra char is reserved for the trailing underscore) and hash the
        # remaining string into 8 chars.
pos = reserved_len - cls.FIREHOSE_NAME_HASH_LEN
hash_part = log_stream_name[pos:]
hash_result = hashlib.md5(hash_part.encode()).hexdigest() # nosec

        # Combine the first part and the first 8 chars of the hash result into the new
        # stream name.
        # e.g. with a prefix in use,
        # 'very_very_very_long_log_stream_name_abcd_59_characters_long' hashes to
        # 'very_very_very_long_log_stream_name_abcd_59_06ceefaa'
return ''.join([log_stream_name[:pos], hash_result[:cls.FIREHOSE_NAME_HASH_LEN]])

@classmethod
def enabled_log_source(cls, log_source_name):
"""Check that the incoming record is an enabled log source for Firehose
@@ -392,8 +445,12 @@ def send(self, payloads):
for log_type, records in records.items():
# This same substitution method is used when naming the Delivery Streams
formatted_log_type = self.firehose_log_name(log_type)
stream_name = self.DEFAULT_FIREHOSE_FMT.format(self._prefix, formatted_log_type)

            # Firehose stream names are limited to a maximum of 64 characters
formatted_stream_name = self.generate_firehose_stream_name(
self._use_prefix, self._original_prefix, formatted_log_type
)
stream_name = self.DEFAULT_FIREHOSE_FMT.format(self._prefix, formatted_stream_name)
# Process each record batch in the categorized payload set
for record_batch in self._record_batches(records):
batch_size = len(record_batch)
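For reference, a minimal standalone sketch of the truncate-and-hash scheme that generate_firehose_stream_name implements. The constants mirror FIREHOSE_NAME_MAX_LEN and FIREHOSE_NAME_HASH_LEN above; the helper name is hypothetical and not part of the commit.

import hashlib

# 64-char Firehose limit minus the 5 chars reserved for the 'data_' infix
NAME_MAX_LEN = 59
HASH_LEN = 8

def shorten_stream_name(use_prefix, prefix, name):
    """Truncate and hash a log stream name so the final Firehose name fits."""
    # A used prefix consumes len(prefix) + 1 chars (the trailing underscore)
    reserved = NAME_MAX_LEN - (len(prefix) + 1 if use_prefix else 0)
    if len(name) <= reserved:
        return name  # already compliant, leave unchanged
    pos = reserved - HASH_LEN
    digest = hashlib.md5(name[pos:].encode()).hexdigest()  # nosec
    return name[:pos] + digest[:HASH_LEN]

# e.g. shorten_stream_name(
#     False, '', 'very_very_very_long_log_stream_name_abcdefg_abcdefg_70_characters_long'
# ) returns 'very_very_very_long_log_stream_name_abcdefg_abcdefgbe9581ad',
# matching the unit tests below.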
4 changes: 3 additions & 1 deletion streamalert_cli/terraform/firehose.py
@@ -72,7 +72,9 @@ def generate_firehose(logging_bucket, main_dict, config):
'file_format': get_data_file_format(config),
'use_prefix': firehose_conf.get('use_prefix', True),
'prefix': prefix,
'log_name': log_stream_name,
'log_name': FirehoseClient.generate_firehose_stream_name(
firehose_conf.get('use_prefix', True), prefix, log_stream_name
),
'role_arn': '${module.kinesis_firehose_setup.firehose_role_arn}',
's3_bucket_name': firehose_s3_bucket_name,
'kms_key_arn': '${aws_kms_key.server_side_encryption.arn}',
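The CLI generator and the classifier client must derive identical names, or the classifier would write to a stream Terraform never created. A short usage sketch of the shared classmethod (the 'acme' prefix is hypothetical; the import path follows the file layout in this commit):

from streamalert.classifier.clients.firehose import FirehoseClient

# Both sides call the same classmethod with the same inputs
name = FirehoseClient.generate_firehose_stream_name(
    True, 'acme', 'very_very_very_long_log_stream_name_abcd_59_characters_long'
)
# The Terraform module then creates 'acme_data_<name>', which stays within
# Firehose's 64-character limit.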
2 changes: 1 addition & 1 deletion terraform/modules/tf_classifier/firehose.tf
@@ -6,7 +6,7 @@ resource "aws_iam_role_policy" "classifier_firehose" {
}

locals {
stream_prefix = "${var.firehose_use_prefix ? "${var.prefix}_" : ""}streamalert_data_"
stream_prefix = "${var.firehose_use_prefix ? "${var.prefix}_" : ""}data_"
}

// IAM Policy Doc: Allow the Classifier to PutRecord* on any StreamAlert Data Firehose
9 changes: 5 additions & 4 deletions terraform/modules/tf_kinesis_firehose_delivery_stream/main.tf
@@ -11,7 +11,8 @@ locals {
# https://docs.aws.amazon.com/athena/latest/ug/tables-location-format.html
  # So all data in parquet format will be saved to the s3 bucket with the prefix
# "s3://bucketname/parquet/[data-type]".
s3_path_prefix = "parquet/${var.log_name}"
  # glue_catalog_table_name maps to the data-type if the data-type name is not too long.
s3_path_prefix = "parquet/${var.glue_catalog_table_name}"
}

locals {
@@ -26,7 +27,7 @@ locals {
}

resource "aws_kinesis_firehose_delivery_stream" "streamalert_data" {
name = "${var.use_prefix ? "${var.prefix}_" : ""}streamalert_data_${var.log_name}"
name = "${var.use_prefix ? "${var.prefix}_" : ""}data_${var.log_name}"
destination = var.file_format == "parquet" ? "extended_s3" : "s3"

// AWS Firehose Stream for data to S3 and saved in JSON format
@@ -35,7 +36,7 @@ resource "aws_kinesis_firehose_delivery_stream" "streamalert_data" {
content {
role_arn = var.role_arn
bucket_arn = "arn:aws:s3:::${var.s3_bucket_name}"
prefix = "${var.log_name}/"
prefix = "${var.glue_catalog_table_name}/"
buffer_size = var.buffer_size
buffer_interval = var.buffer_interval
compression_format = "GZIP"
@@ -113,7 +114,7 @@ resource "aws_cloudwatch_metric_alarm" "firehose_records_alarm" {
// data athena table
resource "aws_glue_catalog_table" "data" {
count = var.file_format == "parquet" ? 1 : 0
name = var.log_name
name = var.glue_catalog_table_name
database_name = var.glue_catalog_db_name

table_type = "EXTERNAL_TABLE"
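As the s3_path_prefix comment above notes, the Glue table name doubles as the S3 path component, so the Athena table LOCATION lines up with where Firehose writes Parquet data. A small sketch of that layout, under an assumed bucket name:

# Hypothetical bucket name, for illustration only
bucket = 'acme-streamalert-data'

def parquet_location(glue_catalog_table_name):
    # Athena requires the table LOCATION to be a plain S3 prefix:
    # https://docs.aws.amazon.com/athena/latest/ug/tables-location-format.html
    return 's3://{}/parquet/{}'.format(bucket, glue_catalog_table_name)

# parquet_location('osquery_differential')
# -> 's3://acme-streamalert-data/parquet/osquery_differential'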
110 changes: 108 additions & 2 deletions tests/unit/streamalert/classifier/clients/test_firehose.py
@@ -412,7 +412,7 @@ def test_send(self, send_batch_mock):
]
self._client.send(self._sample_payloads)
send_batch_mock.assert_called_with(
'unit-test_streamalert_data_log_type_01_sub_type_01', expected_batch
'unit-test_data_log_type_01_sub_type_01', expected_batch
)

@patch.object(FirehoseClient, '_send_batch')
@@ -434,5 +434,111 @@ def test_send_no_prefixing(self, send_batch_mock):

client.send(self._sample_payloads)
send_batch_mock.assert_called_with(
'streamalert_data_log_type_01_sub_type_01', expected_batch
'data_log_type_01_sub_type_01', expected_batch
)

@property
def _sample_payloads_long_log_name(self):
return [
Mock(
log_schema_type=(
'very_very_very_long_log_stream_name_abcdefg_'
'abcdefg_70_characters_long'
),
parsed_records=[
{
'unit_key_01': 1,
'unit_key_02': 'test'
},
{
'unit_key_01': 2,
'unit_key_02': 'test'
}
]
)
]

@patch.object(FirehoseClient, '_send_batch')
def test_send_long_log_name(self, send_batch_mock):
"""FirehoseClient - Send data when the log name is very long"""
FirehoseClient._ENABLED_LOGS = {
'very_very_very_long_log_stream_name_abcdefg_abcdefg_70_characters_long': {}
}
expected_batch = [
'{"unit_key_01":1,"unit_key_02":"test"}\n',
'{"unit_key_01":2,"unit_key_02":"test"}\n'
]

client = FirehoseClient.load_from_config(
prefix='unit-test',
firehose_config={'enabled': True, 'use_prefix': False},
log_sources=None
)

client.send(self._sample_payloads_long_log_name)
send_batch_mock.assert_called_with(
'data_very_very_very_long_log_stream_name_abcdefg_abcdefgbe9581ad', expected_batch
)

def test_generate_firehose_stream_name(self):
"""FirehoseClient - Test helper to generate firehose stream name when prefix disabled"""
stream_names = [
'logstreamname',
'log_stream_name',
'very_very_very_long_log_stream_name_abcd_59_characters_long',
'very_very_very_long_log_stream_name_abcdefg_abcdefg_70_characters_long'
]

        # The hex value can be calculated in a Python interpreter based on the
        # generate_firehose_stream_name function. The steps are copied here to make
        # it easier to update the test cases in the future.
        #
        # >>> import hashlib
        # >>> s = 'very_very_very_long_log_stream_name_abcdefg_abcdefg_70_characters_long'
        # >>> hashlib.md5(s[51:].encode()).hexdigest()[:8]
        # 'be9581ad'
#
expected_results = [
'logstreamname',
'log_stream_name',
'very_very_very_long_log_stream_name_abcd_59_characters_long',
'very_very_very_long_log_stream_name_abcdefg_abcdefgbe9581ad'
]
results = [
self._client.generate_firehose_stream_name(False, 'prefix', stream_name)
for stream_name in stream_names
]

assert_equal(expected_results, results)

def test_generate_firehose_stream_name_prefix(self):
"""FirehoseClient - Test helper to generate firehose stream name with prefix"""
stream_names = [
'logstreamname',
'log_stream_name',
'very_very_very_long_log_stream_name_abcd_59_characters_long',
'very_very_very_long_log_stream_name_abcdefg_abcdefg_70_characters_long'
]

# >>> import hashlib
# >>> s3 = 'very_very_very_long_log_stream_name_abcd_59_characters_long'
# >>> s4 = 'very_very_very_long_log_stream_name_abcdefg_abcdefg_70_characters_long'
# >>> h3 = hashlib.md5(s3[44:].encode()).hexdigest()
# >>> h4 = hashlib.md5(s4[44:].encode()).hexdigest()
# >>> ''.join([s3[:44], h3[:8]])
# 'very_very_very_long_log_stream_name_abcd_59_06ceefaa'
# >>> ''.join([s4[:44], h4[:8]])
# 'very_very_very_long_log_stream_name_abcdefg_e80fecd8'
#
expected_results = [
'logstreamname',
'log_stream_name',
'very_very_very_long_log_stream_name_abcd_59_06ceefaa',
'very_very_very_long_log_stream_name_abcdefg_e80fecd8'
]
results = [
self._client.generate_firehose_stream_name(True, 'prefix', stream_name)
for stream_name in stream_names
]

assert_equal(expected_results, results)
