Setup Celery worker with Redis broker #70

Merged 1 commit into master on Aug 14, 2018

Conversation

@jraddaoui (Collaborator) commented Jul 26, 2018

  • Add Celery and Redis requirements.
  • Add Celery config in settings.
  • Add celery app and load it on init.
  • Use Redis as Broker and django_celery_results as backend (see the
    sketch right after this list).
  • Move METS extraction and parsing to a shared task.
  • Improvements in METS parsing process.
  • Redirect to collection page on DIP creation.
  • Reflect when a DIP is being imported:
    • Add import_status and import_task_id columns to DIPs model and
      ES doc. Reflect status on related DigitalFiles.
    • Set it to PENDING when the METS handling process is launched and
      to FAILURE or SUCCESS in the task after_return call.
    • For editors and admins:
      • Show spinning icon instead of links for DIPs and DigitalFiles when
        the import is PENDING.
      • Show error message in view pages.
    • For other users:
      • Do not show DIPs or DigitalFiles with an import status of
        FAILURE or PENDING.
    • Add flash messages system to show user notifications on import
      launch and on import failure.
  • Create dips.models.TaskResult proxy model of django_celery_results
    to format error message.
  • Move some functions to helpers and add tests.
  • Add tests for task and after_return calls.
  • Set CELERY_BROKER_URL env. var. in Tox test environment.
  • Improve docker-compose environment:
    • Remove ES network and use default network.
    • Set CELERY_BROKER_URL env. var.
    • Add redis and worker services.
    • Add redis_data volume.
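
A minimal sketch of the pieces listed above — the Celery app module loaded
on init, plus the broker/backend settings. Module paths follow the repo
layout visible in this PR (accesspoc); the exact contents here are
assumptions, not the merged code:

    # accesspoc/accesspoc/celery.py
    import os

    from celery import Celery

    # Make sure Django settings are configured before the app is created.
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'accesspoc.settings')

    app = Celery('accesspoc')

    # Read all CELERY_* settings from the Django settings module.
    app.config_from_object('django.conf:settings', namespace='CELERY')

    # Find shared tasks in installed apps (e.g. dips/tasks.py).
    app.autodiscover_tasks()

And the corresponding settings excerpt, with Redis as broker and
django_celery_results storing results in the database:

    # accesspoc/accesspoc/settings.py (excerpt)
    CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL', 'redis://redis:6379')
    CELERY_RESULT_BACKEND = 'django-db'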

TODO:

  • Improve worker/task logging.
  • Show alert message for parsing in progress.
  • Only show failed and in progress DIPs in collection table to editors and admins.
  • Show error message in DIP page on failure (only editors and admins).
  • Check django-celery-results.
  • Look for alternatives to "in progress" system.
  • Add production installation notes.
  • Check persistence in Redis broker.
  • Check database transaction in tasks.
  • Is SQLite enough for this? WAL?
  • Tests.

Connects to #15.

@jraddaoui added this to the beta milestone Jul 26, 2018
@jraddaoui self-assigned this Jul 26, 2018
codecov bot commented Jul 26, 2018

Codecov Report

Merging #70 into master will increase coverage by 2.04%.
The diff coverage is 55.33%.

@@            Coverage Diff             @@
##           master      #70      +/-   ##
==========================================
+ Coverage   80.17%   82.22%   +2.04%     
==========================================
  Files          27       32       +5     
  Lines         812      945     +133     
==========================================
+ Hits          651      777     +126     
- Misses        161      168       +7
Impacted Files Coverage Δ
accesspoc/dips/helpers.py 100% <100%> (ø) ⬆️
accesspoc/search/documents.py 100% <100%> (ø) ⬆️
accesspoc/dips/tasks.py 100% <100%> (ø)
...cesspoc/dips/migrations/0011_auto_20180729_1659.py 100% <100%> (ø)
accesspoc/dips/migrations/0012_taskresult.py 100% <100%> (ø)
accesspoc/accesspoc/settings.py 100% <100%> (ø) ⬆️
accesspoc/dips/models.py 99.3% <100%> (+0.15%) ⬆️
accesspoc/accesspoc/__init__.py 100% <100%> (ø)
accesspoc/dips/parsemets.py 13.66% <11.66%> (+4.26%) ⬆️
accesspoc/dips/views.py 91.59% <70%> (+3.31%) ⬆️
... and 6 more

@sevein (Collaborator) left a comment

I haven't tried this yet but I have some initial comments.

[UPDATE]: I've just read your TODO checklist :)

mets = METS(os.path.abspath(metsfile), dip_id)
mets.parse_mets()
# Delete temporary directory
shutil.rmtree(tmpdir)
Collaborator

I realize this function would look much nicer if you used a context manager, e.g.:

with tempfile.TemporaryDirectory() as tmpdir:
    zip = zipfile.ZipFile(zip_path)
    for info in zip.infolist():
        if re.match(r'.*METS.[0-9a-f\-]{36}.*$', info.filename):
            metsfile = zip.extract(info, tmpdir)
    mets = METS(os.path.abspath(metsfile), dip_id)
    mets.parse_mets()

Collaborator

One question: what happens if an exception is raised inside this function and it's not controlled?
Have you tested that case? Is it retried? Is the error recorded somewhere?

metsfile = zip.extract(info, tmpdir)
# Parse METS file
mets = METS(os.path.abspath(metsfile), dip_id)
mets.parse_mets()
Collaborator

There's a bunch of database action going on here. I'm wondering if Celery wraps all that within a database transaction or if you have to do it manually. Have you thought about that at all?

Also, introducing workers may be a reason to move to MySQL. Maybe. We should investigate further. See https://stackoverflow.com/a/10387821. Is WAL enabled for you?

Collaborator Author

It looks like it has to be done manually, working on that ;)

This is interesting: https://www.sqlite.org/tempfiles.html

And Django only complains about the save points when "autocommit" is turned off: https://docs.djangoproject.com/en/1.11/topics/db/transactions/#savepoints-in-sqlite
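
For reference, doing it manually would look something like this sketch —
the task name comes from this PR, but the body is illustrative:

    from celery import shared_task
    from django.db import transaction

    @shared_task
    def extract_and_parse_mets(dip_id, zip_path):
        # Commit all model updates together, or roll back on error.
        with transaction.atomic():
            ...  # extract the METS file and save the parsed models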

      - CELERY_BROKER_URL=redis://redis:6379
    volumes:
      - ./accesspoc:/src
    command: 'celery -A accesspoc worker -l info'
Collaborator

Are you using the result backend too?

  redis:
    image: redis:4-alpine
    user: redis
    command: '--save "" --appendonly no'
Collaborator

You may want to change this so it actually persists to disk. It'd be a shame to lose all the data just because you decided to restart the service.
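
For example, a compose-style sketch with persistence enabled (RDB
snapshots plus append-only file) and the redis_data volume mounted at the
image's data directory — the exact settings here are illustrative:

    redis:
      image: redis:4-alpine
      user: redis
      command: '--save "60 1" --appendonly yes'
      volumes:
        - redis_data:/data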

@@ -0,0 +1,5 @@
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
Collaborator

This should be a proper docstring.




@@ -154,6 +154,7 @@ class DIP(AbstractEsModel):
         verbose_name=_('collection'),
     )
     dc = models.OneToOneField(DublinCore, null=True, on_delete=models.SET_NULL)
+    import_in_progress = models.BooleanField(default=False)
Collaborator

Isn't a new DIP always in progress? i.e. default=True?

Also, what would happen if the task never completes? Is there a way to report errors to the user? Do we need the result backend?

Collaborator Author

I set the default to true initially, but realized that it would require a data migration to set the existing DIPs to false. That made me realize that true by default is a wrong assumption, as the import may not always happen on creation (in the future). I may also remove this field if I find a better solution with the results backend.

@jraddaoui (Collaborator Author)

Thanks @sevein, I really appreciate the initial feedback!

@jraddaoui (Collaborator Author)

I have updated the TODO list with some of your feedback ;)

@jraddaoui (Collaborator Author)

Sorry @sevein, I shouldn't have amended the latest commit, since you already reviewed it. I've added your suggestion and a missing migration for the proxy model.

@sevein (Collaborator) left a comment

I haven't had time to test it yet but I've added some comments.

# Get DIP object
dip = DIP.objects.get(pk=self.dip_id)

# Gather info for each file in filegroup "original"
-for target in mets_root.findall(".//fileGrp[@USE='original']/file"):
+for file in self.mets_root.findall(".//fileGrp[@USE='original']/file"):
Collaborator

file is a builtin function; you could call it file_.


return data

def parse_dc(self):
Collaborator

Is this a private method? If so, the name should start with an underscore to make that clear.

file during its parsing. This function is meant to be called with
`.delay()` to be executed asynchronously by the Celery worker.
"""
with tempfile.TemporaryDirectory() as dir, zipfile.ZipFile(zip_path) as zip:
Collaborator

dir and zip are both builtin functions.
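
e.g., a sketch with both renamed:

    with tempfile.TemporaryDirectory() as tmp_dir, \
            zipfile.ZipFile(zip_path) as zip_file:
        for info in zip_file.infolist():
            ...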

with tempfile.TemporaryDirectory() as dir, zipfile.ZipFile(zip_path) as zip:
    # Extract METS file
    for info in zip.infolist():
        if re.match(r'.*METS.[0-9a-f\-]{36}.*$', info.filename):
Collaborator

Micro-optimization: use regex = re.compile(r'.*METS.[0-9a-f\-]{36}.*$') before the loop starts.
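
i.e., something like this sketch:

    regex = re.compile(r'.*METS.[0-9a-f\-]{36}.*$')
    for info in zip.infolist():
        if regex.match(info.filename):
            metsfile = zip.extract(info, tmpdir)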

# Parse all DC elements to a dictionary
dc_model = dict()
for elem in dc_xml:
    dc_model['%s' % (elem.tag)] = elem.text
Collaborator

Simpler:

dc_model[str(elem.tag)] = elem.text

@@ -19,203 +20,187 @@ def convert_size(size):

class METS(object):
"""
-    Class for METS file parsing methods
+    Class for METS file parsing methods.
"""
Collaborator

I can't comment on convert_size so I'll do it here. If it's private, add the underscore to its name.

    # Convert from unix to iso8601
    data['datemodified'] = datetime.fromtimestamp(unixtime).isoformat()
else:
    data['datemodified'] = ''
Collaborator

Would this one be equivalent and a little safer?

try:
    unixtime = int(data['datemodified']) / 1000
except ValueError:
    data['datemodified'] = ''
else:
    data['datemodified'] = datetime.fromtimestamp(unixtime).isoformat()

'the METS file. After the process finishes and the interface',
'is reloaded, a link to the Folder will show up in the',
'Folders table at the related Collection page.',
])))
Collaborator

Is that going to work? I would have thought that the join would never happen because the annotated string is extracted beforehand. An alternative could be:

        messages.info(
            request,
            'A background process has been launched to extract and parse the'
            ' METS file. After the process finishes and the interface is'
            ' reloaded, a link to the Folder will show up in the Folders table'
            ' at the related Collection page.')

Collaborator Author

Oh, forgot to change it here from the previous feedback.

@@ -0,0 +1,7 @@
"""
This will make sure the Celry app is always imported
Collaborator

Typo: s/Celry/Celery.

@CCA-Public deleted a comment from sevein Jul 31, 2018
@jraddaoui force-pushed the dev/issue-15-celery branch 2 times, most recently from 083c8f0 to 8a9a073 on August 7, 2018 20:13
@jraddaoui (Collaborator Author)

Hi @sevein, this is ready for another round. I'd appreciate if we can get the code right before I start writing the install notes. Thanks!

@jraddaoui (Collaborator Author)

Actually, I'd like to take care of #43 too before writing the install notes, and I'd love to have this merged before that, to avoid a lot of conflicts in the settings files. So, maybe we could add the install notes in a different PR after both things are done ;)

@jraddaoui (Collaborator Author)

Updated the PR description to reflect the latest changes.

@sevein (Collaborator) left a comment

I've tried it and it's really neat, congratulations @jraddaoui.

I've made some more comments but they're mostly nitpicks.

One thing I'd like to ask: let's reduce the number of events we send to the logger in tasks.py to just human-actionable ones. In most places where you're using INFO I'd rather use DEBUG. If something is bad and requires attention from the operator, use ERROR or WARNING as needed.

As Peter Bourgon wrote in his blog:

[...] services should only log actionable information. That includes serious, panic-level errors that need to be consumed by humans, or structured data that needs to be consumed by machines. An example of the former would be a message signaling that a required database has become completely unavailable. An example of the latter would be a message indicating a media object has been played, recorded so an end-of-day batch process might calculate owed royalties. Logs read by humans should be sparse, ideally silent if nothing is going wrong. Logs read by machines should be well-defined, ideally with a versioned schema.

Thank you, this is really great work.
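
A sketch of the suggested split — the logger setup and messages are
illustrative, not from this PR:

    import logging

    logger = logging.getLogger(__name__)

    # Routine progress: useful when debugging, silent otherwise.
    logger.debug('Extracting METS file from %s', zip_path)

    # Human-actionable: something an operator needs to look at.
    logger.error('DIP %s could not be updated after import', dip_id)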

        instance._meta.get_field(field)
        setattr(instance, field, value)
    except FieldDoesNotExist:
        pass
Collaborator

I think that it'd be clearer if you write:

        try:
            instance._meta.get_field(field)
        except FieldDoesNotExist:
            continue
        setattr(instance, field, value)

@@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-
Collaborator

I guess it doesn't really matter as long as you're planning to squash the migrations soon, but I feel obliged to say again that we should avoid extra migrations as much as possible, e.g. 0011 and 0012 could become just one migration. Also it'd be good if they have meaningful names instead of the name automatically generated by Django.

Collaborator Author

Created #78.

        info = result.get_error_message()
    except TaskResult.DoesNotExist:
        info = gettext('A related TaskResult could not be found.')
    return ' '.join([intro, info, please])
Collaborator

Remember that we shouldn't use string concatenation in i18n because each language has its own syntax.

It'd be better to have a single string with parameters, e.g.:

An error occurred during the process executed to extract and parse the METS file - see error below. Please, contact an administrator.

%(error_message)s
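
e.g., a sketch with a single translatable string and a named parameter
(error_message stands in for whatever the TaskResult provides):

    from django.utils.translation import gettext

    message = gettext(
        'An error occurred during the process executed to extract and parse '
        'the METS file - see error below. Please, contact an administrator.'
        '\n\n%(error_message)s'
    ) % {'error_message': error_message}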

import os
import re
import tempfile
import zipfile
Collaborator

It'd be nice at some point if you introduce in CI the use of https://github.com/PyCQA/flake8-import-order. It supports several styles: https://github.com/PyCQA/flake8-import-order#styles. Yours is different to all of them I believe. I think that google is my fav - or one of its derivatives.

Collaborator Author

Added to #67.

"""
if status not in states.READY_STATES:
return
dip = DIP.objects.get(pk=args[0])
Collaborator

What'd happen if it raises a DIP.DoesNotExist?

Collaborator Author

Good question, will check.

Collaborator Author

From the worker log:

worker_1         | [2018-08-14 17:33:13,237: INFO/ForkPoolWorker-8] Task dips.tasks.extract_and_parse_mets[c79c61e0-4441-4cd5-9ebc-fc5732e514dd] succeeded in 0.781619158002286s: None
worker_1         | [2018-08-14 17:33:13,238: WARNING/ForkPoolWorker-8] /usr/local/lib/python3.6/site-packages/celery/app/trace.py:561: RuntimeWarning: Exception raised outside body: Exception('Manually raised.',):
worker_1         | Traceback (most recent call last):
worker_1         |   File "/usr/local/lib/python3.6/site-packages/celery/app/trace.py", line 465, in trace_task
worker_1         |     state, retval, uuid, args, kwargs, None,
worker_1         |   File "/src/dips/tasks.py", line 27, in after_return
worker_1         |     raise Exception('Manually raised.')
worker_1         | Exception: Manually raised.
worker_1         | 
worker_1         |   exc, exc_info.traceback)))
worker_1         | [2018-08-14 17:33:13,244: ERROR/ForkPoolWorker-8] Task dips.tasks.extract_and_parse_mets[c79c61e0-4441-4cd5-9ebc-fc5732e514dd] raised unexpected: Exception('Manually raised.',)
worker_1         | Traceback (most recent call last):
worker_1         |   File "/usr/local/lib/python3.6/site-packages/celery/app/trace.py", line 465, in trace_task
worker_1         |     state, retval, uuid, args, kwargs, None,
worker_1         |   File "/src/dips/tasks.py", line 27, in after_return
worker_1         |     raise Exception('Manually raised.')
worker_1         | Exception: Manually raised.

The worker is still running and, amazingly, the error is recorded in the TaskResult. Great job Celery!
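
For context, a minimal sketch of the after_return hook under discussion —
the signature is Celery's Task API, but the DIP status update is an
assumption based on the snippet quoted above:

    from celery import Task, states

    class ExtractAndParseTask(Task):
        def after_return(self, status, retval, task_id, args, kwargs, einfo):
            # Only act once the task has reached a ready state.
            if status not in states.READY_STATES:
                return
            # args[0] is the dip_id passed to the task.
            dip = DIP.objects.get(pk=args[0])
            dip.import_status = (
                DIP.IMPORT_SUCCESS if status == states.SUCCESS
                else DIP.IMPORT_FAILURE
            )
            dip.save()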

if (digitalfile.dip.import_status == DIP.IMPORT_PENDING or (
        not request.user.is_editor() and
        digitalfile.dip.import_status == DIP.IMPORT_FAILURE
)):
@sevein (Collaborator) commented Aug 13, 2018

Nitpick: as you've already done this twice, why not write a small method in the model that takes request.user and returns what you're looking for, e.g. digitalfile.dip.is_visitable(request.user).

def is_visitable(self, user):
    if self.import_status == DIP.IMPORT_PENDING:
        return False
    if self.import_status == DIP.IMPORT_FAILURE \
            and not user.is_editor():
        return False
    return True

... ^ quick draft to make sure I'm clear on what I'm asking.
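
Call sites would then reduce to something like this (sketch; raising
Http404 is an assumption about what the view currently does):

    from django.http import Http404

    if not digitalfile.dip.is_visitable(request.user):
        raise Http404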

# TODO: Avoid this save from updating the related ES
# documents, as a later save when the async_result id
# is added, will repeat the process. It may require
# stop using signals and move to a custom save method.
Collaborator

It'd be good to file a bug once it's merged! Unless you've already done it :)

Collaborator Author

Created #79. I'll add more info after this is merged.

@jraddaoui mentioned this pull request Aug 14, 2018
@jraddaoui (Collaborator Author)

Thanks @sevein!! Great feedback throughout the PR, I really appreciate that kind of involvement.

About the log level: the reason I used INFO is that elasticsearch-py logs each request at that level and I don't like them appearing by themselves. I think it's okay to leave them like that and recommend running the Celery worker with a level of WARNING in the install notes. Please let me know if you still don't like that idea ;)

@sevein (Collaborator) commented Aug 14, 2018

Makes sense!

@jraddaoui merged commit 46b8b42 into master Aug 14, 2018
@jraddaoui deleted the dev/issue-15-celery branch August 14, 2018 18:26