Skip to content

Commit

Permalink
Merge b96f1f2 into 0d992fb
Browse files Browse the repository at this point in the history
  • Loading branch information
dchhabda committed Dec 7, 2022
2 parents 0d992fb + b96f1f2 commit 5b59204
Show file tree
Hide file tree
Showing 6 changed files with 357 additions and 43 deletions.
54 changes: 39 additions & 15 deletions pybossa/cache/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@
from pybossa.cache import task_browse_helpers as cached_task_browse_helpers
from pybossa.sched import Schedulers, get_reserve_task_category_info
from pybossa.contributions_guard import ContributionsGuard
import pandas as pd
from pybossa.sched import get_reserved_categories_cache_keys
from pybossa.util import (
cached_keys_to_reserved_categories,
reserved_category_to_dataframe_query
)


session = db.slave_session
TIMEOUT = ContributionsGuard.STAMP_TTL
Expand Down Expand Up @@ -187,10 +194,9 @@ def n_available_tasks_for_user(project, user_id=None, user_ip=None):
scheduler = project_info.get('sched', 'default')
project_id = project['id'] if type(project) == dict else project.id
reserve_task_config = project_info.get("reserve_tasks", {}).get("category", [])

# Temporarily return magic number to prevent heavy db load with reserved category
if scheduler == Schedulers.task_queue and reserve_task_config:
return 10
res_category_fields = [f"task.info->>'{field}' AS {field}" for field in reserve_task_config]
res_category_fields = ", ".join(res_category_fields)
category_fields = f", {res_category_fields}" if res_category_fields else ""

