From 59e73e1734d319c03ae32d0b53dd53fd9141342f Mon Sep 17 00:00:00 2001 From: Deepsingh Chhabda Date: Thu, 4 Nov 2021 15:37:41 -0400 Subject: [PATCH 1/9] reserve task category --- pybossa/redis_lock.py | 53 ++++++++++++++++++ pybossa/sched.py | 123 +++++++++++++++++++++++++++++++++++++++--- 2 files changed, 168 insertions(+), 8 deletions(-) diff --git a/pybossa/redis_lock.py b/pybossa/redis_lock.py index 20055b551c..a4c588ec3a 100644 --- a/pybossa/redis_lock.py +++ b/pybossa/redis_lock.py @@ -21,12 +21,14 @@ from contributions_guard import ContributionsGuard from pybossa.core import sentinel +from werkzeug.exceptions import BadRequest TASK_USERS_KEY_PREFIX = 'pybossa:project:task_requested:timestamps:{0}' USER_TASKS_KEY_PREFIX = 'pybossa:user:task_acquired:timestamps:{0}' TASK_ID_PROJECT_ID_KEY_PREFIX = 'pybossa:task_id:project_id:{0}' ACTIVE_USER_KEY = 'pybossa:active_users_in_project:{}' EXPIRE_LOCK_DELAY = 5 +EXPIRE_TASK_CATEGORY_RESERVATION_DELAY = 30*60 def get_active_user_key(project_id): @@ -182,6 +184,57 @@ def _release_expired_locks(self, resource_id, now): if to_delete: self._redis.hdel(resource_id, *to_delete) + def _release_expired_task_category_locks(self, resource_id, now): + expiration = self._redis.get(resource_id) + if now > expiration: + self._redis.delete(resource_id) + + @staticmethod def seconds_remaining(expiration): return float(expiration) - time() + + def get_task_category_lock(self, project_id, user_id = None, category = None, task_id = None): + """ + Returns True when task category for a given user + can be reserved or its already reserved, False otherwise. + """ + + if not project_id: + raise BadRequest('Missing required parameters') + + resource_id = "reserve_task_category:project:{}:category:{}:".format(project_id, "*" if not category else category) + resource_id += "user:{}:".format("*" if not user_id else user_id) + resource_id += "task:{}".format("*" if not task_id else task_id) + + category_keys = self._redis.keys(resource_id) + if len(category_keys): + # if key present but for different user, with redundancy = 1, return false + # TODO: for redundancy > 1, check if additional task run + # available for this user and if so, return category_key else "" + return category_keys[0] + + return "" + + + def aquire_task_category_lock(self, project_id, task_id, user_id, category): + if not(project_id or user_id or task_id or category): + raise BadRequest('Missing required parameters') + + # check task category reserved by user + resource_id = "reserve_task_category:project:{}:category:{}:user:{}:task:{}".format(project_id, category, user_id, task_id) + + timestamp = time() + self._release_expired_task_category_locks(resource_id, timestamp) + expiration = timestamp + self._duration + EXPIRE_TASK_CATEGORY_RESERVATION_DELAY + return self._redis.set(resource_id, expiration) + + + def release_task_category_lock(self, project_id, user_id, category_key, task_id = None): + if not(project_id or user_id or category_key): + raise BadRequest('Missing required parameters') + + task_key = task_id if task_id else "*" + resource_id = "reserve_task_category:project:{}:category:{}:user:{}:task:{}".format(project_id, category_key, user_id, task_key) + keys = self._redis.keys(resource_id) + return self._redis.delete(*keys) diff --git a/pybossa/sched.py b/pybossa/sched.py index 865d2dde39..8a9cf79f15 100644 --- a/pybossa/sched.py +++ b/pybossa/sched.py @@ -37,6 +37,7 @@ from flask import current_app from pybossa import data_access from datetime import datetime +import re session = db.slave_session @@ -249,6 +250,12 @@ def get_candidate_task_ids(project_id, user_id=None, user_ip=None, data = query.limit(limit).offset(offset).all() return _handle_tuples(data) +def task_contains_category(task_id, category): + task = task_repo.get_task(task_id) + if not (task and category): + return False + return all([field in task.info for field in category]) + def locked_scheduler(query_factory): @wraps(query_factory) @@ -256,31 +263,52 @@ def template_get_locked_task(project_id, user_id=None, user_ip=None, external_uid=None, limit=1, offset=0, orderby='priority_0', desc=True, rand_within_priority=False, task_type='gold_last', - filter_user_prefs=False): + filter_user_prefs=False, + task_category_filters=""): if offset > 2: raise BadRequest('') if offset > 0: return None + project = project_repo.get(project_id) + if not project: + raise Forbidden('Invalid project_id') + timeout = project.info.get('timeout', TIMEOUT) task_id, lock_seconds = get_task_id_and_duration_for_project_user(project_id, user_id) if lock_seconds > 10: task = session.query(Task).get(task_id) if task: return [task] + task_id = None user_count = get_active_user_count(project_id, sentinel.master) assign_user = json.dumps({'assign_user': [cached_users.get_user_email(user_id)]}) if user_id else None current_app.logger.info( "Project {} - number of current users: {}" .format(project_id, user_count)) + category_filters, category_key = get_task_category_info(project_id, user_id) + limit = current_app.config.get('DB_MAXIMUM_BATCH_SIZE') if filter_user_prefs else user_count + 5 sql = query_factory(project_id, user_id=user_id, limit=limit, rand_within_priority=rand_within_priority, - task_type=task_type) - limit = current_app.config.get('DB_MAXIMUM_BATCH_SIZE') if filter_user_prefs else user_count + 5 + task_type=task_type, task_category_filters=category_filters) rows = session.execute(sql, dict(project_id=project_id, user_id=user_id, assign_user=assign_user, limit=limit)) + + if category_filters and rows and not rows.rowcount: + # With task category set and no records returned, no ongoing tasks with + # task category exist. Hence, query db for tasks without task category + release_task_category_lock(project_id, user_id, category_key, timeout) + sql = query_factory(project_id, user_id=user_id, limit=limit, + rand_within_priority=rand_within_priority, + task_type=task_type) + rows = session.execute(sql, dict(project_id=project_id, + user_id=user_id, + assign_user=assign_user, + limit=limit)) + user_profile = cached_users.get_user_profile_metadata(user_id) + if filter_user_prefs: # validate user qualification and calculate task preference score user_profile = json.loads(user_profile) if user_profile else {} @@ -300,16 +328,59 @@ def template_get_locked_task(project_id, user_id=None, user_ip=None, timeout = timeout or TIMEOUT remaining = float('inf') if calibration else n_answers - taskcount if acquire_lock(task_id, user_id, remaining, timeout): + # reserve tasks + aquire_task_category_lock(project_id, task_id, user_id, timeout) return _lock_task_for_user(task_id, project_id, user_id, timeout, calibration) - return [] return template_get_locked_task +def task_category_to_sql_filter(project_id, task_category_key, exclude): + # build sql query filter from task category cache key + filters = "" + + if not (project_id and task_category_key): + return filters + + regex_key = "reserve_task_category:project:{}:category:(.+?):user".format(project_id) + data = re.search(regex_key, task_category_key) + if not data: + return filters + + category = data.group(1) + category_fv = category.split(":") + comparator = "task.info->>'{}' != '{}'" if exclude else "task.info->>'{}' = '{}'" + filters = [comparator.format(category_fv[i], category_fv[i+1]) for i in range(0, len(category_fv), 2)] + filters = " AND ".join(filters) + filters = " AND " + filters + return filters, category + + +def get_task_category_info(project_id, user_id, exclude = False): + """Get reserved category info for a given user under a given project""" + sql_filters, category_config = "", [] + + project = project_repo.get(project_id) + if not (project or project.info.get("sched", "default") in [Schedulers.task_queue]): + return sql_filters, category_config + + timeout = project.info.get("timeout") or TIMEOUT + category_config = project.info.get("reserve_tasks", {}).get("category", []) + if not category_config: + return sql_filters, category_config + + category = ":".join(["{}:*".format(field) for field in category_config]) + lock_manager = LockManager(sentinel.master, timeout) + category_key = lock_manager.get_task_category_lock(project_id, user_id, category) + # TODO: exclude category if there are certain categories reserved by other user + sql_filters, category = task_category_to_sql_filter(project_id, category_key, exclude) + return sql_filters, category + + def locked_task_sql(project_id, user_id=None, limit=1, rand_within_priority=False, task_type='gold_last', filter_user_prefs=False, - priority_sort=True): + priority_sort=True, task_category_filters=""): ''' `task_type` will affect the type of tasks return by the query and can be one one of the following values: @@ -354,11 +425,13 @@ def locked_task_sql(project_id, user_id=None, limit=1, rand_within_priority=Fals AND task.state !='completed' AND task.state !='enrich' {} + {} group by task.id ORDER BY {} LIMIT :limit; - '''.format(' '.join(filters), + '''.format(' '.join(filters), task_category_filters, ','.join(order_by)) + print(sql) return text(sql) @@ -393,7 +466,7 @@ def get_locked_task(project_id, user_id=None, limit=1, rand_within_priority=Fals @locked_scheduler def get_user_pref_task(project_id, user_id=None, limit=1, rand_within_priority=False, - task_type='gold_last', filter_user_prefs=True): + task_type='gold_last', filter_user_prefs=True, task_category_filters=""): """ Select a new task based on user preference set under user profile. For each incomplete task, check if the number of users working on the task @@ -404,7 +477,7 @@ def get_user_pref_task(project_id, user_id=None, limit=1, rand_within_priority=F """ return locked_task_sql(project_id, user_id=user_id, limit=limit, rand_within_priority=rand_within_priority, task_type=task_type, - filter_user_prefs=True) + filter_user_prefs=True, task_category_filters=task_category_filters) TASK_USERS_KEY_PREFIX = 'pybossa:project:task_requested:timestamps:{0}' @@ -433,6 +506,40 @@ def acquire_lock(task_id, user_id, limit, timeout, pipeline=None, execute=True): return False +def release_task_category_lock(project_id, user_id, category_key, timeout): + project = project_repo.get(project_id) + if not (project and category_key and project.info.get("sched", "default") in [Schedulers.task_queue]): + return + + redis_conn = sentinel.master + lock_manager = LockManager(redis_conn, timeout) + return lock_manager.release_task_category_lock(project_id, user_id, category_key) + + +def aquire_task_category_lock(project_id, task_id, user_id, timeout, pipeline=None, execute=True): + task = task_repo.get_task(task_id) + project = project_repo.get(project_id) + if not (task and project and project.info.get("sched", "default") in [Schedulers.task_queue]): + return + + category_config = project.info.get("reserve_tasks", {}).get("category", []) + category_exist = all(task.info.get(field, False) for field in category_config) + if not category_exist: + return + + category = ["{}:{}".format(field, task.info.get(field)) for field in category_config] + category = ":".join(category) + redis_conn = sentinel.master + pipeline = pipeline or redis_conn.pipeline(transaction=True) + lock_manager = LockManager(redis_conn, timeout) + if lock_manager.aquire_task_category_lock(project_id, task_id, user_id, category): + # lock_manager.acquire_lock(user_tasks_key, task_id, float('inf'), pipeline=pipeline) + # if execute: + # return all(not isinstance(r, Exception) for r in pipeline.execute()) + return True + return False + + def lock_task_for_user(task_id, project_id, user_id): sql = ''' SELECT task.id, COUNT(task_run.task_id) AS taskcount, n_answers, task.calibration, From d73bb09e23e5baacef6d289e3bb948cc45976774 Mon Sep 17 00:00:00 2001 From: Deepsingh Chhabda Date: Fri, 5 Nov 2021 11:29:36 -0400 Subject: [PATCH 2/9] Fix tests. Exclude task category reserved by difft user --- pybossa/sched.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/pybossa/sched.py b/pybossa/sched.py index 8a9cf79f15..a912c54823 100644 --- a/pybossa/sched.py +++ b/pybossa/sched.py @@ -357,7 +357,7 @@ def task_category_to_sql_filter(project_id, task_category_key, exclude): return filters, category -def get_task_category_info(project_id, user_id, exclude = False): +def get_task_category_info(project_id, user_id): """Get reserved category info for a given user under a given project""" sql_filters, category_config = "", [] @@ -373,7 +373,12 @@ def get_task_category_info(project_id, user_id, exclude = False): category = ":".join(["{}:*".format(field) for field in category_config]) lock_manager = LockManager(sentinel.master, timeout) category_key = lock_manager.get_task_category_lock(project_id, user_id, category) - # TODO: exclude category if there are certain categories reserved by other user + if not category_key: + # no reserved category found for the user + # exclude category when its reserved by other user + category_key = lock_manager.get_task_category_lock(project_id=project_id, user_id=None, category=category) + exclude = len(category_key) > 0 + sql_filters, category = task_category_to_sql_filter(project_id, category_key, exclude) return sql_filters, category @@ -458,10 +463,10 @@ def select_task_for_gold_mode(project, user_id): @locked_scheduler def get_locked_task(project_id, user_id=None, limit=1, rand_within_priority=False, - task_type='gold_last'): + task_type='gold_last', task_category_filters=""): return locked_task_sql(project_id, user_id=user_id, limit=limit, rand_within_priority=rand_within_priority, task_type=task_type, - filter_user_prefs=False) + filter_user_prefs=False, task_category_filters=task_category_filters) @locked_scheduler From 5800b9a8d7db5b87a12f575d71ff4a018ecf749e Mon Sep 17 00:00:00 2001 From: Deepsingh Chhabda Date: Mon, 15 Nov 2021 15:02:41 -0500 Subject: [PATCH 3/9] with no reservation by current user, exclude tasks reserved by other users. add tests --- pybossa/redis_lock.py | 30 +++++--- pybossa/sched.py | 96 +++++++++++++++-------- test/test_reserve_task_category.py | 119 +++++++++++++++++++++++++++++ 3 files changed, 204 insertions(+), 41 deletions(-) create mode 100644 test/test_reserve_task_category.py diff --git a/pybossa/redis_lock.py b/pybossa/redis_lock.py index a4c588ec3a..bb8969c66b 100644 --- a/pybossa/redis_lock.py +++ b/pybossa/redis_lock.py @@ -194,27 +194,39 @@ def _release_expired_task_category_locks(self, resource_id, now): def seconds_remaining(expiration): return float(expiration) - time() - def get_task_category_lock(self, project_id, user_id = None, category = None, task_id = None): + def get_task_category_lock(self, project_id, user_id=None, category=None, exclude_user=False, task_id=None): """ Returns True when task category for a given user can be reserved or its already reserved, False otherwise. + To fetch task category for all users who've reserved the category, pass user_id = None + To fetch task category for all tasks reserved, pass task_id = None + To fetch task category other than user_id, pass exclude_user = True """ if not project_id: raise BadRequest('Missing required parameters') + # with exclude_user set to True, user_id is to be excluded from list of + # task category found for all users. raise error if user_id not passed + if exclude_user and not user_id: + raise BadRequest('Missing user id') + resource_id = "reserve_task_category:project:{}:category:{}:".format(project_id, "*" if not category else category) - resource_id += "user:{}:".format("*" if not user_id else user_id) + resource_id += "user:{}:".format("*" if not user_id or exclude_user else user_id) resource_id += "task:{}".format("*" if not task_id else task_id) category_keys = self._redis.keys(resource_id) - if len(category_keys): - # if key present but for different user, with redundancy = 1, return false - # TODO: for redundancy > 1, check if additional task run - # available for this user and if so, return category_key else "" - return category_keys[0] - - return "" + if not category_keys: + return [] + + # if key present but for different user, with redundancy = 1, return false + # TODO: for redundancy > 1, check if additional task run + # available for this user and if so, return category_key else "" + if exclude_user: + # exclude user_id from list of keys passed + drop_user = ":user:{}:task:".format(user_id) + category_keys = [ key for key in category_keys if drop_user not in key ] + return category_keys def aquire_task_category_lock(self, project_id, task_id, user_id, category): diff --git a/pybossa/sched.py b/pybossa/sched.py index a912c54823..3c85e2bf90 100644 --- a/pybossa/sched.py +++ b/pybossa/sched.py @@ -285,7 +285,12 @@ def template_get_locked_task(project_id, user_id=None, user_ip=None, "Project {} - number of current users: {}" .format(project_id, user_count)) - category_filters, category_key = get_task_category_info(project_id, user_id) + category_filters, category_keys = get_task_category_info(project_id, user_id) + if not category_filters: + # no category reserved by user + # exclude categories reserved by other users + category_filters, category_keys = get_task_category_info(project_id, user_id, exclude_user=True) + limit = current_app.config.get('DB_MAXIMUM_BATCH_SIZE') if filter_user_prefs else user_count + 5 sql = query_factory(project_id, user_id=user_id, limit=limit, rand_within_priority=rand_within_priority, @@ -298,7 +303,7 @@ def template_get_locked_task(project_id, user_id=None, user_ip=None, if category_filters and rows and not rows.rowcount: # With task category set and no records returned, no ongoing tasks with # task category exist. Hence, query db for tasks without task category - release_task_category_lock(project_id, user_id, category_key, timeout) + release_task_category_lock(project_id, user_id, category_keys, timeout) sql = query_factory(project_id, user_id=user_id, limit=limit, rand_within_priority=rand_within_priority, task_type=task_type) @@ -336,51 +341,77 @@ def template_get_locked_task(project_id, user_id=None, user_ip=None, return template_get_locked_task -def task_category_to_sql_filter(project_id, task_category_key, exclude): +def task_category_to_sql_filter(project_id, task_category_keys, exclude): # build sql query filter from task category cache key - filters = "" - - if not (project_id and task_category_key): - return filters - - regex_key = "reserve_task_category:project:{}:category:(.+?):user".format(project_id) - data = re.search(regex_key, task_category_key) + # return sql filter for matching task category keys and list of + # task category keys that qualifies for a given project_id + + filters, category_keys = "", [] + + if not (project_id and len(task_category_keys)): + return filters, category_keys + + # convert task category redis cache key to sql query + # ex "co:name:IBM:ticker:IBM_US" would be converted to + # "task.info->>'co_name' IN ('IBM')" + filters_dict = {} + for item in task_category_keys: + regex_key = "reserve_task_category:project:{}:category:(.+?):user".format(project_id) + data = re.search(regex_key, item) + if not data: + continue + + category_keys += [item] + category = data.group(1) + category_fv = category.split(":") + for i in range(0, len(category_fv), 2): + key, value = category_fv[i], category_fv[i + 1] + if key in filters_dict: + if value not in filters_dict[key]: + filters_dict[key] += [value] + else: + filters_dict[key] = [value] + + # TODO: pull task # from category keys, look for values from task._add_user_info + # generate sql_filter considering value field type instead. + data = [] + for key, value in filters_dict.items(): + val = ["'{}'".format(val) for val in value] + data += ["task.info->>'{}' IN ({})".format(key, ", ".join(val))] if not data: - return filters + return filters, category_keys - category = data.group(1) - category_fv = category.split(":") - comparator = "task.info->>'{}' != '{}'" if exclude else "task.info->>'{}' = '{}'" - filters = [comparator.format(category_fv[i], category_fv[i+1]) for i in range(0, len(category_fv), 2)] - filters = " AND ".join(filters) - filters = " AND " + filters - return filters, category + exclude_clause = "IS NOT TRUE" if exclude else "" + filters = "({}) {}".format(" AND ".join(data), exclude_clause) + return filters, category_keys -def get_task_category_info(project_id, user_id): +def get_task_category_info(project_id, user_id, task_id=None, exclude_user=False): """Get reserved category info for a given user under a given project""" - sql_filters, category_config = "", [] + sql_filters, category_keys = "", [] project = project_repo.get(project_id) - if not (project or project.info.get("sched", "default") in [Schedulers.task_queue]): - return sql_filters, category_config + if not (project and project.info.get("sched", "default") in [Schedulers.task_queue]): + return sql_filters, category_keys timeout = project.info.get("timeout") or TIMEOUT category_config = project.info.get("reserve_tasks", {}).get("category", []) if not category_config: - return sql_filters, category_config + return sql_filters, category_keys category = ":".join(["{}:*".format(field) for field in category_config]) lock_manager = LockManager(sentinel.master, timeout) - category_key = lock_manager.get_task_category_lock(project_id, user_id, category) - if not category_key: + + category_keys = lock_manager.get_task_category_lock(project_id, user_id, category, exclude_user) + exclude = False + if not len(category_keys): # no reserved category found for the user # exclude category when its reserved by other user - category_key = lock_manager.get_task_category_lock(project_id=project_id, user_id=None, category=category) - exclude = len(category_key) > 0 + category_keys = lock_manager.get_task_category_lock(project_id=project_id, user_id=None, category=category, exclude_user=True) + exclude = len(category_keys) > 0 - sql_filters, category = task_category_to_sql_filter(project_id, category_key, exclude) - return sql_filters, category + sql_filters, category_keys = task_category_to_sql_filter(project_id, category_keys, exclude) + return sql_filters, category_keys def locked_task_sql(project_id, user_id=None, limit=1, rand_within_priority=False, @@ -511,14 +542,15 @@ def acquire_lock(task_id, user_id, limit, timeout, pipeline=None, execute=True): return False -def release_task_category_lock(project_id, user_id, category_key, timeout): +def release_task_category_lock(project_id, user_id, category_keys, timeout): project = project_repo.get(project_id) - if not (project and category_key and project.info.get("sched", "default") in [Schedulers.task_queue]): + if not (len(category_keys) and project and project.info.get("sched", "default") in [Schedulers.task_queue]): return redis_conn = sentinel.master lock_manager = LockManager(redis_conn, timeout) - return lock_manager.release_task_category_lock(project_id, user_id, category_key) + for key in category_keys: + lock_manager.release_task_category_lock(project_id, user_id, key) def aquire_task_category_lock(project_id, task_id, user_id, timeout, pipeline=None, execute=True): diff --git a/test/test_reserve_task_category.py b/test/test_reserve_task_category.py new file mode 100644 index 0000000000..9f191a40ef --- /dev/null +++ b/test/test_reserve_task_category.py @@ -0,0 +1,119 @@ +# -*- coding: utf8 -*- +# This file is part of PYBOSSA. +# +# Copyright (C) 2021 Scifabric LTD. +# +# PYBOSSA is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# PYBOSSA is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with PYBOSSA. If not, see . + +from mock import patch +from default import with_context +from helper import sched +from factories import TaskFactory, ProjectFactory, UserFactory +from pybossa.core import project_repo +from pybossa.sched import ( + Schedulers, + task_category_to_sql_filter, + get_task_category_info +) + +class TestReserveTaskCategory(sched.Helper): + + @with_context + def test_task_category_to_sql_filter(self): + # default behavior; returns null filters, category_keys for no category_keys passed + project_id, task_category_key, exclude = "", "", False + filters, category_keys = task_category_to_sql_filter(project_id, task_category_key, exclude) + assert filters == "" and category_keys == [], "filters, category_keys must be []" + + # passing garbage category returns null filters, category_keys + project_id, task_category_keys, exclude = "202", ["bad-category-key"], False + filters, category_keys = task_category_to_sql_filter(project_id, task_category_keys, exclude) + assert filters == "" and category_keys == [], "filters, category must be '', []" + + # task category key exists, returns sql filter and its associated category_keys + project_id, exclude = "202", False + task_category_keys = ["reserve_task_category:project:{}:category:name1:value1:name2:value2:user:1008:task:454".format(project_id)] + filters, category_keys = task_category_to_sql_filter(project_id, task_category_keys, exclude) + assert filters == "(task.info->>'name2' IN ('value2') AND task.info->>'name1' IN ('value1')) " and \ + category_keys == ["reserve_task_category:project:202:category:name1:value1:name2:value2:user:1008:task:454"], "filters, category must be []" + + # test exlude=True, multiple task category keys + # negated sql filter with "NOT IN" clause + # list of category_keys associated with sql filter + project_id, exclude = "202", True + task_category_keys = [ + "reserve_task_category:project:{}:category:name1:value1:name2:value2:user:1008:task:454".format(project_id), + "reserve_task_category:project:{}:category:x:1:y:2:z:3:user:1008:task:454".format(project_id) + ] + filters, category_keys = task_category_to_sql_filter(project_id, task_category_keys, exclude) + assert filters == "(task.info->>'y' IN ('2') AND task.info->>'x' IN ('1') AND task.info->>'z' IN ('3') AND task.info->>'name2' IN ('value2') AND task.info->>'name1' IN ('value1')) IS NOT TRUE" and \ + category_keys == [ + "reserve_task_category:project:202:category:name1:value1:name2:value2:user:1008:task:454", + "reserve_task_category:project:202:category:x:1:y:2:z:3:user:1008:task:454" + ], "filters, category must be as per keys passed and include negate clause" + + + @with_context + @patch('pybossa.redis_lock.LockManager.get_task_category_lock') + def test_get_task_category_info(self, get_task_category_lock): + owner = UserFactory.create(id=500) + project = ProjectFactory.create(owner=owner) + + # test bad project id, user id returns empty sql_filters, category_keys + project_id, user_id = -52, 9999 + sql_filters, category_keys = get_task_category_info(project_id, user_id) + assert sql_filters == "" and category_keys == [], "sql_filters, category_keys must be '', []" + + # empty sql_filters, category_keys for projects with scheduler other than task_queue + project.info['sched'] = Schedulers.locked + project_repo.save(project) + sql_filters, category_keys = get_task_category_info(project.id, owner.id) + assert sql_filters == "" and category_keys == [], "sql_filters, category_keys must be '', []" + + # with no categories configured under project config + # empty sql_filters, category_keys for projects with task queue scheduler + project.info['sched'] = Schedulers.task_queue + project_repo.save(project) + sql_filters, category_keys = get_task_category_info(project.id, owner.id) + assert sql_filters == "" and category_keys == [], "sql_filters, category_keys must be '', []" + + + # with categories configured under project config + # empty sql_filters, category_keys for projects with task queue scheduler + # when there's no category lock present in redis cache + project.info['sched'] = Schedulers.task_queue + project.info['reserve_tasks'] = { + "category": ["field_a", "field_b"] + } + project_repo.save(project) + get_task_category_lock.return_value = [] + sql_filters, category_keys = get_task_category_info(project.id, owner.id) + assert sql_filters == "" and category_keys == [], "sql_filters, category_keys must be '', []" + + # with categories configured under project config + # sql_filters, category_keys for projects with task queue scheduler + # to be built as per category lock present in redis cache + project.info['sched'] = Schedulers.task_queue + project.info['reserve_tasks'] = { + "category": ["field_a", "field_b"] + } + project_repo.save(project) + expected_category_keys = [ + "reserve_task_category:project:{}:category:name1:value1:name2:value2:user:1008:task:454".format(project.id), + "reserve_task_category:project:{}:category:x:1:y:2:z:3:user:1008:task:2344".format(project.id) + ] + get_task_category_lock.return_value = expected_category_keys + sql_filters, category_keys = get_task_category_info(project.id, owner.id) + assert sql_filters == "(task.info->>'y' IN ('2') AND task.info->>'x' IN ('1') AND task.info->>'z' IN ('3') AND task.info->>'name2' IN ('value2') AND task.info->>'name1' IN ('value1')) " and \ + category_keys == expected_category_keys, "sql_filters, category_keys must be non empty" From 8e151bcb776875e6e5d1fcd552cb7c9e249e7d94 Mon Sep 17 00:00:00 2001 From: Deepsingh Chhabda Date: Mon, 29 Nov 2021 12:18:13 -0500 Subject: [PATCH 4/9] multiple users accessing reserve task --- pybossa/api/__init__.py | 5 +- pybossa/cache/helpers.py | 14 +++- pybossa/redis_lock.py | 32 ++++---- pybossa/sched.py | 126 +++++++++++++++++++---------- test/test_reserve_task_category.py | 56 ++++++------- 5 files changed, 142 insertions(+), 91 deletions(-) diff --git a/pybossa/api/__init__.py b/pybossa/api/__init__.py index 3514cc8cf6..3d76d8608c 100644 --- a/pybossa/api/__init__.py +++ b/pybossa/api/__init__.py @@ -71,7 +71,8 @@ from pybossa.cache.helpers import (n_available_tasks, n_available_tasks_for_user, n_unexpired_gold_tasks) from pybossa.sched import (get_project_scheduler_and_timeout, get_scheduler_and_timeout, - has_lock, release_lock, Schedulers, get_locks) + has_lock, release_lock, Schedulers, get_locks, + release_reserve_task_lock_by_id) from pybossa.jobs import send_mail from pybossa.api.project_by_name import ProjectByNameAPI from pybossa.api.pwd_manager import get_pwd_manager @@ -482,6 +483,8 @@ def cancel_task(task_id=None): current_app.logger.info( 'Project {} - user {} cancelled task {}' .format(project.id, current_user.id, task_id)) + release_reserve_task_lock_by_id(project.id, task_id, current_user.id, timeout) + return Response(json.dumps({'success': True}), 200, mimetype="application/json") diff --git a/pybossa/cache/helpers.py b/pybossa/cache/helpers.py index 0701efc426..3b1d75f317 100644 --- a/pybossa/cache/helpers.py +++ b/pybossa/cache/helpers.py @@ -25,10 +25,11 @@ from pybossa.model.project_stats import ProjectStats from pybossa.cache import users as cached_users from pybossa.cache import task_browse_helpers as cached_task_browse_helpers -from pybossa.sched import Schedulers +from pybossa.sched import Schedulers, get_reserve_task_category_info +from pybossa.contributions_guard import ContributionsGuard session = db.slave_session - +TIMEOUT = ContributionsGuard.STAMP_TTL def n_gold_tasks(project_id): """Return the number of gold tasks for a given project""" query = text('''SELECT COUNT(*) AS n_gold_tasks FROM task @@ -181,7 +182,8 @@ def n_available_tasks_for_user(project, user_id=None, user_ip=None): if user_id is None or user_id <= 0: return n_tasks assign_user = json.dumps({'assign_user': [cached_users.get_user_email(user_id)]}) if user_id else None - scheduler = project["info"].get('sched', 'default') if type(project) == dict else project.info.get('sched', 'default') + project_info = project["info"] if type(project) == dict else project.info + scheduler = project_info.get('sched', 'default') project_id = project['id'] if type(project) == dict else project.id if scheduler not in [Schedulers.user_pref, Schedulers.task_queue]: sql = ''' @@ -195,16 +197,20 @@ def n_available_tasks_for_user(project, user_id=None, user_ip=None): else: user_pref_list = cached_users.get_user_preferences(user_id) user_filter_list = cached_users.get_user_filters(user_id) + reserve_task_config = project_info.get("reserve_tasks", {}).get("category", []) + timeout = project_info.get("timeout", TIMEOUT) + reserve_task_filter, _ = get_reserve_task_category_info(reserve_task_config, project_id, timeout, user_id, None, True) sql = ''' SELECT task.id, worker_filter FROM task WHERE project_id=:project_id AND state !='completed' AND state !='enrich' + {} AND id NOT IN (SELECT task_id FROM task_run WHERE project_id=:project_id AND user_id=:user_id) AND ({}) AND ({}) - ;'''.format(user_pref_list, user_filter_list) + ;'''.format(reserve_task_filter, user_pref_list, user_filter_list) sqltext = text(sql) try: result = session.execute(sqltext, dict(project_id=project_id, user_id=user_id, assign_user=assign_user)) diff --git a/pybossa/redis_lock.py b/pybossa/redis_lock.py index bb8969c66b..bc376630c9 100644 --- a/pybossa/redis_lock.py +++ b/pybossa/redis_lock.py @@ -28,7 +28,7 @@ TASK_ID_PROJECT_ID_KEY_PREFIX = 'pybossa:task_id:project_id:{0}' ACTIVE_USER_KEY = 'pybossa:active_users_in_project:{}' EXPIRE_LOCK_DELAY = 5 -EXPIRE_TASK_CATEGORY_RESERVATION_DELAY = 30*60 +EXPIRE_RESERVE_TASK_LOCK_DELAY = 30*60 def get_active_user_key(project_id): @@ -184,7 +184,7 @@ def _release_expired_locks(self, resource_id, now): if to_delete: self._redis.hdel(resource_id, *to_delete) - def _release_expired_task_category_locks(self, resource_id, now): + def _release_expired_reserve_task_locks(self, resource_id, now): expiration = self._redis.get(resource_id) if now > expiration: self._redis.delete(resource_id) @@ -211,9 +211,12 @@ def get_task_category_lock(self, project_id, user_id=None, category=None, exclud if exclude_user and not user_id: raise BadRequest('Missing user id') - resource_id = "reserve_task_category:project:{}:category:{}:".format(project_id, "*" if not category else category) - resource_id += "user:{}:".format("*" if not user_id or exclude_user else user_id) - resource_id += "task:{}".format("*" if not task_id else task_id) + resource_id = "reserve_task:project:{}:category:{}:user:{}:task:{}".format( + project_id, + "*" if not category else category, + "*" if not user_id or exclude_user else user_id, + "*" if not task_id else task_id + ) category_keys = self._redis.keys(resource_id) if not category_keys: @@ -229,24 +232,19 @@ def get_task_category_lock(self, project_id, user_id=None, category=None, exclud return category_keys - def aquire_task_category_lock(self, project_id, task_id, user_id, category): + def aquire_reserve_task_lock(self, project_id, task_id, user_id, category): if not(project_id or user_id or task_id or category): raise BadRequest('Missing required parameters') # check task category reserved by user - resource_id = "reserve_task_category:project:{}:category:{}:user:{}:task:{}".format(project_id, category, user_id, task_id) + resource_id = "reserve_task:project:{}:category:{}:user:{}:task:{}".format(project_id, category, user_id, task_id) timestamp = time() - self._release_expired_task_category_locks(resource_id, timestamp) - expiration = timestamp + self._duration + EXPIRE_TASK_CATEGORY_RESERVATION_DELAY + self._release_expired_reserve_task_locks(resource_id, timestamp) + expiration = timestamp + self._duration + EXPIRE_RESERVE_TASK_LOCK_DELAY return self._redis.set(resource_id, expiration) - def release_task_category_lock(self, project_id, user_id, category_key, task_id = None): - if not(project_id or user_id or category_key): - raise BadRequest('Missing required parameters') - - task_key = task_id if task_id else "*" - resource_id = "reserve_task_category:project:{}:category:{}:user:{}:task:{}".format(project_id, category_key, user_id, task_key) - keys = self._redis.keys(resource_id) - return self._redis.delete(*keys) + def release_reserve_task_lock(self, resource_id, pipeline): + cache = pipeline or self._redis + cache.expire(resource_id, EXPIRE_RESERVE_TASK_LOCK_DELAY) diff --git a/pybossa/sched.py b/pybossa/sched.py index 3c85e2bf90..0c6030c5d3 100644 --- a/pybossa/sched.py +++ b/pybossa/sched.py @@ -130,6 +130,7 @@ def after_save(task_run, conn): '127.0.0.1' if is_locking_scheduler(scheduler): release_lock(task_run.task_id, uid, TIMEOUT) + release_reserve_task_lock_by_id(task_run.project_id, task_run.task_id, uid, TIMEOUT) def get_breadth_first_task(project_id, user_id=None, user_ip=None, @@ -273,6 +274,8 @@ def template_get_locked_task(project_id, user_id=None, user_ip=None, if not project: raise Forbidden('Invalid project_id') timeout = project.info.get('timeout', TIMEOUT) + task_queue_scheduler = project.info.get("sched", "default") in [Schedulers.task_queue] + reserve_task_config = project.info.get("reserve_tasks", {}).get("category", []) task_id, lock_seconds = get_task_id_and_duration_for_project_user(project_id, user_id) if lock_seconds > 10: task = session.query(Task).get(task_id) @@ -285,28 +288,39 @@ def template_get_locked_task(project_id, user_id=None, user_ip=None, "Project {} - number of current users: {}" .format(project_id, user_count)) - category_filters, category_keys = get_task_category_info(project_id, user_id) - if not category_filters: - # no category reserved by user - # exclude categories reserved by other users - category_filters, category_keys = get_task_category_info(project_id, user_id, exclude_user=True) + sql_filters, exclude_user = "", False + if task_queue_scheduler and reserve_task_config: + sql_filters, category_keys = get_reserve_task_category_info(reserve_task_config, project_id, timeout, user_id) + if not category_keys: + # no category reserved by current user. search categories + # excluding the ones reserved by other users + exclude_user = True + sql_filters, category_keys = get_reserve_task_category_info( + reserve_task_config, project_id, timeout, user_id, None, exclude_user + ) limit = current_app.config.get('DB_MAXIMUM_BATCH_SIZE') if filter_user_prefs else user_count + 5 sql = query_factory(project_id, user_id=user_id, limit=limit, rand_within_priority=rand_within_priority, - task_type=task_type, task_category_filters=category_filters) + task_type=task_type, task_category_filters=sql_filters) rows = session.execute(sql, dict(project_id=project_id, user_id=user_id, assign_user=assign_user, limit=limit)) - if category_filters and rows and not rows.rowcount: - # With task category set and no records returned, no ongoing tasks with - # task category exist. Hence, query db for tasks without task category - release_task_category_lock(project_id, user_id, category_keys, timeout) + if task_queue_scheduler and reserve_task_config and rows and not rows.rowcount and not exclude_user: + # With task category reserved by user and no records returned, + # no ongoing tasks with task category reserved by user exist. + # Hence, query db for tasks excluding task categories reserved + # by other users passing exclude_users = True + exclude_user = True + release_reserve_task_lock_by_keys(category_keys, timeout) + sql_filters, category_keys = get_reserve_task_category_info( + reserve_task_config, project_id, timeout, user_id, None, exclude_user + ) sql = query_factory(project_id, user_id=user_id, limit=limit, rand_within_priority=rand_within_priority, - task_type=task_type) + task_type=task_type, task_category_filters=sql_filters) rows = session.execute(sql, dict(project_id=project_id, user_id=user_id, assign_user=assign_user, @@ -334,29 +348,30 @@ def template_get_locked_task(project_id, user_id=None, user_ip=None, remaining = float('inf') if calibration else n_answers - taskcount if acquire_lock(task_id, user_id, remaining, timeout): # reserve tasks - aquire_task_category_lock(project_id, task_id, user_id, timeout) + aquire_reserve_task_lock(project_id, task_id, user_id, timeout) return _lock_task_for_user(task_id, project_id, user_id, timeout, calibration) return [] return template_get_locked_task -def task_category_to_sql_filter(project_id, task_category_keys, exclude): + +def reserve_task_sql_filters(project_id, reserve_task_keys, exclude): # build sql query filter from task category cache key # return sql filter for matching task category keys and list of # task category keys that qualifies for a given project_id filters, category_keys = "", [] - if not (project_id and len(task_category_keys)): + if not (project_id and len(reserve_task_keys)): return filters, category_keys # convert task category redis cache key to sql query # ex "co:name:IBM:ticker:IBM_US" would be converted to # "task.info->>'co_name' IN ('IBM')" filters_dict = {} - for item in task_category_keys: - regex_key = "reserve_task_category:project:{}:category:(.+?):user".format(project_id) + for item in reserve_task_keys: + regex_key = "reserve_task:project:{}:category:(.+?):user".format(project_id) data = re.search(regex_key, item) if not data: continue @@ -383,34 +398,48 @@ def task_category_to_sql_filter(project_id, task_category_keys, exclude): exclude_clause = "IS NOT TRUE" if exclude else "" filters = "({}) {}".format(" AND ".join(data), exclude_clause) + filters = " AND {}".format(filters) if filters else filters return filters, category_keys -def get_task_category_info(project_id, user_id, task_id=None, exclude_user=False): +def get_reserve_task_key(task_id): + reserve_key = "" + task = task_repo.get_task(task_id) + if not task: + return reserve_key + + project = project_repo.get(task.project_id) + if not (project and project.info.get("sched", "default") in [Schedulers.task_queue]): + return reserve_key + + reserve_task_config = project.info.get("reserve_tasks", {}).get("category", []) + if not reserve_task_config: + return reserve_key + + if not all(field in task.info for field in reserve_task_config): + return reserve_key + + reserve_key = ":".join(["{}:{}".format(field, task.info[field]) for field in sorted(reserve_task_config)]) + return reserve_key + + +def get_reserve_task_category_info(reserve_task_config, project_id, timeout, user_id, task_id=None, exclude_user=False): """Get reserved category info for a given user under a given project""" sql_filters, category_keys = "", [] - project = project_repo.get(project_id) - if not (project and project.info.get("sched", "default") in [Schedulers.task_queue]): + if not reserve_task_config: return sql_filters, category_keys - timeout = project.info.get("timeout") or TIMEOUT - category_config = project.info.get("reserve_tasks", {}).get("category", []) - if not category_config: - return sql_filters, category_keys - category = ":".join(["{}:*".format(field) for field in category_config]) + + category = ":".join(["{}:*".format(field) for field in sorted(reserve_task_config)]) lock_manager = LockManager(sentinel.master, timeout) category_keys = lock_manager.get_task_category_lock(project_id, user_id, category, exclude_user) - exclude = False - if not len(category_keys): - # no reserved category found for the user - # exclude category when its reserved by other user - category_keys = lock_manager.get_task_category_lock(project_id=project_id, user_id=None, category=category, exclude_user=True) - exclude = len(category_keys) > 0 - - sql_filters, category_keys = task_category_to_sql_filter(project_id, category_keys, exclude) + if not category_keys: + return sql_filters, category_keys + + sql_filters, category_keys = reserve_task_sql_filters(project_id, category_keys, exclude_user) return sql_filters, category_keys @@ -542,34 +571,47 @@ def acquire_lock(task_id, user_id, limit, timeout, pipeline=None, execute=True): return False -def release_task_category_lock(project_id, user_id, category_keys, timeout): - project = project_repo.get(project_id) - if not (len(category_keys) and project and project.info.get("sched", "default") in [Schedulers.task_queue]): +def release_reserve_task_lock_by_id(project_id, task_id, user_id, timeout): + reserve_key = get_reserve_task_key(task_id) + if not reserve_key: + return + + redis_conn = sentinel.master + pipeline = redis_conn.pipeline(transaction=True) + lock_manager = LockManager(redis_conn, timeout) + resource_id = "reserve_task:project:{}:category:{}:user:{}:task:{}".format( + project_id, reserve_key, user_id, task_id) + lock_manager.release_reserve_task_lock(resource_id, pipeline) + + +def release_reserve_task_lock_by_keys(resource_ids, timeout, pipeline=None): + if not resource_ids: return redis_conn = sentinel.master + pipeline = redis_conn.pipeline(transaction=True) lock_manager = LockManager(redis_conn, timeout) - for key in category_keys: - lock_manager.release_task_category_lock(project_id, user_id, key) + for resource_id in resource_ids: + lock_manager.release_reserve_task_lock(resource_id, pipeline) -def aquire_task_category_lock(project_id, task_id, user_id, timeout, pipeline=None, execute=True): +def aquire_reserve_task_lock(project_id, task_id, user_id, timeout, pipeline=None, execute=True): task = task_repo.get_task(task_id) project = project_repo.get(project_id) if not (task and project and project.info.get("sched", "default") in [Schedulers.task_queue]): return - category_config = project.info.get("reserve_tasks", {}).get("category", []) - category_exist = all(task.info.get(field, False) for field in category_config) + reserve_task_config = project.info.get("reserve_tasks", {}).get("category", []) + category_exist = all(task.info.get(field, False) for field in reserve_task_config) if not category_exist: return - category = ["{}:{}".format(field, task.info.get(field)) for field in category_config] + category = ["{}:{}".format(field, task.info.get(field)) for field in reserve_task_config] category = ":".join(category) redis_conn = sentinel.master pipeline = pipeline or redis_conn.pipeline(transaction=True) lock_manager = LockManager(redis_conn, timeout) - if lock_manager.aquire_task_category_lock(project_id, task_id, user_id, category): + if lock_manager.aquire_reserve_task_lock(project_id, task_id, user_id, category): # lock_manager.acquire_lock(user_tasks_key, task_id, float('inf'), pipeline=pipeline) # if execute: # return all(not isinstance(r, Exception) for r in pipeline.execute()) diff --git a/test/test_reserve_task_category.py b/test/test_reserve_task_category.py index 9f191a40ef..e858f9ed54 100644 --- a/test/test_reserve_task_category.py +++ b/test/test_reserve_task_category.py @@ -23,8 +23,8 @@ from pybossa.core import project_repo from pybossa.sched import ( Schedulers, - task_category_to_sql_filter, - get_task_category_info + reserve_task_sql_filters, + get_reserve_task_category_info ) class TestReserveTaskCategory(sched.Helper): @@ -33,59 +33,61 @@ class TestReserveTaskCategory(sched.Helper): def test_task_category_to_sql_filter(self): # default behavior; returns null filters, category_keys for no category_keys passed project_id, task_category_key, exclude = "", "", False - filters, category_keys = task_category_to_sql_filter(project_id, task_category_key, exclude) + filters, category_keys = reserve_task_sql_filters(project_id, task_category_key, exclude) assert filters == "" and category_keys == [], "filters, category_keys must be []" # passing garbage category returns null filters, category_keys - project_id, task_category_keys, exclude = "202", ["bad-category-key"], False - filters, category_keys = task_category_to_sql_filter(project_id, task_category_keys, exclude) + project_id, reserve_task_keys, exclude = "202", ["bad-category-key"], False + filters, category_keys = reserve_task_sql_filters(project_id, reserve_task_keys, exclude) assert filters == "" and category_keys == [], "filters, category must be '', []" # task category key exists, returns sql filter and its associated category_keys project_id, exclude = "202", False - task_category_keys = ["reserve_task_category:project:{}:category:name1:value1:name2:value2:user:1008:task:454".format(project_id)] - filters, category_keys = task_category_to_sql_filter(project_id, task_category_keys, exclude) - assert filters == "(task.info->>'name2' IN ('value2') AND task.info->>'name1' IN ('value1')) " and \ - category_keys == ["reserve_task_category:project:202:category:name1:value1:name2:value2:user:1008:task:454"], "filters, category must be []" + reserve_task_keys = ["reserve_task:project:{}:category:name1:value1:name2:value2:user:1008:task:454".format(project_id)] + filters, category_keys = reserve_task_sql_filters(project_id, reserve_task_keys, exclude) + assert filters == " AND (task.info->>'name2' IN ('value2') AND task.info->>'name1' IN ('value1')) " and \ + category_keys == ["reserve_task:project:202:category:name1:value1:name2:value2:user:1008:task:454"], "filters, category must be []" # test exlude=True, multiple task category keys # negated sql filter with "NOT IN" clause # list of category_keys associated with sql filter project_id, exclude = "202", True - task_category_keys = [ - "reserve_task_category:project:{}:category:name1:value1:name2:value2:user:1008:task:454".format(project_id), - "reserve_task_category:project:{}:category:x:1:y:2:z:3:user:1008:task:454".format(project_id) + reserve_task_keys = [ + "reserve_task:project:{}:category:name1:value1:name2:value2:user:1008:task:454".format(project_id), + "reserve_task:project:{}:category:x:1:y:2:z:3:user:1008:task:454".format(project_id) ] - filters, category_keys = task_category_to_sql_filter(project_id, task_category_keys, exclude) - assert filters == "(task.info->>'y' IN ('2') AND task.info->>'x' IN ('1') AND task.info->>'z' IN ('3') AND task.info->>'name2' IN ('value2') AND task.info->>'name1' IN ('value1')) IS NOT TRUE" and \ + filters, category_keys = reserve_task_sql_filters(project_id, reserve_task_keys, exclude) + assert filters == " AND (task.info->>'y' IN ('2') AND task.info->>'x' IN ('1') AND task.info->>'z' IN ('3') AND task.info->>'name2' IN ('value2') AND task.info->>'name1' IN ('value1')) IS NOT TRUE" and \ category_keys == [ - "reserve_task_category:project:202:category:name1:value1:name2:value2:user:1008:task:454", - "reserve_task_category:project:202:category:x:1:y:2:z:3:user:1008:task:454" + "reserve_task:project:202:category:name1:value1:name2:value2:user:1008:task:454", + "reserve_task:project:202:category:x:1:y:2:z:3:user:1008:task:454" ], "filters, category must be as per keys passed and include negate clause" @with_context @patch('pybossa.redis_lock.LockManager.get_task_category_lock') - def test_get_task_category_info(self, get_task_category_lock): + def test_get_reserve_task_category_info(self, get_task_category_lock): owner = UserFactory.create(id=500) project = ProjectFactory.create(owner=owner) + reserve_task_config = ["field_a", "field_b"] + timeout = 60 * 60 # test bad project id, user id returns empty sql_filters, category_keys project_id, user_id = -52, 9999 - sql_filters, category_keys = get_task_category_info(project_id, user_id) + sql_filters, category_keys = get_reserve_task_category_info(reserve_task_config, project_id, timeout, user_id) assert sql_filters == "" and category_keys == [], "sql_filters, category_keys must be '', []" # empty sql_filters, category_keys for projects with scheduler other than task_queue project.info['sched'] = Schedulers.locked project_repo.save(project) - sql_filters, category_keys = get_task_category_info(project.id, owner.id) + sql_filters, category_keys = get_reserve_task_category_info(reserve_task_config, project.id, timeout, owner.id) assert sql_filters == "" and category_keys == [], "sql_filters, category_keys must be '', []" # with no categories configured under project config # empty sql_filters, category_keys for projects with task queue scheduler project.info['sched'] = Schedulers.task_queue project_repo.save(project) - sql_filters, category_keys = get_task_category_info(project.id, owner.id) + sql_filters, category_keys = get_reserve_task_category_info(reserve_task_config, project.id, timeout, owner.id) assert sql_filters == "" and category_keys == [], "sql_filters, category_keys must be '', []" @@ -94,11 +96,11 @@ def test_get_task_category_info(self, get_task_category_lock): # when there's no category lock present in redis cache project.info['sched'] = Schedulers.task_queue project.info['reserve_tasks'] = { - "category": ["field_a", "field_b"] + "category": reserve_task_config } project_repo.save(project) get_task_category_lock.return_value = [] - sql_filters, category_keys = get_task_category_info(project.id, owner.id) + sql_filters, category_keys = get_reserve_task_category_info(reserve_task_config, project.id, timeout, owner.id) assert sql_filters == "" and category_keys == [], "sql_filters, category_keys must be '', []" # with categories configured under project config @@ -106,14 +108,14 @@ def test_get_task_category_info(self, get_task_category_lock): # to be built as per category lock present in redis cache project.info['sched'] = Schedulers.task_queue project.info['reserve_tasks'] = { - "category": ["field_a", "field_b"] + "category": reserve_task_config } project_repo.save(project) expected_category_keys = [ - "reserve_task_category:project:{}:category:name1:value1:name2:value2:user:1008:task:454".format(project.id), - "reserve_task_category:project:{}:category:x:1:y:2:z:3:user:1008:task:2344".format(project.id) + "reserve_task:project:{}:category:name1:value1:name2:value2:user:1008:task:454".format(project.id), + "reserve_task:project:{}:category:x:1:y:2:z:3:user:1008:task:2344".format(project.id) ] get_task_category_lock.return_value = expected_category_keys - sql_filters, category_keys = get_task_category_info(project.id, owner.id) - assert sql_filters == "(task.info->>'y' IN ('2') AND task.info->>'x' IN ('1') AND task.info->>'z' IN ('3') AND task.info->>'name2' IN ('value2') AND task.info->>'name1' IN ('value1')) " and \ + sql_filters, category_keys = get_reserve_task_category_info(reserve_task_config, project.id, timeout, owner.id) + assert sql_filters == " AND (task.info->>'y' IN ('2') AND task.info->>'x' IN ('1') AND task.info->>'z' IN ('3') AND task.info->>'name2' IN ('value2') AND task.info->>'name1' IN ('value1')) " and \ category_keys == expected_category_keys, "sql_filters, category_keys must be non empty" From 2dda95f958f1923546c7244a93e7b21d4acf12a6 Mon Sep 17 00:00:00 2001 From: Deepsingh Chhabda Date: Mon, 29 Nov 2021 14:51:35 -0500 Subject: [PATCH 5/9] add tests, fix typo in acquire --- pybossa/redis_lock.py | 2 +- pybossa/sched.py | 12 ++++----- test/test_reserve_task_category.py | 42 ++++++++++++++++++++++++++++-- 3 files changed, 47 insertions(+), 9 deletions(-) diff --git a/pybossa/redis_lock.py b/pybossa/redis_lock.py index bc376630c9..ff8fd511c5 100644 --- a/pybossa/redis_lock.py +++ b/pybossa/redis_lock.py @@ -232,7 +232,7 @@ def get_task_category_lock(self, project_id, user_id=None, category=None, exclud return category_keys - def aquire_reserve_task_lock(self, project_id, task_id, user_id, category): + def acquire_reserve_task_lock(self, project_id, task_id, user_id, category): if not(project_id or user_id or task_id or category): raise BadRequest('Missing required parameters') diff --git a/pybossa/sched.py b/pybossa/sched.py index 0c6030c5d3..d472fded62 100644 --- a/pybossa/sched.py +++ b/pybossa/sched.py @@ -348,7 +348,7 @@ def template_get_locked_task(project_id, user_id=None, user_ip=None, remaining = float('inf') if calibration else n_answers - taskcount if acquire_lock(task_id, user_id, remaining, timeout): # reserve tasks - aquire_reserve_task_lock(project_id, task_id, user_id, timeout) + acquire_reserve_task_lock(project_id, task_id, user_id, timeout) return _lock_task_for_user(task_id, project_id, user_id, timeout, calibration) return [] @@ -595,23 +595,23 @@ def release_reserve_task_lock_by_keys(resource_ids, timeout, pipeline=None): lock_manager.release_reserve_task_lock(resource_id, pipeline) -def aquire_reserve_task_lock(project_id, task_id, user_id, timeout, pipeline=None, execute=True): +def acquire_reserve_task_lock(project_id, task_id, user_id, timeout, pipeline=None, execute=True): task = task_repo.get_task(task_id) project = project_repo.get(project_id) if not (task and project and project.info.get("sched", "default") in [Schedulers.task_queue]): - return + return False reserve_task_config = project.info.get("reserve_tasks", {}).get("category", []) - category_exist = all(task.info.get(field, False) for field in reserve_task_config) + category_exist = reserve_task_config and all(task.info.get(field, False) for field in reserve_task_config) if not category_exist: - return + return False category = ["{}:{}".format(field, task.info.get(field)) for field in reserve_task_config] category = ":".join(category) redis_conn = sentinel.master pipeline = pipeline or redis_conn.pipeline(transaction=True) lock_manager = LockManager(redis_conn, timeout) - if lock_manager.aquire_reserve_task_lock(project_id, task_id, user_id, category): + if lock_manager.acquire_reserve_task_lock(project_id, task_id, user_id, category): # lock_manager.acquire_lock(user_tasks_key, task_id, float('inf'), pipeline=pipeline) # if execute: # return all(not isinstance(r, Exception) for r in pipeline.execute()) diff --git a/test/test_reserve_task_category.py b/test/test_reserve_task_category.py index e858f9ed54..cf33abca43 100644 --- a/test/test_reserve_task_category.py +++ b/test/test_reserve_task_category.py @@ -20,12 +20,15 @@ from default import with_context from helper import sched from factories import TaskFactory, ProjectFactory, UserFactory -from pybossa.core import project_repo +from pybossa.core import project_repo, sentinel from pybossa.sched import ( Schedulers, reserve_task_sql_filters, - get_reserve_task_category_info + get_reserve_task_category_info, + acquire_reserve_task_lock, + release_reserve_task_lock_by_keys ) +import time class TestReserveTaskCategory(sched.Helper): @@ -119,3 +122,38 @@ def test_get_reserve_task_category_info(self, get_task_category_lock): sql_filters, category_keys = get_reserve_task_category_info(reserve_task_config, project.id, timeout, owner.id) assert sql_filters == " AND (task.info->>'y' IN ('2') AND task.info->>'x' IN ('1') AND task.info->>'z' IN ('3') AND task.info->>'name2' IN ('value2') AND task.info->>'name1' IN ('value1')) " and \ category_keys == expected_category_keys, "sql_filters, category_keys must be non empty" + + @with_context + def test_acquire_and_release_reserve_task_lock(self): + user = UserFactory.create() + # project w/o reserve_tasks configured don't acquire lock + project_info = dict(sched="task_queue_scheduler") + task_info = dict(field_1="abc", field_2=123) + category_fields = ["field_1", "field_2"] + project = ProjectFactory.create(owner=user, info=project_info) + task = TaskFactory.create_batch(1, project=project, n_answers=1, info=task_info)[0] + timeout = 100 + + assert acquire_reserve_task_lock(project.id, task.id, user.id, timeout) == False, "reserve task cannot be acquired due to missing required config" + project.info['reserve_tasks'] = { + "category": ["some_field"] + } + project_repo.save(project) + assert acquire_reserve_task_lock(project.id, task.id, user.id, timeout) == False, "task not having reserve tasks config fields" + + project.info['reserve_tasks'] = { + "category": category_fields + } + project_repo.save(project) + acquire_reserve_task_lock(project.id, task.id, user.id, timeout) + category_key = ":".join(["{}:{}".format(field, task.info[field]) for field in category_fields]) + expected_reserve_task_key = "reserve_task:project:{}:category:{}:user:{}:task:{}".format( + project.id, category_key, user.id, task.id + ) + assert expected_reserve_task_key in sentinel.master.keys(), "reserve task key must exist in redis cache" + + # release reserve task lock + with patch("pybossa.redis_lock.EXPIRE_RESERVE_TASK_LOCK_DELAY", 1): + release_reserve_task_lock_by_keys([expected_reserve_task_key], timeout) + time.sleep(1) + assert expected_reserve_task_key not in sentinel.master.keys(), "reserve task key should not exist in redis cache" From 6b7a5726614b091ff31aaec4090e87b42f5109f2 Mon Sep 17 00:00:00 2001 From: Deepsingh Chhabda Date: Thu, 2 Dec 2021 12:15:14 -0500 Subject: [PATCH 6/9] code review --- pybossa/cache/helpers.py | 2 +- pybossa/sched.py | 13 ++++--------- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/pybossa/cache/helpers.py b/pybossa/cache/helpers.py index 3b1d75f317..712a570392 100644 --- a/pybossa/cache/helpers.py +++ b/pybossa/cache/helpers.py @@ -199,7 +199,7 @@ def n_available_tasks_for_user(project, user_id=None, user_ip=None): user_filter_list = cached_users.get_user_filters(user_id) reserve_task_config = project_info.get("reserve_tasks", {}).get("category", []) timeout = project_info.get("timeout", TIMEOUT) - reserve_task_filter, _ = get_reserve_task_category_info(reserve_task_config, project_id, timeout, user_id, None, True) + reserve_task_filter, _ = get_reserve_task_category_info(reserve_task_config, project_id, timeout, user_id, True) sql = ''' SELECT task.id, worker_filter FROM task WHERE project_id=:project_id AND state !='completed' diff --git a/pybossa/sched.py b/pybossa/sched.py index d472fded62..df367774eb 100644 --- a/pybossa/sched.py +++ b/pybossa/sched.py @@ -296,7 +296,7 @@ def template_get_locked_task(project_id, user_id=None, user_ip=None, # excluding the ones reserved by other users exclude_user = True sql_filters, category_keys = get_reserve_task_category_info( - reserve_task_config, project_id, timeout, user_id, None, exclude_user + reserve_task_config, project_id, timeout, user_id, exclude_user ) limit = current_app.config.get('DB_MAXIMUM_BATCH_SIZE') if filter_user_prefs else user_count + 5 @@ -316,7 +316,7 @@ def template_get_locked_task(project_id, user_id=None, user_ip=None, exclude_user = True release_reserve_task_lock_by_keys(category_keys, timeout) sql_filters, category_keys = get_reserve_task_category_info( - reserve_task_config, project_id, timeout, user_id, None, exclude_user + reserve_task_config, project_id, timeout, user_id, exclude_user ) sql = query_factory(project_id, user_id=user_id, limit=limit, rand_within_priority=rand_within_priority, @@ -370,8 +370,8 @@ def reserve_task_sql_filters(project_id, reserve_task_keys, exclude): # ex "co:name:IBM:ticker:IBM_US" would be converted to # "task.info->>'co_name' IN ('IBM')" filters_dict = {} + regex_key = "reserve_task:project:{}:category:(.+?):user".format(project_id) for item in reserve_task_keys: - regex_key = "reserve_task:project:{}:category:(.+?):user".format(project_id) data = re.search(regex_key, item) if not data: continue @@ -423,15 +423,13 @@ def get_reserve_task_key(task_id): return reserve_key -def get_reserve_task_category_info(reserve_task_config, project_id, timeout, user_id, task_id=None, exclude_user=False): +def get_reserve_task_category_info(reserve_task_config, project_id, timeout, user_id, exclude_user=False): """Get reserved category info for a given user under a given project""" sql_filters, category_keys = "", [] if not reserve_task_config: return sql_filters, category_keys - - category = ":".join(["{}:*".format(field) for field in sorted(reserve_task_config)]) lock_manager = LockManager(sentinel.master, timeout) @@ -612,9 +610,6 @@ def acquire_reserve_task_lock(project_id, task_id, user_id, timeout, pipeline=No pipeline = pipeline or redis_conn.pipeline(transaction=True) lock_manager = LockManager(redis_conn, timeout) if lock_manager.acquire_reserve_task_lock(project_id, task_id, user_id, category): - # lock_manager.acquire_lock(user_tasks_key, task_id, float('inf'), pipeline=pipeline) - # if execute: - # return all(not isinstance(r, Exception) for r in pipeline.execute()) return True return False From d62946437794c7cf40b6dff4cf7d8a4d15d1b54c Mon Sep 17 00:00:00 2001 From: Deepsingh Chhabda Date: Thu, 2 Dec 2021 14:43:38 -0500 Subject: [PATCH 7/9] reserve task expire 5s on cancel --- pybossa/api/__init__.py | 3 ++- pybossa/redis_lock.py | 4 ++-- pybossa/sched.py | 11 ++++++----- test/test_reserve_task_category.py | 6 +++--- 4 files changed, 13 insertions(+), 11 deletions(-) diff --git a/pybossa/api/__init__.py b/pybossa/api/__init__.py index 3d76d8608c..e05e1cd282 100644 --- a/pybossa/api/__init__.py +++ b/pybossa/api/__init__.py @@ -85,6 +85,7 @@ from pybossa.core import db from pybossa.cache.task_browse_helpers import get_searchable_columns from pybossa.view.projects import get_locked_tasks +from pybossa.redis_lock import EXPIRE_LOCK_DELAY task_fields = [ "id", @@ -483,7 +484,7 @@ def cancel_task(task_id=None): current_app.logger.info( 'Project {} - user {} cancelled task {}' .format(project.id, current_user.id, task_id)) - release_reserve_task_lock_by_id(project.id, task_id, current_user.id, timeout) + release_reserve_task_lock_by_id(project.id, task_id, current_user.id, timeout, expiry=EXPIRE_LOCK_DELAY) return Response(json.dumps({'success': True}), 200, mimetype="application/json") diff --git a/pybossa/redis_lock.py b/pybossa/redis_lock.py index ff8fd511c5..3a7f27fc31 100644 --- a/pybossa/redis_lock.py +++ b/pybossa/redis_lock.py @@ -245,6 +245,6 @@ def acquire_reserve_task_lock(self, project_id, task_id, user_id, category): return self._redis.set(resource_id, expiration) - def release_reserve_task_lock(self, resource_id, pipeline): + def release_reserve_task_lock(self, resource_id, pipeline, expiry): cache = pipeline or self._redis - cache.expire(resource_id, EXPIRE_RESERVE_TASK_LOCK_DELAY) + cache.expire(resource_id, expiry) diff --git a/pybossa/sched.py b/pybossa/sched.py index df367774eb..1758067de4 100644 --- a/pybossa/sched.py +++ b/pybossa/sched.py @@ -27,7 +27,8 @@ from redis_lock import (LockManager, get_active_user_key, get_user_tasks_key, get_task_users_key, get_task_id_project_id_key, register_active_user, unregister_active_user, - get_active_user_count) + get_active_user_count, + EXPIRE_RESERVE_TASK_LOCK_DELAY) from contributions_guard import ContributionsGuard from werkzeug.exceptions import BadRequest, Forbidden import random @@ -569,7 +570,7 @@ def acquire_lock(task_id, user_id, limit, timeout, pipeline=None, execute=True): return False -def release_reserve_task_lock_by_id(project_id, task_id, user_id, timeout): +def release_reserve_task_lock_by_id(project_id, task_id, user_id, timeout, expiry=EXPIRE_RESERVE_TASK_LOCK_DELAY): reserve_key = get_reserve_task_key(task_id) if not reserve_key: return @@ -579,10 +580,10 @@ def release_reserve_task_lock_by_id(project_id, task_id, user_id, timeout): lock_manager = LockManager(redis_conn, timeout) resource_id = "reserve_task:project:{}:category:{}:user:{}:task:{}".format( project_id, reserve_key, user_id, task_id) - lock_manager.release_reserve_task_lock(resource_id, pipeline) + lock_manager.release_reserve_task_lock(resource_id, pipeline, expiry) -def release_reserve_task_lock_by_keys(resource_ids, timeout, pipeline=None): +def release_reserve_task_lock_by_keys(resource_ids, timeout, pipeline=None, expiry=EXPIRE_RESERVE_TASK_LOCK_DELAY): if not resource_ids: return @@ -590,7 +591,7 @@ def release_reserve_task_lock_by_keys(resource_ids, timeout, pipeline=None): pipeline = redis_conn.pipeline(transaction=True) lock_manager = LockManager(redis_conn, timeout) for resource_id in resource_ids: - lock_manager.release_reserve_task_lock(resource_id, pipeline) + lock_manager.release_reserve_task_lock(resource_id, pipeline, expiry) def acquire_reserve_task_lock(project_id, task_id, user_id, timeout, pipeline=None, execute=True): diff --git a/test/test_reserve_task_category.py b/test/test_reserve_task_category.py index cf33abca43..633bb402d6 100644 --- a/test/test_reserve_task_category.py +++ b/test/test_reserve_task_category.py @@ -153,7 +153,7 @@ def test_acquire_and_release_reserve_task_lock(self): assert expected_reserve_task_key in sentinel.master.keys(), "reserve task key must exist in redis cache" # release reserve task lock - with patch("pybossa.redis_lock.EXPIRE_RESERVE_TASK_LOCK_DELAY", 1): - release_reserve_task_lock_by_keys([expected_reserve_task_key], timeout) - time.sleep(1) + expiry = 1 + release_reserve_task_lock_by_keys([expected_reserve_task_key], timeout, expiry=expiry) + time.sleep(expiry) assert expected_reserve_task_key not in sentinel.master.keys(), "reserve task key should not exist in redis cache" From 28c11b9c6cabd56bef804132cd80bb3fd3701283 Mon Sep 17 00:00:00 2001 From: Deepsingh Chhabda Date: Thu, 2 Dec 2021 18:27:39 -0500 Subject: [PATCH 8/9] add tests. remove unused --- pybossa/redis_lock.py | 2 +- pybossa/sched.py | 8 -- test/test_reserve_task_category.py | 129 ++++++++++++++++++++++++++++- test/test_web.py | 55 ++++++++++++ 4 files changed, 184 insertions(+), 10 deletions(-) diff --git a/pybossa/redis_lock.py b/pybossa/redis_lock.py index 3a7f27fc31..9b4003e9f9 100644 --- a/pybossa/redis_lock.py +++ b/pybossa/redis_lock.py @@ -233,7 +233,7 @@ def get_task_category_lock(self, project_id, user_id=None, category=None, exclud def acquire_reserve_task_lock(self, project_id, task_id, user_id, category): - if not(project_id or user_id or task_id or category): + if not(project_id and user_id and task_id and category): raise BadRequest('Missing required parameters') # check task category reserved by user diff --git a/pybossa/sched.py b/pybossa/sched.py index 1758067de4..526364b77c 100644 --- a/pybossa/sched.py +++ b/pybossa/sched.py @@ -252,12 +252,6 @@ def get_candidate_task_ids(project_id, user_id=None, user_ip=None, data = query.limit(limit).offset(offset).all() return _handle_tuples(data) -def task_contains_category(task_id, category): - task = task_repo.get_task(task_id) - if not (task and category): - return False - return all([field in task.info for field in category]) - def locked_scheduler(query_factory): @wraps(query_factory) @@ -272,8 +266,6 @@ def template_get_locked_task(project_id, user_id=None, user_ip=None, if offset > 0: return None project = project_repo.get(project_id) - if not project: - raise Forbidden('Invalid project_id') timeout = project.info.get('timeout', TIMEOUT) task_queue_scheduler = project.info.get("sched", "default") in [Schedulers.task_queue] reserve_task_config = project.info.get("reserve_tasks", {}).get("category", []) diff --git a/test/test_reserve_task_category.py b/test/test_reserve_task_category.py index 633bb402d6..a3db26889d 100644 --- a/test/test_reserve_task_category.py +++ b/test/test_reserve_task_category.py @@ -26,9 +26,14 @@ reserve_task_sql_filters, get_reserve_task_category_info, acquire_reserve_task_lock, - release_reserve_task_lock_by_keys + release_reserve_task_lock_by_keys, + release_reserve_task_lock_by_id, + get_reserve_task_key ) import time +from nose.tools import assert_raises +from werkzeug.exceptions import BadRequest + class TestReserveTaskCategory(sched.Helper): @@ -157,3 +162,125 @@ def test_acquire_and_release_reserve_task_lock(self): release_reserve_task_lock_by_keys([expected_reserve_task_key], timeout, expiry=expiry) time.sleep(expiry) assert expected_reserve_task_key not in sentinel.master.keys(), "reserve task key should not exist in redis cache" + + + @with_context + def test_reserve_task_category_lock_raises_exceptions(self): + # missing project_id raises exception + with assert_raises(BadRequest): + get_reserve_task_category_info(["x", "y"], None, 1, 1) + + # missing user id and passing exclude user raises exception + with assert_raises(BadRequest): + get_reserve_task_category_info(["x", "y"], 1, 1, user_id=None, exclude_user=True) + + _, category_keys = get_reserve_task_category_info(["x", "y"], 1, 1, 1) + assert not category_keys, "reserve task category keys should not be present" + + user = UserFactory.create() + project = ProjectFactory.create( + owner=user, + info=dict( + sched="task_queue_scheduler", + reserve_tasks=dict( + category=["field_1", "field_2"] + ) + ) + ) + task = TaskFactory.create_batch( + 1, project=project, n_answers=1, + info=dict(field_1="abc", field_2=123) + )[0] + with assert_raises(BadRequest): + acquire_reserve_task_lock(project.id, task.id, None, 1) + + + @with_context + def test_reserve_task_category_lock_exclude_user(self): + # with exclude_user = True, exclude user category key for user id = `` + reserve_task_config = ["field_1", "field_2"] + user = UserFactory.create() + project = ProjectFactory.create( + owner=user, + info=dict( + sched="task_queue_scheduler", + reserve_tasks=dict( + category=reserve_task_config + ) + ) + ) + tasks = TaskFactory.create_batch( + 2, project=project, n_answers=1, + info=dict(field_1="abc", field_2=123) + ) + + non_excluded_user_id = 2 + acquire_reserve_task_lock(project.id, tasks[0].id, user.id, 1) + acquire_reserve_task_lock(project.id, tasks[1].id, non_excluded_user_id, 1) + expected_category_keys = [ + "reserve_task:project:{}:category:field_1:abc:field_2:123:user:{}:task:{}".format( + project.id, non_excluded_user_id, tasks[1].id + )] + _, category_keys = get_reserve_task_category_info(reserve_task_config, 1, 1, user.id, exclude_user=True) + assert category_keys == expected_category_keys, "reserve task category keys should exclude user {} reserve category key".format(user.id) + # cleanup; release reserve task lock + expiry = 1 + release_reserve_task_lock_by_id(project.id, tasks[0].id, user.id, 1, expiry=expiry) + release_reserve_task_lock_by_id(project.id, tasks[1].id, non_excluded_user_id, 1, expiry=expiry) + + + @with_context + def test_release_reserve_task_lock_by_id(self): + timeout = 100 + category_fields = ["field_1", "field_2"] + user = UserFactory.create() + # project w/o reserve_tasks configured don't acquire lock + project = ProjectFactory.create( + owner=user, + info=dict( + sched="task_queue_scheduler", + reserve_tasks=dict( + category=category_fields + ) + ) + ) + task = TaskFactory.create_batch( + 1, project=project, n_answers=1, + info=dict(field_1="abc", field_2=123) + )[0] + acquire_reserve_task_lock(project.id, task.id, user.id, timeout) + category_key = ":".join(["{}:{}".format(field, task.info[field]) for field in category_fields]) + expected_reserve_task_key = "reserve_task:project:{}:category:{}:user:{}:task:{}".format( + project.id, category_key, user.id, task.id + ) + assert expected_reserve_task_key in sentinel.master.keys(), "reserve task key must exist in redis cache" + + # release reserve task lock + expiry = 1 + release_reserve_task_lock_by_id(project.id, task.id, user.id, timeout, expiry=expiry) + time.sleep(expiry) + assert expected_reserve_task_key not in sentinel.master.keys(), "reserve task key should not exist in redis cache" + + + @with_context + def test_get_reserve_task_key(self): + category_fields = ["field_1", "field_2"] + task_info = dict(field_1="abc", field_2=123) + expected_key = ":".join(["{}:{}".format(field, task_info[field]) for field in sorted(category_fields)]) + user = UserFactory.create() + # project w/o reserve_tasks configured don't acquire lock + project = ProjectFactory.create( + owner=user, + info=dict( + sched="task_queue_scheduler", + reserve_tasks=dict( + category=category_fields + ) + ) + ) + task = TaskFactory.create_batch( + 1, project=project, n_answers=1, + info=task_info + )[0] + reserve_key = get_reserve_task_key(task.id) + assert reserve_key == expected_key, "reserve key expected to be {}".format(expected_key) \ No newline at end of file diff --git a/test/test_web.py b/test/test_web.py index f2b422fa2e..b75af6463c 100644 --- a/test/test_web.py +++ b/test/test_web.py @@ -9233,6 +9233,61 @@ def test_delete_task(self): assert res.status_code == 200, res.status_code assert len(task_repo.filter_tasks_by(project_id=project.id)) == 0 + @with_context + def test_reserve_task_not_present(self): + """Reserve task to return first available task when reserve task category task not present""" + admin = UserFactory.create(admin=True) + admin.set_password('1234') + user_repo.save(admin) + self.signin(email=admin.email_addr, password='1234') + + category_config = ["field_1", "field_2"] + project = ProjectFactory.create( + zip_download=True, owner=admin, + info=dict( + sched="task_queue_scheduler", + reserve_tasks=dict( + category=category_config + ) + ) + ) + task = TaskFactory.create_batch(3, project=project, info=dict(x=1, y=2)) + res = self.app.get('api/project/%s/newtask' % project.id) + data = json.loads(res.data) + assert data["id"] == task[0].id, "First available task to be presented when no task exist with reserver category" + + @with_context + @patch('pybossa.redis_lock.LockManager.get_task_category_lock') + def test_reserve_task_all_reserved_tasks_consumed(self, get_task_category_lock): + """Reserve task to return first available task when all reserve task category task were consumed""" + + admin = UserFactory.create(admin=True) + admin.set_password('1234') + user_repo.save(admin) + self.signin(email=admin.email_addr, password='1234') + + category_config = ["field_1", "field_2"] + project = ProjectFactory.create( + zip_download=True, owner=admin, + info=dict( + sched="task_queue_scheduler", + reserve_tasks=dict( + category=category_config + ) + ) + ) + get_task_category_lock.side_effect = [[ + "reserve_task:project:{}:category:field_1:value1:field_2:value2:user:{}:task:454".format(project.id, admin.id) + ], []] + + task = TaskFactory.create_batch(3, project=project, info=dict(x=1, y=2)) + res = self.app.get('api/project/%s/newtask' % project.id) + data = json.loads(res.data) + assert data["id"] == task[0].id, "First available task to be presented when all tasks with reserver category are consumed" + + + + class TestWebUserMetadataUpdate(web.Helper): From b1c2bde8c7d6a779238c41fc2a5393408d37387d Mon Sep 17 00:00:00 2001 From: Deepsingh Chhabda Date: Fri, 3 Dec 2021 14:48:50 -0500 Subject: [PATCH 9/9] add logging --- pybossa/sched.py | 29 ++++++++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/pybossa/sched.py b/pybossa/sched.py index 526364b77c..cf0981d089 100644 --- a/pybossa/sched.py +++ b/pybossa/sched.py @@ -287,10 +287,15 @@ def template_get_locked_task(project_id, user_id=None, user_ip=None, if not category_keys: # no category reserved by current user. search categories # excluding the ones reserved by other users + current_app.logger.info( + "Project %s, user %s, %s", project_id, user_id, + "No task category reserved by user. Search tasks excuding categories reserved by other users" + ) exclude_user = True sql_filters, category_keys = get_reserve_task_category_info( reserve_task_config, project_id, timeout, user_id, exclude_user ) + current_app.logger.info("SQL filter excuding task categories reserved by other users. sql filter %s", sql_filters) limit = current_app.config.get('DB_MAXIMUM_BATCH_SIZE') if filter_user_prefs else user_count + 5 sql = query_factory(project_id, user_id=user_id, limit=limit, @@ -306,11 +311,16 @@ def template_get_locked_task(project_id, user_id=None, user_ip=None, # no ongoing tasks with task category reserved by user exist. # Hence, query db for tasks excluding task categories reserved # by other users passing exclude_users = True + current_app.logger.info( + "Project %s, user %s, %s", project_id, user_id, + "No task exist with task category already reserved by user. Search tasks excuding categories reserved by other users" + ) exclude_user = True release_reserve_task_lock_by_keys(category_keys, timeout) sql_filters, category_keys = get_reserve_task_category_info( reserve_task_config, project_id, timeout, user_id, exclude_user ) + current_app.logger.info("SQL filter excuding task categories reserved by other users. sql filter %s", sql_filters) sql = query_factory(project_id, user_id=user_id, limit=limit, rand_within_priority=rand_within_priority, task_type=task_type, task_category_filters=sql_filters) @@ -363,6 +373,8 @@ def reserve_task_sql_filters(project_id, reserve_task_keys, exclude): # ex "co:name:IBM:ticker:IBM_US" would be converted to # "task.info->>'co_name' IN ('IBM')" filters_dict = {} + current_app.logger.info("Project %s, exclude %s. Build sql filter from reserver task keys", project_id, exclude) + current_app.logger.info("reserve tasks keys: %s", json.dumps(reserve_task_keys)) regex_key = "reserve_task:project:{}:category:(.+?):user".format(project_id) for item in reserve_task_keys: data = re.search(regex_key, item) @@ -387,11 +399,13 @@ def reserve_task_sql_filters(project_id, reserve_task_keys, exclude): val = ["'{}'".format(val) for val in value] data += ["task.info->>'{}' IN ({})".format(key, ", ".join(val))] if not data: + current_app.logger.info("sql filter %s, reserve keys %s", filters, json.dumps(category_keys)) return filters, category_keys exclude_clause = "IS NOT TRUE" if exclude else "" filters = "({}) {}".format(" AND ".join(data), exclude_clause) filters = " AND {}".format(filters) if filters else filters + current_app.logger.info("sql filter %s, reserve keys %s", filters, json.dumps(category_keys)) return filters, category_keys @@ -425,8 +439,11 @@ def get_reserve_task_category_info(reserve_task_config, project_id, timeout, use category = ":".join(["{}:*".format(field) for field in sorted(reserve_task_config)]) lock_manager = LockManager(sentinel.master, timeout) - category_keys = lock_manager.get_task_category_lock(project_id, user_id, category, exclude_user) + current_app.logger.info( + "Project %s, user %s, reserve config %s, exclude %s. reserve task category keys %s", + project_id, user_id, json.dumps(reserve_task_config), exclude_user, str(category_keys) + ) if not category_keys: return sql_filters, category_keys @@ -573,6 +590,10 @@ def release_reserve_task_lock_by_id(project_id, task_id, user_id, timeout, expir resource_id = "reserve_task:project:{}:category:{}:user:{}:task:{}".format( project_id, reserve_key, user_id, task_id) lock_manager.release_reserve_task_lock(resource_id, pipeline, expiry) + current_app.logger.info( + "Release reserve task lock. project %s, task %s, user %s, expiry %d", + project_id, task_id, user_id, expiry + ) def release_reserve_task_lock_by_keys(resource_ids, timeout, pipeline=None, expiry=EXPIRE_RESERVE_TASK_LOCK_DELAY): @@ -584,6 +605,8 @@ def release_reserve_task_lock_by_keys(resource_ids, timeout, pipeline=None, expi lock_manager = LockManager(redis_conn, timeout) for resource_id in resource_ids: lock_manager.release_reserve_task_lock(resource_id, pipeline, expiry) + current_app.logger.info( + "Release reserve task lock. resource id %s, expiry %d", resource_id, expiry) def acquire_reserve_task_lock(project_id, task_id, user_id, timeout, pipeline=None, execute=True): @@ -603,6 +626,10 @@ def acquire_reserve_task_lock(project_id, task_id, user_id, timeout, pipeline=No pipeline = pipeline or redis_conn.pipeline(transaction=True) lock_manager = LockManager(redis_conn, timeout) if lock_manager.acquire_reserve_task_lock(project_id, task_id, user_id, category): + current_app.logger.info( + "Acquired reserve task lock. project %s, task %s, user %s, category %s", + project_id, task_id, user_id, category + ) return True return False