Skip to content

Commit

Permalink
Implement batch scheduling bridge call; fixes DMOJ#1318
Browse files Browse the repository at this point in the history
  • Loading branch information
Ninjaclasher committed Dec 26, 2022
1 parent 7f62071 commit 2a15ef1
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 21 deletions.
10 changes: 5 additions & 5 deletions judge/admin/contest.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from reversion.admin import VersionAdmin

from django_ace import AceWidget
from judge.models import Class, Contest, ContestProblem, ContestSubmission, Profile, Rating, Submission
from judge.models import Class, Contest, ContestProblem, Profile, Rating, Submission
from judge.ratings import rate_contest
from judge.utils.views import NoBatchDeleteMixin
from judge.widgets import AdminHeavySelect2MultipleWidget, AdminHeavySelect2Widget, AdminMartorWidget, \
Expand Down Expand Up @@ -274,13 +274,13 @@ def get_urls(self):
] + super(ContestAdmin, self).get_urls()

def rejudge_view(self, request, contest_id, problem_id):
queryset = ContestSubmission.objects.filter(problem_id=problem_id).select_related('submission')
for model in queryset:
model.submission.judge(rejudge=True, rejudge_user=request.user)
judged = sum(Submission.objects.filter(contest__problem_id=problem_id).batch_judge(
rejudge=True, rejudge_user=request.user,
))

self.message_user(request, ngettext('%d submission was successfully scheduled for rejudging.',
'%d submissions were successfully scheduled for rejudging.',
len(queryset)) % len(queryset))
judged) % judged)
return HttpResponseRedirect(reverse('admin:judge_contest_change', args=(contest_id,)))

