Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add record id to artifacts and record #1242

Merged
merged 1 commit into from
May 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion publishers/community/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def remove_fields(alert, publication):

@Register
def remove_streamalert_normalization(_, publication):
"""This publisher removes the super heavyweight 'streamalert:normalization' fields"""
"""This publisher removes the super heavyweight 'streamalert_normalization' fields"""
return _delete_dictionary_fields(publication, Normalizer.NORMALIZATION_KEY)


Expand Down
36 changes: 26 additions & 10 deletions streamalert/artifact_extractor/artifact_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import json
import re
from os import environ as env
import uuid

from streamalert.shared.firehose import FirehoseClient
from streamalert.shared import config
Expand All @@ -23,6 +24,8 @@

LOGGER = get_logger(__name__)

RECORD_ID_KEY = 'streamalert_record_id'

class Artifact:
"""Encapsulation of a single Artifact that is extracted from an input record."""

Expand Down Expand Up @@ -57,7 +60,7 @@ def record(self):
"""
return {
'function': self._function,
'record_id': self._record_id,
RECORD_ID_KEY: self._record_id,
'source_type': self._source_type,
'type': self._type,
'value': self._value,
Expand All @@ -70,9 +73,10 @@ class FirehoseRecord:
def __init__(self, firehose_record, source_type):
"""Create a new Firehose record that contains the original data and may extract multiple artifacts if
original data was normalized in the classifier.
The original data which will be returned back to source firehose for historical search. And
the artifacts, if any, will be sent to a dedicated firehose with simplified schema and land
in streamalert data bucket for historical search as well.
The transformed data (with only a record_id inserted) will be returned back to the source
firehose for historical search. The artifacts, if any, will be sent to a dedicated
firehose with a simplified schema and land in the streamalert data bucket for historical
search as well.

Args:
firehose_record (dict): the record passed to lambda from source firehose. It has
Expand Down Expand Up @@ -118,7 +122,7 @@ def artifacts(self):
# 'awsRegion': 'us-west-2'
# }
# },
# 'streamalert:normalization': {
# 'streamalert_normalization': {
# 'region': [
# {
# 'values': ['region_name'],
Expand All @@ -132,17 +136,22 @@ def artifacts(self):
# }
# }
#
record_id = self._decoded_record.get(RECORD_ID_KEY) or str(uuid.uuid4())
for key, values in self._decoded_record[Normalizer.NORMALIZATION_KEY].items():
for value in values:
for val in value.get('values', []):
artifacts.append(Artifact(
function=value.get('function'),
record_id=self._decoded_record.get('record_id'),
record_id=record_id,
source_type=self._source_type,
normalized_type=key,
value=val
))

# Add a new key "streamalert_record_id" to the "streamalert_normalization" field. This new
# key will be helpful for tracing back to the original record when searching the
# "artifacts" table.
self._decoded_record[Normalizer.NORMALIZATION_KEY][RECORD_ID_KEY] = record_id

return artifacts

@property
Expand All @@ -162,19 +171,26 @@ def transformed_record(self):
Dropped, and ProcessingFailed. The purpose of ArtifactExtractor lambda is to extract
artifacts and it should not change the data. So the result will always be 'Ok'.

data: The transformed data payload, base64-encoded. The purpose of ArtifactExtract lambda
function is to extract artifacts and it should not change the original data. Thus, it
encapsulates original base64-encoded data.
data: The transformed data payload, base64-encoded. The transformed data payload includes a
new key "streamalert_record_id", which is the only difference from the original data payload.

Returns:
dict: A dictionary with required fields 'result', 'data' and 'recordId'.
"""
return {
'recordId': self._firehose_record_id,
'result': 'Ok',
'data': self._firehose_data
'data': base64.b64encode(self._json_serializer()).decode('utf-8')
}

def _json_serializer(self):
"""Serialize a transformed record to a JSON formatted string

Returns:
str: a JSON formatted string with a newline appended.
"""
return (json.dumps(self._decoded_record, separators=(',', ':')) + '\n').encode('utf-8')

class ArtifactExtractor:
"""ArtifactExtractor class will extract normalized artifacts from a batch of records from the source
Firehose and return the original records back to Firehose where those records will be delivered
Expand Down
2 changes: 1 addition & 1 deletion streamalert/shared/normalize.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ def _parse_param_type(self, params):
class Normalizer:
"""Normalizer class to handle log key normalization in payloads"""

NORMALIZATION_KEY = 'streamalert:normalization'
NORMALIZATION_KEY = 'streamalert_normalization'

# Store the normalized types mapping to original keys from the records
_types_config = dict()
Expand Down
17 changes: 12 additions & 5 deletions tests/unit/streamalert/artifact_extractor/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

from streamalert.shared.normalize import Normalizer

MOCK_RECORD_ID = 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa'

def native_firehose_records(normalized=False, count=2):
"""Generate sample firehose records for unit tests"""
json_data = [
Expand All @@ -44,7 +46,9 @@ def native_firehose_records(normalized=False, count=2):
return [
{
'recordId': 'record_id_{}'.format(cnt),
'data': base64.b64encode(json.dumps(json_data[cnt]).encode('utf-8')),
'data': base64.b64encode(
(json.dumps(json_data[cnt], separators=(',', ':')) + '\n').encode('utf-8')
).decode('utf-8'),
'approximateArrivalTimestamp': 1583275630000+int(cnt)
} for cnt in range(count)
]
Expand All @@ -69,14 +73,17 @@ def transformed_firehose_records(normalized=False, count=2):
'values': ['value2', 'value3'],
'function': None
}
]
],
'streamalert_record_id': MOCK_RECORD_ID
}

