Skip to content
Permalink
Browse files

Big cancellation and validation code refactoring

  • Loading branch information
awst-baum committed May 3, 2019
1 parent fa58200 commit 78a9c3c20d3c787dd88332b22dc6bdd2b0ec588f
Showing with 75 additions and 82 deletions.
  1. +1 −0 .gitignore
  2. +1 −1 start_celery_worker.sh
  3. +1 −1 validator/tests/test_views.py
  4. +72 −80 validator/validation/validation.py
@@ -14,3 +14,4 @@ output
junit.xml
emails
/static/
run_celery/
@@ -1,2 +1,2 @@
#!/bin/bash
celery -A valentina worker --max-tasks-per-child=1 -l INFO --time-limit=16000 --prefetch-multiplier 1
celery -A valentina worker --max-tasks-per-child=1 --concurrency=4 -l INFO --time-limit=16000 --prefetch-multiplier=1 --statedb=run_celery/%n.state
@@ -457,7 +457,7 @@ def test_password_reset(self):
self.assertRedirects(response, reverse('password_reset_done'))

## make sure the right email got sent with correct details
sent_mail = mail.outbox[0]
sent_mail = mail.outbox[0] # @UndefinedVariable
assert sent_mail
assert sent_mail.subject
assert sent_mail.body
@@ -9,17 +9,16 @@
from valentina.celery import app

from celery.app import shared_task
from celery.exceptions import TaskRevokedError, TimeoutError
from dateutil.tz import tzlocal
from netCDF4 import Dataset
from pytesmo.validation_framework.results_manager import netcdf_results_manager
from pytesmo.validation_framework.validation import Validation

import numpy as np
from validator.mailer import send_val_done_notification
from validator.metrics import EssentialMetrics
from validator.models import ValidationRun


from validator.validation.util import mkdir_if_not_exists, first_file_in
from validator.validation.globals import OUTPUT_FOLDER
from validator.validation.readers import create_reader
@@ -102,7 +101,6 @@ def create_pytesmo_validation(validation_run):
else:
scaling_ref=validation_run.ref_dataset.short_name