def rate_all_view(self, request):
Expand Down
6 changes: 2 additions & 4 deletions judge/admin/submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@ def judge(self, request, queryset):
self.message_user(request, gettext('You do not have the permission to rejudge submissions.'),
level=messages.ERROR)
return
queryset = queryset.order_by('id')
if not request.user.has_perm('judge.rejudge_submission_lot') and \
queryset.count() > settings.DMOJ_SUBMISSIONS_REJUDGE_LIMIT:
self.message_user(request, gettext('You do not have the permission to rejudge THAT many submissions.'),
Expand All @@ -170,9 +169,8 @@ def judge(self, request, queryset):
if not request.user.has_perm('judge.edit_all_problem'):
id = request.profile.id
queryset = queryset.filter(Q(problem__authors__id=id) | Q(problem__curators__id=id))
judged = len(queryset)
for model in queryset:
model.judge(rejudge=True, batch_rejudge=True, rejudge_user=request.user)

judged = sum(queryset.batch_judge(rejudge=True, rejudge_user=request.user))
self.message_user(request, ngettext('%d submission was successfully scheduled for rejudging.',
'%d submissions were successfully scheduled for rejudging.',
judged) % judged)
Expand Down
19 changes: 19 additions & 0 deletions judge/bridge/django_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ def __init__(self, request, client_address, server, judges):

self.handlers = {
'submission-request': self.on_submission,
'batch-submission-request': self.on_batch_submission,
'terminate-submission': self.on_termination,
'disconnect-judge': self.on_disconnect_request,
}
Expand Down Expand Up @@ -44,6 +45,24 @@ def on_submission(self, data):
self.judges.judge(id, problem, language, source, judge_id, priority)
return {'name': 'submission-received', 'submission-id': id}

def on_batch_submission(self, data):
ids = []
judge_id = data['judge-id']
for submission in data['submissions']:
priority = submission['priority']
if not self.judges.check_priority(priority):
return {'name': 'bad-request'}

for submission in data['submissions']:
id = submission['submission-id']
problem = submission['problem-id']
language = submission['language']
source = submission['source']
self.judges.judge(id, problem, language, source, judge_id, priority)
ids.append(id)

return {'name': 'submissions-received', 'submission-ids': ids}

def on_termination(self, data):
return {'name': 'submission-received', 'judge-aborted': self.judges.abort(data['submission-id'])}

Expand Down
66 changes: 63 additions & 3 deletions judge/judgeapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@
import socket
import struct
import zlib
from operator import attrgetter

from django.conf import settings
from django.db import transaction
from django.db.models import BooleanField, F, OuterRef, Subquery
from django.db.models.functions import Coalesce
from django.utils import timezone

from judge import event_poster as event
Expand Down Expand Up @@ -50,11 +54,11 @@ def judge_request(packet, reply=True):
return result


def judge_submission(submission, rejudge=False, batch_rejudge=False, judge_id=None):
def judge_submission(submission, rejudge=False, judge_id=None):
from .models import ContestSubmission, Submission, SubmissionTestCase

updates = {'time': None, 'memory': None, 'points': None, 'result': None, 'case_points': 0, 'case_total': 0,
'error': None, 'rejudged_date': timezone.now() if rejudge or batch_rejudge else None, 'status': 'QU'}
'error': None, 'rejudged_date': timezone.now() if rejudge else None, 'status': 'QU'}
try:
# This is set proactively; it might get unset in judgecallback's on_grading_begin if the problem doesn't
# actually have pretests stored on the judge.
Expand Down Expand Up @@ -86,7 +90,7 @@ def judge_submission(submission, rejudge=False, batch_rejudge=False, judge_id=No
'language': submission.language.key,
'source': submission.source.source,
'judge-id': judge_id,
'priority': BATCH_REJUDGE_PRIORITY if batch_rejudge else (REJUDGE_PRIORITY if rejudge else priority),
'priority': REJUDGE_PRIORITY if rejudge else priority,
})
except BaseException:
logger.exception('Failed to send request to judge')
Expand All @@ -100,6 +104,62 @@ def judge_submission(submission, rejudge=False, batch_rejudge=False, judge_id=No
return success


def batch_judge_submission(submissions, rejudge=False, judge_id=None):
from .models import Submission, SubmissionTestCase
updates = {
'time': None, 'memory': None, 'points': None, 'result': None, 'case_points': 0, 'case_total': 0,
'error': None, 'rejudged_date': timezone.now() if rejudge else None, 'status': 'QU',
'is_pretested': Coalesce(
Subquery(
Submission.objects.filter(pk=OuterRef('pk'))
.annotate(pretested=F('contest_object__run_pretests_only').bitand(F('contest__problem__is_pretested')))
.values_list('pretested')[:1],
output_field=BooleanField(),
),
False,
),
}

with transaction.atomic():
submission_queryset = Submission.objects.filter(id__in=map(attrgetter('id'), submissions)) \
.exclude(status__in=('P', 'G'))
submission_queryset.update(**updates)

ids = set(submission_queryset.values_list('id', flat=True))
# Do the filtering using the new set of IDs rather than submission_queryset
# because submission_queryset itself takes a list of IDs anyways.
SubmissionTestCase.objects.filter(submission_id__in=ids).delete()

submissions = [submission for submission in submissions if submission.id in ids]

try:
response = judge_request({
'name': 'batch-submission-request',
'submissions': [{
'submission-id': submission.id,
'problem-id': submission.problem.code,
'language': submission.language.key,
'source': submission.source.source,
'priority': BATCH_REJUDGE_PRIORITY if rejudge else (
CONTEST_SUBMISSION_PRIORITY if submission.contest_object is not None else DEFAULT_PRIORITY
)
} for submission in submissions],
'judge-id': judge_id,
})
except BaseException:
logger.exception('Failed to send request to judge')
processed_ids = set()
else:
processed_ids = set(response['submission-ids']) if 'submission-ids' in response else set()

if processed_ids != ids:
Submission.objects.filter(id__in=ids - processed_ids).update(status='IE', result='IE', error='')

for submission in submissions:
# If the submission is not in processed_ids, that means the submission IE'd and thus is "done" judging.
_post_update_submission(submission, done=submission.id not in processed_ids)


def disconnect_judge(judge, force=False):
judge_request({'name': 'disconnect-judge', 'judge-id': judge.name, 'force': force}, reply=False)

Expand Down
34 changes: 30 additions & 4 deletions judge/models/submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,19 @@

from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist
from django.db import models
from django.db import models, transaction
from django.db.models.query import QuerySet
from django.urls import reverse
from django.utils import timezone
from django.utils.functional import cached_property
from django.utils.translation import gettext_lazy as _
from reversion import revisions

from judge.judgeapi import abort_submission, judge_submission
from judge.judgeapi import abort_submission, batch_judge_submission, judge_submission
from judge.models.problem import Problem, SubmissionSourceAccess
from judge.models.profile import Profile
from judge.models.runtime import Language
from judge.utils.iterator import chunk
from judge.utils.unicode import utf8bytes

__all__ = ['SUBMISSION_RESULT', 'Submission', 'SubmissionSource', 'SubmissionTestCase']
Expand All @@ -33,6 +35,28 @@
)


