Add EICAR archive to live_test (#90)
austinbyers committed Dec 14, 2017
1 parent d443913 commit b47720f
Showing 7 changed files with 198 additions and 124 deletions.
72 changes: 15 additions & 57 deletions manage.py
@@ -4,26 +4,21 @@
import argparse
import base64
import getpass
import hashlib
import inspect
import os
import pprint
import re
import subprocess
import sys
import time
from typing import Set
import unittest
import uuid

import boto3
from boto3.dynamodb.conditions import Attr, Key
import hcl

from lambda_functions.analyzer.common import COMPILED_RULES_FILENAME
from lambda_functions.build import build as lambda_build
from rules import compile_rules, clone_rules
from tests.rules.eicar_rule_test import EICAR_STRING
from tests import live_test

# BinaryAlert version.
VERSION = '1.1.0.beta'
@@ -33,6 +28,7 @@
TERRAFORM_DIR = os.path.join(PROJECT_DIR, 'terraform')
CONFIG_FILE = os.path.join(TERRAFORM_DIR, 'terraform.tfvars')
VARIABLES_FILE = os.path.join(TERRAFORM_DIR, 'variables.tf')
TEST_FILES = os.path.join(PROJECT_DIR, 'tests', 'files')

# Terraform identifiers.
CB_KMS_ALIAS_TERRAFORM_ID = 'aws_kms_alias.encrypt_credentials_alias'
@@ -167,10 +163,18 @@ def encrypted_carbon_black_api_token(self, value: str):
def force_destroy(self) -> str:
return self._config['force_destroy']

@property
def binaryalert_analyzer_name(self) -> str:
return '{}_binaryalert_analyzer'.format(self.name_prefix)

@property
def binaryalert_batcher_name(self) -> str:
return '{}_binaryalert_batcher'.format(self.name_prefix)

@property
def binaryalert_dynamo_table_name(self) -> str:
return '{}_binaryalert_matches'.format(self.name_prefix)

@property
def binaryalert_s3_bucket_name(self) -> str:
return '{}.binaryalert-binaries.{}'.format(
@@ -450,60 +454,14 @@ def destroy(self) -> None:
subprocess.call(['terraform', 'destroy'])

def live_test(self) -> None:
"""Upload an EICAR test file to BinaryAlert which should trigger a YARA match alert.
"""Upload test files to BinaryAlert which should trigger YARA matches.
Raises:
TestFailureError: If the live test failed (YARA match not found).
TestFailureError: If the live test failed (YARA matches not found).
"""
bucket_name = self._config.binaryalert_s3_bucket_name
test_filename = 'eicar_test_{}.txt'.format(uuid.uuid4())
s3_identifier = 'S3:{}:{}'.format(bucket_name, test_filename)

print('Uploading EICAR test file {}...'.format(s3_identifier))
bucket = boto3.resource('s3').Bucket(bucket_name)
bucket.put_object(
Body=EICAR_STRING.encode('UTF-8'),
Key=test_filename,
Metadata={'filepath': test_filename}
)

table_name = '{}_binaryalert_matches'.format(self._config.name_prefix)
print('EICAR test file uploaded! Connecting to table DynamoDB:{}...'.format(table_name))
table = boto3.resource('dynamodb').Table(table_name)
eicar_sha256 = hashlib.sha256(EICAR_STRING.encode('UTF-8')).hexdigest()
dynamo_record_found = False

for attempt in range(1, 11):
time.sleep(5)
print('\t[{}/10] Querying DynamoDB table for the expected YARA match entry...'.format(
attempt))
items = table.query(
Select='ALL_ATTRIBUTES',
Limit=1,
ConsistentRead=True,
ScanIndexForward=False, # Sort by AnalyzerVersion descending (e.g. newest first).
KeyConditionExpression=Key('SHA256').eq(eicar_sha256),
FilterExpression=Attr('S3Objects').contains(s3_identifier)
).get('Items')

if items:
print('\nSUCCESS: Expected DynamoDB entry for the EICAR file was found!\n')
dynamo_record_found = True
pprint.pprint(items[0])

print('\nRemoving DynamoDB EICAR entry...')
lambda_version = items[0]['AnalyzerVersion']
table.delete_item(Key={'SHA256': eicar_sha256, 'AnalyzerVersion': lambda_version})
break
elif attempt == 10:
print('\nFAIL: Expected DynamoDB entry for the EICAR file was *not* found!\n')

print('Removing EICAR test file from S3...')
bucket.delete_objects(Delete={'Objects': [{'Key': test_filename}]})

if dynamo_record_found:
print('\nLive test succeeded! Verify the alert was sent to your SNS subscription(s).')
else:
if not live_test.run(self._config.binaryalert_s3_bucket_name,
self._config.binaryalert_analyzer_name,
self._config.binaryalert_dynamo_table_name):
raise TestFailureError(
'\nLive test failed! See https://binaryalert.io/troubleshooting-faq.html')
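
The rewritten wrapper now delegates all upload, polling, and cleanup logic to the new tests/live_test module. A minimal sketch of invoking the same check programmatically, with purely illustrative resource names (a real run would use the values derived from terraform.tfvars):

from tests import live_test

# Names below are placeholders, not taken from a real deployment.
passed = live_test.run(
    'company.binaryalert-binaries.us-east-1',  # S3 bucket holding the uploaded binaries
    'company_binaryalert_analyzer',            # YARA analyzer Lambda function
    'company_binaryalert_matches'              # DynamoDB table storing YARA matches
)
if not passed:
    raise RuntimeError('Live test failed')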

6 changes: 5 additions & 1 deletion terraform/s3.tf
@@ -67,10 +67,14 @@ resource "aws_s3_bucket" "binaryalert_binaries" {
prefix = ""
enabled = true

// Old/deleted object versions are permanently removed after 1 day.
// Old object versions are permanently removed after 1 day.
noncurrent_version_expiration {
days = 1
}

expiration {
expired_object_delete_marker = true
}
}

tags {
Binary file added tests/files/eicar.tar.gz.bz2
1 change: 1 addition & 0 deletions tests/files/eicar.txt
@@ -0,0 +1 @@
X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*
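
The live test correlates each uploaded file with its DynamoDB entry by the SHA256 of the file's contents, so the expected hash key for this file can be computed locally with a few lines (a sketch; the path assumes the repository layout shown above):

import hashlib

# Hash the raw bytes of the test file, matching what tests/live_test.py uploads and looks up.
with open('tests/files/eicar.txt', 'rb') as eicar_file:
    print(hashlib.sha256(eicar_file.read()).hexdigest())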
161 changes: 161 additions & 0 deletions tests/live_test.py
@@ -0,0 +1,161 @@
"""Upload test files to S3 and see if the expected matches appear in Dynamo."""
import hashlib
import os
import pprint
import time
from typing import Dict, List
import uuid

import boto3

TEST_DIR = os.path.dirname(os.path.realpath(__file__))


def _upload_test_files_to_s3(bucket_name: str) -> Dict[str, str]:
"""Upload test files to S3 and returns a map from SHA256 to S3 identifier."""
bucket = boto3.resource('s3').Bucket(bucket_name)
random_suffix = str(uuid.uuid4()).split('-')[-1]

result = {}
for filename in ['eicar.txt', 'eicar.tar.gz.bz2']:
filepath = os.path.join(TEST_DIR, 'files', filename)
s3_object_key = '{}_{}'.format(filename, random_suffix)
s3_full_identifier = 'S3:{}:{}'.format(bucket_name, s3_object_key)

with open(filepath, 'rb') as f:
sha256 = hashlib.sha256(f.read()).hexdigest()
result[sha256] = s3_full_identifier

print('Uploading {} to {}...'.format(filename, s3_full_identifier))
bucket.upload_file(filepath, s3_object_key, ExtraArgs={'Metadata': {'filepath': filename}})

return result


def _lambda_production_version(function_name: str) -> int:
"""Find the version associated with the Production alias of a Lambda function."""
print('Looking up version of {}:Production...'.format(function_name))
response = boto3.client('lambda').list_aliases(FunctionName=function_name)
for alias in response['Aliases']:
if alias['Name'] == 'Production':
return int(alias['FunctionVersion'])
return -1


def _query_dynamo_for_test_files(
table_name: str, file_info: Dict[str, str], analyzer_version: int,
max_attempts: int = 15) -> List:
"""Repeatedly query DynamoDB to look for the expected YARA matches.
Args:
table_name: Name of the DynamoDB match table.
file_info: Dictionary from _upload_test_files_to_s3.
analyzer_version: The underlying Lambda version for the Production alias of the analyzer.
max_attempts: Max number of times to query for results (with 5 seconds between each).
Returns:
List of matching DynamoDB items, one per test file, if all expected entries were found; an empty list otherwise.
"""
client = boto3.client('dynamodb')

for attempt in range(1, max_attempts + 1):
if attempt > 1:
time.sleep(5)
print('\t[{}/{}] Querying DynamoDB table for the expected YARA match entries...'.format(
attempt, max_attempts))

results = client.batch_get_item(
RequestItems={
table_name: {
'Keys': [
{
'SHA256': {'S': sha},
'AnalyzerVersion': {'N': str(analyzer_version)}
}
for sha in file_info
]
}
}
)['Responses'][table_name]

if len(results) < len(file_info):
# If there weren't as many matches as files uploaded, wait and query again.
continue

# Make sure the matches found are from the files we uploaded (and not others).
all_objects_found = True
for entry in results:
file_id = file_info[entry['SHA256']['S']]
if file_id not in entry['S3Objects']['SS']:
all_objects_found = False
break

if not all_objects_found:
continue

# The results check out!
return results

return []


def _cleanup(
bucket_name: str, file_info: Dict[str, str], table_name: str,
analyzer_version: int) -> None:
"""Remove test files and match information."""
print('Removing test files from S3...')
bucket = boto3.resource('s3').Bucket(bucket_name)
bucket.delete_objects(
Delete={
'Objects': [
{'Key': s3_identifier.split(':')[-1]}
for s3_identifier in file_info.values()
]
}
)

print('Removing DynamoDB match entries...')
client = boto3.resource('dynamodb')

client.batch_write_item(
RequestItems={
table_name: [
{
'DeleteRequest': {
'Key': {
'SHA256': sha,
'AnalyzerVersion': analyzer_version
}
}
}
for sha in file_info
]
}
)


def run(bucket_name: str, analyzer_function_name: str, table_name: str) -> bool:
"""Upload an EICAR test file to BinaryAlert which should trigger a YARA match alert.
Args:
bucket_name: Name of the S3 bucket containing binaries.
analyzer_function_name: Name of the YARA analyzer Lambda function.
table_name: Name of the Dynamo table storing YARA match information.
Returns:
True if the test was successful, False otherwise.
"""
test_file_info = _upload_test_files_to_s3(bucket_name)
analyzer_version = _lambda_production_version(analyzer_function_name)
results = _query_dynamo_for_test_files(table_name, test_file_info, analyzer_version)

if results:
print()
pprint.pprint(results)
print('\nSUCCESS: Expected DynamoDB entries for the test files were found!')
else:
print('\nFAIL: Expected DynamoDB entries for the test files were *not* found :(\n')

_cleanup(bucket_name, test_file_info, table_name, analyzer_version)
print('Done!')
return bool(results)
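
One way to troubleshoot a failed live test is to inspect the match table directly. Below is a minimal sketch of a manual lookup on the same composite key the test queries; the table name, hash, and analyzer version are illustrative placeholders:

import boto3

# Placeholder values - substitute your deployment's table name, the SHA256 of the uploaded
# test file, and the Production alias version of the analyzer Lambda.
table = boto3.resource('dynamodb').Table('company_binaryalert_matches')
response = table.get_item(Key={'SHA256': '<sha256-of-test-file>', 'AnalyzerVersion': 3})
print(response.get('Item'))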
67 changes: 6 additions & 61 deletions tests/manage_test.py
@@ -6,15 +6,12 @@
import os
import subprocess
import sys
import time
import uuid
from unittest import mock, TestCase

import boto3
from pyfakefs import fake_filesystem_unittest

import manage
from tests.rules.eicar_rule_test import EICAR_STRING


def _mock_input(prompt: str) -> str:
@@ -375,61 +372,9 @@ def test_destroy(self, mock_check_call: mock.MagicMock, mock_call: mock.MagicMoc
mock_check_call.assert_called_once()
mock_call.assert_called_once()

@mock.patch.object(time, 'sleep', mock.MagicMock())
@mock.patch.object(boto3, 'resource')
@mock.patch.object(manage, 'print')
@mock.patch.object(manage, 'pprint', mock.MagicMock())
@mock.patch.object(uuid, 'uuid4', return_value='test-uuid')
def test_live_test(self, mock_uuid: mock.MagicMock, mock_print: mock.MagicMock,
mock_resource: mock.MagicMock):
"""Verify execution order for boto3 and print mock calls."""
self.manager.live_test()

mock_uuid.assert_called_once()

mock_resource.assert_has_calls([
mock.call('s3'),
mock.call().Bucket('test.prefix.binaryalert-binaries.us-test-1'),
mock.call().Bucket().put_object(
Body=bytes('{}'.format(EICAR_STRING), 'utf-8'),
Key='eicar_test_test-uuid.txt',
Metadata={'filepath': 'eicar_test_test-uuid.txt'}
),
mock.call('dynamodb'),
mock.call().Table('test_prefix_binaryalert_matches'),
mock.call().Table().query(
Select='ALL_ATTRIBUTES',
Limit=1,
ConsistentRead=True,
ScanIndexForward=False,
KeyConditionExpression=mock.ANY,
FilterExpression=mock.ANY
)
])

mock_resource.assert_has_calls([
mock.call().Table().delete_item(Key=mock.ANY),
mock.call().Bucket().delete_objects(
Delete={'Objects': [{'Key': 'eicar_test_test-uuid.txt'}]}
)
])

mock_print.assert_has_calls([
mock.call(
'Uploading EICAR test file '
'S3:test.prefix.binaryalert-binaries.us-test-1:eicar_test_test-uuid.txt...'
),
mock.call(
'EICAR test file uploaded! '
'Connecting to table DynamoDB:test_prefix_binaryalert_matches...'
),
mock.call(
'\t[1/10] Querying DynamoDB table for the expected YARA match entry...'
),
mock.call('\nSUCCESS: Expected DynamoDB entry for the EICAR file was found!\n'),
mock.call('\nRemoving DynamoDB EICAR entry...'),
mock.call('Removing EICAR test file from S3...'),
mock.call(
'\nLive test succeeded! Verify the alert was sent to your SNS subscription(s).'
)
])
@mock.patch.object(manage.live_test, 'run', return_value=False)
def test_live_test(self, mock_live_test: mock.MagicMock):
"""Live test wrapper raises TestFailureError if appropriate."""
with self.assertRaises(manage.TestFailureError):
self.manager.live_test()
mock_live_test.assert_called_once()
