This repository has been archived by the owner on Mar 24, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 11
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #247 from alphagov/set_2weeks_capped_size_2
Increase the size of realtime collections and fix cap size of all others
- Loading branch information
Showing
1 changed file
with
186 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,186 @@ | ||
""" | ||
Clean up capped collections: | ||
- Update realtime buckets to be capped at 4mb (4194304b), which gives us | ||
about 2 weeks worth of query depth with a few days tolerance. | ||
- Copy any non realtime collections that are capped into uncapped collections | ||
- Set the metadata for all non realtime collections to be uncapped | ||
""" | ||
from backdrop.core import timeutils | ||
import logging | ||
log = logging.getLogger(__name__) | ||
|
||
# Cap size for realtime buckets: 4 MiB (4194304 bytes), which gives roughly
# two weeks' worth of query depth with a few days' tolerance.
CAP_SIZE = 4194304
|
||
|
||
def get_realtime_collection_names(mongo_db):
    """Return the names of all realtime collections in the database."""
    realtime_names = []
    for name in mongo_db.collection_names():
        if name.endswith("realtime"):
            realtime_names.append(name)
    return realtime_names
|
||
|
||
def get_non_realtime_capped_collection_names(mongo_db):
    """Yield the names of capped collections that are not realtime buckets."""
    for name in mongo_db.collection_names():
        if name.endswith("realtime"):
            continue
        options = mongo_db[name].options()
        if options.get('capped'):
            yield name
|
||
|
||
def get_temp_collection_names(mongo_db):
    """Return the migration's temporary collections present in the database."""
    return [name for name in mongo_db.collection_names()
            if is_temp_collection_name(name)]
|
||
|
||
def create_new_capped_collection(mongo_db, collection_name):
    """Create a new collection capped at CAP_SIZE bytes."""
    message = "Creating new capped collection {0}".format(collection_name)
    log.info(message)
    mongo_db.create_collection(collection_name, capped=True, size=CAP_SIZE)
|
||
|
||
def create_new_uncapped_collection(mongo_db, collection_name):
    """Create a new collection with no size cap."""
    message = "Creating new uncapped collection {0}".format(collection_name)
    log.info(message)
    mongo_db.create_collection(collection_name, capped=False)
|
||
|
||
def copy_collection(mongo_db, collection_name_from, collection_name_to):
    """Copy every record from one mongodb collection into another."""
    log.info("Copying items from {0}...".format(collection_name_from))
    source = mongo_db[collection_name_from]
    destination = mongo_db[collection_name_to]
    for record in source.find():
        destination.insert(record)
|
||
|
||
def set_bucket_metadata_capped(mongo_db, collection_name):
    """Update a bucket's metadata record to the new realtime cap size."""
    new_values = {
        "capped_size": CAP_SIZE,
        "realtime": True,
        "_updated_at": timeutils.now(),
    }
    mongo_db['buckets'].update(
        {"name": collection_name},
        {"$set": new_values},
        upsert=False,
        multi=False)
|
||
|
||
def set_bucket_metadata_uncapped(mongo_db, collection_name):
    """Update a bucket's metadata record to mark it uncapped/non-realtime."""
    new_values = {
        "capped_size": None,
        "realtime": False,
        "_updated_at": timeutils.now(),
    }
    mongo_db['buckets'].update(
        {"name": collection_name},
        {"$set": new_values},
        upsert=False,
        multi=False)
|
||
|
||
def rename_collection(mongo_db, collection_name_from, collection_name_to):
    """Rename a collection, replacing any existing collection at the target.

    Fix: the previous log message said "to old name {1}", which was wrong
    for half the call sites — this helper is also used to rename the new
    temp collection into the live name.
    """
    log.info("Renaming collection from {0} to {1}".format(
        collection_name_from, collection_name_to))
    # dropTarget=True overwrites the destination if it already exists, so a
    # partially-completed earlier run cannot block the rename.
    mongo_db[collection_name_from].rename(
        collection_name_to, dropTarget=True)
