Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RDISCROWD-4705: Reserve task category #639

Merged
merged 9 commits into from Dec 6, 2021
5 changes: 4 additions & 1 deletion pybossa/api/__init__.py
Expand Up @@ -71,7 +71,8 @@
from pybossa.cache.helpers import (n_available_tasks, n_available_tasks_for_user,
n_unexpired_gold_tasks)
from pybossa.sched import (get_project_scheduler_and_timeout, get_scheduler_and_timeout,
has_lock, release_lock, Schedulers, get_locks)
has_lock, release_lock, Schedulers, get_locks,
release_reserve_task_lock_by_id)
from pybossa.jobs import send_mail
from pybossa.api.project_by_name import ProjectByNameAPI
from pybossa.api.pwd_manager import get_pwd_manager
Expand Down Expand Up @@ -482,6 +483,8 @@ def cancel_task(task_id=None):
current_app.logger.info(
'Project {} - user {} cancelled task {}'
.format(project.id, current_user.id, task_id))
release_reserve_task_lock_by_id(project.id, task_id, current_user.id, timeout)


return Response(json.dumps({'success': True}), 200, mimetype="application/json")

Expand Down
14 changes: 10 additions & 4 deletions pybossa/cache/helpers.py
Expand Up @@ -25,10 +25,11 @@
from pybossa.model.project_stats import ProjectStats
from pybossa.cache import users as cached_users
from pybossa.cache import task_browse_helpers as cached_task_browse_helpers
from pybossa.sched import Schedulers
from pybossa.sched import Schedulers, get_reserve_task_category_info
from pybossa.contributions_guard import ContributionsGuard

session = db.slave_session

TIMEOUT = ContributionsGuard.STAMP_TTL
def n_gold_tasks(project_id):
"""Return the number of gold tasks for a given project"""
query = text('''SELECT COUNT(*) AS n_gold_tasks FROM task
Expand Down Expand Up @@ -181,7 +182,8 @@ def n_available_tasks_for_user(project, user_id=None, user_ip=None):
if user_id is None or user_id <= 0:
return n_tasks
assign_user = json.dumps({'assign_user': [cached_users.get_user_email(user_id)]}) if user_id else None
scheduler = project["info"].get('sched', 'default') if type(project) == dict else project.info.get('sched', 'default')
project_info = project["info"] if type(project) == dict else project.info
scheduler = project_info.get('sched', 'default')
project_id = project['id'] if type(project) == dict else project.id
if scheduler not in [Schedulers.user_pref, Schedulers.task_queue]:
sql = '''
Expand All @@ -195,16 +197,20 @@ def n_available_tasks_for_user(project, user_id=None, user_ip=None):
else:
user_pref_list = cached_users.get_user_preferences(user_id)
user_filter_list = cached_users.get_user_filters(user_id)
reserve_task_config = project_info.get("reserve_tasks", {}).get("category", [])
timeout = project_info.get("timeout", TIMEOUT)
reserve_task_filter, _ = get_reserve_task_category_info(reserve_task_config, project_id, timeout, user_id, None, True)
sql = '''
SELECT task.id, worker_filter FROM task
WHERE project_id=:project_id AND state !='completed'
AND state !='enrich'
{}
AND id NOT IN
(SELECT task_id FROM task_run WHERE
project_id=:project_id AND user_id=:user_id)
AND ({})
AND ({})
;'''.format(user_pref_list, user_filter_list)
;'''.format(reserve_task_filter, user_pref_list, user_filter_list)
sqltext = text(sql)
try:
result = session.execute(sqltext, dict(project_id=project_id, user_id=user_id, assign_user=assign_user))
Expand Down
63 changes: 63 additions & 0 deletions pybossa/redis_lock.py
Expand Up @@ -21,12 +21,14 @@

from contributions_guard import ContributionsGuard
from pybossa.core import sentinel
from werkzeug.exceptions import BadRequest

TASK_USERS_KEY_PREFIX = 'pybossa:project:task_requested:timestamps:{0}'
USER_TASKS_KEY_PREFIX = 'pybossa:user:task_acquired:timestamps:{0}'
TASK_ID_PROJECT_ID_KEY_PREFIX = 'pybossa:task_id:project_id:{0}'
ACTIVE_USER_KEY = 'pybossa:active_users_in_project:{}'
EXPIRE_LOCK_DELAY = 5
EXPIRE_RESERVE_TASK_LOCK_DELAY = 30*60
dchhabda marked this conversation as resolved.
Show resolved Hide resolved


def get_active_user_key(project_id):
Expand Down Expand Up @@ -182,6 +184,67 @@ def _release_expired_locks(self, resource_id, now):
if to_delete:
self._redis.hdel(resource_id, *to_delete)

def _release_expired_reserve_task_locks(self, resource_id, now):
expiration = self._redis.get(resource_id)
if now > expiration:
self._redis.delete(resource_id)


@staticmethod
def seconds_remaining(expiration):
return float(expiration) - time()

def get_task_category_lock(self, project_id, user_id=None, category=None, exclude_user=False, task_id=None):
"""
Returns True when task category for a given user
can be reserved or its already reserved, False otherwise.
To fetch task category for all users who've reserved the category, pass user_id = None
To fetch task category for all tasks reserved, pass task_id = None
To fetch task category other than user_id, pass exclude_user = True
"""

if not project_id:
raise BadRequest('Missing required parameters')

# with exclude_user set to True, user_id is to be excluded from list of
# task category found for all users. raise error if user_id not passed
if exclude_user and not user_id:
raise BadRequest('Missing user id')

resource_id = "reserve_task:project:{}:category:{}:user:{}:task:{}".format(
project_id,
"*" if not category else category,
"*" if not user_id or exclude_user else user_id,
"*" if not task_id else task_id
)

category_keys = self._redis.keys(resource_id)
if not category_keys:
return []

# if key present but for different user, with redundancy = 1, return false
# TODO: for redundancy > 1, check if additional task run
# available for this user and if so, return category_key else ""
if exclude_user:
# exclude user_id from list of keys passed
drop_user = ":user:{}:task:".format(user_id)
category_keys = [ key for key in category_keys if drop_user not in key ]
return category_keys


def acquire_reserve_task_lock(self, project_id, task_id, user_id, category):
if not(project_id or user_id or task_id or category):
raise BadRequest('Missing required parameters')

# check task category reserved by user
resource_id = "reserve_task:project:{}:category:{}:user:{}:task:{}".format(project_id, category, user_id, task_id)

timestamp = time()
self._release_expired_reserve_task_locks(resource_id, timestamp)
expiration = timestamp + self._duration + EXPIRE_RESERVE_TASK_LOCK_DELAY
return self._redis.set(resource_id, expiration)


def release_reserve_task_lock(self, resource_id, pipeline):
cache = pipeline or self._redis
cache.expire(resource_id, EXPIRE_RESERVE_TASK_LOCK_DELAY)