Skip to content

Commit

Permalink
OpenConceptLab/ocl_issues#725 | removing celery once lock on task rev…
Browse files Browse the repository at this point in the history
…oke for requeue
  • Loading branch information
snyaggarwal committed May 3, 2021
1 parent 5551a36 commit 5956790
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 3 deletions.
17 changes: 16 additions & 1 deletion core/common/utils.py
Expand Up @@ -4,10 +4,11 @@
import tempfile
import uuid
import zipfile
from collections import MutableMapping # pylint: disable=no-name-in-module
from collections import MutableMapping, OrderedDict # pylint: disable=no-name-in-module
from urllib import parse

import requests
from celery_once.helpers import queue_once_key
from dateutil import parser
from django.conf import settings
from django.urls import NoReverseMatch, reverse, get_resolver, resolve, Resolver404
Expand Down Expand Up @@ -557,3 +558,17 @@ def flatten_dict(dikt, parent_key='', sep='__'):
else:
items.append((new_key, str(val)))
return dict(items)


def get_bulk_import_celery_once_lock_key(async_result):
result_args = async_result.args
args = [('to_import', result_args[0]), ('username', result_args[1]), ('update_if_exists', result_args[2])]

if async_result.name == 'core.common.tasks.bulk_import_parallel_inline':
args.append(('threads', result_args[3]))

return get_celery_once_lock_key(async_result.name, args)


def get_celery_once_lock_key(name, args):
return queue_once_key(name, OrderedDict(args), None)
15 changes: 13 additions & 2 deletions core/importers/views.py
@@ -1,9 +1,10 @@
import urllib

from celery.result import AsyncResult
from celery_once import AlreadyQueued
from celery_once import AlreadyQueued, QueueOnce
from drf_yasg import openapi
from drf_yasg.utils import swagger_auto_schema
from pydash import get
from rest_framework import status
from rest_framework.parsers import MultiPartParser
from rest_framework.permissions import IsAuthenticated, IsAdminUser
Expand All @@ -14,7 +15,8 @@
from core.common.services import RedisService
from core.common.swagger_parameters import update_if_exists_param, task_param, result_param, username_param, \
file_upload_param, file_url_param, parallel_threads_param, verbose_param
from core.common.utils import parse_bulk_import_task_id, task_exists, flower_get, queue_bulk_import
from core.common.utils import parse_bulk_import_task_id, task_exists, flower_get, queue_bulk_import, \
get_bulk_import_celery_once_lock_key
from core.importers.constants import ALREADY_QUEUED, INVALID_UPDATE_IF_EXISTS, NO_CONTENT_TO_IMPORT


Expand Down Expand Up @@ -193,6 +195,15 @@ def delete(request, import_queue=None): # pylint: disable=unused-argument

try:
app.control.revoke(task_id, terminate=True, signal=signal)

# Below code is needed for removing the lock from QueueOnce
result = AsyncResult(task_id)
if (get(result, 'name') or '').startswith('core.common.tasks.bulk_import'):
celery_once_key = get_bulk_import_celery_once_lock_key(result)
if celery_once_key:
celery_once = QueueOnce()
celery_once.name = result.name
celery_once.once_backend.clear_lock(celery_once_key)
except Exception as ex:
return Response(dict(errors=ex.args), status=status.HTTP_400_BAD_REQUEST)

Expand Down

0 comments on commit 5956790

Please sign in to comment.