From 51ebd712142014a98886ed670cf2cbe04b3e60cd Mon Sep 17 00:00:00 2001 From: Benedikt Ziemons Date: Wed, 19 Aug 2020 18:12:13 +0200 Subject: [PATCH] Core & Internals: Add is_old_db method; Fix #1157 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit … to test for an old alembic revision in the database. Move alembic migration to tools/alembic_migration.sh. Add is_old_db test in alembic_migration.sh. Add startup tests for old DB in all daemons. Add test_daemons.py for testing all daemons on that behavior. --- lib/rucio/alembicrevision.py | 19 ++++ lib/rucio/daemons/abacus/account.py | 15 ++- .../daemons/abacus/collection_replica.py | 11 +- lib/rucio/daemons/abacus/rse.py | 13 ++- lib/rucio/daemons/atropos/atropos.py | 17 ++- lib/rucio/daemons/automatix/automatix.py | 17 +-- lib/rucio/daemons/badreplicas/minos.py | 6 +- .../badreplicas/minos_temporary_expiration.py | 8 +- lib/rucio/daemons/badreplicas/necromancer.py | 18 ++-- lib/rucio/daemons/c3po/c3po.py | 32 +++--- lib/rucio/daemons/cache/consumer.py | 13 ++- lib/rucio/daemons/conveyor/finisher.py | 26 +++-- lib/rucio/daemons/conveyor/fts_throttler.py | 22 ++-- lib/rucio/daemons/conveyor/poller.py | 27 +++-- lib/rucio/daemons/conveyor/poller_latest.py | 16 ++- lib/rucio/daemons/conveyor/receiver.py | 13 ++- lib/rucio/daemons/conveyor/stager.py | 25 +++-- lib/rucio/daemons/conveyor/submitter.py | 38 ++++--- lib/rucio/daemons/conveyor/throttler.py | 19 ++-- lib/rucio/daemons/follower/follower.py | 17 ++- lib/rucio/daemons/hermes/hermes.py | 24 +++-- lib/rucio/daemons/hermes/hermes2.py | 6 +- lib/rucio/daemons/judge/cleaner.py | 13 ++- lib/rucio/daemons/judge/evaluator.py | 12 ++- lib/rucio/daemons/judge/injector.py | 10 +- lib/rucio/daemons/judge/repairer.py | 17 +-- .../daemons/oauthmanager/oauthmanager.py | 15 ++- lib/rucio/daemons/reaper/dark_reaper.py | 16 ++- lib/rucio/daemons/reaper/light_reaper.py | 16 ++- lib/rucio/daemons/reaper/reaper.py | 21 ++-- lib/rucio/daemons/reaper/reaper2.py | 22 ++-- .../suspicious_replica_recoverer.py | 40 +++---- .../sonar/distribution/distribution_daemon.py | 19 ++-- .../sonar/sonar/sonar_v3_dev_daemon.py | 17 +-- lib/rucio/daemons/tracer/kronos.py | 30 ++++-- .../daemons/transmogrifier/transmogrifier.py | 15 ++- lib/rucio/daemons/undertaker/undertaker.py | 24 +++-- lib/rucio/db/sqla/util.py | 37 ++++++- lib/rucio/tests/test_abacus_account.py | 34 +++--- .../tests/test_abacus_collection_replica.py | 38 +++---- lib/rucio/tests/test_abacus_rse.py | 32 +++--- lib/rucio/tests/test_daemons.py | 100 ++++++++++++++++++ tools/alembic_migration.sh | 35 ++++++ tools/run_multi_vo_tests_docker.sh | 9 +- tools/run_tests.sh | 11 +- tools/run_tests_docker.sh | 11 +- 46 files changed, 683 insertions(+), 313 deletions(-) create mode 100644 lib/rucio/alembicrevision.py create mode 100644 lib/rucio/tests/test_daemons.py create mode 100755 tools/alembic_migration.sh diff --git a/lib/rucio/alembicrevision.py b/lib/rucio/alembicrevision.py new file mode 100644 index 0000000000..e76aaed6d4 --- /dev/null +++ b/lib/rucio/alembicrevision.py @@ -0,0 +1,19 @@ +# -*- coding: utf-8 -*- +# Copyright 2020 CERN for the benefit of the ATLAS collaboration. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Authors: +# - Benedikt Ziemons , 2020 + +ALEMBIC_REVISION = '50280c53117c' # the current alembic head revision diff --git a/lib/rucio/daemons/abacus/account.py b/lib/rucio/daemons/abacus/account.py index 5270ec31a0..f9f1337e42 100644 --- a/lib/rucio/daemons/abacus/account.py +++ b/lib/rucio/daemons/abacus/account.py @@ -1,4 +1,5 @@ -# Copyright 2014-2018 CERN for the benefit of the ATLAS collaboration. +# -*- coding: utf-8 -*- +# Copyright 2014-2020 CERN for the benefit of the ATLAS collaboration. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,11 +14,12 @@ # limitations under the License. # # Authors: -# - Martin Barisits , 2014-2016 -# - Vincent Garonne , 2014-2018 +# - Martin Barisits , 2014-2019 +# - Vincent Garonne , 2014-2018 # - Hannes Hansen , 2018-2019 -# - Brandon White , 2019-2020 +# - Brandon White , 2019 # - Thomas Beermann , 2020 +# - Benedikt Ziemons , 2020 # # PY3K COMPATIBLE @@ -33,10 +35,12 @@ import time import traceback +from rucio.common import exception from rucio.common.config import config_get from rucio.common.utils import get_thread_with_periodic_running_function from rucio.core.heartbeat import live, die, sanity_check from rucio.core.account_counter import get_updated_account_counters, update_account_counter, fill_account_counter_history_table +from rucio.db.sqla.util import is_old_db graceful_stop = threading.Event() @@ -109,6 +113,9 @@ def run(once=False, threads=1, fill_history_table=False): """ Starts up the Abacus-Account threads. """ + if is_old_db(): + raise exception.DatabaseException('Database was not updated, daemon won\'t start') + executable = 'abacus-account' hostname = socket.gethostname() sanity_check(executable=executable, hostname=hostname) diff --git a/lib/rucio/daemons/abacus/collection_replica.py b/lib/rucio/daemons/abacus/collection_replica.py index 787d253764..4187896ef1 100644 --- a/lib/rucio/daemons/abacus/collection_replica.py +++ b/lib/rucio/daemons/abacus/collection_replica.py @@ -1,4 +1,5 @@ -# Copyright 2014-2018 CERN for the benefit of the ATLAS collaboration. +# -*- coding: utf-8 -*- +# Copyright 2018-2020 CERN for the benefit of the ATLAS collaboration. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,8 +14,9 @@ # limitations under the License. # # Authors: -# - Hannes Hansen , 2018 +# - Hannes Hansen , 2018-2019 # - Thomas Beermann , 2020 +# - Benedikt Ziemons , 2020 """ Abacus-Collection-Replica is a daemon to update collection replica. @@ -28,9 +30,11 @@ import time import traceback +from rucio.common import exception from rucio.common.config import config_get from rucio.core.heartbeat import live, die, sanity_check from rucio.core.replica import get_cleaned_updated_collection_replicas, update_collection_replica +from rucio.db.sqla.util import is_old_db graceful_stop = threading.Event() @@ -102,6 +106,9 @@ def run(once=False, threads=1): """ Starts up the Abacus-Collection-Replica threads. """ + if is_old_db(): + raise exception.DatabaseException('Database was not updated, daemon won\'t start') + executable = 'abacus-collection-replica' hostname = socket.gethostname() sanity_check(executable=executable, hostname=hostname) diff --git a/lib/rucio/daemons/abacus/rse.py b/lib/rucio/daemons/abacus/rse.py index 6d5c6ffe5a..fce97adf01 100644 --- a/lib/rucio/daemons/abacus/rse.py +++ b/lib/rucio/daemons/abacus/rse.py @@ -1,4 +1,5 @@ -# Copyright 2014-2018 CERN for the benefit of the ATLAS collaboration. +# -*- coding: utf-8 -*- +# Copyright 2014-2020 CERN for the benefit of the ATLAS collaboration. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,10 +15,11 @@ # # Authors: # - Martin Barisits , 2014-2016 -# - Vincent Garonne , 2018 +# - Vincent Garonne , 2018 # - Hannes Hansen , 2018-2019 -# - Brandon White , 2019-2020 +# - Brandon White , 2019 # - Thomas Beermann , 2020 +# - Benedikt Ziemons , 2020 # # PY3K COMPATIBLE @@ -33,10 +35,12 @@ import time import traceback +from rucio.common import exception from rucio.common.config import config_get from rucio.common.utils import get_thread_with_periodic_running_function from rucio.core.heartbeat import live, die, sanity_check from rucio.core.rse_counter import get_updated_rse_counters, update_rse_counter, fill_rse_counter_history_table +from rucio.db.sqla.util import is_old_db graceful_stop = threading.Event() @@ -108,6 +112,9 @@ def run(once=False, threads=1, fill_history_table=False): """ Starts up the Abacus-RSE threads. """ + if is_old_db(): + raise exception.DatabaseException('Database was not updated, daemon won\'t start') + executable = 'abacus-rse' hostname = socket.gethostname() sanity_check(executable=executable, hostname=hostname) diff --git a/lib/rucio/daemons/atropos/atropos.py b/lib/rucio/daemons/atropos/atropos.py index de3fe1850f..a4a4c354d4 100644 --- a/lib/rucio/daemons/atropos/atropos.py +++ b/lib/rucio/daemons/atropos/atropos.py @@ -1,4 +1,5 @@ -# Copyright 2016-2018 CERN for the benefit of the ATLAS collaboration. +# -*- coding: utf-8 -*- +# Copyright 2018-2020 CERN for the benefit of the ATLAS collaboration. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,14 +14,15 @@ # limitations under the License. # # Authors: -# - Cedric Serfon , 2016-2018 -# - Vincent Garonne , 2018 +# - Mario Lassnig , 2018 # - Dimitrios Christidis , 2018-2019 # - Hannes Hansen , 2018 # - Andrew Lister , 2019 -# - Brandon White , 2019-2020 +# - Brandon White , 2019 +# - Patrick Austin , 2020 # - Thomas Beermann , 2020 -# - Patrick Austin, , 2020 +# - Eli Chadwick , 2020 +# - Benedikt Ziemons , 2020 # # PY3K COMPATIBLE @@ -35,6 +37,7 @@ from sys import exc_info, stdout from traceback import format_exception +from rucio.common import exception from rucio.db.sqla.constants import LifetimeExceptionsState from rucio.common.config import config_get from rucio.common.exception import InvalidRSEExpression, RuleNotFound @@ -44,6 +47,7 @@ from rucio.core.rse import get_rse_name, get_rse_vo from rucio.core.rse_expression_parser import parse_expression from rucio.core.rule import get_rules_beyond_eol, update_rule +from rucio.db.sqla.util import is_old_db logging.basicConfig(stream=stdout, level=getattr(logging, @@ -203,6 +207,9 @@ def run(threads=1, bulk=100, date_check=None, dry_run=True, grace_period=86400, """ Starts up the atropos threads. """ + if is_old_db(): + raise exception.DatabaseException('Database was not updated, daemon won\'t start') + if not date_check: date_check = datetime.datetime.now() else: diff --git a/lib/rucio/daemons/automatix/automatix.py b/lib/rucio/daemons/automatix/automatix.py index 6b8f5dacec..598934f9e3 100644 --- a/lib/rucio/daemons/automatix/automatix.py +++ b/lib/rucio/daemons/automatix/automatix.py @@ -1,4 +1,5 @@ -# Copyright 2013-2018 CERN for the benefit of the ATLAS collaboration. +# -*- coding: utf-8 -*- +# Copyright 2018-2020 CERN for the benefit of the ATLAS collaboration. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,16 +14,15 @@ # limitations under the License. # # Authors: -# - Vincent Garonne , 2013-2018 -# - Cedric Serfon , 2013-2020 -# - Ralph Vigne , 2013 -# - Mario Lassnig , 2014 -# - Tomas Kouba , 2015 +# - Mario Lassnig , 2018 # - Hannes Hansen , 2018 # - Andrew Lister , 2019 +# - Brandon White , 2019 +# - Cedric Serfon , 2020 # - Eli Chadwick , 2020 # - Patrick Austin , 2020 # - Thomas Beermann , 2020 +# - Benedikt Ziemons , 2020 # # PY3K COMPATIBLE @@ -44,10 +44,12 @@ from time import sleep, time from rucio.client import Client +from rucio.common import exception from rucio.common.config import config_get from rucio.common.utils import adler32 from rucio.core.config import get from rucio.core import monitor, heartbeat +from rucio.db.sqla.util import is_old_db from rucio.rse import rsemanager as rsemgr from rucio.core.scope import list_scopes from rucio.common.types import InternalScope @@ -300,6 +302,9 @@ def run(total_workers=1, once=False, inputfile=None): """ Starts up the automatix threads. """ + if is_old_db(): + raise exception.DatabaseException('Database was not updated, daemon won\'t start') + try: sites = [s.strip() for s in get('automatix', 'sites').split(',')] except Exception: diff --git a/lib/rucio/daemons/badreplicas/minos.py b/lib/rucio/daemons/badreplicas/minos.py index e695b17516..d11dec75f2 100644 --- a/lib/rucio/daemons/badreplicas/minos.py +++ b/lib/rucio/daemons/badreplicas/minos.py @@ -1,3 +1,4 @@ +# -*- coding: utf-8 -*- # Copyright 2018-2020 CERN for the benefit of the ATLAS collaboration. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -37,6 +38,7 @@ from datetime import datetime from sys import stdout +from rucio.common import exception from rucio.db.sqla.constants import BadFilesStatus, BadPFNStatus, ReplicaState from rucio.db.sqla.session import get_session @@ -50,7 +52,7 @@ from rucio.core.rse import get_rse_name from rucio.core import heartbeat - +from rucio.db.sqla.util import is_old_db logging.basicConfig(stream=stdout, level=getattr(logging, @@ -271,6 +273,8 @@ def run(threads=1, bulk=100, once=False, sleep_time=60): """ Starts up the minos threads. """ + if is_old_db(): + raise exception.DatabaseException('Database was not updated, daemon won\'t start') if once: logging.info('Will run only one iteration in a single threaded mode') diff --git a/lib/rucio/daemons/badreplicas/minos_temporary_expiration.py b/lib/rucio/daemons/badreplicas/minos_temporary_expiration.py index 9456f7d901..9e617bb8a5 100644 --- a/lib/rucio/daemons/badreplicas/minos_temporary_expiration.py +++ b/lib/rucio/daemons/badreplicas/minos_temporary_expiration.py @@ -1,3 +1,4 @@ +# -*- coding: utf-8 -*- # Copyright 2018-2020 CERN for the benefit of the ATLAS collaboration. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -13,8 +14,8 @@ # limitations under the License. # # Authors: -# - Cedric Serfon , 2018-2019 # - Martin Barisits , 2018-2019 +# - Cedric Serfon , 2019 # - Andrew Lister , 2019 # - Brandon White , 2019 # - Thomas Beermann , 2020 @@ -32,6 +33,7 @@ from sys import stdout +from rucio.common import exception from rucio.db.sqla.constants import BadFilesStatus, ReplicaState from rucio.db.sqla.session import get_session @@ -43,7 +45,7 @@ bulk_delete_bad_replicas, list_expired_temporary_unavailable_replicas) from rucio.core import heartbeat - +from rucio.db.sqla.util import is_old_db logging.basicConfig(stream=stdout, level=getattr(logging, @@ -163,6 +165,8 @@ def run(threads=1, bulk=100, once=False, sleep_time=60): """ Starts up the minos threads. """ + if is_old_db(): + raise exception.DatabaseException('Database was not updated, daemon won\'t start') if once: logging.info('Will run only one iteration in a single threaded mode') diff --git a/lib/rucio/daemons/badreplicas/necromancer.py b/lib/rucio/daemons/badreplicas/necromancer.py index b86a0a7898..2e0b016ae1 100644 --- a/lib/rucio/daemons/badreplicas/necromancer.py +++ b/lib/rucio/daemons/badreplicas/necromancer.py @@ -1,4 +1,5 @@ -# Copyright 2014-2018 CERN for the benefit of the ATLAS collaboration. +# -*- coding: utf-8 -*- +# Copyright 2018-2020 CERN for the benefit of the ATLAS collaboration. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,13 +14,11 @@ # limitations under the License. # # Authors: -# - Cedric Serfon , 2014-2019 -# - Vincent Garonne , 2015-2018 -# - Mario Lassnig , 2015 -# - Wen Guan , 2015 -# - Hannes Hansen , 2018-2019 -# - Brandon White , 2019-2020 +# - Cedric Serfon , 2018-2019 +# - Martin Barisits , 2019 +# - Brandon White , 2019 # - Thomas Beermann , 2020 +# - Benedikt Ziemons , 2020 # # PY3K COMPATIBLE @@ -35,6 +34,7 @@ from sys import exc_info, stdout from traceback import format_exception +from rucio.common import exception from rucio.db.sqla.constants import ReplicaState from rucio.common.config import config_get from rucio.common.utils import chunks @@ -42,7 +42,7 @@ from rucio.core import monitor, heartbeat from rucio.core.replica import list_bad_replicas, get_replicas_state, list_bad_replicas_history, update_bad_replicas_history from rucio.core.rule import update_rules_for_lost_replica, update_rules_for_bad_replica - +from rucio.db.sqla.util import is_old_db logging.basicConfig(stream=stdout, level=getattr(logging, @@ -150,6 +150,8 @@ def run(threads=1, bulk=100, once=False): """ Starts up the necromancer threads. """ + if is_old_db(): + raise exception.DatabaseException('Database was not updated, daemon won\'t start') if once: logging.info('Will run only one iteration in a single threaded mode') diff --git a/lib/rucio/daemons/c3po/c3po.py b/lib/rucio/daemons/c3po/c3po.py index 2910f5cbea..c4872c1da0 100644 --- a/lib/rucio/daemons/c3po/c3po.py +++ b/lib/rucio/daemons/c3po/c3po.py @@ -1,4 +1,5 @@ -# Copyright 2015-2018 CERN for the benefit of the ATLAS collaboration. +# -*- coding: utf-8 -*- +# Copyright 2015-2020 CERN for the benefit of the ATLAS collaboration. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,9 +15,11 @@ # # Authors: # - Thomas Beermann , 2015-2017 -# - Vincent Garonne , 2017-2018 +# - Vincent Garonne , 2017-2018 # - Hannes Hansen , 2018-2019 +# - Andrew Lister , 2019 # - Patrick Austin , 2020 +# - Benedikt Ziemons , 2020 # # PY3K COMPATIBLE @@ -28,27 +31,29 @@ from datetime import datetime from hashlib import md5 from json import dumps -try: - from Queue import Queue -except ImportError: - from queue import Queue -from six import string_types from sys import stdout -from time import sleep from threading import Event, Thread +from time import sleep from uuid import uuid4 from requests import post from requests.auth import HTTPBasicAuth from requests.exceptions import RequestException +from six import string_types from rucio.client import Client +from rucio.common import exception from rucio.common.config import config_get, config_get_options -from rucio.common.exception import RucioException from rucio.common.types import InternalScope from rucio.daemons.c3po.collectors.free_space import FreeSpaceCollector from rucio.daemons.c3po.collectors.jedi_did import JediDIDCollector from rucio.daemons.c3po.collectors.workload import WorkloadCollector +from rucio.db.sqla.util import is_old_db + +try: + from Queue import Queue +except ImportError: + from queue import Queue logging.basicConfig(stream=stdout, level=getattr(logging, @@ -256,7 +261,7 @@ def place_replica(once=False, # DO IT! try: add_rule(client, {'scope': did[0].external, 'name': did[1]}, decision.get('source_rse'), decision.get('destination_rse')) - except RucioException as e: + except exception.RucioException as e: logging.debug(e) w = 0 @@ -289,6 +294,9 @@ def run(once=False, """ Starts up the main thread """ + if is_old_db(): + raise exception.DatabaseException('Database was not updated, daemon won\'t start') + logging.info('activating C-3PO') thread_list = [] @@ -327,5 +335,5 @@ def run(once=False, while len(thread_list) > 0: [t.join(timeout=3) for t in thread_list if t and t.isAlive()] - except Exception as exception: - logging.critical(exception) + except Exception as error: + logging.critical(error) diff --git a/lib/rucio/daemons/cache/consumer.py b/lib/rucio/daemons/cache/consumer.py index f61bdf4785..e9fa0c23c4 100644 --- a/lib/rucio/daemons/cache/consumer.py +++ b/lib/rucio/daemons/cache/consumer.py @@ -1,4 +1,5 @@ -# Copyright 2014-2018 CERN for the benefit of the ATLAS collaboration. +# -*- coding: utf-8 -*- +# Copyright 2014-2020 CERN for the benefit of the ATLAS collaboration. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,12 +14,13 @@ # limitations under the License. # # Authors: -# - Wen Guan , 2014 -# - Vincent Garonne , 2016-2018 +# - Wen Guan , 2014 +# - Vincent Garonne , 2016-2018 # - Mario Lassnig , 2017 # - Robert Illingworth , 2018 # - Hannes Hansen , 2018 # - Andrew Lister , 2019 +# - Benedikt Ziemons , 2020 # # PY3K COMPATIBLE @@ -37,12 +39,13 @@ import json import stomp +from rucio.common import exception from rucio.common.config import config_get, config_get_int from rucio.common.types import InternalScope from rucio.core.monitor import record_counter from rucio.core.volatile_replica import add_volatile_replicas, delete_volatile_replicas from rucio.core.rse import get_rse_id - +from rucio.db.sqla.util import is_old_db logging.getLogger("stomp").setLevel(logging.CRITICAL) @@ -179,6 +182,8 @@ def run(num_thread=1): """ Starts up the rucio cache consumer thread """ + if is_old_db(): + raise exception.DatabaseException('Database was not updated, daemon won\'t start') logging.info('starting consumer thread') threads = [threading.Thread(target=consumer, kwargs={'id': i, 'num_thread': num_thread}) for i in range(0, num_thread)] diff --git a/lib/rucio/daemons/conveyor/finisher.py b/lib/rucio/daemons/conveyor/finisher.py index 6db3e538f1..460e022add 100644 --- a/lib/rucio/daemons/conveyor/finisher.py +++ b/lib/rucio/daemons/conveyor/finisher.py @@ -1,4 +1,5 @@ -# Copyright 2015-2019 CERN for the benefit of the ATLAS collaboration. +# -*- coding: utf-8 -*- +# Copyright 2015-2020 CERN for the benefit of the ATLAS collaboration. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,17 +14,18 @@ # limitations under the License. # # Authors: -# - Wen Guan , 2015-2016 +# - Wen Guan , 2015-2016 # - Mario Lassnig , 2015-2019 -# - Vincent Garonne , 2015-2018 +# - Vincent Garonne , 2015-2018 # - Martin Barisits , 2015-2019 # - Cedric Serfon , 2017-2020 # - Hannes Hansen , 2018 # - Robert Illingworth , 2019 # - Andrew Lister , 2019 -# - Brandon White , 2019-2020 +# - Brandon White , 2019 # - Eli Chadwick , 2020 # - Thomas Beermann , 2020 +# - Benedikt Ziemons , 2020 # # PY3K COMPATIBLE @@ -43,27 +45,29 @@ import time import traceback -try: - from urlparse import urlparse # py2 -except ImportError: - from urllib.parse import urlparse # py3 - from dogpile.cache import make_region from dogpile.cache.api import NoValue from sqlalchemy.exc import DatabaseError +from rucio.common import exception from rucio.common.config import config_get +from rucio.common.exception import DatabaseException, ConfigNotFound, UnsupportedOperation, ReplicaNotFound, RequestNotFound from rucio.common.types import InternalAccount from rucio.common.utils import chunks -from rucio.common.exception import DatabaseException, ConfigNotFound, UnsupportedOperation, ReplicaNotFound, RequestNotFound from rucio.core import request as request_core, heartbeat, replica as replica_core from rucio.core.config import items from rucio.core.monitor import record_timer, record_counter from rucio.core.rse import list_rses from rucio.db.sqla.constants import RequestState, RequestType, ReplicaState, BadFilesStatus from rucio.db.sqla.session import transactional_session +from rucio.db.sqla.util import is_old_db from rucio.rse import rsemanager +try: + from urlparse import urlparse # py2 +except ImportError: + from urllib.parse import urlparse # py3 + logging.basicConfig(stream=sys.stdout, level=getattr(logging, @@ -187,6 +191,8 @@ def run(once=False, total_threads=1, sleep_time=60, activities=None, bulk=100, d """ Starts up the conveyer threads. """ + if is_old_db(): + raise exception.DatabaseException('Database was not updated, daemon won\'t start') if once: logging.info('executing one finisher iteration only') diff --git a/lib/rucio/daemons/conveyor/fts_throttler.py b/lib/rucio/daemons/conveyor/fts_throttler.py index 32d73a3f57..2fe088243b 100644 --- a/lib/rucio/daemons/conveyor/fts_throttler.py +++ b/lib/rucio/daemons/conveyor/fts_throttler.py @@ -1,4 +1,5 @@ -# Copyright 2019 CERN for the benefit of the ATLAS collaboration. +# -*- coding: utf-8 -*- +# Copyright 2019-2020 CERN for the benefit of the ATLAS collaboration. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,10 +14,11 @@ # limitations under the License. # # Authors: -# - Dilaksun Bavarajan , 2019 +# - Dilaksun Bavarajan , 2019 # - Martin Barisits , 2019 -# - Brandon White , 2019-2020 +# - Brandon White , 2019 # - Thomas Beermann , 2020 +# - Benedikt Ziemons , 2020 # # PY3K COMPATIBLE @@ -28,20 +30,22 @@ """ from __future__ import division +import datetime +import json import logging -import sys -import threading import os import socket +import sys +import threading import time import traceback -import json -import datetime -import requests +import requests +from rucio.common import exception from rucio.common.config import config_get from rucio.core import heartbeat +from rucio.db.sqla.util import is_old_db from rucio.transfertool.fts3 import FTS3Transfertool logging.basicConfig(stream=sys.stdout, @@ -437,6 +441,8 @@ def run(once=False, cycle_interval=3600): """ Starts up the conveyer fts throttler thread. """ + if is_old_db(): + raise exception.DatabaseException('Database was not updated, daemon won\'t start') logging.info('starting throttler thread') fts_throttler_thread = threading.Thread(target=fts_throttler, kwargs={'once': once, 'cycle_interval': cycle_interval}) diff --git a/lib/rucio/daemons/conveyor/poller.py b/lib/rucio/daemons/conveyor/poller.py index 89aba2e17f..0efc0e8f3d 100644 --- a/lib/rucio/daemons/conveyor/poller.py +++ b/lib/rucio/daemons/conveyor/poller.py @@ -1,3 +1,4 @@ +# -*- coding: utf-8 -*- # Copyright 2013-2020 CERN for the benefit of the ATLAS collaboration. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -15,12 +16,15 @@ # Authors: # - Mario Lassnig , 2013-2020 # - Cedric Serfon , 2013-2018 -# - Vincent Garonne , 2014-2018 -# - Wen Guan , 2014-2016 -# - Martin Barisits , 2016-2017 +# - Vincent Garonne , 2014-2018 +# - Wen Guan , 2014-2016 +# - Martin Barisits , 2016-2020 # - Hannes Hansen , 2018 -# - Brandon White , 2019-2020 +# - maatthias , 2019 +# - Brandon White , 2019 +# - Nick Smith , 2020 # - Thomas Beermann , 2020 +# - Benedikt Ziemons , 2020 # # PY3K COMPATIBLE @@ -40,21 +44,24 @@ import threading import time import traceback - from collections import defaultdict -try: - from ConfigParser import NoOptionError # py2 -except Exception: - from configparser import NoOptionError # py3 + from requests.exceptions import RequestException from sqlalchemy.exc import DatabaseError +from rucio.common import exception from rucio.common.config import config_get from rucio.common.exception import DatabaseException, TransferToolTimeout, TransferToolWrongAnswer from rucio.common.utils import chunks from rucio.core import heartbeat, transfer as transfer_core, request as request_core from rucio.core.monitor import record_timer, record_counter from rucio.db.sqla.constants import RequestState, RequestType +from rucio.db.sqla.util import is_old_db + +try: + from ConfigParser import NoOptionError # py2 +except Exception: + from configparser import NoOptionError # py3 logging.basicConfig(stream=sys.stdout, @@ -179,6 +186,8 @@ def run(once=False, sleep_time=60, activities=None, """ Starts up the conveyer threads. """ + if is_old_db(): + raise exception.DatabaseException('Database was not updated, daemon won\'t start') if activity_shares: diff --git a/lib/rucio/daemons/conveyor/poller_latest.py b/lib/rucio/daemons/conveyor/poller_latest.py index ebbc04b36d..a99f329ff2 100644 --- a/lib/rucio/daemons/conveyor/poller_latest.py +++ b/lib/rucio/daemons/conveyor/poller_latest.py @@ -1,4 +1,5 @@ -# Copyright 2015-2018 CERN for the benefit of the ATLAS collaboration. +# -*- coding: utf-8 -*- +# Copyright 2015-2020 CERN for the benefit of the ATLAS collaboration. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,10 +14,12 @@ # limitations under the License. # # Authors: -# - Wen Guan , 2015-2016 -# - Vincent Garonne , 2015-2018 +# - Wen Guan , 2015-2016 +# - Vincent Garonne , 2015-2018 # - Martin Barisits , 2016-2017 # - Cedric Serfon , 2018 +# - Thomas Beermann , 2020 +# - Benedikt Ziemons , 2020 # # PY3K COMPATIBLE @@ -27,19 +30,20 @@ import datetime import logging import os -import sys import socket +import sys import threading import time import traceback from requests.exceptions import RequestException +from rucio.common import exception from rucio.common.config import config_get from rucio.core import heartbeat, transfer, request from rucio.core.monitor import record_timer, record_counter from rucio.db.sqla.constants import FTSState - +from rucio.db.sqla.util import is_old_db logging.basicConfig(stream=sys.stdout, level=getattr(logging, @@ -135,6 +139,8 @@ def run(once=False, last_nhours=1, external_hosts=None, fts_wait=1800, total_thr """ Starts up the conveyer threads. """ + if is_old_db(): + raise exception.DatabaseException('Database was not updated, daemon won\'t start') if not external_hosts: external_hosts = [] diff --git a/lib/rucio/daemons/conveyor/receiver.py b/lib/rucio/daemons/conveyor/receiver.py index 8987d914da..b349241856 100644 --- a/lib/rucio/daemons/conveyor/receiver.py +++ b/lib/rucio/daemons/conveyor/receiver.py @@ -1,4 +1,5 @@ -# Copyright 2015-2018 CERN for the benefit of the ATLAS collaboration. +# -*- coding: utf-8 -*- +# Copyright 2015-2020 CERN for the benefit of the ATLAS collaboration. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,15 +14,16 @@ # limitations under the License. # # Authors: -# - Wen Guan , 2015-2016 +# - Wen Guan , 2015-2016 # - Mario Lassnig , 2015 # - Martin Barisits , 2015-2018 -# - Vincent Garonne , 2015-2018 +# - Vincent Garonne , 2015-2018 # - Cedric Serfon , 2018 # - Robert Illingworth , 2018 # - Hannes Hansen , 2018 # - Andrew Lister , 2019 # - Thomas Beermann , 2020 +# - Benedikt Ziemons , 2020 # # PY3K COMPATIBLE @@ -43,13 +45,14 @@ import stomp +from rucio.common import exception from rucio.common.config import config_get, config_get_bool, config_get_int from rucio.common.policy import get_policy from rucio.core import heartbeat, request from rucio.core.monitor import record_counter from rucio.core.transfer import set_transfer_update_time from rucio.db.sqla.constants import RequestState, FTSCompleteState - +from rucio.db.sqla.util import is_old_db logging.getLogger("stomp").setLevel(logging.CRITICAL) @@ -291,6 +294,8 @@ def run(once=False, total_threads=1, full_mode=False): """ Starts up the receiver thread """ + if is_old_db(): + raise exception.DatabaseException('Database was not updated, daemon won\'t start') logging.info('starting receiver thread') threads = [threading.Thread(target=receiver, kwargs={'id': i, diff --git a/lib/rucio/daemons/conveyor/stager.py b/lib/rucio/daemons/conveyor/stager.py index 09b6822ca1..fa084588d7 100644 --- a/lib/rucio/daemons/conveyor/stager.py +++ b/lib/rucio/daemons/conveyor/stager.py @@ -1,4 +1,5 @@ -# Copyright 2015-2018 CERN for the benefit of the ATLAS collaboration. +# -*- coding: utf-8 -*- +# Copyright 2015-2020 CERN for the benefit of the ATLAS collaboration. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,14 +14,15 @@ # limitations under the License. # # Authors: -# - Wen Guan , 2015-2016 +# - Wen Guan , 2015-2016 # - Martin Barisits , 2015-2017 -# - Vincent Garonne , 2016-2018 +# - Vincent Garonne , 2016-2018 # - Thomas Beermann , 2017-2020 # - Cedric Serfon , 2018-2019 # - Hannes Hansen , 2018 -# - Brandon White , 2019-2020 +# - Brandon White , 2019 # - Patrick Austin , 2020 +# - Benedikt Ziemons , 2020 # # PY3K COMPATIBLE @@ -37,13 +39,9 @@ import threading import time import traceback - from collections import defaultdict -try: - from ConfigParser import NoOptionError # py2 -except Exception: - from configparser import NoOptionError # py3 +from rucio.common import exception from rucio.common.config import config_get, config_get_bool from rucio.core import heartbeat from rucio.core.monitor import record_counter, record_timer @@ -51,6 +49,13 @@ from rucio.core.staging import get_stagein_requests_and_source_replicas from rucio.daemons.conveyor.common import submit_transfer, bulk_group_transfer, get_conveyor_rses from rucio.db.sqla.constants import RequestState +from rucio.db.sqla.util import is_old_db + +try: + from ConfigParser import NoOptionError # py2 +except Exception: + from configparser import NoOptionError # py3 + logging.basicConfig(stream=sys.stdout, level=getattr(logging, @@ -193,6 +198,8 @@ def run(once=False, total_threads=1, group_bulk=1, group_policy='rule', mock=Fal """ Starts up the conveyer threads. """ + if is_old_db(): + raise exception.DatabaseException('Database was not updated, daemon won\'t start') if mock: logging.info('mock source replicas: enabled') diff --git a/lib/rucio/daemons/conveyor/submitter.py b/lib/rucio/daemons/conveyor/submitter.py index cf41b96814..13bc6a9a8e 100644 --- a/lib/rucio/daemons/conveyor/submitter.py +++ b/lib/rucio/daemons/conveyor/submitter.py @@ -1,4 +1,5 @@ -# Copyright 2013-2018 CERN for the benefit of the ATLAS collaboration. +# -*- coding: utf-8 -*- +# Copyright 2013-2020 CERN for the benefit of the ATLAS collaboration. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,15 +17,20 @@ # - Mario Lassnig , 2013-2015 # - Cedric Serfon , 2013-2019 # - Ralph Vigne , 2013 -# - Vincent Garonne , 2014-2018 +# - Vincent Garonne , 2014-2018 # - Martin Barisits , 2014-2020 -# - Wen Guan , 2014-2016 -# - Tomas Kouba , 2014 -# - Joaquin Bogado , 2016 -# - Hannes Hansen , 2018 -# - Brandon White , 2019-2020 +# - Wen Guan , 2014-2016 +# - Tomáš Kouba , 2014 +# - Joaquín Bogado , 2016 +# - dciangot , 2018 +# - Hannes Hansen , 2018-2019 +# - maatthias , 2019 +# - Brandon White , 2019 # - Thomas Beermann , 2020 +# - Nick Smith , 2020 +# - James Perry , 2020 # - Patrick Austin , 2020 +# - Benedikt Ziemons , 2020 # # PY3K COMPATIBLE @@ -42,21 +48,25 @@ import threading import time import traceback - from collections import defaultdict -try: - from ConfigParser import NoOptionError # py2 -except Exception: - from configparser import NoOptionError # py3 -from six import iteritems + from prometheus_client import Counter +from six import iteritems +from rucio.common import exception from rucio.common.config import config_get, config_get_bool from rucio.common.schema import get_schema_value from rucio.core import heartbeat, request as request_core, transfer as transfer_core from rucio.core.monitor import record_counter, record_timer from rucio.daemons.conveyor.common import submit_transfer, bulk_group_transfer, get_conveyor_rses, USER_ACTIVITY from rucio.db.sqla.constants import RequestState +from rucio.db.sqla.util import is_old_db + +try: + from ConfigParser import NoOptionError # py2 +except Exception: + from configparser import NoOptionError # py3 + logging.basicConfig(stream=sys.stdout, level=getattr(logging, @@ -249,6 +259,8 @@ def run(once=False, group_bulk=1, group_policy='rule', mock=False, """ Starts up the conveyer threads. """ + if is_old_db(): + raise exception.DatabaseException('Database was not updated, daemon won\'t start') if mock: logging.info('mock source replicas: enabled') diff --git a/lib/rucio/daemons/conveyor/throttler.py b/lib/rucio/daemons/conveyor/throttler.py index 5c3f33bc45..9961fee810 100644 --- a/lib/rucio/daemons/conveyor/throttler.py +++ b/lib/rucio/daemons/conveyor/throttler.py @@ -1,4 +1,5 @@ -# Copyright 2016-2018 CERN for the benefit of the ATLAS collaboration. +# -*- coding: utf-8 -*- +# Copyright 2016-2020 CERN for the benefit of the ATLAS collaboration. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,15 +14,15 @@ # limitations under the License. # # Authors: -# - Wen Guan , 2016 -# - Vincent Garonne , 2016-2018 +# - Wen Guan , 2016 +# - Vincent Garonne , 2016-2018 # - Martin Barisits , 2017 # - Cedric Serfon , 2018 -# - Hannes Hansen , 2018 -# - Andrew Lister , 2019 # - Hannes Hansen , 2018-2019 -# - Brandon White , 2019-2020 +# - Andrew Lister , 2019 +# - Brandon White , 2019 # - Thomas Beermann , 2020 +# - Benedikt Ziemons , 2020 # # PY3K COMPATIBLE @@ -40,7 +41,7 @@ import time import traceback - +from rucio.common import exception from rucio.common.config import config_get from rucio.common.utils import get_parsed_throttler_mode from rucio.core import heartbeat, config as config_core @@ -48,6 +49,7 @@ from rucio.core.request import get_stats_by_activity_direction_state, release_all_waiting_requests, release_waiting_requests_fifo, release_waiting_requests_grouped_fifo from rucio.core.rse import get_rse, set_rse_transfer_limits, delete_rse_transfer_limits, get_rse_transfer_limits from rucio.db.sqla.constants import RequestState +from rucio.db.sqla.util import is_old_db logging.basicConfig(stream=sys.stdout, level=getattr(logging, @@ -126,6 +128,9 @@ def run(once=False, sleep_time=600): """ Starts up the conveyer threads. """ + if is_old_db(): + raise exception.DatabaseException('Database was not updated, daemon won\'t start') + if once: logging.info('running throttler one iteration only') throttler(once=True, sleep_time=sleep_time) diff --git a/lib/rucio/daemons/follower/follower.py b/lib/rucio/daemons/follower/follower.py index 538141eb35..0cc494c48e 100644 --- a/lib/rucio/daemons/follower/follower.py +++ b/lib/rucio/daemons/follower/follower.py @@ -1,4 +1,5 @@ -# Copyright 2014-2018 CERN for the benefit of the ATLAS collaboration. +# -*- coding: utf-8 -*- +# Copyright 2019-2020 CERN for the benefit of the ATLAS collaboration. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,21 +14,24 @@ # limitations under the License. # # Authors: -# - Ruturaj Gujar, , 2019 +# - Ruturaj Gujar , 2019 +# - Benedikt Ziemons , 2020 # # PY3K COMPATIBLE import logging import os import socket +import sys import threading import time -import sys +from rucio.common import exception +from rucio.common.config import config_get from rucio.common.utils import get_thread_with_periodic_running_function -from rucio.core.heartbeat import live, die, sanity_check from rucio.core.did import create_reports -from rucio.common.config import config_get +from rucio.core.heartbeat import live, die, sanity_check +from rucio.db.sqla.util import is_old_db graceful_stop = threading.Event() @@ -78,6 +82,9 @@ def run(once=False, threads=1): """ Starts up the follower threads """ + if is_old_db(): + raise exception.DatabaseException('Database was not updated, daemon won\'t start') + hostname = socket.gethostname() sanity_check(executable='rucio-follower', hostname=hostname) diff --git a/lib/rucio/daemons/hermes/hermes.py b/lib/rucio/daemons/hermes/hermes.py index 0421a26ca0..0fe9ad4b56 100644 --- a/lib/rucio/daemons/hermes/hermes.py +++ b/lib/rucio/daemons/hermes/hermes.py @@ -1,4 +1,5 @@ -# Copyright 2014-2018 CERN for the benefit of the ATLAS collaboration. +# -*- coding: utf-8 -*- +# Copyright 2014-2020 CERN for the benefit of the ATLAS collaboration. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,10 +16,13 @@ # Authors: # - Mario Lassnig , 2014-2018 # - Thomas Beermann , 2014-2020 -# - Wen Guan , 2014-2015 -# - Vincent Garonne , 2015-2018 -# - Martin Barisits , 2016-2017 +# - Wen Guan , 2014-2015 +# - Vincent Garonne , 2015-2018 +# - Martin Barisits , 2016-2019 # - Robert Illingworth , 2018 +# - Hannes Hansen , 2019 +# - Eric Vaandering , 2019 +# - Benedikt Ziemons , 2020 # # PY3K COMPATIBLE @@ -36,19 +40,19 @@ import threading import time import traceback - from email.mime.text import MIMEText -from six import PY2 -from prometheus_client import Counter -from sqlalchemy.orm.exc import NoResultFound import stomp +from prometheus_client import Counter +from six import PY2 +from sqlalchemy.orm.exc import NoResultFound +from rucio.common import exception from rucio.common.config import config_get, config_get_int, config_get_bool from rucio.core.heartbeat import live, die, sanity_check from rucio.core.message import retrieve_messages, delete_messages from rucio.core.monitor import record_counter - +from rucio.db.sqla.util import is_old_db logging.getLogger('requests').setLevel(logging.CRITICAL) logging.getLogger('stomp').setLevel(logging.CRITICAL) @@ -396,6 +400,8 @@ def run(once=False, send_email=True, threads=1, bulk=1000, delay=10, broker_time ''' Starts up the hermes threads. ''' + if is_old_db(): + raise exception.DatabaseException('Database was not updated, daemon won\'t start') logging.info('resolving brokers') diff --git a/lib/rucio/daemons/hermes/hermes2.py b/lib/rucio/daemons/hermes/hermes2.py index ec17b45bad..ebd6d5af2e 100644 --- a/lib/rucio/daemons/hermes/hermes2.py +++ b/lib/rucio/daemons/hermes/hermes2.py @@ -1,4 +1,5 @@ #!/usr/bin/env python +# -*- coding: utf-8 -*- # Copyright 2020 CERN for the benefit of the ATLAS collaboration. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -38,12 +39,13 @@ import requests +from rucio.common import exception from rucio.common.config import config_get from rucio.core import heartbeat from rucio.core.config import get from rucio.core.message import retrieve_messages, delete_messages, update_messages_services from rucio.common.exception import ConfigNotFound - +from rucio.db.sqla.util import is_old_db logging.getLogger('requests').setLevel(logging.CRITICAL) @@ -280,6 +282,8 @@ def run(once=False, threads=1, bulk=1000, sleep_time=10, broker_timeout=3): ''' Starts up the hermes2 threads. ''' + if is_old_db(): + raise exception.DatabaseException('Database was not updated, daemon won\'t start') logging.info('starting hermes2 threads') thread_list = [threading.Thread(target=hermes2, kwargs={'thread': cnt, diff --git a/lib/rucio/daemons/judge/cleaner.py b/lib/rucio/daemons/judge/cleaner.py index 16020a76a8..9a714c133b 100644 --- a/lib/rucio/daemons/judge/cleaner.py +++ b/lib/rucio/daemons/judge/cleaner.py @@ -1,3 +1,4 @@ +# -*- coding: utf-8 -*- # Copyright 2013-2020 CERN for the benefit of the ATLAS collaboration. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -35,21 +36,20 @@ import threading import time import traceback - - from copy import deepcopy from datetime import datetime, timedelta -from re import match from random import randint +from re import match from sqlalchemy.exc import DatabaseError +from rucio.common import exception from rucio.common.config import config_get from rucio.common.exception import DatabaseException, UnsupportedOperation, RuleNotFound from rucio.core.heartbeat import live, die, sanity_check -from rucio.core.rule import delete_rule, get_expired_rules from rucio.core.monitor import record_counter -from rucio.db.sqla.util import get_db_time +from rucio.core.rule import delete_rule, get_expired_rules +from rucio.db.sqla.util import get_db_time, is_old_db graceful_stop = threading.Event() @@ -156,6 +156,9 @@ def run(once=False, threads=1): """ Starts up the Judge-Clean threads. """ + if is_old_db(): + raise exception.DatabaseException('Database was not updated, daemon won\'t start') + client_time, db_time = datetime.utcnow(), get_db_time() max_offset = timedelta(hours=1, seconds=10) if type(db_time) is datetime: diff --git a/lib/rucio/daemons/judge/evaluator.py b/lib/rucio/daemons/judge/evaluator.py index 5d222ac647..daa88c0603 100644 --- a/lib/rucio/daemons/judge/evaluator.py +++ b/lib/rucio/daemons/judge/evaluator.py @@ -1,3 +1,4 @@ +# -*- coding: utf-8 -*- # Copyright 2013-2020 CERN for the benefit of the ATLAS collaboration. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -36,21 +37,22 @@ import threading import time import traceback - from datetime import datetime, timedelta -from re import match from random import randint -from six import iteritems +from re import match +from six import iteritems from sqlalchemy.exc import DatabaseError from sqlalchemy.orm.exc import FlushError +from rucio.common import exception from rucio.common.config import config_get from rucio.common.exception import DatabaseException, DataIdentifierNotFound, ReplicationRuleCreationTemporaryFailed from rucio.common.types import InternalScope from rucio.core.heartbeat import live, die, sanity_check -from rucio.core.rule import re_evaluate_did, get_updated_dids, delete_updated_did from rucio.core.monitor import record_counter +from rucio.core.rule import re_evaluate_did, get_updated_dids, delete_updated_did +from rucio.db.sqla.util import is_old_db graceful_stop = threading.Event() @@ -183,6 +185,8 @@ def run(once=False, threads=1): """ Starts up the Judge-Eval threads. """ + if is_old_db(): + raise exception.DatabaseException('Database was not updated, daemon won\'t start') executable = 'judge-evaluator' hostname = socket.gethostname() diff --git a/lib/rucio/daemons/judge/injector.py b/lib/rucio/daemons/judge/injector.py index d598be9a5d..c1e19e1bf0 100644 --- a/lib/rucio/daemons/judge/injector.py +++ b/lib/rucio/daemons/judge/injector.py @@ -1,3 +1,4 @@ +# -*- coding: utf-8 -*- # Copyright 2015-2020 CERN for the benefit of the ATLAS collaboration. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -33,20 +34,21 @@ import threading import time import traceback - from copy import deepcopy from datetime import datetime, timedelta -from re import match from random import randint +from re import match from sqlalchemy.exc import DatabaseError +from rucio.common import exception from rucio.common.config import config_get from rucio.common.exception import (DatabaseException, RuleNotFound, RSEBlacklisted, ReplicationRuleCreationTemporaryFailed, InsufficientAccountLimit) from rucio.core.heartbeat import live, die, sanity_check -from rucio.core.rule import inject_rule, get_injected_rules, update_rule from rucio.core.monitor import record_counter +from rucio.core.rule import inject_rule, get_injected_rules, update_rule +from rucio.db.sqla.util import is_old_db graceful_stop = threading.Event() @@ -167,6 +169,8 @@ def run(once=False, threads=1): """ Starts up the Judge-Injector threads. """ + if is_old_db(): + raise exception.DatabaseException('Database was not updated, daemon won\'t start') executable = 'judge-injector' hostname = socket.gethostname() diff --git a/lib/rucio/daemons/judge/repairer.py b/lib/rucio/daemons/judge/repairer.py index 3e55d93c5f..9833594e57 100644 --- a/lib/rucio/daemons/judge/repairer.py +++ b/lib/rucio/daemons/judge/repairer.py @@ -1,4 +1,5 @@ -# Copyright 2013-2018 CERN for the benefit of the ATLAS collaboration. +# -*- coding: utf-8 -*- +# Copyright 2013-2020 CERN for the benefit of the ATLAS collaboration. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,11 +15,12 @@ # # Authors: # - Martin Barisits , 2013-2016 -# - Vincent Garonne , 2014-2018 +# - Vincent Garonne , 2014-2018 # - Mario Lassnig , 2014-2015 # - Hannes Hansen , 2018-2019 -# - Brandon White , 2019-2020 +# - Brandon White , 2019 # - Thomas Beermann , 2020 +# - Benedikt Ziemons , 2020 # # PY3K COMPATIBLE @@ -33,19 +35,20 @@ import threading import time import traceback - from copy import deepcopy from datetime import datetime, timedelta -from re import match from random import randint +from re import match from sqlalchemy.exc import DatabaseError +from rucio.common import exception from rucio.common.config import config_get from rucio.common.exception import DatabaseException from rucio.core.heartbeat import live, die, sanity_check -from rucio.core.rule import repair_rule, get_stuck_rules from rucio.core.monitor import record_counter +from rucio.core.rule import repair_rule, get_stuck_rules +from rucio.db.sqla.util import is_old_db graceful_stop = threading.Event() @@ -153,6 +156,8 @@ def run(once=False, threads=1): """ Starts up the Judge-Repairer threads. """ + if is_old_db(): + raise exception.DatabaseException('Database was not updated, daemon won\'t start') executable = 'judge-repairer' hostname = socket.gethostname() diff --git a/lib/rucio/daemons/oauthmanager/oauthmanager.py b/lib/rucio/daemons/oauthmanager/oauthmanager.py index fde19c1b78..89a12bf461 100644 --- a/lib/rucio/daemons/oauthmanager/oauthmanager.py +++ b/lib/rucio/daemons/oauthmanager/oauthmanager.py @@ -1,5 +1,6 @@ #!/usr/bin/env python -# Copyright 2012-2018 CERN for the benefit of the ATLAS collaboration. +# -*- coding: utf-8 -*- +# Copyright 2019-2020 CERN for the benefit of the ATLAS collaboration. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,8 +15,9 @@ # limitations under the License. # # Authors: -# - Jaroslav Guenther, , 2019 -# - Thomas Beermann , 2020 +# - Jaroslav Guenther , 2019-2020 +# - Thomas Beermann , 2020 +# - Benedikt Ziemons , 2020 # # PY3K COMPATIBLE @@ -43,13 +45,16 @@ from re import match from sys import stdout +from sqlalchemy.exc import DatabaseError + +from rucio.common import exception from rucio.common.config import config_get from rucio.common.exception import DatabaseException from rucio.core.authentication import delete_expired_tokens from rucio.core.heartbeat import die, live, sanity_check from rucio.core.monitor import record_counter, record_timer from rucio.core.oidc import delete_expired_oauthrequests, refresh_jwt_tokens -from sqlalchemy.exc import DatabaseError +from rucio.db.sqla.util import is_old_db logging.basicConfig(stream=stdout, level=getattr(logging, @@ -184,6 +189,8 @@ def run(once=False, threads=1, loop_rate=300, max_rows=100): """ Starts up the OAuth Manager threads. """ + if is_old_db(): + raise exception.DatabaseException('Database was not updated, daemon won\'t start') sanity_check(executable='OAuthManager', hostname=socket.gethostname()) diff --git a/lib/rucio/daemons/reaper/dark_reaper.py b/lib/rucio/daemons/reaper/dark_reaper.py index 7f02bce254..a5f2bbb8c2 100644 --- a/lib/rucio/daemons/reaper/dark_reaper.py +++ b/lib/rucio/daemons/reaper/dark_reaper.py @@ -1,4 +1,5 @@ -# Copyright 2016-2018 CERN for the benefit of the ATLAS collaboration. +# -*- coding: utf-8 -*- +# Copyright 2016-2020 CERN for the benefit of the ATLAS collaboration. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,14 +14,15 @@ # limitations under the License. # # Authors: -# - Vincent Garonne , 2016-2018 +# - Vincent Garonne , 2016-2018 # - Martin Barisits , 2016 # - Thomas Beermann , 2016-2019 # - Hannes Hansen , 2018-2019 # - Andrew Lister , 2019 # - Cedric Serfon , 2020 -# - Brandon White , 2019-2020 +# - Brandon White , 2019 # - Patrick Austin , 2020 +# - Benedikt Ziemons , 2020 # # PY3K COMPATIBLE @@ -38,6 +40,7 @@ import time import traceback +from rucio.common import exception from rucio.common.config import config_get, config_get_bool from rucio.common.exception import (SourceNotFound, DatabaseException, ServiceUnavailable, RSEAccessDenied, ResourceTemporaryUnavailable, @@ -47,10 +50,10 @@ from rucio.core.quarantined_replica import (list_quarantined_replicas, delete_quarantined_replicas, list_rses) -from rucio.rse import rsemanager as rsemgr from rucio.core.rse_expression_parser import parse_expression from rucio.core.vo import list_vos - +from rucio.db.sqla.util import is_old_db +from rucio.rse import rsemanager as rsemgr logging.getLogger("requests").setLevel(logging.CRITICAL) @@ -195,6 +198,9 @@ def run(total_workers=1, chunk_size=100, once=False, rses=[], scheme=None, all_r :param vos: VOs on which to look for RSEs. Only used in multi-VO mode. If None, we either use all VOs if run from "def", or the current VO otherwise. """ + if is_old_db(): + raise exception.DatabaseException('Database was not updated, daemon won\'t start') + logging.info('main: starting processes') multi_vo = config_get_bool('common', 'multi_vo', raise_exception=False, default=False) diff --git a/lib/rucio/daemons/reaper/light_reaper.py b/lib/rucio/daemons/reaper/light_reaper.py index 00cf0de1f8..890df3233c 100644 --- a/lib/rucio/daemons/reaper/light_reaper.py +++ b/lib/rucio/daemons/reaper/light_reaper.py @@ -1,4 +1,5 @@ -# Copyright 2016-2018 CERN for the benefit of the ATLAS collaboration. +# -*- coding: utf-8 -*- +# Copyright 2016-2020 CERN for the benefit of the ATLAS collaboration. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,12 +14,13 @@ # limitations under the License. # # Authors: -# - Vincent Garonne , 2016-2018 +# - Vincent Garonne , 2016-2018 # - Hannes Hansen , 2018-2019 # - Andrew Lister , 2019 # - Thomas Beermann , 2019 -# - Brandon White , 2019-2020 +# - Brandon White , 2019 # - Patrick Austin , 2020 +# - Benedikt Ziemons , 2020 # # PY3K COMPATIBLE @@ -36,6 +38,7 @@ import time import traceback +from rucio.common import exception from rucio.common.config import config_get, config_get_bool from rucio.common.exception import (SourceNotFound, DatabaseException, ServiceUnavailable, RSEAccessDenied, RSENotFound, ResourceTemporaryUnavailable, VONotFound) @@ -44,9 +47,9 @@ from rucio.core.message import add_message from rucio.core.rse_expression_parser import parse_expression from rucio.core.temporary_did import (list_expired_temporary_dids, delete_temporary_dids) -from rucio.rse import rsemanager as rsemgr from rucio.core.vo import list_vos - +from rucio.db.sqla.util import is_old_db +from rucio.rse import rsemanager as rsemgr logging.getLogger("requests").setLevel(logging.CRITICAL) @@ -190,6 +193,9 @@ def run(total_workers=1, chunk_size=100, once=False, rses=[], scheme=None, all_r :param vos: VOs on which to look for RSEs. Only used in multi-VO mode. If None, we either use all VOs if run from "def", or the current VO otherwise. """ + if is_old_db(): + raise exception.DatabaseException('Database was not updated, daemon won\'t start') + logging.info('main: starting processes') multi_vo = config_get_bool('common', 'multi_vo', raise_exception=False, default=False) diff --git a/lib/rucio/daemons/reaper/reaper.py b/lib/rucio/daemons/reaper/reaper.py index 88eeed0c2e..790276bca5 100644 --- a/lib/rucio/daemons/reaper/reaper.py +++ b/lib/rucio/daemons/reaper/reaper.py @@ -1,4 +1,5 @@ -# Copyright 2016-2018 CERN for the benefit of the ATLAS collaboration. +# -*- coding: utf-8 -*- +# Copyright 2016-2020 CERN for the benefit of the ATLAS collaboration. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,15 +14,17 @@ # limitations under the License. # # Authors: -# - Vincent Garonne , 2016-2018 -# - Martin Barisits , 2016 +# - Vincent Garonne , 2016-2018 +# - Martin Barisits , 2016-2019 # - Thomas Beermann , 2016-2019 -# - Wen Guan , 2016 +# - Wen Guan , 2016 # - Hannes Hansen , 2018-2019 # - Dimitrios Christidis , 2019 +# - James Perry , 2019 # - Andrew Lister , 2019 -# - Brandon White , 2019-2020 +# - Brandon White , 2019 # - Patrick Austin , 2020 +# - Benedikt Ziemons , 2020 # # PY3K COMPATIBLE @@ -43,7 +46,7 @@ import time import traceback -from rucio.db.sqla.constants import ReplicaState +from rucio.common import exception from rucio.common.config import config_get, config_get_bool from rucio.common.exception import (SourceNotFound, ServiceUnavailable, RSEAccessDenied, ReplicaUnAvailable, ResourceTemporaryUnavailable, @@ -60,9 +63,10 @@ from rucio.core.rse import get_rse_attribute, sort_rses, get_rse_name from rucio.core.rse_expression_parser import parse_expression from rucio.core.vo import list_vos +from rucio.db.sqla.constants import ReplicaState +from rucio.db.sqla.util import is_old_db from rucio.rse import rsemanager as rsemgr - logging.getLogger("requests").setLevel(logging.CRITICAL) logging.basicConfig(stream=sys.stdout, @@ -381,6 +385,9 @@ def run(total_workers=1, chunk_size=100, threads_per_worker=None, once=False, gr :param vos: VOs on which to look for RSEs. Only used in multi-VO mode. If None, we either use all VOs if run from "def", or the current VO otherwise. """ + if is_old_db(): + raise exception.DatabaseException('Database was not updated, daemon won\'t start') + logging.info('main: starting processes') multi_vo = config_get_bool('common', 'multi_vo', raise_exception=False, default=False) diff --git a/lib/rucio/daemons/reaper/reaper2.py b/lib/rucio/daemons/reaper/reaper2.py index aae8d7b78e..c0dbdf8945 100644 --- a/lib/rucio/daemons/reaper/reaper2.py +++ b/lib/rucio/daemons/reaper/reaper2.py @@ -1,4 +1,5 @@ -# Copyright 2016-2020 CERN for the benefit of the ATLAS collaboration. +# -*- coding: utf-8 -*- +# Copyright 2019-2020 CERN for the benefit of the ATLAS collaboration. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,16 +14,12 @@ # limitations under the License. # # Authors: -# - Vincent Garonne , 2016-2018 -# - Martin Barisits , 2016-2020 -# - Thomas Beermann , 2016-2020 -# - Wen Guan , 2016 -# - Hannes Hansen , 2018-2019 -# - Dimitrios Christidis , 2019 # - Cedric Serfon , 2019-2020 # - Andrew Lister , 2019 # - Brandon White , 2019 +# - Martin Barisits , 2019-2020 # - James Perry , 2019 +# - Thomas Beermann , 2020 # - Eli Chadwick , 2020 # - Mario Lassnig , 2020 # - Benedikt Ziemons , 2020 @@ -38,23 +35,23 @@ import logging import os -import socket import random +import socket import sys import threading import time import traceback - +from collections import OrderedDict from datetime import datetime, timedelta from math import ceil from operator import itemgetter -from collections import OrderedDict from dogpile.cache import make_region from dogpile.cache.api import NO_VALUE from prometheus_client import Counter from sqlalchemy.exc import DatabaseError, IntegrityError +from rucio.common import exception from rucio.common.config import config_get, config_get_bool from rucio.common.exception import (DatabaseException, RSENotFound, ConfigNotFound, ReplicaUnAvailable, ReplicaNotFound, ServiceUnavailable, @@ -71,9 +68,9 @@ from rucio.core.rse_expression_parser import parse_expression from rucio.core.rule import get_evaluation_backlog from rucio.core.vo import list_vos +from rucio.db.sqla.util import is_old_db from rucio.rse import rsemanager as rsemgr - logging.getLogger("reaper").setLevel(logging.CRITICAL) logging.basicConfig(stream=sys.stdout, @@ -611,6 +608,9 @@ def run(threads=1, chunk_size=100, once=False, greedy=False, rses=None, scheme=N :param delay_seconds: The delay to query replicas in BEING_DELETED state. :param sleep_time: Time between two cycles. """ + if is_old_db(): + raise exception.DatabaseException('Database was not updated, daemon won\'t start') + logging.info('main: starting processes') rses_to_process = get_rses_to_process(rses, include_rses, exclude_rses, vos) diff --git a/lib/rucio/daemons/replicarecoverer/suspicious_replica_recoverer.py b/lib/rucio/daemons/replicarecoverer/suspicious_replica_recoverer.py index b2cc47f7b6..2ccacb6609 100644 --- a/lib/rucio/daemons/replicarecoverer/suspicious_replica_recoverer.py +++ b/lib/rucio/daemons/replicarecoverer/suspicious_replica_recoverer.py @@ -1,5 +1,6 @@ #!/usr/bin/env python -# Copyright 2012-2018 CERN for the benefit of the ATLAS collaboration. +# -*- coding: utf-8 -*- +# Copyright 2019-2020 CERN for the benefit of the ATLAS collaboration. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,12 +15,13 @@ # limitations under the License. # # Authors: -# - Cedric Serfon, , 2018 -# - Jaroslav Guenther, , 2019 -# - Andrew Lister, , 2019 -# - Martin Barisits, , 2019 -# - Brandon White, , 2019 -# - Patrick Austin , 2020 +# - Jaroslav Guenther , 2019 +# - Andrew Lister , 2019 +# - Martin Barisits , 2019 +# - Brandon White , 2019 +# - Patrick Austin , 2020 +# - Benedikt Ziemons , 2020 +# # PY3K COMPATIBLE """ @@ -29,30 +31,29 @@ from __future__ import print_function +import logging import os +import socket import threading -import traceback import time -import logging -import socket -from sys import stdout, argv -from re import match +import traceback from datetime import datetime, timedelta +from re import match +from sys import stdout, argv from sqlalchemy.exc import DatabaseError +from rucio.common import exception +from rucio.common.config import config_get, config_get_bool +from rucio.common.exception import DatabaseException, VONotFound, InvalidRSEExpression +from rucio.common.types import InternalAccount from rucio.core.heartbeat import live, die, sanity_check from rucio.core.monitor import record_counter from rucio.core.replica import list_replicas, declare_bad_file_replicas, get_suspicious_files from rucio.core.rse_expression_parser import parse_expression from rucio.core.vo import list_vos - from rucio.db.sqla.constants import BadFilesStatus -from rucio.db.sqla.util import get_db_time -from rucio.common.config import config_get, config_get_bool -from rucio.common.exception import DatabaseException, VONotFound, InvalidRSEExpression -from rucio.common.types import InternalAccount - +from rucio.db.sqla.util import get_db_time, is_old_db logging.basicConfig(stream=stdout, level=getattr(logging, @@ -239,10 +240,11 @@ def declare_suspicious_replicas_bad(once=False, younger_than=3, nattempts=10, rs def run(once=False, younger_than=3, nattempts=10, rse_expression='MOCK', vos=None, max_replicas_per_rse=100): - """ Starts up the Suspicious-Replica-Recoverer threads. """ + if is_old_db(): + raise exception.DatabaseException('Database was not updated, daemon won\'t start') client_time, db_time = datetime.utcnow(), get_db_time() max_offset = timedelta(hours=1, seconds=10) diff --git a/lib/rucio/daemons/sonar/distribution/distribution_daemon.py b/lib/rucio/daemons/sonar/distribution/distribution_daemon.py index db872effbc..52d339f539 100644 --- a/lib/rucio/daemons/sonar/distribution/distribution_daemon.py +++ b/lib/rucio/daemons/sonar/distribution/distribution_daemon.py @@ -1,4 +1,5 @@ -# Copyright 2017-2018 CERN for the benefit of the ATLAS collaboration. +# -*- coding: utf-8 -*- +# Copyright 2018-2020 CERN for the benefit of the ATLAS collaboration. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,8 +14,8 @@ # limitations under the License. # # Authors: -# - Vitjan Zavrtanik , 2017 -# - Vincent Garonne , 2017-2018 +# - Mario Lassnig , 2018 +# - Benedikt Ziemons , 2020 # # PY3K COMPATIBLE @@ -23,20 +24,21 @@ """ import glob +import logging import os import subprocess import sys import threading -import logging import time from rucio.client.client import Client +from rucio.common import exception from rucio.common.config import config_get from rucio.common.exception import DuplicateRule -from rucio.common.exception import ReplicationRuleCreationTemporaryFailed -from rucio.common.exception import RSEBlacklisted from rucio.common.exception import InsufficientAccountLimit - +from rucio.common.exception import RSEBlacklisted +from rucio.common.exception import ReplicationRuleCreationTemporaryFailed +from rucio.db.sqla.util import is_old_db GRACEFUL_STOP = threading.Event() logging.basicConfig(stream=sys.stdout, @@ -134,6 +136,9 @@ def run(): """ Runs the distribution daemon """ + if is_old_db(): + raise exception.DatabaseException('Database was not updated, daemon won\'t start') + thread = threading.Thread(target=run_distribution, kwargs={}) thread.start() while thread and thread.isAlive(): diff --git a/lib/rucio/daemons/sonar/sonar/sonar_v3_dev_daemon.py b/lib/rucio/daemons/sonar/sonar/sonar_v3_dev_daemon.py index 3b76d1027b..575a56fde6 100644 --- a/lib/rucio/daemons/sonar/sonar/sonar_v3_dev_daemon.py +++ b/lib/rucio/daemons/sonar/sonar/sonar_v3_dev_daemon.py @@ -1,4 +1,5 @@ -# Copyright 2017-2018 CERN for the benefit of the ATLAS collaboration. +# -*- coding: utf-8 -*- +# Copyright 2018-2020 CERN for the benefit of the ATLAS collaboration. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,9 +14,9 @@ # limitations under the License. # # Authors: -# - Vitjan Zavrtanik , 2017 -# - Vincent Garonne , 2017-2018 +# - Mario Lassnig , 2018 # - Hannes Hansen , 2018 +# - Benedikt Ziemons , 2020 # # PY3K COMPATIBLE @@ -34,13 +35,14 @@ from requests import ConnectionError +from rucio.client.client import Client +from rucio.common import exception from rucio.common.config import config_get from rucio.common.exception import AccessDenied, DuplicateRule -from rucio.common.exception import ReplicationRuleCreationTemporaryFailed from rucio.common.exception import RSEBlacklisted, RuleNotFound -from rucio.client.client import Client +from rucio.common.exception import ReplicationRuleCreationTemporaryFailed from rucio.daemons.sonar.sonar.get_current_traffic import get_link_traffic - +from rucio.db.sqla.util import is_old_db logging.basicConfig(stream=sys.stdout, level=getattr(logging, @@ -419,6 +421,9 @@ def run(): """ Starts the Sonar thread. """ + if is_old_db(): + raise exception.DatabaseException('Database was not updated, daemon won\'t start') + threads = [] threads.append(threading.Thread(target=sonar_tests, kwargs={}, name='Sonar_test_v3')) for thread in threads: diff --git a/lib/rucio/daemons/tracer/kronos.py b/lib/rucio/daemons/tracer/kronos.py index d4a7e5ed86..c800e12aca 100644 --- a/lib/rucio/daemons/tracer/kronos.py +++ b/lib/rucio/daemons/tracer/kronos.py @@ -1,4 +1,5 @@ -# Copyright 2014-2018 CERN for the benefit of the ATLAS collaboration. +# -*- coding: utf-8 -*- +# Copyright 2014-2020 CERN for the benefit of the ATLAS collaboration. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,12 +16,15 @@ # Authors: # - Thomas Beermann , 2014-2020 # - Ralph Vigne , 2014 -# - Vincent Garonne , 2015-2018 +# - Vincent Garonne , 2015-2018 # - Mario Lassnig , 2015 -# - Wen Guan , 2015 -# - Cedric Serfon , 2018 +# - Wen Guan , 2015 +# - Cedric Serfon , 2018 # - Robert Illingworth , 2018 +# - Martin Barisits , 2018 # - Andrew Lister , 2019 +# - Eli Chadwick , 2020 +# - Benedikt Ziemons , 2020 # # PY3K COMPATIBLE @@ -31,14 +35,9 @@ import logging import re import socket - from datetime import datetime from json import loads as jloads, dumps as jdumps from os import getpid -try: - from Queue import Queue # py2 -except ImportError: - from queue import Queue # py3 from sys import stdout from threading import Event, Thread, current_thread from time import sleep, time @@ -46,17 +45,25 @@ from stomp import Connection +from rucio.common import exception from rucio.common.config import config_get, config_get_bool, config_get_int from rucio.common.exception import ConfigNotFound, RSENotFound from rucio.common.types import InternalAccount, InternalScope -from rucio.core.monitor import record_counter, record_timer from rucio.core.config import get from rucio.core.did import touch_dids, list_parent_dids from rucio.core.heartbeat import live, die, sanity_check from rucio.core.lock import touch_dataset_locks +from rucio.core.monitor import record_counter, record_timer from rucio.core.replica import touch_replica, touch_collection_replicas, declare_bad_file_replicas from rucio.core.rse import get_rse_id from rucio.db.sqla.constants import DIDType, BadFilesStatus +from rucio.db.sqla.util import is_old_db + +try: + from Queue import Queue # py2 +except ImportError: + from queue import Queue # py3 + logging.getLogger("stomp").setLevel(logging.CRITICAL) @@ -503,6 +510,9 @@ def run(once=False, threads=1, sleep_time_datasets=60, sleep_time_files=60): """ Starts up the consumer threads """ + if is_old_db(): + raise exception.DatabaseException('Database was not updated, daemon won\'t start') + logging.info('resolving brokers') brokers_alias = [] diff --git a/lib/rucio/daemons/transmogrifier/transmogrifier.py b/lib/rucio/daemons/transmogrifier/transmogrifier.py index b4b674501d..b9fc53b718 100644 --- a/lib/rucio/daemons/transmogrifier/transmogrifier.py +++ b/lib/rucio/daemons/transmogrifier/transmogrifier.py @@ -1,3 +1,4 @@ +# -*- coding: utf-8 -*- # Copyright 2018-2020 CERN for the benefit of the ATLAS collaboration. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -13,16 +14,12 @@ # limitations under the License. # # Authors: -# - Cedric Serfon , 2013-2020 -# - Vincent Garonne , 2014-2018 -# - David Cameron , 2014 -# - Mario Lassnig , 2015-2018 -# - Wen Guan , 2015 -# - Martin Barisits , 2016-2017 +# - Mario Lassnig , 2018 # - Hannes Hansen , 2018 # - Robert Illingworth , 2019 # - Andrew Lister , 2019 # - Brandon White , 2019 +# - Cedric Serfon , 2020 # - Patrick Austin , 2020 # - Thomas Beermann , 2020 # - Eli Chadwick , 2020 @@ -43,7 +40,7 @@ from sys import exc_info, stdout from traceback import format_exception - +from rucio.common import exception from rucio.db.sqla.constants import DIDType, SubscriptionState from rucio.common.exception import (DatabaseException, DataIdentifierNotFound, InvalidReplicationRule, DuplicateRule, RSEBlacklisted, InvalidRSEExpression, InsufficientTargetRSEs, InsufficientAccountLimit, InputValidationError, RSEOverQuota, @@ -59,7 +56,7 @@ from rucio.core.rse_selector import RSESelector from rucio.core.rule import add_rule, list_rules, get_rule from rucio.core.subscription import list_subscriptions, update_subscription - +from rucio.db.sqla.util import is_old_db logging.basicConfig(stream=stdout, level=getattr(logging, @@ -465,6 +462,8 @@ def run(threads=1, bulk=100, once=False, sleep_time=60): """ Starts up the transmogrifier threads. """ + if is_old_db(): + raise exception.DatabaseException('Database was not updated, daemon won\'t start') if once: logging.info('Will run only one iteration in a single threaded mode') diff --git a/lib/rucio/daemons/undertaker/undertaker.py b/lib/rucio/daemons/undertaker/undertaker.py index 0e9a67d9d6..dadd830181 100644 --- a/lib/rucio/daemons/undertaker/undertaker.py +++ b/lib/rucio/daemons/undertaker/undertaker.py @@ -1,4 +1,5 @@ -# Copyright 2013-2018 CERN for the benefit of the ATLAS collaboration. +# -*- coding: utf-8 -*- +# Copyright 2018-2020 CERN for the benefit of the ATLAS collaboration. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,13 +14,14 @@ # limitations under the License. # # Authors: -# - Vincent Garonne , 2013-2018 -# - Cedric Serfon , 2013-2015 -# - Martin Barisits , 2016-2019 +# - Mario Lassnig , 2018 +# - Martin Barisits , 2018-2019 # - Hannes Hansen , 2018-2019 # - Andrew Lister , 2019 -# - Brandon White , 2019-2020 +# - Brandon White , 2019 # - Thomas Beermann , 2020 +# - Eli Chadwick , 2020 +# - Benedikt Ziemons , 2020 # # PY3K COMPATIBLE @@ -29,26 +31,27 @@ import logging import os -import sys import socket +import sys import threading import time import traceback - from copy import deepcopy from datetime import datetime, timedelta -from re import match from random import randint +from re import match from sqlalchemy.exc import DatabaseError +from rucio.common import exception from rucio.common.config import config_get from rucio.common.exception import DatabaseException, UnsupportedOperation, RuleNotFound from rucio.common.types import InternalAccount from rucio.common.utils import chunks +from rucio.core.did import list_expired_dids, delete_dids from rucio.core.heartbeat import live, die, sanity_check from rucio.core.monitor import record_counter -from rucio.core.did import list_expired_dids, delete_dids +from rucio.db.sqla.util import is_old_db logging.getLogger("requests").setLevel(logging.CRITICAL) @@ -135,6 +138,9 @@ def run(once=False, total_workers=1, chunk_size=10): """ Starts up the undertaker threads. """ + if is_old_db(): + raise exception.DatabaseException('Database was not updated, daemon won\'t start') + logging.info('main: starting threads') threads = [threading.Thread(target=undertaker, kwargs={'worker_number': i, 'total_workers': total_workers, 'once': once, 'chunk_size': chunk_size}) for i in range(0, total_workers)] [t.start() for t in threads] diff --git a/lib/rucio/db/sqla/util.py b/lib/rucio/db/sqla/util.py index 662d6479b1..01b146ff2a 100644 --- a/lib/rucio/db/sqla/util.py +++ b/lib/rucio/db/sqla/util.py @@ -1,3 +1,4 @@ +# -*- coding: utf-8 -*- # Copyright 2015-2020 CERN for the benefit of the ATLAS collaboration. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -13,17 +14,22 @@ # limitations under the License. # # Authors: -# - Vincent Garonne , 2015-2016 -# - Martin Barisits , 2017 -# - Mario Lassnig , 2018-2019 +# - Vincent Garonne , 2015-2016 +# - Martin Barisits , 2017-2019 +# - Mario Lassnig , 2018-2019 # - Hannes Hansen , 2019 # - Andrew Lister , 2019 # - Ruturaj Gujar , 2019 +# - Eli Chadwick , 2020 +# - Eric Vaandering , 2020 # - Patrick Austin , 2020 +# - Benedikt Ziemons , 2020 # # PY3K COMPATIBLE from __future__ import print_function + +import sys from base64 import b64encode from datetime import datetime from hashlib import sha256 @@ -32,20 +38,21 @@ from alembic import command from alembic.config import Config - from six import PY3 - from sqlalchemy import func from sqlalchemy.engine import reflection from sqlalchemy.exc import IntegrityError from sqlalchemy.schema import CreateSchema, MetaData, Table, DropTable, ForeignKeyConstraint, DropConstraint from sqlalchemy.sql.expression import select, text +from rucio import alembicrevision from rucio.common.config import config_get from rucio.common.types import InternalAccount from rucio.core.account_counter import create_counters_for_new_account from rucio.db.sqla import session, models from rucio.db.sqla.constants import AccountStatus, AccountType, IdentityType +from rucio.db.sqla.models import AlembicVersion +from rucio.db.sqla.session import get_engine def build_database(echo=True): @@ -263,3 +270,23 @@ def get_count(q): count_q = q.statement.with_only_columns([func.count()]).order_by(None) count = q.session.execute(count_q).scalar() return count + + +def is_old_db(): + """ + Returns true, if alembic is used and the database is not on the + same revision as the code base. + """ + schema = config_get('database', 'schema', raise_exception=False) + + # checks if alembic is being used by looking up the AlembicVersion table + if not get_engine().has_table(models.AlembicVersion.__tablename__, schema): + print("no table", models.AlembicVersion.__tablename__, file=sys.stderr) + return False + + s = session.get_session() + + query = s.query(models.AlembicVersion.version_num) + print("result", alembicrevision.ALEMBIC_REVISION, query.first(), file=sys.stderr) + + return query.count() != 0 and str(query.first()[0]) != alembicrevision.ALEMBIC_REVISION diff --git a/lib/rucio/tests/test_abacus_account.py b/lib/rucio/tests/test_abacus_account.py index c4f7a0e3bf..9cf91eeafb 100644 --- a/lib/rucio/tests/test_abacus_account.py +++ b/lib/rucio/tests/test_abacus_account.py @@ -1,3 +1,4 @@ +# -*- coding: utf-8 -*- # Copyright 2018-2020 CERN for the benefit of the ATLAS collaboration. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -45,30 +46,31 @@ class TestAbacusAccount(unittest.TestCase): + rse = 'MOCK4' + file_sizes = 2 + vo = {} - def setUp(self): - self.rse = 'MOCK4' - self.file_sizes = 2 - self.upload_client = UploadClient() - self.account_client = AccountClient() - self.session = get_session() + @classmethod + def setUpClass(cls): + cls.upload_client = UploadClient() + cls.account_client = AccountClient() + cls.session = get_session() if config_get_bool('common', 'multi_vo', raise_exception=False, default=False): - self.vo = {'vo': config_get('client', 'vo', raise_exception=False, default='tst')} - else: - self.vo = {} + cls.vo = {'vo': config_get('client', 'vo', raise_exception=False, default='tst')} - self.account = InternalAccount('root', **self.vo) - self.scope = InternalScope('mock', **self.vo) - self.rse_id = get_rse_id(self.rse, session=self.session, **self.vo) + cls.account = InternalAccount('root', **cls.vo) + cls.scope = InternalScope('mock', **cls.vo) + cls.rse_id = get_rse_id(cls.rse, session=cls.session, **cls.vo) - def tearDown(self): + @classmethod + def tearDownClass(cls): undertaker.run(once=True) cleaner.run(once=True) - if self.vo: - reaper.run(once=True, include_rses='vo=%s&(%s)' % (self.vo['vo'], self.rse), greedy=True) + if cls.vo: + reaper.run(once=True, include_rses='vo=%s&(%s)' % (cls.vo['vo'], cls.rse), greedy=True) else: - reaper.run(once=True, include_rses=self.rse, greedy=True) + reaper.run(once=True, include_rses=cls.rse, greedy=True) def test_abacus_account(self): """ ABACUS (ACCOUNT): Test update of account usage """ diff --git a/lib/rucio/tests/test_abacus_collection_replica.py b/lib/rucio/tests/test_abacus_collection_replica.py index d149277039..ad74d05dcd 100644 --- a/lib/rucio/tests/test_abacus_collection_replica.py +++ b/lib/rucio/tests/test_abacus_collection_replica.py @@ -1,3 +1,4 @@ +# -*- coding: utf-8 -*- # Copyright 2018-2020 CERN for the benefit of the ATLAS collaboration. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -42,33 +43,34 @@ class TestAbacusCollectionReplica(unittest.TestCase): + account = 'root' + scope = 'mock' + rse = 'MOCK5' + file_sizes = 2 + vo = {} - def setUp(self): - self.account = 'root' - self.scope = 'mock' - self.rse = 'MOCK5' - self.file_sizes = 2 - self.dataset = 'dataset_%s' % generate_uuid() + @classmethod + def setUpClass(cls): + cls.dataset = 'dataset_%s' % generate_uuid() - self.rule_client = RuleClient() - self.did_client = DIDClient() - self.replica_client = ReplicaClient() - self.upload_client = UploadClient() + cls.rule_client = RuleClient() + cls.did_client = DIDClient() + cls.replica_client = ReplicaClient() + cls.upload_client = UploadClient() if config_get_bool('common', 'multi_vo', raise_exception=False, default=False): - self.vo = {'vo': config_get('client', 'vo', raise_exception=False, default='tst')} - else: - self.vo = {} + cls.vo = {'vo': config_get('client', 'vo', raise_exception=False, default='tst')} - self.rse_id = get_rse_id(rse=self.rse, **self.vo) + cls.rse_id = get_rse_id(rse=cls.rse, **cls.vo) - def tearDown(self): + @classmethod + def tearDownClass(cls): undertaker.run(once=True) cleaner.run(once=True) - if self.vo: - reaper.run(once=True, include_rses='vo=%s&(%s)' % (self.vo['vo'], self.rse), greedy=True) + if cls.vo: + reaper.run(once=True, include_rses='vo=%s&(%s)' % (cls.vo['vo'], cls.rse), greedy=True) else: - reaper.run(once=True, include_rses=self.rse, greedy=True) + reaper.run(once=True, include_rses=cls.rse, greedy=True) def test_abacus_collection_replica(self): """ ABACUS (COLLECTION REPLICA): Test update of collection replica. """ diff --git a/lib/rucio/tests/test_abacus_rse.py b/lib/rucio/tests/test_abacus_rse.py index ab6634fdec..6f03f154e4 100644 --- a/lib/rucio/tests/test_abacus_rse.py +++ b/lib/rucio/tests/test_abacus_rse.py @@ -1,3 +1,4 @@ +# -*- coding: utf-8 -*- # Copyright 2018-2020 CERN for the benefit of the ATLAS collaboration. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -39,29 +40,30 @@ class TestAbacusRSE(unittest.TestCase): + account = 'root' + scope = 'mock' + rse = 'MOCK4' + file_sizes = 2 + vo = {} - def setUp(self): - self.account = 'root' - self.scope = 'mock' - self.rse = 'MOCK4' - self.file_sizes = 2 - self.upload_client = UploadClient() - self.session = get_session() + @classmethod + def setUpClass(cls): + cls.upload_client = UploadClient() + cls.session = get_session() if config_get_bool('common', 'multi_vo', raise_exception=False, default=False): - self.vo = {'vo': config_get('client', 'vo', raise_exception=False, default='tst')} - else: - self.vo = {} + cls.vo = {'vo': config_get('client', 'vo', raise_exception=False, default='tst')} - self.rse_id = get_rse_id(self.rse, session=self.session, **self.vo) + cls.rse_id = get_rse_id(cls.rse, session=cls.session, **cls.vo) - def tearDown(self): + @classmethod + def tearDownClass(cls): undertaker.run(once=True) cleaner.run(once=True) - if self.vo: - reaper.run(once=True, include_rses='vo=%s&(%s)' % (self.vo['vo'], self.rse), greedy=True) + if cls.vo: + reaper.run(once=True, include_rses='vo=%s&(%s)' % (cls.vo['vo'], cls.rse), greedy=True) else: - reaper.run(once=True, include_rses=self.rse, greedy=True) + reaper.run(once=True, include_rses=cls.rse, greedy=True) def test_abacus_rse(self): """ ABACUS (RSE): Test update of RSE usage. """ diff --git a/lib/rucio/tests/test_daemons.py b/lib/rucio/tests/test_daemons.py new file mode 100644 index 0000000000..5a9315d74b --- /dev/null +++ b/lib/rucio/tests/test_daemons.py @@ -0,0 +1,100 @@ +# -*- coding: utf-8 -*- +# Copyright 2020 CERN for the benefit of the ATLAS collaboration. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Authors: +# - Benedikt Ziemons , 2020 +# +# PY3K COMPATIBLE + +import sys + +import pytest + +from rucio.common import exception +from rucio.daemons import abacus, badreplicas, cache, conveyor, hermes as hermespkg, judge, reaper as reaperpkg, replicarecoverer, tracer +from rucio.daemons.abacus import account, collection_replica, rse +from rucio.daemons.atropos import atropos +from rucio.daemons.automatix import automatix +from rucio.daemons.badreplicas import minos, minos_temporary_expiration, necromancer +from rucio.daemons.c3po import c3po +from rucio.daemons.cache import consumer +from rucio.daemons.conveyor import finisher, fts_throttler, poller, poller_latest, receiver, stager, submitter, throttler +from rucio.daemons.follower import follower +from rucio.daemons.hermes import hermes, hermes2 +from rucio.daemons.judge import cleaner, evaluator, injector, repairer +from rucio.daemons.oauthmanager import oauthmanager +from rucio.daemons.reaper import dark_reaper, light_reaper, reaper, reaper2 +from rucio.daemons.replicarecoverer import suspicious_replica_recoverer +from rucio.daemons.sonar.distribution import distribution_daemon +from rucio.daemons.tracer import kronos +from rucio.daemons.transmogrifier import transmogrifier +from rucio.daemons.undertaker import undertaker + +if sys.version_info >= (3, 3): + from unittest import mock +else: + import mock + + +DAEMONS = [ + abacus.account, + abacus.collection_replica, + abacus.rse, + atropos, + automatix, + badreplicas.minos, + badreplicas.minos_temporary_expiration, + badreplicas.necromancer, + c3po, + cache.consumer, + conveyor.finisher, + conveyor.fts_throttler, + conveyor.poller, + conveyor.poller_latest, + conveyor.receiver, + conveyor.stager, + conveyor.submitter, + conveyor.throttler, + follower, + hermespkg.hermes, + hermespkg.hermes2, + judge.cleaner, + judge.evaluator, + judge.injector, + judge.repairer, + oauthmanager, + reaperpkg.dark_reaper, + reaperpkg.light_reaper, + reaperpkg.reaper, + reaperpkg.reaper2, + replicarecoverer.suspicious_replica_recoverer, + distribution_daemon, + # sonar_v3_dev_daemon, -- lib/rucio/common/config.py:55: NoSectionError: No section: 'sonar' + tracer.kronos, + transmogrifier, + undertaker, +] + + +@mock.patch('rucio.db.sqla.util.is_old_db') +@pytest.mark.parametrize('daemon', DAEMONS) +def test_conveyor_finisher_fail_on_old_database(mock_is_old_db, daemon): + """ DAEMON: Test daemon failure on old database """ + mock_is_old_db.return_value = True + + with pytest.raises(exception.DatabaseException, match='Database was not updated, daemon won\'t start'): + daemon.run() + + mock_is_old_db.assert_called() diff --git a/tools/alembic_migration.sh b/tools/alembic_migration.sh new file mode 100755 index 0000000000..f0974d12ae --- /dev/null +++ b/tools/alembic_migration.sh @@ -0,0 +1,35 @@ +#!/bin/bash +# Copyright 2020 CERN for the benefit of the ATLAS collaboration. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -euo pipefail +IFS=$'\n\t' + +echo "Downgrading the DB to base" +alembic downgrade base + +echo "Check if is_old_db function is returning false after the full downgrade" +PYTHONPATH=lib python -c 'import sys; import rucio.db.sqla.util; sys.exit(1 if rucio.db.sqla.util.is_old_db() else 0);' + +echo "Updating the DB to head-1" +alembic upgrade head-1 + +echo "Check if is_old_db function is returning true before the full upgrade" +PYTHONPATH=lib python -c 'import sys; import rucio.db.sqla.util; sys.exit(0 if rucio.db.sqla.util.is_old_db() else 1);' + +echo "Upgrading the DB to head" +alembic upgrade head + +echo "Check if is_old_db function returns false after the full upgrade" +PYTHONPATH=lib python -c 'import sys; import rucio.db.sqla.util; sys.exit(1 if rucio.db.sqla.util.is_old_db() else 0);' diff --git a/tools/run_multi_vo_tests_docker.sh b/tools/run_multi_vo_tests_docker.sh index 4758b0f4df..4788f37da4 100755 --- a/tools/run_multi_vo_tests_docker.sh +++ b/tools/run_multi_vo_tests_docker.sh @@ -72,14 +72,9 @@ if [ -f /tmp/rucio.db ]; then fi echo 'Running full alembic migration' -alembic -c /opt/rucio/etc/alembic.ini downgrade base +ALEMBIC_CONFIG="/opt/rucio/etc/alembic.ini" tools/alembic_migration.sh if [ $? != 0 ]; then - echo 'Failed to downgrade the database!' - exit 1 -fi -alembic -c /opt/rucio/etc/alembic.ini upgrade head -if [ $? != 0 ]; then - echo 'Failed to upgrade the database!' + echo 'Failed to run alembic migration!' exit 1 fi diff --git a/tools/run_tests.sh b/tools/run_tests.sh index 79ebbcdec4..0576f943bc 100755 --- a/tools/run_tests.sh +++ b/tools/run_tests.sh @@ -117,15 +117,10 @@ fi if test ${alembic}; then echo 'Running full alembic migration' - alembic downgrade base + tools/alembic_migration.sh if [ $? != 0 ]; then - echo 'Failed to downgrade the database!' - exit - fi - alembic upgrade head - if [ $? != 0 ]; then - echo 'Failed to upgrade the database!' - exit + echo 'Failed to run alembic migration!' + exit 1 fi fi diff --git a/tools/run_tests_docker.sh b/tools/run_tests_docker.sh index 4ea2b97d33..4e405110f2 100755 --- a/tools/run_tests_docker.sh +++ b/tools/run_tests_docker.sh @@ -92,15 +92,10 @@ if [ -f /tmp/rucio.db ]; then chmod 777 /tmp/rucio.db fi -echo 'Running full alembic migration' -alembic -c /opt/rucio/etc/alembic.ini downgrade base +echo "Running full alembic migration" +ALEMBIC_CONFIG="/opt/rucio/etc/alembic.ini" tools/alembic_migration.sh if [ $? != 0 ]; then - echo 'Failed to downgrade the database!' - exit 1 -fi -alembic -c /opt/rucio/etc/alembic.ini upgrade head -if [ $? != 0 ]; then - echo 'Failed to upgrade the database!' + echo 'Failed to run alembic migration!' exit 1 fi