val = Validation(
datasets,
spatial_ref=validation_run.ref_dataset.short_name,
@@ -125,9 +123,9 @@ def num_gpis_from_job(job):

@shared_task(bind=True,max_retries=3)
def execute_job(self,validation_id, job):
job_id = execute_job.request.id
task_id = execute_job.request.id
numgpis = num_gpis_from_job(job)
__logger.debug("Executing job {} from validation {}, # of gpis: {}".format(job_id, validation_id, numgpis))
__logger.debug("Executing job {} from validation {}, # of gpis: {}".format(task_id, validation_id, numgpis))
start_time = datetime.now(tzlocal())
try:
validation_run = ValidationRun.objects.get(pk=validation_id)
@@ -136,34 +134,44 @@ def execute_job(self,validation_id, job):
end_time = datetime.now(tzlocal())
duration = end_time - start_time
duration = (duration.days * 86400) + (duration.seconds)
__logger.debug("Finished job {} from validation {}, took {} seconds for {} gpis".format(job_id, validation_id, duration, numgpis))
return {'result':result,'job':job}
__logger.debug("Finished job {} from validation {}, took {} seconds for {} gpis".format(task_id, validation_id, duration, numgpis))
return result
except Exception as e:
self.retry(countdown=2, exc=e)

def check_and_store_results(validation_run, job, results, save_path):
# __logger.debug(job)

def check_and_store_results(job_id, results, save_path):
if len(results) < 1:
__logger.warn('Potentially problematic job: {} - no results'.format(job))
__logger.warn('Potentially problematic job: {} - no results'.format(job_id))
return

if np.isnan(next(iter(results.values()))['R'][0]):
__logger.warn('Potentially problematic job: {} - R is nan'.format(job))

netcdf_results_manager(results, save_path)

def track_celery_task(validation_run, task_id):
    """Persist a CeleryTask row linking *task_id* to *validation_run*.

    The stored row is what celery_task_cancelled() later checks: while the
    row exists the task is considered alive; deleting it marks cancellation.
    """
    # Normalise the id to a bare hex string so later lookups compare equal.
    record = CeleryTask(
        validation=validation_run,
        celery_task=uuid.UUID(task_id).hex,
    )
    record.save()

def celery_task_cancelled(task_id):
    """Return True if the task identified by *task_id* was cancelled.

    stop_running_validation deletes a validation's CeleryTask rows from the
    db, so a missing row means the task was cancelled.
    """
    still_tracked = CeleryTask.objects.filter(celery_task=task_id).exists()
    return not still_tracked

def untrack_celery_task(task_id):
    """Delete the CeleryTask row for *task_id*, ignoring an already-missing row.

    A missing row is expected when the task was cancelled (or untracked
    twice), so DoesNotExist is logged at debug level rather than raised.
    """
    try:
        CeleryTask.objects.get(celery_task=task_id).delete()
    except CeleryTask.DoesNotExist:
        __logger.debug('Task {} already deleted from db.'.format(task_id))

def run_validation(validation_id):
__logger.info("Starting validation: {}".format(validation_id))
validation_run = ValidationRun.objects.get(pk=validation_id)
validation_aborted_flag=False;
validation_aborted = False;


if (hasattr(settings, 'CELERY_TASK_ALWAYS_EAGER')==False) or (settings.CELERY_TASK_ALWAYS_EAGER==False):
if ((not hasattr(settings, 'CELERY_TASK_ALWAYS_EAGER')) or (not settings.CELERY_TASK_ALWAYS_EAGER)):
app.control.add_consumer(validation_run.user.username, reply=True) # @UndefinedVariable

try:
__logger.info("Starting validation: {}".format(validation_id))

run_dir = path.join(OUTPUT_FOLDER, str(validation_run.id))
mkdir_if_not_exists(run_dir)

@@ -178,65 +186,54 @@ def run_validation(validation_id):
async_results = []
job_table = {}
for j in jobs:
job_id = execute_job.apply_async(args=[validation_id, j], queue=validation_run.user.username)
async_results.append(job_id)
job_table[job_id] = j
celery_task=CeleryTask()
celery_task.validation=validation_run
celery_task.celery_task=uuid.UUID(job_id.id).hex
celery_task.save()

executed_jobs = 0
celery_job = execute_job.apply_async(args=[validation_id, j], queue=validation_run.user.username)
async_results.append(celery_job)
job_table[celery_job.id] = j
track_celery_task(validation_run, celery_job.id)

for async_result in async_results:
try:
while True:
task_running = True
while task_running: ## regularly check if the validation has been cancelled in this loop, otherwise we wouldn't notice
try:
result_dict=async_result.get(timeout=10) # calling result.AsyncResult.get
async_result.forget()
break
except Exception as e:
if e.__class__.__name__ != 'TimeoutError':
raise e

try:
celery_task=CeleryTask.objects.get(celery_task=async_result.id)
__logger.debug('Celery task timeout. Continue...')
except Exception:
__logger.debug('Validation got cancelled')
validation_aborted_flag=True
validation_run.progress=-1
validation_run.save()
return validation_run
finally:
try:
celery_task=CeleryTask.objects.get(celery_task=async_result.id)
celery_task.delete()
except Exception:
__logger.debug('Celery task does not exists. Validation run: {} Celery task ID: {}'.format(validation_id,
async_result.id))

results = result_dict['result']
job = result_dict['job']
check_and_store_results(validation_run, job, results, save_path)
validation_run.ok_points += num_gpis_from_job(job_table[async_result])
except Exception:
validation_run.error_points += num_gpis_from_job(job_table[async_result])
__logger.exception('Celery could not execute the job. Job ID: {} Error: {}'.format(async_result.id,async_result.info))
finally:
if validation_aborted_flag:
return
executed_jobs +=1
validation_run.progress=round(((validation_run.ok_points + validation_run.error_points)/validation_run.total_points)*100)
validation_run.save()
__logger.info("Validation {} is {} % done...".format(validation_run.id, validation_run.progress))
results = async_result.get(timeout=10) ## this throws TimeoutError after waiting 10 secs or TaskRevokedError if revoked before starting
async_result.forget() ## if we got here, the task is finished now
task_running = False ## stop looping because task finished
if celery_task_cancelled(async_result.id): ## we can still have a cancelled validation that took less than 10 secs
validation_aborted = True
else:
untrack_celery_task(async_result.id)

except (TimeoutError, TaskRevokedError) as te:
## see if our task got cancelled - if not, just continue loop
if celery_task_cancelled(async_result.id):
task_running = False ## stop looping because we aborted
validation_aborted = True
__logger.debug('Validation got cancelled, dropping task {}: {}'.format(async_result.id, te))

if validation_aborted:
validation_run.error_points += num_gpis_from_job(job_table[async_result.id])
else:
check_and_store_results(async_result.id, results, save_path)
validation_run.ok_points += num_gpis_from_job(job_table[async_result.id])



set_outfile(validation_run, run_dir)
validation_run.save() # let's save before we do anything else...

# only store parameters and produce graphs if we have metrics for at least one gpi - otherwise no netcdf output file
if validation_run.ok_points > 0:
except Exception:
validation_run.error_points += num_gpis_from_job(job_table[async_result.id])
__logger.exception('Celery could not execute the job. Job ID: {} Error: {}'.format(async_result.id, async_result.info))

if not validation_aborted:
validation_run.progress = round(((validation_run.ok_points + validation_run.error_points)/validation_run.total_points)*100)
else:
validation_run.progress = -1
validation_run.save()
__logger.info("Dealt with task {}, validation {} is {} % done...".format(async_result.id, validation_run.id, validation_run.progress))

# once all tasks are finished:
# only store parameters and produce graphs if validation wasn't cancelled and
# we have metrics for at least one gpi - otherwise no netcdf output file
if ((not validation_aborted) and (validation_run.ok_points > 0)):
set_outfile(validation_run, run_dir)
validation_run.save()
save_validation_config(validation_run)
generate_all_graphs(validation_run, run_dir)

@@ -248,14 +245,9 @@ def run_validation(validation_id):
validation_run.save()
__logger.info("Validation finished: {}. Jobs: {}, Errors: {}, OK: {}, End time: {} ".format(
validation_run, validation_run.total_points, validation_run.error_points, validation_run.ok_points,validation_run.end_time))
if validation_aborted_flag==False:
if (validation_run.error_points + validation_run.ok_points) != validation_run.total_points:
__logger.warn("Caution, # of gpis, # of errors, and # of OK points don't match!")
validation_run.save()

send_val_done_notification(validation_run)
send_val_done_notification(validation_run)

# print('Valrun: {}'.format(validation_run))
return validation_run

def stop_running_validation(validation_id):

0 comments on commit 78a9c3c

Please sign in to comment.
You can’t perform that action at this time.