This repository has been archived by the owner on May 22, 2024. It is now read-only.
forked from rucio/rucio
/
preparer.py
124 lines (104 loc) · 4.53 KB
/
preparer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# -*- coding: utf-8 -*-
# Copyright 2020 CERN
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Authors:
# - Benedikt Ziemons <benedikt.ziemons@cern.ch>, 2020
import logging
import sys
import threading
from time import time
import rucio.db.sqla.util
from rucio.common import exception
from rucio.common.config import config_get
from rucio.common.exception import ConfigNotFound
from rucio.common.utils import get_parsed_throttler_mode, chunks
from rucio.core.config import get
from rucio.core.rse import get_rse_transfer_limits
from rucio.core.transfer import __list_transfer_requests_and_source_replicas
from rucio.db.sqla import models
from rucio.db.sqla.constants import RequestState
from rucio.db.sqla.session import transactional_session
# Module-level event used to request daemon shutdown: stop() sets it, and
# run() both polls it as its loop condition and waits on it between cycles.
graceful_stop = threading.Event()
def stop(signum=None, frame=None):
    """
    Graceful exit.

    Sets the module-level stop event, so the loop in run() ends once its
    current cycle finishes and any wait on the event returns immediately.

    The two optional arguments make the function usable directly as a
    signal handler (handlers are invoked with the signal number and the
    current stack frame). Both are ignored; calling stop() without
    arguments keeps working as before.

    :param signum: Signal number when invoked as a signal handler (unused).
    :param frame: Stack frame when invoked as a signal handler (unused).
    """
    graceful_stop.set()
def run(once=False, sleep_time=60):
    """
    Run the preparer daemon, either a single time or repeatedly until stop() is called.

    Logging is configured on stdout with the level taken from the
    'common' / 'loglevel' configuration option ('DEBUG' when absent).

    :param once: If true, return right after the first run_once() call.
    :param sleep_time: Minimum duration of one cycle in seconds; when a cycle
                       finishes faster, the remainder is spent waiting on the
                       stop event.
    :raises DatabaseException: If the database is reported as outdated.
    """
    level_name = config_get('common', 'loglevel', raise_exception=False, default='DEBUG').upper()
    logging.basicConfig(
        stream=sys.stdout,
        level=getattr(logging, level_name),
        format='%(asctime)s\t%(process)d\t%(levelname)s\t%(message)s',
    )

    if rucio.db.sqla.util.is_old_db():
        raise exception.DatabaseException("Database was not updated, daemon won't start")

    # Initial delay of up to 10 seconds; returns early if stop was already requested.
    graceful_stop.wait(10)

    while not graceful_stop.is_set():
        cycle_started = time()
        run_once()
        if once:
            return

        elapsed = time() - cycle_started
        if elapsed >= sleep_time:
            # The cycle already used up its time slice: start the next one right away.
            continue

        remaining = sleep_time - elapsed
        logging.info('Sleeping for a while : %.2f seconds', remaining)
        # Waiting on the event (rather than sleeping) lets stop() interrupt the pause.
        graceful_stop.wait(remaining)
@transactional_session
def run_once(chunk_size=50, session=None):
    """
    Perform one preparer pass over the requests in the PREPARING state.

    Every listed request/source row is handed to _prepare_request() and the
    resulting state is written back to the request. The session is committed
    once per chunk.

    :param chunk_size: Number of rows handled between two commits.
    :param session: Database session (presumably supplied by the
                    transactional_session decorator when omitted - TODO confirm).
    """
    candidates = __list_transfer_requests_and_source_replicas(
        request_state=RequestState.PREPARING,
        minimum_distance=True,
        session=session,
    )

    for batch in chunks(candidates, chunk_size):
        # Row layout taken from rucio.core.transfer.get_transfer_requests_and_source_replicas.
        # Only a few fields are needed here; the others are unpacked so that the
        # expected row width is still enforced, and prefixed with "_" as unused.
        for (request_id, _rule_id, _scope, _name, _md5, _adler32, _bytes,
             activity, _attributes, _previous_attempt_id, dest_rse_id,
             _account, source_rse_id, _rse, _deterministic, _rse_type,
             _path, _retry_count, _src_url, _ranking, _link_ranking) in batch:
            next_state = _prepare_request(
                session=session,
                activity=activity,
                source_rse_id=source_rse_id,
                dest_rse_id=dest_rse_id,
            )
            request_query = session.query(models.Request).filter_by(id=request_id)
            request_query.update({models.Request.state: next_state}, synchronize_session=False)
        session.commit()
def _prepare_request(session, activity, source_rse_id, dest_rse_id) -> RequestState:
    """
    Decide the next state of a request from the throttler configuration.

    The request becomes WAITING when the configured throttler mode selects an
    RSE (source or destination) for which a transfer limit entry exists, either
    for the given activity or for 'all_activities', depending on the mode.
    In every other case, including an unset throttler mode, it becomes QUEUED.

    :param session: Database session used for the config and limit lookups.
    :param activity: Activity of the request; key into the transfer limits.
    :param source_rse_id: RSE id checked when the mode throttles by source.
    :param dest_rse_id: RSE id checked when the mode throttles by destination.
    :returns: RequestState.WAITING if a limit applies, RequestState.QUEUED otherwise.
    """
    try:
        mode = get('throttler', 'mode', default=None, use_cache=False, session=session)
    except ConfigNotFound:
        mode = None

    if not mode:
        return RequestState.QUEUED

    limits = get_rse_transfer_limits(session=session)
    per_activity = limits.get(activity, {})
    across_activities = limits.get('all_activities', {})
    direction, all_activities = get_parsed_throttler_mode(mode)

    # Pick the RSE the mode throttles on; an unrecognised direction means no limit.
    if direction == 'source':
        rse_id = source_rse_id
    elif direction == 'destination':
        rse_id = dest_rse_id
    else:
        return RequestState.QUEUED

    applicable_limits = across_activities if all_activities else per_activity
    if applicable_limits.get(rse_id):
        return RequestState.WAITING
    return RequestState.QUEUED