
Optimize datastructure, lowering disk usage substantially.

1 parent 9c5e79c · commit 8821a7de2137f94ea33cbbb46017ed798be52fd3 · @malthe committed Jun 25, 2012
@@ -1,6 +1,15 @@
Changes
=======
+In next release ...
+
+- Datastructure optimization.
+
+ Use bucket-based data types when possible, and avoid copying strings
+ (instead using an integer-based lookup table).
+
+ Note: Migration required. Please run the upgrade step.
+
1.0.2 (2012-06-15)
------------------
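
The "integer-based lookup table" mentioned in the changelog is the core of the change: each URL string is stored once and referred to elsewhere by a small integer id. A minimal sketch of that idea, using the BTrees types that ship with ZODB (the ``links`` name appears in the diff below; the inverse ``index`` mapping and the ``store`` helper are illustrative assumptions, not necessarily the tool's actual attributes):

from BTrees.IOBTree import IOBTree
from BTrees.OIBTree import OIBTree

class LinkTable(object):
    """Sketch only: store each URL once and hand out integer ids for it."""

    def __init__(self):
        self.links = IOBTree()   # id -> URL (this mapping is used in the diff)
        self.index = OIBTree()   # URL -> id (inverse mapping; name assumed)

    def store(self, url):
        """Return the id for ``url``, allocating a new one on first use."""
        try:
            return self.index[url]
        except KeyError:
            try:
                i = self.links.maxKey() + 1
            except ValueError:   # empty tree
                i = 0
            self.links[i] = url
            self.index[url] = i
            return i

The queue and the ``checked`` mapping can then hold small integers instead of repeated copies of long URL strings, which is where the disk savings come from.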
@@ -10,7 +10,7 @@ def read(*pathnames):
return open(os.path.join(os.path.dirname(__file__), *pathnames)).read().\
decode('utf-8')
-version = '1.0.2'
+version = '1.1-dev'
setup(name='collective.linkcheck',
version=version,
@@ -40,7 +40,7 @@ def read(*pathnames):
# 2 environment, try the `fakezope2eggs` recipe
install_requires=[
'setuptools',
- 'zc.queue',
+ 'zc.queue >= 1.3',
'requests',
'plone.z3cform',
],
@@ -18,6 +18,15 @@
provides="Products.GenericSetup.interfaces.EXTENSION"
/>
+ <genericsetup:upgradeStep
+ source="1.0"
+ destination="1.1"
+ profile="collective.linkcheck:default"
+ title="Migrate from 1.0 to 1.1"
+ description="Optimizes tool datastructures."
+ handler=".upgrades.upgrade_tool"
+ />
+
<!-- Lifecycle events -->
<subscriber
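
The upgrade step above points at an ``upgrade_tool`` handler in ``.upgrades``, which is not part of the rendered diff. A rough, purely hypothetical sketch of what such a GenericSetup handler could do (the tool id and the helper methods below are assumptions, not the package's actual API):

from Products.CMFCore.utils import getToolByName

def upgrade_tool(context):
    """Hypothetical sketch; GenericSetup passes the portal_setup tool as ``context``."""
    tool = getToolByName(context, 'portal_linkcheck')  # tool id assumed
    checked = dict(tool.checked.items())               # old layout: keyed by URL string

    tool.clear()                                       # assumed helper: recreate containers
    for url, entry in checked.items():
        i = tool.store(url)                            # assumed helper: URL -> integer id
        tool.checked[i] = entry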
@@ -150,13 +150,13 @@ def list_entries(self, count=100):
entries = list(self.tool.checked.items())
entries.sort(
- key=lambda (url, entry): (
- triage(None if url in self.tool.queue else entry[1]),
+ key=lambda (i, entry): (
+ triage(None if i in self.tool.queue else entry[1]),
entry[0]),
reverse=True,
)
- for url, entry in entries:
+ for i, entry in entries:
status = entry[1]
# Skip entries with unknown status.
@@ -171,7 +171,9 @@ def list_entries(self, count=100):
if len(rows) == count:
break
+ url = self.tool.links[i]
age = timestamp - (entry[0] or timestamp)
+
rows.append({
'url': url,
'quoted_url': urllib.quote_plus(url),
@@ -52,6 +52,11 @@ def end_request(event):
# Update the status of the present request.
status = response.getStatus()
+
+ if not tool.is_available():
+ logger.warn("Tool not available; please run update step.")
+ return
+
tool.update(event.request['PATH_INFO'], status)
# Must be good response.
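
Both this subscriber and the processor below refuse to touch the tool until ``tool.is_available()`` is true. The method body is not part of this diff; a plausible reading (assumption only) is that it simply checks whether the 1.1 containers exist yet:

class LinkCheckToolFragment(object):
    """Illustrative fragment only, not the package's actual tool class."""

    def is_available(self):
        # The 1.1 layout introduces the integer-keyed ``links`` mapping; until
        # the upgrade step creates it, updates and processing are skipped.
        return hasattr(self, 'links')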
@@ -39,7 +39,7 @@ def run(app, args):
for handler in root.handlers:
handler.setLevel(level)
- logger = logging.getLogger("linkcheck.events")
+ logger = logging.getLogger("linkcheck.processor")
logger.setLevel(level)
logger.info("looking for sites...")
@@ -105,6 +105,12 @@ def worker():
# Synchronize database
tool._p_jar.sync()
+ if not tool.is_available():
+ logger.warn("Tool not available; please run update step.")
+ logger.info("Sleeping for 10 seconds...")
+ time.sleep(10)
+ break
+
if not counter % 3600:
now = datetime.datetime.now()
@@ -139,10 +145,12 @@ def worker():
del tool.checked[url]
# Fetch set of URLs to check (up to transaction size).
- urls = tool.queue[:settings.transaction_size]
- if not urls:
+ queued = tool.queue[:settings.transaction_size]
+ if not queued:
continue
+ urls = filter(None, map(tool.links.get, queued))
+
# This keeps track of status updates, which we'll apply at
# the end.
updates = []
@@ -223,19 +231,21 @@ def worker():
while urls:
try:
- url = tool.queue.pull()
+ i = tool.queue.pull()
except IndexError:
transaction.abort()
continue
try:
+ url = tool.links[i]
urls.remove(url)
except KeyError:
- unchanged.append(url)
+ unchanged.append(i)
# This shouldn't happen too frequently.
- for url in unchanged:
- tool.queue.put(url)
+ for i in unchanged:
+ tool.queue.put(i)
+ url = tool.links[i]
logger.warn("putting back unprocessed url: %s." % url)
for url in invalid:
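
The processor now moves integer ids, not URL strings, through the queue; the lookups above amount to this round trip (plain dicts stand in for the persistent structures):

links = {0: 'http://example.com/a', 1: 'http://example.com/b'}
queued = [0, 1, 7]   # id 7 has no entry, e.g. the link was removed meanwhile

# As in the worker: resolve ids to URLs, silently dropping unknown ids.
urls = filter(None, map(links.get, queued))
assert list(urls) == ['http://example.com/a', 'http://example.com/b']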
@@ -1,3 +1,3 @@
<metadata>
- <version>1.0</version>
+ <version>1.1</version>
</metadata>
@@ -0,0 +1,123 @@
+from ZODB.POSException import ConflictError
+from zc.queue._queue import BucketQueue
+from zc.queue import CompositeQueue
+
+
+class CompositeQueue(CompositeQueue):
+ def __init__(self, compositeSize=15, subfactory=BucketQueue):
+ super(CompositeQueue, self).__init__(compositeSize, subfactory)
+ self.size = 0
+
+ def pull(self, index=0):
+ item = super(CompositeQueue, self).pull(index)
+ self.size -= 1
+ return item
+
+ def put(self, item):
+ super(CompositeQueue, self).put(item)
+ self.size += 1
+
+ def __getitem__(self, index):
+ """Optimize frequently accessed index."""
+
+        # The common case is access to the last element (``[-1]``), which is
+        # resolved by walking buckets from the end instead of iterating all items.
+ if index == -1:
+ queue = index
+ while not len(self._data[queue]):
+ queue -= 1
+ return self._data[queue][-1]
+
+ if isinstance(index, slice):
+ start, stop, stride = index.indices(len(self))
+ res = []
+ stride_ct = 1
+ for ix, v in enumerate(self):
+ if ix >= stop:
+ break
+ if ix < start:
+ continue
+ stride_ct -= 1
+ if stride_ct == 0:
+ res.append(v)
+ stride_ct = stride
+ return res
+ else:
+ if index < 0: # not efficient, but quick and easy
+ len_self = len(self)
+ rindex = index + len_self
+ if rindex < 0:
+ raise IndexError(index)
+ else:
+ rindex = index
+ for ix, v in enumerate(self):
+ if ix == rindex:
+ return v
+ raise IndexError(index)
+
+ def __len__(self):
+ return self.size
+
+ def _p_resolveConflict(self, oldstate, committedstate, newstate):
+ return resolveQueueConflict(oldstate, committedstate, newstate)
+
+
+def resolveQueueConflict(oldstate, committedstate, newstate, bucket=False):
+ # We only know how to merge _data and the size of the top-level queue.
+ # If anything else is different, puke.
+ if set(committedstate.keys()) != set(newstate.keys()):
+ raise ConflictError # can't resolve
+ for key, val in newstate.items():
+ if key not in ('_data', 'size') and val != committedstate[key]:
+ raise ConflictError # can't resolve
+
+ # basically, we are ok with anything--willing to merge--
+ # unless committedstate and newstate have one or more of the
+ # same deletions or additions in comparison to the oldstate.
+ old = oldstate['_data']
+ committed = committedstate['_data']
+ new = newstate['_data']
+
+ old_set = set(old)
+ committed_set = set(committed)
+ new_set = set(new)
+
+ if bucket and bool(old_set) and (bool(committed_set) ^ bool(new_set)):
+ # This is a bucket, part of a CompositePersistentQueue. The old set
+ # of this bucket had items, and one of the two transactions cleaned
+ # it out. There's a reasonable chance that this bucket will be
+ # cleaned out by the parent in one of the two new transactions.
+ # We can't know for sure, so we take the conservative route of
+ # refusing to be resolvable.
+ raise ConflictError
+
+ committed_added = committed_set - old_set
+ committed_removed = old_set - committed_set
+ new_added = new_set - old_set
+ new_removed = old_set - new_set
+
+ if new_removed & committed_removed:
+ # they both removed (claimed) the same one. Puke.
+ raise ConflictError # can't resolve
+ elif new_added & committed_added:
+ # they both added the same one. Puke.
+ raise ConflictError # can't resolve
+
+ # Now we do the merge. We'll merge into the committed state and
+ # return it.
+ mod_committed = []
+ for v in committed:
+ if v not in new_removed:
+ mod_committed.append(v)
+ if new_added:
+ ordered_new_added = new[-len(new_added):]
+ assert set(ordered_new_added) == new_added
+ mod_committed.extend(ordered_new_added)
+ # Set the new size on top level queues
+ if not bucket:
+ committed_size_diff = committedstate['size'] - oldstate['size']
+ new_size_diff = newstate['size'] - oldstate['size']
+ new_size = oldstate['size'] + committed_size_diff + new_size_diff
+ committedstate['size'] = new_size
+ committedstate['_data'] = tuple(mod_committed)
+ return committedstate
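
Two things are worth noting about the new queue module: the subclass maintains an explicit ``size`` counter so ``len()`` is O(1) instead of walking every bucket, and ``resolveQueueConflict`` merges that counter alongside ``_data``. A quick illustrative check of the merge on hand-built states (strings stand in for the persisted bucket references; the import path is assumed):

from collective.linkcheck.queue import resolveQueueConflict   # module path assumed

# Two transactions each put one distinct item onto an initially empty queue.
old       = {'_data': (),     'size': 0}
committed = {'_data': ('a',), 'size': 1}
new       = {'_data': ('b',), 'size': 1}

merged = resolveQueueConflict(old, committed, new)
assert merged['_data'] == ('a', 'b')
assert merged['size'] == 2

If both transactions had added or removed the same item, the function raises ConflictError instead and ZODB falls back to retrying the transaction.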