From 2a15ef1d9772b17f37bbf4d3095483b0f8788fdd Mon Sep 17 00:00:00 2001 From: Evan Date: Mon, 21 Mar 2022 16:03:08 -0400 Subject: [PATCH] Implement batch scheduling bridge call; fixes #1318 --- judge/admin/contest.py | 10 +++--- judge/admin/submission.py | 6 ++-- judge/bridge/django_handler.py | 19 ++++++++++ judge/judgeapi.py | 66 ++++++++++++++++++++++++++++++++-- judge/models/submission.py | 34 +++++++++++++++--- judge/tasks/submission.py | 8 ++--- 6 files changed, 122 insertions(+), 21 deletions(-) diff --git a/judge/admin/contest.py b/judge/admin/contest.py index 23d12c7e07..f6bef6df92 100644 --- a/judge/admin/contest.py +++ b/judge/admin/contest.py @@ -14,7 +14,7 @@ from reversion.admin import VersionAdmin from django_ace import AceWidget -from judge.models import Class, Contest, ContestProblem, ContestSubmission, Profile, Rating, Submission +from judge.models import Class, Contest, ContestProblem, Profile, Rating, Submission from judge.ratings import rate_contest from judge.utils.views import NoBatchDeleteMixin from judge.widgets import AdminHeavySelect2MultipleWidget, AdminHeavySelect2Widget, AdminMartorWidget, \ @@ -274,13 +274,13 @@ def get_urls(self): ] + super(ContestAdmin, self).get_urls() def rejudge_view(self, request, contest_id, problem_id): - queryset = ContestSubmission.objects.filter(problem_id=problem_id).select_related('submission') - for model in queryset: - model.submission.judge(rejudge=True, rejudge_user=request.user) + judged = sum(Submission.objects.filter(contest__problem_id=problem_id).batch_judge( + rejudge=True, rejudge_user=request.user, + )) self.message_user(request, ngettext('%d submission was successfully scheduled for rejudging.', '%d submissions were successfully scheduled for rejudging.', - len(queryset)) % len(queryset)) + judged) % judged) return HttpResponseRedirect(reverse('admin:judge_contest_change', args=(contest_id,))) def rate_all_view(self, request): diff --git a/judge/admin/submission.py b/judge/admin/submission.py index 0daee14972..8a67f40603 100644 --- a/judge/admin/submission.py +++ b/judge/admin/submission.py @@ -161,7 +161,6 @@ def judge(self, request, queryset): self.message_user(request, gettext('You do not have the permission to rejudge submissions.'), level=messages.ERROR) return - queryset = queryset.order_by('id') if not request.user.has_perm('judge.rejudge_submission_lot') and \ queryset.count() > settings.DMOJ_SUBMISSIONS_REJUDGE_LIMIT: self.message_user(request, gettext('You do not have the permission to rejudge THAT many submissions.'), @@ -170,9 +169,8 @@ def judge(self, request, queryset): if not request.user.has_perm('judge.edit_all_problem'): id = request.profile.id queryset = queryset.filter(Q(problem__authors__id=id) | Q(problem__curators__id=id)) - judged = len(queryset) - for model in queryset: - model.judge(rejudge=True, batch_rejudge=True, rejudge_user=request.user) + + judged = sum(queryset.batch_judge(rejudge=True, rejudge_user=request.user)) self.message_user(request, ngettext('%d submission was successfully scheduled for rejudging.', '%d submissions were successfully scheduled for rejudging.', judged) % judged) diff --git a/judge/bridge/django_handler.py b/judge/bridge/django_handler.py index b284b68f78..eea1ce8cbc 100644 --- a/judge/bridge/django_handler.py +++ b/judge/bridge/django_handler.py @@ -14,6 +14,7 @@ def __init__(self, request, client_address, server, judges): self.handlers = { 'submission-request': self.on_submission, + 'batch-submission-request': self.on_batch_submission, 'terminate-submission': self.on_termination, 'disconnect-judge': self.on_disconnect_request, } @@ -44,6 +45,24 @@ def on_submission(self, data): self.judges.judge(id, problem, language, source, judge_id, priority) return {'name': 'submission-received', 'submission-id': id} + def on_batch_submission(self, data): + ids = [] + judge_id = data['judge-id'] + for submission in data['submissions']: + priority = submission['priority'] + if not self.judges.check_priority(priority): + return {'name': 'bad-request'} + + for submission in data['submissions']: + id = submission['submission-id'] + problem = submission['problem-id'] + language = submission['language'] + source = submission['source'] + self.judges.judge(id, problem, language, source, judge_id, priority) + ids.append(id) + + return {'name': 'submissions-received', 'submission-ids': ids} + def on_termination(self, data): return {'name': 'submission-received', 'judge-aborted': self.judges.abort(data['submission-id'])} diff --git a/judge/judgeapi.py b/judge/judgeapi.py index 42bc000f5c..02821e772f 100644 --- a/judge/judgeapi.py +++ b/judge/judgeapi.py @@ -3,8 +3,12 @@ import socket import struct import zlib +from operator import attrgetter from django.conf import settings +from django.db import transaction +from django.db.models import BooleanField, F, OuterRef, Subquery +from django.db.models.functions import Coalesce from django.utils import timezone from judge import event_poster as event @@ -50,11 +54,11 @@ def judge_request(packet, reply=True): return result -def judge_submission(submission, rejudge=False, batch_rejudge=False, judge_id=None): +def judge_submission(submission, rejudge=False, judge_id=None): from .models import ContestSubmission, Submission, SubmissionTestCase updates = {'time': None, 'memory': None, 'points': None, 'result': None, 'case_points': 0, 'case_total': 0, - 'error': None, 'rejudged_date': timezone.now() if rejudge or batch_rejudge else None, 'status': 'QU'} + 'error': None, 'rejudged_date': timezone.now() if rejudge else None, 'status': 'QU'} try: # This is set proactively; it might get unset in judgecallback's on_grading_begin if the problem doesn't # actually have pretests stored on the judge. @@ -86,7 +90,7 @@ def judge_submission(submission, rejudge=False, batch_rejudge=False, judge_id=No 'language': submission.language.key, 'source': submission.source.source, 'judge-id': judge_id, - 'priority': BATCH_REJUDGE_PRIORITY if batch_rejudge else (REJUDGE_PRIORITY if rejudge else priority), + 'priority': REJUDGE_PRIORITY if rejudge else priority, }) except BaseException: logger.exception('Failed to send request to judge') @@ -100,6 +104,62 @@ def judge_submission(submission, rejudge=False, batch_rejudge=False, judge_id=No return success +def batch_judge_submission(submissions, rejudge=False, judge_id=None): + from .models import Submission, SubmissionTestCase + updates = { + 'time': None, 'memory': None, 'points': None, 'result': None, 'case_points': 0, 'case_total': 0, + 'error': None, 'rejudged_date': timezone.now() if rejudge else None, 'status': 'QU', + 'is_pretested': Coalesce( + Subquery( + Submission.objects.filter(pk=OuterRef('pk')) + .annotate(pretested=F('contest_object__run_pretests_only').bitand(F('contest__problem__is_pretested'))) + .values_list('pretested')[:1], + output_field=BooleanField(), + ), + False, + ), + } + + with transaction.atomic(): + submission_queryset = Submission.objects.filter(id__in=map(attrgetter('id'), submissions)) \ + .exclude(status__in=('P', 'G')) + submission_queryset.update(**updates) + + ids = set(submission_queryset.values_list('id', flat=True)) + # Do the filtering using the new set of IDs rather than submission_queryset + # because submission_queryset itself takes a list of IDs anyways. + SubmissionTestCase.objects.filter(submission_id__in=ids).delete() + + submissions = [submission for submission in submissions if submission.id in ids] + + try: + response = judge_request({ + 'name': 'batch-submission-request', + 'submissions': [{ + 'submission-id': submission.id, + 'problem-id': submission.problem.code, + 'language': submission.language.key, + 'source': submission.source.source, + 'priority': BATCH_REJUDGE_PRIORITY if rejudge else ( + CONTEST_SUBMISSION_PRIORITY if submission.contest_object is not None else DEFAULT_PRIORITY + ) + } for submission in submissions], + 'judge-id': judge_id, + }) + except BaseException: + logger.exception('Failed to send request to judge') + processed_ids = set() + else: + processed_ids = set(response['submission-ids']) if 'submission-ids' in response else set() + + if processed_ids != ids: + Submission.objects.filter(id__in=ids - processed_ids).update(status='IE', result='IE', error='') + + for submission in submissions: + # If the submission is not in processed_ids, that means the submission IE'd and thus is "done" judging. + _post_update_submission(submission, done=submission.id not in processed_ids) + + def disconnect_judge(judge, force=False): judge_request({'name': 'disconnect-judge', 'judge-id': judge.name, 'force': force}, reply=False) diff --git a/judge/models/submission.py b/judge/models/submission.py index 555c38ebfb..c702a24c65 100644 --- a/judge/models/submission.py +++ b/judge/models/submission.py @@ -3,17 +3,19 @@ from django.conf import settings from django.core.exceptions import ObjectDoesNotExist -from django.db import models +from django.db import models, transaction +from django.db.models.query import QuerySet from django.urls import reverse from django.utils import timezone from django.utils.functional import cached_property from django.utils.translation import gettext_lazy as _ from reversion import revisions -from judge.judgeapi import abort_submission, judge_submission +from judge.judgeapi import abort_submission, batch_judge_submission, judge_submission from judge.models.problem import Problem, SubmissionSourceAccess from judge.models.profile import Profile from judge.models.runtime import Language +from judge.utils.iterator import chunk from judge.utils.unicode import utf8bytes __all__ = ['SUBMISSION_RESULT', 'Submission', 'SubmissionSource', 'SubmissionTestCase'] @@ -33,6 +35,28 @@ ) +class BatchJudgeSubmissionQuerySet(QuerySet): + def batch_judge(self, *, rejudge=False, force_judge=False, rejudge_user=None, chunk_size=100, **kwargs): + # Don't trust the caller to follow related objects that we need. + submissions = self.select_related('problem', 'language', 'source', 'contest_object') + + if not force_judge: + submissions = submissions.exclude(locked_after__lt=timezone.now()) + + for current_chunk in chunk(submissions.iterator(chunk_size=chunk_size), chunk_size): + if rejudge: + with transaction.atomic(): + for submission in current_chunk: + with revisions.create_revision(manage_manually=True): + if rejudge_user: + revisions.set_user(rejudge_user) + revisions.set_comment('Rejudged') + revisions.add_to_revision(submission) + + batch_judge_submission(current_chunk, rejudge=rejudge, **kwargs) + yield len(current_chunk) + + @revisions.register(follow=['test_cases']) class Submission(models.Model): STATUS = ( @@ -88,6 +112,8 @@ class Submission(models.Model): on_delete=models.SET_NULL, related_name='+') locked_after = models.DateTimeField(verbose_name=_('submission lock'), null=True, blank=True) + objects = BatchJudgeSubmissionQuerySet() + @classmethod def result_class_from_code(cls, result, case_points, case_total): if result == 'AC': @@ -119,7 +145,7 @@ def long_status(self): def is_locked(self): return self.locked_after is not None and self.locked_after < timezone.now() - def judge(self, *args, rejudge=False, force_judge=False, rejudge_user=None, **kwargs): + def judge(self, *, rejudge=False, force_judge=False, rejudge_user=None, **kwargs): if force_judge or not self.is_locked: if rejudge: with revisions.create_revision(manage_manually=True): @@ -127,7 +153,7 @@ def judge(self, *args, rejudge=False, force_judge=False, rejudge_user=None, **kw revisions.set_user(rejudge_user) revisions.set_comment('Rejudged') revisions.add_to_revision(self) - judge_submission(self, *args, rejudge=rejudge, **kwargs) + judge_submission(self, rejudge=rejudge, **kwargs) judge.alters_data = True diff --git a/judge/tasks/submission.py b/judge/tasks/submission.py index a190ba71ee..ca687b49c1 100644 --- a/judge/tasks/submission.py +++ b/judge/tasks/submission.py @@ -31,11 +31,9 @@ def rejudge_problem_filter(self, problem_id, id_range=None, languages=None, resu rejudged = 0 with Progress(self, queryset.count()) as p: - for submission in queryset.iterator(): - submission.judge(rejudge=True, batch_rejudge=True, rejudge_user=user) - rejudged += 1 - if rejudged % 10 == 0: - p.done = rejudged + for completed in queryset.batch_judge(rejudge=True, rejudge_user=user): + p.did(completed) + rejudged += completed return rejudged