builder, stats: Use temporary shelve to store input URLs.
Closes #259
chfoo committed Apr 3, 2015
1 parent 9a287a0 commit a20fb78
Showing 3 changed files with 60 additions and 15 deletions.
1 change: 1 addition & 0 deletions doc/changelog.rst
@@ -12,6 +12,7 @@ Unreleased

* Fixed: ``--regex-type`` to accept ``pcre`` instead of ``posix``. Regular expressions always use Python's regex library. Posix regex is not supported.
* Fixed: when using ``--warc-max-size`` and ``--warc-append``, it wrote to existing sequential WARC files unnecessarily.
* Fixed: input URLs were stored in memory instead of being saved to disk. This was notable when many URLs were supplied via the ``--input-file`` option.
* Changed: when using ``--warc-max-size`` and ``--warc-append``, the next sequential WARC file is created to avoid appending to corrupt files.
* Changed: WARC file writing to use journal files and to refuse to start the program if any journals exist. This avoids corrupting files through naive use of ``--warc-append`` and allows for future automated recovery.
* Added: Open Graph and Twitter Card element links extraction.
36 changes: 27 additions & 9 deletions wpull/builder.py
@@ -8,6 +8,7 @@
import itertools
import logging
import os.path
import shelve
import socket
import ssl
import sys
@@ -160,7 +161,10 @@ def __init__(self, args, unit_test=False):
'WebProcessorInstances': WebProcessorInstances,
'YoutubeDlCoprocessor': YoutubeDlCoprocessor,
})
self._url_infos = None
self._input_urls_temp_dir = tempfile.TemporaryDirectory(
prefix='tmp-wpull', dir=os.getcwd())
self._input_urls_db = shelve.open(
os.path.join(self._input_urls_temp_dir.name, 'input_urls.db'))
self._ca_certs_file = None
self._file_log_handler = None
self._console_log_handler = None
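
The constructor now keeps input URLs in a ``shelve`` database inside a temporary directory rather than in an in-memory tuple. A minimal, self-contained sketch of that pattern (the directory prefix and the sample entry are illustrative, not wpull's actual data):

import os.path
import shelve
import tempfile

# Scratch directory created under the current working directory; it is
# removed explicitly with cleanup() once the URLs have been consumed.
temp_dir = tempfile.TemporaryDirectory(prefix='tmp-example', dir=os.getcwd())

# shelve exposes a dict-like object whose values are pickled to disk, so a
# very large input list does not have to stay resident in memory.
url_db = shelve.open(os.path.join(temp_dir.name, 'input_urls.db'))
url_db['http://example.com/'] = {'encoding': 'utf-8'}

for url, info in url_db.items():
    print(url, info)

url_db.close()
temp_dir.cleanup()
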
@@ -196,11 +200,15 @@ def build(self):
resource_monitor = self._build_resource_monitor()

self._build_demux_document_scraper()
self._url_infos = tuple(self._build_input_urls())
for url_info in self._build_input_urls():
self._input_urls_db[url_info.url] = url_info

statistics = self._factory.new('Statistics')
statistics.quota = self._args.quota
statistics.required_url_infos.update(self._url_infos)

if self._args.quota:
for url_info in self._input_urls_db.values():
statistics.required_urls_db[url_info.url] = True

url_table = self._build_url_table()
processor = self._build_processor()
@@ -223,9 +231,19 @@ def build(self):
self._warn_unsafe_options()
self._warn_silly_options()

url_table.add_many(
[{'url': url_info.url} for url_info in self._url_infos]
)
batch = []

for url_info in self._input_urls_db.values():
batch.append({'url': url_info.url})
if len(batch) > 1000:
url_table.add_many(batch)
batch = []

url_table.add_many(batch)

self._input_urls_db.close()
self._input_urls_temp_dir.cleanup()
self._input_urls_temp_dir = None

return self._factory['Application']

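The loop above queues rows and flushes them to ``url_table.add_many()`` every 1000 entries, then adds whatever remains after the loop. The same idea can be factored into a small helper; a sketch (the ``chunked`` helper and the printed stand-in for ``add_many`` are illustrative, not part of wpull):

import itertools

def chunked(iterable, size):
    # Yield lists of at most `size` items drawn lazily from `iterable`.
    iterator = iter(iterable)
    while True:
        batch = list(itertools.islice(iterator, size))
        if not batch:
            return
        yield batch

rows = ({'url': 'http://example.com/{0}'.format(num)} for num in range(2500))

for batch in chunked(rows, 1000):
    print('adding', len(batch), 'rows')  # url_table.add_many(batch) in the real code
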
@@ -506,7 +524,7 @@ def _read_input_file_as_lines(self):
input_file = codecs.getreader(
self._args.local_encoding or 'utf-8')(self._args.input_file)

urls = [line.strip() for line in input_file if line.strip()]
urls = (line.strip() for line in input_file if line.strip())

if not urls:
raise ValueError(_('No URLs found in input file.'))
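
Switching the list comprehension to a generator expression keeps the input file from being read into memory all at once. A generator object is always truthy, though, so an ``if not urls`` style check cannot by itself detect an empty file; one way to keep the lazy iteration and still fail early is to peek at the first item. A sketch (the helper name is illustrative):

import itertools

def nonempty_lines(input_file):
    # Return a lazy iterator of stripped, non-blank lines; raise if there are none.
    stripped = (line.strip() for line in input_file if line.strip())
    try:
        first = next(stripped)
    except StopIteration:
        raise ValueError('No URLs found in input file.')
    return itertools.chain([first], stripped)

print(list(nonempty_lines(['\n', 'http://example.com/\n'])))
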
@@ -519,7 +537,7 @@ def _read_input_file_as_html(self):
self._args.input_file,
encoding=self._args.local_encoding or 'utf-8'
)
links = [context.link for context in scrape_result.link_contexts]
links = (context.link for context in scrape_result.link_contexts)

return links

@@ -537,7 +555,7 @@ def _build_url_filters(self):
enabled=args.recursive, page_requisites=args.page_requisites
),
SpanHostsFilter(
self._url_infos,
(url_info for url_info in self._input_urls_db.values()),
enabled=args.span_hosts,
page_requisites='page-requisites' in args.span_hosts_allow,
linked_pages='linked-pages' in args.span_hosts_allow,
38 changes: 32 additions & 6 deletions wpull/stats.py
@@ -2,7 +2,11 @@
'''Statistics.'''
from collections import Counter
import logging
import os
import shelve
import tempfile
import time
import atexit

from wpull.bandwidth import BandwidthMeter
from wpull.errors import ERROR_PRIORITIES
@@ -22,8 +26,9 @@ class Statistics(object):
errors: a Counter mapping error types to integer.
quota (int): Threshold of number of bytes when the download quota is
exceeded.
required_url_infos (set): A set of :class:`.url.URLInfo` that must
be completed before the quota can be exceeded.
required_urls_db (dict): A mapping of URL strings to bool for the
URLs that must be completed before the quota can be exceeded. The
mapping is backed by a disk store that is created on demand.
bandwidth_meter (:class:`.network.BandwidthMeter`): The bandwidth
meter.
'''
@@ -34,9 +39,23 @@ def __init__(self):
self.size = 0
self.errors = Counter()
self.quota = None
self.required_url_infos = set()
self._temp_dir = None
self._required_urls_db = None
self.bandwidth_meter = BandwidthMeter()

@property
def required_urls_db(self):
if not self._required_urls_db:
self._required_urls_db = self._new_required_urls_db()

return self._required_urls_db

def _new_required_urls_db(self):
self._temp_dir = tempfile.TemporaryDirectory(
prefix='tmp-wpull-quota', dir=os.getcwd())

return shelve.open(os.path.join(self._temp_dir.name, 'quota.db'))

def start(self):
'''Record the start time.'''
self.start_time = time.time()
@@ -46,6 +65,10 @@ def stop(self):
'''Record the stop time.'''
self.stop_time = time.time()

if self._temp_dir:
self._temp_dir.cleanup()
self._temp_dir = None

@property
def duration(self):
'''Return the length of the interval in seconds.'''
@@ -64,16 +87,19 @@ def increment(self, size):
@property
def is_quota_exceeded(self):
'''Return whether the quota is exceeded.'''
if self.required_url_infos:
if self._required_urls_db is None:
return False

if self._required_urls_db:
return False

if self.quota:
return self.size >= self.quota

def mark_done(self, url_info):
'''Set the URLInfo as completed.'''
if url_info in self.required_url_infos:
self.required_url_infos.remove(url_info)
if self._required_urls_db and url_info.url in self._required_urls_db:
del self.required_urls_db[url_info.url]

def increment_error(self, error):
'''Increment the error counter preferring base exceptions.'''
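
Taken together, the quota bookkeeping works like this: the quota only counts as exceeded once every required (input) URL has been marked done. A rough usage sketch against the API shown above (``FakeURLInfo`` stands in for :class:`.url.URLInfo`, and the byte counts are arbitrary):

import collections

from wpull.stats import Statistics

# Minimal stand-in with the only attribute mark_done() reads.
FakeURLInfo = collections.namedtuple('FakeURLInfo', ['url'])

stats = Statistics()
stats.quota = 1000  # bytes
stats.required_urls_db['http://example.com/'] = True  # disk store created on first access

stats.increment(5000)
print(stats.is_quota_exceeded)  # False: the input URL has not finished yet

stats.mark_done(FakeURLInfo(url='http://example.com/'))
print(stats.is_quota_exceeded)  # True: quota applies once required URLs are done

stats.stop()  # also cleans up the temporary quota database
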