class BatchJudgeSubmissionQuerySet(QuerySet):
def batch_judge(self, *, rejudge=False, force_judge=False, rejudge_user=None, chunk_size=100, **kwargs):
# Don't trust the caller to follow related objects that we need.
submissions = self.select_related('problem', 'language', 'source', 'contest_object')

if not force_judge:
submissions = submissions.exclude(locked_after__lt=timezone.now())

for current_chunk in chunk(submissions.iterator(chunk_size=chunk_size), chunk_size):
if rejudge:
with transaction.atomic():
for submission in current_chunk:
with revisions.create_revision(manage_manually=True):
if rejudge_user:
revisions.set_user(rejudge_user)
revisions.set_comment('Rejudged')
revisions.add_to_revision(submission)

batch_judge_submission(current_chunk, rejudge=rejudge, **kwargs)
yield len(current_chunk)


@revisions.register(follow=['test_cases'])
class Submission(models.Model):
STATUS = (
Expand Down Expand Up @@ -88,6 +112,8 @@ class Submission(models.Model):
on_delete=models.SET_NULL, related_name='+')
locked_after = models.DateTimeField(verbose_name=_('submission lock'), null=True, blank=True)

objects = BatchJudgeSubmissionQuerySet()

@classmethod
def result_class_from_code(cls, result, case_points, case_total):
if result == 'AC':
Expand Down Expand Up @@ -119,15 +145,15 @@ def long_status(self):
def is_locked(self):
return self.locked_after is not None and self.locked_after < timezone.now()

def judge(self, *args, rejudge=False, force_judge=False, rejudge_user=None, **kwargs):
def judge(self, *, rejudge=False, force_judge=False, rejudge_user=None, **kwargs):
if force_judge or not self.is_locked:
if rejudge:
with revisions.create_revision(manage_manually=True):
if rejudge_user:
revisions.set_user(rejudge_user)
revisions.set_comment('Rejudged')
revisions.add_to_revision(self)
judge_submission(self, *args, rejudge=rejudge, **kwargs)
judge_submission(self, rejudge=rejudge, **kwargs)

judge.alters_data = True

Expand Down
8 changes: 3 additions & 5 deletions judge/tasks/submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,9 @@ def rejudge_problem_filter(self, problem_id, id_range=None, languages=None, resu

rejudged = 0
with Progress(self, queryset.count()) as p:
for submission in queryset.iterator():
submission.judge(rejudge=True, batch_rejudge=True, rejudge_user=user)
rejudged += 1
if rejudged % 10 == 0:
p.done = rejudged
for completed in queryset.batch_judge(rejudge=True, rejudge_user=user):
p.did(completed)
rejudged += completed
return rejudged


Expand Down

0 comments on commit 2a15ef1

Please sign in to comment.