Skip to content

Commit

Permalink
Implement batch scheduling bridge call; fixes #1318
Browse files Browse the repository at this point in the history
  • Loading branch information
Ninjaclasher committed Mar 23, 2022
1 parent c74956a commit 825d4db
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 21 deletions.
10 changes: 5 additions & 5 deletions judge/admin/contest.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from reversion.admin import VersionAdmin

from django_ace import AceWidget
from judge.models import Class, Contest, ContestProblem, ContestSubmission, Profile, Rating, Submission
from judge.models import Class, Contest, ContestProblem, Profile, Rating, Submission
from judge.ratings import rate_contest
from judge.utils.views import NoBatchDeleteMixin
from judge.widgets import AdminHeavySelect2MultipleWidget, AdminHeavySelect2Widget, AdminMartorWidget, \
Expand Down Expand Up @@ -274,13 +274,13 @@ def get_urls(self):
] + super(ContestAdmin, self).get_urls()

def rejudge_view(self, request, contest_id, problem_id):
queryset = ContestSubmission.objects.filter(problem_id=problem_id).select_related('submission')
for model in queryset:
model.submission.judge(rejudge=True, rejudge_user=request.user)
judged = sum(Submission.batch_judge(
Submission.objects.filter(contest__problem_id=problem_id), rejudge=True, rejudge_user=request.user,
))

self.message_user(request, ngettext('%d submission was successfully scheduled for rejudging.',
'%d submissions were successfully scheduled for rejudging.',
len(queryset)) % len(queryset))
judged) % judged)
return HttpResponseRedirect(reverse('admin:judge_contest_change', args=(contest_id,)))

