Skip to content

Commit

Permalink
Add a UPX-packed binary and a PDF to the live test
Browse files Browse the repository at this point in the history
  • Loading branch information
Austin Byers committed Apr 17, 2018
1 parent 9ba4e8f commit 15c5d71
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 32 deletions.
18 changes: 17 additions & 1 deletion rules/public/eicar.yara
Expand Up @@ -14,5 +14,21 @@ rule eicar_av_test {
$eicar_regex = /^X5O!P%@AP\[4\\PZX54\(P\^\)7CC\)7\}\$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!\$H\+H\*\s*$/
condition:
$eicar_regex
all of them
}

rule eicar_substring_test {
/*
More generic - match just the embedded EICAR string (e.g. in packed executables, PDFs, etc)
*/

meta:
description = "Standard AV test, checking for an EICAR substring"
author = "Austin Byers | Airbnb CSIRT"

strings:
$eicar_substring = "$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!"
condition:
all of them
}
Binary file added tests/files/eicar_packed.py.upx
Binary file not shown.
Binary file added tests/files/eicar_text.pdf
Binary file not shown.
63 changes: 38 additions & 25 deletions tests/live_test.py
@@ -1,14 +1,15 @@
"""Upload test files to S3 and see if the expected matches appear in Dynamo."""
import hashlib
import json
import os
import pprint
import time
from typing import Dict, List
import uuid

import boto3

TEST_DIR = os.path.dirname(os.path.realpath(__file__))
TEST_FILES = ['eicar.txt', 'eicar.tar.gz.bz2', 'eicar_packed.py.upx', 'eicar_text.pdf']


def _upload_test_files_to_s3(bucket_name: str) -> Dict[str, str]:
Expand All @@ -17,7 +18,7 @@ def _upload_test_files_to_s3(bucket_name: str) -> Dict[str, str]:
random_suffix = str(uuid.uuid4()).split('-')[-1]

result = {}
for filename in ['eicar.txt', 'eicar.tar.gz.bz2']:
for filename in TEST_FILES:
filepath = os.path.join(TEST_DIR, 'files', filename)
s3_object_key = '{}_{}'.format(filename, random_suffix)
s3_full_identifier = 'S3:{}:{}'.format(bucket_name, s3_object_key)
Expand All @@ -44,7 +45,7 @@ def _lambda_production_version(function_name: str) -> int:

def _query_dynamo_for_test_files(
table_name: str, file_info: Dict[str, str], analyzer_version: int,
max_attempts: int = 15) -> List:
max_attempts: int = 15) -> Dict[str, List[str]]:
"""Repeatedly query DynamoDB to look for the expected YARA matches.
Args:
Expand All @@ -54,17 +55,18 @@ def _query_dynamo_for_test_files(
max_attempts: Max number of times to query for results (with 5 seconds between each).
Returns:
True if the expected entries were found
dict: Map from test filename (str) to list of matched YARA rules.
"""
client = boto3.client('dynamodb')

results = {} # Map filename to list of matched rules.
for attempt in range(1, max_attempts + 1):
if attempt > 1:
time.sleep(5)
print('\t[{}/{}] Querying DynamoDB table for the expected YARA match entries...'.format(
attempt, max_attempts))

results = client.batch_get_item(
response = client.batch_get_item(
RequestItems={
table_name: {
'Keys': [
Expand All @@ -78,25 +80,13 @@ def _query_dynamo_for_test_files(
}
)['Responses'][table_name]

if len(results) < len(file_info):
# If there weren't as many matches as files uploaded, stop and try again.
continue
for match in response:
results[match['S3Metadata']['M']['filepath']['S']] = match['MatchedRules']['SS']

# Make sure the matches found are from the files we uploaded (and not others).
all_objects_found = True
for entry in results:
file_id = file_info[entry['SHA256']['S']]
if file_id not in entry['S3Objects']['SS']:
all_objects_found = False
break
if len(results) == len(file_info):
break

if not all_objects_found:
continue

# The results check out!
return results

return []
return results


def _cleanup(
Expand Down Expand Up @@ -149,12 +139,35 @@ def run(bucket_name: str, analyzer_function_name: str, table_name: str) -> bool:
analyzer_version = _lambda_production_version(analyzer_function_name)
results = _query_dynamo_for_test_files(table_name, test_file_info, analyzer_version)

if results:
print()
pprint.pprint(results)
expected = {
'eicar.txt': [
'public/eicar.yara:eicar_av_test',
'public/eicar.yara:eicar_substring_test',
'yextend:eicar_av_test',
'yextend:eicar_substring_test'
],
'eicar.tar.gz.bz2': [
'yextend:eicar_av_test',
'yextend:eicar_substring_test'
],
'eicar_packed.py.upx': [
'public/eicar.yara:eicar_substring_test',
'yextend:eicar_substring_test'
],
'eicar_text.pdf': [
'yextend:eicar_substring_test'
]
}

if results == expected:
print('\nSUCCESS: Expected DynamoDB entries for the test files were found!')
print(json.dumps(results, sort_keys=True, indent=4))
else:
print('\nFAIL: Expected DynamoDB entries for the test files were *not* found :(\n')
print('Expected Results:')
print(json.dumps(expected, sort_keys=True, indent=4))
print('Actual Results:')
print(json.dumps(results, sort_keys=True, indent=4))

_cleanup(bucket_name, test_file_info, table_name, analyzer_version)
print('Done!')
Expand Down
25 changes: 19 additions & 6 deletions tests/rules/eicar_rule_test.py
Expand Up @@ -19,19 +19,32 @@ def setUp(self):

def test_match_eicar_string(self):
"""Should match the exact EICAR string."""
self.assertEqual(1, len(self.eicar_rule.match(data=self.eicar_string)))
matches = self.eicar_rule.match(data=self.eicar_string)
self.assertEqual(
['eicar_av_test', 'eicar_substring_test'],
[match.rule for match in matches]
)

def test_match_eicar_with_trailing_spaces(self):
"""Trailing whitespace is allowed after the EICAR string."""
matches = self.eicar_rule.match(data='{} \n\t'.format(self.eicar_string))
self.assertEqual(1, len(matches))
self.assertEqual(
['eicar_av_test', 'eicar_substring_test'],
[match.rule for match in matches]
)

def test_no_match_if_eicar_is_not_beginning(self):
"""No match if EICAR string is not the beginning of the file."""
"""No match for eicar_av_test if EICAR string is not the beginning of the file."""
matches = self.eicar_rule.match(data='other-text {}'.format(self.eicar_string))
self.assertEqual(0, len(matches))
self.assertEqual(
['eicar_substring_test'],
[match.rule for match in matches]
)

def test_no_match_if_eicar_is_not_end(self):
"""No match if non-whitespace comes after the EICAR string."""
"""No match for eicar_av_test if non-whitespace comes after the EICAR string."""
matches = self.eicar_rule.match(data='{} other-text'.format(self.eicar_string))
self.assertEqual(0, len(matches))
self.assertEqual(
['eicar_substring_test'],
[match.rule for match in matches]
)

0 comments on commit 15c5d71

Please sign in to comment.