Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Choosing Dll exports in a smarter way (like Cuckoo) #185

Merged
merged 2 commits into from
Feb 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
119 changes: 112 additions & 7 deletions cape/cape_main.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
from email.header import decode_header
from json import JSONDecodeError, loads
from math import ceil
import os
from pefile import PE, PEFormatError
from random import choice, random
from re import compile, match
from sys import maxsize, setrecursionlimit
import requests
from retrying import retry, RetryError
from zipfile import ZipFile
from SetSimilaritySearch import SearchIndex
from sys import maxsize, setrecursionlimit
from tempfile import SpooledTemporaryFile
from time import sleep
from threading import Thread
from time import sleep
from typing import Optional, Dict, List, Any, Set, Tuple
from zipfile import ZipFile

from assemblyline_v4_service.common.api import ServiceAPIError
from assemblyline_v4_service.common.base import ServiceBase
Expand Down Expand Up @@ -1386,7 +1389,7 @@ def _set_task_parameters(
route = self.request.get_param("routing")

if "dll" in self.request.file_type:
self._prepare_dll_submission(task_options)
self._prepare_dll_submission(task_options, parent_section)

# This is a CAPE workaround because otherwise CAPE will extract an archive
# into extracted files and submit each as a separate task
Expand Down Expand Up @@ -1502,19 +1505,121 @@ def _get_available_images(
available_images.add(image)
return list(available_images)

def _prepare_dll_submission(self, task_options: List[str], parent_section: ResultSection) -> None:
    """
    This method handles if a specific function was requested to be run for a DLL, or what functions to run for a DLL
    :param task_options: A list of parameters detailing the specifics of the task; it is extended in place
    :param parent_section: The overarching result section detailing what image this task is being sent to
    :return: None
    """
    dll_function = self.request.get_param("dll_function")

    if dll_function:
        # The user asked for specific export(s), so pass them through untouched
        task_options.append(f"function={dll_function}")

        # A colon in the requested value means more than one export was requested
        # This is reliant on analyzer/windows/modules/packages/dll.py
        if ":" in dll_function:
            task_options.append("enable_multi=true")
    else:
        # Nothing was requested, so inspect the DLL and choose exports ourselves
        self._parse_dll(task_options, parent_section)

def _parse_dll(self, task_options: List[str], parent_section: ResultSection) -> None:
    """
    This method parses a DLL file and determines which functions to try and run with the DLL.
    It appends "function=<exports joined by ':'>" and "enable_multi=true" to the task options,
    and adds a subsection to the parent section listing the chosen and the remaining exports.
    :param task_options: A list of parameters detailing the specifics of the task; it is extended in place
    :param parent_section: The overarching result section detailing what image this task is being sent to
    :return: None
    """
    exports_available: List[str] = []
    # We have a DLL file, but no user specified function(s) to run. let's try to pick a few...
    # This is reliant on analyzer/windows/modules/packages/dll_multi.py
    dll_parsed = self._create_pe_from_file_contents()

    # Do we have any exports? (hasattr is also False when parsing failed and None came back)
    if hasattr(dll_parsed, "DIRECTORY_ENTRY_EXPORT"):
        for export_symbol in dll_parsed.DIRECTORY_ENTRY_EXPORT.symbols:
            export_name = export_symbol.name
            if isinstance(export_name, bytes):
                try:
                    export_name = export_name.decode()
                except UnicodeDecodeError:
                    # A name that cannot be decoded cannot be passed along as text
                    export_name = None

            if isinstance(export_name, str) and export_name:
                exports_available.append(export_name)
            else:
                # No usable name, so refer to the export by its ordinal instead
                exports_available.append(f"#{export_symbol.ordinal}")
    else:
        # No Exports available? Try DllMain and DllRegisterServer
        exports_available.append("DllMain")
        exports_available.append("DllRegisterServer")

    max_dll_exports = self.config.get("max_dll_exports_exec", 5)

    # If the number of available exports is greater than the maximum number of
    # exports that we want to run, we will be prioritizing by the following:
    # 1. well known exports (dllRegisterServer, etc)
    # 2. first exports (10%)
    # 3. last exports (10%)
    # 4. least common exports (80% - 2 exports for DllRegisterServer and DllMain)
    if len(exports_available) > max_dll_exports:
        ten_percent_of_exports = ceil(max_dll_exports * 0.1)

        # well-known exports come first
        candidates: List[str] = ["DllMain", "DllRegisterServer"]

        # first and last exports; guarded because a slice starting at -0 is the whole list
        if ten_percent_of_exports > 0:
            candidates.extend(exports_available[:ten_percent_of_exports])
            candidates.extend(exports_available[-ten_percent_of_exports:])

        # least common exports: the lower the average similarity to the other
        # export names, the earlier the export is considered
        index = SearchIndex(exports_available, similarity_func_name='jaccard', similarity_threshold=0.1)
        similarity_scores = []
        for exp in exports_available:
            res = index.query(exp)
            # An export with no match above the threshold counts as the least common
            avg_sim = sum(x[1] for x in res) / len(res) if res else 0.0
            similarity_scores.append((avg_sim, exp))
        candidates.extend(name for _, name in sorted(similarity_scores))

        # Keep priority order, run each export only once, never exceed the limit
        exports_to_run = list(dict.fromkeys(candidates))[:max_dll_exports]
    else:
        exports_to_run = exports_available[:max_dll_exports]

    joined_exports = ':'.join(exports_to_run)
    task_options.append(f"function={joined_exports}")
    task_options.append("enable_multi=true")

    self.log.debug(f"Trying to run DLL with following function(s): {joined_exports}")

    if exports_available:
        dll_multi_section = ResultTextSection("Executed Multiple DLL Exports")
        dll_multi_section.add_line(
            f"The following exports were executed: {', '.join(exports_to_run)}")
        remaining_exports = set(exports_available) - set(exports_to_run)
        if remaining_exports:
            available_exports_str = ", ".join(sorted(remaining_exports))
            dll_multi_section.add_line(
                f"There were {len(remaining_exports)} other exports: {available_exports_str}")

        parent_section.add_subsection(dll_multi_section)

# Isolating this sequence out because I can't figure out how to mock PE construction
def _create_pe_from_file_contents(self) -> Optional[PE]:
    """
    This method parses the contents of the submitted file as a PE and handles PEFormatErrors
    :return: The parsed PE, or None if the contents could not be parsed
    """
    try:
        return PE(data=self.request.file_contents)
    except PEFormatError as e:
        # Not fatal: log it and hand back None for the caller to deal with
        self.log.warning(f"Could not parse PE file due to {safe_str(e)}")
        return None

def _generate_report(
self,
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
retrying
pefile
SetSimilaritySearch
64 changes: 57 additions & 7 deletions tests/test_cape_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -1499,25 +1499,75 @@ def test_set_task_parameters(params, cape_class_instance, dummy_request_class, m
[
({"dll_function": ""}),
({"dll_function": "blah"}),
({"dll_function": "blah,blah"}),
({"dll_function": "blah:blah"}),
({"dll_function": ""}),
]
)
def test_prepare_dll_submission(params, cape_class_instance, dummy_request_class, mocker):
    # _parse_dll is patched out so that only the handling of a user-supplied
    # dll_function is exercised by this test
    mocker.patch.object(CAPE, '_parse_dll', return_value=None)
    task_options = []
    correct_task_options = []
    parent_section = ResultSection("blah")

    dll_function = params["dll_function"]
    if dll_function:
        correct_task_options.append(f'function={dll_function}')
        # enable_multi is only expected when several exports are requested
        if ":" in dll_function:
            correct_task_options.append("enable_multi=true")

    cape_class_instance.request = dummy_request_class(**params)
    cape_class_instance._prepare_dll_submission(task_options, parent_section)
    assert task_options == correct_task_options

@staticmethod
@pytest.mark.parametrize("dll_parsed", [None, "blah"])
def test_parse_dll(dll_parsed, cape_class_instance, mocker):
    # Stand-in for a single export symbol
    class Symbol:
        def __init__(self, name):
            self.name = name
            self.ordinal = "blah"

    # Stand-in for the export directory: one nameless export, one bytes name, four str names
    class DirectoryEntryExport:
        def __init__(self):
            export_names = [None, "blah", b"blah", "blah2", "blah3", "blah4"]
            self.symbols = [Symbol(export_name) for export_name in export_names]

    # Stand-in for a parsed PE that has an export directory
    class FakePE:
        def __init__(self):
            self.DIRECTORY_ENTRY_EXPORT = DirectoryEntryExport()

    task_options = []
    parent_section = ResultSection("blah")

    if dll_parsed is not None:
        # A parsed PE with more exports than are run
        parsed_pe = FakePE()
        correct_task_options = ['function=DllMain:DllRegisterServer:#blah:blah4:blah2', 'enable_multi=true']
        correct_result_section = ResultSection(
            title_text="Executed Multiple DLL Exports",
            body="The following exports were executed: DllMain, DllRegisterServer, #blah, blah4, blah2"
        )
        correct_result_section.add_line("There were 2 other exports: blah, blah3")
    else:
        # No parsed PE: only the two fallback exports are expected
        parsed_pe = None
        correct_task_options = ['function=DllMain:DllRegisterServer', 'enable_multi=true']
        correct_result_section = ResultSection(
            title_text="Executed Multiple DLL Exports",
            body="The following exports were executed: DllMain, DllRegisterServer"
        )

    mocker.patch.object(CAPE, '_create_pe_from_file_contents', return_value=parsed_pe)
    cape_class_instance._parse_dll(task_options, parent_section)

    assert task_options == correct_task_options
    assert check_section_equality(parent_section.subsections[0], correct_result_section)

@staticmethod
@pytest.mark.parametrize("zip_report", [None, "blah"])
Expand Down