This notebook migrates the database to the updated schema, in which URLs are keyed by a hash of the URL instead of by the URL itself.

Changes:

* `urls` and `blacklist` collection: `_id` renamed to `url`, `_id` set to `str(CityHash64(url))`;
* `texts` collection: the list of `urls` references the url hashes.

In [1]:
from pymongo import MongoClient
from cityhash import CityHash64

In [2]:
# Name of the existing (source) database, whose URL documents are still keyed by the raw URL.
DB_FROM_NAME = 'swisstext-bert-torch-3'
# Name of the new (target) database that receives the migrated documents.
DB_TO_NAME = 'sbt-3'

In [3]:
# Connect using MongoClient's default arguments (no host/port is given here).
client = MongoClient()
db_from = client[DB_FROM_NAME]  # source database, read by the migration
db_to = client[DB_TO_NAME]  # target database, written by the migration

Show what collections we have in the old one:

In [4]:
# Collections present in the source database; each of them is migrated or copied below.
db_from.list_collection_names()

['blacklist', 'texts', 'urls', 'sentences', 'users']

Ensure the target database is empty by dropping it (**warning:** this deletes any existing database named `DB_TO_NAME`):

In [5]:
# WARNING: destructive. Drops the whole target database so the migration starts from scratch.
client.drop_database(DB_TO_NAME)

# Generic migration code

In [6]:
def _as_batch(cursor, batch_size=1000):
    # iterate over something (pymongo cursor, generator, ...) by batch. 
    # Note: the last batch may contain less than batch_size elements.
    batch = []
    try:
        while True:
            for _ in range(batch_size):
                batch.append(next(cursor))
            yield batch
            batch = []
    except StopIteration as e:
        if len(batch):
            yield batch

In [7]:
# Sanity check: the batch sizes must add up to the number of documents in `urls`.
sum(len(batch) for batch in _as_batch(db_from.urls.find()))

736448

In [8]:
def migrate(dbf, dbt, coll, update_func):
    """Copy the collection `coll` from `dbf` to `dbt`, transforming each document.

    Every document returned by `dbf[coll].find()` is passed through
    `update_func` and the results are inserted into `dbt[coll]` batch by batch.

    Args:
        dbf: source database, indexable by collection name.
        dbt: target database, indexable by collection name.
        coll: name of the collection; the same name is used on both sides.
        update_func: callable taking a source document and returning the
            document to insert.

    Returns:
        A tuple `(added, errors)`. `added` lists the inserted ids of all
        acknowledged batches; `errors` holds one `(batch, result)` pair for
        each batch whose insert was not acknowledged.
    """
    errors, added = [], []

    # Read from / write to the databases given as arguments rather than the
    # notebook-level globals, so the function honours what the caller passes.
    query = (update_func(doc) for doc in dbf[coll].find())
    for batch in _as_batch(query):
        res = dbt[coll].insert_many(batch)
        if not res.acknowledged:
            errors.append((batch.copy(), res))
        else:
            added.extend(res.inserted_ids)

    print(f'Done. Inserted {len(added)} records. {len(errors)} errors.')

    return added, errors

# Migrate URLs & Blacklist

In [9]:
def update_url(mu):
    """Re-key a URL document in place and return it.

    The current `_id` (the raw URL) is stored under `url`, and `_id` is
    replaced by the CityHash64 of that URL, rendered as a string.
    """
    raw_url = mu['_id']
    mu['url'] = raw_url
    url_hash = CityHash64(raw_url)
    mu['_id'] = str(url_hash)
    return mu

Test the thing: 

In [10]:
# Fetch a single source document to preview the transformation.
mu = db_from.urls.find_one()
print('BEFORE:')
display(mu)
print('\nAFTER:')
# update_url modifies `mu` in place and returns it, so `mu` itself is changed from here on.
display(update_url(mu))

BEFORE:


{'_id': 'http:// http://www.blick.ch/news/schweiz/die-kranke-welt-einer-paedophilen-mutter-ich-will-ein-kind-von-meinem-sohn-id5382410.html',
 'source': {'type': 'auto',
  'extra': 'http://www.fcbforum.ch/forum/printthread.php?t=1826&s=6570525eab34ea13c7ef63938a17e028&pp=30&page=306'},
 'date_added': datetime.datetime(2019, 8, 22, 2, 27, 32, 687000),
 'crawl_history': [],
 'count': 0,
 'delta': 0}


AFTER:


