diff --git a/publishers/community/generic.py b/publishers/community/generic.py index 4d52eccc6..b388374ed 100644 --- a/publishers/community/generic.py +++ b/publishers/community/generic.py @@ -107,7 +107,7 @@ def remove_fields(alert, publication): @Register def remove_streamalert_normalization(_, publication): - """This publisher removes the super heavyweight 'streamalert:normalization' fields""" + """This publisher removes the super heavyweight 'streamalert_normalization' fields""" return _delete_dictionary_fields(publication, Normalizer.NORMALIZATION_KEY) diff --git a/streamalert/artifact_extractor/artifact_extractor.py b/streamalert/artifact_extractor/artifact_extractor.py index bebf6ef80..558b69101 100644 --- a/streamalert/artifact_extractor/artifact_extractor.py +++ b/streamalert/artifact_extractor/artifact_extractor.py @@ -14,6 +14,7 @@ import json import re from os import environ as env +import uuid from streamalert.shared.firehose import FirehoseClient from streamalert.shared import config @@ -23,6 +24,8 @@ LOGGER = get_logger(__name__) +RECORD_ID_KEY = 'streamalert_record_id' + class Artifact: """Encapsulation of a single Artifact that is extracted from an input record.""" @@ -57,7 +60,7 @@ def record(self): """ return { 'function': self._function, - 'record_id': self._record_id, + RECORD_ID_KEY: self._record_id, 'source_type': self._source_type, 'type': self._type, 'value': self._value, @@ -70,9 +73,10 @@ class FirehoseRecord: def __init__(self, firehose_record, source_type): """Create a new Firehose record contains original data and may extract multiple artifacts if original data was normalized in the classifier. - The original data which will be returned back to source firehose for historical search. And - the artifacts, if any, will be sent to a dedicated firehose with simplified schema and land - in streamalert data bucket for historical search as well. + The transformed data (insert a record_id only) which will be returned back to source + firehose for historical search. And the artifacts, if any, will be sent to a dedicated + firehose with simplified schema and land in streamalert data bucket for historical search + as well. Args: firehose_record (dict): the record passed to lambda from source firehose. It has @@ -118,7 +122,7 @@ def artifacts(self): # 'awsRegion': 'us-west-2' # } # }, - # 'streamalert:normalization': { + # 'streamalert_normalization': { # 'region': [ # { # 'values': ['region_name'], @@ -132,17 +136,22 @@ def artifacts(self): # } # } # + record_id = self._decoded_record.get(RECORD_ID_KEY) or str(uuid.uuid4()) for key, values in self._decoded_record[Normalizer.NORMALIZATION_KEY].items(): for value in values: for val in value.get('values', []): artifacts.append(Artifact( function=value.get('function'), - record_id=self._decoded_record.get('record_id'), + record_id=record_id, source_type=self._source_type, normalized_type=key, value=val )) + # Add a new key "streamalert_record_id" to "streamalert_normalization" field. This new key + # will behelpful tracing back to the original record when searching in "artifacts" table. + self._decoded_record[Normalizer.NORMALIZATION_KEY][RECORD_ID_KEY] = record_id + return artifacts @property @@ -162,9 +171,8 @@ def transformed_record(self): Dropped, and ProcessingFailed. The purpose of ArtifactExtractor lambda is to extract artifacts and it should not change the data. So the result will alway be 'Ok'. - data: The transformed data payload, base64-encoded. The purpose of ArtifactExtract lambda - function is to extract artifacts and it should not change the original data. Thus, it - encapsulates original base64-encoded data. + data: The transformed data payload, base64-encoded. The transformed data payload includes a + new key "streamalert_record_id" and it's the only difference from original data payload. Returns: dict: A dictionary with required fields 'result', 'data' and 'recordId'. @@ -172,9 +180,17 @@ def transformed_record(self): return { 'recordId': self._firehose_record_id, 'result': 'Ok', - 'data': self._firehose_data + 'data': base64.b64encode(self._json_serializer()).decode('utf-8') } + def _json_serializer(self): + """Serialize a transformed record to a JSON formatted string + + Returns: + str: a JSON formatted string with a newline appened. + """ + return (json.dumps(self._decoded_record, separators=(',', ':')) + '\n').encode('utf-8') + class ArtifactExtractor: """ArtifactExtractor class will extract normalized artifacts from batch of records from source Firehose and return the original records back to Firehose where thoese records will be delivered diff --git a/streamalert/shared/normalize.py b/streamalert/shared/normalize.py index 9fae7d010..9395f7d71 100644 --- a/streamalert/shared/normalize.py +++ b/streamalert/shared/normalize.py @@ -171,7 +171,7 @@ def _parse_param_type(self, params): class Normalizer: """Normalizer class to handle log key normalization in payloads""" - NORMALIZATION_KEY = 'streamalert:normalization' + NORMALIZATION_KEY = 'streamalert_normalization' # Store the normalized types mapping to original keys from the records _types_config = dict() diff --git a/tests/unit/streamalert/artifact_extractor/helpers.py b/tests/unit/streamalert/artifact_extractor/helpers.py index 4be8fbf65..84d6d66be 100644 --- a/tests/unit/streamalert/artifact_extractor/helpers.py +++ b/tests/unit/streamalert/artifact_extractor/helpers.py @@ -18,6 +18,8 @@ from streamalert.shared.normalize import Normalizer +MOCK_RECORD_ID = 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa' + def native_firehose_records(normalized=False, count=2): """Generate sample firehose records for unit tests""" json_data = [ @@ -44,7 +46,9 @@ def native_firehose_records(normalized=False, count=2): return [ { 'recordId': 'record_id_{}'.format(cnt), - 'data': base64.b64encode(json.dumps(json_data[cnt]).encode('utf-8')), + 'data': base64.b64encode( + (json.dumps(json_data[cnt], separators=(',', ':')) + '\n').encode('utf-8') + ).decode('utf-8'), 'approximateArrivalTimestamp': 1583275630000+int(cnt) } for cnt in range(count) ] @@ -69,14 +73,17 @@ def transformed_firehose_records(normalized=False, count=2): 'values': ['value2', 'value3'], 'function': None } - ] + ], + 'streamalert_record_id': MOCK_RECORD_ID } return { 'records': [ { 'result': 'Ok', - 'data': base64.b64encode(json.dumps(json_data[cnt]).encode('utf-8')), + 'data': base64.b64encode( + (json.dumps(json_data[cnt], separators=(',', ':')) + '\n').encode('utf-8') + ).decode('utf-8'), 'recordId': 'record_id_{}'.format(cnt) } for cnt in range(count) ] @@ -97,10 +104,10 @@ def generate_artifacts(): artifacts = [ { 'function': 'None', - 'record_id': 'None', + 'streamalert_record_id': MOCK_RECORD_ID, 'source_type': 'unit_test', 'type': type, - 'value': value, + 'value': value } for type, value in normalized_values ] diff --git a/tests/unit/streamalert/artifact_extractor/test_artifact_extractor.py b/tests/unit/streamalert/artifact_extractor/test_artifact_extractor.py index dca4af8f3..271d6a9c5 100644 --- a/tests/unit/streamalert/artifact_extractor/test_artifact_extractor.py +++ b/tests/unit/streamalert/artifact_extractor/test_artifact_extractor.py @@ -27,6 +27,7 @@ native_firehose_records, transformed_firehose_records, generate_artifacts, + MOCK_RECORD_ID, ) @@ -44,7 +45,7 @@ def test_record(self): # pylint: disable=no-self-use ) expected_result = { 'function': 'None', - 'record_id': 'test_record_id', + 'streamalert_record_id': 'test_record_id', 'source_type': 'test_source_type', 'type': 'test_normalized_type', 'value': 'test_value' @@ -83,10 +84,12 @@ def test_run_zero_artifact(self, logger_mock): expected_result = transformed_firehose_records() assert_equal(result, expected_result) + @patch('uuid.uuid4') @patch.object(FirehoseClient, '_send_batch') @patch('streamalert.artifact_extractor.artifact_extractor.LOGGER') - def test_run(self, logger_mock, send_batch_mock): + def test_run(self, logger_mock, send_batch_mock, uuid_mock): """ArtifactExtractor - Test run method extract artifacts""" + uuid_mock.return_value = MOCK_RECORD_ID result = self._artifact_extractor.run(native_firehose_records(normalized=True)) logger_mock.assert_has_calls([ diff --git a/tests/unit/streamalert/artifact_extractor/test_main.py b/tests/unit/streamalert/artifact_extractor/test_main.py index 72b4e8d4c..27222845d 100644 --- a/tests/unit/streamalert/artifact_extractor/test_main.py +++ b/tests/unit/streamalert/artifact_extractor/test_main.py @@ -25,7 +25,8 @@ from tests.unit.streamalert.artifact_extractor.helpers import ( native_firehose_records, transformed_firehose_records, - generate_artifacts + generate_artifacts, + MOCK_RECORD_ID, ) @@ -69,11 +70,13 @@ def test_handler_zero_artifact(self, logger_mock): expected_result = transformed_firehose_records() assert_equal(result, expected_result) + @patch('uuid.uuid4') @patch.dict(os.environ, {'DESTINATION_FIREHOSE_STREAM_NAME': 'unit_test_dst_fh_arn'}) @patch.object(FirehoseClient, '_send_batch') @patch('streamalert.artifact_extractor.artifact_extractor.LOGGER') - def test_handler(self, logger_mock, send_batch_mock): + def test_handler(self, logger_mock, send_batch_mock, uuid_mock): """ArtifactExtractor - Test handler""" + uuid_mock.return_value = MOCK_RECORD_ID event = { 'records': native_firehose_records(normalized=True), 'region': 'us-east-1', diff --git a/tests/unit/streamalert/rules_engine/test_threat_intel.py b/tests/unit/streamalert/rules_engine/test_threat_intel.py index 3318dbcb9..91f4c44ee 100644 --- a/tests/unit/streamalert/rules_engine/test_threat_intel.py +++ b/tests/unit/streamalert/rules_engine/test_threat_intel.py @@ -85,7 +85,7 @@ def _sample_payload(self): 'recipientAccountId': '12345' }, 'source': '1.1.1.2', - 'streamalert:normalization': { + 'streamalert_normalization': { 'sourceAddress': {'1.1.1.2'}, 'userName': {'alice'} } @@ -118,7 +118,7 @@ def test_threat_detection(self): 'recipientAccountId': '12345' }, 'source': '1.1.1.2', - 'streamalert:normalization': { + 'streamalert_normalization': { 'sourceAddress': {'1.1.1.2'}, 'userName': {'alice'} }, @@ -149,7 +149,7 @@ def test_threat_detection_no_iocs(self): 'recipientAccountId': '12345' }, 'source': '1.1.1.2', - 'streamalert:normalization': { + 'streamalert_normalization': { 'sourceAddress': {'1.1.1.2'}, 'userName': {'alice'} } diff --git a/tests/unit/streamalert/shared/test_normalizer.py b/tests/unit/streamalert/shared/test_normalizer.py index 2f2748ac2..75f8c8072 100644 --- a/tests/unit/streamalert/shared/test_normalizer.py +++ b/tests/unit/streamalert/shared/test_normalizer.py @@ -211,7 +211,7 @@ def test_normalize(self): } }, 'sourceIPAddress': '1.1.1.3', - 'streamalert:normalization': { + 'streamalert_normalization': { 'region': [ { 'values': ['region_name'], @@ -263,7 +263,7 @@ def test_normalize_corner_case(self): 'original_key': { 'original_key': 'fizzbuzz', }, - 'streamalert:normalization': { + 'streamalert_normalization': { 'normalized_key': [ { 'values': ['fizzbuzz'], @@ -343,7 +343,7 @@ def test_get_values_for_normalized_type(self): expected_result = {'1.1.1.3'} record = { 'sourceIPAddress': '1.1.1.3', - 'streamalert:normalization': { + 'streamalert_normalization': { 'ip_v4': [ { 'values': expected_result, @@ -359,7 +359,7 @@ def test_get_values_for_normalized_type_none(self): """Normalizer - Get Values for Normalized Type, None""" record = { 'sourceIPAddress': '1.1.1.3', - 'streamalert:normalization': {} + 'streamalert_normalization': {} } assert_equal(Normalizer.get_values_for_normalized_type(record, 'ip_v4'), None) diff --git a/tests/unit/streamalert_cli/athena/test_helpers.py b/tests/unit/streamalert_cli/athena/test_helpers.py index 7526fcc62..8d55e03a2 100644 --- a/tests/unit/streamalert_cli/athena/test_helpers.py +++ b/tests/unit/streamalert_cli/athena/test_helpers.py @@ -157,8 +157,8 @@ def test_generate_artifact_table_schema(): expected_result = [ ('function', 'string'), - ('record_id', 'string'), ('source_type', 'string'), + ('streamalert_record_id', 'string'), ('type', 'string'), ('value', 'string') ] diff --git a/tests/unit/streamalert_cli/terraform/test_artifact_extractor.py b/tests/unit/streamalert_cli/terraform/test_artifact_extractor.py index d274ef22d..253a09b26 100644 --- a/tests/unit/streamalert_cli/terraform/test_artifact_extractor.py +++ b/tests/unit/streamalert_cli/terraform/test_artifact_extractor.py @@ -74,8 +74,8 @@ def test_generate_artifact_extractor(self): 'kms_key_arn': '${aws_kms_key.server_side_encryption.arn}', 'schema': [ ['function', 'string'], - ['record_id', 'string'], ['source_type', 'string'], + ['streamalert_record_id', 'string'], ['type', 'string'], ['value', 'string'] ]