|
||
|
||
def get_temp_names_for_collection(collection_name):
    """
    >>> get_temp_names_for_collection("foo")
    {'new': 'foo_009_migration_new', 'old': 'foo_009_migration_old'}
    """
    return {
        role: collection_name + suffix
        for role, suffix in get_temp_collection_suffixes().items()}
|
||
|
||
def get_temp_collection_suffixes():
    """Map each temp-collection role to the name suffix this migration uses."""
    suffixes = dict(
        old="_009_migration_old",
        new="_009_migration_new",
    )
    return suffixes
|
||
|
||
def is_temp_collection_name(collection_name):
    """
    >>> is_temp_collection_name("foo_009_migration_old")
    True
    >>> is_temp_collection_name("foo_009_migration_new")
    True
    >>> is_temp_collection_name("foo")
    False
    >>> # To get rid of some left over tables from an initial run
    >>> is_temp_collection_name("foo_backup")
    True
    """
    temp_suffixes = tuple(get_temp_collection_suffixes().values())
    # "_backup" collections were left behind by an initial run of this
    # migration; treat them as temporary too.
    return collection_name.endswith(temp_suffixes + ("_backup",))
|
||
|
||
def remove_temporary_collections(mongo_db):
    """Remove any temporary collections left hanging around by earlier runs."""
    log.info("Dropping temporary collections")
    for name in get_temp_collection_names(mongo_db):
        log.info("Dropping temp collection {0}".format(name))
        mongo_db[name].drop()
|
||
|
||
def realtime_bucket_is_correctly_capped(mongo_db, collection_name):
    """Return True if the collection is capped at exactly CAP_SIZE bytes."""
    options = mongo_db[collection_name].options()
    if options.get('capped') is not True:
        return False
    return options['size'] == CAP_SIZE
|
||
|
||
def up(db):
    """Migration entry point.

    - Re-create every realtime collection capped at CAP_SIZE bytes.
    - Re-create every capped non-realtime collection as uncapped.
    - Mark every non-realtime collection as uncapped in the bucket metadata.

    Improvement: the create/copy/rename/rename/drop sequence was duplicated
    verbatim in both loops; it is extracted into _rebuild_collection.
    """
    mongo_db = db._mongo['backdrop']

    remove_temporary_collections(mongo_db)

    # Correctly cap all realtime collections
    for collection_name in get_realtime_collection_names(mongo_db):
        if realtime_bucket_is_correctly_capped(mongo_db, collection_name):
            log.info("Skipping {0}, already correctly capped".format(
                collection_name))
            continue

        _rebuild_collection(
            mongo_db, collection_name, create_new_capped_collection)
        set_bucket_metadata_capped(mongo_db, collection_name)

        print("Finished capping {}".format(collection_name))

    # Uncap all capped non-realtime collections
    for collection_name in get_non_realtime_capped_collection_names(mongo_db):
        _rebuild_collection(
            mongo_db, collection_name, create_new_uncapped_collection)
        set_bucket_metadata_uncapped(mongo_db, collection_name)

        print("Finished uncapping {}".format(collection_name))

    # Update metadata for all non-realtime collections to be uncapped
    for collection_name in mongo_db.collection_names():
        if not collection_name.endswith("realtime"):
            set_bucket_metadata_uncapped(mongo_db, collection_name)

    print("All done <3")


def _rebuild_collection(mongo_db, collection_name, create_collection):
    """Re-create a collection with new options by copying through a temp name.

    create_collection(mongo_db, name) is a callable deciding the options of
    the replacement collection (capped or uncapped).
    """
    temp_names = get_temp_names_for_collection(collection_name)

    create_collection(mongo_db, temp_names['new'])

    copy_collection(mongo_db, collection_name, temp_names['new'])

    # Swap the freshly-copied collection into place, keeping the original
    # under the "old" name until the swap has succeeded.
    rename_collection(mongo_db, collection_name, temp_names['old'])
    rename_collection(mongo_db, temp_names['new'], collection_name)

    mongo_db[temp_names['old']].drop()