Skip to content

Commit

Permalink
Dataset deletion: Mechanism to decrease contention in Undertaker; Fix r…
Browse files Browse the repository at this point in the history
  • Loading branch information
bari12 authored and Ruturaj123 committed Jun 19, 2019
1 parent bc17932 commit 018cc99
Showing 1 changed file with 28 additions and 4 deletions.
32 changes: 28 additions & 4 deletions lib/rucio/daemons/undertaker/undertaker.py
Expand Up @@ -15,7 +15,7 @@
# Authors:
# - Vincent Garonne <vgaronne@gmail.com>, 2013-2018
# - Cedric Serfon <cedric.serfon@cern.ch>, 2013-2015
# - Martin Barisits <martin.barisits@cern.ch>, 2016-2018
# - Martin Barisits <martin.barisits@cern.ch>, 2016-2019
# - Hannes Hansen <hannes.jakob.hansen@cern.ch>, 2018-2019
#
# PY3K COMPATIBLE
Expand All @@ -32,9 +32,15 @@
import time
import traceback

from copy import deepcopy
from datetime import datetime, timedelta
from re import match
from random import randint

from sqlalchemy.exc import DatabaseError

from rucio.common.config import config_get
from rucio.common.exception import DatabaseException, RuleNotFound
from rucio.common.exception import DatabaseException, UnsupportedOperation, RuleNotFound
from rucio.common.utils import chunks
from rucio.core.heartbeat import live, die, sanity_check
from rucio.core.monitor import record_counter
Expand Down Expand Up @@ -62,12 +68,24 @@ def undertaker(worker_number=1, total_workers=1, chunk_size=5, once=False):
pid = os.getpid()
thread = threading.current_thread()
sanity_check(executable='rucio-undertaker', hostname=hostname)

paused_dids = {} # {(scope, name): datetime}

while not GRACEFUL_STOP.is_set():
try:
heartbeat = live(executable='rucio-undertaker', hostname=hostname, pid=pid, thread=thread, older_than=6000)
logging.info('Undertaker({0[worker_number]}/{0[total_workers]}): Live gives {0[heartbeat]}'.format(locals()))

# Refresh paused dids
iter_paused_dids = deepcopy(paused_dids)
for key in iter_paused_dids:
if datetime.utcnow() > paused_dids[key]:
del paused_dids[key]

dids = list_expired_dids(worker_number=heartbeat['assign_thread'] + 1, total_workers=heartbeat['nr_threads'], limit=10000)

dids = [did for did in dids if (did['scope'], did['name']) not in paused_dids]

if not dids and not once:
logging.info('Undertaker(%s): Nothing to do. sleep 60.', worker_number)
time.sleep(60)
Expand All @@ -81,8 +99,14 @@ def undertaker(worker_number=1, total_workers=1, chunk_size=5, once=False):
record_counter(counters='undertaker.delete_dids', delta=len(chunk))
except RuleNotFound as error:
logging.error(error)
except DatabaseException as error:
logging.error('Undertaker(%s): Got database error %s.', worker_number, str(error))
except (DatabaseException, DatabaseError, UnsupportedOperation) as e:
if match('.*ORA-00054.*', str(e.args[0])) or match('.*55P03.*', str(e.args[0])) or match('.*3572.*', str(e.args[0])):
for did in chunk:
paused_dids[(did['scope'], did['name'])] = datetime.utcnow() + timedelta(seconds=randint(600, 2400))
record_counter('undertaker.delete_dids.exceptions.LocksDetected')
logging.warning('undertaker[%s/%s]: Locks detected for chunk', heartbeat['assign_thread'], heartbeat['nr_threads'])
else:
logging.error('Undertaker(%s): Got database error %s.', worker_number, str(error))
except:
logging.critical(traceback.format_exc())
time.sleep(1)
Expand Down

0 comments on commit 018cc99

Please sign in to comment.