/
util.py
199 lines (164 loc) · 5.47 KB
/
util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
# -*- coding: utf-8 -*-
"""
kaylee.util
~~~~~~~~~~~
The module contains common Kaylee routines and classes.
:copyright: (c) 2012 by Zaur Nasibov.
:license: MIT, see LICENSE for more details.
"""
#pylint: disable-msg=W0402,W0212,R0913
#W0402: 15,0: Uses of a deprecated module 'string'
#W0212: Access to a protected member _wrapped of a client class
#R0913: random_string: Too many arguments (7/5)
###
import os
import re
import sys
import random
import string
import importlib
import contextlib
import logging
from datetime import timedelta
from .errors import KayleeError
def parse_timedelta(s):
    """Convert a string such as ``'1d 12h 30m 15s'`` to a ``timedelta``.

    Recognised units are ``d`` (days), ``h`` (hours), ``m`` (minutes) and
    ``s`` (seconds). Every unit is optional, but they must appear in that
    order, optionally separated by a single whitespace character. The
    pattern is anchored at the start of the string only, so any text that
    follows the recognised prefix is ignored.

    :param s: The string to parse.
    :returns: A :class:`datetime.timedelta` built from the parsed units.
    :raises KayleeError: If no unit could be parsed from the beginning of
                         ``s`` or the parsed values cannot be represented
                         as a ``timedelta``.
    """
    # The compiled pattern is cached on the function object itself, so it is
    # built on first use only.
    regex = getattr(parse_timedelta, 'timeout_regex', None)
    if regex is None:
        regex = re.compile(
            r'((?P<days>\d+?)d)?\s?((?P<hours>\d+?)h)?\s?'
            r'((?P<minutes>\d+?)m)?\s?((?P<seconds>\d+?)s)?')
        parse_timedelta.timeout_regex = regex

    # Every group is optional, so the pattern always matches (possibly the
    # empty string); unmatched groups are reported as None.
    match = regex.match(s)
    try:
        time_params = {name: int(value)
                       for name, value in match.groupdict().items()
                       if value is not None}
        if time_params:
            return timedelta(**time_params)
    except (OverflowError, ValueError):
        # Values too large to convert or to fit in a timedelta are reported
        # as a malformed string, like any other bad input.
        pass
    raise KayleeError('Wrong timedelta string: {}'.format(s))
def import_object(name):
    """Import and return the object referred to by a dotted path.

    Everything before the last dot is treated as the module path and the
    last component as the attribute to fetch from that module.

    :param name: Dotted path, e.g. ``'package.module.ClassName'``.
    :returns: The attribute ``ClassName`` of the imported module.
    :raises ValueError: If ``name`` contains no dot and therefore cannot be
                        split into a module and an object name.
    :raises ImportError: If the module cannot be imported or does not
                         define the requested object.
    """
    modname, dot, objname = name.rpartition('.')
    if not dot:
        # Same exception type as the tuple-unpacking failure this replaces,
        # but with a message that names the actual problem.
        raise ValueError('{!r} is not a dotted "module.object" path'
                         .format(name))
    mod = importlib.import_module(modname)
    try:
        return getattr(mod, objname)
    except AttributeError:
        raise ImportError('Object {} was not found in module {}'
                          .format(objname, modname))
def _new_method_proxy(func):
    """Build a method that applies ``func`` to the owner's ``_wrapped`` object.

    The returned method calls ``func(self._wrapped, *args)``. If that raises
    ``AttributeError``, it calls ``self._setup()`` and repeats the call once;
    an ``AttributeError`` from the second attempt propagates to the caller.

    :param func: A callable taking the wrapped object as its first argument,
                 e.g. ``getattr`` or ``dir``.
    :returns: A function suitable for use as a method.
    """
    def inner(self, *args):
        try:
            result = func(self._wrapped, *args)
        except AttributeError:
            # First attempt failed: let the owner (re)create the wrapped
            # object, then retry exactly once.
            self._setup()
            result = func(self._wrapped, *args)
        return result
    return inner
