From 69179d03e4eda28f9ea041be615c8d1ea8e94893 Mon Sep 17 00:00:00 2001 From: DC3-DCCI Date: Tue, 15 May 2018 12:03:43 -0400 Subject: [PATCH] Add unit tests and fixed bugs found along the way. Also, set kordesii to be an optional. --- .gitignore | 1 + CHANGELOG.md | 5 + README.md | 71 ++++++++++- mwcp/__init__.py | 2 +- mwcp/malwareconfigreporter.py | 2 +- mwcp/parsers/__init__.py | 18 +-- mwcp/reporter.py | 90 +++++++------ mwcp/resources/dispatcher.py | 24 +++- mwcp/tester.py | 4 +- mwcp/tools/tool.py | 57 ++++----- mwcp/utils/construct/helpers.py | 47 +++---- mwcp/utils/custombase64.py | 27 ++-- mwcp/utils/pefileutils.py | 2 + setup.py | 11 +- tests/conftest.py | 34 +++++ tests/test_cli.py | 219 ++++++++++++++++++++++++++++++++ tests/test_custombase64.py | 23 ++++ tests/test_dispatcher.py | 102 +++++++++++++++ tests/test_parser_registry.py | 43 +++++++ tests/test_reporter.py | 115 +++++++++++++++++ tox.ini | 14 ++ 21 files changed, 774 insertions(+), 137 deletions(-) create mode 100644 tests/conftest.py create mode 100644 tests/test_cli.py create mode 100644 tests/test_custombase64.py create mode 100644 tests/test_dispatcher.py create mode 100644 tests/test_parser_registry.py create mode 100644 tests/test_reporter.py create mode 100644 tox.ini diff --git a/.gitignore b/.gitignore index 4e1ca0c..54e05e0 100644 --- a/.gitignore +++ b/.gitignore @@ -45,6 +45,7 @@ htmlcov/ nosetests.xml coverage.xml *,cover +.pytest_cache/ # Translations *.mo diff --git a/CHANGELOG.md b/CHANGELOG.md index b0197df..0fe1a10 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,13 +2,18 @@ All notable changes to this project will be documented in this file. ## [Unreleased] +### Added +- Added unit testing using tox and pytest. + ### Changed - Added new standard metadata fields - Cleaned up mwcp tool - Updated and added documentation for developing/testing parsers. +- Set DC3-Kordesii as an optional dependency. 
### Fixed - Fixed "unorderable types" error when outputting to csv +- Fixed bugs found in unit tests. ## [1.2.0] - 2018-04-17 ### Added diff --git a/README.md b/README.md index 2dc8656..e91eabf 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,8 @@ command line tool. DC3-MWCP is authored by the Defense Cyber Crime Center (DC3). - [Install](#install) - [No-install Method](#no-install-method) +- [DC3-Kordesii Support](#dc3-kordesii-support) +- [Unit Tests](#unit-tests) - [Usage](#usage) - [CLI Tool](#cli-tool) - [REST API](#rest-api) @@ -45,10 +47,6 @@ git clone https://github.com/Defense-Cyber-Crime-Center/DC3-MWCP.git pip install -e ./DC3-MWCP ``` -When installing locally from a cloned repo, you may need to -install the [kordesii](https://github.com/Defense-Cyber-Crime-Center/kordesii) -dependency first. - ## No-install Method You can also use MWCP without installing using the *mwcp-\*.py* scripts. However, you will need to manually install all the dependencies. @@ -61,6 +59,71 @@ Example: python mwcp-tool.py -h ``` +## DC3-Kordesii Support +DC3-MWCP optionally supports [DC3-Kordesii](https://github.com/Defense-Cyber-Crime-Center/kordesii) +if it is installed. This will allow you to run any DC3-Kordesii decoder from the +`mwcp.FileObject` object with the `run_kordesii_decoder` function. + +You can install DC3-Kordesii along with DC3-MWCP by adding `[kordesii]` to your appropriate install command: +``` +pip install mwcp[kordesii] +pip install ./DC3-MWCP[kordesii] +pip install -e ./DC3-MWCP[kordesii] +``` + +## Unit Tests +DC3-MWCP uses [tox](https://tox.readthedocs.io) with [pytest](https://pytest.org) to test the core code +and parsers. These libraries will be installed when you install DC3-MWCP. +To run all tests on Python 2.7 and 3.6 run the `tox` command after installation. 
+ +```bash +$ tox +GLOB sdist-make: C:\dev\DC3_MWCP\setup.py +py27 inst-nodeps: C:\dev\DC3_MWCP\.tox\dist\mwcp-1.2.0.zip +py27 installed: attrs==17.4.0,bottle==0.12.13,certifi==2018.4.16,chardet==3.0.4,colorama==0.3.9,construct==2.8.12,funcsigs==1.0.2,future==0.16.0,idna==2.6,Jinja2==2.10,MarkupSafe==1.0,mock==2.0.0,more-itertools==4.1.0,mwcp==1.2.0,pbr==4.0.2,pefile==2017.11.5,pluggy==0.6.0,py==1.5.3,pytest==3.5.0,pytest-console-scripts==0.1.4,pytest-mock==1.9.0,requests==2.18.4,six==1.11.0,tox==3.0.0,urllib3==1.22,virtualenv==15.2.0 +py27 runtests: PYTHONHASHSEED='155' +py27 runtests: commands[0] | pytest --doctest-modules +============================= test session starts ============================= +platform win32 -- Python 2.7.14, pytest-3.5.0, py-1.5.3, pluggy-0.6.0 +rootdir: C:\dev\DC3_MWCP, inifile: tox.ini +plugins: mock-1.9.0, console-scripts-0.1.4 +collected 59 items + +mwcp\utils\construct\construct_html.py . [ 1%] +mwcp\utils\construct\helpers.py ......................... [ 44%] +mwcp\utils\construct\windows_enums.py .... [ 50%] +mwcp\utils\construct\windows_structures.py . [ 52%] +tests\test_cli.py ....... [ 64%] +tests\test_custombase64.py ... [ 69%] +tests\test_dispatcher.py ..... [ 77%] +tests\test_parser_registry.py .. [ 81%] +tests\test_reporter.py ........... 
[100%] + +========================== 59 passed in 8.30 seconds ========================== +py36 inst-nodeps: C:\dev\DC3_MWCP\.tox\dist\mwcp-1.2.0.zip +py36 installed: attrs==17.4.0,bottle==0.12.13,certifi==2018.4.16,chardet==3.0.4,colorama==0.3.9,construct==2.8.12,future==0.16.0,idna==2.6,Jinja2==2.10,MarkupSafe==1.0,mock==2.0.0,more-itertools==4.1.0,mwcp==1.2.0,pbr==4.0.2,pefile==2017.11.5,pluggy==0.6.0,py==1.5.3,pytest==3.5.0,pytest-console-scripts==0.1.4,pytest-mock==1.9.0,requests==2.18.4,six==1.11.0,tox==3.0.0,urllib3==1.22,virtualenv==15.2.0 +py36 runtests: PYTHONHASHSEED='155' +py36 runtests: commands[0] | pytest +============================= test session starts ============================= +platform win32 -- Python 3.6.3, pytest-3.5.0, py-1.5.3, pluggy-0.6.0 +rootdir: C:\dev\DC3_MWCP, inifile: tox.ini +plugins: mock-1.9.0, console-scripts-0.1.4 +collected 28 items + +tests\test_cli.py ....... [ 25%] +tests\test_custombase64.py ... [ 35%] +tests\test_dispatcher.py ..... [ 53%] +tests\test_parser_registry.py .. [ 60%] +tests\test_reporter.py ........... [100%] + +========================== 28 passed in 6.33 seconds ========================== +___________________________________ summary ___________________________________ + py27: commands succeeded + py36: commands succeeded + congratulations :) +``` + + ## Usage DC3-MWCP is designed to allow easy development and use of malware config parsers. DC3-MWCP is also designed to ensure that these parsers are scalable and that DC3-MWCP can be integrated in other systems. 
diff --git a/mwcp/__init__.py b/mwcp/__init__.py index 1c2c3fb..6f710ca 100644 --- a/mwcp/__init__.py +++ b/mwcp/__init__.py @@ -4,4 +4,4 @@ from mwcp.parsers import register_parser_directory, iter_parsers, get_parser_descriptions from mwcp.reporter import Reporter from mwcp.resources import techanarchy_bridge -from mwcp.resources.dispatcher import Dispatcher, ComponentParser, FileObject, UnableToParse +from mwcp.resources.dispatcher import Dispatcher, ComponentParser, FileObject, UnableToParse, UnidentifiedFile diff --git a/mwcp/malwareconfigreporter.py b/mwcp/malwareconfigreporter.py index 9ad9189..7b133d4 100644 --- a/mwcp/malwareconfigreporter.py +++ b/mwcp/malwareconfigreporter.py @@ -7,7 +7,7 @@ import sys import warnings -warnings.warn('The mwcp.malwareconfigreporter module is deprecated; use mwcp.reporter instead', DeprecationWarning, 2) +warnings.warn('The mwcp.malwareconfigreporter module is deprecated; use mwcp.Reporter instead', DeprecationWarning, 2) from mwcp.reporter import Reporter diff --git a/mwcp/parsers/__init__.py b/mwcp/parsers/__init__.py index e43cc9d..2ee2c0e 100644 --- a/mwcp/parsers/__init__.py +++ b/mwcp/parsers/__init__.py @@ -85,32 +85,32 @@ def iter_parsers(name=None, source=None): (source is either the name of a python package or path to local directory) e.g. 
- >>> list(iter_parsers()) + >> list(iter_parsers()) [ ('foo', 'C:\...\parsers', ), ('foo', 'mwcp-acme', ), ('bar', 'mwcp-acme', ) ] - >>> list(iter_parsers(name='foo')) + >> list(iter_parsers(name='foo')) [ ('foo', 'C:\...\parsers', ), ('foo', 'mwcp-acme', ) ] - >>> list(iter_parsers(source='mwcp-acme')) + >> list(iter_parsers(source='mwcp-acme')) [ ('foo', 'mwcp-acme', ), ('bar', 'mwcp-acme', ] - >>> list(iter_parsers('mwcp-acme:')) + >> list(iter_parsers('mwcp-acme:')) [ ('foo', 'mwcp-acme', ), ('bar', 'mwcp-acme', ] - >>> list(iter_parsers(name='foo', source='mwcp-acme')) + >> list(iter_parsers(name='foo', source='mwcp-acme')) [ ('foo', 'mwcp-acme', ) ] - >>> list(iter_parsers('mwcp-acme:foo')) + >> list(iter_parsers('mwcp-acme:foo')) [ ('foo', 'mwcp-acme', ) ] @@ -139,7 +139,7 @@ def iter_parsers(name=None, source=None): yield name, source_name, klass -def get_parser_descriptions(): +def get_parser_descriptions(name=None, source=None): """ Retrieve list of parser descriptions @@ -150,9 +150,9 @@ def get_parser_descriptions(): # temporarily initialize them in order to extract their info. # TODO: In the future, this information should be static attributes on the class itself. 
reporter = mwcp.Reporter() - for name, source, klass in sorted(iter_parsers()): + for _name, _source, klass in sorted(iter_parsers(name=name, source=source)): parser = klass(reporter) - descriptions.append((name, source, parser.author, parser.description)) + descriptions.append((_name, _source, parser.author, parser.description)) return descriptions diff --git a/mwcp/reporter.py b/mwcp/reporter.py index 1c4f02d..3a38e32 100644 --- a/mwcp/reporter.py +++ b/mwcp/reporter.py @@ -5,7 +5,7 @@ import contextlib -from future.builtins import str +from future.builtins import str, open, map import base64 import hashlib @@ -139,7 +139,7 @@ def __init__(self, self._resourcedir = None self.resourcedir = os.path.dirname(resources.__file__) - self.__managed_tempdir = '' + self.__managed_tempdir = None self.__outputdir = outputdir or '' self.__outputfile_prefix = outputfile_prefix or '' @@ -152,19 +152,19 @@ def __init__(self, if self.parserdir != self.DEFAULT_PARSERDIR or not any(mwcp.iter_parsers(source='mwcp')): mwcp.register_parser_directory(self.parserdir) - self.__interpreter_path = interpreter_path - self.__disabledebug = disabledebug - self.__disableoutputfiles = disableoutputfiles - self.__disabletempcleanup = disabletempcleanup + self._interpreter_path = interpreter_path + self._disable_debug = disabledebug + self._disable_output_files = disableoutputfiles + self._disable_temp_cleanup = disabletempcleanup self._disable_auto_subfield_parsing = disableautosubfieldparsing self._disable_value_dedup = disablevaluededup - self.__disablemodulesearch = disablemodulesearch - self.__base64outputfiles = base64outputfiles + self._disable_module_search = disablemodulesearch + self._base64_output_files = base64outputfiles # TODO: Move fields.json to shared data or config folder. 
fieldspath = os.path.join(os.path.dirname(mwcp.resources.__file__), "fields.json") - with open(fieldspath, b'rb') as f: + with open(fieldspath, 'rb') as f: self.fields = json.load(f) # Allow user to still use resourcedir feature, but warn about deprecation. @@ -185,11 +185,11 @@ def resourcedir(self, resourcedir): # we put resourcedir in PYTHONPATH in case we shell out or children # processes need this # Windows environment variables must be byte strings. - if b'PYTHONPATH' in os.environ: - if resourcedir not in os.environ[b'PYTHONPATH']: - os.environ[b'PYTHONPATH'] = os.environ[b'PYTHONPATH'] + os.pathsep + resourcedir + if 'PYTHONPATH' in os.environ: + if resourcedir not in os.environ['PYTHONPATH']: + os.environ['PYTHONPATH'] = os.environ['PYTHONPATH'] + os.pathsep + resourcedir else: - os.environ[b'PYTHONPATH'] = resourcedir + os.environ['PYTHONPATH'] = resourcedir @property def data(self): @@ -237,7 +237,7 @@ def managed_tempdir(self): self.__managed_tempdir = tempfile.mkdtemp( dir=self.tempdir, prefix="mwcp-managed_tempdir-") - if self.__disabletempcleanup: + if self._disable_temp_cleanup: self.debug("Using managed temp dir: %s" % self.__managed_tempdir) @@ -246,27 +246,27 @@ def managed_tempdir(self): def interpreter_path(self): """ Returns the path for python interpreter, assuming it can be found. Because of various - factors (inlcuding ablity to override) this may not be accurate. + factors (including ability to override) this may not be accurate. """ - if not self.__interpreter_path: + if not self._interpreter_path: # first try sys.executable--this is reliable most of the time but # doesn't work when python is embedded, ex. 
using wsgi mod for web # server if "python" in os.path.basename(sys.executable): - self.__interpreter_path = sys.executable + self._interpreter_path = sys.executable # second try sys.prefix and common executable names else: possible_path = os.path.join(sys.prefix, "python.exe") if os.path.exists(possible_path): - self.__interpreter_path = possible_path + self._interpreter_path = possible_path possible_path = os.path.join(sys.prefix, "bin", "python") if os.path.exists(possible_path): - self.__interpreter_path = possible_path + self._interpreter_path = possible_path # other options to consider: # look at some library paths, such as os.__file__, use system path to find python # executable that uses that library use shell and let it find python. Ex. which python - return self.__interpreter_path + return self._interpreter_path def error(self, message): """ @@ -278,7 +278,7 @@ def debug(self, message): """ Record a debug message """ - if not self.__disabledebug: + if not self._disable_debug: self.add_metadata("debug", message) def _add_metatadata_listofstrings(self, key, value): @@ -294,7 +294,7 @@ def _add_metatadata_listofstrings(self, key, value): return if key == "filepath": - # use ntpath instead of os.path so we are consistant across platforms. ntpath + # use ntpath instead of os.path so we are consistent across platforms. ntpath # should work for both windows and unix paths. os.path works for the platform # you are running on, not necessarily what the malware was written for. # Ex. 
when running mwcp on linux to process windows @@ -321,7 +321,7 @@ def _add_metatadata_listofstrings(self, key, value): if key == "ssl_cer_sha1": if not self.SHA1_RE.match(value): - self.debug("Invalid SHA1 hash found: {!r}".format(value)) + self.error("Invalid SHA1 hash found: {!r}".format(value)) if key in ("url", "c2_url"): # http://[fe80::20c:1234:5678:9abc]:80/badness @@ -329,7 +329,7 @@ def _add_metatadata_listofstrings(self, key, value): # ftp://127.0.0.1/really/bad?hostname=pwned match = self.URL_RE.search(value) if not match: - self.debug("Error parsing as url: %s" % value) + self.error("Error parsing as url: %s" % value) return if match.group("path"): @@ -340,7 +340,7 @@ def _add_metatadata_listofstrings(self, key, value): if address.startswith("["): # ipv6--something like # [fe80::20c:1234:5678:9abc]:80 - domain, found, port = address[1:].parition(']:') + domain, found, port = address[1:].partition(']:') else: domain, found, port = address.partition(":") @@ -351,7 +351,7 @@ def _add_metatadata_listofstrings(self, key, value): else: self.add_metadata("socketaddress", [domain, port, "tcp"]) else: - self.debug("Invalid URL {!r} found ':' at end without a port.".format(address)) + self.error("Invalid URL {!r} found ':' at end without a port.".format(address)) else: if key == "c2_url": self.add_metadata("c2_address", address) @@ -359,8 +359,6 @@ def _add_metatadata_listofstrings(self, key, value): self.add_metadata("address", address) def _add_metadata_listofstringtuples(self, key, values): - values = map(convert_to_unicode, values) - # Pad values that allow for shorter versions. 
expected_size = { 'proxy': 5, @@ -369,6 +367,8 @@ def _add_metadata_listofstringtuples(self, key, values): if key in expected_size: values = tuple(values) + ('',) * (expected_size[key] - len(values)) + values = list(map(convert_to_unicode, values)) + obj = self.metadata.setdefault(key, []) if self._disable_value_dedup or values not in obj: obj.append(values) @@ -504,7 +504,7 @@ def add_metadata(self, key, value): if fieldtype == "dictofstrings": self._add_metadata_dictofstrings(keyu, value) except Exception: - self.debug("Error adding metadata for key: %s\n%s" % + self.error("Error adding metadata for key: %s\n%s" % (keyu, traceback.format_exc())) def run_parser(self, name, file_path=None, data=b"", **kwargs): @@ -518,7 +518,7 @@ def run_parser(self, name, file_path=None, data=b"", **kwargs): self.__reset() if file_path: - with open(file_path, b'rb') as f: + with open(file_path, 'rb') as f: self.input_file = mwcp.FileObject( f.read(), self, file_name=os.path.basename(file_path), output_file=False) self.input_file.file_path = file_path @@ -563,13 +563,13 @@ def output_file(self, data, filename, description=''): self.outputfiles[filename] = { 'data': data, 'description': description, 'md5': md5} - if self.__base64outputfiles: + if self._base64_output_files: self.add_metadata( "outputfile", [basename, description, md5, base64.b64encode(data)]) else: self.add_metadata("outputfile", [basename, description, md5]) - if self.__disableoutputfiles: + if self._disable_output_files: return if self.__outputfile_prefix: @@ -583,7 +583,7 @@ def output_file(self, data, filename, description=''): fullpath = os.path.join(self.__outputdir, basename) try: - with open(fullpath, b"wb") as f: + with open(fullpath, "wb") as f: f.write(data) self.debug("outputfile: %s" % (fullpath)) self.outputfiles[filename]['path'] = fullpath @@ -596,7 +596,7 @@ def report_tempfile(self, filename, description=''): load filename from filesystem and report using output_file """ if os.path.isfile(filename): 
- with open(filename, b"rb") as f: + with open(filename, "rb") as f: data = f.read() self.output_file(data, os.path.basename(filename), description) else: @@ -627,22 +627,20 @@ def print_report(self): """ Output in human readable report format """ - output = self.get_output_text() - print(output.encode('utf8')) + print(self.get_output_text()) def get_printable_key_value(self, key, value): output = "" printkey = key - if isinstance(value, str): - output += "{:20} {}\n".format(printkey, value) + if isinstance(value, (str, bytes)): + output += "{:20} {}\n".format(printkey, convert_to_unicode(value)) else: for item in value: - if isinstance(item, str): - output += "{:20} {}\n".format(printkey, item) + if isinstance(item, (str, bytes)): + output += "{:20} {}\n".format(printkey, convert_to_unicode(item)) else: - output += "{:20} {}\n".format(printkey, - self.format_list(item, key=key)) + output += "{:20} {}\n".format(printkey, self.format_list(item, key=key)) printkey = "" return output @@ -708,7 +706,7 @@ def __redirect_stdout(self): try: yield finally: - if not self.__disabledebug: + if not self._disable_debug: for line in debug_stdout.getvalue().splitlines(): self.debug(line) sys.stdout = orig_stdout @@ -719,7 +717,7 @@ def __reset(self): Goal is to make the reporter safe to use for multiple run_parser instances """ - self.__managed_tempdir = '' + self.__managed_tempdir = None self.input_file = None self._handle = None @@ -731,7 +729,7 @@ def __cleanup(self): """ Cleanup things """ - if not self.__disabletempcleanup: + if not self._disable_temp_cleanup: if self.__managed_tempdir: try: shutil.rmtree(self.__managed_tempdir, ignore_errors=True) @@ -740,7 +738,7 @@ def __cleanup(self): (self.__managed_tempdir, str(e))) self.__managed_tempdir = '' - self.__managed_tempdir = '' + self.__managed_tempdir = None def __del__(self): self.__cleanup() diff --git a/mwcp/resources/dispatcher.py b/mwcp/resources/dispatcher.py index 95408e3..2b2d367 100644 --- 
a/mwcp/resources/dispatcher.py +++ b/mwcp/resources/dispatcher.py @@ -4,7 +4,10 @@ content to ease maintenance. """ +from __future__ import unicode_literals + # Python standard imports +import codecs import pefile import hashlib import io @@ -15,7 +18,11 @@ from mwcp.utils import pefileutils # Kordesii framework imports -from kordesii.kordesiireporter import kordesiireporter +try: + from kordesii.kordesiireporter import kordesiireporter +except ImportError: + # Kordesii support is optional. + kordesiireporter = None class UnableToParse(Exception): @@ -68,7 +75,7 @@ def __init__( self.file_name = file_name else: self.file_name = pefileutils.obtain_original_filename( - def_stub or self.md5.encode('hex'), pe=self.pe, reporter=reporter, use_arch=use_arch) + def_stub or codecs.encode(self.md5, 'hex').decode('utf8'), pe=self.pe, reporter=reporter, use_arch=use_arch) # Sanity check assert self.file_name @@ -78,10 +85,10 @@ def __enter__(self): This allows us to use the file_data as a file-like object when used as a context manager. e.g. - >>> file_object = FileObject('hello world', None) - >>> with file_object as fo: - ... _ = fo.seek(6) - ... print fo.read() + >> file_object = FileObject('hello world', None) + >> with file_object as fo: + .. _ = fo.seek(6) + .. print fo.read() world """ self._open_file = io.BytesIO(self.file_data) @@ -176,7 +183,12 @@ def run_kordesii_decoder(self, decoder_name): :param decoder_name: name of the decoder to run :return: Instance of the kordesii_reporter. + + :raises RuntimeError: If kordesii is not installed. 
""" + if not kordesiireporter: + raise RuntimeError('Please install kordesii to use this function.') + self.reporter.debug('[*] Running {} kordesii decoder on file {}.'.format(decoder_name, self.file_name)) kordesii_reporter = kordesiireporter(base64outputfiles=True, enableidalog=True) diff --git a/mwcp/tester.py b/mwcp/tester.py index bfe08c5..54709ae 100644 --- a/mwcp/tester.py +++ b/mwcp/tester.py @@ -153,7 +153,7 @@ def update_test_results(self, # Write updated data to results file # NOTE: We need to use dumps instead of dump to avoid TypeError. - with open(results_file_path, b'w', encoding='utf8') as results_file: + with open(results_file_path, 'w', encoding='utf8') as results_file: results_file.write(str(json.dumps(results_file_data, results_file, indent=4, sort_keys=True))) def remove_test_results(self, parser_name, filenames): @@ -170,7 +170,7 @@ def remove_test_results(self, parser_name, filenames): else: results_file_data.append(metadata) - with open(self.get_results_filepath(parser_name), b'w', encoding='utf8') as results_file: + with open(self.get_results_filepath(parser_name), 'w', encoding='utf8') as results_file: results_file.write(str(json.dumps(results_file_data, results_file, indent=4, sort_keys=True))) return removed_files diff --git a/mwcp/tools/tool.py b/mwcp/tools/tool.py index 8cf28fe..026b735 100644 --- a/mwcp/tools/tool.py +++ b/mwcp/tools/tool.py @@ -81,7 +81,7 @@ def _write_csv(input_files, results, csv_path, base64_outputfiles=False): # Results in columns: other, other.unique_entry, other.unique_key if 'other' in metadata: for sub_key, sub_value in metadata['other'].items(): - metadata['other.{}'.format(sub_key)] = sub_value + metadata['other.{}'.format(convert_to_unicode(sub_key))] = sub_value del metadata['other'] # Split outputfile into multiple fields. 
@@ -100,13 +100,11 @@ def _write_csv(input_files, results, csv_path, base64_outputfiles=False): column_names, key=lambda x: str(_STD_CSV_COLUMNS.index(x)) if x in _STD_CSV_COLUMNS else x) # Reformat metadata and write to CSV - with open(csv_path, b'wb') as csvfile: - dw = csv.DictWriter(csvfile, fieldnames=column_names) + with open(csv_path, 'wb' if sys.version_info.major < 3 else 'w') as csvfile: + dw = csv.DictWriter(csvfile, fieldnames=column_names, lineterminator='\n') dw.writeheader() - dw.writerows([ - {k: _format_metadata_value(v).encode('utf8') for k, v in metadata.items()} - for metadata in results - ]) + for metadata in results: + dw.writerow({k: _format_metadata_value(v) for k, v in metadata.items()}) def _print_parsers(json_output=False): @@ -157,7 +155,7 @@ def _get_file_paths(input_args, is_filelist=True): if input_args[0] == "-": return [line.rstrip() for line in sys.stdin] else: - with open(input_args[0], b"rb") as f: + with open(input_args[0], "r") as f: return [line.rstrip() for line in f] else: file_paths = [] @@ -229,62 +227,62 @@ def get_arg_parser(): type=str, dest="parser", help="Malware config parser to call. (use ':' notation to specify source if necessary e.g. 'mwcp-acme:Foo')") - parser.add_argument("-l", + parser.add_argument("-l", "--parsers", action="store_true", default=False, dest="list", help="list all malware config parsers.") - parser.add_argument("-k", + parser.add_argument("-k", "--fields", action="store_true", default=False, dest="fields", help="List all standardized fields and examples. 
See resources/fields.json") parser.add_argument("-a", "--parserdir", metavar="DIR", - default=default_parserdir, + default=None, dest="parserdir", - help="Parsers directory" + " [default: {}]".format(default_parserdir)) - parser.add_argument("-o", + help="Optional extra parser directory") + parser.add_argument("-o", "--outputdir", metavar="DIR", default="", dest="outputdir", help="Output directory.") - parser.add_argument("-c", + parser.add_argument("-c", "--csv", metavar="CSVWRITE", default="", dest="csvwrite", help="Output CSV file.") - parser.add_argument("-t", + parser.add_argument("-t", "--tempdir", metavar="DIR", default=tempfile.gettempdir(), dest="tempdir", help="Temp directory." + " [default: {}]".format(tempfile.gettempdir())) - parser.add_argument("-j", + parser.add_argument("-j", "--json", action="store_true", default=False, dest="jsonoutput", help="Enable json output for parser reports (instead of formatted text).") - parser.add_argument("-n", + parser.add_argument("-n", "--disable_output", action="store_true", default=False, dest="disableoutputfiles", help="Disable writing output files to filesystem.") - parser.add_argument("-g", + parser.add_argument("-g", "--disable-temp-cleanup", action="store_true", default=False, dest="disabletempcleanup", help="Disable cleanup of framework created temp files including managed tempdir.") - parser.add_argument("-f", + parser.add_argument("-f", "--include-filename", action="store_true", default=False, dest="includefilename", help="Include file information such as filename, hashes, and compile time in parser output.") - parser.add_argument("-d", + parser.add_argument("-d", "--no-debug", action="store_true", default=False, dest="hidedebug", help="Hide debug messages in output.") - parser.add_argument("-u", + parser.add_argument("-u", "--output-prefix", metavar="FILENAME", default="", dest="outputfile_prefix", @@ -293,22 +291,22 @@ def get_arg_parser(): "files for analysis, the default will be 'md5'. 
Passing in a value with the -u option " + "or using the -U option can be used to override the 'md5' default for multiple files. " + "[default: (No prefix|md5)]") - parser.add_argument("-U", + parser.add_argument("-U", "--no-output-prefix", action="store_true", default=False, dest="disableoutputfileprefix", help="When in effect, parser output files will not have a filename prefix.") - parser.add_argument("-i", + parser.add_argument("-i", "--filelist", action="store_true", default=False, dest="filelistindirection", help="Input file contains a list of filenames to process.") - parser.add_argument("-b", + parser.add_argument("-b", "--base64", action="store_true", default=False, dest="base64outputfiles", help="Base64 encode output files and include in metadata.") - parser.add_argument("-w", + parser.add_argument("-w", "--kwargs", metavar="JSON", default="", dest="kwargs_raw", @@ -319,9 +317,9 @@ def get_arg_parser(): return parser -def main(): +def main(args=None): argparser = get_arg_parser() - args, input_files = argparser.parse_known_args() + args, input_files = argparser.parse_known_args(args) # This is a preliminary check before creating the reporter to establish how output # file prefixes should be set. 
@@ -352,7 +350,7 @@ def main(): for key, value in list(kwargs.items()): if value and value.startswith('b64file(') and value.endswith(')'): tmp_filename = value[len('b64file('):-1] - with open(tmp_filename, b'rb') as f: + with open(tmp_filename, 'rb') as f: kwargs[key] = base64.b64encode(f.read()) # Run MWCP @@ -375,8 +373,6 @@ def main(): if args.csvwrite: csv_path = args.csvwrite - if not csv_path.endswith('.csv'): - csv_path += '.csv' _write_csv(input_files, results, csv_path, args.base64outputfiles) if not args.jsonoutput: print('Wrote csv file: {}'.format(csv_path)) @@ -386,6 +382,7 @@ def main(): except Exception as e: error_message = "Error running DC3-MWCP: {}".format(e) + traceback.print_exc() if args.jsonoutput: print(json.dumps({'errors': [error_message]})) else: diff --git a/mwcp/utils/construct/helpers.py b/mwcp/utils/construct/helpers.py index f7283c9..602a44b 100644 --- a/mwcp/utils/construct/helpers.py +++ b/mwcp/utils/construct/helpers.py @@ -3,6 +3,7 @@ from __future__ import division import base64 +import operator import os import io import re @@ -195,9 +196,9 @@ class TerminatedString(construct.StringEncoded): >>> TerminatedString(PascalString(Byte)).build(b'hello') '\x05hello' >>> TerminatedString(String(10)).parse(b'hello\x00\x02\x04FA') - u'hello' + 'hello' >>> TerminatedString(String(10)).parse(b'helloworld') - u'helloworld' + 'helloworld' >>> TerminatedString(GreedyString()).parse(b'this is a valid string\x00\x00 GARBAGE!') 'this is a valid string' >>> TerminatedString(PascalString(Byte)).parse(b'\x0Ahello\x00\x01\x03\x04F') @@ -385,11 +386,11 @@ class Printable(Validator): NOTE: A ValidationError is a type of ConstructError and will be cause if catching ConstructError. >>> Printable(String(5)).parse(b'hello') - u'hello' + 'hello' >>> Printable(String(5)).parse(b'he\x11o!') Traceback (most recent call last): ... 
- ValidationError: ('object failed validation', u'he\x11o!') + ValidationError: ('object failed validation', 'he\x11o!') >>> Printable(Bytes(3)).parse(b'\x01NO') Traceback (most recent call last): ... @@ -674,16 +675,16 @@ class PEPhysicalAddress(Adapter): ) e.g. - >>> with open(r'C:\32bit_exe', 'rb') as fo: - ... file_data = fo.read() - >>> pe = pefileutils.obtain_pe(file_data) - >>> PEPhysicalAddress(Int32ul, pe=pe).build(100) + >> with open(r'C:\32bit_exe', 'rb') as fo: + ... file_data = fo.read() + >> pe = pefileutils.obtain_pe(file_data) + >> PEPhysicalAddress(Int32ul, pe=pe).build(100) 'd\x00@\x00' - >>> PEPhysicalAddress(Int32ul, pe=pe).parse(b'd\x00@\x00') + >> PEPhysicalAddress(Int32ul, pe=pe).parse(b'd\x00@\x00') 100 - >>> PEPhysicalAddress(Int32ul).build(100, pe=pe) + >> PEPhysicalAddress(Int32ul).build(100, pe=pe) 'd\x00@\x00' - >>> PEPhysicalAddress(Int32ul).parse(b'd\x00@\x00', pe=pe) + >> PEPhysicalAddress(Int32ul).parse(b'd\x00@\x00', pe=pe) 100 """ def __init__(self, subcon, pe=None): @@ -939,21 +940,21 @@ class Regex(Construct): >>> regex = re.compile('\x01\x02(?P.{4})\x03\x04(?P[A-Za-z].*\x00)', re.DOTALL) >>> data = 'GARBAGE!\x01\x02\x0A\x00\x00\x00\x03\x04C:\Windows\x00MORE GARBAGE!' >>> Regex(regex, size=Int32ul, path=CString()).parse(data) - Container(path='C:\\Windows')(size=10) + Container(size=10)(path='C:\\Windows') >>> Regex(regex).parse(data) - Container(path='C:\\Windows\x00')(size='\n\x00\x00\x00') + Container(size='\n\x00\x00\x00')(path='C:\\Windows\x00') >>> Struct( ... 're' / Regex(regex, size=Int32ul, path=CString()), ... 'after_re' / Tell, ... 'garbage' / GreedyBytes ... ).parse(data) - Container(re=Container(path='C:\\Windows')(size=10))(after_re=27L)(garbage='MORE GARBAGE!') + Container(re=Container(size=10)(path='C:\\Windows'))(after_re=27L)(garbage='MORE GARBAGE!') >>> Struct( ... Embedded(Regex(regex, size=Int32ul, path=CString())), ... 'after_re' / Tell, ... 'garbage' / GreedyBytes ... 
).parse(data) - Container(path='C:\\Windows')(size=10)(after_re=27L)(garbage='MORE GARBAGE!') + Container(size=10)(path='C:\\Windows')(after_re=27L)(garbage='MORE GARBAGE!') You can use Regex as a trigger to find a particular piece of data before you start parsing. >>> Struct( @@ -965,7 +966,7 @@ class Regex(Construct): If no data is captured, the associated subcon will received a stream with the position set at the location of that captured group. Thus, allowing you to use it as an anchor point. >>> Regex('hello (?P)world(?P.*)', anchor=Tell).parse('hello world!!!!') - Container(extra_data='!!!!')(anchor=6L) + Container(anchor=6L)(extra_data='!!!!') If no named capture groups are used, you can instead parse the entire matched string by supplying a subconstruct as a positional argument. (If no subcon is provided, the raw bytes are returned instead. @@ -1032,10 +1033,9 @@ def _parse(self, stream, context, path): raise ConstructError('regex did not match') try: - group_dict = match.groupdict() - + group_index = self.regex.groupindex # If there are no named groups. Return parsed full match instead. - if not group_dict: + if not group_index: if self.subcon: sub_stream = io.BytesIO(match.group()) return self.subcon._parse(sub_stream, context, path) @@ -1046,9 +1046,12 @@ def _parse(self, stream, context, path): obj = Container() context = Container(_=context) - # Default to displaying matched data as pure bytes. - obj.update(group_dict) - context.update(group_dict) + # Default to displaying matched data as pure bytes + # (inserted in the order they show up in the pattern) + for name, index in sorted(group_index.items(), key=operator.itemgetter(1)): + value = match.group(index) + obj[name] = value + context[name] = value # Parse groups using supplied constructs. 
for name, subcon in self.group_subcons.items(): diff --git a/mwcp/utils/custombase64.py b/mwcp/utils/custombase64.py index f923846..4ee4b58 100644 --- a/mwcp/utils/custombase64.py +++ b/mwcp/utils/custombase64.py @@ -2,17 +2,10 @@ Custom Base64 related utility """ +from future.builtins import str, bytes + import base64 import logging -import sys - - -PY3 = sys.version_info.major == 3 - -if PY3: - maketrans = bytes.maketrans -else: - from string import maketrans logger = logging.getLogger(__name__) @@ -64,21 +57,23 @@ def _adjust_pad(alphabet, data, decode): def _code(data, custom_alpha, size, decode, code_func): - if isinstance(custom_alpha, str if PY3 else unicode): + # TODO: Don't convert and require input to be bytes. + if isinstance(custom_alpha, str): custom_alpha = custom_alpha.encode() - if isinstance(data, str if PY3 else unicode): + if isinstance(data, str): data = data.encode() + _validate_alphabet(custom_alpha, size) if size != 16 and len(custom_alpha) == size: _adjust_pad(custom_alpha, data, decode) std_alpha = _STD_ALPHA[size] if decode: - table = maketrans(custom_alpha, std_alpha) + table = bytes.maketrans(custom_alpha, std_alpha) data = data.translate(table) return code_func(data) else: - table = maketrans(std_alpha, custom_alpha) + table = bytes.maketrans(std_alpha, custom_alpha) data = code_func(data) return data.translate(table) @@ -88,7 +83,7 @@ def b64encode(data, alphabet): def b64decode(data, alphabet): - data += alphabet[-1] * ((-len(data)) % 4) # Pad the data, if necessary + data += bytes([alphabet[-1]]) * ((-len(data)) % 4) # Pad the data, if necessary return _code(data, alphabet, 64, True, base64.b64decode) @@ -106,3 +101,7 @@ def b16encode(data, alphabet): def b16decode(data, alphabet): return _code(data, alphabet, 16, True, base64.b16decode) + +# To match base64 +encode = b64encode +decode = b64decode diff --git a/mwcp/utils/pefileutils.py b/mwcp/utils/pefileutils.py index 222ce6f..08db59b 100644 --- a/mwcp/utils/pefileutils.py +++ 
b/mwcp/utils/pefileutils.py @@ -3,6 +3,8 @@ python version: 2.7.8 """ +from __future__ import unicode_literals + import pefile import os diff --git a/setup.py b/setup.py index ec62e70..eb02b54 100755 --- a/setup.py +++ b/setup.py @@ -44,7 +44,6 @@ def read(fname): ] }, install_requires=[ - 'kordesii', 'bottle', 'construct==2.8.12', # pin version, since we patch this library 'future', @@ -52,5 +51,13 @@ def read(fname): 'pefile', 'requests', 'six', - ] + + # Testing + 'pytest', + 'pytest-console-scripts', + 'tox', + ], + extras_require={ + 'kordesii': ['kordesii'], + } ) diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..8497faa --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,34 @@ + +import os + +import pytest + + +@pytest.fixture +def test_file(tmpdir): + """Fixture for providing a test file to pass to mwcp.""" + file_path = os.path.join(str(tmpdir), 'test.txt') + with open(file_path, 'wb') as f: + f.write(b"This is some test data!") + return file_path + + +TEST_PARSER = ''' +from mwcp import Parser + +class TestParser(Parser): + def __init__(self, reporter): + Parser.__init__(self, description="A test parser", author="Mr. Tester", reporter=reporter) + + def run(self): + pass +''' + + +@pytest.fixture +def test_parser(tmpdir): + """Creates and returns the file path to a test parser.""" + file_path = os.path.join(str(tmpdir), 'test_parser.py') + with open(file_path, 'w') as f: + f.write(TEST_PARSER) + return file_path diff --git a/tests/test_cli.py b/tests/test_cli.py new file mode 100644 index 0000000..1844375 --- /dev/null +++ b/tests/test_cli.py @@ -0,0 +1,219 @@ +""" +Tests the CLI tools. 
+""" + +from __future__ import unicode_literals, print_function + +from future.builtins import open + +import re +import io +import json +import os +import sys + +from mwcp.tools import tool + + +def test_testcases(tmpdir, script_runner): + """Run mwcp-test on all test cases.""" + # Change working directory so we can cleanup outputted files. + cwd = str(tmpdir) + + # Run all parser tests. + ret = script_runner.run('mwcp-test', '-ta', cwd=cwd) + print(ret.stdout) + print(ret.stderr, file=sys.stderr) + assert ret.success + + +def test_parse(tmpdir, script_runner, test_file): + """Test running a parser""" + # Change working directory so we can cleanup outputted files. + cwd = str(tmpdir) + + # Run the foo parser on the test input file. + ret = script_runner.run('mwcp-tool', '-p', 'foo', test_file, cwd=cwd) + print(ret.stdout) + print(ret.stderr, file=sys.stderr) + assert ret.success + output = ret.stdout.replace(test_file, '[INPUT FILE PATH]') + assert output == \ +''' +----Standard Metadata---- + +url http://127.0.0.1 +address 127.0.0.1 + +----Debug---- + +size of inputfile is 23 bytes +outputfile: fooconfigtest.txt +operating on inputfile [INPUT FILE PATH] + +----Output Files---- + +fooconfigtest.txt example output file + 5eb63bbbe01eeed093cb22bb8f5acdc3 + +''' + + # Test the "-f" flag. 
+ ret = script_runner.run('mwcp-tool', '-f', '-p', 'foo', test_file, cwd=cwd) + print(ret.stdout) + print(ret.stderr, file=sys.stderr) + assert ret.success + output = ret.stdout.replace(test_file, '[INPUT FILE PATH]') + assert output == \ +''' +----File Information---- + +inputfilename [INPUT FILE PATH] +md5 fb843efb2ffec987db12e72ca75c9ea2 +sha1 5e90c4c2be31a7a0be133b3dbb4846b0434bc2ab +sha256 fe5af8c641835c24f3bbc237a659814b96ed64d2898fae4cb3d2c0ac5161f5e9 + +----Standard Metadata---- + +url http://127.0.0.1 +address 127.0.0.1 + +----Debug---- + +size of inputfile is 23 bytes +outputfile: fooconfigtest.txt +operating on inputfile [INPUT FILE PATH] + +----Output Files---- + +fooconfigtest.txt example output file + 5eb63bbbe01eeed093cb22bb8f5acdc3 + +''' + + # Check that the output file was created + output_file = os.path.join(cwd, 'fooconfigtest.txt') + assert os.path.isfile(output_file) + + # Test the "-n" flag. + os.unlink(output_file) + assert not os.path.isfile(output_file) + ret = script_runner.run('mwcp-tool', '-n', '-p', 'foo', test_file, cwd=cwd) + assert ret.success + # We should still not have the output file + assert not os.path.isfile(output_file) + + +def test_list_parsers(script_runner): + """Tests the list parser feature.""" + # Test text out + ret = script_runner.run('mwcp-tool', '-l') + print(ret.stdout) + print(ret.stderr, file=sys.stderr) + assert ret.success + assert ret.stdout + assert "bar" in ret.stdout + assert "foo" in ret.stdout + + # Test json out + ret = script_runner.run('mwcp-tool', '-l', '-j') + print(ret.stdout) + print(ret.stderr, file=sys.stderr) + assert ret.success + output = json.loads(ret.stdout) + assert output == [ + ['bar', 'mwcp', 'DC3', 'example parser using the Dispatcher model'], + ['foo', 'mwcp', 'DC3', 'example parser that works on any file'] + ] + + +def test_list_fields(script_runner): + """Test the list fields features.""" + # Test text out + ret = script_runner.run('mwcp-tool', '--fields') + print(ret.stdout) + 
print(ret.stderr, file=sys.stderr) + assert ret.success + assert ret.stdout + assert "address" in ret.stdout + + # Test json out + ret = script_runner.run('mwcp-tool', '--fields', '--json') + print(ret.stdout) + print(ret.stderr, file=sys.stderr) + assert ret.success + output = json.loads(ret.stdout) + assert output + assert len(output) == 48 + assert "address" in output + assert output["address"]["type"] == "listofstrings" + + +def test_get_file_paths(tmpdir): + """Tests the _get_file_paths in mwcp-tool""" + # tests that it finds valid file paths. + assert tool._get_file_paths([tool.__file__], is_filelist=False) == [tool.__file__] + + # Test file list indirection + file_list = os.path.join(str(tmpdir), 'file_list.txt') + with open(file_list, 'w') as f: + f.write('file1.exe\n') + f.write('file2.exe') + + assert tool._get_file_paths([file_list], is_filelist=True) == ['file1.exe', 'file2.exe'] + + sys.stdin = io.StringIO('file3.exe\nfile4.exe') + assert tool._get_file_paths(["-"], is_filelist=True) == ['file3.exe', 'file4.exe'] + + +def test_csv(tmpdir, monkeypatch): + """Tests the csv feature.""" + # Mock time.ctime() + monkeypatch.setattr('time.ctime', lambda: '[TIMESTAMP]') + + input_files = ['file1.exe', 'file2.exe'] + results = [ + { + 'other': {'field1': 'value1', 'field2': ['value2', 'value3']}, + 'outputfile': [['out_name', 'out_desc', 'out_md5'], ['out_name2', 'out_desc2', 'out_md52']], + 'address': ['https://google.com', 'ftp://amazon.com'] + }, + { + 'a': ['b', 'c'], + } + ] + csv_path = os.path.join(str(tmpdir), 'test.csv') + + tool._write_csv(input_files, results, csv_path) + + expected = ( + 'scan_date,inputfilename,outputfile.name,outputfile.description,outputfile.md5,a,address,other.field1,other.field2\n' + '[TIMESTAMP],file1.exe,"out_name\nout_name2","out_desc\nout_desc2","out_md5\nout_md52",,"https://google.com\nftp://amazon.com",value1,"value2\nvalue3"\n' + '[TIMESTAMP],file2.exe,,,,"b\nc",,,\n' + ) + with open(csv_path, 'r') as fo: + assert 
fo.read() == expected + + +def test_csv_cli(tmpdir, script_runner, test_file): + """Tests the csv feature on the command line.""" + cwd = str(tmpdir) + csv_path = os.path.join(cwd, 'csv_file.csv') + ret = script_runner.run('mwcp-tool', '-p', 'foo', '-n', test_file, '-c', csv_path, cwd=cwd) + print(ret.stdout) + print(ret.stderr, file=sys.stderr) + + assert ret.success + assert os.path.exists(csv_path) + + expected = ( + 'scan_date,inputfilename,outputfile.name,outputfile.description,outputfile.md5,address,debug,url\n' + '[TIMESTAMP],[INPUT FILE PATH],fooconfigtest.txt,example output file,5eb63bbbe01eeed093cb22bb8f5acdc3,127.0.0.1,' + '"size of inputfile is 23 bytes\noperating on inputfile [INPUT FILE PATH]",http://127.0.0.1\n' + + ) + with open(csv_path, 'r') as fo: + results = fo.read().replace(test_file, '[INPUT FILE PATH]') + # Can't mock timestamp this time, so we are just going to have to use regex to replace it. + results = re.sub('\n.*,\[', '\n[TIMESTAMP],[', results) + assert results == expected \ No newline at end of file diff --git a/tests/test_custombase64.py b/tests/test_custombase64.py new file mode 100644 index 0000000..6d856e1 --- /dev/null +++ b/tests/test_custombase64.py @@ -0,0 +1,23 @@ +"""Tests mwcp.utils.custombase64""" + +from __future__ import unicode_literals + +from mwcp.utils import custombase64 + + +def test_base64(): + custom_alphabet = b'EFGHQRSTUVWefghijklmnopIJKLMNOPABCDqrstuvwxyXYZabcdz0123456789+/=' + assert custombase64.encode(b'hello world', custom_alphabet) == b'LSoXMS8BO29dMSj=' + assert custombase64.decode(b'LSoXMS8BO29dMSj=', custom_alphabet) == b'hello world' + + +def test_base32(): + custom_alphabet = b'FGHIJQ345RSTUVWXYKLMABCDENOPZ267=' + assert custombase64.b32encode(b'hello world', custom_alphabet) == b'VGLCEPIXJGPC6ZMUUY======' + assert custombase64.b32decode(b'VGLCEPIXJGPC6ZMUUY======', custom_alphabet) == b'hello world' + + +def test_base16(): + custom_alphabet = b'78BDE0123F459A6C' + assert 
custombase64.b16encode(b'hello world', custom_alphabet) == b'131019191CB7221C2B191E' + assert custombase64.b16decode(b'131019191CB7221C2B191E', custom_alphabet) == b'hello world' diff --git a/tests/test_dispatcher.py b/tests/test_dispatcher.py new file mode 100644 index 0000000..fcc6760 --- /dev/null +++ b/tests/test_dispatcher.py @@ -0,0 +1,102 @@ +"""Tests the Dispatcher and FileObject functionality.""" + +from __future__ import unicode_literals + +import codecs +import os + +import pytest + +import mwcp + + +@pytest.fixture +def components(): + """ + Setup for testing some of the dispatcher components. + (Set it as a fixture so we can reuse the variables without having to remake) + """ + reporter = mwcp.Reporter() + file_A = mwcp.FileObject(b'This is file A', reporter, file_name='A_match.txt', output_file=False) + file_B = mwcp.FileObject(b'This is file B', reporter, file_name='B_match.txt', output_file=False) + file_C = mwcp.FileObject(b'This is file C', reporter, file_name='no_match.txt', output_file=False) + + class A(mwcp.ComponentParser): + DESCRIPTION = 'A Component' + @classmethod + def identify(cls, file_object): + return file_object.file_name == 'A_match.txt' + + def run(self): + self.dispatcher.add_to_queue(file_B) + self.dispatcher.add_to_queue(file_C) + + class B(mwcp.ComponentParser): + DESCRIPTION = 'B Component' + @classmethod + def identify(cls, file_object): + return file_object.file_name == 'B_match.txt' + + dispatcher = mwcp.Dispatcher(reporter, [A, B]) + + return locals() + + +def test_identify_file(components): + """Tests the _identify_file""" + dispatcher = components['dispatcher'] + assert list(dispatcher._identify_file(components['file_A'])) == [components['A']] + assert list(dispatcher._identify_file(components['file_B'])) == [components['B']] + assert list(dispatcher._identify_file(components['file_C'])) == [mwcp.UnidentifiedFile] + + +@pytest.mark.parametrize("input_file,expected", [ + ('file_A', {'file_A': 'A Component', 'file_B': 'B 
Component', 'file_C': 'Unidentified file'}), + ('file_B', {'file_A': None, 'file_B': 'B Component', 'file_C': None}), + ('file_C', {'file_A': None, 'file_B': None, 'file_C': 'Unidentified file'}), +]) +def test_dispatch(components, input_file, expected): + """Test dispatching files.""" + dispatcher = components['dispatcher'] + input_file = components[input_file] + + dispatcher.add_to_queue(input_file) + + # sanity check + for file in ('file_A', 'file_B', 'file_C'): + assert components[file].description is None + + dispatcher.dispatch() + + # make sure the correct files have been identified. + for file, description in sorted(expected.items()): + assert components[file].description == description + + +def test_file_object(tmpdir): + """Tests the mwcp.FileObject class""" + output_dir = str(tmpdir) + reporter = mwcp.Reporter(tempdir=output_dir, outputdir=output_dir) + file_object = mwcp.FileObject(b'This is some test data!', reporter) + + assert file_object.file_name == 'fb843efb2ffec987db12e72ca75c9ea2.bin' + assert file_object.file_data == b'This is some test data!' + assert codecs.encode(file_object.md5, 'hex') == b'fb843efb2ffec987db12e72ca75c9ea2' + assert file_object.resources is None + assert file_object.pe is None + assert file_object.file_path.startswith(os.path.join(output_dir, 'mwcp-managed_tempdir-')) + + with file_object as fo: + assert fo.read() == b'This is some test data!' 
+ + assert not reporter.outputfiles + file_object.output() + file_path = os.path.join(output_dir, 'fb843efb2ffec987db12e72ca75c9ea2.bin') + assert file_object.file_name in reporter.outputfiles + assert reporter.outputfiles[file_object.file_name] == { + 'data': b'This is some test data!', + 'path': file_path, + 'description': '', + 'md5': 'fb843efb2ffec987db12e72ca75c9ea2' + } + assert os.path.exists(file_path) diff --git a/tests/test_parser_registry.py b/tests/test_parser_registry.py new file mode 100644 index 0000000..e0b678e --- /dev/null +++ b/tests/test_parser_registry.py @@ -0,0 +1,43 @@ +"""Tests parser registration functionality.""" + +from __future__ import unicode_literals + +import collections +import os + +import mwcp + + +def test_register_parser_directory(monkeypatch, test_parser): + # Monkey patch parsers registration so previous test runs don't muck with this. + monkeypatch.setattr('mwcp.parsers._PARSERS', collections.defaultdict(dict)) + + # Test registration + assert not list(mwcp.iter_parsers('test_parser')) + mwcp.register_parser_directory(os.path.dirname(test_parser)) + parsers = list(mwcp.iter_parsers('test_parser')) + assert len(parsers) == 1 + + # Test it was register properly + name, source_name, klass = parsers[0] + assert name == 'test_parser' + assert source_name == os.path.dirname(test_parser) + + # Test we can also pull by source name. + parsers = list(mwcp.iter_parsers(source=os.path.dirname(test_parser))) + assert len(parsers) == 1 + parsers = list(mwcp.iter_parsers(os.path.dirname(test_parser) + ':')) + assert len(parsers) == 1 + + +def test_parsers_descriptions(monkeypatch, test_parser): + monkeypatch.setattr('mwcp.parsers._PARSERS', collections.defaultdict(dict)) + mwcp.register_parser_directory(os.path.dirname(test_parser)) + descriptions = list(mwcp.get_parser_descriptions('test_parser')) + assert len(descriptions) == 1 + assert descriptions[0] == ( + 'test_parser', + os.path.dirname(test_parser), + 'Mr. 
Tester', + 'A test parser' + ) diff --git a/tests/test_reporter.py b/tests/test_reporter.py new file mode 100644 index 0000000..693b3e1 --- /dev/null +++ b/tests/test_reporter.py @@ -0,0 +1,115 @@ +"""Tests the mwcp.Reporter object.""" + +from __future__ import unicode_literals + +import os + +import pytest + +import mwcp + + +def test_managed_tempdir(tmpdir): + reporter = mwcp.Reporter(tempdir=str(tmpdir)) + managed_tempdir = reporter.managed_tempdir() + assert os.path.exists(managed_tempdir) + assert managed_tempdir.startswith(os.path.join(str(tmpdir), 'mwcp-managed_tempdir-')) + + +@pytest.mark.parametrize('key,value,expected', [ + ('filepath', br'C:\dir\file.txt', { + 'filepath': [r'C:\dir\file.txt'], + 'filename': ['file.txt'], + 'directory': [r'C:\dir'] + }), + ('servicedll', br'C:\Windows\Temp\1.tmp', { + 'servicedll': [r'C:\Windows\Temp\1.tmp'], + 'filepath': [r'C:\Windows\Temp\1.tmp'], + 'filename': ['1.tmp'], + 'directory': [r'C:\Windows\Temp'] + }), + ('c2_url', b'http://[fe80::20c:1234:5678:9abc]:80/badness', { + 'c2_url': ['http://[fe80::20c:1234:5678:9abc]:80/badness'], + 'url': ['http://[fe80::20c:1234:5678:9abc]:80/badness'], + 'urlpath': ['/badness'], + 'c2_socketaddress': [['fe80::20c:1234:5678:9abc', '80', 'tcp']], + 'socketaddress': [['fe80::20c:1234:5678:9abc', '80', 'tcp']], + 'c2_address': ['fe80::20c:1234:5678:9abc'], + 'address': ['fe80::20c:1234:5678:9abc'], + 'port': [['80', 'tcp']] + }), + ('url', b'ftp://127.0.0.1/really/bad?hostname=pwned', { + 'url': ['ftp://127.0.0.1/really/bad?hostname=pwned'], + 'urlpath': ['/really/bad'], + 'address': ['127.0.0.1'] + }), + ('proxy', (b'admin', b'pass', b'192.168.1.1', b'80', 'tcp'), { + 'proxy': [['admin', 'pass', '192.168.1.1', '80', 'tcp']], + 'proxy_socketaddress': [['192.168.1.1', '80', 'tcp']], + 'socketaddress': [['192.168.1.1', '80', 'tcp']], + 'proxy_address': ['192.168.1.1'], + 'address': ['192.168.1.1'], + 'port': [['80', 'tcp']], + 'credential': [['admin', 'pass']], + 'password': 
['pass'], + 'username': ['admin'] + }), + ('rsa_private_key', ('0x07', '0xbb', '0x17', '0x11', '0x0b', '0x07', '0x03', '0x0e'), { + 'rsa_private_key': [['0x07', '0xbb', '0x17', '0x11', '0x0b', '0x07', '0x03', '0x0e']] + }), + # Test auto padding. + ('rsa_private_key', ('0x07', '0xbb', '0x17', '0x11', '0x0b'), { + 'rsa_private_key': [['0x07', '0xbb', '0x17', '0x11', '0x0b', '', '', '']] + }), + ('other', {b'foo': b'bar', 'biz': 'baz'}, { + 'other': { + 'foo': 'bar', + 'biz': 'baz' + } + }) +]) +def test_add_metadata(key, value, expected): + reporter = mwcp.Reporter() + reporter.add_metadata(key, value) + + # Print out the debug messages to make a more useful message if we fail. + debug = reporter.metadata.get('debug', None) + if debug: + print('\n'.join(debug)) + del reporter.metadata['debug'] + + # We shouldn't have any error messages. If we do that means an exception has occurred. + # Lets raise that exception to create a more useful message. + errors = reporter.errors + if errors: + raise AssertionError('\n'.join(errors)) + + assert reporter.metadata == expected + + +def test_other_add_metadata(): + """Tests that adding multiple 'other' keys of same will convert to a list.""" + reporter = mwcp.Reporter() + reporter.add_metadata('other', {b'foo': b'bar', 'biz': 'baz'}) + assert reporter.metadata == {'other': {'foo': 'bar', 'biz': 'baz'}} + reporter.add_metadata('other', {b'foo': b'boop'}) + assert reporter.metadata == {'other': {'foo': ['bar', 'boop'], 'biz': 'baz'}} + + +def test_output_file(tmpdir): + output_dir = str(tmpdir) + reporter = mwcp.Reporter(outputdir=output_dir) + reporter.output_file(b'This is data!', 'foo.txt', description='A foo file') + + file_path = os.path.join(output_dir, 'foo.txt') + assert os.path.exists(file_path) + with open(file_path, 'rb') as fo: + assert fo.read() == b'This is data!' 
+ assert reporter.outputfiles['foo.txt'] == { + 'data': b'This is data!', + 'description': 'A foo file', + 'md5': '9c91e665b5b7ba5a3066c92dd02d3d7c', + 'path': file_path + } + assert reporter.metadata['outputfile'] == [['foo.txt', 'A foo file', '9c91e665b5b7ba5a3066c92dd02d3d7c']] + diff --git a/tox.ini b/tox.ini new file mode 100644 index 0000000..1202b4c --- /dev/null +++ b/tox.ini @@ -0,0 +1,14 @@ +[pytest] +script_launch_mode = subprocess +norecursedirs = docs build *.egg-info .git .tox .pytest_cache + +[tox] +envlist = py27,py36 + +[testenv] +deps = pytest +commands = pytest + +; Only run doctest for Python 2 since collection seems to stall and crash in 3. +[testenv:py27] +commands = pytest --doctest-modules