Commit

Initial commit to fix Parser multiprocessing pool is not repopulated #…
andresriancho committed Jun 30, 2017
1 parent 772f769 commit 27c6e25
Showing 3 changed files with 36 additions and 125 deletions.
3 changes: 3 additions & 0 deletions w3af/core/controllers/dependency_check/requirements.py
@@ -76,6 +76,9 @@

# tldextract extracts the tld from any domain name
PIPDependency('tldextract', 'tldextract', '1.7.2'),

# pebble multiprocessing
PIPDependency('pebble', 'pebble', '4.3.2'),
]

GUI_PIP_EXTRAS = [PIPDependency('xdot', 'xdot', '0.6')]
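
The new dependency is what makes the rest of the commit possible: pebble's
ProcessPool mirrors the stdlib pool API but adds a per-task timeout and
automatically replaces workers it kills. A minimal sketch of that behaviour
(the slow() function and the timeout values are illustrative, not from w3af):

from __future__ import print_function

import time

from concurrent.futures import TimeoutError
from pebble import ProcessPool


def slow(seconds):
    time.sleep(seconds)
    return seconds


if __name__ == '__main__':
    # Same constructor shape as used in mp_document_parser.py below: the
    # first positional argument is the worker count, and max_tasks
    # recycles a worker after that many tasks.
    pool = ProcessPool(2, max_tasks=20)

    future = pool.schedule(slow, args=(10,), timeout=1)

    try:
        print(future.result())
    except TimeoutError:
        # pebble kills the worker that exceeded its timeout and spawns a
        # replacement, so the pool is repopulated instead of silently
        # shrinking: the behaviour the old multiprocessing pool lacked.
        print('task timed out, worker was killed and replaced')
    finally:
        pool.stop()
        pool.join()
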
138 changes: 23 additions & 115 deletions w3af/core/data/parsers/mp_document_parser.py
@@ -21,18 +21,18 @@
"""
from __future__ import with_statement, print_function

import os
import signal
import atexit
import threading
import multiprocessing

from concurrent.futures import TimeoutError
from tblib.decorators import Error
from pebble import ProcessPool

import w3af.core.controllers.output_manager as om

from w3af.core.controllers.profiling import start_profiling_no_core
from w3af.core.controllers.threads.process_pool import ProcessPool
from w3af.core.controllers.threads.is_main_process import is_main_process
from w3af.core.controllers.output_manager import log_sink_factory
from w3af.core.controllers.exceptions import BaseFrameworkException
@@ -43,7 +43,6 @@
from w3af.core.controllers.profiling.pytracemalloc import user_wants_pytracemalloc
from w3af.core.controllers.profiling.cpu_usage import user_wants_cpu_profiling
from w3af.core.data.parsers.document_parser import DocumentParser
from w3af.core.data.parsers.utils.request_uniq_id import get_request_unique_id


class MultiProcessingDocumentParser(object):
@@ -71,7 +70,6 @@ class MultiProcessingDocumentParser(object):

def __init__(self):
self._pool = None
self._processes = None
self._start_lock = threading.RLock()

def start_workers(self):
@@ -82,15 +80,10 @@ def start_workers(self):
with self._start_lock:
if self._pool is None:

# pylint: disable=E1101
# Keep track of which pid is processing which http response
self._processes = manager.dict()
# pylint: enable=E1101

# The pool
# Start the process pool
log_queue = om.manager.get_in_queue()
self._pool = ProcessPool(self.MAX_WORKERS,
maxtasksperchild=20,
max_tasks=20,
initializer=init_worker,
initargs=(log_queue,))

@@ -102,49 +95,9 @@ def stop_workers(self):
:return: None
"""
if self._pool is not None:
self._pool.terminate()
self._pool.stop()
self._pool = None

if self._processes is not None:
self._processes.clear()
self._processes = None

def _kill_parser_process(self, hash_string, http_response):
"""
Kill the process that's handling the parsing of http_response which
can be identified by hash_string
:param hash_string: The hash for the http_response
:param http_response: The HTTP response which is being parsed
:return: None
"""
# Near the timeout error, so we make sure that the pid is still
# running our "buggy" input
pid = self._processes.pop(hash_string, None)
if pid is not None:
try:
os.kill(pid, signal.SIGTERM)
except OSError, ose:
msg = ('An error occurred while killing the parser'
' process: "%s"')
om.out.debug(msg % ose)

msg = ('[timeout] The parser took more than %s seconds to complete'
' parsing of "%s", killed it!')

if self.PROFILING_ENABLED:
msg += (' You are running a profiling session which requires more'
' CPU and resources to be run; the'
' MultiProcessingDocumentParser failed to parse the HTML'
' document. Try to increase the PARSER_TIMEOUT and try'
' again.\n\n'
'This issue invalidates the profiling session!\n\n'
'See issue #9713 for more information'
' https://github.com/andresriancho/w3af/issues/9713')

log_function = om.out.error if self.PROFILING_ENABLED else om.out.debug
log_function(msg % (self.PARSER_TIMEOUT, http_response.get_url()))

def get_document_parser_for(self, http_response):
"""
Get a document parser for http_response
@@ -160,33 +113,29 @@ def get_document_parser_for(self, http_response):
# Start the worker processes if needed
self.start_workers()

hash_string = get_request_unique_id(http_response, prepend='parser')

apply_args = (process_document_parser,
http_response,
self._processes,
hash_string,
self.DEBUG)

# Push the task to the workers
result = self._pool.apply_async(apply_with_return_error, (apply_args,))
future = self._pool.schedule(apply_with_return_error,
args=(apply_args,),
timeout=self.PARSER_TIMEOUT)

try:
parser_output = result.get(timeout=self.PARSER_TIMEOUT)
except multiprocessing.TimeoutError:
self._kill_parser_process(hash_string, http_response)

parser_output = future.result()
except TimeoutError:
# Act just like when there is no parser
msg = 'There is no parser for "%s".' % http_response.get_url()
raise BaseFrameworkException(msg)
msg = ('[timeout] The parser took more than %s seconds'
' to complete parsing of "%s", killed it!')

args = (self.PARSER_TIMEOUT, http_response.get_url())

raise BaseFrameworkException(msg % args)
else:
if isinstance(parser_output, Error):
parser_output.reraise()

finally:
# Just remove it so it doesn't use memory
self._processes.pop(hash_string, None)

return parser_output

def get_tags_by_filter(self, http_response, tags, yield_text=False):
@@ -218,24 +167,20 @@ def get_tags_by_filter(self, http_response, tags, yield_text=False):
# Start the worker processes if needed
self.start_workers()

hash_string = get_request_unique_id(http_response, prepend='tags')

apply_args = (process_get_tags_by_filter,
http_response,
tags,
yield_text,
self._processes,
hash_string,
self.DEBUG)

# Push the task to the workers
result = self._pool.apply_async(apply_with_return_error, (apply_args,))
future = self._pool.schedule(apply_with_return_error,
args=(apply_args,),
timeout=self.PARSER_TIMEOUT)

try:
filtered_tags = result.get(timeout=self.PARSER_TIMEOUT)
except multiprocessing.TimeoutError:
self._kill_parser_process(hash_string, http_response)

filtered_tags = future.result()
except TimeoutError:
# We hit a timeout, return an empty list
return []
else:
@@ -244,23 +189,14 @@ def get_tags_by_filter(self, http_response, tags, yield_text=False):
if isinstance(filtered_tags, Error):
return []

finally:
# Just remove it so it doesn't use memory
self._processes.pop(hash_string, None)

return filtered_tags


def process_get_tags_by_filter(http_resp, tags, yield_text,
processes, hash_string, debug):
def process_get_tags_by_filter(http_resp, tags, yield_text, debug):
"""
Simple wrapper to get the current process id and store it in a shared object
so we can kill the process if needed.
"""
# Save this for tracking
pid = multiprocessing.current_process().pid
processes[hash_string] = pid

document_parser = DocumentParser(http_resp)

# Not all parsers have tags
@@ -274,14 +210,12 @@ def process_get_tags_by_filter(http_resp, tags, yield_text,
return filtered_tags


def process_document_parser(http_resp, processes, hash_string, debug):
def process_document_parser(http_resp, debug):
"""
Simple wrapper to get the current process id and store it in a shared object
so we can kill the process if needed.
"""
# Save this for tracking
pid = multiprocessing.current_process().pid
processes[hash_string] = pid

if debug:
msg = '[mp_document_parser] PID %s is starting to parse %s'
@@ -313,10 +247,6 @@ def cleanup_pool():
if 'mp_doc_parser' in globals():
mp_doc_parser.stop_workers()

# to be safe -- explicitly shutting down the manager
if 'manager' in globals():
manager.shutdown()


def init_worker(log_queue):
"""
@@ -331,27 +261,5 @@ def init_worker(log_queue):
start_profiling_no_core()


def init_manager():
"""
Initializer for SyncManager
:see: https://jtushman.github.io/blog/2014/01/14/python-%7C-multiprocessing-and-interrupts/
"""
signal.signal(signal.SIGINT, signal.SIG_IGN)


def Manager():
"""
Returns a manager associated with a running server process
The managers methods such as `Lock()`, `Condition()` and `Queue()`
can be used to create shared objects.
"""
from multiprocessing.managers import SyncManager
m = SyncManager()
m.start(init_manager)
return m


if is_main_process():
manager = Manager()
mp_doc_parser = MultiProcessingDocumentParser()
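
One piece the commit keeps unchanged is the tblib-based error marshalling:
the worker-side wrapper returns, rather than raises, any exception so the
parent process can re-raise it with the original traceback. A hedged sketch
of that contract; apply_with_return_error below is a stand-in with the same
shape as the real w3af helper scheduled above, not a copy of it:

import sys

from tblib.decorators import Error


def apply_with_return_error(args):
    # args is (callable, arg1, arg2, ...); exceptions are traded for a
    # picklable Error object that carries the worker's traceback.
    func, func_args = args[0], args[1:]
    try:
        return func(*func_args)
    except Exception:
        return Error(sys.exc_info())


def boom():
    raise ValueError('parser failed inside the worker')


result = apply_with_return_error((boom,))

if isinstance(result, Error):
    # The same isinstance() check get_document_parser_for() performs on
    # the future's result; reraise() restores the worker-side traceback.
    result.reraise()
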
20 changes: 10 additions & 10 deletions w3af/core/data/parsers/tests/test_mp_document_parser.py
@@ -97,8 +97,8 @@ def test_parser_timeout(self):

try:
self.mpdoc.get_document_parser_for(http_resp)
except BaseFrameworkException:
self._is_timeout_exception_message(om_mock, http_resp)
except BaseFrameworkException, bfe:
self._is_timeout_exception_message(bfe, om_mock, http_resp)
else:
self.assertTrue(False)

@@ -150,8 +150,8 @@ def test_many_parsers_timing_out(self):

try:
self.mpdoc.get_document_parser_for(http_resp)
except BaseFrameworkException:
self._is_timeout_exception_message(om_mock, http_resp)
except BaseFrameworkException, bfe:
self._is_timeout_exception_message(bfe, om_mock, http_resp)
else:
self.assertTrue(False)

@@ -164,8 +164,8 @@ def test_many_parsers_timing_out(self):

try:
parser = self.mpdoc.get_document_parser_for(http_resp)
except BaseFrameworkException:
self._is_timeout_exception_message(om_mock, http_resp)
except BaseFrameworkException, bfe:
self._is_timeout_exception_message(bfe, om_mock, http_resp)
else:
self.assertIsInstance(parser._parser, HTMLParser)

@@ -246,14 +246,14 @@ def test_parser_with_large_attr_killed_when_sending_to_queue(self):
parser = self.mpdoc.get_document_parser_for(http_resp)
self.assertIsInstance(parser._parser, HTMLParser)

def _is_timeout_exception_message(self, om_mock, http_resp):
msg = '[timeout] The parser took more than %s seconds' \
' to complete parsing of "%s", killed it!'
def _is_timeout_exception_message(self, bfe, om_mock, http_resp):
msg = ('[timeout] The parser took more than %s seconds to '
'complete parsing of "%s", killed it!')

error = msg % (MultiProcessingDocumentParser.PARSER_TIMEOUT,
http_resp.get_url())

self.assertIn(call.debug(error), om_mock.mock_calls)
self.assertEquals(str(bfe), error)

def test_daemon_child(self):
"""
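Taken together, the test changes pin down the new caller-facing contract:
a parse that exceeds PARSER_TIMEOUT surfaces as a BaseFrameworkException
carrying the '[timeout]' message, instead of only being logged. A hedged
sketch of a caller under that contract; the import paths and HTTPResponse
constructor follow common w3af test conventions but are assumptions here,
and the response body is illustrative:

from w3af.core.controllers.exceptions import BaseFrameworkException
from w3af.core.data.dc.headers import Headers
from w3af.core.data.parsers.doc.url import URL
from w3af.core.data.parsers.mp_document_parser import mp_doc_parser
from w3af.core.data.url.HTTPResponse import HTTPResponse

url = URL('http://w3af.org/')
headers = Headers([('content-type', 'text/html')])
http_resp = HTTPResponse(200, '<html>hello</html>', headers, url, url)

try:
    parser = mp_doc_parser.get_document_parser_for(http_resp)
except BaseFrameworkException, bfe:
    # Raised when no parser matches and, after this commit, also when
    # pebble killed a worker for exceeding PARSER_TIMEOUT; the message
    # distinguishes the two cases.
    print(str(bfe))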