def rate_all_view(self, request):
Expand Down
6 changes: 2 additions & 4 deletions judge/admin/submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ def judge(self, request, queryset):
self.message_user(request, gettext('You do not have the permission to rejudge submissions.'),
level=messages.ERROR)
return
queryset = queryset.order_by('id')
if not request.user.has_perm('judge.rejudge_submission_lot') and \
queryset.count() > settings.DMOJ_SUBMISSIONS_REJUDGE_LIMIT:
self.message_user(request, gettext('You do not have the permission to rejudge THAT many submissions.'),
Expand All @@ -169,9 +168,8 @@ def judge(self, request, queryset):
if not request.user.has_perm('judge.edit_all_problem'):
id = request.profile.id
queryset = queryset.filter(Q(problem__authors__id=id) | Q(problem__curators__id=id))
judged = len(queryset)
for model in queryset:
model.judge(rejudge=True, batch_rejudge=True, rejudge_user=request.user)

judged = sum(Submission.batch_judge(queryset, rejudge=True, rejudge_user=request.user))
self.message_user(request, ngettext('%d submission was successfully scheduled for rejudging.',
'%d submissions were successfully scheduled for rejudging.',
judged) % judged)
Expand Down
18 changes: 18 additions & 0 deletions judge/bridge/django_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ def __init__(self, request, client_address, server, judges):

self.handlers = {
'submission-request': self.on_submission,
'batch-submission-request': self.on_batch_submission,
'terminate-submission': self.on_termination,
'disconnect-judge': self.on_disconnect_request,
}
Expand Down Expand Up @@ -44,6 +45,23 @@ def on_submission(self, data):
self.judges.judge(id, problem, language, source, judge_id, priority)
return {'name': 'submission-received', 'submission-id': id}

def on_batch_submission(self, data):
ids = []
judge_id = data['judge-id']
priority = data['priority']
if not self.judges.check_priority(priority):
return {'name': 'bad-request'}

for submission in data['submissions']:
id = submission['submission-id']
problem = submission['problem-id']
language = submission['language']
source = submission['source']
self.judges.judge(id, problem, language, source, judge_id, priority)
ids.append(id)

return {'name': 'submissions-received', 'submission-ids': ids}

def on_termination(self, data):
return {'name': 'submission-received', 'judge-aborted': self.judges.abort(data['submission-id'])}

Expand Down
69 changes: 66 additions & 3 deletions judge/judgeapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@
import socket
import struct
import zlib
from operator import attrgetter

from django.conf import settings
from django.db import transaction
from django.db.models import BooleanField, F, OuterRef, Subquery
from django.db.models.functions import Coalesce
from django.utils import timezone

from judge import event_poster as event
Expand Down Expand Up @@ -50,11 +54,11 @@ def judge_request(packet, reply=True):
return result


def judge_submission(submission, rejudge=False, batch_rejudge=False, judge_id=None):
def judge_submission(submission, rejudge=False, judge_id=None):
from .models import ContestSubmission, Submission, SubmissionTestCase

updates = {'time': None, 'memory': None, 'points': None, 'result': None, 'case_points': 0, 'case_total': 0,
'error': None, 'rejudged_date': timezone.now() if rejudge or batch_rejudge else None, 'status': 'QU'}
'error': None, 'rejudged_date': timezone.now() if rejudge else None, 'status': 'QU'}
try:
# This is set proactively; it might get unset in judgecallback's on_grading_begin if the problem doesn't
# actually have pretests stored on the judge.
Expand Down Expand Up @@ -86,7 +90,7 @@ def judge_submission(submission, rejudge=False, batch_rejudge=False, judge_id=No
'language': submission.language.key,
'source': submission.source.source,
'judge-id': judge_id,
'priority': BATCH_REJUDGE_PRIORITY if batch_rejudge else (REJUDGE_PRIORITY if rejudge else priority),
'priority': REJUDGE_PRIORITY if rejudge else priority,
})
except BaseException:
logger.exception('Failed to send request to judge')
Expand All @@ -100,6 +104,65 @@ def judge_submission(submission, rejudge=False, batch_rejudge=False, judge_id=No
return success


def batch_judge_submission(submissions, rejudge=False, judge_id=None):
from .models import Submission, SubmissionTestCase
updates = {
'time': None, 'memory': None, 'points': None, 'result': None, 'case_points': 0, 'case_total': 0,
'error': None, 'rejudged_date': timezone.now() if rejudge else None, 'status': 'QU',
'is_pretested': Coalesce(
Subquery(
Submission.objects.filter(pk=OuterRef('pk'))
.annotate(pretested=F('contest_object__run_pretests_only').bitand(F('contest__problem__is_pretested')))
.values_list('pretested')[:1],
output_field=BooleanField(),
),
False,
),
}

with transaction.atomic():
submission_queryset = Submission.objects.filter(id__in=map(attrgetter('id'), submissions)) \
.exclude(status__in=('P', 'G'))
submission_queryset.update(**updates)

ids = set(submission_queryset.values_list('id', flat=True))
# Do the filtering using the new set of IDs rather than submission_queryset
# because submission_queryset itself takes a list of IDs anyways.
SubmissionTestCase.objects.filter(submission_id__in=ids).delete()

submissions = [submission for submission in submissions if submission.id in ids]

try:
response = judge_request({
'name': 'batch-submission-request',
'submissions': [{
'submission-id': submission.id,
'problem-id': submission.problem.code,
'language': submission.language.key,
'source': submission.source.source,
} for submission in submissions],
'judge-id': judge_id,
# This is technically incorrect when rejudge is False.
# Some submissions might be contest submissions and should have priority "CONTEST_SUBMISSION_PRIORITY".
# It's not worth further complicating this code by separating contest submissions and
# normal submissions as that case is not even being used.
# For the moment, let's just set all submissions to have "DEFAULT_PRIORITY" if we're not rejudging.
'priority': BATCH_REJUDGE_PRIORITY if rejudge else DEFAULT_PRIORITY,
})
except BaseException:
logger.exception('Failed to send request to judge')
processed_ids = set()
else:
processed_ids = set(response['submission-ids']) if 'submission-ids' in response else set()

if processed_ids != ids:
Submission.objects.filter(id__in=ids - processed_ids).update(status='IE', result='IE', error='')

for submission in submissions:
# If the submission is not in processed_ids, that means the submission IE'd and thus is "done" judging.
_post_update_submission(submission, done=submission.id not in processed_ids)


def disconnect_judge(judge, force=False):
judge_request({'name': 'disconnect-judge', 'judge-id': judge.name, 'force': force}, reply=False)

Expand Down
32 changes: 28 additions & 4 deletions judge/models/submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@

from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist
from django.db import models
from django.db import models, transaction
from django.urls import reverse
from django.utils import timezone
from django.utils.functional import cached_property
from django.utils.translation import gettext_lazy as _
from reversion import revisions

from judge.judgeapi import abort_submission, judge_submission
from judge.judgeapi import abort_submission, batch_judge_submission, judge_submission
from judge.models.problem import Problem, SubmissionSourceAccess, TranslatedProblemForeignKeyQuerySet
from judge.models.profile import Profile
from judge.models.runtime import Language
from judge.utils.iterator import chunk
from judge.utils.unicode import utf8bytes

__all__ = ['SUBMISSION_RESULT', 'Submission', 'SubmissionSource', 'SubmissionTestCase']
Expand Down Expand Up @@ -121,18 +122,41 @@ def long_status(self):
def is_locked(self):
return self.locked_after is not None and self.locked_after < timezone.now()

def judge(self, *args, rejudge=False, force_judge=False, rejudge_user=None, **kwargs):
def judge(self, *, rejudge=False, force_judge=False, rejudge_user=None, **kwargs):
if force_judge or not self.is_locked:
if rejudge:
with revisions.create_revision(manage_manually=True):
if rejudge_user:
revisions.set_user(rejudge_user)
revisions.set_comment('Rejudged')
revisions.add_to_revision(self)
judge_submission(self, *args, rejudge=rejudge, **kwargs)
judge_submission(self, rejudge=rejudge, **kwargs)

judge.alters_data = True

@classmethod
def batch_judge(cls, submissions, *, rejudge=False, force_judge=False, rejudge_user=None, chunk_size=100, **kwargs):
# Don't trust the caller to follow related objects that we need.
submissions = submissions.select_related('problem', 'language', 'source')

if not force_judge:
submissions = submissions.exclude(locked_after__lt=timezone.now())

for current_chunk in chunk(submissions.iterator(chunk_size=chunk_size), chunk_size):
if rejudge:
with transaction.atomic():
for submission in current_chunk:
with revisions.create_revision(manage_manually=True):
if rejudge_user:
revisions.set_user(rejudge_user)
revisions.set_comment('Rejudged')
revisions.add_to_revision(submission)

batch_judge_submission(current_chunk, rejudge=rejudge, **kwargs)
yield len(current_chunk)

batch_judge.alters_data = True

def abort(self):
abort_submission(self)

Expand Down
8 changes: 3 additions & 5 deletions judge/tasks/submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,9 @@ def rejudge_problem_filter(self, problem_id, id_range=None, languages=None, resu

rejudged = 0
with Progress(self, queryset.count()) as p:
for submission in queryset.iterator():
submission.judge(rejudge=True, batch_rejudge=True, rejudge_user=user)
rejudged += 1
if rejudged % 10 == 0:
p.done = rejudged
for completed in Submission.batch_judge(queryset, rejudge=True, rejudge_user=user):
p.did(completed)
rejudged += completed
return rejudged


Expand Down

0 comments on commit 825d4db

Please sign in to comment.