Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Change rule source configuration to JSON with include and exclude sup…
…port (#99)
- Loading branch information
1 parent
f8141d4
commit 460d2cd
Showing
7 changed files
with
291 additions
and
88 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,34 +1,92 @@ | ||
"""Update YARA rules cloned from remote sources.""" | ||
from fnmatch import fnmatch | ||
import json | ||
import os | ||
import shutil | ||
import subprocess | ||
import tempfile | ||
from typing import Generator, List, Optional | ||
|
||
RULES_DIR = os.path.dirname(os.path.realpath(__file__)) # Directory containing this file. | ||
REMOTE_RULE_SOURCES = { | ||
'https://github.com/Neo23x0/signature-base.git': ['yara'], | ||
'https://github.com/YARA-Rules/rules.git': ['CVE_Rules'] | ||
} | ||
|
||
|
||
def clone_rules_from_github() -> None: | ||
"""Update YARA rules cloned from GitHub.""" | ||
for url, folders in REMOTE_RULE_SOURCES.items(): | ||
# Clone repo into a temporary directory. | ||
print('Cloning YARA rules from {}/{}...'.format(url, folders)) | ||
cloned_repo_root = os.path.join(tempfile.gettempdir(), os.path.basename(url)) | ||
if os.path.exists(cloned_repo_root): | ||
shutil.rmtree(cloned_repo_root) | ||
subprocess.check_call(['git', 'clone', '--depth', '1', url, cloned_repo_root]) | ||
|
||
# Copy each specified folder into the target rules directory. | ||
for folder in folders: | ||
source = os.path.join(cloned_repo_root, folder) | ||
destination = os.path.join(RULES_DIR, url.split('//')[1], folder) | ||
if os.path.exists(destination): | ||
# Remove existing rules in this folder before copying | ||
# (in case upstream rules were deleted). | ||
shutil.rmtree(destination) | ||
shutil.copytree(source, destination) | ||
|
||
shutil.rmtree(cloned_repo_root) # Remove temporary cloned repo. | ||
REMOTE_RULE_SOURCES = os.path.join(RULES_DIR, 'rule_sources.json') | ||
|
||
|
||
def _copy_required(path: str, include: Optional[List[str]], exclude: Optional[List[str]]) -> bool: | ||
"""Return True if the given filepath should be copied, given the include/exclude directives.""" | ||
# 1) If the path is not in the "include" list (which defaults to everything), skip it. | ||
if include and not any(fnmatch(path, pattern) for pattern in include): | ||
return False | ||
|
||
# 2) If the path is specifically excluded, skip it. | ||
if exclude and any(fnmatch(path, pattern) for pattern in exclude): | ||
return False | ||
|
||
# 3) If the path is not a .yar or .yara file, skip it. | ||
lower_filename = path.lower() | ||
if not lower_filename.endswith('.yar') and not lower_filename.endswith('.yara'): | ||
return False | ||
|
||
return True | ||
|
||
|
||
def _files_to_copy( | ||
cloned_repo_root: str, include: Optional[List[str]], | ||
exclude: Optional[List[str]]) -> Generator[str, None, None]: | ||
"""Yields string paths to copy, each relative to the root of the repo.""" | ||
for root, _, files in os.walk(cloned_repo_root): | ||
for filename in files: | ||
# Compute path *relative to the root of its repository* | ||
relative_path = os.path.relpath(os.path.join(root, filename), start=cloned_repo_root) | ||
if _copy_required(relative_path, include, exclude): | ||
yield relative_path | ||
|
||
|
||
def _clone_repo(url: str, include: Optional[List[str]], exclude: Optional[List[str]]) -> int: | ||
"""Clone the given repo and copy only the YARA files from the specified paths. | ||
Returns: | ||
Number of files copied. | ||
""" | ||
# Shallow clone entire repo into a temp directory. | ||
cloned_repo_root = os.path.join(tempfile.gettempdir(), os.path.basename(url)) | ||
if os.path.exists(cloned_repo_root): | ||
shutil.rmtree(cloned_repo_root) | ||
subprocess.check_call(['git', 'clone', '--quiet', '--depth', '1', url, cloned_repo_root]) | ||
|
||
# Remove existing rules in target folder before copying (in case upstream rules were deleted). | ||
target_repo_root = os.path.join(RULES_DIR, url.split('//')[1]) | ||
if os.path.exists(target_repo_root): | ||
shutil.rmtree(target_repo_root) | ||
|
||
# Copy each applicable file into the target folder in the rules/ directory. | ||
files_copied = 0 | ||
for relative_path in _files_to_copy(cloned_repo_root, include, exclude): | ||
# Create all of the intermediate directories, if they don't already exist. | ||
os.makedirs(os.path.join(target_repo_root, os.path.dirname(relative_path)), exist_ok=True) | ||
src = os.path.join(cloned_repo_root, relative_path) | ||
dst = os.path.join(target_repo_root, relative_path) | ||
shutil.copy(src, dst) | ||
files_copied += 1 | ||
|
||
# Remove temporary cloned repo. | ||
shutil.rmtree(cloned_repo_root) | ||
|
||
return files_copied | ||
|
||
|
||
def clone_remote_rules() -> None: | ||
"""Clone YARA rules from all remote sources into the rules/ directory.""" | ||
with open(REMOTE_RULE_SOURCES) as f: | ||
rule_sources = json.load(f) | ||
|
||
num_repos = len(rule_sources['repos']) | ||
total_files_copied = 0 | ||
for count, source in enumerate(rule_sources['repos'], start=1): | ||
print('[{}/{}] Cloning {}... '.format(count, num_repos, source['url']), end='', flush=True) | ||
files_copied = _clone_repo(source['url'], source.get('include'), source.get('exclude')) | ||
print('{} YARA {} copied'.format(files_copied, 'file' if files_copied == 1 else 'files')) | ||
total_files_copied += files_copied | ||
|
||
print('Done! {} YARA {} cloned from {} {}.'.format( | ||
total_files_copied, 'file' if total_files_copied == 1 else 'files', | ||
num_repos, 'repository' if num_repos == 1 else 'repositories')) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
{ | ||
"repos": [ | ||
{ | ||
"url": "https://github.com/Neo23x0/signature-base.git", | ||
"include": [ | ||
"yara/*" | ||
] | ||
}, | ||
{ | ||
"url": "https://github.com/YARA-Rules/rules.git", | ||
"include": [ | ||
"CVE_Rules/*" | ||
] | ||
} | ||
] | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
"""Tests for rule update/clone logic.""" | ||
# pylint: disable=protected-access | ||
import json | ||
import os | ||
from typing import List | ||
import unittest | ||
from unittest import mock | ||
|
||
from pyfakefs import fake_filesystem_unittest | ||
|
||
from rules import compile_rules, clone_rules | ||
|
||
|
||
class CopyRequiredTest(unittest.TestCase): | ||
"""Test the _copy_required private method.""" | ||
|
||
def test_copy_required_no_lists(self): | ||
"""If neither an exclude nor an include list is specified, YARA files should be copied.""" | ||
self.assertTrue(clone_rules._copy_required('path/to/file.yar', None, None)) | ||
self.assertTrue(clone_rules._copy_required('path/fo/file.YARA', [], [])) | ||
self.assertFalse(clone_rules._copy_required('.git/HEAD', None, None)) | ||
self.assertFalse(clone_rules._copy_required('path/to/file.txt', None, None)) | ||
|
||
def test_copy_required_include_list(self): | ||
"""Only files matching the include list should be copied.""" | ||
include_list = ['path/to/*', '[abc]?/*/file*'] | ||
|
||
self.assertTrue(clone_rules._copy_required('path/to/rules.yara', include_list, [])) | ||
self.assertTrue(clone_rules._copy_required( | ||
'a1/some/long/path/file_apt.yara', include_list, [])) | ||
self.assertTrue(clone_rules._copy_required('b2/malware/file ROOTKIT.YAR', include_list, [])) | ||
|
||
self.assertFalse(clone_rules._copy_required('base.yara', include_list, [])) | ||
self.assertFalse(clone_rules._copy_required('path/to/file.txt', include_list, [])) | ||
self.assertFalse(clone_rules._copy_required('a1/file.yara', include_list, [])) | ||
|
||
def test_copy_required_exclude_list(self): | ||
"""Skip any file matching the exclude list.""" | ||
exclude_list = ['*.yar', 'skip/these/file*'] | ||
self.assertTrue(clone_rules._copy_required('base.yara', [], exclude_list)) | ||
self.assertTrue(clone_rules._copy_required('path/to/file.yara', [], exclude_list)) | ||
self.assertFalse(clone_rules._copy_required('file.yar', [], exclude_list)) | ||
self.assertFalse(clone_rules._copy_required('skip/these/file.yara', [], exclude_list)) | ||
|
||
def test_copy_required_include_and_exclude(self): | ||
"""Test copy required with both an include and exclude list specified.""" | ||
include = ['yara/*', '*_malware_*'] | ||
exclude = ['*mobile*', 'yara/?.yara'] | ||
|
||
self.assertTrue(clone_rules._copy_required('yara/packed.yara', include, exclude)) | ||
self.assertTrue(clone_rules._copy_required('base_malware_index.yara', include, exclude)) | ||
self.assertTrue(clone_rules._copy_required('yara/mac_malware.yar', include, exclude)) | ||
|
||
self.assertFalse(clone_rules._copy_required('not_included.yara', include, exclude)) | ||
self.assertFalse(clone_rules._copy_required('yara/mobile_malware.yara', include, exclude)) | ||
self.assertFalse(clone_rules._copy_required('yara/A.yara', include, exclude)) | ||
|
||
|
||
class CloneRulesTest(fake_filesystem_unittest.TestCase): | ||
"""Tests for the rule-cloning logic.""" | ||
|
||
def setUp(self): | ||
"""Setup the fake filesystem with the expected rules folder structure.""" | ||
self.setUpPyfakefs() | ||
os.makedirs(clone_rules.RULES_DIR) | ||
|
||
# Add fake rule sources. | ||
self.fs.CreateFile(clone_rules.REMOTE_RULE_SOURCES, contents=json.dumps( | ||
{ | ||
"repos": [ | ||
{ | ||
"url": "https://github.com/test-user1/test-repo1", | ||
"include": ["yara/*"] | ||
}, | ||
{ | ||
"url": "https://github.com/test-user2/test-repo2", | ||
"exclude": ["windows/*", "*_mobile.yara"] | ||
} | ||
] | ||
} | ||
)) | ||
|
||
# Add extra rules (which should be deleted). | ||
self.fs.CreateFile(os.path.join( | ||
clone_rules.RULES_DIR, | ||
'github.com', 'test-user1', 'test-repo1', 'CVE_Rules', 'delete-me.yara' | ||
)) | ||
|
||
# Add some other rules (which should be preserved). | ||
self.fs.CreateFile(os.path.join(clone_rules.RULES_DIR, 'private', 'private.yara')) | ||
|
||
def _mock_git_clone(self, args: List[str]) -> None: | ||
"""Mock out git clone by creating the "cloned" directory.""" | ||
cloned_repo_root = args[-1] | ||
|
||
# Create "cloned" directory and subfolders. | ||
if cloned_repo_root.endswith('test-repo1'): | ||
self.fs.CreateFile(os.path.join(cloned_repo_root, 'yara', 'cloned.yara')) | ||
self.fs.CreateFile(os.path.join(cloned_repo_root, 'not_included.yara')) | ||
else: | ||
self.fs.CreateFile(os.path.join(cloned_repo_root, 'yara', 'cloned.yara')) | ||
self.fs.CreateFile(os.path.join(cloned_repo_root, 'yara', 'exluded_mobile.yara')) | ||
self.fs.CreateFile(os.path.join(cloned_repo_root, 'windows', 'excluded.yara')) | ||
|
||
@mock.patch.object(clone_rules, 'print') | ||
def test_clone_remote_rules(self, mock_print: mock.MagicMock): | ||
"""Mock out the clone process and verify which rules files were saved/deleted.""" | ||
with mock.patch('subprocess.check_call', side_effect=self._mock_git_clone): | ||
clone_rules.clone_remote_rules() | ||
|
||
mock_print.assert_has_calls([ | ||
mock.call('[1/2] Cloning https://github.com/test-user1/test-repo1... ', | ||
end='', flush=True), | ||
mock.call('1 YARA file copied'), | ||
mock.call('[2/2] Cloning https://github.com/test-user2/test-repo2... ', | ||
end='', flush=True), | ||
mock.call('1 YARA file copied'), | ||
mock.call('Done! 2 YARA files cloned from 2 repositories.') | ||
]) | ||
|
||
expected_files = { | ||
'github.com/test-user1/test-repo1/yara/cloned.yara', | ||
'github.com/test-user2/test-repo2/yara/cloned.yara', | ||
'private/private.yara' | ||
} | ||
self.assertEqual(expected_files, set(compile_rules._find_yara_files())) |
Oops, something went wrong.