This repository has been archived by the owner on Feb 27, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 24
/
utils.py
153 lines (130 loc) · 5.3 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# -*- encoding: utf-8 -*-
# Dissemin: open access policy enforcement tool
# Copyright (C) 2014 Antonin Delpeuch
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
import functools
import logging
from datetime import datetime
from datetime import timedelta
from time import sleep

import requests
import requests.exceptions
from memoize import memoize

from dissemin.settings import redis_client
from papers.errors import MetadataSourceException
logger = logging.getLogger('dissemin.' + __name__)
# Run a task at most one at a time
class run_only_once(object):
    """
    Decorator ensuring that at most one execution of the wrapped task
    runs at any time, via a Redis lock.

    The lock identifier is built from ``base_id`` plus the values of the
    keyword arguments listed in ``keys``, so that calls with different
    selected arguments may still run concurrently.  If the lock is
    already held, the call is skipped and ``None`` is returned.

    :param base_id: prefix of the Redis lock identifier
    :param keys: names of keyword arguments whose values are appended
        to the lock identifier (default: no keys)
    :param timeout: lifetime of the lock in seconds (default 10 minutes);
        the lock expires on its own after that, even if never released
    """

    def __init__(self, base_id, **kwargs):
        self.base_id = base_id
        self.keys = kwargs.get('keys', [])
        self.timeout = int(kwargs.get('timeout', 60*10))

    def __call__(self, f):
        # functools.wraps preserves the wrapped function's metadata
        # (__name__, __doc__, ...), which the original decorator lost.
        @functools.wraps(f)
        def inner(*args, **kwargs):
            lock_id = self.base_id+'-' + \
                ('-'.join([str(kwargs.get(key, 'none')) for key in self.keys]))
            lock = redis_client.lock(lock_id, timeout=self.timeout)
            have_lock = False
            result = None
            try:
                # Non-blocking acquire: if another worker holds the
                # lock, skip the task instead of waiting.
                have_lock = lock.acquire(blocking=False)
                if have_lock:
                    result = f(*args, **kwargs)
            finally:
                # Only release a lock we actually acquired.
                if have_lock:
                    lock.release()
            return result
        return inner
# Open an URL with retries
def request_retry(url, **kwargs):
    """
    Retries a request, with throttling and exponential back-off.

    :param url: the URL to fetch
    :param data: the GET parameters
    :param headers: the HTTP headers
    :param timeout: the number of seconds to wait before declaring that an individual request timed out (default 10)
    :param retries: the number of times to retry a query (default 3)
    :param delay: the minimum delay between requests (default 5)
    :param backoff: the multiple used when raising the delay after an unsuccessful query (default 2)
    :returns: the successful :class:`requests.Response`
    :raises MetadataSourceException: when the request still fails after
        the last retry
    """
    data = kwargs.get('data', None)
    timeout = kwargs.get('timeout', 10)
    retries = kwargs.get('retries', 3)
    delay = kwargs.get('delay', 5)
    backoff = kwargs.get('backoff', 2)
    headers = kwargs.get('headers', {})

    # Iterative retry loop.  The previous recursive implementation
    # dropped the custom `headers` on every retry (the recursive call
    # did not pass them along); the loop keeps all parameters intact
    # and avoids growing the call stack.
    while True:
        try:
            r = requests.get(url,
                             params=data,
                             timeout=timeout,
                             headers=headers,
                             allow_redirects=True)
            # Treat HTTP error statuses as failures worth retrying.
            r.raise_for_status()
            return r
        except requests.exceptions.Timeout as e:
            if retries <= 0:
                raise MetadataSourceException('Timeout: '+str(e))
        except requests.exceptions.ConnectionError as e:
            if retries <= 0:
                raise MetadataSourceException('Connection error: '+str(e))
        except requests.exceptions.RequestException as e:
            if retries <= 0:
                raise MetadataSourceException('Request error: '+str(e))
        logger.info("Retrying in "+str(delay)+" seconds with url "+url)
        sleep(delay)
        retries -= 1
        delay *= backoff
def urlopen_retry(url, **kwargs):
    """
    Fetch *url* with :func:`request_retry` and return the response body
    as text.  All keyword arguments are forwarded unchanged.
    """
    response = request_retry(url, **kwargs)
    return response.text
@memoize(timeout=86400)  # cache responses for one day
def cached_urlopen_retry(*args, **kwargs):
    """
    Memoized variant of :func:`urlopen_retry`: identical arguments
    within 24 hours return the cached response text instead of issuing
    a new request.
    """
    return urlopen_retry(*args, **kwargs)
def with_speed_report(generator, name=None, report_delay=timedelta(seconds=10)):
    """
    Periodically reports the speed at which we are enumerating the items
    of a generator.

    :param generator: the iterable whose items are passed through unchanged
    :param name: a name to use in the reports (eg "papers from Crossref API");
        defaults to the generator's ``__name__`` when available
    :param report_delay: print a report every so often
    """
    if name is None:
        name = getattr(generator, "__name__", "")
    window_started = datetime.now()
    window_count = 0
    for idx, item in enumerate(generator):
        yield item
        window_count += 1
        current_time = datetime.now()
        # Once report_delay has elapsed, log the throughput of the
        # current window and start a fresh one.
        if window_started + report_delay < current_time:
            elapsed_seconds = (current_time - window_started).total_seconds()
            rate = window_count / float(elapsed_seconds)
            logger.info('{}: {}, {} records/sec'.format(name, idx, rate))
            window_started = current_time
            window_count = 0
def report_speed(name=None, report_delay=timedelta(seconds=10)):
    """
    Decorator for a function that returns a generator, see with_speed_report

    :param name: name used in the speed reports; defaults to the
        decorated function's ``__name__``
    :param report_delay: print a report every so often
    """
    def decorator(func):
        logging_name = name
        if logging_name is None:
            logging_name = getattr(func, "__name__", "")

        # functools.wraps preserves func's metadata on the wrapper,
        # which the original decorator lost (it exposed the generic
        # name "wrapped_generator" instead).
        @functools.wraps(func)
        def wrapped_generator(*args, **kwargs):
            return with_speed_report(func(*args, **kwargs),
                                     name=logging_name,
                                     report_delay=report_delay)
        return wrapped_generator
    return decorator