In [22]:
from starbelly.shell import *
import pickle
from IPython.display import display

# Overview

When a crawl is paused, some of the in-memory state of the crawl is discarded, such as the set of URLs that have been previously seen. If the crawl is later resumed, then this state needs to be recomputed. The current implementation of the frontier reloads the set by iterating over all `crawl_item` and `frontier` documents for the given job and extracting the `url_hash` field. This notebook demonstrates the problematic performance of this implementation and explores some alternatives.

# Loading from crawl_item

The set of seen URLs is partially loaded from the `crawl_item` table. This section benchmarks the current implementation against a new implementation using a real crawl with 14k downloaded items and 164k items in the frontier.

In [2]:
async def get_item_url_hashes_old():
    url_hashes = set()
    query = (
        r.table('crawl_item')
        .between(('d06fedd2-c2a8-4ed6-9e39-94ed4036b254',r.minval),
                 ('d06fedd2-c2a8-4ed6-9e39-94ed4036b254',r.maxval),
                index='sync_index')
        .pluck('url_hash')
    )
    async with db_pool.connection() as conn:
        cursor = await query.run(conn)
        async for item in starbelly.db.AsyncCursorIterator(cursor):
            url_hashes.add(bytes(item['url_hash']))
    return url_hashes

In [3]:
%time old_item_url_hashes = crun(get_item_url_hashes_old())

CPU times: user 23.4 s, sys: 8.47 s, total: 31.9 s
Wall time: 31.9 s


In [4]:
len(old_item_url_hashes)

14223

In [5]:
async def get_item_url_hashes_new():
    url_hashes = set()
    row = 0
    BATCH_SIZE = 1000
    async with db_pool.connection() as conn:
        while True:
            query = (
                r.table('crawl_item')
                .between(('d06fedd2-c2a8-4ed6-9e39-94ed4036b254', row),
                         ('d06fedd2-c2a8-4ed6-9e39-94ed4036b254', row + BATCH_SIZE),
                         index='sync_index')
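                # Map each document to a one-element list, then concatenate the
                # lists, so the whole batch comes back as a single array.
                .map(lambda doc: [doc['url_hash']])
                .reduce(lambda l, r: l.add(r))
            )
            try:
                results = await query.run(conn)
            except r.ReqlNonExistenceError:
                # reduce() over an empty range raises this: no more items.
                break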
            url_hashes.update(bytes(r) for r in results)
            row += BATCH_SIZE
    return url_hashes

In [6]:
%time new_item_url_hashes = crun(get_item_url_hashes_new())

CPU times: user 144 ms, sys: 16 ms, total: 160 ms
Wall time: 239 ms


In [7]:
len(new_item_url_hashes)

14223

In [10]:
new_item_url_hashes == old_item_url_hashes

True

In [8]:
old_item_url_hashes.pop()

b'\x83\xe8\xdb\xb1y/\xa1g"\xb6T\x0f"O\x98\x15'

In [9]:
new_item_url_hashes.pop()

b'\x83\xe8\xdb\xb1y/\xa1g"\xb6T\x0f"O\x98\x15'

Batching by index range and using map/reduce to strip key names and cut the number of documents transferred is a major performance win: about 130x faster (0.24 s vs. 31.9 s wall time), with identical results.
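The batching loop in `get_item_url_hashes_new()` can be simulated in plain Python to show the shape of the data. This is a sketch, not the ReQL query. It assumes `sync_index` behaves like a dense, 0-based sequence number per job, and it models each `crawl_item` document as a dict with hypothetical `sequence` and `url_hash` keys. Each half-open window `[row, row + BATCH_SIZE)` collapses into one list of hashes, as `.map(...).reduce(...)` does, and the loop stops at the first empty window, which is the analogue of catching `ReqlNonExistenceError`.

```python
from functools import reduce
from operator import add
from typing import Dict, List


def load_hashes_in_windows(docs: List[Dict], batch_size: int) -> List[List[bytes]]:
    """Simulate fixed-width range batches over a dense sequence index.

    Each returned element stands for one round trip: a single list of
    hashes rather than one {'url_hash': ...} document per row.
    """
    # Sort by the hypothetical sequence field, as an index range scan would.
    ordered = sorted(docs, key=lambda d: d['sequence'])
    batches = []
    row = 0
    while True:
        # between(row, row + batch_size) is half-open: [row, row + batch_size).
        window = [d for d in ordered if row <= d['sequence'] < row + batch_size]
        if not window:
            # ReQL's reduce() fails on an empty stream; here we simply stop.
            break
        # Map each doc to a one-element list, then concatenate the lists.
        batches.append(reduce(add, ([d['url_hash']] for d in window)))
        row += batch_size
    return batches


docs = [{'sequence': i, 'url_hash': i.to_bytes(16, 'big')} for i in range(2500)]
batches = load_hashes_in_windows(docs, batch_size=1000)
print([len(b) for b in batches])  # [1000, 1000, 500]
```

Because the loop ends at the first empty window, a gap of `BATCH_SIZE` or more in the sequence would end the load early. Whether such gaps can occur depends on how `sync_index` values are assigned, which this notebook does not show.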

# Loading from frontier

The rest of the "seen URLs" set is loaded from the `frontier` table. Loading from `frontier` is trickier because no index on this table is suitable for fetching small batches. The index currently used, `cost_index`, is on the fields `(job_id, cost)`, and tens of thousands of items can share exactly the same index value, so a range over that index cannot be cut into small, non-overlapping batches.

Either a skip/limit (offset) approach must be used, or else a new index must be added on `job_id` plus a secondary field that is more selective than `cost`. That secondary field could be `id`, but I can't think of any other use for such an index. A better choice might be `url` or `url_hash`, since an index on either could be useful elsewhere.
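To see why `(job_id, cost)` cannot drive batched loading, consider keyset pagination ("resume just after the last key seen") in plain Python. The sketch uses `bisect` over a sorted list of keys as a stand-in for a RethinkDB index range scan. The job ID, costs, and hashes are made up. When many rows share one `cost`, the last key of a batch equals the keys of rows not yet returned, so an open left bound skips them. With `url_hash` as the secondary field, keys are distinct (assuming hashes are unique within a job, which this notebook does not verify), and the same loop returns every row.

```python
import bisect
from typing import List


def keyset_pages(keys: List[tuple], batch_size: int) -> List[List[tuple]]:
    """Page through sorted index keys, resuming strictly after the last key.

    Roughly mirrors between((job, last), (job, maxval), left_bound='open')
    followed by limit(batch_size) on a sorted index.
    """
    keys = sorted(keys)
    pages = []
    start = 0
    while start < len(keys):
        page = keys[start:start + batch_size]
        pages.append(page)
        # Open left bound: skip every key equal to the last one returned.
        start = bisect.bisect_right(keys, page[-1])
    return pages


JOB = 'job-1'  # hypothetical job ID
# Five hypothetical frontier rows as (cost, url_hash); four share cost 1.0.
rows = [(1.0, b'a'), (1.0, b'b'), (1.0, b'c'), (1.0, b'd'), (2.0, b'e')]

cost_keys = [(JOB, cost) for cost, _ in rows]          # like cost_index
hash_keys = [(JOB, url_hash) for _, url_hash in rows]  # like (job_id, url_hash)

returned_by_cost = sum(len(p) for p in keyset_pages(cost_keys, batch_size=2))
returned_by_hash = sum(len(p) for p in keyset_pages(hash_keys, batch_size=2))
print(returned_by_cost, returned_by_hash)  # 3 5
```

A side benefit of keying on `url_hash`: even if duplicate hashes did exist, any rows skipped by the open bound would carry a hash that was already collected, so the resulting set would still be complete.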

In [12]:
async def get_frontier_url_hashes_old():
    url_hashes = set()
    query = (
        r.table('frontier')
        .between(('d06fedd2-c2a8-4ed6-9e39-94ed4036b254',r.minval),
                 ('d06fedd2-c2a8-4ed6-9e39-94ed4036b254',r.maxval),
                index='cost_index')
        .pluck('url_hash')
    )
    async with db_pool.connection() as conn:
        cursor = await query.run(conn)
        async for item in starbelly.db.AsyncCursorIterator(cursor):
            url_hashes.add(bytes(item['url_hash']))
    return url_hashes

In [13]:
%time old_frontier_url_hashes = crun(get_frontier_url_hashes_old())

Executing <Task pending coro=<ConnectionInstance._reader() running at /usr/local/lib/python3.6/dist-packages/rethinkdb/asyncio_net/net_asyncio.py:241> wait_for=<Future pending cb=[Task._wakeup()] created at /usr/lib/python3.6/asyncio/base_events.py:266> created at /usr/lib/python3.6/asyncio/tasks.py:540> took 0.233 seconds
Executing <Task pending coro=<ConnectionInstance._reader() running at /usr/local/lib/python3.6/dist-packages/rethinkdb/asyncio_net/net_asyncio.py:241> wait_for=<Future pending cb=[Task._wakeup()] created at /usr/lib/python3.6/asyncio/base_events.py:266> created at /usr/lib/python3.6/asyncio/tasks.py:540> took 0.239 seconds
Executing <Task pending coro=<ConnectionInstance._reader() running at /usr/local/lib/python3.6/dist-packages/rethinkdb/asyncio_net/net_asyncio.py:241> wait_for=<Future pending cb=[Task._wakeup()] created at /usr/lib/python3.6/asyncio/base_events.py:266> created at /usr/lib/python3.6/asyncio/tasks.py:540> took 0.203 seconds
Executing <Task pending c

CPU times: user 4min 36s, sys: 1min 39s, total: 6min 16s
Wall time: 6min 16s


Ouch... not only does that take ages to run, but processing the results also blocks the event loop for over 200 ms at a time, as the asyncio warnings above show.

In [14]:
len(old_frontier_url_hashes)

163739

This next example uses the same map/reduce as `get_item_url_hashes_new()`, but pages through `cost_index` with skip/limit instead of splitting the index range into batches.

In [19]:
async def get_frontier_url_hashes_limit():
    url_hashes = set()
    row = 0
    BATCH_SIZE = 1000
    async with db_pool.connection() as conn:
        while True:
            query = (
                r.table('frontier')
                .between(('d06fedd2-c2a8-4ed6-9e39-94ed4036b254',r.minval),
                         ('d06fedd2-c2a8-4ed6-9e39-94ed4036b254',r.maxval),
                        index='cost_index')
                # Offset pagination: discard the first `row` results, keep the next batch.
                .skip(row)
                .limit(BATCH_SIZE)
                .map(lambda doc: [doc['url_hash']])
                .reduce(lambda l, r: l.add(r))
            )
            try:
                results = await query.run(conn)
            except r.ReqlNonExistenceError:
                break
            url_hashes.update(bytes(r) for r in results)
            row += BATCH_SIZE
    return url_hashes

In [20]:
%time new_frontier_url_hashes_limit = crun(get_frontier_url_hashes_limit())

CPU times: user 1.55 s, sys: 216 ms, total: 1.76 s
Wall time: 37.1 s


The skip/limit approach is about 10x faster (37.1 s vs. 6 min 16 s wall time), but it is still slow. Next, I'll add an index on `(job_id, url_hash)` and try that.
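The gap between CPU time (1.76 s) and wall time (37.1 s) suggests that most of the time goes to waiting on the database. A rough model shows why offset pagination scales poorly. Assuming the server walks over every skipped row (typical of offset pagination, though not verified here for RethinkDB), batch `k` touches about `(k + 1) * BATCH_SIZE` rows, so total work grows quadratically with frontier size. The functions below are a hypothetical back-of-the-envelope estimate, not a measurement.

```python
import math


def rows_touched_skip_limit(n: int, batch_size: int) -> int:
    """Estimate rows walked by skip(k * batch_size).limit(batch_size) paging.

    Assumes each query walks past all skipped rows and then reads up to
    batch_size more. Includes the final query that finds no rows.
    """
    batches = math.ceil(n / batch_size) + 1  # +1 for the final empty query
    total = 0
    for k in range(batches):
        skipped = min(k * batch_size, n)
        read = min(batch_size, n - skipped)
        total += skipped + read
    return total


def rows_touched_keyset(n: int) -> int:
    """Keyset paging seeks directly to the resume point, so each row is read once."""
    return n


n = 163_739  # frontier size measured above
print(rows_touched_skip_limit(n, 1000))  # 13693478
print(rows_touched_keyset(n))            # 163739
```

Under this model, doubling the frontier roughly quadruples the skip/limit estimate while the keyset estimate only doubles. The absolute numbers are not a prediction of wall time; the point is the growth rate.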

In [31]:
# Run this cell to create index.
qrun(
    r.table('frontier').index_create(
        'reload_index', 
        [r.row["job_id"], r.row["url_hash"]]
    ),
    pool=super_db_pool
)

{'created': 1}

In [29]:
# Run this cell to drop index.
qrun(
    r.table('frontier').index_drop('reload_index'),
    pool=super_db_pool
)

{'dropped': 1}

In [13]:
res = qrun(
    r.table('frontier')
    .between(('d06fedd2-c2a8-4ed6-9e39-94ed4036b254',r.minval),
             ('d06fedd2-c2a8-4ed6-9e39-94ed4036b254',r.maxval),
            index='reload_index')
    .order_by(index='reload_index')
    .limit(15)
    .map(lambda doc: [doc['url_hash']])
    .reduce(lambda l, r: l.add(r))
)

display([bytes(r) for r in res])

[b'\x00\x00\x10n\x9a\n\x85\xdaq\xa6\xbd\xa4\xb45\xc2\xa9',
 b'\x00\x01!\x05\xf3E\x84\xfb\xb2\x1f\x16&n"\xa9n',
 b'\x00\x01+VuE<\x8bfoI\x0cB?\x195',
 b'\x00\x02n\x80Z\x8ebV!E\x9aF\xc7>\xfb\x05',
 b'\x00\x02\xd3\x8bqgO\\\xb9\x97\x1aTt$W\xfe',
 b'\x00\x03\\\xdeB@\xb7fV2\x8f\x89\xd8~\xc4\xa2',
 b'\x00\x03t.4\x14\xd5\xec:<m\xff9\xbe\x14e',
 b'\x00\x03w\x014\xe2O\xc0\xba\xec\xaf\x00\x9a\xb2\x93\xf5',
 b'\x00\x03{\x0cj\xf5|\xe4gf\x91s\x9f\xcf|\xd8',
 b'\x00\x03\x81M\xe0\x8c\x01\xe36C\x84\xc8\x81\xb9\xad\xdf',
 b'\x00\x03\x8bb\xe7h\x14\x14M\xb6\xe7^\xfc\xff\xaf\xfb',
 b"\x00\x04'7\x16K(\x0e\x9e\x10\xab\x90\x92\xa63\xc0",
 b'\x00\x04oy\xc2X\xdf\xd27\xf5mi\xb9/\x9a\xbb',
 b'\x00\x05+7\x07\xac\xda\x99\x03E\\\xb3o&<:',
 b'\x00\x05\x8e%\xab\xcdn\xa1\xfd\xa4\xea\xb4-\x88\xed\xc8']

In [14]:
async def get_frontier_url_hashes_index():
    url_hashes = set()
    row = r.minval
    BATCH_SIZE = 1000
    async with db_pool.connection() as conn:
        while True:
            query = (
                r.table('frontier')
                .between(('d06fedd2-c2a8-4ed6-9e39-94ed4036b254',row),
                         ('d06fedd2-c2a8-4ed6-9e39-94ed4036b254',r.maxval),
                         left_bound='open',
                         index='reload_index')
                .order_by(index='reload_index')
                .limit(BATCH_SIZE)
                .map(lambda doc: [doc['url_hash']])
                .reduce(lambda l, r: l.add(r))
            )
            try:
                results = await query.run(conn)
            except r.ReqlNonExistenceError:
                break
            url_hashes.update(bytes(r) for r in results)
            # Resume after the last hash in this batch (left_bound='open' excludes it).
            row = results[-1]
    return url_hashes

In [15]:
%time new_frontier_url_hashes_index = crun(get_frontier_url_hashes_index())

CPU times: user 2 s, sys: 248 ms, total: 2.25 s
Wall time: 8.76 s


In [16]:
len(new_frontier_url_hashes_index)

163739

In [17]:
new_frontier_url_hashes_index.pop()

b'\xaeg A\xcd\xcbgj\x18\x06\xd6\x98\x8f\x02\x1d\x07'

The index method is about 4x faster than the skip/limit method (8.76 s vs. 37.1 s wall time) and is approaching a reasonable load time. However, a frontier of about 164K items may be on the small side for real crawls.
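To put "small side" in perspective, here is a crude linear extrapolation from the 8.76 s measured above for `get_frontier_url_hashes_index()`. It assumes that per-item cost stays constant as the frontier grows and ignores memory, network, and cache effects. The target sizes are hypothetical.

```python
def extrapolate_seconds(measured_seconds: float, measured_items: int,
                        target_items: int) -> float:
    """Linear extrapolation: assumes the cost per item stays constant."""
    return measured_seconds / measured_items * target_items


MEASURED_SECONDS = 8.76   # wall time of get_frontier_url_hashes_index() above
MEASURED_ITEMS = 163_739  # frontier size above

# Hypothetical larger frontiers.
for target in (1_000_000, 10_000_000):
    est = extrapolate_seconds(MEASURED_SECONDS, MEASURED_ITEMS, target)
    print(f'{target:>10,} items: ~{est:.0f} s')
```

Even under this optimistic linear assumption, a frontier of a few million items would take minutes to reload.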

As a last resort, I'll try pickling the set, saving it into RethinkDB, then retrieving it and unpickling it. First we'll create a special table to hold the pickled data.

In [21]:
# Create table
qrun(
    r.table_create('job_url_seen'),
    pool=super_db_pool
)

{'config_changes': [{'new_val': {'db': 'starbelly',
    'durability': 'hard',
    'id': '6f962ab6-e065-4d78-92b9-55cc9bfa1f5e',
    'indexes': [],
    'name': 'job_url_seen',
    'primary_key': 'id',
    'shards': [{'nonvoting_replicas': [],
      'primary_replica': '6fbd1a0195e8_ej8',
      'replicas': ['6fbd1a0195e8_ej8']}],
    'write_acks': 'majority'},
   'old_val': None}],
 'tables_created': 1}

In [20]:
# Drop table
qrun(
    r.table_drop('job_url_seen'),
    pool=super_db_pool
)

{'config_changes': [{'new_val': None,
   'old_val': {'db': 'starbelly',
    'durability': 'hard',
    'id': '7cbccc3a-dc43-4c9b-a62c-428b8be7dd5c',
    'indexes': [],
    'name': 'job_url_seen',
    'primary_key': 'id',
    'shards': [{'nonvoting_replicas': [],
      'primary_replica': '6fbd1a0195e8_ej8',
      'replicas': ['6fbd1a0195e8_ej8']}],
    'write_acks': 'majority'}}],
 'tables_dropped': 1}

In [27]:
set_pkl = pickle.dumps(new_frontier_url_hashes_index)
qrun(
    r.table('job_url_seen').insert({
        'job_id': 'd06fedd2-c2a8-4ed6-9e39-94ed4036b254',
        'pickle': set_pkl,
    })
)

{'deleted': 0,
 'errors': 0,
 'generated_keys': ['169ee2da-0e44-4dc8-a59a-81755b316a74'],
 'inserted': 1,
 'replaced': 0,
 'skipped': 0,
 'unchanged': 0}

In [28]:
%%time
result = qrun(
    r.table('job_url_seen')
     .filter({'job_id': 'd06fedd2-c2a8-4ed6-9e39-94ed4036b254'})
     .nth(0)
)
set_2_pkl = pickle.loads(result['pickle'])

CPU times: user 100 ms, sys: 28 ms, total: 128 ms
Wall time: 201 ms


In [29]:
len(set_2_pkl)

163738

Note: one item is missing from the set because I called `new_frontier_url_hashes_index.pop()` above, before pickling it.

Conclusion: pickling is very fast. Loading the pickled frontier set takes about 0.2 s, compared with 8.76 s for the indexed approach (this benchmark covers only the frontier portion of the set). However, it represents a significant change to the implementation. It also runs the risk of inconsistency (e.g., the pickled set not matching the contents of the `crawl_item` and `frontier` tables). Given that Starbelly goes to great lengths to pause a crawl in an orderly fashion, this is an acceptable risk.
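For reference, the core of the pickling approach is a round trip through the standard `pickle` module: serialize the set to a single `bytes` value on pause, and restore it in one step on resume. The sketch below does this in memory, without RethinkDB. The 16-byte hashes are generated with `hashlib.md5` purely for illustration and are not necessarily how Starbelly computes `url_hash`.

```python
import hashlib
import pickle
from typing import Set


def make_hashes(count: int) -> Set[bytes]:
    """Generate hypothetical 16-byte URL hashes for the demo."""
    return {
        hashlib.md5(f'https://example.com/page/{i}'.encode()).digest()
        for i in range(count)
    }


seen = make_hashes(10_000)

# On pause: serialize the whole set into one bytes value (one document field).
blob = pickle.dumps(seen, protocol=pickle.HIGHEST_PROTOCOL)

# On resume: restore the set in a single call.
restored = pickle.loads(blob)

print(restored == seen, len(restored))  # True 10000
print(f'{len(blob) / len(seen):.1f} bytes per hash')
```

Because the whole set lives in one value, the resume path needs a single fetch rather than hundreds of batched queries. This is where the speedup measured above comes from.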