Skip to content

Commit

Permalink
Merge pull request #33995 from dimagi/gh/wipe-blobdb/remove-async
Browse files (browse the repository at this point in the history)
Remove asyncio code in wipe_blobdb management command
  • Loading branch information
gherceg committed Jan 19, 2024
2 parents 5ce2a44 + 67ea922 commit c68333a
Showing 1 changed file with 9 additions and 31 deletions.
40 changes: 9 additions & 31 deletions corehq/blobs/management/commands/wipe_blobdb.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import asyncio

from django.core.management import BaseCommand

from corehq.apps.cleanup.utils import confirm_destructive_operation
Expand All
@@ -24,51 +22,31 @@ def add_arguments(self, parser):

def handle(self, *args, **options):
confirm_destructive_operation()

# Python 3.7+
# bytes_deleted = asyncio.run(wipe_blobdb(options['commit']))

# Python 3.4+
loop = asyncio.get_event_loop()
try:
future = wipe_blobdb(options['commit'])
bytes_deleted = loop.run_until_complete(future)
finally:
loop.close()
bytes_deleted = wipe_blobdb(options['commit'])

print(f"Deleted {bytes_deleted} bytes.")
if not options['commit']:
print("You need to run with --commit for the deletion to happen.")


async def wipe_blobdb(commit=False):
"""
Wipe shards in parallel
"""
coros = [wipe_shard(dbname, commit)
for dbname in get_db_aliases_for_partitioned_query()]
bytes_deleted_list = await asyncio.gather(*coros)
return sum(bytes_deleted_list)
def wipe_blobdb(commit=False):
    """
    Run ``wipe_shard`` over every partitioned database alias, one after
    another, and return the total of the byte counts it reports.

    A single blob db handle is obtained up front and shared by all
    shards; ``commit`` is forwarded unchanged to each ``wipe_shard`` call.
    """
    blob_db = get_blob_db()
    return sum(
        wipe_shard(blob_db, alias, commit)
        for alias in get_db_aliases_for_partitioned_query()
    )


async def wipe_shard(dbname, commit=False):
def wipe_shard(blob_db, dbname, commit=False):
bytes_deleted = 0
metas = BlobMeta.objects.using(dbname).order_by('id').all()
for chunk in cursor_paginated(metas, CHUNK_SIZE):
if commit:
await wipe_chunk(chunk)
blob_db.bulk_delete(metas=chunk)
bytes_deleted += sum(meta.content_length for meta in chunk)
return bytes_deleted


async def wipe_chunk(chunk):
"""
An awaitable blob_db.bulk_delete() not to hold up the event loop
"""
blob_db = get_blob_db()
blob_db.bulk_delete(metas=chunk)


def cursor_paginated(queryset, page_size):
"""
Paginates using auto-incremented integer primary key ``id`` instead
Expand Down

0 comments on commit c68333a

Please sign in to comment.