Fixed concurrency issues with trace compression
andresriancho committed Apr 24, 2019
1 parent 1907338 commit 21437e5
Showing 1 changed file with 116 additions and 43 deletions.
159 changes: 116 additions & 43 deletions w3af/core/data/db/history.py
@@ -82,6 +82,11 @@ class HistoryItem(object):
_UNCOMPRESSED_FILES = 50
_COMPRESSION_LEVEL = 7

_MIN_FILE_COUNT = _COMPRESSED_FILE_BATCH + _UNCOMPRESSED_FILES

_pending_compression_jobs = []
_latest_compression_job_end = 0

id = None
_request = None
_response = None
@@ -96,6 +101,7 @@ class HistoryItem(object):
time = 0.2

history_lock = threading.RLock()
compression_lock = threading.RLock()

def __init__(self):
self._db = get_default_temp_db_instance()
@@ -389,7 +395,9 @@ def read(self, _id, full=True):
return result_item

def save(self):
"""Save object into DB."""
"""
Save History instance to DB and disk
"""
resp = self.response
code = int(resp.get_code()) / 100

@@ -464,60 +472,111 @@ def save(self):
req_res.write(msgpack_data)
req_res.close()

- self._compress_old_traces()

response_id = resp.get_id()
self._queue_compression_requests(response_id)

pending_compression = self._get_pending_compression_job()

if pending_compression is not None:
self._process_pending_compression(pending_compression)

return True
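
The core of the concurrency fix is visible in save() above: batches are queued under compression_lock, and each call to save() then drains at most one pending job, so the threads that write traces are also the ones that compress them. A minimal standalone sketch of that hand-off pattern (the names mirror the attributes above, but this is illustrative code, not part of the commit):

import threading

pending_jobs = []                       # mirrors _pending_compression_jobs
compression_lock = threading.RLock()    # mirrors HistoryItem.compression_lock

def queue_job(job):
    # Producer side: append under the lock so concurrent save() calls
    # never interleave their appends
    with compression_lock:
        pending_jobs.append(job)

def pop_job():
    # Consumer side: pop(0) under the same lock guarantees each job is
    # handed to exactly one thread; None means nothing is pending
    with compression_lock:
        try:
            return pending_jobs.pop(0)
        except IndexError:
            return None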

- def _compress_old_traces(self):
def _get_pending_compression_job(self):
with HistoryItem.compression_lock:
try:
return self._pending_compression_jobs.pop(0)
except IndexError:
return None

def _queue_compression_requests(self, response_id):
"""
Every N calls to save() check if there are enough files to compress in
the session directory and create a PendingCompressionJob instance.
Save that instance in _pending_compression_jobs. The _get_pending_compression_job
method will be used to read jobs from that list in a thread-safe way.
:param response_id:
:return:
"""
- We'll compress 150 of the oldest files in the session directory.
# Performance boost that prevents the disk access and lock from below
# from running on each save()
if response_id % 100 != 0:
return

with HistoryItem.compression_lock:
#
# Get the list of files to compress, checking that we have enough to
# proceed with compression
#
session_dir = self._session_dir

files = [os.path.join(session_dir, f) for f in os.listdir(session_dir)]
files = [f for f in files if f.endswith(self._EXTENSION)]

if len(files) <= HistoryItem._MIN_FILE_COUNT:
return

#
# Sort by ID and remove the last 50 from the list to avoid
# compression-decompression CPU waste and concurrency issues with trace
# files that have not yet been completely written to disk
#
files.sort(key=lambda trace_file: get_trace_id(trace_file))
files = files[:-self._UNCOMPRESSED_FILES]
#
# Compress in 150-file batches, making sure that the filenames are
# numerically ordered. We need this order to have 1, 2, ..., 150 in the
# same file. The zip will be named `1-150.zip`, and that name is later
# used to find the trace for a given ID.
#
while True:
current_batch_files = files[:self._COMPRESSED_FILE_BATCH]

if len(current_batch_files) != self._COMPRESSED_FILE_BATCH:
# There are not enough files in this batch
break

# Queue the oldest 150 files to be compressed into a single zip
start = get_trace_id(current_batch_files[0])
end = get_trace_id(current_batch_files[-1])

if start <= HistoryItem._latest_compression_job_end:
# This check prevents overlapping PendingCompressionJob instances
# from being added to the list by different threads
break

pending_compression = PendingCompressionJob(start, end)
HistoryItem._latest_compression_job_end = end
HistoryItem._pending_compression_jobs.append(pending_compression)

# Drop the first 150 files, which were just queued for compression,
# and continue iterating in the while loop
files = files[self._COMPRESSED_FILE_BATCH:]
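
As a worked example of the batching above, the following sketch assumes `_COMPRESSED_FILE_BATCH` is 150 (implied by the `1-150.zip` comment; its value is not shown in this diff), `_UNCOMPRESSED_FILES` is 50 as defined at the top of the file, and 420 trace files on disk:

files = list(range(1, 421))       # stand-ins for trace ids 1..420 on disk

UNCOMPRESSED_FILES = 50           # newest traces are never compressed
COMPRESSED_FILE_BATCH = 150       # assumed batch size, see above

eligible = files[:-UNCOMPRESSED_FILES]    # ids 1..370 remain eligible

jobs = []
latest_end = 0

while True:
    batch = eligible[:COMPRESSED_FILE_BATCH]

    if len(batch) != COMPRESSED_FILE_BATCH:
        break                     # fewer than 150 left; wait for more saves

    start, end = batch[0], batch[-1]

    if start <= latest_end:
        break                     # the overlap guard from the method above

    jobs.append((start, end))
    latest_end = end
    eligible = eligible[COMPRESSED_FILE_BATCH:]

print(jobs)                       # [(1, 150), (151, 300)] -> two zip files

The 70 newest eligible traces (ids 301 to 370) stay on disk until enough new traces accumulate to fill the next batch.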

def _process_pending_compression(self, pending_compression):
"""
Compress a PendingCompressionJob, usually the 150 oldest files in the
session directory together inside a zip file.
Not compressing all files because the newest ones might be read by
plugins and we don't want to decompress them right after compressing
them (waste of CPU).
:return: None
"""
#
# Get the list of files to compress, checking that we have enough to
# proceed with compression
#
session_dir = self._session_dir
- min_file_count = self._UNCOMPRESSED_FILES + self._COMPRESSED_FILE_BATCH
trace_range = xrange(pending_compression.start, pending_compression.end + 1)

- # Initial check to boost performance
- if len(os.listdir(session_dir)) <= min_file_count:
- return

- files = [os.path.join(session_dir, f) for f in os.listdir(session_dir)]
- files = [f for f in files if f.endswith(self._EXTENSION)]
- files = [f for f in files if os.path.isfile(f)]
files = ['%s.%s' % (i, HistoryItem._EXTENSION) for i in trace_range]
files = [os.path.join(session_dir, filename) for filename in files]

- if len(files) <= min_file_count:
- return

- #
- # Sort by ID and remove the last 50 from the list to avoid
- # compression-decompression CPU waste
- #
- files.sort(key=lambda trace_file: get_trace_id(trace_file))
- files = files[:-self._UNCOMPRESSED_FILES]

- #
- # Only compress in 150 file batches, and making sure that the filenames
- # are numerically ordered. We need this order to have 1, 2, ... 150 in
- # the same file. The filename will be named `1-150.zip` which will later
- # be used to find the uncompressed trace.
- #
- files = files[:self._COMPRESSED_FILE_BATCH]

# Target zip filename
- #
- # Compress the oldest 150 files into a gzip
- #
- start = get_trace_id(files[0])
- end = get_trace_id(files[-1])
- compressed_filename = '%s-%s.%s' % (start,
- end,
compressed_filename = '%s-%s.%s' % (pending_compression.start,
pending_compression.end,
self._COMPRESSED_EXTENSION)
compressed_filename = os.path.join(session_dir, compressed_filename)

@@ -551,16 +610,24 @@ def _compress_old_traces(self):
compression=zipfile.ZIP_DEFLATED)

for filename in files:
- _zip.write(filename=filename,
- arcname='%s.%s' % (get_trace_id(filename), self._EXTENSION))
try:
_zip.write(filename=filename,
arcname='%s.%s' % (get_trace_id(filename), self._EXTENSION))
except OSError:
# The file might not exist
continue

_zip.close()

#
# And now remove the already compressed files
#
for filename in files:
- os.remove(filename)
try:
os.remove(filename)
except OSError:
# The file might not exist
continue
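
The `start-end` naming convention is what later makes a compressed trace findable by ID (the real lookup lives in the read path together with `get_zip_id_range`, shown below). A minimal sketch of such a lookup, assuming traces use a `trace` extension; `find_trace_data` is a hypothetical helper, not part of this commit:

import os
import zipfile

def find_trace_data(session_dir, trace_id, extension='trace'):
    # Hypothetical helper: scan '<start>-<end>.zip' archives and, when
    # trace_id falls inside an archive's range, read '<id>.trace' from it
    for filename in os.listdir(session_dir):
        name, ext = os.path.splitext(filename)

        if ext != '.zip' or name.count('-') != 1:
            continue

        start, end = name.split('-')
        if not (start.isdigit() and end.isdigit()):
            continue

        if int(start) <= trace_id <= int(end):
            archive = zipfile.ZipFile(os.path.join(session_dir, filename))
            try:
                return archive.read('%s.%s' % (trace_id, extension))
            finally:
                archive.close()

    # Not compressed yet: the caller should try '<trace_id>.trace' directly
    return None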

def get_columns(self):
return self._COLUMNS
@@ -624,3 +691,9 @@ def get_zip_id_range(zip_file):

class TraceReadException(Exception):
pass


class PendingCompressionJob(object):
def __init__(self, start, end):
self.start = start
self.end = end
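
PendingCompressionJob is a plain value object; its start and end map a queued batch directly to the archive that _process_pending_compression will create, assuming the compressed extension is zip as the comments above indicate. For example:

job = PendingCompressionJob(1, 150)
print('%s-%s.zip' % (job.start, job.end))    # -> 1-150.zip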