return {
'records': [
{
'result': 'Ok',
'data': base64.b64encode(json.dumps(json_data[cnt]).encode('utf-8')),
'data': base64.b64encode(
(json.dumps(json_data[cnt], separators=(',', ':')) + '\n').encode('utf-8')
).decode('utf-8'),
'recordId': 'record_id_{}'.format(cnt)
} for cnt in range(count)
]
Expand All @@ -97,10 +104,10 @@ def generate_artifacts():
artifacts = [
{
'function': 'None',
'record_id': 'None',
'streamalert_record_id': MOCK_RECORD_ID,
'source_type': 'unit_test',
'type': type,
'value': value,
'value': value
} for type, value in normalized_values
]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
native_firehose_records,
transformed_firehose_records,
generate_artifacts,
MOCK_RECORD_ID,
)


Expand All @@ -44,7 +45,7 @@ def test_record(self): # pylint: disable=no-self-use
)
expected_result = {
'function': 'None',
'record_id': 'test_record_id',
'streamalert_record_id': 'test_record_id',
'source_type': 'test_source_type',
'type': 'test_normalized_type',
'value': 'test_value'
Expand Down Expand Up @@ -83,10 +84,12 @@ def test_run_zero_artifact(self, logger_mock):
expected_result = transformed_firehose_records()
assert_equal(result, expected_result)

@patch('uuid.uuid4')
@patch.object(FirehoseClient, '_send_batch')
@patch('streamalert.artifact_extractor.artifact_extractor.LOGGER')
def test_run(self, logger_mock, send_batch_mock):
def test_run(self, logger_mock, send_batch_mock, uuid_mock):
"""ArtifactExtractor - Test run method extract artifacts"""
uuid_mock.return_value = MOCK_RECORD_ID
result = self._artifact_extractor.run(native_firehose_records(normalized=True))

logger_mock.assert_has_calls([
Expand Down
7 changes: 5 additions & 2 deletions tests/unit/streamalert/artifact_extractor/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
from tests.unit.streamalert.artifact_extractor.helpers import (
native_firehose_records,
transformed_firehose_records,
generate_artifacts
generate_artifacts,
MOCK_RECORD_ID,
)


Expand Down Expand Up @@ -69,11 +70,13 @@ def test_handler_zero_artifact(self, logger_mock):
expected_result = transformed_firehose_records()
assert_equal(result, expected_result)

@patch('uuid.uuid4')
@patch.dict(os.environ, {'DESTINATION_FIREHOSE_STREAM_NAME': 'unit_test_dst_fh_arn'})
@patch.object(FirehoseClient, '_send_batch')
@patch('streamalert.artifact_extractor.artifact_extractor.LOGGER')
def test_handler(self, logger_mock, send_batch_mock):
def test_handler(self, logger_mock, send_batch_mock, uuid_mock):
"""ArtifactExtractor - Test handler"""
uuid_mock.return_value = MOCK_RECORD_ID
event = {
'records': native_firehose_records(normalized=True),
'region': 'us-east-1',
Expand Down
6 changes: 3 additions & 3 deletions tests/unit/streamalert/rules_engine/test_threat_intel.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def _sample_payload(self):
'recipientAccountId': '12345'
},
'source': '1.1.1.2',
'streamalert:normalization': {
'streamalert_normalization': {
'sourceAddress': {'1.1.1.2'},
'userName': {'alice'}
}
Expand Down Expand Up @@ -118,7 +118,7 @@ def test_threat_detection(self):
'recipientAccountId': '12345'
},
'source': '1.1.1.2',
'streamalert:normalization': {
'streamalert_normalization': {
'sourceAddress': {'1.1.1.2'},
'userName': {'alice'}
},
Expand Down Expand Up @@ -149,7 +149,7 @@ def test_threat_detection_no_iocs(self):
'recipientAccountId': '12345'
},
'source': '1.1.1.2',
'streamalert:normalization': {
'streamalert_normalization': {
'sourceAddress': {'1.1.1.2'},
'userName': {'alice'}
}
Expand Down
8 changes: 4 additions & 4 deletions tests/unit/streamalert/shared/test_normalizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ def test_normalize(self):
}
},
'sourceIPAddress': '1.1.1.3',
'streamalert:normalization': {
'streamalert_normalization': {
'region': [
{
'values': ['region_name'],
Expand Down Expand Up @@ -263,7 +263,7 @@ def test_normalize_corner_case(self):
'original_key': {
'original_key': 'fizzbuzz',
},
'streamalert:normalization': {
'streamalert_normalization': {
'normalized_key': [
{
'values': ['fizzbuzz'],
Expand Down Expand Up @@ -343,7 +343,7 @@ def test_get_values_for_normalized_type(self):
expected_result = {'1.1.1.3'}
record = {
'sourceIPAddress': '1.1.1.3',
'streamalert:normalization': {
'streamalert_normalization': {
'ip_v4': [
{
'values': expected_result,
Expand All @@ -359,7 +359,7 @@ def test_get_values_for_normalized_type_none(self):
"""Normalizer - Get Values for Normalized Type, None"""
record = {
'sourceIPAddress': '1.1.1.3',
'streamalert:normalization': {}
'streamalert_normalization': {}
}

assert_equal(Normalizer.get_values_for_normalized_type(record, 'ip_v4'), None)
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/streamalert_cli/athena/test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,8 @@ def test_generate_artifact_table_schema():

expected_result = [
('function', 'string'),
('record_id', 'string'),
('source_type', 'string'),
('streamalert_record_id', 'string'),
('type', 'string'),
('value', 'string')
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ def test_generate_artifact_extractor(self):
'kms_key_arn': '${aws_kms_key.server_side_encryption.arn}',
'schema': [
['function', 'string'],
['record_id', 'string'],
['source_type', 'string'],
['streamalert_record_id', 'string'],
['type', 'string'],
['value', 'string']
]
Expand Down