Add three custom metrics #1246

Merged 1 commit on May 11, 2020
26 changes: 26 additions & 0 deletions docs/source/normalization.rst
@@ -274,6 +274,32 @@ Deployment

python manage.py deploy --function classifier

Custom Metrics
==============

Artifact Extractor ships with three custom metrics:

#. ``ArtifactExtractor-ExtractedArtifacts``: Logs the number of artifacts extracted from the records
#. ``ArtifactExtractor-FirehoseFailedRecords``: Logs the number of records (artifacts) that failed to send to Firehose
#. ``ArtifactExtractor-FirehoseRecordsSent``: Logs the number of records (artifacts) sent to Firehose

Custom metrics are disabled by default. Enable them in the Lambda configuration, then run a ``build`` to create the new ``aws_cloudwatch_log_metric_filter`` resources:

.. code-block::

# conf/lambda.json
"artifact_extractor_config": {
"concurrency_limit": 10,
"enabled": true,
"enable_custom_metrics": true,
...
}

.. code-block::

python manage.py build --target "metric_filters_ArtifactExtractor_*"
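
The metric filters match the structured lines that ``MetricLogger`` writes to the
function's CloudWatch log group. As an illustration (the exact log line may carry
additional fields), an entry matched by the ``ArtifactExtractor-ExtractedArtifacts``
filter looks roughly like:

.. code-block::

# A line matched by the filter pattern { $.metric_name = "ExtractedArtifacts" };
# the metric value is read from $.metric_value
{"metric_name": "ExtractedArtifacts", "metric_value": 42}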


Artifacts
=========

9 changes: 8 additions & 1 deletion streamalert/artifact_extractor/artifact_extractor.py
@@ -17,7 +17,8 @@
import uuid

from streamalert.shared.firehose import FirehoseClient
from streamalert.shared import config
from streamalert.shared import ARTIFACT_EXTRACTOR_NAME, config
from streamalert.shared.metrics import MetricLogger
from streamalert.shared.normalize import Normalizer
from streamalert.shared.logger import get_logger

@@ -249,6 +250,12 @@ def run(self, records):

LOGGER.debug('Extracted %d artifact(s)', len(self._artifacts))

MetricLogger.log_metric(
ARTIFACT_EXTRACTOR_NAME,
MetricLogger.EXTRACTED_ARTIFACTS,
len(self._artifacts)
)

self.firehose.send_artifacts(self._artifacts, self._dst_firehose_arn)

return {
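
Note: `MetricLogger.log_metric` does not publish to CloudWatch directly; it writes
a structured log line that the `aws_cloudwatch_log_metric_filter` resources created
during `build` turn into metric data points. A minimal sketch of that behavior,
with the log format assumed from the default filter pattern and value lookup in
streamalert/shared/metrics.py:

    # Sketch only; the real implementation lives in streamalert/shared/metrics.py.
    # The field names are assumptions based on the default filter pattern
    # '{ $.metric_name = "..." }' and value lookup '$.metric_value'.
    import json
    import logging

    LOGGER = logging.getLogger('artifact_extractor')

    def log_metric_sketch(metric_name, value):
        # A CloudWatch Logs metric filter on this function's log group matches
        # this line and extracts metric_value as the metric data point
        LOGGER.info(json.dumps({'metric_name': metric_name, 'metric_value': value}))
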
47 changes: 31 additions & 16 deletions streamalert/shared/firehose.py
@@ -23,7 +23,7 @@
from botocore.exceptions import ClientError, HTTPClientError
from botocore.exceptions import ConnectionError as BotocoreConnectionError

from streamalert.shared import CLASSIFIER_FUNCTION_NAME as FUNCTION_NAME
from streamalert.shared import ARTIFACT_EXTRACTOR_NAME, CLASSIFIER_FUNCTION_NAME
import streamalert.shared.helpers.boto as boto_helpers
from streamalert.shared.logger import get_logger
from streamalert.shared.metrics import MetricLogger
@@ -89,7 +89,7 @@ def _records_to_json_list(cls, records):
]

@classmethod
def _record_batches(cls, records):
def _record_batches(cls, records, function_name):
"""Segment the records into batches that conform to Firehose restrictions

This will log any single record that is too large to send, and skip it.
@@ -116,7 +116,7 @@

if line_len > cls.MAX_RECORD_SIZE:
LOGGER.error('Record too large (%d) to send to Firehose:\n%s', line_len, record)
cls._log_failed(1)
cls._log_failed(1, function_name)
continue

# Add the record to the batch
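
Note: `_record_batches` segments records against Firehose's service limits before
anything is sent. A rough sketch of the constraint logic, with the limit values
assumed from the AWS PutRecordBatch quotas rather than taken from this diff:

    import json

    # Assumed limits (AWS PutRecordBatch quotas); the real constants are
    # attributes on FirehoseClient
    MAX_BATCH_COUNT = 500              # records per request
    MAX_BATCH_SIZE = 4 * 1000 * 1000   # bytes per request
    MAX_RECORD_SIZE = 1000 * 1000      # bytes per individual record

    def record_batches_sketch(records, function_name):
        """Yield lists of records that each fit in one PutRecordBatch call"""
        batch, batch_bytes = [], 0
        for record in records:
            line_len = len(json.dumps(record, separators=(',', ':')) + '\n')
            if line_len > MAX_RECORD_SIZE:
                continue  # the real code also calls _log_failed(1, function_name) here
            if batch and (len(batch) == MAX_BATCH_COUNT
                          or batch_bytes + line_len > MAX_BATCH_SIZE):
                yield batch
                batch, batch_bytes = [], 0
            batch.append(record)
            batch_bytes += line_len
        if batch:
            yield batch
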
@@ -206,7 +206,7 @@ def _categorize_records(self, payloads):
return categorized_records

@classmethod
def _finalize(cls, response, stream_name, size):
def _finalize(cls, response, stream_name, size, function_name):
"""Perform any final operations for this response, such as metric logging, etc

Args:
@@ -223,7 +223,7 @@ def _finalize(cls, response, stream_name, size):
failed for failed in response['RequestResponses']
if failed.get('ErrorCode')
]
cls._log_failed(response['FailedPutCount'])
cls._log_failed(response['FailedPutCount'], function_name)

# Only print the first 100 failed records to Cloudwatch logs
LOGGER.error(
@@ -233,7 +233,7 @@
)
return

MetricLogger.log_metric(FUNCTION_NAME, MetricLogger.FIREHOSE_RECORDS_SENT, size)
MetricLogger.log_metric(function_name, MetricLogger.FIREHOSE_RECORDS_SENT, size)
LOGGER.info(
'Successfully sent %d message(s) to firehose %s with RequestId \'%s\'',
size,
@@ -242,15 +242,15 @@
)

@classmethod
def _log_failed(cls, count):
def _log_failed(cls, count, function_name):
"""Helper to log the failed Firehose records metric

Args:
count (int): Number of failed records
function_name (str): Name of the Lambda function logging the metric
"""
MetricLogger.log_metric(FUNCTION_NAME, MetricLogger.FIREHOSE_FAILED_RECORDS, count)
MetricLogger.log_metric(function_name, MetricLogger.FIREHOSE_FAILED_RECORDS, count)

def _send_batch(self, stream_name, record_batch):
def _send_batch(self, stream_name, record_batch, function_name):
"""Send record batches to Firehose

Args:
@@ -299,7 +299,7 @@ def _firehose_request_helper(data):
LOGGER.exception('Firehose request failed')
# Use the current length of the records_data in case some records were
# successful but others were not
self._log_failed(len(records_data))
self._log_failed(len(records_data), function_name)

@classmethod
def generate_firehose_name(cls, prefix, log_stream_name):
@@ -470,18 +470,33 @@ def send(self, payloads):
formatted_stream_name = self.generate_firehose_name(self._prefix, log_type)

# Process each record batch in the categorized payload set
for record_batch in self._record_batches(records):
for record_batch in self._record_batches(records, CLASSIFIER_FUNCTION_NAME):
batch_size = len(record_batch)
response = self._send_batch(formatted_stream_name, record_batch)
self._finalize(response, formatted_stream_name, batch_size)
response = self._send_batch(
formatted_stream_name,
record_batch,
CLASSIFIER_FUNCTION_NAME
)

self._finalize(
response,
formatted_stream_name,
batch_size,
CLASSIFIER_FUNCTION_NAME
)

def send_artifacts(self, artifacts, stream_name):
"""Send artifacts to artifacts Firehose delievery stream
Args:
artifacts (list(dict)): A list of artifacts extracted from normalized records.
stream_name (str): Stream name of destination Firehose.
"""
for artifact_batch in self._record_batches(artifacts):
for artifact_batch in self._record_batches(artifacts, ARTIFACT_EXTRACTOR_NAME):
batch_size = len(artifact_batch)
response = self._send_batch(stream_name, artifact_batch)
self._finalize(response, stream_name, batch_size)
response = self._send_batch(stream_name, artifact_batch, ARTIFACT_EXTRACTOR_NAME)
self._finalize(
response,
stream_name,
batch_size,
ARTIFACT_EXTRACTOR_NAME
)
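
Note: threading `function_name` through `_record_batches`, `_send_batch`, and
`_finalize` lets the shared client attribute its Firehose metrics to the calling
Lambda instead of hardcoding the classifier. With the `FUNC_PREFIXES` mapping in
streamalert/shared/metrics.py, the resulting metric names should look like the
following (the classifier names are inferred by analogy with the documented
Artifact Extractor names):

    # Assumed composition: '<prefix>-<metric name>' per FUNC_PREFIXES
    EXPECTED_FIREHOSE_METRICS = {
        'classifier': ('Classifier-FirehoseRecordsSent',
                       'Classifier-FirehoseFailedRecords'),
        'artifact_extractor': ('ArtifactExtractor-FirehoseRecordsSent',
                               'ArtifactExtractor-FirehoseFailedRecords'),
    }
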
13 changes: 13 additions & 0 deletions streamalert/shared/metrics.py
@@ -18,6 +18,7 @@
from streamalert.shared import (
ALERT_MERGER_NAME,
ALERT_PROCESSOR_NAME,
ARTIFACT_EXTRACTOR_NAME,
ATHENA_PARTITIONER_NAME,
CLASSIFIER_FUNCTION_NAME,
RULES_ENGINE_FUNCTION_NAME
@@ -33,6 +34,7 @@
# below when metrics are supported there
FUNC_PREFIXES = {
ALERT_MERGER_NAME: 'AlertMerger',
ARTIFACT_EXTRACTOR_NAME: 'ArtifactExtractor',
CLASSIFIER_FUNCTION_NAME: 'Classifier',
RULES_ENGINE_FUNCTION_NAME: 'RulesEngine'
}
@@ -75,6 +77,9 @@ class MetricLogger:
# Alert Merger metric names
ALERT_ATTEMPTS = 'AlertAttempts'

# Artifact Extractor metric names
EXTRACTED_ARTIFACTS = 'ExtractedArtifacts'

_default_filter = '{{ $.metric_name = "{}" }}'
_default_value_lookup = '$.metric_value'

@@ -89,6 +94,14 @@ class MetricLogger:
ALERT_ATTEMPTS: (_default_filter.format(ALERT_ATTEMPTS), _default_value_lookup)
},
ALERT_PROCESSOR_NAME: {}, # Placeholder for future alert processor metrics
ARTIFACT_EXTRACTOR_NAME: {
EXTRACTED_ARTIFACTS: (_default_filter.format(EXTRACTED_ARTIFACTS),
_default_value_lookup),
FIREHOSE_FAILED_RECORDS: (_default_filter.format(FIREHOSE_FAILED_RECORDS),
_default_value_lookup),
FIREHOSE_RECORDS_SENT: (_default_filter.format(FIREHOSE_RECORDS_SENT),
_default_value_lookup)
},
ATHENA_PARTITIONER_NAME: {}, # Placeholder for future athena processor metrics
CLASSIFIER_FUNCTION_NAME: {
FAILED_PARSES: (_default_filter.format(FAILED_PARSES),
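
Note: each entry in this mapping is a `(filter_pattern, value_lookup)` tuple. The
new `EXTRACTED_ARTIFACTS` entry, rendered through `_default_filter` and
`_default_value_lookup`, expands to:

    # _default_filter.format(EXTRACTED_ARTIFACTS) -> the filter pattern;
    # _default_value_lookup -> where the metric value is read from
    ('{ $.metric_name = "ExtractedArtifacts" }', '$.metric_value')
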
@@ -99,7 +99,8 @@ def test_run(self, logger_mock, send_batch_mock, uuid_mock):

send_batch_mock.assert_called_with(
'unit_test_dst_fh_arn',
generate_artifacts()
generate_artifacts(),
'artifact_extractor'
)

expected_result = transformed_firehose_records(normalized=True)
3 changes: 2 additions & 1 deletion tests/unit/streamalert/artifact_extractor/test_main.py
@@ -94,7 +94,8 @@ def test_handler(self, logger_mock, send_batch_mock, uuid_mock):

send_batch_mock.assert_called_with(
'unit_test_dst_fh_arn',
generate_artifacts()
generate_artifacts(),
'artifact_extractor'
)

expected_result = transformed_firehose_records(normalized=True)
28 changes: 15 additions & 13 deletions tests/unit/streamalert/shared/test_firehose.py
@@ -97,7 +97,7 @@ def test_record_batches(self):
]
]

result = list(FirehoseClient._record_batches(records))
result = list(FirehoseClient._record_batches(records, 'test_function_name'))
assert_equal(result, expected_result)

@patch.object(FirehoseClient, '_log_failed')
@@ -107,15 +107,15 @@ def test_record_batches_rec_too_large(self, failure_mock):
{'key': 'test' * 1000 * 1000}
]

result = list(FirehoseClient._record_batches(records))
result = list(FirehoseClient._record_batches(records, 'test_function_name'))
assert_equal(result, [])
failure_mock.assert_called_with(1)
failure_mock.assert_called_with(1, 'test_function_name')

def test_record_batches_max_batch_count(self):
"""FirehoseClient - Record Batches, Max Batch Count"""
records = self._sample_raw_records(count=501)

result = list(FirehoseClient._record_batches(records))
result = list(FirehoseClient._record_batches(records, 'test_function_name'))
assert_equal(len(result), 2)
assert_equal(len(result[0]), 500)
assert_equal(len(result[1]), 1)
@@ -126,7 +126,7 @@ def test_record_batches_max_batch_size(self):
{'key_{}'.format(i): 'test' * 100000}
for i in range(10)
]
result = list(FirehoseClient._record_batches(records))
result = list(FirehoseClient._record_batches(records, 'test_function_name'))
assert_equal(len(result), 2)
assert_equal(len(result[0]), 9)
assert_equal(len(result[1]), 1)
@@ -229,8 +229,8 @@ def test_finalize_failures(self, failure_mock):
]
}

FirehoseClient._finalize(response, 'stream_name', 3)
failure_mock.assert_called_with(1)
FirehoseClient._finalize(response, 'stream_name', 3, 'test_function_name')
failure_mock.assert_called_with(1, 'test_function_name')

@patch('logging.Logger.info')
def test_finalize_success(self, log_mock):
Expand All @@ -244,7 +244,7 @@ def test_finalize_success(self, log_mock):
}
}

FirehoseClient._finalize(response, stream_name, count)
FirehoseClient._finalize(response, stream_name, count, 'test_function_name')
log_mock.assert_called_with(
'Successfully sent %d message(s) to firehose %s with RequestId \'%s\'',
count,
@@ -280,7 +280,7 @@ def test_send_batch(self):
}
]

self._client._send_batch(stream_name, records)
self._client._send_batch(stream_name, records, 'test_function_name')

boto_mock.put_record_batch.assert_called_with(
DeliveryStreamName=stream_name,
@@ -296,7 +296,7 @@ def test_send_batch_error(self, log_mock):
error = ClientError({'Error': {'Code': 10}}, 'InvalidRequestException')
boto_mock.put_record_batch.side_effect = error

self._client._send_batch(stream_name, ['data'])
self._client._send_batch(stream_name, ['data'], 'test_function_name')

log_mock.assert_called_with('Firehose request failed')

@@ -412,7 +412,7 @@ def test_send(self, send_batch_mock):
]
self._client.send(self._sample_payloads)
send_batch_mock.assert_called_with(
'unit_test_streamalert_log_type_01_sub_type_01', expected_batch
'unit_test_streamalert_log_type_01_sub_type_01', expected_batch, 'classifier'
)

@patch.object(FirehoseClient, '_send_batch')
@@ -434,7 +434,7 @@ def test_send_no_prefixing(self, send_batch_mock):

client.send(self._sample_payloads)
send_batch_mock.assert_called_with(
'streamalert_log_type_01_sub_type_01', expected_batch
'streamalert_log_type_01_sub_type_01', expected_batch, 'classifier'
)

@property
@@ -476,7 +476,9 @@ def test_send_long_log_name(self, send_batch_mock):

client.send(self._sample_payloads_long_log_name)
send_batch_mock.assert_called_with(
'streamalert_very_very_very_long_log_stream_name_abcdefg_7c88167b', expected_batch
'streamalert_very_very_very_long_log_stream_name_abcdefg_7c88167b',
expected_batch,
'classifier'
)

def test_generate_firehose_name(self):