{'_id': '16808089807074681085',
 'source': {'type': 'auto',
  'extra': 'http://www.fcbforum.ch/forum/printthread.php?t=1826&s=6570525eab34ea13c7ef63938a17e028&pp=30&page=306'},
 'date_added': datetime.datetime(2019, 8, 22, 2, 27, 32, 687000),
 'crawl_history': [],
 'count': 0,
 'delta': 0,
 'url': 'http:// http://www.blick.ch/news/schweiz/die-kranke-welt-einer-paedophilen-mutter-ich-will-ein-kind-von-meinem-sohn-id5382410.html'}

Let's do it

In [11]:
%%time

# Migrate the `urls` collection, re-keying every document with update_url.
added, errors = migrate(db_from, db_to, 'urls', update_url)

Done. Inserted 736448 records. 0 errors.
CPU times: user 13.9 s, sys: 526 ms, total: 14.4 s
Wall time: 19.8 s


In [12]:
%%time

# Migrate the `blacklist` collection with the same re-keying as `urls`.
added, errors = migrate(db_from, db_to, 'blacklist', update_url)

Done. Inserted 438227 records. 0 errors.
CPU times: user 5.86 s, sys: 153 ms, total: 6.01 s
Wall time: 8.88 s


# Update texts

In [13]:
def update_text(mt):
    """Replace, in place, each entry of `mt['urls']` by its hash and return `mt`.

    Every entry is turned into the string form of its CityHash64, i.e. the
    same value update_url computes for a URL document's new `_id`.
    """
    hashed_urls = []
    for raw_url in mt['urls']:
        hashed_urls.append(str(CityHash64(raw_url)))
    mt['urls'] = hashed_urls
    return mt

Little test:

In [14]:
# Fetch a single source text document to preview the transformation.
mt = db_from.texts.find_one()
del mt['text'] # just for display: removed from this in-memory dict only
print('BEFORE:')
display(mt)
print('\nAFTER:')
# update_text modifies `mt` in place and returns it, so `mtu` and `mt` are the same dict.
mtu = update_text(mt)
display(mtu)
print('\nURL in the new database?')
# The first hashed reference should resolve to a document of the already migrated `urls` collection.
display(db_to.urls.find_one({'_id': mtu['urls'][0]}))

BEFORE:


{'_id': '100005168160950565666749032805777073165',
 'urls': ['http://www.dieaagfraessene.ch/home/kaeller',
  'http://www.dieaagfraessene.ch/home/kaeller/44--der-cliquekaeller-vo-den-aagfraessene?tmpl=component&print=1&layout=default'],
 'date_added': datetime.datetime(2019, 8, 5, 15, 30, 35, 114000)}


AFTER:


{'_id': '100005168160950565666749032805777073165',
 'urls': ['8599729855727934335', '12249908619951370833'],
 'date_added': datetime.datetime(2019, 8, 5, 15, 30, 35, 114000)}


URL in the new database?


{'_id': '8599729855727934335',
 'source': {'type': 'auto',
  'extra': 'http://www.dieaagfraessene.ch/index.php'},
 'date_added': datetime.datetime(2019, 8, 5, 15, 30, 35, 115000),
 'crawl_history': [{'date': datetime.datetime(2019, 8, 5, 15, 30, 35, 115000),
   'count': 4,
   'hash': '100005168160950565666749032805777073165'}],
 'count': 4,
 'delta': 4,
 'delta_date': datetime.datetime(2019, 8, 5, 15, 30, 35, 115000),
 'url': 'http://www.dieaagfraessene.ch/home/kaeller'}

Do the deed

In [15]:
%%time

# Migrate the `texts` collection, replacing the URL references by their hashes.
added, errors = migrate(db_from, db_to, 'texts', update_text)

Done. Inserted 66123 records. 0 errors.
CPU times: user 2.24 s, sys: 825 ms, total: 3.07 s
Wall time: 5.94 s


# Copy the rest

In [16]:
# These collections are copied unchanged: the update function is the identity.
for coll in ('sentences', 'users'):
    print(f'migrating {coll}')
    migrate(db_from, db_to, coll, lambda doc: doc)

migrating sentences
Done. Inserted 199886 records. 0 errors.
migrating users
Done. Inserted 1 records. 0 errors.


# Check counts

In [17]:
# Compare the per-collection document counts of the source and target databases.
for coll in db_from.list_collection_names():
    f = db_from[coll].count_documents({})
    t = db_to[coll].count_documents({})
    verdict = f'ok: {t}' if f == t else f'!!! ERROR !!!! {f} != {t}'
    print(f' {coll:20s}', verdict)

 blacklist            ok: 438227
 texts                ok: 66123
 urls                 ok: 736448
 sentences            ok: 199886
 users                ok: 1
