Skip to content

Commit

Permalink
Properly update states when flushing email queue
Browse files Browse the repository at this point in the history
- if flushing failed, mark it as failed
- if celery is not used and the email was sent, mark it as sent
- commit either of these state updates
  • Loading branch information
ThiefMaster committed Oct 31, 2017
1 parent 37a4a03 commit 3ee95b0
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 8 deletions.
6 changes: 3 additions & 3 deletions indico/core/emails.py
Expand Up @@ -51,7 +51,7 @@ def send_email_task(task, email, log_entry=None):
task.retry(countdown=delay, max_retries=(MAX_TRIES - 1))
except MaxRetriesExceededError:
if log_entry:
_update_email_log_state(log_entry, failed=True)
update_email_log_state(log_entry, failed=True)
db.session.commit()
# store the email in case the mail server is unavailable for an
# extended period so someone can recover it using `indico shell`
Expand Down Expand Up @@ -85,10 +85,10 @@ def do_send_email(email, log_entry=None, _from_task=False):
if not _from_task:
logger.info('Sent email "%s"', truncate(email['subject'], 50))
if log_entry:
_update_email_log_state(log_entry)
update_email_log_state(log_entry)


def _update_email_log_state(log_entry, failed=False):
def update_email_log_state(log_entry, failed=False):
if failed:
log_entry.data['state'] = 'failed'
else:
Expand Down
22 changes: 17 additions & 5 deletions indico/core/notifications.py
Expand Up @@ -23,6 +23,7 @@
from flask import g

from indico.core.config import config
from indico.core.db import db
from indico.core.logger import Logger
from indico.util.string import to_unicode, truncate

Expand Down Expand Up @@ -93,8 +94,15 @@ def init_email_queue():


def flush_email_queue():
"""Send all the emails in the queue"""
from indico.core.emails import store_failed_email
"""Send all the emails in the queue.
Note: This function does a database commit to update states
in case of failures or immediately-sent emails. It should only
be called if the session is in a state safe to commit or after
doing a commit/rollback of any other changes that might have
been pending.
"""
from indico.core.emails import store_failed_email, update_email_log_state
queue = g.get('email_queue', [])
if not queue:
return
Expand All @@ -103,16 +111,20 @@ def flush_email_queue():
try:
fn(email, log_entry)
except Exception:
path = store_failed_email(email, log_entry)
# Flushing the email queue happens after a commit.
# If anything goes wrong here we keep going and just log
# it to avoid losing emails in case celery is not used for
# email sending or there is a temporary issue with celery.
# it to avoid losing (more) emails in case celery is not
# used for email sending or there is a temporary issue
# with celery.
if log_entry:
update_email_log_state(log_entry, failed=True)
path = store_failed_email(email, log_entry)
logger.exception('Flushing queued email "%s" failed; stored data in %s',
truncate(email['subject'], 50), path)
# Wait for a short moment in case it's a very temporary issue
time.sleep(0.25)
del queue[:]
db.session.commit()


def make_email(to_list=None, cc_list=None, bcc_list=None, from_address=None, reply_address=None, attachments=None,
Expand Down

0 comments on commit 3ee95b0

Please sign in to comment.