class LazyObject(object):
    """
    A stand-in for another object whose creation is postponed.

    Attribute reads, writes and deletions are forwarded to the object stored
    in ``_wrapped``. Whenever a forwarded operation raises
    ``AttributeError``, ``_setup()`` is called and the operation is retried
    once. Sub-classes implement ``_setup()`` and may use it to intercept and
    alter how the wrapped object is created.
    """

    def __init__(self):
        # Goes through __setattr__ below, which stores `_wrapped` on the
        # proxy itself instead of forwarding it.
        self._wrapped = None

    # Only consulted when normal lookup on the proxy fails; the lookup is
    # then performed on the wrapped object.
    __getattr__ = _new_method_proxy(getattr)

    def __setattr__(self, name, value):
        if name != '_wrapped':
            try:
                setattr(self._wrapped, name, value)
            except AttributeError:
                self._setup()
                setattr(self._wrapped, name, value)
            return
        # Write straight into __dict__ so that storing the wrapped object
        # does not re-enter __setattr__.
        self.__dict__['_wrapped'] = value

    def __delattr__(self, name):
        if name == '_wrapped':
            raise TypeError('Cannot delete _wrapped.')
        try:
            delattr(self._wrapped, name)
        except AttributeError:
            self._setup()
            delattr(self._wrapped, name)

    def _setup(self, obj = None):
        """
        Initialize the wrapped object; sub-classes must override this.

        :param obj: An optional object to wrap. Checking its type and
                    wrapping it is the responsibility of the sub-class.
        :raises NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    # Introspection support: dir() is answered from the wrapped object.
    __dir__ = _new_method_proxy(dir)
class DictAsObjectWrapper(object):
    """Expose keyword arguments as attributes of the instance.

    ``DictAsObjectWrapper(a=1).a`` evaluates to ``1``. Values are stored
    as-is; nested dictionaries are not converted.
    """

    def __init__(self, **kwargs):
        # .items() exists on both Python 2 and Python 3, unlike .iteritems().
        for key, val in kwargs.items():
            setattr(self, key, val)
class RecursiveDictAsObjectWrapper(object):
    """Expose keyword arguments as attributes, descending into dict values.

    Every value that is a ``dict`` is itself wrapped in a
    ``RecursiveDictAsObjectWrapper``, so ``W(a={'b': 1}).a.b`` evaluates to
    ``1``. Other values, including lists that contain dictionaries, are
    stored unchanged. Nested dictionaries are expanded with ``**``, so their
    keys must be strings.
    """

    def __init__(self, **kwargs):
        # .items() exists on both Python 2 and Python 3, unlike .iteritems().
        for key, val in kwargs.items():
            if isinstance(val, dict):
                val = RecursiveDictAsObjectWrapper(**val)
            setattr(self, key, val)
def random_string(length, alphabet=None, lowercase=True, uppercase=True,
                  digits=True, special=True, extra=''):
    """Return a string of ``length`` characters drawn at random.

    When ``alphabet`` is given, characters are drawn from it and all other
    options are ignored. Otherwise the pool is assembled from ``extra``
    followed by the character classes that are switched on.

    NOTE: characters are picked with ``random.choice``, which is not a
    cryptographically secure source.

    :param length: Number of characters to generate.
    :param alphabet: Explicit character pool, or ``None`` to build one from
                     the flags below.
    :param lowercase: Include ``string.ascii_lowercase`` in the pool.
    :param uppercase: Include ``string.ascii_uppercase`` in the pool.
    :param digits: Include ``string.digits`` in the pool.
    :param special: Include the punctuation characters listed in the body.
    :param extra: Additional characters to put in the pool.
    :returns: The generated string.
    :raises IndexError: If the pool is empty and ``length`` is positive
                        (raised by ``random.choice``).
    """
    if alphabet is None:
        src = extra
        if lowercase:
            src += string.ascii_lowercase
        if uppercase:
            src += string.ascii_uppercase
        if digits:
            src += string.digits
        if special:
            src += '!@#$%^&*()_-+=?/><,.|":;`~'
    else:
        src = alphabet
    # range() exists on both Python 2 and Python 3, unlike xrange().
    return ''.join(random.choice(src) for _ in range(length))
def get_secret_key(key=None):
    """Return ``key`` if given, otherwise the configured ``SECRET_KEY``.

    :param key: An explicit secret key. Any value other than ``None`` is
                returned unchanged.
    :returns: ``key``, or ``kl.config.SECRET_KEY`` when ``key`` is ``None``.
    :raises KayleeError: If ``key`` is ``None`` and either the ``kl`` proxy
                         has no wrapped object yet or its configuration's
                         ``SECRET_KEY`` is ``None``.
    """
    if key is not None:
        return key

    # Imported only when the fallback is needed
    # (presumably to avoid a circular import with the package -- TODO confirm).
    from kaylee import kl
    if kl._wrapped is None:
        raise KayleeError('Cannot locate a valid secret key because '
                          'Kaylee proxy object has not been set up.')
    key = kl.config.SECRET_KEY
    if key is None:
        raise KayleeError('SECRET_KEY configuration option is not '
                          'defined.')
    return key
def is_strong_subclass(C, B):
    """Tell whether ``C`` is derived from ``B`` without being ``B`` itself.

    :param C: The candidate class.
    :param B: The prospective base class.
    :returns: ``True`` if ``C`` is a subclass of ``B`` and a different
              object from ``B``, ``False`` otherwise.
    """
    if not issubclass(C, B):
        return False
    return C is not B
@contextlib.contextmanager
def nostdout(stdout=True, stderr=True):
    """Context manager that silences ``sys.stdout`` and/or ``sys.stderr``.

    Inside the ``with`` block the selected streams are replaced by an object
    that discards everything written to it. Both streams are put back when
    the block exits, including when it exits with an exception.

    :param stdout: Silence ``sys.stdout`` when true.
    :param stderr: Silence ``sys.stderr`` when true.
    """
    savestderr = sys.stderr
    savestdout = sys.stdout

    class Devnull(object):
        """Minimal write-only stream that drops its input."""

        def write(self, _):
            pass

        def flush(self):
            # Present so that code calling flush() on the replaced stream
            # keeps working.
            pass

    if stdout:
        sys.stdout = Devnull()
    if stderr:
        sys.stderr = Devnull()
    try:
        yield
    finally:
        # Restore unconditionally so that an exception raised in the with
        # block cannot leave the process with silenced streams.
        sys.stdout = savestdout
        sys.stderr = savestderr
def ensure_dir(path):
    """Create directory ``path``, including parents, unless it already exists.

    :param path: The directory to create.
    :raises OSError: If creation fails and ``path`` is not an existing
                     directory afterwards (e.g. a regular file is in the
                     way).
    """
    try:
        os.makedirs(path)
    except OSError:
        # A failure is acceptable only when the directory is already there.
        if os.path.isdir(path):
            return
        raise
def setup_logging():
    """Configure the root logger via ``logging.basicConfig``.

    Sets the level to ``INFO`` and the record layout to
    ``LEVEL[logger.name]: message``.
    """
    logging.basicConfig(
        format='%(levelname)s[%(name)s]: %(message)s',
        level=logging.INFO)