To test the accuracy of new rules, local tests can be written to verify that alerts trigger against valid input.
The manage.py
CLI tool comes built-in with a test
command which does exactly this.
To test a new rule, first create a new JSON file anywhere within the 'tests/integration/rules/' directory with the .json
extension.
This file should contain the following structure:
[
{
"data": "Either a string, or JSON object",
"description": "This test should trigger or not trigger an alert",
"log": "The log name declared in logs.json",
"service": "The service sending the log - kinesis, s3, sns, or stream_alert_app",
"source": "The exact resource which sent the log - kinesis stream name, s3 bucket ID, SNS topic name, or stream_alert_app_function name",
"trigger_rules": [
"rule_name_that_should_trigger_for_this_event",
"another_rule_name_that_should_trigger_for_this_event"
]
}
]
Note
Multiple tests can be included in one file by adding them to the array above.
When specifying the test data, it can be either of two fields:
"data"
: An entire example record, with all necessary fields to properly classify"override_record"
: A subset of the example record, where only relevant fields are populated
The advantage of option #2 is that the overall test event is much smaller.
The testing framework will auto-populate the records behind the scenes with the remaining fields for that given log type.
For example:
[
{
"data": {
"account": "123456789102",
"detail": {
"request": {
"eventName": "putObject",
"bucketName": "testBucket"
}
},
"detail-type": "API Call",
"id": "123456",
"region": "us-west-2",
"resources": [
"testBucket"
],
"source": "aws.s3",
"time": "Jan 01 2018 12:00",
"version": "1.05"
},
"description": "An example test with a full cloudwatch event",
"log": "cloudwatch:events",
"service": "s3",
"source": "test-s3-bucket-name",
"trigger_rules": [
"my_fake_rule"
]
}
]
Let's say a rule is only checking the value of source
in the test event. In that case, there's no added benefit to fill in all the other data. Here is what the event would look like with override_record
:
[
{
"override_record": {
"source": "aws.s3"
},
"description": "An example test with a partial cloudwatch event",
"log": "cloudwatch:events",
"service": "s3",
"source": "test-s3-bucket-name",
"trigger_rules": [
"my_fake_rule"
]
}
]
Both test events would have the same result, but with much less effort.
Note
Either "override_record" or "data" is required in the test event
Key | Type | Required | Description |
---|---|---|---|
|
|
No |
Whether or not to compress records with |
|
|
Yes* |
The record to test against your rules. All |
|
|
Yes* |
A partial record to use in test events, more information below *This is not required if the |
description |
string |
Yes | A short sentence describing the intent of the test |
|
|
Yes |
The log type this test record should parse as. The value of this should be taken from the defined logs in |
|
|
Yes |
The name of the service which sent the log. This should be one of: |
|
|
Yes |
The name of the Kinesis Stream or S3 bucket, SNS topic or StreamAlert App function where the data originated from. This value should match a source provided in |
|
|
No |
A list of zero or more rule names that this test record should trigger. An empty list implies this record should not trigger any alerts |
|
|
No |
Whether or not the test record should go through the rule processing engine. If set to |
For more examples, see the provided default rule tests in tests/integration/rules
Tests are run via the manage.py
script. These tests include the ability to validate defined log schemas for accuracy, as well as rules efficacy. Additionally, alerts can be sent from the local system to a real, live alerting output (if configured).
The below options are available for running tests. Please note that each subsequent test command here includes all of the prior tests. For instance, the rules
command will also test everything that the classifier
command tests. See the Test Options section for available options for all of these commands.
Running tests to ensure test events classify properly:
$ python manage.py test classifier
Note
The classifier
test command does not test the efficacy of rules, and simply ensures defined test events classify as their expected schema type.
Running tests to ensure test events classify properly and trigger the designated rules:
$ python manage.py test rules
Running tests to actually send alerts to a rule's configured outputs:
$ python manage.py test live
Note
The live
test command does not invoke any deployed Lambda functions, and only uses the local code, test events, and rules. However, authentication secrets needed to send alerts are in fact read from S3 during this process, so AWS credentials must still be set up properly.
Any of the test commands above can be restricted to specific files to reduce time and output:
$ python manage.py test classifier --test-files <test_file_01.json> <test_file_02>
Note
Only the name of the file is required, with or without the file extension, not the entire path.
Tests can also be restricted to specific rules:
$ python manage.py test rules --test-rules <rule_01> <rule_02>
Note
Note that this is the name of the rule(s) themselves, not the name of the Python file containing the rule(s).
Tests can be directed to run against an alternative directory of test event files:
$ python manage.py test rules --files-dir /path/to/alternate/test/files/directory
Note
Note that this is the name of the rule(s) themselves, not the name of the Python file containing the rule(s).
Here is a sample command showing how to run tests against two test event files included in the default StreamAlert configuration:
$ python manage.py test rules --test-files cloudtrail_put_bucket_acl.json cloudtrail_root_account_usage.json
This will produce output similar to the following:
Running tests for files found in: tests/integration/rules/
File: cloudtrail/cloudtrail_put_bucket_acl.json
Test #01: Pass
Test #02: Pass
File: cloudtrail/cloudtrail_root_account_usage.json
Test #01: Pass
Test #02: Pass
Summary:
Total Tests: 4
Pass: 4
Fail: 0
To see more verbose output for any of the test commands, add the --verbose
flag. The previous command, with the addition of the --verbose
flag, produces the following output:
Running tests for files found in: tests/integration/rules/
File: cloudtrail/cloudtrail_put_bucket_acl.json
Test #01: Pass
Description: Modifying an S3 bucket to have a bucket ACL of AllUsers or AuthenticatedUsers should create an alert.
Classified Type: cloudwatch:events
Expected Type: cloudwatch:events
Triggered Rules: cloudtrail_put_bucket_acl
Expected Rules: cloudtrail_put_bucket_acl
Test #02: Pass
Description: Modifying an S3 bucket ACL without use of AllUsers or AuthenticatedUsers should not create an alert.
Classified Type: cloudwatch:events
Expected Type: cloudwatch:events
Triggered Rules: <None>
Expected Rules: <None>
File: cloudtrail/cloudtrail_root_account_usage.json
Test #01: Pass
Description: Use of the AWS 'Root' account will create an alert.
Classified Type: cloudwatch:events
Expected Type: cloudwatch:events
Triggered Rules: cloudtrail_root_account_usage
Expected Rules: cloudtrail_root_account_usage
Test #02: Pass
Description: AWS 'Root' account activity initiated automatically by an AWS service on your behalf will not create an alert.
Classified Type: cloudwatch:events
Expected Type: cloudwatch:events
Triggered Rules: <None>
Expected Rules: <None>
Summary:
Total Tests: 4
Pass: 4
Fail: 0
Additionally, any given test that results in a status of Fail will, by default, print verbosely. In the below example, the cloudtrail_put_bucket_acl.json
file has been altered to include a triggering rule that does not actually exist.
$ python manage.py test rules --test-files cloudtrail_put_bucket_acl.json cloudtrail_root_account_usage.json
Running tests for files found in: tests/integration/rules/
File: cloudtrail/cloudtrail_put_bucket_acl.json
Test #01: Fail
Description: Modifying an S3 bucket to have a bucket ACL of AllUsers or AuthenticatedUsers should create an alert.
Classified Type: cloudwatch:events
Expected Type: cloudwatch:events
Triggered Rules: cloudtrail_put_bucket_acl
Expected Rules: cloudtrail_put_bucket_acl, nonexistent_rule (does not exist)
Test #02: Pass
File: cloudtrail/cloudtrail_root_account_usage.json
Test #01: Pass
Test #02: Pass
Summary:
Total Tests: 4
Pass: 3
Fail: 1
It may occasionally be necessary to dynamically fill in values in the test event data. For instance, if a rule relies on the time of an event, the last_hour
helper can be embedded in a test event as a key's value. The embedded helper string will be replaced with the value returned by the helper function.
last_hour
: Generates a unix epoch time within the last hour (ex: 1489105783
).
To use these helpers in rule testing, replace a specific log field value with the following:
"<helper:helper_name_goes_here>"
For example, to replace a time field with a value in the last hour, use last_hour
:
{
"records": [
{
"data": {
"host": "app01.prod.mydomain.net",
"time": "<helper:last_hour>"
},
"description": "example usage of helpers",
"log": "host_time_log",
"service": "kinesis",
"source": "my_demo_kinesis_stream",
"trigger_rules": [
"last_hour_rule_name"
]
}
]
}