Skip to content

Commit

Permalink
Have backend Celery tasks print slightly more status info
Browse files Browse the repository at this point in the history
- for regular database maintenance
- for project popularity computations
  • Loading branch information
ds283 committed Mar 25, 2024
1 parent 4c406fd commit 71748ae
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 2 deletions.
42 changes: 40 additions & 2 deletions app/tasks/maintenance.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
# Contributors: David Seery <D.Seery@sussex.ac.uk>
#

from datetime import datetime, date
from datetime import datetime, date, timedelta
from io import BytesIO
from pathlib import Path
from typing import List, Iterable, Mapping, Union
Expand All @@ -18,7 +18,7 @@
from celery.exceptions import Ignore
from flask import current_app, render_template
from flask_mailman import EmailMessage
from sqlalchemy import or_
from sqlalchemy import or_, and_
from sqlalchemy.exc import SQLAlchemyError

import app.shared.cloud_object_store.encryption_types as encryptions
Expand Down Expand Up @@ -47,6 +47,7 @@
validate_nonce,
SubmittingStudent,
SelectingStudent,
PopularityRecord,
)
from ..shared.asset_tools import AssetCloudAdapter, AssetCloudScratchContextManager, AssetUploadManager
from ..shared.cloud_object_store import ObjectStore
Expand All @@ -58,24 +59,38 @@ def register_maintenance_tasks(celery):
def maintenance(self):
self.update_state(state=states.STARTED)

print("database maintenance: performing maintenance for ProjectClass records")
project_classes_maintenance(self)

print("database maintenance: performing maintenance for Project records")
projects_maintenance(self)
print("database maintenance: performing maintenance for LiveProject records")
liveprojects_maintenance(self)

print("database maintenance: performing maintenance for ProjectDescription records")
project_descriptions_maintenance(self)

print("database maintenance: performing maintenance for StudentData records")
students_data_maintenance(self)
print("database maintenance: performing maintenance for SubmittingStudent records")
submitting_student_maintenance(self)

print("database maintenance: performing maintenance for AssessorAttendanceData records")
assessor_attendance_maintenance(self)
print("database maintenance: performing maintenance for SubmitterAttendanceData records")
submitter_attendance_maintenance(self)

print("database maintenance: performing maintenance for MatchingEnumeration records")
matching_enumeration_maintenance(self)
print("database maintenance: performing maintenance for ScheduleEnumeration records")
schedule_enumeration_maintenance(self)

print("database maintenance: performing maintenance for SubmissionRecord records")
submission_record_maintenance(self)

print("database maintenance: performing maintenance for PopularityRecord records")
popularity_record_maintenance(self)

self.update_state(state=states.SUCCESS)

@celery.task(bind=True, serializer="pickle", default_retry_delay=30)
Expand All @@ -90,6 +105,29 @@ def fix_unencrypted_assets(self):

self.update_state(state=states.SUCCESS)

def popularity_record_maintenance(self):
# remove any stale PopularityRecord instances (at least 1 day old) that did not get properly filled in
now = datetime.now()
cutoff = now - timedelta(days=1)

try:
db.session.query(PopularityRecord).filter(
and_(
PopularityRecord.datestamp <= cutoff,
or_(
PopularityRecord.score_rank == None,
PopularityRecord.selections_rank == None,
PopularityRecord.bookmarks_rank == None,
PopularityRecord.views_rank == None,
),
)
).delete()
db.session.commit()

except SQLAlchemyError as e:
current_app.logger.exception("SQLAlchemyError exception", exc_info=e)
raise self.retry()

def submitting_student_maintenance(self):
current_year = get_current_year()
try:
Expand Down
12 changes: 12 additions & 0 deletions app/tasks/popularity.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,29 +253,41 @@ def update_project_popularity_data(self, pid):

# we would prefer not to use disable_sync_subtasks, which is discouraged, but needed so that we can call .get() within
# another task - that's needed to simulate the chord behaviour
print(f"update_project_popularity_data({pid}={pcl.name}: compute popularity data for individual projects (via Celery group)")
compute = group(compute_popularity_data.si(proj.id, datestamp, uuid, num_live) for proj in config.live_projects)

compute_task: GroupResult = compute.apply_async()
print(f"update_project_popularity_data({pid}={pcl.name}): waiting for asynchronous group result")
compute_task.get(disable_sync_subtasks=False)
compute_task.forget()

print(f"update_project_popularity_data({pid}={pcl.name}): compute popularity score ranks")
score_rank_task: AsyncResult = compute_popularity_score_rank.si(config.id, uuid, num_live).apply_async()
print(f"update_project_popularity_data({pid}={pcl.name}): waiting for asynchronous group result")
lowest_rank = score_rank_task.get(disable_sync_subtasks=False)
print(f"update_project_popularity_data({pid}={pcl.name}): store lowest popularity score rank")
store_task: AsyncResult = store_lowest_popularity_score_rank.s(lowest_rank, config.id, uuid, num_live).apply_async()
print(f"update_project_popularity_data({pid}={pcl.name}): waiting for asynchronous group result")
store_task.get(disable_sync_subtasks=False)

score_rank_task.forget()
store_task.forget()

print(f"update_project_popularity_data({pid}={pcl.name}): compute views rank")
view_rank_task: AsyncResult = compute_views_rank.si(config.id, uuid, num_live).apply_async()
print(f"update_project_popularity_data({pid}={pcl.name}): waiting for asynchronous group result")
view_rank_task.get(disable_sync_subtasks=False)
view_rank_task.forget()

print(f"update_project_popularity_data({pid}={pcl.name}): compute bookmarks rank")
bookmarks_rank_task: AsyncResult = compute_bookmarks_rank.si(config.id, uuid, num_live).apply_async()
print(f"update_project_popularity_data({pid}={pcl.name}): waiting for asynchronous group result")
bookmarks_rank_task.get(disable_sync_subtasks=False)
bookmarks_rank_task.forget()

print(f"update_project_popularity_data({pid}={pcl.name}): compute selections rank")
selections_rank_task: AsyncResult = compute_selections_rank.si(config.id, uuid, num_live).apply_async()
print(f"update_project_popularity_data({pid}={pcl.name}): waiting for asynchronous group result")
selections_rank_task.get(disable_sync_subtasks=False)
selections_rank_task.forget()

Expand Down

0 comments on commit 71748ae

Please sign in to comment.