if scheduler not in [Schedulers.user_pref, Schedulers.task_queue]:
sql = '''
Expand All @@ -204,34 +210,52 @@ def n_available_tasks_for_user(project, user_id=None, user_ip=None):
else:
user_pref_list = cached_users.get_user_preferences(user_id)
user_filter_list = cached_users.get_user_filters(user_id)

timeout = project_info.get("timeout", TIMEOUT)
reserve_task_filter, _ = get_reserve_task_category_info(reserve_task_config, project_id, timeout, user_id, True)
sql = '''
SELECT task.id, worker_filter FROM task
sql = f'''
SELECT task.id as task_id, worker_filter, user_pref {category_fields}
FROM task
WHERE project_id=:project_id AND state !='completed'
AND state !='enrich'
{}
AND id NOT IN
(SELECT task_id FROM task_run WHERE
project_id=:project_id AND user_id=:user_id)
AND ({})
AND ({})
;'''.format(reserve_task_filter, user_pref_list, user_filter_list)
AND ({user_pref_list})
AND ({user_filter_list})
;'''
sqltext = text(sql)
try:
result = session.execute(sqltext, dict(project_id=project_id, user_id=user_id, assign_user=assign_user))
params = dict(project_id=project_id, user_id=user_id, assign_user=assign_user)
result = session.execute(sqltext, params)
current_app.logger.info("n_available_tasks_for_user making db request for project_id %d. user_id %d", project_id, user_id)
if scheduler not in [Schedulers.user_pref, Schedulers.task_queue]:
for row in result:
n_tasks = row.n_tasks
return n_tasks
else:
num_available_tasks = 0
_, other_users_category_keys, _ = get_reserved_categories_cache_keys(reserve_task_config, project_id, timeout, user_id)
other_users_categories = cached_keys_to_reserved_categories(project_id, other_users_category_keys)

user_profile = cached_users.get_user_profile_metadata(user_id)
user_profile = json.loads(user_profile) if user_profile else {}
for task_id, w_filter in result:
w_filter = w_filter or {}

if not result:
return 0

headers = list(result.keys())
rows = result.fetchall()
data = pd.DataFrame(rows, columns=headers)
# exclude tasks reserved by users other than current user.
# this will cover all tasks that are reserved by current user
# plus all available tasks not reserved by any other user.
exclude_query = reserved_category_to_dataframe_query(reserve_task_config, other_users_categories, negate_query=True)
if exclude_query:
data = data.query(exclude_query)

# filter records by worker_filter, user_pref
for _, row in data.iterrows():
task_id = row["task_id"]
w_filter = row["worker_filter"] or {}
num_available_tasks += int(
cached_task_browse_helpers.user_meet_task_requirement(task_id, w_filter, user_profile)
)
Expand Down
32 changes: 15 additions & 17 deletions pybossa/redis_lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,43 +249,41 @@ def _release_expired_reserve_task_locks(self, resource_id, now):
def seconds_remaining(expiration):
return float(expiration) - time()

def get_task_category_lock(self, project_id, user_id=None, category=None, exclude_user=False, task_id=None):
def get_task_category_lock(self, project_id, user_id=None, category=None, task_id=None):
"""
Returns True when task category for a given user
can be reserved or its already reserved, False otherwise.
Returns:
1. task categories reserved for a given user.
2. task categories reserved by users other than current user.
3. task categories reserved for a given user and other users.
To fetch task category for all users who've reserved the category, pass user_id = None
To fetch task category for all tasks reserved, pass task_id = None
To fetch task category other than user_id, pass exclude_user = True
"""

user_category_keys, other_users_category_keys, all_users_category_keys = [], [], []
if not project_id:
raise BadRequest('Missing required parameters')

# with exclude_user set to True, user_id is to be excluded from list of
# task category found for all users. raise error if user_id not passed
if exclude_user and not user_id:
raise BadRequest('Missing user id')

# release expired task reservations
self._release_expired_reserve_for_project(project_id)

resource_id = "reserve_task:project:{}:category:{}:user:{}:task:{}".format(
resource_id = "reserve_task:project:{}:category:{}:user:*:task:{}".format(
project_id,
"*" if not category else category,
"*" if not user_id or exclude_user else user_id,
"*" if not task_id else task_id
)

category_keys = self.get_reservation_keys(resource_id)
all_users_category_keys = self.get_reservation_keys(resource_id)

# if key present but for different user, with redundancy = 1, return false
# TODO: for redundancy > 1, check if additional task run
# available for this user and if so, return category_key else ""
if exclude_user:
# exclude user_id from list of keys passed
drop_user = ":user:{}:task:".format(user_id)
category_keys = [ key for key in category_keys if drop_user not in key ]
return category_keys
# generate user specific reserved categories
if user_id:
user_categories = f":user:{user_id}:task:"
user_category_keys = [key for key in all_users_category_keys if user_categories in key]
other_users_category_keys = [key for key in all_users_category_keys if user_categories not in key]

return user_category_keys, other_users_category_keys, all_users_category_keys

def acquire_reserve_task_lock(self, project_id, task_id, user_id, category):
if not(project_id and user_id and task_id and category):
Expand Down
37 changes: 36 additions & 1 deletion pybossa/sched.py
Original file line number Diff line number Diff line change
Expand Up @@ -447,9 +447,17 @@ def get_reserve_task_category_info(reserve_task_config, project_id, timeout, use
project_id, str(reserve_task_config))
return sql_filters, category_keys

# with exclude_user set to True, user_id is to be excluded from list of
# task category found for all users. raise error if user_id not passed
if exclude_user and not user_id:
raise BadRequest('Missing user id')

category = ":".join(["{}:*".format(field) for field in sorted(reserve_task_config)])
lock_manager = LockManager(sentinel.master, timeout)
category_keys = lock_manager.get_task_category_lock(project_id, user_id, category, exclude_user)
user_category_keys, other_user_category_keys, all_category_keys = lock_manager.get_task_category_lock(project_id, user_id, category)

# with exclude_user True and user_id passed, return other users category keys
category_keys = other_user_category_keys if exclude_user else all_category_keys
current_app.logger.info(
"Project %s, user %s, reserve config %s, exclude %s. reserve task category keys %s",
project_id, user_id, json.dumps(reserve_task_config), exclude_user, str(category_keys)
Expand All @@ -461,6 +469,33 @@ def get_reserve_task_category_info(reserve_task_config, project_id, timeout, use
return sql_filters, category_keys


def get_reserved_categories_cache_keys(reserve_task_config, project_id, timeout, user_id):
"""Get reserved task categories from cache. Returns redis keys for user categories,
redis keys for other users categories and consolidated all categories"""

timeout = timeout or TIMEOUT
user_category_keys, other_user_category_keys, all_user_category_keys = [], [], []

if not reserve_task_config:
return user_category_keys, other_user_category_keys, all_user_category_keys

if current_app.config.get('PRIVATE_INSTANCE'):
current_app.logger.info("Cached categories not found. Reserve task by category disabled for private instance. project_id %s, reserve_task_config %s",
project_id, str(reserve_task_config))
return user_category_keys, other_user_category_keys, all_user_category_keys

category = ":".join(["{}:*".format(field) for field in sorted(reserve_task_config)])
lock_manager = LockManager(sentinel.master, timeout)
user_category_keys, other_user_category_keys, all_user_category_keys = lock_manager.get_task_category_lock(
project_id, user_id, category)

current_app.logger.info("get_reserved_categories_cache_keys. project %s, user %s, reserve task config %s",project_id, user_id, json.dumps(reserve_task_config))
current_app.logger.info("user_category_keys: %s", str(user_category_keys))
current_app.logger.info("other_user_category_keys: %s", str(other_user_category_keys))
current_app.logger.info("all_user_category_keys: %s", str(all_user_category_keys))
return user_category_keys, other_user_category_keys, all_user_category_keys


def locked_task_sql(project_id, user_id=None, limit=1, rand_within_priority=False,
task_type='gold_last', filter_user_prefs=False,
priority_sort=True, task_category_filters=""):
Expand Down
45 changes: 45 additions & 0 deletions pybossa/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
from sqlalchemy import text
from sqlalchemy.exc import ProgrammingError
from yacryptopan import CryptoPAn
from werkzeug.exceptions import BadRequest

from pybossa.cloud_store_api.connection import create_connection
from pybossa.cloud_store_api.s3 import get_file_from_s3, delete_file_from_s3
Expand Down Expand Up @@ -1332,3 +1333,47 @@ def remove_element_attributes(page_element, attribute):
for attr in attributes:
if page_element.has_attr(attr):
del page_element[attr]

def cached_keys_to_reserved_categories(project_id, category_keys):
# convert task category redis cache key to list of reserved categories
# eg "co_name:IBM:ticker:IBM_US" would be converted to
# [{"co_name": "IBM", "ticker": "IBM_US"}]

reserved_categories = []
unique_categories = []

if not project_id:
raise BadRequest("Missing project id")

regex_key = f"reserve_task:project:{project_id}:category:(.+?):user"
for item in category_keys:
data = re.search(regex_key, item)
if not data:
continue

category = data.group(1)
if category in unique_categories:
continue
unique_categories.append(category)

category_fv = category.split(":")
category_dict = {category_fv[i]: category_fv[i + 1] for i in range(0, len(category_fv), 2)}
reserved_categories.append(category_dict)
return reserved_categories

def reserved_category_to_dataframe_query(reserved_categories, categories, negate_query=False):
df_query = ""
if not(reserved_categories and categories):
return df_query

df_filters = []
for category in categories:
filters = [f"{res_cat} == '{category[res_cat]}'" if isinstance(category[res_cat], str) else f"{res_cat} == {category[res_cat]}" for res_cat in reserved_categories]
df_filters.append(" & ".join(filters))

concat_op = "&" if negate_query else "|"
prefix_query = "~" if negate_query else ""

df_query = f") {concat_op} {prefix_query}(".join(df_filters)
df_query = f"{prefix_query}({df_query})"
return df_query
Loading

0 comments on commit 5b59204

Please sign in to comment.